# Qwenimg / image_server.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional
import json
import time
import uuid
import logging
import traceback
import base64
import io
from curl_cffi import CurlError
from curl_cffi.requests import Session
import os
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Obfuscated configuration
_config = base64.b64decode(b'aGZfdmhRZmZYRWxJcXBvV25rU3lJUFJJUktYQ1hoekFFSmpLQw==').decode('utf-8')
class HuggingFaceImageGenerator:
    """Client for the Qwen image model served via Hugging Face's Replicate router.

    Translates OpenAI-style image parameters (size/quality/style) into a
    Replicate prediction payload, creates predictions, and can poll the
    prediction's stream URL until the generated image bytes are available.
    """

    # OpenAI-style size strings -> aspect ratios supported by the Qwen model.
    _ASPECT_RATIO_MAP = {
        '1024x1024': '1:1',   # Square
        '1792x1024': '16:9',  # Landscape
        '1024x1792': '9:16',  # Portrait
        '1365x1024': '4:3',   # 4:3 landscape
        '1024x1365': '3:4'    # 3:4 portrait
    }

    def __init__(self, api_token=None):
        """Create an HTTP session authorized against the Replicate router.

        api_token: Hugging Face token; falls back to the module-level _config.
        """
        self.api_url = "https://router.huggingface.co/replicate/v1/models/qwen/qwen-image/predictions"
        self.api_token = api_token or _config
        self.session = Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0"
        })
        self.max_retries = 3
        self.retry_delay = 2.0  # base delay (seconds) for exponential backoff

    @staticmethod
    def _build_payload(prompt, **kwargs):
        """Map OpenAI-style kwargs (size/quality/style) to the Replicate body.

        Shared by generate_image and create_prediction_only so the parameter
        mapping lives in exactly one place (it was previously duplicated).
        """
        size = kwargs.get('size', '1024x1024')
        quality = kwargs.get('quality', 'standard')
        style = kwargs.get('style', 'vivid')
        aspect_ratio = HuggingFaceImageGenerator._ASPECT_RATIO_MAP.get(size, '1:1')
        return {
            "input": {
                "prompt": prompt,
                "go_fast": quality != "hd",           # "hd" disables the fast path
                "image_size": "optimize_for_quality",  # always optimize for quality
                "aspect_ratio": aspect_ratio,
                "output_format": "webp",
                "enhance_prompt": style == "vivid",
                "output_quality": 100 if quality == "hd" else 80,
                "num_inference_steps": 50 if quality == "hd" else 30
            }
        }

    def _retry_request(self, func, *args, **kwargs):
        """Retry mechanism for API requests.

        Retries only transient transport errors with exponential backoff;
        any other exception aborts immediately. Re-raises the last error.
        """
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except (CurlError, ConnectionError, TimeoutError) as e:
                last_exception = e
                if attempt < self.max_retries - 1:
                    wait_time = self.retry_delay * (2 ** attempt)
                    logger.warning(f"Image generation request failed (attempt {attempt + 1}), retrying in {wait_time}s: {e}")
                    time.sleep(wait_time)
                else:
                    logger.error(f"Image generation failed after {self.max_retries} attempts: {e}")
            except Exception as e:
                # Non-transport errors are not retried.
                logger.error(f"Unexpected error in image generation: {e}")
                last_exception = e
                break
        raise last_exception or Exception("Image generation failed after retries")

    def _fetch_stream_image(self, stream_url, timeout):
        """Single attempt to obtain finished image bytes from the stream URL.

        Returns image bytes on success, or None when the result is not ready
        yet. Raises on a malformed 'output' field or a failed image download
        (callers decide whether to retry).
        """
        stream_response = self.session.get(stream_url, timeout=timeout)
        if stream_response.status_code != 200:
            return None
        content = stream_response.content
        # Direct image body. NOTE: the content-type check fixes a bug where
        # any >1000-byte non-image body (e.g. an SSE/JSON stream) was
        # returned as if it were image data.
        if (len(content) > 1000 and
                stream_response.headers.get('content-type', '').startswith('image/')):
            logger.info(f"Image ready from stream: {len(content)} bytes")
            return content
        # Some responses are JSON wrappers carrying the final image URL.
        try:
            stream_data = stream_response.json()
        except json.JSONDecodeError:
            return None  # not JSON either; keep waiting
        output = stream_data.get('output')
        if not output:
            return None  # prediction still running
        if isinstance(output, list):
            image_url = output[0]
        elif isinstance(output, str):
            image_url = output
        else:
            raise Exception("No valid image URL in stream response")
        logger.info(f"Got image URL from stream: {image_url}")
        img_response = self.session.get(image_url, timeout=60)
        if img_response.status_code != 200:
            raise Exception(f"Failed to download image: {img_response.status_code}")
        logger.info(f"Image downloaded: {len(img_response.content)} bytes")
        return img_response.content

    def _wait_for_image(self, stream_url):
        """Poll the stream URL until image bytes arrive or we time out."""
        # One optimistic long-timeout attempt first.
        logger.info(f"Trying stream URL: {stream_url}")
        try:
            image = self._fetch_stream_image(stream_url, timeout=120)
            if image is not None:
                return image
        except Exception as e:
            logger.warning(f"Stream URL failed: {e}")
        # Fall back to short-timeout polling (we cannot query Replicate's
        # GET endpoint directly through this router).
        logger.info("Waiting for image generation via stream URL...")
        max_attempts = 24  # ~2 minutes with 5-second intervals
        for attempt in range(max_attempts):
            logger.info(f"Stream check attempt {attempt + 1}/{max_attempts}")
            try:
                image = self._fetch_stream_image(stream_url, timeout=30)
                if image is not None:
                    return image
            except Exception as e:
                # Transient failures are logged and retried on the next tick.
                logger.debug(f"Stream attempt {attempt + 1} failed: {e}")
            if attempt < max_attempts - 1:
                time.sleep(5)
        raise Exception("Image generation timed out - no result from stream URL")

    def generate_image(self, prompt, **kwargs):
        """Generate an image and return its raw bytes, or None on failure.

        Accepts OpenAI-style kwargs: size, quality ('standard'/'hd'),
        style ('vivid'/'natural'). Blocks until the image is ready or the
        stream polling times out.
        """
        if not prompt or not prompt.strip():
            logger.error("Empty prompt provided for image generation")
            return None

        def _generate():
            logger.info(f"Generating image with prompt: {prompt[:50]}...")
            response = self.session.post(
                self.api_url,
                json=self._build_payload(prompt, **kwargs),
                timeout=30  # initial request timeout
            )
            if response.status_code not in [200, 201]:
                raise Exception(f"Image generation failed: {response.status_code} - {response.text}")
            try:
                prediction = response.json()
            except json.JSONDecodeError:
                raise Exception("Invalid JSON response from prediction API")
            logger.info(f"Prediction created: {prediction.get('id', 'unknown')}")
            stream_url = prediction.get('urls', {}).get('stream')
            if not stream_url:
                raise Exception("No stream URL available for image generation")
            return self._wait_for_image(stream_url)

        try:
            return self._retry_request(_generate)
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            logger.error(f"Prompt was: {prompt[:100]}...")
            return None

    def create_prediction_only(self, prompt, **kwargs):
        """Create a prediction and return the raw API response without waiting.

        Returns the parsed prediction JSON (includes 'id' and 'urls'), or
        None when the prompt is empty or creation fails after retries.
        """
        if not prompt or not prompt.strip():
            logger.error("Empty prompt provided for prediction creation")
            return None

        def _create_prediction():
            logger.info(f"Creating prediction with prompt: {prompt[:50]}...")
            response = self.session.post(
                self.api_url,
                json=self._build_payload(prompt, **kwargs),
                timeout=30
            )
            if response.status_code not in [200, 201]:
                raise Exception(f"Prediction creation failed: {response.status_code} - {response.text}")
            prediction = response.json()
            logger.info(f"Prediction created successfully: {prediction.get('id', 'unknown')}")
            return prediction

        try:
            return self._retry_request(_create_prediction)
        except Exception as e:
            logger.error(f"Prediction creation failed: {e}")
            logger.error(f"Prompt was: {prompt[:100]}...")
            return None
