Spaces:
Sleeping
Sleeping
| """Space entrypoint: Gradio UI + FastAPI /ingest, served together on one port.""" | |
| from __future__ import annotations | |
| import json | |
| import os | |
| from pathlib import Path | |
| import gradio as gr | |
| import uvicorn | |
| from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request | |
| from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse | |
| from pydantic import BaseModel | |
| from server import dedup, events, threads | |
| from server.pipeline import AgentRequest, AgentResponse, run_pipeline | |
| from ui.blocks import CAROUSEL_JS, CSS, THEME, build_demo | |
# Bearer token expected by the protected endpoints; empty means "reject all".
INGEST_TOKEN = os.getenv("INGEST_TOKEN", "")
# JSON file holding the rolling message feed, capped at MAX_FEED entries.
FEED_PATH = Path(os.getenv("FEED_PATH", "/tmp/ingest_feed.json"))
MAX_FEED = 200

# Opt-in autonomous mode (front-end A): run the agent on each new message.
# Disabled unless AUTONOMOUS is exactly "1", so /ingest stays store-only by
# default.
AUTONOMOUS = os.getenv("AUTONOMOUS") == "1"

# Direction that triggers an autonomous run: "outgoing" fires only on messages
# YOU sent/accepted (is_from_me); "any" fires on every new message in the chat.
TRIGGER_ON = os.getenv("TRIGGER_ON", "outgoing").lower()
# The FastAPI application object for this module. In "uvicorn" serving mode the
# Gradio UI is mounted onto it further down and the result is rebound to `app`.
app = FastAPI(title="iMessage Calendar Agent")
# One message as posted to the ingest handler. Documented with `#` comments
# rather than a docstring so the generated schema description is unchanged.
# NOTE(review): the `[]` default relies on pydantic copying mutable defaults
# per instance — confirm for the pydantic version in use.
class IngestMessage(BaseModel):
    chat: str  # chat identifier; ingest groups messages by this value
    sender: str
    text: str
    timestamp: str  # format not established in this file — TODO confirm
    images: list[str] = []  # base64 data URIs of image attachments
    is_from_me: bool = False  # True when YOU sent it (the send/accept trigger)
# Request body of the ingest handler: a batch of messages delivered in one POST.
class IngestBatch(BaseModel):
    messages: list[IngestMessage]  # may span several chats
def _load_feed() -> list[dict]:
    """Return the persisted message feed, or ``[]`` when it is unusable.

    A missing, unreadable or corrupt file yields an empty feed (best effort).
    Valid JSON that is not a list (e.g. ``{}`` or ``null``) is also treated as
    empty, and non-dict entries are dropped, so callers can always concatenate
    the result with new items and index entries as dicts.
    """
    try:
        data = json.loads(FEED_PATH.read_text(encoding="utf-8"))
    except Exception:  # noqa: BLE001  missing/corrupt -> empty
        return []
    if not isinstance(data, list):
        # Well-formed JSON of the wrong shape would otherwise make
        # `_load_feed() + items` raise TypeError in the caller.
        return []
    return [entry for entry in data if isinstance(entry, dict)]
def _append_feed(items: list[dict]) -> None:
    """Append *items* to the feed file, keeping only the newest MAX_FEED entries.

    The new feed is written to a temporary file in the same directory and then
    moved over FEED_PATH with ``os.replace``. A concurrent reader therefore sees
    either the old or the new file, never a half-written one (which
    ``_load_feed`` would read as an empty feed, and the next append would then
    persist that truncation).

    Note: the temporary file is created owner-only (0600), so the feed file
    ends up with those permissions.

    Raises:
        OSError: if the feed cannot be written or moved into place.
    """
    import tempfile  # local: only this helper needs it

    feed = (_load_feed() + items)[-MAX_FEED:]
    payload = json.dumps(feed, indent=2)

    tmp = tempfile.NamedTemporaryFile(
        "w",
        encoding="utf-8",
        dir=FEED_PATH.parent,  # same filesystem, so the replace is a rename
        prefix=FEED_PATH.name + ".",
        suffix=".tmp",
        delete=False,
    )
    try:
        with tmp:
            tmp.write(payload)
        os.replace(tmp.name, FEED_PATH)
    except BaseException:
        # Don't leave stray temp files behind on failure.
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise
def _require_token(authorization: str) -> None:
    """Reject the request unless it carries the configured bearer token.

    Args:
        authorization: raw ``Authorization`` header value ("" when absent).

    Raises:
        HTTPException: 401 when INGEST_TOKEN is unset/empty (the endpoint is
            then closed to everyone) or the header does not match
            ``Bearer <INGEST_TOKEN>`` exactly.
    """
    import hmac  # local: only this helper needs it

    expected = f"Bearer {INGEST_TOKEN}".encode("utf-8")
    supplied = (authorization or "").encode("utf-8")
    # Constant-time comparison: a plain `!=` returns at the first differing
    # character. Compare bytes, since compare_digest rejects non-ASCII str.
    if not INGEST_TOKEN or not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="bad token")
