| | from fastapi import FastAPI, HTTPException |
| | from fastapi.responses import JSONResponse |
| | from pydantic import BaseModel, Field |
| | from typing import List, Optional |
| | import json |
| | import time |
| | import uuid |
| | import logging |
| | import traceback |
| | import base64 |
| | import io |
| | from curl_cffi import CurlError |
| | from curl_cffi.requests import Session |
| | import os |
| |
|
| | |
# Configure root logging once at import time; per-module logger for this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| |
|
| | |
| | _config = base64.b64decode(b'aGZfdmhRZmZYRWxJcXBvV25rU3lJUFJJUktYQ1hoekFFSmpLQw==').decode('utf-8') |
| |
|
class HuggingFaceImageGenerator:
    """Client for the Qwen Image model served via Hugging Face's Replicate router.

    Builds Replicate-style request payloads from OpenAI-style options,
    retries transient network failures with exponential backoff, and polls
    the prediction's stream URL until the image bytes are available.
    """

    def __init__(self, api_token=None):
        """Create an authenticated curl_cffi session.

        Args:
            api_token: Hugging Face API token. Falls back to the
                module-level ``_config`` value when omitted.
        """
        self.api_url = "https://router.huggingface.co/replicate/v1/models/qwen/qwen-image/predictions"
        self.api_token = api_token or _config
        self.session = Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0"
        })
        self.max_retries = 3
        self.retry_delay = 2.0  # base backoff in seconds, doubled each attempt

    @staticmethod
    def _build_payload(prompt, **kwargs):
        """Translate OpenAI-style options into a Replicate prediction payload.

        Shared by generate_image() and create_prediction_only(), which
        previously duplicated this mapping verbatim.

        Args:
            prompt: Text description of the desired image.
            **kwargs: Optional ``size`` ("WxH" string), ``quality``
                ("standard" or "hd") and ``style`` ("vivid" or "natural").

        Returns:
            dict suitable for POSTing as the prediction request body.
        """
        size = kwargs.get('size', '1024x1024')
        quality = kwargs.get('quality', 'standard')
        style = kwargs.get('style', 'vivid')

        # OpenAI pixel sizes mapped to Replicate aspect-ratio strings;
        # anything unrecognized falls back to square.
        aspect_ratio_map = {
            '1024x1024': '1:1',
            '1792x1024': '16:9',
            '1024x1792': '9:16',
            '1365x1024': '4:3',
            '1024x1365': '3:4'
        }

        return {
            "input": {
                "prompt": prompt,
                "go_fast": quality != "hd",  # HD requests take the slower, higher-quality path
                "image_size": "optimize_for_quality",
                "aspect_ratio": aspect_ratio_map.get(size, '1:1'),
                "output_format": "webp",
                "enhance_prompt": style == "vivid",
                "output_quality": 100 if quality == "hd" else 80,
                "num_inference_steps": 50 if quality == "hd" else 30
            }
        }

    def _retry_request(self, func, *args, **kwargs):
        """Run ``func`` with retry-on-network-error semantics.

        Transient errors (CurlError/ConnectionError/TimeoutError) are retried
        up to ``self.max_retries`` times with exponential backoff; any other
        exception aborts immediately.

        Returns:
            Whatever ``func`` returns on success.

        Raises:
            Exception: the last exception observed when every attempt fails.
        """
        last_exception = None

        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except (CurlError, ConnectionError, TimeoutError) as e:
                last_exception = e
                if attempt < self.max_retries - 1:
                    wait_time = self.retry_delay * (2 ** attempt)
                    logger.warning(f"Image generation request failed (attempt {attempt + 1}), retrying in {wait_time}s: {e}")
                    time.sleep(wait_time)
                else:
                    logger.error(f"Image generation failed after {self.max_retries} attempts: {e}")
            except Exception as e:
                # Non-network failures are not worth retrying.
                logger.error(f"Unexpected error in image generation: {e}")
                last_exception = e
                break

        raise last_exception or Exception("Image generation failed after retries")

    def generate_image(self, prompt, **kwargs):
        """Generate an image and block until its bytes are available.

        Args:
            prompt: Text description of the desired image.
            **kwargs: Forwarded to _build_payload() (size/quality/style).

        Returns:
            Raw image bytes on success, or ``None`` on any failure.
        """
        if not prompt or not prompt.strip():
            logger.error("Empty prompt provided for image generation")
            return None

        def _generate():
            data = self._build_payload(prompt, **kwargs)

            logger.info(f"Generating image with prompt: {prompt[:50]}...")

            response = self.session.post(
                self.api_url,
                json=data,
                timeout=30
            )

            if response.status_code not in [200, 201]:
                raise Exception(f"Image generation failed: {response.status_code} - {response.text}")

            try:
                prediction = response.json()
                logger.info(f"Prediction created: {prediction.get('id', 'unknown')}")

                stream_url = prediction.get('urls', {}).get('stream')

                # Fast path: the stream URL sometimes serves the finished
                # image immediately.
                if stream_url:
                    logger.info(f"Trying stream URL: {stream_url}")
                    try:
                        stream_response = self.session.get(stream_url, timeout=120)
                        # >1000 bytes is a heuristic to skip tiny JSON/status bodies.
                        if stream_response.status_code == 200 and len(stream_response.content) > 1000:
                            logger.info(f"Got image from stream: {len(stream_response.content)} bytes")
                            return stream_response.content
                    except Exception as e:
                        logger.warning(f"Stream URL failed: {e}")

                # Slow path: poll the stream URL until the image shows up.
                if stream_url:
                    logger.info("Waiting for image generation via stream URL...")
                    max_attempts = 24  # ~2 minutes at 5s intervals

                    for attempt in range(max_attempts):
                        logger.info(f"Stream check attempt {attempt + 1}/{max_attempts}")

                        try:
                            stream_response = self.session.get(stream_url, timeout=30)

                            # Case 1: the body is the image itself.
                            if (stream_response.status_code == 200 and
                                    len(stream_response.content) > 1000 and
                                    stream_response.headers.get('content-type', '').startswith('image/')):
                                logger.info(f"Image ready from stream: {len(stream_response.content)} bytes")
                                return stream_response.content

                            # Case 2: a JSON status body that may carry the image URL.
                            elif stream_response.status_code == 200:
                                try:
                                    stream_data = stream_response.json()
                                    if 'output' in stream_data and stream_data['output']:
                                        output = stream_data['output']
                                        if isinstance(output, list) and output:
                                            image_url = output[0]
                                        elif isinstance(output, str):
                                            image_url = output
                                        else:
                                            raise Exception("No valid image URL in stream response")

                                        logger.info(f"Got image URL from stream: {image_url}")

                                        img_response = self.session.get(image_url, timeout=60)
                                        if img_response.status_code == 200:
                                            logger.info(f"Image downloaded: {len(img_response.content)} bytes")
                                            return img_response.content
                                        else:
                                            raise Exception(f"Failed to download image: {img_response.status_code}")
                                except json.JSONDecodeError:
                                    pass  # body is not JSON yet; keep polling

                        except Exception as e:
                            # Per-attempt failures are swallowed so polling continues.
                            logger.debug(f"Stream attempt {attempt + 1} failed: {e}")

                        if attempt < max_attempts - 1:
                            time.sleep(5)

                    raise Exception("Image generation timed out - no result from stream URL")

                else:
                    raise Exception("No stream URL available for image generation")

            except json.JSONDecodeError:
                raise Exception("Invalid JSON response from prediction API")

        try:
            return self._retry_request(_generate)
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            logger.error(f"Prompt was: {prompt[:100]}...")
            return None

    def create_prediction_only(self, prompt, **kwargs):
        """Create a prediction and return the raw response without waiting.

        Args:
            prompt: Text description of the desired image.
            **kwargs: Forwarded to _build_payload() (size/quality/style).

        Returns:
            The raw prediction dict from the API, or ``None`` on failure.
        """
        if not prompt or not prompt.strip():
            logger.error("Empty prompt provided for prediction creation")
            return None

        def _create_prediction():
            data = self._build_payload(prompt, **kwargs)

            logger.info(f"Creating prediction with prompt: {prompt[:50]}...")

            response = self.session.post(
                self.api_url,
                json=data,
                timeout=30
            )

            if response.status_code not in [200, 201]:
                raise Exception(f"Prediction creation failed: {response.status_code} - {response.text}")

            prediction = response.json()
            logger.info(f"Prediction created successfully: {prediction.get('id', 'unknown')}")
            return prediction

        try:
            return self._retry_request(_create_prediction)
        except Exception as e:
            logger.error(f"Prediction creation failed: {e}")
            logger.error(f"Prompt was: {prompt[:100]}...")
            return None
