Spaces:
Sleeping
Sleeping
| import torch | |
| import json | |
| from agent.agentic_core import AgenticTokens | |
class FailureAwareOrchestrator:
    """
    FAMA Stage 2: Orchestration & Targeted Mitigation.

    Holds one canned advice string per known failure type (each phrased as
    coming from a specialized 'sub-agent') and hands back the matching one.
    """

    def __init__(self):
        # Failure type -> advice text injected into the next refinement prompt.
        self.specialized_agents = dict(
            syntax_error="Sub-Agent [Syntax Expert]: The previous tool call failed due to JSON formatting. Ensure all keys are double-quoted and brackets are balanced.",
            api_timeout="Sub-Agent [Network Expert]: The API timed out. Try reducing the payload size or calling a fallback endpoint.",
            hallucination="Sub-Agent [Grounding Expert]: The requested tool does not exist. Please scan the workspace via <|discover|> first.",
            permission_denied="Sub-Agent [Security Expert]: Access to this resource is restricted. Request elevated tokens or check local.env configuration.",
            type_mismatch="Sub-Agent [Data Expert]: The tool received an invalid argument type. Ensure dates are in ISO-8601 and numbers are floats.",
        )

    def mitigate_failure(self, failure_type):
        """Return the advice text for ``failure_type``, or generic advice if it is unknown."""
        generic_advice = "Sub-Agent [General]: Review the previous failure and adjust your approach."
        try:
            return self.specialized_agents[failure_type]
        except KeyError:
            return generic_advice