# OpenAI Compatible Image Generation Models
class ImageGenerationRequest(BaseModel):
    """Request schema mirroring OpenAI's POST /v1/images/generations body."""
    prompt: str = Field(..., description="A text description of the desired image(s)")
    model: Optional[str] = Field(default="qwen-image", description="The model to use for image generation")
    # NOTE(review): n is accepted for OpenAI compatibility, but the endpoints
    # in this file only ever produce a single image — confirm intended.
    n: Optional[int] = Field(default=1, ge=1, le=10, description="Number of images to generate")
    quality: Optional[str] = Field(default="standard", description="Quality of the image")
    response_format: Optional[str] = Field(default="url", description="Format of the response")
    size: Optional[str] = Field(default="1024x1024", description="Size of the generated images")
    style: Optional[str] = Field(default="vivid", description="Style of the generated images")
    user: Optional[str] = Field(default=None, description="A unique identifier for the user")
class ImageObject(BaseModel):
    """A single generated image: base64 payload or URL, per response_format."""
    b64_json: Optional[str] = None       # base64 image bytes (response_format == "b64_json")
    url: Optional[str] = None            # data: URL (response_format == "url")
    revised_prompt: Optional[str] = None  # prompt echoed back; no server-side revision here
class ImageGenerationResponse(BaseModel):
    """OpenAI-style response envelope: creation timestamp plus image list."""
    created: int              # Unix timestamp when the response was built
    data: List[ImageObject]   # generated images (currently always one)
