| import torch |
| import torch.nn as nn |
| import json |
| import os |
| import re |
| from typing import Dict, List, Tuple, Optional, Any, Union |
| from dataclasses import dataclass |
| from enum import Enum |
| from openai import OpenAI |
| import logging |
| from transformer import Transformer, create_transformer_model, initialize_weights |
| import random |
| import datetime |
| import numpy as np |
| import time |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
class ReasoningType(Enum):
    """Closed set of reasoning strategies the engine can apply.

    Each member's value is the lowercase strategy name that appears in
    result dicts (e.g. ``strategies_used`` lists ``member.value``).
    """
    DEDUCTIVE = "deductive"      # general principles -> specific conclusion
    INDUCTIVE = "inductive"      # specific observations -> general rule
    ABDUCTIVE = "abductive"      # choose the best available explanation
    ANALOGICAL = "analogical"    # transfer a solution from a similar case
    CAUSAL = "causal"            # cause-and-effect analysis
    TEMPORAL = "temporal"        # time/sequence-based reasoning
    SPATIAL = "spatial"          # location/arrangement-based reasoning
    LOGICAL = "logical"          # formal-logic analysis
    CREATIVE = "creative"        # novel/divergent idea generation
|
|
@dataclass
class ReasoningStep:
    """Individual step in the reasoning process, produced by one strategy handler."""
    step_id: int                    # index assigned at creation (len of reasoning_history)
    reasoning_type: ReasoningType   # strategy that produced this step
    premise: str                    # observation/input the step starts from
    conclusion: str                 # what the step concludes
    confidence: float               # 0.0-1.0; mutated in place when strategy weights are applied
    evidence: List[str]             # supporting facts/observations
    assumptions: List[str]          # assumptions the conclusion relies on
|
|
@dataclass
class KnowledgeNode:
    """Node in the knowledge graph (keyed by `concept` in the graph dict)."""
    concept: str                        # unique concept name / pattern id
    properties: Dict[str, Any]          # arbitrary metadata; 'type' == 'pattern' marks learned patterns
    relationships: Dict[str, List[str]] # relation name -> related concept names
    confidence: float                   # 0.0-1.0, reinforced when the node is reused
    source: str                         # provenance tag (e.g. 'base_initialization', 'pattern_learning')
|
|
| class GeneralReasoningEngine: |
| """ |
| General Reasoning Engine for AGI-like reasoning capabilities |
| Implements multiple reasoning strategies and knowledge integration |
| """ |
| |
| def __init__(self, |
| openai_client: OpenAI = None, |
| device: str = "cuda" if torch.cuda.is_available() else "cpu"): |
| |
| self.device = device |
| self.openai_client = openai_client |
| |
| |
| self.reasoning_transformer = create_transformer_model( |
| src_vocab_size=100000, |
| tgt_vocab_size=100000 |
| ).to(self.device) |
| initialize_weights(self.reasoning_transformer) |
| |
| |
| self.knowledge_graph = {} |
| self.working_memory = [] |
| self.reasoning_history = [] |
| |
| |
| self.reasoning_strategies = { |
| ReasoningType.DEDUCTIVE: self._deductive_reasoning, |
| ReasoningType.INDUCTIVE: self._inductive_reasoning, |
| ReasoningType.ABDUCTIVE: self._abductive_reasoning, |
| ReasoningType.ANALOGICAL: self._analogical_reasoning, |
| ReasoningType.CAUSAL: self._causal_reasoning, |
| ReasoningType.TEMPORAL: self._temporal_reasoning, |
| ReasoningType.SPATIAL: self._spatial_reasoning, |
| ReasoningType.LOGICAL: self._logical_reasoning, |
| ReasoningType.CREATIVE: self._creative_reasoning |
| } |
| |
| |
| self.strategy_weights = {strategy: 1.0 for strategy in ReasoningType} |
| |
| |
| self._initialize_base_knowledge() |
| |
| |
| self.neural_network = None |
| self.learning_rate = 0.01 |
| self.confidence_threshold = 0.7 |
| self.optimizer = None |
| self.positional_encoder = None |
| self.sep_token = None |
| |
| |
| self.error_handlers = { |
| 'api_error': self._handle_api_error, |
| 'network_error': self._handle_network_error, |
| 'validation_error': self._handle_validation_error, |
| 'timeout_error': self._handle_timeout_error, |
| 'resource_error': self._handle_resource_error |
| } |
| |
| def _initialize_base_knowledge(self): |
| """Initialize with fundamental knowledge concepts""" |
| base_concepts = [ |
| ("object", {"type": "entity", "properties": ["existence", "identity"]}), |
| ("action", {"type": "process", "properties": ["causality", "temporality"]}), |
| ("relationship", {"type": "connection", "properties": ["bidirectional", "typed"]}), |
| ("pattern", {"type": "structure", "properties": ["repetition", "similarity"]}), |
| ("goal", {"type": "objective", "properties": ["desirability", "achievability"]}), |
| ("constraint", {"type": "limitation", "properties": ["boundary", "restriction"]}), |
| ("context", {"type": "environment", "properties": ["situational", "influential"]}) |
| ] |
| |
| for concept, properties in base_concepts: |
| self.add_knowledge(concept, properties, {}, 1.0, "base_initialization") |
| |
| def add_knowledge(self, concept: str, properties: Dict, relationships: Dict, |
| confidence: float, source: str): |
| """Add knowledge to the knowledge graph""" |
| node = KnowledgeNode(concept, properties, relationships, confidence, source) |
| self.knowledge_graph[concept] = node |
| logger.info(f"Added knowledge: {concept}") |
| |
    def reason_about_problem(self, problem: str, context: Dict = None) -> Dict:
        """
        Main reasoning entry point: try the neural path first, fall back to
        symbolic reasoning, and wrap everything in structured error handling.

        Args:
            problem: Natural-language problem statement.
            context: Optional extra context dict.

        Returns:
            Result dict (annotated with 'processing_time' and 'self_assessment'),
            or a formatted error response when reasoning fails.
        """
        start_time = time.time()
        result = None

        try:
            # Neural path first; _safe_call returns a (result, error) pair so a
            # crash here cannot abort the whole pipeline.
            neural_result, neural_error = self._safe_call(
                self._neural_reasoning,
                problem,
                context or {}
            )

            if neural_error:
                logger.warning(f"Neural reasoning failed: {neural_error}")

            # Accept the neural answer only when it is confident enough.
            if neural_result and neural_result.get('confidence', 0) >= self.confidence_threshold:
                result = neural_result
                self._update_reasoning_history(True)
            else:
                # Fallback: full symbolic reasoning pipeline.
                symbolic_result, symbolic_error = self._safe_call(
                    self._symbolic_reasoning,
                    problem,
                    context or {}
                )

                if symbolic_error:
                    # Both paths failed -> return the more informative error.
                    if neural_error:
                        logger.error("Both neural and symbolic reasoning failed")
                        error_info = self._select_best_error(neural_error, symbolic_error)
                        return self._format_error_response(error_info, start_time)
                    # Symbolic failed but neural produced a (low-confidence) result.
                    elif neural_result:
                        result = neural_result
                    else:
                        return self._format_error_response(symbolic_error, start_time)
                else:
                    # Keep whichever result is more confident.
                    # NOTE(review): the boolean passed to _update_reasoning_history
                    # is not self-explanatory (True for symbolic win and for the
                    # confident-neural branch above, False otherwise) — confirm
                    # its intended semantics against that method's definition.
                    if (not neural_result or
                        symbolic_result.get('confidence', 0) > neural_result.get('confidence', 0)):
                        result = symbolic_result
                        self._update_reasoning_history(True)
                    else:
                        result = neural_result
                        self._update_reasoning_history(False)

            # Annotate the successful result with timing and a self-assessment.
            result['processing_time'] = time.time() - start_time
            result['self_assessment'] = self._generate_self_assessment(result)

            logger.info(f"Successfully processed problem in {result['processing_time']:.2f}s")

            return result

        except Exception as e:
            # Last-resort catch: log with traceback and return a structured error.
            logger.critical(f"Unexpected error in reason_about_problem: {str(e)}", exc_info=True)
            error_info = self._handle_generic_error(e, {
                'problem': problem[:100] + '...' if len(problem) > 100 else problem,
                'context_keys': list(context.keys()) if context else None
            })
            return self._format_error_response(error_info, start_time)
