from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional
import json
import time
import uuid
import logging
import traceback
import base64
import io
from curl_cffi import CurlError
from curl_cffi.requests import Session
import os
from contextlib import asynccontextmanager

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# SECURITY NOTE(review): this is an API token committed to source control,
# merely base64-obfuscated. It should be revoked and rotated. Prefer the
# HF_TOKEN environment variable, which now takes precedence in
# HuggingFaceImageGenerator.__init__; this value is only a last-resort fallback.
_config = base64.b64decode(b'aGZfdmhRZmZYRWxJcXBvV25rU3lJUFJJUktYQ1hoekFFSmpLQw==').decode('utf-8')

# Size -> aspect-ratio mapping shared by all request builders
# (based on the aspect ratios the Qwen model supports).
_ASPECT_RATIO_MAP = {
    '1024x1024': '1:1',    # Square
    '1792x1024': '16:9',   # Landscape
    '1024x1792': '9:16',   # Portrait
    '1365x1024': '4:3',    # 4:3 landscape
    '1024x1365': '3:4'     # 3:4 portrait
}


class HuggingFaceImageGenerator:
    """Client for the Qwen image model behind the Hugging Face Replicate router.

    Creates predictions with retry/backoff and retrieves results by polling
    the prediction's stream URL (direct Replicate polling is unavailable
    through the router).
    """

    def __init__(self, api_token=None):
        self.api_url = "https://router.huggingface.co/replicate/v1/models/qwen/qwen-image/predictions"
        # Token precedence: explicit argument > HF_TOKEN env var > embedded default.
        self.api_token = api_token or os.getenv("HF_TOKEN") or _config
        self.session = Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0"
        })
        self.max_retries = 3
        self.retry_delay = 2.0

    @staticmethod
    def _build_request_data(prompt, **kwargs):
        """Translate OpenAI-style kwargs (size/quality/style) into the Replicate payload.

        Shared by generate_image() and create_prediction_only() so the two
        request paths cannot drift apart.
        """
        size = kwargs.get('size', '1024x1024')
        quality = kwargs.get('quality', 'standard')
        style = kwargs.get('style', 'vivid')

        aspect_ratio = _ASPECT_RATIO_MAP.get(size, '1:1')

        return {
            "input": {
                "prompt": prompt,
                "go_fast": quality != "hd",
                # Always optimize for quality; generation speed is governed by go_fast.
                "image_size": "optimize_for_quality",
                "aspect_ratio": aspect_ratio,
                "output_format": "webp",
                "enhance_prompt": style == "vivid",
                "output_quality": 100 if quality == "hd" else 80,
                "num_inference_steps": 50 if quality == "hd" else 30
            }
        }

    def _retry_request(self, func, *args, **kwargs):
        """Retry mechanism for API requests.

        Transient network failures (CurlError/ConnectionError/TimeoutError)
        are retried with exponential backoff (retry_delay * 2**attempt); any
        other exception aborts immediately. Re-raises the last exception when
        every attempt fails.
        """
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except (CurlError, ConnectionError, TimeoutError) as e:
                last_exception = e
                if attempt < self.max_retries - 1:
                    wait_time = self.retry_delay * (2 ** attempt)
                    logger.warning(f"Image generation request failed (attempt {attempt + 1}), retrying in {wait_time}s: {e}")
                    time.sleep(wait_time)
                else:
                    logger.error(f"Image generation failed after {self.max_retries} attempts: {e}")
            except Exception as e:
                # Non-transient errors are not worth retrying.
                logger.error(f"Unexpected error in image generation: {e}")
                last_exception = e
                break
        raise last_exception or Exception("Image generation failed after retries")

    def generate_image(self, prompt, **kwargs):
        """Generate image using Hugging Face Replicate API.

        Creates a prediction, then fetches the image bytes via the
        prediction's stream URL: first one long-timeout fetch, then up to
        24 polls at 5-second intervals. Returns raw image bytes, or None
        on any failure (errors are logged, not raised).
        """
        if not prompt or not prompt.strip():
            logger.error("Empty prompt provided for image generation")
            return None

        def _generate():
            data = self._build_request_data(prompt, **kwargs)

            logger.info(f"Generating image with prompt: {prompt[:50]}...")

            response = self.session.post(
                self.api_url,
                json=data,
                timeout=30  # Initial request timeout
            )

            if response.status_code not in [200, 201]:
                raise Exception(f"Image generation failed: {response.status_code} - {response.text}")

            # Parse the prediction response
            try:
                prediction = response.json()
                logger.info(f"Prediction created: {prediction.get('id', 'unknown')}")

                # Try to use the stream URL first, then fall back to polling
                stream_url = prediction.get('urls', {}).get('stream')
                get_url = prediction.get('urls', {}).get('get')

                if stream_url:
                    logger.info(f"Trying stream URL: {stream_url}")
                    try:
                        # Try to get the image directly from stream URL
                        stream_response = self.session.get(stream_url, timeout=120)
                        if stream_response.status_code == 200 and len(stream_response.content) > 1000:
                            logger.info(f"Got image from stream: {len(stream_response.content)} bytes")
                            return stream_response.content
                    except Exception as e:
                        logger.warning(f"Stream URL failed: {e}")

                # If the direct stream fetch didn't work, poll the stream URL:
                # we cannot poll the Replicate API itself through the router.
                if stream_url:
                    logger.info("Waiting for image generation via stream URL...")
                    max_attempts = 24  # 2 minutes with 5-second intervals
                    for attempt in range(max_attempts):
                        logger.info(f"Stream check attempt {attempt + 1}/{max_attempts}")
                        try:
                            stream_response = self.session.get(stream_url, timeout=30)

                            # Check if we got actual image data
                            if (stream_response.status_code == 200 and
                                    len(stream_response.content) > 1000 and
                                    stream_response.headers.get('content-type', '').startswith('image/')):
                                logger.info(f"Image ready from stream: {len(stream_response.content)} bytes")
                                return stream_response.content
                            elif stream_response.status_code == 200:
                                # Check if it's a JSON response with the actual image URL
                                try:
                                    stream_data = stream_response.json()
                                    if 'output' in stream_data and stream_data['output']:
                                        output = stream_data['output']
                                        if isinstance(output, list) and output:
                                            image_url = output[0]
                                        elif isinstance(output, str):
                                            image_url = output
                                        else:
                                            # Caught by the broad handler below; logged at debug
                                            # and the poll loop continues.
                                            raise Exception("No valid image URL in stream response")

                                        logger.info(f"Got image URL from stream: {image_url}")

                                        # Download the actual image
                                        img_response = self.session.get(image_url, timeout=60)
                                        if img_response.status_code == 200:
                                            logger.info(f"Image downloaded: {len(img_response.content)} bytes")
                                            return img_response.content
                                        else:
                                            raise Exception(f"Failed to download image: {img_response.status_code}")
                                except json.JSONDecodeError:
                                    pass  # Not JSON, continue waiting
                        except Exception as e:
                            logger.debug(f"Stream attempt {attempt + 1} failed: {e}")

                        if attempt < max_attempts - 1:
                            time.sleep(5)

                    raise Exception("Image generation timed out - no result from stream URL")
                else:
                    raise Exception("No stream URL available for image generation")

            except json.JSONDecodeError:
                raise Exception("Invalid JSON response from prediction API")

        try:
            return self._retry_request(_generate)
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            logger.error(f"Prompt was: {prompt[:100]}...")
            return None

    def create_prediction_only(self, prompt, **kwargs):
        """Create prediction and return raw response without waiting for completion.

        Returns the raw Replicate prediction dict (including its stream URL),
        or None on failure (errors are logged, not raised).
        """
        if not prompt or not prompt.strip():
            logger.error("Empty prompt provided for prediction creation")
            return None

        def _create_prediction():
            data = self._build_request_data(prompt, **kwargs)

            logger.info(f"Creating prediction with prompt: {prompt[:50]}...")

            response = self.session.post(
                self.api_url,
                json=data,
                timeout=30
            )

            if response.status_code not in [200, 201]:
                raise Exception(f"Prediction creation failed: {response.status_code} - {response.text}")

            # Return the raw prediction response
            prediction = response.json()
            logger.info(f"Prediction created successfully: {prediction.get('id', 'unknown')}")
            return prediction

        try:
            return self._retry_request(_create_prediction)
        except Exception as e:
            logger.error(f"Prediction creation failed: {e}")
            logger.error(f"Prompt was: {prompt[:100]}...")
            return None


