"""
SOFIA Advanced Reasoning Engine

Implements task decomposition, strategy selection, and logical reasoning
"""

import heapq
import json
import logging
import re
from collections import Counter, defaultdict, deque
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Tuple

# Configure the root logger at import time (kept as in the original module)
# and create the module-level logger used by the classes below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Task:
    """
    Represents a task that can be decomposed and reasoned about.

    A task holds a free-text description, a numeric complexity, the
    dependency identifiers supplied by the caller, and an ordered list of
    subtasks. ``estimated_time`` is computed once, at construction, from the
    complexity and from keywords found in the description (other parts of
    this file report that value as minutes).
    """

    def __init__(self, description: str, complexity: int = 1,
                 dependencies: Optional[List[str]] = None):
        """
        Create a task.

        Args:
            description: Free-text description of the task.
            complexity: Complexity level; scales the time estimate.
            dependencies: Optional dependency identifiers. The list is
                copied, so later changes to the caller's list do not leak
                into the task.
        """
        self.description = description
        self.complexity = complexity
        # Copy instead of aliasing the caller's list.
        self.dependencies: List[str] = list(dependencies) if dependencies else []
        self.subtasks: List['Task'] = []
        self.completed = False
        self.created_at = datetime.now()
        self.estimated_time = self._estimate_time()

    def _estimate_time(self) -> float:
        """
        Estimate the time required from complexity and description keywords.

        The base is ``complexity * 5``. Each keyword group that appears in
        the (lower-cased) description applies its multiplier once; groups
        are cumulative.

        Returns:
            The estimate, always as a float.
        """
        estimate = float(self.complexity * 5)
        text = self.description.lower()

        # (keywords, multiplier) pairs, applied in this order.
        adjustments = (
            (('research', 'analyze', 'investigate'), 1.5),
            (('create', 'build', 'implement'), 2.0),
            (('simple', 'quick', 'basic'), 0.5),
        )
        for keywords, factor in adjustments:
            if any(keyword in text for keyword in keywords):
                estimate *= factor

        return estimate

    def add_subtask(self, subtask: 'Task'):
        """Append ``subtask`` to this task's list of subtasks."""
        self.subtasks.append(subtask)

    def mark_completed(self):
        """Mark this task as completed."""
        self.completed = True

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the task, and recursively its subtasks, to a dictionary.

        Returns:
            A dict with the keys ``description``, ``complexity``,
            ``dependencies``, ``subtasks``, ``completed``,
            ``estimated_time`` and ``created_at`` (ISO-8601 string).
        """
        return {
            'description': self.description,
            'complexity': self.complexity,
            # Hand out a copy so callers cannot mutate the task through it.
            'dependencies': list(self.dependencies),
            'subtasks': [child.to_dict() for child in self.subtasks],
            'completed': self.completed,
            'estimated_time': self.estimated_time,
            'created_at': self.created_at.isoformat(),
        }


class ReasoningStrategy:
    """
    Represents a reasoning strategy for solving problems.

    A strategy has a name, a description, the domains it applies to, a
    success rate and an average time. Usage outcomes recorded through
    ``record_usage`` update ``success_rate``.
    """

    # Number of pseudo-observations the configured success rate counts for
    # when it is blended with recorded outcomes. Without this, one recorded
    # failure would replace the configured rate with 0.0.
    PRIOR_WEIGHT = 5.0

    def __init__(self, name: str, description: str, applicable_domains: List[str],
                 success_rate: float = 0.7, avg_time: float = 10.0):
        """
        Create a strategy.

        Args:
            name: Identifier of the strategy.
            description: Human-readable description.
            applicable_domains: Domain names the strategy applies to. The
                special entries ``'general'``, ``'simple_problems'`` and
                ``'complex_problems'`` are interpreted by ``is_applicable``.
            success_rate: Configured (prior) success rate.
            avg_time: Average time the strategy takes.
        """
        self.name = name
        self.description = description
        self.applicable_domains = applicable_domains
        self.success_rate = success_rate
        self.avg_time = avg_time
        self.usage_count = 0
        self.success_count = 0
        # Remember the configured rate so recorded outcomes refine it
        # instead of replacing it.
        self._prior_success_rate = success_rate

    def __repr__(self) -> str:
        return (f"{type(self).__name__}(name={self.name!r}, "
                f"success_rate={self.success_rate:.3f}, avg_time={self.avg_time})")

    def is_applicable(self, problem_domain: str, problem_complexity: int) -> bool:
        """
        Check whether this strategy applies to the given problem.

        The strategy applies when any of the following holds:
        it lists ``'simple_problems'`` and the complexity is at most 3;
        it lists ``'complex_problems'`` and the complexity is at least 7;
        it lists the problem's domain or ``'general'``.
        """
        domains = self.applicable_domains
        if 'simple_problems' in domains and problem_complexity <= 3:
            return True
        if 'complex_problems' in domains and problem_complexity >= 7:
            return True
        return problem_domain in domains or 'general' in domains

    def record_usage(self, success: bool):
        """
        Record one use of this strategy and refresh ``success_rate``.

        The new rate is the configured rate, weighted as ``PRIOR_WEIGHT``
        observations, blended with the recorded successes. It converges to
        the observed ratio as usage grows.
        """
        self.usage_count += 1
        if success:
            self.success_count += 1

        weight = self.PRIOR_WEIGHT
        # getattr keeps instances created without __init__ working.
        prior = getattr(self, '_prior_success_rate', 0.0)
        self.success_rate = (prior * weight + self.success_count) / (weight + self.usage_count)

    def get_effectiveness_score(self) -> float:
        """
        Return the score used for strategy selection.

        The success rate is scaled by a factor between 0.5 (never used)
        and 1.0 (used ten times or more).
        """
        experience = min(1.0, self.usage_count / 10)
        return self.success_rate * (0.5 + 0.5 * experience)


class TaskDecomposer:
    """
    Decomposes complex tasks into manageable subtasks.

    Tasks are classified by keywords in their description; each known task
    type has a fixed list of step descriptions. Unclassified tasks fall
    back to a generic list of steps.
    """

    # Steps appended to a known pattern when complexity is 8 or higher.
    _EXTRA_STEPS_FOR_COMPLEX_TASKS = (
        'Review and refine',
        'Optimize performance',
        'Prepare for deployment',
    )

    # (task type, keywords) pairs, checked in this order; the first type
    # with a keyword contained in the description wins. 'study' appears in
    # both 'research' and 'learning', so 'research' claims it.
    _TYPE_KEYWORDS = (
        ('research', ('research', 'investigate', 'analyze', 'study')),
        ('implementation', ('implement', 'build', 'create', 'develop')),
        ('problem_solving', ('solve', 'fix', 'resolve', 'answer')),
        ('learning', ('learn', 'understand', 'master', 'study')),
    )

    def __init__(self):
        self.decomposition_patterns = self._load_patterns()

    def _load_patterns(self) -> Dict[str, List[str]]:
        """Return the step lists for each known task type."""
        return {
            'research': [
                'Define research question',
                'Identify information sources',
                'Gather relevant data',
                'Analyze findings',
                'Synthesize conclusions',
            ],
            'implementation': [
                'Analyze requirements',
                'Design solution architecture',
                'Implement core functionality',
                'Add error handling',
                'Test implementation',
                'Document solution',
            ],
            'problem_solving': [
                'Understand the problem',
                'Break down into components',
                'Identify potential solutions',
                'Evaluate solution options',
                'Implement chosen solution',
                'Verify solution works',
            ],
            'learning': [
                'Assess current knowledge',
                'Identify learning objectives',
                'Find learning resources',
                'Study materials',
                'Practice concepts',
                'Assess understanding',
            ],
        }

    def decompose_task(self, task_description: str, complexity: int = 5) -> 'Task':
        """
        Decompose a task into subtasks.

        Args:
            task_description: Description of the task to decompose.
            complexity: Complexity of the whole task; it is divided evenly
                (integer division, minimum 1) among the subtasks.

        Returns:
            A ``Task`` for the whole job with one subtask per step.
        """
        main_task = Task(task_description, complexity)

        steps = self._plan_steps(task_description, complexity)
        subtask_complexity = max(1, complexity // len(steps))
        for step in steps:
            main_task.add_subtask(Task(step, subtask_complexity))

        return main_task

    def _plan_steps(self, task_description: str, complexity: int) -> List[str]:
        """
        Return the step descriptions for a task, as a fresh list.

        Known task types use their pattern: the first three steps when
        complexity is 3 or lower, the whole pattern plus three closing
        steps when it is 8 or higher, and the plain pattern otherwise.
        Unknown types use the generic decomposition.
        """
        task_type = self._classify_task_type(task_description)
        pattern = self.decomposition_patterns.get(task_type)
        if pattern is None:
            return self._generic_decomposition(task_description, complexity)

        # Always work on a copy: the stored pattern is shared by every call
        # and must not grow when a complex task is decomposed.
        if complexity <= 3:
            return list(pattern[:3])
        steps = list(pattern)
        if complexity >= 8:
            steps.extend(self._EXTRA_STEPS_FOR_COMPLEX_TASKS)
        return steps

    def _classify_task_type(self, description: str) -> str:
        """
        Classify the task by keywords contained in its description.

        Returns:
            ``'research'``, ``'implementation'``, ``'problem_solving'``,
            ``'learning'``, or ``'general'`` when no keyword matches.
        """
        text = description.lower()
        for task_type, keywords in self._TYPE_KEYWORDS:
            if any(keyword in text for keyword in keywords):
                return task_type
        return 'general'

    def _generic_decomposition(self, description: str, complexity: int) -> List[str]:
        """
        Return generic steps for tasks that match no specific pattern.

        For complexity above 5, a breakdown step is added after planning
        and a validation step before the final review.
        """
        steps = [
            'Plan the approach',
            'Gather necessary resources',
            'Execute the main work',
            'Review and verify results',
        ]

        if complexity > 5:
            steps.insert(1, 'Break down into smaller steps')
            steps.insert(-1, 'Test and validate')

        return steps


class StrategySelector:
    """
    Selects the best reasoning strategy for a given problem.

    The problem's domain is guessed from keywords in its description; the
    applicable strategies are then ranked by effectiveness, with a penalty
    for strategies that come close to the time limit.
    """

    # Domain -> keywords; domains are checked in this order and the first
    # one with a keyword contained in the description wins.
    _DOMAIN_KEYWORDS = {
        'mathematics': ['math', 'calculate', 'equation', 'algebra', 'geometry'],
        'programming': ['code', 'program', 'software', 'algorithm', 'debug'],
        'science': ['experiment', 'hypothesis', 'theory', 'research', 'data'],
        'engineering': ['design', 'build', 'construct', 'system', 'architecture'],
        'business': ['profit', 'market', 'customer', 'strategy', 'finance'],
        'medicine': ['patient', 'diagnosis', 'treatment', 'health', 'medical'],
        'law': ['legal', 'contract', 'regulation', 'court', 'justice'],
        'education': ['learn', 'teach', 'student', 'course', 'knowledge'],
    }

    def __init__(self):
        self.strategies = self._initialize_strategies()

    def _initialize_strategies(self) -> List['ReasoningStrategy']:
        """Create the built-in reasoning strategies."""
        return [
            ReasoningStrategy(
                "analytical_reasoning",
                "Break down problem into components and analyze systematically",
                ["mathematics", "science", "engineering", "general"],
                success_rate=0.85,
                avg_time=12.0,
            ),
            ReasoningStrategy(
                "creative_problem_solving",
                "Use creative thinking and brainstorming for novel solutions",
                ["design", "innovation", "art", "general"],
                success_rate=0.75,
                avg_time=15.0,
            ),
            ReasoningStrategy(
                "deductive_reasoning",
                "Apply logical deduction from general principles to specific cases",
                ["logic", "philosophy", "mathematics", "law"],
                success_rate=0.80,
                avg_time=10.0,
            ),
            ReasoningStrategy(
                "inductive_reasoning",
                "Draw general conclusions from specific observations",
                ["science", "research", "data_analysis"],
                success_rate=0.70,
                avg_time=18.0,
            ),
            ReasoningStrategy(
                "case_based_reasoning",
                "Solve problems by adapting solutions from similar past cases",
                ["medicine", "law", "customer_service", "general"],
                success_rate=0.78,
                avg_time=8.0,
            ),
            ReasoningStrategy(
                "algorithmic_approach",
                "Apply step-by-step algorithmic procedures",
                ["programming", "mathematics", "engineering"],
                success_rate=0.90,
                avg_time=6.0,
            ),
            ReasoningStrategy(
                "intuitive_reasoning",
                "Rely on intuition and experience for quick solutions",
                ["simple_problems", "general"],
                success_rate=0.65,
                avg_time=3.0,
            ),
        ]

    def select_strategy(self, problem_description: str, problem_complexity: int = 5,
                        time_constraint: Optional[float] = None) -> 'ReasoningStrategy':
        """
        Select the best reasoning strategy for the given problem.

        Args:
            problem_description: Description of the problem
            problem_complexity: Complexity level (1-10)
            time_constraint: Maximum time allowed (minutes); ``None`` means
                no limit

        Returns:
            Selected reasoning strategy. When no candidate fits the time
            limit, the first configured strategy is returned as a fallback.
        """
        problem_domain = self._identify_domain(problem_description)

        candidates = [
            strategy for strategy in self.strategies
            if strategy.is_applicable(problem_domain, problem_complexity)
        ]
        if not candidates:
            # Nothing matched the domain: fall back to general-purpose ones.
            candidates = [
                strategy for strategy in self.strategies
                if 'general' in strategy.applicable_domains
            ]

        # Compare against None so that a limit of 0 is still a limit.
        if time_constraint is not None:
            candidates = [
                strategy for strategy in candidates
                if strategy.avg_time <= time_constraint
            ]

        if not candidates:
            return self.strategies[0]

        # Track the best score by hand instead of sorting (score, strategy)
        # tuples: on equal scores a sort would compare the strategy objects
        # themselves, which are not orderable. Ties keep the earlier one.
        best_strategy = None
        best_score = None
        for strategy in candidates:
            score = self._score(strategy, time_constraint)
            if best_score is None or score > best_score:
                best_strategy, best_score = strategy, score

        logger.info("Selected strategy: %s (score: %.3f)", best_strategy.name, best_score)
        return best_strategy

    @staticmethod
    def _score(strategy: 'ReasoningStrategy', time_constraint: Optional[float]) -> float:
        """
        Score one candidate: its effectiveness, reduced by 20% when its
        average time exceeds 80% of the time limit.
        """
        penalty = 0.0
        if time_constraint is not None and strategy.avg_time > time_constraint * 0.8:
            penalty = 0.2
        return strategy.get_effectiveness_score() * (1 - penalty)

    def _identify_domain(self, description: str) -> str:
        """
        Identify the problem domain from keywords in the description.

        Returns:
            The first domain with a keyword contained in the lower-cased
            description, or ``'general'`` when none matches.
        """
        text = description.lower()
        for domain, keywords in self._DOMAIN_KEYWORDS.items():
            if any(keyword in text for keyword in keywords):
                return domain
        return 'general'


class LogicalReasoner:
    """
    Performs logical reasoning and inference.

    Inference works on plain-text premises using simple textual patterns:
    modus ponens on "if P then Q" sentences, transitivity on "A is B"
    sentences, contradiction detection on negated sentences, and word
    overlap against facts stored in the knowledge base.
    """

    # "If <condition>[,] then <conclusion>" covering the whole premise.
    _CONDITIONAL_RE = re.compile(
        r'^\s*if\s+(.+?)\s*,?\s+then\s+(.+?)\s*$', re.IGNORECASE | re.DOTALL
    )
    # "<a> is|are|relates to <b>"
    _RELATION_RE = re.compile(r'(.+?)\s+(?:is|are|relates? to)\s+(.+)', re.IGNORECASE)
    # A lower-cased premise that starts with a negation word.
    _LEADING_NEGATION_RE = re.compile(r'^(?:not|no|never)\s+(.+)$', re.DOTALL)
    # The standalone word "not" ("nothing" or "another" must not match).
    _INLINE_NOT_RE = re.compile(r'\bnot\b')

    def __init__(self):
        self.knowledge_base = defaultdict(list)
        self.inference_rules = self._load_inference_rules()

    def _load_inference_rules(self) -> List[Dict[str, Any]]:
        """
        Load the basic inference rules.

        Each rule is a dict with ``name``, ``description`` and ``pattern``;
        ``pattern`` is a callable taking the premises and returning a
        conclusion string or ``None``.
        """
        return [
            {
                'name': 'modus_ponens',
                'description': 'If P implies Q and P is true, then Q is true',
                'pattern': self._check_modus_ponens,
            },
            {
                'name': 'transitivity',
                'description': 'If A relates to B and B relates to C, then A relates to C',
                'pattern': self._check_transitivity,
            },
            {
                'name': 'contradiction_detection',
                'description': 'Detect logical contradictions',
                'pattern': self._check_contradiction,
            },
        ]

    def add_knowledge(self, fact: str, category: str = 'general'):
        """Store ``fact`` under ``category`` with full confidence (1.0)."""
        self.knowledge_base[category].append({
            'fact': fact,
            'added_at': datetime.now(),
            'confidence': 1.0,
        })

    def draw_inference(self, premises: List[str]) -> Dict[str, Any]:
        """
        Draw logical inferences from premises.

        Every inference rule is applied once; each rule that yields a
        conclusion contributes one inference with confidence 0.8. Related
        knowledge-base facts are appended afterwards.

        Returns:
            A dict with ``inferences`` (list of dicts), ``premises_used``
            (number of premises) and ``total_inferences``.
        """
        inferences = []
        for rule in self.inference_rules:
            conclusion = rule['pattern'](premises)
            if conclusion:
                inferences.append({
                    'rule': rule['name'],
                    'conclusion': conclusion,
                    'confidence': 0.8,
                })

        inferences.extend(self._knowledge_base_inference(premises))

        return {
            'inferences': inferences,
            'premises_used': len(premises),
            'total_inferences': len(inferences),
        }

    @staticmethod
    def _normalize(text: str) -> str:
        """Lower-case, collapse whitespace and drop trailing ``.``, ``!``, ``?``."""
        return " ".join(text.lower().split()).rstrip(".!?").rstrip()

    def _check_modus_ponens(self, premises: List[str]) -> Optional[str]:
        """
        Check for the modus ponens pattern: "If P then Q" together with "P".

        A conditional premise fires when another premise equals its
        condition after normalization (case, whitespace, final punctuation).

        Returns:
            The conclusion text of the first conditional that fires, or
            ``None``.
        """
        normalized = [self._normalize(premise) for premise in premises]

        for index, premise in enumerate(premises):
            match = self._CONDITIONAL_RE.match(premise)
            if not match:
                continue
            condition = self._normalize(match.group(1))
            conclusion = match.group(2).strip()
            # Require the condition to be asserted as a premise of its own;
            # a substring test would also fire on "it is not raining".
            for other_index, other in enumerate(normalized):
                if other_index != index and other == condition:
                    return conclusion

        return None

    def _check_transitivity(self, premises: List[str]) -> Optional[str]:
        """
        Check for the transitivity pattern: "A is B" and "B is C".

        Returns:
            ``"A relates to C"`` for the first chained pair found (terms are
            compared case-insensitively), or ``None``.
        """
        relationships = []
        for premise in premises:
            match = self._RELATION_RE.search(premise)
            if match:
                relationships.append((match.group(1).strip(), match.group(2).strip()))

        for i, (left, middle) in enumerate(relationships):
            for j, (other_left, right) in enumerate(relationships):
                if i != j and middle.lower() == other_left.lower():
                    return f"{left} relates to {right}"

        return None

    def _check_contradiction(self, premises: List[str]) -> Optional[str]:
        """
        Check whether one premise is the negation of another.

        A premise counts as negated when it starts with "not", "no" or
        "never", or when it contains the standalone word "not". The text
        left after removing the negation is compared with every premise.

        Returns:
            A message naming the first contradicting pair, or ``None``.
        """
        statements = []
        negated = []

        for premise in premises:
            lowered = premise.lower()
            statements.append(lowered)

            stripped = lowered.strip()
            leading = self._LEADING_NEGATION_RE.match(stripped)
            if leading:
                # Drop the whole negation word, whatever its length.
                negated.append(leading.group(1))
            elif self._INLINE_NOT_RE.search(stripped):
                # Remove the first "not" and re-join with single spaces so
                # that "x is not y" becomes "x is y".
                remainder = self._INLINE_NOT_RE.sub(" ", stripped, count=1)
                negated.append(" ".join(remainder.split()))

        for stmt in statements:
            for neg in negated:
                if neg and self._are_contradictory(stmt, neg):
                    return f"Contradiction detected: '{stmt}' vs 'not {neg}'"

        return None

    def _are_contradictory(self, stmt1: str, stmt2: str) -> bool:
        """
        Return True when a statement equals the un-negated text of another.

        Both texts are normalized (case, whitespace, final punctuation)
        before comparison.
        """
        return self._normalize(stmt1) == self._normalize(stmt2)

    def _knowledge_base_inference(self, premises: List[str]) -> List[Dict[str, Any]]:
        """
        Draw inferences from the knowledge base.

        Each stored fact related to at least one premise yields exactly one
        inference, with confidence ``fact confidence * 0.7``.
        """
        inferences = []

        for category, facts in self.knowledge_base.items():
            for fact in facts:
                fact_text = fact['fact']
                # any() reports a fact once even if several premises match.
                if any(self._texts_related(premise, fact_text) for premise in premises):
                    inferences.append({
                        'rule': 'knowledge_base_similarity',
                        'conclusion': f"Related knowledge: {fact_text}",
                        'confidence': fact['confidence'] * 0.7,
                        'category': category,
                    })

        return inferences

    def _texts_related(self, text1: str, text2: str) -> bool:
        """Return True when the texts share at least two lower-cased words."""
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        return len(words1 & words2) >= 2


class AdvancedReasoningEngine:
    """
    Main advanced reasoning engine combining all reasoning capabilities.

    It wires together task decomposition, strategy selection and logical
    inference, keeps a bounded history of reasoning sessions, and feeds
    execution outcomes back into the strategies.
    """

    def __init__(self):
        self.task_decomposer = TaskDecomposer()
        self.strategy_selector = StrategySelector()
        self.logical_reasoner = LogicalReasoner()

        # Only the most recent 1000 sessions are kept.
        self.reasoning_history = deque(maxlen=1000)

    def reason_about_task(self, task_description: str,
                          complexity: int = 5,
                          time_constraint: Optional[float] = None) -> Dict[str, Any]:
        """
        Perform comprehensive reasoning about a task.

        Args:
            task_description: Description of the task
            complexity: Task complexity (1-10)
            time_constraint: Time limit in minutes

        Returns:
            Reasoning results: a dict with the keys ``task_analysis``,
            ``decomposition``, ``selected_strategy``, ``logical_inferences``,
            ``execution_plan``, ``feasibility_assessment`` and
            ``reasoning_time`` (seconds spent in this call). The result is
            also appended to ``reasoning_history``.
        """
        started_at = datetime.now()

        decomposed_task = self.task_decomposer.decompose_task(task_description, complexity)

        strategy = self.strategy_selector.select_strategy(
            task_description, complexity, time_constraint
        )

        # The task description and every subtask description are the
        # premises handed to the logical reasoner.
        premises = [task_description]
        premises.extend(subtask.description for subtask in decomposed_task.subtasks)
        logical_inferences = self.logical_reasoner.draw_inference(premises)

        execution_plan = self._generate_execution_plan(decomposed_task, strategy)
        feasibility = self._assess_feasibility(decomposed_task, time_constraint)

        reasoning_result = {
            'task_analysis': {
                'original_task': task_description,
                'complexity': complexity,
                'estimated_total_time': decomposed_task.estimated_time,
                'subtasks_count': len(decomposed_task.subtasks),
            },
            'decomposition': decomposed_task.to_dict(),
            'selected_strategy': {
                'name': strategy.name,
                'description': strategy.description,
                'expected_success_rate': strategy.success_rate,
                'estimated_time': strategy.avg_time,
            },
            'logical_inferences': logical_inferences,
            'execution_plan': execution_plan,
            'feasibility_assessment': feasibility,
            'reasoning_time': (datetime.now() - started_at).total_seconds(),
        }

        self.reasoning_history.append({
            'timestamp': datetime.now().isoformat(),
            'task': task_description,
            'result': reasoning_result,
        })

        return reasoning_result

    def _generate_execution_plan(self, task: 'Task',
                                 strategy: 'ReasoningStrategy') -> List[Dict[str, Any]]:
        """
        Generate a step-by-step execution plan.

        The plan starts with a strategy-specific step (only for
        ``analytical_reasoning`` and ``creative_problem_solving``), lists
        one step per subtask, and ends with a verification step.
        """
        plan = []

        if strategy.name == 'analytical_reasoning':
            plan.append({
                'step': 'analysis',
                'description': 'Analyze task components systematically',
                'estimated_time': 5.0,
            })
        elif strategy.name == 'creative_problem_solving':
            plan.append({
                'step': 'brainstorming',
                'description': 'Generate multiple solution approaches',
                'estimated_time': 10.0,
            })

        for number, subtask in enumerate(task.subtasks, start=1):
            plan.append({
                'step': f'subtask_{number}',
                'description': subtask.description,
                'estimated_time': subtask.estimated_time,
                'dependencies': subtask.dependencies,
            })

        plan.append({
            'step': 'verification',
            'description': 'Verify solution meets requirements',
            'estimated_time': 3.0,
        })

        return plan

    def _assess_feasibility(self, task: 'Task',
                            time_constraint: Optional[float]) -> Dict[str, Any]:
        """
        Assess the feasibility of completing the task.

        If the task's estimated time exceeds the time limit, the task is
        reported as infeasible with a reason and a recommendation.
        Otherwise a score is derived from complexity and subtask count:
        ``1.0 - (complexity / 10 * 0.4 + subtasks / 10 * 0.3)``.
        """
        total_estimated_time = task.estimated_time

        # Compare against None so that a limit of 0 is still enforced.
        if time_constraint is not None and total_estimated_time > time_constraint:
            return {
                'feasible': False,
                'reason': f'Estimated time ({total_estimated_time:.1f} min) exceeds constraint ({time_constraint} min)',
                'recommendation': 'Break task into smaller parts or extend time limit',
            }

        complexity_score = task.complexity / 10.0
        subtask_score = len(task.subtasks) / 10.0
        feasibility_score = 1.0 - (complexity_score * 0.4 + subtask_score * 0.3)

        if feasibility_score > 0.7:
            assessment = 'highly_feasible'
        elif feasibility_score > 0.4:
            assessment = 'moderately_feasible'
        else:
            assessment = 'challenging'

        return {
            'feasible': feasibility_score > 0.3,
            'assessment': assessment,
            'feasibility_score': feasibility_score,
            'estimated_time': total_estimated_time,
        }

    def learn_from_experience(self, task_description: str, success: bool, actual_time: float):
        """
        Learn from task execution experience.

        Looks up the most recent reasoning session for ``task_description``
        and updates the strategy that session selected: the outcome is
        recorded and ``avg_time`` becomes the mean of its old value and
        ``actual_time``. When no session or strategy matches, a warning is
        logged and nothing changes.
        """
        record = next(
            (entry for entry in reversed(self.reasoning_history)
             if entry['task'] == task_description),
            None,
        )
        if record is None:
            logger.warning("No reasoning session found for task %r; nothing learned",
                           task_description)
            return

        strategy_name = record['result']['selected_strategy']['name']
        strategy = next(
            (candidate for candidate in self.strategy_selector.strategies
             if candidate.name == strategy_name),
            None,
        )
        if strategy is None:
            logger.warning("Strategy %r is no longer registered; nothing learned",
                           strategy_name)
            return

        strategy.record_usage(success)
        strategy.avg_time = (strategy.avg_time + actual_time) / 2

        logger.info("Updated strategy %s: success_rate=%.3f, avg_time=%.1f",
                    strategy_name, strategy.success_rate, strategy.avg_time)

    def get_reasoning_statistics(self) -> Dict[str, Any]:
        """
        Get statistics about reasoning performance.

        Returns:
            ``{'total_reasoning_sessions': 0}`` when the history is empty.
            Otherwise the total number of stored sessions plus, computed
            over the 50 most recent sessions, the average reasoning time,
            the per-strategy usage counts and the most used strategy.
        """
        if not self.reasoning_history:
            return {'total_reasoning_sessions': 0}

        recent_sessions = list(self.reasoning_history)[-50:]

        reasoning_times = [session['result']['reasoning_time'] for session in recent_sessions]
        average_time = sum(reasoning_times) / len(reasoning_times)

        usage = Counter(
            session['result']['selected_strategy']['name'] for session in recent_sessions
        )

        return {
            'total_reasoning_sessions': len(self.reasoning_history),
            'average_reasoning_time': average_time,
            'strategy_usage': dict(usage),
            'most_used_strategy': usage.most_common(1)[0][0] if usage else None,
        }


# Running the module directly only prints a short banner.
if __name__ == "__main__":
    print("SOFIA Advanced Reasoning Engine")
    print("This system provides task decomposition, strategy selection, and logical reasoning")

# Example usage, kept as an inert string literal: it is never executed.
"""
from sofia_reasoning import AdvancedReasoningEngine

reasoner = AdvancedReasoningEngine()

# Reason about a complex task
result = reasoner.reason_about_task(
    "Implement a machine learning model for image classification",
    complexity=8,
    time_constraint=120  # 2 hours
)

print("Reasoning result:", result)

# Learn from execution
reasoner.learn_from_experience(
    "Implement a machine learning model for image classification",
    success=True,
    actual_time=95.0
)
"""