""" LangGraph State Machine for Secure Reasoning MCP Server Implements the Chain-of-Checks workflow with cryptographic logging. """ import json import os import re from huggingface_hub import InferenceClient from typing import Literal from datetime import datetime from langgraph.graph import StateGraph, END from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from state import AgentState from schemas import ( ExecutionPlan, StepPlan, SafetyCheckResult, ExecutionResult, Justification, CryptoLogEntry, HashRequest, MerkleUpdateRequest, WORMWriteRequest ) from prompts import ( format_planner_prompt, format_safety_prompt, format_executor_prompt, format_justification_prompt, format_synthesis_prompt ) from mock_tools import MockCryptoTools # ============================================================================ # GLOBAL CONFIGURATION # ============================================================================ # HuggingFace Inference Client - Ücretsiz tier kullanıyor HF_TOKEN = os.getenv("HF_TOKEN") MODEL_ID = "Qwen/Qwen2.5-72B-Instruct" # Güçlü ve ücretsiz hf_client = InferenceClient(model=MODEL_ID, token=HF_TOKEN) # Initialize crypto tools crypto_tools = MockCryptoTools() # ============================================================================ # HuggingFace LLM Wrapper - LangChain benzeri interface # ============================================================================ class HuggingFaceLLM: """ HuggingFace Inference API wrapper that mimics LangChain LLM interface. """ def __init__(self, client: InferenceClient): self.client = client def invoke(self, messages: list) -> "LLMResponse": """ Call HuggingFace model with messages. Returns object with .content attribute like LangChain. """ # Convert LangChain messages to HF format hf_messages = [] for msg in messages: if isinstance(msg, SystemMessage): hf_messages.append({"role": "system", "content": msg.content}) elif isinstance(msg, HumanMessage): hf_messages.append({"role": "user", "content": msg.content}) elif isinstance(msg, AIMessage): hf_messages.append({"role": "assistant", "content": msg.content}) try: response = self.client.chat_completion( messages=hf_messages, max_tokens=2048, temperature=0.1, # Low for deterministic reasoning stream=False ) content = response.choices[0].message.content return LLMResponse(content) except Exception as e: print(f"HF API Error: {e}") return LLMResponse(f'{{"error": "{str(e)}"}}') class LLMResponse: """Simple response object with content attribute.""" def __init__(self, content: str): self.content = content # Initialize our HF-based LLM llm = HuggingFaceLLM(hf_client) # ============================================================================ # NODE 1: PLANNER # ============================================================================ def planner_node(state: AgentState) -> AgentState: """ Generate a step-by-step execution plan for the task. Args: state: Current agent state with the task Returns: Updated state with the execution plan """ print(f"\n{'='*60}") print(f"🧠 PLANNER NODE - Generating execution plan") print(f"{'='*60}") # Format the prompt prompts = format_planner_prompt(state["task"]) # Create messages messages = [ SystemMessage(content=prompts["system"]), HumanMessage(content=prompts["user"]) ] # Call LLM response = llm.invoke(messages) # Parse JSON response try: plan_data = json.loads(response.content) # Convert to ExecutionPlan model steps = [StepPlan(**step) for step in plan_data["steps"]] plan = ExecutionPlan( steps=steps, total_steps=plan_data["total_steps"] ) print(f"✅ Generated plan with {plan.total_steps} steps:") for step in steps: print(f" Step {step.step_number}: {step.action}") # Update state state["plan"] = plan state["current_step_index"] = 0 state["status"] = "executing" state["messages"].extend([ HumanMessage(content=prompts["user"]), AIMessage(content=response.content) ]) return state except json.JSONDecodeError as e: print(f"❌ Failed to parse planner response: {e}") state["error"] = f"Planner failed to generate valid JSON: {str(e)}" state["status"] = "failed" return state # ============================================================================ # NODE 2: SAFETY CHECKER # ============================================================================ def safety_node(state: AgentState) -> AgentState: """ Validate that the current step is safe to execute. Args: state: Current agent state with the plan Returns: Updated state with safety validation result """ print(f"\n{'='*60}") print(f"🛡️ SAFETY NODE - Validating step {state['current_step_index'] + 1}") print(f"{'='*60}") # Get current step current_step = state["plan"].steps[state["current_step_index"]] # Format previous steps for context previous_steps = "None" if state["current_step_index"] > 0: prev_steps_list = [ f"Step {i+1}: {state['plan'].steps[i].action}" for i in range(state["current_step_index"]) ] previous_steps = "\n".join(prev_steps_list) # Format the prompt prompts = format_safety_prompt( step_description=current_step.action, task=state["task"], step_number=state["current_step_index"] + 1, total_steps=state["plan"].total_steps, previous_steps=previous_steps, additional_context="This is a secure reasoning system with cryptographic logging." ) # Create messages messages = [ SystemMessage(content=prompts["system"]), HumanMessage(content=prompts["user"]) ] # Call LLM response = llm.invoke(messages) # Parse JSON response try: safety_data = json.loads(response.content) safety_result = SafetyCheckResult(**safety_data) print(f"🔍 Safety Check Result:") print(f" Is Safe: {safety_result.is_safe}") print(f" Risk Level: {safety_result.risk_level}") print(f" Reasoning: {safety_result.reasoning[:100]}...") # Update state state["safety_status"] = safety_result state["messages"].extend([ HumanMessage(content=prompts["user"]), AIMessage(content=response.content) ]) # Mark if blocked if not safety_result.is_safe: state["safety_blocked"] = True print(f"🚫 Step BLOCKED due to safety concerns") else: print(f"✅ Step approved for execution") return state except json.JSONDecodeError as e: print(f"❌ Failed to parse safety response: {e}") # Default to blocking if parsing fails (fail-safe) state["safety_status"] = SafetyCheckResult( is_safe=False, risk_level="critical", reasoning=f"Safety check failed due to parsing error: {str(e)}", blocked_reasons=["parsing_error"] ) state["safety_blocked"] = True return state # ============================================================================ # NODE 3: EXECUTOR # ============================================================================ def executor_node(state: AgentState) -> AgentState: """ Execute the current step (call tools if needed). Args: state: Current agent state with approved step Returns: Updated state with execution result """ print(f"\n{'='*60}") print(f"⚡ EXECUTOR NODE - Executing step {state['current_step_index'] + 1}") print(f"{'='*60}") # Get current step current_step = state["plan"].steps[state["current_step_index"]] # Format previous results for context previous_results = "None" if state["justifications"]: prev_results_list = [ f"Step {j.step_number}: {j.reasoning[:100]}..." for j in state["justifications"][-3:] # Last 3 steps ] previous_results = "\n".join(prev_results_list) # Format the prompt prompts = format_executor_prompt( step_description=current_step.action, task=state["task"], expected_outcome=current_step.expected_outcome, requires_tools=current_step.requires_tools, previous_results=previous_results ) # Create messages messages = [ SystemMessage(content=prompts["system"]), HumanMessage(content=prompts["user"]) ] # Call LLM response = llm.invoke(messages) # Parse JSON response try: executor_data = json.loads(response.content) tool_needed = executor_data.get("tool_needed", "internal_reasoning") tool_params = executor_data.get("tool_params") direct_result = executor_data.get("direct_result") print(f"🔧 Tool Selection: {tool_needed}") # Execute based on tool selection if tool_needed == "internal_reasoning": result = ExecutionResult( success=True, output=direct_result or "Analysis completed through reasoning", tool_calls=["internal_reasoning"] ) else: # Simulate tool execution (in real system, dispatch to actual tools) result = ExecutionResult( success=True, output=f"Simulated result from {tool_needed} with params: {tool_params}", tool_calls=[tool_needed] ) print(f"✅ Execution successful") print(f" Output: {str(result.output)[:100]}...") # Update state state["execution_result"] = result state["messages"].extend([ HumanMessage(content=prompts["user"]), AIMessage(content=response.content) ]) return state except json.JSONDecodeError as e: print(f"❌ Execution failed: {e}") state["execution_result"] = ExecutionResult( success=False, output=None, error=f"Failed to parse executor response: {str(e)}", tool_calls=[] ) return state except Exception as e: print(f"❌ Execution error: {e}") state["execution_result"] = ExecutionResult( success=False, output=None, error=str(e), tool_calls=[] ) return state # ============================================================================ # NODE 4: LOGGER (Cryptographic Logging) # ============================================================================ def logger_node(state: AgentState) -> AgentState: """ Hash the execution result and log it to Merkle Tree + WORM storage. Args: state: Current agent state with execution result Returns: Updated state with cryptographic log entry """ print(f"\n{'='*60}") print(f"📝 LOGGER NODE - Creating cryptographic proof") print(f"{'='*60}") current_step = state["plan"].steps[state["current_step_index"]] execution_result = state["execution_result"] try: # 1. Prepare the data to log log_data = { "task_id": state["task_id"], "step_number": state["current_step_index"] + 1, "action": current_step.action, "result": execution_result.output if execution_result.success else execution_result.error, "timestamp": datetime.utcnow().isoformat(), "safety_approved": state["safety_status"].is_safe if state["safety_status"] else False } # 2. Hash the action data hash_request = HashRequest( data=json.dumps(log_data, sort_keys=True), algorithm="sha256" ) hash_response = crypto_tools.hash_tool(hash_request) action_hash = hash_response.hash print(f"🔐 Action Hash: {action_hash[:16]}...") # 3. Update Merkle Tree merkle_request = MerkleUpdateRequest( leaf_hash=action_hash, metadata={"step": state["current_step_index"] + 1} ) merkle_response = crypto_tools.merkle_update_tool(merkle_request) merkle_root = merkle_response.merkle_root print(f"🌳 Merkle Root: {merkle_root[:16]}...") # 4. Write to WORM storage entry_id = f"{state['task_id']}_step_{state['current_step_index'] + 1}" worm_request = WORMWriteRequest( entry_id=entry_id, data=log_data, merkle_root=merkle_root ) worm_response = crypto_tools.worm_write_tool(worm_request) print(f"💾 WORM Path: {worm_response.storage_path}") # 5. Create log entry log_entry = CryptoLogEntry( step_number=state["current_step_index"] + 1, action_hash=action_hash, merkle_root=merkle_root, worm_path=worm_response.storage_path ) # Update state state["logs"].append(log_entry) print(f"✅ Cryptographic logging complete") return state except Exception as e: print(f"❌ Logging failed: {e}") state["error"] = f"Cryptographic logging failed: {str(e)}" return state # ============================================================================ # NODE 5: JUSTIFICATION # ============================================================================ def justification_node(state: AgentState) -> AgentState: """ Generate an explanation for why the action was taken. Args: state: Current agent state with execution result Returns: Updated state with justification """ print(f"\n{'='*60}") print(f"💭 JUSTIFICATION NODE - Explaining the action") print(f"{'='*60}") current_step = state["plan"].steps[state["current_step_index"]] execution_result = state["execution_result"] # Determine tool used tool_used = ", ".join(execution_result.tool_calls) if execution_result.tool_calls else "none" # Format the prompt prompts = format_justification_prompt( step_description=current_step.action, tool_used=tool_used, execution_result=str(execution_result.output)[:500] if execution_result.success else execution_result.error, task=state["task"], step_number=state["current_step_index"] + 1, total_steps=state["plan"].total_steps, expected_outcome=current_step.expected_outcome ) # Create messages messages = [ SystemMessage(content=prompts["system"]), HumanMessage(content=prompts["user"]) ] # Call LLM response = llm.invoke(messages) # Parse JSON response try: justification_data = json.loads(response.content) justification = Justification(**justification_data) print(f"📋 Justification generated:") print(f" {justification.reasoning[:150]}...") # Update state state["justifications"].append(justification) state["messages"].extend([ HumanMessage(content=prompts["user"]), AIMessage(content=response.content) ]) return state except json.JSONDecodeError as e: print(f"⚠️ Failed to parse justification, using fallback: {e}") # Create fallback justification fallback = Justification( step_number=state["current_step_index"] + 1, reasoning=f"Executed {current_step.action} as planned. Result: {execution_result.success}", evidence=None, alternatives_considered=None ) state["justifications"].append(fallback) return state # ============================================================================ # NODE 6: STEP ITERATOR # ============================================================================ def step_iterator_node(state: AgentState) -> AgentState: """ Move to the next step or complete the task. Args: state: Current agent state Returns: Updated state with incremented step index """ print(f"\n{'='*60}") print(f"➡️ STEP ITERATOR - Moving to next step") print(f"{'='*60}") # Increment step index state["current_step_index"] += 1 # Check if we're done if state["current_step_index"] >= state["plan"].total_steps: print(f"🎉 All steps completed!") state["status"] = "completed" else: print(f"📍 Moving to step {state['current_step_index'] + 1}/{state['plan'].total_steps}") return state # ============================================================================ # NODE 7: REFINER (for unsafe steps) # ============================================================================ def refiner_node(state: AgentState) -> AgentState: """ Handle unsafe steps by modifying or skipping them. Args: state: Current agent state with blocked step Returns: Updated state with refinement decision """ print(f"\n{'='*60}") print(f"🔧 REFINER NODE - Handling unsafe step") print(f"{'='*60}") current_step = state["plan"].steps[state["current_step_index"]] safety_status = state["safety_status"] # Log the blocked action print(f"🚫 Step blocked: {current_step.action}") print(f" Reason: {safety_status.reasoning}") # Create a null execution result state["execution_result"] = ExecutionResult( success=False, output=None, error=f"Step blocked by safety guardrails: {safety_status.reasoning}", tool_calls=[] ) # Create justification for blocking justification = Justification( step_number=state["current_step_index"] + 1, reasoning=f"Step was blocked by safety guardrails. Risk level: {safety_status.risk_level}. Reason: {safety_status.reasoning}", evidence=safety_status.blocked_reasons or [], alternatives_considered=["Skip this step", "Abort entire task"] ) state["justifications"].append(justification) # Mark status state["status"] = "blocked" print(f"⚠️ Task blocked due to safety concerns") return state # ============================================================================ # CONDITIONAL EDGES # ============================================================================ def should_execute_or_refine(state: AgentState) -> Literal["execute", "refine"]: """ Decide whether to execute or refine based on safety check. Args: state: Current agent state Returns: "execute" if safe, "refine" if unsafe """ if state["safety_status"] and state["safety_status"].is_safe: return "execute" else: return "refine" def should_continue_or_end(state: AgentState) -> Literal["continue", "end"]: """ Decide whether to continue to next step or end the workflow. Args: state: Current agent state Returns: "continue" if more steps remain, "end" if done or blocked """ # End if blocked if state["safety_blocked"] and state["status"] == "blocked": return "end" # End if error occurred if state["error"]: return "end" # End if all steps completed if state["current_step_index"] >= state["plan"].total_steps: return "end" # Continue to next step return "continue" # ============================================================================ # GRAPH CONSTRUCTION # ============================================================================ def create_reasoning_graph() -> StateGraph: """ Construct the full LangGraph state machine. Returns: Compiled StateGraph ready for execution """ # Create the graph workflow = StateGraph(AgentState) # Add nodes workflow.add_node("planner", planner_node) workflow.add_node("safety", safety_node) workflow.add_node("executor", executor_node) workflow.add_node("logger", logger_node) workflow.add_node("justification", justification_node) workflow.add_node("iterator", step_iterator_node) workflow.add_node("refiner", refiner_node) # Set entry point workflow.set_entry_point("planner") # Add edges workflow.add_edge("planner", "safety") # Conditional: safe -> execute, unsafe -> refine workflow.add_conditional_edges( "safety", should_execute_or_refine, { "execute": "executor", "refine": "refiner" } ) # After execution: log -> justify -> iterate workflow.add_edge("executor", "logger") workflow.add_edge("logger", "justification") workflow.add_edge("justification", "iterator") # After refining: go to iterator (to mark as done) workflow.add_edge("refiner", "iterator") # Conditional: continue to next step or end workflow.add_conditional_edges( "iterator", should_continue_or_end, { "continue": "safety", # Loop back to safety check for next step "end": END } ) # Compile the graph return workflow.compile() # ============================================================================ # CONVENIENCE FUNCTION # ============================================================================ def run_reasoning_task(task: str, task_id: str, user_id: str = None) -> AgentState: """ Execute a reasoning task through the full pipeline. Args: task: The task to solve task_id: Unique identifier for this execution user_id: Optional user identifier Returns: Final agent state with results and logs """ from state import create_initial_state # Create initial state initial_state = create_initial_state(task, task_id, user_id) # Create and run the graph graph = create_reasoning_graph() print(f"\n{'#'*60}") print(f"🚀 STARTING REASONING PIPELINE") print(f" Task: {task}") print(f" Task ID: {task_id}") print(f"{'#'*60}") # Execute final_state = graph.invoke(initial_state) print(f"\n{'#'*60}") print(f"🏁 REASONING PIPELINE COMPLETE") print(f" Status: {final_state['status']}") print(f" Steps Executed: {len(final_state['justifications'])}/{final_state['plan'].total_steps if final_state['plan'] else 0}") print(f" Cryptographic Logs: {len(final_state['logs'])}") print(f"{'#'*60}\n") return final_state # ============================================================================ # EXAMPLE USAGE # ============================================================================ if __name__ == "__main__": # Test the graph result = run_reasoning_task( task="Analyze the current state of AI safety research and provide 3 key findings", task_id="test_001", user_id="demo_user" ) # Print results print("\n=== FINAL RESULTS ===") print(f"Status: {result['status']}") print(f"\nJustifications:") for j in result['justifications']: print(f" Step {j.step_number}: {j.reasoning[:100]}...") print(f"\nCryptographic Audit Trail:") for log in result['logs']: print(f" Step {log.step_number}: Hash {log.action_hash[:16]}... -> Root {log.merkle_root[:16]}...")