dev-stroke / main.py
rairo's picture
Update main.py
e57540f verified
# app.py — Hidden Stroke (AI Noir Investigation) with verbose logging
# Flask + Firebase Realtime DB + Firebase Storage + Gemini
# Envs required: FIREBASE, Firebase_DB, Firebase_Storage, Gemini
# Optional envs: GAME_SALT, ADMIN_KEY, IA_USER_AGENT, MIN_IA_POOL, IA_QUERY,
# BOOTSTRAP_IA, LOG_LEVEL, ALLOW_DEV_BOOTSTRAP, ALLOW_DEV_DIAGNOSTICS
import os, io, uuid, json, hmac, hashlib, random, traceback, requests, re, hashlib as _hash
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, Tuple, List, Optional
from flask import Flask, request, jsonify
from flask_cors import CORS
from PIL import Image
# ----- Logging ---------------------------------------------------------------
import logging
# Verbosity comes from the LOG_LEVEL env var (default "DEBUG"); a value that
# is not an attribute of the logging module falls back to logging.DEBUG.
LOG_LEVEL = os.environ.get("LOG_LEVEL", "DEBUG").upper()
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL, logging.DEBUG),
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
# Module-wide logger used by every helper and route in this file.
log = logging.getLogger("hidden_stroke")
# ---------------- Firebase Admin (Realtime DB + Storage) ----------------
import firebase_admin
from firebase_admin import credentials, db, storage
# ---------------- Gemini (exact client & model names) -------------------
from google import genai
from google.genai import types
# -----------------------------------------------------------------------------
# 1) CONFIG & INIT
# -----------------------------------------------------------------------------
# Flask application object; all routes below are registered on it.
app = Flask(__name__)
# CORS is open to every origin, without credentials, for GET/POST/OPTIONS.
# NOTE(review): the player tool routes read an "X-Session-Id" header that is
# not listed in allow_headers here — confirm browser preflights succeed.
CORS(app,
     resources={r"/*": {"origins": "*"}},
     supports_credentials=False,
     methods=["GET", "POST", "OPTIONS"],
     allow_headers=["Content-Type", "X-Reddit-User", "X-Reddit-Id"])
# --- Firebase ---
# Initializes the Firebase Admin SDK at import time from three env vars:
#   FIREBASE          service-account credentials as a JSON string
#   Firebase_DB       Realtime Database URL
#   Firebase_Storage  Storage bucket name
# Any failure is logged and re-raised, so the module cannot be imported
# without a working Firebase configuration.
try:
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("The FIREBASE environment variable is not set.")
    credentials_json = json.loads(credentials_json_string)
    firebase_db_url = os.environ.get("Firebase_DB")
    firebase_storage_bucket = os.environ.get("Firebase_Storage")
    if not firebase_db_url or not firebase_storage_bucket:
        raise ValueError("Firebase_DB and Firebase_Storage environment variables must be set.")
    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(cred, {
        'databaseURL': firebase_db_url,
        'storageBucket': firebase_storage_bucket
    })
    # Module-level handles used by every helper below.
    bucket = storage.bucket()
    db_root = db.reference("/")
    log.info("Firebase Realtime DB + Storage initialized.")
except Exception:
    log.exception("FATAL: Firebase init failed")
    raise
# --- Gemini ---
# Builds the shared google-genai client from the "Gemini" env var (API key).
# A missing key or a client construction failure is logged and re-raised,
# which aborts the import.
try:
    GEMINI_API_KEY = os.environ.get("Gemini")
    if not GEMINI_API_KEY:
        raise ValueError("The 'Gemini' environment variable is not set.")
    client = genai.Client(api_key=GEMINI_API_KEY)
    log.info("Gemini client initialized.")
except Exception:
    log.exception("FATAL: Gemini init failed")
    raise
# --- Models (exact names) ---
# CATEGORY_MODEL produces the case text/metadata JSON; GENERATION_MODEL
# produces the forged image variants.
CATEGORY_MODEL = "gemini-2.5-flash"
GENERATION_MODEL = "gemini-2.0-flash-exp-image-generation"
# Alternative image model kept for reference:
# GENERATION_MODEL = "gemini-2.5-flash-image-preview"
# --- Game constants ---
TIMER_SECONDS = 90          # session lifetime, in seconds
INITIAL_IP = 8              # Investigation Points granted to a new session
TOOL_COSTS = {"signature": 1, "metadata": 1, "financial": 2}  # IP cost per tool
LEADERBOARD_TOP_N = 50      # number of entries kept in the per-case leaderboard
# --- Misc config ---
GAME_SALT = os.environ.get("GAME_SALT", "dev-salt")  # HMAC key for hmac_hex
ADMIN_KEY = os.environ.get("ADMIN_KEY")              # expected X-Admin-Key value; admin routes reject all calls when unset
IA_USER_AGENT = os.environ.get("IA_USER_AGENT", "HiddenStrokeBot/1.0 (+https://reddit.com)")
MIN_IA_POOL = int(os.environ.get("MIN_IA_POOL", "60"))  # target size of the ia_pool node
DEFAULT_IA_QUERY = os.environ.get(
    "IA_QUERY",
    '(collection:(metropolitanmuseum OR smithsonian OR getty OR artic) AND mediatype:image)'
)
# Dev-only endpoints are disabled unless the matching env var is exactly "1".
ALLOW_DEV_BOOTSTRAP = os.environ.get("ALLOW_DEV_BOOTSTRAP", "0") == "1"
ALLOW_DEV_DIAGNOSTICS = os.environ.get("ALLOW_DEV_DIAGNOSTICS", "0") == "1"
# Queries tried, in order, after DEFAULT_IA_QUERY when the pool is still short.
FALLBACK_IA_QUERIES = [
    '(mediatype:image AND (format:JPEG OR format:PNG))',
    '(mediatype:image AND (format:JPEG OR format:PNG) AND (subject:portrait OR title:portrait))',
    '(mediatype:image AND format:JPEG)',
]
# -----------------------------------------------------------------------------
# 2) UTILS
# -----------------------------------------------------------------------------
def utc_today_str() -> str:
    """Return the current UTC calendar date formatted as ``YYYYMMDD``."""
    now_utc = datetime.now(tz=timezone.utc)
    return f"{now_utc:%Y%m%d}"
def case_ref(case_id: str):
    """Return the RTDB reference for ``cases/<case_id>``."""
    node_path = "cases/{}".format(case_id)
    return db_root.child(node_path)
def plays_ref(case_id: str):
    """Return the RTDB reference for ``plays/<case_id>``."""
    node_path = "plays/{}".format(case_id)
    return db_root.child(node_path)
def leaderboard_ref(case_id: str):
    """Return the RTDB reference for ``leaderboards/<case_id>/top``."""
    node_path = "leaderboards/{}/top".format(case_id)
    return db_root.child(node_path)
def sessions_ref():
    """Return the RTDB reference for the ``sessions`` node."""
    node = db_root.child("sessions")
    return node
def ia_pool_ref():
    """Return the RTDB reference for the ``ia_pool`` node."""
    node = db_root.child("ia_pool")
    return node
def hmac_hex(s: str) -> str:
    """Return the hex HMAC-SHA256 of ``s`` keyed with ``GAME_SALT``."""
    key_bytes = GAME_SALT.encode()
    message = s.encode()
    mac = hmac.new(key_bytes, message, hashlib.sha256)
    return mac.hexdigest()
# Firebase RTDB key sanitizer (no . $ # [ ] / or control chars)
_FB_BAD = re.compile(r'[.$#\[\]/\x00-\x1F\x7F]')
# Budget for the sanitized part of a key, measured in UTF-8 bytes so that the
# key plus the "__<8 hex>" suffix stays below the RTDB key size limit.
_FB_MAX_KEY_BYTES = 700


