import os, re, json, io, math, gc, uuid
from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from bs4 import BeautifulSoup
import ftfy
from langdetect import detect, DetectorFactory
DetectorFactory.seed = 0
import gradio as gr
from tqdm import tqdm
# sklearn (CPU-friendly)
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer as CharTfidf
from sklearn.cluster import MiniBatchKMeans
from sklearn.neighbors import NearestNeighbors
from sklearn.decomposition import TruncatedSVD
from sklearn.preprocessing import Normalizer
from sklearn.preprocessing import normalize as sk_normalize
from sklearn.metrics.pairwise import cosine_similarity
# === NEW / UPDATED IMPORTS ===
from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer
from scipy.sparse import csr_matrix
try:
import hdbscan # OPTIONAL (pip install hdbscan)
HDBSCAN_OK = True
except Exception:
HDBSCAN_OK = False
# Optional tiny/fast word vectors via Gensim (local .txt/.vec/.bin)
try:
from gensim.models import KeyedVectors # OPTIONAL
GENSIM_OK = True
except Exception:
GENSIM_OK = False
# Optional light anomaly detection
try:
from sklearn.ensemble import IsolationForest
ISO_OK = True
except Exception:
ISO_OK = False
from scipy.sparse import hstack
# Optional fast ANN (CPU)
try:
import faiss # faiss-cpu on HF Space
FAISS_OK = True
except Exception:
FAISS_OK = False
# Optional tiny sentiment
try:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
VADER_OK = True
except Exception:
VADER_OK = False
# ======== STAGE-1 TAXONOMY (Buckets) ========
# Keyword lexicon for the stage-1 router (route_email_row): each bucket maps
# to lowercase trigger terms matched as substrings of "subject + body".
# "Other" is the catch-all and deliberately has no trigger terms.
TAXONOMY = {
    "Lobbyist": ["lobby","lobbyist","pac","influence"],
    "Campaign Finance": ["donation","contribution","fundraiser","pledge","campaign finance","pac"],
    "Procurement": ["contract","tender","rfp","rfq","bid","invoice","vendor","purchase order","po"],
    "HR/Admin": ["hiring","personnel","payroll","benefits","policy","vacation","pto"],
    "Constituent": ["constituent","concerned citizen","my issue","complaint","community"],
    "Scheduling": ["schedule","meeting","appointment","calendar","invite","availability","reschedule"],
    "Legal": ["legal","lawsuit","intake","attorney","counsel","privileged","court","subpoena","confidential"],
    "IT/Security": ["password","account security","two-factor","2fa","vpn","verification code","security alert","it support"],
    "Newsletters/Alerts": ["newsletter","daily briefing","news update","unsubscribe","press clip","digest"],
    "Other": [],
}
# header/domain cues (expand as you learn)
LOBBY_DOMAINS = set()  # sender domains treated as lobbyists, e.g., {"acme-lobby.com"}
LEGAL_DOMAINS = set()  # sender domains treated as legal, e.g., {"biglaw.com","firmlaw.com"}
def _contains_any(text: str, terms: list) -> bool:
if not text or not terms: return False
tl = text.lower()
return any(t for t in terms if t and t.lower() in tl)
def _bucket_header_bonus(row: pd.Series, bucket: str) -> float:
    """Additive routing bonus for one taxonomy bucket based on header cues.

    Inspects sender domain, subject, body text and attachment names; returns
    0.0 when no cue fires.  Used by route_email_row() on top of keyword hits.
    """
    domain = (row.get("from_domain") or "").lower()
    subject = (row.get("subject") or "")
    if bucket == "Newsletters/Alerts":
        return 5.0 if is_news_like(subject, row.get("body_text", ""), domain) else 0.0
    if bucket == "IT/Security":
        return 5.0 if is_notification_like(subject, row.get("body_text", ""),
                                           row.get("from_email", ""), domain) else 0.0
    if bucket == "Constituent":
        # private-citizen mail tends to come from personal webmail domains
        return 3.0 if domain in PERSONAL_DOMAINS else 0.0
    if bucket == "Lobbyist":
        return 5.0 if domain in LOBBY_DOMAINS else 0.0
    if bucket == "Legal":
        legal_cue = ("law" in domain) or (domain in LEGAL_DOMAINS) or ("privileged" in subject.lower())
        return 5.0 if legal_cue else 0.0
    if bucket == "Scheduling":
        body = (row.get("body_text") or "")
        attach_blob = " ".join(row.get("attachments") or [])
        sched_cue = (ATTACH_NAME_RE.search(attach_blob)
                     or re.search(r"\binvitation\b|\binvite\b", subject, re.I)
                     or re.search(r"\.ics\b", body, re.I))
        return 3.0 if sched_cue else 0.0
    return 0.0
MIN_ROUTE_SCORE = 1.5  # minimum winning score before we trust a bucket
TIE_MARGIN = 1.0       # winner must beat the runner-up by this much
def route_email_row(row: pd.Series) -> str:
    """Route one email row to a TAXONOMY bucket.

    Scores every bucket by keyword hits plus header bonuses and returns the
    winner; returns "Other" when the top score is weak or nearly tied.
    """
    haystack = f'{row.get("subject","")} {row.get("body_text","")}'.lower()
    scores = dict.fromkeys(TAXONOMY, 0.0)
    for bucket, terms in TAXONOMY.items():
        if not terms:
            continue
        scores[bucket] += float(sum(1 for t in terms if t and t.lower() in haystack))
        # corruption phrasing nudges the money-adjacent buckets
        if bucket in ("Lobbyist", "Procurement") and any(p in haystack for p in SUSPECT_PHRASES):
            scores[bucket] += 1.0
    for bucket in scores:
        scores[bucket] += _bucket_header_bonus(row, bucket)
    winner, top = max(scores.items(), key=lambda kv: kv[1])
    runner_up = sorted(scores.values(), reverse=True)[1] if len(scores) > 1 else 0.0
    if top < MIN_ROUTE_SCORE or (top - runner_up) < TIE_MARGIN:
        return "Other"
    return winner
# =================== Regex & Flags ===================
# Token pattern shared by the vectorizers/tokenizers: a word char followed by
# word chars, dots, '@' or '-' (keeps emails and hyphenated terms intact).
TOKEN_PATTERN = r"(?u)\b\w[\w.@-]{1,}\b"
URL_RE = re.compile(r"https?://\S+|www\.\S+", re.I)  # http(s)/www URLs
QUOTE_LINE_RE = re.compile(r"^>.*$", re.M)  # quoted-reply lines ("> ...")
SIG_RE = re.compile(r"\n-- ?\n", re.M)  # conventional signature delimiter
SENT_FROM_RE = re.compile(r"\nSent from my .*$", re.M)  # mobile footer (English)
HEBREW_SENT_FROM_RE = re.compile(r"\nנשלח מה.*$", re.M)  # mobile footer (Hebrew)
FWD_BEGIN_RE = re.compile(r"^Begin forwarded message:", re.I | re.M)  # Apple Mail forward marker
FWD_MSG_RE = re.compile(r"^[-\s]*Original Message[-\s]*$", re.I | re.M)  # Outlook forward marker
ON_WROTE_RE = re.compile(r'^\s*On .* wrote:$', re.M)  # reply attribution line
# When True, callers should prefer fast_lang_heuristic() over langdetect.
SKIP_LANGDETECT = True
# ==== Expanded corruption lexicon ====
# Lowercase phrases whose presence marks a message as suspect (routing bonus
# plus the "🚩suspect" tag).  Curly- and straight-apostrophe variants are
# listed separately because matching is plain substring, not normalized.
SUSPECT_PHRASES = [
    "off the books","cover up","kickback","bribe","under the table",
    "no inspection","special fee","friendly payment","confidential deal",
    "nobody will find out","pay to play","cash only","shell company",
    "bid rigging","embezzle","slush fund","false invoice","ghost employee",
    "contract splitting","grease payment","unreported","unrecorded",
    "off the record","just between us","don’t quote me on this","dont quote me on this",
    "we never had this conversation","keep this between us","not ethical","illegal",
    "grey area","gray area","write off","failed investment","they owe it to me",
    "let’s take this offline","lets take this offline","send to my gmail","send to my yahoo",
    "don’t leave a trail","dont leave a trail","call my cell","text me","don’t text me","dont text me",
    "tell you on the phone","talk in person","come by my office","vpn",
    "tax haven","off-shore account","offshore account","backdate","pull earnings forward",
    "delete this email","no inspection","special fees","wire instructions",
]
# Evasive chat-style acronyms (e.g. OTR = "off the record")
EVASIVE_ACRO_RE = re.compile(r'\b(?:TYOP|LDL|TOL|OTR|TXT|TYL)\b', re.I)
# Currency symbol/code followed by an amount
MONEY_RE = re.compile(r'(\$|USD|EUR|ILS|NIS)\s?\d[\d,.\s]*', re.I)
# Loose international phone-number shapes
PHONE_RE = re.compile(r'(\+?\d{1,3}[-\s.]?)?(\(?\d{2,4}\)?[-\s.]?)?\d{3,4}[-\s.]?\d{4}')
# Invoice / purchase-order / wire vocabulary
INVOICE_RE = re.compile(r'\b(invoice|inv\.\s?\d+|po\s?#?\d+|purchase order|wire)\b', re.I)
# Corporate legal-form suffixes
COMPANY_RE = re.compile(r'\b(LLC|Ltd|Limited|Inc|GmbH|S\.A\.|S\.p\.A\.)\b')
# Attachment filenames suggesting contracts/payments
ATTACH_NAME_RE = re.compile(r'\b(agreement|contract|invoice|wire|payment|instructions|accounts?|offshore|tax|statement)\b', re.I)
# Requests to move the conversation off monitored channels
OFFCHANNEL_PATTERNS = [
    r"\bwhatsapp\b", r"\bsignal\b", r"\btelegram\b", r"\bwechat\b",
    r"send to my (gmail|yahoo|protonmail)", r"(call|text) (me|my cell)",
    r"take this offline", r"don.?t (text|email) (me|this)",
    r"\bOTR\b", r"\bTOL\b", r"\bTYOP\b", r"\bLDL\b"
]
OFFCHANNEL_RE = re.compile("|".join(OFFCHANNEL_PATTERNS), re.I)
# Webmail providers that suggest a private citizen rather than an organization.
PERSONAL_DOMAINS = {"gmail.com","yahoo.com","outlook.com","hotmail.com","proton.me","protonmail.com","icloud.com","mail.ru","yandex.ru"}
# Major news-outlet domains used to spot press digests.
NEWS_DOMAINS = {"nytimes.com","ft.com","wsj.com","bloomberg.com","reuters.com","theguardian.com","economist.com"}
def is_news_like(subject: str, body: str, from_domain: str) -> bool:
    """Heuristic: does this message look like a newsletter / press digest?"""
    subj = (subject or "").lower()
    text = (body or "").lower()
    domain = (from_domain or "").lower()
    if ("unsubscribe" in text) or ("manage preferences" in text):
        return True
    subject_cues = ("daily briefing", "morning update", "newsletter", "top stories")
    if any(cue in subj for cue in subject_cues):
        return True
    return any(d in domain for d in NEWS_DOMAINS)
# Patterns that mark automated mail: security alerts, 2FA codes, bounce
# notices, mail-gateway digests, etc.
NOTIFY_PATTERNS = [
    r"\bno[-\s]?reply\b", r"do not reply", r"security alert", r"new sign[-\s]?in",
    r"verification code", r"two[-\s]?factor", r"\b2fa\b", r"\botp\b", r"\bcode[:\s]",
    r"itunes connect", r"apple id", r"your google account", r"used (?:a )?new browser",
    r"unable to determine", r"reset your password", r"\balert\b",
    r"mailer[-\s]?daemon", r"\bpostmaster\b", r"delivery status notification",
    r"undeliverable", r"delivery failure", r"returned mail", r"mail delivery subsystem",
    r"proofpoint", r"mimecast", r"dmarc", r"\bspf\b", r"\bdkim\b", r"quarantine",
    r"spam digest", r"phishing", r"security gateway", r"mail[-\s]?secure|secure message"
]
NOTIFY_RE = re.compile("|".join(NOTIFY_PATTERNS), re.I)
def is_notification_like(subject: str, body: str, from_email: str, from_domain: str) -> bool:
    """Heuristic: automated notification / security / bounce mail?

    Note: *from_domain* is accepted for call-site symmetry but not consulted.
    """
    subj = (subject or "").lower()
    text = (body or "").lower()
    if NOTIFY_RE.search(subj) or NOTIFY_RE.search(text):
        return True
    sender = (from_email or "").lower()
    return bool(re.search(r"noreply|no-reply|donotreply", sender))
HEB_RE = re.compile(r'[\u0590-\u05FF]')  # Hebrew block
AR_RE = re.compile(r'[\u0600-\u06FF]')   # Arabic block
CYR_RE = re.compile(r'[\u0400-\u04FF]')  # Cyrillic block
def fast_lang_heuristic(text: str) -> str:
    """Cheap language guess without langdetect.

    Detects he/ar/ru by Unicode script presence (in that order); otherwise
    returns "en" when more than 85% of alphabetic chars are ASCII, else
    "unknown".
    """
    t = text or ""
    for rx, code in ((HEB_RE, "he"), (AR_RE, "ar"), (CYR_RE, "ru")):
        if rx.search(t):
            return code
    total_letters = 0
    ascii_letters = 0
    for ch in t:
        if ch.isalpha():
            total_letters += 1
            if ch.isascii():
                ascii_letters += 1
    if total_letters and ascii_letters / max(1, total_letters) > 0.85:
        return "en"
    return "unknown"
