File size: 9,666 Bytes
24f95f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
"""
Context engine for Janus — provides rich context injection into every LLM call.

FIXES vs previous version:
  - pending_thoughts queue was growing unboundedly (one malformed thought per daemon cycle)
  - Topic extraction was cutting off queries at "what is the" instead of extracting meaning
  - Deduplication: identical thoughts no longer accumulate
  - Hard cap: max 20 pending thoughts, oldest dropped when full
  - Better topic extraction: skip stopwords, take the meaningful noun phrase
"""

import time
import logging
import os
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

try:
    from app.config import DATA_DIR
except ImportError:
    DATA_DIR = Path(__file__).parent.parent / "data"

CONTEXT_FILE = Path(DATA_DIR) / "daemon" / "context.json"

# Stopwords to skip when extracting topic from a query
_TOPIC_STOPWORDS = {
    "what", "is", "the", "a", "an", "are", "was", "were", "how", "why",
    "when", "where", "who", "which", "will", "can", "could", "should",
    "would", "do", "does", "did", "tell", "me", "about", "explain",
    "give", "show", "find", "get", "make", "help", "please", "i", "we",
    "my", "our", "your", "their", "its", "this", "that", "these", "those",
    "some", "any", "all", "more", "most", "much", "many", "few", "little",
    "of", "in", "on", "at", "to", "for", "by", "from", "with", "and",
    "or", "but", "not", "no", "if", "than", "then", "so", "yet",
    "hi", "hey", "hello", "howdy", "greetings", "yo", "sup",
    "use", "conversation", "below", "context", "answer", "latest",
    "message", "messages", "assistant", "system", "user",
    "simulate", "happens", "happening", "did", "does", "whats", "what's", "it's",
}

MAX_PENDING_THOUGHTS = 20  # hard cap — was unbounded
MAX_THOUGHT_AGE_HOURS = 24  # drop thoughts older than 24h
_META_TOPIC_WORDS = {
    "conversation", "context", "latest", "message", "messages", "assistant", "system", "user"
}


def _extract_topic(query: str) -> str:
    """
    Extract the meaningful topic from a query, skipping leading stopwords.

    Examples:
      "what is the stock market"     → "stock market"
      "how does inflation work"      → "inflation"
      "tell me about AAPL earnings"  → "AAPL earnings"
      "what is the"                  → "general query"  (was causing the bug)
    """
    import re
    # Clean and tokenize
    words = re.findall(r"[a-zA-Z0-9$€£%]+", query.lower())
    # Skip leading stopwords
    meaningful = []
    for w in words:
        if w not in _TOPIC_STOPWORDS or meaningful:
            if w not in _TOPIC_STOPWORDS:
                meaningful.append(w)
    # Take up to 4 meaningful words
    topic = " ".join(meaningful[:4])
    return topic if topic and len(topic) > 2 else "general query"


def _is_meta_topic(topic: str) -> bool:
    words = set((topic or "").lower().split())
    return bool(words & _META_TOPIC_WORDS)