| |
| def _symbolic_reasoning(self, problem: str, context: Dict = None) -> Dict: |
| """ |
| Main reasoning function - analyzes problem and applies appropriate reasoning |
| with enhanced pattern recognition and learning |
| """ |
| logger.info(f"Starting reasoning about: {problem}") |
| |
| try: |
| |
| pattern_solution = self._apply_patterns(problem, context) |
| if pattern_solution and pattern_solution.get('confidence', 0) > 0.7: |
| logger.info("Successfully applied pattern-based solution") |
| return { |
| 'problem': problem, |
| 'solution': pattern_solution, |
| 'strategy': 'pattern_matching', |
| 'confidence': pattern_solution.get('confidence', 0.7) |
| } |
| |
| |
| problem_analysis = self._analyze_problem(problem, context) |
| |
| |
| selected_strategies = self._select_reasoning_strategies(problem_analysis) |
| reasoning_results = [] |
| |
| for strategy in selected_strategies: |
| try: |
| result = self._apply_reasoning_strategy(strategy, problem, problem_analysis) |
| if result: |
| reasoning_results.append(result) |
| except Exception as e: |
| logger.error(f"Error applying {strategy.value} reasoning: {str(e)}") |
| |
| if not reasoning_results: |
| logger.warning("No reasoning strategies produced results, using default") |
| reasoning_results.append(self._default_reasoning(problem, problem_analysis)) |
| |
| final_reasoning = self._synthesize_reasoning(reasoning_results) |
| solution = self._generate_solution(problem, final_reasoning) |
| |
| |
| if solution.get('confidence', 0) > 0.7: |
| self._learn_pattern(problem, solution, context) |
| |
| return { |
| 'problem': problem, |
| 'analysis': problem_analysis, |
| 'strategies_used': [s.value for s in selected_strategies], |
| 'reasoning_steps': final_reasoning, |
| 'solution': solution, |
| 'confidence': self._calculate_confidence(final_reasoning) |
| } |
| |
| except Exception as e: |
| logger.error(f"Error in reason_about_problem: {str(e)}") |
| return { |
| 'problem': problem, |
| 'error': str(e), |
| 'solution': { |
| 'primary_approach': 'Error occurred during reasoning', |
| 'confidence': 0.0 |
| }, |
| 'confidence': 0.0 |
| } |
|
|
    def _analyze_problem(self, problem: str, context: Dict = None) -> Dict:
        """Analyze and decompose the problem into type, concepts, constraints,
        goals, complexity, and domain.

        Returns:
            Analysis dict consumed by _select_reasoning_strategies and the
            individual strategy handlers.
        """
        analysis = {
            'problem_type': self._classify_problem_type(problem),
            # NOTE: _extract_concepts returns a (all_concepts, known_concepts)
            # tuple, so 'key_concepts' is a tuple — handlers index [0] for the
            # full concept list.
            'key_concepts': self._extract_concepts(problem),
            'constraints': self._identify_constraints(problem),
            'goals': self._identify_goals(problem),
            'context': context or {},
            'complexity': self._assess_complexity(problem),
            'domain': self._identify_domain(problem)
        }

        # NOTE(review): self.neural_network is set to None in __init__ and is
        # never assigned elsewhere in this file, so this branch appears dormant.
        # If it is ever enabled, _neural_reasoning can return
        # {'error': ..., 'confidence': 0.0}, which would be merged into the
        # analysis dict — confirm that is intended (the unused
        # _enhance_analysis_with_llm helper may have been meant here instead).
        if self.neural_network:
            enhanced_analysis = self._neural_reasoning(problem, analysis)
            analysis.update(enhanced_analysis)

        return analysis