def _run_autonomous(chats: set[str]) -> None:
    """Run the agent over each affected chat's rolling thread and push only the
    events that have not been delivered before. Used when AUTONOMOUS=1.

    The sequence is deliberate: extract with pushing disabled, filter through
    dedup, push the remainder, and only then mark them seen. Letting the
    pipeline push would re-create already-captured events on every
    rolling-window re-run, which is exactly what dedup is there to stop.
    """
    snapshot = _load_feed()
    for chat in chats:
        window = threads.rolling_thread(snapshot, chat)
        if not window:
            continue

        result = run_pipeline(AgentRequest(thread=window, push_gcal=False))

        # Filter only; do not record yet. Recording before a successful push
        # would turn a transient push failure into permanent, silent loss,
        # because the event would be filtered out on every later retry.
        fresh = dedup.filter_new(result.plan.events, record=False)
        if not fresh:
            continue

        try:
            from calendar_out.gcal import push_events  # lazy: google libs optional

            push_events(fresh)
        except Exception as exc:  # noqa: BLE001  one failed push must not stop the loop
            # Left unmarked on purpose, so the next trigger retries these events.
            events.emit(
                "calendar",
                f"autonomous push failed (will retry next run): "
                f"{type(exc).__name__}: {exc}",
                level="error",
            )
        else:
            dedup.mark_seen(fresh)
            events.emit(
                "decision",
                f"autonomous: {len(fresh)} new event(s) in {chat}",
                events=len(fresh),
            )
def agent(req: AgentRequest, authorization: str = Header(default="")):
    """Run the agent over a thread (or messages) and hand back its ActionPlan.

    This is the common contract for every front-end (iOS Shortcut, Android
    Tasker, the Mac collector). Stateless — see server/pipeline.run_pipeline.
    Requires the bearer token; a bad or missing one raises a 401.
    """
    _require_token(authorization)
    response = run_pipeline(req)
    return response
def ingest(batch: IngestBatch, background_tasks: BackgroundTasks,
           authorization: str = Header(default="")):
    """Accept a batch of new messages from the Mac collector (bearer-token protected).

    The response is sent right away; any autonomous run (full LLM inference,
    possibly minutes per chat) is queued as a background task. Doing that work
    inline overran the collector's 30s POST timeout, so it skipped _save_rowid
    and re-sent the same batch on every poll (duplicate feed entries and
    duplicate runs).
    """
    _require_token(authorization)

    messages = batch.messages
    items = [msg.model_dump() for msg in messages]
    _append_feed(items)

    image_count = sum(len(msg.images) for msg in messages)
    chats = sorted({msg.chat for msg in messages})
    chat_label = ", ".join(chats) or "—"
    events.emit("ingest", f"{len(items)} msg(s) from {chat_label}", images=image_count)

    if AUTONOMOUS:
        # Default trigger is YOUR sent/accepted messages; "any" widens it to
        # every chat present in the batch.
        trigger_chats = (
            set(chats)
            if TRIGGER_ON == "any"
            else {msg.chat for msg in messages if msg.is_from_me}
        )
        if trigger_chats:
            background_tasks.add_task(_run_autonomous, trigger_chats)

    return {"received": len(items)}
def health():
    """Liveness probe: always reports ``{"ok": True}``."""
    status = {"ok": True}
    return status
