"""OpenAI-compatible image generation API backed by image.pollinations.ai.

Exposes ``POST /v1/images/generations`` with an OpenAI-like request/response
shape and translates each request into a Pollinations image URL.
"""

import base64
import math
import os
import time
import urllib.parse
from typing import List, Literal, Optional, Union

import fastapi
import httpx
import uvicorn
from pydantic import BaseModel, Field


# --- Pydantic Models for Request and Response ---

class OpenAIImageRequest(BaseModel):
    """Request body for the image generation endpoint."""

    prompt: str
    n: int = Field(default=1, description="Number of images to generate.", ge=1, le=4)
    size: str = Field(default="1024x1024", description="Size of the generated images. e.g., 'widthxheight'.")
    response_format: Optional[Literal['url', 'b64_json']] = "url"
    user: Optional[str] = None

    # Pollinations specific parameters
    seed_value: Optional[int] = Field(default=None, alias="seed", description="Seed provided by the user. Will be transformed before use.")
    model: Optional[str] = Field(default=None, description="Model to use for Pollinations.")
    enhance: Optional[bool] = Field(default=True, description="Enhance parameter for Pollinations.")
    nologo: Optional[bool] = Field(default=True, description="NoLogo parameter for Pollinations.")

    class Config:
        # Lets clients send either "seed" (the alias) or "seed_value".
        # NOTE(review): this is the pydantic v1 option name; pydantic v2 calls
        # it `populate_by_name` -- confirm against the installed version.
        allow_population_by_field_name = True


class ImageURL(BaseModel):
    """A generated image referenced by URL."""

    url: str


class ImageB64(BaseModel):
    """A generated image inlined as a Base64 string."""

    b64_json: str


class OpenAIImageResponse(BaseModel):
    """Response body: creation timestamp (Unix seconds) plus the images."""

    created: int = Field(default_factory=lambda: int(time.time()))
    data: List[Union[ImageURL, ImageB64]]


# --- FastAPI Application ---

app = fastapi.FastAPI(
    title="OpenAI-compatible Image Generation API for Pollinations",
    description="This API wraps the image.pollinations.ai service to provide an OpenAI-like interface with custom seed transformation.",
    version="1.1.0"  # Updated version
)


# --- Helper Functions ---

def parse_size(size_str: str) -> tuple[Optional[int], Optional[int]]:
    """Parse a size string like '1024x768' into ``(width, height)``.

    Returns ``(None, None)`` when the string is not two integers separated by
    'x' (case-insensitive) or when either dimension is not strictly positive.
    """
    parts = size_str.lower().split('x')
    if len(parts) != 2:
        return None, None
    try:
        width, height = int(parts[0]), int(parts[1])
    except ValueError:
        return None, None
    # Zero or negative dimensions are meaningless for an image; reject them
    # here so callers never forward them upstream.
    if width <= 0 or height <= 0:
        return None, None
    return width, height


