Spaces:
Sleeping
Sleeping
| # app.py — Hidden Stroke (AI Noir Investigation) with verbose logging | |
| # Flask + Firebase Realtime DB + Firebase Storage + Gemini | |
| # Envs required: FIREBASE, Firebase_DB, Firebase_Storage, Gemini | |
| # Optional envs: GAME_SALT, ADMIN_KEY, IA_USER_AGENT, MIN_IA_POOL, IA_QUERY, | |
| # BOOTSTRAP_IA, LOG_LEVEL, ALLOW_DEV_BOOTSTRAP, ALLOW_DEV_DIAGNOSTICS | |
| import os, io, uuid, json, hmac, hashlib, random, traceback, requests, re, hashlib as _hash | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Dict, Any, Tuple, List, Optional | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| from PIL import Image | |
# ----- Logging ---------------------------------------------------------------
import logging

# Verbosity is taken from the LOG_LEVEL env var (default DEBUG); a name that
# is not an attribute of the logging module falls back to DEBUG as well.
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    level=getattr(logging, LOG_LEVEL, logging.DEBUG),
)
# Shared application logger used throughout this module.
log = logging.getLogger("hidden_stroke")
| # ---------------- Firebase Admin (Realtime DB + Storage) ---------------- | |
| import firebase_admin | |
| from firebase_admin import credentials, db, storage | |
| # ---------------- Gemini (exact client & model names) ------------------- | |
| from google import genai | |
| from google.genai import types | |
| # ----------------------------------------------------------------------------- | |
| # 1) CONFIG & INIT | |
| # ----------------------------------------------------------------------------- | |
app = Flask(__name__)
# Browsers preflight every non-simple request header, so each custom header a
# cross-origin client sends must be listed in allow_headers. X-Session-Id is
# read by require_active_session(); it was missing here, which makes browser
# requests that carry a session fail the CORS preflight.
CORS(
    app,
    resources={r"/*": {"origins": "*"}},
    supports_credentials=False,
    methods=["GET", "POST", "OPTIONS"],
    allow_headers=["Content-Type", "X-Reddit-User", "X-Reddit-Id", "X-Session-Id"],
)
# --- Firebase ---
# Initialise the Admin SDK from env configuration. Any failure is logged and
# re-raised so the process does not start half-configured.
try:
    credentials_json_string = os.getenv("FIREBASE")
    if not credentials_json_string:
        raise ValueError("The FIREBASE environment variable is not set.")
    credentials_json = json.loads(credentials_json_string)

    firebase_db_url = os.getenv("Firebase_DB")
    firebase_storage_bucket = os.getenv("Firebase_Storage")
    if not (firebase_db_url and firebase_storage_bucket):
        raise ValueError("Firebase_DB and Firebase_Storage environment variables must be set.")

    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(
        cred,
        {
            'databaseURL': firebase_db_url,
            'storageBucket': firebase_storage_bucket,
        },
    )
    # Module-level handles used by every helper below.
    bucket = storage.bucket()
    db_root = db.reference("/")
    log.info("Firebase Realtime DB + Storage initialized.")
except Exception:
    log.exception("FATAL: Firebase init failed")
    raise
# --- Gemini ---
# Build the shared Gemini client from the "Gemini" env var; a missing key or a
# client construction failure is logged and re-raised.
try:
    GEMINI_API_KEY = os.getenv("Gemini")
    if GEMINI_API_KEY:
        client = genai.Client(api_key=GEMINI_API_KEY)
    else:
        raise ValueError("The 'Gemini' environment variable is not set.")
    log.info("Gemini client initialized.")
except Exception:
    log.exception("FATAL: Gemini init failed")
    raise
# --- Models (exact names) ---
CATEGORY_MODEL = "gemini-2.5-flash"  # text model used for case metadata
GENERATION_MODEL = "gemini-2.0-flash-exp-image-generation"  # image model used for forgeries
# Alternative image model (disabled): "gemini-2.5-flash-image-preview"

# --- Game constants ---
TIMER_SECONDS = 90
INITIAL_IP = 8
TOOL_COSTS = {
    "signature": 1,
    "metadata": 1,
    "financial": 2,
}
LEADERBOARD_TOP_N = 50

# --- Misc config ---
GAME_SALT = os.getenv("GAME_SALT", "dev-salt")
ADMIN_KEY = os.getenv("ADMIN_KEY")
IA_USER_AGENT = os.getenv("IA_USER_AGENT", "HiddenStrokeBot/1.0 (+https://reddit.com)")
MIN_IA_POOL = int(os.getenv("MIN_IA_POOL", "60"))
DEFAULT_IA_QUERY = os.getenv(
    "IA_QUERY",
    '(collection:(metropolitanmuseum OR smithsonian OR getty OR artic) AND mediatype:image)',
)
# Dev-only switches: enabled only when the env var is exactly "1".
ALLOW_DEV_BOOTSTRAP = os.getenv("ALLOW_DEV_BOOTSTRAP", "0") == "1"
ALLOW_DEV_DIAGNOSTICS = os.getenv("ALLOW_DEV_DIAGNOSTICS", "0") == "1"
# Broader Internet Archive queries tried after DEFAULT_IA_QUERY.
FALLBACK_IA_QUERIES = [
    '(mediatype:image AND (format:JPEG OR format:PNG))',
    '(mediatype:image AND (format:JPEG OR format:PNG) AND (subject:portrait OR title:portrait))',
    '(mediatype:image AND format:JPEG)',
]
| # ----------------------------------------------------------------------------- | |
| # 2) UTILS | |
| # ----------------------------------------------------------------------------- | |
def utc_today_str() -> str:
    """Return the current UTC date formatted as ``YYYYMMDD``."""
    today_utc = datetime.now(timezone.utc)
    return today_utc.strftime("%Y%m%d")
def case_ref(case_id: str):
    """Return the RTDB reference at ``cases/<case_id>``."""
    path = f"cases/{case_id}"
    return db_root.child(path)
def plays_ref(case_id: str):
    """Return the RTDB reference at ``plays/<case_id>``."""
    path = f"plays/{case_id}"
    return db_root.child(path)
def leaderboard_ref(case_id: str):
    """Return the RTDB reference at ``leaderboards/<case_id>/top``."""
    path = f"leaderboards/{case_id}/top"
    return db_root.child(path)
def sessions_ref():
    """Return the RTDB reference at ``sessions``."""
    node = db_root.child("sessions")
    return node
def ia_pool_ref():
    """Return the RTDB reference at ``ia_pool``."""
    node = db_root.child("ia_pool")
    return node
def hmac_hex(s: str) -> str:
    """Return the hex HMAC-SHA256 digest of ``s`` keyed with GAME_SALT."""
    key_bytes = GAME_SALT.encode()
    mac = hmac.new(key_bytes, s.encode(), hashlib.sha256)
    return mac.hexdigest()
# Firebase RTDB key sanitizer (no . $ # [ ] / or control chars)
_FB_BAD = re.compile(r'[.$#\[\]/\x00-\x1F\x7F]')


def fb_key(raw: str) -> str:
    """Turn ``raw`` into a string usable as a Firebase RTDB key.

    Forbidden characters become ``_`` and the result is capped at 700
    characters. Whenever the cleaned text differs from ``raw`` (including
    ``raw`` being ``None``), ``__`` plus the first 8 hex chars of the SHA-1 of
    the original text is appended. An empty result is replaced by a fixed
    8-char digest.
    """
    text = raw or ''
    cleaned = _FB_BAD.sub('_', text)[:700]
    if cleaned != raw:
        digest = _hash.sha1(text.encode('utf-8')).hexdigest()
        cleaned = f"{cleaned}__{digest[:8]}"
    if cleaned:
        return cleaned
    return _hash.sha1(b'empty').hexdigest()[:8]
