# DK / server.py
# Adarshu07's picture
# Update server.py
# c18b868 verified
# server.py
"""
Perchance Image Generation API v3.0
════════════════════════════════════
Lightweight, self-healing, scalable image generation server.
Handles thousands of concurrent users on Hugging Face Spaces.
Key fix: Server starts accepting connections IMMEDIATELY.
Browser key fetch runs in BACKGROUND after startup.
"""
# ═══════════════════════════════════════════════════════════════
# IMPORTS
# ═══════════════════════════════════════════════════════════════
import asyncio
import base64
import json
import logging
import os
import random
import string
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from functools import partial
from pathlib import Path
from typing import Any, Dict, List, Optional
import cloudscraper
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse
import zendriver as zd
from zendriver import cdp
try:
from pyvirtualdisplay import Display as _VDisplay
_VDISPLAY_AVAILABLE = True
except ImportError:
_VDisplay = None
_VDISPLAY_AVAILABLE = False
# ═══════════════════════════════════════════════════════════════
# CONFIGURATION
# ═══════════════════════════════════════════════════════════════
# Perchance endpoints.
API_BASE = "https://image-generation.perchance.org"
API_GENERATE = "/api/generate"
API_DOWNLOAD = "/api/downloadTemporaryImage"
API_AWAIT = "/api/awaitExistingGenerationRequest"
API_AD_CODE = "/api/getAccessCodeForAdPoweredStuff"
# Browser-based userKey extraction settings.
BROWSER_TARGET = "https://perchance.org/ai-text-to-image-generator"
BROWSER_ORIGIN = "https://image-generation.perchance.org"
BROWSER_HEADLESS = os.environ.get("ZD_HEADLESS", "false").lower() in ("1", "true")
BROWSER_TIMEOUT = 90          # seconds to wait for the key to appear
CLICK_INTERVAL = 0.35         # base delay between synthetic clicks
CLICK_JITTER = 8.0            # max pixel jitter applied to each click
USE_VIRTUAL_DISPLAY = os.environ.get("USE_VIRTUAL_DISPLAY", "true").lower() in ("1", "true")
# HTTP client behaviour.
HTTP_TIMEOUT = 30
DOWNLOAD_TIMEOUT = 180        # total polling budget for one image download
BACKOFF_BASE = 0.7            # initial download retry backoff (seconds)
GEN_MAX_RETRIES = 6
# Key refresh policy.
KEY_MAX_RETRIES = 3           # invalid-key retries per image
KEY_COOLDOWN = 30             # seconds during which a fresh key is reused
KEY_MAX_FAILURES = 5          # consecutive failures before auto-refresh disables
# Worker pool / queue sizing.
WORKER_COUNT = 3
QUEUE_CAPACITY = 5000
THREAD_POOL_SIZE = 16
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)
# Task retention.
TASK_TTL = 3600               # seconds a finished task record is kept
CLEANUP_INTERVAL = 300
MAX_STORED_TASKS = 10_000
# Server-sent events tuning.
SSE_QUEUE_DEPTH = 256
SSE_PING_SEC = 15
# Headers mimicking a desktop Chrome client for all Perchance requests.
PERCHANCE_HEADERS = {
    "Accept": "*/*",
    "Content-Type": "application/json;charset=UTF-8",
    "Origin": BROWSER_ORIGIN,
    "Referer": f"{BROWSER_ORIGIN}/embed",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/131.0.0.0 Safari/537.36"
    ),
}
# ═══════════════════════════════════════════════════════════════
# LOGGING
# ═══════════════════════════════════════════════════════════════
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-7s | %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("perchance")
# ═══════════════════════════════════════════════════════════════
# PYDANTIC MODELS
# ═══════════════════════════════════════════════════════════════
class GenerationRequest(BaseModel):
    """Payload for POST /v1/generation/Image/Create."""
    prompts: List[str] = Field(default_factory=list)     # batch of prompts
    prompt: Optional[str] = Field(None)                  # single-prompt convenience field
    count: int = Field(1, ge=1, le=20)                   # images per prompt
    resolution: str = Field("512x768")
    guidance_scale: float = Field(7.0, ge=1.0, le=20.0)
    negative_prompt: str = Field("")
    style: str = Field("private")                        # Perchance "subChannel"
class GenerationResponse(BaseModel):
    """Handles returned to the caller after a task is enqueued."""
    task_id: str
    stream: str        # SSE endpoint for live progress
    status_url: str    # polling endpoint
    total_images: int
    created_at: str
    queue_pos: int     # queue depth at enqueue time (approximate)
class SetKeyRequest(BaseModel):
    """Payload for POST /v1/keys/set (manually supplied userKey)."""
    userKey: str = Field(..., min_length=1)
# ═══════════════════════════════════════════════════════════════
# UTILITIES
# ═══════════════════════════════════════════════════════════════
_SAFE = set(string.ascii_letters + string.digits + "-_.()")
def _fname(s: str, limit: int = 80) -> str:
return "".join(c if c in _SAFE else "_" for c in s)[:limit]
def _sid(n: int = 8) -> str:
return "".join(random.choices(string.ascii_lowercase + string.digits, k=n))
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
def _ts() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
def _rid() -> str:
    """Unique request id: epoch seconds (6 decimal places) plus a random suffix."""
    stamp = f"{time.time():.6f}"
    return "-".join((stamp, _sid()))
