# AgGPT19 / chat.py
# AGofficial's picture
# Upload 6 files
# cd5b50b verified
import os
import re
import random
from typing import List, Dict, Tuple, Any
from collections import defaultdict
import math
from feather import FeatherManager, similarity_score
from train import GrammarRules, PatternExtractor
class SimpleReasoningEngine:
    """Keyword-driven intent analysis and canned reasoning for AgGPT-19.

    Cue phrases are matched as whole words/phrases, so short cues such as
    'hi', 'it' or 'if' do not fire inside longer words like 'this', 'with'
    or 'life'.
    """

    # Words that mark an input as a question (checked before any other type).
    _QUESTION_WORDS = ('what', 'how', 'why', 'when', 'where', 'who')
    # Remaining intent types, in the priority order they are tested.
    _INTENT_CUES = (
        ('greeting', ('hello', 'hi', 'hey', 'good morning')),
        ('request', ('please', 'can you', 'help me')),
        ('social', ('thank', 'thanks', 'bye', 'goodbye')),
    )
    # Phrases that suggest the input refers back to earlier turns.
    _CONTEXT_CUES = ('this', 'that', 'it', 'they', 'what about')

    def __init__(self):
        self.logical_patterns = {
            'cause_effect': ['because', 'since', 'therefore', 'as a result', 'leads to'],
            'comparison': ['better than', 'worse than', 'similar to', 'different from', 'compared to'],
            'sequence': ['first', 'then', 'next', 'finally', 'after that'],
            'conditional': ['if', 'unless', 'provided that', 'in case'],
        }
        self.context_memory = {}

    @staticmethod
    def _find_phrases(text: str, phrases) -> List[str]:
        """Return the phrases that occur in ``text`` as whole words, in the order given."""
        return [
            phrase for phrase in phrases
            if re.search(r'\b' + re.escape(phrase) + r'\b', text)
        ]

    def analyze_intent(self, user_input: str, context: List[Tuple[str, str]]) -> Dict[str, Any]:
        """Classify ``user_input`` by keyword cues.

        Args:
            user_input: Raw text typed by the user.
            context: Previous (input, response) pairs; accepted for interface
                compatibility and not read here.

        Returns:
            A dict with the keys 'type' ('question', 'greeting', 'request',
            'social' or 'unknown'), 'topic' (up to three words longer than
            three characters), 'emotion' (always 'neutral'),
            'requires_reasoning', 'context_dependent' and 'question_words'
            (the question words found in the input).
        """
        input_lower = user_input.lower()
        question_words = self._find_phrases(input_lower, self._QUESTION_WORDS)

        intent = {
            'type': 'unknown',
            'topic': '',
            'emotion': 'neutral',
            'requires_reasoning': False,
            'context_dependent': False,
            # Kept separately because 'topic' drops words of three letters or
            # fewer, which would otherwise lose 'why', 'how' and 'who'.
            'question_words': question_words,
        }

        # Detect intent type; a question word outranks every other cue.
        if question_words:
            intent['type'] = 'question'
        else:
            for intent_type, cues in self._INTENT_CUES:
                if self._find_phrases(input_lower, cues):
                    intent['type'] = intent_type
                    break

        # Reasoning is needed when any logical connective is present.
        intent['requires_reasoning'] = any(
            self._find_phrases(input_lower, cues)
            for cues in self.logical_patterns.values()
        )

        # Context dependency: the input points back at something said earlier.
        intent['context_dependent'] = bool(
            self._find_phrases(input_lower, self._CONTEXT_CUES)
        )

        # Topic: the first three words longer than three characters.
        long_words = [word for word in input_lower.split() if len(word) > 3]
        intent['topic'] = ' '.join(long_words[:3])

        return intent

    def apply_reasoning(self, intent: Dict[str, Any], knowledge: Dict[str, str], context: List[Tuple[str, str]]) -> str:
        """Return a canned reasoning lead-in for ``intent``, or '' when none applies.

        Args:
            intent: Dict as produced by ``analyze_intent``. Dicts without a
                'question_words' entry are still accepted; the words of
                'topic' are consulted as well.
            knowledge: Accepted for interface compatibility; not read here.
            context: Previous (input, response) pairs; the last two are quoted
                when the intent is context dependent.
        """
        if intent['context_dependent'] and context:
            # Use recent context.
            recent_context = ' '.join(f"{item[0]} {item[1]}" for item in context[-2:])
            return f"Building on our conversation: {recent_context[:100]}..."

        if intent['requires_reasoning']:
            asked = set(intent.get('question_words', ()))
            asked.update(intent.get('topic', '').split())
            if intent['type'] == 'question' and 'why' in asked:
                return "This is likely due to several factors that work together..."
            if 'how' in asked:
                return "Here's a step-by-step approach to understand this..."

        return ""
