"""Chat front-end for the AgGPT-18 scalable "feather" architecture.

Scores every trained mini-model against the user's input, blends the
best-matching responses into a single reply, and runs an interactive
(or batch) chat loop on top of that generator.
"""

import os
import math
import random
import re
import sys
from collections import defaultdict
from typing import Any, Dict, List, Tuple

from feather import FeatherManager, similarity_score
from train import GrammarRules, PatternExtractor


class ResponseGenerator:
    """Turn user input into a reply by consulting many small trained models."""

    def __init__(self, feather_manager: FeatherManager):
        self.feather_manager = feather_manager
        self.pattern_extractor = PatternExtractor()
        self.grammar_rules = GrammarRules()
        self.models: List[Dict[str, Any]] = []
        # Rolling (user_input, response) history consulted for context bonuses.
        self.context_window: List[Tuple[str, str]] = []
        self.max_context_length = 10

    def load_models(self) -> bool:
        """Load all mini-models from disk.

        Returns:
            True when at least one model was loaded, False otherwise.
        """
        print("Loading mini-models...")
        self.models = self.feather_manager.load_all_models()
        print(f"Loaded {len(self.models)} mini-models")
        if not self.models:
            print("No trained models found! Please run train.py first.")
            return False
        return True

    def calculate_model_scores(self, user_input: str) -> List[Tuple[Dict[str, Any], float]]:
        """Score every loaded model against *user_input*.

        The score combines pattern similarity, keyword overlap (Jaccard),
        the model's own confidence, a small training-volume bonus, and a
        conversation-context bonus.

        Returns:
            (model, score) pairs sorted best-first; empty list when no
            models are loaded.
        """
        if not self.models:
            return []

        input_pattern = self.pattern_extractor.create_pattern(user_input)
        input_keywords = set(self.pattern_extractor.extract_keywords(user_input))

        model_scores: List[Tuple[Dict[str, Any], float]] = []
        for model in self.models:
            score = 0.0

            # Accumulate similarity of the input pattern to every stored pattern.
            for pattern in model.get('patterns', []):
                score += self.pattern_extractor.calculate_pattern_similarity(
                    input_pattern, pattern
                )

            # Jaccard keyword overlap, weighted double.
            model_keywords = set(model.get('keywords', []))
            if model_keywords and input_keywords:
                keyword_overlap = len(input_keywords & model_keywords)
                keyword_total = len(input_keywords | model_keywords)
                keyword_score = keyword_overlap / keyword_total if keyword_total > 0 else 0
                score += keyword_score * 2

            # Scale by the model's confidence, then add flat bonuses.
            score *= model.get('confidence', 0.5)
            training_samples = model.get('training_samples', 1)
            score += min(0.2, training_samples / 100)  # capped training bonus
            score += self._calculate_context_bonus(user_input, model)

            model_scores.append((model, score))

        model_scores.sort(key=lambda pair: pair[1], reverse=True)
        return model_scores

    def _calculate_context_bonus(self, user_input: str, model: Dict[str, Any]) -> float:
        """Bonus (capped at 0.3) for models whose patterns resemble recent turns."""
        if not self.context_window:
            return 0.0
        context_bonus = 0.0
        # Only the last 3 exchanges and the first 5 patterns matter.
        for prev_input, _prev_response in self.context_window[-3:]:
            for pattern in model.get('patterns', [])[:5]:
                context_bonus += similarity_score(prev_input, pattern.strip()) * 0.1
        return min(context_bonus, 0.3)

    def select_top_models(
        self,
        model_scores: List[Tuple[Dict[str, Any], float]],
        top_k: int = 5,
    ) -> List[Tuple[Dict[str, Any], float]]:
        """Keep up to *top_k* models scoring above a small threshold.

        Falls back to a random sample of up to 3 models when nothing
        clears the threshold, so the chat can always say *something*.
        """
        valid_models = [(model, score) for model, score in model_scores if score > 0.01]
        if not valid_models:
            valid_models = random.sample(model_scores, min(3, len(model_scores)))
        return valid_models[:top_k]

    def generate_responses_from_models(
        self,
        user_input: str,
        top_models: List[Tuple[Dict[str, Any], float]],
    ) -> List[Tuple[str, float]]:
        """Collect candidate (response, weight) pairs from the selected models.

        Each model contributes up to 3 responses whose stored pattern best
        matches the input pattern; weight blends the model's score with the
        pattern similarity. A random response is used as a last resort.
        """
        responses: List[Tuple[str, float]] = []
        input_pattern = self.pattern_extractor.create_pattern(user_input)

        for model, model_score in top_models:
            patterns = model.get('patterns', [])
            model_responses_list = model.get('responses', [])
            if not patterns or not model_responses_list:
                continue

            # Pair each pattern with its same-index response, ranked by similarity.
            best_matches: List[Tuple[str, float]] = []
            for i, pattern in enumerate(patterns):
                if i < len(model_responses_list):
                    sim = self.pattern_extractor.calculate_pattern_similarity(
                        input_pattern, pattern
                    )
                    if sim > 0.1:
                        best_matches.append((model_responses_list[i], sim))
            best_matches.sort(key=lambda pair: pair[1], reverse=True)

            if best_matches:
                selected_responses = best_matches[:3]
            else:
                # No pattern cleared the bar: fall back to a random response.
                selected_responses = [(random.choice(model_responses_list), 0.1)]

            for response, pattern_sim in selected_responses:
                weight = model_score * (0.7 + pattern_sim * 0.3)
                responses.append((response, weight))

        return responses

    def combine_responses(self, responses: List[Tuple[str, float]]) -> str:
        """Pick one reply from weighted candidates via roulette-wheel sampling.

        Candidates are de-duplicated by their first three words (keeping the
        heaviest of each group) before sampling proportionally to weight.
        """
        if not responses:
            return "I'm not sure how to respond to that."

        # Drop near-zero candidates, but never end up empty-handed.
        filtered_responses = [(resp, weight) for resp, weight in responses if weight > 0.05]
        if not filtered_responses:
            filtered_responses = responses[:1]

        # Group near-duplicates by their opening words; keep each group's best.
        response_groups: Dict[str, List[Tuple[str, float]]] = defaultdict(list)
        for response, weight in filtered_responses:
            key = ' '.join(response.split()[:3]).lower()
            response_groups[key].append((response, weight))

        best_responses = [
            max(group, key=lambda pair: pair[1]) for group in response_groups.values()
        ]

        if len(best_responses) > 1:
            total_weight = sum(weight for _, weight in best_responses)
            if total_weight > 0:
                # Weighted random choice (roulette wheel).
                normalized_weights = [weight / total_weight for _, weight in best_responses]
                rand_val = random.random()
                cumsum = 0.0
                for i, norm_weight in enumerate(normalized_weights):
                    cumsum += norm_weight
                    if rand_val <= cumsum:
                        selected_response = best_responses[i][0]
                        break
                else:
                    selected_response = best_responses[0][0]
            else:
                selected_response = best_responses[0][0]
        else:
            selected_response = best_responses[0][0]

        return selected_response

    def generate_response(self, user_input: str) -> str:
        """Produce a reply for *user_input* and record the exchange in context."""
        if not user_input.strip():
            return "Please say something!"

        model_scores = self.calculate_model_scores(user_input)
        if not model_scores:
            return "I need to learn more before I can respond properly."

        top_models = self.select_top_models(model_scores, top_k=5)
        responses = self.generate_responses_from_models(user_input, top_models)
        final_response = self.combine_responses(responses)

        # Remember this exchange, trimming the window to its maximum length.
        self.context_window.append((user_input, final_response))
        if len(self.context_window) > self.max_context_length:
            self.context_window.pop(0)

        return final_response

    def get_model_statistics(self) -> Dict[str, Any]:
        """Aggregate counts/averages over the loaded models for display."""
        if not self.models:
            return {"total_models": 0}
        total_patterns = sum(len(model.get('patterns', [])) for model in self.models)
        total_responses = sum(len(model.get('responses', [])) for model in self.models)
        avg_confidence = sum(model.get('confidence', 0) for model in self.models) / len(self.models)
        total_training_samples = sum(model.get('training_samples', 0) for model in self.models)
        return {
            "total_models": len(self.models),
            "total_patterns": total_patterns,
            "total_responses": total_responses,
            "average_confidence": avg_confidence,
            "total_training_samples": total_training_samples,
        }


