"""Space entrypoint: Gradio UI + FastAPI /ingest, served together on one port.""" from __future__ import annotations import json import os from pathlib import Path import gradio as gr import uvicorn from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from pydantic import BaseModel from server import dedup, events, health, threads from server.pipeline import AgentRequest, AgentResponse, run_pipeline from ui.blocks import CAROUSEL_JS, CSS, THEME, build_demo INGEST_TOKEN = os.environ.get("INGEST_TOKEN", "") FEED_PATH = Path(os.environ.get("FEED_PATH", "/tmp/ingest_feed.json")) MAX_FEED = 200 # Opt-in: run the agent automatically on each new message (front-end A). Off by # default, so /ingest keeps its store-only behavior unless explicitly enabled. AUTONOMOUS = os.environ.get("AUTONOMOUS") == "1" # Which message direction triggers autonomous action: "outgoing" = only when YOU # send/accept an invite (is_from_me), "any" = any new message in the chat. 
TRIGGER_ON = os.environ.get("TRIGGER_ON", "outgoing").lower()  # compared against "any" in /ingest

app = FastAPI(title="iMessage Calendar Agent")


# One message as posted to /ingest.
class IngestMessage(BaseModel):
    chat: str
    sender: str
    text: str
    timestamp: str
    images: list[str] = []  # base64 data URIs of image attachments
    is_from_me: bool = False  # True when YOU sent it (the send/accept trigger)


# Request body of /ingest: a batch of messages.
class IngestBatch(BaseModel):
    messages: list[IngestMessage]


def _load_feed() -> list[dict]:
    """Return the stored feed, or ``[]`` when the file is missing or unusable.

    Best-effort by design: any read/parse failure yields an empty feed rather
    than an error.
    """
    try:
        data = json.loads(FEED_PATH.read_text())
    except Exception:  # noqa: BLE001  missing/corrupt -> empty
        return []
    # Valid JSON that is not a list (e.g. `{}` or `null`) is just as unusable as
    # unparseable text: `_append_feed` concatenates this value with a list and
    # would otherwise raise TypeError on every /ingest until the file is removed.
    return data if isinstance(data, list) else []


def _append_feed(items: list[dict]) -> None:
    """Append ``items`` to the stored feed, keeping only the newest MAX_FEED."""
    feed = (_load_feed() + items)[-MAX_FEED:]
    FEED_PATH.write_text(json.dumps(feed, indent=2))


