Spaces:
Running
Running
| from __future__ import annotations | |
| import time | |
| import json | |
| import logging | |
| import asyncio | |
| from datetime import datetime | |
| from pathlib import Path | |
| from app.services.market_watcher import MarketWatcher | |
| from app.services.news_pulse import NewsPulse | |
| from app.services.event_detector import EventDetector | |
| from app.services.signal_queue import SignalQueue | |
| from app.services.circadian_rhythm import CircadianRhythm | |
| from app.services.dream_processor import DreamCycleProcessor | |
| from app.services.curiosity_engine import CuriosityEngine | |
| from app.services.guardian_interceptor import guardian_interceptor | |
| from app.config import DATA_DIR | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# JSON file in which the daemon's pending thoughts are stored.
PENDING_THOUGHTS_FILE = DATA_DIR.joinpath("daemon", "pending_thoughts.json")
# Ensure the containing directory exists as soon as the module is imported.
PENDING_THOUGHTS_FILE.parent.mkdir(parents=True, exist_ok=True)
class JanusDaemon:
    """Long-running daemon that polls signals and keeps a queue of "pending thoughts".

    Each cycle polls the market watcher and the news pulse, passes the
    combined signals through the guardian interceptor, feeds the surviving
    signals to the signal queue and the event detector, and turns notable
    findings into short "pending thoughts". The thought queue is kept sorted
    by priority, capped, de-duplicated by text and persisted to
    ``PENDING_THOUGHTS_FILE``. During the ``night`` circadian phase (or when
    ``_force_cycles`` is set) the dream, curiosity, self-reflection,
    autonomous-learning and continuous-training steps run as well.
    """

    # Upper bound on the in-memory / persisted pending-thought queue.
    MAX_PENDING_THOUGHTS = 30
    # Tighter cap applied once at boot, after stale entries were purged.
    BOOT_PENDING_THOUGHTS = 20
    # Thoughts older than this many seconds are dropped at boot (24h).
    STALE_AFTER_SECONDS = 86400

    def __init__(self):
        """Create the collaborating services and restore the persisted thought queue."""
        self.market_watcher = MarketWatcher()
        self.news_pulse = NewsPulse()
        self.event_detector = EventDetector()
        self.signal_queue = SignalQueue()
        self.circadian = CircadianRhythm()
        self.dream_processor = DreamCycleProcessor()
        self.curiosity = CuriosityEngine()
        self.cycle_count = 0
        self.last_run = None
        self.last_dream = None
        self.last_curiosity_cycle = None
        # Set to True from outside to run the night-time tasks on the next
        # cycle regardless of phase; reset after one forced run.
        self._force_cycles = False
        self._pending_thoughts = self._load_pending_thoughts()
        self._clear_stale_pending_thoughts()

    # ------------------------------------------------------------------
    # Pending-thought storage
    # ------------------------------------------------------------------

    @staticmethod
    def _thought_text(entry) -> str:
        """Return the stripped text of a thought entry ('' when missing)."""
        return str(entry.get("thought") or "").strip()

    @staticmethod
    def _make_thought(text, priority, source) -> dict:
        """Build one pending-thought record stamped with the current time."""
        return {
            "thought": text,
            "priority": priority,
            "created_at": time.time(),
            "source": source,
        }

    def _clear_stale_pending_thoughts(self):
        """Clear pending thoughts older than 24h and deduplicate on boot.

        Entries that are not dicts are discarded. An entry with a missing or
        non-numeric ``created_at`` is treated as fresh. The surviving list is
        capped at ``BOOT_PENDING_THOUGHTS`` and written back to disk. Failures
        are logged and never raised, so a bad file cannot prevent start-up.
        """
        try:
            now = time.time()
            seen = set()
            kept = []
            for entry in self._pending_thoughts:
                if not isinstance(entry, dict):
                    continue
                created_at = entry.get("created_at", now)
                if not isinstance(created_at, (int, float)):
                    created_at = now
                if (now - created_at) >= self.STALE_AFTER_SECONDS:
                    continue
                text = self._thought_text(entry)
                if text and text not in seen:
                    seen.add(text)
                    kept.append(entry)
            self._pending_thoughts = kept[: self.BOOT_PENDING_THOUGHTS]
            self._save_pending_thoughts()
            # Report what was actually retained, i.e. after the boot cap.
            logger.info(
                f"[DAEMON] Boot: kept {len(self._pending_thoughts)} fresh pending thoughts (deduplicated)"
            )
        except Exception as e:
            logger.warning(f"[DAEMON] Failed to clear stale thoughts: {e}")

    def _load_pending_thoughts(self) -> list:
        """Read the persisted thought queue.

        Returns:
            The list of thought dicts stored in ``PENDING_THOUGHTS_FILE``, or
            an empty list when the file is missing, unreadable, not valid
            JSON, or does not contain a list. Non-dict items are dropped so
            the rest of the class can rely on ``.get``.
        """
        if not PENDING_THOUGHTS_FILE.exists():
            return []
        try:
            with open(PENDING_THOUGHTS_FILE, encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            # Best effort: a damaged file must not stop the daemon from booting.
            logger.warning(f"[DAEMON] Could not load pending thoughts, starting empty: {e}")
            return []
        if not isinstance(data, list):
            logger.warning("[DAEMON] Pending thoughts file does not contain a list, starting empty")
            return []
        return [entry for entry in data if isinstance(entry, dict)]

    def _save_pending_thoughts(self):
        """Cap the queue at ``MAX_PENDING_THOUGHTS`` and persist it as JSON.

        The data is written to a sibling ``.tmp`` file which then replaces the
        real file, so an interrupted write cannot leave a half-written queue
        behind. Failures are logged and never raised.
        """
        try:
            self._pending_thoughts = self._pending_thoughts[: self.MAX_PENDING_THOUGHTS]
            tmp_file = PENDING_THOUGHTS_FILE.with_name(PENDING_THOUGHTS_FILE.name + ".tmp")
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(self._pending_thoughts, f, indent=2)
            tmp_file.replace(PENDING_THOUGHTS_FILE)
        except Exception as e:
            logger.error(f"Failed to save pending thoughts: {e}")

    def _enqueue_thoughts(self, thoughts, front=False) -> list:
        """Merge ``thoughts`` into the queue, skipping texts that are already queued.

        Args:
            thoughts: Thought dicts as produced by ``_make_thought``.
            front: When true each accepted thought is inserted at index 0
                instead of being appended (used for guardian alerts).

        Returns:
            The thoughts that were actually added. The queue is re-sorted by
            descending priority, capped and saved in every case.
        """
        queued_texts = {
            self._thought_text(entry)
            for entry in self._pending_thoughts
            if isinstance(entry, dict)
        }
        added = []
        for thought in thoughts:
            text = self._thought_text(thought)
            if not text or text in queued_texts:
                continue
            queued_texts.add(text)
            added.append(thought)
            if front:
                self._pending_thoughts.insert(0, thought)
            else:
                self._pending_thoughts.append(thought)
        # list.sort is stable, so equal priorities keep their insertion order.
        self._pending_thoughts.sort(key=lambda x: x.get("priority", 0), reverse=True)
        self._pending_thoughts = self._pending_thoughts[: self.MAX_PENDING_THOUGHTS]
        self._save_pending_thoughts()
        return added

    def _generate_pending_thoughts(self, market_signals, news_signals, events):
        """Convert discoveries into natural thoughts the system wants to share.

        Considers the first three market signals (only moves larger than 2%),
        the first two news signals, the first two events, and the first
        insight / discovery of the most recent dream and curiosity reports.

        Returns:
            The thoughts that were newly added to the queue. A thought whose
            text is already queued is not added again, so the unchanged
            dream / curiosity report does not pile up duplicates every cycle.
        """
        candidates = []

        for signal in market_signals[:3]:
            ticker = signal.get("ticker", "")
            # A present-but-None change is treated like a missing one.
            change = signal.get("change_percent") or 0
            if abs(change) > 2:
                direction = "up" if change > 0 else "down"
                candidates.append(
                    self._make_thought(
                        f"{ticker} moved {abs(change):.1f}% {direction} — might be worth looking into",
                        min(abs(change) / 10, 1.0),
                        "market",
                    )
                )

        for signal in news_signals[:2]:
            topic = signal.get("topic", "")
            headline = signal.get("headline", "")
            if topic and headline:
                candidates.append(
                    self._make_thought(
                        f"Something happening with {topic}: {headline[:100]}",
                        0.4,
                        "news",
                    )
                )

        for event in events[:2]:
            event_type = event.get("event_type", "")
            description = event.get("description", "")
            if event_type and description:
                candidates.append(
                    self._make_thought(
                        f"Detected a {event_type} event — {description[:100]}",
                        0.6,
                        "event",
                    )
                )

        if self.last_dream:
            for insight in self.last_dream.get("insights", [])[:1]:
                # str() mirrors the curiosity branch: insights need not be strings.
                candidates.append(
                    self._make_thought(
                        f"I had a thought during my last dream cycle — {str(insight)[:120]}",
                        0.3,
                        "dream",
                    )
                )

        if self.last_curiosity_cycle:
            for d in self.last_curiosity_cycle.get("discoveries", [])[:1]:
                candidates.append(
                    self._make_thought(
                        f"I found something interesting while exploring — {str(d)[:120]}",
                        0.35,
                        "curiosity",
                    )
                )

        return self._enqueue_thoughts(candidates)

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    async def run(self):
        """Main daemon loop — runs forever with circadian awareness.

        Every iteration looks up the current circadian phase, runs one cycle,
        and then sleeps for the phase's ``poll_interval`` (900s when the phase
        config has none). An exception raised inside a cycle is logged with
        its traceback and does not stop the loop.
        """
        logger.info("=" * 60)
        logger.info("JANUS DAEMON STARTED — Living Intelligence Engine")
        logger.info(f"Watchlist: {self.market_watcher.watchlist}")
        logger.info(f"Topics: {self.news_pulse.topics}")
        logger.info(f"Circadian Phase: {self.circadian.get_current_phase().value}")
        logger.info("=" * 60)
        while True:
            cycle_start = time.time()
            self.cycle_count += 1
            self.last_run = datetime.utcnow().isoformat()
            phase = self.circadian.get_current_phase()
            phase_config = self.circadian.get_phase_config(phase)
            try:
                await self._run_cycle(phase, phase_config, cycle_start)
            except Exception as e:
                logger.error(
                    f"[DAEMON] Cycle #{self.cycle_count} failed: {e}", exc_info=True
                )
            sleep_time = phase_config.get("poll_interval", 900)
            logger.info(f"[DAEMON] Sleeping for {sleep_time}s ({phase.value} phase)")
            await asyncio.sleep(sleep_time)

    async def _run_cycle(self, phase, phase_config, cycle_start):
        """Run one poll / filter / detect / think cycle and log its summary."""
        logger.info(
            f"[DAEMON] Cycle #{self.cycle_count} — Phase: {phase.value} ({phase_config['name']})"
        )
        # NOTE(review): poll() and fetch() are called synchronously inside this
        # coroutine; if they do blocking I/O they stall the event loop — confirm.
        market_signals = self.market_watcher.poll()
        news_signals = self.news_pulse.fetch()

        all_signals = await self._apply_guardian(market_signals + news_signals)
        self.signal_queue.add_batch(all_signals)
        events = self.event_detector.detect(all_signals)

        # NOTE(review): thoughts are built from the unfiltered market/news
        # signals, so a signal the guardian blocked can still yield a thought.
        new_thoughts = self._generate_pending_thoughts(
            market_signals, news_signals, events
        )
        if new_thoughts:
            logger.info(
                f"[DAEMON] Generated {len(new_thoughts)} pending thoughts"
            )

        force = getattr(self, "_force_cycles", False)
        if phase.value == "night" or force:
            self._run_night_tasks(force)

        elapsed = time.time() - cycle_start
        stats = self.signal_queue.get_stats()
        logger.info(
            f"[DAEMON] Cycle #{self.cycle_count} complete in {elapsed:.1f}s"
        )
        logger.info(
            f"[DAEMON] Market signals: {len(market_signals)}, News signals: {len(news_signals)}, Events: {len(events)}"
        )
        logger.info(f"[DAEMON] Queue stats: {stats}")

    async def _apply_guardian(self, unfiltered_signals):
        """Pass signals through the guardian and queue an alert per intervention.

        Returns:
            The signals the guardian let through.
        """
        # ACTIVE GUARDIAN: Audit and Intervene on Scams
        all_signals, interventions = await guardian_interceptor.process_signals(
            unfiltered_signals
        )
        if interventions:
            logger.warning(
                f"[DAEMON] Guardian blocked {len(interventions)} high-risk scam signals!"
            )
            # Prioritize interventions: top priority, placed at the front.
            alerts = [
                self._make_thought(
                    f"🚨 GUARDIAN INTERVENTION: {inter['reason']}", 1.0, "guardian"
                )
                for inter in interventions
            ]
            self._enqueue_thoughts(alerts, front=True)
        return all_signals

    # ------------------------------------------------------------------
    # Night-phase tasks
    # ------------------------------------------------------------------

    def _run_night_tasks(self, force):
        """Run the dream, curiosity, reflection, learning and training steps in order.

        Each step catches and logs its own failure, so one failing step does
        not prevent the following ones.

        NOTE(review): the reviewed copy of this file had lost its indentation;
        self-reflection, autonomous learning and continuous training are
        assumed to belong to the night/forced branch — confirm.
        """
        self._run_dream_cycle(force)
        self._run_curiosity_cycle(force)
        # Reset force flag after one run
        if force:
            self._force_cycles = False
        self._run_self_reflection()
        self._run_autonomous_learning()
        self._run_continuous_training()

    def _run_dream_cycle(self, force):
        """Run the dream processor and remember its report in ``last_dream``."""
        try:
            dream_report = self.dream_processor.run_dream_cycle(force=force)
            self.last_dream = dream_report
            logger.info(
                f"[DAEMON] Dream cycle: {dream_report.get('duration_seconds', 0):.1f}s — "
                f"{len(dream_report.get('insights', []))} insights"
            )
        except Exception as e:
            logger.error(f"[DAEMON] Dream cycle FAILED: {e}")

    def _run_curiosity_cycle(self, force):
        """Run the curiosity engine and remember its report in ``last_curiosity_cycle``."""
        try:
            curiosity_report = self.curiosity.run_curiosity_cycle(force=force)
            self.last_curiosity_cycle = curiosity_report
            logger.info(
                f"[DAEMON] Curiosity cycle: {curiosity_report.get('duration_seconds', 0):.1f}s — "
                f"{curiosity_report.get('total_discoveries', 0)} discoveries"
            )
        except Exception as e:
            logger.error(f"[DAEMON] Curiosity cycle FAILED: {e}")

    def _run_self_reflection(self):
        """Self-reflection: review recent cases and queue thoughts about gaps and opinions."""
        try:
            # Imported lazily so an import failure is handled like any other
            # failure of this step.
            from app.services.self_reflection import self_reflection
            from app.services.case_store import list_cases

            recent_cases = list_cases(limit=20, full=True)
            review = self_reflection.run_night_review(recent_cases)
            logger.info(
                f"[DAEMON] Self-review: {review.get('cases_reviewed', 0)} cases, "
                f"{review.get('opinions_formed', 0)} opinions, "
                f"learning_rate={review.get('learning_rate', 0)}"
            )

            # Generate pending thoughts from self-reflection
            reflections = [
                self._make_thought(
                    f"I need to get better at {gap.get('topic', '')} — {gap.get('reason', '')}",
                    gap.get("urgency", 0.5),
                    "self_reflection",
                )
                for gap in self_reflection.get_gaps()[:2]
            ]
            for op in self_reflection.get_opinions()[:2]:
                # Only sufficiently confident opinions are worth sharing.
                if op.get("confidence", 0) > 0.7:
                    reflections.append(
                        self._make_thought(
                            f"I've formed a view on {op.get('topic', '')}: {op.get('statement', '')[:100]}",
                            op.get("confidence", 0.5) * 0.8,
                            "self_reflection",
                        )
                    )
            self._enqueue_thoughts(reflections)
        except Exception as e:
            logger.error(f"[DAEMON] Self-reflection failed: {e}")

    def _run_autonomous_learning(self):
        """Autonomous learning: search HF datasets for gaps, extract knowledge."""
        try:
            from app.services.autonomous_learner import autonomous_learner

            learning_result = autonomous_learner.run_learning_cycle(
                max_gaps=2,
                max_datasets_per_gap=2,
                max_samples_per_dataset=30,
            )
            logger.info(
                f"[DAEMON] Autonomous learning: {learning_result.get('gaps_addressed', 0)} gaps, "
                f"{learning_result.get('knowledge_added', 0)} knowledge, "
                f"{learning_result.get('training_pairs_added', 0)} training pairs"
            )
        except Exception as e:
            logger.error(f"[DAEMON] Autonomous learning failed: {e}")

    def _run_continuous_training(self):
        """Continuous Training: generate synthetic datasets, test prompts, crawl internet."""
        try:
            from app.services.continuous_training import continuous_self_trainer

            ct_result = continuous_self_trainer.run_training_cycle()
            logger.info(
                f"[DAEMON] Continuous Training: "
                f"{ct_result.get('synthetic_data_generated', 0)} synthetic pairs, "
                f"{ct_result.get('prompts_tested', 0)} prompts tested, "
                f"{ct_result.get('improvements_made', 0)} optimizations"
            )
        except Exception as e:
            logger.error(f"[DAEMON] Continuous training failed: {e}")

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def get_status(self) -> dict:
        """Get daemon status.

        Returns:
            A dict with the cycle counters, the current circadian phase and
            its configuration, the status of each collaborating service, the
            last dream / curiosity reports and the first ten pending thoughts.
        """
        phase = self.circadian.get_current_phase()
        phase_config = self.circadian.get_phase_config(phase)
        circadian_status = {
            "current_phase": phase.value,
            "phase_name": phase_config["name"],
            "phase_description": phase_config["description"],
            "priority": phase_config["priority"],
            "current_tasks": phase_config["tasks"],
        }
        return {
            "running": True,
            "cycle_count": self.cycle_count,
            "last_run": self.last_run,
            "circadian": circadian_status,
            "watchlist": self.market_watcher.watchlist,
            "news_pulse": self.news_pulse.get_status(),
            "signal_queue": self.signal_queue.get_stats(),
            "dream_processor": self.dream_processor.get_status(),
            "curiosity_engine": self.curiosity.get_status(),
            "last_dream": self.last_dream,
            "last_curiosity_cycle": self.last_curiosity_cycle,
            "pending_thoughts": self._pending_thoughts[:10],
        }