Paritosh Upadhyay
Neural Core: Cloud Anchor
5196bf2
"""
Project Jarvis β€” Conversation Context Manager
Maintains session state, conversation history, and contextual awareness.
"""
import time
import logging
import subprocess
import os
from datetime import datetime
from typing import Optional, Dict, Any
logger = logging.getLogger("friday.context")
# ── Constants ────────────────────────────────────────────────
MAX_TURNS = 20 # Maximum conversation turns to keep
SESSION_TIMEOUT = 1800 # 30 minutes of inactivity = new session
MAX_SESSIONS = 10 # Max concurrent sessions
# ── Session Storage ──────────────────────────────────────────
_sessions: dict[str, dict] = {}
_default_session_id: str = "default"
# ── Session Management ───────────────────────────────────────
def get_or_create_session(session_id: Optional[str] = None) -> str:
"""Get existing session or create a new one."""
if session_id is None:
session_id = _default_session_id
if session_id not in _sessions:
_sessions[session_id] = {
"id": session_id,
"turns": [],
"created_at": datetime.now(),
"last_active": datetime.now(),
"metadata": {},
}
logger.info(f"New session created: {session_id}")
else:
# Check for timeout
session = _sessions[session_id]
elapsed = (datetime.now() - session["last_active"]).total_seconds()
if elapsed > SESSION_TIMEOUT:
logger.info(f"Session {session_id} timed out after {elapsed:.0f}s, creating new one")
_sessions[session_id] = {
"id": session_id,
"turns": [],
"created_at": datetime.now(),
"last_active": datetime.now(),
"metadata": {},
}
_sessions[session_id]["last_active"] = datetime.now()
_cleanup_sessions()
return session_id
def get_session(session_id: Optional[str] = None) -> dict:
"""Get the full session data object."""
sid = get_or_create_session(session_id)
return _sessions.get(sid, {})
def add_turn(session_id: str, role: str, content: str, intent: Optional[str] = None):
"""Add a conversation turn to the session."""
session_id = get_or_create_session(session_id)
session = _sessions[session_id]
turn = {
"role": role,
"content": content,
"timestamp": datetime.now().isoformat(),
"intent": intent,
}
session["turns"].append(turn)
session["last_active"] = datetime.now()
if len(session["turns"]) > MAX_TURNS:
session["turns"] = session["turns"][-MAX_TURNS:]
logger.debug(f"Session {session_id}: added {role} turn ({len(session['turns'])} total)")
def get_conversation_messages(session_id: str, max_turns: Optional[int] = None) -> list[dict]:
"""Get conversation history formatted for LLM consumption."""
session = _sessions.get(session_id)
if not session:
return []
turns = session["turns"]
if max_turns:
turns = turns[-max_turns:]
return [{"role": t["role"], "content": t["content"]} for t in turns]
def get_last_entities(session_id: str) -> dict:
"""Get entities stored in metadata."""
session = _sessions.get(session_id)
if not session:
return {}
return session.get("metadata", {}).get("entities", {})
def set_metadata(session_id: str, key: str, value):
"""Store metadata in the session (e.g., last-mentioned person, topic)."""
session = _sessions.get(session_id)
if session:
session["metadata"][key] = value
def is_silent_mode(session_id: str) -> bool:
"""Check if the session is currently in silent mode (Shut up protocol)."""
session = _sessions.get(session_id)
if not session:
return False
return session.get("metadata", {}).get("silent_mode", False)
def clear_session(session_id: str):
"""Clear a session's history."""
if session_id in _sessions:
_sessions[session_id]["turns"] = []
_sessions[session_id]["metadata"] = {}
def _cleanup_sessions():
"""Remove expired sessions."""
now = datetime.now()
expired = [
sid for sid, session in _sessions.items()
if (now - session["last_active"]).total_seconds() > SESSION_TIMEOUT * 2
]
for sid in expired:
del _sessions[sid]
# ── Deep Situational Context ────────────────────────────────
def get_situational_context(session_id: str) -> Dict[str, Any]:
"""
Gathers deep situational telemetry from the macOS system.
"""
telemetry = {
"timestamp": datetime.now().isoformat(),
"active_app": "Unknown",
"system_load": "Normal",
"battery": "Unknown",
"metadata": _sessions.get(session_id, {}).get("metadata", {})
}
try:
# 1. Get Active App
from app.services.tools import macos
app_info = macos.get_active_app_tool()
telemetry["active_app"] = app_info
# 2. Get System Load
load1, load5, load15 = os.getloadavg()
telemetry["system_load"] = f"1m: {round(load1,2)}, 5m: {round(load5,2)}"
# 3. Get Battery
batt = subprocess.check_output(["pmset", "-g", "batt"]).decode()
if "InternalBattery" in batt:
telemetry["battery"] = batt.split("\n")[1].strip()
except Exception:
pass
return telemetry
def enrich_with_context(user_query: str, session_id: str) -> str:
"""Compiles all context into a single prompt injection."""
from app.services import summarizer, holocron
sit = get_situational_context(session_id)
context_str = f"--- SITUATIONAL AWARENESS ---\n"
context_str += f"Current Time: {sit['timestamp']}\n"
context_str += f"Active Environment: {sit['active_app']}\n"
# 1. Morning Briefing (First contact of the day)
# Placeholder for 'first of day' check - in production we'd use a flag or timestamp delta
if session_id == "default" and len(_sessions.get(session_id, {}).get("turns", [])) < 2:
recap = summarizer.get_morning_briefing_context()
context_str += f"\n[MORNING BRIEFING]\n{recap}\n"
# 2. Deep Memory (Episodic Recall)
from app.services import memory
episodes = memory.recall_memory(user_query, n_results=5)
if episodes:
context_str += f"\n--- GUARDIAN'S EPISODIC PATTERNS ---\n{episodes}\n"
# 3. Holocron Relational Knowledge
related = holocron.get_relevant_knowledge(user_query)
if related:
context_str += "\n--- SOVEREIGN KNOWLEDGE ---\n"
for r in related:
context_str += f"[{r['category']}] {r['name']}: {r['content']}\n"
context_str += f"\nSystem Load: {sit['system_load']}\n"
context_str += f"Battery/Power: {sit['battery']}\n"
entities = sit["metadata"].get("entities", {})
if entities:
context_str += f"\n--- ACTIVE ENTITIES ---\n"
for k, v in entities.items():
if k != "pending_knowledge_card":
context_str += f"{k}: {v}\n"
return context_str
# ── Status ───────────────────────────────────────────────────
def get_status() -> dict:
return {
"active_sessions": len(_sessions),
"total_turns": sum(len(s["turns"]) for s in _sessions.values()),
}