import pandas as pd import os from typing import Dict, List, Any, Optional import math class FeatherManager: def __init__(self, models_dir: str = "models"): self.models_dir = models_dir os.makedirs(models_dir, exist_ok=True) def save_mini_model(self, model_data: Dict[str, Any], model_id: int) -> str: filename = f"AgGPT_Expert_{model_id:04d}.feather" filepath = os.path.join(self.models_dir, filename) patterns = model_data.get('patterns', []) responses = model_data.get('responses', []) if not patterns or not responses: print(f"Warning: Model {model_id} has empty patterns or responses") patterns = patterns or ['hello'] responses = responses or ['Hello!'] df_data = { 'patterns': [str(pattern) for pattern in patterns], 'responses': [str(response) for response in responses], 'weights': model_data.get('weights', [1.0] * len(patterns)), 'confidence': [model_data.get('confidence', 0.5)] * len(patterns), 'grammar_rules': [str(rule) for rule in model_data.get('grammar_rules', [])] or ['none'], 'keywords': [' '.join(model_data.get('keywords', []))] * len(patterns), 'training_samples': [model_data.get('training_samples', 0)] * len(patterns) } max_len = max(len(v) if isinstance(v, list) else 1 for v in df_data.values()) for key, value in df_data.items(): if isinstance(value, list): while len(value) < max_len: value.append(value[-1] if value else '') df = pd.DataFrame(df_data) df.to_feather(filepath) print(f"Saved mini-model: {filename}") return filepath def load_mini_model(self, model_id: int) -> Optional[Dict[str, Any]]: filename = f"AgGPT_Expert_{model_id:04d}.feather" filepath = os.path.join(self.models_dir, filename) if not os.path.exists(filepath): return None try: df = pd.read_feather(filepath) model_data = { 'patterns': [p for p in df['patterns'].tolist() if p], 'responses': [r for r in df['responses'].tolist() if r], 'weights': df['weights'].tolist(), 'confidence': df['confidence'].iloc[0] if len(df) > 0 else 0.5, 'grammar_rules': [rule for rule in df['grammar_rules'].tolist() if rule], 'keywords': df['keywords'].iloc[0].split() if len(df) > 0 and df['keywords'].iloc[0] else [], 'training_samples': df['training_samples'].iloc[0] if len(df) > 0 else 0, 'model_id': model_id } return model_data except Exception as e: print(f"Error loading model {model_id}: {e}") return None def load_all_models(self) -> List[Dict[str, Any]]: models = [] if not os.path.exists(self.models_dir): return models for filename in os.listdir(self.models_dir): if filename.startswith("AgGPT_Expert_") and filename.endswith(".feather"): try: model_id = int(filename.split("_")[2].split(".")[0]) model = self.load_mini_model(model_id) if model: models.append(model) except (ValueError, IndexError): print(f"Warning: Invalid model filename format: {filename}") continue return models def get_model_count(self) -> int: if not os.path.exists(self.models_dir): return 0 count = 0 for filename in os.listdir(self.models_dir): if filename.startswith("AgGPT_Expert_") and filename.endswith(".feather"): count += 1 return count def get_next_model_id(self) -> int: if not os.path.exists(self.models_dir): return 1 max_id = 0 for filename in os.listdir(self.models_dir): if filename.startswith("AgGPT_Expert_") and filename.endswith(".feather"): try: model_id = int(filename.split("_")[2].split(".")[0]) max_id = max(max_id, model_id) except (ValueError, IndexError): continue return max_id + 1 def delete_model(self, model_id: int) -> bool: filename = f"AgGPT_Expert_{model_id:04d}.feather" filepath = os.path.join(self.models_dir, filename) if os.path.exists(filepath): try: os.remove(filepath) print(f"Deleted model: {filename}") return True except Exception as e: print(f"Error deleting model {model_id}: {e}") return False return False def clear_all_models(self) -> int: if not os.path.exists(self.models_dir): return 0 deleted_count = 0 for filename in os.listdir(self.models_dir): if filename.startswith("AgGPT_Expert_") and filename.endswith(".feather"): try: os.remove(os.path.join(self.models_dir, filename)) deleted_count += 1 except Exception as e: print(f"Error deleting {filename}: {e}") print(f"Deleted {deleted_count} model files") return deleted_count def similarity_score(text1: str, text2: str) -> float: if not text1 or not text2: return 0.0 words1 = set(text1.lower().split()) words2 = set(text2.lower().split()) if not words1 or not words2: return 0.0 intersection = len(words1.intersection(words2)) union = len(words1.union(words2)) return intersection / union if union > 0 else 0.0 def calculate_confidence_score(patterns: List[str], responses: List[str]) -> float: if not patterns or not responses or len(patterns) != len(responses): return 0.1 base_confidence = min(0.9, len(patterns) / 10.0) return max(0.1, min(1.0, base_confidence)) if __name__ == "__main__": manager = FeatherManager() test_model = { 'patterns': ['hello', 'hi', 'hey'], 'responses': ['Hello! How can I help you?', 'Hi there!', 'Hey! What\'s up?'], 'weights': [1.0, 0.9, 0.8], 'confidence': 0.8, 'grammar_rules': ['capitalize_first_word', 'end_with_punctuation'], 'keywords': ['greeting', 'hello', 'hi'], 'training_samples': 150 } model_id = manager.get_next_model_id() manager.save_mini_model(test_model, model_id) loaded_model = manager.load_mini_model(model_id) print(f"Original model: {test_model}") print(f"Loaded model: {loaded_model}") print(f"Models count: {manager.get_model_count()}")