|
|
import fastapi |
|
|
import uvicorn |
|
|
import httpx |
|
|
import time |
|
|
import base64 |
|
|
import urllib.parse |
|
|
import os |
|
|
import math |
|
|
from pydantic import BaseModel, Field |
|
|
from typing import List, Optional, Union, Literal |
|
|
|
|
|
|
|
|
|
|
|
class OpenAIImageRequest(BaseModel):
    # Request body accepted by the image-generation endpoint: OpenAI-style
    # fields first, then extra knobs that are forwarded to Pollinations.

    # --- OpenAI-style parameters ---
    prompt: str
    n: int = Field(
        default=1,
        description="Number of images to generate.",
        ge=1,
        le=4,
    )
    size: str = Field(
        default="1024x1024",
        description="Size of the generated images. e.g., 'widthxheight'.",
    )
    response_format: Optional[Literal["url", "b64_json"]] = "url"
    user: Optional[str] = None

    # --- Pollinations-specific parameters ---
    # Sent by clients under the JSON key "seed" (the alias); the attribute
    # name on the model is ``seed_value``.
    seed_value: Optional[int] = Field(
        default=None,
        alias="seed",
        description="Seed provided by the user. Will be transformed before use.",
    )
    model: Optional[str] = Field(
        default=None,
        description="Model to use for Pollinations.",
    )
    enhance: Optional[bool] = Field(
        default=True,
        description="Enhance parameter for Pollinations.",
    )
    nologo: Optional[bool] = Field(
        default=True,
        description="NoLogo parameter for Pollinations.",
    )

    class Config:
        # Accept "seed_value" as well as the "seed" alias when populating
        # the model (pydantic v1-style option name).
        allow_population_by_field_name = True
|
|
|
|
|
|
|
|
class ImageURL(BaseModel):
    # One response entry that carries the image as a URL string.
    url: str
|
|
|
|
|
class ImageB64(BaseModel):
    # One response entry that carries the image bytes as Base64 text.
    b64_json: str
|
|
|
|
|
class OpenAIImageResponse(BaseModel):
    # Response envelope returned by the image-generation endpoint.

    # Unix timestamp (whole seconds), filled in when the response object is
    # built unless the caller supplies a value.
    created: int = Field(
        default_factory=lambda: int(time.time()),
    )

    # One entry per generated image, either URL-style or Base64-style.
    data: List[Union[ImageURL, ImageB64]]
|
|
|
|
|
|
|
|
|
|
|
# Application object; the title, description and version below are the
# metadata handed to FastAPI.
app = fastapi.FastAPI(
    title="OpenAI-compatible Image Generation API for Pollinations",
    description=(
        "This API wraps the image.pollinations.ai service to provide an "
        "OpenAI-like interface with custom seed transformation."
    ),
    version="1.1.0",
)
|
|
|
|
|
|
|
|
def parse_size(size_str: str) -> tuple[Optional[int], Optional[int]]:
    """Parse a size string like '1024x768' into ``(width, height)``.

    The separator is matched case-insensitively ('x' or 'X').

    Args:
        size_str: Text of the form ``'<width>x<height>'``.

    Returns:
        ``(width, height)`` as positive integers, or ``(None, None)`` when
        the string does not have exactly two parts, a part is not an
        integer, or either dimension is zero or negative.
    """
    parts = size_str.lower().split("x")
    if len(parts) != 2:
        return None, None

    try:
        width, height = int(parts[0]), int(parts[1])
    except ValueError:
        return None, None

    # int() happily parses "-1024" and "0"; neither is a usable image
    # dimension, so report them as invalid rather than passing them on.
    if width <= 0 or height <= 0:
        return None, None

    return width, height
