"""
SOFIA Self-Improving Learning System

Automatically monitors performance and improves embeddings through continuous learning.
"""

import json
import logging
import os
import threading
import time
from collections import deque
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PerformanceMonitor:
    """Monitors SOFIA's performance across different tasks and metrics."""

    def __init__(self, history_size: int = 1000):
        self.history_size = history_size
        self.performance_history = deque(maxlen=history_size)
        self.current_metrics = {}
        # Reference scores that all recorded metrics are compared against.
        self.baseline_metrics = {
            'mteb_score': 58.2,
            'similarity_accuracy': 0.85,
            'retrieval_precision': 0.75,
            'response_time': 0.5,
        }

    def record_performance(self, task: str, metrics: Dict[str, float], timestamp: Optional[datetime] = None):
        """Record performance metrics for a specific task."""
        if timestamp is None:
            timestamp = datetime.now()

        record = {
            'timestamp': timestamp.isoformat(),
            'task': task,
            'metrics': metrics,
            'improvement': self._calculate_improvement(metrics),
        }

        self.performance_history.append(record)
        self.current_metrics[task] = metrics

        logger.info(f"Recorded performance for {task}: {metrics}")

    def _calculate_improvement(self, metrics: Dict[str, float]) -> Dict[str, float]:
        """Calculate relative improvement over the baseline metrics."""
        improvements = {}
        for metric, value in metrics.items():
            if metric in self.baseline_metrics:
                baseline = self.baseline_metrics[metric]
                if metric == 'response_time':
                    # Lower is better for latency, so invert the sign.
                    improvements[metric] = (baseline - value) / baseline
                else:
                    improvements[metric] = (value - baseline) / baseline
            else:
                improvements[metric] = 0.0

        return improvements

    def get_recent_performance(self, hours: int = 24) -> List[Dict]:
        """Get performance records from the last N hours."""
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = []

        for record in self.performance_history:
            record_time = datetime.fromisoformat(record['timestamp'])
            if record_time >= cutoff:
                recent.append(record)

        return recent

    def get_performance_trend(self, metric: str, hours: int = 24) -> Dict[str, Any]:
        """Analyze the performance trend for a specific metric."""
        recent_records = self.get_recent_performance(hours)
        if not recent_records:
            return {'trend': 'insufficient_data', 'change': 0.0}

        values = [
            record['metrics'][metric]
            for record in recent_records
            if metric in record['metrics']
        ]
        if len(values) < 2:
            return {'trend': 'insufficient_data', 'change': 0.0}

        # Compare the earliest and latest observations in the window.
        start_value = values[0]
        end_value = values[-1]
        change = (end_value - start_value) / start_value if start_value != 0 else 0.0

        # For latency a drop is an improvement; for quality metrics a rise is.
        signed_change = -change if metric == 'response_time' else change
        if signed_change > 0.05:
            trend = 'improving'
        elif abs(change) < 0.05:
            trend = 'stable'
        else:
            trend = 'degrading'

        return {
            'trend': trend,
            'change': change,
            'start_value': start_value,
            'end_value': end_value,
            'data_points': len(values),
        }

    def should_trigger_improvement(self, task: str) -> bool:
        """Determine whether a performance-improvement cycle should be triggered."""
        metric = 'mteb_score' if task == 'similarity' else 'similarity_accuracy'
        trend = self.get_performance_trend(metric)

        # Trigger when performance is degrading, or when it has plateaued for
        # long enough that a learning update is worth attempting.
        return trend['trend'] == 'degrading' or (
            trend['trend'] == 'stable' and trend['data_points'] > 10
        )


