# general_reasoning.py — part of the conscious_Ai-2 project
# (Removed Hugging Face upload-page artifacts: "Ret's picture / Upload 16 files / 2660a90 verified")
import torch
import torch.nn as nn
import json
import os
import re
from typing import Dict, List, Tuple, Optional, Any, Union
from dataclasses import dataclass
from enum import Enum
from openai import OpenAI
import logging
from transformer import Transformer, create_transformer_model, initialize_weights
import random
import datetime
import numpy as np
import time
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ReasoningType(Enum):
"""Types of reasoning approaches"""
DEDUCTIVE = "deductive" # General to specific
INDUCTIVE = "inductive" # Specific to general
ABDUCTIVE = "abductive" # Best explanation
ANALOGICAL = "analogical" # Pattern matching
CAUSAL = "causal" # Cause and effect
TEMPORAL = "temporal" # Time-based reasoning
SPATIAL = "spatial" # Space/location reasoning
LOGICAL = "logical" # Formal logic
CREATIVE = "creative" # Novel solutions
@dataclass
class ReasoningStep:
"""Individual step in reasoning process"""
step_id: int
reasoning_type: ReasoningType
premise: str
conclusion: str
confidence: float
evidence: List[str]
assumptions: List[str]
@dataclass
class KnowledgeNode:
"""Node in the knowledge graph"""
concept: str
properties: Dict[str, Any]
relationships: Dict[str, List[str]]
confidence: float
source: str
class GeneralReasoningEngine:
"""
General Reasoning Engine for AGI-like reasoning capabilities
Implements multiple reasoning strategies and knowledge integration
"""
def __init__(self,
openai_client: OpenAI = None,
device: str = "cuda" if torch.cuda.is_available() else "cpu"):
self.device = device
self.openai_client = openai_client
# Initialize reasoning neural networks
self.reasoning_transformer = create_transformer_model(
src_vocab_size=100000,
tgt_vocab_size=100000
).to(self.device)
initialize_weights(self.reasoning_transformer)
# Knowledge base and working memory
self.knowledge_graph = {}
self.working_memory = []
self.reasoning_history = []
# Reasoning strategies
self.reasoning_strategies = {
ReasoningType.DEDUCTIVE: self._deductive_reasoning,
ReasoningType.INDUCTIVE: self._inductive_reasoning,
ReasoningType.ABDUCTIVE: self._abductive_reasoning,
ReasoningType.ANALOGICAL: self._analogical_reasoning,
ReasoningType.CAUSAL: self._causal_reasoning,
ReasoningType.TEMPORAL: self._temporal_reasoning,
ReasoningType.SPATIAL: self._spatial_reasoning,
ReasoningType.LOGICAL: self._logical_reasoning,
ReasoningType.CREATIVE: self._creative_reasoning
}
# Meta-reasoning for strategy selection
self.strategy_weights = {strategy: 1.0 for strategy in ReasoningType}
# Initialize with basic knowledge
self._initialize_base_knowledge()
# Neural network for advanced reasoning
self.neural_network = None
self.learning_rate = 0.01
self.confidence_threshold = 0.7
self.optimizer = None
self.positional_encoder = None
self.sep_token = None
# Error handling
self.error_handlers = {
'api_error': self._handle_api_error,
'network_error': self._handle_network_error,
'validation_error': self._handle_validation_error,
'timeout_error': self._handle_timeout_error,
'resource_error': self._handle_resource_error
}
def _initialize_base_knowledge(self):
"""Initialize with fundamental knowledge concepts"""
base_concepts = [
("object", {"type": "entity", "properties": ["existence", "identity"]}),
("action", {"type": "process", "properties": ["causality", "temporality"]}),
("relationship", {"type": "connection", "properties": ["bidirectional", "typed"]}),
("pattern", {"type": "structure", "properties": ["repetition", "similarity"]}),
("goal", {"type": "objective", "properties": ["desirability", "achievability"]}),
("constraint", {"type": "limitation", "properties": ["boundary", "restriction"]}),
("context", {"type": "environment", "properties": ["situational", "influential"]})
]
for concept, properties in base_concepts:
self.add_knowledge(concept, properties, {}, 1.0, "base_initialization")
def add_knowledge(self, concept: str, properties: Dict, relationships: Dict,
confidence: float, source: str):
"""Add knowledge to the knowledge graph"""
node = KnowledgeNode(concept, properties, relationships, confidence, source)
self.knowledge_graph[concept] = node
logger.info(f"Added knowledge: {concept}")
def reason_about_problem(self, problem: str, context: Dict = None) -> Dict:
"""
Main reasoning function with neural network integration and error handling
"""
start_time = time.time()
result = None
try:
# Try neural reasoning first
neural_result, neural_error = self._safe_call(
self._neural_reasoning,
problem,
context or {}
)
if neural_error:
logger.warning(f"Neural reasoning failed: {neural_error}")
if neural_result and neural_result.get('confidence', 0) >= self.confidence_threshold:
result = neural_result
self._update_reasoning_history(True)
else:
# Fall back to symbolic reasoning
symbolic_result, symbolic_error = self._safe_call(
self._symbolic_reasoning,
problem,
context or {}
)
if symbolic_error:
# If both neural and symbolic failed, use the better error
if neural_error:
logger.error("Both neural and symbolic reasoning failed")
error_info = self._select_best_error(neural_error, symbolic_error)
return self._format_error_response(error_info, start_time)
# If only symbolic failed but neural has a result, use neural
elif neural_result:
result = neural_result
else:
# No results available
return self._format_error_response(symbolic_error, start_time)
else:
# Use the better result between neural and symbolic
if (not neural_result or
symbolic_result.get('confidence', 0) > neural_result.get('confidence', 0)):
result = symbolic_result
self._update_reasoning_history(True)
else:
result = neural_result
self._update_reasoning_history(False)
# Add metadata
result['processing_time'] = time.time() - start_time
result['self_assessment'] = self._generate_self_assessment(result)
# Log success
logger.info(f"Successfully processed problem in {result['processing_time']:.2f}s")
return result
except Exception as e:
logger.critical(f"Unexpected error in reason_about_problem: {str(e)}", exc_info=True)
error_info = self._handle_generic_error(e, {
'problem': problem[:100] + '...' if len(problem) > 100 else problem,
'context_keys': list(context.keys()) if context else None
})
return self._format_error_response(error_info, start_time)
def _symbolic_reasoning(self, problem: str, context: Dict = None) -> Dict:
"""
Main reasoning function - analyzes problem and applies appropriate reasoning
with enhanced pattern recognition and learning
"""
logger.info(f"Starting reasoning about: {problem}")
try:
# First, try to apply learned patterns
pattern_solution = self._apply_patterns(problem, context)
if pattern_solution and pattern_solution.get('confidence', 0) > 0.7:
logger.info("Successfully applied pattern-based solution")
return {
'problem': problem,
'solution': pattern_solution,
'strategy': 'pattern_matching',
'confidence': pattern_solution.get('confidence', 0.7)
}
# If no pattern matches, proceed with full reasoning
problem_analysis = self._analyze_problem(problem, context)
# Rest of the reasoning process...
selected_strategies = self._select_reasoning_strategies(problem_analysis)
reasoning_results = []
for strategy in selected_strategies:
try:
result = self._apply_reasoning_strategy(strategy, problem, problem_analysis)
if result:
reasoning_results.append(result)
except Exception as e:
logger.error(f"Error applying {strategy.value} reasoning: {str(e)}")
if not reasoning_results:
logger.warning("No reasoning strategies produced results, using default")
reasoning_results.append(self._default_reasoning(problem, problem_analysis))
final_reasoning = self._synthesize_reasoning(reasoning_results)
solution = self._generate_solution(problem, final_reasoning)
# Learn from this experience if the solution was good
if solution.get('confidence', 0) > 0.7:
self._learn_pattern(problem, solution, context)
return {
'problem': problem,
'analysis': problem_analysis,
'strategies_used': [s.value for s in selected_strategies],
'reasoning_steps': final_reasoning,
'solution': solution,
'confidence': self._calculate_confidence(final_reasoning)
}
except Exception as e:
logger.error(f"Error in reason_about_problem: {str(e)}")
return {
'problem': problem,
'error': str(e),
'solution': {
'primary_approach': 'Error occurred during reasoning',
'confidence': 0.0
},
'confidence': 0.0
}
def _analyze_problem(self, problem: str, context: Dict = None) -> Dict:
"""Analyze and decompose the problem"""
analysis = {
'problem_type': self._classify_problem_type(problem),
'key_concepts': self._extract_concepts(problem),
'constraints': self._identify_constraints(problem),
'goals': self._identify_goals(problem),
'context': context or {},
'complexity': self._assess_complexity(problem),
'domain': self._identify_domain(problem)
}
# Use neural network for deeper analysis if available
if self.neural_network:
enhanced_analysis = self._neural_reasoning(problem, analysis)
analysis.update(enhanced_analysis)
return analysis
def _classify_problem_type(self, problem: str) -> str:
"""Classify the type of problem"""
problem_lower = problem.lower()
if any(word in problem_lower for word in ['create', 'build', 'design', 'generate']):
return 'creative'
elif any(word in problem_lower for word in ['analyze', 'understand', 'explain']):
return 'analytical'
elif any(word in problem_lower for word in ['solve', 'find', 'calculate']):
return 'problem_solving'
elif any(word in problem_lower for word in ['predict', 'forecast', 'estimate']):
return 'predictive'
elif any(word in problem_lower for word in ['optimize', 'improve', 'enhance']):
return 'optimization'
else:
return 'general'
def _extract_concepts(self, problem: str) -> List[str]:
"""Extract key concepts from the problem"""
# Simple keyword extraction (can be enhanced with NLP)
words = re.findall(r'\b[a-zA-Z]+\b', problem.lower())
# Filter for meaningful concepts
stopwords = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
concepts = [word for word in words if word not in stopwords and len(word) > 2]
# Check against knowledge graph
known_concepts = [concept for concept in concepts if concept in self.knowledge_graph]
return list(set(concepts)), known_concepts
def _identify_constraints(self, problem: str) -> List[str]:
"""Identify constraints in the problem"""
constraint_indicators = ['must', 'cannot', 'should not', 'limited', 'only', 'within', 'without']
constraints = []
for indicator in constraint_indicators:
if indicator in problem.lower():
# Extract sentence containing constraint
sentences = problem.split('.')
for sentence in sentences:
if indicator in sentence.lower():
constraints.append(sentence.strip())
return constraints
def _identify_goals(self, problem: str) -> List[str]:
"""Identify goals in the problem"""
goal_indicators = ['want', 'need', 'goal', 'objective', 'aim', 'target', 'achieve']
goals = []
for indicator in goal_indicators:
if indicator in problem.lower():
sentences = problem.split('.')
for sentence in sentences:
if indicator in sentence.lower():
goals.append(sentence.strip())
return goals
def _assess_complexity(self, problem: str) -> str:
"""Assess problem complexity"""
word_count = len(problem.split())
concept_count = len(self._extract_concepts(problem)[0])
if word_count < 10 and concept_count < 3:
return 'simple'
elif word_count < 50 and concept_count < 10:
return 'moderate'
else:
return 'complex'
def _identify_domain(self, problem: str) -> str:
"""Identify the domain of the problem"""
domain_keywords = {
'technology': ['code', 'program', 'software', 'computer', 'algorithm'],
'science': ['experiment', 'hypothesis', 'theory', 'research', 'data'],
'business': ['profit', 'market', 'customer', 'revenue', 'strategy'],
'mathematics': ['equation', 'calculate', 'formula', 'number', 'solve'],
'creative': ['design', 'art', 'creative', 'innovative', 'original']
}
problem_lower = problem.lower()
for domain, keywords in domain_keywords.items():
if any(keyword in problem_lower for keyword in keywords):
return domain
return 'general'
def _select_reasoning_strategies(self, analysis: Dict) -> List[ReasoningType]:
"""Select appropriate reasoning strategies based on problem analysis"""
strategies = []
problem_type = analysis['problem_type']
domain = analysis['domain']
complexity = analysis['complexity']
# Strategy selection based on problem characteristics
if problem_type == 'creative':
strategies.extend([ReasoningType.CREATIVE, ReasoningType.ANALOGICAL])
elif problem_type == 'analytical':
strategies.extend([ReasoningType.DEDUCTIVE, ReasoningType.INDUCTIVE])
elif problem_type == 'problem_solving':
strategies.extend([ReasoningType.LOGICAL, ReasoningType.CAUSAL])
elif problem_type == 'predictive':
strategies.extend([ReasoningType.INDUCTIVE, ReasoningType.TEMPORAL])
# Add domain-specific strategies
if domain == 'technology':
strategies.append(ReasoningType.LOGICAL)
elif domain == 'science':
strategies.extend([ReasoningType.CAUSAL, ReasoningType.ABDUCTIVE])
# Ensure at least one strategy
if not strategies:
strategies = [ReasoningType.DEDUCTIVE, ReasoningType.LOGICAL]
return list(set(strategies))
def _apply_reasoning_strategy(self, strategy: ReasoningType, problem: str, analysis: Dict) -> ReasoningStep:
"""Apply a specific reasoning strategy"""
logger.info(f"Applying {strategy.value} reasoning")
if strategy in self.reasoning_strategies:
return self.reasoning_strategies[strategy](problem, analysis)
else:
return self._default_reasoning(problem, analysis)
def _deductive_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep:
"""Apply deductive reasoning (general to specific)"""
# Find general principles that apply to this problem
relevant_knowledge = self._find_relevant_knowledge(analysis['key_concepts'][0])
premise = f"General principle: {relevant_knowledge}"
conclusion = f"Applied to specific case: {problem}"
return ReasoningStep(
step_id=len(self.reasoning_history),
reasoning_type=ReasoningType.DEDUCTIVE,
premise=premise,
conclusion=conclusion,
confidence=0.8,
evidence=[relevant_knowledge],
assumptions=["General principle applies to specific case"]
)
def _inductive_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep:
"""Apply inductive reasoning (specific to general)"""
# Look for patterns in similar problems
patterns = self._find_patterns(analysis['key_concepts'][0])
premise = f"Observed patterns: {patterns}"
conclusion = f"General rule: Similar problems follow this pattern"
return ReasoningStep(
step_id=len(self.reasoning_history),
reasoning_type=ReasoningType.INDUCTIVE,
premise=premise,
conclusion=conclusion,
confidence=0.7,
evidence=patterns,
assumptions=["Past patterns predict future behavior"]
)
def _abductive_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep:
"""Apply abductive reasoning (best explanation)"""
# Find the most likely explanation
possible_explanations = self._generate_explanations(problem, analysis)
best_explanation = max(possible_explanations, key=lambda x: x.get('likelihood', 0))
premise = f"Observed: {problem}"
conclusion = f"Best explanation: {best_explanation['explanation']}"
return ReasoningStep(
step_id=len(self.reasoning_history),
reasoning_type=ReasoningType.ABDUCTIVE,
premise=premise,
conclusion=conclusion,
confidence=best_explanation.get('likelihood', 0.6),
evidence=[best_explanation['explanation']],
assumptions=["Most likely explanation is correct"]
)
def _analogical_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep:
"""Apply analogical reasoning (pattern matching)"""
# Find similar problems or situations
analogies = self._find_analogies(analysis['key_concepts'][0])
premise = f"Similar situation: {analogies}"
conclusion = f"By analogy: Apply similar solution approach"
return ReasoningStep(
step_id=len(self.reasoning_history),
reasoning_type=ReasoningType.ANALOGICAL,
premise=premise,
conclusion=conclusion,
confidence=0.7,
evidence=analogies,
assumptions=["Similar problems have similar solutions"]
)
def _causal_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep:
"""Apply causal reasoning (cause and effect)"""
# Identify cause-effect relationships
causes = self._identify_causes(problem, analysis)
effects = self._predict_effects(causes)
premise = f"Causes: {causes}"
conclusion = f"Expected effects: {effects}"
return ReasoningStep(
step_id=len(self.reasoning_history),
reasoning_type=ReasoningType.CAUSAL,
premise=premise,
conclusion=conclusion,
confidence=0.8,
evidence=causes + effects,
assumptions=["Causal relationships are stable"]
)
def _temporal_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep:
"""Apply temporal reasoning (time-based)"""
# Consider time-based aspects
temporal_aspects = self._analyze_temporal_aspects(problem)
premise = f"Temporal context: {temporal_aspects}"
conclusion = f"Time-based implications: Consider sequence and timing"
return ReasoningStep(
step_id=len(self.reasoning_history),
reasoning_type=ReasoningType.TEMPORAL,
premise=premise,
conclusion=conclusion,
confidence=0.7,
evidence=temporal_aspects,
assumptions=["Time sequence affects outcomes"]
)
def _spatial_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep:
"""Apply spatial reasoning (space/location)"""
# Consider spatial relationships
spatial_aspects = self._analyze_spatial_aspects(problem)
premise = f"Spatial context: {spatial_aspects}"
conclusion = f"Spatial implications: Consider location and arrangement"
return ReasoningStep(
step_id=len(self.reasoning_history),
reasoning_type=ReasoningType.SPATIAL,
premise=premise,
conclusion=conclusion,
confidence=0.6,
evidence=spatial_aspects,
assumptions=["Spatial arrangement affects function"]
)
def _logical_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep:
"""Apply logical reasoning (formal logic)"""
# Apply logical rules
logical_structure = self._analyze_logical_structure(problem)
premise = f"Logical structure: {logical_structure}"
conclusion = f"Logical conclusion: Apply formal reasoning rules"
return ReasoningStep(
step_id=len(self.reasoning_history),
reasoning_type=ReasoningType.LOGICAL,
premise=premise,
conclusion=conclusion,
confidence=0.9,
evidence=[logical_structure],
assumptions=["Logical rules are consistent"]
)
def _analyze_logical_structure(self, problem: str) -> str:
"""
Analyze the logical structure of a problem statement.
Args:
problem: The problem statement to analyze
Returns:
str: Description of the logical structure
"""
try:
# Simple pattern matching for common logical structures
if any(connective in problem.lower() for connective in ['if', 'then', 'implies']):
return "Implication (if-then) structure detected"
elif any(connective in problem.lower() for connective in ['and', 'both', 'also']):
return "Conjunction (AND) structure detected"
elif any(connective in problem.lower() for connective in ['or', 'either']):
return "Disjunction (OR) structure detected"
elif any(connective in problem.lower() for connective in ['not', 'no ', 'never']):
return "Negation (NOT) structure detected"
elif any(connective in problem.lower() for connective in ['all', 'every', 'any']):
return "Universal quantification (FOR ALL) detected"
elif any(connective in problem.lower() for connective in ['some', 'there exists', 'at least one']):
return "Existential quantification (THERE EXISTS) detected"
else:
return "Simple proposition detected"
except Exception as e:
logger.warning(f"Error analyzing logical structure: {str(e)}")
return "Unable to determine logical structure"
def _creative_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep:
"""Apply creative reasoning (novel solutions)"""
# Generate novel approaches
creative_ideas = self._generate_creative_ideas(problem, analysis)
premise = f"Creative exploration: {problem}"
conclusion = f"Novel approaches: {creative_ideas}"
return ReasoningStep(
step_id=len(self.reasoning_history),
reasoning_type=ReasoningType.CREATIVE,
premise=premise,
conclusion=conclusion,
confidence=0.6,
evidence=creative_ideas,
assumptions=["Novel approaches may be effective"]
)
def _default_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep:
"""Default reasoning when no specific strategy applies"""
return ReasoningStep(
step_id=len(self.reasoning_history),
reasoning_type=ReasoningType.LOGICAL,
premise=f"Problem: {problem}",
conclusion="Apply general problem-solving approach",
confidence=0.5,
evidence=[problem],
assumptions=["General approach is applicable"]
)
def _synthesize_reasoning(self, reasoning_results: List[ReasoningStep]) -> List[ReasoningStep]:
"""Integrate and synthesize multiple reasoning results"""
# Weight results by confidence and strategy effectiveness
weighted_results = []
for result in reasoning_results:
weight = self.strategy_weights.get(result.reasoning_type, 1.0)
result.confidence *= weight
weighted_results.append(result)
# Sort by confidence
weighted_results.sort(key=lambda x: x.confidence, reverse=True)
# Update strategy weights based on performance
self._update_strategy_weights(weighted_results)
return weighted_results
def _generate_solution(self, problem: str, reasoning_steps: List[ReasoningStep]) -> Dict:
"""Generate final solution based on reasoning"""
if not reasoning_steps:
return {"solution": "No solution found", "confidence": 0.0}
# Combine insights from all reasoning steps
best_step = reasoning_steps[0]
all_conclusions = [step.conclusion for step in reasoning_steps]
solution = {
"primary_approach": best_step.conclusion,
"reasoning_type": best_step.reasoning_type.value,
"alternative_approaches": all_conclusions[1:3], # Top 3 alternatives
"confidence": best_step.confidence,
"supporting_evidence": best_step.evidence,
"assumptions": best_step.assumptions
}
# Enhance with LLM if available
if self.openai_client:
enhanced_solution = self._enhance_solution_with_llm(problem, solution)
solution.update(enhanced_solution)
return solution
def _call_llm(self, prompt: str, max_retries: int = 3) -> str:
"""Call the language model with retry logic and error handling"""
if not self.openai_client:
logger.warning("LLM client not available")
return ""
for attempt in range(max_retries):
try:
# Use the TogetherAI client's generate_text method
response = self.openai_client.generate_text(
prompt=prompt,
model="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
max_tokens=1000,
temperature=0.5
)
return response.strip()
except Exception as e:
if attempt == max_retries - 1:
logger.error(f"LLM call failed after {max_retries} attempts: {str(e)}")
else:
logger.warning(f"LLM call failed (attempt {attempt + 1}): {str(e)}")
return ""
def _enhance_analysis_with_llm(self, problem: str, analysis: Dict) -> Dict:
"""Enhance problem analysis using LLM"""
try:
prompt = f"""
Analyze this problem and enhance the analysis:
Problem: {problem}
Current Analysis:
{json.dumps(analysis, indent=2)}
Provide enhanced analysis focusing on:
1. Hidden assumptions and biases
2. Alternative approaches
3. Potential risks
4. Key success factors
Return as JSON with fields: enhanced_analysis, assumptions, alternatives, risks, success_factors
"""
response = self._call_llm(prompt)
if not response:
return {}
# Try to parse response as JSON
try:
return json.loads(response)
except json.JSONDecodeError:
# Fallback to plain text if JSON parsing fails
return {'enhanced_analysis': response}
except Exception as e:
logger.error(f"Error in _enhance_analysis_with_llm: {str(e)}")
return {}
def _enhance_solution_with_llm(self, problem: str, solution: Dict) -> Dict:
"""Enhance solution using LLM"""
try:
prompt = f"""
Enhance this solution with implementation details:
Problem: {problem}
Solution:
{json.dumps(solution, indent=2)}
Provide:
1. Step-by-step implementation
2. Code examples if applicable
3. Potential pitfalls
4. Performance considerations
Return as JSON with fields: detailed_solution, steps, code_examples, pitfalls, performance
"""
response = self._call_llm(prompt)
if not response:
return {}
try:
return json.loads(response)
except json.JSONDecodeError:
return {'detailed_solution': response}
except Exception as e:
logger.error(f"Error in _enhance_solution_with_llm: {str(e)}")
return {}
# Helper methods for reasoning strategies
def _find_relevant_knowledge(self, concepts: List[str]) -> str:
"""Find relevant knowledge from knowledge graph"""
relevant = []
for concept in concepts:
if concept in self.knowledge_graph:
node = self.knowledge_graph[concept]
relevant.append(f"{concept}: {node.properties}")
return str(relevant) if relevant else "No specific knowledge found"
def _find_patterns(self, concepts: List[str], min_support: float = 0.3) -> List[Dict]:
"""
Find relevant patterns in the knowledge graph based on concepts
Args:
concepts: List of concepts to find patterns for
min_support: Minimum support threshold for pattern matching (0.0 to 1.0)
Returns:
List of matching patterns with their confidence scores
"""
if not concepts:
return []
# Get all patterns from the knowledge graph
all_patterns = [
node for node in self.knowledge_graph.values()
if node.properties.get('type') == 'pattern'
]
if not all_patterns:
return []
# Calculate similarity between input concepts and pattern concepts
matching_patterns = []
for pattern_node in all_patterns:
pattern_concepts = set(pattern_node.properties.get('concepts', []))
if not pattern_concepts:
continue
# Calculate Jaccard similarity
input_set = set(concepts)
intersection = len(input_set.intersection(pattern_concepts))
union = len(input_set.union(pattern_concepts))
similarity = intersection / union if union > 0 else 0
if similarity >= min_support:
matching_patterns.append({
'pattern_id': pattern_node.concept,
'similarity': similarity,
'confidence': pattern_node.confidence,
'concepts': list(pattern_concepts),
'solution': pattern_node.properties.get('solution', {})
})
# Sort by combined score of similarity and confidence
matching_patterns.sort(
key=lambda x: (x['similarity'] * 0.6 + x['confidence'] * 0.4),
reverse=True
)
return matching_patterns
def _learn_pattern(self, problem: str, solution: Dict, context: Dict = None) -> bool:
"""
Learn a new pattern from a problem-solution pair
Args:
problem: The problem text
solution: The solution dictionary
context: Additional context about the problem
Returns:
bool: True if a new pattern was learned, False otherwise
"""
try:
# Extract key concepts from the problem
concepts, _ = self._extract_concepts(problem)
if not concepts or len(concepts) < 2:
return False # Need at least 2 concepts to form a pattern
# Check if a similar pattern already exists
existing_patterns = self._find_patterns(concepts, min_support=0.7)
if existing_patterns and existing_patterns[0]['similarity'] > 0.8:
# Update existing pattern confidence
pattern_id = existing_patterns[0]['pattern_id']
if pattern_id in self.knowledge_graph:
node = self.knowledge_graph[pattern_id]
node.confidence = min(1.0, node.confidence + 0.05)
node.properties['last_used'] = datetime.datetime.now().isoformat()
node.properties['usage_count'] = node.properties.get('usage_count', 0) + 1
logger.info(f"Updated pattern confidence for {pattern_id}")
return False
# Create a new pattern
pattern_id = f"pattern_{len([n for n in self.knowledge_graph.values() if n.properties.get('type') == 'pattern'])}"
# Store the pattern in the knowledge graph
self.add_knowledge(
concept=pattern_id,
properties={
'type': 'pattern',
'concepts': concepts,
'solution': solution,
'created_at': datetime.datetime.now().isoformat(),
'usage_count': 1,
'last_used': datetime.datetime.now().isoformat(),
'context': context or {}
},
relationships={},
confidence=0.5, # Initial confidence
source='pattern_learning'
)
logger.info(f"Learned new pattern: {pattern_id} with {len(concepts)} concepts")
return True
except Exception as e:
logger.error(f"Error learning pattern: {str(e)}")
return False
def _apply_patterns(self, problem: str, context: Dict = None) -> Optional[Dict]:
"""
Try to apply learned patterns to solve the problem
Args:
problem: The problem to solve
context: Additional context about the problem
Returns:
Dict containing the solution if a matching pattern is found, None otherwise
"""
try:
# Extract key concepts from the problem
concepts, _ = self._extract_concepts(problem)
if not concepts:
return None
# Find matching patterns
patterns = self._find_patterns(concepts)
# Filter patterns by minimum confidence
patterns = [p for p in patterns if p['confidence'] > 0.6 and p['similarity'] > 0.5]
if not patterns:
return None
# Sort by combined score (similarity * confidence)
best_pattern = max(
patterns,
key=lambda p: p['similarity'] * p['confidence']
)
# Update pattern usage statistics
if best_pattern['pattern_id'] in self.knowledge_graph:
node = self.knowledge_graph[best_pattern['pattern_id']]
node.confidence = min(1.0, node.confidence + 0.02) # Small confidence boost
node.properties['last_used'] = datetime.datetime.now().isoformat()
node.properties['usage_count'] = node.properties.get('usage_count', 0) + 1
logger.info(f"Applied pattern {best_pattern['pattern_id']} with "
f"confidence {best_pattern['confidence']:.2f}")
# Return the solution from the pattern
return best_pattern['solution']
except Exception as e:
logger.error(f"Error applying patterns: {str(e)}")
return None
def _neural_reasoning(self, problem: str, context: Dict = None) -> Dict:
"""Use neural network for advanced reasoning"""
if not self.neural_network:
return {"error": "Neural network not initialized", "confidence": 0.0}
try:
# Prepare input for the neural network
input_tensor = self._prepare_neural_input(problem, context)
# Run the neural network
with torch.no_grad():
output = self.neural_network(input_tensor)
# Process the output
reasoning_result = self._process_neural_output(output)
# Update neural network based on results
self._update_neural_weights(reasoning_result)
return reasoning_result
except Exception as e:
logger.error(f"Neural reasoning failed: {str(e)}")
return {"error": str(e), "confidence": 0.0}
def _prepare_neural_input(self, problem: str, context: Dict) -> torch.Tensor:
"""Convert problem and context to neural network input"""
# Tokenize and convert to tensor
tokens = self._tokenize_text(problem)
# Add context if available
if context:
context_tokens = self._tokenize_text(json.dumps(context))
tokens = tokens + [self.sep_token] + context_tokens
# Convert to tensor and add batch dimension
input_tensor = torch.tensor([tokens], dtype=torch.long)
# Add positional encoding
positions = torch.arange(0, input_tensor.size(1), dtype=torch.long).unsqueeze(0)
pos_encoding = self.positional_encoder(positions)
return input_tensor + pos_encoding
def _process_neural_output(self, output: torch.Tensor) -> Dict:
"""Convert neural network output to reasoning result"""
# Get the most likely reasoning strategy
strategy_logits = output['strategy_logits']
strategy_probs = torch.softmax(strategy_logits, dim=-1)
best_strategy_idx = torch.argmax(strategy_probs).item()
# Get confidence and reasoning steps
confidence = output['confidence'].sigmoid().item()
reasoning_steps = self._decode_reasoning_steps(output['reasoning_steps'])
return {
'strategy': ReasoningType(best_strategy_idx).name,
'confidence': confidence,
'reasoning_steps': reasoning_steps,
'raw_output': output
}
def _update_neural_weights(self, result: Dict):
"""Update neural network weights based on reasoning results"""
if result.get('confidence', 0) > self.confidence_threshold:
# Positive reinforcement for successful reasoning
self.learning_rate *= 1.05 # Slightly increase learning rate
else:
# Negative feedback for poor reasoning
self.learning_rate *= 0.95 # Slightly decrease learning rate
# Ensure learning rate stays in reasonable bounds
self.learning_rate = max(1e-6, min(0.1, self.learning_rate))
# Update optimizer with new learning rate
if hasattr(self, 'optimizer'):
for param_group in self.optimizer.param_groups:
param_group['lr'] = self.learning_rate
def _update_strategy_weights(self, results: List[ReasoningStep]):
    """Adapt per-strategy weights from observed reasoning performance.

    For each strategy seen in ``results`` this aggregates the overall
    success rate (confidence > 0.7 counts as success) and a rolling
    window of the last 10 outcomes, then nudges the strategy's weight
    toward its blended score (60% recent, 40% overall), capped at
    +/-20% per update and bounded to [0.1, 10.0]. Finally re-balances
    the weights so no single strategy dominates.
    """
    if not results:
        return
    # Aggregate per-strategy performance statistics.
    stats = {}
    for step in results:
        entry = stats.setdefault(step.reasoning_type, {
            'total_confidence': 0,
            'count': 0,
            'successes': 0,
            'recent_successes': []
        })
        entry['total_confidence'] += step.confidence
        entry['count'] += 1
        succeeded = 1 if step.confidence > 0.7 else 0
        entry['successes'] += succeeded
        entry['recent_successes'].append(succeeded)
        # Only the last 10 outcomes matter for "recent" performance.
        entry['recent_successes'] = entry['recent_successes'][-10:]
    for strategy, entry in stats.items():
        if entry['count'] == 0:
            continue
        overall_rate = entry['successes'] / entry['count']
        recent = entry['recent_successes']
        recent_rate = sum(recent) / len(recent) if recent else 0
        # Weight recent performance (60%) over lifetime performance (40%).
        performance_score = (recent_rate * 0.6) + (overall_rate * 0.4)
        # A score of 0.5 is neutral; cap adjustments at +/-20% per update.
        max_adjustment = 0.2
        adjustment = (performance_score - 0.5) * 2 * max_adjustment
        scaled = self.strategy_weights.get(strategy, 1.0) * (1 + adjustment)
        self.strategy_weights[strategy] = max(0.1, min(10.0, scaled))
        logger.debug(f"Updated {strategy.value} weight to {self.strategy_weights[strategy]:.2f} "
                     f"(performance: {performance_score:.2f}, adjustment: {adjustment:+.2f})")
    # Prevent any single strategy from dominating.
    self._maintain_strategy_diversity()