|
|
|
|
|
async def fetch_and_encode_image(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """Download ``url`` with ``client`` and return the body as Base64 text.

    The GET request uses a 60 second timeout. Failures are not raised to
    the caller: the error is printed and ``None`` is returned instead.

    Args:
        client: Async HTTP client used to issue the request.
        url: Address of the image to download.

    Returns:
        The response body encoded as a Base64 string, or ``None`` if the
        request failed, returned an error status, or could not be encoded.
    """
    try:
        reply = await client.get(url, timeout=60.0)
        reply.raise_for_status()
        raw_bytes = await reply.aread()
        encoded = base64.b64encode(raw_bytes).decode('utf-8')
    except httpx.HTTPStatusError as err:
        # The server answered, but with an error status.
        print(f"HTTP error fetching image from {url}: {err.response.status_code} - {err.response.text}")
        return None
    except httpx.RequestError as err:
        # The request itself failed.
        print(f"Request error fetching image from {url}: {err}")
        return None
    except Exception as err:
        # Best-effort helper: anything else is reported and turned into None.
        print(f"An unexpected error occurred while fetching/encoding image from {url}: {err}")
        return None
    return encoded
|
|
|
|
|
|
|
|
def _transform_seed(user_seed: int) -> int:
    """Turn a user-supplied seed into the seed value sent to Pollinations.

    The result is ``floor(user_seed + now + sqrt(2) * pi) % 10`` where
    ``now`` is the current Unix time in whole seconds, so it is always in
    ``0..9`` and changes over time for the same ``user_seed``. Each
    intermediate value is printed for debugging.
    """
    user_provided_seed = float(user_seed)
    current_timestamp_int = int(time.time())
    sqrt2_times_pi = math.sqrt(2) * math.pi

    intermediate_sum = user_provided_seed + float(current_timestamp_int) + sqrt2_times_pi
    transformed_seed = math.floor(intermediate_sum) % 10

    print(f"User provided seed: {user_seed}")
    print(f"Current timestamp (int): {current_timestamp_int}")
    print(f"Calculated constant (sqrt(2)*pi): {sqrt2_times_pi}")
    print(f"Intermediate sum for seed: {intermediate_sum}")
    print(f"Transformed seed for Pollinations: {transformed_seed}")
    return transformed_seed


def _build_pollinations_url(request: OpenAIImageRequest, width: int, height: int, seed: Optional[int]) -> str:
    """Build the Pollinations image URL for ``request`` with the given size and seed."""
    pollinations_base_url = "https://image.pollinations.ai/prompt/"

    # safe="" so that a "/" inside the prompt is percent-encoded as well;
    # quote() leaves "/" untouched by default, which would let the prompt
    # alter the URL path instead of staying a single path segment.
    encoded_prompt = urllib.parse.quote(request.prompt, safe="")

    # Insertion order is preserved by urlencode(): width, height, seed,
    # model, enhance, nologo.
    query_params = {"width": width, "height": height}
    if seed is not None:
        query_params["seed"] = seed
    if request.model:
        query_params["model"] = request.model
    if request.enhance is not None:
        query_params["enhance"] = str(request.enhance).lower()
    if request.nologo is not None:
        query_params["nologo"] = str(request.nologo).lower()

    return f"{pollinations_base_url}{encoded_prompt}?{urllib.parse.urlencode(query_params)}"


@app.post("/v1/images/generations", response_model=OpenAIImageResponse)
async def create_image_generation(request: OpenAIImageRequest):
    """
    Mimics the OpenAI image generation endpoint.

    Receives a prompt and other parameters, then calls the Pollinations API.
    If a seed is provided by the user, it's transformed before being sent.

    The Pollinations URL is built once per request, so all ``n`` entries
    in the response come from the same URL. With ``response_format="url"``
    the URL is returned as-is; with ``"b64_json"`` the image is downloaded
    and returned Base64-encoded.

    Raises:
        fastapi.HTTPException: 400 if ``size`` cannot be parsed or
            ``response_format`` is not ``"url"``/``"b64_json"``; 500 if an
            image could not be fetched or encoded, or no result was produced.
    """
    width, height = parse_size(request.size)
    if not width or not height:
        raise fastapi.HTTPException(
            status_code=400,
            detail="Invalid 'size' format. Expected 'widthxheight', e.g., '1024x1024'."
        )

    final_seed_to_use = None
    if request.seed_value is not None:
        final_seed_to_use = _transform_seed(request.seed_value)

    # Nothing in the URL varies between the n images, so build it once.
    pollinations_image_url = _build_pollinations_url(request, width, height, final_seed_to_use)

    results_data: List[Union[ImageURL, ImageB64]] = []

    async with httpx.AsyncClient() as client:
        for _ in range(request.n):
            print(f"Requesting Pollinations URL: {pollinations_image_url}")

            if request.response_format == "url":
                results_data.append(ImageURL(url=pollinations_image_url))
            elif request.response_format == "b64_json":
                b64_data = await fetch_and_encode_image(client, pollinations_image_url)
                if not b64_data:
                    raise fastapi.HTTPException(
                        status_code=500,
                        detail=f"Failed to fetch or encode image from Pollinations: {pollinations_image_url}"
                    )
                results_data.append(ImageB64(b64_json=b64_data))
            else:
                # Reached when response_format was explicitly sent as null.
                raise fastapi.HTTPException(status_code=400, detail="Invalid response_format.")

    # Defensive guard: every loop iteration either appends or raises.
    if not results_data and request.n > 0:
        raise fastapi.HTTPException(
            status_code=500,
            detail="No images were successfully generated or processed."
        )

    return OpenAIImageResponse(data=results_data)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with Uvicorn on all interfaces.
    # The port defaults to 7860 and can be overridden via the PORT
    # environment variable; an unusable value falls back to the default.
    port_to_use = 7860
    port_from_env = os.environ.get("PORT")
    if port_from_env:
        try:
            candidate_port = int(port_from_env)
            # A numeric but out-of-range value would otherwise only fail
            # later when the server tries to bind; treat it like any other
            # invalid PORT value.
            if not 0 <= candidate_port <= 65535:
                raise ValueError(f"port out of range: {candidate_port}")
            port_to_use = candidate_port
        except ValueError:
            print(f"Warning: Invalid PORT environment variable '{port_from_env}'. Using default port {port_to_use}.")

    print(f"Starting Uvicorn server on host 0.0.0.0, port {port_to_use}")
    # NOTE(review): the "main:app" import string presumably requires this
    # module to be importable as `main` — confirm against the actual
    # filename. reload=True is kept from the original.
    uvicorn.run("main:app", host="0.0.0.0", port=port_to_use, reload=True)