# sushilideaclan01's picture
# feat: Convert image generation and specific service calls to asynchronous operations with timeouts using `AsyncOpenAI` and `asyncio.to_thread`.
# dd2695a
"""
Image generation service supporting both Replicate and OpenAI APIs.
Supports multiple image generation models with automatic fallback.
"""
import os
import sys
import time
import random
import base64
from typing import Optional, Tuple, Dict, Any
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import replicate
import httpx
from openai import AsyncOpenAI
import asyncio
from config import settings
# Model registry for PsyAdGenesis.
#
# Maps a short model key to its Replicate configuration:
#   "id"              - Replicate model identifier ("owner/name") passed to client.run()
#   "param_name"      - input parameter that receives the aspect-ratio string
#                       (only read when "uses_dimensions" is False)
#   "uses_dimensions" - True when the model takes explicit "width"/"height"
#                       inputs instead of an aspect-ratio string
MODEL_REGISTRY: Dict[str, Dict[str, Any]] = {
    "nano-banana": {
        "id": "google/nano-banana",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "nano-banana-pro": {
        "id": "google/nano-banana-pro",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "imagen-4": {
        "id": "google/imagen-4",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "imagen-4-ultra": {
        "id": "google/imagen-4-ultra",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "z-image-turbo": {
        "id": "prunaai/z-image-turbo",
        "param_name": "height",
        "uses_dimensions": True,
    },
    "seedream-3": {
        "id": "bytedance/seedream-3",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "recraft-v3": {
        "id": "recraft-ai/recraft-v3",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "photon": {
        "id": "luma/photon",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "ideogram-v3": {
        "id": "ideogram-ai/ideogram-v3-quality",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "seedream-4.5": {
        "id": "bytedance/seedream-4.5",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "flux-2-max": {
        "id": "black-forest-labs/flux-2-max",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "qwen-image": {
        # Was "prunaai/z-image-turboqwen/qwen-image": a paste error that fused
        # another model's id onto this one and is not a valid "owner/name" id.
        "id": "qwen/qwen-image",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "p-image": {
        "id": "prunaai/p-image",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
    "ideogram-v3-turbo": {
        "id": "ideogram-ai/ideogram-v3-turbo",
        "param_name": "aspect_ratio",
        "uses_dimensions": False,
    },
}
# Default model fallback chain (same as original project).
# Entries are MODEL_REGISTRY keys; ImageService.generate() tries them in this order.
DEFAULT_FALLBACK_MODELS = ["nano-banana", "imagen-4", "z-image-turbo"]
# Number of attempts per model in generate() and per URL in _fetch_image().
RETRY_ATTEMPTS = 3
# Timeout (seconds) passed to httpx.get() when downloading an image.
REQUEST_TIMEOUT = 60
def convert_dimensions_to_aspect_ratio(width: int, height: int) -> str:
    """Convert width/height to a supported aspect ratio string.

    Supported results are "1:1", "4:3", "3:2", "16:9" for landscape sizes and
    "3:4", "2:3", "9:16" for portrait sizes.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.

    Returns:
        The aspect ratio string that best matches ``width`` x ``height``.

    Raises:
        ValueError: If the dimensions differ and either one is not positive
            (previously this surfaced as ZeroDivisionError or a bogus ratio).
    """
    if width == height:
        return "1:1"
    if width <= 0 or height <= 0:
        raise ValueError(
            f"width and height must be positive, got {width}x{height}"
        )

    landscape = width > height
    # Long side over short side, so one table serves both orientations.
    ratio = max(width, height) / min(width, height)

    # (numeric ratio, landscape label, portrait label), checked in the same
    # order and with the same tolerance as before so that sizes that already
    # matched keep their result.
    known_ratios = (
        (16 / 9, "16:9", "9:16"),
        (4 / 3, "4:3", "3:4"),
        (3 / 2, "3:2", "2:3"),
    )
    for target, wide_label, tall_label in known_ratios:
        if abs(ratio - target) < 0.1:
            return wide_label if landscape else tall_label

    # Nothing within tolerance: pick the nearest supported ratio instead of
    # unconditionally answering 16:9 / 9:16, so near-square sizes such as
    # 1024x1000 map to "1:1" rather than a wide format.
    candidates = ((1.0, "1:1", "1:1"),) + known_ratios
    _, wide_label, tall_label = min(
        candidates, key=lambda entry: abs(ratio - entry[0])
    )
    return wide_label if landscape else tall_label
class ImageService:
    """Image generation service supporting Replicate and OpenAI APIs.

    Replicate models are selected through ``MODEL_REGISTRY`` and tried in a
    fallback chain; the ``gpt-image-1.5`` model key is routed to the OpenAI
    images API when an OpenAI API key is configured.
    """

    def __init__(self):
        """Initialize image generation clients.

        Raises:
            ValueError: If ``settings.replicate_api_token`` is empty.
        """
        self.api_token = settings.replicate_api_token
        if not self.api_token:
            raise ValueError("REPLICATE_API_TOKEN not configured")

        self.client = replicate.Client(api_token=self.api_token)
        self.default_model = settings.image_model

        # Initialize OpenAI client for gpt-image-1.5 support.
        # Stays None when no key is configured (or the setting does not exist).
        self.openai_client = None
        openai_api_key = getattr(settings, "openai_api_key", None)
        if openai_api_key:
            self.openai_client = AsyncOpenAI(api_key=openai_api_key)

    def _fetch_image(self, url: str) -> Optional[bytes]:
        """Fetch image from URL with retry logic.

        This is a blocking helper (synchronous ``httpx.get`` and
        ``time.sleep``); the async methods of this class run it through
        ``asyncio.to_thread`` so the event loop is not stalled.

        Args:
            url: Image URL to download.

        Returns:
            The response body, or None once all attempts have failed.
        """
        for attempt in range(RETRY_ATTEMPTS):
            try:
                response = httpx.get(
                    url,
                    timeout=REQUEST_TIMEOUT,
                    headers={
                        "Cache-Control": "no-cache",
                        "User-Agent": "AdGeneratorLite/1.0",
                    },
                )
                response.raise_for_status()
                return response.content
            except Exception as e:
                # Best-effort download: report only after the final attempt.
                if attempt == RETRY_ATTEMPTS - 1:
                    print(f"Failed to fetch image from {url}: {e}")
                    return None
                time.sleep(1)
        return None

    async def load_image(
        self,
        image_id: Optional[str] = None,
        image_url: Optional[str] = None,
        image_bytes: Optional[bytes] = None,
        filepath: Optional[str] = None,
    ) -> Optional[bytes]:
        """
        Load image from various sources (database ID, URL, bytes, or filepath).

        Only the first source provided is used, in the priority order
        bytes > filepath > URL > database ID.

        Args:
            image_id: Database ID of ad creative (will fetch from database)
            image_url: Direct URL to image
            image_bytes: Raw image bytes
            filepath: Local file path

        Returns:
            Image bytes or None if failed
        """
        # Priority: bytes > filepath > URL > database ID
        if image_bytes:
            return image_bytes

        if filepath:
            try:
                with open(filepath, "rb") as f:
                    return f.read()
            except Exception as e:
                print(f"Failed to load image from filepath {filepath}: {e}")
                return None

        if image_url:
            # _fetch_image blocks (sync HTTP + sleep); keep it off the event loop.
            return await asyncio.to_thread(self._fetch_image, image_url)

        if image_id:
            # Try to fetch from database
            try:
                from services.database import db_service

                ad = await db_service.get_ad_creative(image_id)
                if ad:
                    # Try image_url first
                    if ad.get("image_url"):
                        fetched = await asyncio.to_thread(
                            self._fetch_image, ad["image_url"]
                        )
                        if fetched:
                            return fetched
                        # URL download failed: fall through to the local
                        # file instead of giving up immediately.

                    # Try local file
                    if ad.get("image_filename"):
                        local_path = os.path.join(
                            settings.output_dir, ad["image_filename"]
                        )
                        if os.path.exists(local_path):
                            with open(local_path, "rb") as f:
                                return f.read()
            except Exception as e:
                print(f"Failed to load image from database ID {image_id}: {e}")
                return None

        return None

    def _extract_image_from_output(self, output) -> Tuple[Optional[bytes], Optional[str]]:
        """
        Extract image bytes and URL from Replicate output.

        Handles, in order: file-like objects (anything with ``read``), objects
        with a ``url`` attribute, non-empty lists (first element only) and
        plain string URLs. May block on I/O (``read`` / ``_fetch_image``).

        Returns:
            Tuple of (image_bytes, image_url); (None, None) when the output
            type is not recognised or extraction raises.
        """
        try:
            # Handle file-like object
            if hasattr(output, 'read'):
                url = getattr(output, 'url', None)
                return output.read(), url

            # Handle URL attribute
            if hasattr(output, 'url'):
                url = output.url
                return self._fetch_image(url), url

            # Handle list of outputs
            if isinstance(output, list) and len(output) > 0:
                first = output[0]
                url = getattr(first, "url", str(first))
                return self._fetch_image(url), url

            # Handle string URL
            if isinstance(output, str):
                return self._fetch_image(output), output

            print(f"Unknown output type: {type(output)}")
            return None, None
        except Exception as e:
            print(f"Error extracting image from output: {e}")
            return None, None

    async def generate(
        self,
        prompt: str,
        width: int = 1024,
        height: int = 1024,
        seed: Optional[int] = None,
        model_key: Optional[str] = None,
        image_url: Optional[str] = None,
    ) -> Tuple[bytes, str, Optional[str]]:
        """
        Generate an image using Replicate API (official library), or the
        OpenAI images API when ``model_key`` is "gpt-image-1.5".

        Args:
            prompt: Image generation prompt
            width: Image width
            height: Image height
            seed: Random seed for uniqueness (if None, generates random)
            model_key: Which model to use (default from config)
            image_url: Optional image URL for image-to-image generation
                (only sent to nano-banana / nano-banana-pro)

        Returns:
            Tuple of (image_bytes, model_used, image_url)

        Raises:
            ValueError: If "gpt-image-1.5" is requested without an OpenAI client.
            Exception: If the OpenAI request times out, or if every model in
                the Replicate fallback chain fails.
        """
        # Use random seed if not provided (ensures unique images)
        if seed is None:
            seed = random.randint(1, 2147483647)

        # Get models to try (fallback chain)
        model_key = model_key or self.default_model

        # Check if using OpenAI image generation API (gpt-image-1.5)
        if model_key == "gpt-image-1.5":
            if not self.openai_client:
                raise ValueError("OpenAI API key not configured for gpt-image-1.5")

            try:
                print("Generating image with gpt-image-1.5")
                size_str = f"{width}x{height}"

                # Use a timeout for OpenAI image generation
                result = await asyncio.wait_for(
                    self.openai_client.images.generate(
                        model="gpt-image-1.5",
                        prompt=prompt,
                        quality="auto",
                        background="auto",
                        moderation="auto",
                        size=size_str,
                        output_format="jpeg",
                        output_compression=90,
                    ),
                    timeout=120.0
                )

                if result.data and len(result.data) > 0:
                    image_base64 = result.data[0].b64_json
                    if image_base64:
                        image_bytes = base64.b64decode(image_base64)
                        print("Successfully generated image with gpt-image-1.5")
                        return image_bytes, "gpt-image-1.5", None

                raise Exception("No image data returned from OpenAI API")
            except asyncio.TimeoutError as exc:
                # A timeout is surfaced to the caller; it does NOT fall back
                # to Replicate (an exception raised inside a handler is not
                # caught by the sibling ``except Exception`` below).
                print("Timed out generating image with gpt-image-1.5")
                raise Exception(
                    "Timeout: Image generation with gpt-image-1.5 took too long"
                ) from exc
            except Exception as e:
                print(f"OpenAI image generation failed: {e}")
                print("Falling back to Replicate models...")
                model_key = None  # Will use default fallback chain

        # Build fallback chain for Replicate models
        if model_key and model_key in MODEL_REGISTRY:
            models_to_try = [model_key] + [
                m for m in DEFAULT_FALLBACK_MODELS if m != model_key
            ]
        else:
            models_to_try = DEFAULT_FALLBACK_MODELS

        last_error = None

        for current_model in models_to_try:
            cfg = MODEL_REGISTRY.get(current_model)
            if not cfg:
                continue

            # Build input parameters
            input_data = {"prompt": prompt}

            # Add image URL for image-to-image if provided (for nano-banana and nano-banana-pro)
            # Google Nano Banana models expect image_input as an array
            if image_url and current_model in ("nano-banana", "nano-banana-pro"):
                input_data["image_input"] = [image_url]
                # Note: guidance_scale may not be supported by nano-banana on Replicate
                # Relying on minimal prompts to preserve the original image

            # Add seed if supported
            input_data["seed"] = seed

            # Some models use width/height, others use aspect_ratio
            if cfg.get("uses_dimensions"):
                input_data["width"] = width
                input_data["height"] = height
            else:
                aspect_ratio = convert_dimensions_to_aspect_ratio(width, height)
                input_data[cfg["param_name"]] = aspect_ratio

            # Retry logic
            for attempt in range(RETRY_ATTEMPTS):
                try:
                    print(f"Generating image with {current_model} (attempt {attempt + 1})")

                    # Use official Replicate client - offload blocking call to thread and add timeout
                    try:
                        # Wrap the blocking call in a thread and add a 3-minute timeout.
                        # On timeout only the await is abandoned; the worker
                        # thread itself cannot be interrupted.
                        output = await asyncio.wait_for(
                            asyncio.to_thread(self.client.run, cfg["id"], input=input_data),
                            timeout=180.0  # 3 minutes timeout
                        )
                    except asyncio.TimeoutError as exc:
                        print(f"Timed out generating image with {current_model} after 180 seconds")
                        raise Exception(
                            f"Timeout: Image generation with {current_model} took too long"
                        ) from exc

                    # Extract image bytes and URL. The extraction may download
                    # the image synchronously, so run it off the event loop.
                    # The result goes into ``output_url`` (not ``image_url``)
                    # so the caller's image-to-image input is preserved for
                    # the remaining models in the fallback chain.
                    image_bytes, output_url = await asyncio.to_thread(
                        self._extract_image_from_output, output
                    )

                    if image_bytes:
                        print(f"Successfully generated image with {current_model}")
                        return image_bytes, current_model, output_url

                    # Treat an empty result as a failure so it gets recorded
                    # in last_error and goes through the normal backoff.
                    raise Exception(
                        f"No image data could be extracted from {current_model} output"
                    )

                except Exception as e:
                    last_error = e
                    if attempt < RETRY_ATTEMPTS - 1:
                        print(f"Attempt {attempt + 1} failed: {e}, retrying...")
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff (use await for async sleep)
                        continue

            # Model failed, try next in fallback chain
            print(f"Model {current_model} failed, trying next...")

        # All models failed
        raise Exception(
            f"All image generation models failed. Last error: {last_error}"
        ) from last_error

    async def generate_with_retry(
        self,
        prompt: str,
        width: int = 1024,
        height: int = 1024,
        max_retries: int = 2,
    ) -> Tuple[bytes, str, Optional[str]]:
        """
        Generate image with automatic retry on failure.
        Uses different random seed each attempt for variety.

        Args:
            prompt: Image generation prompt
            width: Image width
            height: Image height
            max_retries: Extra attempts after the first one; negative values
                are treated as 0 so at least one attempt is always made.

        Returns:
            Tuple of (image_bytes, model_used, image_url)

        Raises:
            Exception: The error from the last failed attempt.
        """
        # Always attempt at least once; otherwise last_error would stay None
        # and ``raise last_error`` would itself fail with a TypeError.
        total_attempts = max(max_retries, 0) + 1
        last_error = None

        for attempt in range(total_attempts):
            try:
                seed = random.randint(1, 2147483647)
                return await self.generate(
                    prompt=prompt,
                    width=width,
                    height=height,
                    seed=seed,
                )
            except Exception as e:
                last_error = e
                if attempt < total_attempts - 1:
                    await asyncio.sleep(2)
                    continue

        raise last_error
# Global instance.
# Constructed at import time: ImageService.__init__ raises ValueError when
# settings.replicate_api_token is not set, so importing this module fails
# in that case.
image_service = ImageService()