Spjimr / agent_langgraph_ringmaster.py
shahidshaikh's picture
Upload 40 files
a52bae4 verified
# ============================================================================
# agent_langgraph_ringmaster.py — LangGraph Ringmaster backend
# ============================================================================
#
# This is the "ringmaster" backend. Unlike agent_langgraph.py (which routes
# math vs info tools), this backend knows about:
# - workbench data loading status
# - running computational grounded theory
# - running computational thematic analysis
# - reporting prior results
#
# CONTRACT
# --------
# Standard contract: BACKEND_NAME, get_client, build_code_snippets.
# NEW CONTRACT ADDITION: instead of run(client, user_message), this backend
# exposes run_ringmaster(client, user_message, context) so app.py can pass
# the Gradio session state (loaded_context, cgt_result, cta_result) into
# the supervisor's tools. A standard run(client, user_message) wrapper is
# also provided for compatibility with any caller that doesn't know about
# the ringmaster contract.
#
# WHY NOT EXTEND agent_langgraph.py?
# ----------------------------------
# agent_langgraph.py is already a clean supervisor+task-agent demo that
# students compare against the other backends. Adding workbench tools
# there would muddy the comparison (students would wonder why only one
# of seven backends has extra tools). This new file is an independent
# backend that can be turned on/off and compared in future rounds.
#
# COMPLIANCE
# ----------
# Supervisor decides what to call. No Python if/else routing inside the
# task node — it's just a thin tool-execution loop. No MAX_ITERATIONS
# cap (LangGraph's recursion_limit is the single source of truth).
# No phase-order guards.
# ============================================================================
import os
import json
from typing import TypedDict, Annotated
from operator import add as _list_merge
from langchain_mistralai import ChatMistralAI
from langgraph.graph import StateGraph, START, END
from parameters import MODEL, TEMPERATURE, MAX_TOKENS, MAX_AGENT_STEPS
from ringmaster_tools import RINGMASTER_TOOL_FUNCTIONS, RINGMASTER_TOOL_SCHEMAS
BACKEND_NAME = "LangGraph Ringmaster"
# ----------------------------------------------------------------
# Supervisor system prompt
# ----------------------------------------------------------------
SUPERVISOR_SYSTEM_PROMPT = """You are the Ringmaster, the coordinator of a computational research workbench for qualitative text analysis.
Your job: help researchers run Computational Grounded Theory (Nelson 2020) and Computational Thematic Analysis (Braun & Clarke 2006) on text data they upload.
RESEARCH METHODOLOGIES AVAILABLE
- Computational Grounded Theory: inductive clustering + LLM cluster labeling. Best for exploring what patterns exist in a corpus without predefined categories. Call run_grounded_theory.
- Computational Thematic Analysis: LLM-based open coding of individual sentences. Best for building up a codebook from raw text. Call run_thematic_analysis.
YOUR TOOLS
- check_data_status — ALWAYS call this first if the user asks for any analysis. It tells you whether data is loaded.
- run_grounded_theory — only call after check_data_status confirms data is loaded
- run_thematic_analysis — only call after check_data_status confirms data is loaded
- summarize_cgt_result — fetch the last grounded theory run's summary for follow-up questions
- summarize_cta_result — fetch the last thematic analysis run's summary
DECISION RULES
1. If the user asks a general question (hello, what can you do, explain grounded theory, etc.), reply directly without tools.
2. If the user asks to RUN an analysis (grounded theory, thematic analysis, clustering, coding):
a. First call check_data_status.
b. If NO DATA LOADED, tell the user to go to the Inputs tab and upload a file, paste text, or scrape a URL. Do not try to run the analysis.
c. If data is loaded, call the appropriate analysis tool.
3. If the user asks about PRIOR results (what did you find, show me again, what was cluster 3), call the summarize tool.
4. When you have the result of a tool call, compose a short natural-language reply to the user that includes the key findings. Do not just paste the tool's raw output; write it as a conversational message.
RESPONSE STYLE
- Short. One or two paragraphs maximum.
- Concrete. If a cluster was found, name it.
- Honest. If the analysis was partial (e.g. Thematic Analysis only has Phase 2 implemented), say so briefly.
- Never hallucinate results. Only report what the tools actually returned.
"""
# ----------------------------------------------------------------
# Graph state
# ----------------------------------------------------------------
class RingmasterState(TypedDict):
user_message: str
messages: Annotated[list, _list_merge] # conversation so far for the supervisor
steps: Annotated[list, _list_merge] # trace entries for the Results table
tool_results: Annotated[list, _list_merge]
next_action: str
reply: str
iteration: int
def get_client(api_key):
"""Return a configured ChatMistralAI client."""
key = (api_key or "").strip() or os.environ.get("MISTRAL_API_KEY", "")
return ChatMistralAI(
model=MODEL,
temperature=TEMPERATURE,
max_tokens=MAX_TOKENS,
mistral_api_key=key,
)
# ----------------------------------------------------------------
# NODE: supervisor
# ----------------------------------------------------------------
def supervisor_node(state, client, context):
iteration = state.get("iteration", 0) + 1
# Build message list for the LLM
messages = [
{"role": "system", "content": SUPERVISOR_SYSTEM_PROMPT},
{"role": "user", "content": state["user_message"]},
]
# Append accumulated tool results as assistant/tool turns
for tr in state.get("tool_results", []):
messages.append({
"role": "assistant",
"content": f"Tool {tr['tool']} returned:\n{tr['result']}",
})
# Ask the LLM what to do next. We bind the tools so the LLM can
# emit a tool call, or a plain text reply.
bound = client.bind_tools(_langchain_tool_schemas())
response = bound.invoke(messages)
step_entry = {
"step": iteration,
"type": "supervisor",
"tool": "-",
"args": "-",
"result": (response.content or "")[:200] + ("..." if len(response.content or "") > 200 else ""),
}
# Decide routing based on whether the LLM called a tool
tool_calls = getattr(response, "tool_calls", None) or []
if tool_calls:
return {
"next_action": "call_tool",
"iteration": iteration,
"steps": [step_entry],
"messages": [{"role": "assistant", "tool_calls": tool_calls}],
"_pending_tool_calls": tool_calls,
}
else:
# No tool call — the LLM gave a direct reply
return {
"next_action": "respond",
"iteration": iteration,
"steps": [step_entry],
"reply": response.content or "",
}
# ----------------------------------------------------------------
# NODE: tool_executor
# Executes whatever tool the supervisor asked for, stores the result,
# then routes back to the supervisor.
# ----------------------------------------------------------------
def tool_executor_node(state, client, context):
pending = state.get("_pending_tool_calls") or []
new_steps = []
new_tool_results = []
for tc in pending:
# LangChain tool_calls can be dicts or objects
name = tc.get("name") if isinstance(tc, dict) else getattr(tc, "name", None)
args = tc.get("args") if isinstance(tc, dict) else getattr(tc, "args", {})
fn = RINGMASTER_TOOL_FUNCTIONS.get(name)
if fn is None:
result = f"ERROR: unknown tool {name}"
else:
# Every ringmaster tool takes context as first arg
result = fn(context, **(args or {}))
new_steps.append({
"step": state.get("iteration", 0),
"type": "tool_call",
"tool": name,
"args": json.dumps(args or {}),
"result": result[:200] + ("..." if len(result) > 200 else ""),
})
new_tool_results.append({"tool": name, "args": args, "result": result})
return {
"next_action": "",
"steps": new_steps,
"tool_results": new_tool_results,
"_pending_tool_calls": [],
}
# ----------------------------------------------------------------
# NODE: respond
# The supervisor's last turn already produced a reply. This node just
# stamps a final trace row.
# ----------------------------------------------------------------
def respond_node(state, client, context):
return {
"steps": [{
"step": state.get("iteration", 0) + 1,
"type": "final",
"tool": "-",
"args": "-",
"result": (state.get("reply") or "")[:200],
}],
}
# ----------------------------------------------------------------
# Routing function
# ----------------------------------------------------------------
def route_from_supervisor(state):
action = state.get("next_action", "")
if action == "call_tool":
return "tool_executor"
return "respond"
# ----------------------------------------------------------------
# LangChain tool schema adapter
# ----------------------------------------------------------------
def _langchain_tool_schemas():
"""Convert OpenAI-style schemas to LangChain-style bind_tools() input.
LangChain's ChatMistralAI.bind_tools() accepts OpenAI-format schemas
directly, so we pass them through as-is. This function exists in case
a future LangChain version needs conversion — right now it's a pass-through.
"""
return RINGMASTER_TOOL_SCHEMAS
# ----------------------------------------------------------------
# Graph builder — closure-captures the context so supervisor/tool/respond
# nodes can all see it without LangGraph needing to understand it
# ----------------------------------------------------------------
def _build_graph(client, context):
graph = StateGraph(RingmasterState)
graph.add_node("supervisor", lambda s: supervisor_node(s, client, context))
graph.add_node("tool_executor", lambda s: tool_executor_node(s, client, context))
graph.add_node("respond", lambda s: respond_node(s, client, context))
graph.add_edge(START, "supervisor")
graph.add_conditional_edges(
"supervisor",
route_from_supervisor,
{
"tool_executor": "tool_executor",
"respond": "respond",
},
)
graph.add_edge("tool_executor", "supervisor")
graph.add_edge("respond", END)
return graph.compile()
# ----------------------------------------------------------------
# Public entry point — the RINGMASTER-AWARE run function
# ----------------------------------------------------------------
def run_ringmaster(client, user_message, context):
"""Execute the ringmaster supervisor graph with Gradio session context.
Args:
client: ChatMistralAI instance from get_client()
user_message: the user's chat message
context: dict with loaded_context, llm_provider, llm_key,
cgt_result, cta_result. Tools read and mutate this.
Returns a dict with reply, steps, extracted — matching the standard
backend contract used by process_message in app.py.
"""
compiled = _build_graph(client, context)
initial_state = {
"user_message": user_message,
"messages": [],
"steps": [],
"tool_results": [],
"next_action": "",
"reply": "",
"iteration": 0,
}
final_state = compiled.invoke(
initial_state,
config={"recursion_limit": MAX_AGENT_STEPS * 4},
)
# Renumber steps sequentially
steps = final_state.get("steps", [])
for i, s in enumerate(steps, start=1):
s["step"] = i
return {
"reply": final_state.get("reply", "") or "",
"steps": steps,
"extracted": {
"tool_results": final_state.get("tool_results", []),
"total_iterations": final_state.get("iteration", 0),
},
}
# ----------------------------------------------------------------
# Compatibility shim — non-ringmaster-aware callers
# ----------------------------------------------------------------
def run(client, user_message):
"""Legacy 2-arg entry point. Builds an empty context so the ringmaster
still runs but cannot see any loaded data. app.py should prefer
run_ringmaster() for chat handling.
"""
empty_context = {
"loaded_context": "",
"llm_provider": "Mistral",
"llm_key": "",
"cgt_result": None,
"cta_result": None,
}
return run_ringmaster(client, user_message, empty_context)
# ----------------------------------------------------------------
# Code snippet builder — matches the other backends' contract
# ----------------------------------------------------------------
def build_code_snippets(user_message, steps):
lines = [
"# Backend: LangGraph Ringmaster",
"# Supervisor + tool_executor + respond nodes.",
"# Tools: check_data_status, run_grounded_theory, run_thematic_analysis,",
"# summarize_cgt_result, summarize_cta_result",
"",
"# Trace of this run:",
]
for s in steps:
lines.append(
f"# step {s.get('step')}: {s.get('type')} "
f"tool={s.get('tool')} args={s.get('args')}"
)
return "\n".join(lines)