"""Puck daemon — gradio.Server doubles as the HF Space backend and the local daemon. Dumb pipes by design: this process receives wire events, validates them against the shared schema (/schema/event.schema.json), queues them, and serves the built frontend. All policy — scoring, interruption taste, personality — lives in the TS engine running in the browser/webview. Keep it that way, or the eventual Tauri/Rust swap stops being ~150 lines. """ import asyncio import base64 import json import os import re import shutil import subprocess import sys import threading import time import uuid from datetime import datetime, timezone from pathlib import Path from fastapi import Request, Response from fastapi.responses import FileResponse, JSONResponse from gradio import Server from jsonschema import Draft202012Validator def _load_env_local() -> None: """The project keeps URLs/secrets in .env.local (not .env). Load it before importing brain, which reads PUCK_* env at import time. setdefault so a real exported env var always wins over the file.""" env = Path(__file__).resolve().parent.parent / ".env.local" if not env.exists(): return for line in env.read_text().splitlines(): line = line.strip() if line and not line.startswith("#") and "=" in line: k, _, v = line.partition("=") os.environ.setdefault(k.strip(), v.strip()) _load_env_local() import brain # noqa: E402 — must follow the env load so it sees the configured URLs import ocr # noqa: E402 — Vision OCR: terminal-CLI id + topical grounding (region-local) import recognizer # noqa: E402 — visual fingerprint matcher (ONNX CLIP), for site labels ROOT = Path(__file__).resolve().parent.parent SCHEMA = json.loads((ROOT / "schema" / "event.schema.json").read_text()) VALIDATOR = Draft202012Validator(SCHEMA) FRONTEND_DIST = ROOT / "frontend" / "dist" TRACES_PATH = ROOT / "server" / "data" / "traces.jsonl" SFT_PATH = ROOT / "molt" / "data" / "sft.jsonl" # curated rated traces that must accumulate before a molt (LoRA retrain from base) # is eligible; the molt itself is eval-gated in molt/, not triggered here. MOLT_THRESHOLD = 200 app = Server() # in-memory event queue: watchers POST in, the frontend drains. Survives nothing — # real persistence arrives with the memory store, not here. _events: list[dict] = [] _lock = threading.Lock() @app.post("/api/events") async def post_event(request: Request) -> JSONResponse: try: body = await request.json() except json.JSONDecodeError: return JSONResponse({"error": "body must be JSON"}, status_code=400) errors = [f"{e.json_path}: {e.message}" for e in VALIDATOR.iter_errors(body)] if errors: # loud contract failure — a watcher drifted from event.schema.json return JSONResponse({"error": "schema validation failed", "details": errors}, status_code=422) body.setdefault("id", f"evt_{uuid.uuid4().hex[:12]}") body.setdefault("ts", datetime.now(timezone.utc).isoformat()) with _lock: _events.append(body) # backstop against an unattended queue growing forever — keep the newest 200 _events[:] = _events[-200:] return JSONResponse({"ok": True, "id": body["id"]}, status_code=202) @app.get("/api/events/pending") def pending_events() -> JSONResponse: """Drain the queue. The frontend polls this; each event is delivered once.""" with _lock: drained, _events[:] = _events[:], [] return JSONResponse({"events": drained}) @app.post("/api/traces") async def post_traces(request: Request) -> JSONResponse: """Night Bloom ships the day's decision traces here — the molt's training data. Append-only JSONL; the dataset builder reads it later. Dumb storage, no policy.""" try: body = await request.json() except json.JSONDecodeError: return JSONResponse({"error": "body must be JSON"}, status_code=400) records = body.get("records") if not isinstance(records, list) or not all(isinstance(r, dict) for r in records): return JSONResponse({"error": "expected {records: object[]}"}, status_code=400) TRACES_PATH.parent.mkdir(parents=True, exist_ok=True) with _lock, TRACES_PATH.open("a") as f: for r in records: f.write(json.dumps(r, separators=(",", ":")) + "\n") return JSONResponse({"ok": True, "appended": len(records)}, status_code=202) @app.post("/api/brain/see") async def brain_see(request: Request) -> JSONResponse: """Puck's eyes: a screenshot (data URL) → wire events for what he notices. Perceived events run the SAME schema validation + queue as hook events, so the engine scores vision and hooks identically — vision is just another source.""" body = await request.json() image = body.get("image") if not isinstance(image, str) or not image.startswith("data:image"): return JSONResponse({"error": "expected {image: data-url, fairy_state?}"}, status_code=400) try: observed = await asyncio.to_thread(brain.see, image, body.get("fairy_state", {})) except brain.BrainError as e: return _brain_503(e) queued = [] with _lock: for ev in observed: ev.setdefault("id", f"evt_{uuid.uuid4().hex[:12]}") ev["ts"] = datetime.now(timezone.utc).isoformat() ev.setdefault("payload", {"via": "vision"}) # tag perceived events (external sources pre-tag their own) if not list(VALIDATOR.iter_errors(ev)): # off-contract perceptions are dropped, not queued _events.append(ev) queued.append(ev) _events[:] = _events[-200:] return JSONResponse({"observed": len(observed), "queued": len(queued), "events": queued}) PEEKS_DIR = ROOT / "server" / "data" / "peeks" def _log_peek( image_data_url: str, quip: str, fairy_state: dict, context: str = "", emotion: str = "" ) -> None: """Save each (screenshot, context, quip, emotion) for tuning — what Puck saw + the app context vs. what he said and felt. Gitignored (data/). Best-effort.""" try: PEEKS_DIR.mkdir(parents=True, exist_ok=True) header, _, b64 = image_data_url.partition(",") ext = "jpg" if "jpeg" in header else "png" name = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S-%f") + "." + ext (PEEKS_DIR / name).write_bytes(base64.b64decode(b64)) with (PEEKS_DIR / "log.jsonl").open("a") as f: rec = {"ts": name.rsplit(".", 1)[0], "image": name, "context": context, "quip": quip, "emotion": emotion, "fairy": fairy_state} f.write(json.dumps(rec, separators=(",", ":")) + "\n") except Exception as e: # noqa: BLE001 — logging must never break a peek print(f"peek-log failed: {e}") def _recent_peeks(n: int) -> list[dict]: """Puck's memory = his recent observations (the tail of the peek log).""" try: lines = (PEEKS_DIR / "log.jsonl").read_text().splitlines() except FileNotFoundError: return [] out = [] for ln in lines[-n:]: try: out.append(json.loads(ln)) except json.JSONDecodeError: pass return out @app.get("/api/memories") def memories(limit: int = 30) -> dict: """Recent things Puck has noticed — for the companion's memory tab.""" return {"memories": list(reversed(_recent_peeks(limit)))} # newest first RECOGNIZE_REFS = ROOT / "recognize" / "refs" @app.get("/api/peeks/{name}") def peek_image(name: str) -> Response: """Serve a saved peek screenshot (for the Memory tab's labeling view).""" if "/" in name or ".." in name: return JSONResponse({"error": "bad name"}, status_code=400) target = (PEEKS_DIR / name).resolve() if PEEKS_DIR.resolve() in target.parents and target.is_file(): return FileResponse(target) return JSONResponse({"error": "not found"}, status_code=404) @app.get("/api/recognize/labels") def recognize_labels() -> dict: """Existing reference labels — so the labeling UI can suggest reuse.""" if not RECOGNIZE_REFS.exists(): return {"labels": []} return {"labels": sorted(p.name for p in RECOGNIZE_REFS.iterdir() if p.is_dir())} @app.post("/api/recognize/label") async def recognize_label(request: Request) -> JSONResponse: """User taught Puck what a peek is → enroll that screenshot as a fingerprint reference under recognize/refs/