Spaces:
Build error
Build error
| #!/usr/bin/env python3 | |
| """ | |
| Simplified Demonstration of Advanced AI Agent Architecture | |
| Shows core concepts without external dependencies | |
| """ | |
| import asyncio | |
| import logging | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional, Any, Callable | |
| from collections import defaultdict, deque | |
| import numpy as np | |
| import hashlib | |
| import json | |
# Module-wide logging: INFO and above, rendered as "<time> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
| # ============================= | |
| # Core Data Structures | |
| # ============================= | |
@dataclass
class AgentState:
    """A named agent state together with its payload.

    Declared as a dataclass so that the ``field(...)`` defaults below take
    effect and instances can be built positionally or by keyword, e.g.
    ``AgentState("idle", {"energy": 100})``.
    """

    # Identifier of the state; ProbabilisticFSM.add_state keys its table on it.
    name: str
    # Arbitrary per-state payload; default_factory gives every instance its own dict.
    data: Dict[str, Any] = field(default_factory=dict)
    # Confidence attached to the state; 1.0 unless the caller says otherwise.
    confidence: float = 1.0
    # Value of time.time() taken when the instance is created.
    timestamp: float = field(default_factory=time.time)
@dataclass
class Transition:
    """An edge of the state machine from one named state to another.

    Declared as a dataclass so it can be constructed as
    ``Transition("idle", "working", predicate, probability=0.8)``.
    """

    # Name of the state this transition leaves.
    from_state: str
    # Name of the state this transition enters.
    to_state: str
    # Predicate called with the current state object; a truthy result enables the edge.
    condition: Callable
    # Static weight; blended with the learned weight when candidates are ranked.
    probability: float = 1.0
    # Optional hook called with the current state just before the state switches.
    action: Optional[Callable] = None
@dataclass
class ReasoningStep:
    """One step of a reasoning trace (thought, optional action and observation).

    Declared as a dataclass so it can be built with keyword arguments, e.g.
    ``ReasoningStep(step_id=0, thought="...", confidence=0.9)``.
    """

    # Zero-based position of the step within its trace.
    step_id: int
    # Text of the thought produced at this step.
    thought: str
    # Name of the action/tool chosen at this step, if any.
    action: Optional[str] = None
    # Textual result observed after the action, if any.
    observation: Optional[str] = None
    # Confidence assigned to this step; defaults to 1.0.
    confidence: float = 1.0