def _maintain_strategy_diversity(self):
"""Ensure no single strategy dominates by maintaining minimum weights"""
min_weight = 0.1 # Absolute minimum weight
max_weight = 10.0 # Absolute maximum weight
# Calculate current weight distribution
total_weight = sum(self.strategy_weights.values())
num_strategies = len(self.strategy_weights)
# If any strategy is too dominant, reduce it and redistribute
max_strategy = max(self.strategy_weights, key=self.strategy_weights.get)
max_value = self.strategy_weights[max_strategy]
if max_value / total_weight > 0.5: # If any strategy has >50% weight
# Reduce the dominant strategy's weight
self.strategy_weights[max_strategy] = total_weight * 0.4 # Reduce to 40%
# Redistribute the remaining weight proportionally
remaining_weight = total_weight - self.strategy_weights[max_strategy]
other_strategies = [s for s in self.strategy_weights if s != max_strategy]
if other_strategies:
total_other_weight = sum(self.strategy_weights[s] for s in other_strategies)
if total_other_weight > 0:
for strategy in other_strategies:
proportion = self.strategy_weights[strategy] / total_other_weight
self.strategy_weights[strategy] = proportion * remaining_weight
logger.info(f"Reduced dominance of {max_strategy.value} strategy to maintain diversity")
def update_knowledge(self, problem: str = None, solution: Dict = None, context: Dict = None,
                     pattern: str = None, examples: List[Dict] = None, confidence: float = 0.5) -> bool:
    """
    Update the knowledge base with new information.

    Supports two call patterns:
      * legacy: ``problem`` + ``solution`` (optionally ``context``)
      * pattern learning: ``pattern`` + ``examples`` (+ ``confidence``)

    Usage statistics are tracked for the legacy pattern so that concept
    confidences adapt over time. Returns True on success, False on
    invalid arguments or any internal failure.
    """
    try:
        # Record usage statistics first so learning happens even when the
        # legacy update path is taken below.
        if problem and solution:
            self._track_knowledge_usage(problem, solution, context)
        if problem is not None and solution is not None:
            return self._update_knowledge_from_problem(problem, solution)
        if pattern is not None and examples is not None:
            return self._update_knowledge_from_pattern(pattern, examples, confidence)
        logger.warning("Invalid arguments to update_knowledge. Need either (problem and solution) or (pattern and examples).")
        return False
    except Exception as e:
        logger.error(f"Error updating knowledge: {str(e)}")
        return False
