fluxapi / main.py
wynai's picture
Update main.py
6272863 verified
import fastapi
import uvicorn
import httpx
import time
import base64
import urllib.parse
import os
import math # Thêm import math
from pydantic import BaseModel, Field
from typing import List, Optional, Union, Literal
# --- Pydantic Models for Request and Response ---
class OpenAIImageRequest(BaseModel):
prompt: str
n: int = Field(default=1, description="Number of images to generate.", ge=1, le=4)
size: str = Field(default="1024x1024", description="Size of the generated images. e.g., 'widthxheight'.")
response_format: Optional[Literal['url', 'b64_json']] = "url"
user: Optional[str] = None
# Pollinations specific parameters
seed_value: Optional[int] = Field(default=None, alias="seed", description="Seed provided by the user. Will be transformed before use.")
model: Optional[str] = Field(default=None, description="Model to use for Pollinations.")
enhance: Optional[bool] = Field(default=True, description="Enhance parameter for Pollinations.")
nologo: Optional[bool] = Field(default=True, description="NoLogo parameter for Pollinations.")
class Config:
allow_population_by_field_name = True
class ImageURL(BaseModel):
url: str
class ImageB64(BaseModel):
b64_json: str
class OpenAIImageResponse(BaseModel):
created: int = Field(default_factory=lambda: int(time.time()))
data: List[Union[ImageURL, ImageB64]]
# --- FastAPI Application ---
app = fastapi.FastAPI(
title="OpenAI-compatible Image Generation API for Pollinations",
description="This API wraps the image.pollinations.ai service to provide an OpenAI-like interface with custom seed transformation.",
version="1.1.0" # Updated version
)
# --- Helper Functions ---
def parse_size(size_str: str) -> tuple[Optional[int], Optional[int]]:
"""Parses size string like '1024x768' into (width, height)."""
parts = size_str.lower().split('x')
if len(parts) == 2:
try:
return int(parts[0]), int(parts[1])
except ValueError:
return None, None
return None, None
async def fetch_and_encode_image(client: httpx.AsyncClient, url: str) -> Optional[str]:
"""Fetches an image from a URL and returns its Base64 encoded string."""
try:
response = await client.get(url, timeout=60.0)
response.raise_for_status()
image_bytes = await response.aread()
return base64.b64encode(image_bytes).decode('utf-8')
except httpx.HTTPStatusError as e:
print(f"HTTP error fetching image from {url}: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
print(f"Request error fetching image from {url}: {e}")
except Exception as e:
print(f"An unexpected error occurred while fetching/encoding image from {url}: {e}")
return None
# --- API Endpoint ---
@app.post("/v1/images/generations", response_model=OpenAIImageResponse)
async def create_image_generation(request: OpenAIImageRequest):
"""
Mimics the OpenAI image generation endpoint.
Receives a prompt and other parameters, then calls the Pollinations API.
If a seed is provided by the user, it's transformed before being sent.
"""
pollinations_base_url = "https://image.pollinations.ai/prompt/"
results_data: List[Union[ImageURL, ImageB64]] = []
width, height = parse_size(request.size)
if not width or not height:
raise fastapi.HTTPException(
status_code=400,
detail="Invalid 'size' format. Expected 'widthxheight', e.g., '1024x1024'."
)
# --- Seed Transformation Logic ---
final_seed_to_use = None
if request.seed_value is not None:
user_provided_seed = float(request.seed_value)
current_timestamp_int = int(time.time()) # Thời gian hiện tại (giây)
sqrt2_times_pi = math.sqrt(2) * math.pi # √2 × π
intermediate_sum = user_provided_seed + float(current_timestamp_int) + sqrt2_times_pi
final_seed_to_use = math.floor(intermediate_sum) % 10 # Lấy số nguyên hàng đơn vị (0-9)
# Ghi log để debug (bạn có thể xóa hoặc giữ lại)
print(f"User provided seed: {request.seed_value}")
print(f"Current timestamp (int): {current_timestamp_int}")
print(f"Calculated constant (sqrt(2)*pi): {sqrt2_times_pi}")
print(f"Intermediate sum for seed: {intermediate_sum}")
print(f"Transformed seed for Pollinations: {final_seed_to_use}")
# --- End Seed Transformation Logic ---
async with httpx.AsyncClient() as client:
for _ in range(request.n):
encoded_prompt = urllib.parse.quote(request.prompt)
current_pollinations_url_path = f"{pollinations_base_url}{encoded_prompt}"
query_params = {}
if width:
query_params["width"] = width
if height:
query_params["height"] = height
if final_seed_to_use is not None: # Sử dụng seed đã biến đổi nếu có
query_params["seed"] = final_seed_to_use
# Nếu người dùng không cung cấp seed_value, final_seed_to_use sẽ là None
# và không có tham số 'seed' nào được gửi, để Pollinations tự xử lý.
if request.model:
query_params["model"] = request.model
if request.enhance is not None:
query_params["enhance"] = str(request.enhance).lower()
if request.nologo is not None:
query_params["nologo"] = str(request.nologo).lower()
if query_params:
pollinations_image_url = f"{current_pollinations_url_path}?{urllib.parse.urlencode(query_params)}"
else:
pollinations_image_url = current_pollinations_url_path
print(f"Requesting Pollinations URL: {pollinations_image_url}")
if request.response_format == "url":
results_data.append(ImageURL(url=pollinations_image_url))
elif request.response_format == "b64_json":
b64_data = await fetch_and_encode_image(client, pollinations_image_url)
if b64_data:
results_data.append(ImageB64(b64_json=b64_data))
else:
raise fastapi.HTTPException(
status_code=500,
detail=f"Failed to fetch or encode image from Pollinations: {pollinations_image_url}"
)
else:
raise fastapi.HTTPException(status_code=400, detail="Invalid response_format.")
if not results_data and request.n > 0:
raise fastapi.HTTPException(
status_code=500,
detail="No images were successfully generated or processed."
)
return OpenAIImageResponse(data=results_data)
# --- Main guard for running with Uvicorn ---
if __name__ == "__main__":
port_to_use = 7860
try:
port_from_env = os.environ.get("PORT")
if port_from_env:
port_to_use = int(port_from_env)
except ValueError:
print(f"Warning: Invalid PORT environment variable '{port_from_env}'. Using default port {port_to_use}.")
except Exception as e:
print(f"Error reading PORT environment variable: {e}. Using default port {port_to_use}.")
print(f"Starting Uvicorn server on host 0.0.0.0, port {port_to_use}")
uvicorn.run("main:app", host="0.0.0.0", port=port_to_use, reload=True)