| | import os, re, json, io, math, gc, uuid |
| | from datetime import datetime |
| | from typing import List, Dict, Any, Tuple, Optional |
| |
|
| | import numpy as np |
| | import pandas as pd |
| |
|
| | import pyarrow as pa |
| | import pyarrow.parquet as pq |
| |
|
| | from bs4 import BeautifulSoup |
| | import ftfy |
| | from langdetect import detect, DetectorFactory |
| | DetectorFactory.seed = 0 |
| |
|
| | import gradio as gr |
| | from tqdm import tqdm |
| |
|
| | |
| | from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer as CharTfidf |
| | from sklearn.cluster import MiniBatchKMeans |
| | from sklearn.neighbors import NearestNeighbors |
| | from sklearn.decomposition import TruncatedSVD |
| | from sklearn.preprocessing import Normalizer |
| | from sklearn.preprocessing import normalize as sk_normalize |
| | from sklearn.metrics.pairwise import cosine_similarity |
| |
|
| | |
| | from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer |
| | from scipy.sparse import csr_matrix |
| | try: |
| | import hdbscan |
| | HDBSCAN_OK = True |
| | except Exception: |
| | HDBSCAN_OK = False |
| |
|
| | |
| | try: |
| | from gensim.models import KeyedVectors |
| | GENSIM_OK = True |
| | except Exception: |
| | GENSIM_OK = False |
| |
|
| | |
| | try: |
| | from sklearn.ensemble import IsolationForest |
| | ISO_OK = True |
| | except Exception: |
| | ISO_OK = False |
| |
|
| | from scipy.sparse import hstack |
| |
|
| | |
| | try: |
| | import faiss |
| | FAISS_OK = True |
| | except Exception: |
| | FAISS_OK = False |
| |
|
| | |
| | try: |
| | from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer |
| | VADER_OK = True |
| | except Exception: |
| | VADER_OK = False |
| |
|
| | |
# Routing taxonomy: bucket name -> keywords. route_email_row adds one point to a
# bucket for every keyword it finds in the lower-cased subject + body.
# NOTE(review): "pac" is listed under both "Lobbyist" and "Campaign Finance", so a
# "pac" hit scores both buckets equally — confirm this is intended.
# "Other" has no keywords; route_email_row falls back to it.
TAXONOMY = {
    "Lobbyist": ["lobby","lobbyist","pac","influence"],
    "Campaign Finance": ["donation","contribution","fundraiser","pledge","campaign finance","pac"],
    "Procurement": ["contract","tender","rfp","rfq","bid","invoice","vendor","purchase order","po"],
    "HR/Admin": ["hiring","personnel","payroll","benefits","policy","vacation","pto"],
    "Constituent": ["constituent","concerned citizen","my issue","complaint","community"],
    "Scheduling": ["schedule","meeting","appointment","calendar","invite","availability","reschedule"],
    "Legal": ["legal","lawsuit","intake","attorney","counsel","privileged","court","subpoena","confidential"],
    "IT/Security": ["password","account security","two-factor","2fa","vpn","verification code","security alert","it support"],
    "Newsletters/Alerts": ["newsletter","daily briefing","news update","unsubscribe","press clip","digest"],
    "Other": [],
}

# Sender domains that earn a header bonus in _bucket_header_bonus ("Lobbyist"
# and "Legal" respectively). Empty here; presumably populated elsewhere or per
# deployment — TODO confirm.
LOBBY_DOMAINS = set()
LEGAL_DOMAINS = set()
| |
|
| | def _contains_any(text: str, terms: list) -> bool: |
| | if not text or not terms: return False |
| | tl = text.lower() |
| | return any(t for t in terms if t and t.lower() in tl) |
| |
|
| | def _bucket_header_bonus(row: pd.Series, bucket: str) -> float: |
| | fd = (row.get("from_domain") or "").lower() |
| | subj = (row.get("subject") or "") |
| | if bucket == "Newsletters/Alerts": |
| | return 5.0 if is_news_like(subj, row.get("body_text",""), fd) else 0.0 |
| | if bucket == "IT/Security": |
| | return 5.0 if is_notification_like(subj, row.get("body_text",""), row.get("from_email",""), fd) else 0.0 |
| | if bucket == "Constituent": |
| | return 3.0 if (fd in PERSONAL_DOMAINS) else 0.0 |
| | if bucket == "Lobbyist": |
| | return 5.0 if fd in LOBBY_DOMAINS else 0.0 |
| | if bucket == "Legal": |
| | return 5.0 if (("law" in fd) or (fd in LEGAL_DOMAINS) or ("privileged" in subj.lower())) else 0.0 |
| | if bucket == "Scheduling": |
| | body = (row.get("body_text") or "") |
| | return 3.0 if (ATTACH_NAME_RE.search(" ".join(row.get("attachments") or [])) or re.search(r"\binvitation\b|\binvite\b", subj, re.I) or re.search(r"\.ics\b", body, re.I)) else 0.0 |
| | return 0.0 |
| |
|
# Minimum best-bucket score for route_email_row to return that bucket.
MIN_ROUTE_SCORE = 1.5
# Minimum lead of the best bucket over the runner-up; closer races route to "Other".
TIE_MARGIN = 1.0
| |
|
# Keywords this short ("po", "pac", "bid", "pto", "2fa", "vpn", ...) collide with
# unrelated words under plain substring matching ("report", "space",
# "forbidden", "laptop"), so route_email_row matches them as whole words.
_SHORT_TERM_MAX_LEN = 3


def _taxonomy_term_hit(term: str, text: str) -> bool:
    """Return True when a TAXONOMY keyword occurs in already-lower-cased *text*.

    Keywords of up to _SHORT_TERM_MAX_LEN characters must appear as a whole
    word. Longer keywords keep substring matching, so inflections such as
    "contracts" or "lobbying" still count.
    """
    t = term.lower()
    if len(t) <= _SHORT_TERM_MAX_LEN:
        return re.search(r"(?<!\w)" + re.escape(t) + r"(?!\w)", text) is not None
    return t in text


def route_email_row(row: pd.Series) -> str:
    """Route one email row to the best-scoring TAXONOMY bucket, or "Other".

    Scoring per bucket:
      * +1 for every bucket keyword found in the lower-cased subject + body
        (short keywords must match as whole words, see _taxonomy_term_hit);
      * +1 for "Lobbyist" and "Procurement" when any SUSPECT_PHRASES entry
        occurs in the text;
      * plus _bucket_header_bonus(row, bucket).

    "Other" is returned when the best score is below MIN_ROUTE_SCORE, or when
    it leads the runner-up by less than TIE_MARGIN.
    """
    # Missing subject/body count as empty (not as the text "None").
    subject = row.get("subject") or ""
    body = row.get("body_text") or ""
    text = f"{subject} {body}".lower()

    scores: dict = {b: 0.0 for b in TAXONOMY}
    # The suspect-phrase check does not depend on the bucket, so compute it once.
    suspect = any(p in text for p in SUSPECT_PHRASES)
    for b, terms in TAXONOMY.items():
        if not terms:
            continue
        scores[b] += float(sum(1 for t in terms if t and _taxonomy_term_hit(t, text)))
        if b in ("Lobbyist", "Procurement") and suspect:
            scores[b] += 1.0
    for b in TAXONOMY:
        scores[b] += _bucket_header_bonus(row, b)

    best_bucket, best = max(scores.items(), key=lambda kv: kv[1])
    second = sorted(scores.values(), reverse=True)[1] if len(scores) > 1 else 0.0
    if best < MIN_ROUTE_SCORE or (best - second) < TIE_MARGIN:
        return "Other"
    return best_bucket
| |
|
| | |
# Token regex used for tokenisation (embeddings, cluster labelling): a word
# character followed by at least one more word char, ".", "@" or "-" (2+ chars).
TOKEN_PATTERN = r"(?u)\b\w[\w.@-]{1,}\b"
# Bare URLs (http(s):// or www.); normalize_email_record replaces them with " URL ".
URL_RE = re.compile(r"https?://\S+|www\.\S+", re.I)
# Lines starting with ">" (quoted reply text).
QUOTE_LINE_RE = re.compile(r"^>.*$", re.M)
# "-- " signature separator on its own line.
SIG_RE = re.compile(r"\n-- ?\n", re.M)
# Mobile-client footers ("Sent from my ...", plus the Hebrew equivalent).
SENT_FROM_RE = re.compile(r"\nSent from my .*$", re.M)
HEBREW_SENT_FROM_RE = re.compile(r"\nנשלח מה.*$", re.M)
# Markers that introduce a forwarded or quoted earlier message.
FWD_BEGIN_RE = re.compile(r"^Begin forwarded message:", re.I | re.M)
FWD_MSG_RE = re.compile(r"^[-\s]*Original Message[-\s]*$", re.I | re.M)
ON_WROTE_RE = re.compile(r'^\s*On .* wrote:$', re.M)

# NOTE(review): not referenced in the visible portion of this file; presumably a
# default for the langdetect toggle — confirm against callers.
SKIP_LANGDETECT = True
| |
|
| | |
# Phrases whose presence (lower-cased substring test) marks a message as suspect.
# Consumed by route_email_row, has_suspect_tag and _compile_highlight_terms.
SUSPECT_PHRASES = [
    "off the books","cover up","kickback","bribe","under the table",
    "no inspection","special fee","friendly payment","confidential deal",
    "nobody will find out","pay to play","cash only","shell company",
    "bid rigging","embezzle","slush fund","false invoice","ghost employee",
    "contract splitting","grease payment","unreported","unrecorded",
    "off the record","just between us","don’t quote me on this","dont quote me on this",
    "we never had this conversation","keep this between us","not ethical","illegal",
    "grey area","gray area","write off","failed investment","they owe it to me",
    "let’s take this offline","lets take this offline","send to my gmail","send to my yahoo",
    "don’t leave a trail","dont leave a trail","call my cell","text me","don’t text me","dont text me",
    "tell you on the phone","talk in person","come by my office","vpn",
    "tax haven","off-shore account","offshore account","backdate","pull earnings forward",
    "delete this email","special fees","wire instructions",
]
# Chat acronyms associated with taking a conversation off the record.
EVASIVE_ACRO_RE = re.compile(r'\b(?:TYOP|LDL|TOL|OTR|TXT|TYL)\b', re.I)

# Currency marker followed by a number, e.g. "$1,200" or "USD 500".
MONEY_RE = re.compile(r'(\$|USD|EUR|ILS|NIS)\s?\d[\d,.\s]*', re.I)
# Loose phone-number shape: optional country and area code, then 3-4 + 4 digits.
PHONE_RE = re.compile(r'(\+?\d{1,3}[-\s.]?)?(\(?\d{2,4}\)?[-\s.]?)?\d{3,4}[-\s.]?\d{4}')
# Invoice / purchase-order / wire references.
INVOICE_RE = re.compile(r'\b(invoice|inv\.\s?\d+|po\s?#?\d+|purchase order|wire)\b', re.I)
# Company legal-form suffixes. "S.A." and "S.p.A." end in a dot, so a trailing
# \b would demand a *word* character right after them and never match
# "Acme S.A. said". A negative look-ahead for a word character behaves like \b
# for the alphabetic suffixes and also works for the dotted ones.
COMPANY_RE = re.compile(r'\b(LLC|Ltd|Limited|Inc|GmbH|S\.A\.|S\.p\.A\.)(?!\w)')
# Attachment-name keywords suggesting financial documents.
ATTACH_NAME_RE = re.compile(r'\b(agreement|contract|invoice|wire|payment|instructions|accounts?|offshore|tax|statement)\b', re.I)
| |
|
# Regex fragments for requests to move a conversation to another channel
# (messaging apps, personal mailboxes, phone), plus some of the evasive acronyms
# also in EVASIVE_ACRO_RE. Combined case-insensitively into OFFCHANNEL_RE.
# NOTE(review): with re.I, r"\bsignal\b" also matches ordinary uses of the word
# "signal" — confirm the precision/recall trade-off is intended.
OFFCHANNEL_PATTERNS = [
    r"\bwhatsapp\b", r"\bsignal\b", r"\btelegram\b", r"\bwechat\b",
    r"send to my (gmail|yahoo|protonmail)", r"(call|text) (me|my cell)",
    r"take this offline", r"don.?t (text|email) (me|this)",
    r"\bOTR\b", r"\bTOL\b", r"\bTYOP\b", r"\bLDL\b"
]
OFFCHANNEL_RE = re.compile("|".join(OFFCHANNEL_PATTERNS), re.I)

# Consumer mailbox providers. _bucket_header_bonus gives senders from these
# domains a "Constituent" bonus.
PERSONAL_DOMAINS = {"gmail.com","yahoo.com","outlook.com","hotmail.com","proton.me","protonmail.com","icloud.com","mail.ru","yandex.ru"}
| |
|
# Publisher domains whose mail (including subdomains) counts as news.
NEWS_DOMAINS = {"nytimes.com","ft.com","wsj.com","bloomberg.com","reuters.com","theguardian.com","economist.com"}


def is_news_like(subject: str, body: str, from_domain: str) -> bool:
    """Heuristically decide whether a message is a newsletter or news alert.

    Returns True when any of these holds:
      * the body contains an "unsubscribe" / "manage preferences" footer;
      * the subject contains a typical newsletter phrase;
      * the sender domain is one of NEWS_DOMAINS or a subdomain of one.

    Any argument may be None.
    """
    s = (subject or "").lower()
    b = (body or "").lower()
    fd = (from_domain or "").lower()
    if "unsubscribe" in b or "manage preferences" in b:
        return True
    if any(k in s for k in ["daily briefing", "morning update", "newsletter", "top stories"]):
        return True
    # Compare whole domain labels: a plain substring test would let "ft.com"
    # match "microsoft.com" or "draft.com".
    return any(fd == d or fd.endswith("." + d) for d in NEWS_DOMAINS)
| |
|
# Fragments typical of automated mail: no-reply senders, security/2FA alerts,
# bounces (mailer-daemon, DSN) and mail-gateway/quarantine notices.
NOTIFY_PATTERNS = [
    r"\bno[-\s]?reply\b", r"do not reply", r"security alert", r"new sign[-\s]?in",
    r"verification code", r"two[-\s]?factor", r"\b2fa\b", r"\botp\b", r"\bcode[:\s]",
    r"itunes connect", r"apple id", r"your google account", r"used (?:a )?new browser",
    r"unable to determine", r"reset your password", r"\balert\b",
    r"mailer[-\s]?daemon", r"\bpostmaster\b", r"delivery status notification",
    r"undeliverable", r"delivery failure", r"returned mail", r"mail delivery subsystem",
    r"proofpoint", r"mimecast", r"dmarc", r"\bspf\b", r"\bdkim\b", r"quarantine",
    r"spam digest", r"phishing", r"security gateway", r"mail[-\s]?secure|secure message"
]
NOTIFY_RE = re.compile("|".join(NOTIFY_PATTERNS), re.I)


def is_notification_like(subject: str, body: str, from_email: str, from_domain: str) -> bool:
    """Return True for system-generated notification mail.

    A message qualifies when NOTIFY_RE matches its subject or body, or when the
    sender address looks like a no-reply mailbox. *from_domain* is accepted for
    signature symmetry with is_news_like but is not consulted. Arguments may be
    None.
    """
    sender = (from_email or "").lower()
    for field in (subject, body):
        if NOTIFY_RE.search((field or "").lower()):
            return True
    return re.search(r"noreply|no-reply|donotreply", sender) is not None
| |
|
# Unicode blocks used to spot non-Latin scripts.
HEB_RE = re.compile(r'[\u0590-\u05FF]')
AR_RE = re.compile(r'[\u0600-\u06FF]')
CYR_RE = re.compile(r'[\u0400-\u04FF]')


def fast_lang_heuristic(text: str) -> str:
    """Cheap language guess without a language-detection model.

    The first script found wins: any Hebrew letter gives "he", then Arabic
    "ar", then Cyrillic "ru". Otherwise the text is "en" when more than 85% of
    its alphabetic characters are ASCII, and "unknown" in every other case
    (including empty or None input).
    """
    sample = text or ""
    for script_re, code in ((HEB_RE, "he"), (AR_RE, "ar"), (CYR_RE, "ru")):
        if script_re.search(sample):
            return code
    alpha_chars = [ch for ch in sample if ch.isalpha()]
    if not alpha_chars:
        return "unknown"
    ascii_share = sum(1 for ch in alpha_chars if ch.isascii()) / len(alpha_chars)
    return "en" if ascii_share > 0.85 else "unknown"