def _track_knowledge_usage(self, problem: str, solution: Dict, context: Dict = None):
    """Track how knowledge concepts are used so their confidence can adapt.

    For every concept extracted from ``problem`` that exists in the
    knowledge graph, this records a use, whether it succeeded (solution
    confidence > 0.7), and a rolling window of the last 10 outcomes,
    then recomputes the node's confidence (70% recent success rate,
    30% lifetime rate). ``context`` is accepted for interface
    compatibility but not currently inspected.
    """
    concepts, _ = self._extract_concepts(problem)
    for concept in concepts:
        if concept not in self.knowledge_graph:
            continue
        node = self.knowledge_graph[concept]
        # Lazily initialize usage tracking on first touch.
        if '_usage' not in node.properties:
            node.properties['_usage'] = {
                'count': 0,
                'last_used': datetime.datetime.now().isoformat(),
                'success_count': 0,
                'recent_uses': []
            }
        usage = node.properties['_usage']
        usage['count'] += 1
        usage['last_used'] = datetime.datetime.now().isoformat()
        if solution.get('confidence', 0) > 0.7:
            usage['success_count'] += 1
            usage['recent_uses'].append(1)  # 1 for success
        else:
            usage['recent_uses'].append(0)  # 0 for failure
        # Keep only the last 10 uses for recent performance.
        usage['recent_uses'] = usage['recent_uses'][-10:]
        success_rate = usage['success_count'] / usage['count'] if usage['count'] > 0 else 0
        recent_success_rate = sum(usage['recent_uses']) / len(usage['recent_uses']) if usage['recent_uses'] else 0
        # Weight recent performance (70%) over lifetime performance (30%).
        new_confidence = (recent_success_rate * 0.7) + (success_rate * 0.3)
        # Bug fix: compare against the confidence *before* the update.
        # Previously the post-clamp value was compared to new_confidence,
        # so the "significant change" log below could never fire.
        previous_confidence = node.confidence
        node.confidence = max(0.1, min(1.0, new_confidence))
        if abs(node.confidence - previous_confidence) > 0.1:
            logger.info(f"Updated confidence for '{concept}' to {node.confidence:.2f} "
                        f"(success rate: {success_rate:.2f}, recent: {recent_success_rate:.2f})")
    # Periodically clean up unused knowledge (5% chance per update).
    if random.random() < 0.05:
        self._cleanup_unused_knowledge()