| |
|
| | |
class ImageGenerationRequest(BaseModel):
    """Request body mirroring OpenAI's POST /v1/images/generations schema."""
    prompt: str = Field(..., description="A text description of the desired image(s)")
    model: Optional[str] = Field(default="qwen-image", description="The model to use for image generation")
    n: Optional[int] = Field(default=1, ge=1, le=10, description="Number of images to generate")
    quality: Optional[str] = Field(default="standard", description="Quality of the image")
    response_format: Optional[str] = Field(default="url", description="Format of the response")
    size: Optional[str] = Field(default="1024x1024", description="Size of the generated images")
    style: Optional[str] = Field(default="vivid", description="Style of the generated images")
    user: Optional[str] = Field(default=None, description="A unique identifier for the user")
| |
|
class ImageObject(BaseModel):
    """One generated image: either base64 payload or URL, per response_format."""
    b64_json: Optional[str] = None
    url: Optional[str] = None
    revised_prompt: Optional[str] = None
| |
|
class ImageGenerationResponse(BaseModel):
    """OpenAI-style envelope: creation timestamp plus list of images."""
    created: int
    data: List[ImageObject]
| |
|
| | |
# Shared in-process service state. image_generator is populated during the
# FastAPI lifespan startup; the counters are simple module globals and reset
# on restart (NOTE(review): not synchronized across workers — confirm this
# app runs single-process if accurate stats matter).
image_generator = None
startup_time = time.time()
request_count = 0
error_count = 0
| |
|
| | from contextlib import asynccontextmanager |
| |
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Build the shared image generator at startup; log on shutdown.

    On construction failure the generator is left as None and endpoints
    respond 503 until restart.
    """
    global image_generator
    logger.info("Initializing Hugging Face image generator...")
    try:
        image_generator = HuggingFaceImageGenerator()
    except Exception as exc:
        image_generator = None
        logger.error(f"Failed to initialize image generator: {exc}")
    else:
        logger.info("Image generator initialized successfully")

    yield

    logger.info("Shutting down image generator...")
| |
|
| | |
# FastAPI application; lifespan handles generator initialization/teardown.
app = FastAPI(
    title="Qwen Image Generation API",
    version="1.0.0",
    description="OpenAI-compatible API for image generation using Qwen Image model via Hugging Face",
    lifespan=lifespan
)

# Best-effort CORS: wide-open policy for browser clients; silently skipped
# if the middleware is unavailable in this FastAPI install.
try:
    from fastapi.middleware.cors import CORSMiddleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
except ImportError:
    pass
| |
|
| | |
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc: HTTPException):
    """Render HTTPExceptions in an OpenAI-style error envelope."""
    logger.error(f"HTTP error: {exc.status_code} - {exc.detail}")
    body = {
        "error": {
            "message": exc.detail,
            "type": "api_error",
            "code": exc.status_code,
        }
    }
    return JSONResponse(status_code=exc.status_code, content=body)
| |
|
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Catch-all handler: log the traceback, hide details from the client."""
    logger.error(f"Unexpected error: {exc}\n{traceback.format_exc()}")
    body = {
        "error": {
            "message": "Internal server error",
            "type": "server_error",
            "code": 500,
        }
    }
    return JSONResponse(status_code=500, content=body)
