"""Email triage pipeline: ingestion, normalization, bucket routing, and
lexicon/stop-word resources (CPU-only; optional deps degrade gracefully)."""
import os, re, json, io, math, gc, uuid
from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from bs4 import BeautifulSoup
import ftfy
from langdetect import detect, DetectorFactory
DetectorFactory.seed = 0  # make langdetect deterministic across runs
import gradio as gr
from tqdm import tqdm
# sklearn (CPU-friendly)
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer as CharTfidf
from sklearn.cluster import MiniBatchKMeans
from sklearn.neighbors import NearestNeighbors
from sklearn.decomposition import TruncatedSVD
from sklearn.preprocessing import Normalizer
from sklearn.preprocessing import normalize as sk_normalize
from sklearn.metrics.pairwise import cosine_similarity
# === NEW / UPDATED IMPORTS ===
from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer
from scipy.sparse import csr_matrix
# Each optional dependency sets an *_OK flag so callers can feature-gate.
try:
    import hdbscan  # OPTIONAL (pip install hdbscan)
    HDBSCAN_OK = True
except Exception:
    HDBSCAN_OK = False
# Optional tiny/fast word vectors via Gensim (local .txt/.vec/.bin)
try:
    from gensim.models import KeyedVectors  # OPTIONAL
    GENSIM_OK = True
except Exception:
    GENSIM_OK = False
# Optional light anomaly detection
try:
    from sklearn.ensemble import IsolationForest
    ISO_OK = True
except Exception:
    ISO_OK = False
from scipy.sparse import hstack
# Optional fast ANN (CPU)
try:
    import faiss  # faiss-cpu on HF Space
    FAISS_OK = True
except Exception:
    FAISS_OK = False
# Optional tiny sentiment
try:
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
    VADER_OK = True
except Exception:
    VADER_OK = False

# ======== STAGE-1 TAXONOMY (Buckets) ========
# bucket name -> keyword cues; an email is scored per bucket by keyword hits.
TAXONOMY = {
    "Lobbyist": ["lobby", "lobbyist", "pac", "influence"],
    "Campaign Finance": ["donation", "contribution", "fundraiser", "pledge", "campaign finance", "pac"],
    "Procurement": ["contract", "tender", "rfp", "rfq", "bid", "invoice", "vendor", "purchase order", "po"],
    "HR/Admin": ["hiring", "personnel", "payroll", "benefits", "policy", "vacation", "pto"],
    "Constituent": ["constituent", "concerned citizen", "my issue", "complaint", "community"],
    "Scheduling": ["schedule", "meeting", "appointment", "calendar", "invite", "availability", "reschedule"],
    "Legal": ["legal", "lawsuit", "intake", "attorney", "counsel", "privileged", "court", "subpoena", "confidential"],
    "IT/Security": ["password", "account security", "two-factor", "2fa", "vpn", "verification code", "security alert", "it support"],
    "Newsletters/Alerts": ["newsletter", "daily briefing", "news update", "unsubscribe", "press clip", "digest"],
    "Other": [],  # fallback bucket: no keywords, wins only by default
}

# header/domain cues (expand as you learn)
LOBBY_DOMAINS = set()  # e.g., {"acme-lobby.com"}
LEGAL_DOMAINS = set()  # e.g., {"biglaw.com","firmlaw.com"}


def _contains_any(text: str, terms: list) -> bool:
    """Case-insensitive substring test: does *text* contain any of *terms*?"""
    if not text or not terms:
        return False
    tl = text.lower()
    return any(t for t in terms if t and t.lower() in tl)


def _bucket_header_bonus(row: pd.Series, bucket: str) -> float:
    """Score bonus for *bucket* based on header cues (sender domain, subject).

    Returns a flat bonus (0.0 / 3.0 / 5.0) added to the keyword score.
    NOTE: references PERSONAL_DOMAINS / ATTACH_NAME_RE / is_news_like /
    is_notification_like, which are defined later at module level (resolved
    at call time).
    """
    fd = (row.get("from_domain") or "").lower()
    subj = (row.get("subject") or "")
    if bucket == "Newsletters/Alerts":
        return 5.0 if is_news_like(subj, row.get("body_text", ""), fd) else 0.0
    if bucket == "IT/Security":
        return 5.0 if is_notification_like(subj, row.get("body_text", ""), row.get("from_email", ""), fd) else 0.0
    if bucket == "Constituent":
        # personal (non-org) sender domain hints at a private citizen
        return 3.0 if (fd in PERSONAL_DOMAINS) else 0.0
    if bucket == "Lobbyist":
        return 5.0 if fd in LOBBY_DOMAINS else 0.0
    if bucket == "Legal":
        return 5.0 if (("law" in fd) or (fd in LEGAL_DOMAINS) or ("privileged" in subj.lower())) else 0.0
    if bucket == "Scheduling":
        body = (row.get("body_text") or "")
        # calendar cues: suggestive attachment names, invite wording, .ics files
        return 3.0 if (ATTACH_NAME_RE.search(" ".join(row.get("attachments") or []))
                       or re.search(r"\binvitation\b|\binvite\b", subj, re.I)
                       or re.search(r"\.ics\b", body, re.I)) else 0.0
    return 0.0


MIN_ROUTE_SCORE = 1.5  # below this, the best bucket is not trusted
TIE_MARGIN = 1.0       # best must beat runner-up by this margin


def route_email_row(row: pd.Series) -> str:
    """Route one normalized email row to a TAXONOMY bucket name.

    Scores each bucket by keyword hits plus header bonuses; falls back to
    "Other" when the top score is too low or too close to the runner-up.
    """
    text = f'{row.get("subject","")} {row.get("body_text","")}'.lower()
    scores: dict = {b: 0.0 for b in TAXONOMY.keys()}
    for b, terms in TAXONOMY.items():
        if not terms:
            continue
        hits = sum(1 for t in terms if t and t.lower() in text)
        scores[b] += float(hits)
        # corruption-suspect phrasing nudges the money-adjacent buckets
        if b in ("Lobbyist", "Procurement") and any(p in text for p in SUSPECT_PHRASES):
            scores[b] += 1.0
    for b in TAXONOMY.keys():
        scores[b] += _bucket_header_bonus(row, b)
    best_bucket, best = max(scores.items(), key=lambda kv: kv[1])
    second = sorted(scores.values(), reverse=True)[1] if len(scores) > 1 else 0.0
    if best < MIN_ROUTE_SCORE or (best - second) < TIE_MARGIN:
        return "Other"
    return best_bucket


# =================== Regex & Flags ===================
TOKEN_PATTERN = r"(?u)\b\w[\w.@-]{1,}\b"  # tokens of >=2 chars, allowing . @ -
URL_RE = re.compile(r"https?://\S+|www\.\S+", re.I)
QUOTE_LINE_RE = re.compile(r"^>.*$", re.M)          # quoted-reply lines
SIG_RE = re.compile(r"\n-- ?\n", re.M)              # conventional sig delimiter
SENT_FROM_RE = re.compile(r"\nSent from my .*$", re.M)
HEBREW_SENT_FROM_RE = re.compile(r"\nנשלח מה.*$", re.M)  # Hebrew "sent from my ..."
FWD_BEGIN_RE = re.compile(r"^Begin forwarded message:", re.I | re.M)
FWD_MSG_RE = re.compile(r"^[-\s]*Original Message[-\s]*$", re.I | re.M)
ON_WROTE_RE = re.compile(r'^\s*On .* wrote:$', re.M)
SKIP_LANGDETECT = True  # default: use the fast script heuristic, not langdetect

# ==== Expanded corruption lexicon ====
# NOTE: "no inspection" appears twice; harmless (membership tests only).
SUSPECT_PHRASES = [
    "off the books", "cover up", "kickback", "bribe", "under the table",
    "no inspection", "special fee", "friendly payment", "confidential deal",
    "nobody will find out", "pay to play", "cash only", "shell company",
    "bid rigging", "embezzle", "slush fund", "false invoice", "ghost employee",
    "contract splitting", "grease payment", "unreported", "unrecorded",
    "off the record", "just between us", "don’t quote me on this", "dont quote me on this",
    "we never had this conversation", "keep this between us", "not ethical", "illegal",
    "grey area", "gray area", "write off", "failed investment", "they owe it to me",
    "let’s take this offline", "lets take this offline", "send to my gmail", "send to my yahoo",
    "don’t leave a trail", "dont leave a trail", "call my cell", "text me", "don’t text me", "dont text me",
    "tell you on the phone", "talk in person", "come by my office", "vpn",
    "tax haven", "off-shore account", "offshore account", "backdate", "pull earnings forward",
    "delete this email", "no inspection", "special fees", "wire instructions",
]
EVASIVE_ACRO_RE = re.compile(r'\b(?:TYOP|LDL|TOL|OTR|TXT|TYL)\b', re.I)
MONEY_RE = re.compile(r'(\$|USD|EUR|ILS|NIS)\s?\d[\d,.\s]*', re.I)
PHONE_RE = re.compile(r'(\+?\d{1,3}[-\s.]?)?(\(?\d{2,4}\)?[-\s.]?)?\d{3,4}[-\s.]?\d{4}')
INVOICE_RE = re.compile(r'\b(invoice|inv\.\s?\d+|po\s?#?\d+|purchase order|wire)\b', re.I)
COMPANY_RE = re.compile(r'\b(LLC|Ltd|Limited|Inc|GmbH|S\.A\.|S\.p\.A\.)\b')
ATTACH_NAME_RE = re.compile(r'\b(agreement|contract|invoice|wire|payment|instructions|accounts?|offshore|tax|statement)\b', re.I)
# Cues that a conversation is being moved off monitored channels.
OFFCHANNEL_PATTERNS = [
    r"\bwhatsapp\b", r"\bsignal\b", r"\btelegram\b", r"\bwechat\b",
    r"send to my (gmail|yahoo|protonmail)", r"(call|text) (me|my cell)",
    r"take this offline", r"don.?t (text|email) (me|this)",
    r"\bOTR\b", r"\bTOL\b", r"\bTYOP\b", r"\bLDL\b"
]
OFFCHANNEL_RE = re.compile("|".join(OFFCHANNEL_PATTERNS), re.I)
PERSONAL_DOMAINS = {"gmail.com", "yahoo.com", "outlook.com", "hotmail.com", "proton.me", "protonmail.com", "icloud.com", "mail.ru", "yandex.ru"}
NEWS_DOMAINS = {"nytimes.com", "ft.com", "wsj.com", "bloomberg.com", "reuters.com", "theguardian.com", "economist.com"}


def is_news_like(subject: str, body: str, from_domain: str) -> bool:
    """True when the message looks like a newsletter/press digest."""
    s = (subject or "").lower()
    b = (body or "").lower()
    fd = (from_domain or "").lower()
    if "unsubscribe" in b or "manage preferences" in b:
        return True
    if any(k in s for k in ["daily briefing", "morning update", "newsletter", "top stories"]):
        return True
    if any(d in fd for d in NEWS_DOMAINS):
        return True
    return False


# Automated-notification cues: auth alerts, bounce/DSN traffic, mail-security gateways.
NOTIFY_PATTERNS = [
    r"\bno[-\s]?reply\b", r"do not reply", r"security alert", r"new sign[-\s]?in",
    r"verification code", r"two[-\s]?factor", r"\b2fa\b", r"\botp\b", r"\bcode[:\s]",
    r"itunes connect", r"apple id", r"your google account", r"used (?:a )?new browser",
    r"unable to determine", r"reset your password", r"\balert\b",
    r"mailer[-\s]?daemon", r"\bpostmaster\b", r"delivery status notification",
    r"undeliverable", r"delivery failure", r"returned mail", r"mail delivery subsystem",
    r"proofpoint", r"mimecast", r"dmarc", r"\bspf\b", r"\bdkim\b", r"quarantine",
    r"spam digest", r"phishing", r"security gateway", r"mail[-\s]?secure|secure message"
]
NOTIFY_RE = re.compile("|".join(NOTIFY_PATTERNS), re.I)


def is_notification_like(subject: str, body: str, from_email: str, from_domain: str) -> bool:
    """True when the message looks like an automated system notification."""
    s = (subject or "").lower()
    b = (body or "").lower()
    fe = (from_email or "").lower()
    if NOTIFY_RE.search(s) or NOTIFY_RE.search(b):
        return True
    if re.search(r"noreply|no-reply|donotreply", fe):
        return True
    return False


# Unicode script ranges used by the cheap language heuristic.
HEB_RE = re.compile(r'[\u0590-\u05FF]')
AR_RE = re.compile(r'[\u0600-\u06FF]')
CYR_RE = re.compile(r'[\u0400-\u04FF]')


def fast_lang_heuristic(text: str) -> str:
    """Guess language by script: 'he'/'ar'/'ru' by codepoint range, 'en' when
    >85% of letters are ASCII, else 'unknown'. Much faster than langdetect."""
    t = text or ""
    if HEB_RE.search(t):
        return "he"
    if AR_RE.search(t):
        return "ar"
    if CYR_RE.search(t):
        return "ru"
    letters = sum(ch.isalpha() for ch in t)
    ascii_letters = sum(ch.isascii() and ch.isalpha() for ch in t)
    if letters and ascii_letters / max(1, letters) > 0.85:
        return "en"
    return "unknown"


# Topical seed lexicons (used e.g. for seeded centroids).
CORR_LEX = {
    "kickback": ["kickback", "bribe", "under the table", "gift", "cash"],
    "invoice_fraud": ["false invoice", "ghost employee", "contract splitting", "slush fund", "shell company", "front company"],
    "procurement": ["bid rigging", "tender", "vendor", "sole source", "rfp", "rfq", "purchase order", "po"],
    "money_flow": ["wire transfer", "transfer", "swift", "iban", "routing number", "account number", "cash"]
}