class AgGPTChat:
    """Interactive (and batch) chat shell around :class:`ResponseGenerator`."""

    def __init__(self, models_dir: str = "models"):
        self.feather_manager = FeatherManager(models_dir)
        self.response_generator = ResponseGenerator(self.feather_manager)
        self.conversation_history: List[Dict[str, str]] = []

    def initialize(self) -> bool:
        """Load models and print a banner; return False when loading fails."""
        print("AgGPT-18 Scalable Feather Architecture Chat")
        print("=" * 50)
        success = self.response_generator.load_models()
        if success:
            stats = self.response_generator.get_model_statistics()
            print(f"Model Statistics:")
            print(f"  Mini-models loaded: {stats['total_models']}")
            print(f"  Total patterns: {stats['total_patterns']}")
            print(f"  Total responses: {stats['total_responses']}")
            print(f"  Average confidence: {stats['average_confidence']:.3f}")
            print(f"  Training samples: {stats['total_training_samples']}")
            print("=" * 50)
            print("Chat initialized! Type 'quit' to exit.")
            print("Large context window active - I'll remember our conversation!")
            print()
        return success

    def chat_loop(self):
        """Run the interactive REPL until the user quits or interrupts."""
        if not self.initialize():
            return
        while True:
            try:
                user_input = input("You: ").strip()
                if not user_input:
                    continue
                if user_input.lower() in ['quit', 'exit', 'bye', 'goodbye']:
                    print("AgGPT: Goodbye! Thanks for chatting with me!")
                    break
                if user_input.lower() in ['stats', 'statistics']:
                    stats = self.response_generator.get_model_statistics()
                    print("Current Statistics:")
                    for key, value in stats.items():
                        print(f"  {key}: {value}")
                    continue
                if user_input.lower() in ['clear', 'reset']:
                    self.response_generator.context_window = []
                    print("Context cleared!")
                    continue

                print("AgGPT: ", end="", flush=True)
                response = self.response_generator.generate_response(user_input)
                # BUGFIX: the previous `response.replace(' ', '')` deleted every
                # space from the reply; just trim surrounding whitespace.
                display_response = response.strip()
                print(display_response)
                print()

                self.conversation_history.append({
                    'user': user_input,
                    'assistant': display_response,
                })
            except KeyboardInterrupt:
                print("\n\nAgGPT: Chat interrupted. Goodbye!")
                break
            except Exception as e:
                print(f"\nError: {e}")
                print("Let me try again...")
                continue

    def batch_test(self, test_inputs: List[str]):
        """Generate and print a response for each input, non-interactively."""
        if not self.initialize():
            return
        print("Running batch test...")
        print("=" * 50)
        for i, test_input in enumerate(test_inputs, 1):
            print(f"Test {i}: {test_input}")
            response = self.response_generator.generate_response(test_input)
            # BUGFIX: see chat_loop — do not strip interior spaces.
            display_response = response.strip()
            print(f"Response: {display_response}")
            print("-" * 30)


def main():
    """Entry point: `python <script> test` runs the batch suite, else the REPL."""
    chat = AgGPTChat()
    if len(sys.argv) > 1 and sys.argv[1] == "test":
        test_inputs = [
            "hi",
            "hello there",
            "how are you?",
            "what's your favorite color?",
            "tell me a joke",
            "thank you",
            "goodbye",
        ]
        chat.batch_test(test_inputs)
    else:
        chat.chat_loop()


if __name__ == "__main__":
    main()