"""Persist small pattern/response "mini-models" as Feather files."""

import math
import os
import re
import zlib
from typing import Any, Dict, List, Optional

import pandas as pd

# File-name conventions understood by FeatherManager.
_EXPERT_PREFIX = "AgGPT_Expert_"
_TARGET_PREFIX = "target_AgGPT_"
_FEATHER_SUFFIX = ".feather"

# Placeholder written to the 'grammar_rules' column when a model has no rules.
_NO_RULES = "none"


class FeatherManager:
    """Save, load, enumerate and delete mini-model files in one directory."""

    def __init__(self, models_dir: str = "models"):
        """Remember *models_dir* and create it if it does not exist yet."""
        self.models_dir = models_dir
        os.makedirs(models_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _expert_filename(model_id: int) -> str:
        """Return the default file name used for *model_id*."""
        return f"{_EXPERT_PREFIX}{model_id:04d}{_FEATHER_SUFFIX}"

    @staticmethod
    def _model_id_from_filename(filename: str) -> int:
        """Derive a model id from *filename*.

        Names starting with ``AgGPT_Expert_`` or ``target_AgGPT_`` carry the
        id as their third underscore-separated field.  Anything else (or a
        malformed id field) falls back to a CRC32-based id in ``[0, 10000)``.
        CRC32 is used instead of the builtin ``hash`` because string hashing
        is randomised per process, which made the fallback id change between
        runs.
        """
        if filename.startswith((_EXPERT_PREFIX, _TARGET_PREFIX)):
            try:
                return int(filename.split("_")[2].split(".")[0])
            except (ValueError, IndexError):
                pass
        return zlib.crc32(filename.encode("utf-8")) % 10000

    @staticmethod
    def _as_builtin(value: Any) -> Any:
        """Convert a numpy scalar to the matching Python value, if it is one."""
        item = getattr(value, "item", None)
        return item() if callable(item) else value

    @staticmethod
    def _build_columns(model_data: Dict[str, Any], model_id: int) -> Dict[str, List[Any]]:
        """Turn *model_data* into equal-length column lists for a DataFrame.

        Every column is a fresh list, so the caller's lists (notably
        ``model_data['weights']``) are never modified.  Columns shorter than
        the longest one are padded: ``grammar_rules`` with empty strings
        (which are dropped again on load) and every other column by repeating
        its last value.
        """
        patterns = model_data.get('patterns', [])
        responses = model_data.get('responses', [])
        if not patterns or not responses:
            print(f"Warning: Model {model_id} has empty patterns or responses")
            patterns = patterns or ['hello']
            responses = responses or ['Hello!']

        patterns = [str(pattern) for pattern in patterns]
        responses = [str(response) for response in responses]
        row_count = len(patterns)

        # A missing, None or empty weight list falls back to uniform weights.
        weights = list(model_data.get('weights') or [1.0] * row_count)
        rules = [str(rule) for rule in model_data.get('grammar_rules') or []]
        keywords = ' '.join(str(word) for word in model_data.get('keywords') or [])

        columns: Dict[str, List[Any]] = {
            'patterns': patterns,
            'responses': responses,
            'weights': weights,
            'confidence': [model_data.get('confidence', 0.5)] * row_count,
            'grammar_rules': rules or [_NO_RULES],
            'keywords': [keywords] * row_count,
            'training_samples': [model_data.get('training_samples', 0)] * row_count,
        }

        target_len = max(len(column) for column in columns.values())
        for key, column in columns.items():
            # Repeating the last rule would add duplicate rules on reload.
            filler = '' if key == 'grammar_rules' else column[-1]
            column.extend([filler] * (target_len - len(column)))
        return columns

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def save_mini_model(self, model_data: Dict[str, Any], model_id: int,
                        filename: Optional[str] = None) -> str:
        """Save model with custom filename or default naming.

        Args:
            model_data: Mapping with optional keys 'patterns', 'responses',
                'weights', 'confidence', 'grammar_rules', 'keywords' and
                'training_samples'.  It is not modified.
            model_id: Id used for the default file name and in warnings.
            filename: File name inside ``models_dir``; defaults to
                ``AgGPT_Expert_<model_id:04d>.feather``.

        Returns:
            Path of the written file.
        """
        if filename is None:
            filename = self._expert_filename(model_id)
        filepath = os.path.join(self.models_dir, filename)

        frame = pd.DataFrame(self._build_columns(model_data, model_id))
        frame.to_feather(filepath)

        print(f"Saved mini-model: {filename}")
        return filepath

    def load_feather_file(self, filepath: str) -> Optional[Dict[str, Any]]:
        """Load any .feather file and convert it to model format.

        Returns:
            The model dictionary, or ``None`` when the file does not exist or
            cannot be read/converted (the error is printed).
        """
        if not os.path.exists(filepath):
            return None

        try:
            df = pd.read_feather(filepath)
            filename = os.path.basename(filepath)
            has_rows = len(df) > 0
            present = set(df.columns)

            if 'grammar_rules' in present:
                # Drop padding ('') and the "no rules" placeholder.
                grammar_rules = [
                    rule for rule in df['grammar_rules'].tolist()
                    if rule and rule != _NO_RULES
                ]
            else:
                grammar_rules = []

            if has_rows and 'keywords' in present and df['keywords'].iloc[0]:
                keywords = df['keywords'].iloc[0].split()
            else:
                keywords = []

            if has_rows and 'confidence' in present:
                confidence = self._as_builtin(df['confidence'].iloc[0])
            else:
                confidence = 0.5

            if has_rows and 'training_samples' in present:
                training_samples = self._as_builtin(df['training_samples'].iloc[0])
            else:
                training_samples = 0

            return {
                'patterns': [p for p in df['patterns'].tolist() if p],
                'responses': [r for r in df['responses'].tolist() if r],
                'weights': df['weights'].tolist() if 'weights' in present else [1.0] * len(df),
                'confidence': confidence,
                'grammar_rules': grammar_rules,
                'keywords': keywords,
                'training_samples': training_samples,
                'model_id': self._model_id_from_filename(filename),
                'filename': filename,
            }
        except Exception as e:
            # Deliberate best-effort: load_all_models() relies on None for
            # unreadable files instead of an exception.
            print(f"Error loading feather file {filepath}: {e}")
            return None

    def load_mini_model(self, model_id: int) -> Optional[Dict[str, Any]]:
        """Load the default-named file for *model_id*; ``None`` if unavailable."""
        filepath = os.path.join(self.models_dir, self._expert_filename(model_id))
        return self.load_feather_file(filepath)

    def load_all_models(self) -> List[Dict[str, Any]]:
        """Load ALL .feather files from the models directory"""
        models: List[Dict[str, Any]] = []
        if not os.path.exists(self.models_dir):
            return models

        print(f"Scanning {self.models_dir} for .feather files...")
        for filename in sorted(os.listdir(self.models_dir)):
            if not filename.endswith(_FEATHER_SUFFIX):
                continue
            model = self.load_feather_file(os.path.join(self.models_dir, filename))
            if model:
                models.append(model)
                print(f"  ✅ Loaded: {filename}")
            else:
                print(f"  ❌ Failed to load: {filename}")

        print(f"Total models loaded: {len(models)}")
        return models

    def get_model_count(self) -> int:
        """Count ALL .feather files in the models directory"""
        if not os.path.exists(self.models_dir):
            return 0
        return sum(
            1 for filename in os.listdir(self.models_dir)
            if filename.endswith(_FEATHER_SUFFIX)
        )

    def get_next_model_id(self) -> int:
        """Get next available model ID for AgGPT_Expert_ files.

        Other ``.feather`` files contribute the last number found in their
        name, so the returned id is above any number already in use there.
        """
        if not os.path.exists(self.models_dir):
            return 1

        max_id = 0
        for filename in os.listdir(self.models_dir):
            if not filename.endswith(_FEATHER_SUFFIX):
                continue
            if filename.startswith(_EXPERT_PREFIX):
                try:
                    max_id = max(max_id, int(filename.split("_")[2].split(".")[0]))
                except (ValueError, IndexError):
                    continue
            else:
                numbers = re.findall(r'\d+', filename)
                if numbers:
                    max_id = max(max_id, int(numbers[-1]))
        return max_id + 1

    def delete_model(self, model_id: int) -> bool:
        """Delete the default-named file for *model_id*; return True on success."""
        filename = self._expert_filename(model_id)
        filepath = os.path.join(self.models_dir, filename)
        if not os.path.exists(filepath):
            return False
        try:
            os.remove(filepath)
        except OSError as e:
            print(f"Error deleting model {model_id}: {e}")
            return False
        print(f"Deleted model: {filename}")
        return True

    def clear_all_models(self) -> int:
        """Delete every AgGPT_Expert_*.feather file; return how many were removed."""
        if not os.path.exists(self.models_dir):
            return 0

        deleted_count = 0
        for filename in os.listdir(self.models_dir):
            if filename.startswith(_EXPERT_PREFIX) and filename.endswith(_FEATHER_SUFFIX):
                try:
                    os.remove(os.path.join(self.models_dir, filename))
                    deleted_count += 1
                except OSError as e:
                    print(f"Error deleting {filename}: {e}")

        print(f"Deleted {deleted_count} model files")
        return deleted_count


def similarity_score(text1: str, text2: str) -> float:
    """Return the Jaccard similarity of the lower-cased word sets of two texts.

    Returns 0.0 when either text is empty or contains no words.
    """
    if not text1 or not text2:
        return 0.0

    words1 = set(text1.lower().split())
    words2 = set(text2.lower().split())
    if not words1 or not words2:
        return 0.0

    return len(words1 & words2) / len(words1 | words2)


def calculate_confidence_score(patterns: List[str], responses: List[str]) -> float:
    """Return a confidence in [0.1, 0.9] that grows with the number of patterns.

    Returns 0.1 when either list is empty or their lengths differ.
    """
    if not patterns or not responses or len(patterns) != len(responses):
        return 0.1
    return max(0.1, min(0.9, len(patterns) / 10.0))


if __name__ == "__main__":
    manager = FeatherManager()

    test_model = {
        'patterns': ['hello', 'hi', 'hey'],
        'responses': ['Hello! How can I help you?', 'Hi there!', 'Hey! What\'s up?'],
        'weights': [1.0, 0.9, 0.8],
        'confidence': 0.8,
        'grammar_rules': ['capitalize_first_word', 'end_with_punctuation'],
        'keywords': ['greeting', 'hello', 'hi'],
        'training_samples': 150
    }

    model_id = manager.get_next_model_id()
    manager.save_mini_model(test_model, model_id)

    loaded_model = manager.load_mini_model(model_id)
    print(f"Original model: {test_model}")
    print(f"Loaded model: {loaded_model}")
    print(f"Models count: {manager.get_model_count()}")