Spaces:
Sleeping
Sleeping
| """Phase 1: Pollinations Image Model Comparison Test. | |
| Generates the SAME scene with all 7 image models (5 paid + 2 free), | |
| saves to test_pollinations_output/ for visual comparison. | |
| Cost: ~0.035 pollen (trivial) | |
| Usage: | |
| python test_pollinations_images.py | |
| """ | |
| import asyncio | |
| import time | |
| from pathlib import Path | |
| import httpx | |
| from PIL import Image | |
| import io | |
| API_KEY = "pk_Qh43XHjquSeBBiqm" | |
| BASE_URL = "https://gen.pollinations.ai" | |
| OUTPUT_DIR = Path("test_pollinations_output") | |
| # Same prompt for all models — manhwa style character portrait | |
| TEST_PROMPT = ( | |
| "manhwa style, korean webtoon art, sharp detailed lineart, cel-shaded coloring, " | |
| "close-up shot of a young man with jet black messy hair and deep crimson eyes, " | |
| "wearing a black high-collar military coat with silver trim, " | |
| "dramatic lighting, vibrant saturated colors, detailed background of a dark throne room, " | |
| "intense determined expression" | |
| ) | |
| # Models ordered by cost (cheapest first) | |
| MODELS = [ | |
| # (model_name, cost_per_image, description) | |
| ("imagen-4", 0.0025, "Google Imagen 4"), | |
| ("grok-imagine", 0.0025, "xAI Grok Imagine"), | |
| ("klein", 0.008, "FLUX.2 4B (vision)"), | |
| ("klein-large", 0.012, "FLUX.2 9B (vision)"), | |
| ("gptimage", 0.02, "GPT Image 1 Mini (vision)"), | |
| ("flux", 0.0, "Flux Schnell (FREE)"), | |
| ("zimage", 0.0, "Z-Image Turbo (FREE)"), | |
| ] | |
| async def generate_image(model: str, output_path: Path, max_retries: int = 3) -> dict: | |
| """Generate a single image with retry on rate limit.""" | |
| import urllib.parse | |
| import json as _json | |
| encoded_prompt = urllib.parse.quote(TEST_PROMPT, safe="") | |
| url = f"{BASE_URL}/image/{encoded_prompt}" | |
| params = { | |
| "model": model, | |
| "width": 1024, | |
| "height": 768, | |
| "nologo": "true", | |
| "enhance": "false", | |
| "key": API_KEY, | |
| "seed": 42, # Same seed for consistency | |
| } | |
| start = time.time() | |
| for attempt in range(max_retries): | |
| try: | |
| async with httpx.AsyncClient(timeout=120.0, follow_redirects=True) as client: | |
| resp = await client.get( | |
| url, | |
| params=params, | |
| headers={"Authorization": f"Bearer {API_KEY}"}, | |
| ) | |
| if resp.status_code == 429: | |
| # Parse retry delay from response | |
| try: | |
| err = _json.loads(resp.text) | |
| delay = float(err.get("retryAfterSeconds", 5)) | |
| except Exception: | |
| delay = 5.0 * (attempt + 1) | |
| print(f" Rate limited, waiting {delay:.1f}s (attempt {attempt + 1}/{max_retries})...") | |
| await asyncio.sleep(delay + 0.5) # Add small buffer | |
| continue | |
| elapsed = time.time() - start | |
| if resp.status_code != 200: | |
| return { | |
| "model": model, | |
| "status": "error", | |
| "error": f"HTTP {resp.status_code}: {resp.text[:200]}", | |
| "time": elapsed, | |
| } | |
| # Check if response is actually an image | |
| content_type = resp.headers.get("content-type", "") | |
| if len(resp.content) < 1000 and "image" not in content_type: | |
| return { | |
| "model": model, | |
| "status": "error", | |
| "error": f"Response too small ({len(resp.content)} bytes): {resp.text[:200]}", | |
| "time": elapsed, | |
| } | |
| # Save the image | |
| img = Image.open(io.BytesIO(resp.content)) | |
| img.save(str(output_path), "PNG") | |
| return { | |
| "model": model, | |
| "status": "ok", | |
| "size": f"{img.width}x{img.height}", | |
| "file_size": len(resp.content), | |
| "time": elapsed, | |
| "path": str(output_path), | |
| } | |
| except Exception as e: | |
| elapsed = time.time() - start | |
| if attempt < max_retries - 1: | |
| print(f" Error: {e}, retrying...") | |
| await asyncio.sleep(3) | |
| continue | |
| return { | |
| "model": model, | |
| "status": "error", | |
| "error": str(e), | |
| "time": elapsed, | |
| } | |
| elapsed = time.time() - start | |
| return { | |
| "model": model, | |
| "status": "error", | |
| "error": f"Rate limited after {max_retries} retries", | |
| "time": elapsed, | |
| } | |
| async def main(): | |
| OUTPUT_DIR.mkdir(parents=True, exist_ok=True) | |
| print("=" * 70) | |
| print("POLLINATIONS IMAGE MODEL COMPARISON TEST") | |
| print(f"Prompt: {TEST_PROMPT[:80]}...") | |
| print(f"Output: {OUTPUT_DIR}/") | |
| print("=" * 70) | |
| total_cost = 0.0 | |
| results = [] | |
| for i, (model_name, cost, desc) in enumerate(MODELS): | |
| output_path = OUTPUT_DIR / f"{model_name}.png" | |
| # Skip if already generated successfully | |
| if output_path.exists() and output_path.stat().st_size > 5000: | |
| print(f"\n[{model_name}] {desc} — CACHED (already exists)") | |
| img = Image.open(str(output_path)) | |
| results.append({ | |
| "model": model_name, | |
| "status": "ok", | |
| "size": f"{img.width}x{img.height}", | |
| "file_size": output_path.stat().st_size, | |
| "time": 0.0, | |
| "path": str(output_path), | |
| }) | |
| total_cost += cost | |
| continue | |
| print(f"\n[{model_name}] {desc} (${cost}/img)...") | |
| # Wait between requests to avoid rate limiting | |
| if i > 0: | |
| print(f" (waiting 6s to avoid rate limit...)") | |
| await asyncio.sleep(6) | |
| result = await generate_image(model_name, output_path) | |
| results.append(result) | |
| total_cost += cost | |
| if result["status"] == "ok": | |
| print(f" OK — {result['size']}, {result['file_size']//1024}KB, {result['time']:.1f}s") | |
| else: | |
| print(f" FAILED — {result['error']}") | |
| # Create side-by-side comparison grid | |
| print("\n" + "=" * 70) | |
| print("CREATING COMPARISON GRID") | |
| print("=" * 70) | |
| try: | |
| _create_comparison_grid(results) | |
| print(f" Grid saved to {OUTPUT_DIR / 'comparison_grid.png'}") | |
| except Exception as e: | |
| print(f" Grid creation failed: {e}") | |
| # Summary | |
| print("\n" + "=" * 70) | |
| print("SUMMARY") | |
| print("=" * 70) | |
| print(f"{'Model':<15} {'Status':<8} {'Time':>6} {'Size':>12} {'Cost':>8}") | |
| print("-" * 55) | |
| for r in results: | |
| model = r["model"] | |
| cost = next(c for m, c, _ in MODELS if m == model) | |
| status = r["status"] | |
| t = f"{r['time']:.1f}s" | |
| size = r.get("size", "—") | |
| cost_str = f"${cost:.4f}" if cost > 0 else "FREE" | |
| print(f"{model:<15} {status:<8} {t:>6} {size:>12} {cost_str:>8}") | |
| print(f"\nTotal pollen spent: {total_cost:.4f}") | |
| print(f"Output directory: {OUTPUT_DIR.resolve()}") | |
| print("\nNow visually compare the images and pick your winners!") | |
| def _create_comparison_grid(results: list[dict]): | |
| """Create a side-by-side comparison grid of all successful images.""" | |
| successful = [r for r in results if r["status"] == "ok"] | |
| if not successful: | |
| print(" No successful images to grid.") | |
| return | |
| # Load all images | |
| images = [] | |
| labels = [] | |
| for r in successful: | |
| img = Image.open(r["path"]) | |
| images.append(img) | |
| cost = next(c for m, c, _ in MODELS if m == r["model"]) | |
| cost_str = f"${cost:.4f}" if cost > 0 else "FREE" | |
| labels.append(f"{r['model']} ({cost_str}, {r['time']:.1f}s)") | |
| # Create grid: 2 rows for 7 images (4 top + 3 bottom) | |
| cols = min(4, len(images)) | |
| rows = (len(images) + cols - 1) // cols | |
| thumb_w, thumb_h = 512, 384 | |
| label_h = 30 | |
| padding = 10 | |
| grid_w = cols * (thumb_w + padding) + padding | |
| grid_h = rows * (thumb_h + label_h + padding) + padding | |
| grid = Image.new("RGB", (grid_w, grid_h), (30, 30, 30)) | |
| from PIL import ImageDraw | |
| draw = ImageDraw.Draw(grid) | |
| for idx, (img, label) in enumerate(zip(images, labels)): | |
| row = idx // cols | |
| col = idx % cols | |
| x = padding + col * (thumb_w + padding) | |
| y = padding + row * (thumb_h + label_h + padding) | |
| # Resize and paste | |
| thumb = img.resize((thumb_w, thumb_h), Image.LANCZOS) | |
| grid.paste(thumb, (x, y)) | |
| # Label | |
| draw.text((x + 5, y + thumb_h + 5), label, fill=(255, 255, 255)) | |
| grid.save(str(OUTPUT_DIR / "comparison_grid.png"), "PNG") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |