import ast
import math
import os
from typing import Any, Dict, List, Optional

import pandas as pd

# File naming scheme shared by every method that touches the models directory.
_MODEL_PREFIX = "AgGPT_Expert_"
_MODEL_SUFFIX = ".feather"

# Marker written into the 'patterns' column of the first row of a saved model.
_METADATA_MARKER = "__METADATA__"

# Errors ast.literal_eval can raise on malformed or non-string input.
_LITERAL_ERRORS = (ValueError, SyntaxError, TypeError, MemoryError, RecursionError)

# Synonym groups used by _semantic_word_similarity.
# NOTE: inputs are compared as single whitespace-separated tokens, so the
# multi-word entries ('good morning', 'thank you', ...) can never match.
_SYNONYMS = {
    'hello': {'hi', 'hey', 'greetings', 'good morning', 'good afternoon'},
    'thanks': {'thank you', 'appreciate', 'grateful'},
    'yes': {'yeah', 'yep', 'sure', 'absolutely', 'definitely'},
    'no': {'nope', 'negative', 'not really'},
    'good': {'great', 'excellent', 'wonderful', 'amazing', 'fantastic'},
    'bad': {'terrible', 'awful', 'horrible', 'poor'},
    'big': {'large', 'huge', 'enormous', 'massive'},
    'small': {'little', 'tiny', 'mini', 'minute'},
}


