| import pandas as pd | |
| import os | |
| from typing import Dict, List, Any, Optional | |
| import math | |
class FeatherManager:
    """Persist "mini-model" dictionaries as Feather files inside one directory.

    Every model is written to ``<models_dir>/AgGPT_Expert_<id>.feather`` where
    ``<id>`` is the model id zero-padded to at least four digits. A model is
    stored as a table with one row per pattern/response/weight entry; scalar
    fields (confidence, keywords, training_samples) are repeated on every row.
    """

    _FILE_PREFIX = "AgGPT_Expert_"
    _FILE_SUFFIX = ".feather"

    def __init__(self, models_dir: str = "models"):
        """Remember *models_dir* and create it if it does not exist yet."""
        self.models_dir = models_dir
        os.makedirs(models_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @classmethod
    def _model_filename(cls, model_id: int) -> str:
        """Return the file name used for *model_id*."""
        return f"{cls._FILE_PREFIX}{model_id:04d}{cls._FILE_SUFFIX}"

    @classmethod
    def _parse_model_id(cls, filename: str) -> Optional[int]:
        """Return the id encoded in *filename*, or ``None`` if it is malformed.

        Only names of the exact form ``AgGPT_Expert_<digits>.feather`` are
        accepted, so stray files such as ``AgGPT_Expert_0001_backup.feather``
        are not mistaken for model 1.
        """
        if not (filename.startswith(cls._FILE_PREFIX)
                and filename.endswith(cls._FILE_SUFFIX)):
            return None
        digits = filename[len(cls._FILE_PREFIX):-len(cls._FILE_SUFFIX)]
        # isascii() guards against Unicode digits that int() would reject.
        if digits.isascii() and digits.isdigit():
            return int(digits)
        return None

    def _model_filenames(self) -> List[str]:
        """Return the sorted names in ``models_dir`` that carry the model prefix and suffix."""
        if not os.path.isdir(self.models_dir):
            return []
        return sorted(
            name for name in os.listdir(self.models_dir)
            if name.startswith(self._FILE_PREFIX) and name.endswith(self._FILE_SUFFIX)
        )

    @staticmethod
    def _to_builtin(value: Any) -> Any:
        """Convert a numpy scalar to the matching built-in type; pass anything else through."""
        return value.item() if hasattr(value, "item") else value

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def save_mini_model(self, model_data: Dict[str, Any], model_id: int) -> str:
        """Write *model_data* to the Feather file for *model_id* and return its path.

        Recognised keys: ``patterns``, ``responses``, ``weights``,
        ``confidence``, ``grammar_rules``, ``keywords``, ``training_samples``.
        Missing or empty patterns/responses are replaced by a greeting
        placeholder (with a warning). *model_data* and the lists inside it are
        never modified.

        Column alignment:
          * patterns, responses and weights are brought to a common length by
            repeating their last entry, so every row keeps a usable triple;
          * rows added only because ``grammar_rules`` is longer are filled with
            ``''`` (text columns) or NaN (weights); ``load_mini_model`` drops
            those fillers again, so nothing is duplicated on a round trip.

        Note: keywords are stored space-joined, so a keyword that itself
        contains whitespace comes back as several keywords.
        """
        filename = self._model_filename(model_id)
        filepath = os.path.join(self.models_dir, filename)

        patterns = [str(pattern) for pattern in (model_data.get('patterns') or [])]
        responses = [str(response) for response in (model_data.get('responses') or [])]
        if not patterns or not responses:
            print(f"Warning: Model {model_id} has empty patterns or responses")
            patterns = patterns or ['hello']
            responses = responses or ['Hello!']

        # Copy so that padding below never touches the caller's list; this also
        # accepts tuples/arrays and falls back to unit weights when empty.
        raw_weights = model_data.get('weights')
        weights = list(raw_weights) if raw_weights is not None else []
        if not weights:
            weights = [1.0] * len(patterns)

        # Step 1: align the per-entry columns with each other.
        entry_count = max(len(patterns), len(responses), len(weights))
        for column in (patterns, responses, weights):
            column.extend([column[-1]] * (entry_count - len(column)))

        # 'none' marks "no grammar rules" in the stored file.
        grammar_rules = [str(rule) for rule in (model_data.get('grammar_rules') or [])] or ['none']

        # Step 2: pad to the table height with fillers that load_mini_model removes.
        row_count = max(entry_count, len(grammar_rules))
        for column in (patterns, responses, grammar_rules):
            column.extend([''] * (row_count - len(column)))
        weights.extend([float('nan')] * (row_count - len(weights)))

        keywords = ' '.join(str(keyword) for keyword in (model_data.get('keywords') or []))

        df = pd.DataFrame({
            'patterns': patterns,
            'responses': responses,
            'weights': weights,
            'confidence': [model_data.get('confidence', 0.5)] * row_count,
            'grammar_rules': grammar_rules,
            'keywords': [keywords] * row_count,
            'training_samples': [model_data.get('training_samples', 0)] * row_count,
        })
        df.to_feather(filepath)
        print(f"Saved mini-model: {filename}")
        return filepath

    def load_mini_model(self, model_id: int) -> Optional[Dict[str, Any]]:
        """Read the model stored for *model_id*.

        Returns a dict with the keys written by ``save_mini_model`` plus
        ``model_id``, or ``None`` when the file does not exist or cannot be
        read (the error is printed). Empty-string and NaN filler cells are
        dropped, and scalar fields are returned as built-in Python numbers.
        """
        filepath = os.path.join(self.models_dir, self._model_filename(model_id))
        if not os.path.exists(filepath):
            return None
        try:
            df = pd.read_feather(filepath)
            has_rows = len(df) > 0
            keywords = df['keywords'].iloc[0] if has_rows else ''
            return {
                'patterns': [p for p in df['patterns'].tolist() if p],
                'responses': [r for r in df['responses'].tolist() if r],
                'weights': [w for w in df['weights'].tolist() if pd.notna(w)],
                'confidence': self._to_builtin(df['confidence'].iloc[0]) if has_rows else 0.5,
                'grammar_rules': [rule for rule in df['grammar_rules'].tolist() if rule],
                'keywords': keywords.split() if keywords else [],
                'training_samples': self._to_builtin(df['training_samples'].iloc[0]) if has_rows else 0,
                'model_id': model_id,
            }
        except Exception as e:
            # Deliberately broad: a corrupt or foreign file must not abort the
            # caller (e.g. load_all_models); report it and signal "no model".
            print(f"Error loading model {model_id}: {e}")
            return None

    def load_all_models(self) -> List[Dict[str, Any]]:
        """Load every readable model in the directory, ordered by model id.

        Files with the model prefix/suffix but a malformed id are reported with
        a warning and skipped; models that fail to load are skipped.
        """
        model_ids = set()
        for filename in self._model_filenames():
            model_id = self._parse_model_id(filename)
            if model_id is None:
                print(f"Warning: Invalid model filename format: {filename}")
                continue
            model_ids.add(model_id)

        models = []
        for model_id in sorted(model_ids):
            model = self.load_mini_model(model_id)
            if model:
                models.append(model)
        return models

    def get_model_count(self) -> int:
        """Return how many files in the directory carry the model prefix and suffix."""
        return len(self._model_filenames())

    def get_next_model_id(self) -> int:
        """Return one more than the highest valid model id on disk (1 if there is none)."""
        known_ids = (self._parse_model_id(name) for name in self._model_filenames())
        return max((i for i in known_ids if i is not None), default=0) + 1

    def delete_model(self, model_id: int) -> bool:
        """Delete the file for *model_id*; return ``True`` only if it was removed."""
        filename = self._model_filename(model_id)
        filepath = os.path.join(self.models_dir, filename)
        if not os.path.exists(filepath):
            return False
        try:
            os.remove(filepath)
        except OSError as e:
            print(f"Error deleting model {model_id}: {e}")
            return False
        print(f"Deleted model: {filename}")
        return True

    def clear_all_models(self) -> int:
        """Delete every file with the model prefix/suffix and return how many were removed."""
        deleted_count = 0
        for filename in self._model_filenames():
            try:
                os.remove(os.path.join(self.models_dir, filename))
            except OSError as e:
                print(f"Error deleting {filename}: {e}")
            else:
                deleted_count += 1
        print(f"Deleted {deleted_count} model files")
        return deleted_count
def similarity_score(text1: str, text2: str) -> float:
    """Return the Jaccard similarity of the two texts' lower-cased word sets.

    Words are whitespace-separated tokens. The result is the number of shared
    words divided by the number of distinct words overall, and is 0.0 when
    either text is empty or contains no words.
    """
    if not (text1 and text2):
        return 0.0

    tokens_a = set(text1.lower().split())
    tokens_b = set(text2.lower().split())
    if not (tokens_a and tokens_b):
        return 0.0

    shared = tokens_a & tokens_b
    combined = tokens_a | tokens_b
    if len(combined) > 0:
        return len(shared) / len(combined)
    return 0.0
def calculate_confidence_score(patterns: List[str], responses: List[str]) -> float:
    """Derive a confidence value in [0.1, 0.9] from the number of training pairs.

    The score is one tenth of the pattern count, capped at 0.9 and floored at
    0.1. If either list is empty or their lengths differ, the floor value 0.1
    is returned.
    """
    usable = bool(patterns) and bool(responses) and len(patterns) == len(responses)
    if not usable:
        return 0.1

    scaled = len(patterns) / 10.0
    capped = scaled if scaled < 0.9 else 0.9
    return capped if capped > 0.1 else 0.1
if __name__ == "__main__":
    # Manual smoke test: store a small greeting model under the next free id
    # in the default "models" directory, read it back, and print both versions
    # together with the number of model files now present.
    test_model = dict(
        patterns=['hello', 'hi', 'hey'],
        responses=['Hello! How can I help you?', 'Hi there!', 'Hey! What\'s up?'],
        weights=[1.0, 0.9, 0.8],
        confidence=0.8,
        grammar_rules=['capitalize_first_word', 'end_with_punctuation'],
        keywords=['greeting', 'hello', 'hi'],
        training_samples=150,
    )

    manager = FeatherManager()
    model_id = manager.get_next_model_id()
    manager.save_mini_model(test_model, model_id)
    loaded_model = manager.load_mini_model(model_id)

    print(f"Original model: {test_model}")
    print(f"Loaded model: {loaded_model}")
    print(f"Models count: {manager.get_model_count()}")