# =================== Label cleanup helpers ===================
# Stop-word sets used to filter junk out of cluster labels and vectorizers.
EN_STOP = {
    "the", "of", "and", "to", "in", "is", "for", "on", "at", "with", "from", "by", "or", "as",
    "that", "this", "it", "be", "are", "was", "were", "an", "a", "you", "your", "we", "our", "us",
    "re", "fwd", "fw", "hi", "hello", "thanks", "thank", "regards", "best", "please", "dear", "mr", "mrs",
    "message", "original", "forwarded", "attached", "attachment", "confidential", "notice", "disclaimer",
    "herein", "thereof", "hereby", "therein", "regarding", "subject", "url", "via", "kind", "regard", "ny"
}
HE_STOP = {"של", "על", "זה", "גם", "אם", "לא", "את", "אתה", "אני", "הוא", "היא", "הם", "הן", "כי", "מה", "שלום", "תודה", "בברכה", "מצורף", "הודעה", "קדימה", "היי"}
MONTHS = {
    "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "sept", "oct", "nov", "dec",
    "january", "february", "march", "april", "june", "july", "august", "september",
    "october", "november", "december"
}
# HTML/MIME residue that survives text extraction.
STOP_TERMS = {
    "div", "span", "nbsp", "href", "src", "img", "class", "style", "align", "border", "cid",
    "content", "content-type", "multipart", "alternative", "quoted", "printable", "utf",
    "windows-1255", "iso-8859", "us-ascii", "html", "plain", "attachment", "filename",
    "type", "id", "service", "person", "generated", "fyi"
}
AUX_STOP = {
    "will", "would", "should", "could", "can", "cant", "cannot", "did", "do", "does", "done",
    "have", "has", "had", "having", "get", "got", "make", "made", "let", "need", "want",
    "not", "dont", "didnt", "isnt", "arent", "wasnt", "werent", "im", "youre", "hes", "shes",
    "weve", "ive", "theyre", "its", "ok", "okay", "pls", "please", "thx", "thanks", "regards", "best",
    "hi", "hello", "dear", "re", "fw", "fwd", "via", "kind"
}
# Call-to-action boilerplate (English + Hebrew).
CTA_STOP = {
    "click", "here", "unsubscribe", "view", "browser", "mailto", "reply", "iphone", "android",
    "press", "link", "below", "above", "update", "newsletter", "manage", "preferences",
    "לחץ", "כאן", "נשלח", "מה", "מה-iphone", "הטלפון"
}
TECH_META = {
    "quot", "nbsp", "cid", "href", "src", "img", "class", "style", "div", "span", "http", "https",
    "content", "content-type", "multipart", "alternative", "quoted", "printable", "utf",
    "windows-1255", "iso-8859", "us-ascii", "attachment", "filename"
}
# Chinese mail-header labels (e.g. "Sent time", weekdays, "Forward", "Subject").
ZH_HEADER_STOP = {"发送时间", "星期", "星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日", "转发", "主题", "收件人", "发件人"}
HE_EXTRA_STOP = {"עם", "או"}
# Fold all auxiliary sets into the single STOP_TERMS used by _is_junk_term.
STOP_TERMS |= AUX_STOP | CTA_STOP | TECH_META | ZH_HEADER_STOP | HE_EXTRA_STOP
EMAIL_LIKE_RE = re.compile(r"@|^[\w\-]+\.(com|net|org|ru|us|il|ch|co|io|uk|de|fr|it)$", re.I)
YEAR_RE = re.compile(r"^(19|20)\d{2}$")
NUMERIC_RE = re.compile(r"^\d+([.,:/-]\d+)*$")
ONE_CHAR_RE = re.compile(r"^.$")
LONG_ALNUM_RE = re.compile(r"^[A-Za-z0-9_-]{24,}$") HEXISH_RE = re.compile(r"^(?:[A-Fa-f0-9]{8,})$") DIGIT_HEAVY_RE = re.compile(r"^(?:\D*\d){6,}\D*$") UNDERSCORE_HEAVY_RE = re.compile(r"^[A-Za-z0-9]*_[A-Za-z0-9_]*$") STOPWORD_FOR_VEC = sorted(EN_STOP | HE_STOP | STOP_TERMS) def _is_junk_term(t: str) -> bool: tl = (t or "").strip().lower() if not tl: return True if tl in STOP_TERMS or tl in EN_STOP or tl in HE_STOP or tl in MONTHS: return True if EMAIL_LIKE_RE.search(tl): return True if YEAR_RE.match(tl): return True if NUMERIC_RE.match(tl): return True if ONE_CHAR_RE.match(tl): return True if LONG_ALNUM_RE.match(t): return True if HEXISH_RE.match(t): return True if DIGIT_HEAVY_RE.match(t): return True if UNDERSCORE_HEAVY_RE.match(t): return True if len(t) > 40: return True return False def _sanitize_top_terms(names: np.ndarray, idxs: np.ndarray, mean_vec: np.ndarray, want:int) -> list: ordered = idxs[np.argsort(-mean_vec[idxs])] cleaned = [] for i in ordered: term = names[i] if _is_junk_term(term): continue cleaned.append(term) if len(cleaned) >= want: break if len(cleaned) < max(2, want//2): for i in ordered: term = names[i] if EMAIL_LIKE_RE.search(term) or YEAR_RE.match(term.lower()): continue if term not in cleaned: cleaned.append(term) if len(cleaned) >= want: break return cleaned # =================== HTML/Text & Header Parsing =================== def html_to_text(html: str) -> str: if not html: return "" soup = BeautifulSoup(html, "html.parser") for tag in soup(["script", "style"]): tag.decompose() return soup.get_text(separator="\n") def strip_quotes_and_sigs(text: str) -> str: if not text: return "" text = QUOTE_LINE_RE.sub("", text) parts = SIG_RE.split(text) if parts: text = parts[0] text = SENT_FROM_RE.sub("", text) text = HEBREW_SENT_FROM_RE.sub("", text) cut = None for pat in (FWD_BEGIN_RE, FWD_MSG_RE, ON_WROTE_RE): m = pat.search(text) if m: idx = m.start() cut = idx if (cut is None or idx < cut) else cut if cut is not None: text = text[:cut] text = 
re.sub(r"\n\s*sent from my .*?$", "", text, flags=re.I|re.M) text = re.sub(r"\n\s*(נשלח מה-?iphone).*?$", "", text, flags=re.I|re.M) return text.strip() def parse_name_email(s: str) -> Tuple[str, str]: if not s: return "", "" m = re.match(r'(?:"?([^"]*)"?\s)?]+@[^<>]+)>?', s) if m: return (m.group(1) or "").strip(), (m.group(2) or "").strip() return "", s.strip() def parse_multi_emails(s: str) -> List[str]: if not s: return [] parts = re.split(r",\s*(?=[^,]*@)", s) emails = [] for p in parts: _, e = parse_name_email(p.strip()) if e: emails.append(e) return emails def parse_email_headers(text: str) -> Tuple[Dict[str, str], str]: headers: Dict[str, str] = {} lines = (text or "").splitlines() header_pat = re.compile(r'^(From|To|Cc|CC|Bcc|Date|Subject|Subject:|To:|Cc:|Bcc:|From:):') i = 0 saw_header = False while i < len(lines): line = lines[i].rstrip("\r") stripped = line.strip() if stripped == "": i += 1 break if header_pat.match(line): saw_header = True key, rest = line.split(":", 1) key = key.strip() value = rest.strip() if value == "": j = i + 1 cont = [] while j < len(lines): nxt = lines[j].rstrip("\r") nxts = nxt.strip() if nxts == "" or header_pat.match(nxt): break if key.lower() == "subject": if FWD_BEGIN_RE.match(nxts) or FWD_MSG_RE.match(nxts) or ON_WROTE_RE.match(nxts): break if len(cont) > 0: break cont.append(nxts) j += 1 value = " ".join(cont) i = j else: i += 1 headers[key] = value continue else: if saw_header: break else: break body_text = "\n".join(lines[i:]) if i < len(lines) else "" return headers, body_text # =================== Normalization & Utilities =================== def normalize_email_record(raw: Dict[str, Any], use_langdetect: bool) -> Dict[str, Any]: if str(raw.get("type", "")).lower() == "meta": return {} attach_names = [] atts = raw.get("attachments") or raw.get("Attachments") or raw.get("files") or [] if isinstance(atts, list): for a in atts: if isinstance(a, dict): name = a.get("filename") or a.get("name") or "" else: name = str(a) 
if name: attach_names.append(str(name)) body_text_raw = raw.get("body_text") or raw.get("text") or "" html_content = raw.get("body_html") or raw.get("html") or "" if html_content and not body_text_raw: body_text_raw = html_to_text(html_content) body_text_raw = ftfy.fix_text(body_text_raw or "") subject_text = "" from_name = from_email = from_domain = "" to_emails: List[str] = [] date_val = raw.get("date") or raw.get("Date") or "" if body_text_raw: headers, body_only = parse_email_headers(body_text_raw) subject_text = headers.get("Subject", "") or raw.get("subject") or raw.get("Subject") or "" sender = headers.get("From", "") or raw.get("from") or raw.get("From") or "" date_val = headers.get("Date", "") or date_val to_emails = parse_multi_emails(headers.get("To","") or (raw.get("to") or "")) + \ parse_multi_emails(headers.get("Cc","") or (raw.get("cc") or "")) body_clean = strip_quotes_and_sigs(ftfy.fix_text(body_only or "")) body_clean = URL_RE.sub(" URL ", body_clean) body_clean = re.sub(r"\s+", " ", body_clean).strip() body_text = body_clean from_name, from_email = parse_name_email(sender) from_domain = from_email.split("@")[-1].lower() if "@" in from_email else "" else: subject_text = ftfy.fix_text(raw.get("subject") or raw.get("Subject") or "").strip() body_text = ftfy.fix_text(raw.get("body_text") or raw.get("text") or "") body_text = URL_RE.sub(" URL ", body_text) body_text = strip_quotes_and_sigs(body_text) body_text = re.sub(r"\s+", " ", body_text).strip() sender = raw.get("from") or raw.get("From") or "" from_name, from_email = parse_name_email(sender) from_domain = from_email.split("@")[-1].lower() if "@" in from_email else "" to_emails = parse_multi_emails(raw.get("to") or "") + parse_multi_emails(raw.get("cc") or "") subject_norm = re.sub(r"\s+", " ", subject_text or "").strip() if use_langdetect: try: lang = detect((subject_norm + " " + body_text[:5000]).strip()) if (subject_norm or body_text) else "unknown" except Exception: lang = 
fast_lang_heuristic(subject_norm + " " + (body_text or "")) else: lang = fast_lang_heuristic(subject_norm + " " + (body_text or "")) iso_date = "" if isinstance(date_val, (int, float)): try: iso_date = pd.to_datetime(int(date_val), unit="s", utc=True).isoformat() except Exception: iso_date = "" elif isinstance(date_val, str) and date_val: iso_date = pd.to_datetime(date_val, utc=True, errors="coerce").isoformat() msg_id = raw.get("message_id") or raw.get("Message-ID") or "" if not msg_id: msg_id = f"gen-{uuid.uuid4().hex}" thread_key = subject_norm or (from_email + body_text[:120]) thread_id = str(pd.util.hash_pandas_object(pd.Series([thread_key], dtype="string")).astype("uint64").iloc[0]) text_hash = str(pd.util.hash_pandas_object(pd.Series([body_text], dtype="string")).astype("uint64").iloc[0]) if body_text else "" return { "message_id": str(msg_id), "thread_id": thread_id, "date": iso_date, "from_name": from_name, "from_email": from_email, "from_domain": from_domain, "to_emails": to_emails, "subject": subject_norm, "body_text": body_text, "lang": lang, "attachments": attach_names, "text_hash": text_hash, } def has_suspect_tag(text: str) -> List[str]: tags = [] if not text: return tags low = text.lower() for phrase in SUSPECT_PHRASES: if phrase in low: tags.append("🚩suspect") break if "invoice" in low or "payment" in low or "contract" in low: tags.append("finance") if "wire" in low or "transfer" in low or "cash" in low: if "finance" not in tags: tags.append("finance") if OFFCHANNEL_RE.search(low) or EVASIVE_ACRO_RE.search(low): tags.append("off-channel") return tags def compute_sentiment_column(df: pd.DataFrame) -> pd.DataFrame: if not VADER_OK: df["sentiment_score"] = np.nan df["sentiment"] = "(unknown)" return df analyzer = SentimentIntensityAnalyzer() scores = df["body_text"].fillna("").map(lambda t: analyzer.polarity_scores(t)["compound"]) bins = [-1.01, -0.05, 0.05, 1.01] labels = ["negative", "neutral", "positive"] df["sentiment_score"] = scores 
df["sentiment"] = pd.cut(df["sentiment_score"], bins=bins, labels=labels, include_lowest=True) return df def _compile_highlight_terms(row: pd.Series, extra_terms: List[str]) -> List[str]: terms = [] txt = (row.get("subject","") + " " + row.get("body_text","")).lower() for p in SUSPECT_PHRASES: if p in txt: terms.append(p) if MONEY_RE.search(txt): terms.append("$") if INVOICE_RE.search(txt): terms.append("invoice") if PHONE_RE.search(row.get("body_text","") or ""): terms.append("phone") for t in extra_terms or []: t=t.strip() if t and t.lower() in txt: terms.append(t) out, seen = [], set() for t in terms: if t.lower() not in seen: out.append(t); seen.add(t.lower()) return out[:24] def build_highlighted_html(row: pd.Series, query_terms: Optional[List[str]] = None, cluster_label: Optional[str] = None, do_highlight: bool = True, extra_terms: Optional[List[str]] = None) -> str: subject = (row.get("subject") or "").strip() body = (row.get("body_text") or "").strip() from_email = row.get("from_email") or "" date = row.get("date") or "" tags = row.get("tags") or [] flags = row.get("flags") or [] sentiment = row.get("sentiment") or "(unknown)" hl_terms = [] if do_highlight: hl_terms = (query_terms or []) + _compile_highlight_terms(row, extra_terms or []) seen=set(); uniq=[] for t in hl_terms: tl=t.lower() if tl and tl not in seen: uniq.append(t); seen.add(tl) hl_terms = uniq[:24] def hi(text: str) -> str: if not text or not do_highlight or not hl_terms: return text out = text for qt in hl_terms: if not qt: continue try: pat = re.compile(re.escape(qt), re.I) out = pat.sub(lambda m: f"{m.group(0)}", out) except Exception: pass return out subject_h = hi(subject) body_h = hi(body) rtl = bool(re.search(r"[\u0590-\u08FF]", body_h)) dir_attr = ' dir="rtl"' if rtl else "" body_html = body_h.replace("\n", "
") def pill(s, cls="tag"): return f'{s}' tag_html = "" pills = [] if isinstance(tags, list) and tags: pills += [pill(t, "tag") for t in tags] if isinstance(flags, list) and flags: pills += [pill(f, "tag") for f in flags] if pills: tag_html = " ".join(pills) cluster_html = f'{cluster_label or ""}' if cluster_label else "" html = ( f'
' f'
' f'
' f'
{subject_h or "(no subject)"}
' f'
From: {from_email} • Date: {date or "—"}
' f'
' f'
' f' {cluster_html}' f' sentiment: {sentiment}' f' {tag_html}' f'
' f'
' f'
' f' {body_html}' f'
' f'
' ) return html # ---------- Lightweight Embedding Utilities (Optional) ---------- def _load_embeddings(emb_path: str, binary: bool): if not GENSIM_OK or not emb_path or not os.path.exists(emb_path): return None, 0 try: if binary: kv = KeyedVectors.load_word2vec_format(emb_path, binary=True) else: kv = KeyedVectors.load_word2vec_format(emb_path, binary=False, no_header=False) return kv, int(kv.vector_size) except Exception: try: kv = KeyedVectors.load_word2vec_format(emb_path, binary=False, no_header=True) return kv, int(kv.vector_size) except Exception: return None, 0 def _avg_embed_for_text(text: str, kv, dim: int) -> np.ndarray: vec = np.zeros((dim,), dtype=np.float32) if not kv or not text: return vec toks = re.findall(TOKEN_PATTERN, text.lower()) cnt = 0 for t in toks: if t in kv: vec += kv[t] cnt += 1 if cnt > 0: vec /= float(cnt) n = np.linalg.norm(vec) if n > 0: vec /= n return vec def _build_doc_embeddings(texts: List[str], kv, dim: int) -> np.ndarray: if not kv or dim <= 0: return np.zeros((len(texts), 0), dtype=np.float32) out = np.zeros((len(texts), dim), dtype=np.float32) for i, t in enumerate(texts): out[i, :] = _avg_embed_for_text(t or "", kv, dim) return out # =================== Feature engineering (BM25 + char) =================== class BM25Transformer: def __init__(self, k1=1.2, b=0.75): self.k1 = k1 self.b = b self.idf_ = None self.avgdl_ = None def fit(self, X): N = X.shape[0] df = np.bincount(X.tocsc().indices, minlength=X.shape[1]).astype(np.float64) self.idf_ = np.log((N - df + 0.5) / (df + 0.5 + 1e-12)) dl = np.asarray(X.sum(axis=1)).ravel() self.avgdl_ = float(dl.mean() if dl.size else 1.0) return self def transform(self, X): X = X.tocsr(copy=True).astype(np.float32) dl = np.asarray(X.sum(axis=1)).ravel() k1, b, avgdl = self.k1, self.b, self.avgdl_ rows, cols = X.nonzero() data = X.data for i in range(len(data)): tf = data[i] d = rows[i] denom = tf + k1 * (1 - b + b * (dl[d] / (avgdl + 1e-12))) data[i] = (self.idf_[cols[i]] * (tf * (k1 + 
1))) / (denom + 1e-12) return X def enrich_text(row: pd.Series) -> str: subj = row.get("subject","") or "" body = row.get("body_text","") or "" t = subj + "\n\n" + body tokens = [] if MONEY_RE.search(t): tokens.append("__HAS_MONEY__") if PHONE_RE.search(t): tokens.append("__HAS_PHONE__") if INVOICE_RE.search(t): tokens.append("__HAS_INVOICE__") if COMPANY_RE.search(t): tokens.append("__HAS_COMPANY__") if OFFCHANNEL_RE.search(t): tokens.append("__OFF_CHANNEL__") lang_tok = f'__LANG_{(row.get("lang") or "unk").lower()}__' tokens.append(lang_tok) return (t + " " + " ".join(tokens)).strip() # =================== Cluster labeling: PMI + class-TFIDF + SUBJECT BOOST (+ coverage ≥30% preference) =================== def cluster_labels_pmi_bigram( texts, labels, subjects=None, topn=6, subject_alpha=0.75, global_ubiq_cut=0.20, subject_min_cov=0.30 ): import math as _math from collections import Counter, defaultdict from sklearn.feature_extraction.text import TfidfVectorizer HEADER_STOP = {"subject","re","fw","fwd","to","cc","bcc","from","sent","forwarded", "回复","主题","收件人","发件人"} def is_junk_token(tok: str) -> bool: if _is_junk_term(tok): return True tl = tok.lower() if tl.startswith("__"): return True if "@" in tl: return True if tl.isascii() and len(tl) <= 2: return True if LONG_ALNUM_RE.match(tok) or HEXISH_RE.match(tok) or DIGIT_HEAVY_RE.match(tok): return True if len(tok) > 40: return True if re.search(r"[^\w\-’']", tl): return True return False def tokenize_clean(t): toks = re.findall(TOKEN_PATTERN, (t or "").lower()) return [w for w in toks if not is_junk_token(w)] def ngrams(toks, n): return [" ".join(p) for p in zip(*[toks[i:] for i in range(n)]) if all(not is_junk_token(x) for x in p)] glob_df_uni = Counter() glob_df_bg = Counter() glob_df_tri = Counter() per_c_bg = defaultdict(Counter) per_c_tri = defaultdict(Counter) per_c_texts = defaultdict(list) per_c_doc_count = defaultdict(int) per_c_subj_uni_docs = defaultdict(Counter) per_c_subj_bg_docs = 
defaultdict(Counter) per_c_subj_tri_docs = defaultdict(Counter) have_subjects = subjects is not None and len(subjects) == len(texts) for idx, (txt, c) in enumerate(zip(texts, labels)): c = int(c) toks = tokenize_clean(txt) uni_set = set(toks) bg_set = set(ngrams(toks, 2)) tri_set = set(ngrams(toks, 3)) glob_df_uni.update(uni_set) glob_df_bg.update(bg_set) glob_df_tri.update(tri_set) per_c_bg[c].update(bg_set) per_c_tri[c].update(tri_set) per_c_texts[c].append(" ".join(toks)) per_c_doc_count[c] += 1 if have_subjects: stoks = tokenize_clean(subjects[idx] or "") s_uni = set(stoks) s_bg = set(ngrams(stoks, 2)) s_tri = set(ngrams(stoks, 3)) per_c_subj_uni_docs[c].update(s_uni) per_c_subj_bg_docs[c].update(s_bg) per_c_subj_tri_docs[c].update(s_tri) N = max(1, len(texts)) def too_ubiquitous(df_count): return (df_count / float(N)) > float(global_ubiq_cut) labels_out = {} for c in sorted(set(int(x) for x in labels)): n_docs_c = max(1, per_c_doc_count[c]) phrases = [] for store, glob_df, subj_docs, n in ( (per_c_bg[c], glob_df_bg, per_c_subj_bg_docs[c], 2), (per_c_tri[c], glob_df_tri, per_c_subj_tri_docs[c], 3), ): total_c = sum(store.values()) + 1e-12 total_g = sum(glob_df.values()) + 1e-12 scored = [] for ng, cnt in store.most_common(3000): if too_ubiquitous(glob_df[ng]): continue p_ng_c = cnt / total_c p_ng_g = (glob_df[ng] / total_g) if p_ng_c > 0 and p_ng_g > 0: score = _math.log(p_ng_c) - _math.log(p_ng_g) cov = 0.0 if have_subjects: cov = subj_docs[ng] / n_docs_c if cov >= subject_min_cov: score += 0.6 score += subject_alpha * cov scored.append((score, cov, ng)) scored.sort(key=lambda x: (x[1] >= subject_min_cov, x[0]), reverse=True) take = max(1, topn // (3 if n == 3 else 2)) phrases.extend([p for _, _, p in scored[:take]]) docs_c = [" ".join(per_c_texts[c])] if per_c_texts[c] else [" "] docs_bg = [" ".join(sum((per_c_texts[k] for k in per_c_texts if k != c), [])) or " "] corpus = [docs_c[0], docs_bg[0]] vec = TfidfVectorizer( analyzer="word", ngram_range=(1,1), 
max_features=3000, token_pattern=TOKEN_PATTERN, lowercase=True ) X = vec.fit_transform(corpus) vocab = np.array(sorted(vec.vocabulary_, key=lambda k: vec.vocabulary_[k])) row = X[0].toarray().ravel() subj_cov = np.zeros_like(row) subj_cov_frac = np.zeros_like(row) vocab_index = {t:i for i,t in enumerate(vocab)} if have_subjects: for tok, cnt_docs in per_c_subj_uni_docs[c].items(): if tok in vocab_index and not _is_junk_term(tok): i = vocab_index[tok] frac = cnt_docs / n_docs_c subj_cov[i] = frac subj_cov_frac[i] = frac row_boosted = row + subject_alpha * subj_cov pref_bump = (subj_cov_frac >= subject_min_cov).astype(row_boosted.dtype) * 0.6 final = row_boosted + pref_bump order = final.argsort()[::-1] unis = [] for i in order: tok = vocab[i] if _is_junk_term(tok): continue if too_ubiquitous(glob_df_uni.get(tok, 0)): continue unis.append(tok) if len(unis) >= max(0, topn - len(phrases)): break parts = (phrases + unis)[:max(2, topn)] labels_out[c] = ", ".join(parts) if parts else f"cluster_{c}" return labels_out # =================== Auto-k & merge =================== def choose_k_by_kneedle(X, ks=(50,100,150,200,300,400,500)): n = X.shape[0] if n <= 1: return 1, {1: 0.0} if n < min(ks): k_small = max(2, min(10, n)) return int(k_small), {int(k_small): 0.0} if n > 40000: rs = np.random.RandomState(0) idx = rs.choice(n, size=40000, replace=False) Xs = X[idx] else: Xs = X inertias = [] for k in ks: k = int(k) if n < k: break km = MiniBatchKMeans(n_clusters=k, batch_size=4096, random_state=0, n_init="auto") km.fit(Xs) inertias.append(km.inertia_) if not inertias: k_small = max(2, min(10, n)) return int(k_small), {int(k_small): 0.0} x = np.array(list(ks)[:len(inertias)], dtype=float) y = np.array(inertias, dtype=float) y_norm = (y - y.min()) / (y.max() - y.min() + 1e-9) x_norm = (x - x.min()) / (x.max() - x.min() + 1e-9) chord = y_norm[0] + (y_norm[-1] - y_norm[0]) * (x_norm - x_norm[0])/(x_norm[-1]-x_norm[0]+1e-9) dist = chord - y_norm k_best = int(x[np.argmax(dist)]) 
    # Tail of the k-selection helper defined above this chunk (kneedle-style
    # elbow search): returns the chosen k and a {k: inertia} map.
    return k_best, dict(zip(x.astype(int), inertias))