| # --- Per-user Google Calendar OAuth (web flow) ----------------------------- # | |
| def _oauth_redirect_uri(request: Request) -> str: | |
| """Public redirect URI. On a Space, SPACE_HOST is the public host; locally, | |
| fall back to the request's base URL. Must match the Google client config.""" | |
| host = os.environ.get("SPACE_HOST", "").strip() | |
| base = f"https://{host}" if host else str(request.base_url).rstrip("/") | |
| return base.rstrip("/") + "/oauth2callback" | |
def oauth2_start(request: Request):
    """Kick off the Google consent flow (opened as a popup from the UI).

    Returns a redirect to Google's consent URL, or a small 503 HTML page when
    building that URL fails (e.g. Google Calendar is not configured).
    """
    from html import escape  # local: used only for the error page

    from calendar_out import gcal

    try:
        url, _state = gcal.auth_url(_oauth_redirect_uri(request))
    except Exception as e:  # noqa: BLE001  not configured -> friendly page
        # Escape the exception text: it is interpolated into HTML and must not
        # be able to inject markup.
        return HTMLResponse(
            f"<p style='font-family:sans-serif;padding:24px'>Google Calendar isn't "
            f"configured on this Space.<br><small>{escape(str(e))}</small></p>",
            status_code=503,
        )
    return RedirectResponse(url)
def oauth2_callback(request: Request):
    """Google redirects here after consent. Exchange the code for a per-user token,
    hand it to the opener window (and localStorage), then close. The token is NOT
    stored server-side.

    Returns an HTML page in every case: a "cancelled" page when Google reports
    an error or sends no code, an error page when the code exchange fails, and
    otherwise a page whose inline script delivers the token to the browser.
    """
    from html import escape  # local: used only for the error page

    code = request.query_params.get("code")
    if request.query_params.get("error") or not code:
        return HTMLResponse(
            "<p style='font-family:sans-serif;padding:24px'>Google connection cancelled. "
            "You can close this window.</p><script>setTimeout(()=>window.close(),500)</script>"
        )

    from calendar_out import gcal

    try:
        token_json = gcal.exchange_code(
            _oauth_redirect_uri(request), code, request.query_params.get("state", "")
        )
    except Exception as e:  # noqa: BLE001
        # The failure may echo request-derived values (code/state), so escape
        # it before placing it in HTML.
        return HTMLResponse(
            f"<p style='font-family:sans-serif;padding:24px'>Couldn't complete Google "
            f"sign-in.<br><small>{escape(str(e))}</small></p>"
        )

    # JS literal of the token JSON. It is embedded in an inline <script>, where
    # a literal "</script>" (or "<!--") in the data would end the element
    # early. In JSON output these characters can only occur inside string
    # literals, so replacing them with \uXXXX escapes leaves the value
    # unchanged for the JS engine.
    tok_js = (
        json.dumps(token_json)
        .replace("<", "\\u003c")
        .replace(">", "\\u003e")
        .replace("&", "\\u0026")
    )
    return HTMLResponse(
        "<!doctype html><meta charset=utf-8>"
        "<body style='font-family:sans-serif;padding:24px'>"
        "<p>✅ Google Calendar connected. You can close this window.</p>"
        "<script>try{var t=" + tok_js + ";localStorage.setItem('gcal_token',t);"
        "if(window.opener)window.opener.postMessage({gcal_token:t},location.origin);}"
        "catch(e){}setTimeout(function(){window.close();},800);</script></body>"
    )
# Request body for the token liveness check: the browser-held Google token.
class TokenCheckBody(BaseModel):
    token: str  # passed unchanged to gcal.check_token
def oauth2_check(body: TokenCheckBody):
    """Verify a browser-held Google token with one real API call.

    Called as a same-origin fetch from wireGcal on page load. It is a POST so
    the token never appears in access logs; the token is checked and thrown
    away, never stored. A 200 carries a definitive verdict; any non-200 is
    indeterminate and the client keeps its local shape-check state.
    """
    from calendar_out import gcal

    try:
        gcal._client_config()  # mirror /oauth2/start: friendly 503 when env unset
    except Exception as exc:  # noqa: BLE001
        unavailable = {"ok": False, "transient": True, "reason": str(exc)}
        return JSONResponse(unavailable, status_code=503)

    verdict = gcal.check_token(body.token)

    if not verdict["ok"]:
        # Failed check: report why, and whether it is worth retrying.
        return {
            "ok": verdict["ok"],
            "reason": verdict.get("reason", ""),
            "transient": bool(verdict.get("transient")),
        }

    result: dict = {"ok": verdict["ok"]}
    refreshed = verdict.get("refreshed_token")
    if refreshed:
        # Return the refreshed token to the browser.
        result["token"] = refreshed
    return result
