# anime-gen-api/app/services/pollinations.py
# AswinMathew's picture
# Hybrid pollen-paced video gen, BYOP, background consistency, browser notifications
# c4dfe24 verified
"""Pollinations API client — image generation, video generation, media upload, TTS, music."""
import asyncio
import io
import logging
from pathlib import Path
import httpx
from PIL import Image
from app.config import get_settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Application settings, loaded once at import time; read below for
# `pollinations_api_key` (the server-side fallback key).
settings = get_settings()
# Base URL of the generation endpoints used below (/image, /video, /audio).
BASE_URL = "https://gen.pollinations.ai"
# Base URL of the media upload endpoint (/upload).
MEDIA_URL = "https://media.pollinations.ai"
# Rate limiting: caps on concurrent in-flight requests from this process.
# generate_image holds the first (at most 3 at a time); generate_video holds
# the second (one at a time).
_image_semaphore = asyncio.Semaphore(3)
_video_semaphore = asyncio.Semaphore(1)
# Pollen price per billing unit, keyed by Pollinations model name.
# The unit differs per group (per image, per second, per 1K characters).
MODEL_COSTS = (
    {
        # Image models: price per generated image.
        "imagen-4": 0.0025,
        "grok-imagine": 0.0025,
        "klein": 0.008,  # FLUX.2 4B, has vision
        "klein-large": 0.012,  # FLUX.2 9B, has vision
        "gptimage": 0.02,  # GPT Image 1 Mini, has vision
        "flux": 0.0,  # Flux Schnell (free)
        "zimage": 0.0,  # Z-Image Turbo (free; noted upstream as the default)
    }
    | {
        # Video models: price per second of output.
        "grok-video": 0.003,
    }
    | {
        # Audio models.
        "elevenlabs-tts": 0.18,  # per 1K chars
        "elevenlabs-music": 0.005,  # per second
    }
)

# Models that can accept a reference image alongside the prompt.
VISION_MODELS = {"gptimage", "klein", "klein-large"}
def _auth_headers(api_key: str | None = None) -> dict:
    """Build the Authorization header for a Pollinations request.

    Args:
        api_key: Per-user (BYOP) key. When falsy, the server key from
            ``settings.pollinations_api_key`` is used instead.

    Returns:
        ``{"Authorization": "Bearer <key>"}`` when a key is available,
        otherwise an empty dict.
    """
    token = api_key or settings.pollinations_api_key
    return {"Authorization": f"Bearer {token}"} if token else {}
