# bert-topic / tools.py
# Source: Hugging Face Space "bert-topic" by reyansh2005 (commit f19d5b6, "all agents").
# tools.py — Scientific Document Topic Analyzer
# Built on the Braun & Clarke (2006) Thematic Analysis Framework.
# Implementation: Zero-loop, zero-exception, functional-first logic.
from dotenv import load_dotenv
load_dotenv()
import re
import json
import os
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from langchain_core.tools import tool
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_mistralai import ChatMistralAI
from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering, DBSCAN
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import PCA
import nltk
# Initialize NLP resources
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)
from nltk.tokenize import sent_tokenize
# --- Global Configuration & Taxonomy ---
# Maps a tool-level run key ("abstract" / "title") to the CSV column(s)
# holding the text for that analysis pass; only the first entry is used.
COLUMN_MAP = {
    "abstract": ["Abstract"],
    "title": ["Title"],
}
# Sentence-transformers model ID used for all sentence embeddings.
EMBEDDING_MODEL_ID = "all-MiniLM-L6-v2"
# Number of exemplar sentences attached to each cluster centroid.
NEAREST_NEIGHBORS_K = 5
# Maximum number of topics sent to the LLM in one labeling request.
MAX_TOPIC_BATCH_SIZE = 60
# Hard cap on sentences embedded/clustered per run (memory guard).
SENTENCE_HARD_LIMIT = 3000
# Default cosine-distance cutoff for agglomerative clustering.
DEFAULT_CLUSTERING_THRESHOLD = 0.7
# Seconds before an LLM gateway request is abandoned.
LLM_GATEWAY_TIMEOUT = 120
# Regex patterns to filter out standard academic publishing noise
# (copyright lines, publisher names, DOIs, URLs, submission dates, ...).
# All are applied case-insensitively by _is_unwanted_metadata.
JUNK_TEXT_REGEXES = [
    r"©\s*\d{4}",
    r"elsevier\s*(b\.v\.)?",
    r"springer\s*(nature)?",
    r"wiley\s*(online\s*library)?",
    r"all\s+rights\s+reserved",
    r"published\s+by\s+[a-z\s]+",
    r"doi:\s*10\.",
    r"www\.[a-z]+\.[a-z]+",
    r"https?://",
    r"copyright\s*\d{4}",
    r"taylor\s*&\s*francis",
    r"sage\s+publications",
    r"emerald\s+publishing",
    r"journal\s+of\s+[a-z\s]+issn",
    r"volume\s+\d+,?\s+issue\s+\d+",
    r"pp\.\s*\d+[-–]\d+",
    r"received\s+\d+\s+\w+\s+\d{4}",
    r"accepted\s+\d+\s+\w+\s+\d{4}",
    r"available\s+online",
    r"this\s+is\s+an\s+open\s+access",
    r"creative\s+commons",
    r"please\s+cite\s+this\s+article",
]
# Reference taxonomy used by compare_with_taxonomy to align discovered
# themes with established PAJAIS research categories.
CATEGORY_HIERARCHY_PAJAIS = [
    "Artificial Intelligence Methods",
    "Natural Language Processing",
    "Machine Learning",
    "Deep Learning",
    "Knowledge Representation",
    "Ontologies & Semantic Web",
    "Information Retrieval",
    "Recommender Systems",
    "Decision Support Systems",
    "Human-Computer Interaction",
    "Explainability & Transparency",
    "Fairness, Accountability & Ethics",
    "Data Management & Integration",
    "Text Mining & Analytics",
    "Sentiment Analysis",
    "Social Media Analysis",
    "Business Intelligence",
    "Process Automation & RPA",
    "Computer Vision",
    "Speech & Audio Processing",
    "Multi-Agent Systems",
    "Robotics & Autonomous Systems",
    "Healthcare & Biomedical AI",
    "Finance & Risk Analytics",
    "Education & E-Learning",
]
# --- Internal Utility Logic ---
def _is_unwanted_metadata(text_segment: str) -> bool:
    """Return True when *text_segment* matches any academic-boilerplate pattern.

    Matching is case-insensitive against JUNK_TEXT_REGEXES (copyright lines,
    publisher names, DOIs, URLs, submission dates, ...).
    """
    # A generator expression short-circuits on the first hit and reads more
    # idiomatically than any(map(lambda ...)); re.search's Match/None result
    # is truthy/falsy, so no explicit bool() wrapper is needed under any().
    return any(re.search(pattern, text_segment, re.IGNORECASE)
               for pattern in JUNK_TEXT_REGEXES)