| # ============================= | |
| # Enhanced FSM Implementation | |
| # ============================= | |
class ProbabilisticFSM:
    """Finite-state machine that picks among enabled transitions by weight.

    Every transition carries a static ``probability``. Each time a transition
    is executed, a learned weight for its ``(from_state, to_state)`` pair is
    nudged toward 1.0, and the static and learned weights are blended when
    candidate transitions are ranked.
    """

    def __init__(self, name: str):
        """Create an empty machine called ``name`` with no current state."""
        self.name = name
        self.states: Dict[str, AgentState] = {}
        self.transitions: List[Transition] = []
        self.current_state: Optional[AgentState] = None
        # Bounded histories: only the most recent entries are kept.
        self.state_history: deque = deque(maxlen=100)
        self.transition_history: deque = deque(maxlen=1000)
        # (from_state, to_state) -> learned weight.
        self.learned_transitions: Dict[tuple, float] = {}

    def add_state(self, state: "AgentState"):
        """Register ``state`` under ``state.name``, replacing any previous entry."""
        self.states[state.name] = state

    def add_transition(self, transition: "Transition"):
        """Append ``transition`` to the list of known transitions."""
        self.transitions.append(transition)

    def set_initial_state(self, state_name: str):
        """Make the registered state ``state_name`` current and record it.

        Raises:
            ValueError: if no state with that name has been added.
        """
        if state_name not in self.states:
            raise ValueError(f"State {state_name} not found")
        self.current_state = self.states[state_name]
        self.state_history.append(self.current_state)

    def evaluate_transitions(self) -> List[tuple]:
        """Return enabled transitions from the current state, best first.

        Returns:
            A list of ``(transition, weight)`` tuples sorted by descending
            weight; empty when there is no current state or nothing is enabled.
        """
        if not self.current_state:
            return []
        candidates = []
        for transition in self.transitions:
            if transition.from_state != self.current_state.name:
                continue
            if not transition.condition(self.current_state):
                continue
            key = (transition.from_state, transition.to_state)
            # Until an edge has been executed, its learned weight is its static one.
            learned = self.learned_transitions.get(key, transition.probability)
            weight = 0.7 * transition.probability + 0.3 * learned
            candidates.append((transition, weight))
        return sorted(candidates, key=lambda pair: pair[1], reverse=True)

    def execute_transition(self, transition: "Transition"):
        """Run ``transition``: call its action, switch state, record, learn.

        The action (if any) receives the state being left. A ``KeyError`` is
        raised if ``transition.to_state`` was never registered.
        """
        if transition.action:
            transition.action(self.current_state)
        self.current_state = self.states[transition.to_state]
        self.state_history.append(self.current_state)
        self.transition_history.append((transition, time.time()))
        self.update_learning(transition)

    def update_learning(self, transition: "Transition"):
        """Move the learned weight of ``transition``'s edge one step toward 1.0.

        This is called for every executed transition, so frequently taken
        edges accumulate a higher learned weight.
        """
        key = (transition.from_state, transition.to_state)
        current = self.learned_transitions.get(key, transition.probability)
        # Exponential moving average with a target of 1.0 and rate 0.1.
        self.learned_transitions[key] = 0.9 * current + 0.1

    def step(self) -> bool:
        """Select and execute one enabled transition.

        With several candidates the choice is random, weighted by the blended
        weights. If those weights do not sum to a positive number (for example
        all are zero), the choice falls back to uniform instead of dividing by
        zero.

        Returns:
            ``True`` if a transition was executed, ``False`` if none is enabled.
        """
        candidates = self.evaluate_transitions()
        if not candidates:
            return False
        if len(candidates) == 1:
            chosen = candidates[0][0]
        else:
            weights = [weight for _, weight in candidates]
            total = sum(weights)
            # p=None makes numpy draw uniformly; used when normalising is impossible.
            distribution = np.array(weights) / total if total > 0 else None
            idx = np.random.choice(len(candidates), p=distribution)
            chosen = candidates[idx][0]
        self.execute_transition(chosen)
        return True