def auto_k_rule(n_docs: int) -> int:
    """Heuristic cluster count for n_docs, clamped to [120, 600].

    Grows with sqrt(n_docs/50) * 110, so k rises sub-linearly with corpus size.
    """
    return int(max(120, min(600, math.sqrt(max(n_docs, 1) / 50.0) * 110)))


def merge_close_clusters(labels, centers, thresh=0.92):
    """Union-find merge of clusters whose centroids are near-duplicates.

    Centroids are L2-normalized, pairwise cosine similarity is computed, and
    any pair with similarity >= ``thresh`` is unioned. Returns a new label
    array with cluster ids renumbered compactly from 0.

    Args:
        labels: per-document cluster assignments (iterable of ints).
        centers: (k, d) centroid matrix.
        thresh: cosine-similarity threshold for merging.
    """
    centers = sk_normalize(centers)
    sim = cosine_similarity(centers, centers)
    k = centers.shape[0]
    parent = list(range(k))

    def find(a):
        # Path-free root lookup (no compression; k is small).
        while parent[a] != a:
            a = parent[a]
        return a

    for i in range(k):
        for j in range(i + 1, k):
            if sim[i, j] >= thresh:
                pi, pj = find(i), find(j)
                if pi != pj:
                    parent[pj] = pi
    root = {i: find(i) for i in range(k)}
    # Renumber surviving roots densely so downstream code sees 0..m-1 ids.
    idmap, new_id = {}, 0
    for i in range(k):
        r = root[i]
        if r not in idmap:
            idmap[r] = new_id
            new_id += 1
    labels2 = np.array([idmap[root[int(c)]] for c in labels], dtype=int)
    return labels2


def seeded_centroids_in_lsa(lexicons: Dict[str, List[str]], count_vec: CountVectorizer,
                            lsa_components: np.ndarray, norm_obj: Normalizer,
                            d_word: int, d_full: int, k: int) -> Optional[np.ndarray]:
    """Project keyword lexicons into the LSA space to use as KMeans seeds.

    For each lexicon, builds a unit one-hot vector over the word vocabulary
    (first ``d_word`` dims of the full word+char feature space), zero-pads it
    to ``d_full``, projects via the SVD components, and normalizes.

    Returns the (m, n_components) seed matrix when 2 <= m <= k, else None
    (callers fall back to k-means++ init).
    """
    seeds_word = []
    vocab = count_vec.vocabulary_
    for _, words in lexicons.items():
        idxs = [vocab.get(w.lower()) for w in words if vocab.get(w.lower()) is not None]
        if not idxs:
            continue
        v = np.zeros((d_word,), dtype=np.float32)
        v[idxs] = 1.0
        n = np.linalg.norm(v)
        if n > 0:
            v /= n
        seeds_word.append(v)
    if not seeds_word:
        return None
    seeds_full = []
    for v in seeds_word:
        # Char-ngram dims (d_word..d_full) stay zero: lexicons are word-level.
        vf = np.zeros((d_full,), dtype=np.float32)
        vf[:d_word] = v
        seeds_full.append(vf)
    seeds_full = np.stack(seeds_full, axis=0)
    seeds_red = seeds_full @ lsa_components.T
    seeds_red = norm_obj.transform(seeds_red.astype(np.float32))
    # NOTE(review): seeding is only usable when the seed count is <= k;
    # the caller additionally requires an exact match (seeds.shape[0] == k).
    if seeds_red.shape[0] >= 2 and seeds_red.shape[0] <= k:
        return seeds_red
    return None


