Spjimr / ringmaster_tools.py
shahidshaikh's picture
Upload 40 files
a52bae4 verified
# ============================================================================
# ringmaster_tools.py — Tools the LangGraph Ringmaster supervisor can call
# ============================================================================
#
# These tools exist ONLY for the LangGraph Ringmaster backend. They are NOT
# registered in the standard tools.py because the other 6 backends do not
# know about workbench state, data loading, or research-corpus inspection.
#
# COMPLIANCE
# ----------
# Every tool here is a thin wrapper. It:
# - reads structured input
# - calls a real domain function (workbench_grounded_theory.run,
# workbench_thematic_analysis.run, or a simple string inspection)
# - returns a plain-string summary the LLM can include in its reply
#
# Tools NEVER do control flow. They NEVER route. They NEVER decide what
# runs next. The supervisor decides, the tool executes, the supervisor
# sees the result string and decides again.
#
# DATA CONTRACT
# -------------
# Every tool receives `context` — a dict the ringmaster backend builds
# from the Gradio session state before invoking the supervisor. Fields:
# context["loaded_context"] -> str, newline-separated sentences (may be empty)
# context["llm_provider"] -> str, the LLM provider name
# context["llm_key"] -> str, the API key (may be empty)
# context["cgt_result"] -> dict or None, last CGT run result
# context["cta_result"] -> dict or None, last CTA run result
#
# Tools that produce new results MUTATE context["cgt_result"] or
# context["cta_result"] so subsequent tool calls in the same chat turn
# can see them (and so the chat handler can extract them afterward to
# update the workbench tabs).
# ============================================================================
from typing import Dict, Any, List
# ----------------------------------------------------------------
# TOOL 1 — check_data_status
# ----------------------------------------------------------------
def check_data_status(context: Dict[str, Any]) -> str:
"""Report whether research data is currently loaded, and if so how much."""
loaded = (context.get("loaded_context") or "").strip()
if not loaded:
return (
"NO DATA LOADED. The user has not uploaded a file, pasted text, "
"or scraped a URL yet. Ask the user to go to the Inputs tab and "
"load data before running any research workbench."
)
sentences = [s.strip() for s in loaded.split("\n") if s.strip()]
n = len(sentences)
preview = sentences[:3]
if n == 0:
return "NO DATA LOADED — loaded_context is whitespace only."
return (
f"DATA LOADED: {n} sentences available for analysis.\n"
f"First 3 sentences for preview:\n"
+ "\n".join(f" {i+1}. {s}" for i, s in enumerate(preview))
)
# ----------------------------------------------------------------
# TOOL 2 — run_grounded_theory
# ----------------------------------------------------------------
def run_grounded_theory(
context: Dict[str, Any],
similarity_threshold: float = 0.60,
min_cluster_size: int = 3,
n_nearest: int = 3,
) -> str:
"""Run the Computational Grounded Theory supervisor on loaded data.
Returns a short text summary. Mutates context["cgt_result"] with the
full result dict so the chat handler can update the CGT tab afterward.
"""
loaded = (context.get("loaded_context") or "").strip()
if not loaded:
return (
"ERROR: cannot run grounded theory — no data loaded. "
"Ask the user to load data via the Inputs tab first."
)
sentences = [s.strip() for s in loaded.split("\n") if s.strip()]
true_labels = ["(unknown)"] * len(sentences)
# Import here to keep the ringmaster_tools module import-light and to
# avoid a circular import at app.py boot.
import workbench_grounded_theory as wb_cgt
result = wb_cgt.run(
user_message="Run computational grounded theory.",
sentences=sentences,
true_labels=true_labels,
data_source="uploaded",
similarity_threshold=float(similarity_threshold),
min_cluster_size=int(min_cluster_size),
n_nearest=int(n_nearest),
llm_provider=context.get("llm_provider", "Mistral"),
llm_key=context.get("llm_key", ""),
)
context["cgt_result"] = result
det = result.get("detection_result") or {}
clusters = det.get("clusters") or []
n_clusters = len(clusters)
cluster_summary_lines = []
for c in clusters:
label = c.get("llm_label") or c.get("cluster_id") or "unknown"
size = c.get("size") or 0
cluster_summary_lines.append(f" - Cluster {c.get('cluster_id')}: {label} ({size} sentences)")
if not cluster_summary_lines:
return (
f"Ran grounded theory on {len(sentences)} sentences but no clusters were "
f"found at similarity {similarity_threshold} / min size {min_cluster_size}. "
f"Suggest the user lower similarity_threshold or min_cluster_size."
)
return (
f"COMPLETED: grounded theory on {len(sentences)} sentences. "
f"Found {n_clusters} cluster(s):\n"
+ "\n".join(cluster_summary_lines)
+ "\nThe full trace and per-sentence cluster table are now in the "
"Researcher Workbench → Computational Grounded Theory tab."
)
# ----------------------------------------------------------------
# TOOL 3 — run_thematic_analysis
# ----------------------------------------------------------------
def run_thematic_analysis(
context: Dict[str, Any],
max_sentences: int = 20,
) -> str:
"""Run the Computational Thematic Analysis supervisor on loaded data.
Returns a short text summary. Mutates context["cta_result"].
"""
loaded = (context.get("loaded_context") or "").strip()
if not loaded:
return (
"ERROR: cannot run thematic analysis — no data loaded. "
"Ask the user to load data via the Inputs tab first."
)
sentences = [s.strip() for s in loaded.split("\n") if s.strip()]
true_labels = ["(unknown)"] * len(sentences)
import workbench_thematic_analysis as wb_cta
result = wb_cta.run(
user_message="Run reflexive thematic analysis.",
sentences=sentences,
true_labels=true_labels,
data_source="uploaded",
max_sentences_to_code=int(max_sentences),
llm_provider=context.get("llm_provider", "Mistral"),
llm_key=context.get("llm_key", ""),
)
context["cta_result"] = result
phase2 = result.get("phase2_initial_codes") or {}
coded_rows = phase2.get("coded_rows") or []
code_counts = phase2.get("code_frequency") or {}
top_codes = sorted(code_counts.items(), key=lambda kv: -kv[1])[:5]
top_codes_str = ", ".join(f"{code} ({count})" for code, count in top_codes) or "(none)"
return (
f"COMPLETED: thematic analysis on {len(coded_rows)} sentences "
f"(out of {len(sentences)} loaded, capped at {max_sentences}). "
f"Top 5 codes: {top_codes_str}. "
f"The full trace and per-sentence code table are now in the "
f"Researcher Workbench → Computational Thematic Analysis tab."
)
# ----------------------------------------------------------------
# TOOL 4 — summarize_cgt_result
# ----------------------------------------------------------------
def summarize_cgt_result(context: Dict[str, Any]) -> str:
"""Return a text summary of the most recent grounded theory run."""
result = context.get("cgt_result")
if not result:
return (
"NO PRIOR GROUNDED THEORY RUN. The user has not yet run grounded "
"theory in this session. Use run_grounded_theory first."
)
det = result.get("detection_result") or {}
clusters = det.get("clusters") or []
lines = ["Most recent Grounded Theory run:"]
for c in clusters:
lines.append(
f" - Cluster {c.get('cluster_id')}: {c.get('llm_label', 'unlabeled')} "
f"({c.get('size', 0)} sentences)"
)
lines.append(f"Supervisor reply: {result.get('reply', '(empty)')}")
return "\n".join(lines)
# ----------------------------------------------------------------
# TOOL 5 — summarize_cta_result
# ----------------------------------------------------------------
def summarize_cta_result(context: Dict[str, Any]) -> str:
"""Return a text summary of the most recent thematic analysis run."""
result = context.get("cta_result")
if not result:
return (
"NO PRIOR THEMATIC ANALYSIS RUN. The user has not yet run "
"thematic analysis in this session. Use run_thematic_analysis first."
)
phase2 = result.get("phase2_initial_codes") or {}
coded_rows = phase2.get("coded_rows") or []
code_freq = phase2.get("code_frequency") or {}
top_codes = sorted(code_freq.items(), key=lambda kv: -kv[1])[:5]
lines = [f"Most recent Thematic Analysis run: {len(coded_rows)} sentences coded."]
lines.append("Top 5 codes:")
for code, count in top_codes:
lines.append(f" - {code}: {count}")
lines.append(f"Supervisor reply: {result.get('reply', '(empty)')}")
return "\n".join(lines)
# ============================================================================
# Tool registration — shape matches tools.py for consistency
# ============================================================================
RINGMASTER_TOOL_FUNCTIONS = {
"check_data_status": check_data_status,
"run_grounded_theory": run_grounded_theory,
"run_thematic_analysis": run_thematic_analysis,
"summarize_cgt_result": summarize_cgt_result,
"summarize_cta_result": summarize_cta_result,
}
RINGMASTER_TOOL_SCHEMAS = [
{
"type": "function",
"function": {
"name": "check_data_status",
"description": (
"Check whether research data is currently loaded in the session. "
"Returns the number of sentences and a short preview, or reports "
"that no data is loaded. ALWAYS call this before run_grounded_theory "
"or run_thematic_analysis so you know whether to ask the user to "
"load data first."
),
"parameters": {
"type": "object",
"properties": {},
},
},
},
{
"type": "function",
"function": {
"name": "run_grounded_theory",
"description": (
"Run Computational Grounded Theory (Nelson 2020) on the currently "
"loaded research data. Only call this AFTER check_data_status "
"confirmed data is loaded. The result is a short text summary of "
"the clusters found; the full trace and sentence-level table will "
"appear in the Researcher Workbench tab automatically."
),
"parameters": {
"type": "object",
"properties": {
"similarity_threshold": {
"type": "number",
"description": "Cosine similarity threshold (0.4-0.9, default 0.60)",
},
"min_cluster_size": {
"type": "integer",
"description": "Minimum sentences per cluster (2-10, default 3)",
},
"n_nearest": {
"type": "integer",
"description": "Representatives per cluster for LLM labeling (1-10, default 3)",
},
},
},
},
},
{
"type": "function",
"function": {
"name": "run_thematic_analysis",
"description": (
"Run Computational Thematic Analysis (Braun & Clarke 2006) on the "
"currently loaded research data. Only call this AFTER "
"check_data_status confirmed data is loaded. Phase 2 (generating "
"initial codes) is the only real phase; the rest are placeholders. "
"The result is a short text summary; the full per-sentence code "
"table will appear in the Researcher Workbench tab automatically."
),
"parameters": {
"type": "object",
"properties": {
"max_sentences": {
"type": "integer",
"description": "Cap on sentences to code (expensive — each is one LLM call, default 20)",
},
},
},
},
},
{
"type": "function",
"function": {
"name": "summarize_cgt_result",
"description": (
"Return a text summary of the most recent Grounded Theory run so "
"you can answer follow-up questions about it. Does not re-run the "
"analysis."
),
"parameters": {
"type": "object",
"properties": {},
},
},
},
{
"type": "function",
"function": {
"name": "summarize_cta_result",
"description": (
"Return a text summary of the most recent Thematic Analysis run "
"so you can answer follow-up questions. Does not re-run."
),
"parameters": {
"type": "object",
"properties": {},
},
},
},
]