| from agent.mcp_connector import MCPConnector | |
class RecursiveAgenticLoop:
    """
    Implements the 'Tiny Recursive Model' reasoning loop with FAMA
    (Failure-Aware Meta-Agentic) and real MCP integration.

    Each iteration drafts a "thought" (from the model, or from a canned stub
    in demo mode), optionally runs tool discovery, executes any tool call the
    thought contains, and on failure re-prompts with targeted mitigation text
    from the orchestrator.
    """

    def __init__(self, model, tokenizer, discovery_protocol=None, max_recursion=4, demo_mode=True):
        """
        Args:
            model: Object exposing ``generate(...)``; only used when
                ``demo_mode`` is false. May be ``None`` for the demo stub.
            tokenizer: Object exposing ``encode(text, return_tensors='pt')``
                and ``decode(ids)``.
            discovery_protocol: Stored on the instance for callers; this class
                does not otherwise use it.
            max_recursion: Maximum number of thought/refinement iterations.
            demo_mode: When true, thoughts come from the built-in stub instead
                of ``model.generate``.
        """
        self.model = model
        self.tokenizer = tokenizer
        # Previously accepted and silently dropped; keep it reachable.
        self.discovery_protocol = discovery_protocol
        self.mcp = MCPConnector()
        self.mcp.load_config()
        self.discovery_available = True
        self.max_recursion = max_recursion
        self.demo_mode = demo_mode
        self.orchestrator = FailureAwareOrchestrator()
        self.failure_trajectories = []

    def execute_real_tool(self, action_payload):
        """
        Parse ``<action marker> tool_name {json args}`` and run the tool via MCP.

        Only the text between the first and second action marker is parsed.
        Parsing problems are reported as ``syntax_error``. Exceptions raised
        by the connector are reported separately so they are not mistaken for
        malformed JSON: ``TimeoutError`` -> ``api_timeout``,
        ``PermissionError`` -> ``permission_denied``, anything else ->
        ``execution_error``.

        Returns:
            Whatever ``self.mcp.execute_tool`` returns on a completed call,
            otherwise a dict ``{"status": "failed", "type": ..., "msg": ...}``.
        """
        try:
            # Simple parser for <|action|> tool_name {"arg": "val"}
            parts = action_payload.split(AgenticTokens.ACTION)
            if len(parts) < 2:
                return {"status": "failed", "type": "syntax_error", "msg": "Missing action marker."}
            # Split on any whitespace so "tool\n{...}" is parsed like "tool {...}".
            cmd = parts[1].strip().split(None, 1)
            if not cmd:
                return {"status": "failed", "type": "syntax_error", "msg": "Missing tool name."}
            tool_name = cmd[0]
            args = json.loads(cmd[1]) if len(cmd) > 1 else {}
        except (AttributeError, TypeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError; a non-string payload
            # surfaces as AttributeError/TypeError.
            return {"status": "failed", "type": "syntax_error", "msg": f"Parser error: {e}"}

        if not isinstance(args, dict):
            return {"status": "failed", "type": "syntax_error", "msg": "Tool arguments must be a JSON object."}

        try:
            return self.mcp.execute_tool(tool_name, args)
        except Exception as e:
            # Boundary handler: the loop must survive a failing tool, but the
            # failure is classified so the right mitigation is selected.
            if isinstance(e, TimeoutError):
                failure_type = "api_timeout"
            elif isinstance(e, PermissionError):
                failure_type = "permission_denied"
            else:
                failure_type = "execution_error"
            return {"status": "failed", "type": failure_type, "msg": f"Tool execution error: {e}"}

    def _draft_thought(self, prompt, input_ids, iteration, max_new_tokens):
        """Produce the thought text for one iteration (model output or demo stub)."""
        if hasattr(self.model, 'generate') and not self.demo_mode:
            output = self.model.generate(
                input_ids,
                max_new_tokens=max_new_tokens,
                do_sample=False,  # Force Greedy Search
                top_k=1  # Pick only the #1 top probability token
            )
            return self.tokenizer.decode(output[0])

        # Dynamic stub for structural demonstration (multi-step)
        lowered = prompt.lower()
        if iteration == 0 and any(kw in lowered for kw in ("scan", "discover", "apps", "check")):
            return f"{AgenticTokens.THOUGHT} Scanning workspace for MCP apps. {AgenticTokens.DISCOVER}"
        if "sqlite" in lowered:
            return f"{AgenticTokens.THOUGHT} Reading from database. {AgenticTokens.ACTION} read_query {{\"sql\": \"SELECT * FROM users\"}}"
        return f"{AgenticTokens.THOUGHT} I should use a tool here. {AgenticTokens.ACTION} unknown_tool"

    def generate_with_reasoning(self, prompt, max_new_tokens=100):
        """
        Run up to ``max_recursion`` thought/act/refine iterations for ``prompt``.

        Failed tool calls are appended to ``self.failure_trajectories`` and the
        next iteration is prompted with the orchestrator's mitigation text.
        The loop stops on the first successful action, on a successful
        discovery in demo mode, or when iterations run out.

        Returns:
            A fixed completion message; progress is reported via ``print``.
        """
        # 1. Initial reasoning / draft with Goal Token
        thought_prompt = f"{AgenticTokens.GOAL} User: {prompt}\n{AgenticTokens.THOUGHT}"
        input_ids = self.tokenizer.encode(thought_prompt, return_tensors='pt')

        for i in range(self.max_recursion):
            # Generate next 'thought'
            thought = self._draft_thought(prompt, input_ids, i, max_new_tokens)

            # Recursive check: If model wants to DISCOVER new tools
            if AgenticTokens.DISCOVER in thought:
                print("Action: Scanning for real MCP endpoints...", flush=True)
                tools = self.mcp.discover_tools()
                if tools:
                    print(f"Discovery Result: Found MCP tools: {tools}", flush=True)
                    if self.demo_mode:
                        break
                else:
                    print("No real MCP servers found. Falling back to simulation.", flush=True)

            # FAMA Implementation: Action Execution & Failure Detection
            if AgenticTokens.ACTION in thought:
                print(f"Executing action... (Iteration {i})", flush=True)
                execution_result = self.execute_real_tool(thought)

                if execution_result.get("status") in ("failed", "error"):
                    # Results without an explicit type are treated as a
                    # hallucinated (non-existent) tool.
                    failure_type = execution_result.get("type", "hallucination")
                    print(f"[FAMA Stage 1] Failure detected: {failure_type}", flush=True)
                    self.failure_trajectories.append({
                        "iteration": i,
                        "thought": thought,
                        "error": execution_result.get("msg", "Unknown error")
                    })

                    # FAMA Stage 2: Orchestration & Mitigation
                    mitigation_context = self.orchestrator.mitigate_failure(failure_type)
                    print(f"[FAMA Stage 2] Injecting mitigation: {mitigation_context}", flush=True)

                    # Refine thought with targeted failure context
                    refinement_prompt = f"{thought}\n{AgenticTokens.OBSERVE} Tool Failed: {execution_result.get('msg', 'Error')}\n{mitigation_context}\n<refinement_{i+1}>"
                    input_ids = self.tokenizer.encode(refinement_prompt, return_tensors='pt')
                    continue

                # A successful result is not required to carry an "output" key.
                print(f"Action Succeeded: {execution_result.get('output')}", flush=True)
                break

            if i == self.max_recursion - 1:
                break
            input_ids = self.tokenizer.encode(f"{thought}\n<refinement_{i+1}>", return_tensors='pt')

        return "Final Answer generated via live MCP tool orchestration."
def test_recursive_loop(_model=None, _tokenizer=None):
    """
    Smoke-run the agentic loop in demo mode with a tiktoken-backed tokenizer.

    ``_model`` and ``_tokenizer`` are accepted but not used: the loop is always
    built with no model and a fresh ``cl100k_base`` tokenizer. The loop's
    final message is printed; nothing is returned.
    """
    import tiktoken

    class TiktokenWrapper:
        """Adapts a tiktoken encoding to the ``encode``/``decode`` calls the loop makes."""

        def __init__(self, enc):
            self.enc = enc

        def encode(self, text, **kwargs):
            # return as list for simplicity, or tensor if specified
            tokens = self.enc.encode(text)
            if kwargs.get('return_tensors') == 'pt':
                return torch.tensor([tokens])
            return tokens

        def decode(self, ids):
            if isinstance(ids, torch.Tensor):
                # The loop decodes `output[0]`, a 1-D row; taking `[0]` of its
                # list form would yield a single int rather than a token list.
                # Only strip the batch dimension when there is one.
                if ids.dim() > 1:
                    ids = ids[0]
                ids = ids.reshape(-1).tolist()
            return self.enc.decode(ids)

    # Use cl100k_base (standard for gpt-4 style tokens)
    tokenizer = tiktoken.get_encoding("cl100k_base")
    agent = RecursiveAgenticLoop(None, TiktokenWrapper(tokenizer))
    response = agent.generate_with_reasoning("Calculate the trajectory using the elevated tool.")
    print(response)