def _refine_sentence_list(raw_list: list) -> list:
    """Drop boilerplate matches and fragments shorter than six words.

    Args:
        raw_list: candidate sentence strings.

    Returns:
        The surviving sentences, in their original order.
    """
    # Single comprehension replaces two chained filter(lambda) passes that
    # each materialized an intermediate list; output is identical.
    return [
        sentence
        for sentence in raw_list
        if not _is_unwanted_metadata(sentence) and len(sentence.split()) >= 6
    ]
def _extract_sentences_from_corpus(text_blocks: list) -> list:
    """Split every text block into sentences and return one cleaned, flat list."""
    # Tokenize and flatten in a single nested comprehension, then strip
    # boilerplate/short fragments via the shared refinement helper.
    all_sentences = [sentence
                     for block in text_blocks
                     for sentence in sent_tokenize(block)]
    return _refine_sentence_list(all_sentences)
def _generate_vector_embeddings(sentence_list: list) -> np.ndarray:
    """Encode sentences into L2-normalized SBERT vectors.

    The SentenceTransformer model is loaded once and memoized on the function
    object, so repeated tool invocations in the same process do not pay the
    model-load cost (weight download/deserialization) on every call.

    Args:
        sentence_list: sentences to embed.

    Returns:
        Array of normalized embedding vectors, one row per sentence.
    """
    engine = getattr(_generate_vector_embeddings, "_engine", None)
    if engine is None:
        engine = SentenceTransformer(EMBEDDING_MODEL_ID)
        _generate_vector_embeddings._engine = engine
    return engine.encode(sentence_list, normalize_embeddings=True, show_progress_bar=False)
def _perform_hierarchical_clustering(vectors: np.ndarray, distance_cutoff: float) -> np.ndarray:
    """Assign each vector to a cluster via average-linkage agglomerative clustering.

    Uses cosine distance with an explicit distance threshold, letting the
    algorithm decide the cluster count (n_clusters=None).
    """
    clusterer = AgglomerativeClustering(
        n_clusters=None,
        distance_threshold=distance_cutoff,
        metric="cosine",
        linkage="average",
    )
    return clusterer.fit_predict(vectors)
def _perform_dbscan_clustering(vectors: np.ndarray, eps: float = 0.3, min_samples: int = 5) -> np.ndarray:
    """Run DBSCAN over the vectors using cosine distance.

    Points that belong to no dense region receive the noise label -1.
    """
    model = DBSCAN(metric="cosine", eps=eps, min_samples=min_samples)
    return model.fit_predict(vectors)
def _get_cluster_centroids(vectors: np.ndarray, group_labels: np.ndarray) -> dict:
"""Calculates the mean vector for each discovered cluster."""
active_groups = sorted(set(group_labels.tolist()) - {-1})
return dict(map(lambda g: (g, vectors[group_labels == g].mean(axis=0)), active_groups))
def _find_exemplary_sentences(midpoint_vector: np.ndarray, all_texts: list,
                              all_vectors: np.ndarray, top_k: int) -> list:
    """Return the top_k sentences most cosine-similar to a cluster centroid."""
    scores = cosine_similarity([midpoint_vector], all_vectors)[0]
    # argsort ascending, then reverse and trim to keep the best top_k.
    ranked_desc = np.argsort(scores)[::-1]
    return [all_texts[position] for position in ranked_desc[:top_k]]
def _assemble_cluster_summaries(group_labels: np.ndarray, text_source: list,
                                vector_source: np.ndarray) -> list:
    """Build one JSON-serializable summary dict per discovered cluster.

    Each summary carries the cluster id, its member count, the centroid as a
    plain list, and the sentences closest to that centroid.
    """
    midpoints = _get_cluster_centroids(vector_source, group_labels)
    summaries = []
    for cluster_id in sorted(midpoints):
        member_mask = group_labels == cluster_id
        summaries.append({
            "topic_id": cluster_id,
            "count": int(member_mask.sum()),
            "centroid": midpoints[cluster_id].tolist(),
            "nearest_sentences": _find_exemplary_sentences(
                midpoints[cluster_id], text_source, vector_source, NEAREST_NEIGHBORS_K),
        })
    return summaries