def _cleanup_unused_knowledge(self):
"""Remove or demote unused knowledge to keep the knowledge base relevant"""
current_time = datetime.datetime.now()
max_age = datetime.timedelta(days=30) # Consider knowledge unused after 30 days
concepts_to_remove = []
for concept, node in list(self.knowledge_graph.items()):
# Skip base knowledge
if node.source == 'base_initialization':
continue
usage = node.properties.get('_usage', {})
last_used = usage.get('last_used')
if last_used:
try:
last_used = datetime.datetime.fromisoformat(last_used)
age = current_time - last_used
# If unused for too long and low confidence, mark for removal
if age > max_age and node.confidence < 0.3:
concepts_to_remove.append(concept)
# If unused for long but has some confidence, just reduce confidence
elif age > max_age:
node.confidence *= 0.8 # Decay confidence
except (ValueError, TypeError):
continue
# Remove low-confidence, unused concepts
for concept in concepts_to_remove:
del self.knowledge_graph[concept]
logger.info(f"Removed unused knowledge: {concept}")
return len(concepts_to_remove)
def _update_knowledge_from_reasoning(self, problem: str, solution: Dict, reasoning: List[ReasoningStep]):
    """Record a successful reasoning episode in the knowledge base.

    Only solutions with confidence above 0.7 are stored; low-confidence
    results are discarded. ``problem`` and ``reasoning`` are accepted
    for interface compatibility but not currently inspected.
    """
    confidence = solution.get('confidence', 0)
    if confidence <= 0.7:
        return
    concept_name = f"problem_solution_{len(self.knowledge_graph)}"
    self.add_knowledge(
        concept_name,
        {
            "problem_type": solution.get('reasoning_type', 'unknown'),
            "solution_approach": solution.get('primary_approach', ''),
            "success_rate": confidence
        },
        {},
        confidence,
        "reasoning_experience"
    )
