# ============================================================================
# agent_langgraph_ringmaster.py — LangGraph Ringmaster backend
# ============================================================================
#
# This is the "ringmaster" backend. Unlike agent_langgraph.py (which routes
# math vs info tools), this backend knows about:
# - workbench data loading status
# - running computational grounded theory
# - running computational thematic analysis
# - reporting prior results
#
# CONTRACT
# --------
# Standard contract: BACKEND_NAME, get_client, build_code_snippets.
# NEW CONTRACT ADDITION: instead of run(client, user_message), this backend
# exposes run_ringmaster(client, user_message, context) so app.py can pass
# the Gradio session state (loaded_context, cgt_result, cta_result) into
# the supervisor's tools. A standard run(client, user_message) wrapper is
# also provided for compatibility with any caller that doesn't know about
# the ringmaster contract.
#
# WHY NOT EXTEND agent_langgraph.py?
# ----------------------------------
# agent_langgraph.py is already a clean supervisor+task-agent demo that
# students compare against the other backends. Adding workbench tools
# there would muddy the comparison (students would wonder why only one
# of seven backends has extra tools). This new file is an independent
# backend that can be turned on/off and compared in future rounds.
#
# COMPLIANCE
# ----------
# Supervisor decides what to call. No Python if/else routing inside the
# task node — it's just a thin tool-execution loop. No MAX_ITERATIONS
# cap (LangGraph's recursion_limit is the single source of truth).
# No phase-order guards.
# ============================================================================
import os
import json
from typing import TypedDict, Annotated
from operator import add as _list_merge
from langchain_mistralai import ChatMistralAI
from langgraph.graph import StateGraph, START, END
from parameters import MODEL, TEMPERATURE, MAX_TOKENS, MAX_AGENT_STEPS
from ringmaster_tools import RINGMASTER_TOOL_FUNCTIONS, RINGMASTER_TOOL_SCHEMAS
BACKEND_NAME = "LangGraph Ringmaster"
# ----------------------------------------------------------------
# Supervisor system prompt
# ----------------------------------------------------------------
# Sent as the system message on every supervisor turn. This is runtime text
# read by the LLM, so any wording change can alter which tools it calls.
# The tool list uses an em dash between each tool name and its description.
SUPERVISOR_SYSTEM_PROMPT = """You are the Ringmaster, the coordinator of a computational research workbench for qualitative text analysis.
Your job: help researchers run Computational Grounded Theory (Nelson 2020) and Computational Thematic Analysis (Braun & Clarke 2006) on text data they upload.
RESEARCH METHODOLOGIES AVAILABLE
- Computational Grounded Theory: inductive clustering + LLM cluster labeling. Best for exploring what patterns exist in a corpus without predefined categories. Call run_grounded_theory.
- Computational Thematic Analysis: LLM-based open coding of individual sentences. Best for building up a codebook from raw text. Call run_thematic_analysis.
YOUR TOOLS
- check_data_status — ALWAYS call this first if the user asks for any analysis. It tells you whether data is loaded.
- run_grounded_theory — only call after check_data_status confirms data is loaded
- run_thematic_analysis — only call after check_data_status confirms data is loaded
- summarize_cgt_result — fetch the last grounded theory run's summary for follow-up questions
- summarize_cta_result — fetch the last thematic analysis run's summary
DECISION RULES
1. If the user asks a general question (hello, what can you do, explain grounded theory, etc.), reply directly without tools.
2. If the user asks to RUN an analysis (grounded theory, thematic analysis, clustering, coding):
a. First call check_data_status.
b. If NO DATA LOADED, tell the user to go to the Inputs tab and upload a file, paste text, or scrape a URL. Do not try to run the analysis.
c. If data is loaded, call the appropriate analysis tool.
3. If the user asks about PRIOR results (what did you find, show me again, what was cluster 3), call the summarize tool.
4. When you have the result of a tool call, compose a short natural-language reply to the user that includes the key findings. Do not just paste the tool's raw output; write it as a conversational message.
RESPONSE STYLE
- Short. One or two paragraphs maximum.
- Concrete. If a cluster was found, name it.
- Honest. If the analysis was partial (e.g. Thematic Analysis only has Phase 2 implemented), say so briefly.
- Never hallucinate results. Only report what the tools actually returned.
"""
# ----------------------------------------------------------------
# Graph state
# ----------------------------------------------------------------
class RingmasterState(TypedDict):
    """State shared by the supervisor, tool_executor and respond nodes.

    Fields annotated with ``_list_merge`` carry a list-concatenation reducer:
    a node returns only its new items and they are added to what is already
    stored. The remaining fields hold whatever value was returned last.
    """

    user_message: str  # the user's chat message for this run
    messages: Annotated[list, _list_merge]  # conversation so far for the supervisor
    steps: Annotated[list, _list_merge]  # trace entries for the Results table
    tool_results: Annotated[list, _list_merge]  # one {"tool", "args", "result"} per executed call
    next_action: str  # "call_tool" routes to tool_executor; anything else to respond
    reply: str  # text of the supervisor's direct reply
    iteration: int  # number of supervisor turns taken so far
    # Tool calls emitted by the supervisor and consumed (then cleared) by
    # tool_executor. It has to be declared here: the two nodes exchange it
    # through the graph state, and a key missing from the schema is not
    # carried from one node to the next.
    _pending_tool_calls: list
