""" Anomalous-relation detection — finding "code word" candidates. A code word is a *common English word used uncommonly in this corpus*. We make that precise by contrasting two distributional word spaces: - the corpus Word2Vec (how words associate IN these documents), and - a pretrained general-English model (how they associate NORMALLY). The whole design avoids two traps: 1. Raw cosines from different embedding spaces are NOT comparable, so we never subtract a corpus cosine from a background cosine. We compare neighbour *sets* (scale-free) and, when we need per-neighbour scores, we standardise within each space (z-scores relative to the same anchor word) before combining — that subtraction *is* legitimate. 2. "Low similarity" is the default for almost all word pairs and is not a signal. The signal is *surprise*: strong here, weak normally. Three stages: A. sweep — rank which words behave most differently here vs. normally (neighbour-set divergence, z-scored across the vocabulary). B. relations — for one flagged word, the specific neighbours that are strong in-corpus but weak/absent in general English. C. incongruence — uses the transformer to find the specific occurrences (chunks/docs) where a keyword is used unlike its norm. Stages A/B need the corpus Word2Vec + background model. Stage C needs the transformer engine (contextual embeddings), which is the only one of the three models that can judge a single *occurrence* in context. """ import logging import re import threading from typing import Optional import numpy as np from contextual_similarity import ContextualSimilarityEngine from word2vec_baseline import Word2VecEngine from background_model import BackgroundModel logger = logging.getLogger(__name__) # Reuse the engine's stopword list so the three tools agree on what to ignore. _STOPWORDS = ContextualSimilarityEngine._STOPWORDS _ALPHA = re.compile(r"^[a-z]+$") # Capitalised spans (names/orgs) and dates, for the Stage-C investigation view. 
_ENTITY_RE = re.compile(r"\b[A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+){0,3}\b")
_DATE_RE = re.compile(
    r"\b(?:\d{1,2}[/-]\d{1,2}[/-]\d{2,4}"
    r"|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\.?\s+\d{1,2}"
    r"|\d{4})\b"
)


def _is_candidate_word(word: str) -> bool:
    """Return True when `word` may enter the shared vocabulary.

    A gating word must be a plain lowercase alphabetic token of at least three
    characters that is not in the shared stopword list.
    """
    return len(word) >= 3 and bool(_ALPHA.match(word)) and word not in _STOPWORDS


def _normalize_rows(mat: np.ndarray) -> np.ndarray:
    """Return a float32 copy of `mat` with every row scaled to unit L2 norm.

    All-zero rows are divided by a tiny epsilon instead of zero, so they stay
    all-zero rather than turning into NaN.

    An empty 1-D input (what `np.array([])` yields when the vocabulary is
    empty) is returned as an empty `(0, 0)` matrix; previously it raised an
    AxisError because a 1-D array has no axis 1.
    """
    mat = np.asarray(mat)
    if mat.ndim == 1 and mat.size == 0:
        return np.zeros((0, 0), dtype=np.float32)
    norms = np.linalg.norm(mat, axis=1, keepdims=True)
    norms[norms == 0] = 1e-12
    return (mat / norms).astype(np.float32)


class SharedSpace:
    """
    The shared vocabulary of words present in BOTH the corpus Word2Vec and the
    background model, with their normalised vectors in each space.

    Restricting to shared words keeps neighbour-set overlap fair (both spaces
    rank the *same* candidate set) and excludes names/jargon (absent from the
    background) — which are domain vocabulary, not code words.

    Attributes:
        words:  the shared words; row `i` of `C`/`B` and `counts[i]` all
                belong to `words[i]`.
        index:  word -> row position.
        C:      row-normalised corpus vectors, float32.
        B:      row-normalised background vectors, float32.
        counts: corpus frequency per word, stored as given.
    """

    def __init__(self, words: list[str], corpus_vecs: np.ndarray,
                 bg_vecs: np.ndarray, corpus_counts: np.ndarray):
        self.words = words
        self.index = {w: i for i, w in enumerate(words)}
        self.C = _normalize_rows(corpus_vecs)   # (n, d_corpus)
        self.B = _normalize_rows(bg_vecs)       # (n, d_bg)
        self.counts = corpus_counts             # corpus frequency per word

    def __len__(self) -> int:
        """Number of shared words."""
        return len(self.words)