| # ============================= | |
| # Chain of Thought Implementation | |
| # ============================= | |
class ChainOfThought:
    """Template-driven chain-of-thought generator with depth control and caching."""

    def __init__(self, name: str):
        """Create a reasoner called ``name`` with an empty result cache."""
        self.name = name
        # md5(query) hex digest -> reasoning steps produced for that query.
        self.reasoning_cache: Dict[str, List[ReasoningStep]] = {}
        self.complexity_analyzer = ComplexityAnalyzer()
        self.template_library = TemplateLibrary()

    @staticmethod
    def _cache_key(query: str) -> str:
        """Return the cache key for ``query`` (hex md5 of its UTF-8 bytes)."""
        return hashlib.md5(query.encode()).hexdigest()

    def analyze_complexity(self, query: str) -> float:
        """Return the complexity score the analyzer assigns to ``query``."""
        return self.complexity_analyzer.analyze(query)

    def get_cached_reasoning(self, query: str) -> Optional[List["ReasoningStep"]]:
        """Return cached steps for exactly this query text, or ``None``."""
        return self.reasoning_cache.get(self._cache_key(query))

    def reason(self, query: str, max_depth: Optional[int] = None) -> List["ReasoningStep"]:
        """Produce reasoning steps for ``query``.

        Args:
            query: Text to reason about.
            max_depth: Maximum number of steps. When ``None`` it is derived
                from the query's complexity as ``int(complexity * 10)``,
                clamped to the range 1..20 so that even a trivial query gets
                one step. An explicit value is used as given.

        Returns:
            The list of steps; the same list object is returned again on a
            later cache hit.
        """
        cached = self.get_cached_reasoning(query)
        if cached:
            logger.info("Using cached reasoning")
            return cached

        if max_depth is None:
            complexity = self.analyze_complexity(query)
            # Without the lower bound, low scores truncate to 0 and yield no steps.
            max_depth = max(1, min(int(complexity * 10), 20))

        template = self.template_library.select_template(query)
        steps = self.execute_reasoning(query, template, max_depth)

        # Only traces whose final step is still confident are worth reusing.
        if steps and steps[-1].confidence > 0.8:
            self.reasoning_cache[self._cache_key(query)] = steps
        return steps

    def execute_reasoning(self, query: str, template: str,
                          max_depth: int) -> List["ReasoningStep"]:
        """Generate up to ``max_depth`` steps, each wrapping the previous thought.

        Step ``i`` gets confidence ``0.9 - 0.05 * i``. Generation stops early
        once a thought contains the word "therefore" (case-insensitive); since
        each thought embeds the previous one, a query containing that word
        stops after the first step.
        """
        steps = []
        current_thought = query
        for i in range(max_depth):
            next_thought = f"{template} Step {i+1}: Analyzing '{current_thought}'"
            steps.append(
                ReasoningStep(
                    step_id=i,
                    thought=next_thought,
                    confidence=0.9 - (i * 0.05),  # Decreasing confidence with depth
                )
            )
            if "therefore" in next_thought.lower() or i == max_depth - 1:
                break
            current_thought = next_thought
        return steps
class ComplexityAnalyzer:
    """Heuristic scorer mapping a query string to a complexity in [0, 1]."""

    # Characters trimmed from both ends of each whitespace-separated token.
    _PUNCTUATION = ".,;:!?\"'()[]{}"
    # Whole words counted as conjunctions.
    _CONJUNCTIONS = ('and', 'or', 'but')
    # Word prefixes counted as technical terms, so "analyzed" counts too.
    _TECHNICAL_STEMS = ('calculate', 'analyze', 'evaluate')

    def analyze(self, query: str) -> float:
        """Return the complexity score of ``query``.

        The score is the mean of four factors, capped at 1.0: character
        length / 500, question marks / 5, conjunction words / 10 and technical
        terms / 5. Words are compared case-insensitively and as whole tokens,
        so "Calculate" counts as a technical term while the "or" inside "for"
        does not count as a conjunction.
        """
        words = [token.strip(self._PUNCTUATION) for token in query.lower().split()]
        conjunction_count = sum(1 for word in words if word in self._CONJUNCTIONS)
        technical_count = sum(1 for word in words if word.startswith(self._TECHNICAL_STEMS))
        factors = {
            'length': len(query) / 500,
            'questions': query.count('?') / 5,
            'conjunctions': conjunction_count / 10,
            'technical_terms': technical_count / 5,
        }
        return min(sum(factors.values()) / len(factors), 1.0)
class TemplateLibrary:
    """Holds the opening sentences used to frame a reasoning trace."""

    def __init__(self):
        """Populate ``self.templates`` with the four built-in templates."""
        self.templates = {
            'mathematical': "Let me solve this step by step using mathematical principles.",
            'analytical': "I'll analyze this by breaking it down into components.",
            'comparative': "I'll compare and contrast the different aspects.",
            'default': "Let me think through this systematically."
        }

    def select_template(self, query: str) -> str:
        """Return the template whose keywords first match ``query``.

        Matching is a case-insensitive substring test. Categories are tried in
        the order mathematical, analytical, comparative; the default template
        is returned when no keyword occurs.
        """
        lowered = query.lower()
        keyword_table = (
            ('mathematical', ('calculate', 'solve', 'compute')),
            ('analytical', ('analyze', 'examine', 'investigate')),
            ('comparative', ('compare', 'contrast', 'difference')),
        )
        for template_key, keywords in keyword_table:
            for keyword in keywords:
                if keyword in lowered:
                    return self.templates[template_key]
        return self.templates['default']