def upload_bytes_to_storage(data: bytes, path: str, content_type: str) -> str:
    """Upload ``data`` to the Storage bucket at ``path``, make it public, return its URL."""
    log.debug(f"Uploading to Storage: path={path}, content_type={content_type}, bytes={len(data)}")
    target = bucket.blob(path)
    target.upload_from_string(data, content_type=content_type)
    target.make_public()
    public_url = target.public_url
    log.debug(f"Uploaded: {public_url}")
    return public_url
def pil_from_inline_image_part(part) -> Image.Image:
    """Decode the inline image bytes of a response part into an RGB PIL image."""
    buffer = io.BytesIO(part.inline_data.data)
    decoded = Image.open(buffer)
    return decoded.convert("RGB")
def save_image_return_url(img: Image.Image, path: str, quality=92) -> str:
    """Encode ``img`` as an optimized JPEG, upload it to ``path`` and return the public URL."""
    encoded = io.BytesIO()
    img.save(encoded, format="JPEG", quality=quality, optimize=True)
    jpeg_bytes = encoded.getvalue()
    return upload_bytes_to_storage(jpeg_bytes, path, "image/jpeg")
def extract_user_from_headers(req) -> Tuple[str, str]:
    """Return ``(user_id, username)`` read from the X-Reddit-* request headers.

    A missing/blank username becomes ``"anon"``; a missing/blank id falls back
    to the (possibly defaulted) username.
    """
    headers = req.headers
    uname = (headers.get("X-Reddit-User") or "").strip() or "anon"
    uid = (headers.get("X-Reddit-Id") or "").strip() or uname
    return uid, uname
def seed_for_date(case_id: str) -> int:
    """Derive an integer seed from ``case_id`` using the first 12 hex chars of its HMAC."""
    digest = hmac_hex(f"seed::{case_id}")
    return int(digest[:12], 16)
def fifty_fifty_mode(case_seed: int) -> str:
    """Map an even seed to ``"knowledge"`` and an odd seed to ``"observation"``."""
    if case_seed % 2 == 0:
        return "knowledge"
    return "observation"
def http_get_json(url: str, params: dict = None) -> dict:
    """GET ``url`` with the bot User-Agent (30 s timeout) and return the decoded JSON body.

    Raises the ``requests`` HTTP error for non-2xx responses.
    """
    log.debug(f"HTTP GET JSON: {url} params={params}")
    response = requests.get(
        url,
        params=params,
        headers={"User-Agent": IA_USER_AGENT},
        timeout=30,
    )
    log.debug(f"HTTP {response.status_code} for {response.url}")
    response.raise_for_status()
    return response.json()
def http_get_bytes(url: str) -> bytes:
    """GET ``url`` with the bot User-Agent (60 s timeout) and return the raw body.

    Raises the ``requests`` HTTP error for non-2xx responses.
    """
    log.debug(f"HTTP GET BYTES: {url}")
    response = requests.get(url, headers={"User-Agent": IA_USER_AGENT}, timeout=60)
    log.debug(f"HTTP {response.status_code} for {response.url} bytes={len(response.content)}")
    response.raise_for_status()
    return response.content
def ia_advanced_search(query: str, rows: int, page: int) -> List[dict]:
    """Run one page of an Internet Archive advanced search and return its docs.

    Failures are logged with a traceback and re-raised.
    """
    search_params = {"q": query, "rows": rows, "page": page, "output": "json"}
    try:
        payload = http_get_json("https://archive.org/advancedsearch.php", params=search_params)
        docs = payload.get("response", {}).get("docs", [])
        log.info(f"IA search page={page} rows={rows} -> {len(docs)} docs")
    except Exception:
        log.exception("IA advanced search failed")
        raise
    return docs
def ia_metadata(identifier: str) -> dict:
    """Fetch the Internet Archive ``/metadata`` document for ``identifier``.

    Failures are logged with a traceback and re-raised.
    """
    try:
        meta = http_get_json(f"https://archive.org/metadata/{identifier}")
        file_count = len(meta.get('files', []) or [])
        log.debug(f"Fetched metadata for {identifier}, files={file_count}")
    except Exception:
        log.exception(f"IA metadata fetch failed for {identifier}")
        raise
    return meta
def ia_best_image_from_metadata(meta: dict) -> Optional[dict]:
    """Pick the largest image file entry from an IA metadata document.

    A file qualifies when its ``format`` mentions jpeg/jpg/png/tiff/image and
    it has a ``name``. Files are ranked by ``width * height`` when both are
    known, otherwise by ``size``.

    Returns:
        The chosen file dict, or ``None`` when no file qualifies.
    """
    def _as_int(value) -> int:
        # Numeric file fields may be blank or malformed; one bad value must
        # not abort the whole item, so treat it as "unknown" (0).
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    image_markers = ("jpeg", "jpg", "png", "tiff", "image")
    best, best_pixels = None, -1
    for f in meta.get("files", []) or []:
        # The caller builds the download URL from the file name, so an entry
        # without one is unusable.
        if not isinstance(f, dict) or not f.get("name"):
            continue
        fmt = (f.get("format") or "").lower()
        if not any(marker in fmt for marker in image_markers):
            continue
        w = _as_int(f.get("width"))
        h = _as_int(f.get("height"))
        px = w * h if (w and h) else _as_int(f.get("size"))
        if px > best_pixels:
            best_pixels, best = px, f
    if best:
        log.debug(f"Best image: name={best.get('name')} fmt={best.get('format')} dims={best.get('width')}x{best.get('height')} size={best.get('size')}")
    else:
        log.warning("No suitable image file found in metadata")
    return best
def ingest_ia_doc(doc: dict) -> Optional[dict]:
    """Fetch /metadata and store best image entry into ia_pool (sanitized key).

    Args:
        doc: A search result doc; its ``identifier`` selects the item and its
            other fields are fallbacks for missing item-level metadata.

    Returns:
        The record written to ``ia_pool/<pool_key>``, or ``None`` when the doc
        has no identifier or the item has no usable image file.

    Raises:
        Whatever ``ia_metadata`` raises on a failed metadata fetch.
    """
    from urllib.parse import quote

    identifier = doc.get("identifier")
    if not identifier:
        return None
    pool_key = fb_key(identifier)
    log.info(f"Ingesting IA identifier={identifier} -> pool_key={pool_key}")

    meta = ia_metadata(identifier)
    best = ia_best_image_from_metadata(meta)
    file_name = best.get("name") if best else None
    if not file_name:
        log.warning(f"Skipping {identifier}: no image file")
        return None

    md = meta.get("metadata", {}) or {}

    def _field(name: str):
        # Prefer item-level metadata, fall back to the search doc.
        return md.get(name, "") or doc.get(name, "")

    title = _field("title")
    # File names may contain characters such as '#', '?', '%' or spaces that
    # change the meaning of the URL; percent-encode them while keeping
    # sub-directory separators intact.
    download_url = f"https://archive.org/download/{identifier}/{quote(str(file_name), safe='/')}"
    record = {
        "identifier": identifier,  # original IA id preserved
        "_pool_key": pool_key,     # sanitized RTDB key
        "title": title,
        "date": str(_field("date")),
        "creator": _field("creator"),
        "rights": _field("rights"),
        "licenseurl": _field("licenseurl"),
        "download_url": download_url,
        "file_name": file_name,
        "format": best.get("format"),
        "width": best.get("width"),
        "height": best.get("height"),
        "size": best.get("size"),
        "source": "internet_archive",
    }
    ia_pool_ref().child(pool_key).set(record)
    log.info(f"Ingested {identifier} -> ia_pool/{pool_key} (title='{title}')")
    return record