def _age(iso: str) -> float:
try:
dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
return time.time() - dt.timestamp()
except Exception:
return 0.0
# ═══════════════════════════════════════════════════════════════
# VIRTUAL DISPLAY MANAGER
# ═══════════════════════════════════════════════════════════════
class DisplayManager:
    """Lazily starts/stops a single Xvfb virtual display for headful browsing."""
    __slots__ = ("_disp",)
    def __init__(self):
        # pyvirtualdisplay.Display instance, or None when not running.
        self._disp = None
    def ensure(self, headless: bool) -> None:
        """Start Xvfb when a DISPLAY is needed and none exists yet. Idempotent."""
        if headless or not USE_VIRTUAL_DISPLAY:
            return  # headless browsing needs no display
        if os.environ.get("DISPLAY"):
            return  # a display is already available
        if not _VDISPLAY_AVAILABLE:
            log.warning("pyvirtualdisplay not installed β€” no DISPLAY available")
            return
        if self._disp is not None:
            return  # already started
        try:
            self._disp = _VDisplay(visible=0, size=(1280, 720))
            self._disp.start()
            log.info("Xvfb started DISPLAY=%s", os.environ.get("DISPLAY"))
        except Exception as exc:
            # Keep state consistent so a later ensure() can retry.
            self._disp = None
            log.exception("Xvfb start failed: %s", exc)
    def shutdown(self) -> None:
        """Stop the virtual display if one was started; always clear state."""
        if self._disp is None:
            return
        try:
            self._disp.stop()
            log.info("Xvfb stopped")
        except Exception:
            log.exception("Xvfb stop error")
        finally:
            self._disp = None
# Module-wide singleton used by startup, shutdown, and key extraction.
_display = DisplayManager()
# ═══════════════════════════════════════════════════════════════
# SSE BROADCASTER
# ═══════════════════════════════════════════════════════════════
class Broadcaster:
    """Fan-out of task events to per-task SSE subscriber queues (best-effort)."""

    def __init__(self):
        self._subs: Dict[str, List[asyncio.Queue]] = {}
        self._lock = asyncio.Lock()

    async def subscribe(self, task_id: str) -> asyncio.Queue:
        """Register a new bounded queue for *task_id* and return it."""
        queue: asyncio.Queue = asyncio.Queue(maxsize=SSE_QUEUE_DEPTH)
        async with self._lock:
            self._subs.setdefault(task_id, []).append(queue)
        return queue

    async def unsubscribe(self, task_id: str, q: asyncio.Queue) -> None:
        """Remove *q* from the task's bucket; drop the bucket when it empties."""
        async with self._lock:
            bucket = self._subs.get(task_id)
            if not bucket:
                return
            remaining = [entry for entry in bucket if entry is not q]
            if remaining:
                self._subs[task_id] = remaining
            else:
                del self._subs[task_id]

    async def emit(self, task_id: str, event: dict) -> None:
        """Push *event* to every subscriber of *task_id*; drop on full queues."""
        async with self._lock:
            targets = list(self._subs.get(task_id, []))
        for queue in targets:
            try:
                queue.put_nowait(event)
            except asyncio.QueueFull:
                pass  # slow consumer: drop rather than block

    async def emit_global(self, event: dict) -> None:
        """Push *event* to every subscriber of every task."""
        async with self._lock:
            snapshot = [(tid, list(qs)) for tid, qs in self._subs.items()]
        for _, queues in snapshot:
            for queue in queues:
                try:
                    queue.put_nowait(event)
                except asyncio.QueueFull:
                    pass

    @property
    def total_subscribers(self) -> int:
        """Total live subscriber queues across all tasks."""
        return sum(len(v) for v in self._subs.values())