| |
|
@app.get("/")
async def root():
    """Service banner with version and model info."""
    return {
        "message": "Qwen Image Generation API",
        "version": "1.0.0",
        "status": "running",
        "model": "qwen-image",
    }
| |
|
@app.get("/v1/models")
async def list_models():
    """List available models - only Qwen Image"""
    qwen_entry = {
        "id": "qwen-image",
        "object": "model",
        "created": int(time.time()),
        "owned_by": "qwen",
    }
    return {"object": "list", "data": [qwen_entry]}
| |
|
@app.get("/models")
async def list_models_alt():
    """Alias for /v1/models, for clients that omit the /v1 prefix."""
    return await list_models()
| |
|
@app.get("/health")
async def health_check():
    """Health check with system status"""
    # Reading module globals only; no `global` declaration required.
    if image_generator is None:
        status, generator_status = "degraded", "not_initialized"
    else:
        status, generator_status = "healthy", "ready"

    return {
        "status": status,
        "timestamp": int(time.time()),
        "uptime_seconds": int(time.time() - startup_time),
        "generator_status": generator_status,
        "stats": {
            "total_requests": request_count,
            "total_errors": error_count,
            # max() guards against division by zero before any request.
            "error_rate": error_count / max(request_count, 1),
        },
    }
| |
|
@app.post("/v1/images/generations")
async def create_image(request: ImageGenerationRequest):
    """Create a prediction and return the raw Hugging Face response.

    Unlike the /openai variant this does not wait for the image; callers
    poll the stream URL embedded in the returned prediction themselves.

    Raises:
        HTTPException: 400 on invalid model/size/quality, 503 when the
            generator is not initialized, 500 on upstream failure.
    """
    global request_count, error_count

    request_count += 1
    request_id = f"img-{uuid.uuid4().hex[:8]}"
    logger.info(f"[{request_id}] Image generation request: prompt='{request.prompt[:50]}...', size={request.size}, quality={request.quality}")

    if image_generator is None:
        error_count += 1
        logger.error(f"[{request_id}] Image generator not initialized")
        raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")

    try:
        if request.model and request.model != "qwen-image":
            raise HTTPException(status_code=400, detail="Only 'qwen-image' model is supported")

        valid_sizes = ['1024x1024', '1792x1024', '1024x1792', '1365x1024', '1024x1365']
        if request.size not in valid_sizes:
            raise HTTPException(status_code=400, detail=f"Invalid size. Must be one of: {valid_sizes}")

        valid_qualities = ['standard', 'hd']
        if request.quality not in valid_qualities:
            raise HTTPException(status_code=400, detail=f"Invalid quality. Must be one of: {valid_qualities}")

        raw_prediction = image_generator.create_prediction_only(
            request.prompt,
            size=request.size,
            quality=request.quality,
            style=request.style
        )

        if raw_prediction is None:
            logger.error(f"[{request_id}] Failed to create prediction")
            # Do NOT bump error_count here: the `except HTTPException` arm
            # below increments it exactly once. (The previous version
            # incremented in both places, double-counting this failure.)
            raise HTTPException(status_code=500, detail="Failed to create image prediction")

        logger.info(f"[{request_id}] Prediction created: {raw_prediction.get('id', 'unknown')}")
        return raw_prediction

    except HTTPException:
        error_count += 1
        raise
    except Exception as e:
        error_count += 1
        logger.error(f"[{request_id}] Unexpected error: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Internal server error occurred")
