| """ |
| Anomalous-relation detection — finding "code word" candidates. |
| |
| A code word is a *common English word used uncommonly in this corpus*. We make |
| that precise by contrasting two distributional word spaces: |
| |
| - the corpus Word2Vec (how words associate IN these documents), and |
| - a pretrained general-English model (how they associate NORMALLY). |
| |
| The whole design avoids two traps: |
| 1. Raw cosines from different embedding spaces are NOT comparable, so we never |
| subtract a corpus cosine from a background cosine. We compare neighbour |
| *sets* (scale-free) and, when we need per-neighbour scores, we standardise |
| within each space (z-scores relative to the same anchor word) before |
| combining — that subtraction *is* legitimate. |
| 2. "Low similarity" is the default for almost all word pairs and is not a |
| signal. The signal is *surprise*: strong here, weak normally. |
| |
| Three stages: |
| A. sweep — rank which words behave most differently here vs. normally |
| (neighbour-set divergence, z-scored across the vocabulary). |
| B. relations — for one flagged word, the specific neighbours that are |
| strong in-corpus but weak/absent in general English. |
| C. incongruence — uses the transformer to find the specific occurrences |
| (chunks/docs) where a keyword is used unlike its norm. |
| |
| Stages A/B need the corpus Word2Vec + background model. Stage C needs the |
| transformer engine (contextual embeddings), which is the only one of the three |
| models that can judge a single *occurrence* in context. |
| """ |
|
|
| import logging |
| import re |
| import threading |
| from typing import Optional |
|
|
| import numpy as np |
|
|
| from contextual_similarity import ContextualSimilarityEngine |
| from word2vec_baseline import Word2VecEngine |
| from background_model import BackgroundModel |
|
|
logger = logging.getLogger(__name__)


# Reuse the contextual engine's stopword collection so every stage filters the
# same function words.
_STOPWORDS = ContextualSimilarityEngine._STOPWORDS
# A token made only of lowercase ASCII letters a-z (no digits, hyphens, accents).
_ALPHA = re.compile(r"^[a-z]+$")


# Name-like span: one capitalised word of 2+ letters, optionally followed by up
# to three more capitalised words separated by whitespace.
_ENTITY_RE = re.compile(r"\b[A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+){0,3}\b")
# Date-like span, any of:
#   - numeric d/m/y with "/" or "-" separators (year of 2 to 4 digits),
#   - a month name or abbreviation (optional trailing "."), whitespace, 1-2 digit day,
#   - any bare 4-digit number (treated as a year).
_DATE_RE = re.compile(
    r"\b(?:\d{1,2}[/-]\d{1,2}[/-]\d{2,4}"
    r"|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\.?\s+\d{1,2}"
    r"|\d{4})\b"
)
|
|
|
|
def _is_candidate_word(word: str) -> bool:
    """Return True when `word` may enter the shared vocabulary.

    A candidate has at least three characters, consists only of lowercase
    ASCII letters (per `_ALPHA`), and is not listed in `_STOPWORDS`.
    """
    # Cheapest test first: very short tokens are never candidates.
    if len(word) < 3:
        return False
    # Rejects anything with digits, punctuation, uppercase or non-ASCII letters.
    if _ALPHA.match(word) is None:
        return False
    return word not in _STOPWORDS
|
|
|
|
| def _normalize_rows(mat: np.ndarray) -> np.ndarray: |
| norms = np.linalg.norm(mat, axis=1, keepdims=True) |
| norms[norms == 0] = 1e-12 |
| return (mat / norms).astype(np.float32) |
|
|
|
|
class SharedSpace:
    """
    Words that exist in BOTH the corpus Word2Vec and the background model,
    together with their row-normalised vectors in each space.

    Both spaces are indexed identically (row i belongs to `words[i]`), so
    neighbour rankings in the two spaces are drawn from the same candidate
    set. Words missing from the background model (names, jargon) never get
    in: they are domain vocabulary rather than code-word candidates.

    Attributes:
        words:  the shared vocabulary, in row order.
        index:  word -> row position.
        C:      unit-length corpus vectors, one row per word.
        B:      unit-length background vectors, one row per word.
        counts: corpus frequency of each word, aligned with `words`.
    """

    def __init__(self, words: list[str], corpus_vecs: np.ndarray,
                 bg_vecs: np.ndarray, corpus_counts: np.ndarray):
        self.words = words
        self.counts = corpus_counts
        # Reverse lookup so callers can go from a word to its matrix row.
        self.index = dict(zip(words, range(len(words))))
        # Unit-length rows turn a plain dot product into a cosine similarity.
        self.C = _normalize_rows(corpus_vecs)
        self.B = _normalize_rows(bg_vecs)

    def __len__(self) -> int:
        """Number of words in the shared vocabulary."""
        return len(self.words)