def build_shared_space(
    w2v: "Word2VecEngine",
    background: "BackgroundModel",
    min_count: int = 5,
    max_vocab: int = 3000,
) -> SharedSpace:
    """
    Build the shared-vocabulary space.

    Args:
        w2v: corpus Word2Vec engine; its `model.wv` supplies the vocabulary,
            per-word counts and vectors.
        background: general-English model; `has(word)` gates membership and
            `kv[word]` supplies the vector.
        min_count: ignore corpus words rarer than this (their vectors are noise).
        max_vocab: cap to the N most frequent shared words for tractability.

    Returns:
        A `SharedSpace` ordered by descending corpus frequency. When no word
        qualifies the space is empty (length 0) rather than an error.
    """
    wv = w2v.model.wv

    # Collect candidate words: frequent enough in the corpus, common in English.
    candidates: list[tuple[str, int]] = []
    for word in wv.index_to_key:
        if not _is_candidate_word(word):
            continue
        count = wv.get_vecattr(word, "count")
        if count < min_count:
            continue
        if not background.has(word):
            continue
        candidates.append((word, count))

    # Keep the most frequent ones (their corpus vectors are the most reliable).
    # The sort is stable, so equally frequent words keep their vocabulary order.
    candidates.sort(key=lambda x: -x[1])
    if len(candidates) > max_vocab:
        logger.info("Shared vocab capped: %d candidates -> top %d by corpus frequency",
                    len(candidates), max_vocab)
        candidates = candidates[:max_vocab]

    words = [w for w, _ in candidates]
    counts = np.array([c for _, c in candidates], dtype=np.int64)
    # With no candidates these are empty 1-D arrays; _normalize_rows (called by
    # SharedSpace) turns them into empty matrices.
    corpus_vecs = np.array([wv[w] for w in words], dtype=np.float32)
    bg_vecs = np.array([background.kv[w] for w in words], dtype=np.float32)

    logger.info("Shared space built: %d words (corpus∩background, count>=%d)",
                len(words), min_count)
    return SharedSpace(words, corpus_vecs, bg_vecs, counts)


# Module-level cache so the (expensive) shared space is reused across requests
# until the underlying corpus Word2Vec changes.
_cache_lock = threading.Lock()
_cached: dict = {"key": None, "space": None}


def get_shared_space(w2v: "Word2VecEngine", background: "BackgroundModel",
                     min_count: int, max_vocab: int) -> "SharedSpace":
    """Return the shared space for these inputs, building it only on a cache miss.

    Only the most recent space is kept. The cache key combines the identity
    and vocabulary size of the corpus model, the background model name and the
    two build parameters.

    NOTE(review): `id()` values can be reused once the old model object has
    been garbage-collected; the vocabulary-size component makes a stale hit
    less likely but not impossible.
    """
    key = (id(w2v.model), len(w2v.model.wv), background.model_name, min_count, max_vocab)
    with _cache_lock:
        if _cached["key"] == key and _cached["space"] is not None:
            return _cached["space"]
        # Build while holding the lock so concurrent callers wait for one build
        # instead of each starting their own.
        space = build_shared_space(w2v, background, min_count, max_vocab)
        _cached["key"] = key
        _cached["space"] = space
        return space


def _topk_neighbours(sims: np.ndarray, self_idx: int, k: int) -> list[int]:
    """Indices of the top-k most similar rows, excluding the anchor itself.

    `k` is clamped to the number of other rows, so the anchor is never
    returned even when `k >= len(sims)`; `k <= 0` yields an empty list.
    The result is ordered by descending similarity. `sims` is not modified.
    """
    k = min(k, len(sims) - 1)
    if k <= 0:
        return []
    sims = sims.copy()
    sims[self_idx] = -np.inf
    # Partial selection of the k largest values, then sort just those k.
    part = np.argpartition(sims, -k)[-k:]
    order = part[np.argsort(sims[part])[::-1]]
    return [int(i) for i in order]