def _update_knowledge_from_problem(self, problem: str, solution: Dict) -> bool:
    """Fold a problem/solution pair into the knowledge graph (legacy path).

    Creates or refreshes one node per concept extracted from the
    problem, stores the solution as its own node linked to those
    concepts, and nudges the weight of the reasoning strategy that
    produced it (x1.1 on success, x0.9 on failure, bounded to
    [0.1, 10.0]). Always returns True.
    """
    concepts, _ = self._extract_concepts(problem)
    for concept in concepts:
        if concept not in self.knowledge_graph:
            # First sighting — register the concept.
            self.add_knowledge(
                concept=concept,
                properties={
                    'type': 'concept',
                    'first_seen': datetime.datetime.now().isoformat(),
                    'occurrences': 1
                },
                relationships={},
                confidence=0.7,
                source='interaction'
            )
        else:
            # Known concept — refresh recency and occurrence count.
            node = self.knowledge_graph[concept]
            node.properties['last_seen'] = datetime.datetime.now().isoformat()
            node.properties['occurrences'] = node.properties.get('occurrences', 1) + 1
    # Store the solution as its own node, linked back to the concepts.
    solution_id = f"solution_{len(self.knowledge_graph)}"
    self.add_knowledge(
        concept=solution_id,
        properties={
            'type': 'solution',
            'problem': problem,
            'timestamp': datetime.datetime.now().isoformat(),
            'confidence': solution.get('confidence', 0.5),
            'success': solution.get('success', False)
        },
        relationships={
            'addresses': concepts,
            'related_to': list(set(concepts))  # de-duplicated view
        },
        confidence=solution.get('confidence', 0.5),
        source='interaction'
    )
    # Reinforce (or penalize) the strategy that produced this solution.
    if 'reasoning_type' in solution:
        try:
            reasoning_type = ReasoningType(solution['reasoning_type'])
        except ValueError:
            pass  # unknown reasoning type — nothing to reinforce
        else:
            if solution.get('success', False):
                self.strategy_weights[reasoning_type] = min(10.0,
                    self.strategy_weights.get(reasoning_type, 1.0) * 1.1)
            else:
                self.strategy_weights[reasoning_type] = max(0.1,
                    self.strategy_weights.get(reasoning_type, 1.0) * 0.9)
    logger.info(f"Updated knowledge base with new information from problem: {problem[:50]}...")
    return True