def _require_token(authorization: str) -> None:
    """Reject the request unless ``authorization`` is ``Bearer <INGEST_TOKEN>``.

    Raises:
        HTTPException: 401 when INGEST_TOKEN is unset/empty or the header does
            not match.
    """
    import hmac  # function-scope import, like the file's other lazy imports

    expected = f"Bearer {INGEST_TOKEN}"
    # Constant-time comparison: a plain `!=` on a secret returns as soon as the
    # first differing character is found. Compare bytes so non-ASCII input
    # cannot make compare_digest raise; "surrogatepass" keeps encode() total.
    matches = hmac.compare_digest(
        authorization.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
    if not INGEST_TOKEN or not matches:
        raise HTTPException(status_code=401, detail="bad token")


def _run_autonomous(chats: set[str]) -> None:
    """For each affected chat, run the agent over its rolling thread and deliver
    only the genuinely-new events (deduped). Used when AUTONOMOUS=1.

    Order matters: extract WITHOUT pushing, dedup, then push only the fresh
    events. (Pushing inside the pipeline re-pushed already-captured events on
    every rolling-window re-run — the exact duplicate-creation dedup exists to
    prevent.)"""
    feed = _load_feed()
    for chat in chats:
        thread = threads.rolling_thread(feed, chat)
        if not thread:
            continue
        resp = run_pipeline(AgentRequest(thread=thread, push_gcal=False))
        # Filter WITHOUT recording: events are only marked seen once the push
        # actually succeeds — recording first turns any transient push failure
        # into silent, permanent event loss (filtered out on every retry).
        new_events = dedup.filter_new(resp.plan.events, record=False)
        if not new_events:
            continue
        try:
            from calendar_out.gcal import push_events  # lazy: google libs optional

            push_events(new_events)
        except Exception as e:  # noqa: BLE001  push failure must not kill the loop
            events.emit(
                "calendar",
                f"autonomous push failed (will retry next run): "
                f"{type(e).__name__}: {e}",
                level="error",
            )
            continue  # NOT marked seen -> retried on the next trigger
        dedup.mark_seen(new_events)
        events.emit(
            "decision",
            f"autonomous: {len(new_events)} new event(s) in {chat}",
            events=len(new_events),
        )


@app.post("/agent", response_model=AgentResponse)
def agent(req: AgentRequest, authorization: str = Header(default="")):
    """Run the agent on a thread (or messages) and return an ActionPlan.

    The shared contract every front-end calls (iOS Shortcut, Android Tasker, the
    Mac collector). Stateless — see server/pipeline.run_pipeline.
    """
    _require_token(authorization)
    return run_pipeline(req)


@app.post("/ingest")
def ingest(batch: IngestBatch, background_tasks: BackgroundTasks,
           authorization: str = Header(default="")):
    """Receive new messages from the Mac collector (bearer-token protected).

    Returns immediately — autonomous runs (full LLM inference, potentially
    minutes per chat) happen in a background task. Running them inline blew the
    collector's 30s POST timeout, which skipped _save_rowid and re-sent the same
    batch every poll (duplicate feed entries + duplicate runs)."""
    _require_token(authorization)
    items = [m.model_dump() for m in batch.messages]
    _append_feed(items)
    n_imgs = sum(len(m.images) for m in batch.messages)
    chats = sorted({m.chat for m in batch.messages})
    events.emit("ingest", f"{len(items)} msg(s) from {', '.join(chats) or '—'}", images=n_imgs)
    if AUTONOMOUS:
        # Trigger on YOUR sent/accepted messages by default; "any" widens it.
        if TRIGGER_ON == "any":
            trigger_chats = set(chats)
        else:
            trigger_chats = {m.chat for m in batch.messages if m.is_from_me}
        if trigger_chats:
            background_tasks.add_task(_run_autonomous, trigger_chats)
    return {"received": len(items)}


@app.get("/health")
def health_route():
    # Liveness + hardware-adequacy (device/model/degraded/reason). The on-page
    # status banner and the maintenance monitor both read this.
    return health.health_status()


# --- Per-user Google Calendar OAuth (web flow) ----------------------------- #
def _oauth_redirect_uri(request: Request) -> str:
    """Public redirect URI. On a Space, SPACE_HOST is the public host; locally,
    fall back to the request's base URL. Must match the Google client config."""
    host = os.environ.get("SPACE_HOST", "").strip()
    base = f"https://{host}" if host else str(request.base_url).rstrip("/")
    return base.rstrip("/") + "/oauth2callback"


@app.get("/oauth2/start")
def oauth2_start(request: Request):
    """Kick off the Google consent flow (opened as a popup from the UI)."""
    from html import escape as _escape_html

    from calendar_out import gcal

    try:
        url, _state = gcal.auth_url(_oauth_redirect_uri(request))
    except Exception as e:  # noqa: BLE001  not configured -> friendly page
        # NOTE(review): this literal appears to have lost its HTML markup in
        # extraction (only text and line breaks remain); it is reproduced as
        # seen — restore the original tags before shipping.
        # The exception text is escaped because it is rendered as HTML.
        return HTMLResponse(
            f"\n\nGoogle Calendar isn't "
            f"configured on this Space.\n{_escape_html(str(e))}\n\n",
            status_code=503,
        )
    return RedirectResponse(url)


@app.get("/oauth2callback")
def oauth2_callback(request: Request):
    """Google redirects here after consent. Exchange the code for a per-user
    token, hand it to the opener window (and localStorage), then close. The
    token is NOT stored server-side."""
    from html import escape as _escape_html

    code = request.query_params.get("code")
    if request.query_params.get("error") or not code:
        # NOTE(review): markup appears stripped from this literal; reproduced as seen.
        return HTMLResponse(
            "\n\nGoogle connection cancelled. "
            "You can close this window.\n\n"
        )
    from calendar_out import gcal

    try:
        token_json = gcal.exchange_code(
            _oauth_redirect_uri(request), code, request.query_params.get("state", "")
        )
    except Exception as e:  # noqa: BLE001
        # The failure may echo request-derived values (code/state), so the
        # message is HTML-escaped before being rendered.
        # NOTE(review): markup appears stripped from this literal; reproduced as seen.
        return HTMLResponse(
            f"\n\nCouldn't complete Google "
            f"sign-in.\n{_escape_html(str(e))}\n\n"
        )
    tok_js = json.dumps(token_json)  # JS string literal of the token JSON
    # NOTE(review): `tok_js` is not used by the literals visible below, and three
    # of them are empty. The inline script that hands the token to the opener
    # (described in the docstring) is presumably what was lost — restore it from
    # the original file; it is deliberately NOT reconstructed here.
    return HTMLResponse(
        ""
        ""
        "\n\n✅ Google Calendar connected. You can close this window.\n\n"
        ""
    )


class TokenCheckBody(BaseModel):
    token: str


@app.post("/oauth2/check")
def oauth2_check(body: TokenCheckBody):
    """Liveness-check a browser-held Google token with one real API call
    (same-origin fetch from wireGcal on page load). POST so the token never
    lands in access logs; it is checked and discarded, never stored.
    200 = definitive verdict; non-200 = indeterminate (client keeps its local
    shape-check state)."""
    from calendar_out import gcal

    try:
        gcal._client_config()  # mirror /oauth2/start: friendly 503 when env unset
    except Exception as e:  # noqa: BLE001
        return JSONResponse(
            {"ok": False, "transient": True, "reason": str(e)}, status_code=503
        )
    res = gcal.check_token(body.token)
    out: dict = {"ok": res["ok"]}
    if res["ok"]:
        # Pass a refreshed token back only when the check produced one.
        if res.get("refreshed_token"):
            out["token"] = res["refreshed_token"]
    else:
        out["reason"] = res.get("reason", "")
        out["transient"] = bool(res.get("transient"))
    return out


# Register the @spaces.GPU functions at startup so ZeroGPU can schedule them.
import server.model  # noqa: E402,F401

demo = build_demo()

# Serving mode, env-selected:
#  - "gradio": the HF *Gradio-SDK* / ZeroGPU platform manages the launch (a self-run
#    uvicorn gets SIGTERM'd there), so we call demo.launch(). /agent etc. aren't served.
#  - "uvicorn": mount gradio under FastAPI and serve UI + /agent + /ingest on one port.
#    Used locally and on the *Docker-SDK* GPU Space (Dockerfile sets SERVE=uvicorn).
# Default: gradio on a Space unless told otherwise, uvicorn locally.
_default_serve = "gradio" if (os.environ.get("SPACE_ID") or os.environ.get("SYSTEM") == "spaces") else "uvicorn"
SERVE = os.environ.get("SERVE", _default_serve)

# Gradio 6 applies theme/css at mount/launch time — the css set on gr.Blocks is
# IGNORED when mounted, so pass it here or the custom UI renders as default Gradio.
#
# The `js=` load-function does NOT reliably execute on a *mounted* (uvicorn) app in
# Gradio 6 — the carousel then sits on its first slide with dead arrows/dots.
# So we inject the carousel script as a real inline
# NOTE(review): the comment above breaks off here in the visible source, and no
# definition of `_CAROUSEL_INLINE` or `_Response` (both used by the middleware
# below) is visible — presumably lost together with stripped markup. Confirm
# against the original file before relying on this section.

# Status banner: fetch /health on load and reveal #status-banner if degraded
# (e.g. real model on CPU-only hardware). Same inline-script pattern as the
# carousel, since js= is unreliable on a mounted app; it polls for the element
# because Gradio renders it client-side after .
_BANNER_JS = (
    "(function(){fetch('/health').then(function(r){return r.json();})"
    ".then(function(h){if(!h||!h.degraded){return;}(function s(){"
    "var b=document.getElementById('status-banner');"
    "if(!b){return setTimeout(s,400);}"
    "b.textContent='\\u26a0\\ufe0f '+(h.reason||'This Space needs a GPU.')+' \\u26a0\\ufe0f';"
    "b.style.display='block';})();}).catch(function(){});})();"
)
# NOTE(review): this f-string is empty in the visible source and never
# references _BANNER_JS; the wrapper around it was presumably stripped — confirm.
_BANNER_INLINE = f''


@app.middleware("http")
async def _inject_carousel_js(request, call_next):  # noqa: ANN001
    # Post-process the HTML served at "/" so the inline snippets are part of the
    # page itself; every other path / content type passes through untouched.
    resp = await call_next(request)
    if request.url.path != "/" or "text/html" not in resp.headers.get("content-type", ""):
        return resp
    # Buffer the whole body so it can be edited as text.
    body = b"".join([chunk async for chunk in resp.body_iterator])
    html = body.decode("utf-8", "ignore")
    # "cz-inline-js" acts as the already-injected marker (skip if present).
    # NOTE(review): the anchor literals below are empty in the visible source
    # (presumably a stripped closing tag). As written, `"" in html` is always
    # true and replace("", ..., 1) inserts at the very start of the document —
    # restore the original anchor string.
    if "cz-inline-js" not in html and "" in html:
        html = html.replace("", _CAROUSEL_INLINE + _BANNER_INLINE + "", 1)
    headers = dict(resp.headers)
    headers.pop("content-length", None)  # body length changed; let Starlette recompute
    return _Response(content=html, status_code=resp.status_code, headers=headers, media_type="text/html")


# Mount the Gradio UI at "/" on the FastAPI app; `app` is rebound to the value
# returned by gr.mount_gradio_app.
app = gr.mount_gradio_app(
    app,
    demo,
    path="/",
    ssr_mode=False,
    theme=THEME,
    css=CSS,
    js=CAROUSEL_JS,
    mcp_server=True,  # expose extract_events/make_ics/check_conflicts as MCP tools
)

if __name__ == "__main__":
    if SERVE == "gradio":
        # Gradio-managed launch: only the Blocks app is started here.
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            ssr_mode=False,
            theme=THEME,
            css=CSS,
            js=CAROUSEL_JS,
            mcp_server=True,  # expose extract_events/make_ics/check_conflicts as MCP tools
        )
    else:
        # Self-hosted: serve the combined FastAPI + Gradio app; PORT overrides 7860.
        uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "7860")))