"""Image-generation pipeline: extract uploaded source images (a ZIP archive or a
single file), generate ad-image variants concurrently, and upload the results
to R2 storage, recording progress in Mongo log documents."""
import logging
import os
import tempfile
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed

from helpers import encode_image_to_base64, is_valid_image
from image_functions import generate_image
from meta_data import meta_data_helper_function
from mongo_logger import create_log, update_log_status
from prompt_service import get_prompts
from r2_uploader import upload_image_to_r2

logger = logging.getLogger(__name__)
def process_zip_and_generate_images(
    zip_path, category, size, quality, user_prompt,
    sentiment, platform, num_images, demo_mode, existing_images, blur
):
    """Top-level entry point: turn an uploaded ZIP (or single image) into URLs.

    Extracts the archive (when *zip_path* ends with ``.zip``), generates
    *num_images* variants per source image, and returns the uploaded URLs
    appended to *existing_images* (if any). On any failure a single-element
    ``["Error: ..."]`` list is returned instead of raising — callers rely on
    this quirk, so it is preserved.
    """
    # Demo mode is capped to one image; Google Display Network always renders
    # three variants, overriding the caller-supplied count.
    num_images = 1 if demo_mode else num_images
    if not demo_mode and platform == "Google Display Network":
        num_images = 3

    temp_dir = None
    try:
        if zip_path.endswith('.zip'):
            temp_dir = extract_zip_file(zip_path)
            image_files = get_valid_image_files(temp_dir)
        else:
            # NOTE(review): the non-zip branch reads `zip_path.name`, i.e. it
            # assumes a file-like/upload object, while `.endswith` above treats
            # `zip_path` as a str — confirm the actual caller contract.
            image_files = [(os.path.basename(zip_path.name), zip_path.name)]
        results = process_image_files(
            image_files, category, size, quality, user_prompt,
            sentiment, platform, num_images, blur
        )
        all_urls = [url for entry in results for url in entry["urls"]]
        return existing_images + all_urls if existing_images else all_urls
    except Exception as e:
        logger.exception(f"Global error during processing file: {zip_path}")
        return [f"Error: {str(e)}"]
    finally:
        # Explicitly remove the extracted files instead of relying on GC to
        # finalize the TemporaryDirectory object.
        if temp_dir is not None:
            temp_dir.cleanup()
def extract_zip_file(zip_path):
    """Extract *zip_path* into a fresh temporary directory.

    Returns the ``tempfile.TemporaryDirectory`` object; the caller owns its
    lifetime (files are removed on ``.cleanup()`` or finalization).

    Raises whatever ``zipfile`` raises for an unreadable/corrupt archive.
    """
    temp_dir = tempfile.TemporaryDirectory()
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            # SECURITY: extractall() is subject to "zip slip" path traversal
            # for hostile member names on Python < 3.12 (3.12+ applies the
            # "data" extraction filter by default). Archives here come from
            # user uploads — consider validating member names first.
            zip_ref.extractall(temp_dir.name)
        logger.info(f"Extracted ZIP file: {zip_path}")
        return temp_dir
    except Exception:
        # Don't leak the temp directory when extraction fails.
        temp_dir.cleanup()
        logger.exception(f"Failed to extract ZIP file: {zip_path}")
        raise
def get_valid_image_files(temp_dir):
    """Collect ``(filename, absolute_path)`` pairs for usable images.

    Scans only the top level of the extracted directory; entries rejected by
    ``is_valid_image`` or containing ``__MACOSX`` (macOS resource artifacts)
    are skipped with a warning.
    """
    root = temp_dir.name
    valid_files = []
    for entry in os.listdir(root):
        entry_path = os.path.join(root, entry)
        if not (is_valid_image(entry) and "__MACOSX" not in entry):
            logger.warning(f"Ignored non-image or unwanted file: {entry}")
            continue
        valid_files.append((entry, entry_path))
    logger.info(f"Found {len(valid_files)} valid images.")
    return valid_files