async def upload_media(file_path: str, api_key: str | None = None) -> str:
"""Upload a file to media.pollinations.ai and return the public URL.
Uses POST with multipart form data.
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
async with httpx.AsyncClient(timeout=120.0) as client:
with open(file_path, "rb") as f:
files = {"file": (path.name, f, "image/png")}
resp = await client.post(
f"{MEDIA_URL}/upload",
files=files,
headers=_auth_headers(api_key),
)
resp.raise_for_status()
# Response may be JSON with URL or plain text URL
content_type = resp.headers.get("content-type", "")
if "json" in content_type:
data = resp.json()
url = data.get("url") or data.get("image_url") or data.get("media_url", "")
if url:
return url
# Try plain text
text = resp.text.strip()
if text.startswith("http"):
return text
raise ValueError(f"Could not extract URL from upload response: {resp.text[:200]}")
async def generate_image(
    prompt: str,
    output_path: str,
    model: str = "imagen-4",
    width: int = 1024,
    height: int = 768,
    seed: int | None = None,
    reference_image_url: str | None = None,
    max_retries: int = 3,
    api_key: str | None = None,
) -> str:
    """Generate an image via the Pollinations API and save it as a PNG.

    The prompt is percent-encoded into the URL path and the request is sent
    with GET. Responses with status 429/500/502/503, and any exception other
    than ``httpx.HTTPStatusError``, are retried up to ``max_retries`` attempts.
    The downloaded image is resized to ``width`` x ``height`` before saving.

    Args:
        prompt: Image generation prompt.
        output_path: Where to save the generated image.
        model: Model name (imagen-4, grok-imagine, klein, klein-large, gptimage, flux, zimage).
        width: Image width.
        height: Image height.
        seed: Optional seed for reproducibility.
        reference_image_url: Optional reference image URL; only sent when
            ``model`` is in ``VISION_MODELS``.
        max_retries: Max attempts on transient errors.
        api_key: Optional per-user key; falls back to the server key.

    Returns:
        The output_path where the image was saved.

    Raises:
        httpx.HTTPStatusError: On a non-retryable HTTP error status.
        RuntimeError: If every attempt ended in a retryable HTTP status.
        Exception: The last non-HTTP-status error if the final attempt raises.
    """
    import json as _json
    import urllib.parse

    encoded_prompt = urllib.parse.quote(prompt, safe="")
    url = f"{BASE_URL}/image/{encoded_prompt}"
    params = {
        "model": model,
        "width": width,
        "height": height,
        "nologo": "true",
        "enhance": "false",
    }
    if seed is not None:
        params["seed"] = seed
    if reference_image_url and model in VISION_MODELS:
        params["image"] = reference_image_url
    # NOTE(review): the key is sent both as a query parameter and as a Bearer
    # header; query strings can end up in logs. Confirm whether `key` is needed.
    _key = api_key or settings.pollinations_api_key
    if _key:
        params["key"] = _key

    async with _image_semaphore:
        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1
            try:
                async with httpx.AsyncClient(timeout=120.0, follow_redirects=True) as client:
                    resp = await client.get(
                        url,
                        params=params,
                        headers=_auth_headers(api_key),
                    )

                if resp.status_code in (429, 500, 502, 503):
                    if is_last_attempt:
                        # Nothing left to retry: do not sleep while holding the semaphore.
                        logger.warning(
                            "Image gen %s (attempt %d/%d), giving up",
                            resp.status_code, attempt + 1, max_retries,
                        )
                        break
                    # Prefer the server-provided hint; otherwise back off linearly.
                    try:
                        err = _json.loads(resp.text)
                        delay = float(err.get("retryAfterSeconds", 5))
                    except (ValueError, TypeError, AttributeError):
                        delay = 5.0 * (attempt + 1)
                    logger.warning(
                        "Image gen %s (attempt %d/%d), retrying in %.1fs...",
                        resp.status_code, attempt + 1, max_retries, delay,
                    )
                    await asyncio.sleep(delay + 0.5)
                    continue

                resp.raise_for_status()

                # Response is raw image bytes. A tiny body may be a JSON error
                # payload returned with a success status.
                if len(resp.content) < 1000:
                    try:
                        error_data = resp.json()
                    except ValueError:
                        # JSONDecodeError and UnicodeDecodeError are ValueError
                        # subclasses: the body is not JSON, so treat it as image data.
                        pass
                    else:
                        raise ValueError(f"Pollinations API error: {error_data}")

                img = Image.open(io.BytesIO(resp.content))
                img = img.resize((width, height), Image.LANCZOS)
                Path(output_path).parent.mkdir(parents=True, exist_ok=True)
                img.save(output_path, "PNG")
                logger.info("Generated image via Pollinations/%s: %s", model, output_path)
                return output_path
            except httpx.HTTPStatusError:
                # Non-retryable HTTP status: surface immediately.
                raise
            except Exception as e:
                if is_last_attempt:
                    raise
                logger.warning("Image gen error (attempt %d): %s", attempt + 1, e)
                await asyncio.sleep(3)

    raise RuntimeError(f"Image generation failed after {max_retries} retries")
async def generate_video(
    prompt: str,
    output_path: str,
    model: str = "grok-video",
    duration: int = 5,
    aspect_ratio: str = "16:9",
    image_url: str | None = None,
    max_retries: int = 3,
    api_key: str | None = None,
) -> str:
    """Generate a video via the Pollinations API and write it to disk.

    Requests are serialized through the module's video semaphore. Responses
    with status 429/500/502/503 and request timeouts are retried with a
    linear back-off (10s, 20s, 30s, ...).

    Args:
        prompt: Motion/scene description for video generation.
        output_path: Where to save the generated video.
        model: Video model (grok-video).
        duration: Duration in seconds; clamped to the range 1-10 before sending.
        aspect_ratio: Aspect ratio string.
        image_url: Optional reference image URL for img2vid.
        max_retries: Max attempts on transient errors.
        api_key: Optional per-user key; falls back to the server key.

    Returns:
        The output_path where the video was saved.

    Raises:
        httpx.HTTPStatusError: On a non-retryable HTTP error status.
        httpx.TimeoutException: If the final attempt times out.
        ValueError: If a small response body parses as JSON (treated as an API error).
        RuntimeError: If every attempt ended in a retryable HTTP status.
    """
    import urllib.parse

    encoded_prompt = urllib.parse.quote(prompt, safe="")
    url = f"{BASE_URL}/video/{encoded_prompt}"
    effective_duration = min(max(duration, 1), 10)
    params = {
        "model": model,
        "duration": effective_duration,
        "aspectRatio": aspect_ratio,
        "nologo": "true",
    }
    if image_url:
        params["image"] = image_url
    _key = api_key or settings.pollinations_api_key
    if _key:
        params["key"] = _key

    async with _video_semaphore:
        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1
            try:
                async with httpx.AsyncClient(timeout=300.0, follow_redirects=True) as client:
                    resp = await client.get(
                        url,
                        params=params,
                        headers=_auth_headers(api_key),
                    )

                if resp.status_code in (429, 500, 502, 503):
                    if is_last_attempt:
                        # Do not hold the single video slot for a back-off
                        # that cannot be followed by another attempt.
                        logger.warning(
                            "Video gen %s (attempt %d/%d), giving up",
                            resp.status_code, attempt + 1, max_retries,
                        )
                        break
                    delay = 10.0 * (attempt + 1)  # 10s, 20s, 30s
                    logger.warning(
                        "Video gen %s (attempt %d/%d), retrying in %.1fs...",
                        resp.status_code, attempt + 1, max_retries, delay,
                    )
                    await asyncio.sleep(delay)
                    continue

                resp.raise_for_status()

                # A small body may be a JSON error payload rather than video data.
                if len(resp.content) < 5000:
                    try:
                        error_data = resp.json()
                    except Exception:
                        pass  # Not JSON — probably valid binary video data
                    else:
                        raise ValueError(f"Pollinations video API error: {error_data}")

                Path(output_path).parent.mkdir(parents=True, exist_ok=True)
                with open(output_path, "wb") as f:
                    f.write(resp.content)
                # Log the duration actually requested, after clamping.
                logger.info(
                    "Generated video via Pollinations/%s (%ds): %s",
                    model, effective_duration, output_path,
                )
                return output_path
            except httpx.TimeoutException:
                logger.warning("Video gen timeout (attempt %d/%d)", attempt + 1, max_retries)
                if is_last_attempt:
                    raise
                await asyncio.sleep(10.0 * (attempt + 1))

    raise RuntimeError(f"Video generation failed after {max_retries} retries")
async def generate_tts(
    text: str,
    output_path: str,
    voice: str = "alloy",
    model: str = "openai-audio",
    api_key: str | None = None,
) -> str:
    """Synthesize speech through the Pollinations audio endpoint and save it.

    The text is percent-encoded into the URL path of a single GET request;
    the raw response body is written to ``output_path``.

    Args:
        text: Text to speak.
        output_path: Where to save the audio file.
        voice: Voice name/ID.
        model: TTS model (openai-audio, elevenlabs).
        api_key: Optional per-user key; falls back to the server key.

    Returns:
        The output_path where the audio was saved.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    from urllib.parse import quote

    endpoint = f"{BASE_URL}/audio/" + quote(text, safe="")
    query = {"model": model, "voice": voice}
    token = api_key or settings.pollinations_api_key
    if token:
        query["key"] = token

    async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as http:
        reply = await http.get(endpoint, params=query, headers=_auth_headers(api_key))
    reply.raise_for_status()

    # Make sure the target directory exists before writing the audio bytes.
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(reply.content)

    logger.info("Generated TTS via Pollinations/%s: %s", model, output_path)
    return output_path
