Khanna, Videh Rakesh Rakesh
feat: JARVIS security, cross-device, intelligence & polish overhaul
e2a2dda | """JARVIS Cloud Database — MongoDB Atlas integration with local SQLite fallback.""" | |
| import os | |
| import json | |
| import sqlite3 | |
| from contextlib import contextmanager | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Optional | |
| from dotenv import load_dotenv | |
# Load .env values (python-dotenv) before the environment lookups below.
load_dotenv()

# MongoDB settings; get_mongo_db() returns None when MONGO_URI is empty.
MONGO_URI = os.environ.get("MONGO_URI", "")
MONGO_DB_NAME = os.environ.get("MONGO_DB_NAME", "jarvis")

# SQLite fallback file, stored next to this module.
LOCAL_DB_PATH = Path(__file__).parent.joinpath("jarvis_cloud.db")

# Connection state shared by get_mongo_db().
_mongo_client = None
_mongo_db = None
# Becomes True after the first successful MongoDB connection.
_mongo_was_available = False
# Writes made against SQLite while MongoDB was down, replayed on reconnect.
_fallback_writes: list[dict] = []
async def get_mongo_db():
    """Return the shared MongoDB database handle, connecting on first use.

    Returns:
        The cached Motor database object once a connection has succeeded, or
        ``None`` when ``MONGO_URI`` is empty or the connection attempt fails.
        Callers treat ``None`` as "use the SQLite fallback".

    While no handle is cached, every call makes a fresh connection attempt
    (bounded by the 5000 ms server-selection timeout).  After a successful
    connect, writes queued in ``_fallback_writes`` are replayed as upserts.
    """
    global _mongo_client, _mongo_db, _mongo_was_available

    if _mongo_db is not None:
        return _mongo_db
    if not MONGO_URI:
        return None

    # Track this attempt's client locally so a failed attempt can close its
    # own client without touching one published by another caller.
    client = None
    try:
        # Imported lazily: a missing Motor install is handled like any other
        # connection failure by the outer ``except``.
        from motor.motor_asyncio import AsyncIOMotorClient

        kwargs = {"serverSelectionTimeoutMS": 5000}
        try:
            import certifi

            kwargs["tlsCAFile"] = certifi.where()
        except ImportError:
            # certifi is optional; without it the driver's default CA
            # handling is used.
            pass

        client = AsyncIOMotorClient(MONGO_URI, **kwargs)
        # Verify the connection before publishing the client globally.
        await client.admin.command("ping")

        _mongo_client = client
        _mongo_db = client[MONGO_DB_NAME]
        _mongo_was_available = True
        print(f"[CloudDB] Connected to MongoDB Atlas: {MONGO_DB_NAME}")

        # Replay any queued fallback writes.
        if _fallback_writes:
            print(f"[CloudDB] Replaying {len(_fallback_writes)} queued write(s) from SQLite fallback")
            # Snapshot and clear first so the queue is never mutated while
            # it is being iterated across awaits.
            pending = list(_fallback_writes)
            _fallback_writes.clear()
            for fw in pending:
                try:
                    collection = _mongo_db[fw["collection"]]
                    await collection.update_one(
                        fw["filter"], {"$set": fw["data"]}, upsert=True
                    )
                except Exception as replay_err:
                    # Best effort: the write still exists in SQLite, but a
                    # failed replay must be visible rather than silent.
                    print(
                        "[CloudDB] Failed to replay queued write to "
                        f"{fw.get('collection')!r}: {replay_err}"
                    )
        return _mongo_db
    except Exception as e:
        if _mongo_was_available:
            print(f"[CloudDB] MongoDB disconnected ({e}), queuing writes for reconnection")
        else:
            print(f"[CloudDB] MongoDB unavailable ({e}), using local SQLite fallback")

        if client is not None:
            # Release the failed client; otherwise each retry leaks one.
            client.close()
            if _mongo_client is client:
                _mongo_client = None
                _mongo_db = None
        return None
def _get_local_db():
    """Open a new connection to the SQLite fallback database.

    The connection returns rows as ``sqlite3.Row`` and has its journal mode
    set to WAL.  The caller owns the connection and must close it.
    """
    connection = sqlite3.connect(str(LOCAL_DB_PATH))
    connection.execute("PRAGMA journal_mode=WAL")
    connection.row_factory = sqlite3.Row
    return connection