def fb_key(raw: str) -> str:
    """Turn an arbitrary string into a key that is safe to use in the RTDB.

    Forbidden characters are replaced with ``_`` and the result is capped at
    ``_FB_MAX_KEY_BYTES`` UTF-8 bytes. The cap used to count characters, which
    let long non-ASCII input exceed the database's byte limit. Whenever the
    result differs from ``raw``, a short SHA-1 suffix of the original input is
    appended so distinct inputs that sanitize to the same text stay distinct.

    Args:
        raw: The untrusted key candidate; ``None`` is treated as empty text.

    Returns:
        The sanitized key, never empty (empty input maps to a fixed 8-hex
        placeholder).
    """
    original = raw or ''
    safe = _FB_BAD.sub('_', original)
    # surrogatepass keeps the length check from raising on lone surrogates.
    encoded = safe.encode('utf-8', 'surrogatepass')
    if len(encoded) > _FB_MAX_KEY_BYTES:
        # 'ignore' drops a multi-byte character cut in half by the byte slice.
        safe = encoded[:_FB_MAX_KEY_BYTES].decode('utf-8', 'ignore')
    if safe != raw:
        suffix = _hash.sha1(original.encode('utf-8')).hexdigest()[:8]
        safe = f"{safe}__{suffix}"
    return safe or _hash.sha1(b'empty').hexdigest()[:8]
def upload_bytes_to_storage(data: bytes, path: str, content_type: str) -> str:
    """Upload ``data`` to the Storage bucket at ``path`` and make it public.

    Returns:
        The blob's public URL.
    """
    log.debug(f"Uploading to Storage: path={path}, content_type={content_type}, bytes={len(data)}")
    target = bucket.blob(path)
    target.upload_from_string(data, content_type=content_type)
    target.make_public()
    public_url = target.public_url
    log.debug(f"Uploaded: {public_url}")
    return public_url
def pil_from_inline_image_part(part) -> Image.Image:
    """Decode ``part.inline_data.data`` into an RGB PIL image."""
    buffer = io.BytesIO(part.inline_data.data)
    decoded = Image.open(buffer)
    return decoded.convert("RGB")
def save_image_return_url(img: Image.Image, path: str, quality=92) -> str:
    """Encode ``img`` as an optimized JPEG, upload it to ``path``, return its URL."""
    with io.BytesIO() as sink:
        img.save(sink, format="JPEG", quality=quality, optimize=True)
        payload = sink.getvalue()
    return upload_bytes_to_storage(payload, path, "image/jpeg")
def extract_user_from_headers(req) -> Tuple[str, str]:
    """Read the caller identity from the ``X-Reddit-User`` / ``X-Reddit-Id`` headers.

    A blank user name becomes ``"anon"``; a blank id falls back to the
    (possibly defaulted) user name.

    Returns:
        ``(user_id, username)``.
    """
    headers = req.headers
    uname = (headers.get("X-Reddit-User") or "").strip() or "anon"
    uid = (headers.get("X-Reddit-Id") or "").strip() or uname
    return uid, uname
def seed_for_date(case_id: str) -> int:
    """Derive a deterministic integer seed from the first 12 hex digits of the case HMAC."""
    digest = hmac_hex(f"seed::{case_id}")
    leading_hex = digest[:12]
    return int(leading_hex, 16)
def fifty_fifty_mode(case_seed: int) -> str:
    """Map a seed to a game mode: even seeds give "knowledge", odd seeds "observation"."""
    if case_seed % 2:
        return "observation"
    return "knowledge"
def http_get_json(url: str, params: dict = None) -> dict:
    """GET ``url`` with the IA user agent and return the decoded JSON body.

    Raises:
        requests.HTTPError: via ``raise_for_status`` on a non-success status.
    """
    log.debug(f"HTTP GET JSON: {url} params={params}")
    response = requests.get(
        url,
        params=params,
        headers={"User-Agent": IA_USER_AGENT},
        timeout=30,
    )
    log.debug(f"HTTP {response.status_code} for {response.url}")
    response.raise_for_status()
    return response.json()
def http_get_bytes(url: str) -> bytes:
    """GET ``url`` with the IA user agent and return the raw response body.

    Raises:
        requests.HTTPError: via ``raise_for_status`` on a non-success status.
    """
    log.debug(f"HTTP GET BYTES: {url}")
    response = requests.get(url, headers={"User-Agent": IA_USER_AGENT}, timeout=60)
    body = response.content
    log.debug(f"HTTP {response.status_code} for {response.url} bytes={len(body)}")
    response.raise_for_status()
    return body
def ia_advanced_search(query: str, rows: int, page: int) -> List[dict]:
    """Run one page of an Internet Archive advanced search.

    Returns:
        The ``response.docs`` list from the JSON reply (empty when absent).

    Raises:
        Exception: any failure from the HTTP call is logged and re-raised.
    """
    search_params = dict(q=query, rows=rows, page=page, output="json")
    try:
        payload = http_get_json("https://archive.org/advancedsearch.php", params=search_params)
        found = payload.get("response", {}).get("docs", [])
        log.info(f"IA search page={page} rows={rows} -> {len(found)} docs")
    except Exception:
        log.exception("IA advanced search failed")
        raise
    return found
def ia_metadata(identifier: str) -> dict:
    """Fetch the Internet Archive ``/metadata`` document for ``identifier``.

    Raises:
        Exception: any failure from the HTTP call is logged and re-raised.
    """
    endpoint = f"https://archive.org/metadata/{identifier}"
    try:
        document = http_get_json(endpoint)
        file_count = len(document.get('files', []) or [])
        log.debug(f"Fetched metadata for {identifier}, files={file_count}")
    except Exception:
        log.exception(f"IA metadata fetch failed for {identifier}")
        raise
    return document
def ia_best_image_from_metadata(meta: dict) -> Optional[dict]:
    """Pick the largest image file entry from an IA metadata document.

    A file qualifies when its ``format`` mentions jpeg/jpg/png/tiff/image.
    Candidates are ranked by ``width * height`` when both are known, otherwise
    by ``size``. Unparseable numeric fields now count as 0 instead of raising,
    so one malformed entry no longer aborts the whole selection.

    Args:
        meta: The metadata document; its ``files`` list is inspected.

    Returns:
        The winning file dict, or ``None`` when no file qualifies.
    """
    def _as_int(value) -> int:
        # Values are parsed with int() first, then float(), then give up.
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            try:
                return int(float(value))
            except (TypeError, ValueError, OverflowError):
                return 0

    image_markers = ("jpeg", "jpg", "png", "tiff", "image")
    best, best_pixels = None, -1
    for f in meta.get("files", []) or []:
        if not isinstance(f, dict):
            continue
        fmt = str(f.get("format") or "").lower()
        if not any(marker in fmt for marker in image_markers):
            continue
        w = _as_int(f.get("width"))
        h = _as_int(f.get("height"))
        px = w * h if (w and h) else _as_int(f.get("size"))
        if px > best_pixels:
            best_pixels, best = px, f
    if best:
        log.debug(f"Best image: name={best.get('name')} fmt={best.get('format')} dims={best.get('width')}x{best.get('height')} size={best.get('size')}")
    else:
        log.warning("No suitable image file found in metadata")
    return best
def _ia_text(value) -> str:
    """Collapse an IA metadata value (string, list of values, or None) into one string."""
    if value is None:
        return ""
    if isinstance(value, (list, tuple)):
        return "; ".join(str(v) for v in value if v not in (None, ""))
    return str(value)