# ------------------------------------------------------------------ #
# Stage A — corpus-wide sweep: which words behave most anomalously?
# ------------------------------------------------------------------ #

def sweep_anomalous_words(
    w2v: "Word2VecEngine",
    background: "BackgroundModel",
    min_count: int = 5,
    max_vocab: int = 3000,
    neighbours: int = 25,
    top_n: int = 30,
    preview: int = 6,
) -> dict:
    """
    Rank words by how differently they associate here vs. in general English.

    For each shared word W we compute its top-`neighbours` in each space and
    score `shift = 1 - overlap@k`, where overlap@k is the fraction of the k
    corpus neighbours that are also among the k background neighbours.
    Overlap is scale-free — no cross-space cosine arithmetic. We then z-score
    `shift` across the whole vocabulary so flagging adapts to the corpus
    instead of using a magic threshold.

    Args:
        min_count, max_vocab: passed through to the shared-space build.
        neighbours: neighbourhood size k (must be >= 1).
        top_n: number of highest-shift words to return.
        preview: how many neighbour words to list per result.

    Returns:
        A dict with `results` (one entry per returned word) plus the summary
        fields `vocab_size`, `neighbours`, `shift_mean` and `shift_std`. When
        the shared vocabulary has fewer than `neighbours + 2` words, a short
        dict with empty `results` and an explanatory `note` is returned.

    Raises:
        ValueError: if `neighbours` is smaller than 1.
    """
    if neighbours < 1:
        # k == 0 would divide by zero below; negative k is meaningless.
        raise ValueError("neighbours must be >= 1")

    space = get_shared_space(w2v, background, min_count, max_vocab)
    n = len(space)
    if n < neighbours + 2:
        return {"ready": True, "vocab_size": n, "results": [],
                "note": "Shared vocabulary too small to compute neighbourhoods."}

    C, B = space.C, space.B
    k = min(neighbours, n - 1)

    shifts = np.zeros(n, dtype=np.float32)
    corpus_nbrs: list[list[int]] = []
    bg_nbrs: list[list[int]] = []
    # One row at a time keeps memory at O(n) instead of an n-by-n matrix.
    for i in range(n):
        cn = _topk_neighbours(C @ C[i], i, k)
        bn = _topk_neighbours(B @ B[i], i, k)
        corpus_nbrs.append(cn)
        bg_nbrs.append(bn)
        overlap = len(set(cn) & set(bn)) / k
        shifts[i] = 1.0 - overlap

    mean, std = float(shifts.mean()), float(shifts.std())
    std = std if std > 1e-9 else 1e-9  # avoid dividing by zero when all shifts are equal

    order = np.argsort(shifts)[::-1][:top_n]
    results = []
    for i in order:
        i = int(i)
        bg_set = set(bg_nbrs[i])
        # Corpus neighbours that are NOT normal neighbours = the surprising ties.
        surprising = [space.words[j] for j in corpus_nbrs[i] if j not in bg_set][:preview]
        normal = [space.words[j] for j in bg_nbrs[i]][:preview]
        results.append({
            "word": space.words[i],
            "corpus_frequency": int(space.counts[i]),
            "shift": round(float(shifts[i]), 4),
            "z_score": round((float(shifts[i]) - mean) / std, 3),
            "surprising_neighbors": surprising,   # strong here, absent normally
            "normal_neighbors": normal,           # what's normal in English
        })

    return {
        "ready": True,
        "vocab_size": n,
        "neighbours": k,
        "shift_mean": round(mean, 4),
        "shift_std": round(std, 4),
        "results": results,
    }


# ------------------------------------------------------------------ #
# Stage B — per-relation surprise for a single flagged word.
# ------------------------------------------------------------------ #

