"""Multimodel image-generation pipeline.

Accepts a ZIP archive (or a single image), fans the images out to worker
threads, and generates variations either via the existing GPT pipeline or
via a Replicate-hosted model, uploading each result to R2.
"""

import base64
import io
import logging
import os
import tempfile
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional, Tuple

from PIL import Image

from generator_function.image_function import generate_image
from prompt.prompt_services import get_prompts
from multimodel_services.replicate_generation_service import (
    generate_image_with_model,
    convert_size_to_aspect_ratio,
)
from multimodel_services.model_manager import is_gpt_model, get_all_parameters
from helpers_function.helper_meta_data import meta_data_helper_function
from helpers_function.helpers import upload_image_to_r2, is_valid_image
from database.connections import get_results_collection as get_collection
from database.operations import start_job, finish_job
from util.session_state import current_uid

logger = logging.getLogger(__name__)

# Results collection handle; None disables all DB job tracking below.
COL = get_collection()


def _resolve_user_id() -> str:
    """Return the current session's user id, falling back to an env default."""
    return current_uid() or os.getenv("DEFAULT_USER_ID", "anonymous")


def process_zip_and_generate_images_multimodel(
    zip_path: str,
    category: str,
    size: str,
    quality: str,
    user_prompt: str,
    sentiment: str,
    platform: str,
    num_images: int,
    demo_mode: bool,
    existing_images: Optional[List[str]],
    blur: bool,
    uid: str,
    selected_model: str = "gpt_default",
    model_params: Optional[dict] = None,
) -> List[str]:
    """Enhanced image processor that supports both GPT and multimodel approaches.

    Accepts either a ZIP archive of images or a single image file, generates
    ``num_images`` variations per source image, and returns the previously
    existing URLs followed by the newly generated, de-duplicated URLs.
    On any failure the existing URLs are returned unchanged.
    """
    # Demo mode is capped at a single variation regardless of the request.
    num_images = 1 if demo_mode else num_images
    temp_dir: Optional[tempfile.TemporaryDirectory] = None
    try:
        if zip_path.endswith(".zip"):
            temp_dir = extract_zip_file(zip_path)
            image_files = get_valid_image_files(temp_dir)
        else:
            # Single image upload: process the file in place.
            image_files = [(os.path.basename(zip_path), zip_path)]

        results = process_image_files_multimodel(
            image_files, category, size, quality, user_prompt, sentiment,
            platform, num_images, blur, uid, selected_model, model_params,
        )

        # Flatten per-file URL lists, then de-duplicate preserving order.
        all_urls = [url for entry in results for url in entry["urls"]]
        deduped = list(dict.fromkeys(all_urls))
        return (existing_images or []) + deduped
    except Exception:
        logger.exception(f"Global error during processing file: {zip_path}")
        return existing_images or []
    finally:
        # BUGFIX: the extracted TemporaryDirectory was never cleaned up.
        # All workers have finished by the time we reach here, so removing
        # the extracted files is safe.
        if temp_dir is not None:
            try:
                temp_dir.cleanup()
            except Exception:
                logger.exception("Failed to clean up temporary directory.")


def extract_zip_file(zip_path: str) -> tempfile.TemporaryDirectory:
    """Extract *zip_path* into a fresh temporary directory and return it.

    The caller owns the returned TemporaryDirectory and is responsible for
    calling ``cleanup()`` (and for keeping a reference so it is not
    garbage-collected — and thus deleted — while files are still in use).
    """
    temp_dir = tempfile.TemporaryDirectory()
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        # NOTE(review): extractall sanitizes path traversal on modern
        # CPython, but archives from untrusted sources still deserve care.
        zip_ref.extractall(temp_dir.name)
    logger.info(f"Extracted ZIP file: {zip_path}")
    return temp_dir