def ingest_ia_doc(doc: dict) -> Optional[dict]:
    """Fetch /metadata and store best image entry into ia_pool (sanitized key).

    Changes from the previous version:
      * the file name is percent-encoded in ``download_url`` so names with
        ``#``, ``?`` or ``%`` still resolve;
      * text fields are flattened to plain strings, because later code calls
        string methods on them and interpolates them into prompts;
      * a best-file entry without a ``name`` is skipped instead of raising.

    Args:
        doc: One search result; only ``identifier`` is required.

    Returns:
        The record written to ``ia_pool/<pool_key>``, or ``None`` when the doc
        has no identifier or no usable image file.

    Raises:
        Exception: propagated from the metadata fetch or the database write.
    """
    from urllib.parse import quote  # local import: only needed here

    identifier = doc.get("identifier")
    if not identifier:
        return None
    pool_key = fb_key(identifier)
    log.info(f"Ingesting IA identifier={identifier} -> pool_key={pool_key}")
    meta = ia_metadata(identifier)
    best = ia_best_image_from_metadata(meta)
    if not best or not best.get("name"):
        log.warning(f"Skipping {identifier}: no image file")
        return None

    md = meta.get("metadata", {}) or {}

    def _field(name: str) -> str:
        # Prefer the /metadata value, fall back to the search-result value.
        return _ia_text(md.get(name)) or _ia_text(doc.get(name))

    title = _field("title")
    file_name = str(best["name"])
    # "/" stays literal because the name may contain sub-directories.
    download_url = (
        f"https://archive.org/download/{quote(str(identifier), safe='')}"
        f"/{quote(file_name, safe='/')}"
    )
    record = {
        "identifier": identifier,  # original IA id preserved
        "_pool_key": pool_key,     # sanitized RTDB key
        "title": title,
        "date": _field("date"),
        "creator": _field("creator"),
        "rights": _field("rights"),
        "licenseurl": _field("licenseurl"),
        "download_url": download_url,
        "file_name": best["name"],
        "format": best.get("format"),
        "width": best.get("width"),
        "height": best.get("height"),
        "size": best.get("size"),
        "source": "internet_archive"
    }
    ia_pool_ref().child(pool_key).set(record)
    log.info(f"Ingested {identifier} -> ia_pool/{pool_key} (title='{title}')")
    return record
def choose_ia_item_for_case(case_id: str) -> Optional[dict]:
    """Deterministically pick one ``ia_pool`` record for ``case_id``.

    The pool keys are sorted and indexed by the case seed modulo the pool size.

    Returns:
        The chosen record, or ``None`` when the pool is empty.
    """
    pool = ia_pool_ref().get() or {}
    if not pool:
        log.warning("choose_ia_item_for_case: pool is empty")
        return None
    ordered_keys = sorted(pool.keys())
    position = seed_for_date(case_id) % len(ordered_keys)
    chosen_key = ordered_keys[position]
    log.info(f"Chosen IA pool_key for case {case_id}: {chosen_key}")
    return pool[chosen_key]
def download_image_to_pil(url: str) -> Image.Image:
    """Download ``url`` and decode the body into an RGB PIL image."""
    raw = http_get_bytes(url)
    picture = Image.open(io.BytesIO(raw)).convert("RGB")
    log.debug(f"Opened image from {url} size={picture.size}")
    return picture
def crop_signature_macro(img: Image.Image, size: int = 512) -> Image.Image:
    """Crop up to ``size`` x ``size`` pixels from the bottom-right corner of ``img``.

    When the image is smaller than ``size`` on an axis, the crop spans the
    whole axis.
    """
    width, height = img.size
    crop_w, crop_h = min(size, width), min(size, height)
    left, top = max(0, width - crop_w), max(0, height - crop_h)
    right, bottom = left + crop_w, top + crop_h
    log.debug(f"Signature crop from ({left},{top}) to ({right},{bottom})")
    return img.crop((left, top, right, bottom))
# -----------------------------------------------------------------------------
# 3) IA -> Firebase Storage caching + Zero-admin bootstrap
# -----------------------------------------------------------------------------
def _resize_if_needed(img: Image.Image, max_dim: int = 4096) -> Image.Image:
    """Downscale ``img`` so its longer side is at most ``max_dim`` pixels.

    The aspect ratio is kept. The shorter side is clamped to at least 1 pixel:
    for extreme aspect ratios the truncated value used to reach 0, which makes
    ``Image.resize`` fail.

    Returns:
        ``img`` itself when it already fits, otherwise a LANCZOS-resampled copy.
    """
    w, h = img.size
    if max(w, h) <= max_dim:
        return img
    if w >= h:
        new_w = max_dim
        new_h = max(1, int(h * (max_dim / w)))
    else:
        new_h = max_dim
        new_w = max(1, int(w * (max_dim / h)))
    log.debug(f"Resizing image from {w}x{h} to {new_w}x{new_h}")
    return img.resize((new_w, new_h), Image.LANCZOS)
def cache_single_ia_identifier(
    pool_key: str,
    overwrite: bool = False,
    max_dim: int = 4096,
    jpeg_quality: int = 90,
    skip_if_restricted: bool = True,
) -> dict:
    """Copy one ``ia_pool`` image (plus its signature crop) into Storage.

    Changes from the previous version: a list-valued ``rights`` field no
    longer crashes the restriction check, and the two JPEG uploads go through
    ``save_image_return_url`` instead of repeating its encode/upload steps.

    Args:
        pool_key: Key of the record under ``ia_pool``.
        overwrite: Re-cache even when the record already has a ``storage_url``.
        max_dim: Longest side allowed for the stored image.
        jpeg_quality: JPEG quality for both uploads.
        skip_if_restricted: Skip records whose rights text contains
            "in copyright" or "all rights reserved".

    Returns:
        A result dict with ``pool_key`` and ``stored``; on success it also
        carries the two URLs and the stored dimensions, otherwise a ``reason``.
    """
    rec_ref = ia_pool_ref().child(pool_key)
    rec = rec_ref.get() or {}
    if not rec:
        return {"pool_key": pool_key, "stored": False, "reason": "not_in_pool"}

    identifier = rec.get("identifier") or pool_key
    raw_rights = rec.get("rights") or ""
    if isinstance(raw_rights, (list, tuple)):
        # Records ingested earlier may hold repeated IA fields as lists.
        raw_rights = " ".join(str(item) for item in raw_rights)
    rights = str(raw_rights).lower()
    if skip_if_restricted and ("in copyright" in rights or "all rights reserved" in rights):
        log.info(f"Skipping {identifier}: restricted rights")
        return {"pool_key": pool_key, "stored": False, "reason": "restricted_rights"}
    if rec.get("storage_url") and not overwrite:
        log.info(f"Skipping {identifier}: already cached")
        return {"pool_key": pool_key, "stored": False, "reason": "already_cached", "storage_url": rec["storage_url"]}

    source_url = rec.get("storage_url") or rec.get("download_url")
    if not source_url:
        log.warning(f"{identifier}: missing source_url")
        return {"pool_key": pool_key, "stored": False, "reason": "missing_source_url"}

    # Try the preferred source first, then the IA download URL when it differs.
    candidate_urls = [source_url]
    fallback_url = rec.get("download_url")
    if fallback_url and fallback_url != source_url:
        candidate_urls.append(fallback_url)

    img = None
    last_error = None
    for attempt, url in enumerate(candidate_urls):
        try:
            if attempt == 0:
                log.info(f"Caching {identifier} from {url}")
            else:
                log.warning(f"Retrying {identifier} from IA download_url")
            img = download_image_to_pil(url)
            break
        except Exception as exc:
            last_error = exc
            if attempt == len(candidate_urls) - 1:
                log.exception(f"{identifier}: download failed")
    if img is None:
        return {"pool_key": pool_key, "stored": False, "reason": f"download_failed: {last_error}"}

    img = _resize_if_needed(img, max_dim=max_dim)
    w, h = img.size

    # Upload the (possibly resized) full image.
    img_path = f"ia_cache/{pool_key}/original.jpg"
    storage_url = save_image_return_url(img, img_path, quality=jpeg_quality)

    # Upload the bottom-right signature crop.
    crop_path = f"ia_cache/{pool_key}/signature_crop.jpg"
    signature_crop_url = save_image_return_url(
        crop_signature_macro(img, 512), crop_path, quality=jpeg_quality
    )

    rec_ref.update({
        "storage_url": storage_url,
        "signature_crop_url": signature_crop_url,
        "image_path": img_path,
        "crop_path": crop_path,
        "width": w,
        "height": h,
        "cached_at": datetime.now(timezone.utc).isoformat()
    })
    log.info(f"Cached {identifier} -> {storage_url}")
    return {
        "pool_key": pool_key,
        "stored": True,
        "storage_url": storage_url,
        "signature_crop_url": signature_crop_url,
        "width": w,
        "height": h
    }
