import math
import os
import random
import re
import sys
from collections import defaultdict
from typing import Any, Dict, List, Tuple

from feather import FeatherManager, similarity_score
from train import GrammarRules, PatternExtractor
class ResponseGenerator:
    """Scores trained mini-models against user input and blends their
    candidate responses into a single reply."""

    def __init__(self, feather_manager: "FeatherManager"):
        self.feather_manager = feather_manager
        self.pattern_extractor = PatternExtractor()
        self.grammar_rules = GrammarRules()
        # Mini-models loaded from disk; each is a dict with 'patterns',
        # 'responses', 'keywords', 'confidence', 'training_samples'.
        self.models: List[Dict[str, Any]] = []
        # Rolling (user_input, response) history used for context scoring.
        self.context_window: List[Tuple[str, str]] = []
        self.max_context_length = 10

    def load_models(self) -> bool:
        """Load all mini-models via the feather manager.

        Returns:
            True when at least one model was loaded, False otherwise.
        """
        print("Loading mini-models...")
        self.models = self.feather_manager.load_all_models()
        print(f"Loaded {len(self.models)} mini-models")
        if not self.models:
            print("No trained models found! Please run train.py first.")
            return False
        return True

    def calculate_model_scores(self, user_input: str) -> List[Tuple[Dict[str, Any], float]]:
        """Score every loaded model against *user_input*.

        The score combines summed pattern similarity, keyword (Jaccard)
        overlap weighted x2, the model's own confidence multiplier, a small
        training-volume bonus (capped at 0.2), and a conversation-context
        bonus.

        Returns:
            (model, score) pairs sorted by descending score; empty list
            when no models are loaded.
        """
        if not self.models:
            return []
        input_pattern = self.pattern_extractor.create_pattern(user_input)
        input_keywords = set(self.pattern_extractor.extract_keywords(user_input))
        model_scores: List[Tuple[Dict[str, Any], float]] = []
        for model in self.models:
            score = 0.0
            for pattern in model.get('patterns', []):
                score += self.pattern_extractor.calculate_pattern_similarity(input_pattern, pattern)
            model_keywords = set(model.get('keywords', []))
            if model_keywords and input_keywords:
                overlap = len(input_keywords & model_keywords)
                union_size = len(input_keywords | model_keywords)
                # Jaccard similarity, weighted higher than raw pattern score.
                if union_size > 0:
                    score += (overlap / union_size) * 2
            # Scale by the model's self-reported confidence (default 0.5).
            score *= model.get('confidence', 0.5)
            # Small bonus for well-trained models, capped at 0.2.
            score += min(0.2, model.get('training_samples', 1) / 100)
            score += self._calculate_context_bonus(user_input, model)
            model_scores.append((model, score))
        model_scores.sort(key=lambda pair: pair[1], reverse=True)
        return model_scores

    def _calculate_context_bonus(self, user_input: str, model: Dict[str, Any]) -> float:
        """Bonus (capped at 0.3) for models whose first few patterns resemble
        the last three user turns.  *user_input* is currently unused but kept
        for interface stability."""
        if not self.context_window:
            return 0.0
        bonus = 0.0
        for prev_input, _prev_response in self.context_window[-3:]:
            for pattern in model.get('patterns', [])[:5]:
                bonus += similarity_score(prev_input, pattern.strip()) * 0.1
        return min(bonus, 0.3)

    def select_top_models(self, model_scores: List[Tuple[Dict[str, Any], float]], top_k: int = 5) -> List[Tuple[Dict[str, Any], float]]:
        """Keep models scoring above a small threshold; if none qualify,
        fall back to a random sample of up to 3 so the bot can still answer.

        Returns:
            At most *top_k* (model, score) pairs.
        """
        valid_models = [(model, score) for model, score in model_scores if score > 0.01]
        if not valid_models:
            valid_models = random.sample(model_scores, min(3, len(model_scores)))
        return valid_models[:top_k]

    def generate_responses_from_models(self, user_input: str, top_models: List[Tuple[Dict[str, Any], float]]) -> List[Tuple[str, float]]:
        """For each selected model, pick its best-matching responses and
        weight them by model score and pattern similarity.

        Returns:
            (response_text, weight) candidates across all models.
        """
        responses: List[Tuple[str, float]] = []
        input_pattern = self.pattern_extractor.create_pattern(user_input)
        for model, model_score in top_models:
            patterns = model.get('patterns', [])
            candidates = model.get('responses', [])
            if not patterns or not candidates:
                continue
            # Patterns and responses are index-aligned; keep close matches.
            matches: List[Tuple[str, float]] = []
            for i, pattern in enumerate(patterns):
                if i < len(candidates):
                    sim = self.pattern_extractor.calculate_pattern_similarity(input_pattern, pattern)
                    if sim > 0.1:
                        matches.append((candidates[i], sim))
            matches.sort(key=lambda pair: pair[1], reverse=True)
            # Fall back to a random response when nothing matched well.
            selected = matches[:3] if matches else [(random.choice(candidates), 0.1)]
            for response, pattern_sim in selected:
                responses.append((response, model_score * (0.7 + pattern_sim * 0.3)))
        return responses

    def combine_responses(self, responses: List[Tuple[str, float]]) -> str:
        """Deduplicate and weight candidate responses, then pick one.

        Candidates are grouped by their first three words so near-duplicates
        collapse to the highest-weighted variant; the final pick is a
        weighted random choice among group winners.  The returned string
        always ends with the '<eos>' marker.
        """
        if not responses:
            return "I'm not sure how to respond to that."
        filtered = [(resp, weight) for resp, weight in responses if weight > 0.05]
        if not filtered:
            filtered = responses[:1]
        groups: Dict[str, List[Tuple[str, float]]] = defaultdict(list)
        for response, weight in filtered:
            key = ' '.join(response.split()[:3]).lower()
            groups[key].append((response, weight))
        best_responses = [max(group, key=lambda pair: pair[1]) for group in groups.values()]
        total_weight = sum(weight for _, weight in best_responses)
        if len(best_responses) > 1 and total_weight > 0:
            # Weighted random selection among the group winners.
            texts = [resp for resp, _ in best_responses]
            weights = [weight for _, weight in best_responses]
            selected_response = random.choices(texts, weights=weights, k=1)[0]
        else:
            selected_response = best_responses[0][0]
        if not selected_response.endswith('<eos>'):
            selected_response += ' <eos>'
        return selected_response

    def generate_response(self, user_input: str) -> str:
        """Produce a reply for *user_input* and record the turn in the
        context window (bounded by max_context_length)."""
        if not user_input.strip():
            return "Please say something! <eos>"
        model_scores = self.calculate_model_scores(user_input)
        if not model_scores:
            return "I need to learn more before I can respond properly. <eos>"
        top_models = self.select_top_models(model_scores, top_k=5)
        responses = self.generate_responses_from_models(user_input, top_models)
        final_response = self.combine_responses(responses)
        self.context_window.append((user_input, final_response))
        if len(self.context_window) > self.max_context_length:
            self.context_window.pop(0)
        return final_response

    def get_model_statistics(self) -> Dict[str, Any]:
        """Aggregate statistics over all loaded models.

        Returns:
            A dict with totals and average confidence; just
            {"total_models": 0} when nothing is loaded.
        """
        if not self.models:
            return {"total_models": 0}
        total_patterns = sum(len(model.get('patterns', [])) for model in self.models)
        total_responses = sum(len(model.get('responses', [])) for model in self.models)
        avg_confidence = sum(model.get('confidence', 0) for model in self.models) / len(self.models)
        total_training_samples = sum(model.get('training_samples', 0) for model in self.models)
        return {
            "total_models": len(self.models),
            "total_patterns": total_patterns,
            "total_responses": total_responses,
            "average_confidence": avg_confidence,
            "total_training_samples": total_training_samples
        }
