OffGridSchedula / app.py
ParetoOptimal's picture
Initial Commit
0366d65
Raw
History Blame Contribute Delete
12.9 kB
"""Space entrypoint: Gradio UI + FastAPI /ingest, served together on one port."""
from __future__ import annotations
import json
import os
from pathlib import Path
import gradio as gr
import uvicorn
from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from pydantic import BaseModel
from server import dedup, events, health, threads
from server.pipeline import AgentRequest, AgentResponse, run_pipeline
from ui.blocks import CAROUSEL_JS, CSS, THEME, build_demo
# Shared secret for the bearer-protected endpoints (/agent, /ingest); when it
# is empty, _require_token rejects every request.
INGEST_TOKEN = os.getenv("INGEST_TOKEN", "")
# JSON file holding the rolling message feed appended to by /ingest.
FEED_PATH = Path(os.getenv("FEED_PATH", "/tmp/ingest_feed.json"))
# Only the newest MAX_FEED feed entries are kept on disk.
MAX_FEED = 200
# Opt-in: automatically run the agent on each new message (front-end A). Off
# unless AUTONOMOUS=1, so /ingest stays store-only by default.
AUTONOMOUS = os.getenv("AUTONOMOUS") == "1"
# Which message direction triggers an autonomous run: "outgoing" (default) =
# only messages YOU sent/accepted (is_from_me); "any" = any new message in the
# chat. /ingest treats every value other than "any" like "outgoing".
TRIGGER_ON = os.getenv("TRIGGER_ON", "outgoing").lower()
# The FastAPI app serving /agent, /ingest, /health and the OAuth routes; in
# "uvicorn" serve mode the Gradio UI is mounted onto it as well.
app = FastAPI(
    title="iMessage Calendar Agent",
)
class IngestMessage(BaseModel):
    # One message forwarded by the Mac collector to POST /ingest. /ingest stores
    # each one in the feed as a plain dict via model_dump().
    chat: str  # chat identifier; the feed is grouped per chat for rolling threads
    sender: str
    text: str
    timestamp: str  # NOTE(review): format is not validated here — confirm what the collector sends
    # pydantic copies mutable defaults per instance, so the shared [] is safe here.
    images: list[str] = [] # base64 data URIs of image attachments
    is_from_me: bool = False # True when YOU sent it (the send/accept trigger)
class IngestBatch(BaseModel):
    # Request body of POST /ingest: one poll's worth of new messages.
    messages: list[IngestMessage]
