#!/usr/bin/env python3 """ SOFIA Self-Improving Learning System Automatically monitors performance and improves embeddings through continuous learning """ import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader, TensorDataset import numpy as np import json import os from datetime import datetime, timedelta from typing import Dict, List, Tuple, Optional, Any import logging from collections import deque import threading import time logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class PerformanceMonitor: """ Monitors SOFIA's performance across different tasks and metrics """ def __init__(self, history_size: int = 1000): self.history_size = history_size self.performance_history = deque(maxlen=history_size) self.current_metrics = {} self.baseline_metrics = { 'mteb_score': 58.2, # Base MPNet score 'similarity_accuracy': 0.85, 'retrieval_precision': 0.75, 'response_time': 0.5 # seconds } def record_performance(self, task: str, metrics: Dict[str, float], timestamp: Optional[datetime] = None): """Record performance metrics for a specific task""" if timestamp is None: timestamp = datetime.now() record = { 'timestamp': timestamp.isoformat(), 'task': task, 'metrics': metrics, 'improvement': self._calculate_improvement(metrics) } self.performance_history.append(record) self.current_metrics[task] = metrics logger.info(f"Recorded performance for {task}: {metrics}") def _calculate_improvement(self, metrics: Dict[str, float]) -> Dict[str, float]: """Calculate improvement over baseline metrics""" improvements = {} for metric, value in metrics.items(): if metric in self.baseline_metrics: baseline = self.baseline_metrics[metric] if metric in ['response_time']: # Lower is better improvements[metric] = (baseline - value) / baseline else: # Higher is better improvements[metric] = (value - baseline) / baseline else: improvements[metric] = 0.0 # No baseline available return improvements def get_recent_performance(self, hours: int = 24) -> List[Dict]: """Get performance records from the last N hours""" cutoff = datetime.now() - timedelta(hours=hours) recent = [] for record in self.performance_history: record_time = datetime.fromisoformat(record['timestamp']) if record_time >= cutoff: recent.append(record) return recent def get_performance_trend(self, metric: str, hours: int = 24) -> Dict[str, Any]: """Analyze performance trend for a specific metric""" recent_records = self.get_recent_performance(hours) if not recent_records: return {'trend': 'insufficient_data', 'change': 0.0} values = [] timestamps = [] for record in recent_records: if metric in record['metrics']: values.append(record['metrics'][metric]) timestamps.append(datetime.fromisoformat(record['timestamp'])) if len(values) < 2: return {'trend': 'insufficient_data', 'change': 0.0} # Calculate trend start_value = values[0] end_value = values[-1] change = (end_value - start_value) / start_value if start_value != 0 else 0 if metric in ['response_time']: # Lower is better trend = 'improving' if change < -0.05 else 'stable' if abs(change) < 0.05 else 'degrading' else: # Higher is better trend = 'improving' if change > 0.05 else 'stable' if abs(change) < 0.05 else 'degrading' return { 'trend': trend, 'change': change, 'start_value': start_value, 'end_value': end_value, 'data_points': len(values) } def should_trigger_improvement(self, task: str) -> bool: """Determine if performance improvement should be triggered""" trend = self.get_performance_trend('mteb_score' if task == 'similarity' else 'similarity_accuracy') # Trigger improvement if performance is degrading or stable for too long return trend['trend'] == 'degrading' or ( trend['trend'] == 'stable' and trend['data_points'] > 10 ) class AdaptiveParameterTuner: """ Automatically tunes model parameters based on performance feedback """ def __init__(self, model, learning_rate_range: Tuple[float, float] = (1e-6, 1e-3), batch_size_range: Tuple[int, int] = (8, 64)): self.model = model self.lr_range = learning_rate_range self.batch_size_range = batch_size_range # Parameter history self.parameter_history = [] self.best_parameters = { 'learning_rate': 2e-5, 'batch_size': 32, 'performance': 0.0 } def tune_parameters(self, performance_metrics: Dict[str, float]) -> Dict[str, Any]: """Tune parameters based on current performance""" current_performance = performance_metrics.get('mteb_score', 0.0) # Simple parameter tuning strategy new_params = {} if current_performance > self.best_parameters['performance']: # Performance improved, try to optimize further new_params['learning_rate'] = min( self.best_parameters['learning_rate'] * 1.2, self.lr_range[1] ) new_params['batch_size'] = min( self.best_parameters['batch_size'] + 4, self.batch_size_range[1] ) else: # Performance degraded, try conservative approach new_params['learning_rate'] = max( self.best_parameters['learning_rate'] * 0.8, self.lr_range[0] ) new_params['batch_size'] = max( self.best_parameters['batch_size'] - 4, self.batch_size_range[0] ) # Update best parameters if performance improved if current_performance > self.best_parameters['performance']: self.best_parameters.update({ 'learning_rate': new_params['learning_rate'], 'batch_size': new_params['batch_size'], 'performance': current_performance }) # Record parameter change self.parameter_history.append({ 'timestamp': datetime.now().isoformat(), 'parameters': new_params.copy(), 'performance': current_performance }) return new_params class ContinuousLearner: """ Implements continuous learning capabilities for SOFIA """ def __init__(self, model, performance_monitor: PerformanceMonitor): self.model = model self.monitor = performance_monitor self.adaptive_tuner = AdaptiveParameterTuner(model) # Learning state self.is_learning = False self.learning_thread = None self.new_data_buffer = deque(maxlen=1000) # Buffer for new training data # Learning configuration self.learning_interval = 3600 # 1 hour in seconds self.min_data_points = 50 # Minimum data points for retraining def add_training_data(self, text_pairs: List[Tuple[str, str]], labels: Optional[List[float]] = None): """Add new training data to the buffer""" if labels is None: labels = [1.0] * len(text_pairs) # Default positive pairs for (text1, text2), label in zip(text_pairs, labels): self.new_data_buffer.append({ 'text1': text1, 'text2': text2, 'label': label, 'timestamp': datetime.now().isoformat() }) logger.info(f"Added {len(text_pairs)} new training examples") def start_continuous_learning(self): """Start the continuous learning process""" if self.is_learning: logger.warning("Continuous learning already running") return self.is_learning = True self.learning_thread = threading.Thread(target=self._continuous_learning_loop) self.learning_thread.daemon = True self.learning_thread.start() logger.info("Continuous learning started") def stop_continuous_learning(self): """Stop the continuous learning process""" self.is_learning = False if self.learning_thread: self.learning_thread.join(timeout=5) logger.info("Continuous learning stopped") def _continuous_learning_loop(self): """Main continuous learning loop""" while self.is_learning: try: # Check if we should trigger learning if self._should_trigger_learning(): self._perform_learning_update() # Wait before next check time.sleep(self.learning_interval) except Exception as e: logger.error(f"Error in continuous learning loop: {e}") time.sleep(60) # Wait 1 minute before retrying def _should_trigger_learning(self) -> bool: """Determine if learning update should be triggered""" # Check data availability if len(self.new_data_buffer) < self.min_data_points: return False # Check performance trends return self.monitor.should_trigger_improvement('similarity') def _perform_learning_update(self): """Perform a learning update with new data""" logger.info("Performing learning update...") try: # Prepare training data training_data = list(self.new_data_buffer) self.new_data_buffer.clear() # Clear buffer after use # Tune parameters based on recent performance recent_perf = self.monitor.get_recent_performance(1) # Last hour if recent_perf: latest_metrics = recent_perf[-1]['metrics'] new_params = self.adaptive_tuner.tune_parameters(latest_metrics) logger.info(f"Adapted parameters: {new_params}") # Perform fine-tuning (simplified version) self._fine_tune_on_new_data(training_data) # Record improvement self.monitor.record_performance( 'continuous_learning', {'data_points_used': len(training_data)}, datetime.now() ) logger.info("Learning update completed successfully") except Exception as e: logger.error(f"Learning update failed: {e}") def _fine_tune_on_new_data(self, training_data: List[Dict]): """Fine-tune the model on new data (simplified implementation)""" # This is a placeholder for actual fine-tuning logic # In a real implementation, this would: # 1. Prepare data loaders # 2. Set up optimizer with adapted parameters # 3. Perform gradient updates # 4. Validate improvements logger.info(f"Fine-tuning on {len(training_data)} examples") # Simulate fine-tuning process time.sleep(2) # Simulate training time # In a real implementation, you would: # - Convert training_data to tensors # - Create DataLoader # - Set up optimizer and loss function # - Perform training loop # - Save updated model logger.info("Fine-tuning simulation completed") class SelfImprovingSOFIA: """ Main interface for SOFIA's self-improving capabilities """ def __init__(self, model): self.model = model self.performance_monitor = PerformanceMonitor() self.continuous_learner = ContinuousLearner(model, self.performance_monitor) # Auto-save state self.state_file = "sofia_self_improvement_state.json" self.load_state() def start_self_improvement(self): """Start all self-improvement processes""" self.continuous_learner.start_continuous_learning() logger.info("SOFIA self-improvement system activated") def stop_self_improvement(self): """Stop all self-improvement processes""" self.continuous_learner.stop_continuous_learning() self.save_state() logger.info("SOFIA self-improvement system deactivated") def record_task_performance(self, task: str, metrics: Dict[str, float]): """Record performance metrics for monitoring""" self.performance_monitor.record_performance(task, metrics) def add_feedback_data(self, text_pairs: List[Tuple[str, str]], quality_scores: List[float]): """Add user feedback data for continuous learning""" self.continuous_learner.add_training_data(text_pairs, quality_scores) def get_system_status(self) -> Dict[str, Any]: """Get current status of the self-improvement system""" return { 'is_learning': self.continuous_learner.is_learning, 'buffered_data': len(self.continuous_learner.new_data_buffer), 'recent_performance': self.performance_monitor.get_recent_performance(1), 'performance_trends': { 'mteb_score': self.performance_monitor.get_performance_trend('mteb_score'), 'similarity_accuracy': self.performance_monitor.get_performance_trend('similarity_accuracy') } } def save_state(self): """Save the current state of the self-improvement system""" state = { 'performance_history': list(self.performance_monitor.performance_history), 'parameter_history': self.continuous_learner.adaptive_tuner.parameter_history, 'best_parameters': self.continuous_learner.adaptive_tuner.best_parameters, 'saved_at': datetime.now().isoformat() } with open(self.state_file, 'w') as f: json.dump(state, f, indent=2) logger.info(f"Self-improvement state saved to {self.state_file}") def load_state(self): """Load the saved state of the self-improvement system""" if os.path.exists(self.state_file): try: with open(self.state_file, 'r') as f: state = json.load(f) # Restore performance history self.performance_monitor.performance_history.extend(state.get('performance_history', [])) # Restore parameter history self.continuous_learner.adaptive_tuner.parameter_history = state.get('parameter_history', []) self.continuous_learner.adaptive_tuner.best_parameters = state.get('best_parameters', self.continuous_learner.adaptive_tuner.best_parameters) logger.info(f"Self-improvement state loaded from {self.state_file}") except Exception as e: logger.error(f"Failed to load state: {e}") # Example usage if __name__ == "__main__": # This would be integrated with the actual SOFIA model print("SOFIA Self-Improving System") print("This module provides continuous learning and performance monitoring capabilities") # Example of how it would be used: """ from sofia_model import SOFIAModel from sofia_self_improving import SelfImprovingSOFIA # Initialize SOFIA sofia_model = SOFIAModel() # Add self-improvement capabilities self_improving_sofia = SelfImprovingSOFIA(sofia_model) # Start self-improvement self_improving_sofia.start_self_improvement() # Record performance after tasks self_improving_sofia.record_task_performance('similarity', { 'mteb_score': 65.1, 'similarity_accuracy': 0.92 }) # Add user feedback for continuous learning feedback_pairs = [("hello world", "hi there"), ("machine learning", "AI algorithms")] quality_scores = [0.9, 0.8] self_improving_sofia.add_feedback_data(feedback_pairs, quality_scores) # Get system status status = self_improving_sofia.get_system_status() print("System status:", status) """