"""JARVIS Server — FastAPI + WebSocket for real-time AI interaction.""" # Hide Python dock icon — this must run before any other imports import sys as _sys if _sys.platform == "darwin": try: import AppKit AppKit.NSApplication.sharedApplication().setActivationPolicy_( AppKit.NSApplicationActivationPolicyProhibited # type: ignore[attr-defined] ) except Exception: pass import os import re import json import asyncio import secrets import logging from contextlib import asynccontextmanager logging.basicConfig(level=logging.INFO) _log = logging.getLogger("jarvis.server") _log.setLevel(logging.DEBUG) from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from dotenv import load_dotenv load_dotenv() from memory import Memory from llm import SYSTEM_PROMPT, stream_response, get_active_backend, get_available_backends, FREE_MODELS, HF_FREE_MODELS from stm import apply_stm, AutoTune from tools import get_tools_prompt, execute_tool, TOOL_REGISTRY, register_macos_tools import tools.builtin # Register built-in tools import tools.system_control # System control tools import tools.user_tools # User profile, routines, work sessions import tools.vscode_tools # VS Code & Copilot integration import tools.device_control # Universal device control (BT, WiFi, MQTT, HomeKit, IR) import tools.app_automation # Deep app control (Spotify, Notes, Calendar, Mail, etc.) import tools.device_onboarding # User device registration & cross-device commands from user_profile import get_user_context, get_preferences, get_today_routine, get_work_history, get_active_work_session # Tag macOS-only tools so they get delegated to connected devices on Linux (HF Space) register_macos_tools() # ── Auth Token ──────────────────────────────────────────────────── # Set JARVIS_AUTH_TOKEN in .env for a fixed token; otherwise a random one is # generated each startup and printed to the console. AUTH_TOKEN = os.getenv("JARVIS_AUTH_TOKEN", "").strip() AUTH_ENABLED = os.getenv("JARVIS_AUTH_ENABLED", "true").strip().lower() in {"1", "true", "yes", "on"} if AUTH_ENABLED and not AUTH_TOKEN: AUTH_TOKEN = secrets.token_urlsafe(32) # Paths that don't require auth (health checks, static assets, login page) _PUBLIC_PATHS = {"/", "/api/status", "/api/auth/token"} _PUBLIC_PREFIXES = ("/static/",) @asynccontextmanager async def lifespan(app: FastAPI): memory = Memory() backends = get_available_backends() # Start background task for command expiry async def _expire_commands_loop(): from cloud_db import CloudDB cdb = CloudDB() while True: try: expired = await cdb.expire_stale_commands(max_age_seconds=300) if expired: _log.info(f"[CLEANUP] Expired {expired} stale command(s)") except Exception as e: _log.debug(f"Command expiry check failed: {e}") await asyncio.sleep(60) task = asyncio.create_task(_expire_commands_loop()) # Start background scheduler for proactive alerts from scheduler import start_scheduler, stop_scheduler scheduler_task = asyncio.create_task(start_scheduler()) print("\n" + "=" * 50) print(" J.A.R.V.I.S. Online") print(f" Backends: {', '.join(backends)}") print(f" Tools: {len(TOOL_REGISTRY)} loaded") print(f" STM: Active (hedge removal, direct mode)") print(f" AutoTune: Active (adaptive parameters)") print(f" URL: http://localhost:{os.getenv('JARVIS_PORT', '8000')}") if AUTH_ENABLED: print(f" Auth: ENABLED (token: {AUTH_TOKEN[:8]}...)") else: print(f" Auth: DISABLED") print("=" * 50 + "\n") yield task.cancel() stop_scheduler() scheduler_task.cancel() app = FastAPI(title="JARVIS", lifespan=lifespan) # ── CORS ───────────────────────────────────────────────────────── ALLOWED_ORIGINS = [o.strip() for o in os.getenv("JARVIS_CORS_ORIGINS", "http://localhost:8000").split(",")] app.add_middleware( CORSMiddleware, allow_origins=ALLOWED_ORIGINS, allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Authorization", "Content-Type"], ) # ── Allowed User IDs ───────────────────────────────────────────── _ALLOWED_USERS = {u.strip() for u in os.getenv("JARVIS_ALLOWED_USERS", "default").split(",")} def _validate_user_id(user_id: str) -> str: """Validate user_id against whitelist. Returns 'default' if invalid.""" if user_id in _ALLOWED_USERS: return user_id _log.warning(f"[AUTH] Rejected unknown user_id: {user_id!r}") return "default" def _check_auth(request: Request) -> bool: """Validate auth token from header, query param, or cookie.""" if not AUTH_ENABLED: return True path = request.url.path if path in _PUBLIC_PATHS or any(path.startswith(p) for p in _PUBLIC_PREFIXES): return True # Check Authorization header auth_header = request.headers.get("authorization", "") if auth_header.startswith("Bearer ") and auth_header[7:] == AUTH_TOKEN: return True # Check query param (deprecated — tokens in URLs leak via logs/Referer) if request.query_params.get("token") == AUTH_TOKEN: _log.warning(f"[AUTH] Token passed via query param (deprecated) — use Bearer header or cookie instead. Path: {path}") return True # Check cookie if request.cookies.get("jarvis_token") == AUTH_TOKEN: return True return False # ── Rate Limiting ───────────────────────────────────────────────── import time as _time _rate_limit_store: dict[str, list[float]] = {} RATE_LIMIT_MAX = int(os.getenv("JARVIS_RATE_LIMIT", "30")) # requests per minute RATE_LIMIT_WINDOW = 60 # seconds _RATE_LIMITED_PREFIXES = ("/api/ask", "/ws", "/api/work", "/api/routine", "/api/auth") _AUTH_RATE_LIMIT_MAX = 5 # Stricter limit for auth endpoints (5 per minute) def _check_rate_limit(client_ip: str, path: str) -> bool: """Returns True if request should be allowed.""" if not any(path.startswith(p) for p in _RATE_LIMITED_PREFIXES): return True now = _time.time() if client_ip not in _rate_limit_store: _rate_limit_store[client_ip] = [now] return True # Purge old entries _rate_limit_store[client_ip] = [t for t in _rate_limit_store[client_ip] if now - t < RATE_LIMIT_WINDOW] # Remove empty IP entries to prevent memory leak if not _rate_limit_store[client_ip]: del _rate_limit_store[client_ip] return True # Stricter limit for auth endpoints (brute-force protection) limit = _AUTH_RATE_LIMIT_MAX if path.startswith("/api/auth") else RATE_LIMIT_MAX if len(_rate_limit_store[client_ip]) >= limit: return False _rate_limit_store[client_ip].append(now) return True @app.middleware("http") async def auth_middleware(request: Request, call_next): if not _check_auth(request): return JSONResponse({"error": "Unauthorized. Provide token via Bearer header, ?token= query, or jarvis_token cookie."}, status_code=401) client_ip = request.client.host if request.client else "unknown" if not _check_rate_limit(client_ip, request.url.path): return JSONResponse({"error": "Rate limit exceeded. Try again in a moment."}, status_code=429) response = await call_next(request) return response @app.post("/api/auth/token") async def auth_login(req: Request): """Validate token and set auth cookie.""" data = await req.json() token = data.get("token", "") if not AUTH_ENABLED: return {"ok": True} if token == AUTH_TOKEN: response = JSONResponse({"ok": True}) response.set_cookie("jarvis_token", AUTH_TOKEN, httponly=True, samesite="strict", max_age=86400 * 30) return response return JSONResponse({"error": "Invalid token"}, status_code=401) app.mount("/static", StaticFiles(directory="static"), name="static") import platform as _platform memory = Memory() def _get_os_context() -> str: """Auto-detect OS and provide context for the LLM.""" system = _platform.system() release = _platform.release() machine = _platform.machine() if system == "Darwin": os_name = "macOS" terminal_cmd = "Terminal.app (or iTerm2)" open_cmd = "open -a" shell = "zsh" elif system == "Windows": os_name = "Windows" terminal_cmd = "Command Prompt (cmd) or PowerShell" open_cmd = "start" shell = "PowerShell" else: os_name = f"Linux ({release})" terminal_cmd = "terminal emulator" open_cmd = "xdg-open" shell = "bash" return ( f"\n[SYSTEM CONTEXT]\n" f"Operating System: {os_name} ({release})\n" f"Architecture: {machine}\n" f"Default terminal: {terminal_cmd}\n" f"App launch command: {open_cmd}\n" f"Default shell: {shell}\n" f"IMPORTANT: When the user says 'open terminal' or 'open command prompt', " f"open the {terminal_cmd} using the open_terminal or open_app tool. " f"Adapt all system commands to {os_name}.\n" ) def _get_time_context() -> str: """Build time-aware context for smarter responses.""" from datetime import datetime now = datetime.now() hour = now.hour if 5 <= hour < 12: greeting_hint = "It's morning. Be energetic and proactive." elif 12 <= hour < 17: greeting_hint = "It's afternoon. Be focused and efficient." elif 17 <= hour < 21: greeting_hint = "It's evening. Be relaxed and wind-down oriented." else: greeting_hint = "It's nighttime. Be brief and quiet." return ( f"\n[TIME CONTEXT]\n" f"Current time: {now.strftime('%I:%M %p')}\n" f"Date: {now.strftime('%A, %B %d, %Y')}\n" f"{greeting_hint}\n" ) async def build_system_prompt(user_id: str = "default") -> str: user_ctx = await get_user_context(user_id) os_ctx = _get_os_context() time_ctx = _get_time_context() return SYSTEM_PROMPT.format( tools_prompt=get_tools_prompt(), memory_context=memory.get_context_summary() + user_ctx + os_ctx + time_ctx, ) def extract_tool_calls(text: str) -> list[dict]: """Extract tool calls from ```tool blocks in response. Handles format variations from different LLM backends: ```tool, ```Tool, ```TOOL, ```json, missing trailing newline, etc. Also catches inline JSON with "tool" key even without code fences. """ # Try progressively looser patterns patterns = [ r"```tool\s*\n(.*?)\n```", # exact: ```tool\n...\n``` r"```[Tt]ool\s*\n(.*?)```", # case-insensitive, no trailing newline r"```(?:tool|Tool|TOOL)\s*(.*?)```", # no newline after tag r'```json\s*\n(\{[^`]*?"tool"[^`]*?\})\s*```', # ```json with tool key r'```\s*\n(\{[^`]*?"tool"[^`]*?\})\s*```', # bare ``` with tool key r'```\s*(\{[^`]*?"tool"[^`]*?\})\s*```', # bare ``` no newline ] calls = [] for pattern in patterns: matches = re.findall(pattern, text, re.DOTALL | re.IGNORECASE) for match in matches: try: data = json.loads(match.strip()) if "tool" in data: calls.append(data) except json.JSONDecodeError: continue if calls: break # Last resort: find bare JSON objects with "tool" key anywhere in text if not calls: for m in re.finditer(r'\{[^{}]*"tool"\s*:\s*"[^"]+"\s*,\s*"args"\s*:\s*\{[^{}]*\}\s*\}', text): try: data = json.loads(m.group()) if "tool" in data: calls.append(data) except json.JSONDecodeError: continue return calls @app.get("/", response_class=HTMLResponse) async def index(): with open("static/index.html", "r") as f: return f.read() @app.get("/api/status") async def status(): return { "status": "online", "backend": get_active_backend(), "backends": get_available_backends(), "tools": list(TOOL_REGISTRY.keys()), "free_models": FREE_MODELS, "hf_models": HF_FREE_MODELS, "stm": True, "autotune": True, } @app.post("/api/ask") async def ask_jarvis(req: Request): """Simple REST endpoint for Siri Shortcuts / Telegram / any client. POST {"message": "your question"} → {"response": "JARVIS answer"} """ from stm import apply_stm, AutoTune request = await req.json() user_msg = request.get("message", "") if not user_msg: return {"error": "No message provided"} if len(user_msg) > 10000: return {"error": "Message too long (max 10,000 characters)"} backend = request.get("backend", "auto") user_id = _validate_user_id(request.get("user_id", "default")) messages = [{"role": "user", "content": user_msg}] system = await build_system_prompt(user_id) params = AutoTune.get_params(user_msg) # Collect full response (with automatic fallback to free models) full_response = "" error_msg = None try: async for chunk in stream_response(messages, system, backend, params): full_response += chunk except Exception as e: error_msg = str(e) _log.info(f"[ASK] User: {user_msg!r}") _log.info(f"[ASK] Backend: {backend} | Raw LLM response ({len(full_response)} chars): {full_response[:500]!r}") # Fallback: if primary backend failed, retry with fallback chain if error_msg or "Error:" in full_response[:80] or "credit balance" in full_response.lower(): _log.warning(f"[ASK] Primary backend failed: {error_msg or full_response[:200]}") # Try fallback backends in order (excluding the one that already failed) fallback_backends = ["openrouter", "ollama", "huggingface", "hf-ollama"] fallback_backends = [b for b in fallback_backends if b != backend] for fb_backend in fallback_backends: full_response = "" try: async for chunk in stream_response(messages, system, fb_backend, params): full_response += chunk if full_response.strip() and "All models are busy" not in full_response: _log.info(f"[ASK] Fallback {fb_backend} succeeded ({len(full_response)} chars)") break except Exception: continue if not full_response.strip(): return {"error": error_msg or "All backends failed"} # Extract tool calls BEFORE STM (STM can corrupt ```tool blocks) tool_calls = extract_tool_calls(full_response) _log.info(f"[ASK] Extracted {len(tool_calls)} tool call(s): {[tc.get('tool') for tc in tool_calls]}") if not tool_calls and "tool" in full_response.lower() and "```" in full_response: _log.warning("Response contains 'tool' and code fences but no tool calls matched regex — possible format mismatch") _log.warning(f"Unparsed LLM response: {full_response!r}") # Apply STM to the non-tool-call text cleaned = apply_stm(full_response, aggressive=True) # Handle tool calls with multi-step chaining (max 3 rounds) MAX_TOOL_ROUNDS = 3 for round_num in range(MAX_TOOL_ROUNDS): if not tool_calls: break _log.info(f"[TOOL] Round {round_num + 1}: executing {[tc['tool'] for tc in tool_calls]}") tool_results = [] for tc in tool_calls: _log.info(f"[TOOL] Calling {tc['tool']}({tc.get('args', {})})") result = await execute_tool(tc["tool"], tc.get("args", {})) _log.info(f"[TOOL] {tc['tool']} returned: {str(result)[:300]!r}") tool_results.append(f"[{tc['tool']}]: {result}") memory.track_command("tool", tool_name=tc["tool"], query_text=user_msg, user_id=user_id) # Synthesize with tool results messages.append({"role": "assistant", "content": cleaned}) messages.append({"role": "user", "content": f"[Tool Results]\n" + "\n".join(tool_results) + "\n\nSynthesize. Be direct."}) synthesis = "" async for chunk in stream_response(messages, system, backend, params): synthesis += chunk # Check if synthesis contains MORE tool calls (chaining) tool_calls = extract_tool_calls(synthesis) cleaned = apply_stm(synthesis, aggressive=True) # Save to memory conv_id = memory.create_conversation("API") memory.add_message(conv_id, "user", user_msg) memory.add_message(conv_id, "assistant", cleaned) return {"response": cleaned} @app.get("/api/conversations") async def get_conversations(): return memory.get_conversations() @app.get("/api/memories") async def get_memories(): return memory.get_memories() # ── Cross-Device WebSocket Push ────────────────────────────────── # Connected device sockets: {device_id: WebSocket} _device_connections: dict[str, WebSocket] = {} async def push_command_to_device(device_id: str, command_data: dict) -> bool: """Push a command to a connected device via WebSocket. Returns True if delivered.""" ws = _device_connections.get(device_id) if ws is None: return False try: await ws.send_json({"type": "command", **command_data}) return True except Exception: _device_connections.pop(device_id, None) return False @app.websocket("/ws/device/{device_id}") async def device_websocket(ws: WebSocket, device_id: str): """WebSocket endpoint for devices to receive real-time commands.""" if AUTH_ENABLED: token = ws.query_params.get("token", "") cookie_token = ws.cookies.get("jarvis_token", "") if token != AUTH_TOKEN and cookie_token != AUTH_TOKEN: await ws.close(code=4001, reason="Unauthorized") return await ws.accept() _device_connections[device_id] = ws _log.info(f"[DEVICE-WS] Device {device_id} connected") try: # Send any pending commands immediately on connect from user_device_registry import get_pending_commands pending = await get_pending_commands(device_id) for cmd in pending: await ws.send_json({"type": "command", **cmd}) while True: data = await ws.receive_json() if data.get("type") == "heartbeat": from user_device_registry import update_heartbeat await update_heartbeat(device_id) elif data.get("type") == "command_result": from user_device_registry import complete_command await complete_command(data["cmd_id"], data.get("result", "")) elif data.get("type") == "ping": await ws.send_json({"type": "pong"}) except WebSocketDisconnect: pass except Exception as e: _log.warning(f"[DEVICE-WS] Device {device_id} error: {e}") finally: _device_connections.pop(device_id, None) _log.info(f"[DEVICE-WS] Device {device_id} disconnected") @app.websocket("/ws") async def websocket_endpoint(ws: WebSocket): # Auth check for WebSocket connections if AUTH_ENABLED: token = ws.query_params.get("token", "") cookie_token = ws.cookies.get("jarvis_token", "") if token != AUTH_TOKEN and cookie_token != AUTH_TOKEN: await ws.close(code=4001, reason="Unauthorized") return await ws.accept() conv_id = memory.create_conversation("New Chat") messages = [] stm_enabled = True ws_user_id = "default" try: while True: data = await ws.receive_json() if data.get("type") == "set_user_id": ws_user_id = _validate_user_id(data.get("user_id", "default")) continue if data.get("type") == "message": user_msg = data["content"] # Enforce message length limit (same as REST endpoint) if len(user_msg) > 10000: await ws.send_json({"type": "error", "content": "Message too long (max 10,000 characters)"}) continue backend = data.get("backend", "auto") stm_enabled = data.get("stm", True) ws_user_id = _validate_user_id(data.get("user_id", ws_user_id)) # Save user message memory.add_message(conv_id, "user", user_msg) messages.append({"role": "user", "content": user_msg}) # AutoTune — get optimal params for this query tune_params = AutoTune.get_params(user_msg) await ws.send_json({ "type": "autotune", "query_type": tune_params["type"], "temperature": tune_params["temperature"], }) # Build system prompt with tools and memory system = await build_system_prompt(ws_user_id) # Stream LLM response full_response = "" await ws.send_json({"type": "stream_start"}) try: async for chunk in stream_response(messages, system, backend, tune_params): full_response += chunk await ws.send_json({"type": "stream", "content": chunk}) except Exception as e: error_msg = f"LLM Error: {e}" await ws.send_json({"type": "error", "content": error_msg}) continue # Extract tool calls BEFORE STM (STM can corrupt ```tool blocks) tool_calls = extract_tool_calls(full_response) if not tool_calls and "tool" in full_response.lower() and "```" in full_response: _log.warning("WS: Response contains 'tool' and code fences but no tool calls matched regex") _log.debug(f"WS unparsed LLM response: {full_response!r}") # Apply STM post-processing if stm_enabled: cleaned = apply_stm(full_response, aggressive=True) if cleaned != full_response: # Send the cleaned version await ws.send_json({ "type": "stm_applied", "original_length": len(full_response), "cleaned_length": len(cleaned), }) full_response = cleaned # Handle tool calls with multi-step chaining (max 3 rounds) for round_num in range(3): if not tool_calls: break await ws.send_json({"type": "tool_start"}) tool_results = [] for tc in tool_calls: tool_name = tc["tool"] tool_args = tc.get("args", {}) await ws.send_json({ "type": "tool_exec", "tool": tool_name, "args": tool_args, }) result = await execute_tool(tool_name, tool_args) tool_results.append( f"[Tool: {tool_name}] Result:\n{result}" ) await ws.send_json({ "type": "tool_result", "tool": tool_name, "result": result, }) # Feed tool results back to LLM for synthesis messages.append({"role": "assistant", "content": full_response}) tool_context = "\n\n".join(tool_results) messages.append({ "role": "user", "content": f"[Tool Results]\n{tool_context}\n\nSynthesize these results into your response. Be direct.", }) # Stream the synthesis synthesis = "" await ws.send_json({"type": "stream_start"}) try: async for chunk in stream_response(messages, system, backend, tune_params): synthesis += chunk await ws.send_json({"type": "stream", "content": chunk}) except Exception as e: synthesis = f"Error synthesizing: {e}" # Check for more tool calls in synthesis (chaining) tool_calls = extract_tool_calls(synthesis) # Apply STM to synthesis too if stm_enabled: synthesis = apply_stm(synthesis, aggressive=True) full_response = synthesis messages.pop() messages.pop() # Save assistant response messages.append({"role": "assistant", "content": full_response}) memory.add_message(conv_id, "assistant", full_response) await ws.send_json({"type": "stream_end"}) # Keep conversation history manageable if len(messages) > 40: messages = messages[-30:] elif data.get("type") == "clear": messages = [] conv_id = memory.create_conversation("New Chat") await ws.send_json({"type": "cleared"}) except WebSocketDisconnect: pass except Exception as e: print(f"WebSocket error: {e}") # ── User Profile & Work Tracking API ────────────────────────────── @app.get("/api/profile") async def api_get_profile(): return await get_preferences() @app.put("/api/profile/name") async def api_set_name(req: Request): data = await req.json() name = data.get("name", "").strip() if not name: return JSONResponse({"error": "name is required"}, status_code=400) from user_profile import set_user_name profile = await set_user_name(name) return profile @app.put("/api/profile/preference") async def api_set_preference(req: Request): data = await req.json() key = data.get("key", "") value = data.get("value", "") if not key: return JSONResponse({"error": "key is required"}, status_code=400) from user_profile import set_preference profile = await set_preference(key, value) return profile @app.get("/api/routine") async def api_get_routine(): return await get_today_routine() @app.post("/api/routine") async def api_add_routine(req: Request): data = await req.json() activity = data.get("activity", "") if not activity: return JSONResponse({"error": "activity is required"}, status_code=400) from user_profile import add_routine_entry return await add_routine_entry(activity, data.get("time", "")) @app.get("/api/work/current") async def api_current_work(): session = await get_active_work_session() if session is None: return {"status": "no_active_session"} return session @app.get("/api/work/history") async def api_work_history(): return await get_work_history() @app.post("/api/work/start") async def api_start_work(req: Request): data = await req.json() title = data.get("title", "") if not title: return JSONResponse({"error": "title is required"}, status_code=400) from user_profile import start_work_session tags = [t.strip() for t in data.get("tags", "").split(",") if t.strip()] return await start_work_session(title, data.get("description", ""), tags) @app.post("/api/work/end") async def api_end_work(req: Request): data = await req.json() from user_profile import end_work_session return await end_work_session(data.get("summary", "")) # ── Deferred Task Queue API ────────────────────────────────────── @app.get("/api/tasks") async def api_get_tasks(): return memory.get_all_tasks(include_completed=False) @app.get("/api/tasks/pending") async def api_get_pending_tasks(): return memory.get_pending_tasks() @app.post("/api/tasks") async def api_add_task(req: Request): data = await req.json() title = data.get("title", "") if not title: return JSONResponse({"error": "title is required"}, status_code=400) task_id = memory.add_task( title=title, description=data.get("description", ""), task_type=data.get("task_type", "general"), priority=data.get("priority", 0), metadata=data.get("metadata"), ) return {"id": task_id, "title": title, "status": "pending"} @app.put("/api/tasks/{task_id}/status") async def api_update_task_status(task_id: int, req: Request): data = await req.json() status = data.get("status", "") if status not in ("pending", "in_progress", "completed"): return JSONResponse({"error": "Invalid status"}, status_code=400) memory.update_task_status(task_id, status) return {"id": task_id, "status": status} @app.delete("/api/tasks/{task_id}") async def api_delete_task(task_id: int): task = memory.get_task(task_id) if not task: return JSONResponse({"error": f"Task #{task_id} not found"}, status_code=404) memory.delete_task(task_id) return {"deleted": task_id} @app.get("/api/tasks/summary") async def api_tasks_summary(): return {"summary": memory.get_pending_tasks_summary()} # ── User Device Registration & Cross-Device Command API ─────────── @app.post("/api/devices/register") async def api_register_device(req: Request): """Register a user's personal device (laptop, phone, etc.) during onboarding.""" from user_device_registry import register_device data = await req.json() alias = data.get("alias", "").strip() if not alias: return JSONResponse({"error": "alias is required"}, status_code=400) device = await register_device( alias=alias, device_type=data.get("device_type", "computer"), device_id=data.get("device_id", ""), user_id=_validate_user_id(data.get("user_id", "default")), ) return device @app.get("/api/devices/mine") async def api_list_my_devices(req: Request): """List all registered devices for the current user.""" from user_device_registry import list_devices user_id = _validate_user_id(req.query_params.get("user_id", "default")) devices = await list_devices(user_id) return {"devices": devices} @app.post("/api/devices/heartbeat") async def api_device_heartbeat(req: Request): """Mark a device as online. Clients should call this every ~60s.""" from user_device_registry import update_heartbeat data = await req.json() device_id = data.get("device_id", "") if not device_id: return JSONResponse({"error": "device_id is required"}, status_code=400) await update_heartbeat(device_id) return {"status": "ok"} @app.post("/api/devices/command") async def api_send_device_command(req: Request): """Queue a command for execution on a target device (by alias).""" from user_device_registry import send_command_to_device data = await req.json() target_alias = data.get("target_alias", "").strip() command = data.get("command", "").strip() if not target_alias or not command: return JSONResponse( {"error": "target_alias and command are required"}, status_code=400 ) result = await send_command_to_device( target_alias=target_alias, command=command, source_device_id=data.get("source_device_id", ""), user_id=_validate_user_id(data.get("user_id", "default")), ) if "error" in result: return JSONResponse(result, status_code=404) return result @app.get("/api/devices/{device_id}/commands/pending") async def api_get_pending_commands(device_id: str): """Poll for pending commands targeting this device. Devices call this to pick up work.""" from user_device_registry import get_pending_commands commands = await get_pending_commands(device_id) return {"commands": commands} @app.post("/api/devices/commands/{cmd_id}/complete") async def api_complete_device_command(cmd_id: str, req: Request): """Report the result of an executed command.""" from user_device_registry import complete_command data = await req.json() result_text = data.get("result", "") record = await complete_command(cmd_id, result_text) if not record: return JSONResponse({"error": "command not found"}, status_code=404) return record @app.get("/api/devices/commands/{cmd_id}/status") async def api_command_status(cmd_id: str): """Check the status/result of a queued command.""" from user_device_registry import check_command_status record = await check_command_status(cmd_id) if not record: return JSONResponse({"error": "command not found"}, status_code=404) return record @app.delete("/api/devices/{device_id}") async def api_unregister_device(device_id: str): """Unregister a device.""" from user_device_registry import unregister_device removed = await unregister_device(device_id) if not removed: return JSONResponse({"error": "device not found"}, status_code=404) return {"deleted": device_id} @app.post("/api/device/heartbeat") async def api_device_heartbeat(req: Request): """Receive heartbeat from a device to mark it as online.""" data = await req.json() device_id = data.get("device_id", "") if not device_id: return JSONResponse({"error": "device_id is required"}, status_code=400) from user_device_registry import update_heartbeat await update_heartbeat(device_id) return {"ok": True} # ── Dashboard API ───────────────────────────────────────────────── @app.get("/api/dashboard") async def api_dashboard(): """Aggregated dashboard data: devices, analytics, tasks, memories.""" from user_device_registry import list_devices devices = await list_devices() analytics = memory.get_command_patterns() tasks = memory.get_pending_tasks() memories_list = memory.get_memories() conversations = memory.get_conversations(limit=5) # Connected devices (via WebSocket) connected = list(_device_connections.keys()) return { "devices": devices, "connected_device_ids": connected, "analytics": analytics, "pending_tasks": tasks, "memories": memories_list[:20], "recent_conversations": conversations, } @app.get("/dashboard", response_class=HTMLResponse) async def dashboard_page(): """Serve the dashboard page.""" dashboard_path = os.path.join("static", "dashboard.html") if os.path.exists(dashboard_path): with open(dashboard_path) as f: return f.read() return HTMLResponse("