def _load_feed() -> list[dict]:
try:
return json.loads(FEED_PATH.read_text())
except Exception: # noqa: BLE001 missing/corrupt -> empty
return []
def _append_feed(items: list[dict]) -> None:
    """Append *items* to the on-disk feed, keeping only the newest MAX_FEED entries.

    The file is rewritten atomically. The JSON goes to a temp file in the same
    directory, which is then os.replace()d over FEED_PATH. A concurrent reader
    (e.g. the autonomous background task calling _load_feed) therefore sees
    either the old or the new feed, never a half-written file. _load_feed would
    treat such a file as corrupt and return an empty feed.

    NOTE(review): two overlapping /ingest requests can still interleave the
    read-modify-write and drop one batch; add a lock if the collector ever
    posts concurrently.
    """
    import tempfile  # local: only the atomic-write path needs it

    feed = (_load_feed() + items)[-MAX_FEED:]
    payload = json.dumps(feed, indent=2)
    fd, tmp_name = tempfile.mkstemp(
        dir=str(FEED_PATH.parent), prefix=f".{FEED_PATH.name}.", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(payload)
        os.replace(tmp_name, FEED_PATH)
    except BaseException:
        # Never leave a stray temp file behind when the write or rename fails.
        Path(tmp_name).unlink(missing_ok=True)
        raise
def _require_token(authorization: str) -> None:
    """Reject the request unless it carries ``Authorization: Bearer <INGEST_TOKEN>``.

    Fails closed: when INGEST_TOKEN is unset/empty, every request is rejected.

    Raises:
        HTTPException: 401 when the token is missing, wrong, or not configured.
    """
    import hmac  # local: keeps this helper self-contained

    expected = f"Bearer {INGEST_TOKEN}"
    # Constant-time comparison, so response timing doesn't reveal how much of
    # the token matched. Compare bytes: compare_digest only accepts ASCII-only
    # str, and the header value is client-controlled.
    if not INGEST_TOKEN or not hmac.compare_digest(
        authorization.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="bad token")
def _run_autonomous(chats: set[str]) -> None:
    """For each affected chat, run the agent over its rolling thread and deliver
    only the genuinely-new events (deduped). Used when AUTONOMOUS=1.

    Order matters: extract WITHOUT pushing, dedup, then push only the fresh
    events. (Pushing inside the pipeline re-pushed already-captured events on
    every rolling-window re-run — the exact duplicate-creation dedup exists to
    prevent.)

    Chats are handled independently. A pipeline or push failure for one chat
    is reported via events.emit(..., level="error") and the loop moves on, so
    one failing chat cannot starve the others queued in the same background task.
    """
    feed = _load_feed()
    for chat in chats:
        thread = threads.rolling_thread(feed, chat)
        if not thread:
            continue
        try:
            resp = run_pipeline(AgentRequest(thread=thread, push_gcal=False))
        except Exception as e:  # noqa: BLE001 one failing chat must not abort the rest
            events.emit(
                "decision",
                f"autonomous run failed for {chat}: {type(e).__name__}: {e}",
                level="error",
            )
            continue
        # Filter WITHOUT recording: events are only marked seen once the push
        # actually succeeds — recording first turns any transient push failure
        # into silent, permanent event loss (filtered out on every retry).
        new_events = dedup.filter_new(resp.plan.events, record=False)
        if not new_events:
            continue
        try:
            from calendar_out.gcal import push_events  # lazy: google libs optional
            push_events(new_events)
        except Exception as e:  # noqa: BLE001 push failure must not kill the loop
            events.emit("calendar",
                        f"autonomous push failed (will retry next run): "
                        f"{type(e).__name__}: {e}",
                        level="error")
            continue  # NOT marked seen -> retried on the next trigger
        dedup.mark_seen(new_events)
        events.emit(
            "decision",
            f"autonomous: {len(new_events)} new event(s) in {chat}",
            events=len(new_events),
        )
@app.post("/agent", response_model=AgentResponse)
def agent(req: AgentRequest, authorization: str = Header(default="")):
    """Run the agent over a thread (or list of messages) and return its ActionPlan.

    This is the shared contract every front-end calls (iOS Shortcut, Android
    Tasker, the Mac collector). It requires the bearer token and is otherwise
    stateless — see server/pipeline.run_pipeline.
    """
    _require_token(authorization)
    response = run_pipeline(req)
    return response
@app.post("/ingest")
def ingest(batch: IngestBatch, background_tasks: BackgroundTasks,
           authorization: str = Header(default="")):
    """Store a batch of new messages from the Mac collector (bearer-token protected).

    Responds right away. Autonomous runs (full LLM inference, potentially
    minutes per chat) are queued as a background task. Running them inline used
    to exceed the collector's 30s POST timeout; that skipped _save_rowid, so the
    same batch was re-sent every poll (duplicate feed entries + duplicate runs)."""
    _require_token(authorization)
    msgs = batch.messages
    records = [msg.model_dump() for msg in msgs]
    _append_feed(records)

    image_count = sum(len(msg.images) for msg in msgs)
    chat_names = sorted({msg.chat for msg in msgs})
    events.emit("ingest", f"{len(records)} msg(s) from {', '.join(chat_names) or '—'}", images=image_count)

    if not AUTONOMOUS:
        return {"received": len(records)}

    # By default only YOUR sent/accepted messages trigger a run; "any" widens it.
    if TRIGGER_ON == "any":
        to_run = set(chat_names)
    else:
        to_run = {msg.chat for msg in msgs if msg.is_from_me}
    if to_run:
        background_tasks.add_task(_run_autonomous, to_run)
    return {"received": len(records)}
@app.get("/health")
def health_route():
    # Reports liveness plus hardware adequacy (device/model/degraded/reason);
    # consumed by the on-page status banner and by the maintenance monitor.
    status = health.health_status()
    return status
# --- Per-user Google Calendar OAuth (web flow) ----------------------------- #
def _oauth_redirect_uri(request: Request) -> str:
"""Public redirect URI. On a Space, SPACE_HOST is the public host; locally,
fall back to the request's base URL. Must match the Google client config."""
host = os.environ.get("SPACE_HOST", "").strip()
base = f"https://{host}" if host else str(request.base_url).rstrip("/")
return base.rstrip("/") + "/oauth2callback"
@app.get("/oauth2/start")
def oauth2_start(request: Request):
    """Kick off the Google consent flow (opened as a popup from the UI).

    Redirects to Google's consent URL. If the OAuth client isn't configured
    (gcal.auth_url raises), returns a small 503 HTML page with the error text.
    """
    from html import escape  # local: only used to sanitize the error page

    from calendar_out import gcal
    try:
        url, _state = gcal.auth_url(_oauth_redirect_uri(request))
    except Exception as e:  # noqa: BLE001 not configured -> friendly page
        # Escape the exception text before putting it into HTML: its content
        # isn't controlled here and must not be able to inject markup/script.
        return HTMLResponse(
            f"<p style='font-family:sans-serif;padding:24px'>Google Calendar isn't "
            f"configured on this Space.<br><small>{escape(str(e))}</small></p>",
            status_code=503,
        )
    return RedirectResponse(url)
@app.get("/oauth2callback")
def oauth2_callback(request: Request):
    """Google redirects here after consent. Exchange the code for a per-user token,
    hand it to the opener window (and localStorage), then close. The token is NOT
    stored server-side."""
    from html import escape  # local: only used to sanitize the error page

    code = request.query_params.get("code")
    if request.query_params.get("error") or not code:
        return HTMLResponse(
            "<p style='font-family:sans-serif;padding:24px'>Google connection cancelled. "
            "You can close this window.</p><script>setTimeout(()=>window.close(),500)</script>"
        )
    from calendar_out import gcal
    try:
        token_json = gcal.exchange_code(
            _oauth_redirect_uri(request), code, request.query_params.get("state", "")
        )
    except Exception as e:  # noqa: BLE001
        # The error text may echo request-controlled values (code/state), and
        # this origin keeps the token in localStorage: escape it, or it becomes
        # a reflected-XSS vector.
        return HTMLResponse(
            f"<p style='font-family:sans-serif;padding:24px'>Couldn't complete Google "
            f"sign-in.<br><small>{escape(str(e))}</small></p>"
        )
    # JS literal of the token JSON. After json.dumps, also escape <, > and & so
    # no value inside it can terminate the enclosing <script> (e.g. "</script>").
    tok_js = (
        json.dumps(token_json)
        .replace("<", "\\u003c")
        .replace(">", "\\u003e")
        .replace("&", "\\u0026")
    )
    return HTMLResponse(
        "<!doctype html><meta charset=utf-8>"
        "<body style='font-family:sans-serif;padding:24px'>"
        "<p>✅ Google Calendar connected. You can close this window.</p>"
        "<script>try{var t=" + tok_js + ";localStorage.setItem('gcal_token',t);"
        "if(window.opener)window.opener.postMessage({gcal_token:t},location.origin);}"
        "catch(e){}setTimeout(function(){window.close();},800);</script></body>"
    )
class TokenCheckBody(BaseModel):
    # Request body of POST /oauth2/check: the browser-held Google token to verify.
    token: str
@app.post("/oauth2/check")
def oauth2_check(body: TokenCheckBody):
    """Verify a browser-held Google token with one real API call.

    Called same-origin by wireGcal on page load. It is a POST so the token
    stays out of access logs; the token is checked and then discarded, never
    stored. A 200 carries a definitive verdict; any other status means
    "indeterminate" and the client keeps its local shape-check state."""
    from calendar_out import gcal

    # Same idea as /oauth2/start: answer with a 503 when the OAuth env is unset.
    try:
        gcal._client_config()
    except Exception as e:  # noqa: BLE001
        unavailable = {"ok": False, "transient": True, "reason": str(e)}
        return JSONResponse(unavailable, status_code=503)

    verdict = gcal.check_token(body.token)
    ok = verdict["ok"]
    if not ok:
        return {
            "ok": ok,
            "reason": verdict.get("reason", ""),
            "transient": bool(verdict.get("transient")),
        }
    reply: dict = {"ok": ok}
    refreshed = verdict.get("refreshed_token")
    if refreshed:
        reply["token"] = refreshed
    return reply
# Register the @spaces.GPU functions at startup so ZeroGPU can schedule them.
import server.model  # noqa: E402,F401

demo = build_demo()
# Serving mode, env-selected:
# - "gradio": the HF *Gradio-SDK* / ZeroGPU platform manages the launch (a self-run
#   uvicorn gets SIGTERM'd there), so we call demo.launch(). /agent etc. aren't served.
# - "uvicorn": mount gradio under FastAPI and serve UI + /agent + /ingest on one port.
#   Used locally and on the *Docker-SDK* GPU Space (Dockerfile sets SERVE=uvicorn).
# Default: gradio on a Space unless told otherwise, uvicorn locally.
_default_serve = "gradio" if (os.environ.get("SPACE_ID") or os.environ.get("SYSTEM") == "spaces") else "uvicorn"
# Normalize (so e.g. SERVE=Uvicorn works) and fall back to the default for any
# unrecognized value. Otherwise neither mode branch matches cleanly: the Gradio
# mount is skipped, but the __main__ path still runs bare uvicorn with no UI.
SERVE = os.environ.get("SERVE", _default_serve).strip().lower()
if SERVE not in ("gradio", "uvicorn"):
    SERVE = _default_serve
# Gradio 6 applies theme/css at mount/launch time — the css set on gr.Blocks is
# IGNORED when mounted, so pass it here or the custom UI renders as default Gradio.
#
# The `js=` load-function does NOT reliably execute on a *mounted* (uvicorn) app in
# Gradio 6 — the carousel then sits on its first slide with dead arrows/dots. So we
# inject the carousel script as a real inline <script> before </body> via middleware;
# it self-bootstraps and its MutationObserver wires every .carousel once Gradio
# client-renders the page. (The launch() path below still passes js= for ZeroGPU.)
if SERVE == "uvicorn":
    from starlette.responses import Response as _Response

    _CAROUSEL_INLINE = f'<script id="cz-inline-js">({CAROUSEL_JS})();</script>'
    # Status banner: fetch /health on load and reveal #status-banner if degraded
    # (e.g. real model on CPU-only hardware). Same inline-script pattern as the
    # carousel, since js= is unreliable on a mounted app; it polls for the element
    # because Gradio renders it client-side after </body>.
    _BANNER_JS = (
        "(function(){fetch('/health').then(function(r){return r.json();})"
        ".then(function(h){if(!h||!h.degraded){return;}(function s(){"
        "var b=document.getElementById('status-banner');"
        "if(!b){return setTimeout(s,400);}"
        "b.textContent='\\u26a0\\ufe0f '+(h.reason||'This Space needs a GPU.')+' \\u26a0\\ufe0f';"
        "b.style.display='block';})();}).catch(function(){});})();"
    )
    _BANNER_INLINE = f'<script id="cz-banner-js">{_BANNER_JS}</script>'

    @app.middleware("http")
    async def _inject_carousel_js(request, call_next):  # noqa: ANN001
        """Inject the carousel + status-banner scripts before </body> of the "/" page.

        Every other response (other paths, non-HTML, compressed bodies) is
        passed through untouched. Injection is skipped if already present.
        """
        resp = await call_next(request)
        if (
            request.url.path != "/"
            or "text/html" not in resp.headers.get("content-type", "")
            # A compressed body can't be edited as text; decoding it with
            # errors="ignore" and re-encoding would corrupt it.
            or resp.headers.get("content-encoding")
        ):
            return resp
        body = b"".join([chunk async for chunk in resp.body_iterator])
        html = body.decode("utf-8", "ignore")
        if "cz-inline-js" not in html and "</body>" in html:
            html = html.replace("</body>", _CAROUSEL_INLINE + _BANNER_INLINE + "</body>", 1)
        payload = html.encode("utf-8")
        patched = _Response(content=payload, status_code=resp.status_code)
        # Rebuild headers from the raw (name, value) pairs. dict(resp.headers)
        # would collapse repeated headers, such as several Set-Cookie lines, into
        # one. The upstream content-length is stale (the body grew), so keep only
        # the one Starlette computed for the new payload.
        patched.raw_headers = [
            (name, value)
            for name, value in resp.raw_headers
            if name.lower() != b"content-length"
        ] + [
            (name, value)
            for name, value in patched.raw_headers
            if name == b"content-length"
        ]
        return patched

    app = gr.mount_gradio_app(
        app, demo, path="/", ssr_mode=False, theme=THEME, css=CSS, js=CAROUSEL_JS,
        mcp_server=True,  # expose extract_events/make_ics/check_conflicts as MCP tools
    )
if __name__ == "__main__":
    # Anything but "gradio" runs the FastAPI app under uvicorn (the Gradio UI is
    # mounted onto it only when SERVE == "uvicorn"); "gradio" lets Gradio own
    # the server, as the Gradio-SDK / ZeroGPU platform requires.
    if SERVE != "gradio":
        uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "7860")))
    else:
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            ssr_mode=False,
            theme=THEME,
            css=CSS,
            js=CAROUSEL_JS,
            mcp_server=True,  # expose extract_events/make_ics/check_conflicts as MCP tools
        )