feat: Convert image generation and specific service calls to asynchronous operations with timeouts using `AsyncOpenAI` and `asyncio.to_thread`.
dd2695a
| """ | |
| Image generation service supporting both Replicate and OpenAI APIs. | |
| Supports multiple image generation models with automatic fallback. | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import random | |
| import base64 | |
| from typing import Optional, Tuple, Dict, Any | |
| # Add parent directory to path for imports | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| import replicate | |
| import httpx | |
| from openai import AsyncOpenAI | |
| import asyncio | |
| from config import settings | |
| # Model registry for PsyAdGenesis | |
| MODEL_REGISTRY: Dict[str, Dict[str, Any]] = { | |
| "nano-banana": { | |
| "id": "google/nano-banana", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "nano-banana-pro": { | |
| "id": "google/nano-banana-pro", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "imagen-4": { | |
| "id": "google/imagen-4", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "imagen-4-ultra": { | |
| "id": "google/imagen-4-ultra", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "z-image-turbo": { | |
| "id": "prunaai/z-image-turbo", | |
| "param_name": "height", | |
| "uses_dimensions": True, | |
| }, | |
| "seedream-3": { | |
| "id": "bytedance/seedream-3", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "recraft-v3": { | |
| "id": "recraft-ai/recraft-v3", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "photon": { | |
| "id": "luma/photon", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "ideogram-v3": { | |
| "id": "ideogram-ai/ideogram-v3-quality", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "seedream-4.5": { | |
| "id": "bytedance/seedream-4.5", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "flux-2-max": { | |
| "id": "black-forest-labs/flux-2-max", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "qwen-image": { | |
| "id": "prunaai/z-image-turboqwen/qwen-image", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "p-image": { | |
| "id": "prunaai/p-image", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| "ideogram-v3-turbo": { | |
| "id": "ideogram-ai/ideogram-v3-turbo", | |
| "param_name": "aspect_ratio", | |
| "uses_dimensions": False, | |
| }, | |
| } | |
| # Default model fallback chain (same as original project) | |
| DEFAULT_FALLBACK_MODELS = ["nano-banana", "imagen-4", "z-image-turbo"] | |
| RETRY_ATTEMPTS = 3 | |
| REQUEST_TIMEOUT = 60 | |
| def convert_dimensions_to_aspect_ratio(width: int, height: int) -> str: | |
| """Convert width/height to aspect ratio string.""" | |
| if width == height: | |
| return "1:1" | |
| elif width > height: | |
| ratio = width / height | |
| if abs(ratio - 16/9) < 0.1: | |
| return "16:9" | |
| elif abs(ratio - 4/3) < 0.1: | |
| return "4:3" | |
| elif abs(ratio - 3/2) < 0.1: | |
| return "3:2" | |
| else: | |
| return "16:9" | |
| else: | |
| ratio = height / width | |
| if abs(ratio - 16/9) < 0.1: | |
| return "9:16" | |
| elif abs(ratio - 4/3) < 0.1: | |
| return "3:4" | |
| elif abs(ratio - 3/2) < 0.1: | |
| return "2:3" | |
| else: | |
| return "9:16" | |
| class ImageService: | |
| """Image generation service supporting Replicate and OpenAI APIs.""" | |
| def __init__(self): | |
| """Initialize image generation clients.""" | |
| self.api_token = settings.replicate_api_token | |
| if not self.api_token: | |
| raise ValueError("REPLICATE_API_TOKEN not configured") | |
| self.client = replicate.Client(api_token=self.api_token) | |
| self.default_model = settings.image_model | |
| # Initialize OpenAI client for gpt-image-1.5 support | |
| self.openai_client = None | |
| if hasattr(settings, 'openai_api_key') and settings.openai_api_key: | |
| self.openai_client = AsyncOpenAI(api_key=settings.openai_api_key) | |
| def _fetch_image(self, url: str) -> Optional[bytes]: | |
| """Fetch image from URL with retry logic.""" | |
| for attempt in range(RETRY_ATTEMPTS): | |
| try: | |
| response = httpx.get( | |
| url, | |
| timeout=REQUEST_TIMEOUT, | |
| headers={ | |
| "Cache-Control": "no-cache", | |
| "User-Agent": "AdGeneratorLite/1.0", | |
| }, | |
| ) | |
| response.raise_for_status() | |
| return response.content | |
| except Exception as e: | |
| if attempt == RETRY_ATTEMPTS - 1: | |
| print(f"Failed to fetch image from {url}: {e}") | |
| return None | |
| time.sleep(1) # Sync fetch so sync sleep is fine here or use asyncio.sleep | |
| return None | |
| async def load_image( | |
| self, | |
| image_id: Optional[str] = None, | |
| image_url: Optional[str] = None, | |
| image_bytes: Optional[bytes] = None, | |
| filepath: Optional[str] = None, | |
| ) -> Optional[bytes]: | |
| """ | |
| Load image from various sources (database ID, URL, bytes, or filepath). | |
| Args: | |
| image_id: Database ID of ad creative (will fetch from database) | |
| image_url: Direct URL to image | |
| image_bytes: Raw image bytes | |
| filepath: Local file path | |
| Returns: | |
| Image bytes or None if failed | |
| """ | |
| # Priority: bytes > filepath > URL > database ID | |
| if image_bytes: | |
| return image_bytes | |
| if filepath: | |
| try: | |
| with open(filepath, "rb") as f: | |
| return f.read() | |
| except Exception as e: | |
| print(f"Failed to load image from filepath {filepath}: {e}") | |
| return None | |
| if image_url: | |
| return self._fetch_image(image_url) | |
| if image_id: | |
| # Try to fetch from database | |
| try: | |
| from services.database import db_service | |
| ad = await db_service.get_ad_creative(image_id) | |
| if ad: | |
| # Try image_url first | |
| if ad.get("image_url"): | |
| return self._fetch_image(ad["image_url"]) | |
| # Try local file | |
| if ad.get("image_filename"): | |
| filepath = os.path.join(settings.output_dir, ad["image_filename"]) | |
| if os.path.exists(filepath): | |
| with open(filepath, "rb") as f: | |
| return f.read() | |
| except Exception as e: | |
| print(f"Failed to load image from database ID {image_id}: {e}") | |
| return None | |
| return None | |
| def _extract_image_from_output(self, output) -> Tuple[Optional[bytes], Optional[str]]: | |
| """ | |
| Extract image bytes and URL from Replicate output. | |
| Returns: | |
| Tuple of (image_bytes, image_url) | |
| """ | |
| try: | |
| # Handle file-like object | |
| if hasattr(output, 'read'): | |
| url = getattr(output, 'url', None) | |
| return output.read(), url | |
| # Handle URL attribute | |
| if hasattr(output, 'url'): | |
| url = output.url | |
| return self._fetch_image(url), url | |
| # Handle list of outputs | |
| if isinstance(output, list) and len(output) > 0: | |
| first = output[0] | |
| url = getattr(first, "url", str(first)) | |
| return self._fetch_image(url), url | |
| # Handle string URL | |
| if isinstance(output, str): | |
| return self._fetch_image(output), output | |
| print(f"Unknown output type: {type(output)}") | |
| return None, None | |
| except Exception as e: | |
| print(f"Error extracting image from output: {e}") | |
| return None, None | |
| async def generate( | |
| self, | |
| prompt: str, | |
| width: int = 1024, | |
| height: int = 1024, | |
| seed: Optional[int] = None, | |
| model_key: Optional[str] = None, | |
| image_url: Optional[str] = None, | |
| ) -> Tuple[bytes, str, Optional[str]]: | |
| """ | |
| Generate an image using Replicate API (official library). | |
| Args: | |
| prompt: Image generation prompt | |
| width: Image width | |
| height: Image height | |
| seed: Random seed for uniqueness (if None, generates random) | |
| model_key: Which model to use (default from config) | |
| image_url: Optional image URL for image-to-image generation | |
| Returns: | |
| Tuple of (image_bytes, model_used, image_url) | |
| """ | |
| # Use random seed if not provided (ensures unique images) | |
| if seed is None: | |
| seed = random.randint(1, 2147483647) | |
| # Get models to try (fallback chain) | |
| model_key = model_key or self.default_model | |
| # Check if using OpenAI image generation API (gpt-image-1.5) | |
| if model_key == "gpt-image-1.5": | |
| if not self.openai_client: | |
| raise ValueError("OpenAI API key not configured for gpt-image-1.5") | |
| try: | |
| print("Generating image with gpt-image-1.5") | |
| size_str = f"{width}x{height}" | |
| # Use a timeout for OpenAI image generation | |
| result = await asyncio.wait_for( | |
| self.openai_client.images.generate( | |
| model="gpt-image-1.5", | |
| prompt=prompt, | |
| quality="auto", | |
| background="auto", | |
| moderation="auto", | |
| size=size_str, | |
| output_format="jpeg", | |
| output_compression=90, | |
| ), | |
| timeout=120.0 | |
| ) | |
| if result.data and len(result.data) > 0: | |
| image_base64 = result.data[0].b64_json | |
| if image_base64: | |
| image_bytes = base64.b64decode(image_base64) | |
| print("Successfully generated image with gpt-image-1.5") | |
| return image_bytes, "gpt-image-1.5", None | |
| raise Exception("No image data returned from OpenAI API") | |
| except asyncio.TimeoutError: | |
| print("Timed out generating image with gpt-image-1.5") | |
| raise Exception("Timeout: Image generation with gpt-image-1.5 took too long") | |
| except Exception as e: | |
| print(f"OpenAI image generation failed: {e}") | |
| print("Falling back to Replicate models...") | |
| model_key = None # Will use default fallback chain | |
| # Build fallback chain for Replicate models | |
| if model_key and model_key in MODEL_REGISTRY: | |
| models_to_try = [model_key] + [m for m in DEFAULT_FALLBACK_MODELS if m != model_key] | |
| else: | |
| models_to_try = DEFAULT_FALLBACK_MODELS | |
| last_error = None | |
| for current_model in models_to_try: | |
| cfg = MODEL_REGISTRY.get(current_model) | |
| if not cfg: | |
| continue | |
| # Build input parameters | |
| input_data = {"prompt": prompt} | |
| # Add image URL for image-to-image if provided (for nano-banana and nano-banana-pro) | |
| # Google Nano Banana models expect image_input as an array | |
| if image_url and current_model in ["nano-banana", "nano-banana-pro"]: | |
| input_data["image_input"] = [image_url] | |
| # Note: guidance_scale may not be supported by nano-banana on Replicate | |
| # Relying on minimal prompts to preserve the original image | |
| # Add seed if supported | |
| input_data["seed"] = seed | |
| # Some models use width/height, others use aspect_ratio | |
| if cfg.get("uses_dimensions"): | |
| input_data["width"] = width | |
| input_data["height"] = height | |
| else: | |
| aspect_ratio = convert_dimensions_to_aspect_ratio(width, height) | |
| input_data[cfg["param_name"]] = aspect_ratio | |
| # Retry logic | |
| for attempt in range(RETRY_ATTEMPTS): | |
| try: | |
| print(f"Generating image with {current_model} (attempt {attempt + 1})") | |
| # Use official Replicate client - offload blocking call to thread and add timeout | |
| try: | |
| # Wrap the blocking call in a thread and add a 3-minute timeout | |
| output = await asyncio.wait_for( | |
| asyncio.to_thread(self.client.run, cfg["id"], input=input_data), | |
| timeout=180.0 # 3 minutes timeout | |
| ) | |
| except asyncio.TimeoutError: | |
| print(f"Timed out generating image with {current_model} after 180 seconds") | |
| raise Exception(f"Timeout: Image generation with {current_model} took too long") | |
| # Extract image bytes and URL | |
| image_bytes, image_url = self._extract_image_from_output(output) | |
| if image_bytes: | |
| print(f"Successfully generated image with {current_model}") | |
| return image_bytes, current_model, image_url | |
| except Exception as e: | |
| last_error = e | |
| if attempt < RETRY_ATTEMPTS - 1: | |
| print(f"Attempt {attempt + 1} failed: {e}, retrying...") | |
| await asyncio.sleep(2 ** attempt) # Exponential backoff (use await for async sleep) | |
| continue | |
| # Model failed, try next in fallback chain | |
| print(f"Model {current_model} failed, trying next...") | |
| # All models failed | |
| raise Exception(f"All image generation models failed. Last error: {last_error}") | |
| async def generate_with_retry( | |
| self, | |
| prompt: str, | |
| width: int = 1024, | |
| height: int = 1024, | |
| max_retries: int = 2, | |
| ) -> Tuple[bytes, str, Optional[str]]: | |
| """ | |
| Generate image with automatic retry on failure. | |
| Uses different random seed each attempt for variety. | |
| Returns: | |
| Tuple of (image_bytes, model_used, image_url) | |
| """ | |
| last_error = None | |
| for attempt in range(max_retries + 1): | |
| try: | |
| seed = random.randint(1, 2147483647) | |
| return await self.generate( | |
| prompt=prompt, | |
| width=width, | |
| height=height, | |
| seed=seed, | |
| ) | |
| except Exception as e: | |
| last_error = e | |
| if attempt < max_retries: | |
| await asyncio.sleep(2) | |
| continue | |
| raise last_error | |
| # Global instance | |
| image_service = ImageService() | |