|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| import os
|
| import json
|
| from typing import TypedDict, Annotated
|
| from operator import add as _list_merge
|
|
|
| from langchain_mistralai import ChatMistralAI
|
| from langgraph.graph import StateGraph, START, END
|
|
|
| from parameters import MODEL, TEMPERATURE, MAX_TOKENS, MAX_AGENT_STEPS
|
| from ringmaster_tools import RINGMASTER_TOOL_FUNCTIONS, RINGMASTER_TOOL_SCHEMAS
|
|
|
|
|
# Human-readable name identifying this backend implementation.
BACKEND_NAME = "LangGraph Ringmaster"
|
|
|
|
|
|
|
|
|
|
|
# System prompt that supervisor_node sends as the first message of every model call.
# This is runtime text consumed by the model: its wording and whitespace are left untouched.
| SUPERVISOR_SYSTEM_PROMPT = """You are the Ringmaster, the coordinator of a computational research workbench for qualitative text analysis.
|
|
|
| Your job: help researchers run Computational Grounded Theory (Nelson 2020) and Computational Thematic Analysis (Braun & Clarke 2006) on text data they upload.
|
|
|
| RESEARCH METHODOLOGIES AVAILABLE
|
| - Computational Grounded Theory: inductive clustering + LLM cluster labeling. Best for exploring what patterns exist in a corpus without predefined categories. Call run_grounded_theory.
|
| - Computational Thematic Analysis: LLM-based open coding of individual sentences. Best for building up a codebook from raw text. Call run_thematic_analysis.
|
|
|
| YOUR TOOLS
|
| - check_data_status — ALWAYS call this first if the user asks for any analysis. It tells you whether data is loaded.
|
| - run_grounded_theory — only call after check_data_status confirms data is loaded
|
| - run_thematic_analysis — only call after check_data_status confirms data is loaded
|
| - summarize_cgt_result — fetch the last grounded theory run's summary for follow-up questions
|
| - summarize_cta_result — fetch the last thematic analysis run's summary
|
|
|
| DECISION RULES
|
| 1. If the user asks a general question (hello, what can you do, explain grounded theory, etc.), reply directly without tools.
|
| 2. If the user asks to RUN an analysis (grounded theory, thematic analysis, clustering, coding):
|
| a. First call check_data_status.
|
| b. If NO DATA LOADED, tell the user to go to the Inputs tab and upload a file, paste text, or scrape a URL. Do not try to run the analysis.
|
| c. If data is loaded, call the appropriate analysis tool.
|
| 3. If the user asks about PRIOR results (what did you find, show me again, what was cluster 3), call the summarize tool.
|
| 4. When you have the result of a tool call, compose a short natural-language reply to the user that includes the key findings. Do not just paste the tool's raw output; write it as a conversational message.
|
|
|
| RESPONSE STYLE
|
| - Short. One or two paragraphs maximum.
|
| - Concrete. If a cluster was found, name it.
|
| - Honest. If the analysis was partial (e.g. Thematic Analysis only has Phase 2 implemented), say so briefly.
|
| - Never hallucinate results. Only report what the tools actually returned.
|
| """
|
|
|
|
|
|
|
|
|
|
|
class RingmasterState(TypedDict):
    """Shared state passed between the nodes of the ringmaster graph.

    Keys annotated with ``_list_merge`` (``operator.add``) accumulate: a node
    returns only its new entries and they are appended to the existing list.
    All other keys are overwritten by the most recent node that returns them.
    """

    # The user's chat message for this run.
    user_message: str
    # Accumulated assistant tool-call records emitted by the supervisor.
    messages: Annotated[list, _list_merge]
    # Accumulated trace entries (supervisor / tool_call / final).
    steps: Annotated[list, _list_merge]
    # Accumulated {"tool", "args", "result"} dicts from executed tools.
    tool_results: Annotated[list, _list_merge]
    # Routing hint set by the supervisor: "call_tool" or "respond".
    next_action: str
    # Final text reply for the user.
    reply: str
    # Number of supervisor turns taken so far.
    iteration: int
    # Tool calls requested by the supervisor and awaiting execution.
    # supervisor_node writes this key and tool_executor_node reads it, so it
    # must be declared here: a key missing from the schema is not carried
    # between nodes, and the executor would never see the requested calls.
    _pending_tool_calls: list