def _initialize_llm_client() -> ChatMistralAI:
    """Build the Mistral chat client shared by all labeling/synthesis tools."""
    # Low temperature for deterministic-ish coding output; no retries so
    # gateway failures surface immediately instead of stalling the pipeline.
    client_settings = {
        "model": "mistral-large-latest",
        "temperature": 0.2,
        "timeout": LLM_GATEWAY_TIMEOUT,
        "max_retries": 0,
    }
    return ChatMistralAI(**client_settings)
# --- Primary Analysis Tools ---
@tool
def load_scopus_csv(file_path: str) -> str:
    """
    Ingests a Scopus CSV, cleans the data, and prepares it for analysis.
    Saves 'loaded_data.csv' as a local cache.

    Args:
        file_path: path to the raw Scopus export.

    Returns:
        JSON string summarizing record counts, sentence counts, year range,
        column coverage and sample titles.
    """
    source_df = pd.read_csv(
        file_path,
        encoding="utf-8-sig",   # strips the BOM Scopus exports often carry
        quoting=0,              # csv.QUOTE_MINIMAL
        engine="python",
        on_bad_lines="skip",    # tolerate malformed rows rather than abort
    )
    source_df.to_csv("loaded_data.csv", index=False, encoding="utf-8")
    total_records = len(source_df)
    header_list = list(source_df.columns)
    raw_abstracts = list(source_df["Abstract"].dropna().astype(str)) if "Abstract" in header_list else []
    raw_titles = list(source_df["Title"].dropna().astype(str)) if "Title" in header_list else []
    processed_abstracts = _extract_sentences_from_corpus(raw_abstracts)
    processed_titles = _extract_sentences_from_corpus(raw_titles)
    publication_years = pd.to_numeric(source_df["Year"], errors="coerce").dropna() if "Year" in header_list else pd.Series([], dtype=float)
    # BUG FIX: min and max year were concatenated with no separator
    # (e.g. "20102020"); report an explicit range instead.
    period_string = f"{int(publication_years.min())}-{int(publication_years.max())}" if len(publication_years) > 0 else "N/A"
    return json.dumps({
        "papers": total_records,
        "abstract_sentences": len(processed_abstracts),
        "title_sentences": len(processed_titles),
        "year_range": period_string,
        "columns": header_list,
        "abstract_coverage_pct": round(len(raw_abstracts) / total_records * 100, 1) if total_records else 0,
        "title_coverage_pct": round(len(raw_titles) / total_records * 100, 1) if total_records else 0,
        "sample_titles": list(source_df["Title"].dropna().head(5)) if "Title" in header_list else [],
        "file_saved": "loaded_data.csv",
        "note": f"Clustering cap set to {SENTENCE_HARD_LIMIT} entries for efficiency.",
    }, indent=2)