def choose_ia_item_for_case(case_id: str) -> Optional[dict]:
    """Deterministically pick one ia_pool record for ``case_id``.

    The pool keys are sorted and indexed by the case seed modulo the pool
    size. Returns ``None`` when the pool is empty.
    """
    pool = ia_pool_ref().get() or {}
    if not pool:
        log.warning("choose_ia_item_for_case: pool is empty")
        return None
    ordered_keys = sorted(pool.keys())
    position = seed_for_date(case_id) % len(ordered_keys)
    pool_key = ordered_keys[position]
    log.info(f"Chosen IA pool_key for case {case_id}: {pool_key}")
    return pool[pool_key]
def download_image_to_pil(url: str) -> Image.Image:
    """Download ``url`` and return it as an RGB PIL image."""
    raw = http_get_bytes(url)
    picture = Image.open(io.BytesIO(raw)).convert("RGB")
    log.debug(f"Opened image from {url} size={picture.size}")
    return picture
def crop_signature_macro(img: Image.Image, size: int = 512) -> Image.Image:
    """Return the bottom-right ``size`` x ``size`` region of ``img``.

    The crop is clamped to the image dimensions when the image is smaller
    than ``size`` along either axis.
    """
    width, height = img.size
    crop_w, crop_h = min(size, width), min(size, height)
    x0 = max(0, width - crop_w)
    y0 = max(0, height - crop_h)
    x1, y1 = x0 + crop_w, y0 + crop_h
    log.debug(f"Signature crop from ({x0},{y0}) to ({x1},{y1})")
    return img.crop((x0, y0, x1, y1))
| # ----------------------------------------------------------------------------- | |
| # 3) IA -> Firebase Storage caching + Zero-admin bootstrap | |
| # ----------------------------------------------------------------------------- | |
def _resize_if_needed(img: Image.Image, max_dim: int = 4096) -> Image.Image:
    """Downscale ``img`` so its longer side is at most ``max_dim``, keeping aspect ratio.

    Images already within the limit are returned unchanged.
    """
    w, h = img.size
    if max(w, h) <= max_dim:
        return img
    # Clamp the shorter side to at least 1 px: for extreme aspect ratios the
    # truncated value would be 0, and resize() rejects a zero dimension.
    if w >= h:
        new_w = max_dim
        new_h = max(1, int(h * (max_dim / w)))
    else:
        new_h = max_dim
        new_w = max(1, int(w * (max_dim / h)))
    log.debug(f"Resizing image from {w}x{h} to {new_w}x{new_h}")
    return img.resize((new_w, new_h), Image.LANCZOS)
def cache_single_ia_identifier(
    pool_key: str,
    overwrite: bool = False,
    max_dim: int = 4096,
    jpeg_quality: int = 90,
    skip_if_restricted: bool = True,
) -> dict:
    """Copy one ia_pool item's image (plus a signature crop) into Storage.

    The image is downloaded, downscaled to ``max_dim`` if needed, uploaded as
    JPEG under ``ia_cache/<pool_key>/`` and the pool record is updated with
    the resulting URLs and dimensions.

    Returns:
        A dict with ``pool_key`` and ``stored``; on success it also carries
        the URLs and dimensions, otherwise a ``reason`` string.
    """
    def _not_stored(reason: str, **extra) -> dict:
        return {"pool_key": pool_key, "stored": False, "reason": reason, **extra}

    rec_ref = ia_pool_ref().child(pool_key)
    rec = rec_ref.get() or {}
    if not rec:
        return _not_stored("not_in_pool")

    identifier = rec.get("identifier") or pool_key
    rights = (rec.get("rights") or "").lower()
    restricted = "in copyright" in rights or "all rights reserved" in rights
    if skip_if_restricted and restricted:
        log.info(f"Skipping {identifier}: restricted rights")
        return _not_stored("restricted_rights")

    if not overwrite and rec.get("storage_url"):
        log.info(f"Skipping {identifier}: already cached")
        return _not_stored("already_cached", storage_url=rec["storage_url"])

    source_url = rec.get("storage_url") or rec.get("download_url")
    if not source_url:
        log.warning(f"{identifier}: missing source_url")
        return _not_stored("missing_source_url")

    try:
        log.info(f"Caching {identifier} from {source_url}")
        img = download_image_to_pil(source_url)
    except Exception as first_err:
        # Retry from the IA URL only when the first attempt used a different one.
        fallback_url = rec.get("download_url")
        if not fallback_url or source_url == fallback_url:
            log.exception(f"{identifier}: download failed")
            return _not_stored(f"download_failed: {first_err}")
        try:
            log.warning(f"Retrying {identifier} from IA download_url")
            img = download_image_to_pil(rec["download_url"])
        except Exception as retry_err:
            log.exception(f"{identifier}: download failed")
            return _not_stored(f"download_failed: {retry_err}")

    img = _resize_if_needed(img, max_dim=max_dim)
    w, h = img.size

    # Upload original
    img_path = f"ia_cache/{pool_key}/original.jpg"
    storage_url = save_image_return_url(img, img_path, quality=jpeg_quality)

    # Upload macro crop
    crop_path = f"ia_cache/{pool_key}/signature_crop.jpg"
    signature_crop_url = save_image_return_url(
        crop_signature_macro(img, 512), crop_path, quality=jpeg_quality
    )

    rec_ref.update({
        "storage_url": storage_url,
        "signature_crop_url": signature_crop_url,
        "image_path": img_path,
        "crop_path": crop_path,
        "width": w,
        "height": h,
        "cached_at": datetime.now(timezone.utc).isoformat(),
    })
    log.info(f"Cached {identifier} -> {storage_url}")
    return {
        "pool_key": pool_key,
        "stored": True,
        "storage_url": storage_url,
        "signature_crop_url": signature_crop_url,
        "width": w,
        "height": h,
    }
def batch_cache_ia_pool(
    limit: int = 100,
    overwrite: bool = False,
    randomize: bool = True,
    min_width: int = 800,
    min_height: int = 800,
    max_dim: int = 4096,
    jpeg_quality: int = 90,
    skip_if_restricted: bool = True,
) -> dict:
    """Cache up to ``limit`` ia_pool items into Storage.

    Items that already have a ``storage_url`` are skipped unless ``overwrite``
    is set; items whose recorded dimensions are known and below the minimum
    are skipped as well.

    Returns:
        A summary dict with ``processed``/``stored``/``skipped`` counts and
        the per-item ``results``.
    """
    pool = ia_pool_ref().get() or {}
    log.info(f"batch_cache_ia_pool: pool_size={len(pool)}")
    if not pool:
        return {"ok": True, "processed": 0, "stored": 0, "skipped": 0, "results": []}

    candidates = []
    for pkey, rec in pool.items():
        if rec.get("storage_url") and not overwrite:
            continue
        width = int(rec.get("width") or 0)
        height = int(rec.get("height") or 0)
        # Only filter on size when both dimensions are actually known.
        if (width and height) and (width < min_width or height < min_height):
            log.debug(f"Skip {pkey}: too small {width}x{height}")
            continue
        candidates.append(pkey)

    if randomize:
        random.shuffle(candidates)
    candidates = candidates[:max(0, limit)]
    log.info(f"Caching candidates: {len(candidates)} (limit={limit})")

    results = [
        cache_single_ia_identifier(
            pkey,
            overwrite=overwrite,
            max_dim=max_dim,
            jpeg_quality=jpeg_quality,
            skip_if_restricted=skip_if_restricted,
        )
        for pkey in candidates
    ]
    stored = sum(1 for res in results if res.get("stored"))
    skipped = len(results) - stored
    log.info(f"batch_cache_ia_pool done: processed={len(candidates)} stored={stored} skipped={skipped}")
    return {"ok": True, "processed": len(candidates), "stored": stored, "skipped": skipped, "results": results}