# Seed lexicons for corruption-themed scoring / seeded centroids.
CORR_LEX = {
    "kickback" : ["kickback","bribe","under the table","gift","cash"],
    "invoice_fraud" : ["false invoice","ghost employee","contract splitting","slush fund","shell company","front company"],
    "procurement" : ["bid rigging","tender","vendor","sole source","rfp","rfq","purchase order","po"],
    "money_flow" : ["wire transfer","transfer","swift","iban","routing number","account number","cash"]
}
# =================== Label cleanup helpers ===================
# English stopwords plus email boilerplate ("fwd", "regards", disclaimers...).
EN_STOP = {
    "the","of","and","to","in","is","for","on","at","with","from","by","or","as",
    "that","this","it","be","are","was","were","an","a","you","your","we","our","us",
    "re","fwd","fw","hi","hello","thanks","thank","regards","best","please","dear","mr","mrs",
    "message","original","forwarded","attached","attachment","confidential","notice","disclaimer",
    "herein","thereof","hereby","therein","regarding","subject","url","via","kind","regard","ny"
}
# Hebrew stopwords, greetings and mail boilerplate.
HE_STOP = {"של","על","זה","גם","אם","לא","את","אתה","אני","הוא","היא","הם","הן","כי","מה","שלום","תודה","בברכה","מצורף","הודעה","קדימה","היי"}
# Month names/abbreviations -- dates make poor cluster labels.
MONTHS = {
    "jan","feb","mar","apr","may","jun","jul","aug","sep","sept","oct","nov","dec",
    "january","february","march","april","june","july","august","september",
    "october","november","december"
}
# HTML/MIME residue that leaks out of poorly converted messages.
STOP_TERMS = {
    "div","span","nbsp","href","src","img","class","style","align","border","cid",
    "content","content-type","multipart","alternative","quoted","printable","utf",
    "windows-1255","iso-8859","us-ascii","html","plain","attachment","filename",
    "type","id","service","person","generated","fyi"
}
# Auxiliary verbs / contractions / pleasantries.
AUX_STOP = {
    "will","would","should","could","can","cant","cannot","did","do","does","done",
    "have","has","had","having","get","got","make","made","let","need","want",
    "not","dont","didnt","isnt","arent","wasnt","werent","im","youre","hes","shes",
    "weve","ive","theyre","its","ok","okay","pls","please","thx","thanks","regards","best",
    "hi","hello","dear","re","fw","fwd","via","kind"
}
# Call-to-action / newsletter chrome (English + Hebrew).
CTA_STOP = {
    "click","here","unsubscribe","view","browser","mailto","reply","iphone","android",
    "press","link","below","above","update","newsletter","manage","preferences",
    "לחץ","כאן","נשלח","מה","מה-iphone","הטלפון"
}
# Technical / encoding metadata tokens.
TECH_META = {
    "quot","nbsp","cid","href","src","img","class","style","div","span","http","https",
    "content","content-type","multipart","alternative","quoted","printable","utf",
    "windows-1255","iso-8859","us-ascii","attachment","filename"
}
# Chinese email header labels (forwarded-mail residue).
ZH_HEADER_STOP = {"发送时间","星期","星期一","星期二","星期三","星期四","星期五","星期六","星期日","转发","主题","收件人","发件人"}
HE_EXTRA_STOP = {"עם","או"}
# Fold every auxiliary stoplist into STOP_TERMS (the one _is_junk_term checks).
STOP_TERMS |= AUX_STOP | CTA_STOP | TECH_META | ZH_HEADER_STOP | HE_EXTRA_STOP
EMAIL_LIKE_RE = re.compile(r"@|^[\w\-]+\.(com|net|org|ru|us|il|ch|co|io|uk|de|fr|it)$", re.I)  # emails or bare domains
YEAR_RE = re.compile(r"^(19|20)\d{2}$")  # 4-digit years
NUMERIC_RE = re.compile(r"^\d+([.,:/-]\d+)*$")  # numbers, dates, times
ONE_CHAR_RE = re.compile(r"^.$")  # single characters
LONG_ALNUM_RE = re.compile(r"^[A-Za-z0-9_-]{24,}$")  # long opaque ids/tokens
HEXISH_RE = re.compile(r"^(?:[A-Fa-f0-9]{8,})$")  # hex-looking strings
DIGIT_HEAVY_RE = re.compile(r"^(?:\D*\d){6,}\D*$")  # six or more digits in one token
UNDERSCORE_HEAVY_RE = re.compile(r"^[A-Za-z0-9]*_[A-Za-z0-9_]*$")  # snake_case artifacts
# Combined stopword list handed to the vectorizers.
STOPWORD_FOR_VEC = sorted(EN_STOP | HE_STOP | STOP_TERMS)
def _is_junk_term(t: str) -> bool:
    """True when token *t* is unusable as a human-facing label term
    (stopword, month, email-like, numeric, hash-like, overlong, ...)."""
    tl = (t or "").strip().lower()
    if not tl: return True
    # lexical stoplists (English/Hebrew/technical) and month names
    if tl in STOP_TERMS or tl in EN_STOP or tl in HE_STOP or tl in MONTHS: return True
    if EMAIL_LIKE_RE.search(tl): return True  # emails / bare domain names
    if YEAR_RE.match(tl): return True         # 4-digit years
    if NUMERIC_RE.match(tl): return True      # pure numbers/dates/times
    if ONE_CHAR_RE.match(tl): return True     # single characters
    # NOTE: the shape checks below run on the original-cased token
    if LONG_ALNUM_RE.match(t): return True    # long opaque ids
    if HEXISH_RE.match(t): return True        # hex-looking strings
    if DIGIT_HEAVY_RE.match(t): return True   # digit-heavy tokens
    if UNDERSCORE_HEAVY_RE.match(t): return True  # snake_case artifacts
    if len(t) > 40: return True
    return False