class ResponseGenerator:
    """Score the loaded mini-models against user input and assemble a reply.

    Replies produced by ``generate_response`` end with the ' <eos>' marker.
    """

    # End-of-sequence marker appended to every finished reply.
    _EOS = '<eos>'
    # Words that mark a candidate response as a greeting.
    _GREETING_WORDS = frozenset(('hello', 'hi', 'good'))

    def __init__(self, feather_manager: "FeatherManager"):
        self.feather_manager = feather_manager
        self.pattern_extractor = PatternExtractor()
        self.grammar_rules = GrammarRules()
        self.models = []
        # Most recent (user_input, response) pairs, oldest first.
        self.context_window = []
        self.max_context_length = 10
        self.knowledge_cache = {}

        # Enhanced response generation for AgGPT-19
        self.reasoning_engine = SimpleReasoningEngine()
        self.response_templates = {
            'greeting': ["Hello! {topic}", "Hi there! {topic}", "Good {time_of_day}! {topic}"],
            'question': ["Based on what I know, {answer}", "Let me think about that. {answer}", "{answer}"],
            'explanation': ["Here's how it works: {explanation}", "The key point is {explanation}", "{explanation}"],
            'request': ["I'd be happy to help with {request}", "Sure, I can assist with {request}", "Let me help you with {request}"]
        }

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @classmethod
    def _strip_eos(cls, response: str) -> str:
        """Return ``response`` with every '<eos>' marker removed and whitespace trimmed."""
        return response.replace(cls._EOS, '').strip()

    @staticmethod
    def _tokens(text: str) -> set:
        """Return the lower-cased word tokens of ``text`` with punctuation dropped."""
        return set(re.findall(r"\w+", text.lower()))

    @staticmethod
    def _time_of_day() -> str:
        """Return 'morning', 'afternoon' or 'evening' for the current local hour."""
        from datetime import datetime

        hour = datetime.now().hour
        if hour < 12:
            return 'morning'
        if hour < 18:
            return 'afternoon'
        return 'evening'

    def _remember(self, user_input: str, response: str) -> str:
        """Record one exchange, trim the window to ``max_context_length``, return ``response``."""
        self.context_window.append((user_input, response))
        overflow = len(self.context_window) - self.max_context_length
        if overflow > 0:
            del self.context_window[:overflow]
        return response

    # ------------------------------------------------------------------
    # Model loading
    # ------------------------------------------------------------------
    def load_models(self):
        """Load all models via the feather manager; return False when none are found."""
        print("Loading enhanced mini-models...")
        self.models = self.feather_manager.load_all_models()
        print(f"Loaded {len(self.models)} enhanced mini-models")

        if not self.models:
            print("No trained models found! Please run train.py first.")
            return False

        # Build knowledge cache from all models
        self._build_knowledge_cache()
        return True

    def _build_knowledge_cache(self):
        """Build a unified knowledge cache from all models."""
        for model in self.models:
            knowledge_base = model.get('knowledge_base', {})
            if knowledge_base:
                self.knowledge_cache.update(knowledge_base)
        print(f"Built knowledge cache with {len(self.knowledge_cache)} entries")

    # ------------------------------------------------------------------
    # Scoring and selection
    # ------------------------------------------------------------------
    def calculate_model_scores(self, user_input: str) -> List[Tuple[Dict[str, Any], float]]:
        """Score every loaded model against ``user_input``.

        The score is the summed pattern similarity plus twice the keyword
        overlap ratio, scaled by the model's 'confidence', then increased by
        a training-sample bonus (at most 0.2) and a context bonus.

        Returns:
            (model, score) pairs sorted by descending score; [] when no
            models are loaded.
        """
        if not self.models:
            return []

        extractor = self.pattern_extractor
        input_pattern = extractor.create_pattern(user_input)
        input_keywords = set(extractor.extract_keywords(user_input))

        model_scores = []
        for model in self.models:
            score = sum(
                (extractor.calculate_pattern_similarity(input_pattern, pattern)
                 for pattern in model.get('patterns', [])),
                0.0,
            )

            model_keywords = set(model.get('keywords', []))
            if model_keywords and input_keywords:
                overlap = len(input_keywords & model_keywords)
                # Both sets are non-empty here, so the union cannot be empty.
                total = len(input_keywords | model_keywords)
                score += (overlap / total) * 2

            score *= model.get('confidence', 0.5)
            score += min(0.2, model.get('training_samples', 1) / 100)
            score += self._calculate_context_bonus(user_input, model)

            model_scores.append((model, score))

        model_scores.sort(key=lambda item: item[1], reverse=True)
        return model_scores

    def _calculate_context_bonus(self, user_input: str, model: Dict[str, Any]) -> float:
        """Return a bonus (capped at 0.3) for models whose patterns resemble recent inputs.

        Compares the last three remembered user inputs with the model's first
        five patterns. ``user_input`` itself is not read.
        """
        if not self.context_window:
            return 0.0

        context_bonus = 0.0
        for prev_input, _prev_response in self.context_window[-3:]:
            for pattern in model.get('patterns', [])[:5]:
                context_bonus += similarity_score(prev_input, pattern.strip()) * 0.1

        return min(context_bonus, 0.3)

    def select_top_models(self, model_scores: List[Tuple[Dict[str, Any], float]], top_k: int = 5) -> List[Tuple[Dict[str, Any], float]]:
        """Keep up to ``top_k`` models scoring above 0.01.

        When no model clears the threshold, up to three are sampled at random
        so that some response can still be produced.
        """
        valid_models = [(model, score) for model, score in model_scores if score > 0.01]
        if not valid_models:
            valid_models = random.sample(model_scores, min(3, len(model_scores)))
        return valid_models[:top_k]

    def generate_responses_from_models(self, user_input: str, top_models: List[Tuple[Dict[str, Any], float]]) -> List[Tuple[str, float]]:
        """Collect weighted candidate responses from ``top_models``.

        For each model, patterns are paired positionally with responses; up to
        three responses whose pattern similarity exceeds 0.1 are kept (or one
        random response at similarity 0.1 when none qualifies).

        Returns:
            (response, weight) pairs where
            weight = model_score * (0.7 + similarity * 0.3).
        """
        responses = []
        input_pattern = self.pattern_extractor.create_pattern(user_input)

        for model, model_score in top_models:
            patterns = model.get('patterns', [])
            candidates = model.get('responses', [])
            if not patterns or not candidates:
                continue

            matches = []
            # zip stops at the shorter list, so patterns without a response are ignored.
            for pattern, candidate in zip(patterns, candidates):
                sim = self.pattern_extractor.calculate_pattern_similarity(input_pattern, pattern)
                if sim > 0.1:
                    matches.append((candidate, sim))

            matches.sort(key=lambda item: item[1], reverse=True)
            selected = matches[:3] or [(random.choice(candidates), 0.1)]

            for response, sim in selected:
                responses.append((response, model_score * (0.7 + sim * 0.3)))

        return responses

    # ------------------------------------------------------------------
    # Response assembly
    # ------------------------------------------------------------------
    def combine_responses(self, responses: List[Tuple[str, float]], intent: Dict[str, Any] = None, reasoning_response: str = "") -> str:
        """Pick one reply from the weighted candidates.

        Args:
            responses: (response, weight) candidates.
            intent: Optional intent dict; its 'type' biases the selection and
                chooses a response template.
            reasoning_response: Optional lead-in; when given it is joined with
                the highest-weight candidate and cut to 300 characters.

        Returns:
            The chosen reply. The reasoning and empty-input paths return text
            without the '<eos>' marker; the normal path ends with it.
        """
        if not responses:
            return "I'm not sure how to respond to that."

        # If we have a reasoning response, prioritize it.
        if reasoning_response:
            top_candidate = max(responses, key=lambda item: item[1])[0]
            # Drop the marker first so truncation cannot leave a partial '<eos>'.
            best_response = self._strip_eos(top_candidate)
            if best_response:
                return f"{reasoning_response} {best_response}"[:300]  # Limit length
            return reasoning_response

        candidates = [item for item in responses if item[1] > 0.05]
        if not candidates:
            candidates = [max(responses, key=lambda item: item[1])]

        # Group candidates by a coarse semantic key and keep each group's best.
        groups = defaultdict(list)
        for response, weight in candidates:
            groups[self._get_semantic_key(response)].append((response, weight))
        best_responses = [max(group, key=lambda item: item[1]) for group in groups.values()]

        # Highest weight first; the intent-specific sorts below are stable, so
        # weight also breaks their ties.
        best_responses.sort(key=lambda item: item[1], reverse=True)

        intent_type = intent.get('type') if intent else None
        if intent_type == 'greeting':
            # Prefer shorter, friendlier responses for greetings.
            best_responses.sort(
                key=lambda item: len(item[0])
                + (0 if self._tokens(item[0]) & self._GREETING_WORDS else 100)
            )
        elif intent_type == 'question':
            # Prefer longer, more informative responses for questions.
            best_responses.sort(key=lambda item: -len(item[0]))

        selected_response = best_responses[0][0]

        # Apply response template if appropriate.
        if intent:
            templated_response = self._apply_response_template(selected_response, intent)
            if templated_response:
                selected_response = templated_response

        if not selected_response.endswith(self._EOS):
            selected_response += ' ' + self._EOS
        return selected_response

    def _get_semantic_key(self, response: str) -> str:
        """Return the coarse grouping key for ``response``.

        One of 'greeting', 'gratitude', 'apology', 'explanation' (more than 20
        whitespace-separated words) or 'simple'. Matching is on word tokens,
        so attached punctuation ('Hello!') does not prevent a match.
        """
        tokens = self._tokens(response)
        if tokens & self._GREETING_WORDS:
            return 'greeting'
        if tokens & {'thank', 'welcome'}:
            return 'gratitude'
        if tokens & {'sorry', 'apologize'}:
            return 'apology'
        if len(response.split()) > 20:
            return 'explanation'
        return 'simple'

    def _apply_response_template(self, response: str, intent: Dict[str, Any]) -> str:
        """Fill a randomly chosen template for the intent's type.

        Placeholders: {topic} and {request} come from the intent's topic
        ({request} falls back to 'that' when the topic is empty), {answer} and
        {explanation} are the first 100 / 150 characters of ``response`` with
        the '<eos>' marker removed, and {time_of_day} is derived from the
        local clock.

        Returns:
            The filled template, or '' when the intent type has no templates.
        """
        templates = self.response_templates.get(intent.get('type', 'unknown'))
        if not templates:
            return ""

        template = random.choice(templates)
        topic = intent.get('topic') or ''
        body = self._strip_eos(response)
        values = {
            'time_of_day': self._time_of_day(),
            'topic': topic,
            'answer': body[:100],
            'explanation': body[:150],
            'request': topic or 'that',
        }
        # Single pass: text substituted in is never itself scanned for placeholders.
        filled = re.sub(
            r'\{(\w+)\}',
            lambda match: values.get(match.group(1), match.group(0)),
            template,
        )
        return filled.strip()

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def generate_response(self, user_input: str) -> str:
        """Produce a reply to ``user_input`` and record the exchange in the context window.

        Order of attempts: knowledge-cache match, then model-based candidates
        (optionally prefixed by a reasoning lead-in), then a canned fallback
        when no models are loaded. Blank input gets a fixed prompt and is not
        recorded.
        """
        if not user_input.strip():
            return "Please say something! <eos>"

        # 1. Analyze intent and context
        intent = self.reasoning_engine.analyze_intent(user_input, self.context_window)

        # 2. Check for direct knowledge match
        knowledge_response = self._check_knowledge_base(user_input, intent)
        if knowledge_response:
            return self._remember(user_input, knowledge_response)

        # 3. Apply reasoning if needed
        reasoning_response = self.reasoning_engine.apply_reasoning(
            intent, self.knowledge_cache, self.context_window
        )

        # 4. Get model-based responses
        model_scores = self.calculate_model_scores(user_input)
        if not model_scores:
            return self._remember(user_input, self._generate_fallback_response(intent))

        top_models = self.select_top_models(model_scores, top_k=5)
        responses = self.generate_responses_from_models(user_input, top_models)

        # 5. Intelligently combine responses
        final_response = self.combine_responses(responses, intent, reasoning_response)

        # 6. Apply post-processing
        final_response = self._post_process_response(final_response, intent)
        return self._remember(user_input, final_response)

    def _check_knowledge_base(self, user_input: str, intent: Dict[str, Any]) -> str:
        """Return '<value> <eos>' for the first knowledge entry matching the input, else ''.

        Pass 1 looks for a key (underscores read as spaces) appearing verbatim
        in the input. Pass 2 falls back to any input word longer than three
        characters occurring inside a key. Empty keys are ignored. ``intent``
        is accepted for interface compatibility and not read.
        """
        input_lower = user_input.lower()
        entries = [
            (key.lower(), value)
            for key, value in self.knowledge_cache.items()
            if key
        ]

        # Look for exact matches first.
        for key, value in entries:
            if key.replace('_', ' ') in input_lower:
                return f"{value} {self._EOS}"

        # Then looser word-in-key matches.
        long_words = [word for word in input_lower.split() if len(word) > 3]
        for key, value in entries:
            if any(word in key for word in long_words):
                return f"{value} {self._EOS}"

        return ""

    def _generate_fallback_response(self, intent: Dict[str, Any]) -> str:
        """Return a canned reply chosen by the intent's type, ending in ' <eos>'."""
        intent_type = intent.get('type')
        if intent_type == 'greeting':
            return random.choice([
                "Hello! How can I help you today?",
                "Hi there! What would you like to know?",
                "Good day! I'm here to assist you."
            ]) + " <eos>"
        if intent_type == 'question':
            return "That's an interesting question. I'd need to learn more about that topic to give you a good answer. <eos>"
        if intent_type == 'request':
            return "I'd like to help you with that. Could you provide more details about what you need? <eos>"
        return "I understand what you're saying. Could you tell me more about that? <eos>"

    def _post_process_response(self, response: str, intent: Dict[str, Any]) -> str:
        """Normalise a reply: optional question lead-in, capitalised start, one trailing ' <eos>'."""
        body = self._strip_eos(response)

        # Add conversational elements based on intent.
        already_hedged = any(phrase in body.lower() for phrase in ('based on', 'according to'))
        if intent.get('type') == 'question' and not already_hedged:
            body = f"From what I understand, {body}"

        # Ensure proper capitalization.
        if body:
            body = body[0].upper() + body[1:]

        return f"{body} {self._EOS}"

    def get_model_statistics(self) -> Dict[str, Any]:
        """Return aggregate counts over the loaded models.

        The same keys are returned whether or not any models are loaded; with
        no models every value is zero.
        """
        model_count = len(self.models)
        if not model_count:
            return {
                "total_models": 0,
                "total_patterns": 0,
                "total_responses": 0,
                "average_confidence": 0.0,
                "total_training_samples": 0
            }

        return {
            "total_models": model_count,
            "total_patterns": sum(len(model.get('patterns', [])) for model in self.models),
            "total_responses": sum(len(model.get('responses', [])) for model in self.models),
            "average_confidence": sum(model.get('confidence', 0) for model in self.models) / model_count,
            "total_training_samples": sum(model.get('training_samples', 0) for model in self.models)
        }