# ═══════════════════════════════════════════════════════════════
# TASK STORE
# ═══════════════════════════════════════════════════════════════
class TaskStore:
    """In-memory registry of generation task records keyed by UUID."""

    def __init__(self):
        self._data: Dict[str, dict] = {}

    def create(self, *, prompts, count, resolution, guidance, negative, style) -> dict:
        """Allocate and store a fresh task record; evict old ones when full."""
        tid = str(uuid.uuid4())
        task = {
            "id": tid,
            "prompts": prompts,
            "count": count,
            "resolution": resolution,
            "guidance": guidance,
            "negative": negative,
            "style": style,
            "created_at": _now(),
            "started_at": None,
            "finished_at": None,
            "status": "queued",
            "total_images": len(prompts) * count,
            "completed": 0,
            "failed_count": 0,
            "results": [],
            "error": None,
        }
        if len(self._data) >= MAX_STORED_TASKS:
            self._evict()
        self._data[tid] = task
        return task

    def get(self, tid: str) -> Optional[dict]:
        """Look up a task record; None when unknown."""
        return self._data.get(tid)

    @property
    def size(self) -> int:
        return len(self._data)

    def active_count(self) -> int:
        """Number of tasks still queued or running."""
        live = ("queued", "running")
        return sum(1 for rec in self._data.values() if rec["status"] in live)

    def _evict(self) -> None:
        """Delete the oldest quarter (at least one) of finished tasks."""
        finished = [
            (tid, rec) for tid, rec in self._data.items()
            if rec["status"] in ("completed", "failed")
        ]
        finished.sort(key=lambda item: item[1]["created_at"])
        for tid, _ in finished[:max(1, len(finished) // 4)]:
            del self._data[tid]

    def purge_expired(self) -> int:
        """Drop finished tasks older than TASK_TTL; return how many were removed."""
        expired = [
            tid for tid, rec in self._data.items()
            if rec["status"] in ("completed", "failed") and _age(rec["created_at"]) > TASK_TTL
        ]
        for tid in expired:
            del self._data[tid]
        return len(expired)
# ═══════════════════════════════════════════════════════════════
# KEY MANAGER
# ═══════════════════════════════════════════════════════════════
class KeyManager:
    """Holds the Perchance userKey and coordinates its (re)acquisition.

    Concurrency notes:
      - _rw_lock guards reads/writes of `key`.
      - _usable is cleared while a refresh is in flight so getters block.
      - _refresh_gate serializes refresh attempts.
      - _ready latches once the first key has ever been obtained.
    """
    def __init__(self):
        self.key: Optional[str] = None
        self._rw_lock = asyncio.Lock()
        self._usable = asyncio.Event()
        self._usable.set()
        self._refresh_gate = asyncio.Lock()
        self._last_ok: float = 0.0        # wall time when the key was last set
        self._consec_fails: int = 0       # consecutive refresh failures
        self._ready = asyncio.Event()  # set once first key obtained
    @property
    def is_ready(self) -> bool:
        return self._ready.is_set()
    async def wait_ready(self, timeout: Optional[float] = None):
        """Wait until at least one key has been obtained."""
        # NOTE: truthiness check means timeout=0 waits forever, like None.
        if timeout:
            await asyncio.wait_for(self._ready.wait(), timeout=timeout)
        else:
            await self._ready.wait()
    async def get(self) -> Optional[str]:
        """Return the current key, blocking while a refresh is in flight."""
        await self._usable.wait()
        async with self._rw_lock:
            return self.key
    async def set(self, new_key: str) -> None:
        """Install *new_key* and reset failure/ready state."""
        async with self._rw_lock:
            self.key = new_key
            self._last_ok = time.time()
            self._consec_fails = 0
            self._usable.set()
            self._ready.set()
            log.info("userKey set OK (len=%d)", len(new_key))
    def reset_failures(self) -> None:
        # Re-arms auto-refresh after manual intervention.
        self._consec_fails = 0
    async def refresh(self, broadcaster: Broadcaster) -> Optional[str]:
        """Refresh the key via a browser session; return the new key or None.

        Serialized by _refresh_gate so concurrent callers share one attempt.
        """
        async with self._refresh_gate:
            # A very recent key means another caller just refreshed it β€” reuse.
            if self.key and (time.time() - self._last_ok) < KEY_COOLDOWN:
                log.info("Key refreshed %.0fs ago β†’ reuse", time.time() - self._last_ok)
                return self.key
            # Circuit breaker: stop hammering the browser after repeated failures.
            if self._consec_fails >= KEY_MAX_FAILURES:
                msg = (
                    f"Auto-refresh disabled after {self._consec_fails} failures. "
                    "Set key manually via POST /v1/keys/set"
                )
                log.error(msg)
                await broadcaster.emit_global({
                    "event": "key:error", "message": msg, "time": _now(),
                })
                return None
            # Block get() callers until the refresh resolves either way.
            self._usable.clear()
            log.info("Browser key refresh starting …")
            await broadcaster.emit_global({
                "event": "key:refresh",
                "message": "UserKey expired β€” launching browser refresh …",
                "time": _now(),
            })
            try:
                new_key = await extract_key_via_browser(
                    timeout=BROWSER_TIMEOUT, headless=BROWSER_HEADLESS,
                )
                if new_key:
                    await self.set(new_key)
                    await broadcaster.emit_global({
                        "event": "key:ready",
                        "message": "UserKey refreshed β€” generation resuming",
                        "time": _now(),
                    })
                    return new_key
                self._consec_fails += 1
                log.error("Key refresh empty (%d/%d)", self._consec_fails, KEY_MAX_FAILURES)
                await broadcaster.emit_global({
                    "event": "key:error",
                    "message": f"Refresh failed ({self._consec_fails}/{KEY_MAX_FAILURES})",
                    "time": _now(),
                })
                return None
            except Exception as exc:
                self._consec_fails += 1
                log.exception("Key refresh error (%d/%d): %s",
                              self._consec_fails, KEY_MAX_FAILURES, exc)
                return None
            finally:
                # Always unblock getters, even on failure (they may get a stale key).
                self._usable.set()
# ═══════════════════════════════════════════════════════════════
# PERCHANCE HTTP CLIENT
# ═══════════════════════════════════════════════════════════════
class PerchanceClient:
    """Blocking HTTP client for the Perchance image API (cloudscraper session).

    All methods are synchronous; callers run them in the thread pool.
    """
    def __init__(self):
        self._s = cloudscraper.create_scraper()
        self._base = API_BASE.rstrip("/")
    def fetch_ad_code(self) -> str:
        """Fetch the ad access code; empty string on any failure."""
        try:
            r = self._s.get(f"{self._base}{API_AD_CODE}",
                            timeout=HTTP_TIMEOUT, headers=PERCHANCE_HEADERS)
            r.raise_for_status()
            return r.text.strip()
        except Exception:
            return ""
    def _await_prev(self, key: str) -> None:
        """Best-effort wait for the server to finish this key's previous request."""
        try:
            self._s.get(
                f"{self._base}{API_AWAIT}",
                params={"userKey": key, "__cacheBust": random.random()},
                timeout=20, headers=PERCHANCE_HEADERS,
            )
        except Exception:
            pass
    def generate(self, *, prompt, negative, seed, resolution,
                 guidance, style, user_key, ad_code) -> dict:
        """Request one image, retrying transient states up to GEN_MAX_RETRIES.

        Returns one of:
          {"imageId": ..., "seed": ...}   -- fetch the file via download()
          {"inline": dataUrl, "seed": ...}
          {"error": reason}
        """
        rid = _rid()
        params = {
            "userKey": user_key, "requestId": rid,
            "adAccessCode": ad_code, "__cacheBust": random.random(),
        }
        body = {
            "prompt": prompt, "negativePrompt": negative,
            "seed": seed, "resolution": resolution,
            "guidanceScale": guidance,
            "channel": "ai-text-to-image-generator",
            "subChannel": style,
            "userKey": user_key, "adAccessCode": ad_code,
            "requestId": rid,
        }
        ad_refreshed = False  # refresh the ad code at most once per call
        for att in range(1, GEN_MAX_RETRIES + 1):
            try:
                r = self._s.post(
                    f"{self._base}{API_GENERATE}",
                    json=body, params=params,
                    timeout=HTTP_TIMEOUT, headers=PERCHANCE_HEADERS,
                )
                r.raise_for_status()
                res = r.json()
            except Exception as exc:
                log.info("Network error attempt %d/%d: %s", att, GEN_MAX_RETRIES, exc)
                time.sleep(1.0)
                continue
            st = res.get("status")
            if st == "success":
                iid = res.get("imageId")
                urls = res.get("imageDataUrls")
                if iid:
                    return {"imageId": iid, "seed": res.get("seed")}
                if urls:
                    return {"inline": urls[0], "seed": res.get("seed")}
                return {"error": "empty_success"}
            if st == "invalid_key":
                # Caller is responsible for refreshing the key and retrying.
                return {"error": "invalid_key"}
            if st == "waiting_for_prev_request_to_finish":
                self._await_prev(user_key)
                time.sleep(0.5)
                continue
            if st == "invalid_ad_access_code" and not ad_refreshed:
                code = self.fetch_ad_code()
                if code:
                    ad_code = code
                    params["adAccessCode"] = code
                    body["adAccessCode"] = code
                    ad_refreshed = True
                    time.sleep(0.8)
                    continue
                return {"error": "invalid_ad_access_code"}
            if st == "gen_failure" and res.get("type") == 1:
                # presumably a transient server-side failure -- retried; confirm
                time.sleep(2.5)
                continue
            if st in (None, "stale_request", "fetch_failure"):
                time.sleep(1.0)
                continue
            log.error("Unhandled status '%s': %.200s", st, str(res))
            return {"error": f"unhandled_{st}"}
        return {"error": "max_retries"}
    def download(self, image_id: str, prefix: str) -> str:
        """Poll the temporary-image endpoint until ready; save and return the path.

        Raises TimeoutError after DOWNLOAD_TIMEOUT seconds.
        """
        url = f"{self._base}{API_DOWNLOAD}?imageId={image_id}"
        t0 = time.time()
        bk = BACKOFF_BASE  # exponential backoff, capped at 8s below
        while time.time() - t0 < DOWNLOAD_TIMEOUT:
            try:
                r = self._s.get(url, timeout=HTTP_TIMEOUT,
                                headers=PERCHANCE_HEADERS, stream=True)
                if r.status_code == 200:
                    # Pick the extension from the Content-Type header.
                    ct = r.headers.get("Content-Type", "")
                    ext = ".png" if "png" in ct else ".webp" if "webp" in ct else ".jpg"
                    fn = _fname(f"{prefix}_{image_id[:12]}{ext}")
                    fp = str(OUTPUT_DIR / fn)
                    with open(fp, "wb") as f:
                        for chunk in r.iter_content(8192):
                            if chunk:
                                f.write(chunk)
                    return fp
            except Exception:
                pass  # network hiccup: retry after backoff
            time.sleep(bk)
            bk = min(bk * 1.8, 8.0)
        raise TimeoutError(f"Download timeout ({DOWNLOAD_TIMEOUT}s) for {image_id}")
    def close(self) -> None:
        """Close the underlying HTTP session; never raises."""
        try:
            self._s.close()
        except Exception:
            pass
# ═══════════════════════════════════════════════════════════════
# BROWSER KEY EXTRACTION
# ═══════════════════════════════════════════════════════════════
async def _cdp_mouse(tab, action, x, y, **kw):
    """Dispatch a raw CDP mouse event of type *action* at (x, y)."""
    event = cdp.input_.dispatch_mouse_event(
        type_=action, x=float(x), y=float(y), **kw,
    )
    await tab.send(event)
async def _viewport_centre(tab):
try:
v = await tab.evaluate(
"(()=>({w:innerWidth,h:innerHeight}))()",
await_promise=False, return_by_value=True,
)
return v["w"] / 2, v["h"] / 2
except Exception:
return 600.0, 400.0
async def _ls_get(tab, key):
try:
return await tab.evaluate(
f"localStorage&&localStorage.getItem({json.dumps(key)})",
await_promise=True, return_by_value=True,
)
except Exception:
return None
async def _click_loop(tab, stop: asyncio.Event):
    """Emit human-like jittered clicks at the viewport centre until *stop* is set.

    Runs alongside the key poller; presumably needed to trigger the page's
    ad/engagement flow that produces the userKey -- confirm against the site.
    """
    try:
        await tab.evaluate("window.focus&&window.focus()",
                           await_promise=False, return_by_value=False)
    except Exception:
        pass
    cx, cy = await _viewport_centre(tab)
    last_upd = time.time()
    while not stop.is_set():
        # Re-measure the centre every ~3s in case the viewport changed.
        if time.time() - last_upd > 3.0:
            cx, cy = await _viewport_centre(tab)
            last_upd = time.time()
        jx = random.uniform(-CLICK_JITTER, CLICK_JITTER)
        jy = random.uniform(-CLICK_JITTER, CLICK_JITTER)
        x, y = cx + jx, cy + jy
        try:
            # move, press, release with small randomized delays
            await _cdp_mouse(tab, "mouseMoved", x, y, pointer_type="mouse")
            await asyncio.sleep(random.uniform(0.02, 0.08))
            await _cdp_mouse(tab, "mousePressed", x, y,
                             button=cdp.input_.MouseButton.LEFT,
                             click_count=1, buttons=1)
            await asyncio.sleep(random.uniform(0.03, 0.12))
            await _cdp_mouse(tab, "mouseReleased", x, y,
                             button=cdp.input_.MouseButton.LEFT,
                             click_count=1, buttons=0)
        except Exception:
            pass
        try:
            # Sleep between clicks, but wake immediately when stop is set.
            await asyncio.wait_for(
                stop.wait(),
                timeout=CLICK_INTERVAL * random.uniform(0.85, 1.15),
            )
            break
        except asyncio.TimeoutError:
            pass
async def _poll_key(tab, stop: asyncio.Event, timeout: int) -> Optional[str]:
    """Poll the tab's localStorage for a userKey until found, stopped, or timed out."""
    deadline = time.time() + timeout
    while not stop.is_set() and time.time() < deadline:
        direct = await _ls_get(tab, "userKey-0")
        if direct:
            return direct
        # Fall back to scanning for any storage key whose name mentions 'userKey'.
        try:
            names = await tab.evaluate(
                "Object.keys(localStorage||{}).filter(k=>k.includes('userKey'))",
                await_promise=False, return_by_value=True,
            )
        except Exception:
            names = None
        for name in (names or []):
            value = await _ls_get(tab, name)
            if value:
                return value
        await asyncio.sleep(0.25)
    return None
async def extract_key_via_browser(
    timeout: int = BROWSER_TIMEOUT,
    headless: bool = BROWSER_HEADLESS,
) -> Optional[str]:
    """Launch a zendriver browser session and harvest a Perchance userKey.

    Opens the generator page (clicked continuously) plus an origin tab whose
    localStorage is polled for the key. Returns the key or None on timeout.
    """
    log.info("Browser key extraction timeout=%ds headless=%s", timeout, headless)
    loop = asyncio.get_running_loop()
    # Xvfb start is blocking; keep it off the event loop.
    await loop.run_in_executor(None, partial(_display.ensure, headless))
    try:
        browser = await zd.start(headless=headless)
    except Exception as exc:
        log.exception("Browser start failed: %s", exc)
        return None
    stop = asyncio.Event()
    result = None
    try:
        main_tab = await browser.get(BROWSER_TARGET)
        await asyncio.sleep(2.0)
        # Second tab on the API origin so its localStorage is readable.
        origin_tab = await browser.get(BROWSER_ORIGIN, new_tab=True)
        await asyncio.sleep(1.0)
        await main_tab.bring_to_front()
        await asyncio.sleep(0.5)
        clicker = asyncio.create_task(_click_loop(main_tab, stop))
        poller = asyncio.create_task(_poll_key(origin_tab, stop, timeout))
        try:
            done, _ = await asyncio.wait({poller}, timeout=timeout)
            if poller in done:
                result = poller.result()
        finally:
            # Stop the clicker and tear the tabs down regardless of outcome.
            stop.set()
            if not clicker.done():
                clicker.cancel()
            try:
                await clicker
            except asyncio.CancelledError:
                pass
            for t in (origin_tab, main_tab):
                try:
                    await t.close()
                except Exception:
                    pass
    finally:
        try:
            await browser.stop()
        except Exception:
            pass
    if result:
        log.info("Extracted userKey (len=%d)", len(result))
    else:
        log.warning("Key extraction failed within %ds", timeout)
    return result
# ═══════════════════════════════════════════════════════════════
# GLOBAL SINGLETONS
# ═══════════════════════════════════════════════════════════════
_client = PerchanceClient()                                    # shared blocking HTTP client
_executor = ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE)   # pool for blocking calls
_keys = KeyManager()
_tasks = TaskStore()
_sse = Broadcaster()
# Created inside lifespan because asyncio.Queue must bind to the running loop.
_queue: Optional[asyncio.Queue] = None
# Track startup readiness (server is up but key may still be loading)
_server_start_time: float = 0.0
# ═══════════════════════════════════════════════════════════════
# GENERATION ENGINE
# ═══════════════════════════════════════════════════════════════
async def _save_inline(data_url: str, label: str) -> str:
    """Decode a base64 data URL and persist it under OUTPUT_DIR; return the path.

    The disk write runs in the thread pool to keep the event loop free.
    """
    if "," in data_url:
        header, payload = data_url.split(",", 1)
    else:
        header, payload = "", data_url
    suffix = ".png" if "png" in header else ".jpg"
    name = _fname(f"{label[:30]}_{_ts()}_{_sid()}{suffix}")
    target = OUTPUT_DIR / name
    decoded = base64.b64decode(payload)
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(_executor, target.write_bytes, decoded)
    return str(target)
async def _download_image(image_id: str, label: str) -> str:
    """Run the blocking Perchance download in the thread pool; return the file path."""
    prefix = _fname(f"{label[:30]}_{_ts()}_{_sid()}")
    loop = asyncio.get_running_loop()
    job = partial(_client.download, image_id, prefix)
    return await loop.run_in_executor(_executor, job)
async def _gen_one_image(prompt, task, img_idx, ad_code) -> Optional[dict]:
    """Generate, persist, and record one image for *task*.

    Retries with a refreshed userKey up to KEY_MAX_RETRIES times when the
    current key is rejected. Emits SSE progress/error events throughout.
    Returns the result entry dict, or None on failure.
    """
    loop = asyncio.get_running_loop()
    tid = task["id"]
    for key_try in range(1, KEY_MAX_RETRIES + 1):
        user_key = await _keys.get()
        if not user_key:
            await _sse.emit(tid, {
                "event": "error",
                "message": "No userKey available β€” set one via POST /v1/keys/set",
                "time": _now(),
            })
            return None
        # The Perchance client is blocking, so run it in the thread pool.
        result = await loop.run_in_executor(
            _executor,
            partial(
                _client.generate,
                prompt=prompt, negative=task["negative"],
                seed=-1, resolution=task["resolution"],
                guidance=task["guidance"], style=task["style"],
                user_key=user_key, ad_code=ad_code,
            ),
        )
        if result.get("error") == "invalid_key":
            log.warning("invalid_key task=%s try=%d/%d", tid[:8], key_try, KEY_MAX_RETRIES)
            await _sse.emit(tid, {
                "event": "key:refresh",
                "message": f"Key invalid β€” refreshing ({key_try}/{KEY_MAX_RETRIES})",
                "time": _now(),
            })
            new = await _keys.refresh(_sse)
            if new:
                # A new key usually needs a fresh ad access code as well.
                ad_code = await loop.run_in_executor(_executor, _client.fetch_ad_code)
                continue
            return None
        if result.get("error"):
            log.warning("Gen error task=%s: %s", tid[:8], result["error"])
            task["failed_count"] += 1
            await _sse.emit(tid, {
                "event": "error",
                "message": f"Generation error: {result['error']}",
                "prompt": prompt, "index": img_idx, "time": _now(),
            })
            return None
        try:
            if result.get("inline"):
                fpath = await _save_inline(result["inline"], prompt)
            elif result.get("imageId"):
                fpath = await _download_image(result["imageId"], prompt)
            else:
                log.error("Unexpected payload: %s", result)
                return None
            filename = Path(fpath).name
            # FIX: interpolate the actual filename into the public URL.
            # Previously a literal placeholder string was sent to clients,
            # so every image_url pointed at a non-existent resource.
            image_url = f"/v1/generation/outputs/{filename}"
            seed = result.get("seed")
            entry = {
                "prompt": prompt, "index": img_idx,
                "image_url": image_url, "seed": seed,
            }
            task["completed"] += 1
            task["results"].append(entry)
            await _sse.emit(tid, {
                "event": "progress", "task_id": tid,
                "prompt": prompt, "index": img_idx,
                "completed": task["completed"], "total": task["total_images"],
                "image_url": image_url, "seed": seed, "time": _now(),
            })
            log.info("βœ“ %d/%d task=%s %s",
                     task["completed"], task["total_images"], tid[:8], filename)
            return entry
        except Exception as exc:
            log.exception("Save/download error task=%s: %s", tid[:8], exc)
            task["failed_count"] += 1
            await _sse.emit(tid, {
                "event": "error", "message": f"Save failed: {exc}",
                "prompt": prompt, "index": img_idx, "time": _now(),
            })
            return None
    return None
async def _run_task(task: dict) -> None:
    """Execute a queued generation task end-to-end, emitting SSE events throughout."""
    loop = asyncio.get_running_loop()
    tid = task["id"]
    # Wait for key to be available before starting
    if not _keys.is_ready:
        log.info("Task %s waiting for initial key …", tid[:8])
        await _sse.emit(tid, {
            "event": "waiting", "task_id": tid,
            "message": "Waiting for server key initialization …",
            "time": _now(),
        })
        try:
            await _keys.wait_ready(timeout=120)
        except asyncio.TimeoutError:
            # Give up: the background key fetch never produced a key.
            task["status"] = "failed"
            task["error"] = "Key initialization timeout"
            task["finished_at"] = _now()
            await _sse.emit(tid, {
                "event": "failed", "task_id": tid,
                "error": "Key initialization timeout", "time": _now(),
            })
            return
    task["status"] = "running"
    task["started_at"] = _now()
    await _sse.emit(tid, {
        "event": "started", "task_id": tid,
        "total": task["total_images"], "time": _now(),
    })
    # Ad access code fetched once per task (refreshed inside _gen_one_image).
    ad_code = await loop.run_in_executor(_executor, _client.fetch_ad_code)
    async def _hb():
        # Periodic heartbeat so SSE clients see liveness during long generations.
        t0 = time.time()
        while task["status"] == "running":
            await asyncio.sleep(SSE_PING_SEC)
            if task["status"] == "running":
                await _sse.emit(tid, {
                    "event": "heartbeat", "task_id": tid,
                    "completed": task["completed"], "total": task["total_images"],
                    "elapsed": round(time.time() - t0, 1), "time": _now(),
                })
    hb = asyncio.create_task(_hb())
    try:
        # Images are generated strictly sequentially within a task.
        for prompt in task["prompts"]:
            for i in range(task["count"]):
                await _gen_one_image(prompt, task, i, ad_code)
        if task["completed"] == 0 and task["total_images"] > 0:
            task["status"] = "failed"
            task["error"] = "No images generated"
        else:
            task["status"] = "completed"
        task["finished_at"] = _now()
        await _sse.emit(tid, {
            "event": task["status"], "task_id": tid,
            "completed": task["completed"], "total": task["total_images"],
            "failed": task["failed_count"], "results": task["results"],
            "error": task.get("error"), "time": _now(),
        })
    except Exception as exc:
        log.exception("Task %s crashed: %s", tid[:8], exc)
        task["status"] = "failed"
        task["error"] = str(exc)
        task["finished_at"] = _now()
        await _sse.emit(tid, {
            "event": "failed", "task_id": tid,
            "error": str(exc), "time": _now(),
        })
    finally:
        # Stop the heartbeat and log the final tally.
        hb.cancel()
        try:
            await hb
        except asyncio.CancelledError:
            pass
        log.info("Task %s %s %d/%d failed=%d",
                 tid[:8], task["status"],
                 task["completed"], task["total_images"], task["failed_count"])
# ═══════════════════════════════════════════════════════════════
# WORKERS + BACKGROUND JOBS
# ═══════════════════════════════════════════════════════════════
async def _worker(wid: int):
    """Queue consumer: runs tasks forever until a None sentinel arrives."""
    log.info("Worker-%d online", wid)
    while True:
        item = await _queue.get()
        if item is None:
            # Shutdown sentinel from lifespan teardown.
            _queue.task_done()
            log.info("Worker-%d offline", wid)
            return
        try:
            await _run_task(item["task"])
        except Exception as exc:
            log.exception("Worker-%d unhandled: %s", wid, exc)
        _queue.task_done()
async def _janitor():
    """Periodic cleanup loop purging expired finished tasks."""
    while True:
        await asyncio.sleep(CLEANUP_INTERVAL)
        try:
            purged = _tasks.purge_expired()
            if purged:
                log.info("Janitor: purged %d expired tasks (%d remain)", purged, _tasks.size)
        except Exception:
            log.exception("Janitor error")
async def _background_key_fetch():
    """
    Fetch userKey via browser IN THE BACKGROUND.
    Server is already accepting connections while this runs.
    """
    # NOTE(review): unlike the other env flags this check is case-sensitive
    # ("TRUE" is not recognized) -- confirm whether that is intentional.
    skip = os.environ.get("NO_INITIAL_FETCH", "0") in ("1", "true", "True")
    if skip:
        log.info("NO_INITIAL_FETCH β†’ skipping browser key fetch")
        return
    log.info("Background key fetch starting …")
    try:
        key = await extract_key_via_browser(BROWSER_TIMEOUT, BROWSER_HEADLESS)
        if key:
            await _keys.set(key)
            log.info("Background key fetch OK")
        else:
            log.warning("Background key fetch failed β€” use /v1/keys/set or /v1/keys/refresh")
    except Exception as exc:
        log.exception("Background key fetch error: %s", exc)
# ═══════════════════════════════════════════════════════════════
# FASTAPI β€” LIFESPAN
#
# KEY CHANGE: yield IMMEDIATELY so HF Spaces sees port 7860
# key fetch runs as a background task AFTER yield
# ═══════════════════════════════════════════════════════════════
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """App lifespan: instant startup, background key fetch, graceful shutdown.

    Yields immediately so the port opens right away (HF Spaces readiness);
    the browser key fetch continues in a background task.
    """
    global _queue, _server_start_time
    _server_start_time = time.time()
    # The queue must be created on the running loop, hence here and not at import.
    _queue = asyncio.Queue(maxsize=QUEUE_CAPACITY)
    loop = asyncio.get_running_loop()
    # Start Xvfb (fast, synchronous)
    await loop.run_in_executor(None, partial(_display.ensure, BROWSER_HEADLESS))
    # Start workers (instant)
    workers = [asyncio.create_task(_worker(i + 1)) for i in range(WORKER_COUNT)]
    # Start janitor (instant)
    janitor = asyncio.create_task(_janitor())
    # Start key fetch IN BACKGROUND (non-blocking!)
    key_task = asyncio.create_task(_background_key_fetch())
    log.info(
        "═══ SERVER READY ═══ workers=%d queue_cap=%d headless=%s (key fetching in background)",
        WORKER_COUNT, QUEUE_CAPACITY, BROWSER_HEADLESS,
    )
    # yield IMMEDIATELY β€” port 7860 opens NOW; HF Spaces sees 200 OK on GET /
    yield
    # ──────── shutdown ────────
    log.info("Shutting down …")
    # Cancel background key fetch if still running
    if not key_task.done():
        key_task.cancel()
        try:
            await key_task
        except (asyncio.CancelledError, Exception):
            pass
    # One None sentinel per worker drains the pool gracefully.
    for _ in range(WORKER_COUNT):
        await _queue.put(None)
    await asyncio.gather(*workers, return_exceptions=True)
    janitor.cancel()
    try:
        await janitor
    except asyncio.CancelledError:
        pass
    _client.close()
    _executor.shutdown(wait=False)
    await loop.run_in_executor(None, _display.shutdown)
    log.info("═══ SHUTDOWN COMPLETE ═══")
# ═══════════════════════════════════════════════════════════════
# FASTAPI β€” APP + ROUTES
# ═══════════════════════════════════════════════════════════════
# Application instance; startup/shutdown is handled by `lifespan` above.
app = FastAPI(title="Perchance Image Generation API", version="3.0", lifespan=lifespan)
# Permissive CORS: the API is meant to be called from arbitrary origins.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
@app.exception_handler(Exception)
async def _global_err(request: Request, exc: Exception):
    """Last-resort handler: log the traceback, hide details from the client."""
    log.exception("Unhandled: %s", exc)
    return JSONResponse(status_code=500, content={"error": "Internal server error"})
# ══════════════════════════════════════
# ROOT β€” HF Spaces readiness check
# ══════════════════════════════════════
@app.get("/")
async def root():
    """Service banner; also serves as the HF Spaces readiness probe."""
    return {
        "name": "Perchance Image Generation API",
        "version": "3.0",
        "status": "running",
        "key_ready": _keys.is_ready,
        "uptime": round(time.time() - _server_start_time, 1),
        "docs": "/docs",
        "endpoints": {
            "generate": "POST /v1/generation/Image/Create",
            "stream": "GET /v1/generation/stream/{task_id}",
            "status": "GET /v1/generation/status/{task_id}",
            # FIX: the outputs route takes a filename path parameter;
            # the previous string contained a garbled placeholder.
            "outputs": "GET /v1/generation/outputs/{filename}",
            "health": "GET /health",
            "set_key": "POST /v1/keys/set",
            "refresh_key": "POST /v1/keys/refresh",
        },
    }
@app.get("/health")
async def health():
    """Liveness and metrics snapshot for monitors."""
    return {
        "status": "healthy",
        "has_key": _keys.key is not None,
        "key_ready": _keys.is_ready,
        # _queue is None until lifespan has run.
        "queue_size": _queue.qsize() if _queue else 0,
        "queue_capacity": QUEUE_CAPACITY,
        "active_tasks": _tasks.active_count(),
        "stored_tasks": _tasks.size,
        "sse_subscribers": _sse.total_subscribers,
        "uptime": round(time.time() - _server_start_time, 1),
        "time": _now(),
    }
@app.post("/v1/generation/Image/Create", response_model=GenerationResponse)
async def create_image(req: GenerationRequest):
    """Validate the request, store a task record, enqueue it, return handles."""
    # Accept either the batch field or the single-prompt convenience field.
    prompts = req.prompts if req.prompts else ([req.prompt] if req.prompt else [])
    prompts = [p.strip() for p in prompts if p and p.strip()]
    if not prompts:
        raise HTTPException(400, detail="At least one non-empty prompt is required")
    task = _tasks.create(
        prompts=prompts, count=req.count, resolution=req.resolution,
        guidance=req.guidance_scale, negative=req.negative_prompt, style=req.style,
    )
    try:
        # Bounded wait so a saturated queue yields 503 instead of hanging.
        await asyncio.wait_for(_queue.put({"task": task}), timeout=5.0)
    except (asyncio.TimeoutError, asyncio.QueueFull):
        raise HTTPException(503, detail="Server is busy β€” please retry shortly")
    pos = _queue.qsize()
    await _sse.emit(task["id"], {
        "event": "queued", "task_id": task["id"],
        "position": pos, "total": task["total_images"], "time": _now(),
    })
    return GenerationResponse(
        task_id=task["id"],
        stream=f"/v1/generation/stream/{task['id']}",
        status_url=f"/v1/generation/status/{task['id']}",
        total_images=task["total_images"],
        created_at=task["created_at"],
        queue_pos=pos,
    )
@app.get("/v1/generation/stream/{task_id}")
async def stream_task(request: Request, task_id: str):
    """Server-sent-events stream of a task's lifecycle."""
    task = _tasks.get(task_id)
    if not task:
        raise HTTPException(404, detail="Task not found")
    sub_q = await _sse.subscribe(task_id)
    async def _stream():
        try:
            # Initial snapshot so late subscribers learn the current state.
            yield {
                "event": "connected",
                "data": json.dumps({
                    "task_id": task_id, "status": task["status"],
                    "total_images": task["total_images"],
                    "completed": task["completed"],
                    "created_at": task["created_at"], "time": _now(),
                }),
            }
            if task["status"] in ("completed", "failed"):
                # Task already finished: replay stored results, then close.
                for r in task["results"]:
                    yield {
                        "event": "progress",
                        "data": json.dumps({
                            "task_id": task_id, "prompt": r["prompt"],
                            "index": r["index"], "image_url": r["image_url"],
                            "seed": r["seed"], "completed": task["completed"],
                            "total": task["total_images"],
                        }),
                    }
                yield {
                    "event": task["status"],
                    "data": json.dumps({
                        "task_id": task_id, "completed": task["completed"],
                        "total": task["total_images"],
                        "results": task["results"], "error": task.get("error"),
                    }),
                }
                return
            # Live path: forward broadcast events until a terminal one arrives.
            while True:
                try:
                    ev = await asyncio.wait_for(sub_q.get(), timeout=SSE_PING_SEC + 5)
                except asyncio.TimeoutError:
                    # No event in time: keep-alive ping, then liveness checks.
                    yield {
                        "event": "ping",
                        "data": json.dumps({"time": _now()}),
                    }
                    if await request.is_disconnected():
                        break
                    if task["status"] in ("completed", "failed"):
                        # Terminal event may have been missed; synthesize it.
                        yield {
                            "event": task["status"],
                            "data": json.dumps({
                                "task_id": task_id, "completed": task["completed"],
                                "total": task["total_images"],
                                "results": task["results"], "error": task.get("error"),
                            }),
                        }
                        break
                    continue
                etype = ev.get("event", "message")
                yield {"event": etype, "data": json.dumps(ev)}
                if etype in ("completed", "failed"):
                    break
        finally:
            await _sse.unsubscribe(task_id, sub_q)
    return EventSourceResponse(_stream())
@app.get("/v1/generation/status/{task_id}")
async def task_status(task_id: str):
    """Polling snapshot of a task's progress and results."""
    task = _tasks.get(task_id)
    if task is None:
        raise HTTPException(404, detail="Task not found")
    snapshot = {
        "task_id": task["id"], "status": task["status"],
        "total_images": task["total_images"], "completed": task["completed"],
        "failed_count": task["failed_count"], "created_at": task["created_at"],
        "started_at": task["started_at"], "finished_at": task["finished_at"],
        "results": task["results"], "error": task.get("error"),
    }
    return snapshot
@app.get("/v1/generation/outputs/{filename}")
async def serve_image(filename: str):
    """Serve a generated image from OUTPUT_DIR with the correct media type.

    FIX: the route path previously contained a garbled literal placeholder
    instead of the `{filename}` path parameter, so FastAPI never bound the
    argument and no generated image was reachable.
    """
    # Defence in depth against path traversal: only bare filenames are valid.
    if Path(filename).name != filename or filename in ("", ".", ".."):
        raise HTTPException(404, detail="File not found")
    fp = OUTPUT_DIR / filename
    if not fp.exists():
        raise HTTPException(404, detail="File not found")
    ext = fp.suffix.lower()
    media = {
        ".png": "image/png", ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg", ".webp": "image/webp",
    }.get(ext, "application/octet-stream")
    return FileResponse(fp, media_type=media, filename=filename)
@app.post("/v1/keys/set")
async def set_key(body: SetKeyRequest):
    """Manually install a userKey (e.g. copied from a browser session)."""
    await _keys.set(body.userKey)
    # A manual key re-arms the auto-refresh circuit breaker.
    _keys.reset_failures()
    return {"status": "ok", "key_length": len(body.userKey)}
# Strong references to in-flight refresh tasks. A bare asyncio.create_task()
# result may be garbage-collected before the task finishes (documented
# asyncio pitfall), silently dropping the refresh.
_refresh_tasks: set = set()

@app.post("/v1/keys/refresh")
async def refresh_key():
    """Kick off a background key refresh and return immediately."""
    _keys.reset_failures()
    refresh_task = asyncio.create_task(_keys.refresh(_sse))
    _refresh_tasks.add(refresh_task)
    refresh_task.add_done_callback(_refresh_tasks.discard)
    return {"status": "started", "message": "Key refresh running in background"}
# ═══════════════════════════════════════════════════════════════
# MAIN
# ═══════════════════════════════════════════════════════════════
if __name__ == "__main__":
    import uvicorn
    # Start Xvfb up-front when running standalone (lifespan also ensures it).
    _display.ensure(BROWSER_HEADLESS)
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))