EAM-100M-Agentic-Kernel / agent /recursive_reasoning.py
saur7764's picture
Upload folder using huggingface_hub
26d580a verified
import torch
import json
from agent.agentic_core import AgenticTokens
class FailureAwareOrchestrator:
    """
    Stage 2 of FAMA: Orchestration & Targeted Mitigation.

    Holds a table that maps a failure-type key to the hint text of a
    specialized 'sub-agent', and hands that text back on request.
    """

    def __init__(self):
        # Failure-type key -> mitigation hint. Built per instance so callers
        # may adjust one orchestrator's table without affecting another.
        self.specialized_agents = dict(
            syntax_error="Sub-Agent [Syntax Expert]: The previous tool call failed due to JSON formatting. Ensure all keys are double-quoted and brackets are balanced.",
            api_timeout="Sub-Agent [Network Expert]: The API timed out. Try reducing the payload size or calling a fallback endpoint.",
            hallucination="Sub-Agent [Grounding Expert]: The requested tool does not exist. Please scan the workspace via <|discover|> first.",
            permission_denied="Sub-Agent [Security Expert]: Access to this resource is restricted. Request elevated tokens or check local.env configuration.",
            type_mismatch="Sub-Agent [Data Expert]: The tool received an invalid argument type. Ensure dates are in ISO-8601 and numbers are floats.",
        )

    def mitigate_failure(self, failure_type):
        """
        Look up the mitigation hint for ``failure_type``.

        Returns the registered sub-agent text when the key is known, and a
        general-purpose hint for any other key.
        """
        known_hints = self.specialized_agents
        if failure_type in known_hints:
            return known_hints[failure_type]
        return "Sub-Agent [General]: Review the previous failure and adjust your approach."