# Register the @spaces.GPU functions at startup so ZeroGPU can schedule them.
# Imported for its side effects only, hence the unused-import suppression.
import server.model  # noqa: E402,F401

# Build the UI object once at import time; both serving modes below use it
# (mounted under FastAPI, or launched directly).
demo = build_demo()
| # Serving mode, env-selected: | |
| # - "gradio": the HF *Gradio-SDK* / ZeroGPU platform manages the launch (a self-run | |
| # uvicorn gets SIGTERM'd there), so we call demo.launch(). /agent etc. aren't served. | |
| # - "uvicorn": mount gradio under FastAPI and serve UI + /agent + /ingest on one port. | |
| # Used locally and on the *Docker-SDK* GPU Space (Dockerfile sets SERVE=uvicorn). | |
| # Default: gradio on a Space unless told otherwise, uvicorn locally. | |
# A Space is detected via SPACE_ID or SYSTEM=spaces; an explicit SERVE wins.
if os.environ.get("SPACE_ID") or os.environ.get("SYSTEM") == "spaces":
    _default_serve = "gradio"
else:
    _default_serve = "uvicorn"
SERVE = os.getenv("SERVE", _default_serve)
| # Gradio 6 applies theme/css at mount/launch time — the css set on gr.Blocks is | |
| # IGNORED when mounted, so pass it here or the custom UI renders as default Gradio. | |
| # | |
| # The `js=` load-function does NOT reliably execute on a *mounted* (uvicorn) app in | |
| # Gradio 6 — the carousel then sits on its first slide with dead arrows/dots. So we | |
| # inject the carousel script as a real inline <script> before </body> via middleware; | |
| # it self-bootstraps and its MutationObserver wires every .carousel once Gradio | |
| # client-renders the page. (The launch() path below still passes js= for ZeroGPU.) | |
# NOTE(review): the nesting below is reconstructed; the mount is assumed to
# belong to the "uvicorn" branch, as the serving-mode notes describe — confirm.
if SERVE == "uvicorn":
    from starlette.responses import Response as _Response

    # The carousel bootstrap wrapped as an immediately-invoked inline script.
    # The element id doubles as the "already injected" marker checked below.
    _CAROUSEL_INLINE = f'<script id="cz-inline-js">({CAROUSEL_JS})();</script>'

    # NOTE(review): no middleware registration for this function is visible
    # here — confirm it is attached to `app` as HTTP middleware.
    async def _inject_carousel_js(request, call_next):  # noqa: ANN001
        """Insert the inline carousel script before ``</body>`` of the root page.

        Only HTML responses for path "/" are touched; every other response is
        returned unchanged. The body is buffered in full, decoded as UTF-8
        (undecodable bytes dropped) and re-sent as a new response.
        """
        resp = await call_next(request)
        if request.url.path != "/" or "text/html" not in resp.headers.get("content-type", ""):
            return resp
        # Drain the streamed body so it can be edited as a single string.
        body = b"".join([chunk async for chunk in resp.body_iterator])
        html = body.decode("utf-8", "ignore")
        # Inject once, and only when there is a closing body tag to anchor on.
        if "cz-inline-js" not in html and "</body>" in html:
            html = html.replace("</body>", _CAROUSEL_INLINE + "</body>", 1)
        headers = dict(resp.headers)
        headers.pop("content-length", None)  # body length changed; let Starlette recompute
        return _Response(content=html, status_code=resp.status_code,
                         headers=headers, media_type="text/html")

    # Mount the Gradio UI at "/" on the FastAPI app; theme/css/js are passed
    # here because settings on gr.Blocks are ignored for a mounted app.
    app = gr.mount_gradio_app(
        app, demo, path="/", ssr_mode=False, theme=THEME, css=CSS, js=CAROUSEL_JS,
        mcp_server=True,  # expose extract_events/make_ics/check_conflicts as MCP tools
    )
if __name__ == "__main__":
    if SERVE != "gradio":
        # Self-hosted mode: serve the FastAPI app on PORT (default 7860).
        listen_port = int(os.environ.get("PORT", "7860"))
        uvicorn.run(app, host="0.0.0.0", port=listen_port)
    else:
        # Platform-managed mode: Gradio runs its own server for the UI.
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            ssr_mode=False,
            theme=THEME,
            css=CSS,
            js=CAROUSEL_JS,
            mcp_server=True,  # expose extract_events/make_ics/check_conflicts as MCP tools
        )