def _sanitize_top_terms(names: np.ndarray, idxs: np.ndarray, mean_vec: np.ndarray, want: int) -> list:
    """Pick up to *want* label terms from candidate feature indices.

    Candidates are ranked by mean weight (descending).  A strict pass drops
    everything _is_junk_term rejects; if that yields too few terms, a looser
    fallback pass only excludes email-like tokens and bare years.
    """
    ranked = idxs[np.argsort(-mean_vec[idxs])]
    picked: list = []
    # strict pass: full junk filter
    for j in ranked:
        candidate = names[j]
        if _is_junk_term(candidate):
            continue
        picked.append(candidate)
        if len(picked) >= want:
            break
    # fallback pass when the strict filter was too aggressive
    if len(picked) < max(2, want // 2):
        for j in ranked:
            candidate = names[j]
            if EMAIL_LIKE_RE.search(candidate) or YEAR_RE.match(candidate.lower()):
                continue
            if candidate not in picked:
                picked.append(candidate)
            if len(picked) >= want:
                break
    return picked
# =================== HTML/Text & Header Parsing ===================
def html_to_text(html: str) -> str:
    """Render HTML to newline-separated plain text, dropping <script>/<style> content."""
    if not html:
        return ""
    doc = BeautifulSoup(html, "html.parser")
    for node in doc(["script", "style"]):
        node.decompose()
    return doc.get_text(separator="\n")
def strip_quotes_and_sigs(text: str) -> str:
    """Strip quoted replies, signature blocks, mobile footers and anything
    below a forwarded/original-message marker; returns the trimmed body."""
    if not text:
        return ""
    cleaned = QUOTE_LINE_RE.sub("", text)
    # keep only the content above the conventional "-- " signature delimiter
    sig_parts = SIG_RE.split(cleaned)
    if sig_parts:
        cleaned = sig_parts[0]
    cleaned = SENT_FROM_RE.sub("", cleaned)
    cleaned = HEBREW_SENT_FROM_RE.sub("", cleaned)
    # truncate at the earliest forwarded/original-message marker, if any
    marker_starts = [m.start() for m in
                     (rx.search(cleaned) for rx in (FWD_BEGIN_RE, FWD_MSG_RE, ON_WROTE_RE))
                     if m]
    if marker_starts:
        cleaned = cleaned[:min(marker_starts)]
    cleaned = re.sub(r"\n\s*sent from my .*?$", "", cleaned, flags=re.I | re.M)
    cleaned = re.sub(r"\n\s*(נשלח מה-?iphone).*?$", "", cleaned, flags=re.I | re.M)
    return cleaned.strip()
def parse_name_email(s: str) -> Tuple[str, str]:
    """Split a From-style header value into (display_name, email).

    Handles 'Name <addr@host>' (with optional quotes around the name) as well
    as bare addresses.  Falls back to ("", original string) when no address
    can be found.

    BUGFIX: the previous single pattern could never match the standard
    angle-bracket form, because '<' blocked `[^<>]+` at the anchored start —
    "John Doe <john@x.com>" fell through to the raw-string fallback.
    """
    if not s:
        return "", ""
    # "Display Name" <addr@host>  /  Display Name <addr@host>  /  <addr@host>
    m = re.match(r'\s*"?([^"<>]*?)"?\s*<([^<>]+@[^<>]+)>\s*$', s)
    if m:
        return (m.group(1) or "").strip(), m.group(2).strip()
    # bare address, possibly preceded by an unbracketed display name
    m = re.match(r'(?:"?([^"]*)"?\s)?([^<>]+@[^<>]+)>?', s)
    if m:
        return (m.group(1) or "").strip(), (m.group(2) or "").strip()
    return "", s.strip()
def parse_multi_emails(s: str) -> List[str]:
    """Split a To/Cc header value into a list of bare email addresses."""
    if not s:
        return []
    found: List[str] = []
    # split on commas that are followed by another address-bearing chunk
    for chunk in re.split(r",\s*(?=[^,]*@)", s):
        _, addr = parse_name_email(chunk.strip())
        if addr:
            found.append(addr)
    return found
def parse_email_headers(text: str) -> Tuple[Dict[str, str], str]:
    """Split message text that begins with inline RFC-822-style headers.

    Scans leading lines for "Key: value" headers (From/To/Cc/Bcc/Date/Subject),
    supports at most ONE continuation line when a header's value starts on the
    following line, and stops at the first blank or non-header line.
    Returns (headers_dict, remaining_body_text).
    """
    headers: Dict[str, str] = {}
    lines = (text or "").splitlines()
    # NOTE(review): the alternation repeats keys with a trailing ':' (e.g.
    # "Subject:"), so a doubled colon would also match; appears harmless.
    header_pat = re.compile(r'^(From|To|Cc|CC|Bcc|Date|Subject|Subject:|To:|Cc:|Bcc:|From:):')
    i = 0
    saw_header = False
    while i < len(lines):
        line = lines[i].rstrip("\r")
        stripped = line.strip()
        if stripped == "":
            # blank line terminates the header block; body starts after it
            i += 1
            break
        if header_pat.match(line):
            saw_header = True
            key, rest = line.split(":", 1)
            key = key.strip()
            value = rest.strip()
            if value == "":
                # value is on the following line(s): take at most one line
                j = i + 1
                cont = []
                while j < len(lines):
                    nxt = lines[j].rstrip("\r")
                    nxts = nxt.strip()
                    if nxts == "" or header_pat.match(nxt):
                        break
                    if key.lower() == "subject":
                        # don't swallow forwarded-message markers into the subject
                        if FWD_BEGIN_RE.match(nxts) or FWD_MSG_RE.match(nxts) or ON_WROTE_RE.match(nxts):
                            break
                    if len(cont) > 0:
                        break  # cap continuation at a single line
                    cont.append(nxts)
                    j += 1
                value = " ".join(cont)
                i = j
            else:
                i += 1
            headers[key] = value
            continue
        else:
            # first non-header line: stop scanning (both branches break;
            # kept as-is to preserve the original control flow)
            if saw_header:
                break
            else:
                break
    body_text = "\n".join(lines[i:]) if i < len(lines) else ""
    return headers, body_text
# =================== Normalization & Utilities ===================
def normalize_email_record(raw: Dict[str, Any], use_langdetect: bool) -> Dict[str, Any]:
    """Normalize one raw email dict into the canonical record schema.

    Extracts inline headers from the body when present, falls back to the raw
    dict's own fields, converts HTML to text, strips quotes/signatures,
    guesses language and derives stable hash keys for threading and dedup.
    Returns {} for records marked type == "meta".
    """
    if str(raw.get("type", "")).lower() == "meta":
        return {}
    # --- attachment names (accept dicts or plain strings) ---
    attach_names = []
    atts = raw.get("attachments") or raw.get("Attachments") or raw.get("files") or []
    if isinstance(atts, list):
        for a in atts:
            if isinstance(a, dict):
                name = a.get("filename") or a.get("name") or ""
            else:
                name = str(a)
            if name:
                attach_names.append(str(name))
    # --- raw body: prefer plain text, fall back to rendered HTML ---
    body_text_raw = raw.get("body_text") or raw.get("text") or ""
    html_content = raw.get("body_html") or raw.get("html") or ""
    if html_content and not body_text_raw:
        body_text_raw = html_to_text(html_content)
    body_text_raw = ftfy.fix_text(body_text_raw or "")
    subject_text = ""
    from_name = from_email = from_domain = ""
    to_emails: List[str] = []
    date_val = raw.get("date") or raw.get("Date") or ""
    if body_text_raw:
        # the body may start with inline "From:/To:/Subject:" headers
        headers, body_only = parse_email_headers(body_text_raw)
        subject_text = headers.get("Subject", "") or raw.get("subject") or raw.get("Subject") or ""
        sender = headers.get("From", "") or raw.get("from") or raw.get("From") or ""
        date_val = headers.get("Date", "") or date_val
        to_emails = parse_multi_emails(headers.get("To", "") or (raw.get("to") or "")) + \
            parse_multi_emails(headers.get("Cc", "") or (raw.get("cc") or ""))
        body_clean = strip_quotes_and_sigs(ftfy.fix_text(body_only or ""))
        body_clean = URL_RE.sub(" URL ", body_clean)
        body_clean = re.sub(r"\s+", " ", body_clean).strip()
        body_text = body_clean
        from_name, from_email = parse_name_email(sender)
        from_domain = from_email.split("@")[-1].lower() if "@" in from_email else ""
    else:
        subject_text = ftfy.fix_text(raw.get("subject") or raw.get("Subject") or "").strip()
        body_text = ftfy.fix_text(raw.get("body_text") or raw.get("text") or "")
        body_text = URL_RE.sub(" URL ", body_text)
        body_text = strip_quotes_and_sigs(body_text)
        body_text = re.sub(r"\s+", " ", body_text).strip()
        sender = raw.get("from") or raw.get("From") or ""
        from_name, from_email = parse_name_email(sender)
        from_domain = from_email.split("@")[-1].lower() if "@" in from_email else ""
        to_emails = parse_multi_emails(raw.get("to") or "") + parse_multi_emails(raw.get("cc") or "")
    subject_norm = re.sub(r"\s+", " ", subject_text or "").strip()
    # --- language: langdetect (optional) with script-heuristic fallback ---
    if use_langdetect:
        try:
            lang = detect((subject_norm + " " + body_text[:5000]).strip()) if (subject_norm or body_text) else "unknown"
        except Exception:
            lang = fast_lang_heuristic(subject_norm + " " + (body_text or ""))
    else:
        lang = fast_lang_heuristic(subject_norm + " " + (body_text or ""))
    # --- date -> ISO-8601 UTC string; unparseable dates yield "" ---
    iso_date = ""
    if isinstance(date_val, (int, float)):
        try:
            iso_date = pd.to_datetime(int(date_val), unit="s", utc=True).isoformat()
        except Exception:
            iso_date = ""
    elif isinstance(date_val, str) and date_val:
        ts = pd.to_datetime(date_val, utc=True, errors="coerce")
        # BUGFIX: errors="coerce" returns NaT for junk dates; previously this
        # stored the literal string "NaT" via NaT.isoformat().
        iso_date = "" if pd.isna(ts) else ts.isoformat()
    msg_id = raw.get("message_id") or raw.get("Message-ID") or ""
    if not msg_id:
        msg_id = f"gen-{uuid.uuid4().hex}"
    # stable uint64 hashes for thread grouping and content dedup
    thread_key = subject_norm or (from_email + body_text[:120])
    thread_id = str(pd.util.hash_pandas_object(pd.Series([thread_key], dtype="string")).astype("uint64").iloc[0])
    text_hash = str(pd.util.hash_pandas_object(pd.Series([body_text], dtype="string")).astype("uint64").iloc[0]) if body_text else ""
    return {
        "message_id": str(msg_id),
        "thread_id": thread_id,
        "date": iso_date,
        "from_name": from_name,
        "from_email": from_email,
        "from_domain": from_domain,
        "to_emails": to_emails,
        "subject": subject_norm,
        "body_text": body_text,
        "lang": lang,
        "attachments": attach_names,
        "text_hash": text_hash,
    }
def has_suspect_tag(text: str) -> List[str]:
    """Tag a message body with "🚩suspect", "finance" and/or "off-channel"."""
    if not text:
        return []
    low = text.lower()
    tags: List[str] = []
    if any(phrase in low for phrase in SUSPECT_PHRASES):
        tags.append("🚩suspect")
    # money-adjacent vocabulary (appended at most once)
    finance_words = ("invoice", "payment", "contract", "wire", "transfer", "cash")
    if any(word in low for word in finance_words):
        tags.append("finance")
    if OFFCHANNEL_RE.search(low) or EVASIVE_ACRO_RE.search(low):
        tags.append("off-channel")
    return tags
def compute_sentiment_column(df: pd.DataFrame) -> pd.DataFrame:
    """Add VADER sentiment columns in place: compound score + coarse label.

    When vaderSentiment is unavailable, fills NaN / "(unknown)" instead.
    """
    if not VADER_OK:
        df["sentiment_score"] = np.nan
        df["sentiment"] = "(unknown)"
        return df
    analyzer = SentimentIntensityAnalyzer()
    df["sentiment_score"] = df["body_text"].fillna("").map(
        lambda t: analyzer.polarity_scores(t)["compound"])
    # coarse bins around the conventional +/-0.05 neutrality band
    df["sentiment"] = pd.cut(
        df["sentiment_score"],
        bins=[-1.01, -0.05, 0.05, 1.01],
        labels=["negative", "neutral", "positive"],
        include_lowest=True,
    )
    return df
def _compile_highlight_terms(row: pd.Series, extra_terms: List[str]) -> List[str]:
    """Collect up to 24 unique (case-insensitive) highlight terms found in this row's text."""
    haystack = (row.get("subject", "") + " " + row.get("body_text", "")).lower()
    found = [phrase for phrase in SUSPECT_PHRASES if phrase in haystack]
    if MONEY_RE.search(haystack):
        found.append("$")
    if INVOICE_RE.search(haystack):
        found.append("invoice")
    if PHONE_RE.search(row.get("body_text", "") or ""):
        found.append("phone")
    for raw in extra_terms or []:
        raw = raw.strip()
        if raw and raw.lower() in haystack:
            found.append(raw)
    # dedupe case-insensitively, preserving first-seen order
    unique, seen = [], set()
    for term in found:
        low = term.lower()
        if low not in seen:
            unique.append(term)
            seen.add(low)
    return unique[:24]
def build_highlighted_html(row: pd.Series, query_terms: Optional[List[str]] = None,
                           cluster_label: Optional[str] = None,
                           do_highlight: bool = True,
                           extra_terms: Optional[List[str]] = None) -> str:
    """Render one email row as an HTML card.

    Optionally wraps query/suspect terms in <mark> tags, sets dir="rtl" when
    the body contains Hebrew/Arabic script, and renders tags/flags as pills.

    BUGFIX: the previous version had its HTML markup stripped out (broken
    multi-line string literal for the <br/> replacement, identity `pat.sub`,
    empty pill/cluster f-strings); the markup below is reconstructed — the
    exact original tag/class names could not be recovered from this file.
    """
    subject = (row.get("subject") or "").strip()
    body = (row.get("body_text") or "").strip()
    from_email = row.get("from_email") or ""
    date = row.get("date") or ""
    tags = row.get("tags") or []
    flags = row.get("flags") or []
    sentiment = row.get("sentiment") or "(unknown)"
    hl_terms = []
    if do_highlight:
        hl_terms = (query_terms or []) + _compile_highlight_terms(row, extra_terms or [])
        # dedupe case-insensitively, keep first-seen order, cap at 24
        seen = set(); uniq = []
        for t in hl_terms:
            tl = t.lower()
            if tl and tl not in seen:
                uniq.append(t); seen.add(tl)
        hl_terms = uniq[:24]
    def hi(text: str) -> str:
        """Wrap every occurrence of each highlight term in <mark> tags."""
        if not text or not do_highlight or not hl_terms:
            return text
        out = text
        for qt in hl_terms:
            if not qt:
                continue
            try:
                pat = re.compile(re.escape(qt), re.I)
                out = pat.sub(lambda m: f"<mark>{m.group(0)}</mark>", out)
            except Exception:
                pass  # best effort: skip terms that break the regex engine
        return out
    subject_h = hi(subject)
    body_h = hi(body)
    # right-to-left rendering for Hebrew/Arabic script
    rtl = bool(re.search(r"[\u0590-\u08FF]", body_h))
    dir_attr = ' dir="rtl"' if rtl else ""
    body_html = body_h.replace("\n", "<br/>")
    def pill(s, cls="tag"):
        return f'<span class="{cls}">{s}</span>'
    pills = []
    if isinstance(tags, list) and tags:
        pills += [pill(t, "tag") for t in tags]
    if isinstance(flags, list) and flags:
        pills += [pill(f, "tag") for f in flags]
    tag_html = " ".join(pills) if pills else ""
    cluster_html = f'<span class="cluster">{cluster_label or ""}</span>' if cluster_label else ""
    html = (
        f'<div class="email-card"{dir_attr}>'
        f'<div class="email-meta"><b>{subject_h}</b> &mdash; {from_email} &mdash; {date} '
        f'&mdash; sentiment: {sentiment} {cluster_html} {tag_html}</div>'
        f'<div class="email-body">{body_html}</div>'
        f'</div>'
    )
    return html
# ---------- Lightweight Embedding Utilities (Optional) ----------
def _load_embeddings(emb_path: str, binary: bool):
    """Load word2vec-format vectors via gensim.

    Returns (KeyedVectors, dimension) on success, or (None, 0) when gensim is
    unavailable, the file is missing, or every load attempt fails.
    """
    if not GENSIM_OK or not emb_path or not os.path.exists(emb_path):
        return None, 0
    try:
        if binary:
            vectors = KeyedVectors.load_word2vec_format(emb_path, binary=True)
        else:
            vectors = KeyedVectors.load_word2vec_format(emb_path, binary=False, no_header=False)
        return vectors, int(vectors.vector_size)
    except Exception:
        # retry as headerless text format (.txt/.vec without the count line)
        try:
            vectors = KeyedVectors.load_word2vec_format(emb_path, binary=False, no_header=True)
            return vectors, int(vectors.vector_size)
        except Exception:
            return None, 0
def _avg_embed_for_text(text: str, kv, dim: int) -> np.ndarray:
    """L2-normalized mean word vector of *text* (zero vector when nothing matches *kv*)."""
    acc = np.zeros((dim,), dtype=np.float32)
    if not kv or not text:
        return acc
    matched = 0
    for tok in re.findall(TOKEN_PATTERN, text.lower()):
        if tok in kv:
            acc += kv[tok]
            matched += 1
    if matched > 0:
        acc /= float(matched)
    norm = np.linalg.norm(acc)
    if norm > 0:
        acc /= norm
    return acc
def _build_doc_embeddings(texts: List[str], kv, dim: int) -> np.ndarray:
    """Stack one averaged word embedding per document; (n, 0) when *kv* is unavailable."""
    if not kv or dim <= 0:
        return np.zeros((len(texts), 0), dtype=np.float32)
    out = np.zeros((len(texts), dim), dtype=np.float32)
    for i, doc in enumerate(texts):
        out[i, :] = _avg_embed_for_text(doc or "", kv, dim)
    return out
# =================== Feature engineering (BM25 + char) ===================
class BM25Transformer:
def __init__(self, k1=1.2, b=0.75):
self.k1 = k1
self.b = b
self.idf_ = None
self.avgdl_ = None
def fit(self, X):
N = X.shape[0]
df = np.bincount(X.tocsc().indices, minlength=X.shape[1]).astype(np.float64)
self.idf_ = np.log((N - df + 0.5) / (df + 0.5 + 1e-12))
dl = np.asarray(X.sum(axis=1)).ravel()
self.avgdl_ = float(dl.mean() if dl.size else 1.0)
return self
def transform(self, X):
X = X.tocsr(copy=True).astype(np.float32)
dl = np.asarray(X.sum(axis=1)).ravel()
k1, b, avgdl = self.k1, self.b, self.avgdl_
rows, cols = X.nonzero()
data = X.data
for i in range(len(data)):
tf = data[i]
d = rows[i]
denom = tf + k1 * (1 - b + b * (dl[d] / (avgdl + 1e-12)))
data[i] = (self.idf_[cols[i]] * (tf * (k1 + 1))) / (denom + 1e-12)
return X
def enrich_text(row: pd.Series) -> str:
    """Subject + body augmented with synthetic feature tokens.

    Appends __HAS_MONEY__ / __HAS_PHONE__ / __HAS_INVOICE__ / __HAS_COMPANY__ /
    __OFF_CHANNEL__ markers plus a __LANG_xx__ token so the vectorizer can
    learn from these signals alongside the words.
    """
    combined = (row.get("subject", "") or "") + "\n\n" + (row.get("body_text", "") or "")
    feature_tokens = []
    for rx, token in ((MONEY_RE, "__HAS_MONEY__"),
                      (PHONE_RE, "__HAS_PHONE__"),
                      (INVOICE_RE, "__HAS_INVOICE__"),
                      (COMPANY_RE, "__HAS_COMPANY__"),
                      (OFFCHANNEL_RE, "__OFF_CHANNEL__")):
        if rx.search(combined):
            feature_tokens.append(token)
    feature_tokens.append(f'__LANG_{(row.get("lang") or "unk").lower()}__')
    return (combined + " " + " ".join(feature_tokens)).strip()
# =================== Cluster labeling: PMI + class-TFIDF + SUBJECT BOOST (+ coverage ≥30% preference) ===================
def cluster_labels_pmi_bigram(
    texts,
    labels,
    subjects=None,
    topn=6,
    subject_alpha=0.75,
    global_ubiq_cut=0.20,
    subject_min_cov=0.30
):
    """Build one short human-readable label string per cluster id.

    Combines (a) PMI-like bigram/trigram salience of the cluster against the
    whole corpus, (b) a one-vs-rest TF-IDF pass over unigrams, and (c) a
    boost for terms that also cover >= subject_min_cov of the cluster's
    subject lines.  Returns {cluster_id: "term1, term2, ..."}.

    subject_alpha scales the subject-coverage boost; global_ubiq_cut drops
    n-grams present in more than that fraction of ALL documents.
    """
    import math as _math
    from collections import Counter, defaultdict
    from sklearn.feature_extraction.text import TfidfVectorizer
    # NOTE(review): HEADER_STOP is defined here but never referenced below;
    # junk filtering is handled entirely by is_junk_token/_is_junk_term.
    HEADER_STOP = {"subject","re","fw","fwd","to","cc","bcc","from","sent","forwarded",
                   "回复","主题","收件人","发件人"}
    def is_junk_token(tok: str) -> bool:
        # stricter, label-specific junk filter layered on _is_junk_term
        if _is_junk_term(tok): return True
        tl = tok.lower()
        if tl.startswith("__"): return True   # synthetic feature tokens
        if "@" in tl: return True             # email fragments
        if tl.isascii() and len(tl) <= 2: return True
        if LONG_ALNUM_RE.match(tok) or HEXISH_RE.match(tok) or DIGIT_HEAVY_RE.match(tok): return True
        if len(tok) > 40: return True
        if re.search(r"[^\w\-’']", tl): return True  # punctuation besides hyphen/apostrophe
        return False
    def tokenize_clean(t):
        toks = re.findall(TOKEN_PATTERN, (t or "").lower())
        return [w for w in toks if not is_junk_token(w)]
    def ngrams(toks, n):
        return [" ".join(p) for p in zip(*[toks[i:] for i in range(n)]) if all(not is_junk_token(x) for x in p)]
    # global document frequencies (set semantics: one count per document)
    glob_df_uni = Counter()
    glob_df_bg = Counter()
    glob_df_tri = Counter()
    # per-cluster n-gram doc frequencies and concatenated cleaned texts
    per_c_bg = defaultdict(Counter)
    per_c_tri = defaultdict(Counter)
    per_c_texts = defaultdict(list)
    per_c_doc_count = defaultdict(int)
    # per-cluster subject-line doc frequencies (drive the coverage boost)
    per_c_subj_uni_docs = defaultdict(Counter)
    per_c_subj_bg_docs = defaultdict(Counter)
    per_c_subj_tri_docs = defaultdict(Counter)
    have_subjects = subjects is not None and len(subjects) == len(texts)
    for idx, (txt, c) in enumerate(zip(texts, labels)):
        c = int(c)
        toks = tokenize_clean(txt)
        uni_set = set(toks)
        bg_set = set(ngrams(toks, 2))
        tri_set = set(ngrams(toks, 3))
        glob_df_uni.update(uni_set)
        glob_df_bg.update(bg_set)
        glob_df_tri.update(tri_set)
        per_c_bg[c].update(bg_set)
        per_c_tri[c].update(tri_set)
        per_c_texts[c].append(" ".join(toks))
        per_c_doc_count[c] += 1
        if have_subjects:
            stoks = tokenize_clean(subjects[idx] or "")
            s_uni = set(stoks)
            s_bg = set(ngrams(stoks, 2))
            s_tri = set(ngrams(stoks, 3))
            per_c_subj_uni_docs[c].update(s_uni)
            per_c_subj_bg_docs[c].update(s_bg)
            per_c_subj_tri_docs[c].update(s_tri)
    N = max(1, len(texts))
    def too_ubiquitous(df_count):
        # present in more than global_ubiq_cut of ALL docs -> useless as a label
        return (df_count / float(N)) > float(global_ubiq_cut)
    labels_out = {}
    for c in sorted(set(int(x) for x in labels)):
        n_docs_c = max(1, per_c_doc_count[c])
        phrases = []
        # --- phrase pass: score bigrams, then trigrams, by log-probability ratio ---
        for store, glob_df, subj_docs, n in (
            (per_c_bg[c], glob_df_bg, per_c_subj_bg_docs[c], 2),
            (per_c_tri[c], glob_df_tri, per_c_subj_tri_docs[c], 3),
        ):
            total_c = sum(store.values()) + 1e-12
            total_g = sum(glob_df.values()) + 1e-12
            scored = []
            for ng, cnt in store.most_common(3000):
                if too_ubiquitous(glob_df[ng]):
                    continue
                p_ng_c = cnt / total_c
                p_ng_g = (glob_df[ng] / total_g)
                if p_ng_c > 0 and p_ng_g > 0:
                    # log-ratio of in-cluster vs. global probability (PMI-like)
                    score = _math.log(p_ng_c) - _math.log(p_ng_g)
                    cov = 0.0
                    if have_subjects:
                        cov = subj_docs[ng] / n_docs_c
                        if cov >= subject_min_cov:
                            score += 0.6  # flat bump for well-covered subject phrases
                    score += subject_alpha * cov
                    scored.append((score, cov, ng))
            # phrases meeting the subject-coverage bar sort first, then by score
            scored.sort(key=lambda x: (x[1] >= subject_min_cov, x[0]), reverse=True)
            take = max(1, topn // (3 if n == 3 else 2))
            phrases.extend([p for _, _, p in scored[:take]])
        # --- unigram pass: one-vs-rest TF-IDF (this cluster vs. all others) ---
        docs_c = [" ".join(per_c_texts[c])] if per_c_texts[c] else [" "]
        docs_bg = [" ".join(sum((per_c_texts[k] for k in per_c_texts if k != c), [])) or " "]
        corpus = [docs_c[0], docs_bg[0]]
        vec = TfidfVectorizer(
            analyzer="word", ngram_range=(1,1),
            max_features=3000, token_pattern=TOKEN_PATTERN, lowercase=True
        )
        X = vec.fit_transform(corpus)
        vocab = np.array(sorted(vec.vocabulary_, key=lambda k: vec.vocabulary_[k]))
        row = X[0].toarray().ravel()
        # subject-coverage vectors aligned with the TF-IDF vocabulary
        subj_cov = np.zeros_like(row)
        subj_cov_frac = np.zeros_like(row)
        vocab_index = {t:i for i,t in enumerate(vocab)}
        if have_subjects:
            for tok, cnt_docs in per_c_subj_uni_docs[c].items():
                if tok in vocab_index and not _is_junk_term(tok):
                    i = vocab_index[tok]
                    frac = cnt_docs / n_docs_c
                    subj_cov[i] = frac
                    subj_cov_frac[i] = frac
        row_boosted = row + subject_alpha * subj_cov
        # flat preference bump for terms covering >= subject_min_cov of subjects
        pref_bump = (subj_cov_frac >= subject_min_cov).astype(row_boosted.dtype) * 0.6
        final = row_boosted + pref_bump
        order = final.argsort()[::-1]
        unis = []
        for i in order:
            tok = vocab[i]
            if _is_junk_term(tok): continue
            if too_ubiquitous(glob_df_uni.get(tok, 0)):
                continue
            unis.append(tok)
            if len(unis) >= max(0, topn - len(phrases)):
                break
        parts = (phrases + unis)[:max(2, topn)]
        labels_out[c] = ", ".join(parts) if parts else f"cluster_{c}"
    return labels_out
# =================== Auto-k & merge ===================
def choose_k_by_kneedle(X, ks=(50,100,150,200,300,400,500)):
    """Pick a cluster count via the 'kneedle' elbow of MiniBatchKMeans inertia.

    Returns (k_best, {k: inertia}).  Tiny inputs short-circuit to a small k
    without fitting anything; very large inputs are subsampled to 40k rows.
    """
    n = X.shape[0]
    if n <= 1:
        return 1, {1: 0.0}
    if n < min(ks):
        fallback_k = max(2, min(10, n))
        return int(fallback_k), {int(fallback_k): 0.0}
    if n > 40000:
        sampler = np.random.RandomState(0)
        Xs = X[sampler.choice(n, size=40000, replace=False)]
    else:
        Xs = X
    inertias = []
    for k in ks:
        k = int(k)
        if n < k:
            break
        model = MiniBatchKMeans(n_clusters=k, batch_size=4096, random_state=0, n_init="auto")
        model.fit(Xs)
        inertias.append(model.inertia_)
    if not inertias:
        fallback_k = max(2, min(10, n))
        return int(fallback_k), {int(fallback_k): 0.0}
    xs_arr = np.array(list(ks)[:len(inertias)], dtype=float)
    ys_arr = np.array(inertias, dtype=float)
    # normalize both axes to [0, 1] and find the point furthest below the chord
    y_norm = (ys_arr - ys_arr.min()) / (ys_arr.max() - ys_arr.min() + 1e-9)
    x_norm = (xs_arr - xs_arr.min()) / (xs_arr.max() - xs_arr.min() + 1e-9)
    chord = y_norm[0] + (y_norm[-1] - y_norm[0]) * (x_norm - x_norm[0]) / (x_norm[-1] - x_norm[0] + 1e-9)
    k_best = int(xs_arr[np.argmax(chord - y_norm)])
    return k_best, dict(zip(xs_arr.astype(int), inertias))
def auto_k_rule(n_docs: int) -> int:
    """Heuristic cluster count: ~sqrt(docs/50) * 110, clamped to [120, 600]."""
    raw = math.sqrt(max(n_docs, 1) / 50.0) * 110
    return int(min(600, max(120, raw)))
def merge_close_clusters(labels, centers, thresh=0.92):
    """Union clusters whose normalized centroids have cosine similarity >= thresh.

    Returns a relabeled int array with compact ids (0..m-1), assigned in order
    of first appearance of each merged root.
    """
    centers = sk_normalize(centers)
    sim = cosine_similarity(centers, centers)
    k = centers.shape[0]
    parent = list(range(k))
    def find(node):
        # path-less union-find root lookup
        while parent[node] != node:
            node = parent[node]
        return node
    for i in range(k):
        for j in range(i + 1, k):
            if sim[i, j] >= thresh:
                ri, rj = find(i), find(j)
                if ri != rj:
                    parent[rj] = ri
    roots = [find(i) for i in range(k)]
    idmap = {}
    for r in roots:
        if r not in idmap:
            idmap[r] = len(idmap)
    return np.array([idmap[roots[int(c)]] for c in labels], dtype=int)
def seeded_centroids_in_lsa(lexicons: Dict[str, List[str]], count_vec: CountVectorizer,
                            lsa_components: np.ndarray, norm_obj: Normalizer,
                            d_word: int, d_full: int, k: int) -> Optional[np.ndarray]:
    """Project lexicon seed vectors into the (normalized) LSA space.

    Builds a unit bag-of-words vector per lexicon from vocabulary hits,
    zero-pads it to the full feature width, multiplies by the LSA components
    and normalizes.  Returns the (n_seeds, dim) matrix only when there are at
    least 2 and at most k usable seeds; otherwise None.
    """
    vocab = count_vec.vocabulary_
    word_seeds = []
    for _, words in lexicons.items():
        hit_idxs = [vocab.get(w.lower()) for w in words if vocab.get(w.lower()) is not None]
        if not hit_idxs:
            continue
        seed = np.zeros((d_word,), dtype=np.float32)
        seed[hit_idxs] = 1.0
        length = np.linalg.norm(seed)
        if length > 0:
            seed /= length
        word_seeds.append(seed)
    if not word_seeds:
        return None
    # pad word-space seeds into the full feature space (word block first)
    padded = np.zeros((len(word_seeds), d_full), dtype=np.float32)
    for i, seed in enumerate(word_seeds):
        padded[i, :d_word] = seed
    reduced = padded @ lsa_components.T
    reduced = norm_obj.transform(reduced.astype(np.float32))
    if 2 <= reduced.shape[0] <= k:
        return reduced
    return None
# =================== NEW: cluster stabilizer ===================
def _centroids_from_labels(X, labels):
labs = np.asarray(labels, dtype=int)
uniq = np.unique(labs)
cents = {}
if isinstance(X, np.ndarray):
for c in uniq:
idx = (labs == c)
if not np.any(idx): continue
v = X[idx].mean(axis=0)
n = np.linalg.norm(v)
if n > 0: v = v / n
cents[int(c)] = v.astype(np.float32)
return cents
X = X.tocsr()
for c in uniq:
rows = np.where(labs == c)[0]
if rows.size == 0: continue
sub = X[rows]
v = np.asarray(sub.mean(axis=0)).ravel()
n = np.linalg.norm(v)
if n > 0: v = v / n
cents[int(c)] = v.astype(np.float32)
return cents
def _cosine_sim_to_centroids(vecs, centroids):
if not centroids:
return None, None
keys = list(centroids.keys())
C = np.stack([centroids[k] for k in keys], axis=0)
if isinstance(vecs, np.ndarray):
sims = vecs @ C.T
else:
sims = vecs.dot(C.T)
if hasattr(sims, "toarray"): sims = sims.toarray()
best_idx = np.argmax(sims, axis=1)
best_lab = np.array([keys[i] for i in best_idx], dtype=int)
best_sim = sims[np.arange(sims.shape[0]), best_idx]
return best_lab, best_sim
def stabilize_labels(X_space, labels, min_size=40, merge_thresh=0.96, reassign_thresh=0.35):
    """Post-process cluster labels in two passes.

    1. Merge: clusters whose centroids have cosine similarity >= merge_thresh
       are unioned (union-find); negative labels (noise ids) never merge.
    2. Absorb: clusters smaller than min_size are dissolved; each member goes
       to its nearest surviving big cluster if similarity >= reassign_thresh,
       otherwise it becomes noise (label -3).

    X_space: dense ndarray or scipy sparse matrix (rows = documents).
    Returns the adjusted integer label array (same length as `labels`).
    """
    labs = np.asarray(labels, dtype=int)
    cents = _centroids_from_labels(X_space, labs)
    # Only non-negative (real) cluster ids participate in merging.
    keys = sorted([k for k in cents.keys() if k >= 0])
    if len(keys) >= 2:
        C = np.stack([cents[k] for k in keys], axis=0)
        sims = C @ C.T
        parent = {k:k for k in keys}
        def find(a):
            # Union-find root lookup (no path compression).
            while parent[a]!=a:
                a = parent[a]
            return a
        for i in range(len(keys)):
            for j in range(i+1, len(keys)):
                # sims is indexed positionally; parent is keyed by actual label ids.
                if sims[i,j] >= float(merge_thresh):
                    ri, rj = find(keys[i]), find(keys[j])
                    if ri != rj:
                        parent[rj] = ri
        root = {k: find(k) for k in keys}
        merge_map = {k: root[k] for k in keys}
        # Labels absent from merge_map (negative ids) pass through unchanged.
        labs = np.array([merge_map.get(int(c), int(c)) for c in labs], dtype=int)
        # Recompute centroids after the merge for the reassignment pass.
        cents = _centroids_from_labels(X_space, labs)
    vc = pd.Series(labs).value_counts()
    big_labs = set(vc[vc >= int(min_size)].index.tolist())
    small_labs = set(vc[vc < int(min_size)].index.tolist())
    big_cents = {c: cents[c] for c in big_labs if c in cents and c >= 0}
    NOISE_ID = -3  # sentinel for "unassigned / noise" after stabilization
    if small_labs and big_cents:
        idx_small = np.where(pd.Series(labs).isin(small_labs))[0]
        if idx_small.size > 0:
            # NOTE(review): both ternary branches are identical; kept as-is.
            sub = X_space[idx_small] if not isinstance(X_space, np.ndarray) else X_space[idx_small]
            best_lab, best_sim = _cosine_sim_to_centroids(sub, big_cents)
            # Keep the nearest big cluster only when it is similar enough.
            reassigned = np.where(best_sim >= float(reassign_thresh), best_lab, NOISE_ID)
            labs[idx_small] = reassigned
    return labs
# =================== Scoring & Flags ===================
def _hour_of(dt_iso: str) -> Optional[int]:
try:
if not dt_iso: return None
dt = pd.to_datetime(dt_iso, utc=True, errors="coerce")
if pd.isna(dt): return None
return int(dt.hour)
except Exception:
return None
def _attachment_flags(names: List[str]) -> List[str]:
    """Build up to five display flags for attachment names matching ATTACH_NAME_RE.

    Each flag is the paperclip emoji plus the (truncated) file name.
    """
    hits = ["📎 " + nm[:40] for nm in (names or []) if ATTACH_NAME_RE.search(nm)]
    return hits[:5]
def corruption_score(row, trusted_domains: set):
    """Additive heuristic risk score for one email row.

    Signals (weights): suspect phrase (+2), evasive acronym / off-channel hint
    (+1), suspect/finance tag (+1.5), money (+0.7) and invoice (+0.7) patterns,
    negative sentiment (+0.3), short body with a phone number (+0.5), personal
    non-trusted sender domain (+0.5), odd-hours send time (+0.3).
    """
    text = f'{row.get("subject","")} {row.get("body_text","")}'.lower()
    score = 0.0
    if any(phrase in text for phrase in SUSPECT_PHRASES):
        score += 2.0
    if EVASIVE_ACRO_RE.search(text) or OFFCHANNEL_RE.search(text):
        score += 1.0
    tags = row.get("tags")
    if isinstance(tags, list) and ("🚩suspect" in tags or "finance" in tags):
        score += 1.5
    if MONEY_RE.search(text):
        score += 0.7
    if INVOICE_RE.search(text):
        score += 0.7
    if str(row.get("sentiment","")) == "negative":
        score += 0.3
    body = row.get("body_text","")
    # Very short messages carrying a phone number smell like "call me" asks.
    if len(body) < 160 and PHONE_RE.search(body or ""):
        score += 0.5
    sender_domain = (row.get("from_domain") or "").lower()
    if sender_domain in PERSONAL_DOMAINS and sender_domain not in trusted_domains:
        score += 0.5
    hour = _hour_of(row.get("date") or "")
    if hour is not None and (hour < 6 or hour > 22):
        score += 0.3
    return score
def compute_context_anomaly(df_in: pd.DataFrame) -> pd.DataFrame:
    """Blend three signals into a 0-10 `context_anomaly_score` per email.

    - _if_pts  (0-6): per-bucket percentile rank of `anomaly_score`
      (IsolationForest output; higher rank inside its bucket -> more points).
    - _rule_pts (0-2): taxonomy-fit rules — an email in a bucket lacking all of
      that bucket's keywords, or violating bucket-specific expectations.
    - _corr_pts (0-3): corruption_score capped at 3.

    Returns a copy with the score column; helper columns are dropped.
    NOTE(review): the empty-input branch mutates and returns df_in itself.
    """
    if df_in.empty:
        df_in["context_anomaly_score"] = 0.0
        return df_in
    df = df_in.copy()
    if "anomaly_score" in df.columns:
        df["_if_pct"] = 0.0
        for bkt, grp in df.groupby("bucket", dropna=False):
            vals = grp["anomaly_score"].astype(float)
            # Require a handful of scored rows for a meaningful percentile.
            if vals.notna().sum() >= 5:
                ranks = vals.rank(pct=True, ascending=False)
                df.loc[grp.index, "_if_pct"] = ranks.clip(0, 1)
        df["_if_pts"] = (df["_if_pct"] * 6.0).clip(0, 6)
    else:
        df["_if_pts"] = 0.0
    df["_rule_pts"] = 0.0
    low = (df["subject"].fillna("") + " " + df["body_text"].fillna("")).str.lower()
    for bkt, terms in TAXONOMY.items():
        mask = (df["bucket"] == bkt)
        if not mask.any():
            continue
        if terms:
            # Penalize emails placed in a bucket without any of its keywords.
            has_term = low.str.contains("|".join([re.escape(t.lower()) for t in terms]), regex=True)
            df.loc[mask & (~has_term), "_rule_pts"] += 1.0
        if bkt == "Constituent":
            # Constituent mail usually arrives from personal mailboxes.
            df.loc[mask & (~df["from_domain"].str.lower().isin(PERSONAL_DOMAINS)), "_rule_pts"] += 1.0
        if bkt == "Scheduling":
            subj = df.loc[mask, "subject"].fillna("").str.lower()
            df.loc[mask & (~subj.str.contains(r"\bmeeting|invite|schedule|calendar\b", regex=True)), "_rule_pts"] += 1.0
    df["_rule_pts"] = df["_rule_pts"].clip(0, 2)
    df["_corr_pts"] = df["corruption_score"].fillna(0).clip(0, 3)
    df["context_anomaly_score"] = (df["_if_pts"] + df["_rule_pts"] + df["_corr_pts"]).clip(0, 10)
    return df.drop(columns=["_if_pct","_if_pts","_rule_pts","_corr_pts"], errors="ignore")
# =================== 🔧 NEW: Per-bucket k & stabilizer params ===================
def _bucket_k_multiplier(bucket_name: str) -> float:
b = (bucket_name or "").lower()
if b in ("constituent",): return 1.25
if b in ("procurement", "campaign finance", "receipts/billing", "lobbyist"): return 1.15
if b in ("scheduling", "other"): return 1.00
if b in ("legal",): return 0.80
return 1.00
def _bucket_stabilizer_params(bucket_name: str) -> Tuple[int, float, float]:
b = (bucket_name or "").lower()
if b == "legal": return (30, 0.97, 0.38)
if b == "procurement": return (35, 0.96, 0.36)
if b == "campaign finance": return (35, 0.96, 0.36)
if b == "constituent": return (40, 0.95, 0.33)
if b == "receipts/billing": return (40, 0.95, 0.35)
if b == "scheduling": return (35, 0.95, 0.35)
return (40, 0.96, 0.35)
# =================== 🔧 NEW: Label de-dup helpers ===================
def _normalize_label_tokens(label: str) -> set:
    """Tokenize a cluster label for fuzzy comparison.

    Lower-cases, extracts Latin/Hebrew word tokens (length >= 2), strips a
    trailing plural 's' from longer tokens, and drops stopwords.
    """
    if not label:
        return set()
    words = re.findall(r"[a-z\u0590-\u05FF][a-z\u0590-\u05FF\-']{1,}", str(label).lower())
    singular = (w[:-1] if len(w) > 3 and w.endswith("s") else w for w in words)
    return {w for w in singular
            if len(w) >= 2 and w not in STOP_TERMS and w not in EN_STOP and w not in HE_STOP}
def _jaccard(a: set, b: set) -> float:
if not a or not b: return 0.0
inter = len(a & b)
if inter == 0: return 0.0
return inter / float(len(a | b))
def dedupe_cluster_labels_in_bucket(df: pd.DataFrame, bucket: str, sim_thresh: float = 0.72) -> pd.DataFrame:
    """Within one bucket, unify near-duplicate cluster display names.

    Cluster names are tokenized (`_normalize_label_tokens`) and compared
    pairwise with Jaccard similarity; pairs at or above `sim_thresh` are
    grouped via union-find, and every cluster in a group receives the group's
    canonical name (longest string, ties broken lexicographically). Cluster
    ids are untouched — only `cluster_name` changes. Returns the mutated df.
    """
    sel = df[df["bucket"] == bucket].copy()
    if sel.empty or "cluster_name" not in sel.columns:
        return df
    names = sel[["cluster_id", "cluster_name"]].drop_duplicates()
    tokens = {int(cid): _normalize_label_tokens(str(name)) for cid, name in names.values}
    ids = list(tokens.keys())
    parent = {i: i for i in ids}
    def find(i):
        # Union-find root lookup.
        while parent[i] != i: i = parent[i]
        return i
    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb: parent[rb] = ra
    for i in range(len(ids)):
        for j in range(i+1, len(ids)):
            if _jaccard(tokens[ids[i]], tokens[ids[j]]) >= sim_thresh:
                union(ids[i], ids[j])
    names_map = dict(names.values)
    # Group cluster ids by union-find root.
    comp_to_canon = {}
    for cid in ids:
        root = find(cid)
        comp_to_canon.setdefault(root, [])
        comp_to_canon[root].append((cid, names_map.get(cid, "")))
    canon_for_cluster = {}
    for root, items in comp_to_canon.items():
        # Canonical name: longest wins; lexicographic tiebreak.
        best = max(items, key=lambda kv: (len(kv[1] or ""), kv[1]))
        for cid, _ in items:
            canon_for_cluster[cid] = best[1]
    df.loc[sel.index, "cluster_name"] = sel["cluster_id"].map(lambda c: canon_for_cluster.get(int(c), names_map.get(int(c), "")))
    return df
def dedupe_all_labels(df: pd.DataFrame) -> pd.DataFrame:
    """Run cluster-label de-duplication bucket by bucket over the whole frame."""
    result = df
    for bucket in sorted(df["bucket"].dropna().unique()):
        result = dedupe_cluster_labels_in_bucket(result, bucket, sim_thresh=0.72)
    return result
# =================== 🔎 NEW: Surveillance-campaign detection ===================
# Keyword lexicon for spotting recurring monitoring / "daily brief" campaigns.
SURV_KEYWORDS = [
    "daily report","daily brief","briefing","sitreps","sitrep","situation report","summary",
    "dossier","monitoring","tracking","watchlist","watch list","profile","surveillance",
    "intel","intelligence","osint","open source intel","clippings","press clips","digest",
    "alert","alerting","dispatch","bulletin","roundup","update"
]
# Case-insensitive alternation over all keywords (literals escaped).
SURV_RE = re.compile("|".join([re.escape(k) for k in SURV_KEYWORDS]), re.I)
# Numeric runs (ids, slash/dash dates) masked when templating subject lines.
SUBJ_NUM_RE = re.compile(r"\b\d{1,4}([,./-]\d{1,4})*\b")
# Date-like tokens: numeric dates or English month abbreviations.
SUBJ_DATE_RE = re.compile(r"\b(?:\d{1,2}[-/]\d{1,2}(?:[-/]\d{2,4})?|\d{4}-\d{2}-\d{2}|jan|feb|mar|apr|may|jun|jul|aug|sep|sept|oct|nov|dec)\b", re.I)
# Leading reply/forward prefixes stripped from subjects.
SUBJ_FW_RE = re.compile(r"^\s*(re:|fw:|fwd:)\s*", re.I)
# Plain email-address matcher.
EMAIL_RE = re.compile(r"\b[\w.\-+%]+@[\w.-]+\.[A-Za-z]{2,}\b")
def _candidate_entities_from_subjects(df: pd.DataFrame, extra_watchlist: List[str]) -> List[str]:
    """Collect candidate entity names for surveillance scoring.

    Combines the user watchlist with runs of 1-3 Capitalized Words mined from
    subject lines; short names (< 5 chars) and stopwords are dropped.
    Returns a sorted list capped at 3000 candidates.
    """
    found = {w.strip() for w in (extra_watchlist or []) if w.strip()}
    name_re = re.compile(r"\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,2})\b")
    for subject in df["subject"].dropna().astype(str).tolist():
        for match in name_re.finditer(subject):
            candidate = match.group(1).strip()
            if candidate.lower() in EN_STOP or len(candidate) < 5:
                continue
            found.add(candidate)
    return sorted(found)[:3000]
def _normalize_subject_template(subj: str, entity: str) -> str:
    """Reduce a subject to a reusable template for repetition detection.

    Strips Re:/Fw: prefixes, masks the entity, dates, numbers and email
    addresses with placeholder tokens, then squeezes whitespace and lowercases.
    """
    if not subj:
        return ""
    templ = SUBJ_FW_RE.sub("", subj)
    try:
        templ = re.sub(re.escape(entity), "«ENTITY»", templ, flags=re.I)
    except Exception:
        # A malformed entity pattern must not break templating.
        pass
    templ = SUBJ_DATE_RE.sub("«DATE»", templ)
    templ = SUBJ_NUM_RE.sub("«NUM»", templ)
    templ = EMAIL_RE.sub("«EMAIL»", templ)
    return re.sub(r"\s+", " ", templ).strip().lower()
def _entity_mask_present(row: pd.Series, entity: str) -> bool:
t = (row.get("subject","") + " " + row.get("body_text","")).lower()
e = (entity or "").lower()
return (e in t) if e else False
def detect_surveillance_campaigns(
    df: pd.DataFrame,
    watchlist: Optional[List[str]] = None,
    min_mentions: int = 15,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Score candidate entities for signs of an organized monitoring campaign.

    For every candidate entity (watchlist + Capitalized-name runs mined from
    subjects) mentioned at least `min_mentions` times, combine:
      - subject-template repetition (same normalized subject reused),
      - surveillance-keyword share (SURV_RE),
      - weekly cadence spike (z-score of the last week vs. prior weeks),
      - sender/domain diversity, internal share, and recipient fan-out
    into a score with levels info (<4.5) / possible (>=4.5) / likely (>=6.5).

    Returns (entity summary sorted by score, sample emails per entity).

    Fix: when no entity reached `min_mentions`, `pd.DataFrame(rows)` was empty
    and `.sort_values(["surveillance_score", "n_emails"])` raised KeyError;
    that case now returns two empty frames, matching the empty-input branch.
    """
    if df.empty:
        return pd.DataFrame(), pd.DataFrame()
    watch = [w.strip() for w in (watchlist or []) if w.strip()]
    cands = _candidate_entities_from_subjects(df, watch)
    dfd = df.copy()
    dfd["_dt"] = pd.to_datetime(dfd["date"], utc=True, errors="coerce")
    dfd["_day"] = dfd["_dt"].dt.date
    dfd["_week"] = dfd["_dt"].dt.to_period("W").astype(str)
    dfd["_is_news"] = (dfd["bucket"] == "Newsletters/Alerts")
    dfd["_is_it"] = (dfd["bucket"] == "IT/Security")
    dfd["_is_internal"] = ~(dfd["_is_news"] | dfd["_is_it"])
    dfd["_recips"] = dfd["to_emails"].apply(lambda xs: len(xs) if isinstance(xs, list) else 0)
    rows = []
    samples = []
    for entity in cands:
        # O(rows) substring scan per candidate; candidates are capped at 3000.
        mask = dfd.apply(lambda r: _entity_mask_present(r, entity), axis=1)
        grp = dfd[mask]
        n = len(grp)
        if n < int(min_mentions):
            continue
        n_senders = grp["from_email"].nunique()
        n_domains = grp["from_domain"].nunique()
        pct_news = float((grp["_is_news"].mean() if n else 0.0))
        pct_int = float((grp["_is_internal"].mean() if n else 0.0))
        avg_recips = float((grp["_recips"].mean() if n else 0.0))
        wk = grp.groupby("_week").size().astype(float)
        if len(wk) >= 4:
            # z-score of the most recent week against all earlier weeks.
            baseline = wk.iloc[:-1]
            mu = float(baseline.mean()) if len(baseline) else 0.0
            sd = float(baseline.std(ddof=1)) if len(baseline) > 1 else 0.0
            last = float(wk.iloc[-1])
            weekly_peak_z = 0.0 if sd == 0.0 else (last - mu) / sd
        else:
            weekly_peak_z = 0.0
        norm_subj = grp["subject"].fillna("").astype(str).map(lambda s: _normalize_subject_template(s, entity))
        if len(norm_subj):
            # Share of the single most common subject template.
            top_template_share = norm_subj.value_counts(normalize=True).iloc[0]
        else:
            top_template_share = 0.0
        kw_share = float(((grp["subject"].fillna("") + " " + grp["body_text"].fillna("")).str.contains(SURV_RE).mean()) if n else 0.0)
        # Weighted mix; each component is capped at 1 before weighting.
        score = 0.0
        score += 2.5 * min(1.0, top_template_share)
        score += 2.0 * min(1.0, kw_share)
        score += 1.5 * min(1.0, weekly_peak_z / 3.0)
        score += 0.8 * min(1.0, n_senders / 10.0)
        score += 0.5 * min(1.0, n_domains / 10.0)
        score += 1.0 * pct_int
        score += 0.3 * min(1.0, avg_recips / 10.0)
        level = "info"
        if score >= 6.5: level = "likely"
        elif score >= 4.5: level = "possible"
        first_d = grp["_dt"].min()
        last_d = grp["_dt"].max()
        rows.append({
            "entity": entity,
            "surveillance_score": round(float(score), 3),
            "level": level,
            "n_emails": int(n),
            "n_senders": int(n_senders),
            "n_domains": int(n_domains),
            "pct_newsletters": round(pct_news, 3),
            "pct_internal": round(pct_int, 3),
            "avg_recipients": round(avg_recips, 2),
            "weekly_peak_z": round(float(weekly_peak_z), 3),
            "template_max_share": round(float(top_template_share), 3),
            "keyword_share": round(float(kw_share), 3),
            "first_date": str(first_d) if pd.notna(first_d) else "",
            "last_date": str(last_d) if pd.notna(last_d) else "",
            "notes": "template/keywords/cadence/senders/domains mix"
        })
        ex = grp[["date","from_email","from_domain","subject","bucket"]].copy().head(8)
        ex.insert(0, "entity", entity)
        samples.append(ex)
    if not rows:
        # No entity cleared min_mentions — sorting an empty frame would KeyError.
        return pd.DataFrame(), pd.DataFrame()
    ent_df = pd.DataFrame(rows).sort_values(["surveillance_score","n_emails"], ascending=[False, False]).head(200)
    samp_df = pd.concat(samples, ignore_index=True) if samples else pd.DataFrame()
    return ent_df, samp_df
def tag_surveillance_emails(df: pd.DataFrame, ent_df: pd.DataFrame, threshold: float = 4.5) -> pd.DataFrame:
    """Add a "surveillance" tag to emails mentioning any entity at/above `threshold`.

    Returns the input frame untouched when either frame is empty or no entity
    clears the threshold; otherwise returns a copy with rebuilt sorted tags.
    """
    if df.empty or ent_df.empty:
        return df
    hot = ent_df.loc[ent_df["surveillance_score"] >= float(threshold), "entity"].tolist()
    if not hot:
        return df
    needles = [e.lower() for e in hot]
    def _with_tag(row):
        text = (row.get("subject","") + " " + row.get("body_text","")).lower()
        tag_set = set(row.get("tags") or [])
        if any(needle in text for needle in needles):
            tag_set.add("surveillance")
        return sorted(tag_set)
    result = df.copy()
    result["tags"] = result.apply(_with_tag, axis=1)
    return result
# =================== UI / PIPELINE CONTINUATION ===================
# ---------- Styles ----------
# CSS injected into the Gradio Blocks app below: card-style email reader,
# cluster pills, tag chips, and <mark> highlighting for suspect patterns.
CSS = """
:root { --pill:#eef2ff; --pill-text:#1f2937; --tag:#e5e7eb; --tag-text:#111827; }
.email-card { background:#ffffff; color:#111827; border-radius:12px; padding:16px; box-shadow:0 1px 3px rgba(0,0,0,0.08); }
.email-header { display:flex; align-items:flex-start; justify-content:space-between; gap:12px; }
.subject { color:#0f172a; font-size:18px; font-weight:700; margin-bottom:6px; }
.meta { color:#334155; font-size:12px; }
.badges { display:flex; gap:8px; align-items:center; flex-wrap:wrap; }
.cluster-pill { background:var(--pill); color:var(--pill-text); padding:2px 8px; border-radius:999px; font-size:12px; }
.sentiment { font-size:12px; color:#334155; }
.tag { background:var(--tag); color:var(--tag-text); padding:2px 6px; border-radius:6px; font-size:12px; }
.email-body { margin-top:12px; max-height:520px; overflow:auto; line-height:1.6; white-space:normal; color:#111827; }
.email-body a { color:#1d4ed8; text-decoration:underline; }
mark { background:#fff59d; color:#111827; padding:0 2px; border-radius:2px; }
hr.sep { border:none; border-top:1px solid #e5e7eb; margin:10px 0; }
.small { color:#475569; font-size:12px; }
.cursor { cursor:pointer; }
"""
# ---------- App ----------
with gr.Blocks(title="Email Investigator — Per-bucket-k + Label Dedup + Surveillance Radar", css=CSS, theme="soft") as demo:
gr.Markdown("# Email Investigator — BM25 + Char-grams + (optional) LSA → MiniBatchKMeans")
gr.Markdown(
"This build includes per-bucket **k** heuristics, label **de-dup**, and a **surveillance-campaign detector** "
"(template cadence + keywords + multi-sender/domain signals)."
)
with gr.Row():
inbox_file = gr.File(label="Upload emails (.jsonl or .json)", file_types=[".jsonl", ".json"])
with gr.Accordion("Vectorization & Clustering", open=True):
with gr.Row():
max_features = gr.Number(label="Word max_features (BM25)", value=120_000, precision=0)
min_df = gr.Number(label="min_df (doc freq ≥)", value=3, precision=0)
max_df = gr.Slider(label="max_df (fraction ≤)", minimum=0.1, maximum=0.95, value=0.7, step=0.05)
use_bigrams = gr.Checkbox(label="Use bigrams (1–2)", value=True)
skip_lang = gr.Checkbox(label="Skip language detection (faster)", value=True)
with gr.Row():
use_hashing = gr.Checkbox(label="Use HashingVectorizer (memory-light, fast)", value=True)
hash_bits = gr.Slider(label="Hashing bits (2^n features)", minimum=16, maximum=20, step=1, value=18)
with gr.Row():
use_lsa = gr.Checkbox(label="Use LSA (TruncatedSVD) before KMeans", value=True)
lsa_dim = gr.Number(label="LSA components", value=256, precision=0)
auto_k = gr.Checkbox(label="Auto choose k (kneedle)", value=True)
k_clusters = gr.Number(label="Base k (before per-bucket multiplier)", value=350, precision=0)
mb_batch = gr.Number(label="KMeans batch_size", value=4096, precision=0)
with gr.Row():
use_hdbscan = gr.Checkbox(label="Use HDBSCAN (auto-k, noise) on reduced vectors", value=False)
hdb_min_cluster = gr.Number(label="HDBSCAN min_cluster_size", value=60, precision=0)
hdb_min_samples = gr.Number(label="HDBSCAN min_samples (0=auto)", value=0, precision=0)
with gr.Row():
per_language = gr.Checkbox(label="Cluster per language (reduces cross-language mixing)", value=True)
with gr.Row():
use_faiss = gr.Checkbox(label="Use Faiss ANN for search (if available & LSA on)", value=True)
use_iso = gr.Checkbox(label="Compute anomaly score (IsolationForest on LSA)", value=False)
with gr.Accordion("Investigation Controls", open=True):
with gr.Row():
trusted_domains_in = gr.Textbox(label="Trusted org domains (comma-separated)", value="example.gov, example.org")
extra_keywords_in = gr.Textbox(label="Extra suspicious phrases (comma-separated)", value="")
highlight_toggle = gr.Checkbox(label="Highlight suspect patterns in reader", value=True)
with gr.Row():
use_embeddings = gr.Checkbox(label="Add lightweight word embeddings (avg word2vec/GloVe) if available", value=False)
embed_weight = gr.Slider(label="Embedding weight in feature space", minimum=0.0, maximum=1.0, step=0.05, value=0.35)
with gr.Row():
embeddings_path = gr.Textbox(label="Path to local embeddings (.txt/.vec/.bin) (optional)", value="")
embeddings_binary = gr.Checkbox(label="File is binary word2vec format", value=False)
with gr.Row():
bucket_drop = gr.Dropdown(label="Bucket", choices=["(any)"] + list(TAXONOMY.keys()), value="(any)", allow_custom_value=False)
cluster_drop = gr.Dropdown(label="Cluster", choices=[], value=None, allow_custom_value=False)
domain_drop = gr.Dropdown(label="Sender domain", choices=[], value=None, allow_custom_value=False)
sender_drop = gr.Dropdown(label="Sender email", choices=[], value=None, allow_custom_value=False)
lang_drop = gr.Dropdown(label="Language", choices=["(any)"], value="(any)", allow_custom_value=False)
sentiment_drop = gr.Dropdown(label="Sentiment", choices=["(any)", "positive", "neutral", "negative"], value="(any)")
tag_drop = gr.Dropdown(label="Tag", choices=["(any)", "🚩suspect", "finance", "off-channel", "surveillance", "odd-hours", "personal-mail"], value="(any)")
with gr.Row():
date_start = gr.Textbox(label="Date from (YYYY-MM-DD, optional)", value="")
date_end = gr.Textbox(label="Date to (YYYY-MM-DD, optional)", value="")
sort_by = gr.Dropdown(label="Sort by", choices=["context_anomaly_score","corruption_score","date","anomaly_score","search_score"], value="context_anomaly_score")
sort_dir = gr.Dropdown(label="Order", choices=["desc","asc"], value="desc")
with gr.Row():
hide_noise = gr.Checkbox(label="Hide noise/unassigned (cluster -3)", value=True)
with gr.Accordion("Surveillance Radar", open=True):
with gr.Row():
watchlist_in = gr.Textbox(label="Watchlist (names or entities, comma-separated)", value="Hillary Clinton, Joe Biden, Donald Trump")
min_mentions = gr.Number(label="Min mentions per entity", value=15, precision=0)
with gr.Row():
run_btn = gr.Button("Process", variant="primary")
# NEW: Update button lets you re-run with same uploaded file & current settings
update_btn = gr.Button("Update", variant="secondary") # NEW: Update
reset_btn = gr.Button("Reset filters")
status = gr.Markdown("")
with gr.Row():
cluster_counts_df = gr.Dataframe(label="Cluster summary (top 500) — click a row to filter", interactive=False, wrap=True)
domain_counts_df = gr.Dataframe(label="Top sender domains", interactive=False, wrap=True)
with gr.Row():
sender_counts_df = gr.Dataframe(label="Top senders", interactive=False, wrap=True)
with gr.Row():
actors_df = gr.Dataframe(label="Top actors (by degree / unique counterparts)", interactive=False, wrap=True)
offhours_df = gr.Dataframe(label="Off-hours & personal-mail hits", interactive=False, wrap=True)
gr.Markdown("### Surveillance Campaigns (detected entities)")
with gr.Row():
surv_entities_df = gr.Dataframe(label="Entities ranked by surveillance score", interactive=False, wrap=True)
surv_samples_df = gr.Dataframe(label="Sample emails for highlighted entities", interactive=False, wrap=True)
gr.Markdown("### Search")
with gr.Row():
search_query = gr.Textbox(label="Search (keywords, names, etc.)")
search_btn = gr.Button("Search")
results_df = gr.Dataframe(label="Results (top 500 or top 50 for search)", interactive=True, wrap=True)
email_view = gr.HTML(label="Reader")
# -------- State --------
state_df = gr.State()
state_vec = gr.State()
state_X_reduced = gr.State()
state_index = gr.State()
state_term_names = gr.State()
state_query_terms = gr.State()
state_use_lsa = gr.State()
state_use_faiss = gr.State()
state_svd = gr.State()
state_norm = gr.State()
state_dims = gr.State()
state_extra_terms = gr.State()
state_highlight = gr.State()
state_inbox = gr.State() # NEW: keep last uploaded file for Update
# -------- IO helpers --------
def _load_json_records(local_path: str) -> List[Dict[str, Any]]:
recs: List[Dict[str, Any]] = []
if local_path.endswith(".jsonl"):
with open(local_path, "r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except Exception:
continue
if str(obj.get("type", "")).lower() == "meta":
continue
recs.append(obj)
else:
with open(local_path, "r", encoding="utf-8") as fh:
obj = json.load(fh)
if isinstance(obj, list):
for r in obj:
if str(r.get("type", "")).lower() == "meta":
continue
recs.append(r)
elif isinstance(obj, dict):
if str(obj.get("type", "")).lower() != "meta":
recs = [obj]
return recs
def _apply_filters(
    df: pd.DataFrame,
    bucket: Optional[str],
    cluster: Optional[str],
    domain: Optional[str],
    sender: Optional[str],
    lang_value: str,
    sentiment: str,
    tag_value: str,
    start: str,
    end: str,
    hide_noise_flag: bool = False,
) -> pd.DataFrame:
    """Apply the UI filter widgets to the email frame; each active filter narrows `out`.

    "(any)" / empty values mean "no filter". `cluster` arrives as a display
    string like "123 — name"; the numeric id is parsed out with a regex.
    Date bounds are parsed leniently (errors coerce to NaT, which fails the
    comparison and drops the row). Returns the filtered (non-copied) frame.
    """
    out = df
    if bucket and bucket != "(any)":
        out = out[out["bucket"] == bucket]
    if cluster and cluster != "(any)":
        # Extract the (possibly negative) cluster id preceding the em-dash.
        m = re.match(r"^.*?(\-?\d+)\s+—", cluster)
        if m:
            cid = int(m.group(1))
            out = out[out["cluster_id"] == cid]
    if domain and domain != "(any)":
        out = out[out["from_domain"] == domain]
    if sender and sender != "(any)":
        out = out[out["from_email"] == sender]
    if lang_value and lang_value != "(any)":
        out = out[out["lang"] == lang_value]
    if sentiment and sentiment != "(any)" and "sentiment" in out.columns:
        out = out[out["sentiment"].astype(str) == sentiment]
    if tag_value and tag_value != "(any)":
        # A tag may live in `tags` (computed) or `flags` (attachment flags etc.).
        out = out[out["tags"].apply(lambda ts: isinstance(ts, list) and (tag_value in ts))
                  | out["flags"].apply(lambda ts: isinstance(ts, list) and (tag_value in ts))]
    if start:
        try:
            dt = pd.to_datetime(start, utc=True, errors="coerce")
            out = out[pd.to_datetime(out["date"], utc=True, errors="coerce") >= dt]
        except Exception:
            pass
    if end:
        try:
            dt = pd.to_datetime(end, utc=True, errors="coerce")
            out = out[pd.to_datetime(out["date"], utc=True, errors="coerce") <= dt]
        except Exception:
            pass
    if hide_noise_flag:
        # -3 is the stabilizer's noise/unassigned sentinel.
        out = out[out["cluster_id"] != -3]
    return out
# -------- Social graph summary --------
def social_stats(df: pd.DataFrame) -> pd.DataFrame:
    """Summarize the sender/recipient graph: top 50 addresses by degree.

    Degree counts unique counterparts on the undirected from->to graph;
    self-mail and blank addresses are ignored.

    Fix: returns an empty frame with the expected ["address", "degree"]
    columns instead of raising KeyError from sort_values when the input has
    no edges (empty frame, or rows without recipients).
    """
    neighbors: Dict[str, set] = {}
    def add_edge(a, b):
        # Undirected edge; skip blanks and self-loops.
        if not a or not b or a == b:
            return
        neighbors.setdefault(a, set()).add(b)
        neighbors.setdefault(b, set()).add(a)
    for _, r in df.iterrows():
        sender = r.get("from_email") or ""
        for recipient in (r.get("to_emails") or []):
            add_edge(sender, recipient)
    rows = [{"address": addr, "degree": len(nbrs)} for addr, nbrs in neighbors.items()]
    if not rows:
        return pd.DataFrame(columns=["address", "degree"])
    return pd.DataFrame(rows).sort_values("degree", ascending=False).head(50)
# -------- Sorting helper --------
def _sort_results(df, by, direction):
if df is None or len(df) == 0:
return pd.DataFrame()
tmp = df.copy()
if "date" in tmp.columns:
tmp["_dt"] = pd.to_datetime(tmp["date"], utc=True, errors="coerce")
else:
tmp["_dt"] = pd.NaT
by = by if by in tmp.columns else "context_anomaly_score"
asc = (direction == "asc")
sort_cols = [by]
if by == "date":
sort_cols = ["_dt"]
elif by in ["anomaly_score", "corruption_score", "context_anomaly_score"]:
sort_cols.append("_dt")
tmp = tmp.sort_values(sort_cols, ascending=[asc, False])
cols_out = [
"date","bucket","from_email","from_domain","subject","cluster_name","lang",
"tags","flags","sentiment","context_anomaly_score","corruption_score","anomaly_score"
]
if "search_score" in tmp.columns:
cols_out.append("search_score")
return tmp[[c for c in cols_out if c in tmp.columns]].head(500)
# -------- Vectorization helpers (mirror training path for queries) --------
def _tokenize_query(q: str) -> List[str]:
return [p.strip() for p in re.split(r"\s+", q or "") if p.strip()][:8]
def _project_query_to_lsa(q_vec, svd, norm) -> Optional[np.ndarray]:
try:
return norm.transform(svd.transform(q_vec)).astype(np.float32)
except Exception:
return None
def _vectorize_query(q, vec_state, corpus_texts):
    """Vectorize a search query with the same feature pipeline used for documents.

    Chooses hashing or count/BM25 word features to mirror the training path
    recorded in `vec_state`, appends char 3-5-gram TF-IDF down-weighted by
    0.20, and returns a single-row CSR matrix.

    NOTE(review): this re-fits the word and char vectorizers over the whole
    corpus on every call so the query lands in an identical feature space;
    that is O(corpus) per query and could be cached — confirm before changing.
    """
    # Build the same features for the query that we used for docs
    char_min_df = 1 if len(corpus_texts) <= 1 else 2
    if vec_state.get("use_hashing"):
        hv = HashingVectorizer(
            analyzer="word",
            ngram_range=(1, 2) if vec_state.get("use_bigrams") else (1, 1),
            n_features=2 ** vec_state.get("hash_bits", 18),
            token_pattern=TOKEN_PATTERN,
            lowercase=True,
            norm=None,
            alternate_sign=False,
        )
        counts = hv.transform(corpus_texts)
        tfidf_tr = TfidfTransformer().fit(counts)
        q_word = tfidf_tr.transform(hv.transform([q]))
    else:
        cv = CountVectorizer(
            analyzer="word",
            ngram_range=(1, 2) if vec_state.get("use_bigrams") else (1, 1),
            max_features=vec_state.get("max_features"),
            min_df=vec_state.get("min_df"),
            max_df=vec_state.get("max_df"),
            token_pattern=TOKEN_PATTERN,
            lowercase=True,
            stop_words=STOPWORD_FOR_VEC,
            dtype=np.float32,
        )
        tf = cv.fit_transform(corpus_texts)
        bm25 = BM25Transformer().fit(tf)
        q_word = bm25.transform(cv.transform([q]))
    # Char n-grams are appended with the same 0.20 down-weight as in training.
    char_vec = CharTfidf(
        analyzer="char", ngram_range=(3, 5), min_df=char_min_df, max_features=100_000, lowercase=True, dtype=np.float32
    ).fit(corpus_texts)
    q_char = char_vec.transform([q])
    return hstack([q_word, q_char * 0.20], format="csr")
# -------- Main pipeline --------
def process_file(inbox_file, max_features, min_df, max_df, use_bigrams, skip_lang,
use_lsa, lsa_dim, auto_k, k_clusters, mb_batch, use_faiss, use_iso,
trusted_domains_in, extra_keywords_in, highlight_toggle,
use_hashing, hash_bits, use_hdbscan, hdb_min_cluster, hdb_min_samples,
per_language, use_embeddings, embed_weight, embeddings_path, embeddings_binary,
watchlist_in, min_mentions):
if inbox_file is None:
return (
"**Please upload a file.**",
None, None, None, None, None,
None, None, # surveillance outputs
None, # results_df
None, None, None, # states df/vec/X
None, None, # index & term names
None, None, # flags
gr.update(), gr.update(), gr.update(), gr.update(), # dropdowns
None, None, None, # svd/norm/dims
None, None, # extra terms / highlight
gr.update() # bucket list
)
# === Inner helpers for this function ===
def _make_texts(df_in: pd.DataFrame) -> Tuple[List[str], List[str]]:
    """Return (enriched full texts, bare subject lines) for vectorization."""
    enriched = df_in.apply(enrich_text, axis=1).tolist()
    subjects = df_in["subject"].fillna("").tolist()
    return enriched, subjects
def _vectorize_block(
    texts: List[str],
    use_bigrams: bool,
    max_features: int,
    min_df: int,
    max_df: float,
    use_hashing: bool,
    hash_bits: int
):
    """
    Return (X_full_csr, count_vec, char_vec, bm25, d_word, d_char, d_full)
    Uses Count+BM25 (+ char-tfidf) or Hashing+TfidfTransformer (+ char-tfidf).

    Hashing path: stateless word features (no vocabulary is kept, so
    count_vec and bm25 come back as None). Count path: explicit vocabulary
    plus BM25 weighting. In both paths char 3-5-gram TF-IDF features are
    appended, down-weighted by 0.20; d_word/d_char/d_full are the column
    widths of the word block, char block, and their concatenation.
    """
    n_docs = len(texts)
    ngram_range = (1, 2) if use_bigrams else (1, 1)
    # Char vectorizer needs min_df=1 when there is only one document.
    char_min_df = 1 if n_docs <= 1 else 2
    if use_hashing:
        hv = HashingVectorizer(
            analyzer="word", ngram_range=ngram_range,
            n_features=2 ** int(hash_bits), alternate_sign=False,
            token_pattern=TOKEN_PATTERN, lowercase=True, norm=None
        )
        word_counts = hv.transform(texts)
        tfidf_tr = TfidfTransformer()
        X_word = tfidf_tr.fit_transform(word_counts).astype(np.float32)
        char_vec = CharTfidf(
            analyzer="char", ngram_range=(3, 5), min_df=char_min_df,
            max_features=100_000, lowercase=True, dtype=np.float32
        )
        X_char = char_vec.fit_transform(texts)
        X_full = hstack([X_word, X_char * 0.20], format="csr")
        d_word, d_char, d_full = X_word.shape[1], X_char.shape[1], X_word.shape[1] + X_char.shape[1]
        count_vec = None; bm25 = None
        return X_full, count_vec, char_vec, bm25, d_word, d_char, d_full
    # Count + BM25 path (only reached when use_hashing is False).
    count_vec = CountVectorizer(
        analyzer="word", ngram_range=ngram_range,
        max_features=int(max_features) if max_features else None,
        min_df=int(min_df) if min_df else 2, max_df=float(max_df) if max_df else 0.7,
        token_pattern=TOKEN_PATTERN, lowercase=True,
        dtype=np.float32, stop_words=STOPWORD_FOR_VEC
    )
    TF = count_vec.fit_transform(texts)
    bm25 = BM25Transformer(k1=1.2, b=0.75).fit(TF)
    X_word = bm25.transform(TF)
    char_vec = CharTfidf(
        analyzer="char", ngram_range=(3, 5), min_df=char_min_df,
        max_features=100_000, lowercase=True, dtype=np.float32
    )
    X_char = char_vec.fit_transform(texts)
    X_full = hstack([X_word, X_char * 0.20], format="csr")
    d_word, d_char, d_full = X_word.shape[1], X_char.shape[1], X_word.shape[1] + X_char.shape[1]
    return X_full, count_vec, char_vec, bm25, d_word, d_char, d_full
def _reduce_space(X_full, use_lsa, lsa_dim):
    """Optionally reduce the sparse feature matrix with TruncatedSVD + L2 norm.

    Returns (X_reduced float32 ndarray, fitted svd, fitted normalizer), or
    (None, None, None) when LSA is off or fewer than 2 components are feasible.
    """
    if not use_lsa:
        return None, None, None
    n_docs, n_feats = X_full.shape
    # SVD components are bounded by min(n_docs, n_feats) - 1.
    cap = max(1, min(n_docs, n_feats) - 1)
    n_comp = int(min(int(lsa_dim or 256), cap))
    if n_comp < 2:
        return None, None, None
    svd_obj = TruncatedSVD(n_components=n_comp, random_state=0)
    dense = svd_obj.fit_transform(X_full)  # dense
    norm_obj = Normalizer(copy=False)
    reduced = norm_obj.fit_transform(dense).astype(np.float32)
    del dense
    gc.collect()
    return reduced, svd_obj, norm_obj
def _attach_embeddings(texts, X_reduced_or_full, use_lsa, kv, emb_dim, weight):
    """Optionally concatenate (weighted) averaged word-embedding features.

    No-op (returns the matrix unchanged) when no keyed vectors are loaded,
    the embedding dim is zero, or the weight is non-positive. Handles both
    dense (post-LSA) and sparse feature matrices.
    """
    if kv is None or emb_dim <= 0 or weight <= 0.0:
        return X_reduced_or_full, emb_dim
    embs = _build_doc_embeddings(texts, kv, emb_dim).astype(np.float32)
    if weight != 1.0:
        embs *= float(weight)
    if isinstance(X_reduced_or_full, np.ndarray):
        combined = np.hstack([X_reduced_or_full, embs]).astype(np.float32)
    else:
        combined = hstack([X_reduced_or_full, csr_matrix(embs)], format="csr")
    return combined, emb_dim
def _cluster_space(
    X_space,
    bucket_name: str,
    df_part: pd.DataFrame,
    use_lsa: bool,
    use_hdbscan: bool,
    hdb_min_cluster: int,
    hdb_min_samples: int,
    auto_k: bool,
    k_clusters: int,
    mb_batch: int,
    count_vec,
    svd_obj,
    norm_obj,
    d_word, d_char
):
    """Cluster one bucket/language partition; return (labels, centers, chosen_k).

    Strategy, in priority order:
      1. Degenerate sizes: n <= 1 is trivial; n < 10 uses a tiny MiniBatchKMeans.
      2. HDBSCAN when enabled, installed, and the space is dense (ndarray) and
         large enough — returns centers=None; labels < 0 mark noise.
      3. MiniBatchKMeans with k chosen by kneedle (dense LSA), the global
         heuristic, or the user setting, scaled by a per-bucket multiplier;
         optionally seeded from the corruption lexicon projected into LSA space.
    Every path post-processes labels with `stabilize_labels` using per-bucket
    thresholds from `_bucket_stabilizer_params`.
    """
    n = X_space.shape[0]
    # Per-bucket stabilizer params
    min_size, merge_th, reassign_th = _bucket_stabilizer_params(bucket_name)
    if n <= 1:
        # 0 or 1 documents: nothing to cluster.
        labels = np.zeros((n,), dtype=int) if n == 1 else np.array([], dtype=int)
        centers = None
        chosen_k = int(n) if n > 0 else 0
        return stabilize_labels(X_space, labels, min_size=min_size, merge_thresh=merge_th, reassign_thresh=reassign_th), centers, chosen_k
    if n < 10:
        # Very small partition: at most a couple of KMeans clusters.
        k_small = min(max(2, n // 2), n)
        kmeans = MiniBatchKMeans(
            n_clusters=int(k_small),
            batch_size=int(mb_batch or 4096),
            random_state=0,
            n_init="auto"
        )
        labels = kmeans.fit_predict(X_space)
        centers = getattr(kmeans, "cluster_centers_", None)
        labels = stabilize_labels(X_space, labels, min_size=min_size, merge_thresh=merge_th, reassign_thresh=reassign_th)
        return labels, centers, int(len(set(labels)))
    # HDBSCAN only runs on the dense (LSA/embedding) representation.
    if use_hdbscan and HDBSCAN_OK and isinstance(X_space, np.ndarray) and X_space.shape[0] >= max(50, hdb_min_cluster):
        min_samples = None if int(hdb_min_samples or 0) <= 0 else int(hdb_min_samples)
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=int(hdb_min_cluster or 60),
            min_samples=min_samples,
            metric='euclidean',
            cluster_selection_epsilon=0.0,
            core_dist_n_jobs=1
        )
        labels = clusterer.fit_predict(X_space)
        centers = None
        labels = stabilize_labels(X_space, labels, min_size=min_size, merge_thresh=merge_th, reassign_thresh=reassign_th)
        # Count only real clusters; HDBSCAN noise is labeled < 0.
        chosen_k = int(len(set([l for l in labels if l >= 0])))
        return labels, centers, chosen_k
    # Choose k (global rule or kneedle), then per-bucket multiplier
    if bool(auto_k):
        if use_lsa and isinstance(X_space, np.ndarray):
            k, _ = choose_k_by_kneedle(X_space, ks=(50, 100, 150, 200, 300, 400, 500))
        else:
            k = auto_k_rule(X_space.shape[0])
    else:
        k = max(10, int(k_clusters or 350))
    k = int(max(2, round(k * _bucket_k_multiplier(bucket_name))))
    k = min(k, n)  # never more clusters than documents
    init = None
    # Seed centroids from the corruption lexicon when working in LSA space.
    if use_lsa and isinstance(X_space, np.ndarray) and count_vec is not None:
        seeds = seeded_centroids_in_lsa(
            CORR_LEX, count_vec, svd_obj.components_, norm_obj,
            d_word=d_word, d_full=(d_word + d_char), k=k
        )
        # Only use the seeds if one was produced for every cluster.
        if seeds is not None and seeds.shape[0] == k:
            init = seeds
    kmeans = MiniBatchKMeans(
        n_clusters=k,
        batch_size=int(mb_batch or 4096),
        random_state=0,
        n_init="auto" if init is None else 1,  # seeded init implies a single run
        init="k-means++" if init is None else init
    )
    labels = kmeans.fit_predict(X_space)
    centers = kmeans.cluster_centers_ if hasattr(kmeans, "cluster_centers_") else None
    if use_lsa and centers is not None:
        # Merge near-duplicate centroids (similarity > 0.95) before stabilizing.
        labels = merge_close_clusters(labels, centers, thresh=0.95)
    labels = stabilize_labels(X_space, labels, min_size=min_size, merge_thresh=merge_th, reassign_thresh=reassign_th)
    chosen_k = int(len(set(labels)))
    return labels, centers, chosen_k
# ---- Begin processing ----
# Parse the user-supplied comma lists (trusted sender domains, extra highlight terms).
trusted = set([d.strip().lower() for d in (trusted_domains_in or "").split(",") if d.strip()])
extra_terms = [t.strip() for t in (extra_keywords_in or "").split(",") if t.strip()]
extra_terms_lower = [t.lower() for t in extra_terms]
# Handle Gradio file object
try:
    infile_path = inbox_file.name  # gr.File upload exposes .name (temp-file path)
except Exception:
    infile_path = str(inbox_file) if inbox_file else ""
recs = _load_json_records(infile_path)
if not recs:
    # Abort early. The tuple arity must match the handler's outputs list exactly.
    return ("**No valid records found.**",
            None, None, None, None, None,
            None, None,
            None,
            None, None, None,
            None, None,
            None, None, None, None,
            None, None, None,
            None, None,
            None)
# Normalize raw records; drop any without a usable body.
normd = []
for r in tqdm(recs, desc="Normalize", leave=False):
    out = normalize_email_record(r, use_langdetect=(not bool(skip_lang)))
    if out and out.get("body_text") is not None:
        normd.append(out)
df = pd.DataFrame(normd)
if df.empty:
    # Same arity contract as the "no valid records" early return above.
    return ("**No usable email records.**",
            None, None, None, None, None,
            None, None,
            None,
            None, None, None,
            None, None,
            None, None, None, None,
            None, None, None,
            None, None,
            None)
# Drop exact duplicates (same message id, subject and body hash).
df = df.drop_duplicates(subset=["message_id", "subject", "text_hash"]).reset_index(drop=True)
df["tags"] = df["body_text"].fillna("").map(has_suspect_tag)
df = compute_sentiment_column(df)
# Stage-1 routing (bucketing)
df["bucket"] = df.apply(route_email_row, axis=1)
df["is_news"] = df.apply(lambda r: is_news_like(r.get("subject", ""), r.get("body_text", ""), r.get("from_domain", "")), axis=1)
df["is_notify"] = df.apply(lambda r: is_notification_like(r.get("subject", ""), r.get("body_text", ""), r.get("from_email", ""), r.get("from_domain", "")), axis=1)
# Heuristic overrides take precedence over keyword routing.
df.loc[df["is_news"] == True, "bucket"] = "Newsletters/Alerts"
df.loc[df["is_notify"] == True, "bucket"] = "IT/Security"
# Flags
flags = []
for _, row in df.iterrows():
    f = []
    h = _hour_of(row.get("date") or "")
    if h is not None and (h < 6 or h > 22):
        f.append("odd-hours")  # sent outside 06:00-22:00 local parse of the header
    fd = (row.get("from_domain") or "").lower()
    if (fd in PERSONAL_DOMAINS) and (fd not in trusted):
        f.append("personal-mail")  # personal provider not on the trusted list
    f += _attachment_flags(row.get("attachments") or [])
    flags.append(f)
df["flags"] = flags
# Split out stable buckets
df_main = df[~df["bucket"].isin(["Newsletters/Alerts", "IT/Security"])].reset_index(drop=True)
df_news = df[df["bucket"] == "Newsletters/Alerts"].reset_index(drop=True)
df_alerts = df[df["bucket"] == "IT/Security"].reset_index(drop=True)
# Optional embeddings
kv = None
emb_dim = 0
if bool(use_embeddings):
    kv, emb_dim = _load_embeddings(embeddings_path or "", bool(embeddings_binary))
# Build partitions: per language within bucket if requested
parts = []
if bool(per_language):
    for bkt, g_bucket in df_main.groupby("bucket", dropna=False):
        for lang_code, grp in g_bucket.groupby("lang", dropna=False):
            parts.append(((bkt, lang_code), grp.copy()))
else:
    for bkt, grp in df_main.groupby("bucket", dropna=False):
        parts.append(((bkt, "all"), grp.copy()))
# Accumulators for per-partition results, re-joined by original row index later.
labels_list, cluster_name_list, anomaly_list = [], [], []
bucket_indexers = []
X_reduced_holder = None  # kept only in the single-partition case (search reuse)
term_names_global = {}
single_partition = (len(parts) == 1)
d_word_agg, d_char_agg = 0, 0
svd_obj_local, norm_obj_local = None, None
# Vectorize -> reduce -> (embed) -> score anomalies -> cluster, per partition.
for (bucket_name, _lang), df_part in parts:
    if df_part.empty:
        continue
    texts, subjects_only = _make_texts(df_part)
    X_full, count_vec, char_vec, _, d_word, d_char, _ = _vectorize_block(
        texts=texts,
        use_bigrams=bool(use_bigrams),
        max_features=int(max_features or 120000),
        min_df=int(min_df or 3),
        max_df=float(max_df or 0.7),
        use_hashing=bool(use_hashing),
        hash_bits=int(hash_bits or 18),
    )
    d_word_agg += d_word
    d_char_agg += d_char
    X_reduced, svd_obj_local, norm_obj_local = _reduce_space(X_full, bool(use_lsa), int(lsa_dim or 256))
    X_space = (X_reduced if X_reduced is not None else X_full)
    # NOTE(review): relies on kv truthiness (gensim KeyedVectors defines
    # __len__, so an empty model is falsy) — confirm intended.
    if kv:
        X_space, _ = _attach_embeddings(
            texts, X_space, bool(use_lsa) and X_reduced is not None, kv, emb_dim, float(embed_weight)
        )
    # Best-effort anomaly scores via IsolationForest on the dense LSA space.
    anomaly_scores = np.full((len(df_part),), np.nan, dtype=np.float32)
    if X_reduced is not None and bool(use_iso) and ISO_OK and X_reduced.shape[0] >= 50:
        try:
            iso = IsolationForest(n_estimators=100, contamination="auto", random_state=0).fit(X_reduced)
            anomaly_scores = (-iso.score_samples(X_reduced)).astype(np.float32)  # higher = more anomalous
        except Exception:
            pass  # anomaly scoring is optional; NaNs remain on failure
    labels, _, _ = _cluster_space(
        X_space=X_space,
        bucket_name=bucket_name,
        df_part=df_part,
        use_lsa=bool(use_lsa) and X_reduced is not None,
        use_hdbscan=bool(use_hdbscan),
        hdb_min_cluster=int(hdb_min_cluster or 60),
        hdb_min_samples=int(hdb_min_samples or 0),
        auto_k=bool(auto_k),
        k_clusters=int(k_clusters or 350),
        mb_batch=int(mb_batch or 4096),
        count_vec=count_vec,
        svd_obj=svd_obj_local,
        norm_obj=norm_obj_local,
        d_word=d_word,
        d_char=d_char,
    )
    # Human-readable cluster names from PMI-scored subject/body terms.
    term_names = cluster_labels_pmi_bigram(
        texts=texts, labels=labels, subjects=subjects_only,
        topn=6, subject_alpha=0.75, global_ubiq_cut=0.20, subject_min_cov=0.30
    )
    bucket_indexers.append(df_part.index)
    labels_list.append(pd.Series(labels, index=df_part.index))
    cluster_name_list.append(pd.Series([term_names.get(int(c), "noise" if int(c) < 0 else f"cluster_{int(c)}") for c in labels], index=df_part.index))
    anomaly_list.append(pd.Series(anomaly_scores, index=df_part.index))
    # NOTE(review): cluster ids are per-partition, so names from a later
    # partition can overwrite earlier entries in this dict — confirm intended.
    term_names_global.update({int(k): v for k, v in term_names.items()})
    if single_partition and X_reduced is not None:
        X_reduced_holder = X_reduced  # reused below to build the search index
# Write the per-partition results back onto df_main by original row index.
if labels_list:
    df_main.loc[pd.Index(np.concatenate(bucket_indexers)), "cluster_id"] = pd.concat(labels_list).sort_index()
    df_main.loc[pd.Index(np.concatenate(bucket_indexers)), "cluster_name"] = pd.concat(cluster_name_list).sort_index()
    df_main.loc[pd.Index(np.concatenate(bucket_indexers)), "anomaly_score"] = pd.concat(anomaly_list).sort_index()
else:
    # No partition produced labels: mark everything as unclustered.
    df_main["cluster_id"] = -10
    df_main["cluster_name"] = "unclustered"
    df_main["anomaly_score"] = np.nan
# Assign fixed ids for news/alerts buckets
if len(df_news):
    df_news.loc[:, "cluster_id"] = -1
    df_news.loc[:, "cluster_name"] = "newsletter/news"
    df_news.loc[:, "anomaly_score"] = np.nan
if len(df_alerts):
    df_alerts.loc[:, "cluster_id"] = -2
    df_alerts.loc[:, "cluster_name"] = "system/alerts"
    df_alerts.loc[:, "anomaly_score"] = np.nan
# Merge back
df = pd.concat([df_main, df_news, df_alerts], ignore_index=True)
# Label de-dup pass per-bucket
df = dedupe_all_labels(df)
# Scores
df["corruption_score"] = df.apply(lambda r: corruption_score(r, trusted_domains=trusted), axis=1)
df = compute_context_anomaly(df)
# Surveillance campaigns
wl = [w.strip() for w in (watchlist_in or "").split(",") if w.strip()]
ent_df, samp_df = detect_surveillance_campaigns(df, watchlist=wl, min_mentions=int(min_mentions or 15))
df = tag_surveillance_emails(df, ent_df, threshold=4.5)
# Build indexes/search
index_obj = None
# FAISS is only usable with a dense LSA matrix from a single partition.
use_faiss_flag = bool(use_faiss) and FAISS_OK and bool(use_lsa) and (X_reduced_holder is not None) and single_partition
if use_faiss_flag:
    d = X_reduced_holder.shape[1]
    index_obj = faiss.IndexFlatIP(d)  # inner product ~ cosine; rows are L2-normalized
    index_obj.add(X_reduced_holder)
else:
    try:
        if bool(use_lsa) and X_reduced_holder is not None and single_partition:
            nn = NearestNeighbors(metric="cosine", algorithm="brute").fit(X_reduced_holder)
            index_obj = nn
        else:
            # Placeholder index so downstream search code sees a consistent type.
            index_obj = NearestNeighbors(metric="cosine", algorithm="brute").fit(np.zeros((1, 4), dtype=np.float32))
    except Exception:
        pass  # search stays disabled (index_obj remains None)
# Summaries
# Top-500 clusters by size, plus a display label used in the cluster dropdown.
cluster_counts = (
    df.groupby(["bucket", "cluster_id", "cluster_name"])
    .size()
    .reset_index(name="count")
    .sort_values("count", ascending=False)
    .head(500)
)
cluster_counts["label"] = cluster_counts.apply(
    lambda r: f'{r["bucket"]} — {int(r["cluster_id"])} — {r["cluster_name"]} ({int(r["count"])})', axis=1
)
cluster_choices = ["(any)"] + cluster_counts["label"].tolist()
bucket_choices = ["(any)"] + sorted(df["bucket"].dropna().unique().tolist())
domain_counts = (
    df.groupby("from_domain").size().reset_index(name="count").sort_values("count", ascending=False).head(100)
)
domain_choices = ["(any)"] + domain_counts["from_domain"].tolist()
sender_counts = (
    df.groupby("from_email").size().reset_index(name="count").sort_values("count", ascending=False).head(200)
)
sender_choices = ["(any)"] + sender_counts["from_email"].tolist()
langs = sorted([l for l in df["lang"].dropna().unique() if l and l != "unknown"])
lang_choices = ["(any)"] + langs
actors = social_stats(df)
# Off-hours / personal-mail review table, highest corruption score first.
offp = df[df["flags"].apply(lambda xs: "odd-hours" in xs or "personal-mail" in xs)]
offhours_table = (
    offp[["date", "from_email", "subject", "flags", "corruption_score"]]
    .sort_values("corruption_score", ascending=False)
    .head(200)
)
out_table = _sort_results(df, "context_anomaly_score", "desc")
# Snapshot of vectorizer settings (used later to vectorize search queries).
vec_state = {
    "use_hashing": bool(use_hashing),
    "hash_bits": int(hash_bits),
    "max_features": int(max_features),
    "min_df": int(min_df),
    "max_df": float(max_df),
    "use_bigrams": bool(use_bigrams),
}
status_md = f"**Processed {len(df):,} emails** | clusters ~ {len(cluster_counts):,} (showing top 500)"
# LSA projection objects are only reusable for search in the single-partition case.
svd_obj_out = svd_obj_local if single_partition else None
norm_obj_out = norm_obj_local if single_partition else None
# The element order here must match the handler's outputs list exactly.
return (
    status_md, # status
    cluster_counts, domain_counts, sender_counts, # summaries
    actors, offhours_table, # extra summaries
    ent_df, samp_df, # surveillance tables
    out_table, # results table
    df, vec_state, X_reduced_holder, # states
    index_obj, term_names_global, # index + labels
    bool(use_lsa), use_faiss_flag, # flags
    gr.update(choices=cluster_choices, value="(any)"),
    gr.update(choices=domain_choices, value="(any)"),
    gr.update(choices=sender_choices, value="(any)"),
    gr.update(choices=lang_choices, value="(any)"),
    svd_obj_out, norm_obj_out, (d_word_agg, d_char_agg),
    extra_terms_lower, bool(highlight_toggle),
    gr.update(choices=bucket_choices, value="(any)")
)
# Bind Process button
# NOTE: inputs must match process_file's parameter order, and outputs must
# line up 1:1 with the return tuple it produces.
(run_btn.click)(
    process_file,
    inputs=[
        inbox_file, max_features, min_df, max_df, use_bigrams, skip_lang,
        use_lsa, lsa_dim, auto_k, k_clusters, mb_batch, use_faiss, use_iso,
        trusted_domains_in, extra_keywords_in, highlight_toggle,
        use_hashing, hash_bits, use_hdbscan, hdb_min_cluster, hdb_min_samples,
        per_language, use_embeddings, embed_weight, embeddings_path, embeddings_binary,
        watchlist_in, min_mentions
    ],
    outputs=[
        status, cluster_counts_df, domain_counts_df, sender_counts_df,
        actors_df, offhours_df,
        surv_entities_df, surv_samples_df,
        results_df,
        state_df, state_vec, state_X_reduced,
        state_index, state_term_names,
        state_use_lsa, state_use_faiss,
        cluster_drop, domain_drop, sender_drop, lang_drop,
        state_svd, state_norm, state_dims,
        state_extra_terms, state_highlight,
        bucket_drop,
    ],
).then(
    # remember the uploaded file for future "Update" runs
    lambda f: f, inputs=[inbox_file], outputs=[state_inbox]
)
# Keep state_inbox in sync whenever a new file is uploaded
inbox_file.change(lambda f: f, inputs=[inbox_file], outputs=[state_inbox])
# NEW: Bind Update button — re-run with the last uploaded file + current settings
# Identical wiring to the Process button except the file comes from state_inbox.
update_btn.click(
    process_file,
    inputs=[
        state_inbox, max_features, min_df, max_df, use_bigrams, skip_lang,
        use_lsa, lsa_dim, auto_k, k_clusters, mb_batch, use_faiss, use_iso,
        trusted_domains_in, extra_keywords_in, highlight_toggle,
        use_hashing, hash_bits, use_hdbscan, hdb_min_cluster, hdb_min_samples,
        per_language, use_embeddings, embed_weight, embeddings_path, embeddings_binary,
        watchlist_in, min_mentions
    ],
    outputs=[
        status, cluster_counts_df, domain_counts_df, sender_counts_df,
        actors_df, offhours_df,
        surv_entities_df, surv_samples_df,
        results_df,
        state_df, state_vec, state_X_reduced,
        state_index, state_term_names,
        state_use_lsa, state_use_faiss,
        cluster_drop, domain_drop, sender_drop, lang_drop,
        state_svd, state_norm, state_dims,
        state_extra_terms, state_highlight,
        bucket_drop,
    ],
)
# -------- Filtering & Search --------
def refresh_results(df, bucket, cluster, domain, sender, lang, sentiment, tag, start, end, sort_by, sort_dir, hide_noise_flag):
    """Apply the current filter controls to *df* and return the sorted view.

    Returns an empty DataFrame when no data has been processed yet.
    """
    if df is not None and len(df) > 0:
        filtered = _apply_filters(
            df, bucket, cluster, domain, sender, lang, sentiment, tag, start, end, hide_noise_flag=bool(hide_noise_flag)
        )
        return _sort_results(filtered, sort_by, sort_dir)
    return pd.DataFrame()
# Re-run when any filter control changes
for ctrl in [bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop, sentiment_drop, tag_drop,
             date_start, date_end, sort_by, sort_dir, hide_noise]:
    ctrl.change(
        refresh_results,
        inputs=[
            state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
            sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise
        ],
        outputs=[results_df],
    )
# Reset filters
reset_btn.click(
    # 12 default values, one per output component below: 7 dropdowns reset to
    # "(any)", 2 empty date strings, default sort column/direction, hide-noise on.
    lambda: ["(any)"] * 7 + [""] * 2 + ["context_anomaly_score", "desc"] + [True],
    [],
    [bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop, sentiment_drop, tag_drop,
     date_start, date_end, sort_by, sort_dir, hide_noise],
).then(
    # Re-render the results table with the freshly reset controls.
    refresh_results,
    inputs=[
        state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
        sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise,
    ],
    outputs=[results_df],
)
# --- Search ---
def search_fn(q, df, vec, X_red, index, use_lsa, use_faiss, svd, norm, sort, sdir):
    """Run a top-50 nearest-neighbour search for query *q* over the corpus.

    Returns (results_df, query_terms). Empty results when there is no query,
    no processed data, or no usable index.
    """
    no_hits = pd.DataFrame()
    if not q or df is None or vec is None or index is None:
        return no_hits, []
    # Search ignores newsletters (-1), alerts (-2) and noise (-3) by default.
    df_main = df[~df["cluster_id"].isin([-1, -2, -3])].reset_index(drop=True)
    if df_main.empty:
        return no_hits, []
    q_terms = _tokenize_query(q)
    q_vec = _vectorize_query(q, vec, list(df_main.apply(enrich_text, axis=1)))
    # Project the query into the same space the index was built in.
    if use_lsa and svd is not None and norm is not None:
        q_emb = _project_query_to_lsa(q_vec, svd, norm)
    else:
        q_emb = q_vec
    if q_emb is None:
        return no_hits, q_terms
    n_req = min(50, len(df_main))
    if n_req <= 0:
        return no_hits, q_terms
    if isinstance(index, NearestNeighbors):
        # The placeholder index is fit on a single dummy row — skip it.
        if hasattr(index, "n_samples_fit_") and index.n_samples_fit_ <= 1:
            return no_hits, q_terms
        dists, inds = index.kneighbors(q_emb, n_neighbors=n_req)
        results = df_main.iloc[inds[0]].copy()
        results["search_score"] = 1.0 - dists[0]  # cosine distance -> similarity
    elif use_faiss and FAISS_OK and hasattr(index, "search"):
        scores, hit_rows = index.search(q_emb.astype(np.float32), k=n_req)
        results = df_main.iloc[hit_rows[0]].copy()
        results["search_score"] = scores[0]  # inner-product scores
    else:
        return no_hits, q_terms
    return _sort_results(results, sort, sdir), q_terms
# Wire the search box: populates the results table and remembers the query
# terms so the reader pane can highlight them on row selection.
search_btn.click(
    search_fn,
    inputs=[
        search_query, state_df, state_vec, state_X_reduced, state_index,
        state_use_lsa, state_use_faiss, state_svd, state_norm, sort_by, sort_dir,
    ],
    outputs=[results_df, state_query_terms],
)
# --- Reader selection (highlighting) ---
def on_row_select(evt: gr.SelectData, table, df, term_names, q_terms, extra_terms, do_highlight):
    """Render the clicked results row as highlighted HTML for the reader pane."""
    if evt.index is None or table is None or len(table) == 0 or df is None or len(df) == 0:
        return ""
    sel = table.iloc[evt.index[0]]
    # Locate the original record; (subject, from_email, date) is the primary key.
    match = df[
        (df["subject"] == sel.get("subject"))
        & (df["from_email"] == sel.get("from_email"))
        & (df["date"] == sel.get("date"))
    ]
    if match.empty:
        # Fall back to a subject-only lookup.
        match = df[df["subject"] == sel.get("subject")]
    if match.empty:
        return "Could not find original record."
    row = match.iloc[0]
    cid = int(row.get("cluster_id", -99))
    if term_names:
        clabel = term_names.get(cid, row.get("cluster_name"))
    else:
        clabel = row.get("cluster_name")
    return build_highlighted_html(
        row,
        query_terms=q_terms,
        cluster_label=f'{row.get("bucket","Other")} / {clabel}',
        do_highlight=do_highlight,
        extra_terms=extra_terms,
    )
# Clicking a results row renders that email in the reader pane.
results_df.select(
    on_row_select,
    inputs=[results_df, state_df, state_term_names, state_query_terms, state_extra_terms, state_highlight],
    outputs=[email_view],
)
# --- Click-to-filter helpers ---
def on_click_filter(evt: gr.SelectData, df_sum: pd.DataFrame, col_name: str, out_comp: gr.Dropdown):
    """Copy the clicked summary-row value of *col_name* into a filter dropdown.

    NOTE(review): `out_comp` is accepted for call-site symmetry but unused;
    the caller routes the returned update to the target component.
    """
    if evt.index is None or df_sum is None or df_sum.empty:
        return gr.update()
    selected = df_sum.iloc[evt.index[0]][col_name]
    return gr.update(value=selected)
def on_cluster_summary_select(evt: gr.SelectData, df_sum: pd.DataFrame):
    """Set both the bucket and cluster dropdowns from a clicked summary row."""
    if evt.index is None or df_sum is None or df_sum.empty:
        return gr.update(), gr.update()
    row = df_sum.iloc[evt.index[0]]
    return gr.update(value=row["bucket"]), gr.update(value=row["label"])
# Clicking a summary row sets the matching filter dropdown(s), then refreshes.
cluster_counts_df.select(
    on_cluster_summary_select, [cluster_counts_df], [bucket_drop, cluster_drop]
).then(
    refresh_results,
    inputs=[
        state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
        sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise
    ],
    outputs=[results_df],
)
domain_counts_df.select(
    lambda evt, df: on_click_filter(evt, df, "from_domain", domain_drop), [domain_counts_df], [domain_drop]
).then(
    refresh_results,
    inputs=[
        state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
        sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise
    ],
    outputs=[results_df],
)
sender_counts_df.select(
    lambda evt, df: on_click_filter(evt, df, "from_email", sender_drop), [sender_counts_df], [sender_drop]
).then(
    refresh_results,
    inputs=[
        state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
        sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise
    ],
    outputs=[results_df],
)
# Script entry point: launch the Gradio app.
if __name__ == "__main__":
    # Disable SSR to avoid handler arity warnings under server-side rendering
    demo.launch(ssr_mode=False)