def get_client(api_key):
    """Build the ChatMistralAI client used by this backend.

    Args:
        api_key: key supplied by the caller. When it is empty, ``None`` or
            only whitespace, the ``MISTRAL_API_KEY`` environment variable is
            used instead (an empty string if that is unset too).

    Returns:
        A ChatMistralAI instance configured with MODEL, TEMPERATURE and
        MAX_TOKENS.
    """
    resolved_key = (api_key or "").strip()
    if not resolved_key:
        resolved_key = os.environ.get("MISTRAL_API_KEY", "")

    settings = {
        "model": MODEL,
        "temperature": TEMPERATURE,
        "max_tokens": MAX_TOKENS,
        "mistral_api_key": resolved_key,
    }
    return ChatMistralAI(**settings)
# ----------------------------------------------------------------
# NODE: supervisor
# ----------------------------------------------------------------
def supervisor_node(state, client, context):
    """Ask the LLM for the next move: a tool call or a direct reply.

    Args:
        state: current graph state. Reads ``iteration``, ``user_message`` and
            ``tool_results``.
        client: chat model exposing ``bind_tools``; the bound model is invoked
            with the assembled message list.
        context: session context. Not used by this node.

    Returns:
        A partial state update. With tool calls: ``next_action="call_tool"``
        plus the calls under ``messages`` and ``_pending_tool_calls``.
        Without: ``next_action="respond"`` and the text under ``reply``.
        Both variants include ``iteration`` and one ``steps`` trace row.
    """
    turn = state.get("iteration", 0) + 1

    # System prompt and user request first, then one turn per tool result
    # collected so far.
    # NOTE(review): tool results are replayed as assistant-role turns, so the
    # request can end on an assistant message — confirm the Mistral chat API
    # accepts that ordering.
    messages = [
        {"role": "system", "content": SUPERVISOR_SYSTEM_PROMPT},
        {"role": "user", "content": state["user_message"]},
    ]
    messages.extend(
        {
            "role": "assistant",
            "content": f"Tool {tr['tool']} returned:\n{tr['result']}",
        }
        for tr in state.get("tool_results", [])
    )

    # With the tools bound, the LLM may answer with a tool call or plain text.
    response = client.bind_tools(_langchain_tool_schemas()).invoke(messages)

    text = response.content or ""
    suffix = "..." if len(text) > 200 else ""
    trace_row = {
        "step": turn,
        "type": "supervisor",
        "tool": "-",
        "args": "-",
        "result": text[:200] + suffix,
    }

    tool_calls = getattr(response, "tool_calls", None) or []
    if not tool_calls:
        # No tool call — the LLM gave a direct reply
        return {
            "next_action": "respond",
            "iteration": turn,
            "steps": [trace_row],
            "reply": text,
        }

    # NOTE(review): `_pending_tool_calls` only reaches tool_executor if the
    # graph state carries that key — confirm it is declared on the schema.
    return {
        "next_action": "call_tool",
        "iteration": turn,
        "steps": [trace_row],
        "messages": [{"role": "assistant", "tool_calls": tool_calls}],
        "_pending_tool_calls": tool_calls,
    }
