# AgGPT19 / feather.py
# AGofficial's picture
# Upload 6 files
# cd5b50b verified
import ast
import math
import os
from typing import Dict, List, Any, Optional

import pandas as pd
class FeatherManager:
    """Store and retrieve AgGPT "mini-models" as Feather files in one directory.

    Every model is written to ``<models_dir>/AgGPT_Expert_<id>.feather`` with
    the id zero-padded to four digits.  Feather holds a single table, so each
    file has four columns -- ``patterns``, ``responses``, ``weights`` and
    ``response_templates`` -- in which row 0 carries the model-level metadata
    and every later row is one pattern/response pair.  The metadata row is:

    * ``patterns``:            ``"__METADATA__<model_id>"``
    * ``responses``:           ``str(knowledge_base)``
    * ``weights``:             ``confidence``
    * ``response_templates``:  ``"<keywords>|<training_samples>|"
      "<semantic_categories>|<grammar_rules>"``

    Structured values are stored as their ``str()`` form and read back with
    ``ast.literal_eval``; file contents are never executed.
    """

    _FILE_PREFIX = "AgGPT_Expert_"
    _FILE_SUFFIX = ".feather"
    _METADATA_MARKER = "__METADATA__"

    def __init__(self, models_dir: str = "models"):
        """Remember ``models_dir`` and create it if it does not exist yet."""
        self.models_dir = models_dir
        os.makedirs(models_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _model_path(self, model_id: int):
        """Return ``(filename, filepath)`` for the model with ``model_id``."""
        filename = f"{self._FILE_PREFIX}{model_id:04d}{self._FILE_SUFFIX}"
        return filename, os.path.join(self.models_dir, filename)

    def _model_filenames(self) -> List[str]:
        """Return the model file names in ``models_dir`` in sorted order."""
        if not os.path.isdir(self.models_dir):
            return []
        return sorted(
            name
            for name in os.listdir(self.models_dir)
            if name.startswith(self._FILE_PREFIX)
            and name.endswith(self._FILE_SUFFIX)
        )

    @staticmethod
    def _parse_model_id(filename: str) -> int:
        """Extract the numeric id from a model file name.

        Raises ``ValueError`` or ``IndexError`` when the name does not follow
        the ``AgGPT_Expert_<id>.feather`` layout.
        """
        return int(filename.split("_")[2].split(".")[0])

    @staticmethod
    def _parse_literal(text: Any, default: Any) -> Any:
        """Parse the ``str()`` form of a Python literal, or return ``default``.

        ``ast.literal_eval`` only accepts literals (dicts, lists, strings,
        numbers, ...), so a tampered model file cannot run code the way the
        previous ``eval`` call allowed.
        """
        if not isinstance(text, str) or not text.strip():
            return default
        try:
            return ast.literal_eval(text)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return default

    @staticmethod
    def _fit_length(values: Any, length: int, fill: Any) -> list:
        """Return a *copy* of ``values`` truncated or padded to ``length``.

        Working on a copy keeps the caller's lists untouched.
        """
        fitted = ([] if values is None else list(values))[:length]
        fitted.extend([fill] * (length - len(fitted)))
        return fitted

    def _decode_current_format(self, df: pd.DataFrame, model_id: int) -> Dict[str, Any]:
        """Rebuild a model dict from a table whose first row is the metadata row."""
        # maxsplit=3 keeps a '|' inside the trailing grammar_rules field intact.
        # A '|' inside keywords or semantic_categories still shifts the fields,
        # which is a limitation of the on-disk format itself.
        packed = df['response_templates'].iloc[0].split('|', 3)
        raw_samples = packed[1] if len(packed) > 1 else ''

        model_data = {
            'model_id': model_id,
            'confidence': float(df['weights'].iloc[0]),
            'keywords': packed[0].split(),
            'training_samples': int(raw_samples) if raw_samples.isdigit() else 0,
            'knowledge_base': self._parse_literal(df['responses'].iloc[0], {}),
            'semantic_categories': (
                self._parse_literal(packed[2], {}) if len(packed) > 2 else {}
            ),
            'grammar_rules': (
                self._parse_literal(packed[3], []) if len(packed) > 3 else []
            ),
        }

        # Everything after the metadata row is pattern data.
        pattern_df = df.iloc[1:]
        # NOTE(review): empty strings are dropped from patterns and responses
        # independently, so the two lists (and weights) can lose alignment when
        # a stored value is empty.  Kept as-is because callers may rely on it.
        model_data['patterns'] = [p for p in pattern_df['patterns'].tolist() if p]
        model_data['responses'] = [r for r in pattern_df['responses'].tolist() if r]
        model_data['weights'] = pattern_df['weights'].tolist()
        model_data['response_templates'] = [
            self._parse_literal(template_str, {})
            for template_str in pattern_df['response_templates'].tolist()
        ]
        return model_data

    @staticmethod
    def _decode_legacy_format(df: pd.DataFrame, model_id: int) -> Dict[str, Any]:
        """Rebuild a model dict from the older layout without a metadata row.

        In that layout the model-level values, when present, are read from the
        first row of the ``confidence``, ``keywords`` and ``training_samples``
        columns.
        """
        columns = df.columns
        return {
            'patterns': [p for p in df['patterns'].tolist() if p],
            'responses': [r for r in df['responses'].tolist() if r],
            'weights': df['weights'].tolist(),
            'confidence': df['confidence'].iloc[0] if 'confidence' in columns else 0.5,
            'keywords': df['keywords'].iloc[0].split() if 'keywords' in columns else [],
            'training_samples': (
                df['training_samples'].iloc[0] if 'training_samples' in columns else 0
            ),
            'model_id': model_id,
            'knowledge_base': {},
            'semantic_categories': {},
            'response_templates': [],
            'grammar_rules': []
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def save_mini_model(self, model_data: Dict[str, Any], model_id: int) -> str:
        """Write ``model_data`` to its Feather file and return the file path.

        ``model_data`` may contain ``patterns``, ``responses``, ``weights``,
        ``response_templates``, ``confidence``, ``keywords``,
        ``training_samples``, ``knowledge_base``, ``semantic_categories`` and
        ``grammar_rules``; missing keys fall back to defaults.  If patterns or
        responses are empty a placeholder greeting is stored instead.

        The number of stored rows equals the number of patterns: the other
        per-pattern lists are padded (``''``, ``1.0``, ``{}``) or truncated to
        match.  The caller's lists are not modified.
        """
        filename, filepath = self._model_path(model_id)

        patterns = model_data.get('patterns', [])
        responses = model_data.get('responses', [])
        if not patterns or not responses:
            print(f"Warning: Model {model_id} has empty patterns or responses")
            patterns = patterns or ['hello']
            responses = responses or ['Hello!']

        row_count = len(patterns)
        if len(responses) != row_count:
            print(
                f"Warning: Model {model_id} has {row_count} patterns but "
                f"{len(responses)} responses; aligning responses to patterns"
            )

        pattern_col = [str(pattern) for pattern in patterns]
        response_col = self._fit_length(
            [str(response) for response in responses], row_count, ''
        )
        weight_col = self._fit_length(model_data.get('weights'), row_count, 1.0)
        template_col = [
            str(template)
            for template in self._fit_length(
                model_data.get('response_templates'), row_count, {}
            )
        ]

        # Model-level metadata is stored once, packed into row 0.
        keywords = ' '.join(model_data.get('keywords', []))
        training_samples = model_data.get('training_samples', 0)
        semantic_categories = str(model_data.get('semantic_categories', {}))
        grammar_rules = str(model_data.get('grammar_rules', []))
        packed_metadata = (
            f"{keywords}|{training_samples}|{semantic_categories}|{grammar_rules}"
        )

        df_combined = pd.DataFrame({
            'patterns': [f"{self._METADATA_MARKER}{model_id}"] + pattern_col,
            'responses': [str(model_data.get('knowledge_base', {}))] + response_col,
            'weights': [float(model_data.get('confidence', 0.5))] + weight_col,
            'response_templates': [packed_metadata] + template_col,
        })
        df_combined.to_feather(filepath)

        print(f"Saved optimized mini-model: {filename} ({len(patterns)} patterns + metadata)")
        return filepath

    def load_mini_model(self, model_id: int) -> Optional[Dict[str, Any]]:
        """Load one model by id.

        Returns the model dict, or ``None`` when the file is missing, empty,
        or cannot be read (the error is printed in that case).
        """
        _, filepath = self._model_path(model_id)
        if not os.path.exists(filepath):
            return None

        try:
            df = pd.read_feather(filepath)
            if len(df) == 0:
                return None
            if df['patterns'].iloc[0].startswith(self._METADATA_MARKER):
                return self._decode_current_format(df, model_id)
            # Fallback: older files without a metadata row.
            return self._decode_legacy_format(df, model_id)
        except Exception as e:
            # Best-effort loader: one unreadable file must not stop the caller.
            print(f"Error loading optimized model {model_id}: {e}")
            return None

    def load_all_models(self) -> List[Dict[str, Any]]:
        """Load every readable model in ``models_dir``, ordered by file name."""
        models = []
        for filename in self._model_filenames():
            try:
                model_id = self._parse_model_id(filename)
            except (ValueError, IndexError):
                print(f"Warning: Invalid model filename format: {filename}")
                continue
            model = self.load_mini_model(model_id)
            if model:
                models.append(model)
        return models

    def get_model_count(self) -> int:
        """Return how many model files exist in ``models_dir``."""
        return len(self._model_filenames())

    def get_next_model_id(self) -> int:
        """Return one more than the highest model id on disk (1 if none)."""
        max_id = 0
        for filename in self._model_filenames():
            try:
                max_id = max(max_id, self._parse_model_id(filename))
            except (ValueError, IndexError):
                continue
        return max_id + 1

    def delete_model(self, model_id: int) -> bool:
        """Delete one model file; return ``True`` only if it was removed."""
        filename, filepath = self._model_path(model_id)
        if not os.path.exists(filepath):
            return False
        try:
            os.remove(filepath)
        except OSError as e:
            print(f"Error deleting model {model_id}: {e}")
            return False
        print(f"Deleted model: {filename}")
        return True

    def clear_all_models(self) -> int:
        """Delete every model file and return the number actually removed."""
        deleted_count = 0
        for filename in self._model_filenames():
            try:
                os.remove(os.path.join(self.models_dir, filename))
                deleted_count += 1
            except OSError as e:
                print(f"Error deleting {filename}: {e}")
        print(f"Deleted {deleted_count} model files")
        return deleted_count
def similarity_score(text1: str, text2: str) -> float:
    """Score how alike two texts are.

    Both texts are lower-cased and stripped first.  Empty input yields 0.0
    and identical normalized texts yield 1.0.  Otherwise four measures are
    blended -- word-set Jaccard overlap (0.4), word-level semantic similarity
    (0.3), character n-gram overlap (0.2) and character-level similarity
    (0.1) -- and the blend is scaled down by up to half when the word counts
    differ.  The result is capped at 1.0.
    """
    if not (text1 and text2):
        return 0.0

    left = text1.lower().strip()
    right = text2.lower().strip()
    if left == right:
        return 1.0

    # Character-level measure: tolerant of typos and small variations.
    char_sim = _character_similarity(left, right)

    left_tokens = left.split()
    right_tokens = right.split()
    left_words = set(left_tokens)
    right_words = set(right_tokens)
    if not left_words or not right_words:
        # One side has no words at all; only the character measure applies.
        return char_sim * 0.3

    # Jaccard overlap of the two vocabularies.
    shared = left_words & right_words
    combined = left_words | right_words
    jaccard = len(shared) / len(combined) if len(combined) > 0 else 0.0

    semantic_sim = _semantic_word_similarity(left_words, right_words)
    ngram_sim = _ngram_similarity(left, right)

    # Relative difference in word count, limited to a 50% reduction.
    count_left = len(left_tokens)
    count_right = len(right_tokens)
    relative_gap = abs(count_left - count_right) / max(count_left, count_right, 1)
    length_penalty = 1.0 - min(relative_gap, 0.5)

    blended = (
        jaccard * 0.4
        + semantic_sim * 0.3
        + ngram_sim * 0.2
        + char_sim * 0.1
    )
    return min(blended * length_penalty, 1.0)
def _character_similarity(text1: str, text2: str) -> float:
    """Return ``2 * LCS / (len(text1) + len(text2))`` for the two strings.

    LCS is the length of the longest common subsequence of characters, so the
    result is 1.0 for identical strings and 0.0 when no character is shared
    or either string is empty.
    """
    if not text1 or not text2:
        return 0.0

    # Only the previous row of the LCS table is ever read, so keep two rows.
    previous_row = [0] * (len(text2) + 1)
    for char1 in text1:
        current_row = [0]
        for column, char2 in enumerate(text2, start=1):
            if char1 == char2:
                current_row.append(previous_row[column - 1] + 1)
            else:
                current_row.append(max(previous_row[column], current_row[column - 1]))
        previous_row = current_row

    lcs_length = previous_row[-1]
    combined_length = len(text1) + len(text2)
    if combined_length > 0:
        return (2.0 * lcs_length) / combined_length
    return 0.0
def _semantic_word_similarity(words1: set, words2: set) -> float:
    """Return the average pairwise similarity between two sets of words.

    Every word of ``words1`` is compared with every word of ``words2`` and
    each pair contributes exactly one score, the first rule that applies:

    * 1.0 -- the words are equal
    * 0.9 -- one word is a synonym-group key and the other is in that group
    * 0.8 -- both words are in the same synonym group
    * 0.6 -- both are longer than 3 characters and one contains the other
    * 0.4 -- both are longer than 3 characters and share a 3-character prefix
    * 0.0 -- otherwise

    The pair scores are averaged over ``len(words1) * len(words2)``, so the
    result always lies in [0.0, 1.0].  Returns 0.0 if either set is empty.
    """
    if not words1 or not words2:
        return 0.0

    # NOTE: multi-word entries such as 'good morning' can only match when a
    # caller passes them as a single set element.
    synonyms = {
        'hello': {'hi', 'hey', 'greetings', 'good morning', 'good afternoon'},
        'thanks': {'thank you', 'appreciate', 'grateful'},
        'yes': {'yeah', 'yep', 'sure', 'absolutely', 'definitely'},
        'no': {'nope', 'negative', 'not really'},
        'good': {'great', 'excellent', 'wonderful', 'amazing', 'fantastic'},
        'bad': {'terrible', 'awful', 'horrible', 'poor'},
        'big': {'large', 'huge', 'enormous', 'massive'},
        'small': {'little', 'tiny', 'mini', 'minute'},
    }

    def pair_score(word1: str, word2: str) -> float:
        """Score one word pair; returning early keeps rules from stacking."""
        if word1 == word2:
            return 1.0

        for key, synonym_set in synonyms.items():
            if word1 in synonym_set and word2 in synonym_set:
                return 0.8
            if (word1 == key and word2 in synonym_set) or (
                word2 == key and word1 in synonym_set
            ):
                return 0.9

        # Partial matching is only reached when no synonym rule fired;
        # previously a synonym pair such as 'mini'/'minute' was also given
        # the prefix bonus, pushing a single pair's score above 1.0.
        if len(word1) > 3 and len(word2) > 3:
            if word1 in word2 or word2 in word1:
                return 0.6
            if word1[:3] == word2[:3]:
                return 0.4
        return 0.0

    semantic_matches = sum(
        pair_score(word1, word2) for word1 in words1 for word2 in words2
    )
    total_comparisons = len(words1) * len(words2)
    return semantic_matches / total_comparisons if total_comparisons > 0 else 0.0
def _ngram_similarity(text1: str, text2: str, n: int = 3) -> float:
    """Return the Jaccard overlap of the character n-gram sets of two texts.

    Gives 0.0 when either text is shorter than ``n`` characters.
    """
    if min(len(text1), len(text2)) < n:
        return 0.0

    def grams(text: str) -> set:
        """Collect every contiguous substring of length ``n``."""
        return {text[start:start + n] for start in range(len(text) - n + 1)}

    first = grams(text1)
    second = grams(text2)
    if not (first and second):
        return 0.0

    combined = first | second
    if len(combined) > 0:
        return len(first & second) / len(combined)
    return 0.0
def calculate_confidence_score(patterns: List[str], responses: List[str]) -> float:
    """Derive a confidence value from the number of pattern/response pairs.

    Returns 0.1 when either list is empty or their lengths differ; otherwise
    one tenth of the pattern count, kept within 0.1 and 0.9.
    """
    if not patterns or not responses:
        return 0.1
    if len(patterns) != len(responses):
        return 0.1

    # Ten pairs or more saturate the score at 0.9.
    base_confidence = min(0.9, len(patterns) / 10.0)
    upper_bounded = min(1.0, base_confidence)
    return max(0.1, upper_bounded)
if __name__ == "__main__":
    # Manual smoke test: save one small greeting model under the next free
    # id, load it back, and print both versions plus the file count.
    manager = FeatherManager()

    test_model = dict(
        patterns=['hello', 'hi', 'hey'],
        responses=['Hello! How can I help you?', 'Hi there!', "Hey! What's up?"],
        weights=[1.0, 0.9, 0.8],
        confidence=0.8,
        grammar_rules=['capitalize_first_word', 'end_with_punctuation'],
        keywords=['greeting', 'hello', 'hi'],
        training_samples=150,
    )

    model_id = manager.get_next_model_id()
    manager.save_mini_model(test_model, model_id)
    loaded_model = manager.load_mini_model(model_id)

    print(f"Original model: {test_model}")
    print(f"Loaded model: {loaded_model}")
    print(f"Models count: {manager.get_model_count()}")