| # ============================= | |
| # Simple Tool System | |
| # ============================= | |
class SimpleTool:
    """Pairs a callable with a name and a human-readable description."""

    def __init__(self, name: str, function: Callable, description: str):
        """Store the tool's name, the callable it wraps, and its description."""
        self.name, self.function, self.description = name, function, description

    def run(self, **kwargs):
        """Call the wrapped function with ``kwargs`` and return its result."""
        wrapped = self.function
        return wrapped(**kwargs)
class DemoTools:
    """Namespace of stateless demo tool functions.

    The functions take no ``self``; they are static methods so they can be
    called on the class or on an instance.
    """

    @staticmethod
    def calculator_tool(expression: str) -> dict:
        """Evaluate ``expression`` and report the result or the error text.

        Returns:
            ``{"result": value, "expression": expression}`` on success, or
            ``{"error": message, "expression": expression}`` if evaluation raises.
        """
        try:
            # SECURITY: eval() executes arbitrary Python. Acceptable for this
            # demo only; never feed it untrusted input.
            result = eval(expression)  # Note: eval is unsafe in production
            return {"result": result, "expression": expression}
        except Exception as e:
            return {"error": str(e), "expression": expression}

    @staticmethod
    def text_analyzer_tool(text: str) -> dict:
        """Return word count, character count and mean word length of ``text``.

        Words are whitespace-separated; the mean is 0 when there are no words.
        """
        words = text.split()
        return {
            "word_count": len(words),
            "char_count": len(text),
            "avg_word_length": sum(len(word) for word in words) / len(words) if words else 0
        }

    @staticmethod
    def mock_search_tool(query: str) -> dict:
        """Return three canned result strings for ``query`` (no real search)."""
        return {
            "results": [
                f"Result 1 for: {query}",
                f"Result 2 for: {query}",
                f"Result 3 for: {query}"
            ],
            "query": query
        }
| # ============================= | |
| # ReAct Implementation | |
| # ============================= | |
class ReActAgent:
    """Minimal think / act / observe loop over a set of named tools."""

    def __init__(self, name: str, tools: List[SimpleTool], max_steps: int = 10):
        """Index ``tools`` by name and set the per-path step limit."""
        self.name = name
        # A later tool with a duplicate name replaces the earlier one.
        self.tools = {}
        for tool in tools:
            self.tools[tool.name] = tool
        self.max_steps = max_steps
        self.reasoning_history: List[ReasoningStep] = []
        # Tool name -> number of times act() selected it.
        self.tool_usage_stats: Dict[str, int] = defaultdict(int)

    def think(self, observation: str, context: Dict[str, Any]) -> str:
        """Return a thought string built from ``observation``.

        ``context`` is accepted but not consulted by this implementation.
        """
        return f"Analyzing: {observation}. Context indicates we need to use available tools."

    def act(self, thought: str, context: Dict[str, Any]) -> tuple:
        """Choose an action for ``thought``.

        Returns:
            ``(tool_name, {})`` for the first registered tool, or
            ``("no_action", None)`` when the agent has no tools. Neither
            ``thought`` nor ``context`` influences the choice.
        """
        if not self.tools:
            return "no_action", None
        first_tool = next(iter(self.tools))  # Simple selection
        self.tool_usage_stats[first_tool] += 1
        return first_tool, {}

    def execute_tool(self, tool_name: str, args: Dict[str, Any]) -> str:
        """Run the named tool with ``args`` and return a textual observation.

        Returns the JSON encoding of the tool's result, a "not found" message
        for an unknown tool, or an error message if the call or the encoding
        raises.
        """
        if tool_name in self.tools:
            tool = self.tools[tool_name]
            try:
                return json.dumps(tool.run(**args))
            except Exception as exc:
                return f"Error executing tool: {str(exc)}"
        return f"Tool {tool_name} not found"

    async def reasoning_path(self, query: str, context: Dict[str, Any]) -> List[ReasoningStep]:
        """Run up to ``max_steps`` think/act/observe rounds starting from ``query``.

        Each round's observation feeds the next round's thought. The loop ends
        early once an observation contains "final_answer" (case-insensitive).
        """
        trace = []
        latest = query
        for index in range(self.max_steps):
            thought = self.think(latest, context)
            action, args = self.act(thought, context)
            if action == "no_action":
                latest = "No action taken"
            else:
                latest = self.execute_tool(action, args or {})
            trace.append(
                ReasoningStep(
                    step_id=index,
                    thought=thought,
                    action=action,
                    observation=latest,
                    confidence=0.8 + 0.2 * np.random.random(),  # Simulated confidence
                )
            )
            if "final_answer" in latest.lower():
                break
        return trace
