# AgGPT18 / chat.py
# (uploaded by AGofficial, commit e0c0586, "Upload 8 files")
import os
import re
import random
from typing import List, Dict, Tuple, Any
from collections import defaultdict
import math
from feather import FeatherManager, similarity_score
from train import GrammarRules, PatternExtractor
class ResponseGenerator:
    """Generate chat responses by scoring and blending "feather" mini-models.

    Each mini-model is a dict expected to carry 'patterns', 'responses',
    'keywords', 'confidence', and 'training_samples' keys (all accessed via
    ``.get`` with defaults).  A response is produced by ranking models
    against the user input, pulling candidate responses from the best ones,
    and selecting a final answer by weighted random choice.
    """

    def __init__(self, feather_manager: FeatherManager):
        self.feather_manager = feather_manager
        self.pattern_extractor = PatternExtractor()
        # Instantiated but not used in this class — presumably reserved for
        # grammar post-processing elsewhere; TODO confirm before removing.
        self.grammar_rules = GrammarRules()
        self.models: List[Dict[str, Any]] = []
        # Rolling (user_input, response) history used for context scoring.
        self.context_window: List[Tuple[str, str]] = []
        self.max_context_length = 10

    def load_models(self) -> bool:
        """Load all trained mini-models; return False when none exist."""
        print("Loading mini-models...")
        self.models = self.feather_manager.load_all_models()
        print(f"Loaded {len(self.models)} mini-models")
        if not self.models:
            print("No trained models found! Please run train.py first.")
            return False
        return True

    def calculate_model_scores(self, user_input: str) -> List[Tuple[Dict[str, Any], float]]:
        """Score every loaded model against *user_input*, best-first.

        The score combines: summed pattern similarity, keyword Jaccard
        overlap (double-weighted), the model's own confidence (multiplier),
        a capped training-volume bonus, and a conversation-context bonus.
        """
        if not self.models:
            return []
        input_pattern = self.pattern_extractor.create_pattern(user_input)
        input_keywords = set(self.pattern_extractor.extract_keywords(user_input))
        model_scores: List[Tuple[Dict[str, Any], float]] = []
        for model in self.models:
            score = 0.0
            # Accumulate similarity of the input against every stored pattern.
            for pattern in model.get('patterns', []):
                score += self.pattern_extractor.calculate_pattern_similarity(input_pattern, pattern)
            # Keyword Jaccard similarity, weighted twice as heavily.
            model_keywords = set(model.get('keywords', []))
            if model_keywords and input_keywords:
                keyword_total = len(input_keywords | model_keywords)
                keyword_overlap = len(input_keywords & model_keywords)
                if keyword_total > 0:
                    score += (keyword_overlap / keyword_total) * 2
            # Scale by model confidence, then add small additive bonuses.
            score *= model.get('confidence', 0.5)
            training_samples = model.get('training_samples', 1)
            score += min(0.2, training_samples / 100)  # training bonus capped at 0.2
            score += self._calculate_context_bonus(user_input, model)
            model_scores.append((model, score))
        model_scores.sort(key=lambda x: x[1], reverse=True)
        return model_scores

    def _calculate_context_bonus(self, user_input: str, model: Dict[str, Any]) -> float:
        """Small bonus (capped at 0.3) for models whose patterns resemble
        the last three user inputs in the context window."""
        if not self.context_window:
            return 0.0
        context_bonus = 0.0
        for prev_input, prev_response in self.context_window[-3:]:
            for pattern in model.get('patterns', [])[:5]:
                context_bonus += similarity_score(prev_input, pattern.strip()) * 0.1
        return min(context_bonus, 0.3)

    def select_top_models(self, model_scores: List[Tuple[Dict[str, Any], float]], top_k: int = 5) -> List[Tuple[Dict[str, Any], float]]:
        """Return up to *top_k* models scoring above 0.01; when nothing
        qualifies, fall back to a random sample of up to three models."""
        valid_models = [(model, score) for model, score in model_scores if score > 0.01]
        if not valid_models:
            valid_models = random.sample(model_scores, min(3, len(model_scores)))
        return valid_models[:top_k]

    def generate_responses_from_models(self, user_input: str, top_models: List[Tuple[Dict[str, Any], float]]) -> List[Tuple[str, float]]:
        """Collect weighted (response, weight) candidates from *top_models*.

        Patterns and responses are aligned by index.  Responses whose
        pattern similarity exceeds 0.1 are kept (best three per model);
        otherwise one random response serves as a low-confidence fallback.
        """
        responses: List[Tuple[str, float]] = []
        input_pattern = self.pattern_extractor.create_pattern(user_input)
        for model, model_score in top_models:
            patterns = model.get('patterns', [])
            model_responses_list = model.get('responses', [])
            if not patterns or not model_responses_list:
                continue
            best_matches = []
            for i, pattern in enumerate(patterns):
                if i < len(model_responses_list):
                    sim = self.pattern_extractor.calculate_pattern_similarity(input_pattern, pattern)
                    if sim > 0.1:
                        best_matches.append((model_responses_list[i], sim))
            best_matches.sort(key=lambda x: x[1], reverse=True)
            selected_responses = best_matches[:3] if best_matches else [(random.choice(model_responses_list), 0.1)]
            for response, pattern_sim in selected_responses:
                # Blend the model's overall score with this pattern's fit.
                responses.append((response, model_score * (0.7 + pattern_sim * 0.3)))
        return responses

    def combine_responses(self, responses: List[Tuple[str, float]]) -> str:
        """Pick one response from weighted candidates, appending '<eos>'.

        Near-duplicates (same lowercased first three words) collapse to the
        highest-weighted member of each group; the final answer is a
        roulette-wheel draw across groups proportional to weight.
        """
        if not responses:
            return "I'm not sure how to respond to that."
        # Drop negligible candidates, but never leave the pool empty.
        filtered_responses = [(resp, weight) for resp, weight in responses if weight > 0.05]
        if not filtered_responses:
            filtered_responses = responses[:1]
        # Group by the first three words to collapse near-duplicates.
        response_groups: Dict[str, List[Tuple[str, float]]] = defaultdict(list)
        for response, weight in filtered_responses:
            key = ' '.join(response.split()[:3]).lower()
            response_groups[key].append((response, weight))
        best_responses = [max(group, key=lambda x: x[1]) for group in response_groups.values()]
        selected_response = best_responses[0][0]
        if len(best_responses) > 1:
            total_weight = sum(weight for _, weight in best_responses)
            if total_weight > 0:
                # Roulette-wheel selection proportional to normalized weight.
                rand_val = random.random()
                cumsum = 0.0
                for response, weight in best_responses:
                    cumsum += weight / total_weight
                    if rand_val <= cumsum:
                        selected_response = response
                        break
        final_response = selected_response
        if not final_response.endswith('<eos>'):
            final_response += ' <eos>'
        return final_response

    def generate_response(self, user_input: str) -> str:
        """Full pipeline: score models, gather candidates, combine, and
        record the turn in the bounded context window."""
        if not user_input.strip():
            return "Please say something! <eos>"
        model_scores = self.calculate_model_scores(user_input)
        if not model_scores:
            return "I need to learn more before I can respond properly. <eos>"
        top_models = self.select_top_models(model_scores, top_k=5)
        responses = self.generate_responses_from_models(user_input, top_models)
        final_response = self.combine_responses(responses)
        self.context_window.append((user_input, final_response))
        if len(self.context_window) > self.max_context_length:
            self.context_window.pop(0)  # evict the oldest turn
        return final_response

    def get_model_statistics(self) -> Dict[str, Any]:
        """Aggregate counts/averages over all loaded mini-models."""
        if not self.models:
            return {"total_models": 0}
        total_patterns = sum(len(model.get('patterns', [])) for model in self.models)
        total_responses = sum(len(model.get('responses', [])) for model in self.models)
        avg_confidence = sum(model.get('confidence', 0) for model in self.models) / len(self.models)
        total_training_samples = sum(model.get('training_samples', 0) for model in self.models)
        return {
            "total_models": len(self.models),
            "total_patterns": total_patterns,
            "total_responses": total_responses,
            "average_confidence": avg_confidence,
            "total_training_samples": total_training_samples
        }