|
|
|
|
def build_shared_space(
    w2v: Word2VecEngine,
    background: BackgroundModel,
    min_count: int = 5,
    max_vocab: int = 3000,
) -> SharedSpace:
    """
    Assemble the vocabulary shared by the corpus model and the background model.

    A corpus word is kept when it passes `_is_candidate_word`, occurs at least
    `min_count` times in the corpus, and is known to the background model.

    Args:
        w2v: corpus Word2Vec engine; its `model.wv` supplies words, counts and vectors.
        background: general-English model; `has()` tests membership, `kv[...]`
            yields the vector.
        min_count: ignore corpus words rarer than this (their vectors are noise).
        max_vocab: cap to the N most frequent shared words for tractability.

    Returns:
        A `SharedSpace` whose rows are ordered by descending corpus frequency.
    """
    wv = w2v.model.wv

    eligible: list[tuple[str, int]] = []
    for token in wv.index_to_key:
        if not _is_candidate_word(token):
            continue
        freq = wv.get_vecattr(token, "count")
        # Frequency is checked before the background lookup, as the cheaper test.
        if freq >= min_count and background.has(token):
            eligible.append((token, freq))

    # Most frequent first; the sort is stable, so ties keep vocabulary order.
    eligible.sort(key=lambda pair: pair[1], reverse=True)
    total = len(eligible)
    if total > max_vocab:
        logger.info("Shared vocab capped: %d candidates -> top %d by corpus frequency",
                    total, max_vocab)
        eligible = eligible[:max_vocab]

    words = [token for token, _ in eligible]
    counts = np.array([freq for _, freq in eligible], dtype=np.int64)
    # Stack per-word vectors row by row, in the same order as `words`.
    corpus_vecs = np.array([wv[token] for token in words], dtype=np.float32)
    bg_vecs = np.array([background.kv[token] for token in words], dtype=np.float32)
    logger.info("Shared space built: %d words (corpus∩background, count>=%d)", len(words), min_count)
    return SharedSpace(words, corpus_vecs, bg_vecs, counts)
|
|
|
|
| |
| |
# Single-slot cache: only the most recently built shared space is kept, and the
# lock serialises both the lookup and the (potentially slow) rebuild.
_cache_lock = threading.Lock()
_cached: dict = {"key": None, "space": None}


def get_shared_space(w2v: Word2VecEngine, background: BackgroundModel,
                     min_count: int, max_vocab: int) -> SharedSpace:
    """
    Return the shared space for these models/settings, building it on a miss.

    The cache key combines the identity and vocabulary size of the corpus
    model, the background model's name, and both filtering parameters; a
    change in any of them triggers a rebuild that replaces the cached entry.
    """
    # NOTE(review): id() values can be recycled once a model is garbage
    # collected; the vocabulary size in the key is the only extra guard.
    wanted = (
        id(w2v.model),
        len(w2v.model.wv),
        background.model_name,
        min_count,
        max_vocab,
    )
    with _cache_lock:
        hit = _cached["space"]
        if hit is None or _cached["key"] != wanted:
            hit = build_shared_space(w2v, background, min_count, max_vocab)
            _cached["key"] = wanted
            _cached["space"] = hit
        return hit