def ensure_minimum_ia_pool(min_items: int = MIN_IA_POOL, rows: int = 100, max_pages: int = 5) -> dict:
    """Grow ia_pool to at least ``min_items`` records by ingesting from Internet Archive.

    DEFAULT_IA_QUERY is tried first, then each fallback query, paging up to
    ``max_pages`` pages of ``rows`` docs each until the target is reached.

    Returns:
        Stats dict: ``had`` (initial size), ``added``, ``cached`` and
        ``final_size``.
    """
    pool = ia_pool_ref().get() or {}
    have = len(pool)
    added = 0
    cached = 0
    log.info(f"ensure_minimum_ia_pool: have={have}, target={min_items}")

    # Keys already in the pool. Checked in memory instead of issuing one RTDB
    # read per search result; kept up to date as items are ingested.
    known_keys = set(pool)

    candidate_queries = []
    if DEFAULT_IA_QUERY:
        candidate_queries.append(DEFAULT_IA_QUERY)
    candidate_queries.extend([q for q in FALLBACK_IA_QUERIES if q not in candidate_queries])

    for q in candidate_queries:
        if have + added >= min_items:
            break
        log.info(f"IA ingest: trying query: {q}")
        page = 1
        while have + added < min_items and page <= max_pages:
            try:
                docs = ia_advanced_search(q, rows=rows, page=page)
            except Exception:
                log.warning(f"IA search failed on page {page} for query {q!r}, moving on")
                break
            log.info(f"IA search page={page} -> {len(docs)} docs for query {q!r}")
            if not docs:
                break
            for d in docs:
                ident = d.get("identifier")
                if not ident:
                    continue
                key = fb_key(ident)
                if key in known_keys:
                    continue
                try:
                    rec = ingest_ia_doc(d)
                except Exception:
                    # One bad item must not stop the bootstrap.
                    log.exception(f"Ingest failed for {ident}")
                    continue
                if rec:
                    added += 1
                    known_keys.add(key)
                if have + added >= min_items:
                    break
            page += 1

    pool = ia_pool_ref().get() or {}
    have_now = len(pool)
    # NOTE(review): need_cache is the pool-size shortfall, so nothing is
    # cached once the pool reaches min_items. If the intent is "at least
    # min_items cached images", this should count records lacking
    # storage_url instead — confirm before changing.
    need_cache = max(0, min_items - have_now)
    log.info(f"ensure_minimum_ia_pool: post-ingest have={have_now}, need_cache={need_cache}")
    if need_cache:
        res = batch_cache_ia_pool(limit=need_cache, randomize=True)
        cached = res.get("stored", 0)

    final_size = len(ia_pool_ref().get() or {})
    stats = {"ok": True, "had": have, "added": added, "cached": cached, "final_size": final_size}
    log.info(f"ensure_minimum_ia_pool: stats={stats}")
    return stats
| # ----------------------------------------------------------------------------- | |
| # 4) CASE GENERATION (uses IA for authentic image, Gemini for forgeries/meta) | |
| # ----------------------------------------------------------------------------- | |
def _generate_forgery(case_id: str, auth_img: Image.Image, ordinal: int) -> Image.Image:
    """Ask the image model for a near-identical variant of ``auth_img``; fall back to a copy."""
    forg_prompt = """
Create a near-identical variant of the provided painting.
Keep composition, palette, and lighting the same.
Only introduce a subtle change in signature micro-geometry (baseline alignment, stroke overlap order, or curve spacing).
No annotations. Differences must be visible only at macro zoom.
"""
    log.info(f"Case {case_id}: generating forgery {ordinal}")
    resp = client.models.generate_content(
        model=GENERATION_MODEL,
        contents=[forg_prompt, auth_img],
        config=types.GenerateContentConfig(response_modalities=["IMAGE"])
    )
    # A response may carry no candidates or no content parts; treat that the
    # same as "no image returned" instead of crashing.
    candidates = getattr(resp, "candidates", None) or []
    content = getattr(candidates[0], "content", None) if candidates else None
    parts = getattr(content, "parts", None) or []
    for p in parts:
        if getattr(p, "inline_data", None):
            return pil_from_inline_image_part(p)
    log.warning("Gemini returned no image; falling back to copy of authentic")
    return auth_img.copy()


def _build_case_meta_prompt(mode: str, title, creator, date, rights, licenseurl) -> str:
    """Return the prompt asking the text model for the case brief, metadata and solution JSON."""
    return f"""
You are generating a daily case for a noir art investigation game.
MODE: {"KNOWLEDGE" if mode=="knowledge" else "OBSERVATION"}
AUTHENTIC CONTEXT (from Internet Archive):
- title: {title}
- creator: {creator}
- date: {date}
- rights: {rights}
- licenseurl: {licenseurl}
TASK:
1) Create a short, punchy "case_brief" (2–4 sentences) explaining why the artifact matters and why fraud is suspected — NO SPOILERS.
2) Prepare THREE metadata bundles for images A,B,C with NEARLY IDENTICAL fields.
Ensure exactly ONE bundle is AUTHENTIC and that it corresponds to the above authentic context.
The other two are FORGERIES with subtle, reality-checkable anomalies.
3) Provide a concise "ledger_summary" describing a believable ownership/payment trail.
4) Provide the solution with: "answer_index" (0 for A, 1 for B, 2 for C) and detailed flags for signature/metadata/financial, plus an "explanation".
OUTPUT STRICT JSON with this schema:
{{
"case_brief": "...",
"metadata": [
{{"title":"...", "year": "...", "medium": "...", "ink_or_pigment": "...", "catalog_ref": "...", "ownership_chain": ["...","..."], "notes":"..."}},
{{"title":"...", "year": "...", "medium": "...", "ink_or_pigment": "...", "catalog_ref": "...", "ownership_chain": ["...","..."], "notes":"..."}},
{{"title":"...", "year": "...", "medium": "...", "ink_or_pigment": "...", "catalog_ref": "...", "ownership_chain": ["...","..."], "notes":"..."}}
],
"ledger_summary": "short paragraph",
"solution": {{
"answer_index": 0,
"flags_signature": [ "..." ],
"flags_metadata": [ "..." ],
"flags_financial": [ "..." ],
"explanation": "A few sentences that justify the authentic pick without listing spoilers."
}}
}}
"""


def _parse_model_json(raw_text: str) -> Dict[str, Any]:
    """Parse the model's reply as a JSON object, tolerating code fences and surrounding prose."""
    try:
        parsed = json.loads(raw_text)
    except ValueError:
        # Models often wrap JSON in ``` fences or add commentary; keep only
        # the outermost {...} span and parse that.
        start, end = raw_text.find("{"), raw_text.rfind("}")
        if start == -1 or end <= start:
            raise
        parsed = json.loads(raw_text[start:end + 1])
    if not isinstance(parsed, dict):
        raise RuntimeError("Expected a JSON object from the metadata model.")
    return parsed