# Global variables
image_generator = None        # HuggingFaceImageGenerator, set during startup (lifespan)
startup_time = time.time()    # process start; /health reports uptime from this
request_count = 0             # total image requests across all endpoints
error_count = 0               # total failed requests; /health derives error_rate
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize the image generator on startup.

    On failure the generator is left as None; endpoints then answer 503
    rather than crashing the whole app at startup.
    """
    global image_generator
    try:
        logger.info("Initializing Hugging Face image generator...")
        image_generator = HuggingFaceImageGenerator()
        logger.info("Image generator initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize image generator: {e}")
        image_generator = None
    yield  # Server is running
    # Cleanup on shutdown (if needed)
    logger.info("Shutting down image generator...")
# FastAPI App
app = FastAPI(
    title="Qwen Image Generation API",
    version="1.0.0",
    description="OpenAI-compatible API for image generation using Qwen Image model via Hugging Face",
    lifespan=lifespan
)
# Add CORS middleware
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# the widest possible CORS policy — confirm this is intended for production.
try:
    from fastapi.middleware.cors import CORSMiddleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
except ImportError:
    # CORS support is optional; the API still works without the middleware.
    pass
# Error handlers
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc: HTTPException):
    """Render every HTTPException in the OpenAI-style error envelope."""
    logger.error(f"HTTP error: {exc.status_code} - {exc.detail}")
    error_body = {
        "error": {
            "message": exc.detail,
            "type": "api_error",
            "code": exc.status_code,
        }
    }
    return JSONResponse(status_code=exc.status_code, content=error_body)
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Last-resort handler: log the traceback, return a generic 500 envelope."""
    logger.error(f"Unexpected error: {exc}\n{traceback.format_exc()}")
    error_body = {
        "error": {
            "message": "Internal server error",
            "type": "server_error",
            "code": 500,
        }
    }
    return JSONResponse(status_code=500, content=error_body)
@app.get("/")
async def root():
    """Service banner for the API root."""
    banner = {
        "message": "Qwen Image Generation API",
        "version": "1.0.0",
        "status": "running",
        "model": "qwen-image",
    }
    return banner
@app.get("/v1/models")
async def list_models():
    """List available models - only Qwen Image"""
    model_entry = {
        "id": "qwen-image",
        "object": "model",
        "created": int(time.time()),
        "owned_by": "qwen",
    }
    return {"object": "list", "data": [model_entry]}
@app.get("/models")
async def list_models_alt():
    """Alternative endpoint for models"""
    # Delegate to the canonical /v1/models handler.
    models = await list_models()
    return models
@app.get("/health")
async def health_check():
    """Health check with system status"""
    global image_generator, startup_time, request_count, error_count
    uptime = time.time() - startup_time
    # Overall status degrades when the generator never initialized.
    ready = image_generator is not None
    return {
        "status": "healthy" if ready else "degraded",
        "timestamp": int(time.time()),
        "uptime_seconds": int(uptime),
        "generator_status": "ready" if ready else "not_initialized",
        "stats": {
            "total_requests": request_count,
            "total_errors": error_count,
            # max(..., 1) avoids division by zero before any traffic arrives.
            "error_rate": error_count / max(request_count, 1),
        },
    }
