"""JARVIS Server — FastAPI + WebSocket for real-time AI interaction.""" # Hide Python dock icon — this must run before any other imports import sys as _sys if _sys.platform == "darwin": try: import AppKit AppKit.NSApplication.sharedApplication().setActivationPolicy_( AppKit.NSApplicationActivationPolicyProhibited # type: ignore[attr-defined] ) except Exception: pass import os import re import json import asyncio import secrets import logging from contextlib import asynccontextmanager logging.basicConfig(level=logging.INFO) _log = logging.getLogger("jarvis.server") _log.setLevel(logging.DEBUG) from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from dotenv import load_dotenv load_dotenv() from memory import Memory from llm import SYSTEM_PROMPT, stream_response, get_active_backend, get_available_backends, FREE_MODELS, HF_FREE_MODELS from stm import apply_stm, AutoTune from tools import get_tools_prompt, execute_tool, TOOL_REGISTRY, register_macos_tools import tools.builtin # Register built-in tools import tools.system_control # System control tools import tools.user_tools # User profile, routines, work sessions import tools.vscode_tools # VS Code & Copilot integration import tools.device_control # Universal device control (BT, WiFi, MQTT, HomeKit, IR) import tools.app_automation # Deep app control (Spotify, Notes, Calendar, Mail, etc.) import tools.device_onboarding # User device registration & cross-device commands from user_profile import get_user_context, get_preferences, get_today_routine, get_work_history, get_active_work_session # Tag macOS-only tools so they get delegated to connected devices on Linux (HF Space) register_macos_tools() # ── Auth Token ──────────────────────────────────────────────────── # Set JARVIS_AUTH_TOKEN in .env for a fixed token; otherwise a random one is # generated each startup and printed to the console. AUTH_TOKEN = os.getenv("JARVIS_AUTH_TOKEN", "").strip() AUTH_ENABLED = os.getenv("JARVIS_AUTH_ENABLED", "true").strip().lower() in {"1", "true", "yes", "on"} if AUTH_ENABLED and not AUTH_TOKEN: AUTH_TOKEN = secrets.token_urlsafe(32) # Paths that don't require auth (health checks, static assets, login page) _PUBLIC_PATHS = {"/", "/api/status", "/api/auth/token"} _PUBLIC_PREFIXES = ("/static/",) @asynccontextmanager async def lifespan(app: FastAPI): memory = Memory() backends = get_available_backends() # Start background task for command expiry async def _expire_commands_loop(): from cloud_db import CloudDB cdb = CloudDB() while True: try: expired = await cdb.expire_stale_commands(max_age_seconds=300) if expired: _log.info(f"[CLEANUP] Expired {expired} stale command(s)") except Exception as e: _log.debug(f"Command expiry check failed: {e}") await asyncio.sleep(60) task = asyncio.create_task(_expire_commands_loop()) # Start background scheduler for proactive alerts from scheduler import start_scheduler, stop_scheduler scheduler_task = asyncio.create_task(start_scheduler()) print("\n" + "=" * 50) print(" J.A.R.V.I.S. Online") print(f" Backends: {', '.join(backends)}") print(f" Tools: {len(TOOL_REGISTRY)} loaded") print(f" STM: Active (hedge removal, direct mode)") print(f" AutoTune: Active (adaptive parameters)") print(f" URL: http://localhost:{os.getenv('JARVIS_PORT', '8000')}") if AUTH_ENABLED: print(f" Auth: ENABLED (token: {AUTH_TOKEN[:8]}...)") else: print(f" Auth: DISABLED") print("=" * 50 + "\n") yield task.cancel() stop_scheduler() scheduler_task.cancel() app = FastAPI(title="JARVIS", lifespan=lifespan) # ── CORS ───────────────────────────────────────────────────────── ALLOWED_ORIGINS = [o.strip() for o in os.getenv("JARVIS_CORS_ORIGINS", "http://localhost:8000").split(",")] app.add_middleware( CORSMiddleware, allow_origins=ALLOWED_ORIGINS, allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Authorization", "Content-Type"], ) # ── Allowed User IDs ───────────────────────────────────────────── _ALLOWED_USERS = {u.strip() for u in os.getenv("JARVIS_ALLOWED_USERS", "default").split(",")} def _validate_user_id(user_id: str) -> str: """Validate user_id against whitelist. Returns 'default' if invalid.""" if user_id in _ALLOWED_USERS: return user_id _log.warning(f"[AUTH] Rejected unknown user_id: {user_id!r}") return "default" def _check_auth(request: Request) -> bool: """Validate auth token from header, query param, or cookie.""" if not AUTH_ENABLED: return True path = request.url.path if path in _PUBLIC_PATHS or any(path.startswith(p) for p in _PUBLIC_PREFIXES): return True # Check Authorization header auth_header = request.headers.get("authorization", "") if auth_header.startswith("Bearer ") and auth_header[7:] == AUTH_TOKEN: return True # Check query param (deprecated — tokens in URLs leak via logs/Referer) if request.query_params.get("token") == AUTH_TOKEN: _log.warning(f"[AUTH] Token passed via query param (deprecated) — use Bearer header or cookie instead. Path: {path}") return True # Check cookie if request.cookies.get("jarvis_token") == AUTH_TOKEN: return True return False # ── Rate Limiting ───────────────────────────────────────────────── import time as _time _rate_limit_store: dict[str, list[float]] = {} RATE_LIMIT_MAX = int(os.getenv("JARVIS_RATE_LIMIT", "30")) # requests per minute RATE_LIMIT_WINDOW = 60 # seconds _RATE_LIMITED_PREFIXES = ("/api/ask", "/ws", "/api/work", "/api/routine", "/api/auth") _AUTH_RATE_LIMIT_MAX = 5 # Stricter limit for auth endpoints (5 per minute) def _check_rate_limit(client_ip: str, path: str) -> bool: """Returns True if request should be allowed.""" if not any(path.startswith(p) for p in _RATE_LIMITED_PREFIXES): return True now = _time.time() if client_ip not in _rate_limit_store: _rate_limit_store[client_ip] = [now] return True # Purge old entries _rate_limit_store[client_ip] = [t for t in _rate_limit_store[client_ip] if now - t < RATE_LIMIT_WINDOW] # Remove empty IP entries to prevent memory leak if not _rate_limit_store[client_ip]: del _rate_limit_store[client_ip] return True # Stricter limit for auth endpoints (brute-force protection) limit = _AUTH_RATE_LIMIT_MAX if path.startswith("/api/auth") else RATE_LIMIT_MAX if len(_rate_limit_store[client_ip]) >= limit: return False _rate_limit_store[client_ip].append(now) return True @app.middleware("http") async def auth_middleware(request: Request, call_next): if not _check_auth(request): return JSONResponse({"error": "Unauthorized. Provide token via Bearer header, ?token= query, or jarvis_token cookie."}, status_code=401) client_ip = request.client.host if request.client else "unknown" if not _check_rate_limit(client_ip, request.url.path): return JSONResponse({"error": "Rate limit exceeded. Try again in a moment."}, status_code=429) response = await call_next(request) return response @app.post("/api/auth/token") async def auth_login(req: Request): """Validate token and set auth cookie.""" data = await req.json() token = data.get("token", "") if not AUTH_ENABLED: return {"ok": True} if token == AUTH_TOKEN: response = JSONResponse({"ok": True}) response.set_cookie("jarvis_token", AUTH_TOKEN, httponly=True, samesite="strict", max_age=86400 * 30) return response return JSONResponse({"error": "Invalid token"}, status_code=401) app.mount("/static", StaticFiles(directory="static"), name="static") import platform as _platform memory = Memory() def _get_os_context() -> str: """Auto-detect OS and provide context for the LLM.""" system = _platform.system() release = _platform.release() machine = _platform.machine() if system == "Darwin": os_name = "macOS" terminal_cmd = "Terminal.app (or iTerm2)" open_cmd = "open -a" shell = "zsh" elif system == "Windows": os_name = "Windows" terminal_cmd = "Command Prompt (cmd) or PowerShell" open_cmd = "start" shell = "PowerShell" else: os_name = f"Linux ({release})" terminal_cmd = "terminal emulator" open_cmd = "xdg-open" shell = "bash" return ( f"\n[SYSTEM CONTEXT]\n" f"Operating System: {os_name} ({release})\n" f"Architecture: {machine}\n" f"Default terminal: {terminal_cmd}\n" f"App launch command: {open_cmd}\n" f"Default shell: {shell}\n" f"IMPORTANT: When the user says 'open terminal' or 'open command prompt', " f"open the {terminal_cmd} using the open_terminal or open_app tool. " f"Adapt all system commands to {os_name}.\n" ) def _get_time_context() -> str: """Build time-aware context for smarter responses.""" from datetime import datetime now = datetime.now() hour = now.hour if 5 <= hour < 12: greeting_hint = "It's morning. Be energetic and proactive." elif 12 <= hour < 17: greeting_hint = "It's afternoon. Be focused and efficient." elif 17 <= hour < 21: greeting_hint = "It's evening. Be relaxed and wind-down oriented." else: greeting_hint = "It's nighttime. Be brief and quiet." return ( f"\n[TIME CONTEXT]\n" f"Current time: {now.strftime('%I:%M %p')}\n" f"Date: {now.strftime('%A, %B %d, %Y')}\n" f"{greeting_hint}\n" ) async def build_system_prompt(user_id: str = "default") -> str: user_ctx = await get_user_context(user_id) os_ctx = _get_os_context() time_ctx = _get_time_context() return SYSTEM_PROMPT.format( tools_prompt=get_tools_prompt(), memory_context=memory.get_context_summary() + user_ctx + os_ctx + time_ctx, ) def extract_tool_calls(text: str) -> list[dict]: """Extract tool calls from ```tool blocks in response. Handles format variations from different LLM backends: ```tool, ```Tool, ```TOOL, ```json, missing trailing newline, etc. Also catches inline JSON with "tool" key even without code fences. """ # Try progressively looser patterns patterns = [ r"```tool\s*\n(.*?)\n```", # exact: ```tool\n...\n``` r"```[Tt]ool\s*\n(.*?)```", # case-insensitive, no trailing newline r"```(?:tool|Tool|TOOL)\s*(.*?)```", # no newline after tag r'```json\s*\n(\{[^`]*?"tool"[^`]*?\})\s*```', # ```json with tool key r'```\s*\n(\{[^`]*?"tool"[^`]*?\})\s*```', # bare ``` with tool key r'```\s*(\{[^`]*?"tool"[^`]*?\})\s*```', # bare ``` no newline ] calls = [] for pattern in patterns: matches = re.findall(pattern, text, re.DOTALL | re.IGNORECASE) for match in matches: try: data = json.loads(match.strip()) if "tool" in data: calls.append(data) except json.JSONDecodeError: continue if calls: break # Last resort: find bare JSON objects with "tool" key anywhere in text if not calls: for m in re.finditer(r'\{[^{}]*"tool"\s*:\s*"[^"]+"\s*,\s*"args"\s*:\s*\{[^{}]*\}\s*\}', text): try: data = json.loads(m.group()) if "tool" in data: calls.append(data) except json.JSONDecodeError: continue return calls @app.get("/", response_class=HTMLResponse) async def index(): with open("static/index.html", "r") as f: return f.read() @app.get("/api/status") async def status(): return { "status": "online", "backend": get_active_backend(), "backends": get_available_backends(), "tools": list(TOOL_REGISTRY.keys()), "free_models": FREE_MODELS, "hf_models": HF_FREE_MODELS, "stm": True, "autotune": True, } @app.post("/api/ask") async def ask_jarvis(req: Request): """Simple REST endpoint for Siri Shortcuts / Telegram / any client. POST {"message": "your question"} → {"response": "JARVIS answer"} """ from stm import apply_stm, AutoTune request = await req.json() user_msg = request.get("message", "") if not user_msg: return {"error": "No message provided"} if len(user_msg) > 10000: return {"error": "Message too long (max 10,000 characters)"} backend = request.get("backend", "auto") user_id = _validate_user_id(request.get("user_id", "default")) messages = [{"role": "user", "content": user_msg}] system = await build_system_prompt(user_id) params = AutoTune.get_params(user_msg) # Collect full response (with automatic fallback to free models) full_response = "" error_msg = None try: async for chunk in stream_response(messages, system, backend, params): full_response += chunk except Exception as e: error_msg = str(e) _log.info(f"[ASK] User: {user_msg!r}") _log.info(f"[ASK] Backend: {backend} | Raw LLM response ({len(full_response)} chars): {full_response[:500]!r}") # Fallback: if primary backend failed, retry with fallback chain if error_msg or "Error:" in full_response[:80] or "credit balance" in full_response.lower(): _log.warning(f"[ASK] Primary backend failed: {error_msg or full_response[:200]}") # Try fallback backends in order (excluding the one that already failed) fallback_backends = ["openrouter", "ollama", "huggingface", "hf-ollama"] fallback_backends = [b for b in fallback_backends if b != backend] for fb_backend in fallback_backends: full_response = "" try: async for chunk in stream_response(messages, system, fb_backend, params): full_response += chunk if full_response.strip() and "All models are busy" not in full_response: _log.info(f"[ASK] Fallback {fb_backend} succeeded ({len(full_response)} chars)") break except Exception: continue if not full_response.strip(): return {"error": error_msg or "All backends failed"} # Extract tool calls BEFORE STM (STM can corrupt ```tool blocks) tool_calls = extract_tool_calls(full_response) _log.info(f"[ASK] Extracted {len(tool_calls)} tool call(s): {[tc.get('tool') for tc in tool_calls]}") if not tool_calls and "tool" in full_response.lower() and "```" in full_response: _log.warning("Response contains 'tool' and code fences but no tool calls matched regex — possible format mismatch") _log.warning(f"Unparsed LLM response: {full_response!r}") # Apply STM to the non-tool-call text cleaned = apply_stm(full_response, aggressive=True) # Handle tool calls with multi-step chaining (max 3 rounds) MAX_TOOL_ROUNDS = 3 for round_num in range(MAX_TOOL_ROUNDS): if not tool_calls: break _log.info(f"[TOOL] Round {round_num + 1}: executing {[tc['tool'] for tc in tool_calls]}") tool_results = [] for tc in tool_calls: _log.info(f"[TOOL] Calling {tc['tool']}({tc.get('args', {})})") result = await execute_tool(tc["tool"], tc.get("args", {})) _log.info(f"[TOOL] {tc['tool']} returned: {str(result)[:300]!r}") tool_results.append(f"[{tc['tool']}]: {result}") memory.track_command("tool", tool_name=tc["tool"], query_text=user_msg, user_id=user_id) # Synthesize with tool results messages.append({"role": "assistant", "content": cleaned}) messages.append({"role": "user", "content": f"[Tool Results]\n" + "\n".join(tool_results) + "\n\nSynthesize. Be direct."}) synthesis = "" async for chunk in stream_response(messages, system, backend, params): synthesis += chunk # Check if synthesis contains MORE tool calls (chaining) tool_calls = extract_tool_calls(synthesis) cleaned = apply_stm(synthesis, aggressive=True) # Save to memory conv_id = memory.create_conversation("API") memory.add_message(conv_id, "user", user_msg) memory.add_message(conv_id, "assistant", cleaned) return {"response": cleaned} @app.get("/api/conversations") async def get_conversations(): return memory.get_conversations() @app.get("/api/memories") async def get_memories(): return memory.get_memories() # ── Cross-Device WebSocket Push ────────────────────────────────── # Connected device sockets: {device_id: WebSocket} _device_connections: dict[str, WebSocket] = {} async def push_command_to_device(device_id: str, command_data: dict) -> bool: """Push a command to a connected device via WebSocket. Returns True if delivered.""" ws = _device_connections.get(device_id) if ws is None: return False try: await ws.send_json({"type": "command", **command_data}) return True except Exception: _device_connections.pop(device_id, None) return False @app.websocket("/ws/device/{device_id}") async def device_websocket(ws: WebSocket, device_id: str): """WebSocket endpoint for devices to receive real-time commands.""" if AUTH_ENABLED: token = ws.query_params.get("token", "") cookie_token = ws.cookies.get("jarvis_token", "") if token != AUTH_TOKEN and cookie_token != AUTH_TOKEN: await ws.close(code=4001, reason="Unauthorized") return await ws.accept() _device_connections[device_id] = ws _log.info(f"[DEVICE-WS] Device {device_id} connected") try: # Send any pending commands immediately on connect from user_device_registry import get_pending_commands pending = await get_pending_commands(device_id) for cmd in pending: await ws.send_json({"type": "command", **cmd}) while True: data = await ws.receive_json() if data.get("type") == "heartbeat": from user_device_registry import update_heartbeat await update_heartbeat(device_id) elif data.get("type") == "command_result": from user_device_registry import complete_command await complete_command(data["cmd_id"], data.get("result", "")) elif data.get("type") == "ping": await ws.send_json({"type": "pong"}) except WebSocketDisconnect: pass except Exception as e: _log.warning(f"[DEVICE-WS] Device {device_id} error: {e}") finally: _device_connections.pop(device_id, None) _log.info(f"[DEVICE-WS] Device {device_id} disconnected") @app.websocket("/ws") async def websocket_endpoint(ws: WebSocket): # Auth check for WebSocket connections if AUTH_ENABLED: token = ws.query_params.get("token", "") cookie_token = ws.cookies.get("jarvis_token", "") if token != AUTH_TOKEN and cookie_token != AUTH_TOKEN: await ws.close(code=4001, reason="Unauthorized") return await ws.accept() conv_id = memory.create_conversation("New Chat") messages = [] stm_enabled = True ws_user_id = "default" try: while True: data = await ws.receive_json() if data.get("type") == "set_user_id": ws_user_id = _validate_user_id(data.get("user_id", "default")) continue if data.get("type") == "message": user_msg = data["content"] # Enforce message length limit (same as REST endpoint) if len(user_msg) > 10000: await ws.send_json({"type": "error", "content": "Message too long (max 10,000 characters)"}) continue backend = data.get("backend", "auto") stm_enabled = data.get("stm", True) ws_user_id = _validate_user_id(data.get("user_id", ws_user_id)) # Save user message memory.add_message(conv_id, "user", user_msg) messages.append({"role": "user", "content": user_msg}) # AutoTune — get optimal params for this query tune_params = AutoTune.get_params(user_msg) await ws.send_json({ "type": "autotune", "query_type": tune_params["type"], "temperature": tune_params["temperature"], }) # Build system prompt with tools and memory system = await build_system_prompt(ws_user_id) # Stream LLM response full_response = "" await ws.send_json({"type": "stream_start"}) try: async for chunk in stream_response(messages, system, backend, tune_params): full_response += chunk await ws.send_json({"type": "stream", "content": chunk}) except Exception as e: error_msg = f"LLM Error: {e}" await ws.send_json({"type": "error", "content": error_msg}) continue # Extract tool calls BEFORE STM (STM can corrupt ```tool blocks) tool_calls = extract_tool_calls(full_response) if not tool_calls and "tool" in full_response.lower() and "```" in full_response: _log.warning("WS: Response contains 'tool' and code fences but no tool calls matched regex") _log.debug(f"WS unparsed LLM response: {full_response!r}") # Apply STM post-processing if stm_enabled: cleaned = apply_stm(full_response, aggressive=True) if cleaned != full_response: # Send the cleaned version await ws.send_json({ "type": "stm_applied", "original_length": len(full_response), "cleaned_length": len(cleaned), }) full_response = cleaned # Handle tool calls with multi-step chaining (max 3 rounds) for round_num in range(3): if not tool_calls: break await ws.send_json({"type": "tool_start"}) tool_results = [] for tc in tool_calls: tool_name = tc["tool"] tool_args = tc.get("args", {}) await ws.send_json({ "type": "tool_exec", "tool": tool_name, "args": tool_args, }) result = await execute_tool(tool_name, tool_args) tool_results.append( f"[Tool: {tool_name}] Result:\n{result}" ) await ws.send_json({ "type": "tool_result", "tool": tool_name, "result": result, }) # Feed tool results back to LLM for synthesis messages.append({"role": "assistant", "content": full_response}) tool_context = "\n\n".join(tool_results) messages.append({ "role": "user", "content": f"[Tool Results]\n{tool_context}\n\nSynthesize these results into your response. Be direct.", }) # Stream the synthesis synthesis = "" await ws.send_json({"type": "stream_start"}) try: async for chunk in stream_response(messages, system, backend, tune_params): synthesis += chunk await ws.send_json({"type": "stream", "content": chunk}) except Exception as e: synthesis = f"Error synthesizing: {e}" # Check for more tool calls in synthesis (chaining) tool_calls = extract_tool_calls(synthesis) # Apply STM to synthesis too if stm_enabled: synthesis = apply_stm(synthesis, aggressive=True) full_response = synthesis messages.pop() messages.pop() # Save assistant response messages.append({"role": "assistant", "content": full_response}) memory.add_message(conv_id, "assistant", full_response) await ws.send_json({"type": "stream_end"}) # Keep conversation history manageable if len(messages) > 40: messages = messages[-30:] elif data.get("type") == "clear": messages = [] conv_id = memory.create_conversation("New Chat") await ws.send_json({"type": "cleared"}) except WebSocketDisconnect: pass except Exception as e: print(f"WebSocket error: {e}") # ── User Profile & Work Tracking API ────────────────────────────── @app.get("/api/profile") async def api_get_profile(): return await get_preferences() @app.put("/api/profile/name") async def api_set_name(req: Request): data = await req.json() name = data.get("name", "").strip() if not name: return JSONResponse({"error": "name is required"}, status_code=400) from user_profile import set_user_name profile = await set_user_name(name) return profile @app.put("/api/profile/preference") async def api_set_preference(req: Request): data = await req.json() key = data.get("key", "") value = data.get("value", "") if not key: return JSONResponse({"error": "key is required"}, status_code=400) from user_profile import set_preference profile = await set_preference(key, value) return profile @app.get("/api/routine") async def api_get_routine(): return await get_today_routine() @app.post("/api/routine") async def api_add_routine(req: Request): data = await req.json() activity = data.get("activity", "") if not activity: return JSONResponse({"error": "activity is required"}, status_code=400) from user_profile import add_routine_entry return await add_routine_entry(activity, data.get("time", "")) @app.get("/api/work/current") async def api_current_work(): session = await get_active_work_session() if session is None: return {"status": "no_active_session"} return session @app.get("/api/work/history") async def api_work_history(): return await get_work_history() @app.post("/api/work/start") async def api_start_work(req: Request): data = await req.json() title = data.get("title", "") if not title: return JSONResponse({"error": "title is required"}, status_code=400) from user_profile import start_work_session tags = [t.strip() for t in data.get("tags", "").split(",") if t.strip()] return await start_work_session(title, data.get("description", ""), tags) @app.post("/api/work/end") async def api_end_work(req: Request): data = await req.json() from user_profile import end_work_session return await end_work_session(data.get("summary", "")) # ── Deferred Task Queue API ────────────────────────────────────── @app.get("/api/tasks") async def api_get_tasks(): return memory.get_all_tasks(include_completed=False) @app.get("/api/tasks/pending") async def api_get_pending_tasks(): return memory.get_pending_tasks() @app.post("/api/tasks") async def api_add_task(req: Request): data = await req.json() title = data.get("title", "") if not title: return JSONResponse({"error": "title is required"}, status_code=400) task_id = memory.add_task( title=title, description=data.get("description", ""), task_type=data.get("task_type", "general"), priority=data.get("priority", 0), metadata=data.get("metadata"), ) return {"id": task_id, "title": title, "status": "pending"} @app.put("/api/tasks/{task_id}/status") async def api_update_task_status(task_id: int, req: Request): data = await req.json() status = data.get("status", "") if status not in ("pending", "in_progress", "completed"): return JSONResponse({"error": "Invalid status"}, status_code=400) memory.update_task_status(task_id, status) return {"id": task_id, "status": status} @app.delete("/api/tasks/{task_id}") async def api_delete_task(task_id: int): task = memory.get_task(task_id) if not task: return JSONResponse({"error": f"Task #{task_id} not found"}, status_code=404) memory.delete_task(task_id) return {"deleted": task_id} @app.get("/api/tasks/summary") async def api_tasks_summary(): return {"summary": memory.get_pending_tasks_summary()} # ── User Device Registration & Cross-Device Command API ─────────── @app.post("/api/devices/register") async def api_register_device(req: Request): """Register a user's personal device (laptop, phone, etc.) during onboarding.""" from user_device_registry import register_device data = await req.json() alias = data.get("alias", "").strip() if not alias: return JSONResponse({"error": "alias is required"}, status_code=400) device = await register_device( alias=alias, device_type=data.get("device_type", "computer"), device_id=data.get("device_id", ""), user_id=_validate_user_id(data.get("user_id", "default")), ) return device @app.get("/api/devices/mine") async def api_list_my_devices(req: Request): """List all registered devices for the current user.""" from user_device_registry import list_devices user_id = _validate_user_id(req.query_params.get("user_id", "default")) devices = await list_devices(user_id) return {"devices": devices} @app.post("/api/devices/heartbeat") async def api_device_heartbeat(req: Request): """Mark a device as online. Clients should call this every ~60s.""" from user_device_registry import update_heartbeat data = await req.json() device_id = data.get("device_id", "") if not device_id: return JSONResponse({"error": "device_id is required"}, status_code=400) await update_heartbeat(device_id) return {"status": "ok"} @app.post("/api/devices/command") async def api_send_device_command(req: Request): """Queue a command for execution on a target device (by alias).""" from user_device_registry import send_command_to_device data = await req.json() target_alias = data.get("target_alias", "").strip() command = data.get("command", "").strip() if not target_alias or not command: return JSONResponse( {"error": "target_alias and command are required"}, status_code=400 ) result = await send_command_to_device( target_alias=target_alias, command=command, source_device_id=data.get("source_device_id", ""), user_id=_validate_user_id(data.get("user_id", "default")), ) if "error" in result: return JSONResponse(result, status_code=404) return result @app.get("/api/devices/{device_id}/commands/pending") async def api_get_pending_commands(device_id: str): """Poll for pending commands targeting this device. Devices call this to pick up work.""" from user_device_registry import get_pending_commands commands = await get_pending_commands(device_id) return {"commands": commands} @app.post("/api/devices/commands/{cmd_id}/complete") async def api_complete_device_command(cmd_id: str, req: Request): """Report the result of an executed command.""" from user_device_registry import complete_command data = await req.json() result_text = data.get("result", "") record = await complete_command(cmd_id, result_text) if not record: return JSONResponse({"error": "command not found"}, status_code=404) return record @app.get("/api/devices/commands/{cmd_id}/status") async def api_command_status(cmd_id: str): """Check the status/result of a queued command.""" from user_device_registry import check_command_status record = await check_command_status(cmd_id) if not record: return JSONResponse({"error": "command not found"}, status_code=404) return record @app.delete("/api/devices/{device_id}") async def api_unregister_device(device_id: str): """Unregister a device.""" from user_device_registry import unregister_device removed = await unregister_device(device_id) if not removed: return JSONResponse({"error": "device not found"}, status_code=404) return {"deleted": device_id} @app.post("/api/device/heartbeat") async def api_device_heartbeat(req: Request): """Receive heartbeat from a device to mark it as online.""" data = await req.json() device_id = data.get("device_id", "") if not device_id: return JSONResponse({"error": "device_id is required"}, status_code=400) from user_device_registry import update_heartbeat await update_heartbeat(device_id) return {"ok": True} # ── Dashboard API ───────────────────────────────────────────────── @app.get("/api/dashboard") async def api_dashboard(): """Aggregated dashboard data: devices, analytics, tasks, memories.""" from user_device_registry import list_devices devices = await list_devices() analytics = memory.get_command_patterns() tasks = memory.get_pending_tasks() memories_list = memory.get_memories() conversations = memory.get_conversations(limit=5) # Connected devices (via WebSocket) connected = list(_device_connections.keys()) return { "devices": devices, "connected_device_ids": connected, "analytics": analytics, "pending_tasks": tasks, "memories": memories_list[:20], "recent_conversations": conversations, } @app.get("/dashboard", response_class=HTMLResponse) async def dashboard_page(): """Serve the dashboard page.""" dashboard_path = os.path.join("static", "dashboard.html") if os.path.exists(dashboard_path): with open(dashboard_path) as f: return f.read() return HTMLResponse("