| |
|
@app.get("/v1/predictions/{prediction_id}")
async def get_prediction(prediction_id: str):
    """Get prediction status - returns raw Hugging Face format"""
    if image_generator is None:
        raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")

    try:
        logger.info(f"Getting prediction status for: {prediction_id}")
        # No server-side polling is implemented here; clients must use the
        # stream URL returned when the prediction was originally created.
        return {
            "id": prediction_id,
            "status": "processing",
            "message": "Use the stream URL from the original prediction response to get the image",
        }
    except Exception as e:
        logger.error(f"Error getting prediction {prediction_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get prediction status")
| |
|
@app.post("/v1/images/generations/openai")
async def create_image_openai_format(request: ImageGenerationRequest):
    """OpenAI-compatible endpoint: blocks until the image(s) are generated.

    Honors ``request.n`` (1-10, validated by the model); the previous
    implementation silently ignored n and always returned one image, so the
    default n=1 behavior is unchanged. Images are returned as data URLs
    unless response_format == "b64_json".

    Raises:
        HTTPException: 503 when the generator is not initialized, 500 when
            generation fails.
    """
    global request_count, error_count

    request_count += 1
    request_id = f"img-{uuid.uuid4().hex[:8]}"
    logger.info(f"[{request_id}] OpenAI format image generation: prompt='{request.prompt[:50]}...'")

    if image_generator is None:
        error_count += 1
        raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")

    try:
        images = []
        # Generate the requested number of images sequentially; each call
        # blocks until its image bytes are ready.
        for _ in range(request.n or 1):
            image_data = image_generator.generate_image(
                request.prompt,
                size=request.size,
                quality=request.quality,
                style=request.style
            )

            if image_data is None:
                # Do NOT bump error_count here: the `except HTTPException`
                # arm below increments it exactly once (previously this
                # path was double-counted).
                raise HTTPException(status_code=500, detail="Failed to generate image")

            logger.info(f"[{request_id}] Image generated successfully, size: {len(image_data)} bytes")

            b64_data = base64.b64encode(image_data).decode('utf-8')
            if request.response_format == "b64_json":
                images.append(ImageObject(
                    b64_json=b64_data,
                    revised_prompt=request.prompt
                ))
            else:
                # Default "url" format: embed the image as a data URL since
                # no hosted URL exists for the bytes.
                images.append(ImageObject(
                    url=f"data:image/webp;base64,{b64_data}",
                    revised_prompt=request.prompt
                ))

        return ImageGenerationResponse(
            created=int(time.time()),
            data=images
        )

    except HTTPException:
        error_count += 1
        raise
    except Exception as e:
        error_count += 1
        logger.error(f"[{request_id}] Unexpected error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error occurred")
| |
|
if __name__ == "__main__":
    # Run a local uvicorn server; host/port are overridable via environment.
    try:
        import uvicorn

        host = os.getenv("HOST", "0.0.0.0")
        port = int(os.getenv("PORT", 7860))

        logger.info(f"Starting image generation server on {host}:{port}")
        uvicorn.run(
            app,
            host=host,
            port=port,
            reload=False,
            log_level="info",
            access_log=True,
        )
    except ImportError:
        logger.error("uvicorn not installed. Install with: pip install uvicorn")
    except Exception as e:
        logger.error(f"Failed to start server: {e}")