@app.post("/v1/images/generations")
async def create_image(request: ImageGenerationRequest):
    """Return raw Hugging Face prediction response format"""
    global request_count, error_count, image_generator
    request_count += 1
    request_id = f"img-{uuid.uuid4().hex[:8]}"
    logger.info(f"[{request_id}] Image generation request: prompt='{request.prompt[:50]}...', size={request.size}, quality={request.quality}")
    # Guard: the service needs a working generator before anything else.
    if image_generator is None:
        error_count += 1
        logger.error(f"[{request_id}] Image generator not initialized")
        raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")
    try:
        # Reject anything other than the single supported model.
        if request.model and request.model != "qwen-image":
            raise HTTPException(status_code=400, detail="Only 'qwen-image' model is supported")
        # Parameter validation (sizes track the Qwen model's aspect ratios).
        valid_sizes = ['1024x1024', '1792x1024', '1024x1792', '1365x1024', '1024x1365']
        if request.size not in valid_sizes:
            raise HTTPException(status_code=400, detail=f"Invalid size. Must be one of: {valid_sizes}")
        valid_qualities = ['standard', 'hd']
        if request.quality not in valid_qualities:
            raise HTTPException(status_code=400, detail=f"Invalid quality. Must be one of: {valid_qualities}")
        # Kick off the prediction; hand the backend's raw response straight through.
        raw_prediction = image_generator.create_prediction_only(
            request.prompt,
            size=request.size,
            quality=request.quality,
            style=request.style,
        )
        if raw_prediction is None:
            error_count += 1
            logger.error(f"[{request_id}] Failed to create prediction")
            raise HTTPException(status_code=500, detail="Failed to create image prediction")
        logger.info(f"[{request_id}] Prediction created: {raw_prediction.get('id', 'unknown')}")
        return raw_prediction
    except HTTPException:
        error_count += 1
        raise
    except Exception as e:
        error_count += 1
        logger.error(f"[{request_id}] Unexpected error: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Internal server error occurred")
@app.get("/v1/predictions/{prediction_id}")
async def get_prediction(prediction_id: str):
    """Get prediction status - returns raw Hugging Face format"""
    global image_generator
    if image_generator is None:
        raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")
    try:
        # There is no direct Replicate API access through this router, so real
        # status polling isn't possible; log the request and return a stub
        # pointing callers at the stream URL from the original prediction.
        logger.info(f"Getting prediction status for: {prediction_id}")
        stub = {
            "id": prediction_id,
            "status": "processing",
            "message": "Use the stream URL from the original prediction response to get the image",
        }
        return stub
    except Exception as e:
        logger.error(f"Error getting prediction {prediction_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get prediction status")
@app.post("/v1/images/generations/openai")
async def create_image_openai_format(request: ImageGenerationRequest):
    """OpenAI-compatible endpoint that returns OpenAI format.

    Runs the full blocking generation, then wraps the image bytes as either
    a b64_json payload or a data: URL depending on request.response_format.
    """
    global request_count, error_count, image_generator
    request_count += 1
    request_id = f"img-{uuid.uuid4().hex[:8]}"
    logger.info(f"[{request_id}] OpenAI format image generation: prompt='{request.prompt[:50]}...'")
    if image_generator is None:
        error_count += 1
        raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")
    try:
        # Generate image using the full generation method (may take minutes).
        image_data = image_generator.generate_image(
            request.prompt,
            size=request.size,
            quality=request.quality,
            style=request.style
        )
        if image_data is None:
            error_count += 1
            raise HTTPException(status_code=500, detail="Failed to generate image")
        logger.info(f"[{request_id}] Image generated successfully, size: {len(image_data)} bytes")
        # Encode once; both response formats need the base64 payload
        # (was previously duplicated in each branch).
        b64_data = base64.b64encode(image_data).decode('utf-8')
        if request.response_format == "b64_json":
            image = ImageObject(b64_json=b64_data, revised_prompt=request.prompt)
        else:
            # "url" format: there is no hosted storage, so embed a data URL.
            image = ImageObject(url=f"data:image/webp;base64,{b64_data}", revised_prompt=request.prompt)
        return ImageGenerationResponse(created=int(time.time()), data=[image])
    except HTTPException:
        error_count += 1
        raise
    except Exception as e:
        error_count += 1
        logger.error(f"[{request_id}] Unexpected error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error occurred")
if __name__ == "__main__":
    # Script entry point: run the API under uvicorn when executed directly.
    try:
        import uvicorn
        port = int(os.getenv("PORT", 7860))  # Hugging Face default port
        host = os.getenv("HOST", "0.0.0.0")
        logger.info(f"Starting image generation server on {host}:{port}")
        uvicorn.run(
            app,  # Use the app object directly instead of string import
            host=host,
            port=port,
            reload=False,
            log_level="info",
            access_log=True
        )
    except ImportError:
        # uvicorn is only needed for direct execution, not for ASGI deployment.
        logger.error("uvicorn not installed. Install with: pip install uvicorn")
    except Exception as e:
        logger.error(f"Failed to start server: {e}")