| |
| def _classify_problem_type(self, problem: str) -> str: |
| """Classify the type of problem""" |
| problem_lower = problem.lower() |
| |
| if any(word in problem_lower for word in ['create', 'build', 'design', 'generate']): |
| return 'creative' |
| elif any(word in problem_lower for word in ['analyze', 'understand', 'explain']): |
| return 'analytical' |
| elif any(word in problem_lower for word in ['solve', 'find', 'calculate']): |
| return 'problem_solving' |
| elif any(word in problem_lower for word in ['predict', 'forecast', 'estimate']): |
| return 'predictive' |
| elif any(word in problem_lower for word in ['optimize', 'improve', 'enhance']): |
| return 'optimization' |
| else: |
| return 'general' |
| |
| def _extract_concepts(self, problem: str) -> List[str]: |
| """Extract key concepts from the problem""" |
| |
| words = re.findall(r'\b[a-zA-Z]+\b', problem.lower()) |
| |
| |
| stopwords = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'} |
| concepts = [word for word in words if word not in stopwords and len(word) > 2] |
| |
| |
| known_concepts = [concept for concept in concepts if concept in self.knowledge_graph] |
| |
| return list(set(concepts)), known_concepts |
| |
| def _identify_constraints(self, problem: str) -> List[str]: |
| """Identify constraints in the problem""" |
| constraint_indicators = ['must', 'cannot', 'should not', 'limited', 'only', 'within', 'without'] |
| constraints = [] |
| |
| for indicator in constraint_indicators: |
| if indicator in problem.lower(): |
| |
| sentences = problem.split('.') |
| for sentence in sentences: |
| if indicator in sentence.lower(): |
| constraints.append(sentence.strip()) |
| |
| return constraints |
| |
| def _identify_goals(self, problem: str) -> List[str]: |
| """Identify goals in the problem""" |
| goal_indicators = ['want', 'need', 'goal', 'objective', 'aim', 'target', 'achieve'] |
| goals = [] |
| |
| for indicator in goal_indicators: |
| if indicator in problem.lower(): |
| sentences = problem.split('.') |
| for sentence in sentences: |
| if indicator in sentence.lower(): |
| goals.append(sentence.strip()) |
| |
| return goals |
| |
| def _assess_complexity(self, problem: str) -> str: |
| """Assess problem complexity""" |
| word_count = len(problem.split()) |
| concept_count = len(self._extract_concepts(problem)[0]) |
| |
| if word_count < 10 and concept_count < 3: |
| return 'simple' |
| elif word_count < 50 and concept_count < 10: |
| return 'moderate' |
| else: |
| return 'complex' |
| |
| def _identify_domain(self, problem: str) -> str: |
| """Identify the domain of the problem""" |
| domain_keywords = { |
| 'technology': ['code', 'program', 'software', 'computer', 'algorithm'], |
| 'science': ['experiment', 'hypothesis', 'theory', 'research', 'data'], |
| 'business': ['profit', 'market', 'customer', 'revenue', 'strategy'], |
| 'mathematics': ['equation', 'calculate', 'formula', 'number', 'solve'], |
| 'creative': ['design', 'art', 'creative', 'innovative', 'original'] |
| } |
| |
| problem_lower = problem.lower() |
| for domain, keywords in domain_keywords.items(): |
| if any(keyword in problem_lower for keyword in keywords): |
| return domain |
| |
| return 'general' |
| |
| def _select_reasoning_strategies(self, analysis: Dict) -> List[ReasoningType]: |
| """Select appropriate reasoning strategies based on problem analysis""" |
| strategies = [] |
| problem_type = analysis['problem_type'] |
| domain = analysis['domain'] |
| complexity = analysis['complexity'] |
| |
| |
| if problem_type == 'creative': |
| strategies.extend([ReasoningType.CREATIVE, ReasoningType.ANALOGICAL]) |
| elif problem_type == 'analytical': |
| strategies.extend([ReasoningType.DEDUCTIVE, ReasoningType.INDUCTIVE]) |
| elif problem_type == 'problem_solving': |
| strategies.extend([ReasoningType.LOGICAL, ReasoningType.CAUSAL]) |
| elif problem_type == 'predictive': |
| strategies.extend([ReasoningType.INDUCTIVE, ReasoningType.TEMPORAL]) |
| |
| |
| if domain == 'technology': |
| strategies.append(ReasoningType.LOGICAL) |
| elif domain == 'science': |
| strategies.extend([ReasoningType.CAUSAL, ReasoningType.ABDUCTIVE]) |
| |
| |
| if not strategies: |
| strategies = [ReasoningType.DEDUCTIVE, ReasoningType.LOGICAL] |
| |
| return list(set(strategies)) |
| |
| def _apply_reasoning_strategy(self, strategy: ReasoningType, problem: str, analysis: Dict) -> ReasoningStep: |
| """Apply a specific reasoning strategy""" |
| logger.info(f"Applying {strategy.value} reasoning") |
| |
| if strategy in self.reasoning_strategies: |
| return self.reasoning_strategies[strategy](problem, analysis) |
| else: |
| return self._default_reasoning(problem, analysis) |
| |
| def _deductive_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep: |
| """Apply deductive reasoning (general to specific)""" |
| |
| relevant_knowledge = self._find_relevant_knowledge(analysis['key_concepts'][0]) |
| |
| premise = f"General principle: {relevant_knowledge}" |
| conclusion = f"Applied to specific case: {problem}" |
| |
| return ReasoningStep( |
| step_id=len(self.reasoning_history), |
| reasoning_type=ReasoningType.DEDUCTIVE, |
| premise=premise, |
| conclusion=conclusion, |
| confidence=0.8, |
| evidence=[relevant_knowledge], |
| assumptions=["General principle applies to specific case"] |
| ) |
| |
| def _inductive_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep: |
| """Apply inductive reasoning (specific to general)""" |
| |
| patterns = self._find_patterns(analysis['key_concepts'][0]) |
| |
| premise = f"Observed patterns: {patterns}" |
| conclusion = f"General rule: Similar problems follow this pattern" |
| |
| return ReasoningStep( |
| step_id=len(self.reasoning_history), |
| reasoning_type=ReasoningType.INDUCTIVE, |
| premise=premise, |
| conclusion=conclusion, |
| confidence=0.7, |
| evidence=patterns, |
| assumptions=["Past patterns predict future behavior"] |
| ) |
| |
| def _abductive_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep: |
| """Apply abductive reasoning (best explanation)""" |
| |
| possible_explanations = self._generate_explanations(problem, analysis) |
| best_explanation = max(possible_explanations, key=lambda x: x.get('likelihood', 0)) |
| |
| premise = f"Observed: {problem}" |
| conclusion = f"Best explanation: {best_explanation['explanation']}" |
| |
| return ReasoningStep( |
| step_id=len(self.reasoning_history), |
| reasoning_type=ReasoningType.ABDUCTIVE, |
| premise=premise, |
| conclusion=conclusion, |
| confidence=best_explanation.get('likelihood', 0.6), |
| evidence=[best_explanation['explanation']], |
| assumptions=["Most likely explanation is correct"] |
| ) |
| |
| def _analogical_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep: |
| """Apply analogical reasoning (pattern matching)""" |
| |
| analogies = self._find_analogies(analysis['key_concepts'][0]) |
| |
| premise = f"Similar situation: {analogies}" |
| conclusion = f"By analogy: Apply similar solution approach" |
| |
| return ReasoningStep( |
| step_id=len(self.reasoning_history), |
| reasoning_type=ReasoningType.ANALOGICAL, |
| premise=premise, |
| conclusion=conclusion, |
| confidence=0.7, |
| evidence=analogies, |
| assumptions=["Similar problems have similar solutions"] |
| ) |
| |
| def _causal_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep: |
| """Apply causal reasoning (cause and effect)""" |
| |
| causes = self._identify_causes(problem, analysis) |
| effects = self._predict_effects(causes) |
| |
| premise = f"Causes: {causes}" |
| conclusion = f"Expected effects: {effects}" |
| |
| return ReasoningStep( |
| step_id=len(self.reasoning_history), |
| reasoning_type=ReasoningType.CAUSAL, |
| premise=premise, |
| conclusion=conclusion, |
| confidence=0.8, |
| evidence=causes + effects, |
| assumptions=["Causal relationships are stable"] |
| ) |
| |
| def _temporal_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep: |
| """Apply temporal reasoning (time-based)""" |
| |
| temporal_aspects = self._analyze_temporal_aspects(problem) |
| |
| premise = f"Temporal context: {temporal_aspects}" |
| conclusion = f"Time-based implications: Consider sequence and timing" |
| |
| return ReasoningStep( |
| step_id=len(self.reasoning_history), |
| reasoning_type=ReasoningType.TEMPORAL, |
| premise=premise, |
| conclusion=conclusion, |
| confidence=0.7, |
| evidence=temporal_aspects, |
| assumptions=["Time sequence affects outcomes"] |
| ) |
| |
| def _spatial_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep: |
| """Apply spatial reasoning (space/location)""" |
| |
| spatial_aspects = self._analyze_spatial_aspects(problem) |
| |
| premise = f"Spatial context: {spatial_aspects}" |
| conclusion = f"Spatial implications: Consider location and arrangement" |
| |
| return ReasoningStep( |
| step_id=len(self.reasoning_history), |
| reasoning_type=ReasoningType.SPATIAL, |
| premise=premise, |
| conclusion=conclusion, |
| confidence=0.6, |
| evidence=spatial_aspects, |
| assumptions=["Spatial arrangement affects function"] |
| ) |
| |
| def _logical_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep: |
| """Apply logical reasoning (formal logic)""" |
| |
| logical_structure = self._analyze_logical_structure(problem) |
| |
| premise = f"Logical structure: {logical_structure}" |
| conclusion = f"Logical conclusion: Apply formal reasoning rules" |
| |
| return ReasoningStep( |
| step_id=len(self.reasoning_history), |
| reasoning_type=ReasoningType.LOGICAL, |
| premise=premise, |
| conclusion=conclusion, |
| confidence=0.9, |
| evidence=[logical_structure], |
| assumptions=["Logical rules are consistent"] |
| ) |
| |
| def _analyze_logical_structure(self, problem: str) -> str: |
| """ |
| Analyze the logical structure of a problem statement. |
| |
| Args: |
| problem: The problem statement to analyze |
| |
| Returns: |
| str: Description of the logical structure |
| """ |
| try: |
| |
| if any(connective in problem.lower() for connective in ['if', 'then', 'implies']): |
| return "Implication (if-then) structure detected" |
| elif any(connective in problem.lower() for connective in ['and', 'both', 'also']): |
| return "Conjunction (AND) structure detected" |
| elif any(connective in problem.lower() for connective in ['or', 'either']): |
| return "Disjunction (OR) structure detected" |
| elif any(connective in problem.lower() for connective in ['not', 'no ', 'never']): |
| return "Negation (NOT) structure detected" |
| elif any(connective in problem.lower() for connective in ['all', 'every', 'any']): |
| return "Universal quantification (FOR ALL) detected" |
| elif any(connective in problem.lower() for connective in ['some', 'there exists', 'at least one']): |
| return "Existential quantification (THERE EXISTS) detected" |
| else: |
| return "Simple proposition detected" |
| |
| except Exception as e: |
| logger.warning(f"Error analyzing logical structure: {str(e)}") |
| return "Unable to determine logical structure" |
| |
| def _creative_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep: |
| """Apply creative reasoning (novel solutions)""" |
| |
| creative_ideas = self._generate_creative_ideas(problem, analysis) |
| |
| premise = f"Creative exploration: {problem}" |
| conclusion = f"Novel approaches: {creative_ideas}" |
| |
| return ReasoningStep( |
| step_id=len(self.reasoning_history), |
| reasoning_type=ReasoningType.CREATIVE, |
| premise=premise, |
| conclusion=conclusion, |
| confidence=0.6, |
| evidence=creative_ideas, |
| assumptions=["Novel approaches may be effective"] |
| ) |
| |
| def _default_reasoning(self, problem: str, analysis: Dict) -> ReasoningStep: |
| """Default reasoning when no specific strategy applies""" |
| return ReasoningStep( |
| step_id=len(self.reasoning_history), |
| reasoning_type=ReasoningType.LOGICAL, |
| premise=f"Problem: {problem}", |
| conclusion="Apply general problem-solving approach", |
| confidence=0.5, |
| evidence=[problem], |
| assumptions=["General approach is applicable"] |
| ) |
| |
| def _synthesize_reasoning(self, reasoning_results: List[ReasoningStep]) -> List[ReasoningStep]: |
| """Integrate and synthesize multiple reasoning results""" |
| |
| weighted_results = [] |
| |
| for result in reasoning_results: |
| weight = self.strategy_weights.get(result.reasoning_type, 1.0) |
| result.confidence *= weight |
| weighted_results.append(result) |
| |
| |
| weighted_results.sort(key=lambda x: x.confidence, reverse=True) |
| |
| |
| self._update_strategy_weights(weighted_results) |
| |
| return weighted_results |
| |
| def _generate_solution(self, problem: str, reasoning_steps: List[ReasoningStep]) -> Dict: |
| """Generate final solution based on reasoning""" |
| if not reasoning_steps: |
| return {"solution": "No solution found", "confidence": 0.0} |
| |
| |
| best_step = reasoning_steps[0] |
| all_conclusions = [step.conclusion for step in reasoning_steps] |
| |
| solution = { |
| "primary_approach": best_step.conclusion, |
| "reasoning_type": best_step.reasoning_type.value, |
| "alternative_approaches": all_conclusions[1:3], |
| "confidence": best_step.confidence, |
| "supporting_evidence": best_step.evidence, |
| "assumptions": best_step.assumptions |
| } |
| |
| |
| if self.openai_client: |
| enhanced_solution = self._enhance_solution_with_llm(problem, solution) |
| solution.update(enhanced_solution) |
| |
| return solution |
| |
| def _call_llm(self, prompt: str, max_retries: int = 3) -> str: |
| """Call the language model with retry logic and error handling""" |
| if not self.openai_client: |
| logger.warning("LLM client not available") |
| return "" |
| |
| for attempt in range(max_retries): |
| try: |
| |
| response = self.openai_client.generate_text( |
| prompt=prompt, |
| model="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", |
| max_tokens=1000, |
| temperature=0.5 |
| ) |
| return response.strip() |
| except Exception as e: |
| if attempt == max_retries - 1: |
| logger.error(f"LLM call failed after {max_retries} attempts: {str(e)}") |
| else: |
| logger.warning(f"LLM call failed (attempt {attempt + 1}): {str(e)}") |
| return "" |
| |
| def _enhance_analysis_with_llm(self, problem: str, analysis: Dict) -> Dict: |
| """Enhance problem analysis using LLM""" |
| try: |
| prompt = f""" |
| Analyze this problem and enhance the analysis: |
| |
| Problem: {problem} |
| |
| Current Analysis: |
| {json.dumps(analysis, indent=2)} |
| |
| Provide enhanced analysis focusing on: |
| 1. Hidden assumptions and biases |
| 2. Alternative approaches |
| 3. Potential risks |
| 4. Key success factors |
| |
| Return as JSON with fields: enhanced_analysis, assumptions, alternatives, risks, success_factors |
| """ |
| |
| response = self._call_llm(prompt) |
| if not response: |
| return {} |
| |
| |
| try: |
| return json.loads(response) |
| except json.JSONDecodeError: |
| |
| return {'enhanced_analysis': response} |
| |
| except Exception as e: |
| logger.error(f"Error in _enhance_analysis_with_llm: {str(e)}") |
| return {} |
| |
| def _enhance_solution_with_llm(self, problem: str, solution: Dict) -> Dict: |
| """Enhance solution using LLM""" |
| try: |
| prompt = f""" |
| Enhance this solution with implementation details: |
| |
| Problem: {problem} |
| |
| Solution: |
| {json.dumps(solution, indent=2)} |
| |
| Provide: |
| 1. Step-by-step implementation |
| 2. Code examples if applicable |
| 3. Potential pitfalls |
| 4. Performance considerations |
| |
| Return as JSON with fields: detailed_solution, steps, code_examples, pitfalls, performance |
| """ |
| |
| response = self._call_llm(prompt) |
| if not response: |
| return {} |
| |
| try: |
| return json.loads(response) |
| except json.JSONDecodeError: |
| return {'detailed_solution': response} |
| |
| except Exception as e: |
| logger.error(f"Error in _enhance_solution_with_llm: {str(e)}") |
| return {} |
|
|
| |
| def _find_relevant_knowledge(self, concepts: List[str]) -> str: |
| """Find relevant knowledge from knowledge graph""" |
| relevant = [] |
| for concept in concepts: |
| if concept in self.knowledge_graph: |
| node = self.knowledge_graph[concept] |
| relevant.append(f"{concept}: {node.properties}") |
| return str(relevant) if relevant else "No specific knowledge found" |
| |
| def _find_patterns(self, concepts: List[str], min_support: float = 0.3) -> List[Dict]: |
| """ |
| Find relevant patterns in the knowledge graph based on concepts |
| |
| Args: |
| concepts: List of concepts to find patterns for |
| min_support: Minimum support threshold for pattern matching (0.0 to 1.0) |
| |
| Returns: |
| List of matching patterns with their confidence scores |
| """ |
| if not concepts: |
| return [] |
| |
| |
| all_patterns = [ |
| node for node in self.knowledge_graph.values() |
| if node.properties.get('type') == 'pattern' |
| ] |
| |
| if not all_patterns: |
| return [] |
| |
| |
| matching_patterns = [] |
| |
| for pattern_node in all_patterns: |
| pattern_concepts = set(pattern_node.properties.get('concepts', [])) |
| if not pattern_concepts: |
| continue |
| |
| |
| input_set = set(concepts) |
| intersection = len(input_set.intersection(pattern_concepts)) |
| union = len(input_set.union(pattern_concepts)) |
| similarity = intersection / union if union > 0 else 0 |
| |
| if similarity >= min_support: |
| matching_patterns.append({ |
| 'pattern_id': pattern_node.concept, |
| 'similarity': similarity, |
| 'confidence': pattern_node.confidence, |
| 'concepts': list(pattern_concepts), |
| 'solution': pattern_node.properties.get('solution', {}) |
| }) |
| |
| |
| matching_patterns.sort( |
| key=lambda x: (x['similarity'] * 0.6 + x['confidence'] * 0.4), |
| reverse=True |
| ) |
| |
| return matching_patterns |
| |
| def _learn_pattern(self, problem: str, solution: Dict, context: Dict = None) -> bool: |
| """ |
| Learn a new pattern from a problem-solution pair |
| |
| Args: |
| problem: The problem text |
| solution: The solution dictionary |
| context: Additional context about the problem |
| |
| Returns: |
| bool: True if a new pattern was learned, False otherwise |
| """ |
| try: |
| |
| concepts, _ = self._extract_concepts(problem) |
| |
| if not concepts or len(concepts) < 2: |
| return False |
| |
| |
| existing_patterns = self._find_patterns(concepts, min_support=0.7) |
| if existing_patterns and existing_patterns[0]['similarity'] > 0.8: |
| |
| pattern_id = existing_patterns[0]['pattern_id'] |
| if pattern_id in self.knowledge_graph: |
| node = self.knowledge_graph[pattern_id] |
| node.confidence = min(1.0, node.confidence + 0.05) |
| node.properties['last_used'] = datetime.datetime.now().isoformat() |
| node.properties['usage_count'] = node.properties.get('usage_count', 0) + 1 |
| logger.info(f"Updated pattern confidence for {pattern_id}") |
| return False |
| |
| |
| pattern_id = f"pattern_{len([n for n in self.knowledge_graph.values() if n.properties.get('type') == 'pattern'])}" |
| |
| |
| self.add_knowledge( |
| concept=pattern_id, |
| properties={ |
| 'type': 'pattern', |
| 'concepts': concepts, |
| 'solution': solution, |
| 'created_at': datetime.datetime.now().isoformat(), |
| 'usage_count': 1, |
| 'last_used': datetime.datetime.now().isoformat(), |
| 'context': context or {} |
| }, |
| relationships={}, |
| confidence=0.5, |
| source='pattern_learning' |
| ) |
| |
| logger.info(f"Learned new pattern: {pattern_id} with {len(concepts)} concepts") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Error learning pattern: {str(e)}") |
| return False |
| |
| def _apply_patterns(self, problem: str, context: Dict = None) -> Optional[Dict]: |
| """ |
| Try to apply learned patterns to solve the problem |
| |
| Args: |
| problem: The problem to solve |
| context: Additional context about the problem |
| |
| Returns: |
| Dict containing the solution if a matching pattern is found, None otherwise |
| """ |
| try: |
| |
| concepts, _ = self._extract_concepts(problem) |
| |
| if not concepts: |
| return None |
| |
| |
| patterns = self._find_patterns(concepts) |
| |
| |
| patterns = [p for p in patterns if p['confidence'] > 0.6 and p['similarity'] > 0.5] |
| |
| if not patterns: |
| return None |
| |
| |
| best_pattern = max( |
| patterns, |
| key=lambda p: p['similarity'] * p['confidence'] |
| ) |
| |
| |
| if best_pattern['pattern_id'] in self.knowledge_graph: |
| node = self.knowledge_graph[best_pattern['pattern_id']] |
| node.confidence = min(1.0, node.confidence + 0.02) |
| node.properties['last_used'] = datetime.datetime.now().isoformat() |
| node.properties['usage_count'] = node.properties.get('usage_count', 0) + 1 |
| |
| logger.info(f"Applied pattern {best_pattern['pattern_id']} with " |
| f"confidence {best_pattern['confidence']:.2f}") |
| |
| |
| return best_pattern['solution'] |
| |
| except Exception as e: |
| logger.error(f"Error applying patterns: {str(e)}") |
| return None |
|
|
| def _neural_reasoning(self, problem: str, context: Dict = None) -> Dict: |
| """Use neural network for advanced reasoning""" |
| if not self.neural_network: |
| return {"error": "Neural network not initialized", "confidence": 0.0} |
| |
| try: |
| |
| input_tensor = self._prepare_neural_input(problem, context) |
| |
| |
| with torch.no_grad(): |
| output = self.neural_network(input_tensor) |
| |
| |
| reasoning_result = self._process_neural_output(output) |
| |
| |
| self._update_neural_weights(reasoning_result) |
| |
| return reasoning_result |
| |
| except Exception as e: |
| logger.error(f"Neural reasoning failed: {str(e)}") |
| return {"error": str(e), "confidence": 0.0} |
| |
    def _prepare_neural_input(self, problem: str, context: Dict) -> torch.Tensor:
        """Convert problem text (and optional context) into a model input tensor."""
        tokens = self._tokenize_text(problem)

        # Append the JSON-serialized context after a separator token.
        # NOTE(review): self.sep_token is initialized to None in __init__ — if it
        # is never replaced with a real token id, the tensor construction below
        # will fail on the None element. Confirm where sep_token is set.
        if context:
            context_tokens = self._tokenize_text(json.dumps(context))
            tokens = tokens + [self.sep_token] + context_tokens

        input_tensor = torch.tensor([tokens], dtype=torch.long)

        positions = torch.arange(0, input_tensor.size(1), dtype=torch.long).unsqueeze(0)
        pos_encoding = self.positional_encoder(positions)

        # NOTE(review): this adds positional encodings directly to raw integer
        # token ids rather than to embeddings; positional encodings are normally
        # applied after an embedding layer, and positional_encoder is None in
        # __init__ — confirm this path is ever exercised / correct.
        return input_tensor + pos_encoding