|
|
|
|
| def _topk_neighbours(sims: np.ndarray, self_idx: int, k: int) -> list[int]: |
| """Indices of the top-k most similar rows, excluding the anchor itself.""" |
| sims = sims.copy() |
| sims[self_idx] = -np.inf |
| if k >= len(sims): |
| order = np.argsort(sims)[::-1] |
| else: |
| part = np.argpartition(sims, -k)[-k:] |
| order = part[np.argsort(sims[part])[::-1]] |
| return [int(i) for i in order] |
|
|
|
|
| |
| |
| |
|
|
def sweep_anomalous_words(
    w2v: Word2VecEngine,
    background: BackgroundModel,
    min_count: int = 5,
    max_vocab: int = 3000,
    neighbours: int = 25,
    top_n: int = 30,
    preview: int = 6,
) -> dict:
    """
    Rank words by how differently they associate here vs. in general English.

    For each shared word W we compute its top-`neighbours` in each space and
    score `shift = 1 - overlap@k`, where overlap@k is the fraction of the k
    corpus neighbours that are also among the k background neighbours
    (|A ∩ B| / k). Overlap is scale-free — no cross-space cosine arithmetic.
    The `z_score` of each result standardises `shift` against the whole shared
    vocabulary, so flagging adapts to the corpus instead of using a magic
    threshold.

    Args:
        w2v: corpus Word2Vec engine.
        background: pretrained general-English model.
        min_count: minimum corpus frequency for a word to be considered.
        max_vocab: cap on the shared vocabulary size.
        neighbours: neighbourhood size k; must be at least 1.
        top_n: how many of the most-shifted words to return (negative = 0).
        preview: how many neighbour words to list per result (negative = 0).

    Returns:
        A dict with `ready`, `vocab_size` and `results`. When the shared
        vocabulary is too small it also carries a `note` and `results` is
        empty; otherwise it carries `neighbours`, `shift_mean`, `shift_std`,
        and one entry per flagged word (most shifted first).

    Raises:
        ValueError: if `neighbours` is less than 1.
    """
    # k == 0 would divide by zero below and a negative k would produce
    # meaningless overlaps, so reject both up front.
    if neighbours < 1:
        raise ValueError(f"neighbours must be >= 1, got {neighbours}")
    # Negative slice bounds would drop items from the END of the lists.
    top_n = max(top_n, 0)
    preview = max(preview, 0)

    space = get_shared_space(w2v, background, min_count, max_vocab)
    n = len(space)
    if n < neighbours + 2:
        return {"ready": True, "vocab_size": n, "results": [],
                "note": "Shared vocabulary too small to compute neighbourhoods."}

    C, B = space.C, space.B
    k = min(neighbours, n - 1)

    shifts = np.zeros(n, dtype=np.float32)
    corpus_nbrs: list[list[int]] = []
    bg_nbrs: list[list[int]] = []
    for i in range(n):
        # Rows are unit-length, so these dot products are cosine similarities.
        cn = _topk_neighbours(C @ C[i], i, k)
        bn = _topk_neighbours(B @ B[i], i, k)
        corpus_nbrs.append(cn)
        bg_nbrs.append(bn)
        overlap = len(set(cn) & set(bn)) / k
        shifts[i] = 1.0 - overlap

    mean, std = float(shifts.mean()), float(shifts.std())
    # Floor the spread so a perfectly uniform vocabulary cannot divide by zero.
    std = std if std > 1e-9 else 1e-9

    order = np.argsort(shifts)[::-1][:top_n]
    results = []
    for i in order:
        i = int(i)
        bg_set = set(bg_nbrs[i])
        # Corpus neighbours that general English does NOT rank in its top-k.
        surprising = [space.words[j] for j in corpus_nbrs[i] if j not in bg_set][:preview]
        normal = [space.words[j] for j in bg_nbrs[i]][:preview]
        results.append({
            "word": space.words[i],
            "corpus_frequency": int(space.counts[i]),
            "shift": round(float(shifts[i]), 4),
            "z_score": round((float(shifts[i]) - mean) / std, 3),
            "surprising_neighbors": surprising,
            "normal_neighbors": normal,
        })

    return {
        "ready": True,
        "vocab_size": n,
        "neighbours": k,
        "shift_mean": round(mean, 4),
        "shift_std": round(std, 4),
        "results": results,
    }