def batch_cache_ia_pool(
    limit: int = 100,
    overwrite: bool = False,
    randomize: bool = True,
    min_width: int = 800,
    min_height: int = 800,
    max_dim: int = 4096,
    jpeg_quality: int = 90,
    skip_if_restricted: bool = True,
) -> dict:
    """Cache up to ``limit`` pool records into Storage.

    Records that already have a ``storage_url`` are skipped unless
    ``overwrite`` is set; records with known dimensions below the minimums are
    skipped too. The remaining keys are optionally shuffled and truncated to
    ``limit`` before each is handed to ``cache_single_ia_identifier``.

    Returns:
        A summary dict with ``ok``, ``processed``, ``stored``, ``skipped`` and
        the per-record ``results``.
    """
    pool = ia_pool_ref().get() or {}
    log.info(f"batch_cache_ia_pool: pool_size={len(pool)}")
    if not pool:
        return {"ok": True, "processed": 0, "stored": 0, "skipped": 0, "results": []}

    candidates = []
    for pkey, rec in pool.items():
        if rec.get("storage_url") and not overwrite:
            continue
        w = int(rec.get("width") or 0)
        h = int(rec.get("height") or 0)
        dims_known = bool(w and h)
        if dims_known and (w < min_width or h < min_height):
            log.debug(f"Skip {pkey}: too small {w}x{h}")
            continue
        candidates.append(pkey)

    if randomize:
        random.shuffle(candidates)
    candidates = candidates[:max(0, limit)]
    log.info(f"Caching candidates: {len(candidates)} (limit={limit})")

    results = [
        cache_single_ia_identifier(
            pkey,
            overwrite=overwrite,
            max_dim=max_dim,
            jpeg_quality=jpeg_quality,
            skip_if_restricted=skip_if_restricted,
        )
        for pkey in candidates
    ]
    stored = sum(1 for outcome in results if outcome.get("stored"))
    skipped = len(results) - stored
    log.info(f"batch_cache_ia_pool done: processed={len(candidates)} stored={stored} skipped={skipped}")
    return {"ok": True, "processed": len(candidates), "stored": stored, "skipped": skipped, "results": results}
def ensure_minimum_ia_pool(min_items: int = MIN_IA_POOL, rows: int = 100, max_pages: int = 5) -> dict:
    """Top up ``ia_pool`` to ``min_items`` records and cache what is missing.

    Ingestion walks ``DEFAULT_IA_QUERY`` and then ``FALLBACK_IA_QUERIES``, up
    to ``max_pages`` pages of ``rows`` results each, until the target is met.

    Changes from the previous version:
      * the number of images to cache is now derived from how many records
        already have a ``storage_url``; it used to be
        ``min_items - pool_size``, which is 0 once the pool is full, so a
        freshly ingested pool was never cached;
      * duplicate detection uses the keys loaded at the start instead of one
        database read per search result.

    NOTE(review): the first call on an uncached pool may now download up to
    ``min_items`` images inside the calling request — confirm that latency is
    acceptable for the callers.

    Returns:
        ``{"ok", "had", "added", "cached", "final_size"}``.
    """
    pool = ia_pool_ref().get() or {}
    have = len(pool)
    known_keys = set(pool)
    added = 0
    cached = 0
    log.info(f"ensure_minimum_ia_pool: have={have}, target={min_items}")

    candidate_queries = []
    if DEFAULT_IA_QUERY:
        candidate_queries.append(DEFAULT_IA_QUERY)
    candidate_queries.extend([q for q in FALLBACK_IA_QUERIES if q not in candidate_queries])

    for q in candidate_queries:
        if have + added >= min_items:
            break
        log.info(f"IA ingest: trying query: {q}")
        page = 1
        while have + added < min_items and page <= max_pages:
            try:
                docs = ia_advanced_search(q, rows=rows, page=page)
            except Exception:
                log.warning(f"IA search failed on page {page} for query {q!r}, moving on")
                break
            log.info(f"IA search page={page} -> {len(docs)} docs for query {q!r}")
            if not docs:
                break
            for d in docs:
                ident = d.get("identifier")
                if not ident:
                    continue
                key = fb_key(ident)
                if key in known_keys:
                    continue
                try:
                    rec = ingest_ia_doc(d)
                except Exception:
                    log.exception(f"Ingest failed for {ident}")
                    continue
                if rec:
                    added += 1
                    known_keys.add(key)
                if have + added >= min_items:
                    break
            page += 1

    pool = ia_pool_ref().get() or {}
    have_now = len(pool)
    cached_now = sum(
        1 for rec in pool.values() if isinstance(rec, dict) and rec.get("storage_url")
    )
    # Never ask for more cached items than the pool can provide.
    need_cache = max(0, min(min_items, have_now) - cached_now)
    log.info(f"ensure_minimum_ia_pool: post-ingest have={have_now}, need_cache={need_cache}")
    if need_cache:
        res = batch_cache_ia_pool(limit=need_cache, randomize=True)
        cached = res.get("stored", 0)

    final_size = len(ia_pool_ref().get() or {})
    stats = {"ok": True, "had": have, "added": added, "cached": cached, "final_size": final_size}
    log.info(f"ensure_minimum_ia_pool: stats={stats}")
    return stats
# -----------------------------------------------------------------------------
# 4) CASE GENERATION (uses IA for authentic image, Gemini for forgeries/meta)
# -----------------------------------------------------------------------------
# Prompt sent with the authentic image to obtain one forged variant.
_FORGERY_PROMPT = """
Create a near-identical variant of the provided painting.
Keep composition, palette, and lighting the same.
Only introduce a subtle change in signature micro-geometry (baseline alignment, stroke overlap order, or curve spacing).
No annotations. Differences must be visible only at macro zoom.
"""

# Template for the case text; filled with str.format, so literal braces are doubled.
_CASE_META_PROMPT = """
You are generating a daily case for a noir art investigation game.
MODE: {mode_label}
AUTHENTIC CONTEXT (from Internet Archive):
- title: {title}
- creator: {creator}
- date: {date}
- rights: {rights}
- licenseurl: {licenseurl}
TASK:
1) Create a short, punchy "case_brief" (2–4 sentences) explaining why the artifact matters and why fraud is suspected — NO SPOILERS.
2) Prepare THREE metadata bundles for images A,B,C with NEARLY IDENTICAL fields.
Ensure exactly ONE bundle is AUTHENTIC and that it corresponds to the above authentic context.
The other two are FORGERIES with subtle, reality-checkable anomalies.
3) Provide a concise "ledger_summary" describing a believable ownership/payment trail.
4) Provide the solution with: "answer_index" (0 for A, 1 for B, 2 for C) and detailed flags for signature/metadata/financial, plus an "explanation".
OUTPUT STRICT JSON with this schema:
{{
"case_brief": "...",
"metadata": [
{{"title":"...", "year": "...", "medium": "...", "ink_or_pigment": "...", "catalog_ref": "...", "ownership_chain": ["...","..."], "notes":"..."}},
{{"title":"...", "year": "...", "medium": "...", "ink_or_pigment": "...", "catalog_ref": "...", "ownership_chain": ["...","..."], "notes":"..."}},
{{"title":"...", "year": "...", "medium": "...", "ink_or_pigment": "...", "catalog_ref": "...", "ownership_chain": ["...","..."], "notes":"..."}}
],
"ledger_summary": "short paragraph",
"solution": {{
"answer_index": 0,
"flags_signature": [ "..." ],
"flags_metadata": [ "..." ],
"flags_financial": [ "..." ],
"explanation": "A few sentences that justify the authentic pick without listing spoilers."
}}
}}
"""


def _generate_forgery_image(auth_img: Image.Image) -> Image.Image:
    """Ask the image model for a forged variant; fall back to a copy of the authentic image."""
    resp = client.models.generate_content(
        model=GENERATION_MODEL,
        contents=[_FORGERY_PROMPT, auth_img],
        config=types.GenerateContentConfig(response_modalities=["IMAGE"])
    )
    candidates = getattr(resp, "candidates", None) or []
    content = candidates[0].content if candidates else None
    for part in (getattr(content, "parts", None) or []):
        if getattr(part, "inline_data", None):
            return pil_from_inline_image_part(part)
    log.warning("Gemini returned no image; falling back to copy of authentic")
    return auth_img.copy()