def relation_surprise(
    word: str,
    w2v: "Word2VecEngine",
    background: "BackgroundModel",
    min_count: int = 5,
    max_vocab: int = 3000,
    top_k: int = 15,
) -> dict:
    """
    For a single word, the neighbours that are strong in-corpus but weak/absent
    in general English — the concrete "pizza -> fitness" rows.

    We standardise each space's similarities to the anchor word into z-scores
    (dimensionless, relative to the same anchor), then
    `surprise = corpus_z - background_z`. Subtracting two within-space z-scores
    IS valid, unlike subtracting raw cross-space cosines.

    Args:
        word: the anchor word; it is lower-cased and stripped before lookup.
        min_count, max_vocab: passed through to the shared-space build.
        top_k: maximum number of rows in `relations` and `normal_neighbors`.
            It is clamped to the number of *other* shared words, so the anchor
            itself never appears in either list.

    Returns:
        When the word is in the shared space: `found=True` with `relations`
        (highest surprise first) and `normal_neighbors` (highest background
        similarity first). Otherwise `found=False` with a `reason` string and
        an empty `relations` list.
    """
    word = word.lower().strip()
    space = get_shared_space(w2v, background, min_count, max_vocab)

    if word not in space.index:
        in_corpus = word in w2v.model.wv
        in_bg = background.has(word)
        if in_corpus and not in_bg:
            reason = ("not a common English word (absent from the background model), "
                      "so it's treated as domain vocabulary — a name/jargon, not a code-word candidate")
        elif not in_corpus:
            reason = "not in the corpus vocabulary (or too rare)"
        elif not _is_candidate_word(word):
            # Present in both models but filtered out before the frequency check.
            reason = "excluded as a stopword, a very short token, or a non-alphabetic word"
        else:
            reason = ("below the minimum corpus frequency "
                      "(or outside the max_vocab most frequent shared words)")
        return {"word": word, "ready": True, "found": False, "reason": reason, "relations": []}

    i = space.index[word]
    c_sims = space.C @ space.C[i]
    b_sims = space.B @ space.B[i]

    def zscore(sims: np.ndarray) -> np.ndarray:
        # Standardise within one space; the epsilon guards a constant vector.
        m, s = sims.mean(), sims.std()
        return (sims - m) / (s if s > 1e-9 else 1e-9)

    c_z, b_z = zscore(c_sims), zscore(b_sims)
    surprise = c_z - b_z
    surprise[i] = -np.inf  # exclude self: sorts last in descending order

    # Never ask for more rows than there are other words, otherwise the anchor
    # (surprise == -inf) would be emitted as a relation.
    limit = max(0, min(top_k, len(space) - 1))
    order = np.argsort(surprise)[::-1][:limit]
    relations = [{
        "neighbor": space.words[j],
        "corpus_sim": round(float(c_sims[j]), 4),       # raw cosines: display only
        "background_sim": round(float(b_sims[j]), 4),
        "corpus_z": round(float(c_z[j]), 3),
        "background_z": round(float(b_z[j]), 3),
        "surprise": round(float(surprise[j]), 3),
    } for j in (int(x) for x in order)]

    # Contrast: what this word's NORMAL neighbours are, in general English.
    normal_order = _topk_neighbours(b_sims, i, limit)
    normal = [{"neighbor": space.words[j], "background_sim": round(float(b_sims[j]), 4)}
              for j in normal_order]

    return {
        "word": word,
        "ready": True,
        "found": True,
        "corpus_frequency": int(space.counts[i]),
        "relations": relations,
        "normal_neighbors": normal,
    }


# ------------------------------------------------------------------ #
# Stage C — contextual incongruence (the "zoom in"), via the transformer.
# ------------------------------------------------------------------ #

def _extract_entities(text: str, limit: int = 8) -> list[str]:
    """Capitalised name-like spans and dates, for the investigation view.

    Entity spans come first, then dates, each in order of appearance;
    duplicates are dropped case-insensitively and at most `limit` items are
    returned.
    """
    found: list[str] = []
    seen: set[str] = set()
    for m in _ENTITY_RE.finditer(text):
        token = m.group().strip()
        # Skip single sentence-initial capitalised stopwords ("The", "And", ...)
        if " " not in token and token.lower() in _STOPWORDS:
            continue
        if token.lower() not in seen:
            seen.add(token.lower())
            found.append(token)
    for m in _DATE_RE.finditer(text):
        d = m.group()
        if d.lower() not in seen:
            seen.add(d.lower())
            found.append(d)
    return found[:limit]