# =================== NEW: cluster stabilizer ===================
def _centroids_from_labels(X, labels):
    """Compute L2-normalized mean vectors per cluster label.

    Works for both a dense (n, d) ndarray and a scipy sparse matrix
    (converted to CSR for row slicing). Returns {label: unit centroid}.
    """
    labs = np.asarray(labels, dtype=int)
    uniq = np.unique(labs)
    cents = {}
    if isinstance(X, np.ndarray):
        for c in uniq:
            idx = (labs == c)
            if not np.any(idx):
                continue
            v = X[idx].mean(axis=0)
            n = np.linalg.norm(v)
            if n > 0:
                v = v / n
            cents[int(c)] = v.astype(np.float32)
        return cents
    # Sparse path: CSR gives efficient row selection.
    X = X.tocsr()
    for c in uniq:
        rows = np.where(labs == c)[0]
        if rows.size == 0:
            continue
        sub = X[rows]
        v = np.asarray(sub.mean(axis=0)).ravel()
        n = np.linalg.norm(v)
        if n > 0:
            v = v / n
        cents[int(c)] = v.astype(np.float32)
    return cents


def _cosine_sim_to_centroids(vecs, centroids):
    """For each row of ``vecs``, find the nearest centroid by dot product.

    Since centroids are unit-normalized (see _centroids_from_labels), the dot
    product ranks candidates like cosine similarity up to each row's norm.

    Returns (best_label_per_row, best_similarity_per_row), or (None, None)
    when ``centroids`` is empty.
    """
    if not centroids:
        return None, None
    keys = list(centroids.keys())
    C = np.stack([centroids[k] for k in keys], axis=0)
    if isinstance(vecs, np.ndarray):
        sims = vecs @ C.T
    else:
        sims = vecs.dot(C.T)
    if hasattr(sims, "toarray"):
        sims = sims.toarray()
    best_idx = np.argmax(sims, axis=1)
    best_lab = np.array([keys[i] for i in best_idx], dtype=int)
    best_sim = sims[np.arange(sims.shape[0]), best_idx]
    return best_lab, best_sim


def stabilize_labels(X_space, labels, min_size=40, merge_thresh=0.96, reassign_thresh=0.35):
    """Post-process cluster labels: merge near-duplicate clusters, then
    dissolve clusters smaller than ``min_size``.

    Small-cluster members are reassigned to the nearest big-cluster centroid
    when similarity >= ``reassign_thresh``; otherwise they become noise
    (label -3). Negative input labels (e.g. HDBSCAN noise) are excluded from
    centroid merging via the ``k >= 0`` filter.
    """
    labs = np.asarray(labels, dtype=int)
    cents = _centroids_from_labels(X_space, labs)
    keys = sorted([k for k in cents.keys() if k >= 0])
    if len(keys) >= 2:
        C = np.stack([cents[k] for k in keys], axis=0)
        sims = C @ C.T
        parent = {k: k for k in keys}

        def find(a):
            while parent[a] != a:
                a = parent[a]
            return a

        for i in range(len(keys)):
            for j in range(i + 1, len(keys)):
                if sims[i, j] >= float(merge_thresh):
                    ri, rj = find(keys[i]), find(keys[j])
                    if ri != rj:
                        parent[rj] = ri
        root = {k: find(k) for k in keys}
        merge_map = {k: root[k] for k in keys}
        labs = np.array([merge_map.get(int(c), int(c)) for c in labs], dtype=int)
        # Recompute centroids after merging so size checks see merged clusters.
        cents = _centroids_from_labels(X_space, labs)
    vc = pd.Series(labs).value_counts()
    big_labs = set(vc[vc >= int(min_size)].index.tolist())
    small_labs = set(vc[vc < int(min_size)].index.tolist())
    big_cents = {c: cents[c] for c in big_labs if c in cents and c >= 0}
    NOISE_ID = -3  # sentinel for "unassigned/noise"; UI filters on it
    if small_labs and big_cents:
        idx_small = np.where(pd.Series(labs).isin(small_labs))[0]
        if idx_small.size > 0:
            # NOTE(review): both ternary branches are identical — plain
            # X_space[idx_small] would do (row indexing works for ndarray
            # and CSR alike).
            sub = X_space[idx_small] if not isinstance(X_space, np.ndarray) else X_space[idx_small]
            best_lab, best_sim = _cosine_sim_to_centroids(sub, big_cents)
            reassigned = np.where(best_sim >= float(reassign_thresh), best_lab, NOISE_ID)
            labs[idx_small] = reassigned
    return labs


# =================== Scoring & Flags ===================
def _hour_of(dt_iso: str) -> Optional[int]:
    """Parse an ISO-ish timestamp and return the UTC hour (0-23), or None."""
    try:
        if not dt_iso:
            return None
        dt = pd.to_datetime(dt_iso, utc=True, errors="coerce")
        if pd.isna(dt):
            return None
        return int(dt.hour)
    except Exception:
        return None


def _attachment_flags(names: List[str]) -> List[str]:
    """Flag up to 5 attachment names matching ATTACH_NAME_RE (defined
    elsewhere in this file), truncating each shown name to 40 chars."""
    flags = []
    for n in names or []:
        if ATTACH_NAME_RE.search(n):
            flags.append("📎 " + n[:40])
    return flags[:5]


def corruption_score(row, trusted_domains: set):
    """Additive rule-based suspicion score for a single email row.

    Signals (weights): suspect phrase (+2, once), evasive acronyms or
    off-channel language (+1), pre-set suspect/finance tags (+1.5), money and
    invoice patterns (+0.7 each), negative sentiment (+0.3), short body with
    a phone number (+0.5), personal-domain sender not in ``trusted_domains``
    (+0.5), odd-hours send time (+0.3). The regexes/sets referenced
    (SUSPECT_PHRASES, MONEY_RE, PERSONAL_DOMAINS, ...) are defined earlier
    in the file.
    """
    score = 0.0
    txt = f'{row.get("subject","")} {row.get("body_text","")}'.lower()
    for ph in SUSPECT_PHRASES:
        if ph in txt:
            score += 2.0
            break  # only the first matching phrase counts
    if EVASIVE_ACRO_RE.search(txt) or OFFCHANNEL_RE.search(txt):
        score += 1.0
    if isinstance(row.get("tags"), list) and ("🚩suspect" in row["tags"] or "finance" in row["tags"]):
        score += 1.5
    if MONEY_RE.search(txt):
        score += 0.7
    if INVOICE_RE.search(txt):
        score += 0.7
    if str(row.get("sentiment", "")) == "negative":
        score += 0.3
    body_len = len(row.get("body_text", ""))
    # Terse note containing a phone number — possible "call me" deflection.
    if body_len < 160 and PHONE_RE.search(row.get("body_text", "") or ""):
        score += 0.5
    fd = (row.get("from_domain") or "").lower()
    if fd in PERSONAL_DOMAINS and fd not in trusted_domains:
        score += 0.5
    h = _hour_of(row.get("date") or "")
    if h is not None and (h < 6 or h > 22):
        score += 0.3
    return score


def compute_context_anomaly(df_in: pd.DataFrame) -> pd.DataFrame:
    """Blend three signals into a 0-10 ``context_anomaly_score`` column.

    Components:
      * _if_pts (0-6): per-bucket percentile rank of ``anomaly_score``
        (IsolationForest output), only for buckets with >= 5 scored rows.
      * _rule_pts (0-2): taxonomy mismatches — e.g. a bucketed email that
        contains none of its bucket's keywords, a "Constituent" mail from a
        non-personal domain, a "Scheduling" mail with no scheduling words.
      * _corr_pts (0-3): clipped ``corruption_score``.

    Mutates df_in only in the empty case (adds the zero column); otherwise
    works on a copy and drops the temp columns before returning.
    """
    if df_in.empty:
        df_in["context_anomaly_score"] = 0.0
        return df_in
    df = df_in.copy()
    if "anomaly_score" in df.columns:
        df["_if_pct"] = 0.0
        for bkt, grp in df.groupby("bucket", dropna=False):
            vals = grp["anomaly_score"].astype(float)
            if vals.notna().sum() >= 5:
                # ascending=False: the most anomalous rows get pct ~1.0.
                ranks = vals.rank(pct=True, ascending=False)
                df.loc[grp.index, "_if_pct"] = ranks.clip(0, 1)
        df["_if_pts"] = (df["_if_pct"] * 6.0).clip(0, 6)
    else:
        df["_if_pts"] = 0.0
    df["_rule_pts"] = 0.0
    low = (df["subject"].fillna("") + " " + df["body_text"].fillna("")).str.lower()
    for bkt, terms in TAXONOMY.items():
        mask = (df["bucket"] == bkt)
        if not mask.any():
            continue
        if terms:
            # Penalize rows whose text contains none of the bucket's terms.
            has_term = low.str.contains("|".join([re.escape(t.lower()) for t in terms]), regex=True)
            df.loc[mask & (~has_term), "_rule_pts"] += 1.0
        if bkt == "Constituent":
            df.loc[mask & (~df["from_domain"].str.lower().isin(PERSONAL_DOMAINS)), "_rule_pts"] += 1.0
        if bkt == "Scheduling":
            subj = df.loc[mask, "subject"].fillna("").str.lower()
            # NOTE(review): \b binds only to "meeting" and "calendar" here —
            # the intent was probably \b(meeting|invite|schedule|calendar)\b.
            df.loc[mask & (~subj.str.contains(r"\bmeeting|invite|schedule|calendar\b", regex=True)), "_rule_pts"] += 1.0
    df["_rule_pts"] = df["_rule_pts"].clip(0, 2)
    df["_corr_pts"] = df["corruption_score"].fillna(0).clip(0, 3)
    df["context_anomaly_score"] = (df["_if_pts"] + df["_rule_pts"] + df["_corr_pts"]).clip(0, 10)
    return df.drop(columns=["_if_pct", "_if_pts", "_rule_pts", "_corr_pts"], errors="ignore")


# =================== 🔧 NEW: Per-bucket k & stabilizer params ===================
def _bucket_k_multiplier(bucket_name: str) -> float:
    """Scale factor applied to the base cluster count k for a given bucket.

    High-volume/diverse buckets (Constituent, Procurement, ...) get more
    clusters; Legal gets fewer; everything else stays at 1.0.
    """
    b = (bucket_name or "").lower()
    if b in ("constituent",):
        return 1.25
    if b in ("procurement", "campaign finance", "receipts/billing", "lobbyist"):
        return 1.15
    if b in ("scheduling", "other"):
        return 1.00
    if b in ("legal",):
        return 0.80
    return 1.00


def _bucket_stabilizer_params(bucket_name: str) -> Tuple[int, float, float]:
    """Per-bucket (min_size, merge_thresh, reassign_thresh) for
    stabilize_labels; the last line is the default for unknown buckets."""
    b = (bucket_name or "").lower()
    if b == "legal":
        return (30, 0.97, 0.38)
    if b == "procurement":
        return (35, 0.96, 0.36)
    if b == "campaign finance":
        return (35, 0.96, 0.36)
    if b == "constituent":
        return (40, 0.95, 0.33)
    if b == "receipts/billing":
        return (40, 0.95, 0.35)
    if b == "scheduling":
        return (35, 0.95, 0.35)
    return (40, 0.96, 0.35)


# =================== 🔧 NEW: Label de-dup helpers ===================
def _normalize_label_tokens(label: str) -> set:
    """Tokenize a cluster label into a comparable set of content words.

    Accepts Latin and Hebrew letters (\\u0590-\\u05FF), strips a trailing
    plural 's' from tokens longer than 3 chars, and drops stopwords
    (STOP_TERMS / EN_STOP / HE_STOP are defined elsewhere in the file).
    """
    if not label:
        return set()
    txt = str(label).lower()
    toks = re.findall(r"[a-z\u0590-\u05FF][a-z\u0590-\u05FF\-']{1,}", txt)
    toks2 = [t[:-1] if len(t) > 3 and t.endswith("s") else t for t in toks]
    return {t for t in toks2 if t not in STOP_TERMS and t not in EN_STOP and t not in HE_STOP and len(t) >= 2}


def _jaccard(a: set, b: set) -> float:
    """Jaccard similarity |a∩b| / |a∪b|; 0.0 for empty or disjoint sets."""
    if not a or not b:
        return 0.0
    inter = len(a & b)
    if inter == 0:
        return 0.0
    return inter / float(len(a | b))


def dedupe_cluster_labels_in_bucket(df: pd.DataFrame, bucket: str, sim_thresh: float = 0.72) -> pd.DataFrame:
    """Within one bucket, give near-duplicate cluster names a single
    canonical label.

    Cluster names are tokenized (_normalize_label_tokens), grouped by
    union-find on Jaccard similarity >= ``sim_thresh``, and each group adopts
    its longest (then lexicographically greatest) name. Cluster ids are NOT
    merged — only the displayed ``cluster_name`` changes. Mutates and returns
    ``df``.
    """
    sel = df[df["bucket"] == bucket].copy()
    if sel.empty or "cluster_name" not in sel.columns:
        return df
    names = sel[["cluster_id", "cluster_name"]].drop_duplicates()
    tokens = {int(cid): _normalize_label_tokens(str(name)) for cid, name in names.values}
    ids = list(tokens.keys())
    parent = {i: i for i in ids}

    def find(i):
        while parent[i] != i:
            i = parent[i]
        return i

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[rb] = ra

    for i in range(len(ids)):
        for j in range(i + 1, len(ids)):
            if _jaccard(tokens[ids[i]], tokens[ids[j]]) >= sim_thresh:
                union(ids[i], ids[j])
    names_map = dict(names.values)
    comp_to_canon = {}
    for cid in ids:
        root = find(cid)
        comp_to_canon.setdefault(root, [])
        comp_to_canon[root].append((cid, names_map.get(cid, "")))
    canon_for_cluster = {}
    for root, items in comp_to_canon.items():
        # Canonical = longest name; ties broken by string comparison.
        best = max(items, key=lambda kv: (len(kv[1] or ""), kv[1]))
        for cid, _ in items:
            canon_for_cluster[cid] = best[1]
    df.loc[sel.index, "cluster_name"] = sel["cluster_id"].map(
        lambda c: canon_for_cluster.get(int(c), names_map.get(int(c), "")))
    return df


