Spaces:
Running
Running
File size: 8,786 Bytes
24f95f0 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 | from __future__ import annotations
"""
Autonomous Learner for Janus.
Orchestrates the full self-improvement loop:
1. Identify gaps from self-reflection
2. Search HF Hub for relevant datasets (lightweight, no downloads)
3. Stream a small sample of each dataset and have the LLM extract knowledge points
4. Build fine-tuning dataset from conversations
5. Reduce gap urgency
Runs during daemon night cycles. Uses ONLY huggingface_hub API - no heavy libraries.
"""
import json
import time
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timezone
from app.services.self_reflection import self_reflection
from app.services.hf_dataset_searcher import hf_dataset_searcher
from app.services.fine_tuning_builder import fine_tuning_builder
from app.memory import knowledge_store
logger = logging.getLogger(__name__)
class AutonomousLearner:
    """
    Autonomous self-improvement system.

    Identifies gaps, finds datasets, extracts knowledge, builds training data.
    Lightweight - uses only HF Hub API, no dataset downloads.
    """

    # Rows requested from the dataset searcher per dataset.
    _STREAM_SAMPLE_COUNT = 5
    # Rows actually serialized into the LLM prompt.
    _PROMPT_SAMPLE_COUNT = 3
    # Hard cap on the serialized preview, to bound token cost.
    _PROMPT_PREVIEW_CHARS = 6000
    # Knowledge points must be strictly longer than this to be stored.
    _MIN_POINT_LENGTH = 15
    # Confidence recorded on every knowledge entry produced by this learner.
    _LEARNED_CONFIDENCE = 0.85
    # Minimum case confidence for a conversation to become a training pair.
    _MIN_TRAINING_CONFIDENCE = 0.6
    # Confidence assumed for a case that does not carry one.
    _DEFAULT_CASE_CONFIDENCE = 0.5

    def __init__(self):
        """Initialise the counters reported by ``get_status``."""
        # ISO-8601 UTC timestamp of the last completed cycle; None until then.
        self.last_learning_cycle: Optional[str] = None
        # Incremented on every call to run_learning_cycle, including skipped ones.
        self.total_cycles = 0
        self.total_knowledge_added = 0

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Return *text* without a surrounding Markdown code fence.

        Handles an opening fence with or without a ``json`` language tag
        (case-insensitive) and an optional closing fence.
        """
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned[3:]
            if cleaned[:4].lower() == "json":
                cleaned = cleaned[4:]
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        return cleaned.strip()

    @classmethod
    def _parse_knowledge_points(cls, raw_response: Any) -> List[str]:
        """Extract the usable knowledge points from a raw model response.

        Args:
            raw_response: Text returned by the model; expected to be a JSON
                object of the form ``{"knowledge_points": [...]}``, possibly
                wrapped in a Markdown code fence.

        Returns:
            The whitespace-stripped string entries that are longer than
            ``_MIN_POINT_LENGTH`` characters. Non-string entries are dropped
            so they can never reach the knowledge store.

        Raises:
            ValueError: If the response is not text, is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``), is not a JSON
                object, or ``knowledge_points`` is not a list.
        """
        if not isinstance(raw_response, str):
            raise ValueError(
                "expected a text response from the model, got "
                f"{type(raw_response).__name__}"
            )
        payload = json.loads(cls._strip_code_fence(raw_response))
        if not isinstance(payload, dict):
            raise ValueError("model response is not a JSON object")
        candidates = payload.get("knowledge_points", [])
        if not isinstance(candidates, list):
            raise ValueError("'knowledge_points' is not a JSON array")
        stripped = (c.strip() for c in candidates if isinstance(c, str))
        return [point for point in stripped if len(point) > cls._MIN_POINT_LENGTH]

    @staticmethod
    def _build_extraction_prompt(gap_topic: str, ds_name: str, sample_text: str) -> str:
        """Build the knowledge-extraction prompt for one dataset preview."""
        return (
            f"You are Janus' autonomous learning engine. You are researching the topic: '{gap_topic}'.\n"
            f"Analyze this raw dataset preview ({ds_name}) and extract 1-3 highly specific, factual knowledge rules to definitively resolve your knowledge gap.\n"
            "Focus on hard facts, correlations, or specific examples shown in the data.\n"
            "Return a STRICT JSON dictionary matching this schema (with no markdown block padding around it):\n"
            '{"knowledge_points": ["specific fact 1", "specific fact 2"]}\n\n'
            f"DATASET PREVIEW:\n{sample_text}"
        )

    def run_learning_cycle(
        self,
        max_gaps: int = 3,
        max_datasets_per_gap: int = 3,
        max_samples_per_dataset: Optional[int] = None,
        **kwargs,
    ) -> Dict:
        """
        Run a complete autonomous learning cycle.

        For each knowledge gap: search for datasets, stream a small sample of
        each, ask the LLM for knowledge points and store them. Afterwards,
        recent conversations are added to the fine-tuning dataset.

        Args:
            max_gaps: Maximum number of gaps (taken from the front of the
                list returned by self-reflection) to process.
            max_datasets_per_gap: Passed to the dataset search for each gap.
            max_samples_per_dataset: Accepted for interface compatibility;
                currently not used (a fixed number of rows is streamed).
            **kwargs: Unsupported extra options; logged at debug level and
                ignored.

        Returns:
            ``{"status": "skipped", "reason": "no gaps"}`` when there is
            nothing to do; otherwise a dict with ``cycle``,
            ``gaps_addressed``, ``datasets_found``, ``knowledge_added``,
            ``training_pairs_added``, ``details``, ``elapsed_seconds`` and
            ``status`` ("completed").
        """
        if kwargs:
            logger.debug("Ignoring unsupported run_learning_cycle kwargs: %s", kwargs)

        self.total_cycles += 1
        # Monotonic clock: the elapsed time must not be skewed by wall-clock jumps.
        started = time.monotonic()
        logger.info(
            "[AUTONOMOUS LEARNER] Starting learning cycle #%s", self.total_cycles
        )

        # Step 1: Get knowledge gaps
        gaps = self_reflection.get_gaps()[:max_gaps]
        if not gaps:
            logger.info("[AUTONOMOUS LEARNER] No gaps to address")
            return {"status": "skipped", "reason": "no gaps"}

        results = {
            "cycle": self.total_cycles,
            "gaps_addressed": 0,
            "datasets_found": 0,
            "knowledge_added": 0,
            "training_pairs_added": 0,
            "details": [],
        }

        # Step 2: For each gap, search HF Hub for relevant datasets
        for gap in gaps:
            gap_topic = gap.get("topic", "")
            if not gap_topic:
                continue
            logger.info("[AUTONOMOUS LEARNER] Addressing gap: %s", gap_topic[:100])

            # Search for relevant datasets (lightweight API call)
            datasets = hf_dataset_searcher.search_for_gap(
                gap_topic, max_datasets_per_gap
            )
            results["datasets_found"] += len(datasets)
            if not datasets:
                logger.info(
                    "[AUTONOMOUS LEARNER] No datasets found for: %s", gap_topic[:50]
                )
                continue

            # Step 3: Stream datasets and use LLM to extract genuine facts.
            # Imported lazily, only once a dataset actually needs analysing.
            from app.agents._model import call_model

            for ds_info in datasets:
                ds_name = ds_info.get("name", "")
                if not ds_name:
                    continue
                logger.info(
                    "[AUTONOMOUS LEARNER] Found dataset: %s. Streaming for cognitive analysis...",
                    ds_name,
                )

                # Best-effort: a dataset that cannot be streamed is skipped.
                try:
                    samples = hf_dataset_searcher.stream_dataset_sample(
                        ds_name, max_samples=self._STREAM_SAMPLE_COUNT
                    )
                except Exception as e:
                    logger.warning("Failed to stream dataset %s: %s", ds_name, e)
                    continue
                if not samples:
                    continue

                # Pass to LLM for knowledge extraction. Any failure here is
                # logged and the dataset is still recorded in the details.
                try:
                    # default=str: the preview is only prompt text, so values
                    # JSON cannot encode (dates, bytes, ...) are stringified
                    # instead of discarding the whole dataset.
                    sample_text = json.dumps(
                        samples[: self._PROMPT_SAMPLE_COUNT], default=str
                    )[: self._PROMPT_PREVIEW_CHARS]
                    prompt = self._build_extraction_prompt(
                        gap_topic, ds_name, sample_text
                    )
                    raw_response = call_model(
                        [{"role": "user", "content": prompt}], temperature=0.2
                    )
                    for point in self._parse_knowledge_points(raw_response):
                        knowledge_store.save_knowledge(
                            {
                                "text": point,
                                "source": f"hf_dataset_learning:{ds_name}",
                                "topic": gap_topic,
                                "timestamp": time.time(),
                                "confidence": self._LEARNED_CONFIDENCE,
                            }
                        )
                        results["knowledge_added"] += 1
                        self.total_knowledge_added += 1
                        logger.info(
                            "[AUTONOMOUS LEARNER] Extracted knowledge point: %s...",
                            point[:50],
                        )
                except Exception as e:
                    logger.error(
                        "[AUTONOMOUS LEARNER] LLM reasoning failed on dataset %s: %s",
                        ds_name,
                        e,
                    )

                results["details"].append(
                    {
                        "gap": gap_topic[:100],
                        "dataset": ds_name,
                        "downloads": ds_info.get("downloads", 0),
                        "relevance": ds_info.get("relevance_score", 0),
                    }
                )

            # Counted once per gap for which at least one dataset was found.
            results["gaps_addressed"] += 1

        # Step 4: Add conversation data to training dataset
        results["training_pairs_added"] = self._add_recent_conversations_to_training()

        elapsed = time.monotonic() - started
        results["elapsed_seconds"] = round(elapsed, 1)
        results["status"] = "completed"
        self.last_learning_cycle = datetime.now(timezone.utc).isoformat()
        logger.info(
            "[AUTONOMOUS LEARNER] Cycle #%s complete in %.1fs: "
            "%s gaps, %s knowledge, %s training pairs",
            self.total_cycles,
            elapsed,
            results["gaps_addressed"],
            results["knowledge_added"],
            results["training_pairs_added"],
        )
        return results

    def _add_recent_conversations_to_training(self) -> int:
        """Add recent high-quality conversations to training dataset.

        Looks at the 10 most recent cases and adds every one that has a user
        input, a response and a confidence of at least
        ``_MIN_TRAINING_CONFIDENCE``.

        Returns:
            The number of conversation pairs added. Failures are logged and
            never raised.
        """
        try:
            from app.services.case_store import list_cases

            recent_cases = list_cases(limit=10, full=True)
        except Exception as e:
            logger.error("Failed to add conversations to training: %s", e)
            return 0

        added = 0
        for case in recent_cases or []:
            # Per-case guard: one malformed case must not discard the rest.
            try:
                user_input = case.get("user_input", "")
                final = case.get("final") or {}
                response = final.get("response", "")
                confidence = final.get("confidence")
                if confidence is None:
                    confidence = self._DEFAULT_CASE_CONFIDENCE
                sources = final.get("data_sources", [])
                if (
                    user_input
                    and response
                    and confidence >= self._MIN_TRAINING_CONFIDENCE
                ):
                    fine_tuning_builder.add_conversation_pair(
                        user_input, response, confidence, sources
                    )
                    added += 1
            except Exception as e:
                logger.error("Failed to add conversations to training: %s", e)
        return added

    def get_status(self) -> Dict:
        """Get autonomous learner status.

        Returns:
            The cycle and knowledge counters, the timestamp of the last
            completed cycle, and the fine-tuning builder's statistics.
        """
        return {
            "total_cycles": self.total_cycles,
            "total_knowledge_added": self.total_knowledge_added,
            "last_learning_cycle": self.last_learning_cycle,
            "fine_tuning_dataset": fine_tuning_builder.get_stats(),
        }
# Module-level AutonomousLearner instance, created when this module is imported.
autonomous_learner = AutonomousLearner()
|