# NOTE(review): removed stray page-scrape header text ("Spaces: / Running / Running") — not part of the code.
| import os, zipfile, tempfile, logging, base64 | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import List, Tuple, Optional | |
| from PIL import Image | |
| import io | |
| from generator_function.image_function import generate_image | |
| from prompt.prompt_services import get_prompts | |
| from multimodel_services.replicate_generation_service import generate_image_with_model, convert_size_to_aspect_ratio | |
| from multimodel_services.model_manager import is_gpt_model, get_all_parameters | |
| from helpers_function.helper_meta_data import meta_data_helper_function | |
| from helpers_function.helpers import upload_image_to_r2 | |
| from helpers_function.helpers import is_valid_image | |
| from database.connections import get_results_collection as get_collection | |
| from database.operations import start_job, finish_job | |
| from util.session_state import current_uid | |
# Module-level logger for this file.
logger = logging.getLogger(__name__)

# Shared results-collection handle used for job bookkeeping; every use below
# is guarded with `if COL is not None`, so presumably get_collection() may
# return None when the database is unavailable — TODO confirm.
COL = get_collection()
def _resolve_user_id() -> str:
    """Return the current session's user id.

    Falls back to the DEFAULT_USER_ID environment variable, and finally
    to the literal "anonymous" when neither source yields a value.
    """
    uid = current_uid()
    if uid:
        return uid
    return os.getenv("DEFAULT_USER_ID", "anonymous")
def process_zip_and_generate_images_multimodel(
    zip_path: str,
    category: str,
    size: str,
    quality: str,
    user_prompt: str,
    sentiment: str,
    platform: str,
    num_images: int,
    demo_mode: bool,
    existing_images: Optional[List[str]],
    blur: bool,
    uid: str,
    selected_model: str = "gpt_default",
    model_params: Optional[dict] = None,
) -> List[str]:
    """Enhanced image processor that supports both GPT and multimodel approaches.

    Accepts either a ``.zip`` of images or a single image path, generates up to
    ``num_images`` variations per input image (forced to 1 in demo mode), and
    returns ``existing_images`` plus the newly generated URLs, de-duplicated
    while preserving order. On any error the original ``existing_images``
    (or ``[]``) is returned unchanged.
    """
    num_images = 1 if demo_mode else num_images
    temp_dir: Optional[tempfile.TemporaryDirectory] = None
    try:
        if zip_path.endswith(".zip"):
            temp_dir = extract_zip_file(zip_path)
            image_files = get_valid_image_files(temp_dir)
        else:
            # Single image: treat the path itself as the only work item.
            image_files = [(os.path.basename(zip_path), zip_path)]
        results = process_image_files_multimodel(
            image_files, category, size, quality, user_prompt, sentiment, platform,
            num_images, blur, uid, selected_model, model_params
        )
        all_urls = [url for entry in results for url in entry["urls"]]
        # Order-preserving de-duplication: dict keys keep insertion order.
        deduped = list(dict.fromkeys(all_urls))
        return (existing_images or []) + deduped
    except Exception:
        logger.exception(f"Global error during processing file: {zip_path}")
        return existing_images or []
    finally:
        # Previously the extracted directory was only removed when the
        # TemporaryDirectory object was garbage-collected; clean it up
        # deterministically now that all processing has finished.
        if temp_dir is not None:
            temp_dir.cleanup()
def extract_zip_file(zip_path: str) -> tempfile.TemporaryDirectory:
    """Extract ``zip_path`` into a fresh temporary directory.

    Returns the TemporaryDirectory object itself (not just its path) so the
    caller controls its lifetime; the extracted files live under ``.name``.
    """
    extracted = tempfile.TemporaryDirectory()
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(extracted.name)
    logger.info(f"Extracted ZIP file: {zip_path}")
    return extracted
def get_valid_image_files(temp_dir: tempfile.TemporaryDirectory) -> List[Tuple[str, str]]:
    """Collect ``(file_name, absolute_path)`` pairs for each valid image
    directly inside ``temp_dir``.

    Skips macOS resource-fork entries (``__MACOSX``) and logs a warning for
    every non-image entry. Only the top level of the directory is scanned.
    """
    valid_files: List[Tuple[str, str]] = []
    for entry in os.listdir(temp_dir.name):
        if "__MACOSX" in entry:
            continue
        if not is_valid_image(entry):
            logger.warning(f"Ignored non-image file: {entry}")
            continue
        valid_files.append((entry, os.path.join(temp_dir.name, entry)))
    logger.info(f"Found {len(valid_files)} valid images.")
    return valid_files
def process_image_files_multimodel(image_files: List[Tuple[str, str]], category: str, size: str,
        quality: str, user_prompt: str, sentiment: str, platform: str, num_images: int, blur: bool,
        uid: str, selected_model: str, model_params: Optional[dict]) -> List[dict]:
    """Fan the input images out to worker threads and collect their results.

    For each file a DB job is registered first (when the collection is
    available); DB failures are logged and never block generation. Returns
    the per-file result dicts (``{"file_name", "urls"}``) in completion order.
    """
    collected: List[dict] = []
    with ThreadPoolExecutor(max_workers=5) as pool:
        pending = []
        for file_name, file_path in image_files:
            job_id: Optional[str] = None
            if COL is not None:
                try:
                    job_id = start_job(
                        COL,
                        type="variation_multimodel",
                        created_by=uid,
                        category=category or "general",
                        inputs={"file_name": file_name, "mode": "img_or_zip_multimodel"},
                        settings={
                            "size": size, "quality": quality, "sentiment": sentiment,
                            "platform": platform, "num_images": num_images, "blur": bool(blur),
                            "selected_model": selected_model, "model_params": model_params or {}
                        },
                        user_prompt=user_prompt
                    )
                except Exception:
                    logger.exception("Failed to start DB job; continuing without DB logging.")
            pending.append(
                pool.submit(
                    process_single_image_multimodel,
                    file_name, file_path, category, size, quality, user_prompt, sentiment,
                    platform, num_images, blur, job_id, selected_model, model_params,
                )
            )
        for done in as_completed(pending):
            try:
                outcome = done.result()
            except Exception:
                logger.exception("Unhandled exception during image processing thread.")
                continue
            if outcome:
                collected.append(outcome)
    return collected
