"""
Context engine for Janus — provides rich context injection into every LLM call.
FIXES vs previous version:
- pending_thoughts queue was growing unboundedly (one malformed thought per daemon cycle)
- Topic extraction was cutting off queries at "what is the" instead of extracting meaning
- Deduplication: identical thoughts no longer accumulate
- Hard cap: max 20 pending thoughts, oldest dropped when full
- Better topic extraction: skip stopwords, take the meaningful noun phrase
"""
import time
import logging
import os
from pathlib import Path
from typing import Optional
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
try:
    from app.config import DATA_DIR
except ImportError:
    # Fallback for running outside the app package: use ../data next to this file.
    DATA_DIR = Path(__file__).parent.parent / "data"
# All persisted engine state lives in one JSON file under the daemon data dir.
CONTEXT_FILE = Path(DATA_DIR) / "daemon" / "context.json"
# Stopwords to skip when extracting topic from a query
_TOPIC_STOPWORDS = {
"what", "is", "the", "a", "an", "are", "was", "were", "how", "why",
"when", "where", "who", "which", "will", "can", "could", "should",
"would", "do", "does", "did", "tell", "me", "about", "explain",
"give", "show", "find", "get", "make", "help", "please", "i", "we",
"my", "our", "your", "their", "its", "this", "that", "these", "those",
"some", "any", "all", "more", "most", "much", "many", "few", "little",
"of", "in", "on", "at", "to", "for", "by", "from", "with", "and",
"or", "but", "not", "no", "if", "than", "then", "so", "yet",
"hi", "hey", "hello", "howdy", "greetings", "yo", "sup",
"use", "conversation", "below", "context", "answer", "latest",
"message", "messages", "assistant", "system", "user",
"simulate", "happens", "happening", "did", "does", "whats", "what's", "it's",
}
MAX_PENDING_THOUGHTS = 20 # hard cap — was unbounded
MAX_THOUGHT_AGE_HOURS = 24 # drop thoughts older than 24h
_META_TOPIC_WORDS = {
"conversation", "context", "latest", "message", "messages", "assistant", "system", "user"
}
def _extract_topic(query: str) -> str:
"""
Extract the meaningful topic from a query, skipping leading stopwords.
Examples:
"what is the stock market" → "stock market"
"how does inflation work" → "inflation"
"tell me about AAPL earnings" → "AAPL earnings"
"what is the" → "general query" (was causing the bug)
"""
import re
# Clean and tokenize
words = re.findall(r"[a-zA-Z0-9$€£%]+", query.lower())
# Skip leading stopwords
meaningful = []
for w in words:
if w not in _TOPIC_STOPWORDS or meaningful:
if w not in _TOPIC_STOPWORDS:
meaningful.append(w)
# Take up to 4 meaningful words
topic = " ".join(meaningful[:4])
return topic if topic and len(topic) > 2 else "general query"
def _is_meta_topic(topic: str) -> bool:
    """Return True if *topic* refers to the prompt/conversation scaffolding itself."""
    tokens = (topic or "").lower().split()
    return any(token in _META_TOPIC_WORDS for token in tokens)