def _parse_model_json(raw_text: str) -> Dict[str, Any]:
    """Parse the model reply as a JSON object, tolerating code fences and surrounding prose."""
    attempts = [raw_text]
    if "```" in raw_text:
        fenced = raw_text.split("```")
        if len(fenced) >= 2:
            inner = fenced[1]
            if inner.lower().startswith("json"):
                inner = inner[4:]
            attempts.append(inner.strip())
    start, end = raw_text.find("{"), raw_text.rfind("}")
    if 0 <= start < end:
        # Last resort: the outermost brace-delimited span.
        attempts.append(raw_text[start:end + 1])

    last_error = None
    for text in attempts:
        try:
            parsed = json.loads(text)
        except ValueError as exc:
            last_error = exc
            continue
        if isinstance(parsed, dict):
            return parsed
    raise RuntimeError("Gemini did not return a JSON object for the case metadata.") from last_error


def ensure_case_generated(case_id: str) -> Dict[str, Any]:
    """Return the public document for ``case_id``, generating the case if needed.

    Generation picks an authentic image from ``ia_pool``, asks Gemini for the
    brief, three metadata bundles and the solution, produces the three
    images, and stores ``public`` and ``solution`` under ``cases/<case_id>``.

    Changes from the previous version:
      * the authentic image is placed in the slot named by the model's
        ``answer_index`` and files are named by slot; it used to sit in slot 0
        as ``img_1.jpg`` whatever the solution said;
      * the text metadata is requested and validated before any image is
        generated or uploaded, so a bad reply fails early;
      * ``answer_index`` is validated, and empty or fenced model replies are
        handled without attribute or index errors.

    Raises:
        RuntimeError: no pool item is available, or the model reply is
            unusable (not a JSON object, wrong bundle count, bad answer index).
        Exception: propagated from the download, upload and model calls.
    """
    existing_public = case_ref(case_id).child("public").get()
    if existing_public:
        log.info(f"Case {case_id} already exists")
        return existing_public

    # Best effort: a failed bootstrap must not block a pool that already has items.
    try:
        stats = ensure_minimum_ia_pool()
        log.debug(f"Bootstrap stats for case {case_id}: {stats}")
    except Exception:
        log.exception("Bootstrap failed inside ensure_case_generated")

    ia_item = choose_ia_item_for_case(case_id)
    if not ia_item:
        raise RuntimeError("No IA items available. Ingest needed.")

    mode = fifty_fifty_mode(seed_for_date(case_id))
    log.info(f"Case {case_id}: mode={mode}")
    style_period = "sourced from Internet Archive; museum catalog reproduction"

    title = ia_item.get("title") or "Untitled"
    creator = ia_item.get("creator") or ""
    date = ia_item.get("date") or ""
    rights = ia_item.get("rights") or ""
    licenseurl = ia_item.get("licenseurl") or ""

    # --- Text: brief, metadata bundles, ledger and solution -----------------
    log.info(f"Case {case_id}: prompting metadata with title='{title}' creator='{creator}' date='{date}'")
    meta_prompt = _CASE_META_PROMPT.format(
        mode_label="KNOWLEDGE" if mode == "knowledge" else "OBSERVATION",
        title=title,
        creator=creator,
        date=date,
        rights=rights,
        licenseurl=licenseurl,
    )
    meta_resp = client.models.generate_content(
        model=CATEGORY_MODEL,
        contents=[meta_prompt]
    )
    raw_text = (meta_resp.text or "").strip()
    log.debug(f"Case {case_id}: raw meta JSON text len={len(raw_text)}")
    meta_json = _parse_model_json(raw_text)

    case_brief = meta_json.get("case_brief", "A resurfaced portrait raises questions—its paper trail glitters a little too perfectly.")
    metadata = meta_json.get("metadata", [])
    ledger_summary = meta_json.get("ledger_summary", "")
    solution = meta_json.get("solution", {})
    if not isinstance(solution, dict):
        solution = {}
    try:
        answer_index = int(solution.get("answer_index", 0))
    except (TypeError, ValueError):
        raise RuntimeError("Gemini returned a non-integer answer_index.") from None
    flags_signature = solution.get("flags_signature", [])
    flags_metadata = solution.get("flags_metadata", [])
    flags_financial = solution.get("flags_financial", [])
    explanation = solution.get("explanation", "The authentic work aligns with period-accurate details; the others contain subtle contradictions.")
    log.info(f"Case {case_id}: answer_index={answer_index}, meta_count={len(metadata) if isinstance(metadata, list) else 'n/a'}")
    if not isinstance(metadata, list) or len(metadata) != 3:
        log.error("Gemini did not return exactly 3 metadata bundles")
        raise RuntimeError("Expected exactly 3 metadata bundles.")
    if answer_index not in (0, 1, 2):
        log.error(f"Gemini returned out-of-range answer_index={answer_index}")
        raise RuntimeError("answer_index must be 0, 1 or 2.")

    # --- Images -------------------------------------------------------------
    source_url = ia_item.get("storage_url") or ia_item["download_url"]
    log.info(f"Case {case_id}: authentic source={source_url}")
    auth_img = download_image_to_pil(source_url)

    images_urls: List[str] = []
    signature_crops: List[str] = []
    if mode == "knowledge":
        # All three slots show the same authentic image in this mode.
        url1 = save_image_return_url(auth_img, f"hidden_stroke/{case_id}/images/img_1.jpg")
        log.debug(f"Case {case_id}: saved authentic -> {url1}")
        crop1_url = save_image_return_url(
            crop_signature_macro(auth_img, 512),
            f"hidden_stroke/{case_id}/signature_crops/crop_1.jpg",
            quality=88,
        )
        log.debug(f"Case {case_id}: saved authentic crop -> {crop1_url}")
        images_urls = [url1] * 3
        signature_crops = [crop1_url] * 3
    else:
        for slot in range(3):
            if slot == answer_index:
                slot_img = auth_img
            else:
                log.info(f"Case {case_id}: generating forgery for slot {slot + 1}")
                slot_img = _generate_forgery_image(auth_img)
            # File names follow the slot, so they do not reveal the authentic image.
            url = save_image_return_url(slot_img, f"hidden_stroke/{case_id}/images/img_{slot + 1}.jpg")
            c_url = save_image_return_url(
                crop_signature_macro(slot_img, 512),
                f"hidden_stroke/{case_id}/signature_crops/crop_{slot + 1}.jpg",
                quality=88,
            )
            images_urls.append(url)
            signature_crops.append(c_url)
            log.debug(f"Case {case_id}: slot {slot + 1} saved -> {url}; crop -> {c_url}")

    # --- Persist ------------------------------------------------------------
    public = {
        "case_id": case_id,
        "mode": mode,
        "brief": case_brief,
        "style_period": style_period,
        "images": images_urls,
        "signature_crops": signature_crops,
        "metadata": metadata,
        "ledger_summary": ledger_summary,
        "timer_seconds": TIMER_SECONDS,
        "initial_ip": INITIAL_IP,
        "tool_costs": TOOL_COSTS,
        "credits": {
            "source": "Internet Archive",
            "identifier": ia_item.get("identifier"),
            "title": title,
            "creator": creator,
            "rights": rights,
            "licenseurl": licenseurl
        }
    }
    solution_doc = {
        "answer_index": answer_index,
        "flags_signature": flags_signature,
        "flags_metadata": flags_metadata,
        "flags_financial": flags_financial,
        "explanation": explanation
    }
    cref = case_ref(case_id)
    cref.child("public").set(public)
    cref.child("solution").set(solution_doc)
    log.info(f"Case {case_id}: generated and stored")
    return public
# -----------------------------------------------------------------------------
# 5) SESSIONS, TOOLS, GUESS, LEADERBOARD
# -----------------------------------------------------------------------------
def create_session(user_id: str, username: str, case_id: str) -> Dict[str, Any]:
    """Create, persist and return a new active session for ``user_id`` on ``case_id``.

    The session gets a random UUID, ``INITIAL_IP`` points and an expiry
    ``TIMER_SECONDS`` after creation.
    """
    session_id = str(uuid.uuid4())
    lifetime = timedelta(seconds=TIMER_SECONDS)
    expires_at = (datetime.now(timezone.utc) + lifetime).isoformat()
    started_at = datetime.now(timezone.utc).isoformat()
    session_doc = dict(
        session_id=session_id,
        user_id=user_id,
        username=username,
        case_id=case_id,
        ip_remaining=INITIAL_IP,
        started_at=started_at,
        expires_at=expires_at,
        actions=[],
        status="active",
    )
    sessions_ref().child(session_id).set(session_doc)
    log.info(f"New session {session_id} for user={username} case={case_id}")
    return session_doc