class AdaptiveParameterTuner:
    """Automatically tunes training hyperparameters based on performance feedback."""

    def __init__(self, model, learning_rate_range: Tuple[float, float] = (1e-6, 1e-3),
                 batch_size_range: Tuple[int, int] = (8, 64)):
        self.model = model
        self.lr_range = learning_rate_range
        self.batch_size_range = batch_size_range

        self.parameter_history = []
        self.best_parameters = {
            'learning_rate': 2e-5,
            'batch_size': 32,
            'performance': 0.0,
        }

    def tune_parameters(self, performance_metrics: Dict[str, float]) -> Dict[str, Any]:
        """Tune hyperparameters based on current performance (greedy hill climbing)."""
        current_performance = performance_metrics.get('mteb_score', 0.0)
        improved = current_performance > self.best_parameters['performance']

        new_params = {}
        if improved:
            # Performance went up: push the learning rate and batch size higher,
            # clamped to their configured ranges.
            new_params['learning_rate'] = min(
                self.best_parameters['learning_rate'] * 1.2,
                self.lr_range[1]
            )
            new_params['batch_size'] = min(
                self.best_parameters['batch_size'] + 4,
                self.batch_size_range[1]
            )
        else:
            # Performance stalled or dropped: back off to more conservative values.
            new_params['learning_rate'] = max(
                self.best_parameters['learning_rate'] * 0.8,
                self.lr_range[0]
            )
            new_params['batch_size'] = max(
                self.best_parameters['batch_size'] - 4,
                self.batch_size_range[0]
            )

        if improved:
            # Note: the freshly proposed settings are credited as "best" before
            # they have been evaluated; this keeps the heuristic simple but optimistic.
            self.best_parameters.update({
                'learning_rate': new_params['learning_rate'],
                'batch_size': new_params['batch_size'],
                'performance': current_performance,
            })

        self.parameter_history.append({
            'timestamp': datetime.now().isoformat(),
            'parameters': new_params.copy(),
            'performance': current_performance,
        })

        return new_params


class ContinuousLearner:
    """Implements continuous learning capabilities for SOFIA."""

    def __init__(self, model, performance_monitor: PerformanceMonitor):
        self.model = model
        self.monitor = performance_monitor
        self.adaptive_tuner = AdaptiveParameterTuner(model)

        self.is_learning = False
        self.learning_thread = None
        self.new_data_buffer = deque(maxlen=1000)

        self.learning_interval = 3600  # seconds between learning checks
        self.min_data_points = 50  # minimum buffered examples before an update

    def add_training_data(self, text_pairs: List[Tuple[str, str]], labels: Optional[List[float]] = None):
        """Add new training data to the buffer."""
        if labels is None:
            labels = [1.0] * len(text_pairs)

        for (text1, text2), label in zip(text_pairs, labels):
            self.new_data_buffer.append({
                'text1': text1,
                'text2': text2,
                'label': label,
                'timestamp': datetime.now().isoformat(),
            })

        logger.info(f"Added {len(text_pairs)} new training examples")

    def start_continuous_learning(self):
        """Start the continuous learning background thread."""
        if self.is_learning:
            logger.warning("Continuous learning already running")
            return

        self.is_learning = True
        self.learning_thread = threading.Thread(target=self._continuous_learning_loop)
        # Daemon thread, so a sleeping worker cannot keep the process alive on exit.
        self.learning_thread.daemon = True
        self.learning_thread.start()

        logger.info("Continuous learning started")

    def stop_continuous_learning(self):
        """Stop the continuous learning process."""
        self.is_learning = False
        if self.learning_thread:
            self.learning_thread.join(timeout=5)
        logger.info("Continuous learning stopped")

|
|
|
|
|
    def _continuous_learning_loop(self):
        """Main continuous learning loop."""
        while self.is_learning:
            try:
                if self._should_trigger_learning():
                    self._perform_learning_update()

                time.sleep(self.learning_interval)

            except Exception as e:
                logger.error(f"Error in continuous learning loop: {e}")
                time.sleep(60)  # back off briefly before retrying

    def _should_trigger_learning(self) -> bool:
        """Determine whether a learning update should be triggered."""
        # Wait until enough new examples have accumulated, then defer to the
        # performance monitor's trend analysis.
        if len(self.new_data_buffer) < self.min_data_points:
            return False

        return self.monitor.should_trigger_improvement('similarity')

    def _perform_learning_update(self):
        """Perform a learning update with new data."""
        logger.info("Performing learning update...")

        try:
            # Drain the buffer so the same examples are not reused next cycle.
            training_data = list(self.new_data_buffer)
            self.new_data_buffer.clear()

            # Adapt hyperparameters using the most recently recorded metrics.
            recent_perf = self.monitor.get_recent_performance(1)
            if recent_perf:
                latest_metrics = recent_perf[-1]['metrics']
                new_params = self.adaptive_tuner.tune_parameters(latest_metrics)
                logger.info(f"Adapted parameters: {new_params}")

            self._fine_tune_on_new_data(training_data)

            self.monitor.record_performance(
                'continuous_learning',
                {'data_points_used': len(training_data)},
                datetime.now()
            )

            logger.info("Learning update completed successfully")

        except Exception as e:
            logger.error(f"Learning update failed: {e}")

    def _fine_tune_on_new_data(self, training_data: List[Dict]):
        """Fine-tune the model on new data (simplified placeholder).

        This method only simulates a training pass; see the sketch after
        this class for what a real update step could look like.
        """
        logger.info(f"Fine-tuning on {len(training_data)} examples")

        # Placeholder standing in for an actual training pass.
        time.sleep(2)

        logger.info("Fine-tuning simulation completed")
