File size: 27,959 Bytes
3472b9f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 | #!/usr/bin/env python3
"""
SOFIA Advanced Reasoning Engine
Implements task decomposition, strategy selection, and logical reasoning
"""
import re
import json
import logging
from typing import Dict, List, Tuple, Optional, Any, Set
from datetime import datetime
from collections import defaultdict, deque
import heapq
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Task:
"""
Represents a task that can be decomposed and reasoned about
"""
def __init__(self, description: str, complexity: int = 1, dependencies: List[str] = None):
self.description = description
self.complexity = complexity # 1-10 scale
self.dependencies = dependencies or []
self.subtasks = []
self.completed = False
self.created_at = datetime.now()
self.estimated_time = self._estimate_time()
def _estimate_time(self) -> float:
"""Estimate time required based on complexity and description"""
base_time = self.complexity * 5 # 5 minutes per complexity unit
# Adjust based on keywords
desc_lower = self.description.lower()
if any(word in desc_lower for word in ['research', 'analyze', 'investigate']):
base_time *= 1.5
if any(word in desc_lower for word in ['create', 'build', 'implement']):
base_time *= 2.0
if any(word in desc_lower for word in ['simple', 'quick', 'basic']):
base_time *= 0.5
return base_time
def add_subtask(self, subtask: 'Task'):
"""Add a subtask to this task"""
self.subtasks.append(subtask)
def mark_completed(self):
"""Mark this task as completed"""
self.completed = True
def to_dict(self) -> Dict[str, Any]:
"""Convert task to dictionary representation"""
return {
'description': self.description,
'complexity': self.complexity,
'dependencies': self.dependencies,
'subtasks': [st.to_dict() for st in self.subtasks],
'completed': self.completed,
'estimated_time': self.estimated_time,
'created_at': self.created_at.isoformat()
}
class ReasoningStrategy:
"""
Represents a reasoning strategy for solving problems
"""
def __init__(self, name: str, description: str, applicable_domains: List[str],
success_rate: float = 0.7, avg_time: float = 10.0):
self.name = name
self.description = description
self.applicable_domains = applicable_domains
self.success_rate = success_rate
self.avg_time = avg_time
self.usage_count = 0
self.success_count = 0
def is_applicable(self, problem_domain: str, problem_complexity: int) -> bool:
"""Check if this strategy is applicable to the given problem"""
domain_match = problem_domain in self.applicable_domains or 'general' in self.applicable_domains
# Some strategies work better for different complexity levels
if 'simple_problems' in self.applicable_domains and problem_complexity <= 3:
return True
if 'complex_problems' in self.applicable_domains and problem_complexity >= 7:
return True
return domain_match
def record_usage(self, success: bool):
"""Record usage of this strategy"""
self.usage_count += 1
if success:
self.success_count += 1
self.success_rate = self.success_count / self.usage_count
def get_effectiveness_score(self) -> float:
"""Get effectiveness score for strategy selection"""
# Combine success rate with usage experience
experience_factor = min(1.0, self.usage_count / 10) # Diminishing returns after 10 uses
return self.success_rate * (0.5 + 0.5 * experience_factor)
class TaskDecomposer:
"""
Decomposes complex tasks into manageable subtasks
"""
def __init__(self):
self.decomposition_patterns = self._load_patterns()
def _load_patterns(self) -> Dict[str, List[str]]:
"""Load task decomposition patterns"""
return {
'research': [
'Define research question',
'Identify information sources',
'Gather relevant data',
'Analyze findings',
'Synthesize conclusions'
],
'implementation': [
'Analyze requirements',
'Design solution architecture',
'Implement core functionality',
'Add error handling',
'Test implementation',
'Document solution'
],
'problem_solving': [
'Understand the problem',
'Break down into components',
'Identify potential solutions',
'Evaluate solution options',
'Implement chosen solution',
'Verify solution works'
],
'learning': [
'Assess current knowledge',
'Identify learning objectives',
'Find learning resources',
'Study materials',
'Practice concepts',
'Assess understanding'
]
}
def decompose_task(self, task_description: str, complexity: int = 5) -> Task:
"""Decompose a task into subtasks"""
# Identify task type
task_type = self._classify_task_type(task_description)
# Create main task
main_task = Task(task_description, complexity)
# Get decomposition pattern
if task_type in self.decomposition_patterns:
pattern = self.decomposition_patterns[task_type]
# Adjust pattern based on complexity
if complexity <= 3:
# Simplify for simple tasks
pattern = pattern[:3]
elif complexity >= 8:
# Expand for complex tasks
pattern.extend([
'Review and refine',
'Optimize performance',
'Prepare for deployment'
])
# Create subtasks
for i, subtask_desc in enumerate(pattern):
subtask_complexity = max(1, complexity // len(pattern))
subtask = Task(subtask_desc, subtask_complexity)
main_task.add_subtask(subtask)
else:
# Generic decomposition for unknown task types
subtasks = self._generic_decomposition(task_description, complexity)
for subtask_desc in subtasks:
subtask = Task(subtask_desc, max(1, complexity // len(subtasks)))
main_task.add_subtask(subtask)
return main_task
def _classify_task_type(self, description: str) -> str:
"""Classify the type of task"""
desc_lower = description.lower()
if any(word in desc_lower for word in ['research', 'investigate', 'analyze', 'study']):
return 'research'
elif any(word in desc_lower for word in ['implement', 'build', 'create', 'develop']):
return 'implementation'
elif any(word in desc_lower for word in ['solve', 'fix', 'resolve', 'answer']):
return 'problem_solving'
elif any(word in desc_lower for word in ['learn', 'understand', 'master', 'study']):
return 'learning'
else:
return 'general'
def _generic_decomposition(self, description: str, complexity: int) -> List[str]:
"""Generic task decomposition when no specific pattern matches"""
base_steps = [
'Plan the approach',
'Gather necessary resources',
'Execute the main work',
'Review and verify results'
]
if complexity > 5:
base_steps.insert(1, 'Break down into smaller steps')
base_steps.insert(-1, 'Test and validate')
return base_steps
class StrategySelector:
"""
Selects the best reasoning strategy for a given problem
"""
def __init__(self):
self.strategies = self._initialize_strategies()
def _initialize_strategies(self) -> List[ReasoningStrategy]:
"""Initialize available reasoning strategies"""
return [
ReasoningStrategy(
"analytical_reasoning",
"Break down problem into components and analyze systematically",
["mathematics", "science", "engineering", "general"],
success_rate=0.85,
avg_time=12.0
),
ReasoningStrategy(
"creative_problem_solving",
"Use creative thinking and brainstorming for novel solutions",
["design", "innovation", "art", "general"],
success_rate=0.75,
avg_time=15.0
),
ReasoningStrategy(
"deductive_reasoning",
"Apply logical deduction from general principles to specific cases",
["logic", "philosophy", "mathematics", "law"],
success_rate=0.80,
avg_time=10.0
),
ReasoningStrategy(
"inductive_reasoning",
"Draw general conclusions from specific observations",
["science", "research", "data_analysis"],
success_rate=0.70,
avg_time=18.0
),
ReasoningStrategy(
"case_based_reasoning",
"Solve problems by adapting solutions from similar past cases",
["medicine", "law", "customer_service", "general"],
success_rate=0.78,
avg_time=8.0
),
ReasoningStrategy(
"algorithmic_approach",
"Apply step-by-step algorithmic procedures",
["programming", "mathematics", "engineering"],
success_rate=0.90,
avg_time=6.0
),
ReasoningStrategy(
"intuitive_reasoning",
"Rely on intuition and experience for quick solutions",
["simple_problems", "general"],
success_rate=0.65,
avg_time=3.0
)
]
def select_strategy(self, problem_description: str, problem_complexity: int = 5,
time_constraint: Optional[float] = None) -> ReasoningStrategy:
"""
Select the best reasoning strategy for the given problem
Args:
problem_description: Description of the problem
problem_complexity: Complexity level (1-10)
time_constraint: Maximum time allowed (minutes)
Returns:
Selected reasoning strategy
"""
# Identify problem domain
problem_domain = self._identify_domain(problem_description)
# Filter applicable strategies
applicable_strategies = [
strategy for strategy in self.strategies
if strategy.is_applicable(problem_domain, problem_complexity)
]
if not applicable_strategies:
# Fallback to general strategies
applicable_strategies = [
strategy for strategy in self.strategies
if 'general' in strategy.applicable_domains
]
# Apply time constraint if specified
if time_constraint:
applicable_strategies = [
strategy for strategy in applicable_strategies
if strategy.avg_time <= time_constraint
]
if not applicable_strategies:
# Ultimate fallback
return self.strategies[0]
# Score strategies based on effectiveness and time
scored_strategies = []
for strategy in applicable_strategies:
effectiveness = strategy.get_effectiveness_score()
# Penalize slow strategies for complex problems under time pressure
time_penalty = 0
if time_constraint and strategy.avg_time > time_constraint * 0.8:
time_penalty = 0.2
final_score = effectiveness * (1 - time_penalty)
scored_strategies.append((final_score, strategy))
# Select highest scoring strategy
scored_strategies.sort(reverse=True)
selected_strategy = scored_strategies[0][1]
logger.info(f"Selected strategy: {selected_strategy.name} (score: {scored_strategies[0][0]:.3f})")
return selected_strategy
def _identify_domain(self, description: str) -> str:
"""Identify the problem domain from description"""
desc_lower = description.lower()
domain_keywords = {
'mathematics': ['math', 'calculate', 'equation', 'algebra', 'geometry'],
'programming': ['code', 'program', 'software', 'algorithm', 'debug'],
'science': ['experiment', 'hypothesis', 'theory', 'research', 'data'],
'engineering': ['design', 'build', 'construct', 'system', 'architecture'],
'business': ['profit', 'market', 'customer', 'strategy', 'finance'],
'medicine': ['patient', 'diagnosis', 'treatment', 'health', 'medical'],
'law': ['legal', 'contract', 'regulation', 'court', 'justice'],
'education': ['learn', 'teach', 'student', 'course', 'knowledge']
}
for domain, keywords in domain_keywords.items():
if any(keyword in desc_lower for keyword in keywords):
return domain
return 'general'
class LogicalReasoner:
"""
Performs logical reasoning and inference
"""
def __init__(self):
self.knowledge_base = defaultdict(list)
self.inference_rules = self._load_inference_rules()
def _load_inference_rules(self) -> List[Dict[str, Any]]:
"""Load basic inference rules"""
return [
{
'name': 'modus_ponens',
'description': 'If P implies Q and P is true, then Q is true',
'pattern': lambda premises: self._check_modus_ponens(premises)
},
{
'name': 'transitivity',
'description': 'If A relates to B and B relates to C, then A relates to C',
'pattern': lambda premises: self._check_transitivity(premises)
},
{
'name': 'contradiction_detection',
'description': 'Detect logical contradictions',
'pattern': lambda premises: self._check_contradiction(premises)
}
]
def add_knowledge(self, fact: str, category: str = 'general'):
"""Add a fact to the knowledge base"""
self.knowledge_base[category].append({
'fact': fact,
'added_at': datetime.now(),
'confidence': 1.0
})
def draw_inference(self, premises: List[str]) -> Dict[str, Any]:
"""Draw logical inferences from premises"""
inferences = []
# Apply inference rules
for rule in self.inference_rules:
result = rule['pattern'](premises)
if result:
inferences.append({
'rule': rule['name'],
'conclusion': result,
'confidence': 0.8 # Base confidence
})
# Look for patterns in knowledge base
kb_inferences = self._knowledge_base_inference(premises)
inferences.extend(kb_inferences)
return {
'inferences': inferences,
'premises_used': len(premises),
'total_inferences': len(inferences)
}
def _check_modus_ponens(self, premises: List[str]) -> Optional[str]:
"""Check for modus ponens pattern: If P then Q, P -> Q"""
# Simplified implementation
for premise in premises:
if 'if' in premise.lower() and 'then' in premise.lower():
# This is a conditional premise
parts = re.split(r'\s+(?:if|then)\s+', premise, flags=re.IGNORECASE)
if len(parts) >= 2:
condition = parts[0].strip()
conclusion = parts[1].strip()
# Check if condition is in other premises
for other_premise in premises:
if other_premise != premise and condition.lower() in other_premise.lower():
return conclusion
return None
def _check_transitivity(self, premises: List[str]) -> Optional[str]:
"""Check for transitivity pattern"""
# Simplified implementation - look for chains
relationships = []
for premise in premises:
# Look for patterns like "A is related to B"
match = re.search(r'(.+?)\s+(?:is|are|relates? to)\s+(.+)', premise, re.IGNORECASE)
if match:
relationships.append((match.group(1).strip(), match.group(2).strip()))
# Check for transitive chains
for i, (a1, b1) in enumerate(relationships):
for j, (a2, b2) in enumerate(relationships):
if i != j and b1.lower() == a2.lower():
return f"{a1} relates to {b2}"
return None
def _check_contradiction(self, premises: List[str]) -> Optional[str]:
"""Check for contradictions"""
statements = []
negations = []
for premise in premises:
premise_lower = premise.lower()
statements.append(premise_lower)
# Simple negation detection
if premise_lower.startswith(('not ', 'no ', 'never ')):
negations.append(premise_lower[4:])
elif 'not' in premise_lower:
# Split on 'not'
parts = premise_lower.split('not', 1)
if len(parts) == 2:
negations.append(parts[0].strip() + parts[1].strip())
# Check for contradictions
for stmt in statements:
for neg in negations:
if self._are_contradictory(stmt, neg):
return f"Contradiction detected: '{stmt}' vs 'not {neg}'"
return None
def _are_contradictory(self, stmt1: str, stmt2: str) -> bool:
"""Check if two statements are contradictory"""
# Very simplified contradiction detection
return stmt1.strip() == stmt2.strip()
def _knowledge_base_inference(self, premises: List[str]) -> List[Dict[str, Any]]:
"""Draw inferences using knowledge base"""
inferences = []
for category, facts in self.knowledge_base.items():
for fact in facts:
fact_text = fact['fact']
# Check if any premise relates to this fact
for premise in premises:
if self._texts_related(premise, fact_text):
inferences.append({
'rule': 'knowledge_base_similarity',
'conclusion': f"Related knowledge: {fact_text}",
'confidence': fact['confidence'] * 0.7,
'category': category
})
return inferences
def _texts_related(self, text1: str, text2: str) -> bool:
"""Check if two texts are related (simplified)"""
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
# Check for word overlap
overlap = len(words1.intersection(words2))
return overlap >= 2 # At least 2 words in common
class AdvancedReasoningEngine:
"""
Main advanced reasoning engine combining all reasoning capabilities
"""
def __init__(self):
self.task_decomposer = TaskDecomposer()
self.strategy_selector = StrategySelector()
self.logical_reasoner = LogicalReasoner()
# Reasoning history
self.reasoning_history = deque(maxlen=1000)
def reason_about_task(self, task_description: str,
complexity: int = 5,
time_constraint: Optional[float] = None) -> Dict[str, Any]:
"""
Perform comprehensive reasoning about a task
Args:
task_description: Description of the task
complexity: Task complexity (1-10)
time_constraint: Time limit in minutes
Returns:
Reasoning results
"""
reasoning_start = datetime.now()
# 1. Decompose the task
decomposed_task = self.task_decomposer.decompose_task(task_description, complexity)
# 2. Select reasoning strategy
strategy = self.strategy_selector.select_strategy(
task_description, complexity, time_constraint
)
# 3. Perform logical reasoning
task_premises = [task_description]
if decomposed_task.subtasks:
task_premises.extend([st.description for st in decomposed_task.subtasks])
logical_inferences = self.logical_reasoner.draw_inference(task_premises)
# 4. Generate execution plan
execution_plan = self._generate_execution_plan(decomposed_task, strategy)
# 5. Assess feasibility
feasibility = self._assess_feasibility(decomposed_task, time_constraint)
reasoning_result = {
'task_analysis': {
'original_task': task_description,
'complexity': complexity,
'estimated_total_time': decomposed_task.estimated_time,
'subtasks_count': len(decomposed_task.subtasks)
},
'decomposition': decomposed_task.to_dict(),
'selected_strategy': {
'name': strategy.name,
'description': strategy.description,
'expected_success_rate': strategy.success_rate,
'estimated_time': strategy.avg_time
},
'logical_inferences': logical_inferences,
'execution_plan': execution_plan,
'feasibility_assessment': feasibility,
'reasoning_time': (datetime.now() - reasoning_start).total_seconds()
}
# Record in history
self.reasoning_history.append({
'timestamp': datetime.now().isoformat(),
'task': task_description,
'result': reasoning_result
})
return reasoning_result
def _generate_execution_plan(self, task: Task, strategy: ReasoningStrategy) -> List[Dict[str, Any]]:
"""Generate a step-by-step execution plan"""
plan = []
# Add strategy-specific preparation steps
if strategy.name == 'analytical_reasoning':
plan.append({
'step': 'analysis',
'description': 'Analyze task components systematically',
'estimated_time': 5.0
})
elif strategy.name == 'creative_problem_solving':
plan.append({
'step': 'brainstorming',
'description': 'Generate multiple solution approaches',
'estimated_time': 10.0
})
# Add task subtasks
for i, subtask in enumerate(task.subtasks):
plan.append({
'step': f'subtask_{i+1}',
'description': subtask.description,
'estimated_time': subtask.estimated_time,
'dependencies': subtask.dependencies
})
# Add verification step
plan.append({
'step': 'verification',
'description': 'Verify solution meets requirements',
'estimated_time': 3.0
})
return plan
def _assess_feasibility(self, task: Task, time_constraint: Optional[float]) -> Dict[str, Any]:
"""Assess the feasibility of completing the task"""
total_estimated_time = task.estimated_time
if time_constraint and total_estimated_time > time_constraint:
return {
'feasible': False,
'reason': f'Estimated time ({total_estimated_time:.1f} min) exceeds constraint ({time_constraint} min)',
'recommendation': 'Break task into smaller parts or extend time limit'
}
# Assess based on complexity and subtasks
complexity_score = task.complexity / 10.0
subtask_score = len(task.subtasks) / 10.0 # Normalize
feasibility_score = 1.0 - (complexity_score * 0.4 + subtask_score * 0.3)
if feasibility_score > 0.7:
assessment = 'highly_feasible'
elif feasibility_score > 0.4:
assessment = 'moderately_feasible'
else:
assessment = 'challenging'
return {
'feasible': feasibility_score > 0.3,
'assessment': assessment,
'feasibility_score': feasibility_score,
'estimated_time': total_estimated_time
}
def learn_from_experience(self, task_description: str, success: bool, actual_time: float):
"""Learn from task execution experience"""
# Find the reasoning result for this task
for record in reversed(self.reasoning_history):
if record['task'] == task_description:
reasoning_result = record['result']
strategy_name = reasoning_result['selected_strategy']['name']
# Find the strategy and update its performance
for strategy in self.strategy_selector.strategies:
if strategy.name == strategy_name:
strategy.record_usage(success)
# Update time estimate
old_time = strategy.avg_time
strategy.avg_time = (old_time + actual_time) / 2 # Simple averaging
logger.info(f"Updated strategy {strategy_name}: success_rate={strategy.success_rate:.3f}, avg_time={strategy.avg_time:.1f}")
break
break
def get_reasoning_statistics(self) -> Dict[str, Any]:
"""Get statistics about reasoning performance"""
if not self.reasoning_history:
return {'total_reasoning_sessions': 0}
total_sessions = len(self.reasoning_history)
recent_sessions = list(self.reasoning_history)[-50:] # Last 50 sessions
# Calculate average reasoning time
reasoning_times = [r['result']['reasoning_time'] for r in recent_sessions]
avg_reasoning_time = sum(reasoning_times) / len(reasoning_times)
# Strategy usage statistics
strategy_usage = defaultdict(int)
for record in recent_sessions:
strategy_name = record['result']['selected_strategy']['name']
strategy_usage[strategy_name] += 1
return {
'total_reasoning_sessions': total_sessions,
'average_reasoning_time': avg_reasoning_time,
'strategy_usage': dict(strategy_usage),
'most_used_strategy': max(strategy_usage.items(), key=lambda x: x[1])[0] if strategy_usage else None
}
# Example usage
if __name__ == "__main__":
print("SOFIA Advanced Reasoning Engine")
print("This system provides task decomposition, strategy selection, and logical reasoning")
# Example usage would be integrated with SOFIA
"""
from sofia_reasoning import AdvancedReasoningEngine
reasoner = AdvancedReasoningEngine()
# Reason about a complex task
result = reasoner.reason_about_task(
"Implement a machine learning model for image classification",
complexity=8,
time_constraint=120 # 2 hours
)
print("Reasoning result:", result)
# Learn from execution
reasoner.learn_from_experience(
"Implement a machine learning model for image classification",
success=True,
actual_time=95.0
)
"""
|