SOFIA-v2-agi / sofia_self_improving.py
MaliosDark's picture
Add AGI module: sofia_self_improving.py
56457e6 verified
#!/usr/bin/env python3
"""
SOFIA Self-Improving Learning System
Automatically monitors performance and improves embeddings through continuous learning
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import json
import os
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Any
import logging
from collections import deque
import threading
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PerformanceMonitor:
"""
Monitors SOFIA's performance across different tasks and metrics
"""
def __init__(self, history_size: int = 1000):
self.history_size = history_size
self.performance_history = deque(maxlen=history_size)
self.current_metrics = {}
self.baseline_metrics = {
'mteb_score': 58.2, # Base MPNet score
'similarity_accuracy': 0.85,
'retrieval_precision': 0.75,
'response_time': 0.5 # seconds
}
def record_performance(self, task: str, metrics: Dict[str, float], timestamp: Optional[datetime] = None):
"""Record performance metrics for a specific task"""
if timestamp is None:
timestamp = datetime.now()
record = {
'timestamp': timestamp.isoformat(),
'task': task,
'metrics': metrics,
'improvement': self._calculate_improvement(metrics)
}
self.performance_history.append(record)
self.current_metrics[task] = metrics
logger.info(f"Recorded performance for {task}: {metrics}")
def _calculate_improvement(self, metrics: Dict[str, float]) -> Dict[str, float]:
"""Calculate improvement over baseline metrics"""
improvements = {}
for metric, value in metrics.items():
if metric in self.baseline_metrics:
baseline = self.baseline_metrics[metric]
if metric in ['response_time']: # Lower is better
improvements[metric] = (baseline - value) / baseline
else: # Higher is better
improvements[metric] = (value - baseline) / baseline
else:
improvements[metric] = 0.0 # No baseline available
return improvements
def get_recent_performance(self, hours: int = 24) -> List[Dict]:
"""Get performance records from the last N hours"""
cutoff = datetime.now() - timedelta(hours=hours)
recent = []
for record in self.performance_history:
record_time = datetime.fromisoformat(record['timestamp'])
if record_time >= cutoff:
recent.append(record)
return recent
def get_performance_trend(self, metric: str, hours: int = 24) -> Dict[str, Any]:
"""Analyze performance trend for a specific metric"""
recent_records = self.get_recent_performance(hours)
if not recent_records:
return {'trend': 'insufficient_data', 'change': 0.0}
values = []
timestamps = []
for record in recent_records:
if metric in record['metrics']:
values.append(record['metrics'][metric])
timestamps.append(datetime.fromisoformat(record['timestamp']))
if len(values) < 2:
return {'trend': 'insufficient_data', 'change': 0.0}
# Calculate trend
start_value = values[0]
end_value = values[-1]
change = (end_value - start_value) / start_value if start_value != 0 else 0
if metric in ['response_time']: # Lower is better
trend = 'improving' if change < -0.05 else 'stable' if abs(change) < 0.05 else 'degrading'
else: # Higher is better
trend = 'improving' if change > 0.05 else 'stable' if abs(change) < 0.05 else 'degrading'
return {
'trend': trend,
'change': change,
'start_value': start_value,
'end_value': end_value,
'data_points': len(values)
}
def should_trigger_improvement(self, task: str) -> bool:
"""Determine if performance improvement should be triggered"""
trend = self.get_performance_trend('mteb_score' if task == 'similarity' else 'similarity_accuracy')
# Trigger improvement if performance is degrading or stable for too long
return trend['trend'] == 'degrading' or (
trend['trend'] == 'stable' and trend['data_points'] > 10
)
class AdaptiveParameterTuner:
"""
Automatically tunes model parameters based on performance feedback
"""
def __init__(self, model, learning_rate_range: Tuple[float, float] = (1e-6, 1e-3),
batch_size_range: Tuple[int, int] = (8, 64)):
self.model = model
self.lr_range = learning_rate_range
self.batch_size_range = batch_size_range
# Parameter history
self.parameter_history = []
self.best_parameters = {
'learning_rate': 2e-5,
'batch_size': 32,
'performance': 0.0
}
def tune_parameters(self, performance_metrics: Dict[str, float]) -> Dict[str, Any]:
"""Tune parameters based on current performance"""
current_performance = performance_metrics.get('mteb_score', 0.0)
# Simple parameter tuning strategy
new_params = {}
if current_performance > self.best_parameters['performance']:
# Performance improved, try to optimize further
new_params['learning_rate'] = min(
self.best_parameters['learning_rate'] * 1.2,
self.lr_range[1]
)
new_params['batch_size'] = min(
self.best_parameters['batch_size'] + 4,
self.batch_size_range[1]
)
else:
# Performance degraded, try conservative approach
new_params['learning_rate'] = max(
self.best_parameters['learning_rate'] * 0.8,
self.lr_range[0]
)
new_params['batch_size'] = max(
self.best_parameters['batch_size'] - 4,
self.batch_size_range[0]
)
# Update best parameters if performance improved
if current_performance > self.best_parameters['performance']:
self.best_parameters.update({
'learning_rate': new_params['learning_rate'],
'batch_size': new_params['batch_size'],
'performance': current_performance
})
# Record parameter change
self.parameter_history.append({
'timestamp': datetime.now().isoformat(),
'parameters': new_params.copy(),
'performance': current_performance
})
return new_params
class ContinuousLearner:
"""
Implements continuous learning capabilities for SOFIA
"""
def __init__(self, model, performance_monitor: PerformanceMonitor):
self.model = model
self.monitor = performance_monitor
self.adaptive_tuner = AdaptiveParameterTuner(model)
# Learning state
self.is_learning = False
self.learning_thread = None
self.new_data_buffer = deque(maxlen=1000) # Buffer for new training data
# Learning configuration
self.learning_interval = 3600 # 1 hour in seconds
self.min_data_points = 50 # Minimum data points for retraining
def add_training_data(self, text_pairs: List[Tuple[str, str]], labels: Optional[List[float]] = None):
"""Add new training data to the buffer"""
if labels is None:
labels = [1.0] * len(text_pairs) # Default positive pairs
for (text1, text2), label in zip(text_pairs, labels):
self.new_data_buffer.append({
'text1': text1,
'text2': text2,
'label': label,
'timestamp': datetime.now().isoformat()
})
logger.info(f"Added {len(text_pairs)} new training examples")
def start_continuous_learning(self):
"""Start the continuous learning process"""
if self.is_learning:
logger.warning("Continuous learning already running")
return
self.is_learning = True
self.learning_thread = threading.Thread(target=self._continuous_learning_loop)
self.learning_thread.daemon = True
self.learning_thread.start()
logger.info("Continuous learning started")
def stop_continuous_learning(self):
"""Stop the continuous learning process"""
self.is_learning = False
if self.learning_thread:
self.learning_thread.join(timeout=5)
logger.info("Continuous learning stopped")
def _continuous_learning_loop(self):
"""Main continuous learning loop"""
while self.is_learning:
try:
# Check if we should trigger learning
if self._should_trigger_learning():
self._perform_learning_update()
# Wait before next check
time.sleep(self.learning_interval)
except Exception as e:
logger.error(f"Error in continuous learning loop: {e}")
time.sleep(60) # Wait 1 minute before retrying
def _should_trigger_learning(self) -> bool:
"""Determine if learning update should be triggered"""
# Check data availability
if len(self.new_data_buffer) < self.min_data_points:
return False
# Check performance trends
return self.monitor.should_trigger_improvement('similarity')
def _perform_learning_update(self):
"""Perform a learning update with new data"""
logger.info("Performing learning update...")
try:
# Prepare training data
training_data = list(self.new_data_buffer)
self.new_data_buffer.clear() # Clear buffer after use
# Tune parameters based on recent performance
recent_perf = self.monitor.get_recent_performance(1) # Last hour
if recent_perf:
latest_metrics = recent_perf[-1]['metrics']
new_params = self.adaptive_tuner.tune_parameters(latest_metrics)
logger.info(f"Adapted parameters: {new_params}")
# Perform fine-tuning (simplified version)
self._fine_tune_on_new_data(training_data)
# Record improvement
self.monitor.record_performance(
'continuous_learning',
{'data_points_used': len(training_data)},
datetime.now()
)
logger.info("Learning update completed successfully")
except Exception as e:
logger.error(f"Learning update failed: {e}")
def _fine_tune_on_new_data(self, training_data: List[Dict]):
"""Fine-tune the model on new data (simplified implementation)"""
# This is a placeholder for actual fine-tuning logic
# In a real implementation, this would:
# 1. Prepare data loaders
# 2. Set up optimizer with adapted parameters
# 3. Perform gradient updates
# 4. Validate improvements
logger.info(f"Fine-tuning on {len(training_data)} examples")
# Simulate fine-tuning process
time.sleep(2) # Simulate training time
# In a real implementation, you would:
# - Convert training_data to tensors
# - Create DataLoader
# - Set up optimizer and loss function
# - Perform training loop
# - Save updated model
logger.info("Fine-tuning simulation completed")
class SelfImprovingSOFIA:
"""
Main interface for SOFIA's self-improving capabilities
"""
def __init__(self, model):
self.model = model
self.performance_monitor = PerformanceMonitor()
self.continuous_learner = ContinuousLearner(model, self.performance_monitor)
# Auto-save state
self.state_file = "sofia_self_improvement_state.json"
self.load_state()
def start_self_improvement(self):
"""Start all self-improvement processes"""
self.continuous_learner.start_continuous_learning()
logger.info("SOFIA self-improvement system activated")
def stop_self_improvement(self):
"""Stop all self-improvement processes"""
self.continuous_learner.stop_continuous_learning()
self.save_state()
logger.info("SOFIA self-improvement system deactivated")
def record_task_performance(self, task: str, metrics: Dict[str, float]):
"""Record performance metrics for monitoring"""
self.performance_monitor.record_performance(task, metrics)
def add_feedback_data(self, text_pairs: List[Tuple[str, str]], quality_scores: List[float]):
"""Add user feedback data for continuous learning"""
self.continuous_learner.add_training_data(text_pairs, quality_scores)
def get_system_status(self) -> Dict[str, Any]:
"""Get current status of the self-improvement system"""
return {
'is_learning': self.continuous_learner.is_learning,
'buffered_data': len(self.continuous_learner.new_data_buffer),
'recent_performance': self.performance_monitor.get_recent_performance(1),
'performance_trends': {
'mteb_score': self.performance_monitor.get_performance_trend('mteb_score'),
'similarity_accuracy': self.performance_monitor.get_performance_trend('similarity_accuracy')
}
}
def save_state(self):
"""Save the current state of the self-improvement system"""
state = {
'performance_history': list(self.performance_monitor.performance_history),
'parameter_history': self.continuous_learner.adaptive_tuner.parameter_history,
'best_parameters': self.continuous_learner.adaptive_tuner.best_parameters,
'saved_at': datetime.now().isoformat()
}
with open(self.state_file, 'w') as f:
json.dump(state, f, indent=2)
logger.info(f"Self-improvement state saved to {self.state_file}")
def load_state(self):
"""Load the saved state of the self-improvement system"""
if os.path.exists(self.state_file):
try:
with open(self.state_file, 'r') as f:
state = json.load(f)
# Restore performance history
self.performance_monitor.performance_history.extend(state.get('performance_history', []))
# Restore parameter history
self.continuous_learner.adaptive_tuner.parameter_history = state.get('parameter_history', [])
self.continuous_learner.adaptive_tuner.best_parameters = state.get('best_parameters', self.continuous_learner.adaptive_tuner.best_parameters)
logger.info(f"Self-improvement state loaded from {self.state_file}")
except Exception as e:
logger.error(f"Failed to load state: {e}")
# Example usage
if __name__ == "__main__":
# This would be integrated with the actual SOFIA model
print("SOFIA Self-Improving System")
print("This module provides continuous learning and performance monitoring capabilities")
# Example of how it would be used:
"""
from sofia_model import SOFIAModel
from sofia_self_improving import SelfImprovingSOFIA
# Initialize SOFIA
sofia_model = SOFIAModel()
# Add self-improvement capabilities
self_improving_sofia = SelfImprovingSOFIA(sofia_model)
# Start self-improvement
self_improving_sofia.start_self_improvement()
# Record performance after tasks
self_improving_sofia.record_task_performance('similarity', {
'mteb_score': 65.1,
'similarity_accuracy': 0.92
})
# Add user feedback for continuous learning
feedback_pairs = [("hello world", "hi there"), ("machine learning", "AI algorithms")]
quality_scores = [0.9, 0.8]
self_improving_sofia.add_feedback_data(feedback_pairs, quality_scores)
# Get system status
status = self_improving_sofia.get_system_status()
print("System status:", status)
"""