def get_session(session_id: str) -> Dict[str, Any]:
    """Load the stored session document, or an empty dict when none exists."""
    snapshot = sessions_ref().child(session_id).get()
    return snapshot if snapshot else {}
def require_active_session(req) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Resolve the ``X-Session-Id`` header to an active, unexpired session.

    The header is untrusted, so it must parse as a UUID before it is used as a
    database path segment (sessions are created with ``uuid.uuid4()``). A
    stored session with a missing or malformed ``expires_at`` is rejected as
    invalid instead of raising.

    Returns:
        ``(session, {})`` on success, or ``({}, {"error": ...})`` otherwise.
        A session found past its expiry is also marked "expired" in the
        database.
    """
    session_id = (req.headers.get("X-Session-Id", "") or "").strip()
    if not session_id:
        return {}, {"error": "Missing X-Session-Id header."}
    try:
        # The canonical form also removes any character that is illegal in a key.
        session_id = str(uuid.UUID(session_id))
    except ValueError:
        return {}, {"error": "Invalid or inactive session."}

    sess = get_session(session_id)
    if not sess or sess.get("status") != "active":
        return {}, {"error": "Invalid or inactive session."}

    try:
        exp = datetime.fromisoformat(str(sess.get("expires_at", "")).replace("Z", "+00:00"))
    except ValueError:
        return {}, {"error": "Invalid or inactive session."}
    if exp.tzinfo is None:
        # Naive timestamps are treated as UTC so the comparison cannot raise.
        exp = exp.replace(tzinfo=timezone.utc)

    now = datetime.now(timezone.utc)
    if now > exp:
        sess["status"] = "expired"
        sessions_ref().child(session_id).child("status").set("expired")
        return {}, {"error": "Session expired."}
    return sess, {}
def spend_ip(session: Dict[str, Any], cost: int, action: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Deduct ``cost`` Investigation Points from ``session`` and record ``action``.

    On success the in-memory session and the stored ``ip_remaining`` are
    updated, ``action`` is timestamped and pushed to the session's actions.

    Returns:
        ``(session, {})`` on success, or ``(session, {"error": ...})`` when
        the balance is too low (nothing is written in that case).
    """
    balance = session["ip_remaining"]
    if balance < cost:
        return session, {"error": "Not enough Investigation Points."}

    balance = balance - cost
    session["ip_remaining"] = balance
    action["ts"] = datetime.now(timezone.utc).isoformat()

    sid = session["session_id"]
    sessions_ref().child(sid).child("ip_remaining").set(balance)
    sessions_ref().child(sid).child("actions").push(action)
    log.debug(f"Spend IP: {cost} -> remaining={balance}")
    return session, {}
def score_result(correct: bool, session: Dict[str, Any]) -> Dict[str, Any]:
    """Compute the final score for a guess.

    Score = base (100 when correct) + one point per started 10 seconds left
    + 2 per remaining Investigation Point - 40 when wrong, floored at 0.

    Returns:
        ``{"score", "seconds_left", "ip_left"}``.
    """
    deadline = datetime.fromisoformat(session["expires_at"].replace("Z", "+00:00"))
    remaining = (deadline - datetime.now(timezone.utc)).total_seconds()
    seconds_left = max(0, int(remaining))
    ip_left = session["ip_remaining"]

    time_bonus = (seconds_left + 9) // 10
    ip_bonus = ip_left * 2
    if correct:
        base, penalty = 100, 0
    else:
        base, penalty = 0, 40
    total = max(0, base + time_bonus + ip_bonus - penalty)
    return {"score": total, "seconds_left": seconds_left, "ip_left": session["ip_remaining"]}
def upsert_leaderboard(case_id: str, user_id: str, username: str, score: int):
    """Record the user's play for ``case_id`` and rebuild the top-N leaderboard.

    ``user_id`` can come from a request header, so it is passed through
    ``fb_key`` before being used as a database key; ids that are already
    valid keys are unchanged. Entries without a usable score sort as 0.
    """
    play_key = fb_key(user_id)
    plays_ref(case_id).child(play_key).set({
        "user_id": user_id,
        "username": username,
        "score": score,
        "ts": datetime.now(timezone.utc).isoformat()
    })
    plays = plays_ref(case_id).get() or {}
    entries = plays.values() if isinstance(plays, dict) else plays
    top = sorted(
        (p for p in entries if isinstance(p, dict)),
        key=lambda p: p.get("score") or 0,
        reverse=True,
    )[:LEADERBOARD_TOP_N]
    leaderboard_ref(case_id).set(top)