| # ============================= | |
| # Unified Hybrid Agent | |
| # ============================= | |
class HybridAgent:
    """Agent that routes each task to an FSM, a ReAct loop, or chain of thought."""

    def __init__(self, name: str, tools: Optional[List["SimpleTool"]] = None):
        """Build the three sub-agents; ``tools`` (default: none) go to the ReAct agent."""
        self.name = name
        self.fsm = ProbabilisticFSM(f"{name}_fsm")
        self.react = ReActAgent(f"{name}_react", tools or [])
        self.cot = ChainOfThought(f"{name}_cot")
        self.current_mode = "fsm"
        # Running score per mode, used as the fallback for unknown task types.
        self.mode_performance: Dict[str, float] = {
            "fsm": 0.5,
            "react": 0.5,
            "cot": 0.5
        }

    def select_mode(self, task: Dict[str, Any]) -> str:
        """Return "fsm", "react" or "cot" for ``task``.

        The task's ``"type"`` decides when it is a known type; otherwise the
        mode with the highest running score wins (first one on ties).
        """
        task_type = task.get("type", "unknown")
        if task_type in ("navigation", "state_based"):
            return "fsm"
        if task_type in ("tool_use", "external_interaction"):
            return "react"
        if task_type in ("reasoning", "analysis"):
            return "cot"
        return max(self.mode_performance, key=self.mode_performance.get)

    async def execute_task(self, task: Dict[str, Any]) -> Any:
        """Run ``task`` in the selected mode and update that mode's score.

        Exceptions raised by the mode handler are logged, not propagated.

        Returns:
            The handler's result, or ``None`` if it raised.
        """
        mode = self.select_mode(task)
        self.current_mode = mode
        handlers = {
            "fsm": self.execute_fsm_task,
            "react": self.execute_react_task,
            "cot": self.execute_cot_task,
        }
        start_time = time.time()
        result = None
        success = False
        try:
            handler = handlers.get(mode)
            if handler is not None:
                result = await handler(task)
            success = result is not None
        except Exception as e:
            logger.error("Error executing task in %s mode: %s", mode, str(e))
        execution_time = time.time() - start_time
        self.update_performance(mode, success, execution_time)
        return result

    async def execute_fsm_task(self, task: Dict[str, Any]) -> Any:
        """Load ``task["states"]`` into the FSM and step it until it stops.

        Each entry needs a ``"name"`` and may carry ``"data"``; the first
        entry becomes the initial state.

        Raises:
            ValueError: if ``task`` has no states to start from.
        """
        states = task.get("states", [])
        if not states:
            raise ValueError("FSM task requires a non-empty 'states' list")
        for state_data in states:
            self.fsm.add_state(
                AgentState(name=state_data["name"], data=state_data.get("data", {}))
            )
        self.fsm.set_initial_state(states[0]["name"])
        # NOTE(review): this runs for as long as step() keeps finding an enabled
        # transition; confirm callers never configure a cycle that stays enabled.
        while self.fsm.step():
            await asyncio.sleep(0.1)  # Simulate processing
        return {"final_state": self.fsm.current_state.name}

    async def execute_react_task(self, task: Dict[str, Any]) -> Any:
        """Run one ReAct reasoning path over ``task["query"]`` and ``task["context"]``."""
        query = task.get("query", "")
        context = task.get("context", {})
        steps = await self.react.reasoning_path(query, context)
        return {"reasoning_path": steps}

    async def execute_cot_task(self, task: Dict[str, Any]) -> Any:
        """Run chain-of-thought reasoning over ``task["query"]``."""
        query = task.get("query", "")
        steps = self.cot.reason(query)
        return {"reasoning_steps": steps}

    def update_performance(self, mode: str, success: bool, execution_time: float):
        """Fold one outcome into ``mode``'s running score.

        The sample is ``1 / (1 + execution_time)`` on success and 0 on
        failure; it is mixed in as an exponential moving average with rate 0.1.
        """
        alpha = 0.1
        performance_score = (1.0 if success else 0.0) * (1.0 / (1.0 + execution_time))
        self.mode_performance[mode] = (
            (1 - alpha) * self.mode_performance[mode] +
            alpha * performance_score
        )
