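# NOTE: the next three lines appear to be repository-page residue (avatar alt
# text, commit message, short commit hash) and are not part of the module:
#   DevodG's picture
#   feat: stable janus intelligence with kaggle distillation
#   5f91e0b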
from __future__ import annotations

import asyncio
import json
import logging
import time
from datetime import datetime, timezone
from pathlib import Path

from app.services.market_watcher import MarketWatcher
from app.services.news_pulse import NewsPulse
from app.services.event_detector import EventDetector
from app.services.signal_queue import SignalQueue
from app.services.circadian_rhythm import CircadianRhythm
from app.services.dream_processor import DreamCycleProcessor
from app.services.curiosity_engine import CuriosityEngine
from app.services.guardian_interceptor import guardian_interceptor
from app.config import DATA_DIR
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# JSON file in which the daemon persists its pending thoughts.
PENDING_THOUGHTS_FILE = DATA_DIR / "daemon" / "pending_thoughts.json"
# Import-time side effect: create the containing directory (and any missing
# parents) so that later writes do not fail because the directory is missing.
PENDING_THOUGHTS_FILE.parent.mkdir(parents=True, exist_ok=True)
class JanusDaemon:
def __init__(self):
self.market_watcher = MarketWatcher()
self.news_pulse = NewsPulse()
self.event_detector = EventDetector()
self.signal_queue = SignalQueue()
self.circadian = CircadianRhythm()
self.dream_processor = DreamCycleProcessor()
self.curiosity = CuriosityEngine()
self.cycle_count = 0
self.last_run = None
self.last_dream = None
self.last_curiosity_cycle = None
self._pending_thoughts = self._load_pending_thoughts()
self._clear_stale_pending_thoughts()
def _clear_stale_pending_thoughts(self):
"""Clear pending thoughts older than 24h and deduplicate on boot."""
try:
now = time.time()
fresh = [
t for t in self._pending_thoughts
if (now - t.get("created_at", now)) < 86400
]
seen = set()
deduped = []
for t in fresh:
text = t.get("thought", "").strip()
if text and text not in seen:
seen.add(text)
deduped.append(t)
kept = len(deduped)
self._pending_thoughts = deduped[:20]
self._save_pending_thoughts()
logger.info(f"[DAEMON] Boot: kept {kept} fresh pending thoughts (deduplicated)")
except Exception as e:
logger.warning(f"[DAEMON] Failed to clear stale thoughts: {e}")
def _load_pending_thoughts(self) -> list:
if PENDING_THOUGHTS_FILE.exists():
try:
with open(PENDING_THOUGHTS_FILE) as f:
return json.load(f)
except Exception:
pass
return []
def _save_pending_thoughts(self):
try:
self._pending_thoughts = self._pending_thoughts[:30]
with open(PENDING_THOUGHTS_FILE, "w") as f:
json.dump(self._pending_thoughts, f, indent=2)
except Exception as e:
logger.error(f"Failed to save pending thoughts: {e}")
def _generate_pending_thoughts(self, market_signals, news_signals, events):
"""Convert discoveries into natural thoughts the system wants to share."""
new_thoughts = []
for signal in market_signals[:3]:
ticker = signal.get("ticker", "")
change = signal.get("change_percent", 0)
if abs(change) > 2:
direction = "up" if change > 0 else "down"
new_thoughts.append(
{
"thought": f"{ticker} moved {abs(change):.1f}% {direction} — might be worth looking into",
"priority": min(abs(change) / 10, 1.0),
"created_at": time.time(),
"source": "market",
}
)
for signal in news_signals[:2]:
topic = signal.get("topic", "")
headline = signal.get("headline", "")
if topic and headline:
new_thoughts.append(
{
"thought": f"Something happening with {topic}: {headline[:100]}",
"priority": 0.4,
"created_at": time.time(),
"source": "news",
}
)
for event in events[:2]:
event_type = event.get("event_type", "")
description = event.get("description", "")
if event_type and description:
new_thoughts.append(
{
"thought": f"Detected a {event_type} event — {description[:100]}",
"priority": 0.6,
"created_at": time.time(),
"source": "event",
}
)
if self.last_dream:
insights = self.last_dream.get("insights", [])
for insight in insights[:1]:
new_thoughts.append(
{
"thought": f"I had a thought during my last dream cycle — {insight[:120]}",
"priority": 0.3,
"created_at": time.time(),
"source": "dream",
}
)
if self.last_curiosity_cycle:
discoveries = self.last_curiosity_cycle.get("discoveries", [])
for d in discoveries[:1]:
new_thoughts.append(
{
"thought": f"I found something interesting while exploring — {str(d)[:120]}",
"priority": 0.35,
"created_at": time.time(),
"source": "curiosity",
}
)
self._pending_thoughts.extend(new_thoughts)
self._pending_thoughts.sort(key=lambda x: x.get("priority", 0), reverse=True)
self._pending_thoughts = self._pending_thoughts[:30]
self._save_pending_thoughts()
return new_thoughts
async def run(self):
"""Main daemon loop — runs forever with circadian awareness."""
logger.info("=" * 60)
logger.info("JANUS DAEMON STARTED — Living Intelligence Engine")
logger.info(f"Watchlist: {self.market_watcher.watchlist}")
logger.info(f"Topics: {self.news_pulse.topics}")
logger.info(f"Circadian Phase: {self.circadian.get_current_phase().value}")
logger.info("=" * 60)
while True:
cycle_start = time.time()
self.cycle_count += 1
self.last_run = datetime.utcnow().isoformat()
phase = self.circadian.get_current_phase()
phase_config = self.circadian.get_phase_config(phase)
try:
logger.info(
f"[DAEMON] Cycle #{self.cycle_count} — Phase: {phase.value} ({phase_config['name']})"
)
market_signals = self.market_watcher.poll()
news_signals = self.news_pulse.fetch()
# ACTIVE GUARDIAN: Audit and Intervene on Scams
unfiltered_signals = market_signals + news_signals
all_signals, interventions = await guardian_interceptor.process_signals(unfiltered_signals)
if interventions:
logger.warning(f"[DAEMON] Guardian blocked {len(interventions)} high-risk scam signals!")
for inter in interventions:
# Prioritize intervention in pending thoughts
self._pending_thoughts.insert(0, {
"thought": f"🚨 GUARDIAN INTERVENTION: {inter['reason']}",
"priority": 1.0,
"created_at": time.time(),
"source": "guardian"
})
self.signal_queue.add_batch(all_signals)
events = self.event_detector.detect(all_signals)
new_thoughts = self._generate_pending_thoughts(
market_signals, news_signals, events
)
if new_thoughts:
logger.info(
f"[DAEMON] Generated {len(new_thoughts)} pending thoughts"
)
if phase.value == "night" or getattr(self, '_force_cycles', False):
force = getattr(self, '_force_cycles', False)
try:
dream_report = self.dream_processor.run_dream_cycle(force=force)
self.last_dream = dream_report
logger.info(
f"[DAEMON] Dream cycle: {dream_report.get('duration_seconds', 0):.1f}s — "
f"{len(dream_report.get('insights', []))} insights"
)
except Exception as e:
logger.error(f"[DAEMON] Dream cycle FAILED: {e}")
try:
curiosity_report = self.curiosity.run_curiosity_cycle(force=force)
self.last_curiosity_cycle = curiosity_report
logger.info(
f"[DAEMON] Curiosity cycle: {curiosity_report.get('duration_seconds', 0):.1f}s — "
f"{curiosity_report.get('total_discoveries', 0)} discoveries"
)
except Exception as e:
logger.error(f"[DAEMON] Curiosity cycle FAILED: {e}")
# Reset force flag after one run
if force:
self._force_cycles = False
# Self-reflection: analyze own performance, form opinions
try:
from app.services.self_reflection import self_reflection
from app.services.case_store import list_cases
recent_cases = list_cases(limit=20, full=True)
review = self_reflection.run_night_review(recent_cases)
logger.info(
f"[DAEMON] Self-review: {review.get('cases_reviewed', 0)} cases, "
f"{review.get('opinions_formed', 0)} opinions, "
f"learning_rate={review.get('learning_rate', 0)}"
)
# Generate pending thoughts from self-reflection
gaps = self_reflection.get_gaps()[:2]
for gap in gaps:
self._pending_thoughts.append(
{
"thought": f"I need to get better at {gap.get('topic', '')}{gap.get('reason', '')}",
"priority": gap.get("urgency", 0.5),
"created_at": time.time(),
"source": "self_reflection",
}
)
opinions = self_reflection.get_opinions()[:2]
for op in opinions:
if op.get("confidence", 0) > 0.7:
self._pending_thoughts.append(
{
"thought": f"I've formed a view on {op.get('topic', '')}: {op.get('statement', '')[:100]}",
"priority": op.get("confidence", 0.5) * 0.8,
"created_at": time.time(),
"source": "self_reflection",
}
)
self._pending_thoughts.sort(
key=lambda x: x.get("priority", 0), reverse=True
)
self._pending_thoughts = self._pending_thoughts[:30]
self._save_pending_thoughts()
except Exception as e:
logger.error(f"[DAEMON] Self-reflection failed: {e}")
# Autonomous learning: search HF datasets for gaps, extract knowledge
try:
from app.services.autonomous_learner import autonomous_learner
learning_result = autonomous_learner.run_learning_cycle(
max_gaps=2,
max_datasets_per_gap=2,
max_samples_per_dataset=30
)
logger.info(
f"[DAEMON] Autonomous learning: {learning_result.get('gaps_addressed', 0)} gaps, "
f"{learning_result.get('knowledge_added', 0)} knowledge, "
f"{learning_result.get('training_pairs_added', 0)} training pairs"
)
except Exception as e:
logger.error(f"[DAEMON] Autonomous learning failed: {e}")
# Continuous Training: generate synthetic datasets, test prompts, crawl internet
try:
from app.services.continuous_training import continuous_self_trainer
ct_result = continuous_self_trainer.run_training_cycle()
logger.info(
f"[DAEMON] Continuous Training: "
f"{ct_result.get('synthetic_data_generated', 0)} synthetic pairs, "
f"{ct_result.get('prompts_tested', 0)} prompts tested, "
f"{ct_result.get('improvements_made', 0)} optimizations"
)
except Exception as e:
logger.error(f"[DAEMON] Continuous training failed: {e}")
elapsed = time.time() - cycle_start
stats = self.signal_queue.get_stats()
logger.info(
f"[DAEMON] Cycle #{self.cycle_count} complete in {elapsed:.1f}s"
)
logger.info(
f"[DAEMON] Market signals: {len(market_signals)}, News signals: {len(news_signals)}, Events: {len(events)}"
)
logger.info(f"[DAEMON] Queue stats: {stats}")
except Exception as e:
logger.error(f"[DAEMON] Cycle #{self.cycle_count} failed: {e}")
sleep_time = phase_config.get("poll_interval", 900)
logger.info(f"[DAEMON] Sleeping for {sleep_time}s ({phase.value} phase)")
await asyncio.sleep(sleep_time)
def get_status(self) -> dict:
"""Get daemon status."""
phase = self.circadian.get_current_phase()
phase_config = self.circadian.get_phase_config(phase)
return {
"running": True,
"cycle_count": self.cycle_count,
"last_run": self.last_run,
"circadian": {
"current_phase": phase.value,
"phase_name": phase_config["name"],
"phase_description": phase_config["description"],
"priority": phase_config["priority"],
"current_tasks": phase_config["tasks"],
},
"watchlist": self.market_watcher.watchlist,
"news_pulse": self.news_pulse.get_status(),
"signal_queue": self.signal_queue.get_stats(),
"dream_processor": self.dream_processor.get_status(),
"curiosity_engine": self.curiosity.get_status(),
"last_dream": self.last_dream,
"last_curiosity_cycle": self.last_curiosity_cycle,
"pending_thoughts": self._pending_thoughts[:10],
}