class ContextEngine:
    """Manages system-wide context for LLM injection.

    State is persisted to CONTEXT_FILE as JSON so it survives restarts.
    Every mutating operation re-applies _clean_thoughts() so the pending
    queue stays deduplicated, age-limited, and capped at MAX_PENDING_THOUGHTS.
    """

    def __init__(self):
        # Queue of {"thought", "priority", "created_at", "source"} dicts.
        self._pending_thoughts: list = []
        self._context_cache: dict = {}
        self._conversation_count: int = 0
        self._last_topic: str = ""
        self._last_interaction: float = 0
        self._recurring_interests: list = []
        # FIXED: previously created lazily via getattr() in add_pending_thought;
        # initialize explicitly so the attribute always exists.
        self._last_daemon_thought: float = 0.0
        self._load()

    def _load(self):
        """Restore persisted state from CONTEXT_FILE; tolerate missing/corrupt file."""
        import json
        if CONTEXT_FILE.exists():
            try:
                data = json.loads(CONTEXT_FILE.read_text())
                raw_thoughts = data.get("pending_thoughts", [])
                # Deduplicate on load, enforce cap and age limit.
                self._pending_thoughts = self._clean_thoughts(raw_thoughts)
                self._conversation_count = data.get("conversation_count", 0)
                self._last_topic = data.get("last_topic", "")
                self._last_interaction = data.get("last_interaction", 0)
                self._recurring_interests = data.get("recurring_interests", [])
                # Scrub meta topics ("conversation", "user", ...) that leaked
                # into persisted state from prompt scaffolding.
                if _is_meta_topic(self._last_topic):
                    self._last_topic = ""
                self._recurring_interests = [
                    topic for topic in self._recurring_interests if not _is_meta_topic(topic)
                ]
            except Exception as e:
                logger.warning(f"ContextEngine: load failed: {e}")

    def _save(self):
        """Persist state to CONTEXT_FILE; failures are logged, never raised."""
        import json
        try:
            # FIXED: mkdir moved inside the try — an unwritable data dir used
            # to raise out of _save despite the log-and-continue intent.
            CONTEXT_FILE.parent.mkdir(parents=True, exist_ok=True)
            CONTEXT_FILE.write_text(json.dumps({
                "pending_thoughts": self._pending_thoughts,
                "conversation_count": self._conversation_count,
                "last_topic": self._last_topic,
                "last_interaction": self._last_interaction,
                "recurring_interests": self._recurring_interests,
            }, indent=2))
        except Exception as e:
            logger.warning(f"ContextEngine: save failed: {e}")

    def _clean_thoughts(self, thoughts: list) -> list:
        """
        Deduplicate, enforce age limit, enforce count cap.

        Returns list sorted by priority desc, newest first within same priority.
        """
        now = time.time()
        seen_texts = set()
        clean = []
        for t in thoughts:
            text = t.get("thought", "").strip()
            if not text:
                continue
            if text in seen_texts:
                continue  # exact duplicate
            # Missing created_at counts as "now", i.e. never aged out here.
            age_hours = (now - t.get("created_at", now)) / 3600
            if age_hours > MAX_THOUGHT_AGE_HOURS:
                continue  # too old to still be relevant
            seen_texts.add(text)
            clean.append(t)
        # Sort by priority desc, then newest first.
        clean.sort(key=lambda x: (-x.get("priority", 0), -x.get("created_at", 0)))
        # Enforce cap — oldest/lowest-priority entries fall off the end.
        return clean[:MAX_PENDING_THOUGHTS]

    def add_pending_thought(self, thought: str, priority: float = 0.5, source: str = "system", force: bool = False):
        """Add a thought to the pending queue — with dedup and cap enforcement.

        Daemon-originated sources ("dream", "curiosity", "daemon") are
        rate-limited to one thought per hour unless *force* is True.
        """
        import re
        thought = thought.strip()
        if not thought or len(thought) < 15:
            return  # too short to be a useful thought

        # FIXED: dedup BEFORE consuming the hourly daemon slot. Previously a
        # daemon re-emitting the same thought each cycle set the rate-limit
        # timestamp and then bailed on dedup, starving novel thoughts for an
        # hour while adding nothing.
        def fp(t: str) -> str:
            # Normalized fingerprint: lowercase alphanumerics, first 80 chars.
            return re.sub(r"[^a-z0-9 ]", "", t.lower())[:80]
        if any(fp(thought) == fp(t.get("thought", "")) for t in self._pending_thoughts):
            return
        # Exact-text dedup kept for backwards compat (and for the debug log).
        existing_texts = {t.get("thought", "") for t in self._pending_thoughts}
        if thought in existing_texts:
            logger.debug(f"ContextEngine: duplicate thought skipped: {thought[:60]}")
            return

        # Rate-limit daemon sources to 1 thought per hour (unless forced).
        now = time.time()
        if source in {"dream", "curiosity", "daemon"} and not force:
            if now - self._last_daemon_thought < 3600:
                return
            self._last_daemon_thought = now

        self._pending_thoughts.append({
            "thought": thought,
            "priority": priority,
            "created_at": now,
            "source": source,
        })
        # Apply cleaning after each add to enforce the cap.
        self._pending_thoughts = self._clean_thoughts(self._pending_thoughts)
        self._save()

    def get_pending_thoughts(self) -> list:
        """Return current pending thoughts (deduplicated, capped)."""
        self._pending_thoughts = self._clean_thoughts(self._pending_thoughts)
        return self._pending_thoughts

    def clear_delivered_thoughts(self, count: int = 3):
        """Mark the top *count* thoughts as delivered (remove them from the queue)."""
        self._pending_thoughts = self._pending_thoughts[count:]
        self._save()

    def build_context(self, user_input: str) -> dict:
        """Build the full context dict for injection into LLM calls."""
        now = time.time()
        hours_away = (now - self._last_interaction) / 3600 if self._last_interaction else None
        # Topic extraction skips leading stopwords; "" when no input.
        topic = _extract_topic(user_input) if user_input else ""
        # Track recurring interests, skipping the fallback topic and meta topics.
        if topic and topic != "general query" and not _is_meta_topic(topic):
            if topic not in self._recurring_interests:
                self._recurring_interests.insert(0, topic)
                self._recurring_interests = self._recurring_interests[:10]
        return {
            "user": {
                "is_returning": self._conversation_count > 0,
                "conversation_count": self._conversation_count,
                "last_topic": self._last_topic,
                # Only surfaced when the user has been away for more than an hour.
                "time_away": f"{hours_away:.0f}h" if hours_away and hours_away > 1 else None,
                "recurring_interests": self._recurring_interests[:5],
            },
            "system_self": {
                # Cap at 3 injected thoughts to keep prompt size bounded.
                "pending_thoughts": self.get_pending_thoughts()[:3],
                "recent_discoveries": [],
            },
            "self_reflection": {},
            "current_topic": topic,
        }

    def update_after_interaction(self, user_input: str, response: str, context: dict):
        """Update persisted state after each interaction."""
        topic = _extract_topic(user_input)
        if topic and topic != "general query" and not _is_meta_topic(topic):
            self._last_topic = topic
        self._last_interaction = time.time()
        self._conversation_count += 1
        self._save()

    def record_performance(self, success: bool, confidence: float, elapsed: float):
        """Telemetry hook — intentionally a no-op; extend as needed."""
        pass
# Module-level singleton — constructing it runs _load(), i.e. reads
# CONTEXT_FILE from disk as a side effect of importing this module.
context_engine = ContextEngine()