|
|
|
|
|
def get_client(api_key):
    """Build the ChatMistralAI client used by the ringmaster graph.

    Args:
        api_key: key supplied by the caller. When it is ``None``, empty, or
            only whitespace, the ``MISTRAL_API_KEY`` environment variable is
            used instead (falling back to an empty string).

    Returns:
        A ``ChatMistralAI`` configured with the project's ``MODEL``,
        ``TEMPERATURE`` and ``MAX_TOKENS`` settings.
    """
    explicit_key = (api_key or "").strip()
    if explicit_key:
        resolved_key = explicit_key
    else:
        resolved_key = os.environ.get("MISTRAL_API_KEY", "")

    settings = {
        "model": MODEL,
        "temperature": TEMPERATURE,
        "max_tokens": MAX_TOKENS,
        "mistral_api_key": resolved_key,
    }
    return ChatMistralAI(**settings)
|
|
|
|
|
|
|
|
|
|
|
def supervisor_node(state, client, context):
    """Run one supervisor turn: ask the model to call a tool or to reply.

    The prompt is rebuilt from scratch on every turn: the system prompt, the
    user's message, then one message per tool result accumulated so far.

    Args:
        state: current graph state; reads ``user_message``, ``iteration``
            and ``tool_results``.
        client: chat model exposing ``bind_tools()``.
        context: session context. Unused here; accepted so that every node
            shares the same ``(state, client, context)`` signature.

    Returns:
        A partial state update. If the model requested tools,
        ``next_action`` is ``"call_tool"`` and the calls are stored under
        ``_pending_tool_calls``; otherwise ``next_action`` is ``"respond"``
        and the model's text is stored under ``reply``. Both variants carry
        the incremented ``iteration`` and one ``steps`` trace entry.
    """
    iteration = state.get("iteration", 0) + 1

    messages = [
        {"role": "system", "content": SUPERVISOR_SYSTEM_PROMPT},
        {"role": "user", "content": state["user_message"]},
    ]

    # Tool output is replayed as user turns rather than assistant turns so the
    # conversation never ends on an assistant message: Mistral's chat endpoint
    # expects the final turn to come from the user (or a tool).
    for tr in state.get("tool_results", []):
        messages.append({
            "role": "user",
            "content": f"Tool {tr['tool']} returned:\n{tr['result']}",
        })

    bound = client.bind_tools(_langchain_tool_schemas())
    response = bound.invoke(messages)

    # LangChain message content may be a plain string or a list of content
    # blocks (strings or dicts carrying a "text" field); flatten to text so
    # slicing and concatenation below cannot fail on a list.
    content = response.content or ""
    if not isinstance(content, str):
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and isinstance(block.get("text"), str):
                parts.append(block["text"])
        content = "".join(parts)

    step_entry = {
        "step": iteration,
        "type": "supervisor",
        "tool": "-",
        "args": "-",
        "result": content[:200] + ("..." if len(content) > 200 else ""),
    }

    tool_calls = getattr(response, "tool_calls", None) or []
    if tool_calls:
        return {
            "next_action": "call_tool",
            "iteration": iteration,
            "steps": [step_entry],
            "messages": [{"role": "assistant", "tool_calls": tool_calls}],
            "_pending_tool_calls": tool_calls,
        }

    return {
        "next_action": "respond",
        "iteration": iteration,
        "steps": [step_entry],
        "reply": content,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def tool_executor_node(state, client, context):
    """Execute every tool call the supervisor queued and record the outcomes.

    Args:
        state: current graph state; reads ``_pending_tool_calls`` and
            ``iteration``.
        client: unused; accepted so that every node shares the same
            ``(state, client, context)`` signature.
        context: session context, passed as the first positional argument to
            each tool function.

    Returns:
        A partial state update containing one ``steps`` trace entry and one
        ``tool_results`` entry per executed call, an emptied
        ``_pending_tool_calls`` list, and a cleared ``next_action``.

    A failing tool never aborts the run: like an unknown tool name, the
    failure is reported back to the supervisor as an ``"ERROR: ..."`` result
    string so the model can explain the problem to the user.
    """
    pending = state.get("_pending_tool_calls") or []
    new_steps = []
    new_tool_results = []

    for tc in pending:
        # Tool calls may arrive as plain dicts or as attribute-style objects.
        if isinstance(tc, dict):
            name = tc.get("name")
            args = tc.get("args")
        else:
            name = getattr(tc, "name", None)
            args = getattr(tc, "args", {})
        call_kwargs = args or {}

        fn = RINGMASTER_TOOL_FUNCTIONS.get(name)
        if fn is None:
            result = f"ERROR: unknown tool {name}"
        else:
            try:
                result = fn(context, **call_kwargs)
            except Exception as exc:  # boundary: surface any tool failure as text
                result = f"ERROR: tool {name} failed: {type(exc).__name__}: {exc}"

        # The trace preview slices the result, so it must be a string.
        if not isinstance(result, str):
            result = str(result)

        new_steps.append({
            "step": state.get("iteration", 0),
            "type": "tool_call",
            "tool": name,
            # default=str keeps tracing alive for non-JSON-serialisable args.
            "args": json.dumps(call_kwargs, default=str),
            "result": result[:200] + ("..." if len(result) > 200 else ""),
        })
        new_tool_results.append({"tool": name, "args": args, "result": result})

    return {
        "next_action": "",
        "steps": new_steps,
        "tool_results": new_tool_results,
        "_pending_tool_calls": [],
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def respond_node(state, client, context):
    """Append the closing trace entry for the reply already stored in state.

    Args:
        state: current graph state; reads ``reply`` and ``iteration``.
        client: unused; accepted so that every node shares the same
            ``(state, client, context)`` signature.
        context: unused, for the same reason.

    Returns:
        A partial state update with a single ``"final"`` entry for ``steps``.
        The reply preview is capped at 200 characters and, as in the
        supervisor and tool-call entries, marked with ``"..."`` when it was
        truncated.
    """
    reply = state.get("reply") or ""
    preview = reply[:200] + ("..." if len(reply) > 200 else "")
    return {
        "steps": [{
            "step": state.get("iteration", 0) + 1,
            "type": "final",
            "tool": "-",
            "args": "-",
            "result": preview,
        }],
    }
|
|
|
|
|
|
|
|
|
|
|
def route_from_supervisor(state):
    """Pick the node that follows the supervisor.

    Returns ``"tool_executor"`` when the state's ``next_action`` is
    ``"call_tool"``; every other value (including a missing key) routes to
    ``"respond"``.
    """
    wants_tool = state.get("next_action", "") == "call_tool"
    return "tool_executor" if wants_tool else "respond"
|
|
|
|
|
|
|
|
|
|
|
def _langchain_tool_schemas():
    """Return the tool schemas in the form handed to ``bind_tools()``.

    The OpenAI-format schemas in ``RINGMASTER_TOOL_SCHEMAS`` are returned
    unchanged; no conversion is performed. This function is the single place
    to add a conversion step should one ever become necessary.
    """
    schemas = RINGMASTER_TOOL_SCHEMAS
    return schemas
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_graph(client, context):
    """Assemble and compile the supervisor graph for one run.

    Topology: START -> supervisor; supervisor -> tool_executor or respond
    (chosen by ``route_from_supervisor``); tool_executor -> supervisor;
    respond -> END.

    Args:
        client: chat model forwarded to every node.
        context: session context forwarded to every node.

    Returns:
        The compiled graph.
    """
    def _bind(node_fn):
        # Adapt a (state, client, context) node to the one-argument callable
        # the graph invokes.
        def _node(node_state):
            return node_fn(node_state, client, context)
        return _node

    builder = StateGraph(RingmasterState)

    node_table = (
        ("supervisor", supervisor_node),
        ("tool_executor", tool_executor_node),
        ("respond", respond_node),
    )
    for node_name, node_fn in node_table:
        builder.add_node(node_name, _bind(node_fn))

    builder.add_edge(START, "supervisor")
    # Router return values map one-to-one onto node names.
    destinations = {target: target for target in ("tool_executor", "respond")}
    builder.add_conditional_edges("supervisor", route_from_supervisor, destinations)
    builder.add_edge("tool_executor", "supervisor")
    builder.add_edge("respond", END)

    return builder.compile()
|
|
|
|
|
|
|
|
|
|
|
def run_ringmaster(client, user_message, context):
    """Run the ringmaster supervisor graph for a single chat message.

    Args:
        client: ChatMistralAI instance from get_client().
        user_message: the user's chat message.
        context: dict with loaded_context, llm_provider, llm_key,
            cgt_result, cta_result. Tools read and mutate this.

    Returns:
        A dict with ``reply``, ``steps`` and ``extracted`` — the standard
        backend contract used by process_message in app.py. ``extracted``
        holds the raw ``tool_results`` and the ``total_iterations`` count.
    """
    workflow = _build_graph(client, context)

    seed_state = dict(
        user_message=user_message,
        messages=[],
        steps=[],
        tool_results=[],
        next_action="",
        reply="",
        iteration=0,
    )
    run_config = {"recursion_limit": MAX_AGENT_STEPS * 4}
    outcome = workflow.invoke(seed_state, config=run_config)

    # Renumber the trace 1..n in place so step numbers are sequential
    # regardless of the values each node recorded.
    trace = outcome.get("steps", [])
    position = 0
    for entry in trace:
        position += 1
        entry["step"] = position

    extracted = {
        "tool_results": outcome.get("tool_results", []),
        "total_iterations": outcome.get("iteration", 0),
    }
    return {
        "reply": outcome.get("reply", "") or "",
        "steps": trace,
        "extracted": extracted,
    }
|
|
|
|
|
|
|
|
|
|
|
def run(client, user_message):
    """Legacy two-argument entry point.

    Delegates to run_ringmaster() with a blank context, so the ringmaster
    still runs but has no loaded data to work with. app.py should prefer
    run_ringmaster() for chat handling.
    """
    blank_context = dict(
        loaded_context="",
        llm_provider="Mistral",
        llm_key="",
        cgt_result=None,
        cta_result=None,
    )
    return run_ringmaster(client, user_message, blank_context)
|
|
|
|
|
|
|
|
|
|
|
def build_code_snippets(user_message, steps):
    """Render a comment-style summary of the backend and the run's trace.

    Args:
        user_message: accepted for interface compatibility; not used.
        steps: iterable of step dicts; each contributes one line built from
            its ``step``, ``type``, ``tool`` and ``args`` entries (missing
            keys render as ``None``).

    Returns:
        The header lines followed by one line per step, joined by newlines.
    """
    header = (
        "# Backend: LangGraph Ringmaster",
        "# Supervisor + tool_executor + respond nodes.",
        "# Tools: check_data_status, run_grounded_theory, run_thematic_analysis,",
        "# summarize_cgt_result, summarize_cta_result",
        "",
        "# Trace of this run:",
    )
    trace = [
        f"# step {entry.get('step')}: {entry.get('type')} "
        f"tool={entry.get('tool')} args={entry.get('args')}"
        for entry in steps
    ]
    return "\n".join([*header, *trace])
|
|
|