# ----------------------------------------------------------------
# NODE: tool_executor
# Executes whatever tool the supervisor asked for, stores the result,
# then routes back to the supervisor.
# ----------------------------------------------------------------
def tool_executor_node(state, client, context):
    """Run every tool call the supervisor requested and record the outcomes.

    Args:
        state: current graph state. Reads ``_pending_tool_calls``,
            ``messages`` and ``iteration``.
        client: not used by this node; accepted so all nodes share one
            signature.
        context: session context, passed to each tool as its first
            positional argument.

    Returns:
        A partial state update: one ``steps`` trace row and one
        ``tool_results`` entry per executed call, an empty ``next_action``,
        and ``_pending_tool_calls`` reset to an empty list.
    """
    pending = state.get("_pending_tool_calls") or []
    if not pending:
        # Fall back to the calls the supervisor stored on its latest
        # assistant message, so the request is still honoured when
        # `_pending_tool_calls` did not make it into the state.
        history = state.get("messages") or []
        last_message = history[-1] if history else None
        if isinstance(last_message, dict):
            pending = last_message.get("tool_calls") or []

    step_number = state.get("iteration", 0)
    new_steps = []
    new_tool_results = []
    for tc in pending:
        # LangChain tool_calls can be dicts or objects
        if isinstance(tc, dict):
            name, args = tc.get("name"), tc.get("args")
        else:
            name, args = getattr(tc, "name", None), getattr(tc, "args", {})

        fn = RINGMASTER_TOOL_FUNCTIONS.get(name)
        if fn is None:
            result = f"ERROR: unknown tool {name}"
        else:
            # Every ringmaster tool takes context as first arg
            result = fn(context, **(args or {}))

        # The trace preview needs text; a tool returning None or a structured
        # value must not crash the run, so preview its string form. The raw
        # result is still stored untouched in tool_results.
        text = result if isinstance(result, str) else str(result)
        preview = text[:200] + ("..." if len(text) > 200 else "")

        new_steps.append({
            "step": step_number,
            "type": "tool_call",
            "tool": name,
            "args": json.dumps(args or {}),
            "result": preview,
        })
        new_tool_results.append({"tool": name, "args": args, "result": result})

    return {
        "next_action": "",
        "steps": new_steps,
        "tool_results": new_tool_results,
        "_pending_tool_calls": [],
    }
# ----------------------------------------------------------------
# NODE: respond
# The supervisor's last turn already produced a reply. This node just
# stamps a final trace row.
# ----------------------------------------------------------------
def respond_node(state, client, context):
    """Emit the closing trace row for a run.

    The reply itself was already written to the state by the supervisor; this
    node only records it (cut to 200 characters) as a ``final`` step numbered
    one past the current ``iteration``.

    Args:
        state: current graph state. Reads ``iteration`` and ``reply``.
        client: not used by this node.
        context: not used by this node.

    Returns:
        A partial state update holding a single ``steps`` entry.
    """
    step_number = state.get("iteration", 0) + 1
    reply_text = state.get("reply") or ""
    final_row = dict(
        step=step_number,
        type="final",
        tool="-",
        args="-",
        result=reply_text[:200],
    )
    return {"steps": [final_row]}
# ----------------------------------------------------------------
# Routing function
# ----------------------------------------------------------------
def route_from_supervisor(state):
    """Pick the node that follows the supervisor.

    Returns ``"tool_executor"`` when the state's ``next_action`` is exactly
    ``"call_tool"``; every other value (including a missing key) routes to
    ``"respond"``.
    """
    wants_tool = state.get("next_action", "") == "call_tool"
    return "tool_executor" if wants_tool else "respond"
# ----------------------------------------------------------------
# LangChain tool schema adapter
# ----------------------------------------------------------------
def _langchain_tool_schemas():
    """Return the tool schemas to hand to ``bind_tools()``.

    Currently a pass-through: RINGMASTER_TOOL_SCHEMAS is returned unchanged,
    on the basis that ChatMistralAI.bind_tools() accepts OpenAI-format
    schemas directly. The function is kept as the single place to add a
    conversion if a future LangChain version needs one.
    """
    schemas = RINGMASTER_TOOL_SCHEMAS
    return schemas