@contextmanager
def _local_db():
    """Yield a SQLite fallback connection and always close it on exit.

    Usage::

        with _local_db() as conn:
            conn.execute(...)

    The ``@contextmanager`` decorator is required: without it this function
    returns a plain generator, which cannot be used in a ``with`` statement.
    """
    conn = _get_local_db()
    try:
        yield conn
    finally:
        conn.close()
def _init_local_db():
    """Create the SQLite fallback tables and indexes if they do not exist.

    Every statement uses ``IF NOT EXISTS``, so the function is safe to run
    on each import.  The connection is closed even if the script fails.
    """
    conn = _get_local_db()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS user_profiles (
                user_id TEXT PRIMARY KEY,
                data TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS daily_routines (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                date TEXT NOT NULL,
                data TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now')),
                UNIQUE(user_id, date)
            );
            CREATE TABLE IF NOT EXISTS work_sessions (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                data TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now'))
            );
            CREATE INDEX IF NOT EXISTS idx_routines_user_date ON daily_routines(user_id, date);
            CREATE INDEX IF NOT EXISTS idx_sessions_user ON work_sessions(user_id);
            CREATE TABLE IF NOT EXISTS user_devices (
                device_id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                alias TEXT NOT NULL,
                device_type TEXT NOT NULL DEFAULT 'computer',
                platform TEXT DEFAULT '',
                hostname TEXT DEFAULT '',
                last_ip TEXT DEFAULT '',
                status TEXT DEFAULT 'online',
                data TEXT DEFAULT '{}',
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now')),
                last_seen TEXT DEFAULT (datetime('now')),
                UNIQUE(user_id, alias)
            );
            CREATE INDEX IF NOT EXISTS idx_user_devices_user ON user_devices(user_id);
            CREATE TABLE IF NOT EXISTS device_command_queue (
                cmd_id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                source_device_id TEXT NOT NULL,
                target_device_id TEXT NOT NULL,
                command TEXT NOT NULL,
                status TEXT DEFAULT 'pending',
                result TEXT DEFAULT '',
                created_at TEXT DEFAULT (datetime('now')),
                completed_at TEXT DEFAULT ''
            );
            CREATE INDEX IF NOT EXISTS idx_cmd_queue_target ON device_command_queue(target_device_id, status);
        """)
        conn.commit()
    finally:
        # Close on both success and failure so a bad script cannot leak
        # the connection.
        conn.close()


# Build the fallback schema at import time so the CloudDB methods can use
# the tables without a separate setup step.
_init_local_db()
class CloudDB:
    """Unified storage interface: MongoDB Atlas when available, SQLite otherwise.

    Every public method first awaits ``get_mongo_db()``.  A non-``None``
    handle routes the call to the matching MongoDB collection; ``None``
    routes it to the local SQLite file opened by ``_get_local_db()``.
    The memory-sync methods are the exception: they only use MongoDB.
    """

    # Column list shared by every SQLite read of ``device_command_queue``.
    _CMD_COLUMNS = (
        "cmd_id, user_id, source_device_id, target_device_id, "
        "command, status, result, created_at, completed_at"
    )

    # -- SQLite helpers ---------------------------------------------------

    @staticmethod
    def _local_execute(sql: str, params: tuple = ()) -> int:
        """Run one write statement on SQLite, commit, and return ``rowcount``.

        The connection is closed even when the statement raises.
        """
        conn = _get_local_db()
        try:
            cur = conn.execute(sql, params)
            conn.commit()
            return cur.rowcount
        finally:
            conn.close()

    @staticmethod
    def _local_fetchone(sql: str, params: tuple = ()):
        """Return the first row of a SQLite query, or ``None`` if there is none."""
        conn = _get_local_db()
        try:
            return conn.execute(sql, params).fetchone()
        finally:
            conn.close()

    @staticmethod
    def _local_fetchall(sql: str, params: tuple = ()) -> list:
        """Return every row of a SQLite query as a list."""
        conn = _get_local_db()
        try:
            return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    @staticmethod
    def _decode(row) -> Optional[dict]:
        """Parse the JSON ``data`` column of ``row``; ``None`` stays ``None``."""
        return json.loads(row["data"]) if row else None

    # -- User Profiles ----------------------------------------------------

    async def save_profile(self, user_id: str, data: dict) -> dict:
        """Upsert the profile for ``user_id`` and return the stored dict.

        ``data`` is updated in place with ``user_id`` and a UTC ISO-8601
        ``updated_at``.  On the SQLite path the write is also queued in
        ``_fallback_writes`` if MongoDB had been reachable earlier, so it can
        be replayed after reconnection.
        """
        # Set on both paths so a profile read back from SQLite has the same
        # shape as one read back from MongoDB.
        data["user_id"] = user_id
        data["updated_at"] = datetime.now(timezone.utc).isoformat()

        db = await get_mongo_db()
        if db is not None:
            await db.user_profiles.update_one(
                {"user_id": user_id}, {"$set": data}, upsert=True
            )
            return data

        self._local_execute(
            """INSERT INTO user_profiles (user_id, data, updated_at)
               VALUES (?, ?, datetime('now'))
               ON CONFLICT(user_id) DO UPDATE SET
               data=excluded.data, updated_at=datetime('now')""",
            (user_id, json.dumps(data)),
        )
        if _mongo_was_available:
            # Queue a snapshot so later caller mutations cannot alter it.
            _fallback_writes.append({
                "collection": "user_profiles",
                "filter": {"user_id": user_id},
                "data": dict(data),
            })
        return data

    async def get_profile(self, user_id: str) -> Optional[dict]:
        """Return the stored profile for ``user_id``, or ``None`` if absent."""
        db = await get_mongo_db()
        if db is not None:
            return await db.user_profiles.find_one({"user_id": user_id}, {"_id": 0})
        row = self._local_fetchone(
            "SELECT data FROM user_profiles WHERE user_id=?", (user_id,)
        )
        return self._decode(row)

    # -- Daily Routines ---------------------------------------------------

    async def save_routine(self, user_id: str, date: str, data: dict) -> dict:
        """Upsert the routine keyed by ``(user_id, date)`` and return it.

        ``data`` is updated in place with ``user_id``, ``date`` and a UTC
        ISO-8601 ``updated_at``.
        """
        data["user_id"] = user_id
        data["date"] = date
        data["updated_at"] = datetime.now(timezone.utc).isoformat()

        db = await get_mongo_db()
        if db is not None:
            await db.daily_routines.update_one(
                {"user_id": user_id, "date": date}, {"$set": data}, upsert=True
            )
        else:
            self._local_execute(
                """INSERT INTO daily_routines (user_id, date, data, updated_at)
                   VALUES (?, ?, ?, datetime('now'))
                   ON CONFLICT(user_id, date) DO UPDATE SET
                   data=excluded.data, updated_at=datetime('now')""",
                (user_id, date, json.dumps(data)),
            )
        return data

    async def get_routine(self, user_id: str, date: str) -> Optional[dict]:
        """Return the routine for ``(user_id, date)``, or ``None`` if absent."""
        db = await get_mongo_db()
        if db is not None:
            return await db.daily_routines.find_one(
                {"user_id": user_id, "date": date}, {"_id": 0}
            )
        row = self._local_fetchone(
            "SELECT data FROM daily_routines WHERE user_id=? AND date=?",
            (user_id, date),
        )
        return self._decode(row)

    async def get_recent_routines(
        self, user_id: str, limit: int = 7
    ) -> list[dict]:
        """Return up to ``limit`` routines for ``user_id``, newest ``date`` first."""
        db = await get_mongo_db()
        if db is not None:
            cursor = (
                db.daily_routines.find({"user_id": user_id}, {"_id": 0})
                .sort("date", -1)
                .limit(limit)
            )
            return await cursor.to_list(length=limit)
        rows = self._local_fetchall(
            """SELECT data FROM daily_routines
               WHERE user_id=? ORDER BY date DESC LIMIT ?""",
            (user_id, limit),
        )
        return [json.loads(r["data"]) for r in rows]

    # -- Work Sessions ----------------------------------------------------

    async def save_work_session(
        self, session_id: str, user_id: str, data: dict
    ) -> dict:
        """Upsert the work session ``session_id`` and return the stored dict.

        ``data`` is updated in place with ``id``, ``user_id`` and a UTC
        ISO-8601 ``updated_at``.
        """
        data["id"] = session_id
        data["user_id"] = user_id
        data["updated_at"] = datetime.now(timezone.utc).isoformat()

        db = await get_mongo_db()
        if db is not None:
            await db.work_sessions.update_one(
                {"id": session_id}, {"$set": data}, upsert=True
            )
        else:
            self._local_execute(
                """INSERT INTO work_sessions (id, user_id, data, updated_at)
                   VALUES (?, ?, ?, datetime('now'))
                   ON CONFLICT(id) DO UPDATE SET
                   data=excluded.data, updated_at=datetime('now')""",
                (session_id, user_id, json.dumps(data)),
            )
        return data

    async def get_work_session(self, session_id: str) -> Optional[dict]:
        """Return the work session ``session_id``, or ``None`` if absent."""
        db = await get_mongo_db()
        if db is not None:
            return await db.work_sessions.find_one({"id": session_id}, {"_id": 0})
        row = self._local_fetchone(
            "SELECT data FROM work_sessions WHERE id=?", (session_id,)
        )
        return self._decode(row)

    async def get_recent_sessions(
        self, user_id: str, limit: int = 10
    ) -> list[dict]:
        """Return up to ``limit`` sessions for ``user_id``, latest update first."""
        db = await get_mongo_db()
        if db is not None:
            cursor = (
                db.work_sessions.find({"user_id": user_id}, {"_id": 0})
                .sort("updated_at", -1)
                .limit(limit)
            )
            return await cursor.to_list(length=limit)
        rows = self._local_fetchall(
            """SELECT data FROM work_sessions
               WHERE user_id=? ORDER BY updated_at DESC LIMIT ?""",
            (user_id, limit),
        )
        return [json.loads(r["data"]) for r in rows]

    async def get_active_session(self, user_id: str) -> Optional[dict]:
        """Return the most recently updated ``in_progress`` session, if any."""
        db = await get_mongo_db()
        if db is not None:
            return await db.work_sessions.find_one(
                {"user_id": user_id, "status": "in_progress"},
                {"_id": 0},
                sort=[("updated_at", -1)],
            )
        # ``status`` lives inside the JSON blob on SQLite, so it is filtered
        # in Python after decoding each row.
        rows = self._local_fetchall(
            "SELECT data FROM work_sessions WHERE user_id=? ORDER BY updated_at DESC",
            (user_id,),
        )
        for row in rows:
            session = json.loads(row["data"])
            if session.get("status") == "in_progress":
                return session
        return None

    # -- User Devices -----------------------------------------------------

    async def save_user_device(self, device_id: str, user_id: str, data: dict) -> dict:
        """Upsert the device ``device_id`` and return the stored dict.

        ``data`` is updated in place with ``device_id``, ``user_id`` and a
        UTC ISO-8601 ``updated_at``.  On SQLite the ``alias``, ``device_type``,
        ``platform``, ``hostname``, ``last_ip`` and ``status`` columns are
        copied from ``data`` (with defaults) next to the full JSON blob.
        """
        data["device_id"] = device_id
        data["user_id"] = user_id
        data["updated_at"] = datetime.now(timezone.utc).isoformat()

        db = await get_mongo_db()
        if db is not None:
            await db.user_devices.update_one(
                {"device_id": device_id}, {"$set": data}, upsert=True
            )
        else:
            self._local_execute(
                """INSERT INTO user_devices (device_id, user_id, alias, device_type,
                   platform, hostname, last_ip, status, data, last_seen, updated_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
                   ON CONFLICT(device_id) DO UPDATE SET
                   alias=excluded.alias, device_type=excluded.device_type,
                   platform=excluded.platform, hostname=excluded.hostname,
                   last_ip=excluded.last_ip, status=excluded.status,
                   data=excluded.data, updated_at=datetime('now')""",
                (
                    device_id,
                    user_id,
                    data.get("alias", ""),
                    data.get("device_type", "computer"),
                    data.get("platform", ""),
                    data.get("hostname", ""),
                    data.get("last_ip", ""),
                    data.get("status", "online"),
                    json.dumps(data),
                ),
            )
        return data

    async def get_user_device(self, device_id: str) -> Optional[dict]:
        """Return the device ``device_id``, or ``None`` if absent."""
        db = await get_mongo_db()
        if db is not None:
            return await db.user_devices.find_one({"device_id": device_id}, {"_id": 0})
        row = self._local_fetchone(
            "SELECT data FROM user_devices WHERE device_id=?", (device_id,)
        )
        return self._decode(row)

    async def get_user_devices(self, user_id: str) -> list[dict]:
        """Return the devices registered to ``user_id``.

        The MongoDB path returns at most 50 documents in unspecified order;
        the SQLite path returns all rows ordered by ``alias``.
        """
        db = await get_mongo_db()
        if db is not None:
            cursor = db.user_devices.find({"user_id": user_id}, {"_id": 0})
            return await cursor.to_list(length=50)
        rows = self._local_fetchall(
            "SELECT data FROM user_devices WHERE user_id=? ORDER BY alias",
            (user_id,),
        )
        return [json.loads(r["data"]) for r in rows]

    async def find_user_device_by_alias(self, user_id: str, alias: str) -> Optional[dict]:
        """Return the device of ``user_id`` whose alias matches, ignoring case.

        Returns ``None`` when no device matches.
        """
        db = await get_mongo_db()
        if db is not None:
            import re

            # Escape the alias so characters such as "." or "+" are matched
            # literally instead of being read as regex syntax.
            pattern = f"^{re.escape(alias)}$"
            return await db.user_devices.find_one(
                {"user_id": user_id, "alias": {"$regex": pattern, "$options": "i"}},
                {"_id": 0},
            )
        row = self._local_fetchone(
            "SELECT data FROM user_devices WHERE user_id=? AND LOWER(alias)=LOWER(?)",
            (user_id, alias),
        )
        return self._decode(row)

    async def update_device_heartbeat(self, device_id: str):
        """Mark ``device_id`` online and refresh its ``last_seen``/``updated_at``."""
        db = await get_mongo_db()
        if db is not None:
            now = datetime.now(timezone.utc).isoformat()
            await db.user_devices.update_one(
                {"device_id": device_id},
                {"$set": {"status": "online", "last_seen": now, "updated_at": now}},
            )
        else:
            self._local_execute(
                "UPDATE user_devices SET status='online', last_seen=datetime('now'), "
                "updated_at=datetime('now') WHERE device_id=?",
                (device_id,),
            )

    async def remove_user_device(self, device_id: str):
        """Delete the device ``device_id``; a missing device is not an error."""
        db = await get_mongo_db()
        if db is not None:
            await db.user_devices.delete_one({"device_id": device_id})
        else:
            self._local_execute(
                "DELETE FROM user_devices WHERE device_id=?", (device_id,)
            )

    # -- Device Command Queue ---------------------------------------------

    async def enqueue_device_command(self, cmd_id: str, user_id: str,
                                     source_device_id: str, target_device_id: str,
                                     command: str) -> dict:
        """Queue ``command`` for ``target_device_id`` with status ``pending``.

        Returns the queued record as a plain dict of strings.
        """
        now = datetime.now(timezone.utc).isoformat()
        data = {
            "cmd_id": cmd_id, "user_id": user_id,
            "source_device_id": source_device_id,
            "target_device_id": target_device_id,
            "command": command, "status": "pending",
            "result": "", "created_at": now, "completed_at": "",
        }
        db = await get_mongo_db()
        if db is not None:
            # insert_one adds an ObjectId "_id" to the dict it receives;
            # insert a copy so the returned record stays JSON-serialisable.
            await db.device_command_queue.insert_one(dict(data))
        else:
            self._local_execute(
                """INSERT INTO device_command_queue
                   (cmd_id, user_id, source_device_id, target_device_id, command, status, created_at)
                   VALUES (?, ?, ?, ?, ?, 'pending', ?)""",
                (cmd_id, user_id, source_device_id, target_device_id, command, now),
            )
        return data

    async def get_pending_commands(self, target_device_id: str) -> list[dict]:
        """Return pending commands for ``target_device_id``, oldest first.

        The MongoDB path returns at most 50 commands.
        """
        db = await get_mongo_db()
        if db is not None:
            cursor = db.device_command_queue.find(
                {"target_device_id": target_device_id, "status": "pending"},
                {"_id": 0},
            ).sort("created_at", 1)
            return await cursor.to_list(length=50)
        rows = self._local_fetchall(
            f"""SELECT {self._CMD_COLUMNS}
                FROM device_command_queue
                WHERE target_device_id=? AND status='pending'
                ORDER BY created_at""",
            (target_device_id,),
        )
        return [dict(r) for r in rows]

    async def complete_device_command(self, cmd_id: str, result: str) -> Optional[dict]:
        """Mark ``cmd_id`` completed with ``result`` and return the updated record.

        Returns ``None`` when no command with that id exists.
        """
        now = datetime.now(timezone.utc).isoformat()
        db = await get_mongo_db()
        if db is not None:
            await db.device_command_queue.update_one(
                {"cmd_id": cmd_id},
                {"$set": {"status": "completed", "result": result, "completed_at": now}},
            )
            return await db.device_command_queue.find_one({"cmd_id": cmd_id}, {"_id": 0})

        self._local_execute(
            "UPDATE device_command_queue SET status='completed', result=?, "
            "completed_at=? WHERE cmd_id=?",
            (result, now, cmd_id),
        )
        row = self._local_fetchone(
            f"SELECT {self._CMD_COLUMNS} FROM device_command_queue WHERE cmd_id=?",
            (cmd_id,),
        )
        return dict(row) if row else None

    async def get_command_result(self, cmd_id: str) -> Optional[dict]:
        """Return the queue record for ``cmd_id``, or ``None`` if absent."""
        db = await get_mongo_db()
        if db is not None:
            return await db.device_command_queue.find_one({"cmd_id": cmd_id}, {"_id": 0})
        row = self._local_fetchone(
            f"SELECT {self._CMD_COLUMNS} FROM device_command_queue WHERE cmd_id=?",
            (cmd_id,),
        )
        return dict(row) if row else None

    # -- Memory Sync (cross-device memories) ------------------------------

    async def save_memory(self, user_id: str, category: str, key: str, value: str) -> dict:
        """Upsert a memory keyed by ``(user_id, category, key)`` in MongoDB.

        There is no SQLite path: when MongoDB is unavailable nothing is
        persisted, but the record dict is still returned.
        """
        data = {
            "user_id": user_id,
            "category": category,
            "key": key,
            "value": value,
            "updated_at": datetime.now(timezone.utc).isoformat(),
        }
        db = await get_mongo_db()
        if db is not None:
            await db.memories.update_one(
                {"user_id": user_id, "category": category, "key": key},
                {"$set": data},
                upsert=True,
            )
        return data

    async def get_memories(self, user_id: str, category: Optional[str] = None) -> list[dict]:
        """Return up to 50 memories for ``user_id``, latest update first.

        A truthy ``category`` narrows the query to that category.  Returns an
        empty list when MongoDB is unavailable (no SQLite path).
        """
        db = await get_mongo_db()
        if db is None:
            return []
        query = {"user_id": user_id}
        if category:
            query["category"] = category
        cursor = db.memories.find(query, {"_id": 0}).sort("updated_at", -1).limit(50)
        return await cursor.to_list(length=50)

    async def delete_memory(self, user_id: str, category: str, key: str) -> bool:
        """Delete one memory; return ``True`` only if a document was removed.

        Returns ``False`` when MongoDB is unavailable (no SQLite path).
        """
        db = await get_mongo_db()
        if db is None:
            return False
        result = await db.memories.delete_one(
            {"user_id": user_id, "category": category, "key": key}
        )
        return result.deleted_count > 0

    # -- Stale command cleanup --------------------------------------------

    async def expire_stale_commands(self, max_age_seconds: int = 300) -> int:
        """Mark pending commands older than ``max_age_seconds`` as expired.

        Returns the number of commands that were updated.
        """
        from datetime import timedelta

        now = datetime.now(timezone.utc)
        expired_at = now.isoformat()
        # ``created_at`` is stored as an ISO-8601 string, so the age check is
        # a string comparison against the ISO form of the cutoff.
        cutoff = (now - timedelta(seconds=max_age_seconds)).isoformat()

        db = await get_mongo_db()
        if db is not None:
            result = await db.device_command_queue.update_many(
                {"status": "pending", "created_at": {"$lt": cutoff}},
                {"$set": {"status": "expired", "completed_at": expired_at}},
            )
            return result.modified_count
        return self._local_execute(
            "UPDATE device_command_queue SET status='expired', completed_at=? "
            "WHERE status='pending' AND created_at < ?",
            (expired_at, cutoff),
        )