def dedupe_all_labels(df: pd.DataFrame) -> pd.DataFrame:
    """Run cluster-label de-dup for every bucket present in ``df``."""
    out = df
    for bkt in sorted(df["bucket"].dropna().unique()):
        out = dedupe_cluster_labels_in_bucket(out, bkt, sim_thresh=0.72)
    return out


# =================== 🔎 NEW: Surveillance-campaign detection ===================
# Phrases typical of recurring monitoring/briefing products about a person.
SURV_KEYWORDS = [
    "daily report","daily brief","briefing","sitreps","sitrep","situation report","summary",
    "dossier","monitoring","tracking","watchlist","watch list","profile","surveillance",
    "intel","intelligence","osint","open source intel","clippings","press clips","digest",
    "alert","alerting","dispatch","bulletin","roundup","update"
]
SURV_RE = re.compile("|".join([re.escape(k) for k in SURV_KEYWORDS]), re.I)
# Subject-template normalization patterns: numbers/dates/addresses are
# replaced with placeholders so recurring report subjects collapse together.
SUBJ_NUM_RE = re.compile(r"\b\d{1,4}([,./-]\d{1,4})*\b")
SUBJ_DATE_RE = re.compile(r"\b(?:\d{1,2}[-/]\d{1,2}(?:[-/]\d{2,4})?|\d{4}-\d{2}-\d{2}|jan|feb|mar|apr|may|jun|jul|aug|sep|sept|oct|nov|dec)\b", re.I)
SUBJ_FW_RE = re.compile(r"^\s*(re:|fw:|fwd:)\s*", re.I)
EMAIL_RE = re.compile(r"\b[\w.\-+%]+@[\w.-]+\.[A-Za-z]{2,}\b")


def _candidate_entities_from_subjects(df: pd.DataFrame, extra_watchlist: List[str]) -> List[str]:
    """Collect candidate entity names: the user watchlist plus capitalized
    1-3 word sequences found in subjects (a crude proper-noun heuristic).

    Filters out English stopwords (EN_STOP, defined elsewhere) and names
    shorter than 5 chars; returns at most 3000 sorted candidates.
    """
    cand = set([w.strip() for w in (extra_watchlist or []) if w.strip()])
    subs = df["subject"].dropna().astype(str).tolist()
    pat = re.compile(r"\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,2})\b")
    for s in subs:
        for m in pat.finditer(s):
            name = m.group(1).strip()
            if name.lower() in EN_STOP or len(name) < 5:
                continue
            cand.add(name)
    out = sorted(cand)
    return out[:3000]


def _normalize_subject_template(subj: str, entity: str) -> str:
    """Reduce a subject line to a reusable template string.

    Strips Re:/Fw: prefixes, then masks the entity name, dates, numbers and
    email addresses with «ENTITY»/«DATE»/«NUM»/«EMAIL», collapses whitespace
    and lowercases. Identical templates across many emails indicate an
    automated/recurring report.
    """
    if not subj:
        return ""
    s = SUBJ_FW_RE.sub("", subj)
    try:
        s = re.sub(re.escape(entity), "«ENTITY»", s, flags=re.I)
    except Exception:
        pass  # defensive: keep going even if the entity breaks the regex
    s = SUBJ_DATE_RE.sub("«DATE»", s)
    s = SUBJ_NUM_RE.sub("«NUM»", s)
    s = EMAIL_RE.sub("«EMAIL»", s)
    s = re.sub(r"\s+", " ", s).strip().lower()
    return s


def _entity_mask_present(row: pd.Series, entity: str) -> bool:
    """Case-insensitive substring test for ``entity`` in subject+body."""
    t = (row.get("subject", "") + " " + row.get("body_text", "")).lower()
    e = (entity or "").lower()
    return (e in t) if e else False


