Spaces:
Running
Running
| """ | |
| In-memory storage implementation for TutorX-MCP. | |
| This module provides in-memory storage for development and testing. | |
| In production, this would be replaced with database-backed storage. | |
| """ | |
| import json | |
| import pickle | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any, Union | |
| from pathlib import Path | |
| import threading | |
| from collections import defaultdict | |
| from ..models.student_profile import StudentProfile | |
| class MemoryStore: | |
| """ | |
| In-memory storage implementation for adaptive learning data. | |
| This provides a simple storage layer for development and testing. | |
| In production, this would be replaced with a proper database. | |
| """ | |
| def __init__(self, persistence_file: Optional[str] = None): | |
| """ | |
| Initialize the memory store. | |
| Args: | |
| persistence_file: Optional file path for data persistence | |
| """ | |
| self.persistence_file = persistence_file | |
| self._lock = threading.RLock() | |
| # Storage containers | |
| self.student_profiles: Dict[str, StudentProfile] = {} | |
| self.performance_data: Dict[str, Dict[str, Any]] = defaultdict(dict) | |
| self.session_data: Dict[str, Dict[str, Any]] = {} | |
| self.analytics_cache: Dict[str, Dict[str, Any]] = defaultdict(dict) | |
| self.adaptation_history: Dict[str, List[Dict[str, Any]]] = defaultdict(list) | |
| # Load persisted data if available | |
| if self.persistence_file: | |
| self._load_from_file() | |
| def _load_from_file(self): | |
| """Load data from persistence file.""" | |
| try: | |
| if Path(self.persistence_file).exists(): | |
| with open(self.persistence_file, 'rb') as f: | |
| data = pickle.load(f) | |
| self.student_profiles = data.get('student_profiles', {}) | |
| self.performance_data = data.get('performance_data', defaultdict(dict)) | |
| self.session_data = data.get('session_data', {}) | |
| self.analytics_cache = data.get('analytics_cache', defaultdict(dict)) | |
| self.adaptation_history = data.get('adaptation_history', defaultdict(list)) | |
| print(f"Loaded data from {self.persistence_file}") | |
| except Exception as e: | |
| print(f"Error loading data from {self.persistence_file}: {e}") | |
| def _save_to_file(self): | |
| """Save data to persistence file.""" | |
| if not self.persistence_file: | |
| return | |
| try: | |
| data = { | |
| 'student_profiles': self.student_profiles, | |
| 'performance_data': dict(self.performance_data), | |
| 'session_data': self.session_data, | |
| 'analytics_cache': dict(self.analytics_cache), | |
| 'adaptation_history': dict(self.adaptation_history) | |
| } | |
| with open(self.persistence_file, 'wb') as f: | |
| pickle.dump(data, f) | |
| except Exception as e: | |
| print(f"Error saving data to {self.persistence_file}: {e}") | |
| # Student Profile Operations | |
| def save_student_profile(self, profile: StudentProfile) -> bool: | |
| """Save a student profile.""" | |
| with self._lock: | |
| try: | |
| self.student_profiles[profile.student_id] = profile | |
| self._save_to_file() | |
| return True | |
| except Exception as e: | |
| print(f"Error saving student profile: {e}") | |
| return False | |
| def get_student_profile(self, student_id: str) -> Optional[StudentProfile]: | |
| """Get a student profile by ID.""" | |
| with self._lock: | |
| return self.student_profiles.get(student_id) | |
| def update_student_profile(self, student_id: str, updates: Dict[str, Any]) -> bool: | |
| """Update a student profile with new data.""" | |
| with self._lock: | |
| try: | |
| if student_id not in self.student_profiles: | |
| return False | |
| profile = self.student_profiles[student_id] | |
| # Update profile attributes | |
| for key, value in updates.items(): | |
| if hasattr(profile, key): | |
| setattr(profile, key, value) | |
| profile.last_updated = datetime.utcnow() | |
| self._save_to_file() | |
| return True | |
| except Exception as e: | |
| print(f"Error updating student profile: {e}") | |
| return False | |
| def delete_student_profile(self, student_id: str) -> bool: | |
| """Delete a student profile.""" | |
| with self._lock: | |
| try: | |
| if student_id in self.student_profiles: | |
| del self.student_profiles[student_id] | |
| self._save_to_file() | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error deleting student profile: {e}") | |
| return False | |
| def list_student_profiles(self, active_only: bool = False, | |
| days: int = 30) -> List[StudentProfile]: | |
| """List student profiles, optionally filtering by activity.""" | |
| with self._lock: | |
| profiles = list(self.student_profiles.values()) | |
| if active_only: | |
| cutoff_date = datetime.utcnow() - timedelta(days=days) | |
| profiles = [ | |
| p for p in profiles | |
| if p.last_active and p.last_active >= cutoff_date | |
| ] | |
| return profiles | |
| # Performance Data Operations | |
| def save_performance_data(self, student_id: str, concept_id: str, | |
| data: Dict[str, Any]) -> bool: | |
| """Save performance data for a student and concept.""" | |
| with self._lock: | |
| try: | |
| if student_id not in self.performance_data: | |
| self.performance_data[student_id] = {} | |
| self.performance_data[student_id][concept_id] = { | |
| **data, | |
| 'last_updated': datetime.utcnow().isoformat() | |
| } | |
| self._save_to_file() | |
| return True | |
| except Exception as e: | |
| print(f"Error saving performance data: {e}") | |
| return False | |
| def get_performance_data(self, student_id: str, | |
| concept_id: Optional[str] = None) -> Optional[Dict[str, Any]]: | |
| """Get performance data for a student and optionally a specific concept.""" | |
| with self._lock: | |
| if student_id not in self.performance_data: | |
| return None | |
| if concept_id: | |
| return self.performance_data[student_id].get(concept_id) | |
| else: | |
| return self.performance_data[student_id] | |
| def update_performance_data(self, student_id: str, concept_id: str, | |
| updates: Dict[str, Any]) -> bool: | |
| """Update performance data for a student and concept.""" | |
| with self._lock: | |
| try: | |
| if student_id not in self.performance_data: | |
| self.performance_data[student_id] = {} | |
| if concept_id not in self.performance_data[student_id]: | |
| self.performance_data[student_id][concept_id] = {} | |
| self.performance_data[student_id][concept_id].update(updates) | |
| self.performance_data[student_id][concept_id]['last_updated'] = datetime.utcnow().isoformat() | |
| self._save_to_file() | |
| return True | |
| except Exception as e: | |
| print(f"Error updating performance data: {e}") | |
| return False | |
| # Session Data Operations | |
| def save_session_data(self, session_id: str, data: Dict[str, Any]) -> bool: | |
| """Save session data.""" | |
| with self._lock: | |
| try: | |
| self.session_data[session_id] = { | |
| **data, | |
| 'saved_at': datetime.utcnow().isoformat() | |
| } | |
| self._save_to_file() | |
| return True | |
| except Exception as e: | |
| print(f"Error saving session data: {e}") | |
| return False | |
| def get_session_data(self, session_id: str) -> Optional[Dict[str, Any]]: | |
| """Get session data by ID.""" | |
| with self._lock: | |
| return self.session_data.get(session_id) | |
| def delete_session_data(self, session_id: str) -> bool: | |
| """Delete session data.""" | |
| with self._lock: | |
| try: | |
| if session_id in self.session_data: | |
| del self.session_data[session_id] | |
| self._save_to_file() | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error deleting session data: {e}") | |
| return False | |
| def cleanup_old_sessions(self, days: int = 7) -> int: | |
| """Clean up old session data.""" | |
| with self._lock: | |
| try: | |
| cutoff_date = datetime.utcnow() - timedelta(days=days) | |
| sessions_to_delete = [] | |
| for session_id, data in self.session_data.items(): | |
| saved_at_str = data.get('saved_at') | |
| if saved_at_str: | |
| saved_at = datetime.fromisoformat(saved_at_str) | |
| if saved_at < cutoff_date: | |
| sessions_to_delete.append(session_id) | |
| for session_id in sessions_to_delete: | |
| del self.session_data[session_id] | |
| if sessions_to_delete: | |
| self._save_to_file() | |
| return len(sessions_to_delete) | |
| except Exception as e: | |
| print(f"Error cleaning up old sessions: {e}") | |
| return 0 | |
| # Analytics Cache Operations | |
| def cache_analytics_result(self, cache_key: str, data: Dict[str, Any], | |
| ttl_minutes: int = 60) -> bool: | |
| """Cache analytics result with TTL.""" | |
| with self._lock: | |
| try: | |
| expiry_time = datetime.utcnow() + timedelta(minutes=ttl_minutes) | |
| self.analytics_cache[cache_key] = { | |
| 'data': data, | |
| 'expires_at': expiry_time.isoformat(), | |
| 'cached_at': datetime.utcnow().isoformat() | |
| } | |
| return True | |
| except Exception as e: | |
| print(f"Error caching analytics result: {e}") | |
| return False | |
| def get_cached_analytics(self, cache_key: str) -> Optional[Dict[str, Any]]: | |
| """Get cached analytics result if not expired.""" | |
| with self._lock: | |
| if cache_key not in self.analytics_cache: | |
| return None | |
| cached_item = self.analytics_cache[cache_key] | |
| expires_at = datetime.fromisoformat(cached_item['expires_at']) | |
| if datetime.utcnow() > expires_at: | |
| # Cache expired, remove it | |
| del self.analytics_cache[cache_key] | |
| return None | |
| return cached_item['data'] | |
| def clear_analytics_cache(self, pattern: Optional[str] = None) -> int: | |
| """Clear analytics cache, optionally matching a pattern.""" | |
| with self._lock: | |
| try: | |
| if pattern is None: | |
| count = len(self.analytics_cache) | |
| self.analytics_cache.clear() | |
| return count | |
| else: | |
| keys_to_delete = [ | |
| key for key in self.analytics_cache.keys() | |
| if pattern in key | |
| ] | |
| for key in keys_to_delete: | |
| del self.analytics_cache[key] | |
| return len(keys_to_delete) | |
| except Exception as e: | |
| print(f"Error clearing analytics cache: {e}") | |
| return 0 | |
| # Adaptation History Operations | |
| def add_adaptation_record(self, student_id: str, record: Dict[str, Any]) -> bool: | |
| """Add an adaptation record for a student.""" | |
| with self._lock: | |
| try: | |
| self.adaptation_history[student_id].append({ | |
| **record, | |
| 'recorded_at': datetime.utcnow().isoformat() | |
| }) | |
| # Keep only last 100 records per student | |
| if len(self.adaptation_history[student_id]) > 100: | |
| self.adaptation_history[student_id] = self.adaptation_history[student_id][-100:] | |
| self._save_to_file() | |
| return True | |
| except Exception as e: | |
| print(f"Error adding adaptation record: {e}") | |
| return False | |
| def get_adaptation_history(self, student_id: str, | |
| days: Optional[int] = None) -> List[Dict[str, Any]]: | |
| """Get adaptation history for a student.""" | |
| with self._lock: | |
| if student_id not in self.adaptation_history: | |
| return [] | |
| records = self.adaptation_history[student_id] | |
| if days is not None: | |
| cutoff_date = datetime.utcnow() - timedelta(days=days) | |
| records = [ | |
| record for record in records | |
| if datetime.fromisoformat(record['recorded_at']) >= cutoff_date | |
| ] | |
| return records | |
| # Utility Operations | |
| def get_storage_stats(self) -> Dict[str, Any]: | |
| """Get storage statistics.""" | |
| with self._lock: | |
| return { | |
| 'student_profiles_count': len(self.student_profiles), | |
| 'performance_data_students': len(self.performance_data), | |
| 'total_performance_records': sum( | |
| len(concepts) for concepts in self.performance_data.values() | |
| ), | |
| 'active_sessions': len(self.session_data), | |
| 'cached_analytics': len(self.analytics_cache), | |
| 'adaptation_records': sum( | |
| len(records) for records in self.adaptation_history.values() | |
| ), | |
| 'persistence_enabled': self.persistence_file is not None, | |
| 'last_updated': datetime.utcnow().isoformat() | |
| } | |
| def export_data(self, format: str = 'json') -> Union[str, bytes]: | |
| """Export all data in specified format.""" | |
| with self._lock: | |
| data = { | |
| 'student_profiles': { | |
| sid: profile.to_dict() | |
| for sid, profile in self.student_profiles.items() | |
| }, | |
| 'performance_data': dict(self.performance_data), | |
| 'session_data': self.session_data, | |
| 'analytics_cache': dict(self.analytics_cache), | |
| 'adaptation_history': dict(self.adaptation_history), | |
| 'exported_at': datetime.utcnow().isoformat() | |
| } | |
| if format.lower() == 'json': | |
| return json.dumps(data, indent=2) | |
| elif format.lower() == 'pickle': | |
| return pickle.dumps(data) | |
| else: | |
| raise ValueError(f"Unsupported export format: {format}") | |
| def import_data(self, data: Union[str, bytes], format: str = 'json') -> bool: | |
| """Import data from specified format.""" | |
| with self._lock: | |
| try: | |
| if format.lower() == 'json': | |
| imported_data = json.loads(data) | |
| elif format.lower() == 'pickle': | |
| imported_data = pickle.loads(data) | |
| else: | |
| raise ValueError(f"Unsupported import format: {format}") | |
| # Import student profiles | |
| if 'student_profiles' in imported_data: | |
| for sid, profile_data in imported_data['student_profiles'].items(): | |
| profile = StudentProfile.from_dict(profile_data) | |
| self.student_profiles[sid] = profile | |
| # Import other data | |
| if 'performance_data' in imported_data: | |
| self.performance_data.update(imported_data['performance_data']) | |
| if 'session_data' in imported_data: | |
| self.session_data.update(imported_data['session_data']) | |
| if 'analytics_cache' in imported_data: | |
| self.analytics_cache.update(imported_data['analytics_cache']) | |
| if 'adaptation_history' in imported_data: | |
| self.adaptation_history.update(imported_data['adaptation_history']) | |
| self._save_to_file() | |
| return True | |
| except Exception as e: | |
| print(f"Error importing data: {e}") | |
| return False | |