| # ============================= | |
| # Demonstration Functions | |
| # ============================= | |
async def demo_basic_hybrid_agent():
    """Run one reasoning, one tool-use and one state-based task through a HybridAgent.

    Prints the mode chosen and the result for each task, then the agent's
    per-mode performance scores.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("DEMO 1: Basic Hybrid Agent")
    print(banner)

    # (name, callable, description) for each demo tool handed to the agent.
    tool_specs = (
        ("calculator", DemoTools.calculator_tool, "Perform mathematical calculations"),
        ("text_analyzer", DemoTools.text_analyzer_tool, "Analyze text statistics"),
        ("search", DemoTools.mock_search_tool, "Search for information"),
    )
    tools = [SimpleTool(name, func, description) for name, func, description in tool_specs]
    agent = HybridAgent("demo_agent", tools)

    # One task per routing category handled by HybridAgent.select_mode.
    reasoning_task = {
        "type": "reasoning",
        "query": "What is the complexity of analyzing hybrid AI architectures?",
    }
    tool_task = {
        "type": "tool_use",
        "query": "Calculate 15 * 23 and analyze the text 'Hello World'",
        "context": {"require_calculation": True, "require_analysis": True},
    }
    state_task = {
        "type": "state_based",
        "states": [
            {"name": "start", "data": {"step": 1}},
            {"name": "process", "data": {"step": 2}},
            {"name": "complete", "data": {"step": 3}},
        ],
    }

    task_number = 0
    for task in (reasoning_task, tool_task, state_task):
        task_number += 1
        print(f"\n--- Task {task_number}: {task['type']} ---")
        print(f"Query: {task.get('query', 'State-based task')}")
        result = await agent.execute_task(task)
        print(f"Mode used: {agent.current_mode}")
        print(f"Result: {result}")

    # Show performance metrics
    print(f"Performance by mode: {agent.mode_performance}")
async def demo_fsm_learning():
    """Drive a four-state ProbabilisticFSM for up to ten steps and print its learning.

    After every successful step the current state's payload is mutated to
    simulate work, rest or idling; the loop stops early when no transition is
    enabled. The learned transition weights are printed at the end.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("DEMO 2: FSM Learning")
    print(banner)

    machine = ProbabilisticFSM("learning_fsm")

    # State name -> initial payload.
    initial_payloads = (
        ("idle", {"energy": 100}),
        ("working", {"task": None}),
        ("resting", {"recovery": 0}),
        ("completed", {"result": None}),
    )
    for state_name, payload in initial_payloads:
        machine.add_state(AgentState(state_name, payload))

    # Guards are evaluated against the payload of the state being left.
    def has_energy(state):
        return state.data.get("energy", 0) > 30

    def is_tired(state):
        return state.data.get("energy", 0) < 50

    def work_complete(state):
        return state.data.get("task") == "done"

    def rested(state):
        return state.data.get("recovery", 0) > 5

    # (from, to, guard, static probability)
    edges = (
        ("idle", "working", has_energy, 0.8),
        ("idle", "resting", is_tired, 0.2),
        ("working", "completed", work_complete, 0.9),
        ("working", "resting", is_tired, 0.1),
        ("resting", "idle", rested, 0.7),
        ("completed", "idle", lambda s: True, 1.0),
    )
    for source, target, guard, weight in edges:
        machine.add_transition(Transition(source, target, guard, probability=weight))

    machine.set_initial_state("idle")
    print("Running FSM with learning...")
    print("Initial state:", machine.current_state.name)

    for step in range(10):
        if not machine.step():
            print(f"Step {step + 1}: No valid transitions")
            break

        current = machine.current_state
        print(f"Step {step + 1}: {current.name} (energy: {current.data.get('energy', 0)})")

        # Simulate state changes
        payload = current.data
        if current.name == "working":
            payload["energy"] = max(0, payload.get("energy", 100) - 20)
            payload["task"] = "done" if step > 5 else "in_progress"
        elif current.name == "resting":
            payload["energy"] = min(100, payload.get("energy", 0) + 30)
            payload["recovery"] = payload.get("recovery", 0) + 1
        elif current.name == "idle":
            payload["energy"] = min(100, payload.get("energy", 0) + 10)

    print(f"\nLearned transition probabilities: {machine.learned_transitions}")
