import torch import json from agent.agentic_core import AgenticTokens class FailureAwareOrchestrator: """ Stage 2 of FAMA: Orchestration & Targeted Mitigation. Selects specialized 'sub-agents' based on the failure trajectory. """ def __init__(self): self.specialized_agents = { "syntax_error": "Sub-Agent [Syntax Expert]: The previous tool call failed due to JSON formatting. Ensure all keys are double-quoted and brackets are balanced.", "api_timeout": "Sub-Agent [Network Expert]: The API timed out. Try reducing the payload size or calling a fallback endpoint.", "hallucination": "Sub-Agent [Grounding Expert]: The requested tool does not exist. Please scan the workspace via <|discover|> first.", "permission_denied": "Sub-Agent [Security Expert]: Access to this resource is restricted. Request elevated tokens or check local.env configuration.", "type_mismatch": "Sub-Agent [Data Expert]: The tool received an invalid argument type. Ensure dates are in ISO-8601 and numbers are floats." } def mitigate_failure(self, failure_type): """Returns targeted context to mitigate the specific failure.""" return self.specialized_agents.get(failure_type, "Sub-Agent [General]: Review the previous failure and adjust your approach.") from agent.mcp_connector import MCPConnector class RecursiveAgenticLoop: """ Implements the 'Tiny Recursive Model' reasoning loop with FAMA (Failure-Aware Meta-Agentic) and real MCP integration. """ def __init__(self, model, tokenizer, discovery_protocol=None, max_recursion=4, demo_mode=True): self.model = model self.tokenizer = tokenizer self.mcp = MCPConnector() self.mcp.load_config() self.discovery_available = True self.max_recursion = max_recursion self.demo_mode = demo_mode self.orchestrator = FailureAwareOrchestrator() self.failure_trajectories = [] def execute_real_tool(self, action_payload): """Attempts to parse and execute a real MCP tool call.""" try: # Simple parser for <|action|> tool_name {"arg": "val"} parts = action_payload.split(AgenticTokens.ACTION) if len(parts) < 2: return {"status": "failed", "type": "syntax_error", "msg": "Missing action marker."} cmd = parts[1].strip().split(" ", 1) tool_name = cmd[0] args = json.loads(cmd[1]) if len(cmd) > 1 else {} return self.mcp.execute_tool(tool_name, args) except Exception as e: return {"status": "failed", "type": "syntax_error", "msg": f"Parser error: {str(e)}"} @torch.no_grad() def generate_with_reasoning(self, prompt, max_new_tokens=100): # 1. Initial reasoning / draft with Goal Token thought_prompt = f"{AgenticTokens.GOAL} User: {prompt}\n{AgenticTokens.THOUGHT}" input_ids = self.tokenizer.encode(thought_prompt, return_tensors='pt') thought = "" for i in range(self.max_recursion): # Generate next 'thought' if hasattr(self.model, 'generate') and not self.demo_mode: output = self.model.generate( input_ids, max_new_tokens=max_new_tokens, do_sample=False, # Force Greedy Search top_k=1 # Pick only the #1 top probability token ) thought = self.tokenizer.decode(output[0]) else: # Dynamic stub for structural demonstration (multi-step) if i == 0 and any(kw in prompt.lower() for kw in ["scan", "discover", "apps", "check"]): thought = f"{AgenticTokens.THOUGHT} Scanning workspace for MCP apps. {AgenticTokens.DISCOVER}" elif "sqlite" in prompt.lower(): thought = f"{AgenticTokens.THOUGHT} Reading from database. {AgenticTokens.ACTION} read_query {{\"sql\": \"SELECT * FROM users\"}}" else: thought = f"{AgenticTokens.THOUGHT} I should use a tool here. {AgenticTokens.ACTION} unknown_tool" # Recursive check: If model wants to DISCOVER new tools if AgenticTokens.DISCOVER in thought: print(f"Action: Scanning for real MCP endpoints...", flush=True) tools = self.mcp.discover_tools() if tools: print(f"Discovery Result: Found MCP tools: {tools}", flush=True) if self.demo_mode: break else: print("No real MCP servers found. Falling back to simulation.", flush=True) # FAMA Implementation: Action Execution & Failure Detection if AgenticTokens.ACTION in thought: print(f"Executing action... (Iteration {i})", flush=True) execution_result = self.execute_real_tool(thought) if execution_result.get("status") == "failed" or execution_result.get("status") == "error": failure_type = execution_result.get("type", "hallucination") print(f"[FAMA Stage 1] Failure detected: {failure_type}", flush=True) self.failure_trajectories.append({ "iteration": i, "thought": thought, "error": execution_result.get("msg", "Unknown error") }) # FAMA Stage 2: Orchestration & Mitigation mitigation_context = self.orchestrator.mitigate_failure(failure_type) print(f"[FAMA Stage 2] Injecting mitigation: {mitigation_context}", flush=True) # Refine thought with targeted failure context refinement_prompt = f"{thought}\n{AgenticTokens.OBSERVE} Tool Failed: {execution_result.get('msg', 'Error')}\n{mitigation_context}\n" input_ids = self.tokenizer.encode(refinement_prompt, return_tensors='pt') continue else: print(f"Action Succeeded: {execution_result['output']}") break if i == self.max_recursion - 1: break input_ids = self.tokenizer.encode(f"{thought}\n", return_tensors='pt') return "Final Answer generated via live MCP tool orchestration." def test_recursive_loop(_model=None, _tokenizer=None): import tiktoken # Use cl100k_base (standard for gpt-4 style tokens) tokenizer = tiktoken.get_encoding("cl100k_base") class TiktokenWrapper: def __init__(self, enc): self.enc = enc def encode(self, text, **kwargs): # return as list for simplicity, or tensor if specified tokens = self.enc.encode(text) if kwargs.get('return_tensors') == 'pt': return torch.tensor([tokens]) return tokens def decode(self, ids): if isinstance(ids, torch.Tensor): ids = ids.tolist()[0] return self.enc.decode(ids) agent = RecursiveAgenticLoop(None, TiktokenWrapper(tokenizer)) response = agent.generate_with_reasoning("Calculate the trajectory using the elevated tool.") print(response)