@tool
def run_bertopic_discovery(run_key: str = "abstract", threshold: float = 0.7, method: str = "hierarchical") -> str:
    """
    Executes the BERTopic discovery logic: Embedding -> Clustering -> Visualization.
    Outputs interactive Plotly charts and cluster summaries.
    Supports both Hierarchical and DBSCAN clustering methods.

    Args:
        run_key: key into COLUMN_MAP ("abstract" or "title") selecting the
            CSV column to analyze; an unknown key raises KeyError.
        threshold: distance cutoff for hierarchical clustering; when
            method == "dbscan", the same value is reused as the DBSCAN eps.
        method: "dbscan" for density-based clustering; any other value
            falls through to hierarchical clustering.

    Returns:
        JSON string with topic/sentence counts, capping info and the names
        of all artifacts written to disk.

    Side effects:
        Reads 'loaded_data.csv' (produced by load_scopus_csv); writes
        emb_<run_key>.npy, summaries_<run_key>.json and four HTML charts.
    """
    cached_df = pd.read_csv("loaded_data.csv")
    target_col = COLUMN_MAP[run_key][0]
    unstructured_texts = list(cached_df[target_col].dropna().astype(str))
    global_sentence_pool = _extract_sentences_from_corpus(unstructured_texts)
    # Apply sentence limit to prevent memory overflow
    optimized_sentence_pool = global_sentence_pool[:SENTENCE_HARD_LIMIT]
    print(f"[Core Discovery] Processing {len(optimized_sentence_pool)} sentences from total pool of {len(global_sentence_pool)}.")
    semantic_vectors = _generate_vector_embeddings(optimized_sentence_pool)
    # Persist embeddings so later runs/tools can reuse them without re-encoding.
    np.save(f"emb_{run_key}.npy", semantic_vectors)
    if method == "dbscan":
        # `threshold` doubles as the DBSCAN eps radius here.
        cluster_assignments = _perform_dbscan_clustering(semantic_vectors, eps=threshold, min_samples=5)
    else:
        cluster_assignments = _perform_hierarchical_clustering(semantic_vectors, threshold)
    thematic_summaries = _assemble_cluster_summaries(cluster_assignments, optimized_sentence_pool, semantic_vectors)
    with open(f"summaries_{run_key}.json", "w") as storage_file:
        json.dump(thematic_summaries, storage_file, indent=2)
    entry_counts = [node["count"] for node in thematic_summaries]
    node_identifiers = [node["topic_id"] for node in thematic_summaries]
    centroid_stack = np.array([node["centroid"] for node in thematic_summaries])
    # Visual 1: Inter-topic mapping via PCA
    # NOTE(review): assumes at least one cluster was found; an empty summary
    # list would break the PCA/scatter calls below — confirm upstream guards.
    dimension_count = min(2, len(centroid_stack), centroid_stack.shape[1])
    reduced_coords = PCA(n_components=dimension_count).fit_transform(centroid_stack)
    dimension_x = reduced_coords[:, 0].tolist()
    # With a single centroid PCA yields one component; flatten onto y=0.
    dimension_y = (reduced_coords[:, 1].tolist() if reduced_coords.shape[1] > 1 else [0] * len(dimension_x))
    map_fig = px.scatter(
        x=dimension_x, y=dimension_y,
        size=entry_counts, text=list(map(str, node_identifiers)),
        title=f"Thematic Landscape ({run_key})",
        labels={"x": "Factor 1", "y": "Factor 2"},
        size_max=40, color=entry_counts, color_continuous_scale="Viridis",
    )
    map_fig.update_traces(textposition="top center")
    map_fig.update_layout(template="plotly_white")
    v_file_1 = f"chart_{run_key}_intertopic.html"
    # include_plotlyjs="cdn" keeps the HTML small by loading plotly.js remotely.
    map_fig.write_html(v_file_1, include_plotlyjs="cdn")
    # Visual 2: Sentence distribution bar chart
    top_nodes = thematic_summaries[:30]
    bar_fig = px.bar(
        x=list(map(lambda n: f"Topic {n['topic_id']}", top_nodes)),
        y=list(map(lambda n: n["count"], top_nodes)),
        title=f"Thematic Weight Distribution ({run_key}) — Top 30",
        labels={"x": "Theme ID", "y": "Sentence Count"},
        color=list(map(lambda n: n["count"], top_nodes)),
        color_continuous_scale="Aggrnyl",
    )
    bar_fig.update_layout(template="plotly_white")
    v_file_2 = f"chart_{run_key}_bars.html"
    bar_fig.write_html(v_file_2, include_plotlyjs="cdn")
    # Visual 3: Hierarchical Treemap (flat: every topic is a child of "Corpus")
    tree_fig = px.treemap(
        names=list(map(lambda n: f"ID:{n['topic_id']}", thematic_summaries)),
        parents=["Corpus"] * len(thematic_summaries),
        values=entry_counts,
        title=f"Topological Hierarchy ({run_key})",
    )
    tree_fig.update_layout(template="plotly_white")
    v_file_3 = f"chart_{run_key}_hierarchy.html"
    tree_fig.write_html(v_file_3, include_plotlyjs="cdn")
    # Visual 4: Semantic Connectivity Matrix (first 20 topics only)
    preview_nodes = thematic_summaries[:20]
    preview_vectors = np.array([n["centroid"] for n in preview_nodes])
    similarity_grid = cosine_similarity(preview_vectors).tolist()
    axis_labels = list(map(lambda n: f"T{n['topic_id']}", preview_nodes))
    heat_fig = go.Figure(data=go.Heatmap(z=similarity_grid, x=axis_labels, y=axis_labels, colorscale="YlGnBu"))
    heat_fig.update_layout(title=f"Semantic Proximity Heatmap ({run_key})", template="plotly_white")
    v_file_4 = f"chart_{run_key}_heatmap.html"
    heat_fig.write_html(v_file_4, include_plotlyjs="cdn")
    return json.dumps({
        "run_key": run_key,
        "total_topics": len(thematic_summaries),
        "total_sentences": len(global_sentence_pool),
        "sentences_used": len(optimized_sentence_pool),
        "sentences_capped": len(global_sentence_pool) > SENTENCE_HARD_LIMIT,
        "threshold_used": threshold,
        "summaries_file": f"summaries_{run_key}.json",
        "embeddings_file": f"emb_{run_key}.npy",
        "charts": [v_file_1, v_file_2, v_file_3, v_file_4],
        "topics_preview": thematic_summaries[:3],
    }, indent=2)
