luqman2520's picture
Upload 4 files
ccab3d4 verified
"""
agent.py — LangGraph ReAct Agent for BERTopic Agentic Thematic Analysis
Uses ChatMistralAI + MemorySaver + all 7 tools from tools.py
"""
import json
import os
import re
import pandas as pd
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from langchain_mistralai import ChatMistralAI
from tools import (
load_scopus_csv,
run_bertopic_discovery,
label_topics_with_llm,
consolidate_into_themes,
compare_with_taxonomy,
generate_comparison_csv,
export_narrative,
)
llm = ChatMistralAI(
model="mistral-large-latest",
temperature=0.2,
api_key=os.environ.get("MISTRAL_API_KEY", ""),
)
memory = MemorySaver()
SYSTEM_PROMPT = """
You are an expert computational thematic analysis agent. You follow Braun & Clarke (2006)
six-phase thematic analysis methodology, adapted for computational corpus analysis using
BERTopic with sentence-transformer embeddings and agglomerative clustering.
1. load_scopus_csv(file_path: str)
→ Load the CSV. Count papers, abstract sentences, title sentences.
→ Strip boilerplate text from abstracts.
→ Saves cleaned_data.json to outputs/.
→ Input: absolute file path string.
2. run_bertopic_discovery(run_config: str)
→ Embeds sentences using all-MiniLM-L6-v2.
→ Clusters with AgglomerativeClustering (cosine, threshold=0.7).
→ Extracts 5 nearest evidence sentences per cluster.
→ Saves summaries_{tag}.json, embeddings_{tag}.npy, and 2 chart HTML files.
→ Input JSON: {"columns": ["Abstract"]} or {"columns": ["Title"]}
→ Run TWICE: once for Abstract (tag=abstract), once for Title (tag=title).
3. label_topics_with_llm(labelling_input: str)
→ You (the LLM) read the top_sentences for each cluster from summaries_{tag}.json,
then SELF-SUPPLY the llm_labels list with your best label, category,
confidence (0–1), and reasoning for each cluster.
→ Input JSON: {
"tag": "abstract",
"llm_labels": [
{"cluster_id": 0, "label": "AI in Healthcare", "category": "Applied AI",
"confidence": 0.92, "reasoning": "Sentences discuss medical diagnostics..."},
...
]
}
4. consolidate_into_themes(consolidation_input: str)
→ Applies user approvals from the Review Table.
→ Merges approved clusters into final themes with final labels.
→ Saves themes_{tag}.json and chart_keywords.html.
→ Input JSON: {
"tag": "abstract",
"approvals": [
{"cluster_id": 0, "approved": true, "rename_to": "AI in Medicine",
"reasoning": "Covers core domain"},
...
]
}
5. compare_with_taxonomy(taxonomy_input: str)
→ Maps each final theme to the PAJAIS taxonomy.
→ Marks each theme as MAPPED or NOVEL.
→ You self-supply the mappings list.
→ Input JSON: {
"tag": "abstract",
"mappings": [
{"final_label": "AI in Medicine", "pajais_category": "Healthcare IS",
"mapped": true},
...
]
}
6. generate_comparison_csv(comparison_input: str)
→ Generates side-by-side CSV and Plotly chart comparing abstract vs title themes.
→ Input JSON: {"tags": ["abstract", "title"]}
7. export_narrative(narrative_input: str)
→ You write the ~500-word Section 7 narrative yourself.
→ Input JSON: {
"tag": "abstract",
"narrative": "...(your 500-word narrative here)...",
"researcher_name": "..."
}
════════════════════════════════════════════════════════════════
RUN CONFIGURATIONS
════════════════════════════════════════════════════════════════
• Abstract run: columns = ["Abstract"] → tag = "abstract"
• Title run: columns = ["Title"] → tag = "title"
Always run BERTopic for BOTH configurations before Phase 3.
════════════════════════════════════════════════════════════════
BRAUN & CLARKE 6-PHASE WORKFLOW
════════════════════════════════════════════════════════════════
PHASE 1 — FAMILIARISATION
Goal: Understand the dataset.
Action:
1. Call load_scopus_csv(file_path) with the uploaded file path.
2. Report: total papers, abstract sentences, title sentences, column list.
3. Show 5 sample titles.
STOP after Phase 1. Say:
"✅ Phase 1 complete. Familiarisation done. Say 'Start Phase 2' to begin coding."
──────────────────────────────────────────────────────────────
PHASE 2 — INITIAL CODING
Goal: Generate initial semantic codes (clusters) from the corpus.
Actions:
1. Call run_bertopic_discovery({"columns": ["Abstract"]})
2. Call run_bertopic_discovery({"columns": ["Title"]})
3. Read outputs/summaries_abstract.json — list ALL cluster IDs and their top 2 sentences.
4. Analyse each cluster's top_sentences yourself.
5. Call label_topics_with_llm with your self-generated labels for the ABSTRACT run.
6. Call label_topics_with_llm with your self-generated labels for the TITLE run.
7. Build and present a REVIEW TABLE for the user (for abstract clusters):
Columns: [#, Topic Label, Top Evidence, Sentences, Papers, Approve, Rename To, Reasoning]
Fill Approve=True for confident clusters, Approve=False for weak/duplicate ones.
*** STOP GATE AFTER PHASE 2 ***
Say: "⏸️ STOP — Phase 2 complete. Review the table above.
Edit Approve/Rename To/Reasoning columns, then click Submit Review to proceed to Phase 3."
──────────────────────────────────────────────────────────────
PHASE 3 — SEARCHING FOR THEMES
Goal: Group related codes into broader themes.
Trigger: User submits the review table (message begins with [REVIEW_TABLE_SUBMITTED]).
Actions:
1. Parse the JSON review table from the user's message.
2. Call consolidate_into_themes with the parsed approvals for "abstract".
3. Call consolidate_into_themes with approvals for "title" (approve all by default).
4. Report the final theme list with counts.
*** STOP GATE AFTER PHASE 3 ***
Say: "⏸️ STOP — Phase 3 complete. [N] themes consolidated.
Review the theme list above. Say 'Proceed to Phase 4' when satisfied."
──────────────────────────────────────────────────────────────
PHASE 4 — REVIEWING THEMES
Goal: Theoretical saturation check.
Actions:
1. Analyse theme sizes and sentence counts.
2. Flag any theme with fewer than 3 sentences as POTENTIALLY WEAK.
3. Flag any two themes sharing >60% of their top keywords as POTENTIALLY OVERLAPPING.
4. Report saturation status: SATURATED or REQUIRES REVISION.
5. Recommend merges or splits if needed.
*** STOP GATE AFTER PHASE 4 ***
Say: "⏸️ STOP — Phase 4 complete. Saturation analysis done.
Say 'Proceed to Phase 5' to finalise theme names."
──────────────────────────────────────────────────────────────
PHASE 5 — DEFINING AND NAMING THEMES
Goal: Finalize descriptive theme names and definitions.
Actions:
1. For each theme, write a 1-sentence definition.
2. Present final theme names and definitions in a clean table.
3. Confirm with user.
(No STOP gate — flows directly into Phase 5.5)
──────────────────────────────────────────────────────────────
PHASE 5.5 — PAJAIS TAXONOMY MAPPING
Goal: Position themes within the IS research landscape.
Actions:
1. Call compare_with_taxonomy for the abstract run — self-supply your mappings.
2. Call compare_with_taxonomy for the title run — self-supply your mappings.
3. Present a table: Theme | PAJAIS Category | Status (MAPPED/NOVEL).
*** STOP GATE AFTER PHASE 5.5 ***
Say: "⏸️ STOP — Phase 5.5 complete. PAJAIS mapping done.
Say 'Generate Final Report' to proceed to Phase 6."
──────────────────────────────────────────────────────────────
PHASE 6 — WRITING UP (REPORT)
Goal: Generate the final deliverables.
Actions:
1. Call generate_comparison_csv({"tags": ["abstract", "title"]})
2. Write a ~500-word academic narrative (Section 7) covering:
- Research context
- Summary of each theme with evidence
- Comparison of abstract vs title themes
- PAJAIS taxonomy positioning
- Implications for IS research
3. Call export_narrative with your narrative text.
4. Tell the user: outputs are in the outputs/ folder, click Refresh Downloads.
════════════════════════════════════════════════════════════════
STRICT BEHAVIOURAL RULES
════════════════════════════════════════════════════════════════
• ONE PHASE PER MESSAGE. Never jump ahead.
• At each STOP gate, wait for explicit user confirmation before proceeding.
• Never skip a phase.
• Always self-supply data for label_topics_with_llm, compare_with_taxonomy,
and export_narrative — do not ask the user for these.
• When the user submits a review table ([REVIEW_TABLE_SUBMITTED]), parse it
and call consolidate_into_themes immediately.
• Be concise. Avoid repeating instructions.
• If a tool returns an error, report it clearly and ask the user how to proceed.
• Keep all intermediate files in the outputs/ directory.
════════════════════════════════════════════════════════════════
PHASE PROGRESS HTML FORMAT
════════════════════════════════════════════════════════════════
After completing each phase, include in your response:
[PHASE_PROGRESS: P1=done, P2=done, P3=pending, P4=pending, P5=pending, P5.5=pending, P6=pending]
(Replace 'done'/'pending' accurately for the current state.)
"""
# ─── Agent ────────────────────────────────────────────────────────────────────
tools_list = [
load_scopus_csv,
run_bertopic_discovery,
label_topics_with_llm,
consolidate_into_themes,
compare_with_taxonomy,
generate_comparison_csv,
export_narrative,
]
agent = create_react_agent(
model=llm,
tools=tools_list,
checkpointer=memory,
prompt=SYSTEM_PROMPT,
)
# ─── Helpers for app.py ───────────────────────────────────────────────────────
def _parse_phase_progress(text: str) -> str:
"""Extract PHASE_PROGRESS tag from agent response and render as HTML."""
match = re.search(r"\[PHASE_PROGRESS:(.*?)\]", text, re.DOTALL)
status_map = {
"done": ("✅", "#22c55e"),
"pending": ("⬜", "#94a3b8"),
"active": ("🔄", "#3b82f6"),
}
labels = ["P1", "P2", "P3", "P4", "P5", "P5.5", "P6"]
if not match:
return "<div style='padding:10px;background:#f0f4ff;border-radius:8px'>" \
"<b>Phase Progress:</b> " + \
" ".join(f"<span style='margin-left:8px'>⬜ {l}</span>" for l in labels) + \
"</div>"
progress_str = match.group(1)
state = {}
for part in progress_str.split(","):
part = part.strip()
kv = part.split("=")
if len(kv) == 2:
state[kv[0].strip()] = kv[1].strip()
def _badge(label):
s = state.get(label, "pending")
icon, color = status_map.get(s, ("⬜", "#94a3b8"))
return (f"<span style='margin-left:8px;color:{color};font-weight:600'>"
f"{icon} {label}</span>")
badges = "".join(map(_badge, labels))
clean = re.sub(r"\[PHASE_PROGRESS:.*?\]", "", text, flags=re.DOTALL).strip()
return (
"<div style='padding:10px;background:#f0f4ff;border-radius:8px;"
"font-family:sans-serif'>"
f"<b>Phase Progress:</b>{badges}</div>",
clean
)
def _build_review_table(agent_text: str) -> list:
"""
Parse a markdown table from the agent response into a list of dicts
for the Gradio Dataframe review table.
"""
lines = agent_text.splitlines()
# Find markdown table header line (starts with '|' and contains # and Topic)
header_idx = None
for i, ln in enumerate(lines):
if ln.strip().startswith("|") and ("#" in ln) and ("Topic" in ln or "Topic Label" in ln):
header_idx = i
break
if header_idx is None:
# Fallback: TSV / whitespace-delimited
lines = agent_text.strip().splitlines()
header_idx = None
for i, ln in enumerate(lines):
if ("#" in ln) and ("Topic" in ln or "Topic Label" in ln):
header_idx = i
break
if header_idx is None:
return []
header_cells = re.split(r"\t| {2,}", lines[header_idx].strip())
data_lines = lines[header_idx+1:]
else:
# header exists as markdown table; collect following '|' rows
header_cells = [c.strip() for c in lines[header_idx].strip().strip("|").split("|")]
data_lines = []
# skip possible separator row like |---|
j = header_idx + 1
if j < len(lines) and re.match(r"^\|[-\s:|]+\|$", lines[j].strip()):
j += 1
while j < len(lines) and lines[j].strip().startswith("|"):
data_lines.append(lines[j])
j += 1
# Map header indices
header_map = {}
for idx, h in enumerate(header_cells):
key = h.lower()
if "#" in key:
header_map["#"] = idx
elif "cluster" in key and "id" in key:
header_map["Cluster ID"] = idx
elif "topic" in key and "label" in key:
header_map["Topic Label"] = idx
elif "evidence" in key:
header_map["Top Evidence"] = idx
elif "sentence" in key:
header_map["Sentences"] = idx
elif "paper" in key:
header_map["Papers"] = idx
elif "approve" in key:
header_map["Approve"] = idx
elif "rename" in key:
header_map["Rename To"] = idx
elif "reason" in key:
header_map["Reasoning"] = idx
rows = []
for ln in data_lines:
cells = [c.strip() for c in ln.strip().strip("|").split("|")] if ln.strip().startswith("|") else re.split(r"\t| {2,}", ln.strip())
if len(cells) < 2:
continue
row = {"#": "", "Topic Label": "", "Top Evidence": "", "Sentences": "", "Papers": "", "Approve": False, "Rename To": "", "Reasoning": ""}
def safe_get(idx):
try:
return cells[idx]
except Exception:
return ""
if "#" in header_map:
row["#"] = safe_get(header_map["#"]) or safe_get(0)
if "Cluster ID" in header_map:
row["Cluster ID"] = safe_get(header_map["Cluster ID"]) or ""
if "Topic Label" in header_map:
row["Topic Label"] = safe_get(header_map["Topic Label"]) or safe_get(1)
if "Top Evidence" in header_map:
row["Top Evidence"] = safe_get(header_map["Top Evidence"]) or ""
if "Sentences" in header_map:
row["Sentences"] = safe_get(header_map["Sentences"]) or ""
if "Papers" in header_map:
row["Papers"] = safe_get(header_map["Papers"]) or ""
if "Approve" in header_map:
val = safe_get(header_map["Approve"]).lower()
row["Approve"] = val in ("true","yes","✅","1","y","approve")
if "Rename To" in header_map:
row["Rename To"] = safe_get(header_map["Rename To"]) or ""
if "Reasoning" in header_map:
row["Reasoning"] = safe_get(header_map["Reasoning"]) or ""
rows.append(row)
return rows
raw_rows = table_pattern.group(2).strip().splitlines()
rows = []
def _parse_row(line):
cells = list(map(str.strip, line.strip("|").split("|")))
if len(cells) >= 8:
return {
"#": cells[0],
"Topic Label": cells[1],
"Top Evidence": cells[2],
"Sentences": cells[3],
"Papers": cells[4],
"Approve": cells[5].lower() in ("true", "yes", "✅", "1"),
"Rename To": cells[6],
"Reasoning": cells[7],
}
return None
parsed = list(map(_parse_row, raw_rows))
cleaned = list(filter(lambda r: r is not None, parsed))
return cleaned
def get_agent_state(thread_id: str) -> dict:
"""Return the current memory state for a given thread."""
config = {"configurable": {"thread_id": thread_id}}
return memory.get(config) or {}
def run_agent(user_message: str, context: dict, chat_history: list):
"""
Invoke the agent with a user message and return:
(response_text, review_table_data, phase_bar_html)
Parameters
----------
user_message : str
The user's message or [REVIEW_TABLE_SUBMITTED] payload.
context : dict
Must include 'file_path' and 'thread_id'.
chat_history : list
List of (human, ai) tuples for context.
"""
file_path = context.get("file_path", "")
thread_id = context.get("thread_id", "thread-001")
# Quick shortcut: if user requests to start Phase 2, build a review table
# directly from outputs/summaries_abstract.json to avoid LLM calls.
if user_message.strip().lower().startswith("start phase 2"):
summaries_path = "outputs/summaries_abstract.json"
if not os.path.exists(summaries_path):
return (
"Summaries not found. Run BERTopic discovery first (Phase 2).",
[],
_parse_phase_progress("[PHASE_PROGRESS: P1=done, P2=pending, P3=pending, P4=pending, P5=pending, P5.5=pending, P6=pending]")
)
with open(summaries_path, encoding="utf-8") as f:
summaries = json.load(f)
# sort by size desc and take top 20
top = sorted(summaries, key=lambda s: s.get("size", 0), reverse=True)[:20]
# build markdown table
md_lines = [
"| # | Cluster ID | Topic Label | Top Evidence | Sentences | Papers | Approve | Rename To | Reasoning |",
"|---|------------|-------------|--------------|-----------|--------|---------|-----------|-----------|",
]
for i, s in enumerate(top, start=1):
top_ev = "; ".join(s.get("top_sentences", [])[:2])
row = f"| {i} | {s.get('cluster_id')} | {s.get('label','')} | {top_ev} | {s.get('size',0)} | {len(s.get('papers',[]))} | ✅ | | |"
md_lines.append(row)
md_table = "\n".join(md_lines)
phase_html = _parse_phase_progress("[PHASE_PROGRESS: P1=done, P2=done, P3=pending, P4=pending, P5=pending, P5.5=pending, P6=pending]")
# _parse_phase_progress can return (html, clean) tuple
if isinstance(phase_html, tuple):
phase_html = phase_html[0]
review_data = _build_review_table(md_table)
return md_table, review_data, phase_html
if not os.environ.get("MISTRAL_API_KEY"):
return (
"Mistral API key is missing. Set the `MISTRAL_API_KEY` environment variable, "
"restart the app, and then try again.",
[],
_parse_phase_progress(""),
)
# Prepend file path hint if present
full_message = (
f"[FILE_PATH: {file_path}]\n{user_message}"
if file_path
else user_message
)
config = {"configurable": {"thread_id": thread_id}}
try:
response = agent.invoke({"messages": [("human", full_message)]}, config=config)
ai_text = response["messages"][-1].content
except Exception as exc:
return (
f"Agent execution failed: {exc}",
[],
_parse_phase_progress(""),
)
# Parse phase progress bar
parsed = _parse_phase_progress(ai_text)
if isinstance(parsed, tuple):
phase_html, clean_text = parsed
else:
phase_html = parsed
clean_text = ai_text
# Parse review table if present
review_data = _build_review_table(clean_text)
# Fallback: if agent didn't emit a markdown review table but summaries exist,
# populate the review table from outputs/summaries_abstract.json so the UI
# shows a usable table for Phase 2 review.
if not review_data:
summaries_path = "outputs/summaries_abstract.json"
if os.path.exists(summaries_path):
try:
with open(summaries_path) as f:
summaries = json.load(f)
rows = []
for s in summaries:
rows.append({
"#": s.get("cluster_id", ""),
"Topic Label": s.get("label", ""),
"Top Evidence": ("; ").join(s.get("top_sentences", [])[:2]),
"Sentences": s.get("size", 0),
"Papers": len(s.get("papers", [])),
"Approve": False,
"Rename To": "",
"Reasoning": "",
})
review_data = rows
except Exception:
review_data = []
return clean_text, review_data, phase_html