| |
| def _process_neural_output(self, output: torch.Tensor) -> Dict: |
| """Convert neural network output to reasoning result""" |
| |
| strategy_logits = output['strategy_logits'] |
| strategy_probs = torch.softmax(strategy_logits, dim=-1) |
| best_strategy_idx = torch.argmax(strategy_probs).item() |
| |
| |
| confidence = output['confidence'].sigmoid().item() |
| reasoning_steps = self._decode_reasoning_steps(output['reasoning_steps']) |
| |
| return { |
| 'strategy': ReasoningType(best_strategy_idx).name, |
| 'confidence': confidence, |
| 'reasoning_steps': reasoning_steps, |
| 'raw_output': output |
| } |
| |
| def _update_neural_weights(self, result: Dict): |
| """Update neural network weights based on reasoning results""" |
| if result.get('confidence', 0) > self.confidence_threshold: |
| |
| self.learning_rate *= 1.05 |
| else: |
| |
| self.learning_rate *= 0.95 |
| |
| |
| self.learning_rate = max(1e-6, min(0.1, self.learning_rate)) |
| |
| |
| if hasattr(self, 'optimizer'): |
| for param_group in self.optimizer.param_groups: |
| param_group['lr'] = self.learning_rate |
|
|
| def _update_strategy_weights(self, results: List[ReasoningStep]): |
| """Update strategy weights based on performance with enhanced learning |
| |
| Implements a more sophisticated weight update mechanism that considers: |
| - Recent performance history |
| - Confidence levels |
| - Strategy diversity |
| - Long-term vs short-term performance |
| """ |
| if not results: |
| return |
| |
| |
| performance_metrics = {} |
| |
| |
| for result in results: |
| strategy = result.reasoning_type |
| if strategy not in performance_metrics: |
| performance_metrics[strategy] = { |
| 'total_confidence': 0, |
| 'count': 0, |
| 'successes': 0, |
| 'recent_successes': [] |
| } |
| |
| metrics = performance_metrics[strategy] |
| metrics['total_confidence'] += result.confidence |
| metrics['count'] += 1 |
| |
| |
| if result.confidence > 0.7: |
| metrics['successes'] += 1 |
| metrics['recent_successes'].append(1) |
| else: |
| metrics['recent_successes'].append(0) |
| |
| |
| metrics['recent_successes'] = metrics['recent_successes'][-10:] |
| |
| |
| for strategy, metrics in performance_metrics.items(): |
| if metrics['count'] == 0: |
| continue |
| |
| avg_confidence = metrics['total_confidence'] / metrics['count'] |
| success_rate = metrics['successes'] / metrics['count'] |
| |
| |
| recent_success_rate = sum(metrics['recent_successes']) / len(metrics['recent_successes']) if metrics['recent_successes'] else 0 |
| |
| |
| |
| performance_score = (recent_success_rate * 0.6) + (success_rate * 0.4) |
| |
| |
| |
| max_adjustment = 0.2 |
| adjustment = (performance_score - 0.5) * 2 * max_adjustment |
| |
| |
| new_weight = self.strategy_weights.get(strategy, 1.0) * (1 + adjustment) |
| |
| |
| self.strategy_weights[strategy] = max(0.1, min(10.0, new_weight)) |
| |
| logger.debug(f"Updated {strategy.value} weight to {self.strategy_weights[strategy]:.2f} " |
| f"(performance: {performance_score:.2f}, adjustment: {adjustment:+.2f})") |
| |
| |
| self._maintain_strategy_diversity() |
| |
| def _maintain_strategy_diversity(self): |
| """Ensure no single strategy dominates by maintaining minimum weights""" |
| min_weight = 0.1 |
| max_weight = 10.0 |
| |
| |
| total_weight = sum(self.strategy_weights.values()) |
| num_strategies = len(self.strategy_weights) |
| |
| |
| max_strategy = max(self.strategy_weights, key=self.strategy_weights.get) |
| max_value = self.strategy_weights[max_strategy] |
| |
| if max_value / total_weight > 0.5: |
| |
| self.strategy_weights[max_strategy] = total_weight * 0.4 |
| |
| |
| remaining_weight = total_weight - self.strategy_weights[max_strategy] |
| other_strategies = [s for s in self.strategy_weights if s != max_strategy] |
| |
| if other_strategies: |
| total_other_weight = sum(self.strategy_weights[s] for s in other_strategies) |
| if total_other_weight > 0: |
| for strategy in other_strategies: |
| proportion = self.strategy_weights[strategy] / total_other_weight |
| self.strategy_weights[strategy] = proportion * remaining_weight |
| |
| logger.info(f"Reduced dominance of {max_strategy.value} strategy to maintain diversity") |
|
|
| def update_knowledge(self, problem: str = None, solution: Dict = None, context: Dict = None, |
| pattern: str = None, examples: List[Dict] = None, confidence: float = 0.5) -> bool: |
| """ |
| Update the knowledge base with new information. |
| |
| Enhanced with better learning mechanisms: |
| - Tracks usage and success of different knowledge items |
| - Updates confidence based on usage patterns |
| - Forgets or demotes unused knowledge |
| """ |
| try: |
| |
| if problem and solution: |
| self._track_knowledge_usage(problem, solution, context) |
| |
| |
| if problem is not None and solution is not None: |
| return self._update_knowledge_from_problem(problem, solution) |
| |
| |
| elif pattern is not None and examples is not None: |
| return self._update_knowledge_from_pattern(pattern, examples, confidence) |
| |
| else: |
| logger.warning("Invalid arguments to update_knowledge. Need either (problem and solution) or (pattern and examples).") |
| return False |
| |
| except Exception as e: |
| logger.error(f"Error updating knowledge: {str(e)}") |
| return False |
| |
| def _track_knowledge_usage(self, problem: str, solution: Dict, context: Dict = None): |
| """Track how knowledge is being used to improve learning""" |
| |
| concepts, _ = self._extract_concepts(problem) |
| |
| |
| for concept in concepts: |
| if concept in self.knowledge_graph: |
| node = self.knowledge_graph[concept] |
| |
| |
| if '_usage' not in node.properties: |
| node.properties['_usage'] = { |
| 'count': 0, |
| 'last_used': datetime.datetime.now().isoformat(), |
| 'success_count': 0, |
| 'recent_uses': [] |
| } |
| |
| |
| usage = node.properties['_usage'] |
| usage['count'] += 1 |
| usage['last_used'] = datetime.datetime.now().isoformat() |
| |
| |
| if solution.get('confidence', 0) > 0.7: |
| usage['success_count'] += 1 |
| usage['recent_uses'].append(1) |
| else: |
| usage['recent_uses'].append(0) |
| |
| |
| usage['recent_uses'] = usage['recent_uses'][-10:] |
| |
| |
| success_rate = usage['success_count'] / usage['count'] if usage['count'] > 0 else 0 |
| recent_success_rate = sum(usage['recent_uses']) / len(usage['recent_uses']) if usage['recent_uses'] else 0 |
| |
| |
| |
| new_confidence = (recent_success_rate * 0.7) + (success_rate * 0.3) |
| node.confidence = max(0.1, min(1.0, new_confidence)) |
| |
| |
| if abs(node.confidence - new_confidence) > 0.1: |
| logger.info(f"Updated confidence for '{concept}' to {node.confidence:.2f} " |
| f"(success rate: {success_rate:.2f}, recent: {recent_success_rate:.2f})") |
| |
| |
| if random.random() < 0.05: |
| self._cleanup_unused_knowledge() |
| |
| def _cleanup_unused_knowledge(self): |
| """Remove or demote unused knowledge to keep the knowledge base relevant""" |
| current_time = datetime.datetime.now() |
| max_age = datetime.timedelta(days=30) |
| |
| concepts_to_remove = [] |
| |
| for concept, node in list(self.knowledge_graph.items()): |
| |
| if node.source == 'base_initialization': |
| continue |
| |
| usage = node.properties.get('_usage', {}) |
| last_used = usage.get('last_used') |
| |
| if last_used: |
| try: |
| last_used = datetime.datetime.fromisoformat(last_used) |
| age = current_time - last_used |
| |
| |
| if age > max_age and node.confidence < 0.3: |
| concepts_to_remove.append(concept) |
| |
| elif age > max_age: |
| node.confidence *= 0.8 |
| except (ValueError, TypeError): |
| continue |
| |
| |
| for concept in concepts_to_remove: |
| del self.knowledge_graph[concept] |
| logger.info(f"Removed unused knowledge: {concept}") |
| |
| return len(concepts_to_remove) |
|
|
| def _update_knowledge_from_reasoning(self, problem: str, solution: Dict, reasoning: List[ReasoningStep]): |
| """Update knowledge base from reasoning experience""" |
| |
| if solution.get('confidence', 0) > 0.7: |
| new_concept = f"problem_solution_{len(self.knowledge_graph)}" |
| properties = { |
| "problem_type": solution.get('reasoning_type', 'unknown'), |
| "solution_approach": solution.get('primary_approach', ''), |
| "success_rate": solution.get('confidence', 0) |
| } |
| |
| self.add_knowledge(new_concept, properties, {}, solution.get('confidence', 0), "reasoning_experience") |
| |
| def _update_knowledge_from_problem(self, problem: str, solution: Dict) -> bool: |
| """Update knowledge base from a problem-solution pair (legacy method).""" |
| |
| concepts, _ = self._extract_concepts(problem) |
| |
| |
| for concept in concepts: |
| if concept not in self.knowledge_graph: |
| |
| properties = { |
| 'type': 'concept', |
| 'first_seen': datetime.datetime.now().isoformat(), |
| 'occurrences': 1 |
| } |
| self.add_knowledge( |
| concept=concept, |
| properties=properties, |
| relationships={}, |
| confidence=0.7, |
| source='interaction' |
| ) |
| else: |
| |
| node = self.knowledge_graph[concept] |
| node.properties['last_seen'] = datetime.datetime.now().isoformat() |
| node.properties['occurrences'] = node.properties.get('occurrences', 1) + 1 |
| |
| |
| solution_id = f"solution_{len(self.knowledge_graph)}" |
| solution_properties = { |
| 'type': 'solution', |
| 'problem': problem, |
| 'timestamp': datetime.datetime.now().isoformat(), |
| 'confidence': solution.get('confidence', 0.5), |
| 'success': solution.get('success', False) |
| } |
| |
| |
| relationships = { |
| 'addresses': concepts, |
| 'related_to': list(set(concepts)) |
| } |
| |
| self.add_knowledge( |
| concept=solution_id, |
| properties=solution_properties, |
| relationships=relationships, |
| confidence=solution.get('confidence', 0.5), |
| source='interaction' |
| ) |
| |
| |
| if 'reasoning_type' in solution: |
| try: |
| reasoning_type = ReasoningType(solution['reasoning_type']) |
| if solution.get('success', False): |
| self.strategy_weights[reasoning_type] = min(10.0, |
| self.strategy_weights.get(reasoning_type, 1.0) * 1.1) |
| else: |
| self.strategy_weights[reasoning_type] = max(0.1, |
| self.strategy_weights.get(reasoning_type, 1.0) * 0.9) |
| except ValueError: |
| pass |
| |
| logger.info(f"Updated knowledge base with new information from problem: {problem[:50]}...") |
| return True |
| |
| def _update_knowledge_from_pattern(self, pattern: str, examples: List[Dict], confidence: float) -> bool: |
| """Update knowledge base from a pattern and examples.""" |
| |
| if pattern not in self.knowledge_graph: |
| properties = { |
| 'type': 'pattern', |
| 'first_seen': datetime.datetime.now().isoformat(), |
| 'example_count': len(examples), |
| 'last_updated': datetime.datetime.now().isoformat() |
| } |
| self.add_knowledge( |
| concept=pattern, |
| properties=properties, |
| relationships={}, |
| confidence=confidence, |
| source='learning' |
| ) |
| else: |
| |
| node = self.knowledge_graph[pattern] |
| node.properties['last_updated'] = datetime.datetime.now().isoformat() |
| node.properties['example_count'] = len(examples) |
| node.confidence = max(node.confidence, confidence) |
| |
| |
| for example in examples: |
| solution_id = f"example_{len(self.knowledge_graph)}" |
| solution_properties = { |
| 'type': 'example_solution', |
| 'content': str(example), |
| 'timestamp': datetime.datetime.now().isoformat(), |
| 'confidence': confidence, |
| 'source': 'learning' |
| } |
| |
| relationships = { |
| 'exemplifies': [pattern], |
| 'related_to': [pattern] |
| } |
| |
| self.add_knowledge( |
| concept=solution_id, |
| properties=solution_properties, |
| relationships=relationships, |
| confidence=confidence, |
| source='learning' |
| ) |
| |
| logger.info(f"Updated knowledge base with new pattern: {pattern[:50]}...") |
| return True |
| |
| def _calculate_confidence(self, reasoning_steps: List[Union[Dict, ReasoningStep]]) -> float: |
| """ |
| Calculate overall confidence score with enhanced calibration |
| |
| Args: |
| reasoning_steps: List of reasoning steps (either dicts or ReasoningStep objects) |
| |
| Returns: |
| float: Calibrated confidence score between 0.0 and 1.0 |
| """ |
| if not reasoning_steps: |
| return 0.0 |
| |
| |
| steps = [] |
| for step in reasoning_steps: |
| if isinstance(step, ReasoningStep): |
| steps.append({ |
| 'confidence': step.confidence, |
| 'reasoning_type': step.reasoning_type.value, |
| 'evidence': step.evidence, |
| 'assumptions': step.assumptions |
| }) |
| else: |
| steps.append(step) |
| |
| |
| weights = [1.0] * len(steps) |
| total_weight = sum(weights) |
| |
| if total_weight == 0: |
| return 0.0 |
| |
| base_confidence = sum(step.get('confidence', 0) * weight |
| for step, weight in zip(steps, weights)) / total_weight |
| |
| |
| calibrated_confidence = self._apply_calibration_factors( |
| base_confidence, |
| steps |
| ) |
| |
| |
| return max(0.0, min(1.0, calibrated_confidence)) |
| |
| def _apply_calibration_factors( |
| self, |
| base_confidence: float, |
| reasoning_steps: List[Dict] |
| ) -> float: |
| """ |
| Apply various calibration factors to the base confidence |
| |
| Args: |
| base_confidence: The initial confidence score |
| reasoning_steps: List of reasoning steps |
| |
| Returns: |
| float: Calibrated confidence score |
| """ |
| calibrated = base_confidence |
| |
| |
| step_confidences = [step.get('confidence', 0.0) for step in reasoning_steps] |
| if step_confidences: |
| std_dev = np.std(step_confidences) |
| |
| if std_dev > 0.2: |
| calibrated *= 0.9 |
| |
| |
| strategies = [step.get('strategy') for step in reasoning_steps] |
| unique_strategies = len(set(strategies)) |
| if unique_strategies > 1: |
| |
| calibrated *= 1.05 |
| |
| |
| knowledge_coverage = self._calculate_knowledge_coverage(reasoning_steps) |
| calibrated *= (0.7 + 0.3 * knowledge_coverage) |
| |
| |
| recent_success_rate = self._get_recent_success_rate() |
| calibrated *= (0.8 + 0.2 * recent_success_rate) |
| |
| return calibrated |
| |
| def _calculate_knowledge_coverage(self, reasoning_steps: List[Union[Dict, ReasoningStep]]) -> float: |
| """ |
| Calculate how well the reasoning is covered by existing knowledge |
| |
| Args: |
| reasoning_steps: List of reasoning steps (either dicts or ReasoningStep objects) |
| |
| Returns: |
| float: Coverage score between 0.0 and 1.0 |
| """ |
| if not reasoning_steps: |
| return 0.0 |
| |
| total_concepts = 0 |
| covered_concepts = 0 |
| |
| for step in reasoning_steps: |
| |
| if isinstance(step, ReasoningStep): |
| concepts = step.evidence + step.assumptions |
| else: |
| concepts = step.get('concepts', []) + step.get('evidence', []) + step.get('assumptions', []) |
| |
| total_concepts += len(concepts) |
| |
| |
| for concept in concepts: |
| if concept in self.knowledge_graph: |
| covered_concepts += 1 |
| |
| if total_concepts == 0: |
| return 1.0 |
| |
| return covered_concepts / total_concepts |
| |
| def _get_recent_success_rate(self, window_size: int = 10) -> float: |
| """ |
| Calculate the success rate of recent reasoning attempts |
| |
| Args: |
| window_size: Number of recent attempts to consider |
| |
| Returns: |
| float: Success rate between 0.0 and 1.0 |
| """ |
| if not hasattr(self, '_reasoning_history'): |
| self._reasoning_history = [] |
| |
| |
| recent_history = self._reasoning_history[-window_size:] |
| if not recent_history: |
| return 0.7 |
| |
| |
| success_count = sum(1 for _, success in recent_history if success) |
| return success_count / len(recent_history) |
| |
| def _update_reasoning_history(self, success: bool): |
| """ |
| Update the reasoning history with the latest result |
| |
| Args: |
| success: Whether the reasoning was successful |
| """ |
| if not hasattr(self, '_reasoning_history'): |
| self._reasoning_history = [] |
| |
| |
| self._reasoning_history.append((datetime.datetime.now(), success)) |
| |
| |
| if len(self._reasoning_history) > 100: |
| self._reasoning_history = self._reasoning_history[-100:] |
|
|
| def _generate_self_assessment(self, result: Dict) -> Dict: |
| """ |
| Generate a self-assessment of the reasoning process |
| |
| Args: |
| result: The reasoning result to assess |
| |
| Returns: |
| Dict containing self-assessment information |
| """ |
| assessment = { |
| 'confidence': result.get('confidence', 0.0), |
| 'issues': [], |
| 'strengths': [], |
| 'suggestions': [] |
| } |
| |
| |
| if assessment['confidence'] < 0.3: |
| assessment['issues'].append("Low confidence in the solution") |
| assessment['suggestions'].append("Try providing more context or rephrasing the problem") |
| |
| |
| process_time = result.get('processing_time', 0) |
| if process_time > 5.0: |
| assessment['issues'].append("Reasoning took longer than expected") |
| assessment['suggestions'].append("Consider simplifying the problem or using a different approach") |
| |
| |
| if hasattr(self, 'knowledge_graph'): |
| coverage = self._calculate_knowledge_coverage(result.get('reasoning_steps', [])) |
| if coverage < 0.3: |
| assessment['issues'].append("Limited knowledge coverage for this problem") |
| assessment['suggestions'].append("Consider adding more relevant knowledge to the knowledge base") |
| |
| |
| if assessment['confidence'] > 0.7: |
| assessment['strengths'].append("High confidence in the solution") |
| if process_time < 1.0: |
| assessment['strengths'].append("Efficient reasoning process") |
| if coverage > 0.7: |
| assessment['strengths'].append("Good knowledge coverage") |
| |
| return assessment |
|
|
| def _handle_error(self, error_type: str, error: Exception, context: Dict = None) -> Dict: |
| """ |
| Centralized error handling with appropriate fallback |
| |
| Args: |
| error_type: Type of error (e.g., 'api_error', 'network_error') |
| error: The exception that was raised |
| context: Additional context about the error |
| |
| Returns: |
| Dict containing error information and fallback response |
| """ |
| try: |
| |
| logger.error(f"{error_type}: {str(error)}\nContext: {context}") |
| |
| |
| handler = self.error_handlers.get(error_type, self._handle_generic_error) |
| |
| |
| return handler(error, context) |
| |
| except Exception as e: |
| |
| logger.critical(f"Error handler failed: {str(e)}") |
| return self._handle_critical_error(handler_error) |
| |
| def _handle_api_error(self, error: Exception, context: Dict) -> Dict: |
| """Handle API-related errors""" |
| |
| error_info = { |
| 'error_type': 'api_error', |
| 'message': str(error), |
| 'suggestion': 'Check your API key and quota', |
| 'can_retry': True, |
| 'retry_after': 60 |
| } |
| |
| |
| if context: |
| error_info.update(context) |
| |
| return error_info |
| |
| def _handle_network_error(self, error: Exception, context: Dict) -> Dict: |
| """Handle network-related errors""" |
| return { |
| 'error_type': 'network_error', |
| 'message': 'Network connectivity issue', |
| 'suggestion': 'Check your internet connection and try again', |
| 'can_retry': True, |
| 'retry_after': 30 |
| } |
| |
| def _handle_validation_error(self, error: Exception, context: Dict) -> Dict: |
| """Handle input validation errors""" |
| return { |
| 'error_type': 'validation_error', |
| 'message': str(error), |
| 'suggestion': 'Check your input and try again', |
| 'can_retry': True, |
| 'retry_immediately': True |
| } |
| |
| def _handle_timeout_error(self, error: Exception, context: Dict) -> Dict: |
| """Handle timeout errors""" |
| return { |
| 'error_type': 'timeout_error', |
| 'message': 'Operation timed out', |
| 'suggestion': 'The request took too long. Please try again later', |
| 'can_retry': True, |
| 'retry_after': 60 |
| } |
| |
| def _handle_resource_error(self, error: Exception, context: Dict) -> Dict: |
| """Handle resource-related errors""" |
| return { |
| 'error_type': 'resource_error', |
| 'message': 'Insufficient resources', |
| 'suggestion': 'Try again with a smaller input or contact support', |
| 'can_retry': False |
| } |
| |
| def _handle_generic_error(self, error: Exception, context: Dict) -> Dict: |
| """Handle any unclassified errors""" |
| return { |
| 'error_type': 'unexpected_error', |
| 'message': 'An unexpected error occurred', |
| 'suggestion': 'Please try again later', |
| 'can_retry': True, |
| 'retry_after': 300 |
| } |
| |
| def _handle_critical_error(self, error: Exception) -> Dict: |
| """Handle critical errors in error handling""" |
| return { |
| 'error_type': 'critical_error', |
| 'message': 'A critical error occurred', |
| 'suggestion': 'Please restart the application', |
| 'can_retry': False |
| } |
| |
| def _safe_call(self, func, *args, **kwargs): |
| """ |
| Safely call a function with error handling and retries |
| |
| Args: |
| func: The function to call |
| *args: Positional arguments to pass to the function |
| **kwargs: Keyword arguments to pass to the function |
| |
| Returns: |
| Tuple of (result, error_info) |
| """ |
| max_retries = kwargs.pop('max_retries', 3) |
| retry_delay = kwargs.pop('retry_delay', 1.0) |
| error_handlers = kwargs.pop('error_handlers', {}) |
| |
| last_error = None |
| |
| for attempt in range(max_retries): |
| try: |
| result = func(*args, **kwargs) |
| return result, None |
| |
| except Exception as e: |
| last_error = e |
| error_type = self._classify_error(e) |
| |
| |
| handler = error_handlers.get(error_type, self._handle_error) |
| error_info = handler(e, { |
| 'attempt': attempt + 1, |
| 'max_retries': max_retries, |
| 'function': func.__name__ |
| }) |
| |
| |
| if not error_info.get('can_retry', False) or attempt == max_retries - 1: |
| return None, error_info |
| |
| |
| time.sleep(error_info.get('retry_after', retry_delay)) |
| |
| |
| return None, self._handle_generic_error( |
| last_error, |
| {'message': 'All retries failed'} |
| ) |
| |
| def _classify_error(self, error: Exception) -> str: |
| """Classify an exception into an error type""" |
| error_str = str(error).lower() |
| |
| if any(e in error_str for e in ['api', 'authentication', 'quota']): |
| return 'api_error' |
| elif any(e in error_str for e in ['network', 'connection', 'timeout', 'socket']): |
| return 'network_error' |
| elif any(e in error_str for e in ['validation', 'invalid', 'value', 'type']): |
| return 'validation_error' |
| elif 'timeout' in error_str: |
| return 'timeout_error' |
| elif any(e in error_str for e in ['memory', 'resource', 'capacity']): |
| return 'resource_error' |
| else: |
| return 'unexpected_error' |
|
|
| def _format_error_response(self, error_info: Dict, start_time: float) -> Dict: |
| """Format an error response with additional metadata""" |
| return { |
| 'success': False, |
| 'error': error_info, |
| 'processing_time': time.time() - start_time, |
| 'timestamp': datetime.datetime.now().isoformat(), |
| 'suggestion': error_info.get('suggestion', 'Please try again later') |
| } |
| |
| def _select_best_error(self, *errors: Dict) -> Dict: |
| """Select the most appropriate error from multiple errors""" |
| if not errors: |
| return self._handle_generic_error( |
| ValueError("No errors provided"), |
| {} |
| ) |
| |
| |
| error_priority = [ |
| 'validation_error', |
| 'api_error', |
| 'network_error', |
| 'resource_error', |
| 'timeout_error', |
| 'unexpected_error', |
| 'critical_error' |
| ] |
| |
| |
| for error_type in error_priority: |
| for error in errors: |
| if error.get('error_type') == error_type: |
| return error |
| |
| |
| return errors[0] |
|
|
| def _find_analogies(self, concept: str, top_n: int = 3) -> List[str]: |
| """ |
| Find analogies or similar concepts in the knowledge graph. |
| |
| Args: |
| concept: The concept to find analogies for |
| top_n: Maximum number of analogies to return |
| |
| Returns: |
| List of analogies or similar concepts |
| """ |
| try: |
| |
| analogies = [] |
| |
| |
| if concept in self.knowledge_graph: |
| node = self.knowledge_graph[concept] |
| |
| |
| if 'similar_to' in node.relationships: |
| for similar_concept in node.relationships['similar_to']: |
| if similar_concept in self.knowledge_graph: |
| analogies.append(f"Similar to {similar_concept}: {self.knowledge_graph[similar_concept].get('description', 'No description')}") |
| |
| |
| if len(analogies) >= top_n: |
| return analogies[:top_n] |
| |
| |
| prompt = f""" |
| Generate {top_n} analogies or similar concepts to help understand: |
| "{concept}" |
| |
| Format each analogy on a new line starting with a dash. |
| Be creative but keep the analogies relevant and helpful. |
| """ |
| |
| response = self._call_llm(prompt) |
| |
| |
| if response: |
| |
| lines = [line.strip() for line in response.split('\n') if line.strip()] |
| for line in lines: |
| |
| line = re.sub(r'^[\-\*•]\s*', '', line).strip() |
| if line: |
| analogies.append(line) |
| |
| |
| if not analogies: |
| analogies = [ |
| f"This is similar to learning a new language - it takes practice and immersion", |
| f"Think of this like solving a puzzle - break it down into smaller pieces", |
| f"This concept is like a tool - its value comes from how you use it" |
| ] |
| |
| return analogies[:top_n] |
| |
| except Exception as e: |
| logger.warning(f"Error finding analogies: {str(e)}") |
| |
| return [ |
| f"This is similar to learning a new skill - it takes time and practice", |
| f"Think of this like building a house - you need a solid foundation first", |
| f"This concept is like a map - it helps you navigate complex information" |
| ][:top_n] |
| |
| def _generate_creative_ideas(self, problem: str, analysis: Dict) -> List[str]: |
| """ |
| Generate creative ideas or approaches to solve a problem. |
| |
| Args: |
| problem: The problem statement |
| analysis: The problem analysis dictionary |
| |
| Returns: |
| List of creative ideas or approaches |
| """ |
| try: |
| |
| key_concepts = analysis.get('key_concepts', []) |
| |
| flat_concepts = [] |
| for item in key_concepts: |
| if isinstance(item, (list, tuple)): |
| flat_concepts.extend(str(x) for x in item if x) |
| elif item: |
| flat_concepts.append(str(item)) |
| |
| |
| prompt = f""" |
| Generate 3-5 creative approaches to solve the following problem. |
| Focus on innovative or unconventional solutions. |
| |
| Problem: {problem} |
| |
| Key concepts: {', '.join(flat_concepts) if flat_concepts else 'None identified'} |
| Problem type: {analysis.get('problem_type', 'unknown')} |
| |
| Creative approaches: |
| 1.""" |
| |
| response = self._call_llm(prompt) |
| |
| |
| ideas = [] |
| if response: |
| |
| for line in response.split('\n'): |
| line = line.strip() |
| if line and line[0].isdigit() and '. ' in line: |
| idea = line.split('. ', 1)[1].strip() |
| ideas.append(idea) |
| |
| |
| if not ideas: |
| ideas = [ |
| "Approach the problem from a different perspective", |
| "Combine elements from unrelated domains", |
| "Challenge existing assumptions about the problem" |
| ] |
| |
| return ideas |
| |
| except Exception as e: |
| logger.warning(f"Error generating creative ideas: {str(e)}") |
| |
| return [ |
| "Consider an unconventional perspective on the problem", |
| "Look for inspiration from unrelated fields", |
| "Break the problem down into smaller, more manageable parts" |
| ] |
|
|
def create_general_reasoning_engine(together_ai_client = None) -> GeneralReasoningEngine:
    """Factory function to create reasoning engine

    Args:
        together_ai_client: An instance of TogetherAI client

    Raises:
        ValueError: If no client instance is supplied.
    """
    if not together_ai_client:
        raise ValueError("TogetherAI client is required")
    # The TogetherAI client speaks the OpenAI-compatible interface the
    # engine expects, so it is passed through as openai_client.
    return GeneralReasoningEngine(openai_client=together_ai_client)
|
|
| |
if __name__ == "__main__":
    # Demo entry point.
    #
    # BUG FIX: this used to call the factory with together_ai_client=None,
    # which unconditionally raises ValueError and crashed the demo with an
    # unhandled traceback. Fail gracefully with an actionable message.
    try:
        reasoning_engine = create_general_reasoning_engine(together_ai_client=None)
    except ValueError as exc:
        logger.error(f"Cannot run demo without a TogetherAI client: {exc}")
        raise SystemExit(1)

    # A spread of open-ended problems exercising different reasoning styles.
    test_problems = [
        "How can I optimize the performance of a machine learning model?",
        "What's the best way to design a user interface for elderly users?",
        "How do I solve the problem of declining team productivity?",
        "What causes climate change and how can we address it?",
        "How can I create a more efficient algorithm for sorting data?"
    ]

    for problem in test_problems:
        print(f"\n{'='*60}")
        print(f"REASONING ABOUT: {problem}")
        print('='*60)

        result = reasoning_engine.reason_about_problem(problem)
        explanation = reasoning_engine.get_reasoning_explanation(result)
        print(explanation)
|
|