|
|
|
|
| |
| |
| |
|
|
def relation_surprise(
    word: str,
    w2v: Word2VecEngine,
    background: BackgroundModel,
    min_count: int = 5,
    max_vocab: int = 3000,
    top_k: int = 15,
) -> dict:
    """
    For a single word, the neighbours that are strong in-corpus but weak/absent
    in general English — the concrete "pizza -> fitness" rows.

    We standardise each space's similarities to the anchor word into z-scores
    (dimensionless, relative to the same anchor), then `surprise = corpus_z -
    background_z`. Subtracting two within-space z-scores IS valid, unlike
    subtracting raw cross-space cosines.

    Args:
        word: the anchor word; lower-cased and stripped before lookup.
        w2v: corpus Word2Vec engine.
        background: pretrained general-English model.
        min_count: minimum corpus frequency for the shared vocabulary.
        max_vocab: cap on the shared vocabulary size.
        top_k: how many rows to return; clamped to the number of other words
            in the shared vocabulary, and to zero when negative.

    Returns:
        When the word is not in the shared vocabulary: a dict with
        `found=False`, a human-readable `reason`, and empty `relations`.
        Otherwise: `found=True`, the word's `corpus_frequency`, `relations`
        (most surprising first) and `normal_neighbors` (its closest words in
        general English). The anchor itself is never listed.
    """
    word = word.lower().strip()
    space = get_shared_space(w2v, background, min_count, max_vocab)

    if word not in space.index:
        wv = w2v.model.wv
        in_corpus = word in wv
        in_bg = background.has(word)
        # Name the filter that actually excluded the word; several different
        # filters can remove a word that is present in both models.
        if not in_corpus:
            reason = "not in the corpus vocabulary (or too rare)"
        elif not in_bg:
            reason = ("not a common English word (absent from the background model), "
                      "so it's treated as domain vocabulary — a name/jargon, not a code-word candidate")
        elif not _is_candidate_word(word):
            reason = ("a stopword, a very short token, or not a plain lowercase word, "
                      "so it is excluded from the shared vocabulary")
        elif wv.get_vecattr(word, "count") < min_count:
            reason = "below the minimum corpus frequency"
        else:
            reason = "outside the most frequent shared words kept by the max_vocab cap"
        return {"word": word, "ready": True, "found": False, "reason": reason, "relations": []}

    i = space.index[word]
    c_sims = space.C @ space.C[i]
    b_sims = space.B @ space.B[i]

    def zscore(sims: np.ndarray) -> np.ndarray:
        # Standardise within one space; the floor avoids dividing by zero.
        m, s = sims.mean(), sims.std()
        return (sims - m) / (s if s > 1e-9 else 1e-9)

    c_z, b_z = zscore(c_sims), zscore(b_sims)
    surprise = c_z - b_z
    surprise[i] = -np.inf

    # Only len(space) - 1 other words exist; never ask for more, never negative.
    limit = max(0, min(top_k, len(space) - 1))
    ranked = np.argsort(surprise)[::-1]
    # Drop the anchor explicitly so it can never appear as its own neighbour.
    order = ranked[ranked != i][:limit]
    relations = [{
        "neighbor": space.words[j],
        "corpus_sim": round(float(c_sims[j]), 4),
        "background_sim": round(float(b_sims[j]), 4),
        "corpus_z": round(float(c_z[j]), 3),
        "background_z": round(float(b_z[j]), 3),
        "surprise": round(float(surprise[j]), 3),
    } for j in (int(x) for x in order)]

    # What the word normally sits next to, for contrast with `relations`.
    normal_order = _topk_neighbours(b_sims, i, limit) if limit > 0 else []
    normal = [{"neighbor": space.words[j], "background_sim": round(float(b_sims[j]), 4)}
              for j in normal_order]

    return {
        "word": word,
        "ready": True,
        "found": True,
        "corpus_frequency": int(space.counts[i]),
        "relations": relations,
        "normal_neighbors": normal,
    }