def ensure_case_generated(case_id: str) -> Dict[str, Any]:
    """Return the public document for ``case_id``, generating and storing the case if absent.

    Generation picks an authentic image from ia_pool, creates two forgeries
    in "observation" mode (in "knowledge" mode all three slots show the same
    image), asks the text model for the brief/metadata/solution, then uploads
    the images and writes ``cases/<case_id>/solution`` and ``.../public``.

    The authentic image is placed at the ``answer_index`` returned by the
    model, so the stored solution always matches the image order. Images are
    uploaded only after every model call has succeeded.

    Raises:
        RuntimeError: if the pool is empty or the model output is unusable
            (not 3 metadata bundles, invalid ``answer_index``).
    """
    existing_public = case_ref(case_id).child("public").get()
    if existing_public:
        log.info(f"Case {case_id} already exists")
        return existing_public

    # Ensure we have a cached pool ready (best effort: a failure here is
    # logged and generation proceeds with whatever the pool holds).
    try:
        stats = ensure_minimum_ia_pool()
        log.debug(f"Bootstrap stats for case {case_id}: {stats}")
    except Exception:
        log.exception("Bootstrap failed inside ensure_case_generated")

    ia_item = choose_ia_item_for_case(case_id)
    if not ia_item:
        raise RuntimeError("No IA items available. Ingest needed.")

    mode = fifty_fifty_mode(seed_for_date(case_id))
    log.info(f"Case {case_id}: mode={mode}")
    style_period = "sourced from Internet Archive; museum catalog reproduction"

    source_url = ia_item.get("storage_url") or ia_item["download_url"]
    log.info(f"Case {case_id}: authentic source={source_url}")
    auth_img = download_image_to_pil(source_url)

    # Forgeries are produced in memory first; nothing is uploaded until the
    # answer position is known.
    forgeries: List[Image.Image] = []
    if mode != "knowledge":
        forgeries = [_generate_forgery(case_id, auth_img, n) for n in (1, 2)]

    title = ia_item.get("title") or "Untitled"
    creator = ia_item.get("creator") or ""
    date = ia_item.get("date") or ""
    rights = ia_item.get("rights") or ""
    licenseurl = ia_item.get("licenseurl") or ""
    log.info(f"Case {case_id}: prompting metadata with title='{title}' creator='{creator}' date='{date}'")

    meta_resp = client.models.generate_content(
        model=CATEGORY_MODEL,
        contents=[_build_case_meta_prompt(mode, title, creator, date, rights, licenseurl)]
    )
    raw_text = (meta_resp.text or "").strip()  # .text can be None on an empty reply
    log.debug(f"Case {case_id}: raw meta JSON text len={len(raw_text)}")
    meta_json = _parse_model_json(raw_text)

    case_brief = meta_json.get("case_brief", "A resurfaced portrait raises questions—its paper trail glitters a little too perfectly.")
    metadata = meta_json.get("metadata", [])
    ledger_summary = meta_json.get("ledger_summary", "")
    solution = meta_json.get("solution", {})
    if not isinstance(solution, dict):
        solution = {}

    try:
        answer_index = int(solution.get("answer_index", 0))
    except (TypeError, ValueError) as err:
        raise RuntimeError("Solution answer_index is not an integer.") from err
    flags_signature = solution.get("flags_signature", [])
    flags_metadata = solution.get("flags_metadata", [])
    flags_financial = solution.get("flags_financial", [])
    explanation = solution.get("explanation", "The authentic work aligns with period-accurate details; the others contain subtle contradictions.")

    meta_count = len(metadata) if isinstance(metadata, list) else -1
    log.info(f"Case {case_id}: answer_index={answer_index}, meta_count={meta_count}")
    if meta_count != 3:
        log.error("Gemini did not return exactly 3 metadata bundles")
        raise RuntimeError("Expected exactly 3 metadata bundles.")
    if not 0 <= answer_index < 3:
        log.error("Gemini returned an out-of-range answer_index")
        raise RuntimeError("Solution answer_index must be 0, 1 or 2.")

    def _store(img: Image.Image, position: int) -> Tuple[str, str]:
        # Upload one image and its signature crop under position-based names.
        url = save_image_return_url(img, f"hidden_stroke/{case_id}/images/img_{position}.jpg")
        crop_url = save_image_return_url(
            crop_signature_macro(img, 512),
            f"hidden_stroke/{case_id}/signature_crops/crop_{position}.jpg",
            quality=88,
        )
        log.debug(f"Case {case_id}: image {position} saved -> {url}; crop -> {crop_url}")
        return url, crop_url

    images_urls: List[str] = []
    signature_crops: List[str] = []
    if mode == "knowledge":
        # All three slots show the same picture; only the metadata differs.
        url1, crop1_url = _store(auth_img, 1)
        images_urls = [url1] * 3
        signature_crops = [crop1_url] * 3
    else:
        # Put the authentic image at the slot the solution names, so neither
        # the list order nor the file names give the answer away.
        ordered = list(forgeries)
        ordered.insert(answer_index, auth_img)
        for position, img in enumerate(ordered, start=1):
            url, crop_url = _store(img, position)
            images_urls.append(url)
            signature_crops.append(crop_url)

    public = {
        "case_id": case_id,
        "mode": mode,
        "brief": case_brief,
        "style_period": style_period,
        "images": images_urls,
        "signature_crops": signature_crops,
        "metadata": metadata,
        "ledger_summary": ledger_summary,
        "timer_seconds": TIMER_SECONDS,
        "initial_ip": INITIAL_IP,
        "tool_costs": TOOL_COSTS,
        "credits": {
            "source": "Internet Archive",
            "identifier": ia_item.get("identifier"),
            "title": title,
            "creator": creator,
            "rights": rights,
            "licenseurl": licenseurl
        }
    }
    solution_doc = {
        "answer_index": answer_index,
        "flags_signature": flags_signature,
        "flags_metadata": flags_metadata,
        "flags_financial": flags_financial,
        "explanation": explanation
    }
    cref = case_ref(case_id)
    # Write the solution first: "public" is the existence marker checked at
    # the top of this function, so a case must never be visible without it.
    cref.child("solution").set(solution_doc)
    cref.child("public").set(public)
    log.info(f"Case {case_id}: generated and stored")
    return public
| # ----------------------------------------------------------------------------- | |
| # 5) SESSIONS, TOOLS, GUESS, LEADERBOARD | |
| # ----------------------------------------------------------------------------- | |
def create_session(user_id: str, username: str, case_id: str) -> Dict[str, Any]:
    """Create, persist and return a new active session for a player on a case.

    The session gets a fresh UUID, the full Investigation Point budget and an
    expiry TIMER_SECONDS from now.
    """
    session_id = str(uuid.uuid4())
    deadline = datetime.now(timezone.utc) + timedelta(seconds=TIMER_SECONDS)
    expires_at = deadline.isoformat()
    session_doc = dict(
        session_id=session_id,
        user_id=user_id,
        username=username,
        case_id=case_id,
        ip_remaining=INITIAL_IP,
        started_at=datetime.now(timezone.utc).isoformat(),
        expires_at=expires_at,
        actions=[],
        status="active",
    )
    sessions_ref().child(session_id).set(session_doc)
    log.info(f"New session {session_id} for user={username} case={case_id}")
    return session_doc
def get_session(session_id: str) -> Dict[str, Any]:
    """Return the stored session document, or an empty dict when there is none."""
    stored = sessions_ref().child(session_id).get()
    return stored or {}
