# Janus-backend/backend/app/services/curiosity_engine.py
# Last commit by: DevodG
# Commit message: feat: stable janus intelligence with kaggle distillation
# Commit: 5f91e0b
"""
Curiosity engine for Janus — autonomous knowledge exploration.
FIXES vs previous version:
- Curiosity cycles were completing in 0.0 seconds (the LLM was never called).
- Root cause: the interests dict was empty after an HF Space restart (ephemeral FS),
  causing the exploration loop to exit immediately with nothing to explore.
- Fix: always fall back to a default set of seed interests when none exist.
- Fix: the exploration loop now runs even with zero stored discoveries.
- Added deduplication of discoveries by topic.
- Added persistence of discoveries to the data/curiosity/ dir.
- Added a timeout guard on each LLM call.
"""
import time
import json
import logging
import math
from pathlib import Path
from datetime import datetime
from typing import Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Use the application's configured data directory when it can be imported;
# otherwise fall back to a "data" folder two levels above this file.
try:
    from app.config import DATA_DIR
except ImportError:
    DATA_DIR = Path(__file__).parent.parent / "data"

# Directory holding interests.json and the discovery_*.json files.
CURIOSITY_DIR = Path(DATA_DIR).joinpath("curiosity")

# Upper bound on discoveries kept in memory and on disk.
MAX_DISCOVERIES = 200

# Seconds allowed for a single exploration LLM call.
LLM_TIMEOUT = 120
# Fallback interests (topic -> initial score) used whenever no saved
# interests are available, e.g. right after an HF Space restart wiped the
# ephemeral filesystem. Insertion order matters: ties in score keep this order
# when the interests are sorted.
DEFAULT_SEED_INTERESTS = {
    # Score 0.05
    "AI market impact": 0.05,
    "Federal Reserve policy": 0.05,
    "earnings quality": 0.05,
    # Score 0.04
    "semiconductor demand": 0.04,
    "energy transition": 0.04,
    # Score 0.03
    "emerging market risk": 0.03,
    "corporate debt levels": 0.03,
    "geopolitical trade risk": 0.03,
    # Score 0.02
    "venture capital trends": 0.02,
    "regulatory AI": 0.02,
}
# Exploration angles; one is chosen at random for each explored topic and
# selects the question template used in the exploration prompt.
EXPLORATION_TYPES = [
    "deep_dive",        # non-obvious insight most analysts miss
    "connection",       # surprising link to another domain
    "contrarian",       # strongest contrarian argument
    "future_scenario",  # unexpected development in the next 12 months
    "blind_spot",       # critical blind spot most people have
]
def _call_with_timeout(fn, timeout: float):
    """Run ``fn()`` on a daemon thread and wait at most ``timeout`` seconds.

    Args:
        fn: Zero-argument callable to execute.
        timeout: Maximum number of seconds to wait for ``fn`` to finish.

    Returns:
        A ``(result, error)`` tuple. On success ``error`` is ``None``; if
        ``fn`` raised an ``Exception`` the result is ``None`` and ``error`` is
        that exception; if the wait expired the result is ``None`` and
        ``error`` is a ``TimeoutError``.

    Note:
        On timeout the worker thread is not stopped; it is simply no longer
        waited for (it is a daemon thread).
    """
    import threading

    # Single mutable holder shared with the worker thread.
    outcome = {"value": None, "exc": None}

    def _runner():
        try:
            outcome["value"] = fn()
        except Exception as exc:
            outcome["exc"] = exc

    worker = threading.Thread(target=_runner, daemon=True)
    worker.start()
    worker.join(timeout)

    if not worker.is_alive():
        return outcome["value"], outcome["exc"]
    return None, TimeoutError(f"Timed out after {timeout}s")
