| """ |
| Project Jarvis β Conversation Context Manager |
| Maintains session state, conversation history, and contextual awareness. |
| """ |
|
|
| import time |
| import logging |
| import subprocess |
| import os |
| from datetime import datetime |
| from typing import Optional, Dict, Any |
|
|
| logger = logging.getLogger("friday.context") |
|
|
| |
|
|
| MAX_TURNS = 20 |
| SESSION_TIMEOUT = 1800 |
| MAX_SESSIONS = 10 |
|
|
|
|
| |
|
|
| _sessions: dict[str, dict] = {} |
| _default_session_id: str = "default" |
|
|
|
|
| |
|
|
| def get_or_create_session(session_id: Optional[str] = None) -> str: |
| """Get existing session or create a new one.""" |
| if session_id is None: |
| session_id = _default_session_id |
|
|
| if session_id not in _sessions: |
| _sessions[session_id] = { |
| "id": session_id, |
| "turns": [], |
| "created_at": datetime.now(), |
| "last_active": datetime.now(), |
| "metadata": {}, |
| } |
| logger.info(f"New session created: {session_id}") |
| else: |
| |
| session = _sessions[session_id] |
| elapsed = (datetime.now() - session["last_active"]).total_seconds() |
| if elapsed > SESSION_TIMEOUT: |
| logger.info(f"Session {session_id} timed out after {elapsed:.0f}s, creating new one") |
| _sessions[session_id] = { |
| "id": session_id, |
| "turns": [], |
| "created_at": datetime.now(), |
| "last_active": datetime.now(), |
| "metadata": {}, |
| } |
|
|
| _sessions[session_id]["last_active"] = datetime.now() |
| _cleanup_sessions() |
| return session_id |
|
|
|
|
| def get_session(session_id: Optional[str] = None) -> dict: |
| """Get the full session data object.""" |
| sid = get_or_create_session(session_id) |
| return _sessions.get(sid, {}) |
|
|
|
|
| def add_turn(session_id: str, role: str, content: str, intent: Optional[str] = None): |
| """Add a conversation turn to the session.""" |
| session_id = get_or_create_session(session_id) |
| session = _sessions[session_id] |
|
|
| turn = { |
| "role": role, |
| "content": content, |
| "timestamp": datetime.now().isoformat(), |
| "intent": intent, |
| } |
|
|
| session["turns"].append(turn) |
| session["last_active"] = datetime.now() |
|
|
| if len(session["turns"]) > MAX_TURNS: |
| session["turns"] = session["turns"][-MAX_TURNS:] |
|
|
| logger.debug(f"Session {session_id}: added {role} turn ({len(session['turns'])} total)") |
|
|
|
|
| def get_conversation_messages(session_id: str, max_turns: Optional[int] = None) -> list[dict]: |
| """Get conversation history formatted for LLM consumption.""" |
| session = _sessions.get(session_id) |
| if not session: |
| return [] |
|
|
| turns = session["turns"] |
| if max_turns: |
| turns = turns[-max_turns:] |
|
|
| return [{"role": t["role"], "content": t["content"]} for t in turns] |
|
|
|
|
| def get_last_entities(session_id: str) -> dict: |
| """Get entities stored in metadata.""" |
| session = _sessions.get(session_id) |
| if not session: |
| return {} |
| return session.get("metadata", {}).get("entities", {}) |
|
|
|
|
| def set_metadata(session_id: str, key: str, value): |
| """Store metadata in the session (e.g., last-mentioned person, topic).""" |
| session = _sessions.get(session_id) |
| if session: |
| session["metadata"][key] = value |
|
|
| def is_silent_mode(session_id: str) -> bool: |
| """Check if the session is currently in silent mode (Shut up protocol).""" |
| session = _sessions.get(session_id) |
| if not session: |
| return False |
| return session.get("metadata", {}).get("silent_mode", False) |
|
|
|
|
| def clear_session(session_id: str): |
| """Clear a session's history.""" |
| if session_id in _sessions: |
| _sessions[session_id]["turns"] = [] |
| _sessions[session_id]["metadata"] = {} |
|
|
|
|
| def _cleanup_sessions(): |
| """Remove expired sessions.""" |
| now = datetime.now() |
| expired = [ |
| sid for sid, session in _sessions.items() |
| if (now - session["last_active"]).total_seconds() > SESSION_TIMEOUT * 2 |
| ] |
| for sid in expired: |
| del _sessions[sid] |
|
|
|
|
| |
|
|
| def get_situational_context(session_id: str) -> Dict[str, Any]: |
| """ |
| Gathers deep situational telemetry from the macOS system. |
| """ |
| telemetry = { |
| "timestamp": datetime.now().isoformat(), |
| "active_app": "Unknown", |
| "system_load": "Normal", |
| "battery": "Unknown", |
| "metadata": _sessions.get(session_id, {}).get("metadata", {}) |
| } |
|
|
| try: |
| |
| from app.services.tools import macos |
| app_info = macos.get_active_app_tool() |
| telemetry["active_app"] = app_info |
|
|
| |
| load1, load5, load15 = os.getloadavg() |
| telemetry["system_load"] = f"1m: {round(load1,2)}, 5m: {round(load5,2)}" |
|
|
| |
| batt = subprocess.check_output(["pmset", "-g", "batt"]).decode() |
| if "InternalBattery" in batt: |
| telemetry["battery"] = batt.split("\n")[1].strip() |
|
|
| except Exception: |
| pass |
|
|
| return telemetry |
|
|
|
|
| def enrich_with_context(user_query: str, session_id: str) -> str: |
| """Compiles all context into a single prompt injection.""" |
| from app.services import summarizer, holocron |
| |
| sit = get_situational_context(session_id) |
| |
| context_str = f"--- SITUATIONAL AWARENESS ---\n" |
| context_str += f"Current Time: {sit['timestamp']}\n" |
| context_str += f"Active Environment: {sit['active_app']}\n" |
| |
| |
| |
| if session_id == "default" and len(_sessions.get(session_id, {}).get("turns", [])) < 2: |
| recap = summarizer.get_morning_briefing_context() |
| context_str += f"\n[MORNING BRIEFING]\n{recap}\n" |
|
|
| |
| from app.services import memory |
| episodes = memory.recall_memory(user_query, n_results=5) |
| if episodes: |
| context_str += f"\n--- GUARDIAN'S EPISODIC PATTERNS ---\n{episodes}\n" |
|
|
| |
| related = holocron.get_relevant_knowledge(user_query) |
| if related: |
| context_str += "\n--- SOVEREIGN KNOWLEDGE ---\n" |
| for r in related: |
| context_str += f"[{r['category']}] {r['name']}: {r['content']}\n" |
|
|
| context_str += f"\nSystem Load: {sit['system_load']}\n" |
| context_str += f"Battery/Power: {sit['battery']}\n" |
| |
| entities = sit["metadata"].get("entities", {}) |
| if entities: |
| context_str += f"\n--- ACTIVE ENTITIES ---\n" |
| for k, v in entities.items(): |
| if k != "pending_knowledge_card": |
| context_str += f"{k}: {v}\n" |
| |
| return context_str |
|
|
|
|
| |
|
|
| def get_status() -> dict: |
| return { |
| "active_sessions": len(_sessions), |
| "total_turns": sum(len(s["turns"]) for s in _sessions.values()), |
| } |
|
|