@tool
def label_topics_with_llm(run_key: str = "abstract") -> str:
    """
    Queries Mistral AI to provide human-readable labels and metadata for clusters.
    Uses batch processing to minimize API overhead and latency.

    Args:
        run_key: analysis pass whose summaries_<run_key>.json is labeled.

    Returns:
        JSON string with the labeled-topic count, output file name and a preview.

    Side effects:
        Reads summaries_<run_key>.json; writes labels_<run_key>.json.
    """
    with open(f"summaries_{run_key}.json", encoding="utf-8") as raw_json:
        cluster_list = json.load(raw_json)
    active_subset = cluster_list[:MAX_TOPIC_BATCH_SIZE]
    # Keep the LLM payload small: id, size and two exemplar sentences per topic.
    llm_payload = [
        {
            "topic_id": node["topic_id"],
            "count": node["count"],
            "sentences": node["nearest_sentences"][:2],
        }
        for node in active_subset
    ]
    llm_handler = _initialize_llm_client()
    json_interpreter = JsonOutputParser()
    label_prompt = PromptTemplate(
        # BUG FIX: the template references {total_count}, but it was missing
        # from input_variables.
        input_variables=["input_json", "total_count"],
        # BUG FIX: parts of the template used doubled escapes (\\n), sending
        # literal backslash-n text to the model instead of line breaks;
        # normalized to real newlines to match the rest of the template.
        template=(
            "You are a specialized thematic coder for academic literature.\n\n"
            "Analyze the following clusters discovered through BERTopic. "
            "For each cluster, derive a research-oriented label with AI Council-style reasoning.\n\n"
            "{input_json}\n\n"
            "Respond ONLY with a JSON array containing these keys for each entry:\n"
            " topic_id (int), label (3-6 words), category (methodology/theory/application/context/empirical), "
            " confidence (float), reasoning (object with keys: method, data, impact), niche (bool).\n\n"
            "Reasoning structure (use brief, focused explanations):\n"
            " method: Explain the methodological or theoretical lens applied to this cluster (1-2 sentences)\n"
            " data: Describe the empirical patterns or evidence supporting this grouping (1-2 sentences)\n"
            " impact: Articulate the research or practice implications of this theme (1-2 sentences)\n\n"
            "Generate entries for ALL {total_count} topics provided."
        ),
    )
    inference_chain = label_prompt | llm_handler | json_interpreter
    ai_response = inference_chain.invoke({
        "input_json": json.dumps(llm_payload, indent=2),
        "total_count": len(active_subset),
    })
    # Map AI results back to the original database (string keys so int/str ids match).
    response_directory = {str(item["topic_id"]): item for item in ai_response}

    def _format_reasoning(reasoning_obj):
        """Flatten the {method, data, impact} reasoning object into one string."""
        if isinstance(reasoning_obj, dict):
            parts = [f"{key.capitalize()}: {reasoning_obj[key]}"
                     for key in ("method", "data", "impact") if key in reasoning_obj]
            return " | ".join(parts)
        return str(reasoning_obj) if reasoning_obj else ""

    def _merge(original):
        """Overlay AI metadata onto one original cluster record, with fallbacks."""
        # Single lookup per topic instead of five repeated .get(str(...), {}) calls.
        ai_entry = response_directory.get(str(original["topic_id"]), {})
        return {
            "topic_id": original["topic_id"],
            "count": original["count"],
            "nearest_sentences": original["nearest_sentences"],
            "label": ai_entry.get("label", f"Concept Group {original['topic_id']}"),
            "category": ai_entry.get("category", "application"),
            "confidence": ai_entry.get("confidence", 0.5),
            "reasoning": _format_reasoning(ai_entry.get("reasoning", "")),
            "niche": ai_entry.get("niche", False),
        }

    final_labels = [_merge(node) for node in active_subset]
    export_path = f"labels_{run_key}.json"
    # Explicit encoding for portability (the read side already uses utf-8).
    with open(export_path, "w", encoding="utf-8") as out_file:
        json.dump(final_labels, out_file, indent=2)
    return json.dumps({
        "run_key": run_key,
        "total_labelled": len(final_labels),
        "output_file": export_path,
        "preview": final_labels[:5],
    }, indent=2)
