""" Image generation service supporting both Replicate and OpenAI APIs. Supports multiple image generation models with automatic fallback. """ import os import sys import time import random import base64 from typing import Optional, Tuple, Dict, Any # Add parent directory to path for imports sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import replicate import httpx from openai import AsyncOpenAI import asyncio from config import settings # Model registry for PsyAdGenesis MODEL_REGISTRY: Dict[str, Dict[str, Any]] = { "nano-banana": { "id": "google/nano-banana", "param_name": "aspect_ratio", "uses_dimensions": False, }, "nano-banana-pro": { "id": "google/nano-banana-pro", "param_name": "aspect_ratio", "uses_dimensions": False, }, "imagen-4": { "id": "google/imagen-4", "param_name": "aspect_ratio", "uses_dimensions": False, }, "imagen-4-ultra": { "id": "google/imagen-4-ultra", "param_name": "aspect_ratio", "uses_dimensions": False, }, "z-image-turbo": { "id": "prunaai/z-image-turbo", "param_name": "height", "uses_dimensions": True, }, "seedream-3": { "id": "bytedance/seedream-3", "param_name": "aspect_ratio", "uses_dimensions": False, }, "recraft-v3": { "id": "recraft-ai/recraft-v3", "param_name": "aspect_ratio", "uses_dimensions": False, }, "photon": { "id": "luma/photon", "param_name": "aspect_ratio", "uses_dimensions": False, }, "ideogram-v3": { "id": "ideogram-ai/ideogram-v3-quality", "param_name": "aspect_ratio", "uses_dimensions": False, }, "seedream-4.5": { "id": "bytedance/seedream-4.5", "param_name": "aspect_ratio", "uses_dimensions": False, }, "flux-2-max": { "id": "black-forest-labs/flux-2-max", "param_name": "aspect_ratio", "uses_dimensions": False, }, "qwen-image": { "id": "prunaai/z-image-turboqwen/qwen-image", "param_name": "aspect_ratio", "uses_dimensions": False, }, "p-image": { "id": "prunaai/p-image", "param_name": "aspect_ratio", "uses_dimensions": False, }, "ideogram-v3-turbo": { "id": "ideogram-ai/ideogram-v3-turbo", "param_name": "aspect_ratio", "uses_dimensions": False, }, } # Default model fallback chain (same as original project) DEFAULT_FALLBACK_MODELS = ["nano-banana", "imagen-4", "z-image-turbo"] RETRY_ATTEMPTS = 3 REQUEST_TIMEOUT = 60 def convert_dimensions_to_aspect_ratio(width: int, height: int) -> str: """Convert width/height to aspect ratio string.""" if width == height: return "1:1" elif width > height: ratio = width / height if abs(ratio - 16/9) < 0.1: return "16:9" elif abs(ratio - 4/3) < 0.1: return "4:3" elif abs(ratio - 3/2) < 0.1: return "3:2" else: return "16:9" else: ratio = height / width if abs(ratio - 16/9) < 0.1: return "9:16" elif abs(ratio - 4/3) < 0.1: return "3:4" elif abs(ratio - 3/2) < 0.1: return "2:3" else: return "9:16" class ImageService: """Image generation service supporting Replicate and OpenAI APIs.""" def __init__(self): """Initialize image generation clients.""" self.api_token = settings.replicate_api_token if not self.api_token: raise ValueError("REPLICATE_API_TOKEN not configured") self.client = replicate.Client(api_token=self.api_token) self.default_model = settings.image_model # Initialize OpenAI client for gpt-image-1.5 support self.openai_client = None if hasattr(settings, 'openai_api_key') and settings.openai_api_key: self.openai_client = AsyncOpenAI(api_key=settings.openai_api_key) def _fetch_image(self, url: str) -> Optional[bytes]: """Fetch image from URL with retry logic.""" for attempt in range(RETRY_ATTEMPTS): try: response = httpx.get( url, timeout=REQUEST_TIMEOUT, headers={ "Cache-Control": "no-cache", "User-Agent": "AdGeneratorLite/1.0", }, ) response.raise_for_status() return response.content except Exception as e: if attempt == RETRY_ATTEMPTS - 1: print(f"Failed to fetch image from {url}: {e}") return None time.sleep(1) # Sync fetch so sync sleep is fine here or use asyncio.sleep return None async def load_image( self, image_id: Optional[str] = None, image_url: Optional[str] = None, image_bytes: Optional[bytes] = None, filepath: Optional[str] = None, ) -> Optional[bytes]: """ Load image from various sources (database ID, URL, bytes, or filepath). Args: image_id: Database ID of ad creative (will fetch from database) image_url: Direct URL to image image_bytes: Raw image bytes filepath: Local file path Returns: Image bytes or None if failed """ # Priority: bytes > filepath > URL > database ID if image_bytes: return image_bytes if filepath: try: with open(filepath, "rb") as f: return f.read() except Exception as e: print(f"Failed to load image from filepath {filepath}: {e}") return None if image_url: return self._fetch_image(image_url) if image_id: # Try to fetch from database try: from services.database import db_service ad = await db_service.get_ad_creative(image_id) if ad: # Try image_url first if ad.get("image_url"): return self._fetch_image(ad["image_url"]) # Try local file if ad.get("image_filename"): filepath = os.path.join(settings.output_dir, ad["image_filename"]) if os.path.exists(filepath): with open(filepath, "rb") as f: return f.read() except Exception as e: print(f"Failed to load image from database ID {image_id}: {e}") return None return None def _extract_image_from_output(self, output) -> Tuple[Optional[bytes], Optional[str]]: """ Extract image bytes and URL from Replicate output. Returns: Tuple of (image_bytes, image_url) """ try: # Handle file-like object if hasattr(output, 'read'): url = getattr(output, 'url', None) return output.read(), url # Handle URL attribute if hasattr(output, 'url'): url = output.url return self._fetch_image(url), url # Handle list of outputs if isinstance(output, list) and len(output) > 0: first = output[0] url = getattr(first, "url", str(first)) return self._fetch_image(url), url # Handle string URL if isinstance(output, str): return self._fetch_image(output), output print(f"Unknown output type: {type(output)}") return None, None except Exception as e: print(f"Error extracting image from output: {e}") return None, None async def generate( self, prompt: str, width: int = 1024, height: int = 1024, seed: Optional[int] = None, model_key: Optional[str] = None, image_url: Optional[str] = None, ) -> Tuple[bytes, str, Optional[str]]: """ Generate an image using Replicate API (official library). Args: prompt: Image generation prompt width: Image width height: Image height seed: Random seed for uniqueness (if None, generates random) model_key: Which model to use (default from config) image_url: Optional image URL for image-to-image generation Returns: Tuple of (image_bytes, model_used, image_url) """ # Use random seed if not provided (ensures unique images) if seed is None: seed = random.randint(1, 2147483647) # Get models to try (fallback chain) model_key = model_key or self.default_model # Check if using OpenAI image generation API (gpt-image-1.5) if model_key == "gpt-image-1.5": if not self.openai_client: raise ValueError("OpenAI API key not configured for gpt-image-1.5") try: print("Generating image with gpt-image-1.5") size_str = f"{width}x{height}" # Use a timeout for OpenAI image generation result = await asyncio.wait_for( self.openai_client.images.generate( model="gpt-image-1.5", prompt=prompt, quality="auto", background="auto", moderation="auto", size=size_str, output_format="jpeg", output_compression=90, ), timeout=120.0 ) if result.data and len(result.data) > 0: image_base64 = result.data[0].b64_json if image_base64: image_bytes = base64.b64decode(image_base64) print("Successfully generated image with gpt-image-1.5") return image_bytes, "gpt-image-1.5", None raise Exception("No image data returned from OpenAI API") except asyncio.TimeoutError: print("Timed out generating image with gpt-image-1.5") raise Exception("Timeout: Image generation with gpt-image-1.5 took too long") except Exception as e: print(f"OpenAI image generation failed: {e}") print("Falling back to Replicate models...") model_key = None # Will use default fallback chain # Build fallback chain for Replicate models if model_key and model_key in MODEL_REGISTRY: models_to_try = [model_key] + [m for m in DEFAULT_FALLBACK_MODELS if m != model_key] else: models_to_try = DEFAULT_FALLBACK_MODELS last_error = None for current_model in models_to_try: cfg = MODEL_REGISTRY.get(current_model) if not cfg: continue # Build input parameters input_data = {"prompt": prompt} # Add image URL for image-to-image if provided (for nano-banana and nano-banana-pro) # Google Nano Banana models expect image_input as an array if image_url and current_model in ["nano-banana", "nano-banana-pro"]: input_data["image_input"] = [image_url] # Note: guidance_scale may not be supported by nano-banana on Replicate # Relying on minimal prompts to preserve the original image # Add seed if supported input_data["seed"] = seed # Some models use width/height, others use aspect_ratio if cfg.get("uses_dimensions"): input_data["width"] = width input_data["height"] = height else: aspect_ratio = convert_dimensions_to_aspect_ratio(width, height) input_data[cfg["param_name"]] = aspect_ratio # Retry logic for attempt in range(RETRY_ATTEMPTS): try: print(f"Generating image with {current_model} (attempt {attempt + 1})") # Use official Replicate client - offload blocking call to thread and add timeout try: # Wrap the blocking call in a thread and add a 3-minute timeout output = await asyncio.wait_for( asyncio.to_thread(self.client.run, cfg["id"], input=input_data), timeout=180.0 # 3 minutes timeout ) except asyncio.TimeoutError: print(f"Timed out generating image with {current_model} after 180 seconds") raise Exception(f"Timeout: Image generation with {current_model} took too long") # Extract image bytes and URL image_bytes, image_url = self._extract_image_from_output(output) if image_bytes: print(f"Successfully generated image with {current_model}") return image_bytes, current_model, image_url except Exception as e: last_error = e if attempt < RETRY_ATTEMPTS - 1: print(f"Attempt {attempt + 1} failed: {e}, retrying...") await asyncio.sleep(2 ** attempt) # Exponential backoff (use await for async sleep) continue # Model failed, try next in fallback chain print(f"Model {current_model} failed, trying next...") # All models failed raise Exception(f"All image generation models failed. Last error: {last_error}") async def generate_with_retry( self, prompt: str, width: int = 1024, height: int = 1024, max_retries: int = 2, ) -> Tuple[bytes, str, Optional[str]]: """ Generate image with automatic retry on failure. Uses different random seed each attempt for variety. Returns: Tuple of (image_bytes, model_used, image_url) """ last_error = None for attempt in range(max_retries + 1): try: seed = random.randint(1, 2147483647) return await self.generate( prompt=prompt, width=width, height=height, seed=seed, ) except Exception as e: last_error = e if attempt < max_retries: await asyncio.sleep(2) continue raise last_error # Global instance image_service = ImageService()