def process_image_files(image_files, category, size, quality, user_prompt, sentiment, platform, num_images, blur):
    """Fan per-file processing out across a thread pool and gather results.

    Returns a list of ``{"file_name": ..., "urls": [...]}`` dicts, one per
    source file that produced at least one image. Worker exceptions are
    logged and do not abort the batch.
    """
    final_results = []
    logs = []

    def submit(executor, name, path):
        # One Mongo log document per source file; the worker updates it later.
        log_id = create_log(category, name, user_prompt)
        logs.append(f"Processing {name}")
        return executor.submit(
            process_single_image, name, path, category, size, quality,
            user_prompt, sentiment, platform, num_images, blur, log_id, logs,
        )

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [submit(executor, name, path) for name, path in image_files]
        for future in as_completed(futures):
            try:
                outcome = future.result()
            except Exception:
                logger.exception("Unhandled exception during image processing thread.")
            else:
                if outcome:
                    final_results.append(outcome)
    return final_results
def process_single_image(file_name, file_path, category, size, quality, user_prompt, sentiment, platform, num_images, blur, log_id, logs):
    """Generate and upload image variants for one source file.

    Updates the Mongo log document *log_id* with the outcome and appends
    progress/error strings to the shared *logs* list. Returns
    ``{"file_name": ..., "urls": [...]}`` on success, or ``None`` when no
    image was produced or an error occurred — errors are recorded rather
    than raised so one bad file never aborts the batch.
    """
    # Dead commented-out prompt-generation code (it referenced an undefined
    # `negative_prompt`) was removed; prompting now happens downstream in
    # generate_images_from_prompts / generate_image.
    try:
        image_urls = generate_images_from_prompts(
            file_path, size, quality, category, sentiment,
            user_prompt, platform, num_images, blur
        )
        status = "completed" if image_urls else "failed"
        message = "Completed successfully" if image_urls else "No images generated"
        update_log_status(log_id, status, urls=image_urls, message=message)
        if image_urls:
            return {"file_name": file_name, "urls": image_urls}
        return None
    except Exception as e:
        error_msg = f"Processing failed for {file_name}: {e}"
        logger.error(error_msg)
        logs.append(error_msg)
        update_log_status(log_id, "failed", urls=[], message=str(e))
        return None
def generate_images_from_prompts(file_path, size, quality, category, sentiment, user_prompt, platform, num_images, blur):
    """Generate *num_images* variants concurrently and return uploaded URLs.

    Each worker runs generate -> embed metadata -> upload to R2. Failures are
    logged and skipped, so the returned list may be shorter than
    *num_images*, possibly empty.
    """
    # Guard: ThreadPoolExecutor requires max_workers >= 1, and a non-positive
    # count simply means there is nothing to generate.
    if num_images <= 0:
        return []

    def worker(i):
        # Produce one image variant and return its URL, or None on any failure.
        try:
            image_bytes = generate_image(file_path, size, quality, category, sentiment, user_prompt, platform, blur, i)
            if not image_bytes:
                logger.error("Image generation returned empty bytes.")
                return None
            logger.info("Successfully generated image bytes.")
            try:
                image_with_metadata = meta_data_helper_function(image_bytes)
            except Exception as e:
                logger.error(f"Metadata generation failed: {e}")
                return None
            s3_url = upload_image_to_r2(image_with_metadata)
            logger.info(f"Generated and uploaded image to: {s3_url}")
            return s3_url
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            return None

    image_urls = []
    with ThreadPoolExecutor(max_workers=min(10, num_images)) as executor:
        futures = [executor.submit(worker, i) for i in range(num_images)]
        for future in as_completed(futures):
            url = future.result()
            if url:
                image_urls.append(url)
    return image_urls
def log_error(message):
    """Record *message* at ERROR level on the module logger (thin wrapper)."""
    logger.error(message)