@tool
def consolidate_into_themes(run_key: str = "abstract", theme_map: str = "") -> str:
    """
    Groups individual topic clusters into broader research themes.
    Employs LLM-driven synthesis if no manual mapping is provided.

    Args:
        run_key: analysis pass whose labels_<run_key>.json is consolidated.
        theme_map: optional JSON string of {"Theme Name": [topic_id, ...]};
            when non-empty it overrides the LLM synthesis entirely.

    Returns:
        JSON string with the theme count, output file name and a preview.

    Side effects:
        Reads labels_<run_key>.json; writes themes_<run_key>.json plus a
        canonical copy 'themes.json' (consumed by export_narrative).
    """
    with open(f"labels_{run_key}.json", encoding="utf-8") as raw_data:
        labeled_topics = json.load(raw_data)
    # String-keyed lookup so both int and str topic ids resolve uniformly.
    topic_lookup_table = {str(t["topic_id"]): t for t in labeled_topics}
    manual_theme_design = json.loads(theme_map) if theme_map.strip() else {}
    def _build_from_manual(name_id_pair):
        # Assemble one theme dict from a ("Theme Name", [ids]) item of the manual map.
        theme_title, topic_id_list = name_id_pair
        matching_topics = list(filter(lambda t: str(t["topic_id"]) in map(str, topic_id_list), labeled_topics))
        aggregate_docs = sum(map(lambda t: t["count"], matching_topics))
        # Up to five exemplar sentences pooled across the member topics.
        sample_quotes = [s for t in matching_topics for s in t.get("nearest_sentences", [])][:5]
        return {
            "theme_name": theme_title,
            "topic_ids": list(map(int, topic_id_list)),
            "total_sentences": aggregate_docs,
            "representative_sentences": sample_quotes,
            "constituent_labels": list(map(lambda t: t.get("label", ""), matching_topics)),
        }
    def _build_from_intelligence():
        # Ask the LLM to synthesize 4-8 broad themes from the labeled topics.
        llm_client = _initialize_llm_client()
        json_output_mod = JsonOutputParser()
        synthesis_prompt = PromptTemplate(
            input_variables=["topic_definitions"],
            template=(
                "You are performing Phase 3 & 4 of thematic analysis (Braun & Clarke).\n\n"
                "Data Clusters:\n{topic_definitions}\n\n"
                "Consolidate these into 4-8 broad research themes.\n"
                "Format: JSON array of objects with theme_name, topic_ids (list), rationale, representative_sentences (list).\n"
            ),
        )
        flow = synthesis_prompt | llm_client | json_output_mod
        # Compact payload: id, label and a 100-char sample sentence per topic.
        compact_definitions = list(map(
            lambda t: {"topic_id": t["topic_id"], "label": t.get("label", ""), "sample": t.get("nearest_sentences", [""])[0][:100]},
            labeled_topics[:MAX_TOPIC_BATCH_SIZE],
        ))
        generated_themes = flow.invoke({"topic_definitions": json.dumps(compact_definitions, indent=2)})
        # Enrich the LLM output with counts/labels computed from local data,
        # never trusting the model to report sizes correctly.
        return list(map(
            lambda th: {
                **th,
                "total_sentences": sum(map(lambda tid: topic_lookup_table.get(str(tid), {}).get("count", 0), th.get("topic_ids", []))),
                "constituent_labels": list(map(lambda tid: topic_lookup_table.get(str(tid), {}).get("label", ""), th.get("topic_ids", []))),
            },
            generated_themes,
        ))
    # Manual mapping wins when provided; otherwise fall back to the LLM.
    final_thematic_set = (
        list(map(_build_from_manual, manual_theme_design.items()))
        if manual_theme_design
        else _build_from_intelligence()
    )
    theme_store_1 = f"themes_{run_key}.json"
    with open(theme_store_1, "w", encoding="utf-8") as f1:
        json.dump(final_thematic_set, f1, indent=2)
    # Canonical copy consumed by downstream tools regardless of run_key.
    with open("themes.json", "w", encoding="utf-8") as f_canonical:
        json.dump(final_thematic_set, f_canonical, indent=2)
    return json.dumps({
        "run_key": run_key,
        "total_themes": len(final_thematic_set),
        "output_file": theme_store_1,
        "themes_preview": [{"name": th["theme_name"], "size": th.get("total_sentences", 0)} for th in final_thematic_set],
    }, indent=2)