async def generate_music(
    prompt: str,
    output_path: str,
    duration: int = 30,
    api_key: str | None = None,
) -> str:
    """Request a music track from the Pollinations audio endpoint and save it.

    Always uses the ``elevenlabs-music`` model. The prompt is percent-encoded
    into the URL path of a single GET request; the raw response body is
    written to ``output_path``.

    Args:
        prompt: Music description prompt.
        output_path: Where to save the audio file.
        duration: Duration in seconds.
        api_key: Optional per-user key; falls back to the server key.

    Returns:
        The output_path where the audio was saved.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    from urllib.parse import quote

    endpoint = f"{BASE_URL}/audio/" + quote(prompt, safe="")
    query = {"model": "elevenlabs-music", "duration": duration}
    token = api_key or settings.pollinations_api_key
    if token:
        query["key"] = token

    async with httpx.AsyncClient(timeout=180.0, follow_redirects=True) as http:
        reply = await http.get(endpoint, params=query, headers=_auth_headers(api_key))
    reply.raise_for_status()

    # Make sure the target directory exists before writing the audio bytes.
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(reply.content)

    logger.info("Generated music via Pollinations (%ds): %s", duration, output_path)
    return output_path
def estimate_cost(model: str, quantity: float = 1.0) -> float:
    """Return the estimated pollen cost of a generation.

    The unit of ``quantity`` depends on the kind of model:

    * image models: number of images
    * video models: total seconds
    * TTS: thousands of characters
    * music: seconds

    Models missing from ``MODEL_COSTS`` are priced at 0.01 per unit.
    """
    try:
        unit_price = MODEL_COSTS[model]
    except KeyError:
        # Unknown model: use the flat fallback price.
        unit_price = 0.01
    return unit_price * quantity