def _update_knowledge_from_pattern(self, pattern: str, examples: List[Dict], confidence: float) -> bool:
    """Store a learned pattern and its example solutions in the graph.

    New patterns get their own node; known patterns have their metadata
    refreshed and keep the higher of the old and new confidence. Each
    example is stored as a separate node linked back to the pattern.
    Always returns True.
    """
    if pattern in self.knowledge_graph:
        # Known pattern — refresh metadata; confidence only ratchets up.
        node = self.knowledge_graph[pattern]
        node.properties['last_updated'] = datetime.datetime.now().isoformat()
        node.properties['example_count'] = len(examples)
        node.confidence = max(node.confidence, confidence)
    else:
        self.add_knowledge(
            concept=pattern,
            properties={
                'type': 'pattern',
                'first_seen': datetime.datetime.now().isoformat(),
                'example_count': len(examples),
                'last_updated': datetime.datetime.now().isoformat()
            },
            relationships={},
            confidence=confidence,
            source='learning'
        )
    # Record each example as its own node pointing back at the pattern.
    for example in examples:
        self.add_knowledge(
            concept=f"example_{len(self.knowledge_graph)}",
            properties={
                'type': 'example_solution',
                'content': str(example),
                'timestamp': datetime.datetime.now().isoformat(),
                'confidence': confidence,
                'source': 'learning'
            },
            relationships={
                'exemplifies': [pattern],
                'related_to': [pattern]
            },
            confidence=confidence,
            source='learning'
        )
    logger.info(f"Updated knowledge base with new pattern: {pattern[:50]}...")
    return True
def _calculate_confidence(self, reasoning_steps: List[Union[Dict, ReasoningStep]]) -> float:
    """
    Calculate an overall, calibrated confidence score.

    Args:
        reasoning_steps: List of reasoning steps (dicts or ReasoningStep
            objects; the latter are normalized to dicts first).
    Returns:
        float: Calibrated confidence score between 0.0 and 1.0.
    """
    if not reasoning_steps:
        return 0.0
    # Normalize ReasoningStep objects into plain dicts.
    normalized = [
        {
            'confidence': step.confidence,
            'reasoning_type': step.reasoning_type.value,
            'evidence': step.evidence,
            'assumptions': step.assumptions
        } if isinstance(step, ReasoningStep) else step
        for step in reasoning_steps
    ]
    # All steps currently carry equal weight, so the weighted average
    # reduces to a plain mean of the per-step confidences.
    base_confidence = sum(s.get('confidence', 0) for s in normalized) / len(normalized)
    calibrated = self._apply_calibration_factors(base_confidence, normalized)
    # Clamp to the valid probability range.
    return max(0.0, min(1.0, calibrated))