from agent.mcp_connector import MCPConnector
class RecursiveAgenticLoop:
    """
    Implements the 'Tiny Recursive Model' reasoning loop with FAMA
    (Failure-Aware Meta-Agentic) and real MCP integration.

    Each call to ``generate_with_reasoning`` drafts a "thought" (from the
    model, or from a canned stub in demo mode), reacts to the DISCOVER and
    ACTION markers found in it, and on a failed tool call feeds a targeted
    mitigation hint into the next iteration.
    """

    def __init__(self, model, tokenizer, discovery_protocol=None, max_recursion=4, demo_mode=True):
        """
        Args:
            model: Generation backend. Only used when it exposes ``generate``
                and ``demo_mode`` is false.
            tokenizer: Object providing ``encode(text, return_tensors='pt')``
                and ``decode(ids)``.
            discovery_protocol: Accepted for backward compatibility; this
                class does not read it.
            max_recursion: Maximum number of reasoning iterations per call.
            demo_mode: When true, thoughts come from the built-in stub
                instead of the model.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.mcp = MCPConnector()
        self.mcp.load_config()
        self.discovery_available = True
        self.max_recursion = max_recursion
        self.demo_mode = demo_mode
        self.orchestrator = FailureAwareOrchestrator()
        # Grows across calls; one entry is appended per failed tool execution.
        self.failure_trajectories = []

    def execute_real_tool(self, action_payload):
        """
        Attempts to parse and execute a real MCP tool call.

        The expected shape is ``<ACTION marker> tool_name {"arg": "val"}``.
        When the payload contains several action markers, the text after the
        LAST one is used, so a refinement prompt that embeds an earlier
        (failed) action does not cause that stale action to be re-parsed.
        Text following the JSON arguments is ignored.

        Returns:
            Whatever ``self.mcp.execute_tool`` returns on success, otherwise a
            dict with ``status == "failed"``, a ``type`` key and a ``msg``.
            This method does not raise for malformed payloads or connector
            exceptions.
        """
        # --- Parsing: anything that goes wrong here really is a syntax error.
        try:
            _, marker, tail = action_payload.rpartition(AgenticTokens.ACTION)
            if not marker:
                return {"status": "failed", "type": "syntax_error", "msg": "Missing action marker."}
            pieces = tail.split(None, 1)
            if not pieces:
                return {"status": "failed", "type": "syntax_error", "msg": "Missing tool name."}
            tool_name = pieces[0]
            if len(pieces) > 1:
                # raw_decode parses the first JSON value and tolerates trailing
                # text (e.g. a decoded end-of-sequence token).
                args, _ = json.JSONDecoder().raw_decode(pieces[1])
            else:
                args = {}
        except (AttributeError, TypeError, ValueError) as e:
            return {"status": "failed", "type": "syntax_error", "msg": f"Parser error: {str(e)}"}

        # --- Execution: kept separate so connector failures are not reported
        # as parser errors (which would trigger the wrong mitigation hint).
        try:
            return self.mcp.execute_tool(tool_name, args)
        except TimeoutError as e:
            return {"status": "failed", "type": "api_timeout", "msg": f"Tool execution error: {str(e)}"}
        except PermissionError as e:
            return {"status": "failed", "type": "permission_denied", "msg": f"Tool execution error: {str(e)}"}
        except Exception as e:
            # Deliberate best-effort boundary: the reasoning loop must keep
            # running, so unexpected connector errors become a failed result.
            return {"status": "failed", "type": "execution_error", "msg": f"Tool execution error: {str(e)}"}

    @staticmethod
    def _demo_thought(prompt, iteration):
        """Return the canned thought used when no model generation takes place."""
        lowered = prompt.lower()
        if iteration == 0 and any(kw in lowered for kw in ("scan", "discover", "apps", "check")):
            return f"{AgenticTokens.THOUGHT} Scanning workspace for MCP apps. {AgenticTokens.DISCOVER}"
        if "sqlite" in lowered:
            return f"{AgenticTokens.THOUGHT} Reading from database. {AgenticTokens.ACTION} read_query {{\"sql\": \"SELECT * FROM users\"}}"
        return f"{AgenticTokens.THOUGHT} I should use a tool here. {AgenticTokens.ACTION} unknown_tool"

    @torch.no_grad()
    def generate_with_reasoning(self, prompt, max_new_tokens=100):
        """
        Run up to ``max_recursion`` think/act/refine iterations for ``prompt``.

        Progress is reported through ``print``. Failed tool calls are recorded
        in ``self.failure_trajectories``.

        Returns:
            A fixed status string; the generated thoughts themselves are not
            returned.
        """
        # 1. Initial reasoning / draft with Goal Token
        thought_prompt = f"{AgenticTokens.GOAL} User: {prompt}\n{AgenticTokens.THOUGHT}"
        input_ids = self.tokenizer.encode(thought_prompt, return_tensors='pt')

        for i in range(self.max_recursion):
            # Generate next 'thought'
            if hasattr(self.model, 'generate') and not self.demo_mode:
                output = self.model.generate(
                    input_ids,
                    max_new_tokens=max_new_tokens,
                    do_sample=False,  # Force Greedy Search
                    top_k=1  # Pick only the #1 top probability token
                )
                thought = self.tokenizer.decode(output[0])
            else:
                # Dynamic stub for structural demonstration (multi-step)
                thought = self._demo_thought(prompt, i)

            # Recursive check: If model wants to DISCOVER new tools
            if AgenticTokens.DISCOVER in thought:
                print("Action: Scanning for real MCP endpoints...", flush=True)
                tools = self.mcp.discover_tools()
                if tools:
                    print(f"Discovery Result: Found MCP tools: {tools}", flush=True)
                    if self.demo_mode:
                        break
                else:
                    print("No real MCP servers found. Falling back to simulation.", flush=True)

            # FAMA Implementation: Action Execution & Failure Detection
            if AgenticTokens.ACTION in thought:
                print(f"Executing action... (Iteration {i})", flush=True)
                execution_result = self.execute_real_tool(thought)

                if execution_result.get("status") in ("failed", "error"):
                    failure_type = execution_result.get("type", "hallucination")
                    print(f"[FAMA Stage 1] Failure detected: {failure_type}", flush=True)
                    self.failure_trajectories.append({
                        "iteration": i,
                        "thought": thought,
                        "error": execution_result.get("msg", "Unknown error")
                    })

                    # FAMA Stage 2: Orchestration & Mitigation
                    mitigation_context = self.orchestrator.mitigate_failure(failure_type)
                    print(f"[FAMA Stage 2] Injecting mitigation: {mitigation_context}", flush=True)

                    # Refine thought with targeted failure context
                    refinement_prompt = f"{thought}\n{AgenticTokens.OBSERVE} Tool Failed: {execution_result.get('msg', 'Error')}\n{mitigation_context}\n<refinement_{i+1}>"
                    input_ids = self.tokenizer.encode(refinement_prompt, return_tensors='pt')
                    continue

                # .get() so a successful result without an "output" key does
                # not abort the loop with a KeyError.
                print(f"Action Succeeded: {execution_result.get('output')}", flush=True)
                break

            if i == self.max_recursion - 1:
                break
            input_ids = self.tokenizer.encode(f"{thought}\n<refinement_{i+1}>", return_tensors='pt')

        return "Final Answer generated via live MCP tool orchestration."
def test_recursive_loop(_model=None, _tokenizer=None):
    """
    Smoke-run the recursive loop in its default demo mode and print the result.

    ``_model`` and ``_tokenizer`` are accepted but not used: the loop is built
    with no model and a local tiktoken-backed tokenizer wrapper.
    """
    import tiktoken

    # Use cl100k_base (standard for gpt-4 style tokens)
    tokenizer = tiktoken.get_encoding("cl100k_base")

    class TiktokenWrapper:
        """Adapts a tiktoken encoding to the encode/decode calls the loop makes."""

        def __init__(self, enc):
            self.enc = enc

        def encode(self, text, **kwargs):
            # Return a plain list, or a (1, seq) tensor when 'pt' is requested.
            tokens = self.enc.encode(text)
            if kwargs.get('return_tensors') == 'pt':
                return torch.tensor([tokens])
            return tokens

        def decode(self, ids):
            if isinstance(ids, torch.Tensor):
                ids = ids.tolist()
            # A 0-d tensor converts to a bare int; treat it as one token.
            if isinstance(ids, int):
                ids = [ids]
            # Accept a single-row batch as well as a flat id sequence. The loop
            # calls decode(output[0]), which is normally 1-D, so blindly taking
            # element 0 would hand tiktoken a single int.
            if ids and isinstance(ids[0], list):
                ids = ids[0]
            return self.enc.decode(ids)

    agent = RecursiveAgenticLoop(None, TiktokenWrapper(tokenizer))
    response = agent.generate_with_reasoning("Calculate the trajectory using the elevated tool.")
    print(response)