""" Context engine for Janus — provides rich context injection into every LLM call. FIXES vs previous version: - pending_thoughts queue was growing unboundedly (one malformed thought per daemon cycle) - Topic extraction was cutting off queries at "what is the" instead of extracting meaning - Deduplication: identical thoughts no longer accumulate - Hard cap: max 20 pending thoughts, oldest dropped when full - Better topic extraction: skip stopwords, take the meaningful noun phrase """ import time import logging import os from pathlib import Path from typing import Optional logger = logging.getLogger(__name__) try: from app.config import DATA_DIR except ImportError: DATA_DIR = Path(__file__).parent.parent / "data" CONTEXT_FILE = Path(DATA_DIR) / "daemon" / "context.json" # Stopwords to skip when extracting topic from a query _TOPIC_STOPWORDS = { "what", "is", "the", "a", "an", "are", "was", "were", "how", "why", "when", "where", "who", "which", "will", "can", "could", "should", "would", "do", "does", "did", "tell", "me", "about", "explain", "give", "show", "find", "get", "make", "help", "please", "i", "we", "my", "our", "your", "their", "its", "this", "that", "these", "those", "some", "any", "all", "more", "most", "much", "many", "few", "little", "of", "in", "on", "at", "to", "for", "by", "from", "with", "and", "or", "but", "not", "no", "if", "than", "then", "so", "yet", "hi", "hey", "hello", "howdy", "greetings", "yo", "sup", "use", "conversation", "below", "context", "answer", "latest", "message", "messages", "assistant", "system", "user", "simulate", "happens", "happening", "did", "does", "whats", "what's", "it's", } MAX_PENDING_THOUGHTS = 20 # hard cap — was unbounded MAX_THOUGHT_AGE_HOURS = 24 # drop thoughts older than 24h _META_TOPIC_WORDS = { "conversation", "context", "latest", "message", "messages", "assistant", "system", "user" } def _extract_topic(query: str) -> str: """ Extract the meaningful topic from a query, skipping leading stopwords. Examples: "what is the stock market" → "stock market" "how does inflation work" → "inflation" "tell me about AAPL earnings" → "AAPL earnings" "what is the" → "general query" (was causing the bug) """ import re # Clean and tokenize words = re.findall(r"[a-zA-Z0-9$€£%]+", query.lower()) # Skip leading stopwords meaningful = [] for w in words: if w not in _TOPIC_STOPWORDS or meaningful: if w not in _TOPIC_STOPWORDS: meaningful.append(w) # Take up to 4 meaningful words topic = " ".join(meaningful[:4]) return topic if topic and len(topic) > 2 else "general query" def _is_meta_topic(topic: str) -> bool: words = set((topic or "").lower().split()) return bool(words & _META_TOPIC_WORDS) class ContextEngine: """Manages system-wide context for LLM injection.""" def __init__(self): self._pending_thoughts: list = [] self._context_cache: dict = {} self._conversation_count: int = 0 self._last_topic: str = "" self._last_interaction: float = 0 self._recurring_interests: list = [] self._load() def _load(self): import json if CONTEXT_FILE.exists(): try: data = json.loads(CONTEXT_FILE.read_text()) raw_thoughts = data.get("pending_thoughts", []) # FIXED: deduplicate on load, enforce cap and age limit self._pending_thoughts = self._clean_thoughts(raw_thoughts) self._conversation_count = data.get("conversation_count", 0) self._last_topic = data.get("last_topic", "") self._last_interaction = data.get("last_interaction", 0) self._recurring_interests = data.get("recurring_interests", []) if _is_meta_topic(self._last_topic): self._last_topic = "" self._recurring_interests = [ topic for topic in self._recurring_interests if not _is_meta_topic(topic) ] except Exception as e: logger.warning(f"ContextEngine: load failed: {e}") def _save(self): import json CONTEXT_FILE.parent.mkdir(parents=True, exist_ok=True) try: CONTEXT_FILE.write_text(json.dumps({ "pending_thoughts": self._pending_thoughts, "conversation_count": self._conversation_count, "last_topic": self._last_topic, "last_interaction": self._last_interaction, "recurring_interests": self._recurring_interests, }, indent=2)) except Exception as e: logger.warning(f"ContextEngine: save failed: {e}") def _clean_thoughts(self, thoughts: list) -> list: """ Deduplicate, enforce age limit, enforce count cap. Returns list sorted by priority desc, newest first within same priority. """ now = time.time() seen_texts = set() clean = [] for t in thoughts: text = t.get("thought", "").strip() if not text: continue # Skip duplicates if text in seen_texts: continue # Skip ancient thoughts age_hours = (now - t.get("created_at", now)) / 3600 if age_hours > MAX_THOUGHT_AGE_HOURS: continue seen_texts.add(text) clean.append(t) # Sort by priority desc clean.sort(key=lambda x: (-x.get("priority", 0), -x.get("created_at", 0))) # Enforce cap return clean[:MAX_PENDING_THOUGHTS] def add_pending_thought(self, thought: str, priority: float = 0.5, source: str = "system", force: bool = False): """Add a thought to the pending queue — with dedup and cap enforcement.""" thought = thought.strip() if not thought or len(thought) < 15: return # NEW: rate-limit daemon sources to 1 thought per hour (unless forced) now = time.time() if source in {"dream", "curiosity", "daemon"} and not force: last = getattr(self, '_last_daemon_thought', 0) if now - last < 3600: return self._last_daemon_thought = now # NEW: fingerprint dedup (first 80 chars) import re def fp(t): return re.sub(r"[^a-z0-9 ]", "", t.lower())[:80] if any(fp(thought) == fp(t.get("thought","")) for t in self._pending_thoughts): return # Deduplicate by exact text (kept for backwards compat) existing_texts = {t.get("thought", "") for t in self._pending_thoughts} if thought in existing_texts: logger.debug(f"ContextEngine: duplicate thought skipped: {thought[:60]}") return self._pending_thoughts.append({ "thought": thought, "priority": priority, "created_at": time.time(), "source": source, }) # Apply cleaning after each add to enforce cap self._pending_thoughts = self._clean_thoughts(self._pending_thoughts) self._save() def get_pending_thoughts(self) -> list: """Return current pending thoughts (deduplicated, capped).""" self._pending_thoughts = self._clean_thoughts(self._pending_thoughts) return self._pending_thoughts def clear_delivered_thoughts(self, count: int = 3): """Mark the top N thoughts as delivered (remove them from queue).""" self._pending_thoughts = self._pending_thoughts[count:] self._save() def build_context(self, user_input: str) -> dict: """Build the full context dict for injection into LLM calls.""" now = time.time() hours_away = (now - self._last_interaction) / 3600 if self._last_interaction else None # Extract topic properly — FIXED topic = _extract_topic(user_input) if user_input else "" # Update recurring interests if topic and topic != "general query" and not _is_meta_topic(topic): if topic not in self._recurring_interests: self._recurring_interests.insert(0, topic) self._recurring_interests = self._recurring_interests[:10] return { "user": { "is_returning": self._conversation_count > 0, "conversation_count": self._conversation_count, "last_topic": self._last_topic, "time_away": f"{hours_away:.0f}h" if hours_away and hours_away > 1 else None, "recurring_interests": self._recurring_interests[:5], }, "system_self": { "pending_thoughts": self.get_pending_thoughts()[:3], "recent_discoveries": [], }, "self_reflection": {}, "current_topic": topic, } def update_after_interaction(self, user_input: str, response: str, context: dict): """Update state after each interaction.""" topic = _extract_topic(user_input) if topic and topic != "general query" and not _is_meta_topic(topic): self._last_topic = topic self._last_interaction = time.time() self._conversation_count += 1 self._save() def record_performance(self, success: bool, confidence: float, elapsed: float): pass # Telemetry hook — can be extended # Module-level singleton context_engine = ContextEngine()