def get_valid_image_files(temp_dir: tempfile.TemporaryDirectory) -> List[Tuple[str, str]]:
    """Return ``(file_name, absolute_path)`` pairs for valid images in *temp_dir*.

    macOS resource-fork entries (``__MACOSX``) are skipped; anything that
    fails ``is_valid_image`` is logged and ignored. Note: only the top level
    of the extracted directory is scanned (no recursion).
    """
    valid_files: List[Tuple[str, str]] = []
    for file in os.listdir(temp_dir.name):
        if "__MACOSX" in file:
            continue
        file_path = os.path.join(temp_dir.name, file)
        # is_valid_image is given the bare name — presumably an extension
        # check rather than a content sniff; verify against the helper.
        if is_valid_image(file):
            valid_files.append((file, file_path))
        else:
            logger.warning(f"Ignored non-image file: {file}")
    logger.info(f"Found {len(valid_files)} valid images.")
    return valid_files


def _start_job_record(
    file_name: str,
    category: str,
    size: str,
    quality: str,
    user_prompt: str,
    sentiment: str,
    platform: str,
    num_images: int,
    blur: bool,
    uid: str,
    selected_model: str,
    model_params: Optional[dict],
) -> Optional[str]:
    """Best-effort creation of a DB job record; returns the job id or None.

    A DB failure is logged and swallowed so image processing continues
    without job tracking.
    """
    if COL is None:
        return None
    try:
        settings = {
            "size": size,
            "quality": quality,
            "sentiment": sentiment,
            "platform": platform,
            "num_images": num_images,
            "blur": bool(blur),
            "selected_model": selected_model,
            "model_params": model_params or {},
        }
        inputs = {"file_name": file_name, "mode": "img_or_zip_multimodel"}
        return start_job(
            COL,
            type="variation_multimodel",
            created_by=uid,
            category=category or "general",
            inputs=inputs,
            settings=settings,
            user_prompt=user_prompt,
        )
    except Exception:
        logger.exception("Failed to start DB job; continuing without DB logging.")
        return None


def process_image_files_multimodel(
    image_files: List[Tuple[str, str]],
    category: str,
    size: str,
    quality: str,
    user_prompt: str,
    sentiment: str,
    platform: str,
    num_images: int,
    blur: bool,
    uid: str,
    selected_model: str,
    model_params: Optional[dict],
) -> List[dict]:
    """Process image files with multimodel support.

    Fans each source image out to a worker thread and collects per-file
    results of the form ``{"file_name": ..., "urls": [...]}``. Files whose
    processing fails contribute nothing to the result.
    """
    final_results: List[dict] = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = []
        for file_name, file_path in image_files:
            # One DB job per source file; None when DB logging is off/failed.
            job_id = _start_job_record(
                file_name, category, size, quality, user_prompt, sentiment,
                platform, num_images, blur, uid, selected_model, model_params,
            )
            futures.append(
                executor.submit(
                    process_single_image_multimodel,
                    file_name, file_path, category, size, quality,
                    user_prompt, sentiment, platform, num_images, blur,
                    job_id, selected_model, model_params,
                )
            )
        for future in as_completed(futures):
            try:
                result = future.result()
                if result:
                    final_results.append(result)
            except Exception:
                logger.exception("Unhandled exception during image processing thread.")
    return final_results


def process_single_image_multimodel(
    file_name: str,
    file_path: str,
    category: str,
    size: str,
    quality: str,
    user_prompt: str,
    sentiment: str,
    platform: str,
    num_images: int,
    blur: bool,
    job_id: Optional[str],
    selected_model: str,
    model_params: Optional[dict],
) -> Optional[dict]:
    """Process a single source image and record the outcome on its DB job.

    Returns ``{"file_name": ..., "urls": [...]}`` when at least one
    variation was generated, otherwise None.
    """
    try:
        image_urls = generate_images_from_prompts_multimodel(
            file_path, size, quality, category, sentiment, user_prompt,
            platform, num_images, blur, selected_model, model_params,
        )
        if COL is not None and job_id:
            try:
                finish_job(
                    COL,
                    job_id,
                    status=("completed" if image_urls else "failed"),
                    outputs_urls=image_urls,
                )
            except Exception:
                logger.exception("Failed to finish DB job.")
        if image_urls:
            return {"file_name": file_name, "urls": image_urls}
        return None
    except Exception:
        # logger.exception keeps the traceback (was logger.error with the
        # exception interpolated into the message).
        logger.exception(f"Processing failed for {file_name}")
        if COL is not None and job_id:
            try:
                finish_job(COL, job_id, status="failed", outputs_urls=[])
            except Exception:
                logger.exception("Also failed to mark DB job as failed.")
        return None