| |
|
# Corruption-related lexicon grouped by theme (theme -> trigger phrases).
# NOTE(review): not referenced in the visible portion of this file — confirm
# where it is consumed before changing its keys.
CORR_LEX = {
    "kickback" : ["kickback","bribe","under the table","gift","cash"],
    "invoice_fraud" : ["false invoice","ghost employee","contract splitting","slush fund","shell company","front company"],
    "procurement" : ["bid rigging","tender","vendor","sole source","rfp","rfq","purchase order","po"],
    "money_flow" : ["wire transfer","transfer","swift","iban","routing number","account number","cash"]
}
| |
|
| | |
# English stop words plus e-mail boilerplate (greetings, sign-offs,
# forward/disclaimer vocabulary); _is_junk_term rejects these.
EN_STOP = {
    "the","of","and","to","in","is","for","on","at","with","from","by","or","as",
    "that","this","it","be","are","was","were","an","a","you","your","we","our","us",
    "re","fwd","fw","hi","hello","thanks","thank","regards","best","please","dear","mr","mrs",
    "message","original","forwarded","attached","attachment","confidential","notice","disclaimer",
    "herein","thereof","hereby","therein","regarding","subject","url","via","kind","regard","ny"
}
# Hebrew function words and e-mail boilerplate.
HE_STOP = {"של","על","זה","גם","אם","לא","את","אתה","אני","הוא","היא","הם","הן","כי","מה","שלום","תודה","בברכה","מצורף","הודעה","קדימה","היי"}
# Month names and abbreviations; _is_junk_term rejects these.
MONTHS = {
    "jan","feb","mar","apr","may","jun","jul","aug","sep","sept","oct","nov","dec",
    "january","february","march","april","june","july","august","september",
    "october","november","december"
}
# HTML / MIME residue and other low-information tokens.
STOP_TERMS = {
    "div","span","nbsp","href","src","img","class","style","align","border","cid",
    "content","content-type","multipart","alternative","quoted","printable","utf",
    "windows-1255","iso-8859","us-ascii","html","plain","attachment","filename",
    "type","id","service","person","generated","fyi"
}
# Auxiliary verbs, contractions and chat filler.
AUX_STOP = {
    "will","would","should","could","can","cant","cannot","did","do","does","done",
    "have","has","had","having","get","got","make","made","let","need","want",
    "not","dont","didnt","isnt","arent","wasnt","werent","im","youre","hes","shes",
    "weve","ive","theyre","its","ok","okay","pls","please","thx","thanks","regards","best",
    "hi","hello","dear","re","fw","fwd","via","kind"
}
# Call-to-action and mobile-footer words (English and Hebrew).
CTA_STOP = {
    "click","here","unsubscribe","view","browser","mailto","reply","iphone","android",
    "press","link","below","above","update","newsletter","manage","preferences",
    "לחץ","כאן","נשלח","מה","מה-iphone","הטלפון"
}
# Technical / markup tokens (largely overlaps STOP_TERMS).
TECH_META = {
    "quot","nbsp","cid","href","src","img","class","style","div","span","http","https",
    "content","content-type","multipart","alternative","quoted","printable","utf",
    "windows-1255","iso-8859","us-ascii","attachment","filename"
}
# Chinese e-mail header labels and weekday names.
ZH_HEADER_STOP = {"发送时间","星期","星期一","星期二","星期三","星期四","星期五","星期六","星期日","转发","主题","收件人","发件人"}
# Extra Hebrew function words.
HE_EXTRA_STOP = {"עם","או"}

# Fold all auxiliary lists into STOP_TERMS (in-place set union at import time).
STOP_TERMS |= AUX_STOP | CTA_STOP | TECH_META | ZH_HEADER_STOP | HE_EXTRA_STOP
# Tokens that contain "@" or look like a bare domain such as "example.com".
EMAIL_LIKE_RE = re.compile(r"@|^[\w\-]+\.(com|net|org|ru|us|il|ch|co|io|uk|de|fr|it)$", re.I)
# Four-digit years 1900-2099.
YEAR_RE = re.compile(r"^(19|20)\d{2}$")
# Pure numbers, optionally with . , : / - separators (dates, times, amounts).
NUMERIC_RE = re.compile(r"^\d+([.,:/-]\d+)*$")
# Single-character tokens.
ONE_CHAR_RE = re.compile(r"^.$")

# Shape filters for machine-generated tokens:
# 24+ characters of [A-Za-z0-9_-] (IDs, tokens)
LONG_ALNUM_RE = re.compile(r"^[A-Za-z0-9_-]{24,}$")
# 8+ hexadecimal digits (hashes)
HEXISH_RE = re.compile(r"^(?:[A-Fa-f0-9]{8,})$")
# six or more digits anywhere in the token
DIGIT_HEAVY_RE = re.compile(r"^(?:\D*\d){6,}\D*$")
# alphanumerics containing at least one underscore
UNDERSCORE_HEAVY_RE = re.compile(r"^[A-Za-z0-9]*_[A-Za-z0-9_]*$")