class AgGPTChat:
    """Console front end: loads the models, then runs an interactive loop or a batch test."""

    def __init__(self, models_dir: str = "models"):
        self.feather_manager = FeatherManager(models_dir)
        self.response_generator = ResponseGenerator(self.feather_manager)
        # One {'user': ..., 'assistant': ...} dict per completed exchange.
        self.conversation_history = []

    @staticmethod
    def _display_text(response: str) -> str:
        """Return ``response`` with the '<eos>' marker removed for printing."""
        return response.replace(' <eos>', '').replace('<eos>', '')

    def initialize(self) -> bool:
        """Print the banner, load the models and print their statistics.

        Returns:
            True when models were loaded, False otherwise.
        """
        print("AgGPT-19 Enhanced Intelligence Chat System")
        print("=" * 50)

        success = self.response_generator.load_models()
        if success:
            stats = self.response_generator.get_model_statistics()
            print("Enhanced Model Statistics:")
            print(f" Mini-models loaded: {stats['total_models']}")
            print(f" Total patterns: {stats['total_patterns']}")
            print(f" Total responses: {stats['total_responses']}")
            print(f" Average confidence: {stats['average_confidence']:.3f}")
            print(f" Training samples: {stats['total_training_samples']}")
            print(f" Knowledge entries: {len(self.response_generator.knowledge_cache)}")
            print("=" * 50)
            print("Enhanced chat initialized! Type 'quit' to exit.")
            print("Features: Context awareness, semantic understanding, reasoning!")
            print()
        return success

    def chat_loop(self):
        """Run the interactive prompt until the user quits or input ends.

        Recognised commands: quit/exit/bye/goodbye, stats/statistics and
        clear/reset. Any other non-empty line is answered by the response
        generator and appended to ``conversation_history``.
        """
        if not self.initialize():
            return

        while True:
            try:
                user_input = input("You: ").strip()
                if not user_input:
                    continue

                command = user_input.lower()
                if command in ['quit', 'exit', 'bye', 'goodbye']:
                    print("AgGPT-19: Goodbye! Thanks for chatting with me! <eos>")
                    break

                if command in ['stats', 'statistics']:
                    stats = self.response_generator.get_model_statistics()
                    print("Current Statistics:")
                    for key, value in stats.items():
                        print(f" {key}: {value}")
                    continue

                if command in ['clear', 'reset']:
                    self.response_generator.context_window = []
                    print("Context cleared!")
                    continue

                print("AgGPT: ", end="", flush=True)
                response = self.response_generator.generate_response(user_input)
                display_response = self._display_text(response)
                print(display_response)
                print()

                self.conversation_history.append({
                    'user': user_input,
                    'assistant': display_response
                })
            except (KeyboardInterrupt, EOFError):
                # EOFError means stdin is exhausted or closed; every further
                # input() would raise again, so retrying would loop forever.
                print("\n\nAgGPT-19: Chat interrupted. Goodbye!")
                break
            except Exception as e:
                # Top-level boundary: report the failure and keep the session alive.
                print(f"\nError: {e}")
                print("Let me try again...")
                continue

    def batch_test(self, test_inputs: List[str]):
        """Generate and print a response for each entry of ``test_inputs``."""
        if not self.initialize():
            return

        print("Running enhanced batch test...")
        print("=" * 50)

        for i, test_input in enumerate(test_inputs, 1):
            print(f"Test {i}: {test_input}")
            response = self.response_generator.generate_response(test_input)
            print(f"Response: {self._display_text(response)}")
            print("-" * 30)
def main():
    """Start the chat; run the canned batch test when the first CLI argument is 'test'."""
    import sys

    chat = AgGPTChat()

    wants_batch_test = len(sys.argv) > 1 and sys.argv[1] == "test"
    if not wants_batch_test:
        chat.chat_loop()
        return

    # Fixed smoke-test prompts covering greetings, questions and farewells.
    sample_prompts = [
        "hi",
        "hello there",
        "how are you?",
        "what's your favorite color?",
        "tell me a joke",
        "thank you",
        "goodbye"
    ]
    chat.batch_test(sample_prompts)


if __name__ == "__main__":
    main()