#!/usr/bin/env python3 """ SOFIA Advanced Reasoning Engine Implements task decomposition, strategy selection, and logical reasoning """ import re import json import logging from typing import Dict, List, Tuple, Optional, Any, Set from datetime import datetime from collections import defaultdict, deque import heapq logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class Task: """ Represents a task that can be decomposed and reasoned about """ def __init__(self, description: str, complexity: int = 1, dependencies: List[str] = None): self.description = description self.complexity = complexity # 1-10 scale self.dependencies = dependencies or [] self.subtasks = [] self.completed = False self.created_at = datetime.now() self.estimated_time = self._estimate_time() def _estimate_time(self) -> float: """Estimate time required based on complexity and description""" base_time = self.complexity * 5 # 5 minutes per complexity unit # Adjust based on keywords desc_lower = self.description.lower() if any(word in desc_lower for word in ['research', 'analyze', 'investigate']): base_time *= 1.5 if any(word in desc_lower for word in ['create', 'build', 'implement']): base_time *= 2.0 if any(word in desc_lower for word in ['simple', 'quick', 'basic']): base_time *= 0.5 return base_time def add_subtask(self, subtask: 'Task'): """Add a subtask to this task""" self.subtasks.append(subtask) def mark_completed(self): """Mark this task as completed""" self.completed = True def to_dict(self) -> Dict[str, Any]: """Convert task to dictionary representation""" return { 'description': self.description, 'complexity': self.complexity, 'dependencies': self.dependencies, 'subtasks': [st.to_dict() for st in self.subtasks], 'completed': self.completed, 'estimated_time': self.estimated_time, 'created_at': self.created_at.isoformat() } class ReasoningStrategy: """ Represents a reasoning strategy for solving problems """ def __init__(self, name: str, description: str, applicable_domains: List[str], success_rate: float = 0.7, avg_time: float = 10.0): self.name = name self.description = description self.applicable_domains = applicable_domains self.success_rate = success_rate self.avg_time = avg_time self.usage_count = 0 self.success_count = 0 def is_applicable(self, problem_domain: str, problem_complexity: int) -> bool: """Check if this strategy is applicable to the given problem""" domain_match = problem_domain in self.applicable_domains or 'general' in self.applicable_domains # Some strategies work better for different complexity levels if 'simple_problems' in self.applicable_domains and problem_complexity <= 3: return True if 'complex_problems' in self.applicable_domains and problem_complexity >= 7: return True return domain_match def record_usage(self, success: bool): """Record usage of this strategy""" self.usage_count += 1 if success: self.success_count += 1 self.success_rate = self.success_count / self.usage_count def get_effectiveness_score(self) -> float: """Get effectiveness score for strategy selection""" # Combine success rate with usage experience experience_factor = min(1.0, self.usage_count / 10) # Diminishing returns after 10 uses return self.success_rate * (0.5 + 0.5 * experience_factor) class TaskDecomposer: """ Decomposes complex tasks into manageable subtasks """ def __init__(self): self.decomposition_patterns = self._load_patterns() def _load_patterns(self) -> Dict[str, List[str]]: """Load task decomposition patterns""" return { 'research': [ 'Define research question', 'Identify information sources', 'Gather relevant data', 'Analyze findings', 'Synthesize conclusions' ], 'implementation': [ 'Analyze requirements', 'Design solution architecture', 'Implement core functionality', 'Add error handling', 'Test implementation', 'Document solution' ], 'problem_solving': [ 'Understand the problem', 'Break down into components', 'Identify potential solutions', 'Evaluate solution options', 'Implement chosen solution', 'Verify solution works' ], 'learning': [ 'Assess current knowledge', 'Identify learning objectives', 'Find learning resources', 'Study materials', 'Practice concepts', 'Assess understanding' ] } def decompose_task(self, task_description: str, complexity: int = 5) -> Task: """Decompose a task into subtasks""" # Identify task type task_type = self._classify_task_type(task_description) # Create main task main_task = Task(task_description, complexity) # Get decomposition pattern if task_type in self.decomposition_patterns: pattern = self.decomposition_patterns[task_type] # Adjust pattern based on complexity if complexity <= 3: # Simplify for simple tasks pattern = pattern[:3] elif complexity >= 8: # Expand for complex tasks pattern.extend([ 'Review and refine', 'Optimize performance', 'Prepare for deployment' ]) # Create subtasks for i, subtask_desc in enumerate(pattern): subtask_complexity = max(1, complexity // len(pattern)) subtask = Task(subtask_desc, subtask_complexity) main_task.add_subtask(subtask) else: # Generic decomposition for unknown task types subtasks = self._generic_decomposition(task_description, complexity) for subtask_desc in subtasks: subtask = Task(subtask_desc, max(1, complexity // len(subtasks))) main_task.add_subtask(subtask) return main_task def _classify_task_type(self, description: str) -> str: """Classify the type of task""" desc_lower = description.lower() if any(word in desc_lower for word in ['research', 'investigate', 'analyze', 'study']): return 'research' elif any(word in desc_lower for word in ['implement', 'build', 'create', 'develop']): return 'implementation' elif any(word in desc_lower for word in ['solve', 'fix', 'resolve', 'answer']): return 'problem_solving' elif any(word in desc_lower for word in ['learn', 'understand', 'master', 'study']): return 'learning' else: return 'general' def _generic_decomposition(self, description: str, complexity: int) -> List[str]: """Generic task decomposition when no specific pattern matches""" base_steps = [ 'Plan the approach', 'Gather necessary resources', 'Execute the main work', 'Review and verify results' ] if complexity > 5: base_steps.insert(1, 'Break down into smaller steps') base_steps.insert(-1, 'Test and validate') return base_steps class StrategySelector: """ Selects the best reasoning strategy for a given problem """ def __init__(self): self.strategies = self._initialize_strategies() def _initialize_strategies(self) -> List[ReasoningStrategy]: """Initialize available reasoning strategies""" return [ ReasoningStrategy( "analytical_reasoning", "Break down problem into components and analyze systematically", ["mathematics", "science", "engineering", "general"], success_rate=0.85, avg_time=12.0 ), ReasoningStrategy( "creative_problem_solving", "Use creative thinking and brainstorming for novel solutions", ["design", "innovation", "art", "general"], success_rate=0.75, avg_time=15.0 ), ReasoningStrategy( "deductive_reasoning", "Apply logical deduction from general principles to specific cases", ["logic", "philosophy", "mathematics", "law"], success_rate=0.80, avg_time=10.0 ), ReasoningStrategy( "inductive_reasoning", "Draw general conclusions from specific observations", ["science", "research", "data_analysis"], success_rate=0.70, avg_time=18.0 ), ReasoningStrategy( "case_based_reasoning", "Solve problems by adapting solutions from similar past cases", ["medicine", "law", "customer_service", "general"], success_rate=0.78, avg_time=8.0 ), ReasoningStrategy( "algorithmic_approach", "Apply step-by-step algorithmic procedures", ["programming", "mathematics", "engineering"], success_rate=0.90, avg_time=6.0 ), ReasoningStrategy( "intuitive_reasoning", "Rely on intuition and experience for quick solutions", ["simple_problems", "general"], success_rate=0.65, avg_time=3.0 ) ] def select_strategy(self, problem_description: str, problem_complexity: int = 5, time_constraint: Optional[float] = None) -> ReasoningStrategy: """ Select the best reasoning strategy for the given problem Args: problem_description: Description of the problem problem_complexity: Complexity level (1-10) time_constraint: Maximum time allowed (minutes) Returns: Selected reasoning strategy """ # Identify problem domain problem_domain = self._identify_domain(problem_description) # Filter applicable strategies applicable_strategies = [ strategy for strategy in self.strategies if strategy.is_applicable(problem_domain, problem_complexity) ] if not applicable_strategies: # Fallback to general strategies applicable_strategies = [ strategy for strategy in self.strategies if 'general' in strategy.applicable_domains ] # Apply time constraint if specified if time_constraint: applicable_strategies = [ strategy for strategy in applicable_strategies if strategy.avg_time <= time_constraint ] if not applicable_strategies: # Ultimate fallback return self.strategies[0] # Score strategies based on effectiveness and time scored_strategies = [] for strategy in applicable_strategies: effectiveness = strategy.get_effectiveness_score() # Penalize slow strategies for complex problems under time pressure time_penalty = 0 if time_constraint and strategy.avg_time > time_constraint * 0.8: time_penalty = 0.2 final_score = effectiveness * (1 - time_penalty) scored_strategies.append((final_score, strategy)) # Select highest scoring strategy scored_strategies.sort(reverse=True) selected_strategy = scored_strategies[0][1] logger.info(f"Selected strategy: {selected_strategy.name} (score: {scored_strategies[0][0]:.3f})") return selected_strategy def _identify_domain(self, description: str) -> str: """Identify the problem domain from description""" desc_lower = description.lower() domain_keywords = { 'mathematics': ['math', 'calculate', 'equation', 'algebra', 'geometry'], 'programming': ['code', 'program', 'software', 'algorithm', 'debug'], 'science': ['experiment', 'hypothesis', 'theory', 'research', 'data'], 'engineering': ['design', 'build', 'construct', 'system', 'architecture'], 'business': ['profit', 'market', 'customer', 'strategy', 'finance'], 'medicine': ['patient', 'diagnosis', 'treatment', 'health', 'medical'], 'law': ['legal', 'contract', 'regulation', 'court', 'justice'], 'education': ['learn', 'teach', 'student', 'course', 'knowledge'] } for domain, keywords in domain_keywords.items(): if any(keyword in desc_lower for keyword in keywords): return domain return 'general' class LogicalReasoner: """ Performs logical reasoning and inference """ def __init__(self): self.knowledge_base = defaultdict(list) self.inference_rules = self._load_inference_rules() def _load_inference_rules(self) -> List[Dict[str, Any]]: """Load basic inference rules""" return [ { 'name': 'modus_ponens', 'description': 'If P implies Q and P is true, then Q is true', 'pattern': lambda premises: self._check_modus_ponens(premises) }, { 'name': 'transitivity', 'description': 'If A relates to B and B relates to C, then A relates to C', 'pattern': lambda premises: self._check_transitivity(premises) }, { 'name': 'contradiction_detection', 'description': 'Detect logical contradictions', 'pattern': lambda premises: self._check_contradiction(premises) } ] def add_knowledge(self, fact: str, category: str = 'general'): """Add a fact to the knowledge base""" self.knowledge_base[category].append({ 'fact': fact, 'added_at': datetime.now(), 'confidence': 1.0 }) def draw_inference(self, premises: List[str]) -> Dict[str, Any]: """Draw logical inferences from premises""" inferences = [] # Apply inference rules for rule in self.inference_rules: result = rule['pattern'](premises) if result: inferences.append({ 'rule': rule['name'], 'conclusion': result, 'confidence': 0.8 # Base confidence }) # Look for patterns in knowledge base kb_inferences = self._knowledge_base_inference(premises) inferences.extend(kb_inferences) return { 'inferences': inferences, 'premises_used': len(premises), 'total_inferences': len(inferences) } def _check_modus_ponens(self, premises: List[str]) -> Optional[str]: """Check for modus ponens pattern: If P then Q, P -> Q""" # Simplified implementation for premise in premises: if 'if' in premise.lower() and 'then' in premise.lower(): # This is a conditional premise parts = re.split(r'\s+(?:if|then)\s+', premise, flags=re.IGNORECASE) if len(parts) >= 2: condition = parts[0].strip() conclusion = parts[1].strip() # Check if condition is in other premises for other_premise in premises: if other_premise != premise and condition.lower() in other_premise.lower(): return conclusion return None def _check_transitivity(self, premises: List[str]) -> Optional[str]: """Check for transitivity pattern""" # Simplified implementation - look for chains relationships = [] for premise in premises: # Look for patterns like "A is related to B" match = re.search(r'(.+?)\s+(?:is|are|relates? to)\s+(.+)', premise, re.IGNORECASE) if match: relationships.append((match.group(1).strip(), match.group(2).strip())) # Check for transitive chains for i, (a1, b1) in enumerate(relationships): for j, (a2, b2) in enumerate(relationships): if i != j and b1.lower() == a2.lower(): return f"{a1} relates to {b2}" return None def _check_contradiction(self, premises: List[str]) -> Optional[str]: """Check for contradictions""" statements = [] negations = [] for premise in premises: premise_lower = premise.lower() statements.append(premise_lower) # Simple negation detection if premise_lower.startswith(('not ', 'no ', 'never ')): negations.append(premise_lower[4:]) elif 'not' in premise_lower: # Split on 'not' parts = premise_lower.split('not', 1) if len(parts) == 2: negations.append(parts[0].strip() + parts[1].strip()) # Check for contradictions for stmt in statements: for neg in negations: if self._are_contradictory(stmt, neg): return f"Contradiction detected: '{stmt}' vs 'not {neg}'" return None def _are_contradictory(self, stmt1: str, stmt2: str) -> bool: """Check if two statements are contradictory""" # Very simplified contradiction detection return stmt1.strip() == stmt2.strip() def _knowledge_base_inference(self, premises: List[str]) -> List[Dict[str, Any]]: """Draw inferences using knowledge base""" inferences = [] for category, facts in self.knowledge_base.items(): for fact in facts: fact_text = fact['fact'] # Check if any premise relates to this fact for premise in premises: if self._texts_related(premise, fact_text): inferences.append({ 'rule': 'knowledge_base_similarity', 'conclusion': f"Related knowledge: {fact_text}", 'confidence': fact['confidence'] * 0.7, 'category': category }) return inferences def _texts_related(self, text1: str, text2: str) -> bool: """Check if two texts are related (simplified)""" words1 = set(text1.lower().split()) words2 = set(text2.lower().split()) # Check for word overlap overlap = len(words1.intersection(words2)) return overlap >= 2 # At least 2 words in common class AdvancedReasoningEngine: """ Main advanced reasoning engine combining all reasoning capabilities """ def __init__(self): self.task_decomposer = TaskDecomposer() self.strategy_selector = StrategySelector() self.logical_reasoner = LogicalReasoner() # Reasoning history self.reasoning_history = deque(maxlen=1000) def reason_about_task(self, task_description: str, complexity: int = 5, time_constraint: Optional[float] = None) -> Dict[str, Any]: """ Perform comprehensive reasoning about a task Args: task_description: Description of the task complexity: Task complexity (1-10) time_constraint: Time limit in minutes Returns: Reasoning results """ reasoning_start = datetime.now() # 1. Decompose the task decomposed_task = self.task_decomposer.decompose_task(task_description, complexity) # 2. Select reasoning strategy strategy = self.strategy_selector.select_strategy( task_description, complexity, time_constraint ) # 3. Perform logical reasoning task_premises = [task_description] if decomposed_task.subtasks: task_premises.extend([st.description for st in decomposed_task.subtasks]) logical_inferences = self.logical_reasoner.draw_inference(task_premises) # 4. Generate execution plan execution_plan = self._generate_execution_plan(decomposed_task, strategy) # 5. Assess feasibility feasibility = self._assess_feasibility(decomposed_task, time_constraint) reasoning_result = { 'task_analysis': { 'original_task': task_description, 'complexity': complexity, 'estimated_total_time': decomposed_task.estimated_time, 'subtasks_count': len(decomposed_task.subtasks) }, 'decomposition': decomposed_task.to_dict(), 'selected_strategy': { 'name': strategy.name, 'description': strategy.description, 'expected_success_rate': strategy.success_rate, 'estimated_time': strategy.avg_time }, 'logical_inferences': logical_inferences, 'execution_plan': execution_plan, 'feasibility_assessment': feasibility, 'reasoning_time': (datetime.now() - reasoning_start).total_seconds() } # Record in history self.reasoning_history.append({ 'timestamp': datetime.now().isoformat(), 'task': task_description, 'result': reasoning_result }) return reasoning_result def _generate_execution_plan(self, task: Task, strategy: ReasoningStrategy) -> List[Dict[str, Any]]: """Generate a step-by-step execution plan""" plan = [] # Add strategy-specific preparation steps if strategy.name == 'analytical_reasoning': plan.append({ 'step': 'analysis', 'description': 'Analyze task components systematically', 'estimated_time': 5.0 }) elif strategy.name == 'creative_problem_solving': plan.append({ 'step': 'brainstorming', 'description': 'Generate multiple solution approaches', 'estimated_time': 10.0 }) # Add task subtasks for i, subtask in enumerate(task.subtasks): plan.append({ 'step': f'subtask_{i+1}', 'description': subtask.description, 'estimated_time': subtask.estimated_time, 'dependencies': subtask.dependencies }) # Add verification step plan.append({ 'step': 'verification', 'description': 'Verify solution meets requirements', 'estimated_time': 3.0 }) return plan def _assess_feasibility(self, task: Task, time_constraint: Optional[float]) -> Dict[str, Any]: """Assess the feasibility of completing the task""" total_estimated_time = task.estimated_time if time_constraint and total_estimated_time > time_constraint: return { 'feasible': False, 'reason': f'Estimated time ({total_estimated_time:.1f} min) exceeds constraint ({time_constraint} min)', 'recommendation': 'Break task into smaller parts or extend time limit' } # Assess based on complexity and subtasks complexity_score = task.complexity / 10.0 subtask_score = len(task.subtasks) / 10.0 # Normalize feasibility_score = 1.0 - (complexity_score * 0.4 + subtask_score * 0.3) if feasibility_score > 0.7: assessment = 'highly_feasible' elif feasibility_score > 0.4: assessment = 'moderately_feasible' else: assessment = 'challenging' return { 'feasible': feasibility_score > 0.3, 'assessment': assessment, 'feasibility_score': feasibility_score, 'estimated_time': total_estimated_time } def learn_from_experience(self, task_description: str, success: bool, actual_time: float): """Learn from task execution experience""" # Find the reasoning result for this task for record in reversed(self.reasoning_history): if record['task'] == task_description: reasoning_result = record['result'] strategy_name = reasoning_result['selected_strategy']['name'] # Find the strategy and update its performance for strategy in self.strategy_selector.strategies: if strategy.name == strategy_name: strategy.record_usage(success) # Update time estimate old_time = strategy.avg_time strategy.avg_time = (old_time + actual_time) / 2 # Simple averaging logger.info(f"Updated strategy {strategy_name}: success_rate={strategy.success_rate:.3f}, avg_time={strategy.avg_time:.1f}") break break def get_reasoning_statistics(self) -> Dict[str, Any]: """Get statistics about reasoning performance""" if not self.reasoning_history: return {'total_reasoning_sessions': 0} total_sessions = len(self.reasoning_history) recent_sessions = list(self.reasoning_history)[-50:] # Last 50 sessions # Calculate average reasoning time reasoning_times = [r['result']['reasoning_time'] for r in recent_sessions] avg_reasoning_time = sum(reasoning_times) / len(reasoning_times) # Strategy usage statistics strategy_usage = defaultdict(int) for record in recent_sessions: strategy_name = record['result']['selected_strategy']['name'] strategy_usage[strategy_name] += 1 return { 'total_reasoning_sessions': total_sessions, 'average_reasoning_time': avg_reasoning_time, 'strategy_usage': dict(strategy_usage), 'most_used_strategy': max(strategy_usage.items(), key=lambda x: x[1])[0] if strategy_usage else None } # Example usage if __name__ == "__main__": print("SOFIA Advanced Reasoning Engine") print("This system provides task decomposition, strategy selection, and logical reasoning") # Example usage would be integrated with SOFIA """ from sofia_reasoning import AdvancedReasoningEngine reasoner = AdvancedReasoningEngine() # Reason about a complex task result = reasoner.reason_about_task( "Implement a machine learning model for image classification", complexity=8, time_constraint=120 # 2 hours ) print("Reasoning result:", result) # Learn from execution reasoner.learn_from_experience( "Implement a machine learning model for image classification", success=True, actual_time=95.0 ) """