# -----------------------------------------------------------------------------
# 6) ROUTES
# -----------------------------------------------------------------------------
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe: report ``ok`` together with the current UTC time."""
    stamp = datetime.now(timezone.utc).isoformat()
    payload = {"ok": True, "time": stamp}
    return jsonify(payload)
# --- Admin: Internet Archive ingestion (manual) ---
@app.route("/admin/ingest-ia", methods=["POST"])
def admin_ingest_ia():
    """Manually ingest Internet Archive search results into ``ia_pool``.

    Requires an ``X-Admin-Key`` header equal to ``ADMIN_KEY``. Optional JSON
    body fields: ``query``, ``pages`` (default 2), ``rows`` (default 100).

    Changes from the previous version: the key is compared in constant time,
    a missing or non-JSON body is treated as empty, and non-integer
    ``pages``/``rows`` give a 400 instead of an unhandled error.
    """
    provided = request.headers.get("X-Admin-Key") or ""
    if not ADMIN_KEY or not hmac.compare_digest(provided.encode("utf-8"), ADMIN_KEY.encode("utf-8")):
        return jsonify({"error": "Forbidden"}), 403

    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    query = body.get("query") or DEFAULT_IA_QUERY
    try:
        pages = int(body.get("pages") or 2)
        rows = int(body.get("rows") or 100)
    except (TypeError, ValueError):
        return jsonify({"error": "pages and rows must be integers."}), 400

    ingested = 0
    errors = 0
    log.info(f"Manual ingest: query='{query}' pages={pages} rows={rows}")
    for page in range(1, pages + 1):
        try:
            docs = ia_advanced_search(query, rows=rows, page=page)
        except Exception:
            errors += 1
            continue
        for d in docs:
            ident = d.get("identifier")
            if not ident:
                continue
            if ia_pool_ref().child(fb_key(ident)).get():
                continue  # already in the pool
            try:
                rec = ingest_ia_doc(d)
                if rec:
                    ingested += 1
            except Exception:
                errors += 1
                log.exception(f"Manual ingest failed for {ident}")
                continue
    pool_size = len(ia_pool_ref().get() or {})
    return jsonify({"ok": True, "ingested": ingested, "errors": errors, "pool_size": pool_size})
# --- Admin: Cache IA images to Firebase Storage (manual) ---
@app.route("/admin/cache-ia", methods=["POST"])
def admin_cache_ia():
    """Manually cache pool images into Storage via ``batch_cache_ia_pool``.

    Requires an ``X-Admin-Key`` header equal to ``ADMIN_KEY``. The optional
    JSON body may override any ``batch_cache_ia_pool`` parameter.

    Changes from the previous version: the key is compared in constant time,
    a missing or non-JSON body is treated as empty, and non-integer numeric
    options give a 400 instead of an unhandled error.
    """
    provided = request.headers.get("X-Admin-Key") or ""
    if not ADMIN_KEY or not hmac.compare_digest(provided.encode("utf-8"), ADMIN_KEY.encode("utf-8")):
        return jsonify({"error": "Forbidden"}), 403

    cfg = request.get_json(silent=True)
    if not isinstance(cfg, dict):
        cfg = {}
    try:
        numeric = {
            "limit": int(cfg.get("limit", 100)),
            "min_width": int(cfg.get("min_width", 800)),
            "min_height": int(cfg.get("min_height", 800)),
            "max_dim": int(cfg.get("max_dim", 4096)),
            "jpeg_quality": int(cfg.get("jpeg_quality", 90)),
        }
    except (TypeError, ValueError):
        return jsonify({"error": "Numeric options must be integers."}), 400

    out = batch_cache_ia_pool(
        overwrite=bool(cfg.get("overwrite", False)),
        randomize=bool(cfg.get("randomize", True)),
        skip_if_restricted=bool(cfg.get("skip_if_restricted", True)),
        **numeric,
    )
    return jsonify(out)
# --- Admin: pool stats ---
@app.route("/admin/ia-pool/stats", methods=["GET"])
def ia_pool_stats():
    """Report the pool size and how many records have a ``storage_url``.

    Requires an ``X-Admin-Key`` header equal to ``ADMIN_KEY``; the key is now
    compared in constant time.
    """
    provided = request.headers.get("X-Admin-Key") or ""
    if not ADMIN_KEY or not hmac.compare_digest(provided.encode("utf-8"), ADMIN_KEY.encode("utf-8")):
        return jsonify({"error": "Forbidden"}), 403
    pool = ia_pool_ref().get() or {}
    cached = sum(1 for r in pool.values() if isinstance(r, dict) and r.get("storage_url"))
    return jsonify({"pool_size": len(pool), "cached": cached})
# --- Admin: pre-generate today's case (manual) ---
@app.route("/admin/generate-today", methods=["POST"])
def admin_generate_today():
    """Pre-generate the case for the current UTC date.

    Requires an ``X-Admin-Key`` header equal to ``ADMIN_KEY``; the key is now
    compared in constant time. A generation failure is logged and returned as
    a JSON 500 instead of an unhandled error.
    """
    provided = request.headers.get("X-Admin-Key") or ""
    if not ADMIN_KEY or not hmac.compare_digest(provided.encode("utf-8"), ADMIN_KEY.encode("utf-8")):
        return jsonify({"error": "Forbidden"}), 403
    case_id = utc_today_str()
    try:
        public = ensure_case_generated(case_id)
    except Exception as e:
        log.exception("generate-today failed")
        return jsonify({"generated": False, "case_id": case_id, "error": str(e)}), 500
    return jsonify({"generated": True, "case_id": case_id, "mode": public.get("mode")})
# --- DEV-ONLY: panic button bootstrap (no auth; gated by env) ---
@app.route("/admin/bootstrap-now", methods=["POST"])
def admin_bootstrap_now():
    """Dev-only: run ``ensure_minimum_ia_pool`` on demand (no auth, env-gated).

    Optional JSON body fields: ``min_items``, ``rows``, ``max_pages`` and
    ``query``. A custom query replaces the module-level ``DEFAULT_IA_QUERY``
    for the duration of the call and is restored afterwards.

    NOTE(review): the override mutates a module global, so any other request
    running at the same time sees the custom query too.
    """
    global DEFAULT_IA_QUERY
    if not ALLOW_DEV_BOOTSTRAP:
        return jsonify({"error": "Disabled. Set ALLOW_DEV_BOOTSTRAP=1 to enable."}), 403

    cfg = request.get_json() or {}
    min_items = int(cfg.get("min_items", MIN_IA_POOL))
    rows = int(cfg.get("rows", 100))
    max_pages = int(cfg.get("max_pages", 5))
    custom_q = cfg.get("query")

    saved_query = DEFAULT_IA_QUERY
    if custom_q:
        log.warning(f"DEV bootstrap using custom query: {custom_q!r}")
        DEFAULT_IA_QUERY = custom_q  # temporary override
    try:
        stats = ensure_minimum_ia_pool(min_items=min_items, rows=rows, max_pages=max_pages)
        return jsonify({"ok": True, "stats": stats, "effective_query": DEFAULT_IA_QUERY})
    except Exception as e:
        log.exception("bootstrap-now failed")
        return jsonify({"ok": False, "error": str(e)}), 500
    finally:
        DEFAULT_IA_QUERY = saved_query  # restore
# --- DEV-ONLY: diagnostics (network + firebase sanity) ---
@app.route("/admin/diagnostics", methods=["GET"])
def diagnostics():
    """Dev-only: sanity-check Internet Archive access and Storage uploads.

    Changes from the previous version: ``db_url`` now reports the configured
    database URL (it used to report the root reference's path), and the
    uploaded ping file is deleted afterwards so calls do not pile up public
    files in the bucket.
    """
    if not ALLOW_DEV_DIAGNOSTICS:
        return jsonify({"error": "Disabled. Set ALLOW_DEV_DIAGNOSTICS=1 to enable."}), 403
    info = {
        "bucket": bucket.name,
        "db_url": firebase_db_url,
        "log_level": LOG_LEVEL,
        "ia_query": DEFAULT_IA_QUERY,
    }
    diag = {"info": info, "ia": {}, "firebase": {}}

    # Internet Archive: one small search plus a metadata fetch for the first hit.
    try:
        docs = ia_advanced_search(DEFAULT_IA_QUERY, rows=3, page=1)
        diag["ia"]["search_docs"] = [d.get("identifier") for d in docs]
        if docs:
            ident = docs[0].get("identifier")
            meta = ia_metadata(ident)
            best = ia_best_image_from_metadata(meta)
            diag["ia"]["sample_identifier"] = ident
            diag["ia"]["best_file"] = (best or {}).get("name")
    except Exception as e:
        diag["ia"]["error"] = str(e)

    # Storage: upload a tiny object, then remove it again (best effort).
    ping_path = f"diag/ping_{uuid.uuid4().hex}.txt"
    try:
        tiny = upload_bytes_to_storage(b"ping", ping_path, "text/plain")
        diag["firebase"]["upload_test"] = tiny
        try:
            bucket.blob(ping_path).delete()
        except Exception:
            log.warning(f"Could not delete diagnostics object {ping_path}")
    except Exception as e:
        diag["firebase"]["error"] = str(e)
    return jsonify(diag)
# --- Player flow ---
@app.route("/cases/today/start", methods=["POST"])
def start_case():
    """Start (or resume) the caller's session for today's case.

    Today's case is generated on first use. An existing session is reused
    only when it belongs to today's case, is marked active and has not passed
    its ``expires_at``. Previously a timed-out session still marked "active"
    was handed back, and the player's first tool call then failed with
    "Session expired." Stale sessions found here are marked expired.

    Returns:
        JSON with ``session_id`` and the public ``case`` document.
    """
    user_id, username = extract_user_from_headers(request)
    case_id = utc_today_str()
    public = ensure_case_generated(case_id)

    existing = sessions_ref().order_by_child("user_id").equal_to(user_id).get()
    now = datetime.now(timezone.utc)
    sess = None
    for skey, sdoc in (existing or {}).items():
        if not isinstance(sdoc, dict):
            continue
        if sdoc.get("case_id") != case_id or sdoc.get("status") != "active":
            continue
        try:
            exp = datetime.fromisoformat(str(sdoc.get("expires_at", "")).replace("Z", "+00:00"))
            still_valid = exp > now
        except (TypeError, ValueError):
            # Malformed or naive timestamp: treat the session as unusable.
            still_valid = False
        if still_valid:
            sess = sdoc
            break
        sessions_ref().child(skey).child("status").set("expired")

    if not sess:
        sess = create_session(user_id, username, case_id)
    return jsonify({"session_id": sess["session_id"], "case": public})
@app.route("/cases/<case_id>/tool/signature", methods=["POST"])
def tool_signature(case_id):
    """Spend IP to reveal the signature crop of one image.

    JSON body: ``image_index`` (0, 1 or 2; default 0). A missing or non-JSON
    body and a non-integer index now give a 400 instead of an unhandled
    error, and they are rejected before any points are spent.

    Returns:
        JSON with ``crop_url``, a ``hint`` (only in observation mode) and the
        remaining IP.
    """
    session, err = require_active_session(request)
    if err:
        return jsonify(err), 400
    if session["case_id"] != case_id:
        return jsonify({"error": "Session/case mismatch."}), 400

    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    try:
        img_index = int(body.get("image_index", 0))
    except (TypeError, ValueError):
        return jsonify({"error": "image_index must be 0,1,2"}), 400
    if img_index not in (0, 1, 2):
        return jsonify({"error": "image_index must be 0,1,2"}), 400

    session, err = spend_ip(session, TOOL_COSTS["signature"], {"type": "tool_signature", "image_index": img_index})
    if err:
        return jsonify(err), 400

    public = case_ref(case_id).child("public").get() or {}
    crops = public.get("signature_crops", [])
    crop_url = crops[img_index] if img_index < len(crops) else ""
    hint = "Examine baseline alignment and stroke overlap." if public.get("mode") == "observation" else ""
    return jsonify({"crop_url": crop_url, "hint": hint, "ip_remaining": session["ip_remaining"]})
@app.route("/cases/<case_id>/tool/metadata", methods=["POST"])
def tool_metadata(case_id):
    """Spend IP to receive a metadata hint for the case.

    Expects a JSON object with ``image_index`` (0, 1 or 2; defaults to 0). The
    index is validated and recorded with the IP spend; the hint returned is the
    first entry of the solution's ``flags_metadata`` list, or a generic
    fallback when that list is empty.

    Returns JSON with ``flags`` (a one-element list) and ``ip_remaining``. Any
    failure — no active session, session/case mismatch, invalid
    ``image_index`` or a rejected IP spend — yields a 400 with an error payload.
    """
    session, err = require_active_session(request)
    if err:
        return jsonify(err), 400
    if session["case_id"] != case_id:
        return jsonify({"error": "Session/case mismatch."}), 400

    body = request.get_json() or {}
    # A non-object JSON body (e.g. a list) has no image_index; treat it as invalid
    # rather than crashing on .get().
    raw_index = body.get("image_index", 0) if isinstance(body, dict) else None
    try:
        img_index = int(raw_index)
    except (TypeError, ValueError, OverflowError):
        # Non-numeric input is a client error, not a server fault; reject it
        # before any IP is spent.
        return jsonify({"error": "image_index must be 0,1,2"}), 400
    if img_index not in (0, 1, 2):
        return jsonify({"error": "image_index must be 0,1,2"}), 400

    session, err = spend_ip(session, TOOL_COSTS["metadata"], {"type": "tool_metadata", "image_index": img_index})
    if err:
        return jsonify(err), 400

    solution = case_ref(case_id).child("solution").get() or {}
    flags_metadata: List[str] = solution.get("flags_metadata", [])
    hint = flags_metadata[0] if flags_metadata else "Check chronology, chemistry, and institutional formats."
    return jsonify({"flags": [hint], "ip_remaining": session["ip_remaining"]})
@app.route("/cases/<case_id>/tool/financial", methods=["POST"])
def tool_financial(case_id):
    """Spend IP to receive a financial hint for the case.

    The hint is the first entry of the solution's ``flags_financial`` list, or
    a generic fallback when that list is empty.

    Returns JSON with ``flags`` (a one-element list) and ``ip_remaining``. Any
    failure — no active session, session/case mismatch or a rejected IP
    spend — yields a 400 with an error payload.
    """
    session, err = require_active_session(request)
    if err:
        return jsonify(err), 400
    if session["case_id"] != case_id:
        return jsonify({"error": "Session/case mismatch."}), 400

    spend_event = {"type": "tool_financial"}
    session, err = spend_ip(session, TOOL_COSTS["financial"], spend_event)
    if err:
        return jsonify(err), 400

    solution_doc = case_ref(case_id).child("solution").get() or {}
    financial_flags: List[str] = solution_doc.get("flags_financial", [])
    if financial_flags:
        hint = financial_flags[0]
    else:
        hint = "Follow currency, jurisdiction, and payment method timelines."

    payload = {"flags": [hint], "ip_remaining": session["ip_remaining"]}
    return jsonify(payload)
@app.route("/cases/<case_id>/guess", methods=["POST"])
def submit_guess(case_id):
    """Record the player's final guess, score it and reveal the solution.

    Expects a JSON object with ``image_index`` (0, 1 or 2) and an optional
    ``rationale`` string. On a valid guess the session is marked "finished",
    the result is scored, the leaderboard is upserted and the play record for
    the user is updated.

    Returns JSON with ``correct``, ``score``, ``timeLeft``, ``ipLeft`` and a
    ``reveal`` object holding the answer index, explanation and flag lists.
    No active session, a session/case mismatch or an invalid ``image_index``
    yields a 400 with an error payload and leaves the session untouched.
    """
    session, err = require_active_session(request)
    if err:
        return jsonify(err), 400
    if session["case_id"] != case_id:
        return jsonify({"error": "Session/case mismatch."}), 400

    body = request.get_json() or {}
    if not isinstance(body, dict):
        # A non-object JSON body (e.g. a list) carries no usable fields.
        return jsonify({"error": "image_index must be 0,1,2"}), 400
    try:
        guess_index = int(body.get("image_index", -1))
    except (TypeError, ValueError, OverflowError):
        # Non-numeric input is a client error; reject it before the session
        # is closed so the player can retry.
        return jsonify({"error": "image_index must be 0,1,2"}), 400
    rationale = (body.get("rationale") or "").strip()
    if guess_index not in (0, 1, 2):
        return jsonify({"error": "image_index must be 0,1,2"}), 400

    # Close the session first so scoring sees the finished state.
    sessions_ref().child(session["session_id"]).child("status").set("finished")
    session["status"] = "finished"

    solution = case_ref(case_id).child("solution").get() or {}
    answer_index = int(solution.get("answer_index", 0))
    correct = (guess_index == answer_index)
    summary = score_result(correct, session)
    upsert_leaderboard(case_id, session["user_id"], session["username"], summary["score"])

    reveal = {
        "authentic_index": answer_index,
        "explanation": solution.get("explanation", ""),
        "flags_signature": solution.get("flags_signature", []),
        "flags_metadata": solution.get("flags_metadata", []),
        "flags_financial": solution.get("flags_financial", [])
    }
    plays_ref(case_id).child(session["user_id"]).update({
        "rationale": rationale,
        "correct": correct,
        "score": summary["score"],
        "seconds_left": summary["seconds_left"],
        "ip_left": summary["ip_left"],
        "finished_at": datetime.now(timezone.utc).isoformat()
    })
    return jsonify({
        "correct": correct,
        "score": summary["score"],
        "timeLeft": summary["seconds_left"],
        "ipLeft": summary["ip_left"],
        "reveal": reveal
    })
@app.route("/leaderboard/daily", methods=["GET"])
def leaderboard_daily():
    """Return today's leaderboard together with the caller's own score and rank.

    ``rank`` is the 1-based position of the first leaderboard row whose
    ``user_id`` matches the caller, or ``None`` when the caller is not listed.
    ``score`` comes from the caller's play record for today's case and is
    ``None`` when no such record (or score) exists.
    """
    case_id = utc_today_str()
    top = leaderboard_ref(case_id).get() or []
    user_id, _ = extract_user_from_headers(request)
    my_play = plays_ref(case_id).child(user_id).get() or {}

    # Position of the caller in the stored ordering; None if absent.
    rank = next(
        (position for position, entry in enumerate(top, start=1) if entry.get("user_id") == user_id),
        None,
    )

    me = {"score": my_play.get("score"), "rank": rank}
    return jsonify({"case_id": case_id, "top": top, "me": me})
# -----------------------------------------------------------------------------
# 7) MAIN
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    # Optionally warm the Internet Archive pool before serving (BOOTSTRAP_IA,
    # default on). A failure is logged but does not prevent the server starting.
    if os.environ.get("BOOTSTRAP_IA", "1") == "1":
        log.info("Bootstrapping Internet Archive pool...")
        try:
            stats = ensure_minimum_ia_pool()
            log.info("Bootstrap complete: %s", stats)
        except Exception:
            log.exception("Bootstrap failed")

    # The server binds to every interface, so the Werkzeug interactive debugger
    # (which allows arbitrary code execution) must not be enabled by default.
    # Debug mode also starts the reloader, which would run the bootstrap above
    # a second time. Opt in explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "0").strip().lower() in ("1", "true", "yes", "on")
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", "7860")), debug=debug_enabled)