AdarshJi commited on
Commit
560edb5
Β·
verified Β·
1 Parent(s): d6c44c8

Create server.py

Browse files
Files changed (1) hide show
  1. server.py +1267 -0
server.py ADDED
@@ -0,0 +1,1267 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # perchance_server.py
2
+ """
3
+ Perchance Image-Generation Server v2.0
4
+
5
+ Changes from v1:
6
+ β€’ Auto-detects 'invalid_key' and re-fetches userKey via zendriver
7
+ without manual intervention.
8
+ β€’ Only ONE browser launch at startup (no duplicate fetch).
9
+ β€’ Uses FastAPI *lifespan* context-manager β†’ zero deprecation warnings.
10
+ β€’ Coordinated key refresh: one refresh at a time; other workers wait.
11
+ β€’ SSE events: key_refreshing / key_refreshed / key_refresh_failed
12
+ so the client knows exactly what's happening.
13
+ β€’ Cleaner separation of concerns, better error handling.
14
+ """
15
+
16
+ # ═══════════════════════════════════════════════════════════════
17
+ # IMPORTS
18
+ # ═══════════════════════════════════════════════════════════════
19
+
20
+ import asyncio
21
+ import base64
22
+ import json
23
+ import logging
24
+ import os
25
+ import random
26
+ import string
27
+ import time
28
+ import uuid
29
+ from concurrent.futures import ThreadPoolExecutor
30
+ from contextlib import asynccontextmanager
31
+ from datetime import datetime
32
+ from functools import partial
33
+ from pathlib import Path
34
+ from typing import Any, Dict, List, Optional
35
+
36
+ import cloudscraper
37
+ from fastapi import FastAPI, HTTPException, Request
38
+ from fastapi.middleware.cors import CORSMiddleware
39
+ from fastapi.responses import FileResponse
40
+ from sse_starlette.sse import EventSourceResponse
41
+ import zendriver as zd
42
+ from zendriver import cdp
43
+
44
+
45
+ # ═══════════════════════════════════════════════════════════════
46
+ # CONFIGURATION
47
+ # ═══════════════════════════════════════════════════════════════
48
+
49
+ # --- Perchance API ---
50
+ BASE_URL = "https://image-generation.perchance.org"
51
+ API_GENERATE = "/api/generate"
52
+ API_DOWNLOAD = "/api/downloadTemporaryImage"
53
+ API_AWAIT = "/api/awaitExistingGenerationRequest"
54
+ API_ACCESS_CODE = "/api/getAccessCodeForAdPoweredStuff"
55
+
56
+ # --- Browser automation (zendriver) ---
57
+ TARGET_URL = "https://perchance.org/ai-text-to-image-generator"
58
+ IMAGE_GEN_ORIGIN = "https://image-generation.perchance.org"
59
+ ZD_TIMEOUT = 90 # seconds for key-fetch attempt
60
+ ZD_HEADLESS = False # True β†’ hide browser window
61
+ CLICK_INTERVAL = 0.35
62
+ CLICK_JITTER = 8.0
63
+ KEY_PREFIX = "userKey"
64
+
65
+ # --- HTTP / generation ---
66
+ HTTP_TIMEOUT = 30
67
+ MAX_DOWNLOAD_WAIT = 180
68
+ BACKOFF_INIT = 0.7
69
+ MAX_GEN_RETRIES = 6 # retries inside generate_one()
70
+
71
+ # --- Key-refresh policy ---
72
+ MAX_KEY_RETRIES = 3 # per-image retries when key is invalid
73
+ KEY_REFRESH_COOLDOWN = 30 # min seconds between two refreshes
74
+ MAX_REFRESH_FAILURES = 5 # consecutive failures β†’ stop auto-refresh
75
+
76
+ # --- Server ---
77
+ WORKER_COUNT = 3
78
+ MAX_QUEUE_SIZE = 1000
79
+ EXECUTOR_THREADS = 16
80
+ OUTPUT_DIR = Path("outputs")
81
+ OUTPUT_DIR.mkdir(exist_ok=True, parents=True)
82
+
83
+
84
+ # ═══════════════════════════════════════════════════════════════
85
+ # LOGGING
86
+ # ═══════════════════════════════════════════════════════════════
87
+
88
+ LOG_FMT = "%(asctime)s | %(levelname)-7s | %(message)s"
89
+ logging.basicConfig(level=logging.INFO, format=LOG_FMT)
90
+ log = logging.getLogger("perchance")
91
+
92
+
93
+ # ═══════════════════════════════════════════════════════════════
94
+ # GLOBAL STATE
95
+ #
96
+ # Asyncio primitives (Lock, Event, Queue) are created inside
97
+ # the lifespan handler so they live in uvicorn's event loop.
98
+ # ═══════════════════════════════════════════════════════════════
99
+
100
+ USER_KEY: Optional[str] = None
101
+
102
+ # -- set in lifespan --
103
+ _key_lock: Optional[asyncio.Lock] = None # guard USER_KEY reads/writes
104
+ _key_valid: Optional[asyncio.Event] = None # cleared while refreshing
105
+ _key_refresh_lock: Optional[asyncio.Lock] = None # one refresh at a time
106
+ _key_last_ts: float = 0.0 # last successful refresh
107
+ _key_fail_count: int = 0 # consecutive refresh failures
108
+
109
+ JOB_QUEUE: Optional[asyncio.Queue] = None
110
+
111
+ TASKS: Dict[str, Dict[str, Any]] = {}
112
+ TASK_QUEUES: Dict[str, asyncio.Queue] = {} # SSE event queues
113
+
114
+ EXECUTOR = ThreadPoolExecutor(max_workers=EXECUTOR_THREADS)
115
+ SCRAPER = cloudscraper.create_scraper()
116
+
117
+
118
+ # ═══════════════════════════════════════════════════════════════
119
+ # SMALL HELPERS
120
+ # ═══════════════════════════════════════════════════════════════
121
+
122
+ def _safe(s: str) -> str:
123
+ """Sanitise string for filenames."""
124
+ ok = set(string.ascii_letters + string.digits + "-_.()")
125
+ return "".join(c if c in ok else "_" for c in s)[:120]
126
+
127
+
128
+ def _sid() -> str:
129
+ return "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
130
+
131
+
132
+ def _now() -> str:
133
+ return datetime.utcnow().isoformat(timespec="milliseconds") + "Z"
134
+
135
+
136
+ def _reqid() -> str:
137
+ return f"{time.time():.6f}-{_sid()}"
138
+
139
+
140
+ def _stamp() -> str:
141
+ return datetime.utcnow().strftime("%Y%m%dT%H%M%S")
142
+
143
+
144
+ # ═══════════════════════════════════════════════════════════════
145
+ # PERCHANCE HTTP CLIENT (blocking – runs in ThreadPoolExecutor)
146
+ # ═══════════════════════════════════════════════════════════════
147
+
148
+ class PerchanceClient:
149
+ """All blocking HTTP work against the Perchance API."""
150
+
151
+ def __init__(self):
152
+ self.base = BASE_URL.rstrip("/")
153
+ self.s = SCRAPER
154
+ self.h = {
155
+ "Accept": "*/*",
156
+ "Content-Type": "application/json;charset=UTF-8",
157
+ "Origin": IMAGE_GEN_ORIGIN,
158
+ "Referer": f"{IMAGE_GEN_ORIGIN}/embed",
159
+ "User-Agent": (
160
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
161
+ "AppleWebKit/537.36 (KHTML, like Gecko) "
162
+ "Chrome/131.0.0.0 Safari/537.36"
163
+ ),
164
+ }
165
+
166
+ # ---- low-level helpers ----
167
+
168
+ def get_ad_code(self) -> str:
169
+ try:
170
+ r = self.s.get(
171
+ f"{self.base}{API_ACCESS_CODE}",
172
+ timeout=HTTP_TIMEOUT, headers=self.h,
173
+ )
174
+ r.raise_for_status()
175
+ return r.text.strip()
176
+ except Exception:
177
+ return ""
178
+
179
+ def _post(self, body: dict, params: dict) -> dict:
180
+ try:
181
+ r = self.s.post(
182
+ f"{self.base}{API_GENERATE}",
183
+ json=body, params=params,
184
+ timeout=HTTP_TIMEOUT, headers=self.h,
185
+ )
186
+ r.raise_for_status()
187
+ try:
188
+ return r.json()
189
+ except Exception:
190
+ return {"status": "invalid_json", "raw": r.text}
191
+ except Exception as exc:
192
+ return {"status": "fetch_failure", "error": str(exc)}
193
+
194
+ def _await_prev(self, key: str):
195
+ try:
196
+ self.s.get(
197
+ f"{self.base}{API_AWAIT}",
198
+ params={"userKey": key, "__cacheBust": random.random()},
199
+ timeout=20, headers=self.h,
200
+ )
201
+ except Exception:
202
+ pass
203
+
204
+ # ---- generate one image ----
205
+
206
+ def generate_one(
207
+ self, *,
208
+ prompt: str,
209
+ negative_prompt: str = "",
210
+ seed: int = -1,
211
+ resolution: str = "512x768",
212
+ guidance_scale: float = 7.0,
213
+ channel: str = "ai-text-to-image-generator",
214
+ sub_channel: str = "private",
215
+ user_key: str = "",
216
+ ad_access_code: str = "",
217
+ request_id: str = "",
218
+ ) -> dict:
219
+ """
220
+ Returns ONE of:
221
+ {"imageId": ..., "seed": ...}
222
+ {"inline": ..., "seed": ...}
223
+ {"error": "invalid_key"} ← caller must refresh key
224
+ {"error": "<other>", ...}
225
+ """
226
+ request_id = request_id or _reqid()
227
+ params = {
228
+ "userKey": user_key,
229
+ "requestId": request_id,
230
+ "adAccessCode": ad_access_code,
231
+ "__cacheBust": random.random(),
232
+ }
233
+ body = {
234
+ "prompt": prompt,
235
+ "negativePrompt": negative_prompt,
236
+ "seed": seed,
237
+ "resolution": resolution,
238
+ "guidanceScale": guidance_scale,
239
+ "channel": channel,
240
+ "subChannel": sub_channel,
241
+ "userKey": user_key,
242
+ "adAccessCode": ad_access_code,
243
+ "requestId": request_id,
244
+ }
245
+
246
+ ad_refreshed = False
247
+
248
+ for att in range(1, MAX_GEN_RETRIES + 1):
249
+ res = self._post(body, params)
250
+ st = res.get("status")
251
+
252
+ # ---- success ----
253
+ if st == "success":
254
+ iid = res.get("imageId")
255
+ urls = res.get("imageDataUrls")
256
+ if iid:
257
+ log.info("Got imageId: %s", iid)
258
+ return {"imageId": iid, "seed": res.get("seed")}
259
+ if urls:
260
+ return {"inline": urls[0], "seed": res.get("seed")}
261
+ log.error("success but empty payload: %s", str(res)[:300])
262
+ return {"error": "empty_success", "raw": res}
263
+
264
+ # ---- invalid key β†’ return immediately (do NOT retry here) ----
265
+ if st == "invalid_key":
266
+ log.warning("Server says invalid_key")
267
+ return {"error": "invalid_key"}
268
+
269
+ # ---- previous request still running ----
270
+ if st == "waiting_for_prev_request_to_finish":
271
+ log.info("Waiting for prev request to finish …")
272
+ self._await_prev(user_key)
273
+ time.sleep(0.3 + random.random() * 0.3)
274
+ continue
275
+
276
+ # ---- ad access code expired ----
277
+ if st == "invalid_ad_access_code" and not ad_refreshed:
278
+ code = self.get_ad_code()
279
+ if code:
280
+ ad_access_code = code
281
+ params["adAccessCode"] = code
282
+ body["adAccessCode"] = code
283
+ ad_refreshed = True
284
+ log.info("Refreshed ad code β†’ retry")
285
+ time.sleep(0.8)
286
+ continue
287
+ return {"error": "invalid_ad_access_code"}
288
+
289
+ # ---- transient gen failure ----
290
+ if st == "gen_failure" and res.get("type") == 1:
291
+ log.warning("gen_failure type 1 β†’ retry after 2.5 s")
292
+ time.sleep(2.5)
293
+ continue
294
+
295
+ # ---- network / stale ----
296
+ if st in (None, "fetch_failure", "invalid_json", "stale_request"):
297
+ log.info("Transient error (status=%s) attempt %d/%d", st, att, MAX_GEN_RETRIES)
298
+ time.sleep(1.0)
299
+ continue
300
+
301
+ # ---- anything else ----
302
+ log.error("Unhandled status '%s': %s", st, str(res)[:300])
303
+ return {"error": f"unhandled_{st}", "raw": res}
304
+
305
+ return {"error": "max_retries_exceeded"}
306
+
307
+ # ---- download ----
308
+
309
+ def download_image(self, image_id: str, prefix: str = "img") -> str:
310
+ """Poll until the image is ready, save to OUTPUT_DIR, return path."""
311
+ url = f"{self.base}{API_DOWNLOAD}?imageId={image_id}"
312
+ t0 = time.time()
313
+ bk = BACKOFF_INIT
314
+
315
+ while True:
316
+ elapsed = time.time() - t0
317
+ if elapsed >= MAX_DOWNLOAD_WAIT:
318
+ raise TimeoutError(
319
+ f"Download timed out ({elapsed:.0f}s) for {image_id}"
320
+ )
321
+ try:
322
+ r = self.s.get(url, timeout=HTTP_TIMEOUT,
323
+ headers=self.h, stream=True)
324
+ if r.status_code == 200:
325
+ ct = r.headers.get("Content-Type", "")
326
+ ext = (
327
+ ".png" if "png" in ct else
328
+ ".webp" if "webp" in ct else ".jpg"
329
+ )
330
+ fn = _safe(f"{prefix}_{image_id[:12]}{ext}")
331
+ fp = str(OUTPUT_DIR / fn)
332
+ with open(fp, "wb") as f:
333
+ for chunk in r.iter_content(8192):
334
+ if chunk:
335
+ f.write(chunk)
336
+ log.info("Saved β†’ %s", fp)
337
+ return fp
338
+ except Exception:
339
+ pass
340
+
341
+ time.sleep(bk)
342
+ bk = min(bk * 1.8, 8.0)
343
+
344
+
345
+ CLIENT = PerchanceClient()
346
+
347
+
348
+ # ═══════════════════════════════════════════════════════════════
349
+ # ZENDRIVER – browser automation to extract userKey
350
+ # ═══════════════════════════════════════════════════════════════
351
+
352
+ async def _cdp_mouse(tab, typ, x, y, **kw):
353
+ await tab.send(
354
+ cdp.input_.dispatch_mouse_event(
355
+ type_=typ, x=float(x), y=float(y), **kw,
356
+ )
357
+ )
358
+
359
+
360
+ async def _viewport_center(tab):
361
+ try:
362
+ v = await tab.evaluate(
363
+ "(()=>({w:innerWidth,h:innerHeight}))()",
364
+ await_promise=False, return_by_value=True,
365
+ )
366
+ return (v["w"] / 2.0, v["h"] / 2.0)
367
+ except Exception:
368
+ return (600.0, 400.0)
369
+
370
+
371
+ async def _ls_get(tab, key):
372
+ try:
373
+ return await tab.evaluate(
374
+ f"localStorage&&localStorage.getItem({json.dumps(key)})",
375
+ await_promise=True, return_by_value=True,
376
+ )
377
+ except Exception:
378
+ return None
379
+
380
+
381
+ async def _clicker_loop(tab, stop: asyncio.Event):
382
+ """Simulate steady centre-clicks on *tab* until *stop* is set."""
383
+ try:
384
+ await tab.evaluate(
385
+ "window.focus&&window.focus()",
386
+ await_promise=False, return_by_value=False,
387
+ )
388
+ except Exception:
389
+ pass
390
+
391
+ centre = await _viewport_center(tab)
392
+ centre_upd = time.time()
393
+
394
+ while not stop.is_set():
395
+ if time.time() - centre_upd > 2.5:
396
+ centre = await _viewport_center(tab)
397
+ centre_upd = time.time()
398
+
399
+ jx = random.uniform(-CLICK_JITTER, CLICK_JITTER)
400
+ jy = random.uniform(-CLICK_JITTER, CLICK_JITTER)
401
+ cx, cy = centre[0] + jx, centre[1] + jy
402
+
403
+ try:
404
+ await _cdp_mouse(tab, "mouseMoved", cx, cy, pointer_type="mouse")
405
+ await asyncio.sleep(random.uniform(0.02, 0.08))
406
+ await _cdp_mouse(
407
+ tab, "mousePressed", cx, cy,
408
+ button=cdp.input_.MouseButton.LEFT,
409
+ click_count=1, buttons=1,
410
+ )
411
+ await asyncio.sleep(random.uniform(0.03, 0.12))
412
+ await _cdp_mouse(
413
+ tab, "mouseReleased", cx, cy,
414
+ button=cdp.input_.MouseButton.LEFT,
415
+ click_count=1, buttons=0,
416
+ )
417
+ except Exception:
418
+ pass
419
+
420
+ # interruptible sleep
421
+ try:
422
+ await asyncio.wait_for(
423
+ stop.wait(),
424
+ timeout=CLICK_INTERVAL * random.uniform(0.85, 1.15),
425
+ )
426
+ break
427
+ except asyncio.TimeoutError:
428
+ pass
429
+
430
+
431
+ async def _poll_for_key(tab, stop: asyncio.Event, max_sec: int):
432
+ """Poll localStorage every 250 ms for a userKey entry."""
433
+ t0 = time.time()
434
+ while not stop.is_set() and (time.time() - t0) < max_sec:
435
+ val = await _ls_get(tab, f"{KEY_PREFIX}-0")
436
+ if val:
437
+ return val
438
+ try:
439
+ keys = await tab.evaluate(
440
+ "Object.keys(localStorage||{}).filter(k=>k.includes('userKey'))",
441
+ await_promise=False, return_by_value=True,
442
+ )
443
+ for k in (keys or []):
444
+ v = await _ls_get(tab, k)
445
+ if v:
446
+ return v
447
+ except Exception:
448
+ pass
449
+ await asyncio.sleep(0.25)
450
+ return None
451
+
452
+
453
+ async def fetch_key_via_browser(
454
+ timeout: int = ZD_TIMEOUT,
455
+ headless: bool = ZD_HEADLESS,
456
+ ) -> Optional[str]:
457
+ """
458
+ Launch Chrome β†’ navigate to Perchance β†’ click to trigger
459
+ ad/verification β†’ read userKey from localStorage β†’ close browser.
460
+ Returns the key string or None.
461
+ """
462
+ log.info(
463
+ "Launching browser for userKey (timeout=%ds, headless=%s)",
464
+ timeout, headless,
465
+ )
466
+
467
+ try:
468
+ browser = await zd.start(headless=headless)
469
+ except Exception as exc:
470
+ log.exception("Browser start failed: %s", exc)
471
+ return None
472
+
473
+ stop = asyncio.Event()
474
+ result = None
475
+
476
+ try:
477
+ page_tab = await browser.get(TARGET_URL)
478
+ log.info("Opened %s", TARGET_URL)
479
+ await asyncio.sleep(2.0)
480
+
481
+ origin_tab = await browser.get(IMAGE_GEN_ORIGIN, new_tab=True)
482
+ log.info("Opened %s", IMAGE_GEN_ORIGIN)
483
+ await asyncio.sleep(1.0)
484
+
485
+ await page_tab.bring_to_front()
486
+ await asyncio.sleep(0.5)
487
+
488
+ clicker = asyncio.create_task(_clicker_loop(page_tab, stop))
489
+ poller = asyncio.create_task(_poll_for_key(origin_tab, stop, timeout))
490
+
491
+ try:
492
+ done, _ = await asyncio.wait({poller}, timeout=timeout)
493
+ if poller in done:
494
+ result = poller.result()
495
+ finally:
496
+ stop.set()
497
+ if not clicker.done():
498
+ clicker.cancel()
499
+ try:
500
+ await clicker
501
+ except asyncio.CancelledError:
502
+ pass
503
+
504
+ for t in (origin_tab, page_tab):
505
+ try:
506
+ await t.close()
507
+ except Exception:
508
+ pass
509
+ finally:
510
+ try:
511
+ await browser.stop()
512
+ except Exception:
513
+ pass
514
+
515
+ if result:
516
+ log.info("Fetched userKey (len=%d)", len(result))
517
+ else:
518
+ log.warning("Could not fetch userKey within %ds", timeout)
519
+ return result
520
+
521
+
522
+ # ═══════════════════════════════════════════════════════════════
523
+ # KEY MANAGEMENT – coordinated refresh across workers
524
+ # ═══════════════════════════════════════════════════════════════
525
+
526
+ async def _broadcast(event: dict):
527
+ """Push an event into every active task's SSE queue."""
528
+ for tid, q in TASK_QUEUES.items():
529
+ task = TASKS.get(tid)
530
+ if task and task["status"] in ("queued", "running"):
531
+ try:
532
+ q.put_nowait(event)
533
+ except asyncio.QueueFull:
534
+ pass
535
+
536
+
537
+ async def refresh_user_key() -> Optional[str]:
538
+ """
539
+ Coordinate a single key refresh. If another coroutine is already
540
+ refreshing, we simply wait for it to finish and return the new key.
541
+
542
+ Returns the new key string, or None on failure.
543
+ """
544
+ global USER_KEY, _key_last_ts, _key_fail_count
545
+
546
+ async with _key_refresh_lock:
547
+ # ── double-check: maybe another coroutine just refreshed ──
548
+ age = time.time() - _key_last_ts
549
+ if age < KEY_REFRESH_COOLDOWN and USER_KEY:
550
+ log.info(
551
+ "Key was refreshed %.1fs ago β†’ reusing existing key", age,
552
+ )
553
+ return USER_KEY
554
+
555
+ # ── too many consecutive failures? ──
556
+ if _key_fail_count >= MAX_REFRESH_FAILURES:
557
+ log.error(
558
+ "Key refresh disabled: %d consecutive failures. "
559
+ "Set key manually via POST /set_user_key",
560
+ _key_fail_count,
561
+ )
562
+ await _broadcast({
563
+ "type": "key_refresh_failed",
564
+ "time": _now(),
565
+ "message": (
566
+ f"Auto-refresh disabled after {_key_fail_count} failures. "
567
+ "Please set userKey manually via /set_user_key"
568
+ ),
569
+ })
570
+ return None
571
+
572
+ # ── signal "key is being refreshed" ──
573
+ _key_valid.clear()
574
+ log.info("Starting userKey refresh via browser …")
575
+
576
+ await _broadcast({
577
+ "type": "key_refreshing",
578
+ "time": _now(),
579
+ "message": "UserKey expired β€” refreshing via browser automation …",
580
+ })
581
+
582
+ try:
583
+ new_key = await fetch_key_via_browser(
584
+ timeout=ZD_TIMEOUT, headless=ZD_HEADLESS,
585
+ )
586
+
587
+ if new_key:
588
+ async with _key_lock:
589
+ USER_KEY = new_key
590
+ _key_last_ts = time.time()
591
+ _key_fail_count = 0
592
+
593
+ log.info("UserKey refreshed OK (len=%d)", len(new_key))
594
+ await _broadcast({
595
+ "type": "key_refreshed",
596
+ "time": _now(),
597
+ "message": "UserKey refreshed – resuming generation.",
598
+ })
599
+ return new_key
600
+
601
+ # fetch returned None
602
+ _key_fail_count += 1
603
+ log.error(
604
+ "Key refresh returned nothing (failure #%d/%d)",
605
+ _key_fail_count, MAX_REFRESH_FAILURES,
606
+ )
607
+ await _broadcast({
608
+ "type": "key_refresh_failed",
609
+ "time": _now(),
610
+ "message": (
611
+ f"Key refresh failed (attempt {_key_fail_count}"
612
+ f"/{MAX_REFRESH_FAILURES})"
613
+ ),
614
+ })
615
+ return None
616
+
617
+ except Exception as exc:
618
+ _key_fail_count += 1
619
+ log.exception(
620
+ "Key refresh error (failure #%d/%d): %s",
621
+ _key_fail_count, MAX_REFRESH_FAILURES, exc,
622
+ )
623
+ await _broadcast({
624
+ "type": "key_refresh_failed",
625
+ "time": _now(),
626
+ "message": f"Key refresh error: {exc}",
627
+ })
628
+ return None
629
+
630
+ finally:
631
+ # ALWAYS unblock waiters, even on failure
632
+ _key_valid.set()
633
+
634
+
635
+ # ═══════════════════════════════════════════════════════════════
636
+ # TASK MODEL
637
+ # ═══════════════════════════════════════════════════════════════
638
+
639
+ def create_task(
640
+ prompts: List[str],
641
+ count: int,
642
+ resolution: str,
643
+ guidance: float,
644
+ negative: str,
645
+ sub_channel: str,
646
+ ) -> dict:
647
+ tid = str(uuid.uuid4())
648
+ task = {
649
+ "id": tid,
650
+ "prompts": prompts,
651
+ "count": count,
652
+ "resolution": resolution,
653
+ "guidance": guidance,
654
+ "negative": negative,
655
+ "sub_channel": sub_channel,
656
+ "created_at": _now(),
657
+ "status": "queued", # queued β†’ running β†’ done / failed
658
+ "total_images": len(prompts) * count,
659
+ "completed": 0,
660
+ "results": [],
661
+ "error": None,
662
+ }
663
+ TASKS[tid] = task
664
+ TASK_QUEUES[tid] = asyncio.Queue()
665
+ return task
666
+
667
+
668
+ # ═══════════════════════════════════════════════════════════════
669
+ # WORKER β€” image generation + key-refresh retry loop
670
+ # ═══════════════════════════════════════════════════════════════
671
+
672
+ async def _save_inline(data_url: str, prompt: str) -> str:
673
+ """Decode base-64 data URL β†’ file. Returns path."""
674
+ loop = asyncio.get_running_loop()
675
+ header, b64 = (data_url.split(",", 1) + [""])[:2] if "," in data_url else ("", data_url)
676
+ ext = ".png" if "png" in header else ".jpg"
677
+ fn = _safe(f"{prompt[:30]}_{_stamp()}_{_sid()}{ext}")
678
+ fp = OUTPUT_DIR / fn
679
+ raw = base64.b64decode(b64)
680
+ await loop.run_in_executor(EXECUTOR, fp.write_bytes, raw)
681
+ log.info("Saved inline β†’ %s", fp)
682
+ return str(fp)
683
+
684
+
685
+ async def _download(image_id: str, prompt: str) -> str:
686
+ """Download via PerchanceClient (blocking, in executor)."""
687
+ loop = asyncio.get_running_loop()
688
+ prefix = f"{_safe(prompt[:30])}_{_stamp()}_{_sid()}"
689
+ return await loop.run_in_executor(
690
+ EXECUTOR,
691
+ partial(CLIENT.download_image, image_id, prefix),
692
+ )
693
+
694
+
695
+ async def _generate_single(
696
+ prompt: str,
697
+ task: dict,
698
+ idx: int,
699
+ queue: asyncio.Queue,
700
+ ad_code: str,
701
+ ) -> Optional[str]:
702
+ """
703
+ Generate + save one image.
704
+
705
+ On 'invalid_key', triggers a coordinated key refresh and retries
706
+ up to MAX_KEY_RETRIES times. Returns the saved filepath or None.
707
+ """
708
+ loop = asyncio.get_running_loop()
709
+ tid = task["id"]
710
+
711
+ for key_try in range(1, MAX_KEY_RETRIES + 1):
712
+
713
+ # ── wait if a refresh is in progress ──
714
+ await _key_valid.wait()
715
+
716
+ # ── read current key ──
717
+ async with _key_lock:
718
+ active_key = USER_KEY
719
+
720
+ if not active_key:
721
+ await queue.put({
722
+ "type": "error",
723
+ "time": _now(),
724
+ "task_id": tid,
725
+ "message": "No userKey available. Set via /set_user_key",
726
+ })
727
+ return None
728
+
729
+ # ── blocking generation in thread-pool ──
730
+ result = await loop.run_in_executor(
731
+ EXECUTOR,
732
+ partial(
733
+ CLIENT.generate_one,
734
+ prompt=prompt,
735
+ negative_prompt=task["negative"],
736
+ seed=-1,
737
+ resolution=task["resolution"],
738
+ guidance_scale=task["guidance"],
739
+ channel="ai-text-to-image-generator",
740
+ sub_channel=task["sub_channel"],
741
+ user_key=active_key,
742
+ ad_access_code=ad_code,
743
+ request_id=_reqid(),
744
+ ),
745
+ )
746
+
747
+ # ── invalid_key β†’ refresh + retry ──
748
+ if result.get("error") == "invalid_key":
749
+ log.warning(
750
+ "invalid_key for task %s (key_try %d/%d) β†’ refreshing",
751
+ tid, key_try, MAX_KEY_RETRIES,
752
+ )
753
+ await queue.put({
754
+ "type": "key_invalid",
755
+ "time": _now(),
756
+ "task_id": tid,
757
+ "attempt": key_try,
758
+ "max_attempts": MAX_KEY_RETRIES,
759
+ "message": "UserKey invalid β€” refreshing …",
760
+ })
761
+
762
+ new_key = await refresh_user_key()
763
+ if new_key:
764
+ # also refresh ad code with fresh key
765
+ ad_code = await loop.run_in_executor(
766
+ EXECUTOR, CLIENT.get_ad_code,
767
+ )
768
+ continue # ← retry generation
769
+ else:
770
+ await queue.put({
771
+ "type": "error",
772
+ "time": _now(),
773
+ "task_id": tid,
774
+ "message": "Could not refresh userKey β€” aborting image",
775
+ })
776
+ return None
777
+
778
+ # ── other errors ──
779
+ if result.get("error"):
780
+ log.warning(
781
+ "Gen error task=%s prompt='%.40s': %s",
782
+ tid, prompt, result,
783
+ )
784
+ await queue.put({
785
+ "type": "gen_error",
786
+ "time": _now(),
787
+ "task_id": tid,
788
+ "prompt": prompt,
789
+ "index": idx,
790
+ "error": result,
791
+ })
792
+ return None
793
+
794
+ # ── success β†’ save ──
795
+ try:
796
+ if result.get("inline"):
797
+ fp = await _save_inline(result["inline"], prompt)
798
+ elif result.get("imageId"):
799
+ fp = await _download(result["imageId"], prompt)
800
+ else:
801
+ log.error("Unexpected result: %s", result)
802
+ return None
803
+
804
+ seed = result.get("seed")
805
+ task["completed"] += 1
806
+ task["results"].append({
807
+ "prompt": prompt,
808
+ "index": idx,
809
+ "path": fp,
810
+ "seed": seed,
811
+ })
812
+ await queue.put({
813
+ "type": "image_ready",
814
+ "time": _now(),
815
+ "task_id": tid,
816
+ "prompt": prompt,
817
+ "index": idx,
818
+ "path": fp,
819
+ "seed": seed,
820
+ "completed": task["completed"],
821
+ "total": task["total_images"],
822
+ })
823
+ return fp
824
+
825
+ except Exception as exc:
826
+ log.exception("Save/download error task=%s: %s", tid, exc)
827
+ await queue.put({
828
+ "type": "download_error",
829
+ "time": _now(),
830
+ "task_id": tid,
831
+ "prompt": prompt,
832
+ "index": idx,
833
+ "error": str(exc),
834
+ })
835
+ return None
836
+
837
+ # exhausted key retries
838
+ log.error("Exhausted key retries for task %s prompt='%.40s'", tid, prompt)
839
+ return None
840
+
841
+
842
+ async def worker_loop(worker_id: int, semaphore: asyncio.Semaphore):
843
+ """Long-running coroutine: pull jobs β†’ generate images."""
844
+ log.info("Worker %d started", worker_id)
845
+ loop = asyncio.get_running_loop()
846
+
847
+ while True:
848
+ job = await JOB_QUEUE.get()
849
+
850
+ # shutdown sentinel
851
+ if job is None:
852
+ log.info("Worker %d shutting down", worker_id)
853
+ JOB_QUEUE.task_done()
854
+ break
855
+
856
+ task = job["task"]
857
+ tid = task["id"]
858
+ queue = TASK_QUEUES.get(tid)
859
+
860
+ log.info(
861
+ "Worker %d β†’ task %s (%d images)",
862
+ worker_id, tid, task["total_images"],
863
+ )
864
+ task["status"] = "running"
865
+ if queue:
866
+ await queue.put({
867
+ "type": "started",
868
+ "time": _now(),
869
+ "task_id": tid,
870
+ "total_images": task["total_images"],
871
+ })
872
+
873
+ # fetch ad code once per task
874
+ ad_code = await loop.run_in_executor(EXECUTOR, CLIENT.get_ad_code)
875
+
876
+ # heartbeat coroutine
877
+ async def _heartbeat():
878
+ while task["status"] == "running":
879
+ await asyncio.sleep(5.0)
880
+ if queue and task["status"] == "running":
881
+ try:
882
+ queue.put_nowait({
883
+ "type": "heartbeat",
884
+ "time": _now(),
885
+ "task_id": tid,
886
+ "completed": task["completed"],
887
+ "total": task["total_images"],
888
+ })
889
+ except asyncio.QueueFull:
890
+ pass
891
+
892
+ hb = asyncio.create_task(_heartbeat())
893
+
894
+ try:
895
+ for prompt in task["prompts"]:
896
+ for i in range(task["count"]):
897
+ async with semaphore:
898
+ await _generate_single(
899
+ prompt, task, i, queue, ad_code,
900
+ )
901
+ if task["status"] == "failed":
902
+ break
903
+ if task["status"] == "failed":
904
+ break
905
+
906
+ # decide final status
907
+ if task["status"] != "failed":
908
+ if task["completed"] == 0 and task["total_images"] > 0:
909
+ task["status"] = "failed"
910
+ task["error"] = "No images generated successfully"
911
+ else:
912
+ task["status"] = "done"
913
+
914
+ if queue:
915
+ await queue.put({
916
+ "type": task["status"], # "done" or "failed"
917
+ "time": _now(),
918
+ "task_id": tid,
919
+ "completed": task["completed"],
920
+ "total": task["total_images"],
921
+ "error": task.get("error"),
922
+ })
923
+
924
+ except Exception as exc:
925
+ log.exception("Worker %d task %s crashed: %s", worker_id, tid, exc)
926
+ task["status"] = "failed"
927
+ task["error"] = str(exc)
928
+ if queue:
929
+ await queue.put({
930
+ "type": "failed",
931
+ "time": _now(),
932
+ "task_id": tid,
933
+ "error": str(exc),
934
+ })
935
+
936
+ finally:
937
+ hb.cancel()
938
+ try:
939
+ await hb
940
+ except asyncio.CancelledError:
941
+ pass
942
+
943
+ if queue:
944
+ await queue.put({"type": "eof", "time": _now(), "task_id": tid})
945
+
946
+ JOB_QUEUE.task_done()
947
+ log.info(
948
+ "Worker %d task %s finished (%s, %d/%d)",
949
+ worker_id, tid, task["status"],
950
+ task["completed"], task["total_images"],
951
+ )
952
+
953
+
954
+ # ═══════════════════════════════════════════════════════════════
955
+ # FASTAPI – lifespan + app + endpoints
956
+ # ═══════════════════════════════════════════════════════════════
957
+
958
+ @asynccontextmanager
959
+ async def lifespan(app: FastAPI):
960
+ global USER_KEY, _key_lock, _key_valid, _key_refresh_lock
961
+ global _key_last_ts, _key_fail_count, JOB_QUEUE
962
+
963
+ # ── create asyncio primitives in uvicorn's loop ──
964
+ _key_lock = asyncio.Lock()
965
+ _key_valid = asyncio.Event()
966
+ _key_valid.set() # assume usable initially
967
+ _key_refresh_lock = asyncio.Lock()
968
+ JOB_QUEUE = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
969
+
970
+ # ── initial key fetch (skip if already set from __main__) ──
971
+ if USER_KEY:
972
+ log.info("Using pre-fetched userKey (len=%d)", len(USER_KEY))
973
+ _key_last_ts = time.time()
974
+ else:
975
+ skip = os.environ.get("NO_INITIAL_FETCH", "") in ("1", "true", "True")
976
+ if not skip:
977
+ try:
978
+ key = await fetch_key_via_browser(
979
+ timeout=ZD_TIMEOUT, headless=ZD_HEADLESS,
980
+ )
981
+ if key:
982
+ USER_KEY = key
983
+ _key_last_ts = time.time()
984
+ log.info("Fetched userKey at startup (len=%d)", len(key))
985
+ else:
986
+ log.warning(
987
+ "Startup key fetch failed. "
988
+ "Use /set_user_key or /fetch_user_key."
989
+ )
990
+ except Exception as exc:
991
+ log.exception("Startup key fetch error: %s", exc)
992
+ else:
993
+ log.info("NO_INITIAL_FETCH=1 β†’ skipping browser key fetch")
994
+
995
+ # ── launch workers ──
996
+ sem = asyncio.Semaphore(WORKER_COUNT)
997
+ workers = [
998
+ asyncio.create_task(worker_loop(i + 1, sem))
999
+ for i in range(WORKER_COUNT)
1000
+ ]
1001
+ log.info("Launched %d workers", WORKER_COUNT)
1002
+
1003
+ # ---------- server is running ----------
1004
+ yield
1005
+ # ---------- shutdown begins ------------
1006
+
1007
+ log.info("Shutdown: sending stop sentinels to workers …")
1008
+ for _ in range(WORKER_COUNT):
1009
+ await JOB_QUEUE.put(None)
1010
+ await asyncio.gather(*workers, return_exceptions=True)
1011
+
1012
+ try:
1013
+ SCRAPER.close()
1014
+ except Exception:
1015
+ pass
1016
+ EXECUTOR.shutdown(wait=True)
1017
+ log.info("Shutdown complete")
1018
+
1019
+
1020
+ # ── app ──
1021
+ app = FastAPI(
1022
+ title="Perchance Image Generation Server v2",
1023
+ lifespan=lifespan,
1024
+ )
1025
+ app.add_middleware(
1026
+ CORSMiddleware,
1027
+ allow_origins=["*"],
1028
+ allow_methods=["*"],
1029
+ allow_headers=["*"],
1030
+ )
1031
+
1032
+
1033
+ # ───────────── endpoints ─────────────
1034
+
1035
+ @app.get("/health")
1036
+ async def health():
1037
+ async with _key_lock:
1038
+ has_key = USER_KEY is not None
1039
+ return {
1040
+ "status": "ok",
1041
+ "has_user_key": has_key,
1042
+ "queue_size": JOB_QUEUE.qsize() if JOB_QUEUE else 0,
1043
+ "active_tasks": sum(
1044
+ 1 for t in TASKS.values() if t["status"] in ("queued", "running")
1045
+ ),
1046
+ }
1047
+
1048
+
1049
+ @app.get("/user_key")
1050
+ async def user_key_info():
1051
+ async with _key_lock:
1052
+ has = USER_KEY is not None
1053
+ ln = len(USER_KEY) if has else 0
1054
+ return {"has_user_key": has, "key_length": ln}
1055
+
1056
+
1057
+ @app.post("/set_user_key")
1058
+ async def set_user_key(payload: Dict[str, str]):
1059
+ global USER_KEY, _key_last_ts, _key_fail_count
1060
+ key = payload.get("userKey", "").strip()
1061
+ if not key:
1062
+ raise HTTPException(400, "userKey required")
1063
+ async with _key_lock:
1064
+ USER_KEY = key
1065
+ _key_last_ts = time.time()
1066
+ _key_fail_count = 0
1067
+ _key_valid.set() # unblock any waiting workers
1068
+ log.info("userKey set via API (len=%d)", len(key))
1069
+ return {"status": "ok", "key_length": len(key)}
1070
+
1071
+
1072
+ @app.post("/fetch_user_key")
1073
+ async def fetch_user_key_endpoint():
1074
+ """Trigger a background browser-based key fetch."""
1075
+ global _key_fail_count
1076
+
1077
+ async def _bg():
1078
+ global _key_fail_count
1079
+ _key_fail_count = 0 # reset so refresh is allowed
1080
+ await refresh_user_key()
1081
+
1082
+ asyncio.create_task(_bg())
1083
+ return {"status": "started", "note": "Browser key fetch running in background"}
1084
+
1085
+
1086
+ @app.post("/generate")
1087
+ async def submit_job(payload: Dict[str, Any]):
1088
+ """
1089
+ POST /generate
1090
+ Body:
1091
+ {
1092
+ "prompts": ["a cat in space", "sunset over mountains"],
1093
+ "count": 2,
1094
+ "resolution": "512x768",
1095
+ "guidance": 7.0,
1096
+ "negative": "",
1097
+ "subChannel": "private"
1098
+ }
1099
+ Returns:
1100
+ { "task_id": "...", "stream_url": "/stream/...", "queue_position": N }
1101
+ """
1102
+ prompts = payload.get("prompts") or payload.get("prompt") or []
1103
+ if isinstance(prompts, str):
1104
+ prompts = [prompts]
1105
+ if not isinstance(prompts, list) or not prompts:
1106
+ raise HTTPException(400, "prompts must be a non-empty list")
1107
+
1108
+ count = max(1, int(payload.get("count", 1)))
1109
+ resolution = payload.get("resolution", "512x768")
1110
+ guidance = float(payload.get("guidance", 7.0))
1111
+ negative = payload.get("negative", "") or ""
1112
+ sub_channel = payload.get("subChannel", "private")
1113
+
1114
+ task = create_task(prompts, count, resolution, guidance, negative, sub_channel)
1115
+
1116
+ try:
1117
+ await JOB_QUEUE.put({"task": task})
1118
+ except asyncio.QueueFull:
1119
+ raise HTTPException(503, "Server queue full β€” try again later")
1120
+
1121
+ position = JOB_QUEUE.qsize()
1122
+ q = TASK_QUEUES.get(task["id"])
1123
+ if q:
1124
+ await q.put({
1125
+ "type": "queued",
1126
+ "time": _now(),
1127
+ "task_id": task["id"],
1128
+ "queue_position": position,
1129
+ "total_images": task["total_images"],
1130
+ })
1131
+
1132
+ return {
1133
+ "task_id": task["id"],
1134
+ "stream_url": f"/stream/{task['id']}",
1135
+ "queue_position": position,
1136
+ }
1137
+
1138
+
1139
+ @app.get("/stream/{task_id}")
1140
+ async def stream_task(request: Request, task_id: str):
1141
+ """
1142
+ SSE stream. Event types:
1143
+ meta Β· queued Β· started Β· heartbeat Β· image_ready
1144
+ key_invalid Β· key_refreshing Β· key_refreshed Β· key_refresh_failed
1145
+ gen_error Β· download_error Β· done Β· failed Β· eof
1146
+ """
1147
+ if task_id not in TASKS:
1148
+ raise HTTPException(404, "unknown task id")
1149
+
1150
+ task = TASKS[task_id]
1151
+ queue = TASK_QUEUES[task_id]
1152
+
1153
+ async def event_gen():
1154
+ # ── initial snapshot ──
1155
+ yield {
1156
+ "event": "meta",
1157
+ "data": json.dumps({
1158
+ "task_id": task_id,
1159
+ "status": task["status"],
1160
+ "total_images": task["total_images"],
1161
+ "created_at": task["created_at"],
1162
+ }),
1163
+ }
1164
+
1165
+ # ── if already finished, replay results + EOF ──
1166
+ if task["status"] in ("done", "failed"):
1167
+ for r in task["results"]:
1168
+ yield {
1169
+ "event": "image_ready",
1170
+ "data": json.dumps({
1171
+ "task_id": task_id,
1172
+ "prompt": r["prompt"],
1173
+ "index": r["index"],
1174
+ "path": r["path"],
1175
+ "seed": r["seed"],
1176
+ "completed": task["completed"],
1177
+ "total": task["total_images"],
1178
+ }),
1179
+ }
1180
+ yield {
1181
+ "event": task["status"],
1182
+ "data": json.dumps({
1183
+ "task_id": task_id,
1184
+ "completed": task["completed"],
1185
+ "total": task["total_images"],
1186
+ "error": task.get("error"),
1187
+ }),
1188
+ }
1189
+ yield {
1190
+ "event": "eof",
1191
+ "data": json.dumps({"task_id": task_id}),
1192
+ }
1193
+ return
1194
+
1195
+ # ── live stream ──
1196
+ while True:
1197
+ try:
1198
+ ev = await asyncio.wait_for(queue.get(), timeout=30.0)
1199
+ except asyncio.TimeoutError:
1200
+ # keep-alive ping
1201
+ yield {
1202
+ "event": "ping",
1203
+ "data": json.dumps({"time": _now()}),
1204
+ }
1205
+ if await request.is_disconnected():
1206
+ log.info("SSE client disconnected (task %s)", task_id)
1207
+ break
1208
+ continue
1209
+
1210
+ yield {
1211
+ "event": ev.get("type", "event"),
1212
+ "data": json.dumps(ev),
1213
+ }
1214
+ if ev.get("type") == "eof":
1215
+ break
1216
+
1217
+ return EventSourceResponse(event_gen())
1218
+
1219
+
1220
+ @app.get("/status/{task_id}")
1221
+ async def get_status(task_id: str):
1222
+ task = TASKS.get(task_id)
1223
+ if not task:
1224
+ raise HTTPException(404, "unknown task id")
1225
+ return {"task": task}
1226
+
1227
+
1228
+ @app.get("/outputs/{filename}")
1229
+ async def get_output(filename: str):
1230
+ fp = OUTPUT_DIR / filename
1231
+ if not fp.exists():
1232
+ raise HTTPException(404, "file not found")
1233
+ return FileResponse(fp, media_type="application/octet-stream", filename=filename)
1234
+
1235
+
1236
+ # ═══════════════════════════════════════════════════════════════
1237
+ # MAIN
1238
+ # ═══════════════════════════════════════════════════════════════
1239
+
1240
+ def _run_uvicorn():
1241
+ import uvicorn
1242
+ uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
1243
+
1244
+
1245
+ if __name__ == "__main__":
1246
+ skip = os.environ.get("NO_INITIAL_FETCH", "") in ("1", "true", "True")
1247
+
1248
+ if not skip:
1249
+ log.info("Pre-startup: fetching userKey via browser (timeout %ds) …", ZD_TIMEOUT)
1250
+ try:
1251
+ found = asyncio.run(
1252
+ fetch_key_via_browser(timeout=ZD_TIMEOUT, headless=ZD_HEADLESS)
1253
+ )
1254
+ if found:
1255
+ USER_KEY = found
1256
+ log.info("Pre-startup fetch OK (len=%d)", len(found))
1257
+ else:
1258
+ log.warning(
1259
+ "Pre-startup fetch returned nothing. "
1260
+ "Server will start; set key via /set_user_key."
1261
+ )
1262
+ except Exception as exc:
1263
+ log.exception("Pre-startup fetch error: %s", exc)
1264
+ else:
1265
+ log.info("NO_INITIAL_FETCH=1 β†’ skipping pre-startup browser fetch")
1266
+
1267
+ _run_uvicorn()