class CuriosityEngine:
    """
    Proactively explores topics the system is interested in.
    Generates insights without being asked.

    State:
        _interests: mapping of topic -> interest score (higher is explored first).
        _discoveries: list of discovery dicts, newest first, capped at
            MAX_DISCOVERIES.
        _total_discoveries: number of discoveries loaded at start-up plus those
            recorded since.
        _total_interests: number of known interests (kept equal to
            ``len(_interests)``).

    Interests are persisted to ``CURIOSITY_DIR / "interests.json"`` and each
    discovery to its own ``discovery_<timestamp>.json`` file in the same
    directory.
    """

    def __init__(self):
        """Create the storage directory if needed and load any saved state."""
        CURIOSITY_DIR.mkdir(parents=True, exist_ok=True)
        self._interests: dict = {}
        self._discoveries: list = []
        self._total_discoveries = 0
        self._total_interests = 0
        self._load()

    @staticmethod
    def _clean_interests(raw) -> dict:
        """Return only the well-formed ``topic -> numeric score`` entries of ``raw``.

        Anything that is not a dict yields an empty dict, so a corrupted
        interests file cannot break the sorting done on the scores later.
        """
        if not isinstance(raw, dict):
            return {}
        return {
            topic: score
            for topic, score in raw.items()
            # bool is a subclass of int; a true/false value is not a score.
            if isinstance(score, (int, float)) and not isinstance(score, bool)
        }

    def _load(self):
        """Load saved interests and discoveries.

        Falls back to DEFAULT_SEED_INTERESTS when no usable interests are
        stored. Unreadable or malformed files are logged and skipped rather
        than aborting start-up.
        """
        interests_file = CURIOSITY_DIR / "interests.json"
        stored = {}
        if interests_file.exists():
            try:
                stored = json.loads(interests_file.read_text())
            except Exception as e:
                logger.warning("CuriosityEngine: could not read %s: %s", interests_file, e)
                stored = {}
        self._interests = self._clean_interests(stored)

        # If empty (e.g. after HF restart), seed with defaults
        if not self._interests:
            self._interests = dict(DEFAULT_SEED_INTERESTS)
            logger.info("CuriosityEngine: seeded with default interests (no saved state found)")

        # Load recent discoveries. File names embed a sortable timestamp, so a
        # reverse sort puts the newest first.
        self._discoveries = []
        newest_first = sorted(CURIOSITY_DIR.glob("discovery_*.json"), reverse=True)
        for path in newest_first[:MAX_DISCOVERIES]:
            try:
                record = json.loads(path.read_text())
            except Exception as e:
                logger.warning("CuriosityEngine: skipping unreadable %s: %s", path.name, e)
                continue
            # Every consumer calls .get() on a discovery, so only dicts are usable.
            if isinstance(record, dict):
                self._discoveries.append(record)
            else:
                logger.warning("CuriosityEngine: skipping malformed %s", path.name)

        self._total_discoveries = len(self._discoveries)
        self._total_interests = len(self._interests)

    def _save_interests(self):
        """Write the interests mapping to interests.json (best effort; failures are logged)."""
        try:
            (CURIOSITY_DIR / "interests.json").write_text(
                json.dumps(self._interests, indent=2)
            )
        except Exception as e:
            logger.warning("CuriosityEngine: save_interests failed: %s", e)

    def _save_discovery(self, discovery: dict):
        """Persist one discovery to its own file and prune the oldest files.

        Best effort: any failure is logged and otherwise ignored.
        """
        try:
            ts = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
            (CURIOSITY_DIR / f"discovery_{ts}.json").write_text(
                json.dumps(discovery, indent=2, default=str)
            )
            # Clean old discoveries: keep only the MAX_DISCOVERIES newest files.
            files = sorted(CURIOSITY_DIR.glob("discovery_*.json"))
            for old in files[:-MAX_DISCOVERIES]:
                try:
                    old.unlink()
                except Exception:
                    # The file may already be gone; pruning is retried on the
                    # next save.
                    pass
        except Exception as e:
            logger.warning("CuriosityEngine: save_discovery failed: %s", e)

    def _select_topics(self) -> list:
        """Choose the topics for one cycle: the top-scored one plus a random other.

        At most two topics are returned so a cycle makes at most two
        exploration LLM calls. The random second pick lets lower-scored
        interests be reached; always taking the top two would keep exploring
        the same pair.
        """
        import random

        ranked = sorted(self._interests.items(), key=lambda x: -x[1])
        if not ranked:
            return []
        chosen = [ranked[0][0]]
        if len(ranked) > 1:
            chosen.append(random.choice(ranked[1:])[0])
        return chosen

    def run_curiosity_cycle(self, force: bool = False) -> dict:
        """
        Run one curiosity exploration cycle.

        Explores up to two topics, records any new discovery (one per topic),
        updates interest scores, occasionally asks for new interests, and saves
        the interests.

        Args:
            force: Accepted for interface compatibility; not used by this method.

        Returns:
            A report dict with keys ``timestamp``, ``explorations``,
            ``discoveries``, ``new_interests``, ``duration_seconds``,
            ``total_discoveries`` and ``total_interests``, plus ``error`` when
            the cycle raised.
        """
        t0 = time.time()
        logger.info("CuriosityEngine: starting curiosity cycle")
        report = {
            "timestamp": datetime.utcnow().isoformat(),
            "explorations": [],
            "discoveries": [],
            "new_interests": [],
            "duration_seconds": 0.0,
            "total_discoveries": self._total_discoveries,
            "total_interests": self._total_interests,
        }
        try:
            from app.agents._model import call_model
            import random

            for topic in self._select_topics():
                exploration_type = random.choice(EXPLORATION_TYPES)
                discovery = self._explore(call_model, topic, exploration_type)
                report["explorations"].append({
                    "type": exploration_type,
                    "topic": topic,
                    "result": "discovered" if discovery else "explored",
                })
                if discovery:
                    # Deduplicate by topic
                    existing_topics = {d.get("topic", "") for d in self._discoveries}
                    if topic not in existing_topics:
                        self._discoveries.insert(0, discovery)
                        self._discoveries = self._discoveries[:MAX_DISCOVERIES]
                        self._total_discoveries += 1
                        report["discoveries"].append(discovery)
                        self._save_discovery(discovery)
                        logger.info("CuriosityEngine: new discovery on '%s'", topic)
                self._update_interest(topic, explored=True, discovered=bool(discovery))

            # Occasionally generate new interests from discoveries
            if self._discoveries and random.random() < 0.3:
                for interest in self._generate_new_interests(call_model):
                    if interest not in self._interests:
                        self._interests[interest] = 0.02
                        report["new_interests"].append(interest)
                        self._total_interests += 1
                        logger.info("CuriosityEngine: new interest '%s'", interest)

            self._save_interests()
        except Exception as e:
            logger.error("CuriosityEngine: cycle error: %s", e)
            report["error"] = str(e)

        report["duration_seconds"] = round(time.time() - t0, 2)
        report["total_discoveries"] = self._total_discoveries
        report["total_interests"] = self._total_interests
        logger.info(
            "CuriosityEngine: cycle complete in %.1fs — %d discoveries",
            report["duration_seconds"],
            len(report["discoveries"]),
        )
        return report

    def _explore(self, call_model, topic: str, exploration_type: str) -> Optional[dict]:
        """Explore a topic and return a discovery if found.

        Args:
            call_model: Callable taking a list of chat messages and returning
                the model reply.
            topic: Topic to explore.
            exploration_type: One of EXPLORATION_TYPES; unknown values use a
                generic question.

        Returns:
            A discovery dict (``topic``, ``type``, ``insight``, ``timestamp``
            and, when the reply parsed, ``raw_response``), or ``None`` when the
            call failed, timed out, or produced nothing usable.
        """
        type_prompts = {
            "deep_dive": f"Provide a non-obvious, specific insight about '{topic}'. What do most analysts miss?",
            "connection": f"What is a surprising connection between '{topic}' and another domain?",
            "contrarian": f"What is the strongest contrarian argument regarding '{topic}'?",
            "future_scenario": f"What is the most likely unexpected development in '{topic}' in the next 12 months?",
            "blind_spot": f"What critical blind spot do most people have about '{topic}'?",
        }
        prompt = (
            f"You are Janus exploring '{topic}' with exploration type '{exploration_type}'.\n\n"
            + type_prompts.get(exploration_type, f"What is insightful about '{topic}'?")
            + "\n\nProvide a specific, evidence-based insight (not generic). "
            "Respond with JSON:\n"
            '{"key_insight": "...", "evidence": ["fact1", "fact2"], '
            '"why_it_matters": "...", "connections": ["related_concept"]}'
        )
        result, err = _call_with_timeout(
            lambda: call_model([{"role": "user", "content": prompt}]),
            LLM_TIMEOUT,
        )
        if err or not result:
            logger.warning("CuriosityEngine: exploration of '%s' failed: %s", topic, err)
            return None

        # The fallbacks below slice and measure the reply as text. The reply is
        # presumably a str already; coercing keeps a non-str reply from raising
        # inside the except handler.
        raw_text = result if isinstance(result, str) else str(result)
        try:
            from app.agents.smart_router import safe_parse

            data = safe_parse(result)
            key_insight = data.get("key_insight", "")
            why_it_matters = data.get("why_it_matters", "")
            if not key_insight and not why_it_matters:
                return None
            insight_text = key_insight or raw_text[:300]
            return {
                "topic": topic,
                "type": exploration_type,
                "insight": f"{insight_text[:300]} | Why: {why_it_matters[:200]}",
                "timestamp": datetime.utcnow().isoformat(),
                "raw_response": data,
            }
        except Exception as e:
            logger.warning("CuriosityEngine: parse failed for '%s': %s", topic, e)
            # Unparseable but substantial reply: keep the leading text as the insight.
            if len(raw_text) > 50:
                return {
                    "topic": topic,
                    "type": exploration_type,
                    "insight": raw_text[:400],
                    "timestamp": datetime.utcnow().isoformat(),
                }
            return None

    def _generate_new_interests(self, call_model) -> list:
        """Generate new topics to explore based on recent discoveries.

        Returns:
            Up to three non-empty topic strings; an empty list when there is
            nothing to base suggestions on, or the call or parsing fails.
        """
        recent = self._discoveries[:3]
        topics = [d.get("topic", "") for d in recent if d.get("topic")]
        if not topics:
            # No topic names to put in the prompt; skip the LLM call.
            return []
        prompt = (
            f"Based on these recently explored topics: {', '.join(topics)}, "
            "suggest 3 related topics worth investigating. "
            'JSON: {"topics": ["topic1", "topic2", "topic3"]}'
        )
        result, err = _call_with_timeout(
            lambda: call_model([{"role": "user", "content": prompt}]),
            30,
        )
        if err or not result:
            return []
        try:
            from app.agents.smart_router import safe_parse

            data = safe_parse(result)
            suggested = data.get("topics", [])
        except Exception:
            return []
        if not isinstance(suggested, list):
            return []
        # Topics become dict keys in the caller, so keep only usable strings.
        cleaned = [s.strip() for s in suggested if isinstance(s, str) and s.strip()]
        return cleaned[:3]

    def _update_interest(self, topic: str, explored: bool, discovered: bool):
        """Update interest score using exponential decay + exploration boost.

        The current score (0.01 for an unknown topic) is multiplied by 0.95,
        then increased by 0.01 if a discovery was made, else 0.005 if the topic
        was merely explored; the result is capped at 1.0.
        """
        current = self._interests.get(topic, 0.01)
        if discovered:
            boost = 0.01
        elif explored:
            boost = 0.005
        else:
            boost = 0
        self._interests[topic] = min(current * 0.95 + boost, 1.0)

    def add_interest(self, topic: str, score: float = 0.03):
        """Add a new interest (called when users ask about a topic repeatedly).

        Topics of two characters or fewer are ignored. An existing topic keeps
        the higher of its current score and ``score``. The change is saved
        immediately.
        """
        if topic and len(topic) > 2:
            self._interests[topic] = max(self._interests.get(topic, 0), score)
            # Keep the reported count in step with the mapping.
            self._total_interests = len(self._interests)
            self._save_interests()

    def get_status(self) -> dict:
        """Return counters, the ten highest-scored interests and the latest discovery."""
        top_interests = dict(sorted(self._interests.items(), key=lambda x: -x[1])[:10])
        return {
            "total_discoveries": self._total_discoveries,
            "total_interests": self._total_interests,
            "top_interests": top_interests,
            "latest_discovery": self._discoveries[0] if self._discoveries else None,
        }

    def get_discoveries(self, limit: int = 10) -> list:
        """Return up to ``limit`` discoveries, newest first (none for a negative limit)."""
        return self._discoveries[:max(limit, 0)]

    def get_interests(self) -> dict:
        """Return all interests ordered by descending score."""
        return dict(sorted(self._interests.items(), key=lambda x: -x[1]))