|
|
|
|
|
class SelfImprovingSOFIA:
    """Main interface for SOFIA's self-improving capabilities."""

    def __init__(self, model):
        self.model = model
        self.performance_monitor = PerformanceMonitor()
        self.continuous_learner = ContinuousLearner(model, self.performance_monitor)

        # Persisted state lets monitoring history survive restarts.
        self.state_file = "sofia_self_improvement_state.json"
        self.load_state()

    def start_self_improvement(self):
        """Start all self-improvement processes."""
        self.continuous_learner.start_continuous_learning()
        logger.info("SOFIA self-improvement system activated")

    def stop_self_improvement(self):
        """Stop all self-improvement processes."""
        self.continuous_learner.stop_continuous_learning()
        self.save_state()
        logger.info("SOFIA self-improvement system deactivated")

    def record_task_performance(self, task: str, metrics: Dict[str, float]):
        """Record performance metrics for monitoring."""
        self.performance_monitor.record_performance(task, metrics)

    def add_feedback_data(self, text_pairs: List[Tuple[str, str]], quality_scores: List[float]):
        """Add user feedback data for continuous learning."""
        self.continuous_learner.add_training_data(text_pairs, quality_scores)

    def get_system_status(self) -> Dict[str, Any]:
        """Get the current status of the self-improvement system."""
        return {
            'is_learning': self.continuous_learner.is_learning,
            'buffered_data': len(self.continuous_learner.new_data_buffer),
            'recent_performance': self.performance_monitor.get_recent_performance(1),
            'performance_trends': {
                'mteb_score': self.performance_monitor.get_performance_trend('mteb_score'),
                'similarity_accuracy': self.performance_monitor.get_performance_trend('similarity_accuracy'),
            },
        }

    def save_state(self):
        """Save the current state of the self-improvement system."""
        state = {
            'performance_history': list(self.performance_monitor.performance_history),
            'parameter_history': self.continuous_learner.adaptive_tuner.parameter_history,
            'best_parameters': self.continuous_learner.adaptive_tuner.best_parameters,
            'saved_at': datetime.now().isoformat(),
        }

        with open(self.state_file, 'w') as f:
            json.dump(state, f, indent=2)

        logger.info(f"Self-improvement state saved to {self.state_file}")

    def load_state(self):
        """Load the saved state of the self-improvement system."""
        if not os.path.exists(self.state_file):
            return

        try:
            with open(self.state_file, 'r') as f:
                state = json.load(f)

            self.performance_monitor.performance_history.extend(state.get('performance_history', []))

            tuner = self.continuous_learner.adaptive_tuner
            tuner.parameter_history = state.get('parameter_history', [])
            tuner.best_parameters = state.get('best_parameters', tuner.best_parameters)

            logger.info(f"Self-improvement state loaded from {self.state_file}")

        except Exception as e:
            logger.error(f"Failed to load state: {e}")


if __name__ == "__main__": |
|
|
|
|
|
print("SOFIA Self-Improving System") |
|
|
print("This module provides continuous learning and performance monitoring capabilities") |
|
|
|
|
|
|
|
|
""" |
|
|
from sofia_model import SOFIAModel |
|
|
from sofia_self_improving import SelfImprovingSOFIA |
|
|
|
|
|
# Initialize SOFIA |
|
|
sofia_model = SOFIAModel() |
|
|
|
|
|
# Add self-improvement capabilities |
|
|
self_improving_sofia = SelfImprovingSOFIA(sofia_model) |
|
|
|
|
|
# Start self-improvement |
|
|
self_improving_sofia.start_self_improvement() |
|
|
|
|
|
# Record performance after tasks |
|
|
self_improving_sofia.record_task_performance('similarity', { |
|
|
'mteb_score': 65.1, |
|
|
'similarity_accuracy': 0.92 |
|
|
}) |
|
|
|
|
|
# Add user feedback for continuous learning |
|
|
feedback_pairs = [("hello world", "hi there"), ("machine learning", "AI algorithms")] |
|
|
quality_scores = [0.9, 0.8] |
|
|
self_improving_sofia.add_feedback_data(feedback_pairs, quality_scores) |
|
|
|
|
|
# Get system status |
|
|
status = self_improving_sofia.get_system_status() |
|
|
print("System status:", status) |
|
|
""" |
|
|
|