def _apply_calibration_factors(
self,
base_confidence: float,
reasoning_steps: List[Dict]
) -> float:
"""
Apply various calibration factors to the base confidence
Args:
base_confidence: The initial confidence score
reasoning_steps: List of reasoning steps
Returns:
float: Calibrated confidence score
"""
calibrated = base_confidence
# Factor 1: Consistency across reasoning steps
step_confidences = [step.get('confidence', 0.0) for step in reasoning_steps]
if step_confidences:
std_dev = np.std(step_confidences)
# Reduce confidence if there's high variance in step confidences
if std_dev > 0.2:
calibrated *= 0.9 # 10% penalty for high variance
# Factor 2: Agreement between different reasoning strategies
strategies = [step.get('strategy') for step in reasoning_steps]
unique_strategies = len(set(strategies))
if unique_strategies > 1:
# Bonus for multiple strategies agreeing
calibrated *= 1.05
# Factor 3: Knowledge base coverage
knowledge_coverage = self._calculate_knowledge_coverage(reasoning_steps)
calibrated *= (0.7 + 0.3 * knowledge_coverage) # 30% weight to knowledge coverage
# Factor 4: Recent performance
recent_success_rate = self._get_recent_success_rate()
calibrated *= (0.8 + 0.2 * recent_success_rate) # 20% weight to recent performance
return calibrated
def _calculate_knowledge_coverage(self, reasoning_steps: List[Union[Dict, ReasoningStep]]) -> float:
    """
    Measure how much of the reasoning is backed by known concepts.

    Collects every concept, evidence item, and assumption referenced by
    the steps and returns the fraction present in the knowledge graph.
    An empty step list scores 0.0; steps referencing no concepts at all
    score 1.0 (nothing to verify).
    """
    if not reasoning_steps:
        return 0.0
    referenced = []
    for step in reasoning_steps:
        if isinstance(step, ReasoningStep):
            referenced.extend(step.evidence + step.assumptions)
        else:
            referenced.extend(step.get('concepts', []) + step.get('evidence', []) + step.get('assumptions', []))
    if not referenced:
        return 1.0  # no concepts to check — treat as fully covered
    known = sum(1 for concept in referenced if concept in self.knowledge_graph)
    return known / len(referenced)
def _get_recent_success_rate(self, window_size: int = 10) -> float:
"""
Calculate the success rate of recent reasoning attempts
Args:
window_size: Number of recent attempts to consider
Returns:
float: Success rate between 0.0 and 1.0
"""
if not hasattr(self, '_reasoning_history'):
self._reasoning_history = []
# Only consider recent history
recent_history = self._reasoning_history[-window_size:]
if not recent_history:
return 0.7 # Default to moderate confidence
# Count successful reasoning attempts (confidence > threshold)
success_count = sum(1 for _, success in recent_history if success)
return success_count / len(recent_history)
def _update_reasoning_history(self, success: bool):
"""
Update the reasoning history with the latest result
Args:
success: Whether the reasoning was successful
"""
if not hasattr(self, '_reasoning_history'):
self._reasoning_history = []
# Add timestamp and result
self._reasoning_history.append((datetime.datetime.now(), success))
# Keep history size manageable
if len(self._reasoning_history) > 100: # Keep last 100 entries
self._reasoning_history = self._reasoning_history[-100:]
def _generate_self_assessment(self, result: Dict) -> Dict:
"""
Generate a self-assessment of the reasoning process
Args:
result: The reasoning result to assess
Returns:
Dict containing self-assessment information
"""
assessment = {
'confidence': result.get('confidence', 0.0),
'issues': [],
'strengths': [],
'suggestions': []
}
# Check confidence level
if assessment['confidence'] < 0.3:
assessment['issues'].append("Low confidence in the solution")
assessment['suggestions'].append("Try providing more context or rephrasing the problem")
# Check processing time
process_time = result.get('processing_time', 0)
if process_time > 5.0: # More than 5 seconds
assessment['issues'].append("Reasoning took longer than expected")
assessment['suggestions'].append("Consider simplifying the problem or using a different approach")
# Check knowledge coverage
if hasattr(self, 'knowledge_graph'):
coverage = self._calculate_knowledge_coverage(result.get('reasoning_steps', []))
if coverage < 0.3:
assessment['issues'].append("Limited knowledge coverage for this problem")
assessment['suggestions'].append("Consider adding more relevant knowledge to the knowledge base")
# Add strengths if confidence is high
if assessment['confidence'] > 0.7:
assessment['strengths'].append("High confidence in the solution")
if process_time < 1.0:
assessment['strengths'].append("Efficient reasoning process")
if coverage > 0.7:
assessment['strengths'].append("Good knowledge coverage")
return assessment
def _handle_error(self, error_type: str, error: Exception, context: Dict = None) -> Dict:
    """
    Centralized error handling with appropriate fallback.

    Args:
        error_type: Type of error (e.g., 'api_error', 'network_error')
        error: The exception that was raised
        context: Additional context about the error
    Returns:
        Dict containing error information and fallback response
    """
    try:
        # Log the error with context.
        logger.error(f"{error_type}: {str(error)}\nContext: {context}")
        # Get the appropriate handler or fall back to the generic one.
        handler = self.error_handlers.get(error_type, self._handle_generic_error)
        return handler(error, context)
    except Exception as e:
        # Error handling itself failed. Bug fix: this previously passed
        # the undefined name `handler_error`, which raised NameError
        # instead of returning the critical-error response.
        logger.critical(f"Error handler failed: {str(e)}")
        return self._handle_critical_error(e)