class AgGPTChat:
    """Interactive console front-end for the ResponseGenerator."""

    def __init__(self, models_dir: str = "models"):
        self.feather_manager = FeatherManager(models_dir)
        self.response_generator = ResponseGenerator(self.feather_manager)
        # Full session transcript as {'user': ..., 'assistant': ...} dicts.
        self.conversation_history: List[Dict[str, str]] = []

    @staticmethod
    def _strip_eos(text: str) -> str:
        """Remove the internal '<eos>' marker before showing text to the user."""
        return text.replace(' <eos>', '').replace('<eos>', '')

    def initialize(self) -> bool:
        """Load models and print the startup banner.

        Returns:
            True when models loaded successfully, False otherwise.
        """
        print("AgGPT-20 Scalable Feather Architecture Chat")
        print("=" * 50)
        success = self.response_generator.load_models()
        if success:
            stats = self.response_generator.get_model_statistics()
            print("Model Statistics:")
            print(f" Mini-models loaded: {stats['total_models']}")
            print(f" Total patterns: {stats['total_patterns']}")
            print(f" Total responses: {stats['total_responses']}")
            print(f" Average confidence: {stats['average_confidence']:.3f}")
            print(f" Training samples: {stats['total_training_samples']}")
            print("=" * 50)
            print("Chat initialized! Type 'quit' to exit.")
            print("Large context window active - I'll remember our conversation!")
            print()
        return success

    def chat_loop(self):
        """Run the interactive REPL until the user quits or interrupts.

        Recognized commands (case-insensitive): quit/exit/bye/goodbye,
        stats/statistics, clear/reset.  Everything else is sent to the
        response generator.
        """
        if not self.initialize():
            return
        while True:
            try:
                user_input = input("You: ").strip()
                if not user_input:
                    continue
                if user_input.lower() in ['quit', 'exit', 'bye', 'goodbye']:
                    print("AgGPT: Goodbye! Thanks for chatting with me! <eos>")
                    break
                if user_input.lower() in ['stats', 'statistics']:
                    stats = self.response_generator.get_model_statistics()
                    print("Current Statistics:")
                    for key, value in stats.items():
                        print(f" {key}: {value}")
                    continue
                if user_input.lower() in ['clear', 'reset']:
                    self.response_generator.context_window = []
                    print("Context cleared!")
                    continue
                print("AgGPT: ", end="", flush=True)
                response = self.response_generator.generate_response(user_input)
                display_response = self._strip_eos(response)
                print(display_response)
                print()
                self.conversation_history.append({
                    'user': user_input,
                    'assistant': display_response
                })
            except KeyboardInterrupt:
                print("\n\nAgGPT: Chat interrupted. Goodbye!")
                break
            except Exception as e:
                # Top-level boundary: report and keep the chat alive.
                print(f"\nError: {e}")
                print("Let me try again...")
                continue

    def batch_test(self, test_inputs: List[str]):
        """Generate and print a response for each input in *test_inputs*."""
        if not self.initialize():
            return
        print("Running batch test...")
        print("=" * 50)
        for i, test_input in enumerate(test_inputs, 1):
            print(f"Test {i}: {test_input}")
            response = self.response_generator.generate_response(test_input)
            print(f"Response: {self._strip_eos(response)}")
            print("-" * 30)
def main():
    """Entry point: run the built-in batch self-test when invoked as
    ``python <script> test``, otherwise start the interactive chat loop."""
    chat = AgGPTChat()
    if len(sys.argv) > 1 and sys.argv[1] == "test":
        test_inputs = [
            "hi",
            "hello there",
            "how are you?",
            "what's your favorite color?",
            "tell me a joke",
            "thank you",
            "goodbye"
        ]
        chat.batch_test(test_inputs)
    else:
        chat.chat_loop()


if __name__ == "__main__":
    main()