async def fetch_and_encode_image(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """Fetch an image from ``url`` and return it as a Base64 string.

    Any failure (HTTP error status, transport error, or anything unexpected)
    is logged to stdout and reported to the caller as ``None``.
    """
    try:
        response = await client.get(url, timeout=60.0)
        response.raise_for_status()
        image_bytes = await response.aread()
        return base64.b64encode(image_bytes).decode('utf-8')
    except httpx.HTTPStatusError as e:
        print(f"HTTP error fetching image from {url}: {e.response.status_code} - {e.response.text}")
    except httpx.RequestError as e:
        print(f"Request error fetching image from {url}: {e}")
    except Exception as e:
        # Deliberate best-effort boundary: the endpoint turns None into an
        # HTTP error response.
        print(f"An unexpected error occurred while fetching/encoding image from {url}: {e}")
    return None


def _transform_seed(user_seed: int) -> int:
    """Map a user-provided seed to the single-digit seed sent to Pollinations.

    The result is the units digit (0-9) of
    ``floor(user_seed + current_unix_time + sqrt(2) * pi)``, so it depends on
    the time of the request as well as on the user's seed.

    Raises:
        OverflowError: if ``user_seed`` is too large to convert to a float.
    """
    user_provided_seed = float(user_seed)
    current_timestamp_int = int(time.time())  # Current time (seconds)
    sqrt2_times_pi = math.sqrt(2) * math.pi  # sqrt(2) * pi
    intermediate_sum = user_provided_seed + float(current_timestamp_int) + sqrt2_times_pi
    transformed_seed = math.floor(intermediate_sum) % 10  # Units digit (0-9)

    # Debug logging (safe to remove).
    print(f"User provided seed: {user_seed}")
    print(f"Current timestamp (int): {current_timestamp_int}")
    print(f"Calculated constant (sqrt(2)*pi): {sqrt2_times_pi}")
    print(f"Intermediate sum for seed: {intermediate_sum}")
    print(f"Transformed seed for Pollinations: {transformed_seed}")
    return transformed_seed


def _build_pollinations_url(
    base_url: str,
    request: OpenAIImageRequest,
    width: int,
    height: int,
    seed: Optional[int],
) -> str:
    """Build the Pollinations image URL for ``request``."""
    # safe="" so that '/' inside the prompt is percent-encoded as well;
    # otherwise it would split the prompt into extra URL path segments.
    encoded_prompt = urllib.parse.quote(request.prompt, safe="")

    query_params = {"width": width, "height": height}
    if seed is not None:
        # When no seed is supplied, no 'seed' parameter is sent at all and
        # Pollinations picks its own.
        query_params["seed"] = seed
    if request.model:
        query_params["model"] = request.model
    if request.enhance is not None:
        query_params["enhance"] = str(request.enhance).lower()
    if request.nologo is not None:
        query_params["nologo"] = str(request.nologo).lower()

    return f"{base_url}{encoded_prompt}?{urllib.parse.urlencode(query_params)}"


# --- API Endpoint ---

@app.post("/v1/images/generations", response_model=OpenAIImageResponse)
async def create_image_generation(request: OpenAIImageRequest):
    """
    Mimics the OpenAI image generation endpoint.
    Receives a prompt and other parameters, then calls the Pollinations API.
    If a seed is provided by the user, it's transformed before being sent.

    Raises:
        fastapi.HTTPException: 400 for an invalid ``size``, an unusable
            ``seed`` or an unsupported ``response_format``; 500 when an image
            cannot be fetched or encoded in ``b64_json`` mode.
    """
    pollinations_base_url = "https://image.pollinations.ai/prompt/"

    width, height = parse_size(request.size)
    if not width or not height:
        raise fastapi.HTTPException(
            status_code=400,
            detail="Invalid 'size' format. Expected 'widthxheight', e.g., '1024x1024'."
        )

    # --- Seed Transformation Logic ---
    final_seed_to_use: Optional[int] = None
    if request.seed_value is not None:
        try:
            final_seed_to_use = _transform_seed(request.seed_value)
        except OverflowError:
            # A seed too large for float conversion is a client error, not a
            # server fault.
            raise fastapi.HTTPException(
                status_code=400,
                detail="Invalid 'seed': value is too large."
            ) from None
    # --- End Seed Transformation Logic ---

    # The URL depends only on the request, so build it once instead of once
    # per requested image.
    pollinations_image_url = _build_pollinations_url(
        pollinations_base_url, request, width, height, final_seed_to_use
    )

    results_data: List[Union[ImageURL, ImageB64]] = []
    async with httpx.AsyncClient() as client:
        for _ in range(request.n):
            print(f"Requesting Pollinations URL: {pollinations_image_url}")

            if request.response_format == "url":
                results_data.append(ImageURL(url=pollinations_image_url))
            elif request.response_format == "b64_json":
                b64_data = await fetch_and_encode_image(client, pollinations_image_url)
                if not b64_data:
                    raise fastapi.HTTPException(
                        status_code=500,
                        detail=f"Failed to fetch or encode image from Pollinations: {pollinations_image_url}"
                    )
                results_data.append(ImageB64(b64_json=b64_data))
            else:
                # Reached when the client explicitly sends a null
                # response_format.
                raise fastapi.HTTPException(status_code=400, detail="Invalid response_format.")

    return OpenAIImageResponse(data=results_data)


def _resolve_port(default: int = 7860) -> int:
    """Return the port from the PORT environment variable, or ``default``."""
    port_from_env = os.environ.get("PORT")
    if not port_from_env:
        return default
    try:
        return int(port_from_env)
    except ValueError:
        print(f"Warning: Invalid PORT environment variable '{port_from_env}'. Using default port {default}.")
        return default


# --- Main guard for running with Uvicorn ---
if __name__ == "__main__":
    port_to_use = _resolve_port()
    print(f"Starting Uvicorn server on host 0.0.0.0, port {port_to_use}")
    # NOTE: the "main:app" import string assumes this file is named main.py.
    uvicorn.run("main:app", host="0.0.0.0", port=port_to_use, reload=True)