# Sorted union of all stop lists.
# NOTE(review): not referenced in the visible portion of this file; presumably
# passed as stop_words to a vectorizer elsewhere — confirm.
STOPWORD_FOR_VEC = sorted(EN_STOP | HE_STOP | STOP_TERMS)
| |
|
| | def _is_junk_term(t: str) -> bool: |
| | tl = (t or "").strip().lower() |
| | if not tl: return True |
| | if tl in STOP_TERMS or tl in EN_STOP or tl in HE_STOP or tl in MONTHS: return True |
| | if EMAIL_LIKE_RE.search(tl): return True |
| | if YEAR_RE.match(tl): return True |
| | if NUMERIC_RE.match(tl): return True |
| | if ONE_CHAR_RE.match(tl): return True |
| | if LONG_ALNUM_RE.match(t): return True |
| | if HEXISH_RE.match(t): return True |
| | if DIGIT_HEAVY_RE.match(t): return True |
| | if UNDERSCORE_HEAVY_RE.match(t): return True |
| | if len(t) > 40: return True |
| | return False |
| |
|
| | def _sanitize_top_terms(names: np.ndarray, idxs: np.ndarray, mean_vec: np.ndarray, want:int) -> list: |
| | ordered = idxs[np.argsort(-mean_vec[idxs])] |
| | cleaned = [] |
| | for i in ordered: |
| | term = names[i] |
| | if _is_junk_term(term): |
| | continue |
| | cleaned.append(term) |
| | if len(cleaned) >= want: |
| | break |
| | if len(cleaned) < max(2, want//2): |
| | for i in ordered: |
| | term = names[i] |
| | if EMAIL_LIKE_RE.search(term) or YEAR_RE.match(term.lower()): |
| | continue |
| | if term not in cleaned: |
| | cleaned.append(term) |
| | if len(cleaned) >= want: |
| | break |
| | return cleaned |
| |
|
| | |
def html_to_text(html: str) -> str:
    """Convert an HTML fragment to plain text, dropping <script>/<style> content.

    Text nodes are joined with newlines. Empty or None input yields "".
    """
    if not html:
        return ""
    document = BeautifulSoup(html, "html.parser")
    for unwanted in document.find_all(["script", "style"]):
        unwanted.decompose()
    return document.get_text(separator="\n")
| |
|
def strip_quotes_and_sigs(text: str) -> str:
    """Reduce an email body to the newly written part.

    The steps, in order:
      1. drop ">"-quoted lines;
      2. keep only the text before the first "-- " signature separator;
      3. remove "Sent from my ..." footers (English and Hebrew);
      4. truncate at the earliest forwarded/quoted-message marker
         ("Begin forwarded message:", "Original Message", "On ... wrote:");
      5. remove any remaining case-insensitive "sent from my" / Hebrew iPhone
         footer lines;
      6. strip surrounding whitespace.

    Empty or None input yields "".
    """
    if not text:
        return ""
    cleaned = QUOTE_LINE_RE.sub("", text)
    cleaned = SIG_RE.split(cleaned)[0]
    for footer_re in (SENT_FROM_RE, HEBREW_SENT_FROM_RE):
        cleaned = footer_re.sub("", cleaned)

    # Every marker is searched in the same (untruncated) text; cut at the earliest.
    marker_starts = [
        found.start()
        for found in (marker.search(cleaned) for marker in (FWD_BEGIN_RE, FWD_MSG_RE, ON_WROTE_RE))
        if found
    ]
    if marker_starts:
        cleaned = cleaned[:min(marker_starts)]

    cleaned = re.sub(r"\n\s*sent from my .*?$", "", cleaned, flags=re.I | re.M)
    cleaned = re.sub(r"\n\s*(נשלח מה-?iphone).*?$", "", cleaned, flags=re.I | re.M)
    return cleaned.strip()
| |
|
def parse_name_email(s: str) -> Tuple[str, str]:
    """Split an address such as 'John Doe <john@x.com>' into (name, email).

    The display name may be double-quoted. When *s* holds no recognisable
    address, the stripped input is returned as the "email" part, i.e.
    ``("", s.strip())``. Empty input gives ``("", "")``.
    """
    if not s:
        return "", ""
    m = re.match(r'(?:"?([^"]*)"?\s)?<?([^<>]+@[^<>]+)>?', s)
    if m is None:
        return "", s.strip()
    return (m.group(1) or "").strip(), (m.group(2) or "").strip()


def parse_multi_emails(s: str) -> List[str]:
    """Extract the e-mail addresses from an address header such as To or Cc.

    The value is split on commas that are followed by a segment containing
    "@". A display name with a comma in it (e.g. 'Doe, John <j@x.com>') leaves
    a name-only fragment ("Doe"). Such fragments are discarded rather than
    reported as addresses. A list or tuple of address strings is accepted
    too. Empty input gives [].
    """
    if not s:
        return []
    if isinstance(s, (list, tuple)):
        s = ", ".join(str(x) for x in s if x)
    emails = []
    for part in re.split(r",\s*(?=[^,]*@)", s):
        _, addr = parse_name_email(part.strip())
        # parse_name_email echoes non-address input back; keep real addresses only.
        if "@" in addr:
            emails.append(addr)
    return emails
| |
|
def parse_email_headers(text: str) -> Tuple[Dict[str, str], str]:
    """Split a leading header block ("From:", "To:", ...) off plain-text *text*.

    Recognised headers are From, To, Cc (also spelled "CC"), Bcc, Date and
    Subject, matched case-sensitively at the start of a line. Parsing stops at:
      * the first blank line, which is consumed; or
      * the first line that is not a recognised header.

    A header with an empty value takes its value from the single following
    line. This does not happen when that line is blank or is another header.
    For Subject it also does not happen when the line is a forwarded/quoted
    message marker.

    Header names are returned capitalised ("CC" -> "Cc"), so callers can look
    each header up under one spelling. A repeated header keeps its last value.

    Returns:
        ``(headers, body)``. *body* is the remaining lines joined with
        newlines, or "" when nothing follows the headers.
    """
    headers: Dict[str, str] = {}
    lines = (text or "").splitlines()
    header_pat = re.compile(r'^(From|To|Cc|CC|Bcc|Date|Subject|Subject:|To:|Cc:|Bcc:|From:):')
    i = 0
    while i < len(lines):
        line = lines[i].rstrip("\r")
        if line.strip() == "":
            # Blank line ends the header block; the body starts after it.
            i += 1
            break
        if not header_pat.match(line):
            # First non-header line: the body starts here.
            break
        key, rest = line.split(":", 1)
        key = key.strip().capitalize()
        value = rest.strip()
        if value == "":
            # Value on the next line (at most one continuation line is taken).
            j = i + 1
            cont = []
            while j < len(lines):
                nxt = lines[j].rstrip("\r")
                nxts = nxt.strip()
                if nxts == "" or header_pat.match(nxt):
                    break
                if key.lower() == "subject":
                    if FWD_BEGIN_RE.match(nxts) or FWD_MSG_RE.match(nxts) or ON_WROTE_RE.match(nxts):
                        break
                if len(cont) > 0:
                    break
                cont.append(nxts)
                j += 1
            value = " ".join(cont)
            i = j
        else:
            i += 1
        headers[key] = value
    body_text = "\n".join(lines[i:]) if i < len(lines) else ""
    return headers, body_text
| |
|
| | |
def _raw_attachment_names(raw: Dict[str, Any]) -> List[str]:
    """Return the attachment file names of a raw record.

    The first non-empty value among raw["attachments"], raw["Attachments"]
    and raw["files"] is used. Dict entries contribute their "filename" (or
    "name"); other entries contribute their str(). A non-list value yields [].
    """
    names: List[str] = []
    atts = raw.get("attachments") or raw.get("Attachments") or raw.get("files") or []
    if isinstance(atts, list):
        for a in atts:
            name = (a.get("filename") or a.get("name") or "") if isinstance(a, dict) else str(a)
            if name:
                names.append(str(name))
    return names


def _raw_address_text(value: Any) -> str:
    """Coerce a raw To/Cc value to text. List/tuple values are joined with ", "."""
    if isinstance(value, (list, tuple)):
        return ", ".join(str(v) for v in value if v)
    return value or ""


def _raw_date_to_iso(date_val: Any) -> str:
    """Convert an epoch-seconds number or a date string to an ISO-8601 UTC string.

    Returns "" for missing or unparseable values, never the string "NaT".
    """
    if isinstance(date_val, (int, float)):
        try:
            return pd.to_datetime(int(date_val), unit="s", utc=True).isoformat()
        except Exception:
            # Deliberate best effort: NaN, out-of-range epochs etc. -> no date.
            return ""
    if isinstance(date_val, str) and date_val:
        try:
            ts = pd.to_datetime(date_val, utc=True, errors="coerce")
        except (ValueError, TypeError, OverflowError):
            return ""
        # errors="coerce" yields NaT for unparseable input; NaT.isoformat() is "NaT".
        return "" if pd.isna(ts) else ts.isoformat()
    return ""


def normalize_email_record(raw: Dict[str, Any], use_langdetect: bool) -> Dict[str, Any]:
    """Normalise one raw email dict into the flat record used downstream.

    Records whose "type" is "meta" are skipped by returning {}.

    When the raw body text is non-empty (or can be derived from the HTML
    body), inline headers at its top take precedence over the raw fields, and
    the body is cleaned:
      * quotes and signatures stripped;
      * URLs replaced by " URL ";
      * whitespace collapsed.
    Otherwise the raw subject/from/to/cc fields are used.

    Args:
        raw: Source record. Field names are checked in several spellings
            (e.g. "subject"/"Subject").
        use_langdetect: Use langdetect.detect for the language. On failure,
            or when False, fast_lang_heuristic is used.

    Returns:
        A dict with these keys:
          * message_id (generated when absent)
          * thread_id (hash of the subject, or of sender + body prefix)
          * date (ISO-8601 UTC or "")
          * from_name, from_email, from_domain
          * to_emails (To + Cc addresses)
          * subject, body_text, lang
          * attachments (file names)
          * text_hash (hash of the body, or "")
    """
    if str(raw.get("type", "")).lower() == "meta":
        return {}

    attach_names = _raw_attachment_names(raw)

    body_text_raw = raw.get("body_text") or raw.get("text") or ""
    html_content = raw.get("body_html") or raw.get("html") or ""
    if html_content and not body_text_raw:
        body_text_raw = html_to_text(html_content)
    body_text_raw = ftfy.fix_text(body_text_raw or "")

    date_val = raw.get("date") or raw.get("Date") or ""

    if body_text_raw:
        headers, body_only = parse_email_headers(body_text_raw)
        subject_text = headers.get("Subject", "") or raw.get("subject") or raw.get("Subject") or ""
        sender = headers.get("From", "") or raw.get("from") or raw.get("From") or ""
        date_val = headers.get("Date", "") or date_val
        to_field = headers.get("To", "") or _raw_address_text(raw.get("to"))
        # Accept both spellings in case headers were stored under "CC".
        cc_field = headers.get("Cc", "") or headers.get("CC", "") or _raw_address_text(raw.get("cc"))
        to_emails = parse_multi_emails(to_field) + parse_multi_emails(cc_field)

        body_clean = strip_quotes_and_sigs(ftfy.fix_text(body_only or ""))
        body_clean = URL_RE.sub(" URL ", body_clean)
        body_text = re.sub(r"\s+", " ", body_clean).strip()
    else:
        subject_text = ftfy.fix_text(raw.get("subject") or raw.get("Subject") or "").strip()
        body_text = ftfy.fix_text(raw.get("body_text") or raw.get("text") or "")
        body_text = URL_RE.sub(" URL ", body_text)
        body_text = strip_quotes_and_sigs(body_text)
        body_text = re.sub(r"\s+", " ", body_text).strip()
        sender = raw.get("from") or raw.get("From") or ""
        to_emails = (parse_multi_emails(_raw_address_text(raw.get("to")))
                     + parse_multi_emails(_raw_address_text(raw.get("cc"))))

    from_name, from_email = parse_name_email(sender)
    from_domain = from_email.split("@")[-1].lower() if "@" in from_email else ""

    subject_norm = re.sub(r"\s+", " ", subject_text or "").strip()

    if use_langdetect:
        try:
            lang = detect((subject_norm + " " + body_text[:5000]).strip()) if (subject_norm or body_text) else "unknown"
        except Exception:
            # langdetect raises on text without usable features; fall back.
            lang = fast_lang_heuristic(subject_norm + " " + (body_text or ""))
    else:
        lang = fast_lang_heuristic(subject_norm + " " + (body_text or ""))

    iso_date = _raw_date_to_iso(date_val)

    msg_id = raw.get("message_id") or raw.get("Message-ID") or ""
    if not msg_id:
        msg_id = f"gen-{uuid.uuid4().hex}"

    # Messages with the same normalised subject share a thread id.
    thread_key = subject_norm or (from_email + body_text[:120])
    thread_id = str(pd.util.hash_pandas_object(pd.Series([thread_key], dtype="string")).astype("uint64").iloc[0])
    text_hash = str(pd.util.hash_pandas_object(pd.Series([body_text], dtype="string")).astype("uint64").iloc[0]) if body_text else ""

    return {
        "message_id": str(msg_id),
        "thread_id": thread_id,
        "date": iso_date,
        "from_name": from_name,
        "from_email": from_email,
        "from_domain": from_domain,
        "to_emails": to_emails,
        "subject": subject_norm,
        "body_text": body_text,
        "lang": lang,
        "attachments": attach_names,
        "text_hash": text_hash,
    }
| |
|
def has_suspect_tag(text: str) -> List[str]:
    """Return coarse risk tags for *text*, in a fixed order.

    Possible tags:
      * "🚩suspect": any SUSPECT_PHRASES entry occurs;
      * "finance": money-related words occur (at most once);
      * "off-channel": OFFCHANNEL_RE or EVASIVE_ACRO_RE matches.

    Empty or None text yields [].
    """
    if not text:
        return []
    low = text.lower()
    tags: List[str] = []
    if any(phrase in low for phrase in SUSPECT_PHRASES):
        tags.append("🚩suspect")
    money_words = ("invoice", "payment", "contract", "wire", "transfer", "cash")
    if any(word in low for word in money_words):
        tags.append("finance")
    if OFFCHANNEL_RE.search(low) or EVASIVE_ACRO_RE.search(low):
        tags.append("off-channel")
    return tags
| |
|
def compute_sentiment_column(df: pd.DataFrame) -> pd.DataFrame:
    """Add "sentiment_score" and "sentiment" columns to *df* in place and return it.

    With VADER available, the score is the compound polarity of "body_text"
    (missing text counts as ""). The label is binned as negative
    (<= -0.05), neutral, or positive (> 0.05). Without VADER, the score is
    NaN and the label is "(unknown)".
    """
    if VADER_OK:
        analyzer = SentimentIntensityAnalyzer()
        compound = df["body_text"].fillna("").map(lambda t: analyzer.polarity_scores(t)["compound"])
        df["sentiment_score"] = compound
        df["sentiment"] = pd.cut(
            df["sentiment_score"],
            bins=[-1.01, -0.05, 0.05, 1.01],
            labels=["negative", "neutral", "positive"],
            include_lowest=True,
        )
    else:
        df["sentiment_score"] = np.nan
        df["sentiment"] = "(unknown)"
    return df
| |
|
def _compile_highlight_terms(row: pd.Series, extra_terms: List[str]) -> List[str]:
    """Collect the terms worth highlighting when rendering *row*.

    The result, in order, contains:
      * every SUSPECT_PHRASES entry found in subject + body;
      * the marker terms "$", "invoice" and "phone" when MONEY_RE, INVOICE_RE
        or PHONE_RE match;
      * each string in *extra_terms* (stripped) that occurs in the text.

    Duplicates are removed case-insensitively and at most 24 terms are
    returned. Missing or non-string subject/body values count as empty.
    """
    subject = row.get("subject")
    body = row.get("body_text")
    subject = subject if isinstance(subject, str) else ""
    body = body if isinstance(body, str) else ""
    txt = (subject + " " + body).lower()

    terms = [p for p in SUSPECT_PHRASES if p in txt]
    if MONEY_RE.search(txt):
        terms.append("$")
    if INVOICE_RE.search(txt):
        terms.append("invoice")
    if PHONE_RE.search(body):
        terms.append("phone")
    for t in extra_terms or []:
        if not isinstance(t, str):
            continue
        t = t.strip()
        if t and t.lower() in txt:
            terms.append(t)

    out, seen = [], set()
    for t in terms:
        key = t.lower()
        if key not in seen:
            out.append(t)
            seen.add(key)
    return out[:24]
| |
|
def build_highlighted_html(row: pd.Series, query_terms: Optional[List[str]] = None,
                           cluster_label: Optional[str] = None,
                           do_highlight: bool = True,
                           extra_terms: Optional[List[str]] = None) -> str:
    """Render one email row as an HTML card, optionally highlighting terms.

    Everything taken from *row* is HTML-escaped before insertion, because it
    is email content and must not be interpreted as markup. This covers the
    subject, body, sender, date, tags, flags and sentiment, as well as
    *cluster_label*. Highlighting wraps matches in ``<mark>``. It runs in a
    single pass over the raw text, so a later term can never match inside
    markup produced for an earlier one.

    Args:
        row: Mapping-like record (``.get`` is used) holding the normalised
            email fields.
        query_terms: Terms to highlight in addition to those from
            _compile_highlight_terms.
        cluster_label: Optional label shown as a pill in the header.
        do_highlight: When False, no highlighting is applied.
        extra_terms: Extra terms forwarded to _compile_highlight_terms.

    Returns:
        The card markup as a string.
    """
    from html import escape

    def as_text(value) -> str:
        # None / NaN / pd.NA / NaT become ""; anything else its str() form.
        if value is None:
            return ""
        if not isinstance(value, (list, tuple, np.ndarray)) and pd.isna(value):
            return ""
        return str(value)

    def as_list(value) -> list:
        # Tags/flags may be a list, a tuple or a numpy array (after parquet);
        # `value or []` would raise on an array.
        return list(value) if isinstance(value, (list, tuple, np.ndarray)) else []

    def esc(value) -> str:
        return escape(str(value), quote=False)

    subject = as_text(row.get("subject")).strip()
    body = as_text(row.get("body_text")).strip()
    from_email = as_text(row.get("from_email"))
    date = as_text(row.get("date"))
    tags = as_list(row.get("tags"))
    flags = as_list(row.get("flags"))
    sentiment = as_text(row.get("sentiment")) or "(unknown)"

    hl_terms: List[str] = []
    if do_highlight:
        candidates = list(query_terms or []) + _compile_highlight_terms(row, extra_terms or [])
        seen = set()
        for t in candidates:
            if not isinstance(t, str):
                continue
            key = t.lower()
            if key and key not in seen:
                hl_terms.append(t)
                seen.add(key)
        hl_terms = hl_terms[:24]

    # One alternation, longest terms first, so overlapping terms prefer the
    # longer match.
    hl_pattern = None
    if hl_terms:
        hl_pattern = re.compile(
            "|".join(re.escape(t) for t in sorted(hl_terms, key=len, reverse=True)),
            re.I,
        )

    def hi(text: str) -> str:
        """Escape *text* and wrap each highlight-term occurrence in <mark>."""
        if not text:
            return ""
        if hl_pattern is None:
            return esc(text)
        pieces = []
        pos = 0
        for m in hl_pattern.finditer(text):
            pieces.append(esc(text[pos:m.start()]))
            pieces.append(f"<mark>{esc(m.group(0))}</mark>")
            pos = m.end()
        pieces.append(esc(text[pos:]))
        return "".join(pieces)

    subject_h = hi(subject)
    body_h = hi(body)

    # Hebrew / Arabic script in the body -> right-to-left rendering.
    rtl = bool(re.search(r"[\u0590-\u08FF]", body))
    dir_attr = ' dir="rtl"' if rtl else ""
    body_html = body_h.replace("\n", "<br/>")

    tag_html = " ".join(f'<span class="tag">{esc(t)}</span>' for t in tags + flags)
    cluster_html = f'<span class="cluster-pill">{esc(cluster_label)}</span>' if cluster_label else ""

    card = (
        f'<div class="email-card">'
        f' <div class="email-header">'
        f' <div>'
        f' <div class="subject">{subject_h or "(no subject)"}</div>'
        f' <div class="meta">From: <b>{esc(from_email)}</b> • Date: {esc(date) or "—"}</div>'
        f' </div>'
        f' <div class="badges">'
        f' {cluster_html}'
        f' <span class="sentiment">sentiment: <b>{esc(sentiment)}</b></span>'
        f' {tag_html}'
        f' </div>'
        f' </div>'
        f' <div class="email-body"{dir_attr}>'
        f' {body_html}'
        f' </div>'
        f'</div>'
    )
    return card
| |
|
| | |
def _load_embeddings(emb_path: str, binary: bool):
    """Load word2vec-format vectors with gensim, best effort.

    The requested format is tried first: binary, or text with a header line.
    If that fails, a header-less text file is tried.

    Returns:
        ``(keyed_vectors, vector_size)``, or ``(None, 0)`` when gensim is
        unavailable, the path is empty/missing, or every load attempt fails.
    """
    if not (GENSIM_OK and emb_path and os.path.exists(emb_path)):
        return None, 0
    if binary:
        attempts = [dict(binary=True)]
    else:
        attempts = [dict(binary=False, no_header=False)]
    attempts.append(dict(binary=False, no_header=True))
    for kwargs in attempts:
        try:
            kv = KeyedVectors.load_word2vec_format(emb_path, **kwargs)
            return kv, int(kv.vector_size)
        except Exception:
            continue
    return None, 0
| |
|
| | def _avg_embed_for_text(text: str, kv, dim: int) -> np.ndarray: |
| | vec = np.zeros((dim,), dtype=np.float32) |
| | if not kv or not text: |
| | return vec |
| | toks = re.findall(TOKEN_PATTERN, text.lower()) |
| | cnt = 0 |
| | for t in toks: |
| | if t in kv: |
| | vec += kv[t] |
| | cnt += 1 |
| | if cnt > 0: |
| | vec /= float(cnt) |
| | n = np.linalg.norm(vec) |
| | if n > 0: |
| | vec /= n |
| | return vec |
| |
|
| | def _build_doc_embeddings(texts: List[str], kv, dim: int) -> np.ndarray: |
| | if not kv or dim <= 0: |
| | return np.zeros((len(texts), 0), dtype=np.float32) |
| | out = np.zeros((len(texts), dim), dtype=np.float32) |
| | for i, t in enumerate(texts): |
| | out[i, :] = _avg_embed_for_text(t or "", kv, dim) |
| | return out |
| |
|
| | |
class BM25Transformer:
    """Okapi BM25 re-weighting of a sparse document-term count matrix.

    ``fit`` learns the per-term IDF and the average document length.
    ``transform`` replaces every stored count ``tf`` in row ``d``, column
    ``t`` with::

        idf[t] * tf * (k1 + 1) / (tf + k1 * (1 - b + b * len(d) / avgdl))

    where ``len(d)`` is the row sum. The sparsity pattern is preserved.

    Note: IDF is ``log((N - df + 0.5) / (df + 0.5))``. It is negative for
    terms present in more than half of the fitted documents.
    """

    def __init__(self, k1=1.2, b=0.75):
        # k1: term-frequency saturation; b: strength of document-length normalisation.
        self.k1 = k1
        self.b = b
        self.idf_ = None    # per-column IDF, set by fit()
        self.avgdl_ = None  # mean row sum of the fitted matrix, set by fit()

    def fit(self, X):
        """Learn IDF and average document length from sparse *X* (rows = documents)."""
        N = X.shape[0]
        # Document frequency = number of non-zero entries in each column. In
        # CSC, `indptr` delimits the columns; `indices` holds *row* numbers,
        # so counting those would measure document lengths, not df.
        Xc = X.tocsc(copy=True)
        Xc.eliminate_zeros()
        df = np.diff(Xc.indptr).astype(np.float64)
        self.idf_ = np.log((N - df + 0.5) / (df + 0.5 + 1e-12))
        dl = np.asarray(X.sum(axis=1)).ravel()
        self.avgdl_ = float(dl.mean() if dl.size else 1.0)
        return self

    def transform(self, X):
        """Return a float32 CSR copy of *X* with BM25 weights. *X* is not modified."""
        X = X.tocsr(copy=True).astype(np.float32)
        dl = np.asarray(X.sum(axis=1)).ravel()
        # Row of every stored entry, derived from the CSR layout. This stays
        # aligned with X.data even when explicit zeros are stored, whereas
        # X.nonzero() skips them.
        entry_rows = np.repeat(np.arange(X.shape[0]), np.diff(X.indptr))
        tf = X.data.astype(np.float64)
        length_norm = self.k1 * (1 - self.b + self.b * (dl[entry_rows] / (self.avgdl_ + 1e-12)))
        weights = self.idf_[X.indices] * (tf * (self.k1 + 1)) / (tf + length_norm + 1e-12)
        X.data = weights.astype(np.float32)
        return X
| |
|
def enrich_text(row: pd.Series) -> str:
    """Return subject + blank line + body, followed by synthetic marker tokens.

    Markers are appended for any detected money amount, phone number,
    invoice reference, company suffix or off-channel request. A
    ``__LANG_<code>__`` token is always appended (code "unk" when the row has
    no language). The result is stripped of surrounding whitespace.
    """
    subject = row.get("subject", "") or ""
    body = row.get("body_text", "") or ""
    combined = subject + "\n\n" + body
    detectors = (
        (MONEY_RE, "__HAS_MONEY__"),
        (PHONE_RE, "__HAS_PHONE__"),
        (INVOICE_RE, "__HAS_INVOICE__"),
        (COMPANY_RE, "__HAS_COMPANY__"),
        (OFFCHANNEL_RE, "__OFF_CHANNEL__"),
    )
    markers = [marker for pattern, marker in detectors if pattern.search(combined)]
    markers.append(f'__LANG_{(row.get("lang") or "unk").lower()}__')
    return (combined + " " + " ".join(markers)).strip()
| |
|
| | |
def cluster_labels_pmi_bigram(
    texts,
    labels,
    subjects=None,
    topn=6,
    subject_alpha=0.75,
    global_ubiq_cut=0.20,
    subject_min_cov=0.30
):
    """Build a short human-readable label for every cluster.

    A label combines two kinds of terms:
      (a) bigrams/trigrams over-represented in the cluster relative to the
          whole corpus, scored by a PMI-style log-ratio;
      (b) unigrams with the highest TF-IDF weight when the cluster's text is
          contrasted with the text of all other clusters.
    Terms that also occur in many of the cluster's subjects are boosted.

    Args:
        texts: Document texts, aligned with *labels*. Non-string entries
            (None, NaN) count as empty.
        labels: Cluster id per document (anything ``int()`` accepts).
        subjects: Optional subject lines aligned with *texts*. Used only when
            its length equals ``len(texts)``.
        topn: Target number of terms per label (at most ``max(2, topn)``
            are kept).
        subject_alpha: Weight of subject coverage added to term scores.
        global_ubiq_cut: Terms whose corpus-wide document frequency exceeds
            this fraction are skipped.
        subject_min_cov: Subject-coverage fraction at or above which a term
            gets an extra +0.6 and is ranked ahead of other phrases.

    Returns:
        Dict mapping each cluster id (int) to a comma-separated label, or to
        ``"cluster_<id>"`` when no usable term was found.
    """
    import math as _math
    from collections import Counter, defaultdict
    from sklearn.feature_extraction.text import TfidfVectorizer

    # Materialise inputs once. The code below indexes positionally and makes
    # several passes, which must work for generators and pandas Series too
    # (Series[] is label-based).
    texts = list(texts)
    labels = list(labels)
    if subjects is not None:
        subjects = list(subjects)

    # E-mail header words that make poor labels.
    HEADER_STOP = {"subject","re","fw","fwd","to","cc","bcc","from","sent","forwarded",
                   "回复","主题","收件人","发件人"}

    def is_junk_token(tok: str) -> bool:
        if _is_junk_term(tok): return True
        tl = tok.lower()
        if tl in HEADER_STOP: return True
        if tl.startswith("__"): return True  # synthetic marker tokens
        if "@" in tl: return True
        if tl.isascii() and len(tl) <= 2: return True
        if LONG_ALNUM_RE.match(tok) or HEXISH_RE.match(tok) or DIGIT_HEAVY_RE.match(tok): return True
        if len(tok) > 40: return True
        if re.search(r"[^\w\-’']", tl): return True
        return False

    def tokenize_clean(t):
        src = t if isinstance(t, str) else ""
        toks = re.findall(TOKEN_PATTERN, src.lower())
        return [w for w in toks if not is_junk_token(w)]

    def ngrams(toks, n):
        return [" ".join(p) for p in zip(*[toks[i:] for i in range(n)]) if all(not is_junk_token(x) for x in p)]

    # Document frequencies (each n-gram counted once per document).
    glob_df_uni = Counter()
    glob_df_bg = Counter()
    glob_df_tri = Counter()
    per_c_bg = defaultdict(Counter)
    per_c_tri = defaultdict(Counter)
    per_c_texts = defaultdict(list)
    per_c_doc_count = defaultdict(int)
    per_c_subj_uni_docs = defaultdict(Counter)
    per_c_subj_bg_docs = defaultdict(Counter)
    per_c_subj_tri_docs = defaultdict(Counter)

    have_subjects = subjects is not None and len(subjects) == len(texts)

    for idx, (txt, c) in enumerate(zip(texts, labels)):
        c = int(c)
        toks = tokenize_clean(txt)
        uni_set = set(toks)
        bg_set = set(ngrams(toks, 2))
        tri_set = set(ngrams(toks, 3))
        glob_df_uni.update(uni_set)
        glob_df_bg.update(bg_set)
        glob_df_tri.update(tri_set)
        per_c_bg[c].update(bg_set)
        per_c_tri[c].update(tri_set)
        per_c_texts[c].append(" ".join(toks))
        per_c_doc_count[c] += 1
        if have_subjects:
            stoks = tokenize_clean(subjects[idx])
            per_c_subj_uni_docs[c].update(set(stoks))
            per_c_subj_bg_docs[c].update(set(ngrams(stoks, 2)))
            per_c_subj_tri_docs[c].update(set(ngrams(stoks, 3)))

    N = max(1, len(texts))

    def too_ubiquitous(df_count):
        return (df_count / float(N)) > float(global_ubiq_cut)

    # Join each cluster's cleaned documents once. The TF-IDF "background" of a
    # cluster is the concatenation of all other clusters, built linearly below
    # instead of by repeated list concatenation.
    joined_texts = {k: " ".join(v) for k, v in per_c_texts.items()}

    labels_out = {}
    for c in sorted(set(int(x) for x in labels)):
        n_docs_c = max(1, per_c_doc_count[c])

        # (a) Distinctive bigrams / trigrams.
        phrases = []
        for store, glob_df, subj_docs, n in (
            (per_c_bg[c], glob_df_bg, per_c_subj_bg_docs[c], 2),
            (per_c_tri[c], glob_df_tri, per_c_subj_tri_docs[c], 3),
        ):
            total_c = sum(store.values()) + 1e-12
            total_g = sum(glob_df.values()) + 1e-12
            scored = []
            for ng, cnt in store.most_common(3000):
                if too_ubiquitous(glob_df[ng]):
                    continue
                p_ng_c = cnt / total_c
                p_ng_g = glob_df[ng] / total_g
                if p_ng_c > 0 and p_ng_g > 0:
                    score = _math.log(p_ng_c) - _math.log(p_ng_g)
                    cov = 0.0
                    if have_subjects:
                        cov = subj_docs[ng] / n_docs_c
                        if cov >= subject_min_cov:
                            score += 0.6
                        score += subject_alpha * cov
                    scored.append((score, cov, ng))
            # Subject-backed phrases first, then by score.
            scored.sort(key=lambda x: (x[1] >= subject_min_cov, x[0]), reverse=True)
            take = max(1, topn // (3 if n == 3 else 2))
            phrases.extend([p for _, _, p in scored[:take]])

        # (b) Distinctive unigrams via a two-document (cluster vs rest) TF-IDF.
        doc_c = joined_texts[c] if per_c_texts[c] else " "
        doc_bg = " ".join(joined_texts[k] for k in joined_texts if k != c) or " "
        vec = TfidfVectorizer(
            analyzer="word", ngram_range=(1,1),
            max_features=3000, token_pattern=TOKEN_PATTERN, lowercase=True
        )
        try:
            X = vec.fit_transform([doc_c, doc_bg])
        except ValueError:
            # Neither document contains a token ("empty vocabulary").
            X = None

        unis = []
        if X is not None:
            vocab = np.array(sorted(vec.vocabulary_, key=lambda k: vec.vocabulary_[k]))
            weights = X[0].toarray().ravel()
            # Fraction of the cluster's subjects containing each vocabulary term.
            subj_cov = np.zeros_like(weights)
            if have_subjects:
                vocab_index = {t: i for i, t in enumerate(vocab)}
                for tok, cnt_docs in per_c_subj_uni_docs[c].items():
                    if tok in vocab_index and not _is_junk_term(tok):
                        subj_cov[vocab_index[tok]] = cnt_docs / n_docs_c
            pref_bump = (subj_cov >= subject_min_cov).astype(weights.dtype) * 0.6
            final = weights + subject_alpha * subj_cov + pref_bump

            unigram_budget = max(0, topn - len(phrases))
            for i in final.argsort()[::-1]:
                tok = vocab[i]
                if _is_junk_term(tok):
                    continue
                if too_ubiquitous(glob_df_uni.get(tok, 0)):
                    continue
                unis.append(tok)
                if len(unis) >= unigram_budget:
                    break

        parts = (phrases + unis)[:max(2, topn)]
        labels_out[c] = ", ".join(parts) if parts else f"cluster_{c}"

    return labels_out
| |
|
| | |
def choose_k_by_kneedle(X, ks=(50, 100, 150, 200, 300, 400, 500), max_sample=40000, random_state=0):
    """Pick a cluster count with the "kneedle" elbow heuristic on KMeans inertia.

    A MiniBatchKMeans model is fitted for every candidate k (in ascending order)
    that does not exceed the number of rows being fitted. The chosen k is the
    one whose min-max normalised inertia lies furthest below the straight chord
    joining the first and last (k, inertia) points.

    Args:
        X: Row-indexable matrix (dense array or scipy sparse) of documents.
        ks: Candidate cluster counts. Order and duplicates do not matter;
            values < 1 are ignored.
        max_sample: When X has more rows than this, a fixed random subset of
            this many rows is used for fitting (keeps the sweep affordable).
        random_state: Seed for the row subsample and for MiniBatchKMeans.

    Returns:
        ``(k_best, {k: inertia})`` for the candidates actually fitted.
        Degenerate inputs return ``(1, {1: 0.0})`` for <= 1 row, or
        ``(k_small, {k_small: 0.0})`` with ``k_small = max(2, min(10, n))``
        when no candidate can be fitted.
    """
    n = X.shape[0]
    if n <= 1:
        return 1, {1: 0.0}

    def _fallback():
        k_small = int(max(2, min(10, n)))
        return k_small, {k_small: 0.0}

    # Sorting matters: the sweep stops at the first k that is too large, and the
    # chord/elbow geometry below assumes x increases monotonically.
    grid = sorted({int(k) for k in ks if int(k) >= 1})
    if not grid or n < grid[0]:
        return _fallback()

    if n > max_sample:
        rs = np.random.RandomState(random_state)
        Xs = X[rs.choice(n, size=int(max_sample), replace=False)]
    else:
        Xs = X
    n_fit = Xs.shape[0]

    fitted_k, inertias = [], []
    for k in grid:
        if k > n_fit:
            break
        km = MiniBatchKMeans(n_clusters=k, batch_size=4096, random_state=random_state, n_init="auto")
        km.fit(Xs)
        fitted_k.append(k)
        inertias.append(float(km.inertia_))
    if not inertias:
        return _fallback()

    x = np.asarray(fitted_k, dtype=float)
    y = np.asarray(inertias, dtype=float)
    y_norm = (y - y.min()) / (y.max() - y.min() + 1e-9)
    x_norm = (x - x.min()) / (x.max() - x.min() + 1e-9)
    # Straight line between the first and last normalised points; the elbow is
    # where the inertia curve dips furthest below it.
    chord = y_norm[0] + (y_norm[-1] - y_norm[0]) * (x_norm - x_norm[0]) / (x_norm[-1] - x_norm[0] + 1e-9)
    dist = chord - y_norm
    k_best = int(x[np.argmax(dist)])
    return k_best, dict(zip(fitted_k, inertias))
| |
|
def auto_k_rule(n_docs: int) -> int:
    """Heuristic cluster count: ``110 * sqrt(n_docs / 50)`` clamped to [120, 600].

    Values of ``n_docs`` below 1 are treated as 1; the result is truncated to int.
    """
    raw = math.sqrt(max(n_docs, 1) / 50.0) * 110
    return int(min(600, max(120, raw)))
| |
|
def merge_close_clusters(labels, centers, thresh=0.92):
    """Merge clusters whose centres are nearly parallel, then renumber labels.

    Centres are L2-normalised (all-zero rows stay zero) and compared by cosine
    similarity, which for unit rows is just the dot product. Every pair with
    similarity >= ``thresh`` is joined, and merging is transitive (union-find).
    Surviving groups are renumbered 0..m-1 in order of their lowest original
    cluster index.

    Args:
        labels: Iterable of integer cluster ids indexing rows of ``centers``.
            Ids outside ``[0, len(centers))`` (e.g. -1 noise) are passed
            through unchanged instead of raising KeyError.
        centers: (k, d) array of cluster centres (dense or scipy-sparse).
        thresh: Cosine-similarity threshold for merging.

    Returns:
        np.ndarray of int holding the merged label for every input label.
    """
    if hasattr(centers, "toarray"):
        centers = centers.toarray()
    C = np.asarray(centers, dtype=float)
    k = C.shape[0]
    norms = np.linalg.norm(C, axis=1, keepdims=True)
    C = C / np.where(norms > 0, norms, 1.0)
    sim = C @ C.T

    parent = list(range(k))

    def find(a):
        root = a
        while parent[root] != root:
            root = parent[root]
        # Path compression: point every node on the walk straight at the root.
        while parent[a] != root:
            nxt = parent[a]
            parent[a] = root
            a = nxt
        return root

    # Upper-triangle pairs (i < j) in row-major order, the same order as a
    # nested i/j loop.
    rows, cols = np.nonzero(np.triu(sim >= thresh, k=1))
    for i, j in zip(rows.tolist(), cols.tolist()):
        pi, pj = find(i), find(j)
        if pi != pj:
            parent[pj] = pi

    idmap = {}
    for i in range(k):
        idmap.setdefault(find(i), len(idmap))
    remap = [idmap[find(i)] for i in range(k)]

    return np.array([remap[c] if 0 <= c < k else c for c in (int(x) for x in labels)], dtype=int)
| |
|
def seeded_centroids_in_lsa(lexicons: Dict[str, List[str]], count_vec: "CountVectorizer",
                            lsa_components: np.ndarray, norm_obj: "Normalizer",
                            d_word: int, d_full: int, k: int) -> Optional[np.ndarray]:
    """Project keyword lexicons into the reduced (LSA) space as seed centroids.

    For each lexicon, the words found in ``count_vec.vocabulary_`` (lowercased
    lookup) become a unit-norm indicator vector over the first ``d_word``
    feature columns. That vector is zero-padded to ``d_full`` columns,
    multiplied by ``lsa_components.T`` and passed through
    ``norm_obj.transform``. Lexicons with no vocabulary hits are skipped.

    Returns:
        The float32 seed matrix when there are between 2 and ``k`` seeds
        (inclusive), otherwise ``None``.
    """
    vocab = count_vec.vocabulary_
    word_rows = []
    for words in lexicons.values():
        hits = [j for j in (vocab.get(w.lower()) for w in words) if j is not None]
        if not hits:
            continue
        row = np.zeros(d_word, dtype=np.float32)
        row[hits] = 1.0
        length = np.linalg.norm(row)
        if length > 0:
            row /= length
        word_rows.append(row)

    if not word_rows:
        return None

    # The word block occupies the leading d_word columns of the full feature
    # space; the remaining columns stay zero.
    full = np.zeros((len(word_rows), d_full), dtype=np.float32)
    full[:, :d_word] = np.stack(word_rows, axis=0)

    reduced = norm_obj.transform((full @ lsa_components.T).astype(np.float32))
    n_seeds = reduced.shape[0]
    return reduced if 2 <= n_seeds <= k else None
| |
|
| | |
| | def _centroids_from_labels(X, labels): |
| | labs = np.asarray(labels, dtype=int) |
| | uniq = np.unique(labs) |
| | cents = {} |
| | if isinstance(X, np.ndarray): |
| | for c in uniq: |
| | idx = (labs == c) |
| | if not np.any(idx): continue |
| | v = X[idx].mean(axis=0) |
| | n = np.linalg.norm(v) |
| | if n > 0: v = v / n |
| | cents[int(c)] = v.astype(np.float32) |
| | return cents |
| | X = X.tocsr() |
| | for c in uniq: |
| | rows = np.where(labs == c)[0] |
| | if rows.size == 0: continue |
| | sub = X[rows] |
| | v = np.asarray(sub.mean(axis=0)).ravel() |
| | n = np.linalg.norm(v) |
| | if n > 0: v = v / n |
| | cents[int(c)] = v.astype(np.float32) |
| | return cents |
| |
|
| | def _cosine_sim_to_centroids(vecs, centroids): |
| | if not centroids: |
| | return None, None |
| | keys = list(centroids.keys()) |
| | C = np.stack([centroids[k] for k in keys], axis=0) |
| | if isinstance(vecs, np.ndarray): |
| | sims = vecs @ C.T |
| | else: |
| | sims = vecs.dot(C.T) |
| | if hasattr(sims, "toarray"): sims = sims.toarray() |
| | best_idx = np.argmax(sims, axis=1) |
| | best_lab = np.array([keys[i] for i in best_idx], dtype=int) |
| | best_sim = sims[np.arange(sims.shape[0]), best_idx] |
| | return best_lab, best_sim |
| |
|
def stabilize_labels(X_space, labels, min_size=40, merge_thresh=0.96, reassign_thresh=0.35):
    """Merge near-duplicate clusters and fold tiny clusters into big ones.

    1. Centroids of all non-negative labels (unit-norm means, from
       ``_centroids_from_labels``) are compared by dot product. Every pair with
       similarity >= ``merge_thresh`` is merged (transitively, union-find), and
       the merged group takes the id of its union-find root.
    2. Every label value with fewer than ``min_size`` members is reassigned,
       point by point, to the most similar big (>= ``min_size``, non-negative)
       cluster centroid when that similarity is >= ``reassign_thresh``.
       Otherwise the point gets label -3 (noise/unassigned).

    Note: step 2 considers every label value, so a negative (noise) label whose
    group is smaller than ``min_size`` is also reassigned or set to -3.

    Args:
        X_space: Row vectors (dense ndarray or scipy sparse) aligned with ``labels``.
        labels: Integer cluster labels.
        min_size: Minimum members for a cluster to count as "big".
        merge_thresh: Centroid similarity at or above which clusters merge.
        reassign_thresh: Minimum similarity for a small-cluster point to join
            a big cluster.

    Returns:
        A new int ndarray of labels; ``labels`` itself is never modified.
    """
    NOISE_ID = -3
    # np.array (not np.asarray): asarray would hand back the caller's own int
    # array, which the in-place reassignment below would then overwrite.
    labs = np.array(labels, dtype=int)
    cents = _centroids_from_labels(X_space, labs)

    keys = sorted(k for k in cents if k >= 0)
    if len(keys) >= 2:
        C = np.stack([cents[k] for k in keys], axis=0)
        sims = C @ C.T
        parent = {k: k for k in keys}

        def find(a):
            while parent[a] != a:
                a = parent[a]
            return a

        # Upper-triangle pairs in row-major order (same order as a nested i<j loop).
        rows, cols = np.nonzero(np.triu(sims >= float(merge_thresh), k=1))
        for i, j in zip(rows.tolist(), cols.tolist()):
            ri, rj = find(keys[i]), find(keys[j])
            if ri != rj:
                parent[rj] = ri

        root = {k: find(k) for k in keys}
        labs = np.array([root.get(int(c), int(c)) for c in labs], dtype=int)
        cents = _centroids_from_labels(X_space, labs)

    vc = pd.Series(labs).value_counts()
    big_labs = set(vc[vc >= int(min_size)].index.tolist())
    small_labs = set(vc[vc < int(min_size)].index.tolist())
    big_cents = {c: cents[c] for c in big_labs if c in cents and c >= 0}

    if small_labs and big_cents:
        idx_small = np.flatnonzero(np.isin(labs, list(small_labs)))
        if idx_small.size > 0:
            best_lab, best_sim = _cosine_sim_to_centroids(X_space[idx_small], big_cents)
            labs[idx_small] = np.where(best_sim >= float(reassign_thresh), best_lab, NOISE_ID)

    return labs
| |
|
| | |
| | def _hour_of(dt_iso: str) -> Optional[int]: |
| | try: |
| | if not dt_iso: return None |
| | dt = pd.to_datetime(dt_iso, utc=True, errors="coerce") |
| | if pd.isna(dt): return None |
| | return int(dt.hour) |
| | except Exception: |
| | return None |
| |
|
| | def _attachment_flags(names: List[str]) -> List[str]: |
| | flags=[] |
| | for n in names or []: |
| | if ATTACH_NAME_RE.search(n): |
| | flags.append("📎 " + n[:40]) |
| | return flags[:5] |
| |
|
def corruption_score(row, trusted_domains: set):
    """Heuristic score of how corruption-flavoured one email row looks (0 or more).

    ``row`` is any mapping with ``.get`` (a dict or a pandas row). Points
    (each awarded at most once):

    * +2.0 any SUSPECT_PHRASES substring in the lowercased subject + body
    * +1.0 EVASIVE_ACRO_RE or OFFCHANNEL_RE match
    * +1.5 ``tags`` contains "🚩suspect" or "finance"
    * +0.7 MONEY_RE match, +0.7 INVOICE_RE match
    * +0.3 sentiment == "negative"
    * +0.5 body shorter than 160 chars that contains a PHONE_RE match
    * +0.5 sender domain in PERSONAL_DOMAINS and not in ``trusted_domains``
    * +0.3 UTC send hour before 06:00 or after 22:59

    Missing or non-string ``subject`` / ``body_text`` / ``from_domain`` values
    (None, NaN) count as empty text. They no longer raise, and a None subject
    is no longer scored as the literal word "None".
    """
    def _text(value):
        return value if isinstance(value, str) else ""

    subject = _text(row.get("subject", ""))
    body = _text(row.get("body_text", ""))
    txt = f"{subject} {body}".lower()

    score = 0.0
    if any(ph in txt for ph in SUSPECT_PHRASES):
        score += 2.0
    if EVASIVE_ACRO_RE.search(txt) or OFFCHANNEL_RE.search(txt):
        score += 1.0
    tags = row.get("tags")
    if isinstance(tags, (list, tuple, set, np.ndarray)) and ("🚩suspect" in tags or "finance" in tags):
        score += 1.5
    if MONEY_RE.search(txt):
        score += 0.7
    if INVOICE_RE.search(txt):
        score += 0.7
    if str(row.get("sentiment", "")) == "negative":
        score += 0.3
    # Very short messages that are little more than a phone number read as
    # "call me" / off-channel nudges.
    if len(body) < 160 and PHONE_RE.search(body):
        score += 0.5
    fd = _text(row.get("from_domain")).lower()
    if fd in PERSONAL_DOMAINS and fd not in trusted_domains:
        score += 0.5
    h = _hour_of(row.get("date") or "")
    if h is not None and (h < 6 or h > 22):
        score += 0.3
    return score
| |
|
def compute_context_anomaly(df_in: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df_in`` with a 0-10 ``context_anomaly_score`` column.

    The score sums three capped components:

    * up to 6 points: within-bucket percentile rank of ``anomaly_score``
      (``rank(pct=True, ascending=False)`` * 6). Only buckets with >= 5
      non-null scores get points; other rows, and rows with a null score, get 0.
    * up to 2 points: rule hits. The email contains none of its bucket's
      TAXONOMY terms; a "Constituent" email comes from a domain not in
      PERSONAL_DOMAINS; a "Scheduling" email's subject misses the scheduling
      keyword regex.
    * up to 3 points: ``corruption_score`` clipped to [0, 3] (0 when the
      column is absent).

    ``df_in`` is never modified, including when it is empty.

    NOTE(review): with ``ascending=False`` the highest ``anomaly_score`` gets
    the smallest percentile and therefore the fewest points. That is only right
    if a lower ``anomaly_score`` means "more anomalous" (as with IsolationForest
    ``decision_function``); confirm against the code that fills the column.
    """
    df = df_in.copy()
    if df.empty:
        df["context_anomaly_score"] = 0.0
        return df

    if "anomaly_score" in df.columns:
        df["_if_pct"] = 0.0
        for _, grp in df.groupby("bucket", dropna=False):
            vals = pd.to_numeric(grp["anomaly_score"], errors="coerce")
            if vals.notna().sum() >= 5:
                ranks = vals.rank(pct=True, ascending=False).clip(0, 1)
                # Null scores have no rank; give them 0 instead of poisoning the sum with NaN.
                df.loc[grp.index, "_if_pct"] = ranks.fillna(0.0)
        df["_if_pts"] = (df["_if_pct"] * 6.0).clip(0, 6)
    else:
        df["_if_pts"] = 0.0

    subject = df["subject"].fillna("").astype(str)
    body = df["body_text"].fillna("").astype(str)
    low = (subject + " " + body).str.lower()
    subject_low = subject.str.lower()

    df["_rule_pts"] = 0.0
    for bkt, terms in TAXONOMY.items():
        mask = df["bucket"] == bkt
        if not mask.any():
            continue
        if terms:
            pattern = "|".join(re.escape(t.lower()) for t in terms)
            has_term = low.str.contains(pattern, regex=True)
            df.loc[mask & ~has_term, "_rule_pts"] += 1.0
        if bkt == "Constituent":
            personal = df["from_domain"].fillna("").astype(str).str.lower().isin(PERSONAL_DOMAINS)
            df.loc[mask & ~personal, "_rule_pts"] += 1.0
        if bkt == "Scheduling":
            # NOTE(review): \b binds only to "meeting" (leading) and "calendar"
            # (trailing); "invite"/"schedule" match anywhere (e.g. "reschedule").
            # Kept as-is; confirm whether this is intended.
            sched = subject_low.str.contains(r"\bmeeting|invite|schedule|calendar\b", regex=True)
            df.loc[mask & ~sched, "_rule_pts"] += 1.0
    df["_rule_pts"] = df["_rule_pts"].clip(0, 2)

    if "corruption_score" in df.columns:
        df["_corr_pts"] = pd.to_numeric(df["corruption_score"], errors="coerce").fillna(0).clip(0, 3)
    else:
        df["_corr_pts"] = 0.0

    df["context_anomaly_score"] = (df["_if_pts"] + df["_rule_pts"] + df["_corr_pts"]).clip(0, 10)
    return df.drop(columns=["_if_pct", "_if_pts", "_rule_pts", "_corr_pts"], errors="ignore")
| |
|
| | |
| | def _bucket_k_multiplier(bucket_name: str) -> float: |
| | b = (bucket_name or "").lower() |
| | if b in ("constituent",): return 1.25 |
| | if b in ("procurement", "campaign finance", "receipts/billing", "lobbyist"): return 1.15 |
| | if b in ("scheduling", "other"): return 1.00 |
| | if b in ("legal",): return 0.80 |
| | return 1.00 |
| |
|
| | def _bucket_stabilizer_params(bucket_name: str) -> Tuple[int, float, float]: |
| | b = (bucket_name or "").lower() |
| | if b == "legal": return (30, 0.97, 0.38) |
| | if b == "procurement": return (35, 0.96, 0.36) |
| | if b == "campaign finance": return (35, 0.96, 0.36) |
| | if b == "constituent": return (40, 0.95, 0.33) |
| | if b == "receipts/billing": return (40, 0.95, 0.35) |
| | if b == "scheduling": return (35, 0.95, 0.35) |
| | return (40, 0.96, 0.35) |
| |
|
| | |
def _normalize_label_tokens(label: str) -> set:
    """Token set used to compare cluster labels for near-duplicates.

    Lowercases ``label`` and extracts Latin/Hebrew word tokens of length >= 2
    (hyphens and apostrophes allowed after the first char). It strips a
    trailing "s" from tokens longer than 3 chars as a crude plural fold and
    drops STOP_TERMS / EN_STOP / HE_STOP entries. Empty or None labels give an
    empty set.
    """
    if not label:
        return set()
    found = re.findall(r"[a-z\u0590-\u05FF][a-z\u0590-\u05FF\-']{1,}", str(label).lower())
    result = set()
    for tok in found:
        if len(tok) > 3 and tok.endswith("s"):
            tok = tok[:-1]
        if tok in STOP_TERMS or tok in EN_STOP or tok in HE_STOP:
            continue
        if len(tok) >= 2:
            result.add(tok)
    return result
| |
|
| | def _jaccard(a: set, b: set) -> float: |
| | if not a or not b: return 0.0 |
| | inter = len(a & b) |
| | if inter == 0: return 0.0 |
| | return inter / float(len(a | b)) |
| |
|
def dedupe_cluster_labels_in_bucket(df: pd.DataFrame, bucket: str, sim_thresh: float = 0.72) -> pd.DataFrame:
    """Give near-duplicate cluster names within one bucket a single display name.

    Cluster names in ``bucket`` are tokenised with ``_normalize_label_tokens``.
    Clusters whose token sets have Jaccard similarity >= ``sim_thresh`` are
    grouped transitively (union-find). Every cluster in a group is renamed to
    the group's longest name (ties broken by the greater string). Cluster ids
    are not changed.

    ``df`` is modified in place (its ``cluster_name`` column) and also returned.
    It is returned untouched when the bucket has no rows or there is no
    ``cluster_name`` column.
    """
    in_bucket = df[df["bucket"] == bucket].copy()
    if in_bucket.empty or "cluster_name" not in in_bucket.columns:
        return df

    pairs = in_bucket[["cluster_id", "cluster_name"]].drop_duplicates()
    token_sets = {}
    for cid, name in pairs.values:
        token_sets[int(cid)] = _normalize_label_tokens(str(name))
    cluster_ids = list(token_sets)

    leader = {cid: cid for cid in cluster_ids}

    def _root(x):
        while leader[x] != x:
            x = leader[x]
        return x

    for pos, left in enumerate(cluster_ids):
        for right in cluster_ids[pos + 1:]:
            if _jaccard(token_sets[left], token_sets[right]) >= sim_thresh:
                r_left, r_right = _root(left), _root(right)
                if r_left != r_right:
                    leader[r_right] = r_left

    label_of = dict(pairs.values)
    groups = {}
    for cid in cluster_ids:
        groups.setdefault(_root(cid), []).append((cid, label_of.get(cid, "")))

    canonical = {}
    for members in groups.values():
        winner = max(members, key=lambda m: (len(m[1] or ""), m[1]))[1]
        for cid, _ in members:
            canonical[cid] = winner

    df.loc[in_bucket.index, "cluster_name"] = in_bucket["cluster_id"].map(
        lambda c: canonical.get(int(c), label_of.get(int(c), ""))
    )
    return df
| |
|
def dedupe_all_labels(df: pd.DataFrame) -> pd.DataFrame:
    """Run ``dedupe_cluster_labels_in_bucket`` (threshold 0.72) over every non-null bucket, in sorted order."""
    result = df
    buckets = sorted(df["bucket"].dropna().unique())
    for name in buckets:
        result = dedupe_cluster_labels_in_bucket(result, name, sim_thresh=0.72)
    return result
| |
|
| | |
# Vocabulary typical of monitoring / briefing / press-clipping traffic. It is
# matched case-insensitively as plain substrings (no word boundaries), so e.g.
# "update" also hits "updated".
SURV_KEYWORDS = [
    "daily report", "daily brief", "briefing", "sitreps", "sitrep", "situation report", "summary",
    "dossier", "monitoring", "tracking", "watchlist", "watch list", "profile", "surveillance",
    "intel", "intelligence", "osint", "open source intel", "clippings", "press clips", "digest",
    "alert", "alerting", "dispatch", "bulletin", "roundup", "update",
]
SURV_RE = re.compile("|".join(map(re.escape, SURV_KEYWORDS)), re.I)

# Subject-template normalisation patterns (used to spot recurring report formats).
# Standalone numbers, optionally with , . / - separated groups ("42", "3.14").
SUBJ_NUM_RE = re.compile(r"\b\d{1,4}([,./-]\d{1,4})*\b")
# Numeric dates (12/31, 12-31-2020, 2020-12-31) and English month abbreviations.
SUBJ_DATE_RE = re.compile(r"\b(?:\d{1,2}[-/]\d{1,2}(?:[-/]\d{2,4})?|\d{4}-\d{2}-\d{2}|jan|feb|mar|apr|may|jun|jul|aug|sep|sept|oct|nov|dec)\b", re.I)
# One leading reply/forward prefix.
SUBJ_FW_RE = re.compile(r"^\s*(re:|fw:|fwd:)\s*", re.I)
# Loose email-address matcher.
EMAIL_RE = re.compile(r"\b[\w.\-+%]+@[\w.-]+\.[A-Za-z]{2,}\b")
| |
|
| | def _candidate_entities_from_subjects(df: pd.DataFrame, extra_watchlist: List[str]) -> List[str]: |
| | cand = set([w.strip() for w in (extra_watchlist or []) if w.strip()]) |
| | subs = df["subject"].dropna().astype(str).tolist() |
| | pat = re.compile(r"\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,2})\b") |
| | for s in subs: |
| | for m in pat.finditer(s): |
| | name = m.group(1).strip() |
| | if name.lower() in EN_STOP or len(name) < 5: |
| | continue |
| | cand.add(name) |
| | out = sorted(cand) |
| | return out[:3000] |
| |
|
def _normalize_subject_template(subj: str, entity: str) -> str:
    """Reduce a subject line to a template so recurring report formats group together.

    Steps, in order:

    1. Strip any number of leading "Re:" / "Fw:" / "Fwd:" prefixes.
    2. Replace email addresses with «EMAIL». This runs first so that a date,
       number or the entity inside an address does not split it up.
    3. Replace case-insensitive occurrences of ``entity`` with «ENTITY».
       Skipped for an empty entity, because an empty pattern matches between
       every character.
    4. Replace dates / month names with «DATE» and numbers with «NUM».
    5. Collapse whitespace and lowercase. Placeholders therefore come out as
       «email», «entity», «date», «num».

    Returns "" for an empty subject.
    """
    if not subj:
        return ""
    s = str(subj)
    while True:
        stripped = SUBJ_FW_RE.sub("", s)
        if stripped == s:
            break
        s = stripped
    s = EMAIL_RE.sub("«EMAIL»", s)
    if entity:
        s = re.sub(re.escape(entity), "«ENTITY»", s, flags=re.I)
    s = SUBJ_DATE_RE.sub("«DATE»", s)
    s = SUBJ_NUM_RE.sub("«NUM»", s)
    return re.sub(r"\s+", " ", s).strip().lower()
| |
|
| | def _entity_mask_present(row: pd.Series, entity: str) -> bool: |
| | t = (row.get("subject","") + " " + row.get("body_text","")).lower() |
| | e = (entity or "").lower() |
| | return (e in t) if e else False |
| |
|
def _weekly_peak_z(dts: pd.Series) -> float:
    """Z-score of the latest active week's email count against the earlier active weeks.

    ``dts`` holds parsed timestamps. Unparseable ones (NaT) are ignored so that
    they cannot sort as a fake "most recent" week. Weeks with no emails are not
    represented. Returns 0.0 with fewer than four active weeks or when the
    baseline has zero spread.
    """
    valid = dts.dropna()
    if valid.empty:
        return 0.0
    if getattr(valid.dt, "tz", None) is not None:
        # to_period() would drop the timezone (with a warning); do it explicitly in UTC.
        valid = valid.dt.tz_convert(None)
    weekly = valid.dt.to_period("W").value_counts().sort_index().astype(float)
    if len(weekly) < 4:
        return 0.0
    baseline = weekly.iloc[:-1]
    mu = float(baseline.mean())
    sd = float(baseline.std(ddof=1))
    if sd == 0.0:
        return 0.0
    return (float(weekly.iloc[-1]) - mu) / sd


def detect_surveillance_campaigns(
    df: pd.DataFrame,
    watchlist: Optional[List[str]] = None,
    min_mentions: int = 15,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Score entities for signs of systematic monitoring ("surveillance campaigns").

    For each candidate from ``_candidate_entities_from_subjects`` (watchlist +
    capitalised subject phrases) mentioned in at least ``min_mentions`` emails
    (case-insensitive substring of subject + body), a score is built from:

    * 2.5 × share of the most common normalised subject template
    * 2.0 × share of emails matching SURV_RE keywords
    * 1.5 × latest-week volume z-score / 3, clipped to [0, 1] (a quiet last
      week adds nothing rather than subtracting)
    * 0.8 × distinct senders / 10 and 0.5 × distinct sender domains / 10
      (each capped at 1)
    * 1.0 × share of internal mail (not Newsletters/Alerts or IT/Security)
    * 0.3 × average recipients / 10 (capped at 1)

    A score >= 6.5 is "likely", >= 4.5 "possible", otherwise "info".

    Returns:
        ``(entities, samples)``. ``entities`` holds the top 200 entities by
        score then mention count; it is an empty frame with the expected
        columns when no entity qualifies. ``samples`` holds up to 8 example
        emails per entity. Both are empty frames for an empty ``df``.
    """
    entity_columns = [
        "entity", "surveillance_score", "level", "n_emails", "n_senders", "n_domains",
        "pct_newsletters", "pct_internal", "avg_recipients", "weekly_peak_z",
        "template_max_share", "keyword_share", "first_date", "last_date", "notes",
    ]
    if df.empty:
        return pd.DataFrame(), pd.DataFrame()

    watch = [w.strip() for w in (watchlist or []) if w.strip()]
    cands = _candidate_entities_from_subjects(df, watch)

    dfd = df.copy()
    dfd["_dt"] = pd.to_datetime(dfd["date"], utc=True, errors="coerce")
    is_news = dfd["bucket"] == "Newsletters/Alerts"
    is_it = dfd["bucket"] == "IT/Security"
    dfd["_is_news"] = is_news
    dfd["_is_internal"] = ~(is_news | is_it)
    dfd["_recips"] = dfd["to_emails"].apply(
        lambda xs: len(xs) if isinstance(xs, (list, tuple, np.ndarray)) else 0
    )

    # Per-row text features are computed once and reused for every entity.
    # Previously a row-wise apply ran per entity, i.e. O(entities x rows) in Python.
    subj = dfd["subject"].fillna("").astype(str)
    body = dfd["body_text"].fillna("").astype(str)
    full_text = subj + " " + body
    text_lc = full_text.str.lower()
    has_kw = full_text.str.contains(SURV_RE)

    min_n = int(min_mentions)
    rows = []
    samples = []

    for entity in cands:
        needle = entity.lower()
        if not needle:
            continue
        mask = text_lc.str.contains(needle, regex=False)
        n = int(mask.sum())
        if n == 0 or n < min_n:
            continue
        grp = dfd[mask]

        n_senders = grp["from_email"].nunique()
        n_domains = grp["from_domain"].nunique()
        pct_news = float(grp["_is_news"].mean())
        pct_int = float(grp["_is_internal"].mean())
        avg_recips = float(grp["_recips"].mean())
        weekly_peak_z = _weekly_peak_z(grp["_dt"])

        templates = subj[mask].map(lambda s: _normalize_subject_template(s, entity))
        top_template_share = float(templates.value_counts(normalize=True).iloc[0])
        kw_share = float(has_kw[mask].mean())

        score = (
            2.5 * min(1.0, top_template_share)
            + 2.0 * min(1.0, kw_share)
            + 1.5 * min(1.0, max(0.0, weekly_peak_z / 3.0))
            + 0.8 * min(1.0, n_senders / 10.0)
            + 0.5 * min(1.0, n_domains / 10.0)
            + 1.0 * pct_int
            + 0.3 * min(1.0, avg_recips / 10.0)
        )

        if score >= 6.5:
            level = "likely"
        elif score >= 4.5:
            level = "possible"
        else:
            level = "info"

        first_d = grp["_dt"].min()
        last_d = grp["_dt"].max()

        rows.append({
            "entity": entity,
            "surveillance_score": round(float(score), 3),
            "level": level,
            "n_emails": int(n),
            "n_senders": int(n_senders),
            "n_domains": int(n_domains),
            "pct_newsletters": round(pct_news, 3),
            "pct_internal": round(pct_int, 3),
            "avg_recipients": round(avg_recips, 2),
            "weekly_peak_z": round(float(weekly_peak_z), 3),
            "template_max_share": round(float(top_template_share), 3),
            "keyword_share": round(float(kw_share), 3),
            "first_date": str(first_d) if pd.notna(first_d) else "",
            "last_date": str(last_d) if pd.notna(last_d) else "",
            "notes": "template/keywords/cadence/senders/domains mix"
        })

        ex = grp[["date", "from_email", "from_domain", "subject", "bucket"]].copy().head(8)
        ex.insert(0, "entity", entity)
        samples.append(ex)

    if rows:
        ent_df = (
            pd.DataFrame(rows, columns=entity_columns)
            .sort_values(["surveillance_score", "n_emails"], ascending=[False, False])
            .head(200)
        )
    else:
        # A column-less DataFrame cannot be sorted; return the schema instead.
        ent_df = pd.DataFrame(columns=entity_columns)
    samp_df = pd.concat(samples, ignore_index=True) if samples else pd.DataFrame()

    return ent_df, samp_df
| |
|
def tag_surveillance_emails(df: pd.DataFrame, ent_df: pd.DataFrame, threshold: float = 4.5) -> pd.DataFrame:
    """Add a "surveillance" tag to emails that mention a high-scoring entity.

    Entities in ``ent_df`` with ``surveillance_score >= threshold`` are matched
    as case-insensitive substrings of subject + " " + body. On the returned
    copy every row's ``tags`` becomes a sorted, de-duplicated list.

    Robust to missing/non-string text (treated as empty). A ``tags`` value is
    read as follows: a list/tuple/set/array is used as-is; a single string is
    one tag (not split into characters); anything else means no tags.

    Returns ``df`` itself (unchanged) when either frame is empty or no entity
    clears the threshold.
    """
    if df.empty or ent_df.empty:
        return df
    hot = ent_df.loc[ent_df["surveillance_score"] >= float(threshold), "entity"].tolist()
    hot_lc = [e.lower() for e in hot if isinstance(e, str) and e]
    if not hot_lc:
        return df

    def _text(value):
        return value if isinstance(value, str) else ""

    def _tag_list(value):
        if isinstance(value, str):
            return [value]
        if isinstance(value, (list, tuple, set, np.ndarray)):
            return list(value)
        return []

    def _tag(row):
        txt = (_text(row.get("subject", "")) + " " + _text(row.get("body_text", ""))).lower()
        tags = set(_tag_list(row.get("tags")))
        if any(e in txt for e in hot_lc):
            tags.add("surveillance")
        return sorted(tags)

    out = df.copy()
    out["tags"] = out.apply(_tag, axis=1)
    return out
| |
|
| | |
| |
|
| | |
# Stylesheet passed to gr.Blocks(css=...): light email "reader" card, header/meta
# text, cluster and tag pills, the scrollable body pane, and <mark> highlighting
# used for suspect-pattern hits.
CSS = """
:root { --pill:#eef2ff; --pill-text:#1f2937; --tag:#e5e7eb; --tag-text:#111827; }
.email-card { background:#ffffff; color:#111827; border-radius:12px; padding:16px; box-shadow:0 1px 3px rgba(0,0,0,0.08); }
.email-header { display:flex; align-items:flex-start; justify-content:space-between; gap:12px; }
.subject { color:#0f172a; font-size:18px; font-weight:700; margin-bottom:6px; }
.meta { color:#334155; font-size:12px; }
.badges { display:flex; gap:8px; align-items:center; flex-wrap:wrap; }
.cluster-pill { background:var(--pill); color:var(--pill-text); padding:2px 8px; border-radius:999px; font-size:12px; }
.sentiment { font-size:12px; color:#334155; }
.tag { background:var(--tag); color:var(--tag-text); padding:2px 6px; border-radius:6px; font-size:12px; }
.email-body { margin-top:12px; max-height:520px; overflow:auto; line-height:1.6; white-space:normal; color:#111827; }
.email-body a { color:#1d4ed8; text-decoration:underline; }
mark { background:#fff59d; color:#111827; padding:0 2px; border-radius:2px; }
hr.sep { border:none; border-top:1px solid #e5e7eb; margin:10px 0; }
.small { color:#475569; font-size:12px; }
.cursor { cursor:pointer; }
"""
| |
|
| | |
| | with gr.Blocks(title="Email Investigator — Per-bucket-k + Label Dedup + Surveillance Radar", css=CSS, theme="soft") as demo: |
| | gr.Markdown("# Email Investigator — BM25 + Char-grams + (optional) LSA → MiniBatchKMeans") |
| | gr.Markdown( |
| | "This build includes per-bucket **k** heuristics, label **de-dup**, and a **surveillance-campaign detector** " |
| | "(template cadence + keywords + multi-sender/domain signals)." |
| | ) |
| |
|
| | with gr.Row(): |
| | inbox_file = gr.File(label="Upload emails (.jsonl or .json)", file_types=[".jsonl", ".json"]) |
| |
|
| | with gr.Accordion("Vectorization & Clustering", open=True): |
| | with gr.Row(): |
| | max_features = gr.Number(label="Word max_features (BM25)", value=120_000, precision=0) |
| | min_df = gr.Number(label="min_df (doc freq ≥)", value=3, precision=0) |
| | max_df = gr.Slider(label="max_df (fraction ≤)", minimum=0.1, maximum=0.95, value=0.7, step=0.05) |
| | use_bigrams = gr.Checkbox(label="Use bigrams (1–2)", value=True) |
| | skip_lang = gr.Checkbox(label="Skip language detection (faster)", value=True) |
| | with gr.Row(): |
| | use_hashing = gr.Checkbox(label="Use HashingVectorizer (memory-light, fast)", value=True) |
| | hash_bits = gr.Slider(label="Hashing bits (2^n features)", minimum=16, maximum=20, step=1, value=18) |
| | with gr.Row(): |
| | use_lsa = gr.Checkbox(label="Use LSA (TruncatedSVD) before KMeans", value=True) |
| | lsa_dim = gr.Number(label="LSA components", value=256, precision=0) |
| | auto_k = gr.Checkbox(label="Auto choose k (kneedle)", value=True) |
| | k_clusters = gr.Number(label="Base k (before per-bucket multiplier)", value=350, precision=0) |
| | mb_batch = gr.Number(label="KMeans batch_size", value=4096, precision=0) |
| | with gr.Row(): |
| | use_hdbscan = gr.Checkbox(label="Use HDBSCAN (auto-k, noise) on reduced vectors", value=False) |
| | hdb_min_cluster = gr.Number(label="HDBSCAN min_cluster_size", value=60, precision=0) |
| | hdb_min_samples = gr.Number(label="HDBSCAN min_samples (0=auto)", value=0, precision=0) |
| | with gr.Row(): |
| | per_language = gr.Checkbox(label="Cluster per language (reduces cross-language mixing)", value=True) |
| | with gr.Row(): |
| | use_faiss = gr.Checkbox(label="Use Faiss ANN for search (if available & LSA on)", value=True) |
| | use_iso = gr.Checkbox(label="Compute anomaly score (IsolationForest on LSA)", value=False) |
| |
|
| | with gr.Accordion("Investigation Controls", open=True): |
| | with gr.Row(): |
| | trusted_domains_in = gr.Textbox(label="Trusted org domains (comma-separated)", value="example.gov, example.org") |
| | extra_keywords_in = gr.Textbox(label="Extra suspicious phrases (comma-separated)", value="") |
| | highlight_toggle = gr.Checkbox(label="Highlight suspect patterns in reader", value=True) |
| | with gr.Row(): |
| | use_embeddings = gr.Checkbox(label="Add lightweight word embeddings (avg word2vec/GloVe) if available", value=False) |
| | embed_weight = gr.Slider(label="Embedding weight in feature space", minimum=0.0, maximum=1.0, step=0.05, value=0.35) |
| | with gr.Row(): |
| | embeddings_path = gr.Textbox(label="Path to local embeddings (.txt/.vec/.bin) (optional)", value="") |
| | embeddings_binary = gr.Checkbox(label="File is binary word2vec format", value=False) |
| | with gr.Row(): |
| | bucket_drop = gr.Dropdown(label="Bucket", choices=["(any)"] + list(TAXONOMY.keys()), value="(any)", allow_custom_value=False) |
| | cluster_drop = gr.Dropdown(label="Cluster", choices=[], value=None, allow_custom_value=False) |
| | domain_drop = gr.Dropdown(label="Sender domain", choices=[], value=None, allow_custom_value=False) |
| | sender_drop = gr.Dropdown(label="Sender email", choices=[], value=None, allow_custom_value=False) |
| | lang_drop = gr.Dropdown(label="Language", choices=["(any)"], value="(any)", allow_custom_value=False) |
| | sentiment_drop = gr.Dropdown(label="Sentiment", choices=["(any)", "positive", "neutral", "negative"], value="(any)") |
| | tag_drop = gr.Dropdown(label="Tag", choices=["(any)", "🚩suspect", "finance", "off-channel", "surveillance", "odd-hours", "personal-mail"], value="(any)") |
| | with gr.Row(): |
| | date_start = gr.Textbox(label="Date from (YYYY-MM-DD, optional)", value="") |
| | date_end = gr.Textbox(label="Date to (YYYY-MM-DD, optional)", value="") |
| | sort_by = gr.Dropdown(label="Sort by", choices=["context_anomaly_score","corruption_score","date","anomaly_score","search_score"], value="context_anomaly_score") |
| | sort_dir = gr.Dropdown(label="Order", choices=["desc","asc"], value="desc") |
| | with gr.Row(): |
| | hide_noise = gr.Checkbox(label="Hide noise/unassigned (cluster -3)", value=True) |
| |
|
| | with gr.Accordion("Surveillance Radar", open=True): |
| | with gr.Row(): |
| | watchlist_in = gr.Textbox(label="Watchlist (names or entities, comma-separated)", value="Hillary Clinton, Joe Biden, Donald Trump") |
| | min_mentions = gr.Number(label="Min mentions per entity", value=15, precision=0) |
| |
|
| | with gr.Row(): |
| | run_btn = gr.Button("Process", variant="primary") |
| | |
| | update_btn = gr.Button("Update", variant="secondary") |
| | reset_btn = gr.Button("Reset filters") |
| | status = gr.Markdown("") |
| |
|
| | with gr.Row(): |
| | cluster_counts_df = gr.Dataframe(label="Cluster summary (top 500) — click a row to filter", interactive=False, wrap=True) |
| | domain_counts_df = gr.Dataframe(label="Top sender domains", interactive=False, wrap=True) |
| | with gr.Row(): |
| | sender_counts_df = gr.Dataframe(label="Top senders", interactive=False, wrap=True) |
| |
|
| | with gr.Row(): |
| | actors_df = gr.Dataframe(label="Top actors (by degree / unique counterparts)", interactive=False, wrap=True) |
| | offhours_df = gr.Dataframe(label="Off-hours & personal-mail hits", interactive=False, wrap=True) |
| |
|
| | gr.Markdown("### Surveillance Campaigns (detected entities)") |
| | with gr.Row(): |
| | surv_entities_df = gr.Dataframe(label="Entities ranked by surveillance score", interactive=False, wrap=True) |
| | surv_samples_df = gr.Dataframe(label="Sample emails for highlighted entities", interactive=False, wrap=True) |
| |
|
| | gr.Markdown("### Search") |
| | with gr.Row(): |
| | search_query = gr.Textbox(label="Search (keywords, names, etc.)") |
| | search_btn = gr.Button("Search") |
| | results_df = gr.Dataframe(label="Results (top 500 or top 50 for search)", interactive=True, wrap=True) |
| | email_view = gr.HTML(label="Reader") |
| |
|
| | |
| | state_df = gr.State() |
| | state_vec = gr.State() |
| | state_X_reduced = gr.State() |
| | state_index = gr.State() |
| | state_term_names = gr.State() |
| | state_query_terms = gr.State() |
| | state_use_lsa = gr.State() |
| | state_use_faiss = gr.State() |
| | state_svd = gr.State() |
| | state_norm = gr.State() |
| | state_dims = gr.State() |
| | state_extra_terms = gr.State() |
| | state_highlight = gr.State() |
| | state_inbox = gr.State() |
| |
|
| | |
| | def _load_json_records(local_path: str) -> List[Dict[str, Any]]: |
| | recs: List[Dict[str, Any]] = [] |
| | if local_path.endswith(".jsonl"): |
| | with open(local_path, "r", encoding="utf-8") as fh: |
| | for line in fh: |
| | line = line.strip() |
| | if not line: |
| | continue |
| | try: |
| | obj = json.loads(line) |
| | except Exception: |
| | continue |
| | if str(obj.get("type", "")).lower() == "meta": |
| | continue |
| | recs.append(obj) |
| | else: |
| | with open(local_path, "r", encoding="utf-8") as fh: |
| | obj = json.load(fh) |
| | if isinstance(obj, list): |
| | for r in obj: |
| | if str(r.get("type", "")).lower() == "meta": |
| | continue |
| | recs.append(r) |
| | elif isinstance(obj, dict): |
| | if str(obj.get("type", "")).lower() != "meta": |
| | recs = [obj] |
| | return recs |
| |
|
| | def _apply_filters( |
| | df: pd.DataFrame, |
| | bucket: Optional[str], |
| | cluster: Optional[str], |
| | domain: Optional[str], |
| | sender: Optional[str], |
| | lang_value: str, |
| | sentiment: str, |
| | tag_value: str, |
| | start: str, |
| | end: str, |
| | hide_noise_flag: bool = False, |
| | ) -> pd.DataFrame: |
| | out = df |
| | if bucket and bucket != "(any)": |
| | out = out[out["bucket"] == bucket] |
| | if cluster and cluster != "(any)": |
| | m = re.match(r"^.*?(\-?\d+)\s+—", cluster) |
| | if m: |
| | cid = int(m.group(1)) |
| | out = out[out["cluster_id"] == cid] |
| | if domain and domain != "(any)": |
| | out = out[out["from_domain"] == domain] |
| | if sender and sender != "(any)": |
| | out = out[out["from_email"] == sender] |
| | if lang_value and lang_value != "(any)": |
| | out = out[out["lang"] == lang_value] |
| | if sentiment and sentiment != "(any)" and "sentiment" in out.columns: |
| | out = out[out["sentiment"].astype(str) == sentiment] |
| | if tag_value and tag_value != "(any)": |
| | out = out[out["tags"].apply(lambda ts: isinstance(ts, list) and (tag_value in ts)) |
| | | out["flags"].apply(lambda ts: isinstance(ts, list) and (tag_value in ts))] |
| | if start: |
| | try: |
| | dt = pd.to_datetime(start, utc=True, errors="coerce") |
| | out = out[pd.to_datetime(out["date"], utc=True, errors="coerce") >= dt] |
| | except Exception: |
| | pass |
| | if end: |
| | try: |
| | dt = pd.to_datetime(end, utc=True, errors="coerce") |
| | out = out[pd.to_datetime(out["date"], utc=True, errors="coerce") <= dt] |
| | except Exception: |
| | pass |
| | if hide_noise_flag: |
| | out = out[out["cluster_id"] != -3] |
| | return out |
| |
|
| | |
def social_stats(df: pd.DataFrame) -> pd.DataFrame:
    """Top-50 addresses by degree in the undirected sender–recipient graph.

    Each email adds an edge between ``from_email`` and every address in
    ``to_emails``. Self-edges and empty/non-string addresses are ignored.
    ``to_emails`` may be a list/tuple/set/array, a single address string, or
    missing/NaN (treated as no recipients).

    Returns:
        DataFrame with columns ``address`` and ``degree`` (number of distinct
        counterparts), sorted by degree descending then address ascending. It
        is an empty frame with those columns when there are no edges; sorting a
        column-less frame used to raise KeyError.
    """
    def _addresses(value):
        if isinstance(value, str):
            return [value]
        if isinstance(value, (list, tuple, set, np.ndarray)):
            return list(value)
        return []

    n = len(df)
    senders = df["from_email"].tolist() if "from_email" in df.columns else [None] * n
    recipients = df["to_emails"].tolist() if "to_emails" in df.columns else [None] * n

    neighbours = {}
    for sender, tos in zip(senders, recipients):
        if not isinstance(sender, str) or not sender:
            continue
        for rcpt in _addresses(tos):
            if isinstance(rcpt, str) and rcpt and rcpt != sender:
                neighbours.setdefault(sender, set()).add(rcpt)
                neighbours.setdefault(rcpt, set()).add(sender)

    out = pd.DataFrame(
        [{"address": addr, "degree": len(nbrs)} for addr, nbrs in neighbours.items()],
        columns=["address", "degree"],
    )
    return out.sort_values(["degree", "address"], ascending=[False, True]).head(50)
| |
|
| | |
| | def _sort_results(df, by, direction): |
| | if df is None or len(df) == 0: |
| | return pd.DataFrame() |
| | tmp = df.copy() |
| | if "date" in tmp.columns: |
| | tmp["_dt"] = pd.to_datetime(tmp["date"], utc=True, errors="coerce") |
| | else: |
| | tmp["_dt"] = pd.NaT |
| | by = by if by in tmp.columns else "context_anomaly_score" |
| | asc = (direction == "asc") |
| | sort_cols = [by] |
| | if by == "date": |
| | sort_cols = ["_dt"] |
| | elif by in ["anomaly_score", "corruption_score", "context_anomaly_score"]: |
| | sort_cols.append("_dt") |
| | tmp = tmp.sort_values(sort_cols, ascending=[asc, False]) |
| | cols_out = [ |
| | "date","bucket","from_email","from_domain","subject","cluster_name","lang", |
| | "tags","flags","sentiment","context_anomaly_score","corruption_score","anomaly_score" |
| | ] |
| | if "search_score" in tmp.columns: |
| | cols_out.append("search_score") |
| | return tmp[[c for c in cols_out if c in tmp.columns]].head(500) |
| |
|
| | |
| | def _tokenize_query(q: str) -> List[str]: |
| | return [p.strip() for p in re.split(r"\s+", q or "") if p.strip()][:8] |
| |
|
| | def _project_query_to_lsa(q_vec, svd, norm) -> Optional[np.ndarray]: |
| | try: |
| | return norm.transform(svd.transform(q_vec)).astype(np.float32) |
| | except Exception: |
| | return None |
| |
|
def _vectorize_query(q, vec_state, corpus_texts):
    """Embed a free-text query in the sparse space used when indexing.

    The word and character vectorisers are re-fit on *corpus_texts*, which
    should be the same rows that were vectorised at processing time. The
    query's columns then line up with the fitted SVD/Normalizer.

    Settings come from *vec_state* with the same fallbacks process_file
    applies before vectorising: max_features→120000, min_df→3, max_df→0.7,
    hash_bits→18. The BM25 parameters are k1=1.2 and b=0.75. A cleared UI
    field therefore cannot yield a different vocabulary, or a sklearn error,
    at search time.

    Args:
        q: Raw query string.
        vec_state: Dict with use_hashing, hash_bits, max_features, min_df,
            max_df and use_bigrams.
        corpus_texts: Enriched document texts to fit on.

    Returns:
        CSR matrix of shape (1, n_word_features + n_char_features). The
        character block is scaled by 0.20, as when indexing.
    """
    n_docs = len(corpus_texts)
    char_min_df = 1 if n_docs <= 1 else 2
    ngram_range = (1, 2) if vec_state.get("use_bigrams") else (1, 1)

    if vec_state.get("use_hashing"):
        hv = HashingVectorizer(
            analyzer="word",
            ngram_range=ngram_range,
            n_features=2 ** int(vec_state.get("hash_bits") or 18),
            token_pattern=TOKEN_PATTERN,
            lowercase=True,
            norm=None,
            alternate_sign=False,
        )
        tfidf_tr = TfidfTransformer().fit(hv.transform(corpus_texts))
        q_word = tfidf_tr.transform(hv.transform([q])).astype(np.float32)
    else:
        max_features = int(vec_state.get("max_features") or 120000)
        min_df = int(vec_state.get("min_df") or 3)
        max_df = float(vec_state.get("max_df") or 0.7)
        # Relax the document-frequency bounds only when they cannot be met
        # for this corpus size. sklearn would otherwise raise "max_df
        # corresponds to < documents than min_df". Satisfiable bounds are
        # used unchanged.
        min_df = max(1, min(min_df, n_docs))
        if max_df * n_docs < min_df:
            max_df = 1.0

        def _fit_counts(mn, mx):
            vectorizer = CountVectorizer(
                analyzer="word",
                ngram_range=ngram_range,
                max_features=max_features,
                min_df=mn,
                max_df=mx,
                token_pattern=TOKEN_PATTERN,
                lowercase=True,
                stop_words=STOPWORD_FOR_VEC,
                dtype=np.float32,
            )
            return vectorizer, vectorizer.fit_transform(corpus_texts)

        try:
            cv, tf = _fit_counts(min_df, max_df)
        except ValueError:
            # e.g. "After pruning, no terms remain": retry without DF pruning.
            cv, tf = _fit_counts(1, 1.0)
        bm25 = BM25Transformer(k1=1.2, b=0.75).fit(tf)
        q_word = bm25.transform(cv.transform([q]))

    char_vec = CharTfidf(
        analyzer="char", ngram_range=(3, 5), min_df=char_min_df,
        max_features=100_000, lowercase=True, dtype=np.float32,
    ).fit(corpus_texts)
    q_char = char_vec.transform([q])

    return hstack([q_word, q_char * 0.20], format="csr")
| |
|
| | |
def process_file(inbox_file, max_features, min_df, max_df, use_bigrams, skip_lang,
                 use_lsa, lsa_dim, auto_k, k_clusters, mb_batch, use_faiss, use_iso,
                 trusted_domains_in, extra_keywords_in, highlight_toggle,
                 use_hashing, hash_bits, use_hdbscan, hdb_min_cluster, hdb_min_samples,
                 per_language, use_embeddings, embed_weight, embeddings_path, embeddings_binary,
                 watchlist_in, min_mentions):
    """Run the full analysis pipeline on an uploaded mailbox export.

    Steps:
    - Load and normalise the JSON records.
    - Route each email to a bucket and compute tags, sentiment and flags.
    - For each bucket (or bucket x language) partition: vectorise, optionally
      LSA-reduce, and cluster.
    - Score corruption and context anomalies.
    - Detect surveillance campaigns.
    - Build a nearest-neighbour index for search.
    - Assemble the summary tables and dropdown choices.

    Returns:
        A 26-element tuple in the exact order of the ``outputs=`` list wired
        to ``run_btn``/``update_btn``: status markdown, summary tables, the
        results table, ``gr.State`` payloads and dropdown updates. Early
        exits return the same shape with only the status message filled in.
    """

    def _empty_outputs(message: str) -> tuple:
        """Early-exit result: 26 slots, only the status set, dropdowns unchanged."""
        return (
            message,
            None, None, None, None, None,   # cluster/domain/sender counts, actors, off-hours
            None, None,                     # surveillance entities, samples
            None,                           # results table
            None, None, None,               # state_df, state_vec, state_X_reduced
            None, None,                     # state_index, state_term_names
            None, None,                     # state_use_lsa, state_use_faiss
            gr.update(), gr.update(), gr.update(), gr.update(),  # cluster/domain/sender/lang dropdowns
            None, None, None,               # state_svd, state_norm, state_dims
            None, None,                     # state_extra_terms, state_highlight
            gr.update(),                    # bucket dropdown
        )

    if inbox_file is None:
        return _empty_outputs("**Please upload a file.**")

    # Effective vectoriser settings. The same values drive clustering AND are
    # stored in vec_state, so queries are re-vectorised identically even when
    # a UI field was cleared (None/0).
    eff_max_features = int(max_features or 120000)
    eff_min_df = int(min_df or 3)
    eff_max_df = float(max_df or 0.7)
    eff_hash_bits = int(hash_bits or 18)

    # ---- per-partition helpers ---------------------------------------------

    def _make_texts(df_in: pd.DataFrame) -> Tuple[List[str], List[str]]:
        """Return (enriched texts, raw subjects) for one partition."""
        texts = list(df_in.apply(enrich_text, axis=1))
        subjects_only = list(df_in["subject"].fillna(""))
        return texts, subjects_only

    def _vectorize_block(
        texts: List[str],
        use_bigrams: bool,
        max_features: int,
        min_df: int,
        max_df: float,
        use_hashing: bool,
        hash_bits: int
    ):
        """
        Return (X_full_csr, count_vec, char_vec, bm25, d_word, d_char, d_full).

        The word part is either Count+BM25 (k1=1.2, b=0.75) or
        Hashing+TfidfTransformer, in which case count_vec and bm25 are None.
        It is followed by char 3-5-gram TF-IDF scaled by 0.20.
        """
        n_docs = len(texts)
        ngram_range = (1, 2) if use_bigrams else (1, 1)
        char_min_df = 1 if n_docs <= 1 else 2

        if use_hashing:
            hv = HashingVectorizer(
                analyzer="word", ngram_range=ngram_range,
                n_features=2 ** int(hash_bits), alternate_sign=False,
                token_pattern=TOKEN_PATTERN, lowercase=True, norm=None
            )
            X_word = TfidfTransformer().fit_transform(hv.transform(texts)).astype(np.float32)
            count_vec = None
            bm25 = None
        else:
            word_min_df = int(min_df) if min_df else 2
            word_max_df = float(max_df) if max_df else 0.7
            # Small buckets (a handful of emails) make the configured
            # min_df/max_df pair unsatisfiable. sklearn would raise "max_df
            # corresponds to < documents than min_df" and abort the whole run,
            # so relax the bounds only in that case.
            word_min_df = max(1, min(word_min_df, n_docs))
            if word_max_df * n_docs < word_min_df:
                word_max_df = 1.0

            def _fit_counts(mn, mx):
                vectorizer = CountVectorizer(
                    analyzer="word", ngram_range=ngram_range,
                    max_features=int(max_features) if max_features else None,
                    min_df=mn, max_df=mx,
                    token_pattern=TOKEN_PATTERN, lowercase=True,
                    dtype=np.float32, stop_words=STOPWORD_FOR_VEC
                )
                return vectorizer, vectorizer.fit_transform(texts)

            try:
                count_vec, TF = _fit_counts(word_min_df, word_max_df)
            except ValueError:
                # e.g. "After pruning, no terms remain": retry without DF pruning.
                count_vec, TF = _fit_counts(1, 1.0)
            bm25 = BM25Transformer(k1=1.2, b=0.75).fit(TF)
            X_word = bm25.transform(TF)

        char_vec = CharTfidf(
            analyzer="char", ngram_range=(3, 5), min_df=char_min_df,
            max_features=100_000, lowercase=True, dtype=np.float32
        )
        X_char = char_vec.fit_transform(texts)
        X_full = hstack([X_word, X_char * 0.20], format="csr")
        d_word, d_char = X_word.shape[1], X_char.shape[1]
        return X_full, count_vec, char_vec, bm25, d_word, d_char, d_word + d_char

    def _reduce_space(X_full, use_lsa, lsa_dim):
        """Optionally apply TruncatedSVD and L2 row normalisation.

        Returns (X_reduced float32 or None, svd or None, normalizer or None).
        All three are None when LSA is off or fewer than 2 components fit.
        """
        if not use_lsa:
            return None, None, None

        n_docs, n_feats = X_full.shape[0], X_full.shape[1]
        max_components = max(1, min(n_docs, n_feats) - 1)
        n_comp = int(min(int(lsa_dim or 256), max_components))
        if n_comp < 2:
            return None, None, None

        svd_obj = TruncatedSVD(n_components=n_comp, random_state=0)
        Xtmp = svd_obj.fit_transform(X_full)
        norm_obj = Normalizer(copy=False)
        X_reduced = norm_obj.fit_transform(Xtmp).astype(np.float32)
        del Xtmp
        gc.collect()
        return X_reduced, svd_obj, norm_obj

    def _attach_embeddings(texts, X_reduced_or_full, use_lsa, kv, emb_dim, weight):
        """Append weighted document embeddings as extra columns.

        A dense input stays dense and a sparse input stays CSR. The input is
        returned unchanged when there is no model, emb_dim <= 0 or
        weight <= 0. ``use_lsa`` is accepted but not used here.
        """
        if kv is None or emb_dim <= 0 or weight <= 0.0:
            return X_reduced_or_full, emb_dim
        doc_embs = _build_doc_embeddings(texts, kv, emb_dim).astype(np.float32)
        if weight != 1.0:
            doc_embs *= float(weight)
        if isinstance(X_reduced_or_full, np.ndarray):
            return np.hstack([X_reduced_or_full, doc_embs]).astype(np.float32), emb_dim
        return hstack([X_reduced_or_full, csr_matrix(doc_embs)], format="csr"), emb_dim

    def _cluster_space(
        X_space,
        bucket_name: str,
        df_part: pd.DataFrame,
        use_lsa: bool,
        use_hdbscan: bool,
        hdb_min_cluster: int,
        hdb_min_samples: int,
        auto_k: bool,
        k_clusters: int,
        mb_batch: int,
        count_vec,
        svd_obj,
        norm_obj,
        d_word, d_char
    ):
        """Cluster one partition and return (labels, centers or None, k).

        Strategy:
        - n <= 1: a single trivial label.
        - n < 10: a tiny MiniBatchKMeans.
        - HDBSCAN when requested and the space is dense with enough rows.
        - Otherwise MiniBatchKMeans, with k from kneedle/auto rule or the UI
          value, optionally seeded from the corruption lexicon.
        Labels always go through stabilize_labels with bucket parameters.
        """
        n = X_space.shape[0]
        min_size, merge_th, reassign_th = _bucket_stabilizer_params(bucket_name)

        def _stabilize(lbls):
            return stabilize_labels(X_space, lbls, min_size=min_size,
                                    merge_thresh=merge_th, reassign_thresh=reassign_th)

        if n <= 1:
            labels = np.zeros((n,), dtype=int) if n == 1 else np.array([], dtype=int)
            return _stabilize(labels), None, int(n)

        if n < 10:
            k_small = min(max(2, n // 2), n)
            kmeans = MiniBatchKMeans(
                n_clusters=int(k_small),
                batch_size=int(mb_batch or 4096),
                random_state=0,
                n_init="auto"
            )
            labels = kmeans.fit_predict(X_space)
            centers = getattr(kmeans, "cluster_centers_", None)
            labels = _stabilize(labels)
            return labels, centers, int(len(set(labels)))

        if use_hdbscan and HDBSCAN_OK and isinstance(X_space, np.ndarray) and n >= max(50, hdb_min_cluster):
            min_samples = None if int(hdb_min_samples or 0) <= 0 else int(hdb_min_samples)
            clusterer = hdbscan.HDBSCAN(
                min_cluster_size=int(hdb_min_cluster or 60),
                min_samples=min_samples,
                metric='euclidean',
                cluster_selection_epsilon=0.0,
                core_dist_n_jobs=1
            )
            labels = _stabilize(clusterer.fit_predict(X_space))
            return labels, None, int(len({l for l in labels if l >= 0}))

        if bool(auto_k):
            if use_lsa and isinstance(X_space, np.ndarray):
                k, _ = choose_k_by_kneedle(X_space, ks=(50, 100, 150, 200, 300, 400, 500))
            else:
                k = auto_k_rule(n)
        else:
            k = max(10, int(k_clusters or 350))
        k = int(max(2, round(k * _bucket_k_multiplier(bucket_name))))
        k = min(k, n)

        init = None
        if use_lsa and isinstance(X_space, np.ndarray) and count_vec is not None:
            seeds = seeded_centroids_in_lsa(
                CORR_LEX, count_vec, svd_obj.components_, norm_obj,
                d_word=d_word, d_full=(d_word + d_char), k=k
            )
            # The seeds are built from the SVD components, so presumably they
            # only span the LSA columns. When document embeddings were
            # appended, X_space is wider and MiniBatchKMeans would reject the
            # init array. Require a full shape match, not just k rows.
            if seeds is not None and tuple(seeds.shape) == (k, X_space.shape[1]):
                init = seeds

        kmeans = MiniBatchKMeans(
            n_clusters=k,
            batch_size=int(mb_batch or 4096),
            random_state=0,
            n_init="auto" if init is None else 1,
            init="k-means++" if init is None else init
        )
        labels = kmeans.fit_predict(X_space)
        centers = kmeans.cluster_centers_ if hasattr(kmeans, "cluster_centers_") else None
        if use_lsa and centers is not None:
            labels = merge_close_clusters(labels, centers, thresh=0.95)
        labels = _stabilize(labels)
        return labels, centers, int(len(set(labels)))

    # ---- load & normalise ------------------------------------------------

    trusted = {d.strip().lower() for d in (trusted_domains_in or "").split(",") if d.strip()}
    extra_terms = [t.strip() for t in (extra_keywords_in or "").split(",") if t.strip()]
    extra_terms_lower = [t.lower() for t in extra_terms]

    # Upload objects expose .name; plain path strings do not.
    try:
        infile_path = inbox_file.name
    except AttributeError:
        infile_path = str(inbox_file) if inbox_file else ""

    recs = _load_json_records(infile_path)
    if not recs:
        return _empty_outputs("**No valid records found.**")

    normd = []
    for r in tqdm(recs, desc="Normalize", leave=False):
        out = normalize_email_record(r, use_langdetect=(not bool(skip_lang)))
        if out and out.get("body_text") is not None:
            normd.append(out)
    df = pd.DataFrame(normd)
    if df.empty:
        return _empty_outputs("**No usable email records.**")

    df = df.drop_duplicates(subset=["message_id", "subject", "text_hash"]).reset_index(drop=True)
    df["tags"] = df["body_text"].fillna("").map(has_suspect_tag)
    df = compute_sentiment_column(df)

    # ---- routing: news / notifications override the content bucket -------

    df["bucket"] = df.apply(route_email_row, axis=1)
    df["is_news"] = df.apply(lambda r: is_news_like(r.get("subject", ""), r.get("body_text", ""), r.get("from_domain", "")), axis=1)
    df["is_notify"] = df.apply(lambda r: is_notification_like(r.get("subject", ""), r.get("body_text", ""), r.get("from_email", ""), r.get("from_domain", "")), axis=1)
    df.loc[df["is_news"] == True, "bucket"] = "Newsletters/Alerts"
    df.loc[df["is_notify"] == True, "bucket"] = "IT/Security"

    # ---- per-email flags ---------------------------------------------------

    flags = []
    for _, row in df.iterrows():
        f = []
        h = _hour_of(row.get("date") or "")
        if h is not None and (h < 6 or h > 22):
            f.append("odd-hours")
        fd = (row.get("from_domain") or "").lower()
        if (fd in PERSONAL_DOMAINS) and (fd not in trusted):
            f.append("personal-mail")
        f += _attachment_flags(row.get("attachments") or [])
        flags.append(f)
    df["flags"] = flags

    # ---- split: only the main buckets are clustered ----------------------

    df_main = df[~df["bucket"].isin(["Newsletters/Alerts", "IT/Security"])].reset_index(drop=True)
    df_news = df[df["bucket"] == "Newsletters/Alerts"].reset_index(drop=True)
    df_alerts = df[df["bucket"] == "IT/Security"].reset_index(drop=True)

    kv = None
    emb_dim = 0
    if bool(use_embeddings):
        kv, emb_dim = _load_embeddings(embeddings_path or "", bool(embeddings_binary))

    parts = []
    if bool(per_language):
        for bkt, g_bucket in df_main.groupby("bucket", dropna=False):
            for lang_code, grp in g_bucket.groupby("lang", dropna=False):
                parts.append(((bkt, lang_code), grp.copy()))
    else:
        for bkt, grp in df_main.groupby("bucket", dropna=False):
            parts.append(((bkt, "all"), grp.copy()))

    labels_list, cluster_name_list, anomaly_list = [], [], []
    bucket_indexers = []
    X_reduced_holder = None
    term_names_global = {}
    single_partition = (len(parts) == 1)
    d_word_agg, d_char_agg = 0, 0
    svd_obj_local, norm_obj_local = None, None

    for (bucket_name, _lang), df_part in parts:
        if df_part.empty:
            continue
        texts, subjects_only = _make_texts(df_part)

        X_full, count_vec, _char_vec, _bm25, d_word, d_char, _d_full = _vectorize_block(
            texts=texts,
            use_bigrams=bool(use_bigrams),
            max_features=eff_max_features,
            min_df=eff_min_df,
            max_df=eff_max_df,
            use_hashing=bool(use_hashing),
            hash_bits=eff_hash_bits,
        )
        d_word_agg += d_word
        d_char_agg += d_char

        X_reduced, svd_obj_local, norm_obj_local = _reduce_space(X_full, bool(use_lsa), int(lsa_dim or 256))
        X_space = X_reduced if X_reduced is not None else X_full

        if kv is not None:
            X_space, _ = _attach_embeddings(
                texts, X_space, bool(use_lsa) and X_reduced is not None, kv, emb_dim, float(embed_weight)
            )

        # IsolationForest runs on the LSA space only (not on appended embeddings).
        anomaly_scores = np.full((len(df_part),), np.nan, dtype=np.float32)
        if X_reduced is not None and bool(use_iso) and ISO_OK and X_reduced.shape[0] >= 50:
            try:
                iso = IsolationForest(n_estimators=100, contamination="auto", random_state=0).fit(X_reduced)
                anomaly_scores = (-iso.score_samples(X_reduced)).astype(np.float32)
            except Exception:
                pass  # best-effort: scores stay NaN

        labels, _, _ = _cluster_space(
            X_space=X_space,
            bucket_name=bucket_name,
            df_part=df_part,
            use_lsa=bool(use_lsa) and X_reduced is not None,
            use_hdbscan=bool(use_hdbscan),
            hdb_min_cluster=int(hdb_min_cluster or 60),
            hdb_min_samples=int(hdb_min_samples or 0),
            auto_k=bool(auto_k),
            k_clusters=int(k_clusters or 350),
            mb_batch=int(mb_batch or 4096),
            count_vec=count_vec,
            svd_obj=svd_obj_local,
            norm_obj=norm_obj_local,
            d_word=d_word,
            d_char=d_char,
        )

        term_names = cluster_labels_pmi_bigram(
            texts=texts, labels=labels, subjects=subjects_only,
            topn=6, subject_alpha=0.75, global_ubiq_cut=0.20, subject_min_cov=0.30
        )

        part_index = df_part.index
        bucket_indexers.append(part_index)
        labels_list.append(pd.Series(labels, index=part_index))
        cluster_name_list.append(pd.Series(
            [term_names.get(int(c), "noise" if int(c) < 0 else f"cluster_{int(c)}") for c in labels],
            index=part_index,
        ))
        anomaly_list.append(pd.Series(anomaly_scores, index=part_index))

        if single_partition:
            # Every partition numbers its clusters from 0. An id -> name map is
            # only unambiguous with one partition; otherwise consumers fall
            # back to the per-row cluster_name.
            term_names_global.update({int(k): v for k, v in term_names.items()})
            if X_reduced is not None:
                X_reduced_holder = X_reduced

    if labels_list:
        covered = pd.Index(np.concatenate(bucket_indexers))
        df_main.loc[covered, "cluster_id"] = pd.concat(labels_list).sort_index()
        df_main.loc[covered, "cluster_name"] = pd.concat(cluster_name_list).sort_index()
        df_main.loc[covered, "anomaly_score"] = pd.concat(anomaly_list).sort_index()
    else:
        df_main["cluster_id"] = -10
        df_main["cluster_name"] = "unclustered"
        df_main["anomaly_score"] = np.nan

    # Fixed pseudo-clusters for the non-clustered buckets.
    if len(df_news):
        df_news["cluster_id"] = -1
        df_news["cluster_name"] = "newsletter/news"
        df_news["anomaly_score"] = np.nan
    if len(df_alerts):
        df_alerts["cluster_id"] = -2
        df_alerts["cluster_name"] = "system/alerts"
        df_alerts["anomaly_score"] = np.nan

    # df_main first: the search index rows correspond to these leading rows.
    df = pd.concat([df_main, df_news, df_alerts], ignore_index=True)
    df = dedupe_all_labels(df)

    df["corruption_score"] = df.apply(lambda r: corruption_score(r, trusted_domains=trusted), axis=1)
    df = compute_context_anomaly(df)

    wl = [w.strip() for w in (watchlist_in or "").split(",") if w.strip()]
    ent_df, samp_df = detect_surveillance_campaigns(df, watchlist=wl, min_mentions=int(min_mentions or 15))
    df = tag_surveillance_emails(df, ent_df, threshold=4.5)

    # ---- search index (only meaningful for a single LSA partition) -------

    index_obj = None
    use_faiss_flag = bool(use_faiss) and FAISS_OK and bool(use_lsa) and (X_reduced_holder is not None) and single_partition
    if use_faiss_flag:
        index_obj = faiss.IndexFlatIP(X_reduced_holder.shape[1])
        index_obj.add(X_reduced_holder)
    else:
        try:
            if bool(use_lsa) and X_reduced_holder is not None and single_partition:
                index_obj = NearestNeighbors(metric="cosine", algorithm="brute").fit(X_reduced_holder)
            else:
                # 1-row placeholder; search_fn treats it as "no index".
                index_obj = NearestNeighbors(metric="cosine", algorithm="brute").fit(np.zeros((1, 4), dtype=np.float32))
        except Exception:
            pass

    # ---- summary tables & dropdown choices -------------------------------

    cluster_sizes = (
        df.groupby(["bucket", "cluster_id", "cluster_name"])
        .size()
        .reset_index(name="count")
        .sort_values("count", ascending=False)
    )
    n_clusters_total = len(cluster_sizes)
    cluster_counts = cluster_sizes.head(500).copy()
    cluster_counts["label"] = [
        f"{bkt} — {int(cid)} — {cname} ({int(cnt)})"
        for bkt, cid, cname, cnt in zip(
            cluster_counts["bucket"], cluster_counts["cluster_id"],
            cluster_counts["cluster_name"], cluster_counts["count"],
        )
    ]
    cluster_choices = ["(any)"] + cluster_counts["label"].tolist()
    bucket_choices = ["(any)"] + sorted(df["bucket"].dropna().unique().tolist())

    domain_counts = (
        df.groupby("from_domain").size().reset_index(name="count").sort_values("count", ascending=False).head(100)
    )
    domain_choices = ["(any)"] + domain_counts["from_domain"].tolist()
    sender_counts = (
        df.groupby("from_email").size().reset_index(name="count").sort_values("count", ascending=False).head(200)
    )
    sender_choices = ["(any)"] + sender_counts["from_email"].tolist()
    langs = sorted([l for l in df["lang"].dropna().unique() if l and l != "unknown"])
    lang_choices = ["(any)"] + langs
    actors = social_stats(df)
    offp = df[df["flags"].apply(lambda xs: "odd-hours" in xs or "personal-mail" in xs)]
    offhours_table = (
        offp[["date", "from_email", "subject", "flags", "corruption_score"]]
        .sort_values("corruption_score", ascending=False)
        .head(200)
    )
    out_table = _sort_results(df, "context_anomaly_score", "desc")

    # Same effective values used above, so search vectorises consistently.
    vec_state = {
        "use_hashing": bool(use_hashing),
        "hash_bits": eff_hash_bits,
        "max_features": eff_max_features,
        "min_df": eff_min_df,
        "max_df": eff_max_df,
        "use_bigrams": bool(use_bigrams),
    }
    status_md = f"**Processed {len(df):,} emails** | clusters ~ {n_clusters_total:,} (showing top 500)"

    # The SVD/normalizer of the last partition is only meaningful alone.
    svd_obj_out = svd_obj_local if single_partition else None
    norm_obj_out = norm_obj_local if single_partition else None

    return (
        status_md,
        cluster_counts, domain_counts, sender_counts,
        actors, offhours_table,
        ent_df, samp_df,
        out_table,
        df, vec_state, X_reduced_holder,
        index_obj, term_names_global,
        bool(use_lsa), use_faiss_flag,
        gr.update(choices=cluster_choices, value="(any)"),
        gr.update(choices=domain_choices, value="(any)"),
        gr.update(choices=sender_choices, value="(any)"),
        gr.update(choices=lang_choices, value="(any)"),
        svd_obj_out, norm_obj_out, (d_word_agg, d_char_agg),
        extra_terms_lower, bool(highlight_toggle),
        gr.update(choices=bucket_choices, value="(any)")
    )
| |
|
| | |
# Parameter components shared by the "run" and "update" buttons (everything
# after the inbox file), in process_file's positional order.
_process_param_inputs = [
    max_features, min_df, max_df, use_bigrams, skip_lang,
    use_lsa, lsa_dim, auto_k, k_clusters, mb_batch, use_faiss, use_iso,
    trusted_domains_in, extra_keywords_in, highlight_toggle,
    use_hashing, hash_bits, use_hdbscan, hdb_min_cluster, hdb_min_samples,
    per_language, use_embeddings, embed_weight, embeddings_path, embeddings_binary,
    watchlist_in, min_mentions,
]
# Output components in the exact order of process_file's 26-element tuple.
_process_outputs = [
    status, cluster_counts_df, domain_counts_df, sender_counts_df,
    actors_df, offhours_df,
    surv_entities_df, surv_samples_df,
    results_df,
    state_df, state_vec, state_X_reduced,
    state_index, state_term_names,
    state_use_lsa, state_use_faiss,
    cluster_drop, domain_drop, sender_drop, lang_drop,
    state_svd, state_norm, state_dims,
    state_extra_terms, state_highlight,
    bucket_drop,
]

# "Run": process the freshly uploaded file, then remember it in state_inbox.
run_btn.click(
    process_file,
    inputs=[inbox_file, *_process_param_inputs],
    outputs=list(_process_outputs),
).then(
    lambda f: f, inputs=[inbox_file], outputs=[state_inbox]
)

# Keep state_inbox in sync whenever the upload widget changes.
inbox_file.change(lambda f: f, inputs=[inbox_file], outputs=[state_inbox])

# "Update": re-run the pipeline with new settings on the remembered file.
update_btn.click(
    process_file,
    inputs=[state_inbox, *_process_param_inputs],
    outputs=list(_process_outputs),
)
| |
|
| | |
def refresh_results(df, bucket, cluster, domain, sender, lang, sentiment, tag, start, end, sort_by, sort_dir, hide_noise_flag):
    """Filter the processed frame by the UI controls, then sort it for display.

    Returns an empty DataFrame when nothing has been processed yet.
    """
    has_rows = df is not None and len(df) > 0
    if not has_rows:
        return pd.DataFrame()
    filtered = _apply_filters(
        df, bucket, cluster, domain, sender, lang, sentiment, tag, start, end,
        hide_noise_flag=bool(hide_noise_flag),
    )
    return _sort_results(filtered, sort_by, sort_dir)
| |
|
| | |
# The twelve filter/sort controls, in the positional order refresh_results
# expects after state_df. This is also the order reset_btn writes defaults in.
_filter_controls = [
    bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop, sentiment_drop, tag_drop,
    date_start, date_end, sort_by, sort_dir, hide_noise,
]
_filter_inputs = [state_df, *_filter_controls]

# Any filter/sort change re-filters and re-sorts the processed frame.
for ctrl in _filter_controls:
    ctrl.change(
        refresh_results,
        inputs=list(_filter_inputs),
        outputs=[results_df],
    )

# Reset: seven dropdowns to "(any)", both dates cleared, default sort, noise
# hidden; then refresh the results table.
reset_btn.click(
    lambda: ["(any)"] * 7 + ["", ""] + ["context_anomaly_score", "desc", True],
    [],
    list(_filter_controls),
).then(
    refresh_results,
    inputs=list(_filter_inputs),
    outputs=[results_df],
)
| |
|
| | |
def search_fn(q, df, vec, X_red, index, use_lsa, use_faiss, svd, norm, sort, sdir):
    """Semantic search over the main-bucket emails.

    The query is vectorised in the indexing feature space, optionally
    projected into LSA space, and ranked with the NearestNeighbors (cosine)
    or FAISS inner-product index built by process_file.

    Returns:
        (results sorted via _sort_results, tokenised query terms). The frame
        is empty when there is no usable index, when the index is a
        placeholder, or when its rows no longer line up with the current data.
    """
    if not q or df is None or vec is None or index is None:
        return pd.DataFrame(), []

    # process_file indexed the rows whose bucket is not "Newsletters/Alerts"
    # or "IT/Security", in order. Selecting by cluster_id would also drop
    # main-bucket noise rows, which can carry negative labels, and would
    # shift every index position onto the wrong email.
    if "bucket" in df.columns:
        mask = ~df["bucket"].isin(["Newsletters/Alerts", "IT/Security"])
    else:
        mask = ~df["cluster_id"].isin([-1, -2, -3])
    df_main = df[mask].reset_index(drop=True)
    if df_main.empty:
        return pd.DataFrame(), []

    q_terms = _tokenize_query(q)

    # Validate the index before the (expensive) query vectorisation.
    if isinstance(index, NearestNeighbors):
        n_indexed = getattr(index, "n_samples_fit_", None)
        if n_indexed is not None and n_indexed <= 1:
            return pd.DataFrame(), q_terms  # placeholder index / nothing to rank
    elif use_faiss and FAISS_OK and hasattr(index, "search"):
        n_indexed = getattr(index, "ntotal", None)
    else:
        return pd.DataFrame(), q_terms
    if n_indexed is not None and int(n_indexed) != len(df_main):
        # Hit positions would point at the wrong emails.
        return pd.DataFrame(), q_terms

    q_vec = _vectorize_query(q, vec, list(df_main.apply(enrich_text, axis=1)))
    if use_lsa and svd is not None and norm is not None:
        q_emb = _project_query_to_lsa(q_vec, svd, norm)
    else:
        q_emb = q_vec
    if q_emb is None:
        return pd.DataFrame(), q_terms

    n_req = min(50, len(df_main))

    if isinstance(index, NearestNeighbors):
        dists, inds = index.kneighbors(q_emb, n_neighbors=n_req)
        results = df_main.iloc[inds[0]].copy()
        results["search_score"] = 1.0 - dists[0]  # cosine distance -> similarity
    else:
        D, I = index.search(q_emb.astype(np.float32), k=n_req)
        hits = I[0] >= 0  # FAISS pads with -1 when fewer than k vectors exist
        results = df_main.iloc[I[0][hits]].copy()
        results["search_score"] = D[0][hits]

    return _sort_results(results, sort, sdir), q_terms
| |
|
# Free-text search. The tokenised query is kept in state_query_terms, which
# on_row_select passes on as highlight terms.
_search_inputs = [
    search_query, state_df, state_vec, state_X_reduced, state_index,
    state_use_lsa, state_use_faiss, state_svd, state_norm, sort_by, sort_dir,
]
search_btn.click(
    search_fn,
    inputs=_search_inputs,
    outputs=[results_df, state_query_terms],
)
| |
|
| | |
def on_row_select(evt: gr.SelectData, table, df, term_names, q_terms, extra_terms, do_highlight):
    """Render the email behind a clicked results row as highlighted HTML.

    The displayed row is matched back to the full frame by subject, sender
    and date, falling back to subject only. Returns "" when nothing is
    selectable, or a not-found message.
    """
    nothing_to_show = (
        evt.index is None or table is None or len(table) == 0
        or df is None or len(df) == 0
    )
    if nothing_to_show:
        return ""
    picked = table.iloc[evt.index[0]]

    subject = picked.get("subject")
    exact = (
        (df["subject"] == subject)
        & (df["from_email"] == picked.get("from_email"))
        & (df["date"] == picked.get("date"))
    )
    matches = df[exact]
    if matches.empty:
        matches = df[df["subject"] == subject]
    if matches.empty:
        return "Could not find original record."

    record = matches.iloc[0]
    cluster_id = int(record.get("cluster_id", -99))
    fallback_label = record.get("cluster_name")
    label = term_names.get(cluster_id, fallback_label) if term_names else fallback_label
    return build_highlighted_html(
        record,
        query_terms=q_terms,
        cluster_label=f'{record.get("bucket","Other")} / {label}',
        do_highlight=do_highlight,
        extra_terms=extra_terms,
    )
| |
|
# Clicking a result row renders that email into email_view.
_row_select_inputs = [
    results_df, state_df, state_term_names, state_query_terms,
    state_extra_terms, state_highlight,
]
results_df.select(
    on_row_select,
    inputs=_row_select_inputs,
    outputs=[email_view],
)
| |
|
| | |
def on_click_filter(evt: gr.SelectData, df_sum: pd.DataFrame, col_name: str, out_comp: gr.Dropdown):
    """Return a dropdown update selecting the clicked summary row's *col_name*.

    ``out_comp`` is accepted but unused; the event wiring picks the output.
    """
    no_selection = evt.index is None or df_sum is None or df_sum.empty
    if no_selection:
        return gr.update()
    clicked_row = df_sum.iloc[evt.index[0]]
    return gr.update(value=clicked_row[col_name])
| | |
def on_cluster_summary_select(evt: gr.SelectData, df_sum: pd.DataFrame):
    """Clicked cluster-summary row -> (bucket dropdown update, cluster dropdown update)."""
    no_selection = evt.index is None or df_sum is None or df_sum.empty
    if no_selection:
        return gr.update(), gr.update()
    clicked_row = df_sum.iloc[evt.index[0]]
    bucket_update = gr.update(value=clicked_row["bucket"])
    cluster_update = gr.update(value=clicked_row["label"])
    return bucket_update, cluster_update
| |
|
# Gradio injects event data only into parameters annotated with an
# EventData subclass. The previous un-annotated lambdas (evt, df) were called
# with a single argument and failed, so use annotated handlers.
def _on_domain_summary_select(evt: gr.SelectData, df_sum: pd.DataFrame):
    """Clicked domain-summary row -> put its from_domain into the domain filter."""
    return on_click_filter(evt, df_sum, "from_domain", domain_drop)


def _on_sender_summary_select(evt: gr.SelectData, df_sum: pd.DataFrame):
    """Clicked sender-summary row -> put its from_email into the sender filter."""
    return on_click_filter(evt, df_sum, "from_email", sender_drop)


# refresh_results inputs: the processed frame followed by every filter/sort control.
_summary_refresh_inputs = [
    state_df, bucket_drop, cluster_drop, domain_drop, sender_drop, lang_drop,
    sentiment_drop, tag_drop, date_start, date_end, sort_by, sort_dir, hide_noise,
]

# Clicking a summary row sets the matching filter(s), then refreshes the results table.
cluster_counts_df.select(
    on_cluster_summary_select, [cluster_counts_df], [bucket_drop, cluster_drop]
).then(
    refresh_results,
    inputs=list(_summary_refresh_inputs),
    outputs=[results_df],
)

domain_counts_df.select(
    _on_domain_summary_select, [domain_counts_df], [domain_drop]
).then(
    refresh_results,
    inputs=list(_summary_refresh_inputs),
    outputs=[results_df],
)

sender_counts_df.select(
    _on_sender_summary_select, [sender_counts_df], [sender_drop]
).then(
    refresh_results,
    inputs=list(_summary_refresh_inputs),
    outputs=[results_df],
)
| |
|
if __name__ == "__main__":
    # Start the Gradio app defined as `demo`, with server-side rendering off.
    demo.launch(ssr_mode=False)