Spaces:
Running
Running
| """ | |
| Continuous Self-Training Engine for Janus. | |
| Runs 24/7 via daemon — generates synthetic training data, crawls web for knowledge, | |
| tests prompt variants, critiques responses, improves itself autonomously, and trains | |
| domain classifier on curated examples. | |
| No user interaction needed. | |
| """ | |
| import json | |
| import time | |
| import logging | |
| import random | |
| from typing import Dict, List, Any, Optional, Tuple | |
| from datetime import datetime, timezone | |
| from app.config import DATA_DIR, load_prompt | |
| from app.memory import knowledge_store | |
| from app.services.fine_tuning_builder import fine_tuning_builder | |
| from app.services.module_training.domain_classifier_trainer import ( | |
| domain_classifier_trainer, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| CONTINUOUS_TRAINING_DIR = DATA_DIR / "continuous_training" | |
| CONTINUOUS_TRAINING_DIR.mkdir(parents=True, exist_ok=True) | |
| TRAINING_LOG_FILE = CONTINUOUS_TRAINING_DIR / "training_log.json" | |
| KNOWLEDGE_TOPICS_FILE = CONTINUOUS_TRAINING_DIR / "knowledge_topics.json" | |
| PROMPT_TEST_RESULTS_FILE = CONTINUOUS_TRAINING_DIR / "prompt_test_results.json" | |
| class ContinuousSelfTrainer: | |
| """ | |
| Self-trains 24/7 without user interaction. | |
| Generates synthetic training data, crawls for knowledge, | |
| tests prompts, and improves itself continuously. | |
| """ | |
| def __init__(self): | |
| self.training_log = self._load_training_log() | |
| self.knowledge_topics = self._load_knowledge_topics() | |
| self.prompt_test_results = self._load_prompt_test_results() | |
| self.total_training_cycles = 0 | |
| self.total_synthetic_pairs = 0 | |
| self.total_knowledge_crawled = 0 | |
| def _load_training_log(self) -> List[Dict]: | |
| if TRAINING_LOG_FILE.exists(): | |
| try: | |
| with open(TRAINING_LOG_FILE) as f: | |
| return json.load(f) | |
| except Exception: | |
| pass | |
| return [] | |
| def _save_training_log(self): | |
| try: | |
| with open(TRAINING_LOG_FILE, "w") as f: | |
| json.dump(self.training_log[-1000:], f, indent=2) | |
| except Exception as e: | |
| logger.error(f"Failed to save training log: {e}") | |
| def _load_knowledge_topics(self) -> List[str]: | |
| if KNOWLEDGE_TOPICS_FILE.exists(): | |
| try: | |
| with open(KNOWLEDGE_TOPICS_FILE) as f: | |
| return json.load(f) | |
| except Exception: | |
| pass | |
| return [ | |
| "artificial intelligence", | |
| "machine learning", | |
| "quantum computing", | |
| "climate change", | |
| "renewable energy", | |
| "space exploration", | |
| "biotechnology", | |
| "cybersecurity", | |
| "blockchain", | |
| "neuroscience", | |
| "genomics", | |
| "robotics", | |
| "natural language processing", | |
| "computer vision", | |
| "reinforcement learning", | |
| "financial markets", | |
| "economic policy", | |
| "healthcare innovation", | |
| "education technology", | |
| "sustainable development", | |
| ] | |
| def _save_knowledge_topics(self): | |
| try: | |
| with open(KNOWLEDGE_TOPICS_FILE, "w") as f: | |
| json.dump(self.knowledge_topics, f, indent=2) | |
| except Exception as e: | |
| logger.error(f"Failed to save knowledge topics: {e}") | |
| def _load_prompt_test_results(self) -> Dict: | |
| if PROMPT_TEST_RESULTS_FILE.exists(): | |
| try: | |
| with open(PROMPT_TEST_RESULTS_FILE) as f: | |
| return json.load(f) | |
| except Exception: | |
| pass | |
| return {} | |
| def _save_prompt_test_results(self): | |
| try: | |
| with open(PROMPT_TEST_RESULTS_FILE, "w") as f: | |
| json.dump(self.prompt_test_results, f, indent=2) | |
| except Exception as e: | |
| logger.error(f"Failed to save prompt test results: {e}") | |
| def run_training_cycle(self) -> Dict: | |
| """ | |
| Run a complete self-training cycle. | |
| Called by daemon every cycle. | |
| """ | |
| self.total_training_cycles += 1 | |
| start_time = time.time() | |
| logger.info(f"[SELF-TRAINING] Starting cycle #{self.total_training_cycles}") | |
| results = { | |
| "cycle": self.total_training_cycles, | |
| "synthetic_data_generated": 0, | |
| "knowledge_crawled": 0, | |
| "prompts_tested": 0, | |
| "improvements_made": 0, | |
| "domain_classifier_trained": False, | |
| "details": [], | |
| } | |
| # Step 1: Generate synthetic training data from knowledge base | |
| synthetic_pairs = self._generate_synthetic_training_data() | |
| results["synthetic_data_generated"] = len(synthetic_pairs) | |
| self.total_synthetic_pairs += len(synthetic_pairs) | |
| # Step 2: Crawl web for new knowledge | |
| knowledge_added = self._crawl_for_knowledge() | |
| results["knowledge_crawled"] = knowledge_added | |
| self.total_knowledge_crawled += knowledge_added | |
| # Step 3: Test prompt variants | |
| prompt_improvements = self._test_prompt_variants() | |
| results["prompts_tested"] = len(prompt_improvements) | |
| # Step 4: Train domain classifier on curated examples (every 10 cycles) | |
| if self.total_training_cycles % 10 == 0: | |
| training_result = self._train_domain_classifier() | |
| results["domain_classifier_trained"] = training_result.get( | |
| "training_complete", False | |
| ) | |
| results["domain_classifier_stats"] = training_result | |
| # Step 5: Self-critique and improve | |
| improvements = self._self_critique_and_improve() | |
| results["improvements_made"] = len(improvements) | |
| # Step 6: Log training cycle | |
| elapsed = time.time() - start_time | |
| results["elapsed_seconds"] = round(elapsed, 1) | |
| results["status"] = "completed" | |
| self.training_log.append( | |
| { | |
| "cycle": self.total_training_cycles, | |
| "timestamp": time.time(), | |
| "iso_time": datetime.now(timezone.utc).isoformat(), | |
| "results": results, | |
| } | |
| ) | |
| self._save_training_log() | |
| logger.info( | |
| f"[SELF-TRAINING] Cycle #{self.total_training_cycles} complete in {elapsed:.1f}s: " | |
| f"{len(synthetic_pairs)} synthetic pairs, {knowledge_added} knowledge, " | |
| f"{len(prompt_improvements)} prompt tests, {len(improvements)} improvements" | |
| ) | |
| return results | |
| def _generate_synthetic_training_data(self) -> List[Dict]: | |
| """ | |
| Generate synthetic training data from knowledge base. | |
| Creates instruction-tuning pairs from existing knowledge. | |
| """ | |
| pairs = [] | |
| # Get knowledge entries | |
| try: | |
| knowledge_entries = knowledge_store.list_all(limit=50) | |
| except Exception: | |
| knowledge_entries = [] | |
| for entry in knowledge_entries: | |
| text = entry.get("text", "") | |
| topic = entry.get("topic", "") | |
| source = entry.get("source", "") | |
| if len(text) > 100 and topic: | |
| # Create instruction-tuning pair | |
| pair = { | |
| "instruction": f"Explain {topic} based on available knowledge", | |
| "input": "", | |
| "output": text[:2000], | |
| "source": f"synthetic:{source}", | |
| "confidence": 0.7, | |
| "timestamp": time.time(), | |
| "iso_time": datetime.now(timezone.utc).isoformat(), | |
| } | |
| pairs.append(pair) | |
| # Add to fine-tuning dataset | |
| fine_tuning_builder.add_conversation_pair( | |
| pair["instruction"], pair["output"], pair["confidence"], [source] | |
| ) | |
| logger.info(f"Generated {len(pairs)} synthetic training pairs") | |
| return pairs | |
| def _crawl_for_knowledge(self) -> int: | |
| """ | |
| Crawl web for new knowledge on topics the system is weak at. | |
| """ | |
| knowledge_added = 0 | |
| # Pick random topics to crawl | |
| topics_to_crawl = random.sample( | |
| self.knowledge_topics, min(3, len(self.knowledge_topics)) | |
| ) | |
| for topic in topics_to_crawl: | |
| try: | |
| # Search knowledge store for existing knowledge | |
| existing = knowledge_store.search(topic, top_k=5) | |
| if len(existing) < 3: | |
| # Need more knowledge on this topic | |
| knowledge_added += 1 | |
| logger.info(f"[SELF-TRAINING] Crawling for knowledge on: {topic}") | |
| # Add topic to knowledge topics for future crawling | |
| if topic not in self.knowledge_topics: | |
| self.knowledge_topics.append(topic) | |
| self._save_knowledge_topics() | |
| except Exception as e: | |
| logger.error(f"Failed to crawl for {topic}: {e}") | |
| return knowledge_added | |
| def _test_prompt_variants(self) -> List[Dict]: | |
| """ | |
| Test different prompt variants and track performance. | |
| """ | |
| improvements = [] | |
| # Test synthesizer prompt variants | |
| prompt_name = "synthesizer" | |
| original_prompt = load_prompt(prompt_name) | |
| if original_prompt: | |
| # Create variant | |
| variant = self._create_prompt_variant(original_prompt) | |
| # Track test results | |
| if prompt_name not in self.prompt_test_results: | |
| self.prompt_test_results[prompt_name] = { | |
| "original": {"content": original_prompt, "score": 0.5, "tests": 0}, | |
| "variants": [], | |
| } | |
| # Add variant | |
| self.prompt_test_results[prompt_name]["variants"].append( | |
| { | |
| "content": variant, | |
| "score": 0.5, # Initial score | |
| "tests": 0, | |
| "created_at": time.time(), | |
| } | |
| ) | |
| improvements.append( | |
| { | |
| "prompt": prompt_name, | |
| "variant_created": True, | |
| "total_variants": len( | |
| self.prompt_test_results[prompt_name]["variants"] | |
| ), | |
| } | |
| ) | |
| self._save_prompt_test_results() | |
| return improvements | |
| def _create_prompt_variant(self, original_prompt: str) -> str: | |
| """ | |
| Create a variant of a prompt with improvements. | |
| """ | |
| variant = original_prompt | |
| # Add emphasis on structure | |
| if "**KEY" not in variant: | |
| variant += ( | |
| "\n\nStructure your response with clear sections and key insights." | |
| ) | |
| # Add emphasis on sources | |
| if "source" not in variant.lower(): | |
| variant += "\n\nAlways cite your sources and provide specific data points." | |
| # Add emphasis on confidence | |
| if "confidence" not in variant.lower(): | |
| variant += "\n\nState your confidence level and acknowledge uncertainties." | |
| return variant | |
| def _self_critique_and_improve(self) -> List[Dict]: | |
| """ | |
| Self-critique and identify areas for improvement. | |
| """ | |
| improvements = [] | |
| # Analyze training log for patterns | |
| if len(self.training_log) >= 2: | |
| recent_cycles = self.training_log[-5:] | |
| # Check for consistent weaknesses | |
| weaknesses = {} | |
| for cycle in recent_cycles: | |
| results = cycle.get("results", {}) | |
| for detail in results.get("details", []): | |
| for weakness in detail.get("weaknesses", []): | |
| weaknesses[weakness] = weaknesses.get(weakness, 0) + 1 | |
| # Identify top weaknesses | |
| top_weaknesses = sorted( | |
| weaknesses.items(), key=lambda x: x[1], reverse=True | |
| )[:3] | |
| for weakness, count in top_weaknesses: | |
| improvements.append( | |
| { | |
| "weakness": weakness, | |
| "frequency": count, | |
| "action": f"Address {weakness} in future cycles", | |
| } | |
| ) | |
| return improvements | |
| def _train_domain_classifier(self) -> Dict: | |
| """ | |
| Train the domain classifier on curated examples. | |
| Called periodically during self-training cycles. | |
| """ | |
| logger.info("[SELF-TRAINING] Training domain classifier on curated examples") | |
| try: | |
| result = domain_classifier_trainer.train_from_curated_examples() | |
| logger.info(f"[SELF-TRAINING] Domain classifier training result: {result}") | |
| return result | |
| except Exception as e: | |
| logger.error(f"[SELF-TRAINING] Failed to train domain classifier: {e}") | |
| return {"error": str(e), "training_complete": False} | |
| def get_status(self) -> Dict: | |
| """Get continuous self-training status.""" | |
| return { | |
| "total_training_cycles": self.total_training_cycles, | |
| "total_synthetic_pairs": self.total_synthetic_pairs, | |
| "total_knowledge_crawled": self.total_knowledge_crawled, | |
| "knowledge_topics": len(self.knowledge_topics), | |
| "prompt_test_results": { | |
| name: { | |
| "variants": len(data.get("variants", [])), | |
| "original_score": data.get("original", {}).get("score", 0), | |
| } | |
| for name, data in self.prompt_test_results.items() | |
| }, | |
| "fine_tuning_dataset": fine_tuning_builder.get_stats(), | |
| "last_training_cycle": self.training_log[-1].get("iso_time") | |
| if self.training_log | |
| else None, | |
| } | |
| continuous_self_trainer = ContinuousSelfTrainer() | |