# OpenAI Compatible Image Generation Models
class ImageGenerationRequest(BaseModel):
    prompt: str = Field(..., description="A text description of the desired image(s)")
    model: Optional[str] = Field(default="qwen-image", description="The model to use for image generation")
    n: Optional[int] = Field(default=1, ge=1, le=10, description="Number of images to generate")
    quality: Optional[str] = Field(default="standard", description="Quality of the image")
    response_format: Optional[str] = Field(default="url", description="Format of the response")
    size: Optional[str] = Field(default="1024x1024", description="Size of the generated images")
    style: Optional[str] = Field(default="vivid", description="Style of the generated images")
    user: Optional[str] = Field(default=None, description="A unique identifier for the user")


class ImageObject(BaseModel):
    b64_json: Optional[str] = None
    url: Optional[str] = None
    revised_prompt: Optional[str] = None


class ImageGenerationResponse(BaseModel):
    created: int
    data: List[ImageObject]


# Global variables
image_generator = None
startup_time = time.time()
request_count = 0
error_count = 0


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize the image generator on startup"""
    global image_generator
    try:
        logger.info("Initializing Hugging Face image generator...")
        image_generator = HuggingFaceImageGenerator()
        logger.info("Image generator initialized successfully")
    except Exception as e:
        # Leave the generator unset; endpoints respond 503 until it exists.
        logger.error(f"Failed to initialize image generator: {e}")
        image_generator = None

    yield  # Server is running

    # Cleanup on shutdown (if needed)
    logger.info("Shutting down image generator...")


# FastAPI App
app = FastAPI(
    title="Qwen Image Generation API",
    version="1.0.0",
    description="OpenAI-compatible API for image generation using Qwen Image model via Hugging Face",
    lifespan=lifespan
)

# Add CORS middleware (best-effort: skipped if starlette's CORS is unavailable)
try:
    from fastapi.middleware.cors import CORSMiddleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
except ImportError:
    pass


# Error handlers
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc: HTTPException):
    """Wrap HTTPExceptions in an OpenAI-style error envelope."""
    logger.error(f"HTTP error: {exc.status_code} - {exc.detail}")
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "message": exc.detail,
                "type": "api_error",
                "code": exc.status_code
            }
        }
    )


@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Last-resort handler: log the traceback, return a generic 500 envelope."""
    logger.error(f"Unexpected error: {exc}\n{traceback.format_exc()}")
    return JSONResponse(
        status_code=500,
        content={
            "error": {
                "message": "Internal server error",
                "type": "server_error",
                "code": 500
            }
        }
    )


