import os import zipfile import tempfile from concurrent.futures import ThreadPoolExecutor, as_completed from meta_data import meta_data_helper_function from image_functions import generate_image from prompt_service import get_prompts from r2_uploader import upload_image_to_r2 from mongo_logger import create_log, update_log_status from helpers import encode_image_to_base64, is_valid_image import logging logger = logging.getLogger(__name__) def process_zip_and_generate_images( zip_path, category, size, quality, user_prompt, sentiment, platform, num_images, demo_mode, existing_images, blur ): num_images = 1 if demo_mode else num_images num_images = 3 if not demo_mode and platform == "Google Display Network" else num_images try: if zip_path.endswith('.zip'): temp_dir = extract_zip_file(zip_path) image_files = get_valid_image_files(temp_dir) else: image_files = [(os.path.basename(zip_path.name), zip_path.name)] results = process_image_files(image_files, category, size, quality, user_prompt, sentiment, platform, num_images, blur) all_urls = [url for entry in results for url in entry["urls"]] return existing_images + all_urls if existing_images else all_urls except Exception as e: logger.exception(f"Global error during processing file: {zip_path}") return [f"Error: {str(e)}"] def extract_zip_file(zip_path): try: temp_dir = tempfile.TemporaryDirectory() with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(temp_dir.name) logger.info(f"Extracted ZIP file: {zip_path}") return temp_dir except Exception as e: logger.exception(f"Failed to extract ZIP file: {zip_path}") raise def get_valid_image_files(temp_dir): valid_files = [] zip_contents = os.listdir(temp_dir.name) for file in zip_contents: file_path = os.path.join(temp_dir.name, file) if is_valid_image(file) and "__MACOSX" not in file: valid_files.append((file, file_path)) else: logger.warning(f"Ignored non-image or unwanted file: {file}") logger.info(f"Found {len(valid_files)} valid images.") return valid_files def process_image_files(image_files, category, size, quality, user_prompt, sentiment, platform, num_images, blur): final_results = [] logs = [] with ThreadPoolExecutor(max_workers=5) as executor: futures = [] for file_name, file_path in image_files: log_id = create_log(category, file_name, user_prompt) logs.append(f"Processing {file_name}") futures.append( executor.submit( process_single_image, file_name, file_path, category, size, quality, user_prompt, sentiment, platform, num_images, blur, log_id, logs ) ) for future in as_completed(futures): try: result = future.result() if result: final_results.append(result) except Exception as e: logger.exception("Unhandled exception during image processing thread.") return final_results def process_single_image(file_name, file_path, category, size, quality, user_prompt, sentiment, platform, num_images, blur, log_id, logs): try: # base64_img = encode_image_to_base64(file_path) # logs.append(f"Prompts generated for {file_name}") # # prompts = get_prompts(base64_img, category, user_prompt, sentiment, negative_prompt) # logs.append(f"Generating images for {file_name}") image_urls = generate_images_from_prompts(file_path, size, quality, category, sentiment, user_prompt, platform, num_images, blur) status = "completed" if image_urls else "failed" message = "Completed successfully" if image_urls else "No images generated" update_log_status(log_id, status, urls=image_urls, message=message) if image_urls: return { "file_name": file_name, "urls": image_urls } return None except Exception as e: error_msg = f"Processing failed for {file_name}: {e}" logger.error(error_msg) logs.append(error_msg) update_log_status(log_id, "failed", urls=[], message=str(e)) return None def generate_images_from_prompts(file_path, size, quality, category, sentiment, user_prompt, platform, num_images, blur): image_urls = [] def worker(i): try: image_bytes = generate_image(file_path, size, quality, category, sentiment, user_prompt, platform, blur, i) if not image_bytes: logger.error("Image generation returned empty bytes.") return None logger.info("Successfully generated image bytes.") try: image_with_metadata = meta_data_helper_function(image_bytes) except Exception as e: logger.error(f"Metadata generation failed: {e}") return None s3_url = upload_image_to_r2(image_with_metadata) logger.info(f"Generated and uploaded image to: {s3_url}") return s3_url except Exception as e: logger.error(f"Image generation failed: {e}") return None with ThreadPoolExecutor(max_workers=min(10, num_images)) as executor: futures = [executor.submit(worker, i) for i in range(num_images)] for future in as_completed(futures): result = future.result() if result: image_urls.append(result) return image_urls def log_error(message): logger.error(message)