comparIA's picture
Scale to 25K conversations with local embeddings + improved visualization
55b1789 verified
"""
Topic map visualization using UMAP projection of stored embeddings,
KMeans clustering, TF-IDF/Chi2 term extraction, and LLM-generated labels.
Cache format v2: pre-serialized Plotly figure, stratified sample, truncated
512D vectors for search. Zero computation at app startup.
"""
import json
import math
import pickle
import lancedb
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import umap
from openai import OpenAI
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
from config import (
LANCEDB_DIR,
OPENROUTER_API_KEY,
SEARCH_VECTOR_DIMS,
TABLE_NAME,
TOPIC_LABEL_MODEL,
TOPIC_MAP_CACHE,
TOPIC_MAP_MAX_DISPLAY,
TOPIC_NUM_CLUSTERS,
)
from search import _embed_query
# Module-level state shared by the search / highlight / record helpers below.
# It stays None until build_topic_map() has loaded a cache; the cache loaders
# then store a dict with the keys "plot_df", "search_texts", "vectors_norm",
# "sample_metadata", "n_total" and "n_display". search_topic_map() additionally
# writes "last_scores" so a later highlight can reuse the latest relevance scores.
_topic_map_data: dict | None = None
def _cluster_embeddings(vectors: np.ndarray, n_clusters: int) -> np.ndarray:
"""KMeans clustering on full-dimensional embeddings."""
# Keep clusters readable: 10-20 range regardless of dataset size
n_clusters = min(n_clusters, len(vectors) // 3)
n_clusters = max(n_clusters, 2)
kmeans = KMeans(n_clusters=n_clusters, n_init="auto", random_state=42)
return kmeans.fit_predict(vectors)
def _extract_cluster_terms(
    texts: list[str], labels: np.ndarray, top_n: int = 10
) -> dict[int, list[str]]:
    """Find the terms that set each cluster apart from all the others.

    One TF-IDF model (unigrams and bigrams, at most 5000 features) is fit on
    every text. A term's specificity for a cluster is its mean TF-IDF weight
    inside the cluster minus its mean weight outside it. For each cluster the
    ``top_n`` most specific terms are ranked best-first, and only those with a
    strictly positive specificity are kept.

    Returns:
        Mapping of cluster id to its list of distinctive terms (possibly empty).
    """
    vectorizer = TfidfVectorizer(
        max_features=5000,
        ngram_range=(1, 2),
        min_df=1,
        max_df=0.95,
    )
    weights = vectorizer.fit_transform(texts)
    vocabulary = vectorizer.get_feature_names_out()

    terms_by_cluster = {}
    for cluster_id in sorted(set(labels)):
        inside = labels == cluster_id
        outside = ~inside

        inside_mean = weights[inside].mean(axis=0).A1
        if outside.any():
            outside_mean = weights[outside].mean(axis=0).A1
        else:
            # Only one cluster exists: compare against an all-zero baseline.
            outside_mean = np.zeros_like(inside_mean)

        specificity = inside_mean - outside_mean
        ranked = specificity.argsort()[-top_n:][::-1]
        terms_by_cluster[cluster_id] = [
            vocabulary[k] for k in ranked if specificity[k] > 0
        ]
    return terms_by_cluster
def _keyword_label(cluster_id: int, terms: list[str]) -> str:
    """Build a label from the top three terms, or ``Topic <id>`` if there are none."""
    if not terms:
        return f"Topic {cluster_id}"
    return " & ".join(terms[:3])


def _generate_labels(
    cluster_terms: dict[int, list[str]],
    cluster_summaries: dict[int, list[str]],
) -> dict[int, str]:
    """Produce a short human-readable label for every cluster.

    When ``OPENROUTER_API_KEY`` is set, each cluster's top keywords and up to
    five sample summaries are sent to ``TOPIC_LABEL_MODEL`` through the
    OpenRouter endpoint. Whenever the LLM cannot be used (no key, request
    failure, empty reply) the label falls back to the top three keywords
    joined with " & ". Clusters without any keywords are always labelled
    ``Topic <id>``, so no cluster ever ends up with an empty label.

    Args:
        cluster_terms: Distinctive terms per cluster id.
        cluster_summaries: Sample conversation summaries per cluster id.

    Returns:
        Mapping of cluster id to label, with one entry per key of
        ``cluster_terms``.
    """
    if not OPENROUTER_API_KEY:
        return {k: _keyword_label(k, v) for k, v in cluster_terms.items()}

    client = OpenAI(base_url="https://openrouter.ai/api/v1", api_key=OPENROUTER_API_KEY)
    labels = {}
    for cluster_id, terms in cluster_terms.items():
        if not terms:
            # Nothing to describe to the model; skip the API call entirely.
            labels[cluster_id] = _keyword_label(cluster_id, terms)
            continue

        terms_str = ", ".join(terms[:10])
        summaries = cluster_summaries.get(cluster_id, [])
        samples_str = "\n".join(f"- {s}" for s in summaries[:5])
        try:
            resp = client.chat.completions.create(
                model=TOPIC_LABEL_MODEL,
                messages=[{
                    "role": "user",
                    "content": (
                        f"Keywords: {terms_str}\n\n"
                        f"Sample conversation summaries from this group:\n{samples_str}\n\n"
                        "Generate a SHORT, GENERAL topic label (2-4 words) that describes the broad theme of this group. "
                        "Be abstract and general — do NOT reference specific places, names, or niche details. "
                        "Reply with ONLY the label, nothing else."
                    ),
                }],
                max_tokens=30,
                temperature=0,
            )
            # Models sometimes wrap the answer in quotes; strip them.
            label = (resp.choices[0].message.content or "").strip().strip('"').strip("'")
        except Exception as e:
            # Deliberately broad: labelling is best-effort and one failed
            # request must not abort the whole ingest run.
            print(f" Cluster {cluster_id}: LLM failed ({e}), using keywords")
            labels[cluster_id] = _keyword_label(cluster_id, terms)
            continue

        if not label:
            label = _keyword_label(cluster_id, terms)
        labels[cluster_id] = label
        print(f" Cluster {cluster_id}: [{terms_str[:60]}...] -> {label}")
    return labels
def _repel_labels(
centroids: list[tuple[float, float]],
texts: list[str],
x_range: float,
y_range: float,
iterations: int = 80,
) -> list[tuple[float, float]]:
"""Push overlapping labels apart while keeping them near their centroids."""
n = len(centroids)
if n == 0:
return []
origins = np.array(centroids, dtype=float)
pos = origins.copy()
char_w = x_range / 80
label_h = y_range / 30
widths = np.array([len(t) * char_w for t in texts])
heights = np.full(n, label_h)
max_drift = min(x_range, y_range) * 0.3
for _ in range(iterations):
moved = False
for i in range(n):
for j in range(i + 1, n):
dx = pos[j, 0] - pos[i, 0]
dy = pos[j, 1] - pos[i, 1]
min_dx = (widths[i] + widths[j]) / 2
min_dy = (heights[i] + heights[j]) / 2
overlap_x = min_dx - abs(dx)
overlap_y = min_dy - abs(dy)
if overlap_x > 0 and overlap_y > 0:
moved = True
if overlap_x < overlap_y:
shift = overlap_x / 2 + char_w * 0.2
sign = 1 if dx >= 0 else -1
pos[i, 0] -= shift * sign
pos[j, 0] += shift * sign
else:
shift = overlap_y / 2 + label_h * 0.2
sign = 1 if dy >= 0 else -1
pos[i, 1] -= shift * sign
pos[j, 1] += shift * sign
drift = pos - origins
pos -= drift * 0.3
drift = pos - origins
dist = np.sqrt(drift[:, 0] ** 2 + drift[:, 1] ** 2)
too_far = dist > max_drift
if too_far.any():
scale = max_drift / dist[too_far]
pos[too_far] = origins[too_far] + drift[too_far] * scale[:, None]
if not moved:
break
return pos.tolist()
def _add_topic_traces(
    fig: go.Figure,
    plot_df: pd.DataFrame,
    topics: list,
    scores: np.ndarray | None,
) -> None:
    """Add one WebGL scatter trace per topic, styled by relevance when scores are given."""
    has_idx = "idx" in plot_df.columns
    for topic in topics:
        mask = plot_df["topic"] == topic
        subset = plot_df[mask]
        if scores is not None:
            topic_scores = scores[mask.values]
            # Exponent > 1 keeps low-relevance points faint while strong
            # matches approach full opacity.
            opacities = (0.06 + 0.94 * (topic_scores ** 2.5)).tolist()
            sizes = (6 + 10 * topic_scores).tolist()
        else:
            sizes = 10
            opacities = 0.8
        fig.add_trace(go.Scattergl(
            x=subset["x"],
            y=subset["y"],
            mode="markers",
            name=topic,
            text=subset["hover"],
            # customdata carries the display index of each point.
            customdata=subset["idx"].tolist() if has_idx else None,
            hovertemplate="%{text}<extra></extra>",
            marker=dict(
                size=sizes,
                opacity=opacities,
                line=dict(width=0.5, color="white"),
            ),
        ))


def _add_selection_marker(
    fig: go.Figure, plot_df: pd.DataFrame, selected_idx: int
) -> None:
    """Draw a hollow black ring around the point whose ``idx`` equals ``selected_idx``."""
    sel = plot_df[plot_df["idx"] == selected_idx]
    if len(sel) != 1:
        # Unknown (or ambiguous) index: nothing to highlight.
        return
    fig.add_trace(go.Scattergl(
        x=sel["x"],
        y=sel["y"],
        mode="markers",
        name="Selected",
        text=sel["hover"],
        hovertemplate="%{text}<extra></extra>",
        showlegend=False,
        marker=dict(
            size=20,
            color="rgba(255,255,255,0)",
            line=dict(width=3, color="black"),
            symbol="circle",
        ),
    ))


def _add_topic_annotations(
    fig: go.Figure, plot_df: pd.DataFrame, topics: list
) -> None:
    """Annotate each topic with a label pushed outward from the global centre."""
    global_cx = plot_df["x"].mean()
    global_cy = plot_df["y"].mean()
    x_range = plot_df["x"].max() - plot_df["x"].min()
    y_range = plot_df["y"].max() - plot_df["y"].min()
    # Distance by which every label is pushed away from the global centre.
    offset = min(x_range, y_range) * 0.18

    centroids = []
    label_anchors = []
    labels = []
    for topic in topics:
        subset = plot_df[plot_df["topic"] == topic]
        if len(subset) < 2:
            # Topics with a single point are not labelled.
            continue
        # Cluster centroid (mean position)
        cx = subset["x"].values.mean()
        cy = subset["y"].values.mean()
        centroids.append((cx, cy))

        dx = cx - global_cx
        dy = cy - global_cy
        # Guard against a zero-length direction vector.
        dist = max(math.sqrt(dx**2 + dy**2), 1e-6)
        label_anchors.append((cx + (dx / dist) * offset, cy + (dy / dist) * offset))
        labels.append(topic)

    label_positions = _repel_labels(
        label_anchors, labels,
        x_range, y_range,
        iterations=120,
    )
    for (cx, cy), (lx, ly), label in zip(centroids, label_positions, labels):
        # The arrow points at the centroid (x, y); the text sits at (ax, ay).
        # axref/ayref are set to the data axes so ax/ay are data coordinates.
        fig.add_annotation(
            x=cx, y=cy,
            ax=lx, ay=ly,
            text=f"<b>{label}</b>",
            showarrow=True,
            arrowhead=0,
            arrowwidth=1.2,
            arrowcolor="rgba(80,80,80,0.4)",
            axref="x", ayref="y",
            font=dict(size=11, color="#1e1e24"),
            bgcolor="rgba(255,255,255,0.85)",
            bordercolor="rgba(0,0,0,0.1)",
            borderwidth=0.5,
            borderpad=4,
        )


def _build_figure(
    plot_df: pd.DataFrame,
    scores: np.ndarray | None = None,
    selected_idx: int | None = None,
) -> go.Figure:
    """Build the topic-map Plotly figure.

    Args:
        plot_df: One row per displayed point with columns ``x``, ``y``,
            ``topic`` and ``hover``; an optional ``idx`` column is attached
            to the traces as ``customdata`` and used for selection.
        scores: Optional per-point relevance values aligned with the rows of
            ``plot_df``. When given, marker size and opacity grow with the
            score; otherwise all markers share a fixed style.
        selected_idx: Optional ``idx`` value to highlight with a black ring.
            Ignored when ``plot_df`` has no ``idx`` column or no single row
            matches.

    Returns:
        Figure with one trace per topic, an optional selection trace, one
        annotation per topic that has at least two points, and the layout.
    """
    if scores is not None:
        # Accept any array-like so boolean-mask indexing below always works.
        scores = np.asarray(scores)

    fig = go.Figure()
    unique_topics = sorted(plot_df["topic"].unique())

    _add_topic_traces(fig, plot_df, unique_topics, scores)
    if selected_idx is not None and "idx" in plot_df.columns:
        _add_selection_marker(fig, plot_df, selected_idx)
    # Label placement: push labels outward from global center, then repel overlaps
    _add_topic_annotations(fig, plot_df, unique_topics)

    fig.update_layout(
        title="Topic Map — Conversations clustered by semantic similarity",
        xaxis=dict(visible=False),
        yaxis=dict(visible=False),
        height=700,
        legend=dict(
            title="Topic",
            orientation="v",
            yanchor="top",
            y=1,
            xanchor="left",
            x=1.02,
        ),
        margin=dict(l=20, r=20, t=50, b=20),
        plot_bgcolor="white",
    )
    return fig
def _select_display_sample(
labels: np.ndarray, n_total: int, max_display: int
) -> np.ndarray:
"""Stratified random sample proportional to cluster size.
Returns indices into the original array.
"""
if n_total <= max_display:
return np.arange(n_total)
rng = np.random.RandomState(42)
unique_labels = np.unique(labels)
n_clusters = len(unique_labels)
min_per_cluster = max(5, max_display // (n_clusters * 3))
selected = []
for cluster_id in unique_labels:
cluster_indices = np.where(labels == cluster_id)[0]
# Proportional allocation
proportion = len(cluster_indices) / n_total
n_sample = max(min_per_cluster, int(proportion * max_display))
n_sample = min(n_sample, len(cluster_indices))
chosen = rng.choice(cluster_indices, size=n_sample, replace=False)
selected.append(chosen)
selected = np.concatenate(selected)
# If we overshot, trim back to max_display
if len(selected) > max_display:
selected = rng.choice(selected, size=max_display, replace=False)
return np.sort(selected)
def _compute_topic_map(
    texts: list[str],
    summaries: list[str],
    vectors: np.ndarray,
    metadata: list[dict],
    n_total: int,
    max_display: int,
) -> dict:
    """Run the full topic map pipeline: clustering, LLM labels, sampling, UMAP.

    Called by compute_and_cache_topic_map() (i.e. at ingest time, not at app
    startup). All rows are clustered and labelled; only a stratified sample
    of at most ``max_display`` rows is projected to 2-D and stored.

    Args:
        texts: Search text per row, used for term extraction and keyword search.
        summaries: Short summary per row; the first five non-empty ones of
            each cluster are shown to the labelling LLM.
        vectors: Embedding per row (2-D array, rows aligned with ``texts``).
        metadata: Lightweight metadata dict per row.
        n_total: Total number of rows.
        max_display: Upper bound on the number of points kept for display.

    Returns:
        The cache dict: row counts, topic labels, the sampled indices with
        their cluster labels, 2-D coordinates, L2-normalized truncated
        vectors, metadata and search texts, plus the serialized Plotly figure
        under ``"figure_json"``.
    """
    # 1. Cluster on full-dimensional embeddings (all rows)
    print(" Clustering embeddings...")
    labels = _cluster_embeddings(vectors, TOPIC_NUM_CLUSTERS)
    # 2. Extract distinctive terms per cluster
    print(" Extracting cluster terms...")
    cluster_terms = _extract_cluster_terms(texts, labels)
    # Collect sample summaries per cluster for LLM context: the first five
    # non-empty summaries encountered, each cut to 150 characters.
    cluster_summaries = {}
    for i, label in enumerate(labels):
        # Ensure every cluster has an entry even if all its summaries are empty.
        cluster_summaries.setdefault(int(label), [])
        # NOTE(review): assumes each summary is a str or None — confirm
        # against the table schema.
        s = (summaries[i] or "").strip()
        if s and len(cluster_summaries[int(label)]) < 5:
            cluster_summaries[int(label)].append(s[:150])
    # 3. Generate clean labels via LLM
    print(" Generating topic labels via LLM...")
    topic_labels = _generate_labels(cluster_terms, cluster_summaries)
    # 4. Select stratified display sample
    print(f" Selecting display sample (max {max_display} from {n_total})...")
    sample_indices = _select_display_sample(labels, n_total, max_display)
    n_display = len(sample_indices)
    print(f" Display sample: {n_display} points")
    sample_labels = labels[sample_indices]
    sample_vectors = vectors[sample_indices]
    # 5. Truncate sample vectors to SEARCH_VECTOR_DIMS (Matryoshka)
    # NOTE(review): keeping only the leading dimensions presumes the
    # embedding model supports prefix truncation — confirm.
    truncated_dim = min(SEARCH_VECTOR_DIMS, sample_vectors.shape[1])
    sample_vectors_trunc = sample_vectors[:, :truncated_dim].copy()
    # 6. UMAP on sample (truncated dims), in two stages: a cosine-metric
    # reduction to 4-D followed by a euclidean-metric reduction to 2-D.
    n_sample = len(sample_indices)
    print(f" Computing UMAP projection (stage 1: {truncated_dim}D -> 4D) on {n_sample} points...")
    # n_neighbors is capped below the number of sampled points.
    n_neighbors = min(100, n_sample - 1)
    stage1 = umap.UMAP(
        n_components=4,
        n_neighbors=n_neighbors,
        min_dist=0.0,
        metric="cosine",
        random_state=42,
    )
    intermediate = stage1.fit_transform(sample_vectors_trunc)
    print(" Computing UMAP projection (stage 2: 4D -> 2D)...")
    stage2 = umap.UMAP(
        n_components=2,
        n_neighbors=min(30, n_sample - 1),
        min_dist=0.5,
        spread=2.0,
        metric="euclidean",
        random_state=42,
    )
    coords = stage2.fit_transform(intermediate)
    # 7. L2-normalize truncated vectors for search, so that a dot product
    # with a unit-length query vector is a cosine similarity.
    norms = np.linalg.norm(sample_vectors_trunc, axis=1, keepdims=True)
    # Avoid division by zero for all-zero vectors.
    norms[norms == 0] = 1
    sample_vectors_norm = sample_vectors_trunc / norms
    # 8. Collect lightweight metadata for sample
    sample_metadata = [metadata[i] for i in sample_indices]
    # 9. Build hover texts and plot_df for figure
    hover_texts = []
    for meta in sample_metadata:
        # Prefer the summary, fall back to the opening message; show at most
        # 150 characters and mark truncation with an ellipsis.
        summary = (meta.get("short_summary") or meta.get("opening_msg") or "")[:150]
        full = meta.get("short_summary") or meta.get("opening_msg") or ""
        if len(full) > 150:
            summary += "..."
        models = f"{meta.get('model_a_name', '')} vs {meta.get('model_b_name', '')}"
        hover_texts.append(f"<b>{summary}</b><br>{models}")
    point_topics = [topic_labels[int(l)] for l in sample_labels]
    plot_df = pd.DataFrame({
        "x": coords[:, 0],
        "y": coords[:, 1],
        "topic": point_topics,
        "hover": hover_texts,
        # Display index: position within the sample, not the original row number.
        "idx": range(n_display),
    })
    # 10. Build and serialize Plotly figure
    print(" Building Plotly figure...")
    fig = _build_figure(plot_df)
    figure_json = fig.to_json()
    # 11. Collect search texts for sample
    sample_search_texts = [texts[i] for i in sample_indices]
    return {
        "n_rows_total": n_total,
        "n_display": n_display,
        "topic_labels": topic_labels,
        "sample_indices": sample_indices.tolist(),
        "sample_cluster_labels": sample_labels.tolist(),
        "sample_coords": coords.tolist(),
        "sample_vectors_norm": sample_vectors_norm,
        "sample_metadata": sample_metadata,
        "sample_search_texts": sample_search_texts,
        "figure_json": figure_json,
        # Keep for validation (same value as "n_rows_total"; _load_cache()
        # falls back to this key when "n_rows_total" is absent)
        "n_rows": n_total,
    }
def _save_cache(cache_data: dict) -> None:
    """Pickle the topic map cache to ``TOPIC_MAP_CACHE``.

    The data is first written to a sibling ``.tmp`` file and then moved into
    place, so an interrupted write never leaves a truncated cache at the
    final path.

    Args:
        cache_data: The cache dict to persist.

    Raises:
        OSError: If the temporary file cannot be written or moved.
    """
    import os  # local import: only this function needs it

    tmp_path = f"{TOPIC_MAP_CACHE}.tmp"
    with open(tmp_path, "wb") as f:
        pickle.dump(cache_data, f)
    # Rename over the destination only after the dump has fully succeeded.
    os.replace(tmp_path, TOPIC_MAP_CACHE)
    print(f" Topic map cache saved to {TOPIC_MAP_CACHE}")
def _load_cache() -> dict | None:
    """Load the pickled topic map cache from ``TOPIC_MAP_CACHE``.

    Returns:
        The cache dict, or ``None`` when the file does not exist or cannot be
        unpickled (for example because it was truncated). The unreadable case
        is reported on stdout so the cause is visible.
    """
    try:
        with open(TOPIC_MAP_CACHE, "rb") as f:
            # SECURITY: unpickling can execute arbitrary code. This is only
            # acceptable because the cache is a locally generated artifact;
            # never point TOPIC_MAP_CACHE at an untrusted file.
            cache = pickle.load(f)
    except FileNotFoundError:
        return None
    except (pickle.UnpicklingError, EOFError) as e:
        # A corrupt cache is as unusable as a missing one; let the caller
        # ask for a rebuild instead of crashing with a pickle error.
        print(f" Topic map cache at {TOPIC_MAP_CACHE} is unreadable ({e}); it must be rebuilt")
        return None

    print(f" Loaded topic map from cache ({cache.get('n_rows_total', cache.get('n_rows', '?'))} total rows, "
          f"{cache.get('n_display', '?')} displayed)")
    return cache
def build_topic_map() -> go.Figure:
    """Return the topic map figure from the on-disk cache.

    Nothing is computed here: the cache is loaded and handed to the loader
    matching its format, which also fills the module-level search state.

    Raises:
        RuntimeError: If no cache is available (run
            `python ingest.py --topic-cache` to create it).
    """
    cache = _load_cache()
    if cache is None:
        raise RuntimeError(
            "Topic map cache not found. Run `python ingest.py --topic-cache` first."
        )

    # A v2 cache carries a pre-serialized figure; anything else is legacy v1.
    loader = _load_v2_cache if "figure_json" in cache else _load_v1_cache
    return loader(cache)
def _load_v2_cache(cache: dict) -> go.Figure:
    """Restore the figure and the search state from a v2 cache.

    The figure is deserialized from ``cache["figure_json"]``. The per-point
    table is rebuilt from the cached coordinates, cluster labels and metadata
    and stored, together with the cached search texts and vectors, in the
    module-level ``_topic_map_data``.
    """
    global _topic_map_data

    total_rows = cache["n_rows_total"]
    shown_rows = cache["n_display"]

    # Deserialize the pre-built figure.
    figure = go.Figure(json.loads(cache["figure_json"]))

    xy = np.array(cache["sample_coords"])
    cluster_of_point = cache["sample_cluster_labels"]
    label_of_cluster = cache["topic_labels"]

    def hover_for(meta: dict) -> str:
        # Summary if present, else the opening message; 150 chars at most.
        text = meta.get("short_summary") or meta.get("opening_msg") or ""
        headline = text[:150]
        if len(text) > 150:
            headline += "..."
        matchup = f"{meta.get('model_a_name', '')} vs {meta.get('model_b_name', '')}"
        return f"<b>{headline}</b><br>{matchup}"

    hovers = [hover_for(meta) for meta in cache["sample_metadata"]]
    topics = [label_of_cluster[int(c)] for c in cluster_of_point]

    # Rebuilt so searches can re-render the figure with relevance scores.
    points = pd.DataFrame({
        "x": xy[:, 0],
        "y": xy[:, 1],
        "topic": topics,
        "hover": hovers,
        "idx": range(shown_rows),
    })

    _topic_map_data = {
        "plot_df": points,
        "search_texts": cache["sample_search_texts"],
        "vectors_norm": cache["sample_vectors_norm"],
        "sample_metadata": cache["sample_metadata"],
        "n_total": total_rows,
        "n_display": shown_rows,
    }
    print(f" Topic map ready: {shown_rows} points displayed (from {total_rows} total)")
    return figure
def _load_v1_cache(cache: dict) -> go.Figure:
    """Load legacy v1 cache format (full data, no pre-serialized figure).

    A v1 cache only stores cluster labels, topic labels and 2-D coordinates
    for every row, so the rows themselves are re-read from LanceDB and the
    figure is rebuilt. The module-level ``_topic_map_data`` is populated with
    every row (no sampling).

    Raises:
        RuntimeError: If the cached arrays no longer match the number of rows
            in the table, i.e. the cache is stale and must be rebuilt.
    """
    global _topic_map_data
    db = lancedb.connect(LANCEDB_DIR)
    table = db.open_table(TABLE_NAME)
    df = table.to_pandas()
    n_total = len(df)

    labels = np.array(cache["cluster_labels"])
    topic_labels = cache["topic_labels"]
    coords = np.array(cache["coords"])
    # The cache is positional: row i of the table must be point i of the
    # cache. Fail with a clear message instead of a pandas length error.
    if len(labels) != n_total or len(coords) != n_total:
        raise RuntimeError(
            f"Topic map cache is stale: it describes {len(coords)} rows but the "
            f"table has {n_total}. Run `python ingest.py --topic-cache` to rebuild it."
        )

    vectors = np.stack(df["vector"].values)

    hover_texts = []
    # Column-wise zip instead of iterrows(): same values, far less overhead.
    for short_summary, opening_msg, model_a, model_b in zip(
        df["short_summary"], df["opening_msg"], df["model_a_name"], df["model_b_name"]
    ):
        full = short_summary or opening_msg or ""
        summary = full[:150]
        if len(full) > 150:
            summary += "..."
        hover_texts.append(f"<b>{summary}</b><br>{model_a} vs {model_b}")

    point_topics = [topic_labels[l] for l in labels]
    plot_df = pd.DataFrame({
        "x": coords[:, 0],
        "y": coords[:, 1],
        "topic": point_topics,
        "hover": hover_texts,
        "idx": range(n_total),
    })

    # Truncate and normalize vectors for search
    truncated_dim = min(SEARCH_VECTOR_DIMS, vectors.shape[1])
    vectors_trunc = vectors[:, :truncated_dim]
    norms = np.linalg.norm(vectors_trunc, axis=1, keepdims=True)
    norms[norms == 0] = 1  # avoid dividing all-zero vectors by zero
    vectors_norm = vectors_trunc / norms

    # Build lightweight metadata from df
    meta_cols = ["id", "short_summary", "opening_msg", "model_a_name", "model_b_name",
                 "mode", "conv_turns", "primary_language", "keywords_str"]
    sample_metadata = df[meta_cols].to_dict("records")

    _topic_map_data = {
        "plot_df": plot_df,
        "search_texts": df["search_text"].tolist(),
        "vectors_norm": vectors_norm,
        "sample_metadata": sample_metadata,
        "n_total": n_total,
        "n_display": n_total,
    }
    return _build_figure(plot_df)
def compute_and_cache_topic_map():
    """Build the topic map from the LanceDB table and persist it as the cache.

    Entry point for `ingest.py --topic-cache`: reads the table, runs the
    clustering / labelling / sampling / UMAP pipeline, writes the resulting
    cache (including the pre-serialized Plotly figure) to disk and returns it.
    """
    table = lancedb.connect(LANCEDB_DIR).open_table(TABLE_NAME)

    # The whole table is materialized here; the pipeline inputs are then
    # picked out of the resulting frame.
    print(" Loading data from LanceDB...")
    frame = table.to_pandas()
    row_count = len(frame)
    print(f" Loaded {row_count} rows")

    embeddings = np.stack(frame["vector"].values)
    search_texts = frame["search_text"].tolist()
    short_summaries = frame["short_summary"].tolist()

    # Only lightweight fields are cached (no conversation JSON).
    meta_cols = ["id", "short_summary", "opening_msg", "model_a_name", "model_b_name",
                 "mode", "conv_turns", "primary_language", "keywords_str"]
    row_metadata = frame[meta_cols].to_dict("records")

    cache = _compute_topic_map(
        texts=search_texts,
        summaries=short_summaries,
        vectors=embeddings,
        metadata=row_metadata,
        n_total=row_count,
        max_display=TOPIC_MAP_MAX_DISPLAY,
    )
    _save_cache(cache)
    return cache
def search_topic_map(query: str) -> tuple[go.Figure, str]:
    """Search with combined semantic (vector) + keyword matching.

    Keyword matching is a case-insensitive substring test against each
    point's search text. Semantic matching embeds the query, takes its dot
    product with the stored unit vectors, subtracts the 80th-percentile
    similarity as a baseline and rescales the remainder to ``[0, 1]``.
    Keyword hits are then boosted to a score of at least 0.95. The scores are
    remembered in the module state so a later highlight can reuse them.

    Args:
        query: Free-text query. Blank input resets the map to its unscored
            view.

    Returns:
        ``(figure, status_text)``. Both are empty when no topic map has been
        loaded; the status is empty for a blank query.
    """
    if _topic_map_data is None:
        return go.Figure(), ""
    plot_df = _topic_map_data["plot_df"]
    n_total = _topic_map_data["n_total"]
    n_display = _topic_map_data["n_display"]
    if not query or not query.strip():
        _topic_map_data["last_scores"] = None
        return _build_figure(plot_df), ""

    search_texts = _topic_map_data["search_texts"]
    vectors_norm = _topic_map_data["vectors_norm"]
    query_lower = query.lower().strip()

    # 1. Keyword matches (substring). dtype=bool keeps the mask usable as a
    # boolean index even when there are no texts at all.
    keyword_matches = np.array(
        [query_lower in t.lower() for t in search_texts], dtype=bool
    )
    n_keyword = int(keyword_matches.sum())

    # 2. Semantic matches (cosine similarity on truncated vectors)
    try:
        query_vec = np.array(_embed_query(query))
        # Truncate query vector to match stored dimensions
        query_vec = query_vec[:vectors_norm.shape[1]]
        query_norm = np.linalg.norm(query_vec)
        if query_norm > 0:
            query_vec = query_vec / query_norm
        similarities = vectors_norm @ query_vec
        # Only points above the 80th percentile receive a non-zero score.
        baseline = np.percentile(similarities, 80)
        scores = np.clip(similarities - baseline, 0, None)
        score_max = scores.max()
        if score_max > 0:
            scores = scores / score_max
        else:
            scores = np.zeros_like(similarities)
        # Count semantic hits that are NOT also keyword hits. Subtracting
        # n_keyword from the semantic total would undercount whenever a
        # keyword hit scores below the semantic threshold.
        n_semantic = int(((scores > 0.05) & ~keyword_matches).sum())
    except Exception as e:
        # Best effort: embedding or scoring problems degrade to keyword-only
        # search instead of breaking the UI.
        print(f" Semantic search failed ({e}), using keyword only")
        scores = np.zeros(len(search_texts))
        n_semantic = 0

    # 3. Boost keyword matches
    scores[keyword_matches] = np.maximum(scores[keyword_matches], 0.95)

    # Build status text
    parts = []
    if n_keyword:
        parts.append(f"{n_keyword} keyword")
    if n_semantic:
        parts.append(f"~{n_semantic} semantic")
    match_text = f"**{' + '.join(parts)} match(es)**" if parts else "Showing relevance gradient"
    status = f"{match_text} for \"{query}\" (showing {n_display:,} of {n_total:,} conversations)"

    _topic_map_data["last_scores"] = scores
    return _build_figure(plot_df, scores=scores), status
def get_highlighted_figure(selected_idx: int) -> go.Figure:
    """Re-render the topic map with the given point ringed.

    The scores of the most recent search, if any, are reapplied so the
    relevance styling is kept. Returns an empty figure when no topic map has
    been loaded yet.
    """
    state = _topic_map_data
    if state is None:
        return go.Figure()
    return _build_figure(
        state["plot_df"],
        scores=state.get("last_scores"),
        selected_idx=selected_idx,
    )
def get_topic_map_record(idx: int) -> dict | None:
    """Get a record by display index.

    The full row (minus its ``vector`` column) is fetched from LanceDB by id.
    If the cached metadata has no id, the lookup finds nothing, or the lookup
    fails, a copy of the cached metadata is returned instead. Every returned
    dict carries ``"_row_idx"`` set to ``idx``, and the shared cached metadata
    is never modified.

    Args:
        idx: Display index of the point (position in the displayed sample).

    Returns:
        The record dict, or ``None`` when no topic map is loaded or ``idx``
        is out of range.
    """
    if _topic_map_data is None:
        return None
    sample_metadata = _topic_map_data["sample_metadata"]
    if not (0 <= idx < len(sample_metadata)):
        return None

    # Work on a copy so the cached metadata stays untouched.
    fallback = {**sample_metadata[idx], "_row_idx": idx}
    record_id = fallback.get("id")
    if not record_id:
        return fallback

    # Fetch full record from LanceDB (includes conversation JSON)
    try:
        db = lancedb.connect(LANCEDB_DIR)
        table = db.open_table(TABLE_NAME)
        # Double any single quote so an id containing one cannot break out
        # of the SQL string literal in the filter.
        safe_id = str(record_id).replace("'", "''")
        result = table.search().where(f"id = '{safe_id}'", prefilter=True).limit(1).to_pandas()
        if len(result) > 0:
            if "vector" in result.columns:
                result = result.drop(columns=["vector"])
            record = result.to_dict("records")[0]
            record["_row_idx"] = idx
            return record
    except Exception as e:
        # Best effort: a database problem degrades to the cached metadata.
        print(f" LanceDB fetch failed for id={record_id}: {e}")

    # Fallback to cached metadata
    return fallback
def get_topic_map_stats() -> tuple[int, int]:
    """Report how many points are displayed and how many rows exist in total.

    Returns:
        ``(n_display, n_total)`` for UI text, or ``(0, 0)`` while no topic
        map has been loaded.
    """
    state = _topic_map_data
    if state is not None:
        return state["n_display"], state["n_total"]
    return 0, 0