def require_active_session(req) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Resolve the request's ``X-Session-Id`` header to an active, unexpired session.

    Returns:
        ``(session, {})`` on success, or ``({}, {"error": ...})`` when the
        header is missing, the id is malformed, or the session is unknown,
        inactive or expired. An expired session is also marked ``"expired"``
        in the database.
    """
    session_id = (req.headers.get("X-Session-Id", "") or "").strip()
    if not session_id:
        return {}, {"error": "Missing X-Session-Id header."}
    # The header is untrusted and is used as an RTDB path segment: reject
    # anything with characters that are illegal in a key or that would
    # address a nested node ('/').
    if _FB_BAD.search(session_id):
        return {}, {"error": "Invalid or inactive session."}

    sess = get_session(session_id)
    if not isinstance(sess, dict) or sess.get("status") != "active":
        return {}, {"error": "Invalid or inactive session."}

    # A record without a parseable expiry cannot be validated; treat it as
    # invalid rather than failing the request with an unhandled exception.
    try:
        exp = datetime.fromisoformat(str(sess["expires_at"]).replace("Z", "+00:00"))
    except (KeyError, ValueError):
        return {}, {"error": "Invalid or inactive session."}

    now = datetime.now(timezone.utc)
    if now > exp:
        sess["status"] = "expired"
        sessions_ref().child(session_id).child("status").set("expired")
        return {}, {"error": "Session expired."}
    return sess, {}
def spend_ip(session: Dict[str, Any], cost: int, action: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Deduct ``cost`` Investigation Points from ``session`` and record ``action``.

    Returns ``(session, {})`` on success, or ``(session, {"error": ...})``
    without touching anything when the balance is too low. On success the
    session dict and the timestamped action are updated in place and
    persisted.
    """
    balance = session["ip_remaining"]
    if balance < cost:
        return session, {"error": "Not enough Investigation Points."}

    remaining = balance - cost
    session["ip_remaining"] = remaining
    action["ts"] = datetime.now(timezone.utc).isoformat()

    sid = session["session_id"]
    sessions_ref().child(sid).child("ip_remaining").set(remaining)
    sessions_ref().child(sid).child("actions").push(action)
    log.debug(f"Spend IP: {cost} -> remaining={remaining}")
    return session, {}
def score_result(correct: bool, session: Dict[str, Any]) -> Dict[str, Any]:
    """Compute the final score for a guess.

    Score = 100 for a correct guess (0 otherwise) + one point per started
    10 seconds left + 2 points per remaining Investigation Point, minus 40
    for a wrong guess, floored at 0.
    """
    deadline = datetime.fromisoformat(session["expires_at"].replace("Z", "+00:00"))
    remaining = (deadline - datetime.now(timezone.utc)).total_seconds()
    seconds_left = max(0, int(remaining))

    ip_left = session["ip_remaining"]
    time_bonus = (seconds_left + 9) // 10  # ceil(seconds_left / 10)
    if correct:
        base, penalty = 100, 0
    else:
        base, penalty = 0, 40
    total = base + time_bonus + ip_left * 2 - penalty
    return {"score": max(0, total), "seconds_left": seconds_left, "ip_left": session["ip_remaining"]}
def upsert_leaderboard(case_id: str, user_id: str, username: str, score: int):
    """Record the player's score for ``case_id`` and rebuild the top-N leaderboard.

    The play is stored under ``plays/<case_id>/<key>`` and the leaderboard is
    rewritten with the LEADERBOARD_TOP_N highest-scoring plays.
    """
    # user_id originates from a request header, so it may contain characters
    # that are illegal in an RTDB key or that address a nested path ('/').
    # fb_key() leaves already-valid ids unchanged.
    play_key = fb_key(user_id)
    plays_ref(case_id).child(play_key).set({
        "user_id": user_id,
        "username": username,
        "score": score,
        "ts": datetime.now(timezone.utc).isoformat()
    })
    plays = plays_ref(case_id).get() or {}
    # The snapshot can come back as a list when the keys look like array
    # indices; accept both shapes and ignore holes / malformed entries.
    entries = plays.values() if isinstance(plays, dict) else plays
    entries = [e for e in entries if isinstance(e, dict)]
    top = sorted(entries, key=lambda x: x.get("score", 0), reverse=True)[:LEADERBOARD_TOP_N]
    leaderboard_ref(case_id).set(top)
| # ----------------------------------------------------------------------------- | |
| # 6) ROUTES | |
| # ----------------------------------------------------------------------------- | |
def health():
    """Liveness probe: respond with ``ok`` and the current UTC timestamp.

    NOTE(review): no route decorator is visible above this handler in this
    view of the file — confirm it is registered with the Flask app.
    """
    now_iso = datetime.now(timezone.utc).isoformat()
    return jsonify({"ok": True, "time": now_iso})
| # --- Admin: Internet Archive ingestion (manual) --- | |
def admin_ingest_ia():
    """Admin: ingest Internet Archive search results into ia_pool.

    Requires a matching ``X-Admin-Key`` header. Optional JSON body:
    ``query`` (default DEFAULT_IA_QUERY), ``pages`` (default 2) and ``rows``
    (default 100). Responds with ingest/error counts and the pool size, 403
    on a bad key, 400 on a malformed body.

    NOTE(review): no route decorator is visible above this handler in this
    view of the file — confirm it is registered with the Flask app.
    """
    # Constant-time comparison so the key cannot be probed via timing.
    provided_key = (request.headers.get("X-Admin-Key") or "").encode("utf-8")
    if not ADMIN_KEY or not hmac.compare_digest(provided_key, ADMIN_KEY.encode("utf-8")):
        return jsonify({"error": "Forbidden"}), 403

    # silent=True: a missing or non-JSON body means "use the defaults"
    # instead of an error raised by get_json().
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        return jsonify({"error": "JSON body must be an object."}), 400
    query = body.get("query") or DEFAULT_IA_QUERY
    try:
        pages = int(body.get("pages") or 2)
        rows = int(body.get("rows") or 100)
    except (TypeError, ValueError):
        return jsonify({"error": "'pages' and 'rows' must be integers."}), 400

    ingested = 0
    errors = 0
    log.info(f"Manual ingest: query='{query}' pages={pages} rows={rows}")
    for page in range(1, pages + 1):
        try:
            docs = ia_advanced_search(query, rows=rows, page=page)
        except Exception:
            # Already logged by ia_advanced_search; count it and try the next page.
            errors += 1
            continue
        for d in docs:
            ident = d.get("identifier")
            if not ident:
                continue
            if ia_pool_ref().child(fb_key(ident)).get():
                continue  # already in the pool
            try:
                rec = ingest_ia_doc(d)
                if rec:
                    ingested += 1
            except Exception:
                errors += 1
                log.exception(f"Manual ingest failed for {ident}")
                continue
    pool_size = len(ia_pool_ref().get() or {})
    return jsonify({"ok": True, "ingested": ingested, "errors": errors, "pool_size": pool_size})
| # --- Admin: Cache IA images to Firebase Storage (manual) --- | |
def admin_cache_ia():
    """Admin: cache ia_pool images into Firebase Storage.

    Requires a matching ``X-Admin-Key`` header. The optional JSON body may
    override any ``batch_cache_ia_pool`` parameter. Responds with that
    function's summary, 403 on a bad key, 400 on a malformed body.

    NOTE(review): no route decorator is visible above this handler in this
    view of the file — confirm it is registered with the Flask app.
    """
    # Constant-time comparison so the key cannot be probed via timing.
    provided_key = (request.headers.get("X-Admin-Key") or "").encode("utf-8")
    if not ADMIN_KEY or not hmac.compare_digest(provided_key, ADMIN_KEY.encode("utf-8")):
        return jsonify({"error": "Forbidden"}), 403

    # silent=True: a missing or non-JSON body means "use the defaults".
    cfg = request.get_json(silent=True) or {}
    if not isinstance(cfg, dict):
        return jsonify({"error": "JSON body must be an object."}), 400
    try:
        options = dict(
            limit=int(cfg.get("limit", 100)),
            overwrite=bool(cfg.get("overwrite", False)),
            randomize=bool(cfg.get("randomize", True)),
            min_width=int(cfg.get("min_width", 800)),
            min_height=int(cfg.get("min_height", 800)),
            max_dim=int(cfg.get("max_dim", 4096)),
            jpeg_quality=int(cfg.get("jpeg_quality", 90)),
            skip_if_restricted=bool(cfg.get("skip_if_restricted", True)),
        )
    except (TypeError, ValueError):
        return jsonify({"error": "Numeric options must be integers."}), 400
    out = batch_cache_ia_pool(**options)
    return jsonify(out)