def _handle_api_error(self, error: Exception, context: Dict) -> Dict:
"""Handle API-related errors"""
# Try to extract useful information from the error
error_info = {
'error_type': 'api_error',
'message': str(error),
'suggestion': 'Check your API key and quota',
'can_retry': True,
'retry_after': 60 # seconds
}
# Add context if available
if context:
error_info.update(context)
return error_info
def _handle_network_error(self, error: Exception, context: Dict) -> Dict:
"""Handle network-related errors"""
return {
'error_type': 'network_error',
'message': 'Network connectivity issue',
'suggestion': 'Check your internet connection and try again',
'can_retry': True,
'retry_after': 30 # seconds
}
def _handle_validation_error(self, error: Exception, context: Dict) -> Dict:
"""Handle input validation errors"""
return {
'error_type': 'validation_error',
'message': str(error),
'suggestion': 'Check your input and try again',
'can_retry': True,
'retry_immediately': True
}
def _handle_timeout_error(self, error: Exception, context: Dict) -> Dict:
"""Handle timeout errors"""
return {
'error_type': 'timeout_error',
'message': 'Operation timed out',
'suggestion': 'The request took too long. Please try again later',
'can_retry': True,
'retry_after': 60 # seconds
}
def _handle_resource_error(self, error: Exception, context: Dict) -> Dict:
"""Handle resource-related errors"""
return {
'error_type': 'resource_error',
'message': 'Insufficient resources',
'suggestion': 'Try again with a smaller input or contact support',
'can_retry': False
}
def _handle_generic_error(self, error: Exception, context: Dict) -> Dict:
"""Handle any unclassified errors"""
return {
'error_type': 'unexpected_error',
'message': 'An unexpected error occurred',
'suggestion': 'Please try again later',
'can_retry': True,
'retry_after': 300 # 5 minutes
}
def _handle_critical_error(self, error: Exception) -> Dict:
"""Handle critical errors in error handling"""
return {
'error_type': 'critical_error',
'message': 'A critical error occurred',
'suggestion': 'Please restart the application',
'can_retry': False
}
def _safe_call(self, func, *args, **kwargs):
"""
Safely call a function with error handling and retries
Args:
func: The function to call
*args: Positional arguments to pass to the function
**kwargs: Keyword arguments to pass to the function
Returns:
Tuple of (result, error_info)
"""
max_retries = kwargs.pop('max_retries', 3)
retry_delay = kwargs.pop('retry_delay', 1.0)
error_handlers = kwargs.pop('error_handlers', {})
last_error = None
for attempt in range(max_retries):
try:
result = func(*args, **kwargs)
return result, None
except Exception as e:
last_error = e
error_type = self._classify_error(e)
# Use custom handler if provided, otherwise use default
handler = error_handlers.get(error_type, self._handle_error)
error_info = handler(e, {
'attempt': attempt + 1,
'max_retries': max_retries,
'function': func.__name__
})
# Check if we should retry
if not error_info.get('can_retry', False) or attempt == max_retries - 1:
return None, error_info
# Wait before retrying
time.sleep(error_info.get('retry_after', retry_delay))
# If we get here, all retries failed
return None, self._handle_generic_error(
last_error,
{'message': 'All retries failed'}
)
def _classify_error(self, error: Exception) -> str:
"""Classify an exception into an error type"""
error_str = str(error).lower()
if any(e in error_str for e in ['api', 'authentication', 'quota']):
return 'api_error'
elif any(e in error_str for e in ['network', 'connection', 'timeout', 'socket']):
return 'network_error'
elif any(e in error_str for e in ['validation', 'invalid', 'value', 'type']):
return 'validation_error'
elif 'timeout' in error_str:
return 'timeout_error'
elif any(e in error_str for e in ['memory', 'resource', 'capacity']):
return 'resource_error'
else:
return 'unexpected_error'
def _format_error_response(self, error_info: Dict, start_time: float) -> Dict:
"""Format an error response with additional metadata"""
return {
'success': False,
'error': error_info,
'processing_time': time.time() - start_time,
'timestamp': datetime.datetime.now().isoformat(),
'suggestion': error_info.get('suggestion', 'Please try again later')
}
def _select_best_error(self, *errors: Dict) -> Dict:
"""Select the most appropriate error from multiple errors"""
if not errors:
return self._handle_generic_error(
ValueError("No errors provided"),
{}
)
# Prefer more specific errors over generic ones
error_priority = [
'validation_error',
'api_error',
'network_error',
'resource_error',
'timeout_error',
'unexpected_error',
'critical_error'
]
# Find the highest priority error
for error_type in error_priority:
for error in errors:
if error.get('error_type') == error_type:
return error
# If no matches, return the first error
return errors[0]
def _find_analogies(self, concept: str, top_n: int = 3) -> List[str]:
"""
Find analogies or similar concepts in the knowledge graph.
Args:
concept: The concept to find analogies for
top_n: Maximum number of analogies to return
Returns:
List of analogies or similar concepts
"""
try:
# First try to find direct analogies in the knowledge graph
analogies = []
# Look for similar concepts based on properties and relationships
if concept in self.knowledge_graph:
node = self.knowledge_graph[concept]
# Check for 'similar_to' relationships
if 'similar_to' in node.relationships:
for similar_concept in node.relationships['similar_to']:
if similar_concept in self.knowledge_graph:
analogies.append(f"Similar to {similar_concept}: {self.knowledge_graph[similar_concept].get('description', 'No description')}")
# If we have enough analogies, return them
if len(analogies) >= top_n:
return analogies[:top_n]
# If not enough analogies found, use the LLM to generate some
prompt = f"""
Generate {top_n} analogies or similar concepts to help understand:
"{concept}"
Format each analogy on a new line starting with a dash.
Be creative but keep the analogies relevant and helpful.
"""
response = self._call_llm(prompt)
# Parse the response into a list of analogies
if response:
# Split by lines and clean up
lines = [line.strip() for line in response.split('\n') if line.strip()]
for line in lines:
# Remove bullet points or dashes if present
line = re.sub(r'^[\-\*•]\s*', '', line).strip()
if line:
analogies.append(line)
# Ensure we have at least some analogies
if not analogies:
analogies = [
f"This is similar to learning a new language - it takes practice and immersion",
f"Think of this like solving a puzzle - break it down into smaller pieces",
f"This concept is like a tool - its value comes from how you use it"
]
return analogies[:top_n]
except Exception as e:
logger.warning(f"Error finding analogies: {str(e)}")
# Return some default analogies
return [
f"This is similar to learning a new skill - it takes time and practice",
f"Think of this like building a house - you need a solid foundation first",
f"This concept is like a map - it helps you navigate complex information"
][:top_n]
def _generate_creative_ideas(self, problem: str, analysis: Dict) -> List[str]:
"""
Generate creative ideas or approaches to solve a problem.
Args:
problem: The problem statement
analysis: The problem analysis dictionary
Returns:
List of creative ideas or approaches
"""
try:
# Safely extract and format key concepts
key_concepts = analysis.get('key_concepts', [])
# Ensure all items are strings and flatten any nested lists
flat_concepts = []
for item in key_concepts:
if isinstance(item, (list, tuple)):
flat_concepts.extend(str(x) for x in item if x)
elif item: # Only add non-empty items
flat_concepts.append(str(item))
# Use the LLM to generate creative ideas
prompt = f"""
Generate 3-5 creative approaches to solve the following problem.
Focus on innovative or unconventional solutions.
Problem: {problem}
Key concepts: {', '.join(flat_concepts) if flat_concepts else 'None identified'}
Problem type: {analysis.get('problem_type', 'unknown')}
Creative approaches:
1."""
response = self._call_llm(prompt)
# Parse the response into a list of ideas
ideas = []
if response:
# Split by numbered list items
for line in response.split('\n'):
line = line.strip()
if line and line[0].isdigit() and '. ' in line:
idea = line.split('. ', 1)[1].strip()
ideas.append(idea)
# Fallback if no ideas were parsed
if not ideas:
ideas = [
"Approach the problem from a different perspective",
"Combine elements from unrelated domains",
"Challenge existing assumptions about the problem"
]
return ideas
except Exception as e:
logger.warning(f"Error generating creative ideas: {str(e)}")
# Return some default creative approaches
return [
"Consider an unconventional perspective on the problem",
"Look for inspiration from unrelated fields",
"Break the problem down into smaller, more manageable parts"
]
def create_general_reasoning_engine(together_ai_client = None) -> GeneralReasoningEngine:
    """Factory for a GeneralReasoningEngine backed by a TogetherAI client.

    Args:
        together_ai_client: An instance of TogetherAI client (required).
    Raises:
        ValueError: If no client is supplied.
    """
    if not together_ai_client:
        raise ValueError("TogetherAI client is required")
    # The TogetherAI client is passed through the OpenAI-client slot;
    # presumably the two expose compatible interfaces — confirm upstream.
    return GeneralReasoningEngine(openai_client=together_ai_client)
# Example usage / smoke test.
if __name__ == "__main__":
    # Bug fix: the factory raises ValueError when given None, so the old
    # demo crashed immediately with a traceback. Fail gracefully instead
    # and only run the test problems when an engine could be created.
    try:
        reasoning_engine = create_general_reasoning_engine(together_ai_client=None)
    except ValueError as exc:
        print(f"Cannot run demo without a TogetherAI client: {exc}")
    else:
        # Test problems
        test_problems = [
            "How can I optimize the performance of a machine learning model?",
            "What's the best way to design a user interface for elderly users?",
            "How do I solve the problem of declining team productivity?",
            "What causes climate change and how can we address it?",
            "How can I create a more efficient algorithm for sorting data?"
        ]
        for problem in test_problems:
            print(f"\n{'='*60}")
            print(f"REASONING ABOUT: {problem}")
            print('='*60)
            result = reasoning_engine.reason_about_problem(problem)
            explanation = reasoning_engine.get_reasoning_explanation(result)
            print(explanation)