Dashboard not found

", status_code=404) # ── Scheduler API (user-defined jobs) ──────────────────────────── @app.get("/api/scheduler/jobs") async def api_list_scheduled_jobs(): """List all user-defined scheduled jobs.""" from scheduler import list_scheduled_jobs return {"jobs": list_scheduled_jobs()} @app.post("/api/scheduler/jobs") async def api_add_scheduled_job(req: Request): """Add a new scheduled job. Body: {name, command, interval_seconds?, run_at?, repeat_daily?}""" from scheduler import add_scheduled_job data = await req.json() name = data.get("name", "").strip() command = data.get("command", "").strip() if not name or not command: return JSONResponse({"error": "name and command are required"}, status_code=400) job = add_scheduled_job( name=name, command=command, interval_seconds=data.get("interval_seconds", 0), run_at=data.get("run_at", ""), repeat_daily=data.get("repeat_daily", False), ) return job @app.delete("/api/scheduler/jobs/{job_id}") async def api_remove_scheduled_job(job_id: int): """Remove a scheduled job by ID.""" from scheduler import remove_scheduled_job remove_scheduled_job(job_id) return {"deleted": job_id} # ── Automation Rules API ───────────────────────────────────────── _automation_rules: list[dict] = [] @app.get("/api/automations") async def api_list_automations(): """List all automation rules (if-this-then-that).""" return {"rules": _automation_rules} @app.post("/api/automations") async def api_add_automation(req: Request): """Add an automation rule. Body: {name, trigger, condition, action} trigger: 'time', 'event', 'keyword' condition: depends on trigger type (e.g., time='09:00', keyword='meeting') action: JARVIS command to execute """ data = await req.json() rule = { "id": len(_automation_rules) + 1, "name": data.get("name", "Untitled Rule"), "trigger": data.get("trigger", ""), "condition": data.get("condition", ""), "action": data.get("action", ""), "enabled": data.get("enabled", True), } _automation_rules.append(rule) return rule @app.delete("/api/automations/{rule_id}") async def api_remove_automation(rule_id: int): """Remove an automation rule.""" global _automation_rules _automation_rules = [r for r in _automation_rules if r.get("id") != rule_id] return {"deleted": rule_id} # ── Settings API ───────────────────────────────────────────────── @app.get("/api/settings") async def api_get_settings(): """Get all JARVIS settings (read from environment/preferences).""" prefs = await get_preferences() return { "language": os.getenv("JARVIS_LANGUAGE", "en"), "tts_backend": os.getenv("JARVIS_TTS_BACKEND", "say"), "tts_voice": os.getenv("JARVIS_TTS_VOICE", "Daniel"), "tts_rate": int(os.getenv("JARVIS_TTS_RATE", "180")), "wake_mode": os.getenv("JARVIS_WAKE_MODE", "auto"), "idle_quit_minutes": int(os.getenv("JARVIS_IDLE_QUIT_MINUTES", "10")), "auth_enabled": AUTH_ENABLED, "cors_origins": ALLOWED_ORIGINS, "rate_limit": RATE_LIMIT_MAX, "backend": get_active_backend(), "backends_available": get_available_backends(), "tools_count": len(TOOL_REGISTRY), "user_preferences": prefs, } @app.put("/api/settings") async def api_update_settings(req: Request): """Update a user preference. Body: {key, value}""" data = await req.json() key = data.get("key", "").strip() value = data.get("value", "") if not key: return JSONResponse({"error": "key is required"}, status_code=400) from user_profile import save_preference await save_preference(key, str(value)) return {"key": key, "value": value, "saved": True} # ── Smart Context API ──────────────────────────────────────────── @app.get("/api/context") async def api_smart_context(): """Get smart context: time of day, location (if available), system state.""" import platform as _plat now = datetime.now() if 'datetime' not in dir() else __import__('datetime').datetime.now() from datetime import datetime as _dt now = _dt.now() hour = now.hour if 5 <= hour < 12: time_of_day = "morning" elif 12 <= hour < 17: time_of_day = "afternoon" elif 17 <= hour < 21: time_of_day = "evening" else: time_of_day = "night" # Get location (macOS CoreLocation via AppleScript) location = None if _plat.system() == "Darwin": try: import subprocess loc_result = subprocess.run( ["osascript", "-e", ''' use framework "CoreLocation" set mgr to current application's CLLocationManager's alloc()'s init() set loc to mgr's location() if loc is not missing value then set lat to loc's coordinate()'s latitude() as real set lon to loc's coordinate()'s longitude() as real return (lat as string) & "," & (lon as string) end if return "unavailable" '''], capture_output=True, text=True, timeout=5, ) loc_str = loc_result.stdout.strip() if loc_str and loc_str != "unavailable": parts = loc_str.split(",") if len(parts) == 2: location = {"lat": float(parts[0]), "lon": float(parts[1])} except Exception: pass # Get active work session work_session = await get_active_work_session() return { "time_of_day": time_of_day, "hour": hour, "date": now.strftime("%A, %B %d, %Y"), "timestamp": now.isoformat(), "location": location, "active_work_session": work_session.get("title") if work_session else None, "system": _plat.system(), } # ── Mount Gradio UI (for HF Spaces) ────────────────────────────── try: import gradio as gr from gradio_app import create_gradio_app gradio_demo = create_gradio_app() app = gr.mount_gradio_app(app, gradio_demo, path="/gradio") _log.info("Gradio UI mounted at /gradio") except ImportError: _log.info("Gradio not installed — /gradio UI unavailable (using static HTML only)") except Exception as e: _log.warning(f"Gradio mount failed: {e}") if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", os.getenv("JARVIS_PORT", "8000"))) uvicorn.run("server:app", host="0.0.0.0", port=port)