class FeatherManager:
    """Save, load, enumerate and delete mini-model dicts stored as Feather files.

    Each model lives in ``<models_dir>/AgGPT_Expert_<id:04d>.feather``. The
    file holds one table with the columns ``patterns``, ``responses``,
    ``weights`` and ``response_templates``. The first row carries model-level
    metadata (flagged by a ``__METADATA__`` prefix in ``patterns``); every
    following row is one pattern/response pair.
    """

    def __init__(self, models_dir: str = "models"):
        """Remember ``models_dir`` and create it if it does not exist."""
        self.models_dir = models_dir
        os.makedirs(models_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _model_filename(model_id: int) -> str:
        """Return the file name used for ``model_id`` (zero-padded to 4 digits)."""
        return f"{_MODEL_PREFIX}{model_id:04d}{_MODEL_SUFFIX}"

    def _model_path(self, model_id: int) -> str:
        """Return the path of the file for ``model_id`` inside ``models_dir``."""
        return os.path.join(self.models_dir, self._model_filename(model_id))

    def _model_filenames(self) -> List[str]:
        """Return the model file names in ``models_dir``, sorted by name.

        Returns an empty list when the directory does not exist.
        """
        if not os.path.exists(self.models_dir):
            return []
        return sorted(
            name
            for name in os.listdir(self.models_dir)
            if name.startswith(_MODEL_PREFIX) and name.endswith(_MODEL_SUFFIX)
        )

    @staticmethod
    def _parse_model_id(filename: str) -> int:
        """Extract the numeric id from a model file name.

        Raises ValueError or IndexError when the name does not follow the
        ``AgGPT_Expert_<number>.feather`` layout.
        """
        return int(filename.split("_")[2].split(".")[0])

    @staticmethod
    def _parse_literal(text: Any, default: Any) -> Any:
        """Parse ``text`` as a Python literal, returning ``default`` on failure.

        ast.literal_eval is used instead of eval so that the contents of a
        model file can never execute code when the file is loaded.
        """
        try:
            return ast.literal_eval(text)
        except _LITERAL_ERRORS:
            return default

    @staticmethod
    def _fit_column(values: Any, length: int, filler: Any) -> list:
        """Return a new list of exactly ``length`` items built from ``values``.

        ``values`` is copied (never mutated), padded with ``filler`` when too
        short and truncated when too long. ``None`` is treated as empty.
        """
        column = [] if values is None else list(values)
        column.extend(filler for _ in range(length - len(column)))
        return column[:length]

    def _build_rows(self, model_data: Dict[str, Any], model_id: int) -> List[Dict[str, Any]]:
        """Turn ``model_data`` into the list of row dicts written to disk.

        Row 0 is the metadata row; rows 1..n hold one pattern each. Missing
        patterns/responses are replaced by a single 'hello' / 'Hello!' entry
        after printing a warning.
        """
        patterns = model_data.get('patterns', [])
        responses = model_data.get('responses', [])

        if not patterns or not responses:
            print(f"Warning: Model {model_id} has empty patterns or responses")
            patterns = patterns or ['hello']
            responses = responses or ['Hello!']

        # The number of patterns decides the number of data rows; every other
        # column is padded or truncated to that length.
        count = len(patterns)
        pattern_col = [str(pattern) for pattern in patterns]
        response_col = self._fit_column([str(response) for response in responses], count, '')
        weight_col = self._fit_column(model_data.get('weights'), count, 1.0)
        template_col = self._fit_column(model_data.get('response_templates'), count, {})

        confidence = model_data.get('confidence', 0.5)
        keywords = ' '.join(model_data.get('keywords', []))
        training_samples = model_data.get('training_samples', 0)
        knowledge_base = str(model_data.get('knowledge_base', {}))
        semantic_categories = str(model_data.get('semantic_categories', {}))
        grammar_rules = str(model_data.get('grammar_rules', []))

        # Feather stores a single table, so the model-level metadata is packed
        # into the four existing columns of a marker row placed first.
        rows: List[Dict[str, Any]] = [{
            'patterns': f"{_METADATA_MARKER}{model_id}",
            'responses': knowledge_base,
            'weights': float(confidence),
            'response_templates': f"{keywords}|{training_samples}|{semantic_categories}|{grammar_rules}",
        }]
        for pattern, response, weight, template in zip(
            pattern_col, response_col, weight_col, template_col
        ):
            rows.append({
                'patterns': pattern,
                'responses': response,
                'weights': weight,
                'response_templates': str(template),
            })
        return rows

    def _decode_current_format(self, df: pd.DataFrame, model_id: int) -> Dict[str, Any]:
        """Rebuild a model dict from a table whose first row is the metadata row."""
        # Layout: keywords|training_samples|semantic_categories|grammar_rules.
        # maxsplit=3 keeps any '|' inside the last field (grammar_rules) intact.
        metadata_parts = df['response_templates'].iloc[0].split('|', 3)

        has_samples = len(metadata_parts) > 1 and metadata_parts[1].isdigit()
        model_data: Dict[str, Any] = {
            'model_id': model_id,
            'confidence': float(df['weights'].iloc[0]),
            'keywords': metadata_parts[0].split(),
            'training_samples': int(metadata_parts[1]) if has_samples else 0,
        }

        model_data['knowledge_base'] = self._parse_literal(df['responses'].iloc[0], {})
        model_data['semantic_categories'] = (
            self._parse_literal(metadata_parts[2], {}) if len(metadata_parts) > 2 else {}
        )
        model_data['grammar_rules'] = (
            self._parse_literal(metadata_parts[3], []) if len(metadata_parts) > 3 else []
        )

        # Everything after the metadata row is pattern data. Empty strings are
        # dropped, which also removes the '' padding added when saving.
        pattern_df = df.iloc[1:]
        model_data['patterns'] = [p for p in pattern_df['patterns'].tolist() if p]
        model_data['responses'] = [r for r in pattern_df['responses'].tolist() if r]
        model_data['weights'] = pattern_df['weights'].tolist()
        model_data['response_templates'] = [
            self._parse_literal(template_str, {})
            for template_str in pattern_df['response_templates'].tolist()
        ]
        return model_data

    @staticmethod
    def _decode_legacy_format(df: pd.DataFrame, model_id: int) -> Dict[str, Any]:
        """Rebuild a model dict from a table that has no metadata row.

        Model-level values are read from the first row of the optional
        'confidence', 'keywords' and 'training_samples' columns.
        """
        columns = df.columns
        return {
            'patterns': [p for p in df['patterns'].tolist() if p],
            'responses': [r for r in df['responses'].tolist() if r],
            'weights': df['weights'].tolist(),
            'confidence': df['confidence'].iloc[0] if 'confidence' in columns else 0.5,
            'keywords': df['keywords'].iloc[0].split() if 'keywords' in columns else [],
            'training_samples': (
                df['training_samples'].iloc[0] if 'training_samples' in columns else 0
            ),
            'model_id': model_id,
            'knowledge_base': {},
            'semantic_categories': {},
            'response_templates': [],
            'grammar_rules': [],
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def save_mini_model(self, model_data: Dict[str, Any], model_id: int) -> str:
        """Write ``model_data`` to the Feather file for ``model_id``.

        ``model_data`` is not modified. Entries of 'responses', 'weights' and
        'response_templates' beyond the number of patterns are ignored;
        shorter lists are padded with '', 1.0 and {} respectively.

        Returns:
            The path of the file that was written.
        """
        filename = self._model_filename(model_id)
        filepath = os.path.join(self.models_dir, filename)

        rows = self._build_rows(model_data, model_id)
        pd.DataFrame(rows).to_feather(filepath)

        print(f"Saved optimized mini-model: {filename} ({len(rows) - 1} patterns + metadata)")
        return filepath

    def load_mini_model(self, model_id: int) -> Optional[Dict[str, Any]]:
        """Load the model stored for ``model_id``.

        Returns:
            The model dict, or None when the file is missing, is empty, or
            cannot be read (in which case the error is printed).
        """
        filepath = self._model_path(model_id)
        if not os.path.exists(filepath):
            return None

        # Best-effort boundary: any failure while reading or decoding is
        # reported and turned into None rather than propagated.
        try:
            df = pd.read_feather(filepath)
            if len(df) == 0:
                return None
            if df['patterns'].iloc[0].startswith(_METADATA_MARKER):
                return self._decode_current_format(df, model_id)
            # No marker row: fall back to the older one-table layout.
            return self._decode_legacy_format(df, model_id)
        except Exception as e:
            print(f"Error loading optimized model {model_id}: {e}")
            return None

    def load_all_models(self) -> List[Dict[str, Any]]:
        """Load every readable model in ``models_dir``, ordered by file name.

        Files whose name does not contain a numeric id are skipped with a
        warning; files that fail to load are skipped silently here (the
        loader prints its own error).
        """
        models = []
        for filename in self._model_filenames():
            try:
                model_id = self._parse_model_id(filename)
            except (ValueError, IndexError):
                print(f"Warning: Invalid model filename format: {filename}")
                continue
            model = self.load_mini_model(model_id)
            if model:
                models.append(model)
        return models

    def get_model_count(self) -> int:
        """Return the number of model files in ``models_dir``."""
        return len(self._model_filenames())

    def get_next_model_id(self) -> int:
        """Return one more than the highest model id on disk (1 when none)."""
        max_id = 0
        for filename in self._model_filenames():
            try:
                max_id = max(max_id, self._parse_model_id(filename))
            except (ValueError, IndexError):
                continue
        return max_id + 1

    def delete_model(self, model_id: int) -> bool:
        """Delete the file for ``model_id``.

        Returns:
            True when the file existed and was removed, False otherwise.
        """
        filename = self._model_filename(model_id)
        filepath = os.path.join(self.models_dir, filename)
        if not os.path.exists(filepath):
            return False
        try:
            os.remove(filepath)
        except OSError as e:
            print(f"Error deleting model {model_id}: {e}")
            return False
        print(f"Deleted model: {filename}")
        return True

    def clear_all_models(self) -> int:
        """Delete every model file in ``models_dir``.

        Returns:
            The number of files removed (0 when the directory is missing).
        """
        if not os.path.exists(self.models_dir):
            return 0

        deleted_count = 0
        for filename in self._model_filenames():
            try:
                os.remove(os.path.join(self.models_dir, filename))
                deleted_count += 1
            except OSError as e:
                print(f"Error deleting {filename}: {e}")

        print(f"Deleted {deleted_count} model files")
        return deleted_count


def similarity_score(text1: str, text2: str) -> float:
    """Return a similarity score in [0.0, 1.0] for two texts.

    The texts are lower-cased and stripped. Identical texts score 1.0 and an
    empty argument scores 0.0. Otherwise the score is a weighted blend of
    word overlap (Jaccard, 0.4), word-level semantic similarity (0.3),
    character trigram overlap (0.2) and character LCS similarity (0.1),
    scaled by a penalty of at most 50% for differing word counts.
    """
    if not text1 or not text2:
        return 0.0

    text1_clean = text1.lower().strip()
    text2_clean = text2.lower().strip()
    if text1_clean == text2_clean:
        return 1.0

    # Character-level similarity tolerates typos and small variations.
    char_sim = _character_similarity(text1_clean, text2_clean)

    tokens1 = text1_clean.split()
    tokens2 = text2_clean.split()
    words1 = set(tokens1)
    words2 = set(tokens2)
    if not words1 or not words2:
        return char_sim * 0.3

    union = len(words1 | words2)
    jaccard = len(words1 & words2) / union if union > 0 else 0.0
    semantic_sim = _semantic_word_similarity(words1, words2)
    ngram_sim = _ngram_similarity(text1_clean, text2_clean)

    # Texts with very different word counts are penalised, capped at 50%.
    len1, len2 = len(tokens1), len(tokens2)
    length_penalty = 1.0 - min(abs(len1 - len2) / max(len1, len2, 1), 0.5)

    final_score = (
        jaccard * 0.4 +
        semantic_sim * 0.3 +
        ngram_sim * 0.2 +
        char_sim * 0.1
    ) * length_penalty
    return min(final_score, 1.0)


def _character_similarity(text1: str, text2: str) -> float:
    """Return ``2 * LCS / (len(text1) + len(text2))`` for the two strings.

    LCS is the length of the longest common subsequence. Returns 0.0 when
    either string is empty.
    """
    if not text1 or not text2:
        return 0.0

    # Only the previous row of the LCS table is ever read, so two rows are
    # enough instead of the full (len1 + 1) x (len2 + 1) matrix.
    previous = [0] * (len(text2) + 1)
    for char1 in text1:
        current = [0]
        for j, char2 in enumerate(text2, start=1):
            if char1 == char2:
                current.append(previous[j - 1] + 1)
            else:
                current.append(max(previous[j], current[j - 1]))
        previous = current

    lcs_length = previous[len(text2)]
    return (2.0 * lcs_length) / (len(text1) + len(text2))


def _semantic_word_similarity(words1: set, words2: set) -> float:
    """Return the average pairwise similarity between two sets of words.

    Each pair scores 1 for equality. Otherwise it scores 0.8 when both words
    are in the same synonym group, or 0.9 when one word is the group key and
    the other a member, plus 0.6 when one word contains the other or 0.4 for
    a shared three-letter prefix (both words longer than three characters).
    Returns 0.0 when either set is empty.
    """
    if not words1 or not words2:
        return 0.0

    semantic_matches = 0
    total_comparisons = 0

    for word1 in words1:
        for word2 in words2:
            total_comparisons += 1

            if word1 == word2:
                semantic_matches += 1
                continue

            for key, synonym_set in _SYNONYMS.items():
                if word1 in synonym_set and word2 in synonym_set:
                    semantic_matches += 0.8
                    break
                if (word1 == key and word2 in synonym_set) or (
                    word2 == key and word1 in synonym_set
                ):
                    semantic_matches += 0.9
                    break

            # NOTE(review): this check also runs after a synonym hit, so one
            # pair can contribute more than 1.0 (e.g. 'mini'/'minute' gets
            # 0.8 + 0.4). Kept as-is so existing scores do not shift.
            if len(word1) > 3 and len(word2) > 3:
                if word1 in word2 or word2 in word1:
                    semantic_matches += 0.6
                elif word1[:3] == word2[:3]:
                    semantic_matches += 0.4

    return semantic_matches / total_comparisons if total_comparisons > 0 else 0.0


def _ngram_similarity(text1: str, text2: str, n: int = 3) -> float:
    """Return the Jaccard similarity of the character n-gram sets of two texts.

    Returns 0.0 when either text is shorter than ``n``.
    """
    if len(text1) < n or len(text2) < n:
        return 0.0

    ngrams1 = {text1[i:i + n] for i in range(len(text1) - n + 1)}
    ngrams2 = {text2[i:i + n] for i in range(len(text2) - n + 1)}
    if not ngrams1 or not ngrams2:
        return 0.0

    union = len(ngrams1 | ngrams2)
    return len(ngrams1 & ngrams2) / union if union > 0 else 0.0


def calculate_confidence_score(patterns: List[str], responses: List[str]) -> float:
    """Return a confidence value derived from the number of patterns.

    The value is ``len(patterns) / 10`` clamped to [0.1, 0.9]. It is 0.1 when
    either list is empty or the two lists differ in length.
    """
    if not patterns or not responses or len(patterns) != len(responses):
        return 0.1

    base_confidence = min(0.9, len(patterns) / 10.0)
    return max(0.1, min(1.0, base_confidence))


if __name__ == "__main__":
    # Smoke test: save one model into the default directory and load it back.
    manager = FeatherManager()

    test_model = {
        'patterns': ['hello', 'hi', 'hey'],
        'responses': ['Hello! How can I help you?', 'Hi there!', "Hey! What's up?"],
        'weights': [1.0, 0.9, 0.8],
        'confidence': 0.8,
        'grammar_rules': ['capitalize_first_word', 'end_with_punctuation'],
        'keywords': ['greeting', 'hello', 'hi'],
        'training_samples': 150,
    }

    model_id = manager.get_next_model_id()
    manager.save_mini_model(test_model, model_id)
    loaded_model = manager.load_mini_model(model_id)

    print(f"Original model: {test_model}")
    print(f"Loaded model: {loaded_model}")
    print(f"Models count: {manager.get_model_count()}")