class AgGPTChat:
    """Console front-end that drives a ResponseGenerator over feather models."""

    def __init__(self, models_dir: str = "models"):
        self.feather_manager = FeatherManager(models_dir)
        self.response_generator = ResponseGenerator(self.feather_manager)
        # Session transcript: list of {'user': ..., 'assistant': ...} dicts.
        self.conversation_history = []

    def initialize(self) -> bool:
        """Print the startup banner, load models, and report statistics.

        Returns True when at least one mini-model loaded, else False.
        """
        print("AgGPT-18 Scalable Feather Architecture Chat")
        print("=" * 50)
        success = self.response_generator.load_models()
        if success:
            stats = self.response_generator.get_model_statistics()
            print("Model Statistics:")
            print(f" Mini-models loaded: {stats['total_models']}")
            print(f" Total patterns: {stats['total_patterns']}")
            print(f" Total responses: {stats['total_responses']}")
            print(f" Average confidence: {stats['average_confidence']:.3f}")
            print(f" Training samples: {stats['total_training_samples']}")
            print("=" * 50)
            print("Chat initialized! Type 'quit' to exit.")
            print("Large context window active - I'll remember our conversation!")
            print()
        return success

    def chat_loop(self):
        """Run the interactive REPL until the user quits or interrupts.

        Special commands (case-insensitive): quit/exit/bye/goodbye to end,
        stats/statistics to print counters, clear/reset to wipe context.
        """
        if not self.initialize():
            return
        while True:
            try:
                user_input = input("You: ").strip()
                if not user_input:
                    continue
                if user_input.lower() in ['quit', 'exit', 'bye', 'goodbye']:
                    print("AgGPT: Goodbye! Thanks for chatting with me! <eos>")
                    break
                if user_input.lower() in ['stats', 'statistics']:
                    stats = self.response_generator.get_model_statistics()
                    print("Current Statistics:")
                    for key, value in stats.items():
                        print(f" {key}: {value}")
                    continue
                if user_input.lower() in ['clear', 'reset']:
                    self.response_generator.context_window = []
                    print("Context cleared!")
                    continue
                print("AgGPT: ", end="", flush=True)
                response = self.response_generator.generate_response(user_input)
                # Strip the internal end-of-sequence marker before display.
                display_response = response.replace(' <eos>', '').replace('<eos>', '')
                print(display_response)
                print()
                self.conversation_history.append({
                    'user': user_input,
                    'assistant': display_response
                })
            except KeyboardInterrupt:
                print("\n\nAgGPT: Chat interrupted. Goodbye!")
                break
            except Exception as e:
                # Broad catch is deliberate here: one bad turn should not
                # kill the whole interactive session.
                print(f"\nError: {e}")
                print("Let me try again...")
                continue

    def batch_test(self, test_inputs: List[str]):
        """Run each prompt in *test_inputs* through the generator, printing results."""
        if not self.initialize():
            return
        print("Running batch test...")
        print("=" * 50)
        for i, test_input in enumerate(test_inputs, 1):
            print(f"Test {i}: {test_input}")
            response = self.response_generator.generate_response(test_input)
            display_response = response.replace(' <eos>', '').replace('<eos>', '')
            print(f"Response: {display_response}")
            print("-" * 30)
def main():
    """Entry point: run the scripted batch test when invoked as
    ``chat.py test``; otherwise start the interactive chat loop."""
    import sys

    chat = AgGPTChat()
    if len(sys.argv) > 1 and sys.argv[1] == "test":
        sample_prompts = [
            "hi",
            "hello there",
            "how are you?",
            "what's your favorite color?",
            "tell me a joke",
            "thank you",
            "goodbye",
        ]
        chat.batch_test(sample_prompts)
    else:
        chat.chat_loop()


if __name__ == "__main__":
    main()