@tool
def compare_with_taxonomy(run_key: str = "abstract") -> str:
    """
    Aligns discovered themes with the PAJAIS research taxonomy.
    Flags 'NOVEL' themes that represent potential scientific gaps.

    Args:
        run_key: analysis pass whose themes_<run_key>.json is aligned; falls
            back to the canonical 'themes.json' if that file is absent.

    Returns:
        JSON string with mapped/novel counts and the full alignment detail.

    Side effects:
        Reads the themes file; writes 'taxonomy_map.json'.
    """
    # Prefer the run-specific themes file; fall back to the canonical copy.
    specific_themes_file = f"themes_{run_key}.json"
    active_themes_file = specific_themes_file if os.path.exists(specific_themes_file) else "themes.json"
    with open(active_themes_file, encoding="utf-8") as theme_io:
        theme_collection = json.load(theme_io)
    llm_bridge = _initialize_llm_client()
    json_processor = JsonOutputParser()
    alignment_prompt = PromptTemplate(
        input_variables=["theme_input", "taxonomy_str"],
        template=(
            "You are a taxonomy alignment specialist.\n\n"
            "Official Categories:\n{taxonomy_str}\n\n"
            "User Themes:\n{theme_input}\n\n"
            "Map each theme to the closest official category. If it is a completely new direction, mark as NOVEL.\n"
            "Format: JSON array with theme_name, pajais_match, match_confidence, reasoning, is_novel.\n"
        ),
    )
    mapping_chain = alignment_prompt | llm_bridge | json_processor
    # Compact per-theme payload: name, member labels and a 100-char evidence snippet.
    theme_metadata = list(map(
        lambda t: {
            "theme_name": t["theme_name"],
            "constituent_labels": t.get("constituent_labels", []),
            "evidence": (t.get("representative_sentences", [""])[0][:100] if t.get("representative_sentences") else ""),
        },
        theme_collection,
    ))
    alignment_results = mapping_chain.invoke({
        "theme_input": json.dumps(theme_metadata, indent=2),
        "taxonomy_str": "\n".join(f"- {cat}" for cat in CATEGORY_HIERARCHY_PAJAIS),
    })
    with open("taxonomy_map.json", "w", encoding="utf-8") as map_io:
        json.dump(alignment_results, map_io, indent=2)
    # Count themes the LLM flagged as having no taxonomy counterpart.
    novel_count = sum(1 for entry in alignment_results if entry.get("is_novel", False))
    return json.dumps({
        "run_key": run_key,
        "total_mapped": len(alignment_results),
        "novel_entries": novel_count,
        "standard_entries": len(alignment_results) - novel_count,
        "mapping_file": "taxonomy_map.json",
        "detailed_mapping": alignment_results,
    }, indent=2)