def generate_images_from_prompts_multimodel(
    file_path: str,
    size: str,
    quality: str,
    category: str,
    sentiment: str,
    user_prompt: str,
    platform: str,
    num_images: int,
    blur: bool,
    selected_model: str,
    model_params: Optional[dict],
) -> List[str]:
    """Generate images using either the GPT or the multimodel approach.

    Runs up to ``num_images`` generation attempts in parallel; each
    successful result is stamped with metadata and uploaded to R2. Failed
    attempts are logged and skipped. Returns the list of uploaded URLs.
    """
    # BUGFIX: ThreadPoolExecutor raises ValueError for max_workers <= 0.
    if num_images <= 0:
        return []

    image_urls: List[str] = []

    def worker(i: int) -> Optional[str]:
        try:
            if is_gpt_model(selected_model):
                # Use existing GPT approach.
                image_bytes = generate_image(
                    file_path, size, quality, category, sentiment,
                    user_prompt, platform, blur, i,
                )
            else:
                # Use multimodel approach.
                image_bytes = generate_image_multimodel(
                    file_path, selected_model, category, sentiment,
                    user_prompt, platform, size, model_params, i,
                )
            if not image_bytes:
                return None
            image_with_metadata = meta_data_helper_function(image_bytes)
            return upload_image_to_r2(image_with_metadata)
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            return None

    with ThreadPoolExecutor(max_workers=min(10, num_images)) as executor:
        futures = [executor.submit(worker, i) for i in range(num_images)]
        for future in as_completed(futures):
            result = future.result()
            if result:
                image_urls.append(result)
    return image_urls


def generate_image_multimodel(
    file_path: str,
    model_name: str,
    category: str,
    sentiment: str,
    user_prompt: str,
    platform: str,
    size: str,
    model_params: Optional[dict],
    variation_index: int,
) -> Optional[bytes]:
    """Generate one image variation via the multimodel (Replicate) approach.

    Encodes the source image as base64, selects a prompt variation by
    ``variation_index``, resolves model parameters, and invokes the selected
    model. Returns the generated image bytes, or None on any failure.
    """
    try:
        # Convert the source image to base64 for the prompt/model services.
        with open(file_path, "rb") as f:
            image_data = f.read()
        base64_image = base64.b64encode(image_data).decode()

        # Use the existing prompt service to get prompt variations.
        prompt_variations = get_prompts(
            base64_image, category, user_prompt, sentiment, None
        )

        # Select a prompt based on the variation index (wraps around).
        if prompt_variations:
            selected_prompt = prompt_variations[variation_index % len(prompt_variations)]
        else:
            # Fallback prompt when the prompt service returns nothing.
            selected_prompt = (
                f"Generate a high-quality {category or 'advertising'} image. "
                f"{user_prompt}"
            )

        # Merge user-supplied parameters with the model's defaults.
        all_params = get_all_parameters(model_name, model_params)

        # Only convert size to aspect ratio if no aspect ratio was provided
        # by the user (i.e. the parameter still holds the model default).
        if model_name == "google/nano-banana" and "aspect_ratio" in all_params:
            if all_params["aspect_ratio"] == "match_input_image":  # the default
                converted_ratio = convert_size_to_aspect_ratio(size, model_name)
                all_params["aspect_ratio"] = converted_ratio
                logger.info(
                    f"Converted size '{size}' to aspect ratio '{converted_ratio}' for {model_name}"
                )
            else:
                logger.info(
                    f"Using user-selected aspect ratio '{all_params['aspect_ratio']}' for {model_name}"
                )

        logger.info(f"Final parameters for {model_name}: {all_params}")

        # Generate the image with the selected model.
        return generate_image_with_model(
            model_name, selected_prompt, all_params, base64_image
        )
    except Exception as e:
        logger.error(f"Multimodel generation failed: {e}")
        return None