| # --- Admin: pool stats --- | |
def ia_pool_stats():
    """Admin endpoint: report the pool size and how many records are cached.

    Requires the ``X-Admin-Key`` header to equal ``ADMIN_KEY``.  A record
    counts as cached when it has a truthy ``storage_url``.

    Returns:
        JSON ``{"pool_size": ..., "cached": ...}``, or 403 when the key is
        missing or wrong.
    """
    supplied_key = request.headers.get("X-Admin-Key") or ""
    # Constant-time comparison so the key cannot be probed via response timing.
    if not ADMIN_KEY or not hmac.compare_digest(
        supplied_key.encode("utf-8"), ADMIN_KEY.encode("utf-8")
    ):
        return jsonify({"error": "Forbidden"}), 403

    pool = ia_pool_ref().get() or {}
    # Ignore malformed (non-dict) entries rather than failing the whole report.
    cached = sum(
        1 for rec in pool.values()
        if isinstance(rec, dict) and rec.get("storage_url")
    )
    return jsonify({"pool_size": len(pool), "cached": cached})
| # --- Admin: pre-generate today's case (manual) --- | |
def admin_generate_today():
    """Admin endpoint: make sure the case for the current UTC day exists.

    Requires the ``X-Admin-Key`` header to equal ``ADMIN_KEY``.

    Returns:
        JSON with ``generated``, the ``case_id`` and the case's public
        ``mode``; 403 when the key is missing or wrong.
    """
    supplied_key = request.headers.get("X-Admin-Key") or ""
    # Constant-time comparison so the key cannot be probed via response timing.
    if not ADMIN_KEY or not hmac.compare_digest(
        supplied_key.encode("utf-8"), ADMIN_KEY.encode("utf-8")
    ):
        return jsonify({"error": "Forbidden"}), 403

    case_id = utc_today_str()
    public = ensure_case_generated(case_id)
    return jsonify({"generated": True, "case_id": case_id, "mode": public.get("mode")})
| # --- DEV-ONLY: panic button bootstrap (no auth; gated by env) --- | |
def admin_bootstrap_now():
    """Dev-only endpoint: top up the Internet Archive pool on demand.

    Enabled only when ``ALLOW_DEV_BOOTSTRAP`` is truthy; there is no key
    check.  The optional JSON object body may carry ``min_items`` (default
    ``MIN_IA_POOL``), ``rows`` (default 100), ``max_pages`` (default 5) and a
    custom ``query``.

    Returns:
        JSON with the stats from ``ensure_minimum_ia_pool`` and the query in
        effect; 403 when disabled; 400 when a numeric option is not an
        integer; 500 with the error text when the bootstrap raises.
    """
    global DEFAULT_IA_QUERY

    if not ALLOW_DEV_BOOTSTRAP:
        return jsonify({"error": "Disabled. Set ALLOW_DEV_BOOTSTRAP=1 to enable."}), 403

    # silent=True lets a body-less "panic button" call use the defaults.
    cfg = request.get_json(silent=True)
    if not isinstance(cfg, dict):
        cfg = {}

    try:
        min_items = int(cfg.get("min_items", MIN_IA_POOL))
        rows = int(cfg.get("rows", 100))
        max_pages = int(cfg.get("max_pages", 5))
    except (TypeError, ValueError):
        return jsonify({"error": "min_items, rows and max_pages must be integers"}), 400

    custom_q = cfg.get("query")
    original_q = DEFAULT_IA_QUERY
    if custom_q:
        log.warning(f"DEV bootstrap using custom query: {custom_q!r}")
        # NOTE(review): this temporarily rebinds a module global, so any
        # request handled concurrently will observe the override too.
        DEFAULT_IA_QUERY = custom_q  # temporary override
    try:
        stats = ensure_minimum_ia_pool(min_items=min_items, rows=rows, max_pages=max_pages)
        return jsonify({"ok": True, "stats": stats, "effective_query": DEFAULT_IA_QUERY})
    except Exception as e:
        log.exception("bootstrap-now failed")
        return jsonify({"ok": False, "error": str(e)}), 500
    finally:
        DEFAULT_IA_QUERY = original_q  # restore
| # --- DEV-ONLY: diagnostics (network + firebase sanity) --- | |
def diagnostics():
    """Dev-only endpoint: probe the Internet Archive and Firebase Storage.

    Enabled only when ``ALLOW_DEV_DIAGNOSTICS`` is truthy.  The two probes
    run independently; a failure in either is reported as an ``error`` string
    in its own section rather than failing the request.
    """
    if not ALLOW_DEV_DIAGNOSTICS:
        return jsonify({"error": "Disabled. Set ALLOW_DEV_DIAGNOSTICS=1 to enable."}), 403

    ia_report = {}
    firebase_report = {}
    report = {
        "info": {
            "bucket": bucket.name,
            "db_url": db_root.path,
            "log_level": LOG_LEVEL,
            "ia_query": DEFAULT_IA_QUERY,
        },
        "ia": ia_report,
        "firebase": firebase_report,
    }

    # Probe 1: a tiny search, then metadata lookup for the first hit.
    try:
        found = ia_advanced_search(DEFAULT_IA_QUERY, rows=3, page=1)
        ia_report["search_docs"] = [doc.get("identifier") for doc in found]
        if found:
            sample_id = found[0].get("identifier")
            best_file = ia_best_image_from_metadata(ia_metadata(sample_id))
            ia_report["sample_identifier"] = sample_id
            ia_report["best_file"] = (best_file or {}).get("name")
    except Exception as exc:
        ia_report["error"] = str(exc)

    # Probe 2: upload a four-byte text object under a unique name.
    try:
        ping_path = f"diag/ping_{uuid.uuid4().hex}.txt"
        firebase_report["upload_test"] = upload_bytes_to_storage(b"ping", ping_path, "text/plain")
    except Exception as exc:
        firebase_report["error"] = str(exc)

    return jsonify(report)
| # --- Player flow --- | |
def start_case():
    """Begin (or resume) today's case for the requesting user.

    Ensures the case for the current UTC day exists, reuses the user's
    active session for that case when one is found, and otherwise creates a
    new session.  Responds with the session id and the case's public data.
    """
    user_id, username = extract_user_from_headers(request)
    case_id = utc_today_str()
    public = ensure_case_generated(case_id)

    prior = sessions_ref().order_by_child("user_id").equal_to(user_id).get()
    active_today = (
        candidate
        for candidate in (prior.values() if prior else ())
        if candidate.get("case_id") == case_id and candidate.get("status") == "active"
    )
    sess = next(active_today, None) or create_session(user_id, username, case_id)
    return jsonify({"session_id": sess["session_id"], "case": public})