@app.get("/")
async def root():
    return {"message": "Qwen Image Generation API", "version": "1.0.0", "status": "running", "model": "qwen-image"}


@app.get("/v1/models")
async def list_models():
    """List available models - only Qwen Image"""
    return {
        "object": "list",
        "data": [
            {
                "id": "qwen-image",
                "object": "model",
                "created": int(time.time()),
                "owned_by": "qwen"
            }
        ]
    }


@app.get("/models")
async def list_models_alt():
    """Alternative endpoint for models"""
    return await list_models()


@app.get("/health")
async def health_check():
    """Health check with system status"""
    global image_generator, startup_time, request_count, error_count

    uptime = time.time() - startup_time
    status = "healthy"

    # Check image generator status
    generator_status = "unknown"
    if image_generator is None:
        generator_status = "not_initialized"
        status = "degraded"
    else:
        generator_status = "ready"

    return {
        "status": status,
        "timestamp": int(time.time()),
        "uptime_seconds": int(uptime),
        "generator_status": generator_status,
        "stats": {
            "total_requests": request_count,
            "total_errors": error_count,
            "error_rate": error_count / max(request_count, 1)
        }
    }


@app.post("/v1/images/generations")
async def create_image(request: ImageGenerationRequest):
    """Return raw Hugging Face prediction response format"""
    global request_count, error_count, image_generator

    request_count += 1
    request_id = f"img-{uuid.uuid4().hex[:8]}"

    logger.info(f"[{request_id}] Image generation request: prompt='{request.prompt[:50]}...', size={request.size}, quality={request.quality}")

    # Check if image generator is available
    if image_generator is None:
        error_count += 1
        logger.error(f"[{request_id}] Image generator not initialized")
        raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")

    try:
        # Validate model
        if request.model and request.model != "qwen-image":
            raise HTTPException(status_code=400, detail="Only 'qwen-image' model is supported")

        # Validate parameters - updated to match Qwen model available aspect ratios
        valid_sizes = ['1024x1024', '1792x1024', '1024x1792', '1365x1024', '1024x1365']
        if request.size not in valid_sizes:
            raise HTTPException(status_code=400, detail=f"Invalid size. Must be one of: {valid_sizes}")

        valid_qualities = ['standard', 'hd']
        if request.quality not in valid_qualities:
            raise HTTPException(status_code=400, detail=f"Invalid quality. Must be one of: {valid_qualities}")

        # Create the prediction and return raw response
        raw_prediction = image_generator.create_prediction_only(
            request.prompt,
            size=request.size,
            quality=request.quality,
            style=request.style
        )

        if raw_prediction is None:
            error_count += 1
            logger.error(f"[{request_id}] Failed to create prediction")
            raise HTTPException(status_code=500, detail="Failed to create image prediction")

        logger.info(f"[{request_id}] Prediction created: {raw_prediction.get('id', 'unknown')}")

        # Return the exact same format as Hugging Face backend
        return raw_prediction

    except HTTPException:
        error_count += 1
        raise
    except Exception as e:
        error_count += 1
        logger.error(f"[{request_id}] Unexpected error: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Internal server error occurred")