@tool
def generate_comparison_csv() -> str:
    """
    Aggregates results from Abstract and Title analyses into a single comparative report.

    Reads themes_abstract.json and themes_title.json (either may be absent),
    writes 'comparison.csv', and returns a JSON summary with a row preview.
    """
    def _read_theme_data(key):
        """Load themes_<key>.json if present; a missing run yields []."""
        path = f"themes_{key}.json"
        if not os.path.exists(path):
            return []
        # BUG FIX: the original open(path).read() never closed the handle;
        # a context manager guarantees release even on parse errors.
        with open(path, encoding="utf-8") as theme_io:
            return json.load(theme_io)
    abstract_run_data = _read_theme_data("abstract")
    title_run_data = _read_theme_data("title")
    # Pad the shorter run with empty dicts so rows pair up one-to-one;
    # the minimum of 1 guarantees at least one (possibly empty) row.
    max_count = max(len(abstract_run_data), len(title_run_data), 1)
    abs_padded = abstract_run_data + [{}] * (max_count - len(abstract_run_data))
    ttl_padded = title_run_data + [{}] * (max_count - len(title_run_data))
    comparative_rows = [
        {
            "ID": idx + 1,
            "Abstract Theme": abs_theme.get("theme_name", ""),
            "Abstract Count": abs_theme.get("total_sentences", 0),
            "Title Theme": ttl_theme.get("theme_name", ""),
            "Title Count": ttl_theme.get("total_sentences", 0),
            # Crude alignment heuristic: first five characters, case-insensitive.
            "Consistency": "Matched" if str(abs_theme.get("theme_name", ""))[:5].lower() == str(ttl_theme.get("theme_name", ""))[:5].lower() else "Distinct",
        }
        for idx, (abs_theme, ttl_theme) in enumerate(zip(abs_padded, ttl_padded))
    ]
    report_df = pd.DataFrame(comparative_rows)
    report_df.to_csv("comparison.csv", index=False)
    return json.dumps({
        "result_file": "comparison.csv",
        "row_count": len(report_df),
        "data_peek": comparative_rows[:3],
    }, indent=2)
@tool
def export_narrative(run_key: str = "abstract") -> str:
    """
    Generates a formal research narrative based on the thematic analysis results.
    Produces a 500-word Section 7 draft.

    Args:
        run_key: label of the analysis run, passed to the LLM as context.

    Returns:
        JSON string with the output file name, word count and a 400-char preview.

    Side effects:
        Reads 'themes.json' (required) and 'taxonomy_map.json' (optional);
        writes 'narrative.txt'.
    """
    with open("themes.json", encoding="utf-8") as t_in:
        thematic_data = json.load(t_in)
    # BUG FIX: taxonomy_map.json was read via a bare open().read() that never
    # closed its handle; use a context manager and default to an empty list.
    mapping_data = []
    if os.path.exists("taxonomy_map.json"):
        with open("taxonomy_map.json", encoding="utf-8") as m_in:
            mapping_data = json.load(m_in)
    narrative_llm = _initialize_llm_client()
    # Warmer sampling than the labeling calls, for more natural prose.
    narrative_llm.temperature = 0.4
    narrative_prompt = PromptTemplate(
        input_variables=["key", "themes", "mapping"],
        template=(
            "Write a formal academic Section 7 Discussion (approx 500 words).\n"
            "Context: {key} analysis run.\n"
            "Themes Found:\n{themes}\n\n"
            "Taxonomy Alignment:\n{mapping}\n\n"
            "Requirements:\n"
            "1. Discuss the methodology (BERTopic + Braun & Clarke).\n"
            "2. Interpret the key themes and their implications.\n"
            "3. Analyze the NOVEL vs MAPPED categories.\n"
            "4. Suggest future work. Use professional, scholarly language.\n"
        ),
    )
    composition_flow = narrative_prompt | narrative_llm
    story_response = composition_flow.invoke({
        "key": run_key,
        "themes": json.dumps(thematic_data, indent=2),
        "mapping": json.dumps(mapping_data, indent=2),
    })
    # Chat models return a message object; fall back to str() for plain output.
    final_text = story_response.content if hasattr(story_response, "content") else str(story_response)
    with open("narrative.txt", "w", encoding="utf-8") as narrative_io:
        narrative_io.write(final_text)
    return json.dumps({
        "output_file": "narrative.txt",
        "word_stats": len(final_text.split()),
        "content_start": final_text[:400],
    }, indent=2)