def detect_surveillance_campaigns(
    df: pd.DataFrame,
    watchlist: Optional[List[str]] = None,
    min_mentions: int = 15,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Score each candidate entity for signs of an organized monitoring
    campaign and return (entity_scores_df, sample_emails_df).

    Per entity with >= ``min_mentions`` mentions, the score (roughly 0-8.6)
    combines: dominant subject-template share (x2.5), surveillance-keyword
    share (x2.0), a z-score spike of the last week vs. prior weeks (x1.5),
    sender/domain diversity (x0.8/x0.5), share of non-newsletter "internal"
    mail (x1.0) and average recipient count (x0.3). Levels: >=6.5 "likely",
    >=4.5 "possible", else "info".
    """
    if df.empty:
        return pd.DataFrame(), pd.DataFrame()
    watch = [w.strip() for w in (watchlist or []) if w.strip()]
    cands = _candidate_entities_from_subjects(df, watch)
    dfd = df.copy()
    dfd["_dt"] = pd.to_datetime(dfd["date"], utc=True, errors="coerce")
    dfd["_day"] = dfd["_dt"].dt.date
    dfd["_week"] = dfd["_dt"].dt.to_period("W").astype(str)
    dfd["_is_news"] = (dfd["bucket"] == "Newsletters/Alerts")
    dfd["_is_it"] = (dfd["bucket"] == "IT/Security")
    # "Internal" here just means: not a newsletter and not an IT notification.
    dfd["_is_internal"] = ~(dfd["_is_news"] | dfd["_is_it"])
    dfd["_recips"] = dfd["to_emails"].apply(lambda xs: len(xs) if isinstance(xs, list) else 0)
    rows = []
    samples = []
    for entity in cands:
        # NOTE(review): row-wise apply per candidate is O(entities * rows);
        # fine for small corpora, a vectorized str.contains would scale better.
        mask = dfd.apply(lambda r: _entity_mask_present(r, entity), axis=1)
        grp = dfd[mask]
        n = len(grp)
        if n < int(min_mentions):
            continue
        n_senders = grp["from_email"].nunique()
        n_domains = grp["from_domain"].nunique()
        pct_news = float((grp["_is_news"].mean() if n else 0.0))
        pct_int = float((grp["_is_internal"].mean() if n else 0.0))
        avg_recips = float((grp["_recips"].mean() if n else 0.0))
        # Weekly cadence spike: z-score of the latest week vs. earlier weeks.
        wk = grp.groupby("_week").size().astype(float)
        if len(wk) >= 4:
            baseline = wk.iloc[:-1]
            mu = float(baseline.mean()) if len(baseline) else 0.0
            sd = float(baseline.std(ddof=1)) if len(baseline) > 1 else 0.0
            last = float(wk.iloc[-1])
            weekly_peak_z = 0.0 if sd == 0.0 else (last - mu) / sd
        else:
            weekly_peak_z = 0.0
        norm_subj = grp["subject"].fillna("").astype(str).map(lambda s: _normalize_subject_template(s, entity))
        if len(norm_subj):
            # Share of the single most common normalized subject template.
            top_template_share = norm_subj.value_counts(normalize=True).iloc[0]
        else:
            top_template_share = 0.0
        kw_share = float(((grp["subject"].fillna("") + " " + grp["body_text"].fillna("")).str.contains(SURV_RE).mean()) if n else 0.0)
        score = 0.0
        score += 2.5 * min(1.0, top_template_share)
        score += 2.0 * min(1.0, kw_share)
        score += 1.5 * min(1.0, weekly_peak_z / 3.0)
        score += 0.8 * min(1.0, n_senders / 10.0)
        score += 0.5 * min(1.0, n_domains / 10.0)
        score += 1.0 * pct_int
        score += 0.3 * min(1.0, avg_recips / 10.0)
        level = "info"
        if score >= 6.5:
            level = "likely"
        elif score >= 4.5:
            level = "possible"
        first_d = grp["_dt"].min()
        last_d = grp["_dt"].max()
        rows.append({
            "entity": entity,
            "surveillance_score": round(float(score), 3),
            "level": level,
            "n_emails": int(n),
            "n_senders": int(n_senders),
            "n_domains": int(n_domains),
            "pct_newsletters": round(pct_news, 3),
            "pct_internal": round(pct_int, 3),
            "avg_recipients": round(avg_recips, 2),
            "weekly_peak_z": round(float(weekly_peak_z), 3),
            "template_max_share": round(float(top_template_share), 3),
            "keyword_share": round(float(kw_share), 3),
            "first_date": str(first_d) if pd.notna(first_d) else "",
            "last_date": str(last_d) if pd.notna(last_d) else "",
            "notes": "template/keywords/cadence/senders/domains mix"
        })
        ex = grp[["date", "from_email", "from_domain", "subject", "bucket"]].copy().head(8)
        ex.insert(0, "entity", entity)
        samples.append(ex)
    # NOTE(review): if no entity clears min_mentions, ``rows`` is empty and
    # sort_values raises KeyError on the missing column — guard upstream.
    ent_df = pd.DataFrame(rows).sort_values(["surveillance_score", "n_emails"], ascending=[False, False]).head(200)
    samp_df = pd.concat(samples, ignore_index=True) if samples else pd.DataFrame()
    return ent_df, samp_df


def tag_surveillance_emails(df: pd.DataFrame, ent_df: pd.DataFrame, threshold: float = 4.5) -> pd.DataFrame:
    """Add a "surveillance" tag to emails mentioning any entity whose
    surveillance_score >= ``threshold``. Returns a copy; tags stay sorted."""
    if df.empty or ent_df.empty:
        return df
    hot = ent_df[ent_df["surveillance_score"] >= float(threshold)]["entity"].tolist()
    if not hot:
        return df

    def _tag(row):
        txt = (row.get("subject", "") + " " + row.get("body_text", "")).lower()
        tags = set(row.get("tags") or [])
        for e in hot:
            if e.lower() in txt:
                tags.add("surveillance")
                break  # one hit is enough
        return sorted(tags)

    out = df.copy()
    out["tags"] = out.apply(_tag, axis=1)
    return out


# =================== UI / PIPELINE CONTINUATION ===================
# ---------- Styles ----------
CSS = """
:root { --pill:#eef2ff; --pill-text:#1f2937; --tag:#e5e7eb; --tag-text:#111827; }
.email-card { background:#ffffff; color:#111827; border-radius:12px; padding:16px; box-shadow:0 1px 3px rgba(0,0,0,0.08); }
.email-header { display:flex; align-items:flex-start; justify-content:space-between; gap:12px; }
.subject { color:#0f172a; font-size:18px; font-weight:700; margin-bottom:6px; }
.meta { color:#334155; font-size:12px; }
.badges { display:flex; gap:8px; align-items:center; flex-wrap:wrap; }
.cluster-pill { background:var(--pill); color:var(--pill-text); padding:2px 8px; border-radius:999px; font-size:12px; }
.sentiment { font-size:12px; color:#334155; }
.tag { background:var(--tag); color:var(--tag-text); padding:2px 6px; border-radius:6px; font-size:12px; }
.email-body { margin-top:12px;
max-height:520px; overflow:auto; line-height:1.6; white-space:normal; color:#111827; } .email-body a { color:#1d4ed8; text-decoration:underline; } mark { background:#fff59d; color:#111827; padding:0 2px; border-radius:2px; } hr.sep { border:none; border-top:1px solid #e5e7eb; margin:10px 0; } .small { color:#475569; font-size:12px; } .cursor { cursor:pointer; } """ # ---------- App ---------- with gr.Blocks(title="Email Investigator — Per-bucket-k + Label Dedup + Surveillance Radar", css=CSS, theme="soft") as demo: gr.Markdown("# Email Investigator — BM25 + Char-grams + (optional) LSA → MiniBatchKMeans") gr.Markdown( "This build includes per-bucket **k** heuristics, label **de-dup**, and a **surveillance-campaign detector** " "(template cadence + keywords + multi-sender/domain signals)." ) with gr.Row(): inbox_file = gr.File(label="Upload emails (.jsonl or .json)", file_types=[".jsonl", ".json"]) with gr.Accordion("Vectorization & Clustering", open=True): with gr.Row(): max_features = gr.Number(label="Word max_features (BM25)", value=120_000, precision=0) min_df = gr.Number(label="min_df (doc freq ≥)", value=3, precision=0) max_df = gr.Slider(label="max_df (fraction ≤)", minimum=0.1, maximum=0.95, value=0.7, step=0.05) use_bigrams = gr.Checkbox(label="Use bigrams (1–2)", value=True) skip_lang = gr.Checkbox(label="Skip language detection (faster)", value=True) with gr.Row(): use_hashing = gr.Checkbox(label="Use HashingVectorizer (memory-light, fast)", value=True) hash_bits = gr.Slider(label="Hashing bits (2^n features)", minimum=16, maximum=20, step=1, value=18) with gr.Row(): use_lsa = gr.Checkbox(label="Use LSA (TruncatedSVD) before KMeans", value=True) lsa_dim = gr.Number(label="LSA components", value=256, precision=0) auto_k = gr.Checkbox(label="Auto choose k (kneedle)", value=True) k_clusters = gr.Number(label="Base k (before per-bucket multiplier)", value=350, precision=0) mb_batch = gr.Number(label="KMeans batch_size", value=4096, precision=0) with gr.Row(): 
use_hdbscan = gr.Checkbox(label="Use HDBSCAN (auto-k, noise) on reduced vectors", value=False) hdb_min_cluster = gr.Number(label="HDBSCAN min_cluster_size", value=60, precision=0) hdb_min_samples = gr.Number(label="HDBSCAN min_samples (0=auto)", value=0, precision=0) with gr.Row(): per_language = gr.Checkbox(label="Cluster per language (reduces cross-language mixing)", value=True) with gr.Row(): use_faiss = gr.Checkbox(label="Use Faiss ANN for search (if available & LSA on)", value=True) use_iso = gr.Checkbox(label="Compute anomaly score (IsolationForest on LSA)", value=False) with gr.Accordion("Investigation Controls", open=True): with gr.Row(): trusted_domains_in = gr.Textbox(label="Trusted org domains (comma-separated)", value="example.gov, example.org") extra_keywords_in = gr.Textbox(label="Extra suspicious phrases (comma-separated)", value="") highlight_toggle = gr.Checkbox(label="Highlight suspect patterns in reader", value=True) with gr.Row(): use_embeddings = gr.Checkbox(label="Add lightweight word embeddings (avg word2vec/GloVe) if available", value=False) embed_weight = gr.Slider(label="Embedding weight in feature space", minimum=0.0, maximum=1.0, step=0.05, value=0.35) with gr.Row(): embeddings_path = gr.Textbox(label="Path to local embeddings (.txt/.vec/.bin) (optional)", value="") embeddings_binary = gr.Checkbox(label="File is binary word2vec format", value=False) with gr.Row(): bucket_drop = gr.Dropdown(label="Bucket", choices=["(any)"] + list(TAXONOMY.keys()), value="(any)", allow_custom_value=False) cluster_drop = gr.Dropdown(label="Cluster", choices=[], value=None, allow_custom_value=False) domain_drop = gr.Dropdown(label="Sender domain", choices=[], value=None, allow_custom_value=False) sender_drop = gr.Dropdown(label="Sender email", choices=[], value=None, allow_custom_value=False) lang_drop = gr.Dropdown(label="Language", choices=["(any)"], value="(any)", allow_custom_value=False) sentiment_drop = gr.Dropdown(label="Sentiment", choices=["(any)", 
"positive", "neutral", "negative"], value="(any)") tag_drop = gr.Dropdown(label="Tag", choices=["(any)", "🚩suspect", "finance", "off-channel", "surveillance", "odd-hours", "personal-mail"], value="(any)") with gr.Row(): date_start = gr.Textbox(label="Date from (YYYY-MM-DD, optional)", value="") date_end = gr.Textbox(label="Date to (YYYY-MM-DD, optional)", value="") sort_by = gr.Dropdown(label="Sort by", choices=["context_anomaly_score","corruption_score","date","anomaly_score","search_score"], value="context_anomaly_score") sort_dir = gr.Dropdown(label="Order", choices=["desc","asc"], value="desc") with gr.Row(): hide_noise = gr.Checkbox(label="Hide noise/unassigned (cluster -3)", value=True) with gr.Accordion("Surveillance Radar", open=True): with gr.Row(): watchlist_in = gr.Textbox(label="Watchlist (names or entities, comma-separated)", value="Hillary Clinton, Joe Biden, Donald Trump") min_mentions = gr.Number(label="Min mentions per entity", value=15, precision=0) with gr.Row(): run_btn = gr.Button("Process", variant="primary") # NEW: Update button lets you re-run with same uploaded file & current settings update_btn = gr.Button("Update", variant="secondary") # NEW: Update reset_btn = gr.Button("Reset filters") status = gr.Markdown("") with gr.Row(): cluster_counts_df = gr.Dataframe(label="Cluster summary (top 500) — click a row to filter", interactive=False, wrap=True) domain_counts_df = gr.Dataframe(label="Top sender domains", interactive=False, wrap=True) with gr.Row(): sender_counts_df = gr.Dataframe(label="Top senders", interactive=False, wrap=True) with gr.Row(): actors_df = gr.Dataframe(label="Top actors (by degree / unique counterparts)", interactive=False, wrap=True) offhours_df = gr.Dataframe(label="Off-hours & personal-mail hits", interactive=False, wrap=True) gr.Markdown("### Surveillance Campaigns (detected entities)") with gr.Row(): surv_entities_df = gr.Dataframe(label="Entities ranked by surveillance score", interactive=False, wrap=True) 
surv_samples_df = gr.Dataframe(label="Sample emails for highlighted entities", interactive=False, wrap=True) gr.Markdown("### Search") with gr.Row(): search_query = gr.Textbox(label="Search (keywords, names, etc.)") search_btn = gr.Button("Search") results_df = gr.Dataframe(label="Results (top 500 or top 50 for search)", interactive=True, wrap=True) email_view = gr.HTML(label="Reader") # -------- State -------- state_df = gr.State() state_vec = gr.State() state_X_reduced = gr.State() state_index = gr.State() state_term_names = gr.State() state_query_terms = gr.State() state_use_lsa = gr.State() state_use_faiss = gr.State() state_svd = gr.State() state_norm = gr.State() state_dims = gr.State() state_extra_terms = gr.State() state_highlight = gr.State() state_inbox = gr.State() # NEW: keep last uploaded file for Update # -------- IO helpers -------- def _load_json_records(local_path: str) -> List[Dict[str, Any]]: recs: List[Dict[str, Any]] = [] if local_path.endswith(".jsonl"): with open(local_path, "r", encoding="utf-8") as fh: for line in fh: line = line.strip() if not line: continue try: obj = json.loads(line) except Exception: continue if str(obj.get("type", "")).lower() == "meta": continue recs.append(obj) else: with open(local_path, "r", encoding="utf-8") as fh: obj = json.load(fh) if isinstance(obj, list): for r in obj: if str(r.get("type", "")).lower() == "meta": continue recs.append(r) elif isinstance(obj, dict): if str(obj.get("type", "")).lower() != "meta": recs = [obj] return recs def _apply_filters( df: pd.DataFrame, bucket: Optional[str], cluster: Optional[str], domain: Optional[str], sender: Optional[str], lang_value: str, sentiment: str, tag_value: str, start: str, end: str, hide_noise_flag: bool = False, ) -> pd.DataFrame: out = df if bucket and bucket != "(any)": out = out[out["bucket"] == bucket] if cluster and cluster != "(any)": m = re.match(r"^.*?(\-?\d+)\s+—", cluster) if m: cid = int(m.group(1)) out = out[out["cluster_id"] == cid] if domain 
and domain != "(any)": out = out[out["from_domain"] == domain] if sender and sender != "(any)": out = out[out["from_email"] == sender] if lang_value and lang_value != "(any)": out = out[out["lang"] == lang_value] if sentiment and sentiment != "(any)" and "sentiment" in out.columns: out = out[out["sentiment"].astype(str) == sentiment] if tag_value and tag_value != "(any)": out = out[out["tags"].apply(lambda ts: isinstance(ts, list) and (tag_value in ts)) | out["flags"].apply(lambda ts: isinstance(ts, list) and (tag_value in ts))] if start: try: dt = pd.to_datetime(start, utc=True, errors="coerce") out = out[pd.to_datetime(out["date"], utc=True, errors="coerce") >= dt] except Exception: pass if end: try: dt = pd.to_datetime(end, utc=True, errors="coerce") out = out[pd.to_datetime(out["date"], utc=True, errors="coerce") <= dt] except Exception: pass if hide_noise_flag: out = out[out["cluster_id"] != -3] return out # -------- Social graph summary -------- def social_stats(df: pd.DataFrame) -> pd.DataFrame: deg = {} def add_edge(a,b): if not a or not b or a==b: return deg.setdefault(a,set()).add(b) deg.setdefault(b,set()).add(a) for _, r in df.iterrows(): f = r.get("from_email") or "" tos = r.get("to_emails") or [] for t in tos: add_edge(f, t) rows=[] for addr, nbrs in deg.items(): rows.append({"address": addr, "degree": len(nbrs)}) out = pd.DataFrame(rows).sort_values("degree", ascending=False).head(50) return out # -------- Sorting helper -------- def _sort_results(df, by, direction): if df is None or len(df) == 0: return pd.DataFrame() tmp = df.copy() if "date" in tmp.columns: tmp["_dt"] = pd.to_datetime(tmp["date"], utc=True, errors="coerce") else: tmp["_dt"] = pd.NaT by = by if by in tmp.columns else "context_anomaly_score" asc = (direction == "asc") sort_cols = [by] if by == "date": sort_cols = ["_dt"] elif by in ["anomaly_score", "corruption_score", "context_anomaly_score"]: sort_cols.append("_dt") tmp = tmp.sort_values(sort_cols, ascending=[asc, False]) cols_out 
= [ "date","bucket","from_email","from_domain","subject","cluster_name","lang", "tags","flags","sentiment","context_anomaly_score","corruption_score","anomaly_score" ] if "search_score" in tmp.columns: cols_out.append("search_score") return tmp[[c for c in cols_out if c in tmp.columns]].head(500) # -------- Vectorization helpers (mirror training path for queries) -------- def _tokenize_query(q: str) -> List[str]: return [p.strip() for p in re.split(r"\s+", q or "") if p.strip()][:8] def _project_query_to_lsa(q_vec, svd, norm) -> Optional[np.ndarray]: try: return norm.transform(svd.transform(q_vec)).astype(np.float32) except Exception: return None def _vectorize_query(q, vec_state, corpus_texts): # Build the same features for the query that we used for docs char_min_df = 1 if len(corpus_texts) <= 1 else 2 if vec_state.get("use_hashing"): hv = HashingVectorizer( analyzer="word", ngram_range=(1, 2) if vec_state.get("use_bigrams") else (1, 1), n_features=2 ** vec_state.get("hash_bits", 18), token_pattern=TOKEN_PATTERN, lowercase=True, norm=None, alternate_sign=False, ) counts = hv.transform(corpus_texts) tfidf_tr = TfidfTransformer().fit(counts) q_word = tfidf_tr.transform(hv.transform([q])) else: cv = CountVectorizer( analyzer="word", ngram_range=(1, 2) if vec_state.get("use_bigrams") else (1, 1), max_features=vec_state.get("max_features"), min_df=vec_state.get("min_df"), max_df=vec_state.get("max_df"), token_pattern=TOKEN_PATTERN, lowercase=True, stop_words=STOPWORD_FOR_VEC, dtype=np.float32, ) tf = cv.fit_transform(corpus_texts) bm25 = BM25Transformer().fit(tf) q_word = bm25.transform(cv.transform([q])) char_vec = CharTfidf( analyzer="char", ngram_range=(3, 5), min_df=char_min_df, max_features=100_000, lowercase=True, dtype=np.float32 ).fit(corpus_texts) q_char = char_vec.transform([q]) return hstack([q_word, q_char * 0.20], format="csr") # -------- Main pipeline -------- def process_file(inbox_file, max_features, min_df, max_df, use_bigrams, skip_lang, use_lsa, 
lsa_dim, auto_k, k_clusters, mb_batch, use_faiss, use_iso, trusted_domains_in, extra_keywords_in, highlight_toggle, use_hashing, hash_bits, use_hdbscan, hdb_min_cluster, hdb_min_samples, per_language, use_embeddings, embed_weight, embeddings_path, embeddings_binary, watchlist_in, min_mentions): if inbox_file is None: return ( "**Please upload a file.**", None, None, None, None, None, None, None, # surveillance outputs None, # results_df None, None, None, # states df/vec/X None, None, # index & term names None, None, # flags gr.update(), gr.update(), gr.update(), gr.update(), # dropdowns None, None, None, # svd/norm/dims None, None, # extra terms / highlight gr.update() # bucket list ) # === Inner helpers for this function === def _make_texts(df_in: pd.DataFrame) -> Tuple[List[str], List[str]]: texts = list(df_in.apply(enrich_text, axis=1)) subjects_only = list(df_in["subject"].fillna("")) return texts, subjects_only def _vectorize_block( texts: List[str], use_bigrams: bool, max_features: int, min_df: int, max_df: float, use_hashing: bool, hash_bits: int ): """ Return (X_full_csr, count_vec, char_vec, bm25, d_word, d_char, d_full) Uses Count+BM25 (+ char-tfidf) or Hashing+TfidfTransformer (+ char-tfidf). 
""" n_docs = len(texts) ngram_range = (1, 2) if use_bigrams else (1, 1) char_min_df = 1 if n_docs <= 1 else 2 if use_hashing: hv = HashingVectorizer( analyzer="word", ngram_range=ngram_range, n_features=2 ** int(hash_bits), alternate_sign=False, token_pattern=TOKEN_PATTERN, lowercase=True, norm=None ) word_counts = hv.transform(texts) tfidf_tr = TfidfTransformer() X_word = tfidf_tr.fit_transform(word_counts).astype(np.float32) char_vec = CharTfidf( analyzer="char", ngram_range=(3, 5), min_df=char_min_df, max_features=100_000, lowercase=True, dtype=np.float32 ) X_char = char_vec.fit_transform(texts) X_full = hstack([X_word, X_char * 0.20], format="csr") d_word, d_char, d_full = X_word.shape[1], X_char.shape[1], X_word.shape[1] + X_char.shape[1] count_vec = None; bm25 = None return X_full, count_vec, char_vec, bm25, d_word, d_char, d_full count_vec = CountVectorizer( analyzer="word", ngram_range=ngram_range, max_features=int(max_features) if max_features else None, min_df=int(min_df) if min_df else 2, max_df=float(max_df) if max_df else 0.7, token_pattern=TOKEN_PATTERN, lowercase=True, dtype=np.float32, stop_words=STOPWORD_FOR_VEC ) TF = count_vec.fit_transform(texts) bm25 = BM25Transformer(k1=1.2, b=0.75).fit(TF) X_word = bm25.transform(TF) char_vec = CharTfidf( analyzer="char", ngram_range=(3, 5), min_df=char_min_df, max_features=100_000, lowercase=True, dtype=np.float32 ) X_char = char_vec.fit_transform(texts) X_full = hstack([X_word, X_char * 0.20], format="csr") d_word, d_char, d_full = X_word.shape[1], X_char.shape[1], X_word.shape[1] + X_char.shape[1] return X_full, count_vec, char_vec, bm25, d_word, d_char, d_full def _reduce_space(X_full, use_lsa, lsa_dim): svd_obj = None norm_obj = None X_reduced = None if not use_lsa: return X_reduced, svd_obj, norm_obj n_docs = X_full.shape[0] n_feats = X_full.shape[1] max_components = max(1, min(n_docs, n_feats) - 1) n_comp = int(min(int(lsa_dim or 256), max_components)) if n_comp < 2: return None, None, None svd_obj = 
TruncatedSVD(n_components=n_comp, random_state=0)
    # NOTE(review): the assignment target for the TruncatedSVD(...) call above
    # sits on a line before this chunk (presumably `svd_obj = ...`). Original
    # indentation was lost in extraction; this chunk uses a reconstructed layout.
    # --- tail of _reduce_space: fit LSA, L2-normalize, free the temporary ---
    Xtmp = svd_obj.fit_transform(X_full)  # dense
    norm_obj = Normalizer(copy=False)
    X_reduced = norm_obj.fit_transform(Xtmp).astype(np.float32)
    del Xtmp; gc.collect()  # drop the un-normalized copy before returning
    return X_reduced, svd_obj, norm_obj

def _attach_embeddings(texts, X_reduced_or_full, use_lsa, kv, emb_dim, weight):
    """Optionally append per-document word-vector embeddings to the feature space.

    Returns ``(X, emb_dim)``. When no KeyedVectors model is available
    (``kv is None``), ``emb_dim <= 0`` or ``weight <= 0``, the input matrix is
    returned unchanged.

    NOTE(review): the ``use_lsa`` parameter is currently unused in this body.
    """
    if kv is None or emb_dim <= 0 or weight <= 0.0:
        return X_reduced_or_full, emb_dim
    doc_embs = _build_doc_embeddings(texts, kv, emb_dim).astype(np.float32)
    # Scale the embedding block so its influence vs. the TF-IDF/LSA features is tunable.
    if weight != 1.0:
        doc_embs *= float(weight)
    if isinstance(X_reduced_or_full, np.ndarray):
        # Dense (LSA) space: plain horizontal concatenation.
        return np.hstack([X_reduced_or_full, doc_embs]).astype(np.float32), emb_dim
    else:
        # Sparse space: wrap the dense embeddings and stack as CSR.
        X_emb = csr_matrix(doc_embs)
        return hstack([X_reduced_or_full, X_emb], format="csr"), emb_dim

def _cluster_space(
    X_space, bucket_name: str, df_part: pd.DataFrame, use_lsa: bool,
    use_hdbscan: bool, hdb_min_cluster: int, hdb_min_samples: int,
    auto_k: bool, k_clusters: int, mb_batch: int,
    count_vec, svd_obj, norm_obj, d_word, d_char
):
    """Cluster one (bucket, language) partition.

    Returns ``(labels, centers, chosen_k)`` where ``centers`` may be ``None``
    (HDBSCAN and degenerate paths). Paths, in order: trivial n<=1, tiny n<10
    KMeans, optional HDBSCAN (dense input only), else MiniBatchKMeans with
    auto/fixed k and optional lexicon-seeded centroids.

    NOTE(review): ``df_part`` is accepted but not referenced in this body.
    """
    n = X_space.shape[0]
    # Per-bucket stabilizer params
    min_size, merge_th, reassign_th = _bucket_stabilizer_params(bucket_name)
    if n <= 1:
        # Degenerate partition: 0 or 1 document — no clustering to do.
        labels = np.zeros((n,), dtype=int) if n == 1 else np.array([], dtype=int)
        centers = None
        chosen_k = int(n) if n > 0 else 0
        return stabilize_labels(X_space, labels, min_size=min_size, merge_thresh=merge_th, reassign_thresh=reassign_th), centers, chosen_k
    if n < 10:
        # Tiny partition: small fixed k, at most n/2 clusters.
        k_small = min(max(2, n // 2), n)
        kmeans = MiniBatchKMeans(
            n_clusters=int(k_small), batch_size=int(mb_batch or 4096), random_state=0, n_init="auto"
        )
        labels = kmeans.fit_predict(X_space)
        centers = getattr(kmeans, "cluster_centers_", None)
        labels = stabilize_labels(X_space, labels, min_size=min_size, merge_thresh=merge_th, reassign_thresh=reassign_th)
        return labels, centers, int(len(set(labels)))
    if use_hdbscan and HDBSCAN_OK and isinstance(X_space, np.ndarray) and X_space.shape[0] >= max(50, hdb_min_cluster):
        # Density-based path: only on dense (LSA) matrices and when enough rows exist.
        min_samples = None if int(hdb_min_samples or 0) <= 0 else int(hdb_min_samples)
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=int(hdb_min_cluster or 60), min_samples=min_samples,
            metric='euclidean', cluster_selection_epsilon=0.0, core_dist_n_jobs=1
        )
        labels = clusterer.fit_predict(X_space)
        centers = None
        labels = stabilize_labels(X_space, labels, min_size=min_size, merge_thresh=merge_th, reassign_thresh=reassign_th)
        # HDBSCAN emits -1 for noise; count only real clusters.
        chosen_k = int(len(set([l for l in labels if l >= 0])))
        return labels, centers, chosen_k
    # Choose k (global rule or kneedle), then per-bucket multiplier
    if bool(auto_k):
        if use_lsa and isinstance(X_space, np.ndarray):
            k, _ = choose_k_by_kneedle(X_space, ks=(50, 100, 150, 200, 300, 400, 500))
        else:
            k = auto_k_rule(X_space.shape[0])
    else:
        k = max(10, int(k_clusters or 350))
    k = int(max(2, round(k * _bucket_k_multiplier(bucket_name))))
    k = min(k, n)  # never more clusters than points
    init = None
    if use_lsa and isinstance(X_space, np.ndarray) and count_vec is not None:
        # Seed centroids from the corruption lexicon projected into LSA space;
        # used only if it yields exactly k seeds.
        seeds = seeded_centroids_in_lsa(
            CORR_LEX, count_vec, svd_obj.components_, norm_obj, d_word=d_word, d_full=(d_word + d_char), k=k
        )
        if seeds is not None and seeds.shape[0] == k:
            init = seeds
    kmeans = MiniBatchKMeans(
        n_clusters=k, batch_size=int(mb_batch or 4096), random_state=0,
        n_init="auto" if init is None else 1,
        init="k-means++" if init is None else init
    )
    labels = kmeans.fit_predict(X_space)
    centers = kmeans.cluster_centers_ if hasattr(kmeans, "cluster_centers_") else None
    if use_lsa and centers is not None:
        # Collapse near-duplicate centroids (cosine >= 0.95) before stabilizing.
        labels = merge_close_clusters(labels, centers, thresh=0.95)
    labels = stabilize_labels(X_space, labels, min_size=min_size, merge_thresh=merge_th, reassign_thresh=reassign_th)
    chosen_k = int(len(set(labels)))
    return labels, centers, chosen_k

# ---- Begin processing ----
# NOTE(review): everything from here down is the tail of the enclosing upload
# handler (it reads its parameters such as trusted_domains_in / inbox_file);
# the original nesting/indentation was lost in extraction.
trusted = set([d.strip().lower() for d in (trusted_domains_in or "").split(",") if d.strip()])
extra_terms = [t.strip() for t in (extra_keywords_in or "").split(",") if t.strip()]
extra_terms_lower = [t.lower() for t in extra_terms]
# Handle Gradio file object
try:
    infile_path = inbox_file.name
except Exception:
    infile_path = str(inbox_file) if inbox_file else ""
recs = _load_json_records(infile_path)
if not recs:
    # NOTE(review): this error tuple appears to carry one item fewer than the
    # 26 output components bound to the handler — confirm the None count.
    return ("**No valid records found.**", None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
normd = []
for r in tqdm(recs, desc="Normalize", leave=False):
    out = normalize_email_record(r, use_langdetect=(not bool(skip_lang)))
    if out and out.get("body_text") is not None:
        normd.append(out)
df = pd.DataFrame(normd)
if df.empty:
    # NOTE(review): same possible arity mismatch as the error return above.
    return ("**No usable email records.**", None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
df = df.drop_duplicates(subset=["message_id", "subject", "text_hash"]).reset_index(drop=True)
df["tags"] = df["body_text"].fillna("").map(has_suspect_tag)
df = compute_sentiment_column(df)
# Stage-1 routing (bucketing)
df["bucket"] = df.apply(route_email_row, axis=1)
df["is_news"] = df.apply(lambda r: is_news_like(r.get("subject", ""), r.get("body_text", ""), r.get("from_domain", "")), axis=1)
df["is_notify"] = df.apply(lambda r: is_notification_like(r.get("subject", ""), r.get("body_text", ""), r.get("from_email", ""), r.get("from_domain", "")), axis=1)
# Heuristic detectors override keyword-based routing.
df.loc[df["is_news"] == True, "bucket"] = "Newsletters/Alerts"
df.loc[df["is_notify"] == True, "bucket"] = "IT/Security"
# Flags: odd-hours sends, personal-domain senders not on the trusted list,
# and suspicious attachment names.
flags = []
for _, row in df.iterrows():
    f = []
    h = _hour_of(row.get("date") or "")
    if h is not None and (h < 6 or h > 22):
        f.append("odd-hours")
    fd = (row.get("from_domain") or "").lower()
    if (fd in PERSONAL_DOMAINS) and (fd not in trusted):
        f.append("personal-mail")
    f += _attachment_flags(row.get("attachments") or [])
    flags.append(f)
df["flags"] = flags
# Split out stable buckets (news/alerts are not clustered; they get fixed ids below)
df_main = df[~df["bucket"].isin(["Newsletters/Alerts", "IT/Security"])].reset_index(drop=True)
df_news = df[df["bucket"] == "Newsletters/Alerts"].reset_index(drop=True)
df_alerts = df[df["bucket"] == "IT/Security"].reset_index(drop=True)
# Optional embeddings
kv = None
emb_dim = 0
if bool(use_embeddings):
    kv, emb_dim = _load_embeddings(embeddings_path or "", bool(embeddings_binary))
# Build partitions: per language within bucket if requested
parts = []
if bool(per_language):
    for bkt, g_bucket in df_main.groupby("bucket", dropna=False):
        for lang_code, grp in g_bucket.groupby("lang", dropna=False):
            parts.append(((bkt, lang_code), grp.copy()))
else:
    for bkt, grp in df_main.groupby("bucket", dropna=False):
        parts.append(((bkt, "all"), grp.copy()))
labels_list, cluster_name_list, anomaly_list = [], [], []
bucket_indexers = []
X_reduced_holder = None  # kept only for single-partition runs, to back search
term_names_global = {}
single_partition = (len(parts) == 1)
d_word_agg, d_char_agg = 0, 0
svd_obj_local, norm_obj_local = None, None
for (bucket_name, _lang), df_part in parts:
    if df_part.empty:
        continue
    texts, subjects_only = _make_texts(df_part)
    X_full, count_vec, char_vec, _, d_word, d_char, _ = _vectorize_block(
        texts=texts,
        use_bigrams=bool(use_bigrams),
        max_features=int(max_features or 120000),
        min_df=int(min_df or 3),
        max_df=float(max_df or 0.7),
        use_hashing=bool(use_hashing),
        hash_bits=int(hash_bits or 18),
    )
    d_word_agg += d_word
    d_char_agg += d_char
    X_reduced, svd_obj_local, norm_obj_local = _reduce_space(X_full, bool(use_lsa), int(lsa_dim or 256))
    X_space = (X_reduced if X_reduced is not None else X_full)
    if kv:
        X_space, _ = _attach_embeddings(
            texts, X_space, bool(use_lsa) and X_reduced is not None, kv, emb_dim, float(embed_weight)
        )
    # Anomaly scores default to NaN; filled only when IsolationForest runs.
    anomaly_scores = np.full((len(df_part),), np.nan, dtype=np.float32)
    if X_reduced is not None and bool(use_iso) and ISO_OK and X_reduced.shape[0] >= 50:
        try:
            iso = IsolationForest(n_estimators=100, contamination="auto", random_state=0).fit(X_reduced)
            # Negated score_samples: higher means more anomalous.
            anomaly_scores = (-iso.score_samples(X_reduced)).astype(np.float32)
        except Exception:
            pass  # best-effort: anomaly scoring is optional
    labels, _, _ = _cluster_space(
        X_space=X_space,
        bucket_name=bucket_name,
        df_part=df_part,
        use_lsa=bool(use_lsa) and X_reduced is not None,
        use_hdbscan=bool(use_hdbscan),
        hdb_min_cluster=int(hdb_min_cluster or 60),
        hdb_min_samples=int(hdb_min_samples or 0),
        auto_k=bool(auto_k),
        k_clusters=int(k_clusters or 350),
        mb_batch=int(mb_batch or 4096),
        count_vec=count_vec,
        svd_obj=svd_obj_local,
        norm_obj=norm_obj_local,
        d_word=d_word,
        d_char=d_char,
    )
    term_names = cluster_labels_pmi_bigram(
        texts=texts, labels=labels, subjects=subjects_only, topn=6,
        subject_alpha=0.75, global_ubiq_cut=0.20, subject_min_cov=0.30
    )
    # Collect per-partition results keyed by the partition's original index,
    # so they can be written back into df_main positionally below.
    bucket_indexers.append(df_part.index)
    labels_list.append(pd.Series(labels, index=df_part.index))
    cluster_name_list.append(pd.Series(
        [term_names.get(int(c), "noise" if int(c) < 0 else f"cluster_{int(c)}") for c in labels],
        index=df_part.index
    ))
    anomaly_list.append(pd.Series(anomaly_scores, index=df_part.index))
    term_names_global.update({int(k): v for k, v in term_names.items()})
    if single_partition and X_reduced is not None:
        X_reduced_holder = X_reduced
if labels_list:
    df_main.loc[pd.Index(np.concatenate(bucket_indexers)), "cluster_id"] = pd.concat(labels_list).sort_index()
    df_main.loc[pd.Index(np.concatenate(bucket_indexers)), "cluster_name"] = pd.concat(cluster_name_list).sort_index()
    df_main.loc[pd.Index(np.concatenate(bucket_indexers)), "anomaly_score"] = pd.concat(anomaly_list).sort_index()
else:
    # No partition produced labels (e.g. all partitions empty).
    df_main["cluster_id"] = -10
    df_main["cluster_name"] = "unclustered"
    df_main["anomaly_score"] = np.nan
# Assign fixed ids for news/alerts buckets
if len(df_news):
    df_news.loc[:, "cluster_id"] = -1
    df_news.loc[:, "cluster_name"] = "newsletter/news"
    df_news.loc[:, "anomaly_score"] = np.nan
if len(df_alerts):
    df_alerts.loc[:, "cluster_id"] = -2
    df_alerts.loc[:, "cluster_name"] = "system/alerts"
    df_alerts.loc[:, "anomaly_score"] = np.nan
# Merge back
df = pd.concat([df_main, df_news, df_alerts], ignore_index=True)
# Label de-dup pass per-bucket
df = dedupe_all_labels(df)
# Scores
df["corruption_score"] = df.apply(lambda r: corruption_score(r, trusted_domains=trusted), axis=1)
df = compute_context_anomaly(df)
# Surveillance campaigns
wl = [w.strip() for w in (watchlist_in or "").split(",") if w.strip()]
ent_df, samp_df = detect_surveillance_campaigns(df, watchlist=wl, min_mentions=int(min_mentions or 15))
df = tag_surveillance_emails(df, ent_df, threshold=4.5)
# Build indexes/search — FAISS inner-product index when available, else a
# brute-force cosine NearestNeighbors; both only for single-partition LSA runs.
index_obj = None
use_faiss_flag = bool(use_faiss) and FAISS_OK and bool(use_lsa) and (X_reduced_holder is not None) and single_partition
if use_faiss_flag:
    d = X_reduced_holder.shape[1]
    # Rows were L2-normalized in _reduce_space, so inner product ~ cosine here.
    index_obj = faiss.IndexFlatIP(d)
    index_obj.add(X_reduced_holder)
else:
    try:
        if bool(use_lsa) and X_reduced_holder is not None and single_partition:
            nn = NearestNeighbors(metric="cosine", algorithm="brute").fit(X_reduced_holder)
            index_obj = nn
        else:
            # Placeholder index so downstream search code has a consistent type.
            index_obj = NearestNeighbors(metric="cosine", algorithm="brute").fit(np.zeros((1, 4), dtype=np.float32))
    except Exception:
        pass  # search becomes unavailable; UI handles index_obj is None
# Summaries
cluster_counts = (
    df.groupby(["bucket", "cluster_id", "cluster_name"])
    .size()
    .reset_index(name="count")
    .sort_values("count", ascending=False)
    .head(500)
)
cluster_counts["label"] = cluster_counts.apply(
    lambda r: f'{r["bucket"]} — {int(r["cluster_id"])} — {r["cluster_name"]} ({int(r["count"])})', axis=1
)
cluster_choices = ["(any)"] + cluster_counts["label"].tolist()
bucket_choices = ["(any)"] + sorted(df["bucket"].dropna().unique().tolist())
domain_counts = (
    df.groupby("from_domain").size().reset_index(name="count").sort_values("count", ascending=False).head(100)
)
domain_choices = ["(any)"] + domain_counts["from_domain"].tolist()
sender_counts = (
    df.groupby("from_email").size().reset_index(name="count").sort_values("count", ascending=False).head(200)
)
sender_choices = ["(any)"] + sender_counts["from_email"].tolist()
langs = sorted([l for l in df["lang"].dropna().unique() if l and l != "unknown"])
lang_choices = ["(any)"] + langs
actors = social_stats(df)
# Off-pattern table: odd-hours or personal-domain senders, ranked by corruption score.
offp = df[df["flags"].apply(lambda xs: "odd-hours" in xs or "personal-mail" in xs)]
offhours_table = (
    offp[["date", "from_email", "subject", "flags", "corruption_score"]]
    .sort_values("corruption_score", ascending=False)
    .head(200)
)
# NOTE(review): this chunk is the tail of the upload handler followed by the
# Gradio UI event bindings; original indentation/nesting (e.g. the enclosing
# `with gr.Blocks()` context and handler def) was lost in extraction.
out_table = _sort_results(df, "context_anomaly_score", "desc")
# Vectorizer settings snapshot so later query vectorization can match training.
vec_state = {
    "use_hashing": bool(use_hashing),
    "hash_bits": int(hash_bits),
    "max_features": int(max_features),
    "min_df": int(min_df),
    "max_df": float(max_df),
    "use_bigrams": bool(use_bigrams),
}
status_md = f"**Processed {len(df):,} emails** | clusters ~ {len(cluster_counts):,} (showing top 500)"
# SVD/normalizer are only meaningful for query projection when a single
# partition was clustered; otherwise they belong to the last partition only.
svd_obj_out = svd_obj_local if single_partition else None
norm_obj_out = norm_obj_local if single_partition else None
# 26 items — must stay in lockstep with the outputs list bound below.
return (
    status_md,  # status
    cluster_counts, domain_counts, sender_counts,  # summaries
    actors, offhours_table,  # extra summaries
    ent_df, samp_df,  # surveillance tables
    out_table,  # results table
    df, vec_state, X_reduced_holder,  # states
    index_obj, term_names_global,  # index + labels
    bool(use_lsa), use_faiss_flag,  # flags
    gr.update(choices=cluster_choices, value="(any)"),
    gr.update(choices=domain_choices, value="(any)"),
    gr.update(choices=sender_choices, value="(any)"),
    gr.update(choices=lang_choices, value="(any)"),
    svd_obj_out, norm_obj_out, (d_word_agg, d_char_agg),
    extra_terms_lower, bool(highlight_toggle),
    gr.update(choices=bucket_choices, value="(any)")
)

# Bind Process button
(run_btn.click)(
    process_file,
    inputs=[
        inbox_file, max_features, min_df, max_df, use_bigrams, skip_lang,
        use_lsa, lsa_dim, auto_k, k_clusters, mb_batch, use_faiss, use_iso,
        trusted_domains_in, extra_keywords_in, highlight_toggle,
        use_hashing, hash_bits, use_hdbscan, hdb_min_cluster, hdb_min_samples,
        per_language, use_embeddings, embed_weight, embeddings_path, embeddings_binary,
        watchlist_in, min_mentions
    ],
    outputs=[
        status, cluster_counts_df, domain_counts_df, sender_counts_df,
        actors_df, offhours_df, surv_entities_df, surv_samples_df,
        results_df, state_df, state_vec, state_X_reduced, state_index,
        state_term_names, state_use_lsa, state_use_faiss,
        cluster_drop, domain_drop, sender_drop, lang_drop,
        state_svd, state_norm, state_dims, state_extra_terms, state_highlight,
        bucket_drop,
    ],
).then(
    # remember the uploaded file for future "Update" runs
    lambda f: f, inputs=[inbox_file], outputs=[state_inbox]
)
# Keep state_inbox in sync whenever a new file is uploaded
inbox_file.change(lambda f: f, inputs=[inbox_file], outputs=[state_inbox])
# NEW: Bind Update button — re-run with the last uploaded file + current settings
update_btn.click(
    process_file,
    inputs=[
        state_inbox, max_features, min_df, max_df, use_bigrams, skip_lang,
        use_lsa, lsa_dim, auto_k, k_clusters, mb_batch, use_faiss, use_iso,
        trusted_domains_in, extra_keywords_in, highlight_toggle,
        use_hashing, hash_bits, use_hdbscan, hdb_min_cluster, hdb_min_samples,
        per_language, use_embeddings, embed_weight, embeddings_path, embeddings_binary,
        watchlist_in, min_mentions
    ],
    outputs=[
        status, cluster_counts_df, domain_counts_df, sender_counts_df,
        actors_df, offhours_df, surv_entities_df, surv_samples_df,
        results_df, state_df, state_vec, state_X_reduced, state_index,
        state_term_names, state_use_lsa, state_use_faiss,
        cluster_drop, domain_drop, sender_drop, lang_drop,
        state_svd, state_norm, state_dims, state_extra_terms, state_highlight,
        bucket_drop,
    ],
)

# -------- Filtering & Search --------
def refresh_results(df, bucket, cluster, domain, sender, lang, sentiment, tag, start, end, sort_by, sort_dir, hide_noise_flag):
    """Apply the current filter controls to the processed frame and re-sort."""
    if df is None or len(df) == 0:
        return pd.DataFrame()
    filt = _apply_filters(
        df, bucket, cluster, domain, sender, lang, sentiment, tag, start, end,
        hide_noise_flag=bool(hide_noise_flag)
    )
    return _sort_results(filt, sort_by, sort_dir)

# Re-run when any filter control changes
for ctrl in [bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop, sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise]:
    ctrl.change(
        refresh_results,
        inputs=[
            state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
            sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise
        ],
        outputs=[results_df],
    )
# Reset filters — the lambda returns one default per output, in order:
# 7 dropdowns -> "(any)", 2 dates -> "", sort column/direction, hide_noise -> True.
reset_btn.click(
    lambda: ["(any)"] * 7 + [""] * 2 + ["context_anomaly_score", "desc"] + [True],
    [],
    [bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop, sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise],
).then(
    refresh_results,
    inputs=[
        state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
        sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise,
    ],
    outputs=[results_df],
)

# --- Search ---
def search_fn(q, df, vec, X_red, index, use_lsa, use_faiss, svd, norm, sort, sdir):
    """Nearest-neighbor search over the clustered corpus.

    Returns ``(results_frame, query_terms)``; empty frame when search is
    unavailable or the query cannot be vectorized.
    """
    if not q or df is None or vec is None or index is None:
        return pd.DataFrame(), []
    # Search ignores newsletters/alerts/noise by default
    mask = ~df["cluster_id"].isin([-1, -2, -3])
    df_main = df[mask].reset_index(drop=True)
    # NOTE(review): the index was fit on the single-partition reduced matrix;
    # verify that its row order still lines up with df_main after concat/dedupe.
    if df_main.empty:
        return pd.DataFrame(), []
    q_terms = _tokenize_query(q)
    q_vec = _vectorize_query(q, vec, list(df_main.apply(enrich_text, axis=1)))
    # Project the query into LSA space when that is the indexed space.
    q_emb = _project_query_to_lsa(q_vec, svd, norm) if use_lsa and svd is not None and norm is not None else q_vec
    if q_emb is None:
        return pd.DataFrame(), q_terms
    n_req = min(50, len(df_main))
    if n_req <= 0:
        return pd.DataFrame(), q_terms
    if isinstance(index, NearestNeighbors):
        if hasattr(index, "n_samples_fit_") and index.n_samples_fit_ <= 1:
            return pd.DataFrame(), q_terms  # placeholder index — nothing to search
        dists, inds = index.kneighbors(q_emb, n_neighbors=n_req)
        sims = 1.0 - dists[0]  # cosine distance -> similarity
        results = df_main.iloc[inds[0]].copy()
        results["search_score"] = sims
    elif use_faiss and FAISS_OK and hasattr(index, "search"):
        D, I = index.search(q_emb.astype(np.float32), k=n_req)
        results = df_main.iloc[I[0]].copy()
        results["search_score"] = D[0]  # inner-product scores
    else:
        return pd.DataFrame(), q_terms
    return _sort_results(results, sort, sdir), q_terms

search_btn.click(
    search_fn,
    inputs=[
        search_query, state_df, state_vec, state_X_reduced, state_index,
        state_use_lsa, state_use_faiss, state_svd, state_norm, sort_by, sort_dir,
    ],
    outputs=[results_df, state_query_terms],
)

# --- Reader selection (highlighting) ---
def on_row_select(evt: gr.SelectData, table, df, term_names, q_terms, extra_terms, do_highlight):
    """Render the selected results row as highlighted HTML for the reader pane."""
    if evt.index is None or table is None or len(table) == 0 or df is None or len(df) == 0:
        return ""
    row_idx = evt.index[0]
    sel = table.iloc[row_idx]
    # Try to match the original row
    cand = df[
        (df["subject"] == sel.get("subject"))
        & (df["from_email"] == sel.get("from_email"))
        & (df["date"] == sel.get("date"))
    ]
    if cand.empty:
        # Fall back to subject-only match (may pick a different duplicate).
        cand = df[df["subject"] == sel.get("subject")]
    if cand.empty:
        return "Could not find original record."
    row = cand.iloc[0]
    cid = int(row.get("cluster_id", -99))
    clabel = term_names.get(cid, row.get("cluster_name")) if term_names else row.get("cluster_name")
    return build_highlighted_html(
        row,
        query_terms=q_terms,
        cluster_label=f'{row.get("bucket","Other")} / {clabel}',
        do_highlight=do_highlight,
        extra_terms=extra_terms,
    )

results_df.select(
    on_row_select,
    inputs=[results_df, state_df, state_term_names, state_query_terms, state_extra_terms, state_highlight],
    outputs=[email_view],
)

# --- Click-to-filter helpers ---
def on_click_filter(evt: gr.SelectData, df_sum: pd.DataFrame, col_name: str, out_comp: gr.Dropdown):
    """Return a dropdown update carrying the clicked summary row's value.

    NOTE(review): ``out_comp`` is accepted but not referenced in this body.
    """
    if evt.index is None or df_sum is None or df_sum.empty:
        return gr.update()
    val = df_sum.iloc[evt.index[0]][col_name]
    return gr.update(value=val)

def on_cluster_summary_select(evt: gr.SelectData, df_sum: pd.DataFrame):
    """Set both the bucket and cluster dropdowns from a cluster-summary click."""
    if evt.index is None or df_sum is None or df_sum.empty:
        return gr.update(), gr.update()
    r = df_sum.iloc[evt.index[0]]
    return gr.update(value=r["bucket"]), gr.update(value=r["label"])

cluster_counts_df.select(
    on_cluster_summary_select, [cluster_counts_df], [bucket_drop, cluster_drop]
).then(
    refresh_results,
    inputs=[
        state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
        sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise
    ],
    outputs=[results_df],
)
domain_counts_df.select(
    lambda evt, df: on_click_filter(evt, df, "from_domain", domain_drop),
    [domain_counts_df], [domain_drop]
).then(
    refresh_results,
    inputs=[
        state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
        sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise
    ],
    outputs=[results_df],
)
sender_counts_df.select(
    lambda evt, df: on_click_filter(evt, df, "from_email", sender_drop),
    [sender_counts_df], [sender_drop]
).then(
    refresh_results,
    inputs=[
        state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
        sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise
    ],
    outputs=[results_df],
)

if __name__ == "__main__":
    # Disable SSR to avoid handler arity warnings under server-side rendering
    demo.launch(ssr_mode=False)