# ----------------------------------------------------------------
# Graph builder — closure-captures the context so supervisor/tool/respond
# nodes can all see it without LangGraph needing to understand it
# ----------------------------------------------------------------
def _build_graph(client, context):
    """Assemble and compile the supervisor graph for one run.

    ``client`` and ``context`` are captured by the node closures below, so
    every node receives them alongside the state.

    Flow: START -> supervisor -> (tool_executor -> supervisor)* -> respond
    -> END, with ``route_from_supervisor`` choosing the branch after each
    supervisor turn.

    Returns:
        The compiled graph.
    """
    def _supervisor(state):
        return supervisor_node(state, client, context)

    def _tool_executor(state):
        return tool_executor_node(state, client, context)

    def _respond(state):
        return respond_node(state, client, context)

    builder = StateGraph(RingmasterState)
    builder.add_node("supervisor", _supervisor)
    builder.add_node("tool_executor", _tool_executor)
    builder.add_node("respond", _respond)

    builder.add_edge(START, "supervisor")
    # The router returns a node name; map each name onto itself.
    branches = {target: target for target in ("tool_executor", "respond")}
    builder.add_conditional_edges("supervisor", route_from_supervisor, branches)
    builder.add_edge("tool_executor", "supervisor")
    builder.add_edge("respond", END)
    return builder.compile()
# ----------------------------------------------------------------
# Public entry point — the RINGMASTER-AWARE run function
# ----------------------------------------------------------------
def run_ringmaster(client, user_message, context):
    """Run the ringmaster supervisor graph for one chat message.

    Args:
        client: ChatMistralAI instance from get_client().
        user_message: the user's chat message.
        context: session dict (loaded_context, llm_provider, llm_key,
            cgt_result, cta_result) that is passed to every tool.

    Returns:
        A dict with ``reply`` (final text), ``steps`` (trace rows renumbered
        1..N in order) and ``extracted`` (``tool_results`` and
        ``total_iterations``).
    """
    app = _build_graph(client, context)

    seed_state = dict(
        user_message=user_message,
        messages=[],
        steps=[],
        tool_results=[],
        next_action="",
        reply="",
        iteration=0,
    )
    run_config = {"recursion_limit": MAX_AGENT_STEPS * 4}
    outcome = app.invoke(seed_state, config=run_config)

    # Nodes number their own rows; overwrite with one sequential numbering.
    trace = outcome.get("steps", [])
    position = 0
    for row in trace:
        position += 1
        row["step"] = position

    extracted = {
        "tool_results": outcome.get("tool_results", []),
        "total_iterations": outcome.get("iteration", 0),
    }
    return {
        "reply": outcome.get("reply", "") or "",
        "steps": trace,
        "extracted": extracted,
    }
# ----------------------------------------------------------------
# Compatibility shim — non-ringmaster-aware callers
# ----------------------------------------------------------------
def run(client, user_message):
    """Two-argument entry point for callers without a session context.

    Delegates to run_ringmaster() with a blank context (no loaded text, no
    prior CGT/CTA results), so the graph still runs but has no data to
    analyse. Callers that hold session state should call run_ringmaster()
    directly.
    """
    blank_context = dict(
        loaded_context="",
        llm_provider="Mistral",
        llm_key="",
        cgt_result=None,
        cta_result=None,
    )
    return run_ringmaster(client, user_message, blank_context)
# ----------------------------------------------------------------
# Code snippet builder — matches the other backends' contract
# ----------------------------------------------------------------
def build_code_snippets(user_message, steps):
    """Render a comment-only summary of a run for the code-snippet panel.

    Args:
        user_message: accepted for parity with the other backends; not used.
        steps: iterable of trace dicts. ``step``, ``type``, ``tool`` and
            ``args`` are read with ``.get``, so missing keys print as None.

    Returns:
        A newline-joined string: a fixed header followed by one line per
        step.
    """
    header = (
        "# Backend: LangGraph Ringmaster",
        "# Supervisor + tool_executor + respond nodes.",
        "# Tools: check_data_status, run_grounded_theory, run_thematic_analysis,",
        "# summarize_cgt_result, summarize_cta_result",
        "",
        "# Trace of this run:",
    )
    trace = [
        f"# step {row.get('step')}: {row.get('type')} tool={row.get('tool')} args={row.get('args')}"
        for row in steps
    ]
    return "\n".join([*header, *trace])
|