async def demo_chain_of_thought():
    """Print complexity, chosen template and the first reasoning steps for sample queries."""
    banner = "=" * 60
    print("\n" + banner)
    print("DEMO 3: Chain of Thought Reasoning")
    print(banner)

    reasoner = ChainOfThought("demo_cot")
    # Stand-alone helpers, used here only to display what the reasoner will see.
    analyzer = ComplexityAnalyzer()
    library = TemplateLibrary()

    sample_queries = (
        "What is 2+2?",
        "Analyze the benefits of using hybrid AI architectures in modern applications",
        "Compare and contrast different approaches to multi-agent systems",
        "Calculate the complexity of implementing a hierarchical FSM with learning capabilities",
    )

    for query in sample_queries:
        print(f"\n--- Query: {query} ---")

        score = analyzer.analyze(query)
        print(f"Complexity score: {score:.3f}")

        chosen_template = library.select_template(query)
        print(f"Selected template: {chosen_template}")

        steps = reasoner.reason(query)
        print(f"Reasoning steps: {len(steps)}")
        # Only the first three steps are shown, each truncated to 80 characters.
        for i, step in enumerate(steps[:3]):
            print(f" Step {i+1}: {step.thought[:80]}... (confidence: {step.confidence:.2f})")
async def main():
    """Run the three demos in order; log and print any exception instead of raising."""
    separator = "=" * 60
    print("Advanced AI Agent Architecture - Simplified Demonstration")
    print(separator)

    try:
        # Demos run sequentially; a failure in one skips the remaining ones.
        for demo in (demo_basic_hybrid_agent, demo_fsm_learning, demo_chain_of_thought):
            await demo()
        print("\n" + separator)
        print("All demonstrations completed successfully!")
        print(separator)
    except Exception as e:
        message = str(e)
        logger.error(f"Error in demonstration: {message}")
        print(f"Error: {message}")


if __name__ == "__main__":
    asyncio.run(main())