def process_single_image_multimodel(file_name: str, file_path: str, category: str, size: str,
        quality: str, user_prompt: str, sentiment: str, platform: str, num_images: int, blur: bool,
        job_id: Optional[str], selected_model: str, model_params: Optional[dict]) -> Optional[dict]:
    """Generate all variations for a single source image.

    Returns ``{"file_name": ..., "urls": [...]}`` on success or ``None`` when
    nothing could be produced. The associated DB job (if any) is marked
    completed/failed accordingly; DB errors are logged and swallowed.
    """
    try:
        urls = generate_images_from_prompts_multimodel(
            file_path, size, quality, category, sentiment, user_prompt, platform,
            num_images, blur, selected_model, model_params
        )
    except Exception as exc:
        logger.error(f"Processing failed for {file_name}: {exc}")
        # Best effort: record the failure in the DB without masking it.
        if COL is not None and job_id:
            try:
                finish_job(COL, job_id, status="failed", outputs_urls=[])
            except Exception:
                logger.exception("Also failed to mark DB job as failed.")
        return None
    if COL is not None and job_id:
        try:
            finish_job(COL, job_id, status=("completed" if urls else "failed"), outputs_urls=urls)
        except Exception:
            logger.exception("Failed to finish DB job.")
    return {"file_name": file_name, "urls": urls} if urls else None
def generate_images_from_prompts_multimodel(
    file_path: str, size: str, quality: str, category: str, sentiment: str, user_prompt: str,
    platform: str, num_images: int, blur: bool, selected_model: str, model_params: Optional[dict],
) -> List[str]:
    """Generate images using either the GPT or the multimodel approach.

    Spawns up to ``num_images`` worker threads; each worker produces one image
    variation, stamps metadata onto it, uploads it to R2, and yields its URL.
    Failed variations are logged and skipped, so the returned list may be
    shorter than ``num_images`` (URLs are appended in completion order).
    """
    # Guard: ThreadPoolExecutor raises ValueError for max_workers <= 0, which
    # previously escaped to the caller whenever num_images was 0 or negative.
    if num_images <= 0:
        return []
    image_urls: List[str] = []
    def worker(i: int) -> Optional[str]:
        # One generation attempt per variation index; returns the uploaded URL
        # or None on any failure (errors are logged, never raised).
        try:
            if is_gpt_model(selected_model):
                # Use existing GPT approach
                image_bytes = generate_image(file_path, size, quality, category, sentiment, user_prompt, platform, blur, i)
            else:
                # Use multimodel approach
                image_bytes = generate_image_multimodel(
                    file_path, selected_model, category, sentiment, user_prompt,
                    platform, size, model_params, i
                )
            if not image_bytes:
                return None
            image_with_metadata = meta_data_helper_function(image_bytes)
            s3_url = upload_image_to_r2(image_with_metadata)
            return s3_url
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            return None
    with ThreadPoolExecutor(max_workers=min(10, num_images)) as executor:
        futures = [executor.submit(worker, i) for i in range(num_images)]
        for future in as_completed(futures):
            result = future.result()
            if result:
                image_urls.append(result)
    return image_urls
def generate_image_multimodel(file_path: str, model_name: str, category: str, sentiment: str,
        user_prompt: str, platform: str, size: str, model_params: Optional[dict],
        variation_index: int) -> Optional[bytes]:
    """Generate one image variation via the multimodel (Replicate) pipeline.

    Reads and base64-encodes the source image, asks the prompt service for
    prompt variations, picks one by ``variation_index`` (cycling), resolves
    the model parameters, and returns the generated image bytes — or ``None``
    on any failure (errors are logged, never raised).
    """
    try:
        # Convert the reference image to base64 for the prompt/generation services.
        with open(file_path, "rb") as f:
            base64_image = base64.b64encode(f.read()).decode()
        # Reuse the existing prompt service to get prompt variations.
        prompt_variations = get_prompts(
            base64_image, category, user_prompt, sentiment, None
        )
        if prompt_variations:
            # Cycle through the variations so every index maps to a prompt.
            selected_prompt = prompt_variations[variation_index % len(prompt_variations)]
        else:
            # Fallback prompt when the service returns nothing.
            selected_prompt = f"Generate a high-quality {category or 'advertising'} image. {user_prompt}"
        # Prepare model parameters (defaults merged with user overrides).
        all_params = get_all_parameters(model_name, model_params)
        # Only convert size to aspect ratio if no aspect ratio was provided by the user.
        if model_name == "google/nano-banana" and "aspect_ratio" in all_params:
            if all_params["aspect_ratio"] == "match_input_image":  # this is the default
                converted_ratio = convert_size_to_aspect_ratio(size, model_name)
                all_params["aspect_ratio"] = converted_ratio
                logger.info(f"Converted size '{size}' to aspect ratio '{converted_ratio}' for {model_name}")
            else:
                logger.info(f"Using user-selected aspect ratio '{all_params['aspect_ratio']}' for {model_name}")
        logger.info(f"Final parameters for {model_name}: {all_params}")
        # Generate the image with the selected model.
        return generate_image_with_model(model_name, selected_prompt, all_params, base64_image)
    except Exception as e:
        logger.error(f"Multimodel generation failed: {e}")
        return None