AgGPT17 / feather.py
AGofficial's picture
Upload 4 files
8d67672 verified
import pandas as pd
import os
from typing import Dict, List, Any, Optional
import math
class FeatherManager:
def __init__(self, models_dir: str = "models"):
self.models_dir = models_dir
os.makedirs(models_dir, exist_ok=True)
def save_mini_model(self, model_data: Dict[str, Any], model_id: int) -> str:
filename = f"AgGPT_Expert_{model_id:04d}.feather"
filepath = os.path.join(self.models_dir, filename)
patterns = model_data.get('patterns', [])
responses = model_data.get('responses', [])
if not patterns or not responses:
print(f"Warning: Model {model_id} has empty patterns or responses")
patterns = patterns or ['hello']
responses = responses or ['Hello!']
df_data = {
'patterns': [str(pattern) for pattern in patterns],
'responses': [str(response) for response in responses],
'weights': model_data.get('weights', [1.0] * len(patterns)),
'confidence': [model_data.get('confidence', 0.5)] * len(patterns),
'grammar_rules': [str(rule) for rule in model_data.get('grammar_rules', [])] or ['none'],
'keywords': [' '.join(model_data.get('keywords', []))] * len(patterns),
'training_samples': [model_data.get('training_samples', 0)] * len(patterns)
}
max_len = max(len(v) if isinstance(v, list) else 1 for v in df_data.values())
for key, value in df_data.items():
if isinstance(value, list):
while len(value) < max_len:
value.append(value[-1] if value else '')
df = pd.DataFrame(df_data)
df.to_feather(filepath)
print(f"Saved mini-model: {filename}")
return filepath
def load_mini_model(self, model_id: int) -> Optional[Dict[str, Any]]:
filename = f"AgGPT_Expert_{model_id:04d}.feather"
filepath = os.path.join(self.models_dir, filename)
if not os.path.exists(filepath):
return None
try:
df = pd.read_feather(filepath)
model_data = {
'patterns': [p for p in df['patterns'].tolist() if p],
'responses': [r for r in df['responses'].tolist() if r],
'weights': df['weights'].tolist(),
'confidence': df['confidence'].iloc[0] if len(df) > 0 else 0.5,
'grammar_rules': [rule for rule in df['grammar_rules'].tolist() if rule],
'keywords': df['keywords'].iloc[0].split() if len(df) > 0 and df['keywords'].iloc[0] else [],
'training_samples': df['training_samples'].iloc[0] if len(df) > 0 else 0,
'model_id': model_id
}
return model_data
except Exception as e:
print(f"Error loading model {model_id}: {e}")
return None
def load_all_models(self) -> List[Dict[str, Any]]:
models = []
if not os.path.exists(self.models_dir):
return models
for filename in os.listdir(self.models_dir):
if filename.startswith("AgGPT_Expert_") and filename.endswith(".feather"):
try:
model_id = int(filename.split("_")[2].split(".")[0])
model = self.load_mini_model(model_id)
if model:
models.append(model)
except (ValueError, IndexError):
print(f"Warning: Invalid model filename format: {filename}")
continue
return models
def get_model_count(self) -> int:
if not os.path.exists(self.models_dir):
return 0
count = 0
for filename in os.listdir(self.models_dir):
if filename.startswith("AgGPT_Expert_") and filename.endswith(".feather"):
count += 1
return count
def get_next_model_id(self) -> int:
if not os.path.exists(self.models_dir):
return 1
max_id = 0
for filename in os.listdir(self.models_dir):
if filename.startswith("AgGPT_Expert_") and filename.endswith(".feather"):
try:
model_id = int(filename.split("_")[2].split(".")[0])
max_id = max(max_id, model_id)
except (ValueError, IndexError):
continue
return max_id + 1
def delete_model(self, model_id: int) -> bool:
filename = f"AgGPT_Expert_{model_id:04d}.feather"
filepath = os.path.join(self.models_dir, filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
print(f"Deleted model: {filename}")
return True
except Exception as e:
print(f"Error deleting model {model_id}: {e}")
return False
return False
def clear_all_models(self) -> int:
if not os.path.exists(self.models_dir):
return 0
deleted_count = 0
for filename in os.listdir(self.models_dir):
if filename.startswith("AgGPT_Expert_") and filename.endswith(".feather"):
try:
os.remove(os.path.join(self.models_dir, filename))
deleted_count += 1
except Exception as e:
print(f"Error deleting {filename}: {e}")
print(f"Deleted {deleted_count} model files")
return deleted_count
def similarity_score(text1: str, text2: str) -> float:
if not text1 or not text2:
return 0.0
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return 0.0
intersection = len(words1.intersection(words2))
union = len(words1.union(words2))
return intersection / union if union > 0 else 0.0
def calculate_confidence_score(patterns: List[str], responses: List[str]) -> float:
if not patterns or not responses or len(patterns) != len(responses):
return 0.1
base_confidence = min(0.9, len(patterns) / 10.0)
return max(0.1, min(1.0, base_confidence))
if __name__ == "__main__":
manager = FeatherManager()
test_model = {
'patterns': ['hello', 'hi', 'hey'],
'responses': ['Hello! How can I help you?', 'Hi there!', 'Hey! What\'s up?'],
'weights': [1.0, 0.9, 0.8],
'confidence': 0.8,
'grammar_rules': ['capitalize_first_word', 'end_with_punctuation'],
'keywords': ['greeting', 'hello', 'hi'],
'training_samples': 150
}
model_id = manager.get_next_model_id()
manager.save_mini_model(test_model, model_id)
loaded_model = manager.load_mini_model(model_id)
print(f"Original model: {test_model}")
print(f"Loaded model: {loaded_model}")
print(f"Models count: {manager.get_model_count()}")