File size: 15,788 Bytes
24f95f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f91e0b
 
 
 
 
24f95f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
from __future__ import annotations
import time
import json
import logging
import asyncio
from datetime import datetime
from pathlib import Path
from app.services.market_watcher import MarketWatcher
from app.services.news_pulse import NewsPulse
from app.services.event_detector import EventDetector
from app.services.signal_queue import SignalQueue
from app.services.circadian_rhythm import CircadianRhythm
from app.services.dream_processor import DreamCycleProcessor
from app.services.curiosity_engine import CuriosityEngine
from app.services.guardian_interceptor import guardian_interceptor
from app.config import DATA_DIR

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# JSON file in which the daemon persists its queue of pending thoughts.
PENDING_THOUGHTS_FILE = DATA_DIR / "daemon" / "pending_thoughts.json"
# Runs at import time: make sure the containing directory exists
# (a no-op when it is already there).
PENDING_THOUGHTS_FILE.parent.mkdir(parents=True, exist_ok=True)


class JanusDaemon:
    def __init__(self):
        """Create the daemon's collaborators and restore persisted thoughts.

        Instantiates every service the main loop uses, resets the cycle
        bookkeeping, loads the pending-thought queue from disk and prunes
        stale/duplicate entries from it.
        """
        # Services polled or driven by run().
        self.market_watcher = MarketWatcher()
        self.news_pulse = NewsPulse()
        self.event_detector = EventDetector()
        self.signal_queue = SignalQueue()
        self.circadian = CircadianRhythm()
        self.dream_processor = DreamCycleProcessor()
        self.curiosity = CuriosityEngine()

        # Cycle bookkeeping, reported by get_status().
        self.cycle_count = 0
        self.last_run = None  # ISO timestamp string of the latest cycle start
        self.last_dream = None  # report returned by the latest dream cycle
        self.last_curiosity_cycle = None  # report of the latest curiosity cycle

        # run() reads this flag (through getattr with a False default) to
        # trigger one out-of-phase dream/curiosity pass.  Declaring it here
        # makes the attribute part of the object's documented state instead
        # of something that only exists once an outside caller sets it.
        self._force_cycles = False

        # Restore the persisted queue, then prune it before the first cycle.
        self._pending_thoughts = self._load_pending_thoughts()
        self._clear_stale_pending_thoughts()

    def _clear_stale_pending_thoughts(self):
        """Clear pending thoughts older than 24h and deduplicate on boot."""
        try:
            now = time.time()
            fresh = [
                t for t in self._pending_thoughts
                if (now - t.get("created_at", now)) < 86400
            ]
            seen = set()
            deduped = []
            for t in fresh:
                text = t.get("thought", "").strip()
                if text and text not in seen:
                    seen.add(text)
                    deduped.append(t)
            kept = len(deduped)
            self._pending_thoughts = deduped[:20]
            self._save_pending_thoughts()
            logger.info(f"[DAEMON] Boot: kept {kept} fresh pending thoughts (deduplicated)")
        except Exception as e:
            logger.warning(f"[DAEMON] Failed to clear stale thoughts: {e}")

    def _load_pending_thoughts(self) -> list:
        if PENDING_THOUGHTS_FILE.exists():
            try:
                with open(PENDING_THOUGHTS_FILE) as f:
                    return json.load(f)
            except Exception:
                pass
        return []

    def _save_pending_thoughts(self):
        try:
            self._pending_thoughts = self._pending_thoughts[:30]
            with open(PENDING_THOUGHTS_FILE, "w") as f:
                json.dump(self._pending_thoughts, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save pending thoughts: {e}")

    def _generate_pending_thoughts(self, market_signals, news_signals, events):
        """Convert discoveries into natural thoughts the system wants to share."""
        new_thoughts = []

        for signal in market_signals[:3]:
            ticker = signal.get("ticker", "")
            change = signal.get("change_percent", 0)
            if abs(change) > 2:
                direction = "up" if change > 0 else "down"
                new_thoughts.append(
                    {
                        "thought": f"{ticker} moved {abs(change):.1f}% {direction} β€” might be worth looking into",
                        "priority": min(abs(change) / 10, 1.0),
                        "created_at": time.time(),
                        "source": "market",
                    }
                )

        for signal in news_signals[:2]:
            topic = signal.get("topic", "")
            headline = signal.get("headline", "")
            if topic and headline:
                new_thoughts.append(
                    {
                        "thought": f"Something happening with {topic}: {headline[:100]}",
                        "priority": 0.4,
                        "created_at": time.time(),
                        "source": "news",
                    }
                )

        for event in events[:2]:
            event_type = event.get("event_type", "")
            description = event.get("description", "")
            if event_type and description:
                new_thoughts.append(
                    {
                        "thought": f"Detected a {event_type} event β€” {description[:100]}",
                        "priority": 0.6,
                        "created_at": time.time(),
                        "source": "event",
                    }
                )

        if self.last_dream:
            insights = self.last_dream.get("insights", [])
            for insight in insights[:1]:
                new_thoughts.append(
                    {
                        "thought": f"I had a thought during my last dream cycle β€” {insight[:120]}",
                        "priority": 0.3,
                        "created_at": time.time(),
                        "source": "dream",
                    }
                )

        if self.last_curiosity_cycle:
            discoveries = self.last_curiosity_cycle.get("discoveries", [])
            for d in discoveries[:1]:
                new_thoughts.append(
                    {
                        "thought": f"I found something interesting while exploring β€” {str(d)[:120]}",
                        "priority": 0.35,
                        "created_at": time.time(),
                        "source": "curiosity",
                    }
                )

        self._pending_thoughts.extend(new_thoughts)
        self._pending_thoughts.sort(key=lambda x: x.get("priority", 0), reverse=True)
        self._pending_thoughts = self._pending_thoughts[:30]
        self._save_pending_thoughts()

        return new_thoughts

    async def run(self):
        """Run the daemon's main loop forever, paced by the circadian phase.

        Each cycle:

        1. polls the market watcher and the news pulse;
        2. passes the combined signals through the guardian interceptor and
           queues a top-priority thought for every intervention it reports;
        3. enqueues the surviving signals, runs event detection and turns
           the results into pending thoughts;
        4. when the phase is ``"night"`` (or ``_force_cycles`` is truthy),
           additionally runs the dream cycle, the curiosity cycle,
           self-reflection, autonomous learning and continuous training,
           each inside its own try/except so one failing step does not stop
           the others;
        5. sleeps for the phase's ``poll_interval`` (900 seconds when the
           phase config does not define one).

        Any exception raised inside a cycle is logged and the loop carries
        on.  The phase lookup at the top of the loop is outside that
        try/except, so an exception there propagates out of the coroutine.
        This method never returns normally.
        """
        # NOTE(review): the "β€”" sequences in the string literals of this
        # method look like a mis-encoded em dash.  They are runtime text, so
        # they are left untouched here.
        logger.info("=" * 60)
        logger.info("JANUS DAEMON STARTED β€” Living Intelligence Engine")
        logger.info(f"Watchlist: {self.market_watcher.watchlist}")
        logger.info(f"Topics: {self.news_pulse.topics}")
        logger.info(f"Circadian Phase: {self.circadian.get_current_phase().value}")
        logger.info("=" * 60)

        while True:
            cycle_start = time.time()
            self.cycle_count += 1
            # Naive UTC timestamp in ISO format; exposed through get_status().
            self.last_run = datetime.utcnow().isoformat()

            # Resolved outside the try below: phase_config is also needed
            # after it, to choose the sleep interval.
            phase = self.circadian.get_current_phase()
            phase_config = self.circadian.get_phase_config(phase)

            try:
                logger.info(
                    f"[DAEMON] Cycle #{self.cycle_count} β€” Phase: {phase.value} ({phase_config['name']})"
                )

                # Both collectors are called synchronously (not awaited).
                market_signals = self.market_watcher.poll()
                news_signals = self.news_pulse.fetch()
                
                # Guardian: audit the combined signals.  The interceptor
                # returns the signals to keep plus a list of interventions.
                unfiltered_signals = market_signals + news_signals
                all_signals, interventions = await guardian_interceptor.process_signals(unfiltered_signals)
                
                if interventions:
                    logger.warning(f"[DAEMON] Guardian blocked {len(interventions)} high-risk scam signals!")
                    for inter in interventions:
                        # Put the intervention at the front of the queue with
                        # the maximum priority.  Nothing is saved here; the
                        # _generate_pending_thoughts call below sorts and
                        # persists the queue.
                        self._pending_thoughts.insert(0, {
                            "thought": f"🚨 GUARDIAN INTERVENTION: {inter['reason']}",
                            "priority": 1.0,
                            "created_at": time.time(),
                            "source": "guardian"
                        })

                # Only the signals returned by the guardian reach the queue
                # and the event detector.
                self.signal_queue.add_batch(all_signals)
                events = self.event_detector.detect(all_signals)

                # Thought generation receives the unfiltered market/news
                # lists, not all_signals.
                new_thoughts = self._generate_pending_thoughts(
                    market_signals, news_signals, events
                )
                if new_thoughts:
                    logger.info(
                        f"[DAEMON] Generated {len(new_thoughts)} pending thoughts"
                    )

                # Night-time work.  This condition is evaluated on every
                # cycle, so the steps below run each cycle while the phase is
                # "night".  getattr with a default tolerates _force_cycles
                # never having been set on the instance.
                if phase.value == "night" or getattr(self, '_force_cycles', False):
                    force = getattr(self, '_force_cycles', False)
                    try:
                        dream_report = self.dream_processor.run_dream_cycle(force=force)
                        # Kept for get_status() and for thought generation.
                        self.last_dream = dream_report
                        logger.info(
                            f"[DAEMON] Dream cycle: {dream_report.get('duration_seconds', 0):.1f}s β€” "
                            f"{len(dream_report.get('insights', []))} insights"
                        )
                    except Exception as e:
                        logger.error(f"[DAEMON] Dream cycle FAILED: {e}")

                    try:
                        curiosity_report = self.curiosity.run_curiosity_cycle(force=force)
                        # Kept for get_status() and for thought generation.
                        self.last_curiosity_cycle = curiosity_report
                        logger.info(
                            f"[DAEMON] Curiosity cycle: {curiosity_report.get('duration_seconds', 0):.1f}s β€” "
                            f"{curiosity_report.get('total_discoveries', 0)} discoveries"
                        )
                    except Exception as e:
                        logger.error(f"[DAEMON] Curiosity cycle FAILED: {e}")
                    
                    # A forced run is one-shot: clear the flag once the dream
                    # and curiosity cycles have been attempted.
                    if force:
                        self._force_cycles = False

                    # Self-reflection: review recent cases, then turn
                    # reported gaps and opinions into pending thoughts.
                    try:
                        # Imported here, inside the try, so an import failure
                        # is logged and only this step is skipped.
                        from app.services.self_reflection import self_reflection
                        from app.services.case_store import list_cases

                        recent_cases = list_cases(limit=20, full=True)
                        review = self_reflection.run_night_review(recent_cases)
                        logger.info(
                            f"[DAEMON] Self-review: {review.get('cases_reviewed', 0)} cases, "
                            f"{review.get('opinions_formed', 0)} opinions, "
                            f"learning_rate={review.get('learning_rate', 0)}"
                        )

                        # At most two gaps become thoughts; a gap's "urgency"
                        # is used as the priority (0.5 when absent).
                        # NOTE(review): nothing here checks whether the same
                        # sentence is already queued.
                        gaps = self_reflection.get_gaps()[:2]
                        for gap in gaps:
                            self._pending_thoughts.append(
                                {
                                    "thought": f"I need to get better at {gap.get('topic', '')} β€” {gap.get('reason', '')}",
                                    "priority": gap.get("urgency", 0.5),
                                    "created_at": time.time(),
                                    "source": "self_reflection",
                                }
                            )

                        # Of the first two opinions, only those with
                        # confidence above 0.7 are queued, at 80% of that
                        # confidence.
                        opinions = self_reflection.get_opinions()[:2]
                        for op in opinions:
                            if op.get("confidence", 0) > 0.7:
                                self._pending_thoughts.append(
                                    {
                                        "thought": f"I've formed a view on {op.get('topic', '')}: {op.get('statement', '')[:100]}",
                                        "priority": op.get("confidence", 0.5) * 0.8,
                                        "created_at": time.time(),
                                        "source": "self_reflection",
                                    }
                                )

                        # Re-rank, cap at 30 entries and persist.
                        self._pending_thoughts.sort(
                            key=lambda x: x.get("priority", 0), reverse=True
                        )
                        self._pending_thoughts = self._pending_thoughts[:30]
                        self._save_pending_thoughts()

                    except Exception as e:
                        logger.error(f"[DAEMON] Self-reflection failed: {e}")

                    # Autonomous learning, with fixed per-cycle limits on
                    # gaps, datasets per gap and samples per dataset.
                    try:
                        # Imported inside the try for the same reason as above.
                        from app.services.autonomous_learner import autonomous_learner

                        learning_result = autonomous_learner.run_learning_cycle(
                            max_gaps=2,
                            max_datasets_per_gap=2,
                            max_samples_per_dataset=30
                        )
                        logger.info(
                            f"[DAEMON] Autonomous learning: {learning_result.get('gaps_addressed', 0)} gaps, "
                            f"{learning_result.get('knowledge_added', 0)} knowledge, "
                            f"{learning_result.get('training_pairs_added', 0)} training pairs"
                        )
                    except Exception as e:
                        logger.error(f"[DAEMON] Autonomous learning failed: {e}")

                    # Continuous training: run one training cycle and log the
                    # counters it reports.
                    try:
                        from app.services.continuous_training import continuous_self_trainer
                        
                        ct_result = continuous_self_trainer.run_training_cycle()
                        logger.info(
                            f"[DAEMON] Continuous Training: "
                            f"{ct_result.get('synthetic_data_generated', 0)} synthetic pairs, "
                            f"{ct_result.get('prompts_tested', 0)} prompts tested, "
                            f"{ct_result.get('improvements_made', 0)} optimizations"
                        )
                    except Exception as e:
                        logger.error(f"[DAEMON] Continuous training failed: {e}")

                # End-of-cycle summary.
                elapsed = time.time() - cycle_start
                stats = self.signal_queue.get_stats()

                logger.info(
                    f"[DAEMON] Cycle #{self.cycle_count} complete in {elapsed:.1f}s"
                )
                logger.info(
                    f"[DAEMON] Market signals: {len(market_signals)}, News signals: {len(news_signals)}, Events: {len(events)}"
                )
                logger.info(f"[DAEMON] Queue stats: {stats}")

            except Exception as e:
                # Log and keep looping; the next cycle starts after the sleep.
                logger.error(f"[DAEMON] Cycle #{self.cycle_count} failed: {e}")

            # Pace the loop by the current phase; 900 s when the phase config
            # has no "poll_interval".
            sleep_time = phase_config.get("poll_interval", 900)
            logger.info(f"[DAEMON] Sleeping for {sleep_time}s ({phase.value} phase)")
            await asyncio.sleep(sleep_time)

    def get_status(self) -> dict:
        """Get daemon status."""
        phase = self.circadian.get_current_phase()
        phase_config = self.circadian.get_phase_config(phase)

        return {
            "running": True,
            "cycle_count": self.cycle_count,
            "last_run": self.last_run,
            "circadian": {
                "current_phase": phase.value,
                "phase_name": phase_config["name"],
                "phase_description": phase_config["description"],
                "priority": phase_config["priority"],
                "current_tasks": phase_config["tasks"],
            },
            "watchlist": self.market_watcher.watchlist,
            "news_pulse": self.news_pulse.get_status(),
            "signal_queue": self.signal_queue.get_stats(),
            "dream_processor": self.dream_processor.get_status(),
            "curiosity_engine": self.curiosity.get_status(),
            "last_dream": self.last_dream,
            "last_curiosity_cycle": self.last_curiosity_cycle,
            "pending_thoughts": self._pending_thoughts[:10],
        }