def tool_signature(case_id):
    """Spend investigation points to reveal the signature crop of one image.

    The JSON body may carry ``image_index`` (0, 1 or 2; defaults to 0).

    Returns:
        JSON with ``crop_url`` (empty when no crop is stored for the index),
        a ``hint`` (only in "observation" mode) and ``ip_remaining``; 400
        when there is no active session, the session belongs to another
        case, ``image_index`` is invalid, or ``spend_ip`` reports an error.
    """
    session, err = require_active_session(request)
    if err:
        return jsonify(err), 400
    if session["case_id"] != case_id:
        return jsonify({"error": "Session/case mismatch."}), 400

    body = request.get_json() or {}
    raw_index = body.get("image_index", 0) if isinstance(body, dict) else None
    try:
        img_index = int(raw_index)
    except (TypeError, ValueError):
        # Non-numeric or null input is a client error, not a server crash.
        img_index = -1
    if img_index not in (0, 1, 2):
        return jsonify({"error": "image_index must be 0,1,2"}), 400

    session, err = spend_ip(
        session, TOOL_COSTS["signature"], {"type": "tool_signature", "image_index": img_index}
    )
    if err:
        return jsonify(err), 400

    public = case_ref(case_id).child("public").get() or {}
    crops = public.get("signature_crops") or []
    crop_url = crops[img_index] if img_index < len(crops) else ""
    hint = "Examine baseline alignment and stroke overlap." if public.get("mode") == "observation" else ""
    return jsonify({"crop_url": crop_url, "hint": hint, "ip_remaining": session["ip_remaining"]})
def tool_metadata(case_id):
    """Spend investigation points to receive one metadata hint for the case.

    The JSON body may carry ``image_index`` (0, 1 or 2; defaults to 0); it is
    validated and recorded with the spend, while the hint returned is the
    first entry of the solution's ``flags_metadata`` (or a generic fallback).

    Returns:
        JSON with ``flags`` (a one-element list) and ``ip_remaining``; 400
        when there is no active session, the session belongs to another
        case, ``image_index`` is invalid, or ``spend_ip`` reports an error.
    """
    session, err = require_active_session(request)
    if err:
        return jsonify(err), 400
    if session["case_id"] != case_id:
        return jsonify({"error": "Session/case mismatch."}), 400

    body = request.get_json() or {}
    raw_index = body.get("image_index", 0) if isinstance(body, dict) else None
    try:
        img_index = int(raw_index)
    except (TypeError, ValueError):
        # Non-numeric or null input is a client error, not a server crash.
        img_index = -1
    if img_index not in (0, 1, 2):
        return jsonify({"error": "image_index must be 0,1,2"}), 400

    session, err = spend_ip(
        session, TOOL_COSTS["metadata"], {"type": "tool_metadata", "image_index": img_index}
    )
    if err:
        return jsonify(err), 400

    solution = case_ref(case_id).child("solution").get() or {}
    flags_metadata = solution.get("flags_metadata") or []
    hint = flags_metadata[0] if flags_metadata else "Check chronology, chemistry, and institutional formats."
    return jsonify({"flags": [hint], "ip_remaining": session["ip_remaining"]})
def tool_financial(case_id):
    """Spend investigation points to receive one financial hint for the case.

    Responds with the first entry of the solution's ``flags_financial`` (or
    a generic fallback) and the remaining points; answers 400 when the
    session is missing, belongs to another case, or the spend is refused.
    """
    session, err = require_active_session(request)
    if err:
        return jsonify(err), 400
    if session["case_id"] != case_id:
        return jsonify({"error": "Session/case mismatch."}), 400

    cost = TOOL_COSTS["financial"]
    session, err = spend_ip(session, cost, {"type": "tool_financial"})
    if err:
        return jsonify(err), 400

    solution = case_ref(case_id).child("solution").get() or {}
    clues = solution.get("flags_financial", [])
    if clues:
        hint = clues[0]
    else:
        hint = "Follow currency, jurisdiction, and payment method timelines."
    return jsonify({"flags": [hint], "ip_remaining": session["ip_remaining"]})
def submit_guess(case_id):
    """Accept the player's final answer, score it and reveal the solution.

    The JSON body must carry ``image_index`` (0, 1 or 2) and may carry a
    free-text ``rationale``.  On a valid guess the session is marked
    ``finished``, the result is scored with ``score_result``, the leaderboard
    is refreshed and the play record is extended with the outcome.

    Returns:
        JSON with ``correct``, ``score``, ``timeLeft``, ``ipLeft`` and a
        ``reveal`` section; 400 when there is no active session, the session
        belongs to another case, or ``image_index`` is invalid.
    """
    session, err = require_active_session(request)
    if err:
        return jsonify(err), 400
    if session["case_id"] != case_id:
        return jsonify({"error": "Session/case mismatch."}), 400

    body = request.get_json() or {}
    if not isinstance(body, dict):
        body = {}
    try:
        guess_index = int(body.get("image_index", -1))
    except (TypeError, ValueError):
        # Non-numeric or null input is a client error, not a server crash.
        guess_index = -1
    raw_rationale = body.get("rationale")
    # Only strings can be stripped; anything else is treated as "no rationale".
    rationale = raw_rationale.strip() if isinstance(raw_rationale, str) else ""
    if guess_index not in (0, 1, 2):
        return jsonify({"error": "image_index must be 0,1,2"}), 400

    # Close the session before scoring so it cannot be submitted again.
    sessions_ref().child(session["session_id"]).child("status").set("finished")
    session["status"] = "finished"

    solution = case_ref(case_id).child("solution").get() or {}
    answer_index = int(solution.get("answer_index", 0))
    correct = (guess_index == answer_index)
    summary = score_result(correct, session)
    upsert_leaderboard(case_id, session["user_id"], session["username"], summary["score"])

    reveal = {
        "authentic_index": answer_index,
        "explanation": solution.get("explanation", ""),
        "flags_signature": solution.get("flags_signature", []),
        "flags_metadata": solution.get("flags_metadata", []),
        "flags_financial": solution.get("flags_financial", []),
    }
    # Extend the play record written by upsert_leaderboard with the outcome.
    plays_ref(case_id).child(session["user_id"]).update({
        "rationale": rationale,
        "correct": correct,
        "score": summary["score"],
        "seconds_left": summary["seconds_left"],
        "ip_left": summary["ip_left"],
        "finished_at": datetime.now(timezone.utc).isoformat(),
    })
    return jsonify({
        "correct": correct,
        "score": summary["score"],
        "timeLeft": summary["seconds_left"],
        "ipLeft": summary["ip_left"],
        "reveal": reveal,
    })
def leaderboard_daily():
    """Return today's leaderboard plus the requesting user's own standing.

    ``rank`` is the user's 1-based position within the stored top list, or
    ``None`` when the user does not appear in it; ``score`` comes from the
    user's play record for today's case.
    """
    case_id = utc_today_str()
    top = leaderboard_ref(case_id).get() or []
    user_id, _ = extract_user_from_headers(request)
    me = plays_ref(case_id).child(user_id).get() or {}

    rank = next(
        (position for position, row in enumerate(top, start=1) if row.get("user_id") == user_id),
        None,
    )
    standing = {"score": me.get("score"), "rank": rank}
    return jsonify({"case_id": case_id, "top": top, "me": standing})
| # ----------------------------------------------------------------------------- | |
| # 7) MAIN | |
| # ----------------------------------------------------------------------------- | |
if __name__ == "__main__":
    # Optionally fill the Internet Archive pool before serving; a failure is
    # logged and does not prevent the server from starting.
    if os.environ.get("BOOTSTRAP_IA", "1") == "1":
        log.info("Bootstrapping Internet Archive pool...")
        try:
            stats = ensure_minimum_ia_pool()
            log.info(f"Bootstrap complete: {stats}")
        except Exception:
            log.exception("Bootstrap failed")

    # The server listens on all interfaces, and Flask's debug mode enables an
    # interactive debugger that can execute arbitrary code.  Debug mode is
    # therefore opt-in (FLASK_DEBUG=1) instead of always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "0") == "1"
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", "7860")), debug=debug_enabled)