@app.get("/v1/predictions/{prediction_id}")
async def get_prediction(prediction_id: str):
    """Get prediction status - returns raw Hugging Face format"""
    global image_generator

    if image_generator is None:
        raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")

    try:
        # Try to get prediction status from stream URL approach
        # Since we can't access Replicate API directly, we'll return a simulated response
        # In a real implementation, you'd need Replicate API access
        logger.info(f"Getting prediction status for: {prediction_id}")

        # For now, return a basic response indicating the limitation
        return {
            "id": prediction_id,
            "status": "processing",
            "message": "Use the stream URL from the original prediction response to get the image"
        }

    except Exception as e:
        logger.error(f"Error getting prediction {prediction_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get prediction status")


@app.post("/v1/images/generations/openai")
async def create_image_openai_format(request: ImageGenerationRequest):
    """OpenAI-compatible endpoint that returns OpenAI format"""
    global request_count, error_count, image_generator

    request_count += 1
    request_id = f"img-{uuid.uuid4().hex[:8]}"

    logger.info(f"[{request_id}] OpenAI format image generation: prompt='{request.prompt[:50]}...'")

    if image_generator is None:
        error_count += 1
        raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")

    try:
        # Generate image using the full generation method
        image_data = image_generator.generate_image(
            request.prompt,
            size=request.size,
            quality=request.quality,
            style=request.style
        )

        if image_data is None:
            error_count += 1
            raise HTTPException(status_code=500, detail="Failed to generate image")

        logger.info(f"[{request_id}] Image generated successfully, size: {len(image_data)} bytes")

        # Return OpenAI format
        images = []
        if request.response_format == "b64_json":
            b64_data = base64.b64encode(image_data).decode('utf-8')
            images.append(ImageObject(
                b64_json=b64_data,
                revised_prompt=request.prompt
            ))
        else:  # url format
            # No hosted storage available, so the "url" is a data URL.
            b64_data = base64.b64encode(image_data).decode('utf-8')
            data_url = f"data:image/webp;base64,{b64_data}"
            images.append(ImageObject(
                url=data_url,
                revised_prompt=request.prompt
            ))

        response = ImageGenerationResponse(
            created=int(time.time()),
            data=images
        )

        return response

    except HTTPException:
        error_count += 1
        raise
    except Exception as e:
        error_count += 1
        logger.error(f"[{request_id}] Unexpected error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error occurred")


if __name__ == "__main__":
    try:
        import uvicorn

        port = int(os.getenv("PORT", 7860))  # Hugging Face default port
        host = os.getenv("HOST", "0.0.0.0")

        logger.info(f"Starting image generation server on {host}:{port}")

        uvicorn.run(
            app,  # Use the app object directly instead of string import
            host=host,
            port=port,
            reload=False,
            log_level="info",
            access_log=True
        )
    except ImportError:
        logger.error("uvicorn not installed. Install with: pip install uvicorn")
    except Exception as e:
        logger.error(f"Failed to start server: {e}")