topic_modelling / tools.py
aadisawant2912's picture
Update tools.py
9804054 verified
"""
tools.py - 7 LangChain @tool functions for BERTopic Thematic Analysis Agent.
Rules: ZERO if/else, ZERO for/while, ZERO try/except, ZERO PromptTemplate.
All LLM calls use plain HumanMessage strings directly.
Workflow:
- Abstract run saves to data/abstract/
- Title run saves to data/title/
- Comparison CSV + narrative only generated when BOTH runs are complete
- Topic IDs are sequential 1..N (not raw cluster labels)
- Boilerplate filter catches © symbol, all major publishers
"""
from __future__ import annotations
import json
import re
import shutil
from pathlib import Path
import numpy as np
import pandas as pd
import plotly.express as px
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langchain_mistralai import ChatMistralAI
from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering
from sklearn.decomposition import PCA
from sklearn.metrics.pairwise import cosine_similarity
# ── paths ──────────────────────────────────────────────────────────────────────
# Root folder for every file this module writes. It is created as a side effect
# of importing the module.
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)
# ── Embedding model — loaded ONCE at module level, reused everywhere ───────────
# This prevents repeated HuggingFace downloads and avoids 429 rate limit errors.
# The UNEXPECTED embeddings.position_ids warning is harmless — safe to ignore.
# NOTE: the load below also runs at import time, so importing this module prints
# the two progress messages and constructs the SentenceTransformer.
print("Loading sentence-transformers model (one-time)...")
_EMBED_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
print("Model loaded OK.")
def _p(run_config: str) -> dict:
    """Build the table of artefact paths for one run and make sure its folder exists.

    Per-run files live in ``DATA_DIR / run_config``. The ``comparison`` entry
    is the exception: it points at the single CSV shared by every run.
    """
    run_dir = DATA_DIR / run_config
    run_dir.mkdir(parents=True, exist_ok=True)
    # (key, file name) pairs, in the order callers have always seen them.
    per_run_files = (
        ("sentences", "sentences.json"),
        ("stats", "stats.json"),
        ("papers", "papers.csv"),
        ("emb", "emb.npy"),
        ("summaries", "summaries.json"),
        ("charts", "charts.json"),
        ("themes", "themes.json"),
        ("taxonomy", "taxonomy.json"),
        ("narrative", "narrative.txt"),
    )
    paths = {"dir": run_dir}
    paths.update((key, run_dir / filename) for key, filename in per_run_files)
    paths["comparison"] = DATA_DIR / "comparison.csv"  # shared output
    return paths
# Maps a run name to the CSV column(s) holding the text for that run.
# load_scopus_csv only looks at the first entry of each list, and falls back
# to ["Abstract"] for a run name that is not listed here.
RUN_CONFIGS = {
    "abstract": ["Abstract"],
    "title": ["Title"],
}
# Comprehensive boilerplate filter — catches © symbol + all major publishers.
# Each entry is a regex fragment; they are OR-ed together into BOILERPLATE_RE
# and matched case-insensitively anywhere in a sentence (see _is_clean).
BOILERPLATE_PATTERNS = [
    r"\u00a9", # © character (the regex engine expands the \u escape)
    r"\\u00a9", # the literal six-character text \u00a9 left unexpanded in the data
    r"copyright\s*\d{4}",
    r"\d{4}\s+john wiley",
    r"john wiley\s*(&|and)\s*sons",
    r"blackwell\s*(publishing|pub)",
    r"wiley\s+periodicals",
    r"wiley\s+online",
    # NOTE(review): redundant with the shorter "rights reserved" entry below.
    r"all rights reserved",
    r"doi\s*:\s*\S+",
    r"published by elsevier",
    r"elsevier\s*(b\.v|inc|ltd|science)",
    r"springer\s*(nature|verlag|science|link)",
    r"taylor\s*(&|and)\s*francis",
    r"informa\s+uk",
    r"sage\s+publications",
    r"information systems journal\s+published",
    r"emerald\s+(publishing|group)",
    # NOTE(review): broad — also drops ordinary sentences such as
    # "This article is concerned with ..."; confirm that is intended.
    r"this article is",
    r"rights reserved",
    r"permission from",
    r"reproduced with",
]
BOILERPLATE_RE = re.compile("|".join(BOILERPLATE_PATTERNS), re.IGNORECASE)
# Extra keyword filter applied per-sentence: _is_clean rejects any sentence whose
# lower-cased text contains one of these as a substring.
PUBLISHER_KEYWORDS = frozenset([
    "wiley", "elsevier", "blackwell", "springer",
    "taylor", "information systems journal", "emerald"
])
# The 25 PAJAIS category names. compare_with_taxonomy embeds this list verbatim
# in its LLM prompt, so the exact spelling here is what the model is asked to
# choose from.
PAJAIS_CATEGORIES = [
    "Information Systems Theory", "IS Strategy & Governance",
    "Digital Innovation", "Enterprise Systems",
    "AI & Intelligent Systems", "Big Data & Analytics",
    "Cybersecurity & Privacy", "Cloud Computing",
    "IS in Healthcare", "IS in Education",
    "E-Commerce & Digital Markets", "Social Media & Platforms",
    "Human-Computer Interaction", "IS Project Management",
    "IT Outsourcing", "Knowledge Management",
    "IS Development Methodologies", "Digital Transformation",
    "IS Ethics & Society", "IS in Developing Countries",
    "Mobile Computing", "IT Infrastructure",
    "IS Adoption & Diffusion", "IS Evaluation",
    "Organizational IS & Change",
]
def safe_read_csv(path):
    """Load a CSV into a DataFrame, decoding as UTF-8 first and latin-1 second.

    The latin-1 attempt is made only when the UTF-8 read raises
    UnicodeDecodeError; any other error from pandas propagates unchanged.
    """
    try:
        frame = pd.read_csv(path, encoding="utf-8")
    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so this read cannot fail on decoding.
        frame = pd.read_csv(path, encoding="latin-1")
    return frame
def _is_clean(s: str) -> bool:
    """Decide whether a sentence is worth keeping for topic modelling.

    A sentence is rejected when it matches BOILERPLATE_RE, starts with the
    copyright sign, has six words or fewer, has 40 characters or fewer after
    stripping, or mentions any PUBLISHER_KEYWORDS entry (case-insensitive).
    """
    stripped = s.strip()
    lowered = s.lower().strip()
    is_boilerplate = bool(BOILERPLATE_RE.search(s)) or stripped.startswith("\u00a9")
    too_short = len(s.split()) <= 6 or len(stripped) <= 40
    names_publisher = any(keyword in lowered for keyword in PUBLISHER_KEYWORDS)
    return not (is_boilerplate or too_short or names_publisher)
def _call_llm_json(llm, prompt: str) -> list:
    """Send ``prompt`` to ``llm`` as a single HumanMessage and JSON-decode the reply.

    Markdown code fences around the payload are removed before decoding:

    * with a ```json tag, the text after the last tag up to the next fence is used;
    * with an untagged fence, the text between the first pair of fences is
      used (or the only non-empty side when a single stray fence is present);
    * with no fence, the whole reply is used.

    Args:
        llm: Chat model object exposing ``invoke(messages)``.
        prompt: Plain prompt text.

    Returns:
        Whatever the JSON decodes to. Callers in this file ask the model for
        arrays in some prompts and for a single object in others, so the
        value is not always a list despite the annotation.

    Raises:
        json.JSONDecodeError: if the extracted payload is not valid JSON.
    """
    response = llm.invoke([HumanMessage(content=prompt)])
    raw = response.content.strip()
    # Tagged fence: unchanged from the previous behaviour.
    tagged = raw.split("```json")[-1].split("```")[0].strip()
    # Untagged fence: previously this kept only the text BEFORE the first fence,
    # which is empty for a reply that starts with ``` and made json.loads fail.
    segments = [part.strip() for part in raw.split("```")]
    untagged = segments[1] if len(segments) >= 3 else next(filter(None, segments), "")
    payload = tagged if "```json" in raw else untagged
    return json.loads(payload)
def _both_runs_complete() -> bool:
    """Report whether themes.json exists for the abstract run AND the title run."""
    # all() over a generator stops at the first missing file, so the title
    # run is only inspected when the abstract run is already complete.
    return all(_p(run)["themes"].exists() for run in ("abstract", "title"))
# =============================================================================
# TOOL 1 — load_scopus_csv
# Saves to data/uploaded.csv (permanent copy) AND data/{run_config}/papers.csv
# =============================================================================
@tool
def load_scopus_csv(csv_path: str, run_config: str = "abstract") -> str:
    """Load a Scopus CSV, filter boilerplate sentences, save per run_config.
    Saves sentences to data/{run_config}/sentences.json.
    Also copies the CSV permanently to data/uploaded.csv.
    Args:
    csv_path: Path to the uploaded Scopus CSV file.
    run_config: 'abstract' or 'title' (default 'abstract').
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool description.
    paths = _p(run_config)
    wanted_name = RUN_CONFIGS.get(run_config, ["Abstract"])[0]

    # Keep a permanent copy, unless the caller already handed us that copy.
    uploaded = DATA_DIR / "uploaded.csv"
    source_path = Path(csv_path).resolve()
    target_path = uploaded.resolve()
    _ = None if source_path == target_path else shutil.copy(str(source_path), str(target_path))
    raw_frame = safe_read_csv(uploaded)

    # Locate the text column: case-insensitive exact name first, then the
    # first column whose name merely contains the wanted name.
    wanted_lower = wanted_name.lower()
    by_lower_name = {name.strip().lower(): name for name in raw_frame.columns}
    text_col = by_lower_name.get(wanted_lower)
    text_col = (
        next((name for name in raw_frame.columns if wanted_lower in name.lower()), None)
        if text_col is None
        else text_col
    )
    # Without a text column there is nothing to process: report what the CSV does contain.
    if text_col is None:
        return json.dumps({
            "error": "Column '{}' not found in CSV. Available columns: {}".format(
                wanted_name, list(raw_frame.columns)
            ),
            "run_config": run_config,
        })

    # Text column first, then the bibliographic extras that exist, without
    # duplicates (the title run would otherwise select "Title" twice).
    extras = ["Title", "Year", "Source title", "Cited by"]
    candidates = [text_col] + [name for name in extras if name != text_col]
    keep = list(dict.fromkeys(name for name in candidates if name in raw_frame.columns))
    frame = raw_frame[keep].copy()

    # Duplicate column names in the CSV make the selection a DataFrame; use its first column.
    texts = frame[text_col]
    texts = texts.iloc[:, 0] if isinstance(texts, pd.DataFrame) else texts
    has_text = texts.notna() & (texts.astype(str).str.strip() != "")
    frame = frame[has_text].copy()
    texts = texts[has_text]

    # Split after sentence-ending punctuation and keep only sentences that pass _is_clean.
    sentence_break = re.compile(r"(?<=[.!?])\s+")
    all_sentences = [
        piece
        for text in texts
        for piece in sentence_break.split(str(text))
        if _is_clean(piece)
    ]

    stats = {
        "papers": int(len(frame)),
        "sentences_after_filter": int(len(all_sentences)),
        "columns_used": [text_col],
        "run_config": run_config,
    }
    paths["sentences"].write_text(json.dumps(all_sentences, ensure_ascii=False))
    paths["stats"].write_text(json.dumps(stats, ensure_ascii=False))
    frame.to_csv(paths["papers"], index=False)
    return json.dumps(stats)
# =============================================================================
# TOOL 2 — run_bertopic_discovery
# threshold=0.35 → ~100 fine-grained clusters; IDs renumbered 1..N
# =============================================================================
@tool
def run_bertopic_discovery(top_n_topics: int = 100, run_config: str = "abstract") -> str:
    """Embed sentences with all-MiniLM-L6-v2, cluster with AgglomerativeClustering
    (cosine, threshold=0.35) targeting ~100 topics. Topic IDs are sequential 1..N.
    Args:
    top_n_topics: Target number of clusters (default 100).
    run_config: 'abstract' or 'title' (default 'abstract').
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool description.
    #
    # Reads  data/{run_config}/sentences.json
    # Writes data/{run_config}/emb.npy, summaries.json and charts.json
    # Returns a JSON string: either {"error": ...} / a zero-topic note when
    # nothing can be clustered, or the usual topics_found summary.
    p = _p(run_config)
    sentences = json.loads(p["sentences"].read_text())

    # Clustering cannot run on fewer than two samples; answer with a readable
    # message instead of letting the library error escape.
    if len(sentences) < 2:
        return json.dumps({
            "error": "Need at least 2 sentences to cluster, found {}.".format(len(sentences)),
            "topics_found": 0,
            "run_config": run_config,
        })

    embeddings = _EMBED_MODEL.encode(
        sentences, normalize_embeddings=True,
        show_progress_bar=False, batch_size=64
    )
    np.save(p["emb"], embeddings)
    clustering = AgglomerativeClustering(
        metric="cosine", linkage="average",
        distance_threshold=0.35, n_clusters=None,
    )
    labels = clustering.fit_predict(embeddings)

    # (label, size) pairs in ascending label order; the stable sort below
    # therefore breaks size ties by the smaller raw label.
    unique_labels, counts = np.unique(labels, return_counts=True)
    label_sizes = list(zip(unique_labels.tolist(), counts.tolist()))
    # Keep clusters with ≥3 sentences, sort by size desc, take top N
    label_filtered = [pair for pair in label_sizes if pair[1] >= 3]
    label_sorted = sorted(label_filtered, key=lambda pair: -pair[1])
    retained = [pair[0] for pair in label_sorted[:top_n_topics]]

    def build_summary(seq_label):
        """Summarise one cluster: its sentences, centroid and closest evidence."""
        seq_id, raw_label = seq_label
        mask = labels == raw_label
        cluster_embs = embeddings[mask]
        raw_sents = [sentences[i] for i in np.flatnonzero(mask).tolist()]
        clean_sents = list(filter(_is_clean, raw_sents))
        sents = clean_sents if clean_sents else raw_sents[:5]
        centroid = cluster_embs.mean(axis=0, keepdims=True)
        sims = cosine_similarity(centroid, cluster_embs)[0]
        # Up to five sentences nearest the centroid, most similar first.
        top5_idx = sims.argsort()[-5:][::-1].tolist()
        raw_top = [raw_sents[i] for i in top5_idx]
        clean_set = set(sents)
        top_evidence = [s for s in raw_top if s in clean_set][:5]
        top_evidence = top_evidence if top_evidence else raw_top[:3]
        return {
            "topic_id": seq_id,
            "size": int(mask.sum()),
            "top_evidence": top_evidence,
            "sentences": sents,
            "centroid": centroid[0].tolist(),
            "run_config": run_config,
        }

    # Sequential IDs starting at 1
    seq_pairs = list(enumerate(retained, start=1))
    summaries = list(map(build_summary, seq_pairs))
    p["summaries"].write_text(json.dumps(summaries, indent=2, ensure_ascii=False))

    # With no retained cluster the centroid matrix below would be a 1-D empty
    # array and reading its second dimension raised IndexError. Stop here
    # (summaries.json already holds the empty list; charts.json is left untouched).
    if not summaries:
        return json.dumps({
            "topics_found": 0,
            "run_config": run_config,
            "chart_types": [],
            "note": "No cluster reached the minimum size of 3 sentences, threshold=0.35",
        })

    sizes = [s["size"] for s in summaries]
    ids = [s["topic_id"] for s in summaries]
    fig1 = px.bar(x=ids, y=sizes, title="Topic Sizes — {}".format(run_config),
                  labels={"x": "Topic #", "y": "Sentences"})
    fig2 = px.histogram(x=sizes, nbins=30, title="Size Distribution — {}".format(run_config),
                        labels={"x": "Cluster Size"})
    centroids = np.array([s["centroid"] for s in summaries])
    # PCA cannot ask for more components than there are topics or dimensions.
    n_comp = min(2, centroids.shape[0], centroids.shape[1])
    coords = PCA(n_components=n_comp).fit_transform(centroids)
    fig3 = px.scatter(
        x=coords[:, 0],
        # A single component leaves no second axis; plot everything at y=0.
        y=(coords[:, 1] if coords.shape[1] > 1 else [0] * len(coords)),
        text=list(map(str, ids)),
        title="Topic Centroids PCA — {}".format(run_config),
        labels={"x": "PC1", "y": "PC2"},
    )
    fig4 = px.treemap(
        names=list(map(str, ids)), parents=["Topics"] * len(ids),
        values=sizes, title="Treemap — {}".format(run_config),
    )
    # Only the first fragment pulls plotly.js from the CDN; the others are
    # emitted without the library.
    charts = {
        "bar": fig1.to_html(full_html=False, include_plotlyjs="cdn"),
        "histogram": fig2.to_html(full_html=False, include_plotlyjs=False),
        "scatter": fig3.to_html(full_html=False, include_plotlyjs=False),
        "treemap": fig4.to_html(full_html=False, include_plotlyjs=False),
    }
    p["charts"].write_text(json.dumps(charts))
    return json.dumps({
        "topics_found": len(summaries),
        "run_config": run_config,
        "chart_types": list(charts.keys()),
        "note": "Topics numbered 1..{}, threshold=0.35".format(len(summaries)),
    })
# =============================================================================
# TOOL 3 — label_topics_with_llm
# =============================================================================
@tool
def label_topics_with_llm(batch_size: int = 15, run_config: str = "abstract") -> str:
    """Label topic clusters with human-readable names via Mistral LLM.
    Uses mistral-small-latest to stay within free-tier rate limits.
    Adds 12-second sleep between batches to avoid HTTP 429 errors.
    Args:
    batch_size: Topics per LLM call (default 15).
    run_config: 'abstract' or 'title' (default 'abstract').
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool description.
    #
    # Reads and rewrites data/{run_config}/summaries.json, adding "label" and
    # "reasoning" to each topic. Returns a JSON string with the counts.
    import time
    p = _p(run_config)
    summaries = json.loads(p["summaries"].read_text())
    # Cap at 60 to reduce total API calls — covers the most meaningful clusters
    # NOTE(review): only these 60 are written back below, so topics beyond the
    # cap disappear from summaries.json — confirm that is intended.
    top_summaries = summaries[:60]
    # mistral-small has higher RPM limits than mistral-large on the free tier
    llm = ChatMistralAI(model="mistral-small-latest", temperature=0.2)
    # A step below 1 would make range() raise; treat it as one topic per call.
    step = max(1, int(batch_size))
    batch_starts = list(range(0, len(top_summaries), step))

    def as_topic_id(value):
        """Return ``value`` as an int topic id, or None when it is not a whole number."""
        whole_float = isinstance(value, float) and value.is_integer()
        text = str(int(value)) if whole_float else str(value).strip()
        return int(text) if text.isdecimal() else None

    def label_batch(start):
        """Ask the LLM to label one slice of topics; return only well-formed entries."""
        batch = top_summaries[start: start + step]
        # Only 2 evidence sentences per topic to reduce token usage
        mini = list(map(
            lambda s: {"topic_id": s["topic_id"], "sentences": s["top_evidence"][:2]},
            batch
        ))
        topic_ids_in_batch = list(map(lambda s: s["topic_id"], batch))
        prompt = (
            "You are a thematic analysis expert in Information Systems research.\n"
            "For each topic cluster below, provide:\n"
            " - label: a specific 3-6 word academic theme name (e.g. 'Digital Transformation Barriers', "
            "'AI Adoption in Healthcare', 'IS Project Management Challenges')\n"
            " - reasoning: one sentence explaining why you chose that label\n\n"
            "IMPORTANT: You MUST return exactly one entry for each topic_id in this list: "
            + str(topic_ids_in_batch) + "\n\n"
            "TOPICS:\n" + json.dumps(mini, indent=2) + "\n\n"
            "Return ONLY a raw JSON array with no markdown fences. "
            "Each element must have exactly these three keys: "
            "topic_id (integer matching the input), label (string), reasoning (string)."
        )
        result = _call_llm_json(llm, prompt)
        # The model may answer with a lone object instead of an array, or mix
        # in non-object items; keep only the dict entries.
        candidates = result if isinstance(result, list) else [result]
        return [item for item in candidates if isinstance(item, dict)]

    # Sequential with sleep between batches — free tier ~5 req/min for mistral-small
    # 12 seconds between calls keeps us safely under the limit
    all_labels_raw = []
    for idx, start in enumerate(batch_starts):
        all_labels_raw.extend(label_batch(start))
        _ = time.sleep(12) if idx < len(batch_starts) - 1 else None

    # Key every entry by its integer topic id ("1" and 1 both map to 1).
    # Entries whose topic_id is missing or not a whole number land under None
    # and are discarded, instead of int() aborting the whole run.
    label_map = {as_topic_id(item.get("topic_id")): item for item in all_labels_raw}
    label_map.pop(None, None)

    def enrich(s):
        """Attach label/reasoning to a summary, falling back to 'Topic N'."""
        tid = s["topic_id"]
        info = label_map.get(as_topic_id(tid)) or {}
        raw_label = str(info.get("label", "")).strip()
        raw_reason = str(info.get("reasoning", "")).strip()
        good_label = (
            raw_label
            if raw_label and raw_label.lower() not in ("", "n/a", "none", "null")
            else "Topic {}".format(tid)
        )
        return {**s, "label": good_label, "reasoning": raw_reason}

    enriched = list(map(enrich, top_summaries))
    p["summaries"].write_text(json.dumps(enriched, indent=2, ensure_ascii=False))
    # Anything still carrying the "Topic N" fallback counts as unlabelled.
    labelled_count = sum(
        1 for s in enriched
        if s.get("label", "").strip() and not s["label"].startswith("Topic ")
    )
    return json.dumps({
        "labelled_topics": len(enriched),
        "with_llm_label": labelled_count,
        "run_config": run_config,
    })
# =============================================================================
# TOOL 4 — consolidate_into_themes
# =============================================================================
@tool
def consolidate_into_themes(approved_groups: str, run_config: str = "abstract") -> str:
    """Merge approved topic groups into themes and recompute centroids.
    Args:
    approved_groups: JSON list [{theme_name: str, topic_ids: [int,...]}]
    run_config: 'abstract' or 'title' (default 'abstract').
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool description.
    #
    # Reads  data/{run_config}/summaries.json
    # Writes data/{run_config}/themes.json
    # Returns a JSON string. Besides the original keys it now reports
    # "skipped_topic_ids" (ids that match no topic) and "skipped_themes"
    # (groups left with no valid topic), which previously raised KeyError.
    p = _p(run_config)
    groups = json.loads(approved_groups)
    summaries = json.loads(p["summaries"].read_text())
    id_map = {int(s["topic_id"]): s for s in summaries}

    def as_topic_id(value):
        """Return ``value`` as an int topic id, or None when it is not a whole number."""
        whole_float = isinstance(value, float) and value.is_integer()
        text = str(int(value)) if whole_float else str(value).strip()
        return int(text) if text.isdecimal() else None

    def resolve(group):
        """Split a group's requested ids into known topic ids and unknown leftovers."""
        requested = group["topic_ids"]
        parsed = [as_topic_id(raw) for raw in requested]
        return {
            "theme_name": group["theme_name"],
            "ids": [tid for tid in parsed if tid in id_map],
            "unknown": [raw for raw, tid in zip(requested, parsed) if tid not in id_map],
        }

    def build_theme(resolved):
        """Merge the member topics of one resolved group into a theme record."""
        members = [id_map[tid] for tid in resolved["ids"]]
        sents = [s for ms in members for s in ms.get("sentences", [])]
        centroids = np.array([ms["centroid"] for ms in members])
        return {
            "theme_name": resolved["theme_name"],
            "topic_ids": resolved["ids"],
            "sentences": sents,
            "centroid": centroids.mean(axis=0).tolist(),
            # NOTE(review): despite the key name this counts distinct sentences,
            # not papers — the summaries carry no paper identifier.
            "paper_count": len(set(sents)),
            "run_config": run_config,
        }

    resolved_groups = [resolve(group) for group in groups]
    # A group without any valid topic has no centroid to average, so it is
    # reported back instead of being written as a theme.
    themes = [build_theme(resolved) for resolved in resolved_groups if resolved["ids"]]
    p["themes"].write_text(json.dumps(themes, indent=2, ensure_ascii=False))
    return json.dumps({
        "themes_created": len(themes),
        "theme_names": [t["theme_name"] for t in themes],
        "run_config": run_config,
        "both_complete": _both_runs_complete(),
        "skipped_topic_ids": [raw for resolved in resolved_groups for raw in resolved["unknown"]],
        "skipped_themes": [r["theme_name"] for r in resolved_groups if not r["ids"]],
    })
# =============================================================================
# TOOL 5 — compare_with_taxonomy
# =============================================================================
@tool
def compare_with_taxonomy(run_config: str = "abstract") -> str:
    """Map themes to PAJAIS 25 categories via Mistral LLM.
    Args:
    run_config: 'abstract' or 'title' (default 'abstract').
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool description.
    #
    # Reads  data/{run_config}/themes.json
    # Writes data/{run_config}/taxonomy.json with the LLM's parsed reply, as-is.
    paths = _p(run_config)
    themes = json.loads(paths["themes"].read_text())
    llm = ChatMistralAI(model="mistral-small-latest", temperature=0.1)
    # Send only the theme name and its first two sentences to keep the prompt small.
    theme_briefs = [
        {"name": theme["theme_name"], "sample": theme["sentences"][:2]}
        for theme in themes
    ]
    template = (
        "You are a research classification expert in Information Systems.\n\n"
        "Map each theme to the single most relevant PAJAIS category.\n\n"
        "THEMES:\n{themes}\n\n"
        "PAJAIS CATEGORIES:\n{categories}\n\n"
        "Return ONLY a raw JSON array. "
        "Each element: name, pajais_category, confidence, rationale. "
        "No markdown, no explanation."
    )
    prompt = template.format(
        themes=json.dumps(theme_briefs, indent=2),
        categories=json.dumps(PAJAIS_CATEGORIES, indent=2),
    )
    mapping = _call_llm_json(llm, prompt)
    paths["taxonomy"].write_text(json.dumps(mapping, indent=2, ensure_ascii=False))
    outcome = {
        "mapped_themes": len(mapping),
        "run_config": run_config,
        "both_complete": _both_runs_complete(),
    }
    return json.dumps(outcome)
# =============================================================================
# TOOL 6 — generate_comparison_csv
# ONLY runs when BOTH abstract and title runs are complete
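# Columns: Title | Title Theme | Title PAJAIS Category | Abstract | Abstract Theme |
#          Abstract PAJAIS Category | Year | Source Journal | Theme Similarity |
#          Similarity % | Similarity Reasoning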
# =============================================================================
@tool
def generate_comparison_csv() -> str:
    """Generate Title | Abstract | Year | Source Journal comparison CSV.
    Only available after BOTH abstract and title runs have completed themes and taxonomy mapping.
    Saves to data/comparison.csv.
    """
    # The docstring doubles as the tool description; its second line now also
    # names the taxonomy prerequisite that the gate below enforces.
    #
    # Returns the worker's JSON summary when both runs are ready, otherwise a
    # plain status sentence telling the agent what is still missing.

    def run_ready(run_config):
        """A run is ready once its themes.json AND taxonomy.json both exist."""
        paths = _p(run_config)
        return paths["themes"].exists() and paths["taxonomy"].exists()

    # The worker reads taxonomy.json as well as themes.json for each run.
    # Checking only themes let it start and then fail with FileNotFoundError
    # when the taxonomy step had been skipped.
    abs_complete = run_ready("abstract")
    title_complete = run_ready("title")
    status_msg = (
        "Abstract complete: {}, Title complete: {}. "
        "Run 'run title' to complete the title analysis first."
    ).format(abs_complete, title_complete)
    # Use ternary to avoid if/else
    result = (
        _do_generate_comparison_csv()
        if (abs_complete and title_complete)
        else status_msg
    )
    return result
def _assign_theme_for_text(text: str, themes: list, taxonomy_map: dict) -> tuple:
    """Pick the theme whose centroid is most cosine-similar to ``text``.

    The text is embedded with the module-level model and compared against
    the ``centroid`` of every entry in ``themes``.

    Returns:
        A 3-tuple ``(theme_name, pajais_category, similarity)`` where
        ``pajais_category`` is looked up in ``taxonomy_map`` ("Unknown" when
        the theme has no entry) and ``similarity`` is rounded to 4 decimals.
    """
    query = _EMBED_MODEL.encode([str(text)], normalize_embeddings=True)[0].reshape(1, -1)
    theme_matrix = np.array([theme["centroid"] for theme in themes])
    scores = cosine_similarity(query, theme_matrix)[0]
    winner = int(scores.argmax())
    theme_name = themes[winner]["theme_name"]
    category = taxonomy_map.get(theme_name, "Unknown")
    return theme_name, category, float(round(scores[winner], 4))
def _do_generate_comparison_csv() -> str:
    """
    Build enriched comparison CSV with per-paper theme assignments for both runs.
    Columns:
    Title | Title Theme | Title PAJAIS Category |
    Abstract | Abstract Theme | Abstract PAJAIS Category |
    Year | Source Journal |
    Theme Similarity | Similarity % | Similarity Reasoning

    Reads data/uploaded.csv plus themes.json and taxonomy.json of both runs,
    writes data/comparison.csv, and returns a JSON summary string. One LLM
    call is made per distinct (abstract theme, title theme) pair.
    """
    import time

    df = safe_read_csv(DATA_DIR / "uploaded.csv")

    def find_column(matches):
        """First CSV column accepted by ``matches``, or None."""
        return next(filter(matches, df.columns), None)

    # Detect columns
    title_col = find_column(lambda c: c.strip().lower() == "title")
    abstract_col = find_column(lambda c: c.strip().lower() == "abstract")
    year_col = find_column(lambda c: c.strip().lower() == "year")
    journal_col = find_column(lambda c: "source" in c.lower())

    def load_run(run_config):
        """Return (themes, theme name → PAJAIS category) for one run."""
        paths = _p(run_config)
        themes = json.loads(paths["themes"].read_text())
        taxonomy = json.loads(paths["taxonomy"].read_text())
        # taxonomy.json holds the LLM reply unvalidated, so skip anything
        # that is not an object instead of failing on .get().
        tax_map = {
            item.get("name", item.get("theme_name", "")): item.get("pajais_category", "")
            for item in taxonomy
            if isinstance(item, dict)
        }
        return themes, tax_map

    abs_themes, abs_tax_map = load_run("abstract")
    title_themes, title_tax_map = load_run("title")
    abs_theme_names = [t["theme_name"] for t in abs_themes]
    title_theme_names = [t["theme_name"] for t in title_themes]

    # Assign themes per paper using centroid similarity; a missing column is
    # treated as a column of empty strings.
    abstracts = list(df[abstract_col].fillna("") if abstract_col else [""] * len(df))
    titles = list(df[title_col].fillna("") if title_col else [""] * len(df))
    abs_assignments = [
        _assign_theme_for_text(str(text), abs_themes, abs_tax_map) for text in abstracts
    ]
    title_assignments = [
        _assign_theme_for_text(str(text), title_themes, title_tax_map) for text in titles
    ]

    llm = ChatMistralAI(model="mistral-small-latest", temperature=0.1)
    # Get unique theme pairs — call LLM once per pair, not once per paper
    unique_pairs = list(set(
        (a[0], t[0]) for a, t in zip(abs_assignments, title_assignments)
    ))

    def as_similarity(result):
        """Normalise the LLM reply to a dict: unwrap a one-object list, else {}."""
        first = result[0] if isinstance(result, list) and result else result
        return first if isinstance(first, dict) else {}

    def get_similarity_reasoning(pair):
        """Ask the LLM how similar one (abstract theme, title theme) pair is."""
        abs_theme, title_theme = pair
        abs_pajais = abs_tax_map.get(abs_theme, "Unknown")
        title_pajais = title_tax_map.get(title_theme, "Unknown")
        prompt = (
            "Compare these two research themes and assess their similarity:\n"
            "Abstract Theme: {} (PAJAIS: {})\n"
            "Title Theme: {} (PAJAIS: {})\n\n"
            "Return ONLY a raw JSON object with three keys:\n"
            " similarity_label: one of High/Medium/Low\n"
            " similarity_pct: integer 0-100\n"
            " reasoning: one sentence explaining the similarity or difference\n"
            "No markdown, no explanation, just the JSON object."
        ).format(abs_theme, abs_pajais, title_theme, title_pajais)
        # Without this normalisation a reply such as [{...}] reached the row
        # builder as a list and .get() raised after all LLM calls had been made.
        return pair, as_similarity(_call_llm_json(llm, prompt))

    # Sequential with sleep to respect rate limits
    pair_results_raw = []
    for idx, pair in enumerate(unique_pairs):
        pair_results_raw.append(get_similarity_reasoning(pair))
        _ = time.sleep(8) if idx < len(unique_pairs) - 1 else None
    pair_map = dict(pair_results_raw)

    # Build output rows
    def build_row(idx):
        """Assemble the output record for the paper at row position ``idx``."""
        a_theme, a_pajais, _a_sim = abs_assignments[idx]
        t_theme, t_pajais, _t_sim = title_assignments[idx]
        sim_info = pair_map.get((a_theme, t_theme), {})
        return {
            "Title": titles[idx],
            "Title Theme": t_theme,
            "Title PAJAIS Category": t_pajais,
            "Abstract": abstracts[idx],
            "Abstract Theme": a_theme,
            "Abstract PAJAIS Category": a_pajais,
            "Year": str(df[year_col].iloc[idx]) if year_col else "",
            "Source Journal": str(df[journal_col].iloc[idx]) if journal_col else "",
            "Theme Similarity": sim_info.get("similarity_label", ""),
            "Similarity %": str(sim_info.get("similarity_pct", "")),
            "Similarity Reasoning": sim_info.get("reasoning", ""),
        }

    rows = [build_row(idx) for idx in range(len(df))]
    out_df = pd.DataFrame(rows)
    dest = DATA_DIR / "comparison.csv"
    # Written as UTF-8 with a byte-order mark ("utf-8-sig").
    out_df.to_csv(dest, index=False, encoding="utf-8-sig")
    return json.dumps({
        "rows": len(out_df),
        "columns": list(out_df.columns),
        "path": str(dest),
        "abstract_themes": abs_theme_names,
        "title_themes": title_theme_names,
        "note": "Enriched comparison CSV with per-paper theme + PAJAIS + similarity",
    })
# =============================================================================
# TOOL 7 — export_narrative
# ONLY runs when BOTH abstract and title runs are complete
# =============================================================================
@tool
def export_narrative() -> str:
    """Write a 500-word Section 7 narrative using themes from BOTH runs.
    Only available after BOTH abstract and title runs have completed taxonomy mapping.
    Saves to data/narrative.txt.
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool description.
    #
    # Gate: both taxonomy.json files must exist. Otherwise a status sentence
    # is returned instead of the worker's JSON summary.
    abstract_taxonomy = _p("abstract")["taxonomy"]
    title_taxonomy = _p("title")["taxonomy"]
    abstract_ready = abstract_taxonomy.exists()
    title_ready = title_taxonomy.exists()
    not_ready_msg = (
        "Narrative cannot be generated yet. "
        "Abstract taxonomy complete: {}. Title taxonomy complete: {}. "
        "Complete both runs through Phase 5.5 first."
    ).format(abstract_ready, title_ready)
    return _do_export_narrative() if (abstract_ready and title_ready) else not_ready_msg
def _do_export_narrative() -> str:
    """Ask the LLM for the Section 7 narrative and save it to data/narrative.txt.

    Inputs are themes.json and taxonomy.json of both runs. Only each theme's
    name and sentence count go into the prompt; the taxonomy mappings are
    included in full. Returns a JSON string with the word count and path.
    """
    abstract_paths = _p("abstract")
    abs_themes = json.loads(abstract_paths["themes"].read_text())
    title_themes = json.loads(_p("title")["themes"].read_text())
    abs_taxonomy = json.loads(abstract_paths["taxonomy"].read_text())
    title_taxonomy = json.loads(_p("title")["taxonomy"].read_text())
    llm = ChatMistralAI(model="mistral-small-latest", temperature=0.4)

    def brief(themes):
        """Reduce themes to name + number of sentences."""
        return [
            {"name": theme["theme_name"], "sentences": len(theme["sentences"])}
            for theme in themes
        ]

    template = (
        "You are an academic writing expert in Information Systems.\n\n"
        "Write Section 7 (Discussion and Thematic Synthesis) of a systematic "
        "literature review paper. Approximately 500 words, formal academic prose.\n"
        "Cover:\n"
        "(a) Overview of themes from abstract analysis\n"
        "(b) Overview of themes from title analysis\n"
        "(c) Comparison: what themes appear in both vs only one\n"
        "(d) PAJAIS taxonomy mapping and implications\n"
        "(e) Implications for IS research and practice\n"
        "(f) Limitations\n\n"
        "ABSTRACT THEMES:\n{abs_themes}\n\n"
        "TITLE THEMES:\n{title_themes}\n\n"
        "ABSTRACT PAJAIS MAPPING:\n{abs_mapping}\n\n"
        "TITLE PAJAIS MAPPING:\n{title_mapping}\n\n"
        "Write in continuous academic paragraphs. No bullet points or headers."
    )
    prompt = template.format(
        abs_themes=json.dumps(brief(abs_themes), indent=2),
        title_themes=json.dumps(brief(title_themes), indent=2),
        abs_mapping=json.dumps(abs_taxonomy, indent=2),
        title_mapping=json.dumps(title_taxonomy, indent=2),
    )
    narrative_text = llm.invoke([HumanMessage(content=prompt)]).content
    destination = DATA_DIR / "narrative.txt"
    destination.write_text(narrative_text, encoding="utf-8")
    summary = {
        "word_count": len(narrative_text.split()),
        "path": str(destination),
        "note": "Narrative combines both abstract and title run themes",
    }
    return json.dumps(summary)