|
|
|
|
| |
| |
| |
|
|
def _extract_entities(text: str, limit: int = 8) -> list[str]:
    """Collect name-like spans, then date-like spans, for the investigation view.

    Matches are de-duplicated case-insensitively (the first spelling seen is
    kept) and returned in discovery order: every `_ENTITY_RE` hit before any
    `_DATE_RE` hit. The combined list is cut to `limit` items.
    """
    collected: list[str] = []
    known: set[str] = set()

    def _remember(span: str) -> None:
        # Case-insensitive de-duplication that keeps first-seen order.
        key = span.lower()
        if key not in known:
            known.add(key)
            collected.append(span)

    for match in _ENTITY_RE.finditer(text):
        name = match.group().strip()
        # A lone capitalised stopword is usually just a sentence opener.
        if " " not in name and name.lower() in _STOPWORDS:
            continue
        _remember(name)

    for match in _DATE_RE.finditer(text):
        _remember(match.group())

    return collected[:limit]
|
|
|
|
def contextual_incongruence(
    engine: ContextualSimilarityEngine,
    keyword: str,
    canonical_meaning: Optional[str] = None,
    top_k: int = 10,
) -> dict:
    """
    Surface the occurrences of `keyword` that sit furthest from a reference meaning.

    The reference vector is one of:
      - the encoded `canonical_meaning` gloss, when a non-blank one is given
        (e.g. "pizza, an Italian food"), or
      - the length-normalised mean of the embeddings of all chunks containing
        the keyword, i.e. its typical usage in THIS corpus.

    Each occurrence is scored as `1 - dot(chunk_embedding, reference)`.
    NOTE(review): this equals 1 - cosine only if `engine.embeddings` rows are
    unit-length — presumably they are, but confirm against the engine.

    Returns:
        A dict with the keyword, the total number of occurrences, the
        reference label/kind, the median incongruence, and the `top_k` most
        incongruent occurrences (doc id, chunk index, score, snippet,
        entities). When the keyword never occurs, only `keyword`,
        `total_occurrences` (0) and an empty `occurrences` list are returned.
    """
    engine._ensure_index()
    hits = engine.find_keyword_contexts(keyword)
    if not hits:
        return {"keyword": keyword, "total_occurrences": 0, "occurrences": []}

    # Map every hit back to the row of its chunk in the embedding matrix.
    rows = [engine.chunks.index(hit.chunk) for hit in hits]
    vectors = engine.embeddings[rows]

    gloss = canonical_meaning.strip() if canonical_meaning else ""
    if gloss:
        encoded = engine.model.encode([canonical_meaning], normalize_embeddings=True,
                                      convert_to_numpy=True)
        reference = encoded[0].astype(np.float32)
        label = gloss
        kind = "gloss"
    else:
        mean_vec = vectors.mean(axis=0)
        length = np.linalg.norm(mean_vec)
        # The epsilon keeps a degenerate all-zero centroid from dividing by zero.
        reference = (mean_vec / (length if length > 0 else 1e-12)).astype(np.float32)
        label = "corpus-typical usage (centroid of all occurrences)"
        kind = "centroid"

    scores = 1.0 - vectors @ reference
    median_score = float(np.median(scores))

    def _snippet(body: str, spans) -> str:
        # Roughly 120 characters either side of the first highlight; fall back
        # to the head of the chunk when no highlight position is available.
        if not spans:
            return body[:240]
        start, end = spans[0]
        lo = max(0, start - 120)
        hi = min(len(body), end + 120)
        prefix = "..." if lo > 0 else ""
        suffix = "..." if hi < len(body) else ""
        return prefix + body[lo:hi].strip() + suffix

    # Highest incongruence first.
    ranked = np.argsort(scores)[::-1][:top_k]
    occurrences = []
    for pos in map(int, ranked):
        hit = hits[pos]
        body = hit.chunk.text
        occurrences.append({
            "doc_id": hit.chunk.doc_id,
            "chunk_index": hit.chunk.chunk_index,
            "incongruence": round(float(scores[pos]), 4),
            "snippet": _snippet(body, hit.highlight_positions),
            "entities": _extract_entities(body),
        })

    return {
        "keyword": keyword,
        "total_occurrences": len(hits),
        "reference": label,
        "reference_kind": kind,
        "median_incongruence": round(median_score, 4),
        "occurrences": occurrences,
    }
|
|