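"""Chat front-end for AgGPT-19.

Combines a lightweight intent/reasoning layer (SimpleReasoningEngine) with
response selection over trained mini-models (ResponseGenerator) and an
interactive loop (AgGPTChat). Requires models produced by train.py.
"""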

import os
import re
import random
import math
from collections import defaultdict
from datetime import datetime
from typing import List, Dict, Tuple, Any

from feather import FeatherManager, similarity_score
from train import GrammarRules, PatternExtractor


class SimpleReasoningEngine:
    """Basic reasoning engine for AgGPT-19."""

    def __init__(self):
        self.logical_patterns = {
            'cause_effect': ['because', 'since', 'therefore', 'as a result', 'leads to'],
            'comparison': ['better than', 'worse than', 'similar to', 'different from', 'compared to'],
            'sequence': ['first', 'then', 'next', 'finally', 'after that'],
            'conditional': ['if', 'unless', 'provided that', 'in case'],
        }
        self.context_memory = {}

    def analyze_intent(self, user_input: str, context: List[Tuple[str, str]]) -> Dict[str, Any]:
        """Analyze user intent and context."""
        input_lower = user_input.lower()

        intent = {
            'type': 'unknown',
            'topic': '',
            'emotion': 'neutral',
            'requires_reasoning': False,
            'context_dependent': False
        }

        # Classify the surface intent from simple cue words.
        if any(word in input_lower for word in ['what', 'how', 'why', 'when', 'where', 'who']):
            intent['type'] = 'question'
        elif any(word in input_lower for word in ['hello', 'hi', 'hey', 'good morning']):
            intent['type'] = 'greeting'
        elif any(word in input_lower for word in ['please', 'can you', 'help me']):
            intent['type'] = 'request'
        elif any(word in input_lower for word in ['thank', 'thanks', 'bye', 'goodbye']):
            intent['type'] = 'social'

        # Flag inputs that contain logical connectives (cause/effect, comparison, ...).
        if any(pattern in input_lower for patterns in self.logical_patterns.values() for pattern in patterns):
            intent['requires_reasoning'] = True

        # Pronouns and deictic phrases suggest the reply depends on prior turns.
        if any(word in input_lower for word in ['this', 'that', 'it', 'they', 'what about']):
            intent['context_dependent'] = True

        # Crude topic extraction: the first few longer words of the input.
        words = [word for word in input_lower.split() if len(word) > 3]
        if words:
            intent['topic'] = ' '.join(words[:3])

        return intent

    def apply_reasoning(self, intent: Dict[str, Any], knowledge: Dict[str, str], context: List[Tuple[str, str]]) -> str:
        """Apply basic reasoning to generate more intelligent responses."""
        if intent['context_dependent'] and context:
            recent_context = ' '.join([item[0] + ' ' + item[1] for item in context[-2:]])
            return f"Building on our conversation: {recent_context[:100]}..."

        if intent['requires_reasoning']:
            if intent['type'] == 'question' and 'why' in intent['topic']:
                return "This is likely due to several factors that work together..."
            elif 'how' in intent['topic']:
                return "Here's a step-by-step approach to understand this..."

        return ""
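

# Illustrative use of the reasoning engine (not executed at import time):
#
#   SimpleReasoningEngine().analyze_intent("what leads to rain", [])
#   # -> {'type': 'question', 'requires_reasoning': True, ...}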


class ResponseGenerator:
    """Scores trained mini-models against the input and blends their responses."""

    def __init__(self, feather_manager: FeatherManager):
        self.feather_manager = feather_manager
        self.pattern_extractor = PatternExtractor()
        self.grammar_rules = GrammarRules()
        self.models = []
        self.context_window = []
        self.max_context_length = 10
        self.knowledge_cache = {}

        self.reasoning_engine = SimpleReasoningEngine()
        self.response_templates = {
            'greeting': ["Hello! {topic}", "Hi there! {topic}", "Good {time_of_day}! {topic}"],
            'question': ["Based on what I know, {answer}", "Let me think about that. {answer}", "{answer}"],
            'explanation': ["Here's how it works: {explanation}", "The key point is {explanation}", "{explanation}"],
            'request': ["I'd be happy to help with {request}", "Sure, I can assist with {request}", "Let me help you with {request}"]
        }

    def load_models(self):
        print("Loading enhanced mini-models...")
        self.models = self.feather_manager.load_all_models()
        print(f"Loaded {len(self.models)} enhanced mini-models")

        if not self.models:
            print("No trained models found! Please run train.py first.")
            return False

        self._build_knowledge_cache()

        return True

    def _build_knowledge_cache(self):
        """Build a unified knowledge cache from all models."""
        for model in self.models:
            knowledge_base = model.get('knowledge_base', {})
            if knowledge_base:
                self.knowledge_cache.update(knowledge_base)

        print(f"Built knowledge cache with {len(self.knowledge_cache)} entries")
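
    # Note: knowledge bases are merged with dict.update(), so when two models
    # define the same key, the model loaded later silently wins.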

    def calculate_model_scores(self, user_input: str) -> List[Tuple[Dict[str, Any], float]]:
        if not self.models:
            return []

        input_pattern = self.pattern_extractor.create_pattern(user_input)
        input_keywords = set(self.pattern_extractor.extract_keywords(user_input))

        model_scores = []

        for model in self.models:
            score = 0.0
            pattern_matches = 0
            keyword_matches = 0

            # Accumulate pattern similarity against every pattern the model knows.
            for pattern in model.get('patterns', []):
                pattern_sim = self.pattern_extractor.calculate_pattern_similarity(input_pattern, pattern)
                score += pattern_sim
                if pattern_sim > 0.3:
                    pattern_matches += 1

            # Keyword overlap (Jaccard), weighted more heavily than pattern similarity.
            model_keywords = set(model.get('keywords', []))
            if model_keywords and input_keywords:
                keyword_overlap = len(input_keywords.intersection(model_keywords))
                keyword_total = len(input_keywords.union(model_keywords))
                keyword_score = keyword_overlap / keyword_total if keyword_total > 0 else 0
                score += keyword_score * 2
                keyword_matches = keyword_overlap

            # Scale by the model's own confidence, then add small bonuses for
            # training volume and agreement with the recent conversation.
            confidence = model.get('confidence', 0.5)
            score *= confidence

            training_samples = model.get('training_samples', 1)
            training_bonus = min(0.2, training_samples / 100)
            score += training_bonus

            context_bonus = self._calculate_context_bonus(user_input, model)
            score += context_bonus

            model_scores.append((model, score))

        model_scores.sort(key=lambda x: x[1], reverse=True)

        return model_scores

    def _calculate_context_bonus(self, user_input: str, model: Dict[str, Any]) -> float:
        if not self.context_window:
            return 0.0

        context_bonus = 0.0

        for prev_input, prev_response in self.context_window[-3:]:
            for pattern in model.get('patterns', [])[:5]:
                pattern_sim = similarity_score(prev_input, pattern.strip())
                context_bonus += pattern_sim * 0.1

        return min(context_bonus, 0.3)

    def select_top_models(self, model_scores: List[Tuple[Dict[str, Any], float]], top_k: int = 5) -> List[Tuple[Dict[str, Any], float]]:
        valid_models = [(model, score) for model, score in model_scores if score > 0.01]

        if not valid_models:
            # Nothing scored above the threshold; fall back to a random sample.
            valid_models = random.sample(model_scores, min(3, len(model_scores)))

        return valid_models[:top_k]

    def generate_responses_from_models(self, user_input: str, top_models: List[Tuple[Dict[str, Any], float]]) -> List[Tuple[str, float]]:
        responses = []
        input_pattern = self.pattern_extractor.create_pattern(user_input)

        for model, model_score in top_models:
            patterns = model.get('patterns', [])
            model_responses_list = model.get('responses', [])

            if not patterns or not model_responses_list:
                continue

            # Pair each pattern with the response stored at the same index and
            # keep the ones that resemble the input.
            best_matches = []
            for i, pattern in enumerate(patterns):
                if i < len(model_responses_list):
                    sim = self.pattern_extractor.calculate_pattern_similarity(input_pattern, pattern)
                    if sim > 0.1:
                        best_matches.append((model_responses_list[i], sim))

            best_matches.sort(key=lambda x: x[1], reverse=True)

            selected_responses = best_matches[:3] if best_matches else [(random.choice(model_responses_list), 0.1)]

            # Weight combines the model's overall score with the pattern match quality.
            for response, pattern_sim in selected_responses:
                weight = model_score * (0.7 + pattern_sim * 0.3)
                responses.append((response, weight))

        return responses

    def combine_responses(self, responses: List[Tuple[str, float]], intent: Dict[str, Any] = None, reasoning_response: str = "") -> str:
        """Intelligently combine responses using context and reasoning."""
        if not responses:
            return "I'm not sure how to respond to that."

        # A reasoning response, when available, takes priority and is prefixed
        # to the best model response.
        if reasoning_response:
            best_response = responses[0][0] if responses else ""
            if best_response:
                combined = f"{reasoning_response} {best_response}"
                return combined[:300]
            return reasoning_response

        filtered_responses = [(resp, weight) for resp, weight in responses if weight > 0.05]
        if not filtered_responses:
            filtered_responses = responses[:1]

        # Group near-duplicate responses and keep only the best-weighted one per group.
        response_groups = defaultdict(list)
        for response, weight in filtered_responses:
            key = self._get_semantic_key(response)
            response_groups[key].append((response, weight))

        best_responses = []
        for group in response_groups.values():
            best_resp, best_weight = max(group, key=lambda x: x[1])
            best_responses.append((best_resp, best_weight))

        if len(best_responses) > 1:
            if intent and intent.get('type') == 'greeting':
                # Prefer short responses that actually sound like a greeting.
                best_responses.sort(key=lambda x: len(x[0]) + (0 if any(word in x[0].lower() for word in ['hello', 'hi', 'good']) else 100))
            elif intent and intent.get('type') == 'question':
                # Prefer longer, more informative responses for questions.
                best_responses.sort(key=lambda x: -len(x[0]))

            selected_response = best_responses[0][0]
        else:
            selected_response = best_responses[0][0]

        if intent:
            templated_response = self._apply_response_template(selected_response, intent)
            if templated_response:
                selected_response = templated_response

        final_response = selected_response

        if not final_response.endswith('<eos>'):
            final_response += ' <eos>'

        return final_response

    def _get_semantic_key(self, response: str) -> str:
        """Get semantic grouping key for response."""
        words = response.lower().split()

        if any(word in words for word in ['hello', 'hi', 'good']):
            return 'greeting'
        elif any(word in words for word in ['thank', 'welcome']):
            return 'gratitude'
        elif any(word in words for word in ['sorry', 'apologize']):
            return 'apology'
        elif len(words) > 20:
            return 'explanation'
        else:
            return 'simple'

    def _apply_response_template(self, response: str, intent: Dict[str, Any]) -> str:
        """Apply response templates for more natural responses."""
        intent_type = intent.get('type', 'unknown')

        if intent_type in self.response_templates:
            templates = self.response_templates[intent_type]
            template = random.choice(templates)

            # Fill every placeholder the chosen template can contain so none
            # leaks into the final reply.
            if '{time_of_day}' in template:
                hour = datetime.now().hour
                time_of_day = 'morning' if hour < 12 else 'afternoon' if hour < 18 else 'evening'
                template = template.replace('{time_of_day}', time_of_day)
            if '{topic}' in template:
                template = template.replace('{topic}', intent.get('topic', ''))
            if '{answer}' in template:
                template = template.replace('{answer}', response[:100])
            if '{explanation}' in template:
                template = template.replace('{explanation}', response[:150])
            if '{request}' in template:
                template = template.replace('{request}', intent.get('topic', 'that'))

            return template

        return ""
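
    # generate_response() below is the main pipeline:
    #   1. classify intent, 2. try a direct knowledge-base hit,
    #   3. ask the reasoning engine, 4. score and select mini-models,
    #   5. collect candidate responses, 6. combine and post-process them.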

    def generate_response(self, user_input: str) -> str:
        if not user_input.strip():
            return "Please say something! <eos>"

        intent = self.reasoning_engine.analyze_intent(user_input, self.context_window)

        # Direct knowledge-base hits short-circuit the rest of the pipeline.
        knowledge_response = self._check_knowledge_base(user_input, intent)
        if knowledge_response:
            self.context_window.append((user_input, knowledge_response))
            if len(self.context_window) > self.max_context_length:
                self.context_window.pop(0)
            return knowledge_response

        reasoning_response = self.reasoning_engine.apply_reasoning(intent, self.knowledge_cache, self.context_window)

        model_scores = self.calculate_model_scores(user_input)

        if not model_scores:
            fallback_response = self._generate_fallback_response(intent)
            self.context_window.append((user_input, fallback_response))
            return fallback_response

        top_models = self.select_top_models(model_scores, top_k=5)
        responses = self.generate_responses_from_models(user_input, top_models)

        final_response = self.combine_responses(responses, intent, reasoning_response)
        final_response = self._post_process_response(final_response, intent)

        self.context_window.append((user_input, final_response))
        if len(self.context_window) > self.max_context_length:
            self.context_window.pop(0)

        return final_response

    def _check_knowledge_base(self, user_input: str, intent: Dict[str, Any]) -> str:
        """Check if we have direct knowledge to answer this query."""
        input_lower = user_input.lower()

        for key, value in self.knowledge_cache.items():
            if key.replace('_', ' ') in input_lower or any(word in key for word in input_lower.split() if len(word) > 3):
                return f"{value} <eos>"

        return ""

    def _generate_fallback_response(self, intent: Dict[str, Any]) -> str:
        """Generate intelligent fallback responses."""
        if intent['type'] == 'greeting':
            return random.choice([
                "Hello! How can I help you today?",
                "Hi there! What would you like to know?",
                "Good day! I'm here to assist you."
            ]) + " <eos>"
        elif intent['type'] == 'question':
            return "That's an interesting question. I'd need to learn more about that topic to give you a good answer. <eos>"
        elif intent['type'] == 'request':
            return "I'd like to help you with that. Could you provide more details about what you need? <eos>"
        else:
            return "I understand what you're saying. Could you tell me more about that? <eos>"

    def _post_process_response(self, response: str, intent: Dict[str, Any]) -> str:
        """Apply post-processing to make responses more natural."""
        if not response.endswith('<eos>'):
            response += ' <eos>'

        if intent['type'] == 'question' and not any(phrase in response.lower() for phrase in ['based on', 'according to']):
            response = response.replace('<eos>', '').strip()
            response = f"From what I understand, {response} <eos>"

        # Ensure the reply starts with a capital letter.
        response = response[0].upper() + response[1:] if response else response

        return response

    def get_model_statistics(self) -> Dict[str, Any]:
        if not self.models:
            return {"total_models": 0}

        total_patterns = sum(len(model.get('patterns', [])) for model in self.models)
        total_responses = sum(len(model.get('responses', [])) for model in self.models)
        avg_confidence = sum(model.get('confidence', 0) for model in self.models) / len(self.models)
        total_training_samples = sum(model.get('training_samples', 0) for model in self.models)

        return {
            "total_models": len(self.models),
            "total_patterns": total_patterns,
            "total_responses": total_responses,
            "average_confidence": avg_confidence,
            "total_training_samples": total_training_samples
        }
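

# Illustrative programmatic use of the generator (assumes trained models exist
# in ./models, produced by train.py):
#
#   generator = ResponseGenerator(FeatherManager("models"))
#   if generator.load_models():
#       print(generator.generate_response("hello"))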


class AgGPTChat:
    """Interactive chat wrapper around ResponseGenerator."""

    def __init__(self, models_dir: str = "models"):
        self.feather_manager = FeatherManager(models_dir)
        self.response_generator = ResponseGenerator(self.feather_manager)
        self.conversation_history = []

    def initialize(self) -> bool:
        print("AgGPT-19 Enhanced Intelligence Chat System")
        print("=" * 50)

        success = self.response_generator.load_models()
        if success:
            stats = self.response_generator.get_model_statistics()
            print("Enhanced Model Statistics:")
            print(f" Mini-models loaded: {stats['total_models']}")
            print(f" Total patterns: {stats['total_patterns']}")
            print(f" Total responses: {stats['total_responses']}")
            print(f" Average confidence: {stats['average_confidence']:.3f}")
            print(f" Training samples: {stats['total_training_samples']}")
            print(f" Knowledge entries: {len(self.response_generator.knowledge_cache)}")
            print("=" * 50)
            print("Enhanced chat initialized! Type 'quit' to exit.")
            print("Features: Context awareness, semantic understanding, reasoning!")
            print()

        return success

    def chat_loop(self):
        if not self.initialize():
            return

        while True:
            try:
                user_input = input("You: ").strip()

                if not user_input:
                    continue

                if user_input.lower() in ['quit', 'exit', 'bye', 'goodbye']:
                    print("AgGPT-19: Goodbye! Thanks for chatting with me! <eos>")
                    break

                if user_input.lower() in ['stats', 'statistics']:
                    stats = self.response_generator.get_model_statistics()
                    print("Current Statistics:")
                    for key, value in stats.items():
                        print(f" {key}: {value}")
                    continue

                if user_input.lower() in ['clear', 'reset']:
                    self.response_generator.context_window = []
                    print("Context cleared!")
                    continue

                print("AgGPT-19: ", end="", flush=True)
                response = self.response_generator.generate_response(user_input)

                display_response = response.replace(' <eos>', '').replace('<eos>', '')
                print(display_response)
                print()

                self.conversation_history.append({
                    'user': user_input,
                    'assistant': display_response
                })

            except KeyboardInterrupt:
                print("\n\nAgGPT-19: Chat interrupted. Goodbye!")
                break
            except Exception as e:
                print(f"\nError: {e}")
                print("Let me try again...")
                continue

    def batch_test(self, test_inputs: List[str]):
        if not self.initialize():
            return

        print("Running enhanced batch test...")
        print("=" * 50)

        for i, test_input in enumerate(test_inputs, 1):
            print(f"Test {i}: {test_input}")
            response = self.response_generator.generate_response(test_input)
            display_response = response.replace(' <eos>', '').replace('<eos>', '')
            print(f"Response: {display_response}")
            print("-" * 30)


def main():
    chat = AgGPTChat()

    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "test":
        test_inputs = [
            "hi",
            "hello there",
            "how are you?",
            "what's your favorite color?",
            "tell me a joke",
            "thank you",
            "goodbye"
        ]
        chat.batch_test(test_inputs)
    else:
        chat.chat_loop()
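

# Usage (assuming this module is saved as chat.py):
#   python chat.py        # interactive chat loop
#   python chat.py test   # run the built-in batch test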


if __name__ == "__main__":
    main()