class ContextEngine:
    """Manages system-wide context for LLM injection.

    Tracks a capped, deduplicated queue of "pending thoughts" produced by
    background daemons, plus lightweight conversation state (conversation
    count, last topic, recurring interests). State is persisted as JSON at
    CONTEXT_FILE so it survives restarts.
    """

    def __init__(self):
        # Queue of {"thought", "priority", "created_at", "source"} dicts,
        # kept deduplicated and capped at MAX_PENDING_THOUGHTS.
        self._pending_thoughts: list = []
        self._context_cache: dict = {}
        self._conversation_count: int = 0
        self._last_topic: str = ""
        self._last_interaction: float = 0
        self._recurring_interests: list = []
        # FIXED: was created lazily via getattr() inside add_pending_thought;
        # initialize explicitly so the rate-limit state always exists.
        self._last_daemon_thought: float = 0.0
        self._load()

    def _load(self):
        """Restore persisted state from CONTEXT_FILE, sanitizing as we go."""
        import json
        if not CONTEXT_FILE.exists():
            return
        try:
            data = json.loads(CONTEXT_FILE.read_text())
            # Deduplicate and enforce cap/age limits on load, so a file
            # written by an older (unbounded) version is cleaned up here.
            self._pending_thoughts = self._clean_thoughts(data.get("pending_thoughts", []))
            self._conversation_count = data.get("conversation_count", 0)
            self._last_topic = data.get("last_topic", "")
            self._last_interaction = data.get("last_interaction", 0)
            self._recurring_interests = data.get("recurring_interests", [])
            # Drop meta topics that describe chat mechanics, not subjects.
            if _is_meta_topic(self._last_topic):
                self._last_topic = ""
            self._recurring_interests = [
                topic for topic in self._recurring_interests if not _is_meta_topic(topic)
            ]
        except Exception as e:
            # Corrupt or unreadable file: keep defaults, don't crash startup.
            logger.warning(f"ContextEngine: load failed: {e}")

    def _save(self):
        """Persist current state to CONTEXT_FILE (best-effort, logged on failure)."""
        import json
        try:
            # FIXED: mkdir moved inside the try — a permission/IO error here
            # previously escaped the handler and crashed the caller.
            CONTEXT_FILE.parent.mkdir(parents=True, exist_ok=True)
            CONTEXT_FILE.write_text(json.dumps({
                "pending_thoughts": self._pending_thoughts,
                "conversation_count": self._conversation_count,
                "last_topic": self._last_topic,
                "last_interaction": self._last_interaction,
                "recurring_interests": self._recurring_interests,
            }, indent=2))
        except Exception as e:
            logger.warning(f"ContextEngine: save failed: {e}")

    def _clean_thoughts(self, thoughts: list) -> list:
        """
        Deduplicate, drop stale/malformed entries, and enforce the count cap.

        Returns at most MAX_PENDING_THOUGHTS entries sorted by priority desc,
        newest first within the same priority.
        """
        now = time.time()
        seen_texts = set()
        clean = []
        for t in thoughts:
            # FIXED: guard against malformed entries (non-dict items, or a
            # non-string "thought" value) from a corrupted context file —
            # these previously raised AttributeError.
            if not isinstance(t, dict):
                continue
            text = t.get("thought", "")
            if not isinstance(text, str):
                continue
            text = text.strip()
            if not text:
                continue
            if text in seen_texts:
                continue  # exact duplicate
            # Drop thoughts past the age limit (missing timestamp => treat as new).
            age_hours = (now - t.get("created_at", now)) / 3600
            if age_hours > MAX_THOUGHT_AGE_HOURS:
                continue
            seen_texts.add(text)
            clean.append(t)
        # Priority desc, then newest first.
        clean.sort(key=lambda x: (-x.get("priority", 0), -x.get("created_at", 0)))
        return clean[:MAX_PENDING_THOUGHTS]

    def add_pending_thought(self, thought: str, priority: float = 0.5, source: str = "system", force: bool = False):
        """Add a thought to the pending queue — with dedup and cap enforcement.

        Daemon-type sources ("dream", "curiosity", "daemon") are rate-limited
        to one accepted thought per hour unless *force* is True.
        """
        thought = thought.strip()
        if not thought or len(thought) < 15:
            return  # too short to be a meaningful thought

        # Fingerprint dedup: normalized (lowercase, alphanumeric) first 80
        # chars. This subsumes exact-text dedup, since identical texts always
        # produce identical fingerprints.
        # FIXED: dedup now runs BEFORE the rate limiter, so a duplicate daemon
        # thought no longer consumes the hourly slot for nothing.
        import re
        def fp(t): return re.sub(r"[^a-z0-9 ]", "", t.lower())[:80]
        if any(fp(thought) == fp(t.get("thought", "")) for t in self._pending_thoughts):
            logger.debug(f"ContextEngine: duplicate thought skipped: {thought[:60]}")
            return

        # Rate-limit daemon sources to 1 thought per hour (unless forced).
        now = time.time()
        if source in {"dream", "curiosity", "daemon"} and not force:
            if now - self._last_daemon_thought < 3600:
                return
            self._last_daemon_thought = now

        self._pending_thoughts.append({
            "thought": thought,
            "priority": priority,
            "created_at": time.time(),
            "source": source,
        })

        # Re-clean after each add to enforce the cap, then persist.
        self._pending_thoughts = self._clean_thoughts(self._pending_thoughts)
        self._save()

    def get_pending_thoughts(self) -> list:
        """Return current pending thoughts (deduplicated, capped, re-cleaned)."""
        self._pending_thoughts = self._clean_thoughts(self._pending_thoughts)
        return self._pending_thoughts

    def clear_delivered_thoughts(self, count: int = 3):
        """Mark the top *count* thoughts as delivered (remove them from the queue)."""
        self._pending_thoughts = self._pending_thoughts[count:]
        self._save()

    def build_context(self, user_input: str) -> dict:
        """Build the full context dict for injection into LLM calls."""
        now = time.time()
        hours_away = (now - self._last_interaction) / 3600 if self._last_interaction else None

        # Extract topic from the user input (empty input => empty topic).
        topic = _extract_topic(user_input) if user_input else ""

        # Track recurring interests (most recent first, max 10), ignoring the
        # "general query" fallback and meta topics.
        if topic and topic != "general query" and not _is_meta_topic(topic):
            if topic not in self._recurring_interests:
                self._recurring_interests.insert(0, topic)
                self._recurring_interests = self._recurring_interests[:10]
                # FIXED: persist immediately — previously this update was lost
                # if the process exited before update_after_interaction ran.
                self._save()

        return {
            "user": {
                "is_returning": self._conversation_count > 0,
                "conversation_count": self._conversation_count,
                "last_topic": self._last_topic,
                # Only surface absences longer than an hour.
                "time_away": f"{hours_away:.0f}h" if hours_away and hours_away > 1 else None,
                "recurring_interests": self._recurring_interests[:5],
            },
            "system_self": {
                "pending_thoughts": self.get_pending_thoughts()[:3],
                "recent_discoveries": [],
            },
            "self_reflection": {},
            "current_topic": topic,
        }

    def update_after_interaction(self, user_input: str, response: str, context: dict):
        """Update conversation state after each interaction and persist it."""
        topic = _extract_topic(user_input)
        if topic and topic != "general query" and not _is_meta_topic(topic):
            self._last_topic = topic
        self._last_interaction = time.time()
        self._conversation_count += 1
        self._save()

    def record_performance(self, success: bool, confidence: float, elapsed: float):
        # Telemetry hook — intentionally a no-op, can be extended.
        pass


# Module-level singleton shared by all callers.
context_engine = ContextEngine()