def _chunk_rows(chunks, contexts) -> list[int]:
    """Position in `chunks` of each context's chunk, found in one pass.

    Chunks are matched by object identity first; a context holding an equal
    copy rather than the same object falls back to `chunks.index`, which
    raises ValueError when the chunk is missing altogether.
    """
    by_identity: dict[int, int] = {}
    for row, chunk in enumerate(chunks):
        by_identity.setdefault(id(chunk), row)  # keep the first position
    rows = []
    for ctx in contexts:
        row = by_identity.get(id(ctx.chunk))
        rows.append(chunks.index(ctx.chunk) if row is None else row)
    return rows


def contextual_incongruence(
    engine: "ContextualSimilarityEngine",
    keyword: str,
    canonical_meaning: Optional[str] = None,
    top_k: int = 10,
) -> dict:
    """
    Find the occurrences where `keyword` is used most unlike its norm.

    Reference meaning is either:
      - `canonical_meaning` (a gloss you supply, e.g. "pizza, an Italian food"), or
      - the centroid of all the keyword's occurrence embeddings (its typical
        usage in THIS corpus) when no gloss is given or it is blank.

    Per occurrence: incongruence = 1 - cos(chunk_embedding, reference).
    The highest-incongruence chunks are the candidate coded usages — returned
    with doc/snippet and co-occurring entities so you can read them directly.

    Returns:
        A dict with `total_occurrences` and `occurrences` (at most `top_k`,
        most incongruent first). When the keyword occurs at least once it also
        carries `reference`, `reference_kind` ("gloss" or "centroid") and
        `median_incongruence` over all occurrences.
    """
    engine._ensure_index()
    contexts = engine.find_keyword_contexts(keyword)
    if not contexts:
        return {"keyword": keyword, "total_occurrences": 0, "occurrences": []}

    chunk_indices = _chunk_rows(engine.chunks, contexts)
    # Rows are expected to be L2-normalised by the engine's index build; the
    # dot product below is a cosine only under that assumption.
    embeds = engine.embeddings[chunk_indices]

    if canonical_meaning and canonical_meaning.strip():
        ref = engine.model.encode([canonical_meaning], normalize_embeddings=True,
                                  convert_to_numpy=True)[0].astype(np.float32)
        ref_label = canonical_meaning.strip()
        ref_kind = "gloss"
    else:
        centroid = embeds.mean(axis=0)
        norm = np.linalg.norm(centroid)
        ref = (centroid / (norm if norm > 0 else 1e-12)).astype(np.float32)
        ref_label = "corpus-typical usage (centroid of all occurrences)"
        ref_kind = "centroid"

    sims = embeds @ ref                       # cosine (both sides unit-norm)
    incong = 1.0 - sims
    median_incong = float(np.median(incong))

    order = np.argsort(incong)[::-1][:top_k]
    occurrences = []
    for j in (int(x) for x in order):
        ctx = contexts[j]
        text = ctx.chunk.text
        if ctx.highlight_positions:
            # Show up to 120 characters either side of the first highlighted span.
            start, end = ctx.highlight_positions[0]
            s, e = max(0, start - 120), min(len(text), end + 120)
            snippet = ("..." if s > 0 else "") + text[s:e].strip() + ("..." if e < len(text) else "")
        else:
            snippet = text[:240]
        occurrences.append({
            "doc_id": ctx.chunk.doc_id,
            "chunk_index": ctx.chunk.chunk_index,
            "incongruence": round(float(incong[j]), 4),
            "snippet": snippet,
            "entities": _extract_entities(text),
        })

    return {
        "keyword": keyword,
        "total_occurrences": len(contexts),
        "reference": ref_label,
        "reference_kind": ref_kind,
        "median_incongruence": round(median_incong, 4),
        "occurrences": occurrences,
    }