esfiles / anomaly.py
Besjon Cifliku
feat: implement anomaly detection to filter suspicious word relations
9c3ade2
Raw
History Blame Contribute Delete
15.5 kB
"""
Anomalous-relation detection — finding "code word" candidates.
A code word is a *common English word used uncommonly in this corpus*. We make
that precise by contrasting two distributional word spaces:
- the corpus Word2Vec (how words associate IN these documents), and
- a pretrained general-English model (how they associate NORMALLY).
The whole design avoids two traps:
1. Raw cosines from different embedding spaces are NOT comparable, so we never
subtract a corpus cosine from a background cosine. We compare neighbour
*sets* (scale-free) and, when we need per-neighbour scores, we standardise
within each space (z-scores relative to the same anchor word) before
combining — that subtraction *is* legitimate.
2. "Low similarity" is the default for almost all word pairs and is not a
signal. The signal is *surprise*: strong here, weak normally.
Three stages:
A. sweep — rank which words behave most differently here vs. normally
(neighbour-set divergence, z-scored across the vocabulary).
B. relations — for one flagged word, the specific neighbours that are
strong in-corpus but weak/absent in general English.
C. incongruence — uses the transformer to find the specific occurrences
(chunks/docs) where a keyword is used unlike its norm.
Stages A/B need the corpus Word2Vec + background model. Stage C needs the
transformer engine (contextual embeddings), which is the only one of the three
models that can judge a single *occurrence* in context.
"""
import logging
import re
import threading
from typing import Optional
import numpy as np
from contextual_similarity import ContextualSimilarityEngine
from word2vec_baseline import Word2VecEngine
from background_model import BackgroundModel
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Reuse the engine's stopword list so the three tools agree on what to ignore.
_STOPWORDS = ContextualSimilarityEngine._STOPWORDS
# Shape of a candidate word: lowercase ASCII letters only, anchored at both ends
# (so digits, hyphens, apostrophes and capitalised tokens do not match).
_ALPHA = re.compile(r"^[a-z]+$")
# Capitalised spans (names/orgs) and dates, for the Stage-C investigation view.
# _ENTITY_RE: one capitalised word followed by up to three more capitalised
# words, separated by whitespace (e.g. "John Smith", "New York Stock Exchange").
_ENTITY_RE = re.compile(r"\b[A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+){0,3}\b")
# _DATE_RE alternatives, in order:
#   1. numeric dates with "/" or "-" separators (e.g. 3/14/2020, 14-03-20);
#   2. a month abbreviation (optionally spelled out / followed by ".") and a
#      one- or two-digit day (e.g. "Jan 5", "September 12");
#   3. any bare four-digit number — intended for years, but it will match any
#      standalone four-digit number.
_DATE_RE = re.compile(
r"\b(?:\d{1,2}[/-]\d{1,2}[/-]\d{2,4}"
r"|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\.?\s+\d{1,2}"
r"|\d{4})\b"
)
def _is_candidate_word(word: str) -> bool:
    """
    Decide whether `word` may enter the shared vocabulary.

    A candidate is at least three characters long, consists only of lowercase
    ASCII letters, and is not in the shared stopword list.
    """
    if len(word) < 3:
        return False
    if _ALPHA.match(word) is None:
        return False
    return word not in _STOPWORDS
def _normalize_rows(mat: np.ndarray) -> np.ndarray:
norms = np.linalg.norm(mat, axis=1, keepdims=True)
norms[norms == 0] = 1e-12
return (mat / norms).astype(np.float32)
class SharedSpace:
    """
    Words present in BOTH the corpus Word2Vec and the background model, together
    with their row-normalised vectors in each space.

    Only shared words are kept so that neighbour-set overlap is fair (both
    spaces rank the *same* candidate set) and so that names/jargon — absent from
    the background model — are excluded: those are domain vocabulary, not code
    words.

    Row `r` of `C`, `B` and `counts` all describe `words[r]`.
    """

    def __init__(self, words: list[str], corpus_vecs: np.ndarray,
                 bg_vecs: np.ndarray, corpus_counts: np.ndarray):
        self.words = words
        # word -> row number, for O(1) lookup of an anchor word.
        self.index = {token: row for row, token in enumerate(words)}
        # Unit-length rows: C is (n, d_corpus), B is (n, d_bg).
        self.C, self.B = (_normalize_rows(vecs) for vecs in (corpus_vecs, bg_vecs))
        # Corpus frequency of each word, aligned with `words`.
        self.counts = corpus_counts

    def __len__(self) -> int:
        """Number of words in the shared vocabulary."""
        return len(self.words)
def build_shared_space(
    w2v: Word2VecEngine,
    background: BackgroundModel,
    min_count: int = 5,
    max_vocab: int = 3000,
) -> SharedSpace:
    """
    Build the shared-vocabulary space.

    A word is kept when it passes `_is_candidate_word`, occurs at least
    `min_count` times in the corpus, and is known to the background model.
    The survivors are ordered by descending corpus frequency and truncated to
    `max_vocab`.

    Args:
        w2v: corpus Word2Vec engine; its `model.wv` supplies vocabulary,
            per-word counts and vectors.
        background: general-English model; `has(word)` gates membership and
            `kv[word]` supplies the vector.
        min_count: ignore corpus words rarer than this (their vectors are noise).
        max_vocab: cap to the N most frequent shared words for tractability.

    Returns:
        A `SharedSpace`. When no word qualifies, the space is empty (length 0)
        rather than an error, so callers can report "vocabulary too small" or
        "word not found" instead of crashing.
    """
    wv = w2v.model.wv

    # Collect candidate words: frequent enough in the corpus, common in English.
    candidates: list[tuple[str, int]] = []
    for word in wv.index_to_key:
        if not _is_candidate_word(word):
            continue
        count = wv.get_vecattr(word, "count")
        if count < min_count:
            continue
        if not background.has(word):
            continue
        candidates.append((word, count))

    # Keep the most frequent ones (their corpus vectors are the most reliable).
    candidates.sort(key=lambda x: -x[1])
    if len(candidates) > max_vocab:
        logger.info("Shared vocab capped: %d candidates -> top %d by corpus frequency",
                    len(candidates), max_vocab)
        candidates = candidates[:max_vocab]

    words = [w for w, _ in candidates]
    counts = np.array([c for _, c in candidates], dtype=np.int64)
    if words:
        corpus_vecs = np.array([wv[w] for w in words], dtype=np.float32)
        bg_vecs = np.array([background.kv[w] for w in words], dtype=np.float32)
    else:
        # np.array([]) is 1-D, and row normalisation (norm over axis=1) raises
        # on 1-D input. Build explicit (0, dim) matrices so an empty vocabulary
        # flows through as an empty space instead of an exception.
        corpus_dim = int(getattr(wv, "vector_size", 0))
        bg_dim = int(getattr(background.kv, "vector_size", 0))
        corpus_vecs = np.zeros((0, corpus_dim), dtype=np.float32)
        bg_vecs = np.zeros((0, bg_dim), dtype=np.float32)
        logger.warning("Shared space is empty: no corpus word passed the filters "
                       "(count>=%d, present in background)", min_count)

    logger.info("Shared space built: %d words (corpus∩background, count>=%d)", len(words), min_count)
    return SharedSpace(words, corpus_vecs, bg_vecs, counts)
# Single-slot, module-level cache: building the shared space is expensive, so
# the last one built is reused across requests until the corpus Word2Vec, the
# background model or the build parameters change.
_cache_lock = threading.Lock()
_cached: dict = {"key": None, "space": None}


def get_shared_space(w2v: Word2VecEngine, background: BackgroundModel,
                     min_count: int, max_vocab: int) -> SharedSpace:
    """
    Return the shared space for these inputs, building it only on a cache miss.

    The cache key combines the identity of the Word2Vec model object, its
    vocabulary size, the background model's name and the two build parameters.
    The whole lookup-or-build runs under `_cache_lock`, so concurrent callers
    wait for a single build rather than each building their own.
    """
    model = w2v.model
    fingerprint = (id(model), len(model.wv), background.model_name, min_count, max_vocab)
    with _cache_lock:
        if _cached["space"] is None or _cached["key"] != fingerprint:
            fresh = build_shared_space(w2v, background, min_count, max_vocab)
            _cached["key"] = fingerprint
            _cached["space"] = fresh
        return _cached["space"]
def _topk_neighbours(sims: np.ndarray, self_idx: int, k: int) -> list[int]:
"""Indices of the top-k most similar rows, excluding the anchor itself."""
sims = sims.copy()
sims[self_idx] = -np.inf
if k >= len(sims):
order = np.argsort(sims)[::-1]
else:
part = np.argpartition(sims, -k)[-k:]
order = part[np.argsort(sims[part])[::-1]]
return [int(i) for i in order]
# ------------------------------------------------------------------ #
# Stage A — corpus-wide sweep: which words behave most anomalously?
# ------------------------------------------------------------------ #
def sweep_anomalous_words(
    w2v: Word2VecEngine,
    background: BackgroundModel,
    min_count: int = 5,
    max_vocab: int = 3000,
    neighbours: int = 25,
    top_n: int = 30,
    preview: int = 6,
) -> dict:
    """
    Rank words by how differently they associate here vs. in general English.

    For each shared word W we compute its top-`neighbours` in each space and
    score `shift = 1 - overlap@k`, where overlap@k is the fraction of the k
    corpus neighbours that are also among the k background neighbours. Overlap
    is scale-free — no cross-space cosine arithmetic. We then z-score `shift`
    across the whole vocabulary so flagging adapts to the corpus instead of
    using a magic threshold.

    Args:
        w2v: corpus Word2Vec engine.
        background: pretrained general-English model.
        min_count, max_vocab: forwarded to the shared-space builder.
        neighbours: neighbourhood size k; must be >= 1.
        top_n: how many of the highest-shift words to return.
        preview: how many example neighbours to list per returned word.

    Returns:
        A dict with `ready`, `vocab_size` and `results`. When the sweep cannot
        run (k < 1, or too few shared words) `results` is empty and a `note`
        explains why; otherwise `neighbours`, `shift_mean` and `shift_std` are
        included and each result carries the word, its corpus frequency, shift,
        z-score and preview lists of surprising / normal neighbours.
    """
    space = get_shared_space(w2v, background, min_count, max_vocab)
    n = len(space)
    if neighbours < 1:
        # k == 0 would pass the size check below and then divide by zero when
        # computing the overlap fraction.
        return {"ready": True, "vocab_size": n, "results": [],
                "note": "`neighbours` must be at least 1 to compute neighbourhoods."}
    if n < neighbours + 2:
        return {"ready": True, "vocab_size": n, "results": [],
                "note": "Shared vocabulary too small to compute neighbourhoods."}

    # Negative slice bounds would silently drop items from the wrong end.
    top_n = max(top_n, 0)
    preview = max(preview, 0)

    C, B = space.C, space.B
    k = min(neighbours, n - 1)
    shifts = np.zeros(n, dtype=np.float32)
    corpus_nbrs: list[list[int]] = []
    bg_nbrs: list[list[int]] = []
    for i in range(n):
        # Rows are unit-norm, so a matrix-vector product gives cosines to word i.
        cn = _topk_neighbours(C @ C[i], i, k)
        bn = _topk_neighbours(B @ B[i], i, k)
        corpus_nbrs.append(cn)
        bg_nbrs.append(bn)
        overlap = len(set(cn) & set(bn)) / k
        shifts[i] = 1.0 - overlap

    mean, std = float(shifts.mean()), float(shifts.std())
    std = std if std > 1e-9 else 1e-9  # all-equal shifts: avoid dividing by 0

    order = np.argsort(shifts)[::-1][:top_n]
    results = []
    for i in order:
        i = int(i)
        bg_set = set(bg_nbrs[i])
        # Corpus neighbours that are NOT normal neighbours = the surprising ties.
        surprising = [space.words[j] for j in corpus_nbrs[i] if j not in bg_set][:preview]
        normal = [space.words[j] for j in bg_nbrs[i]][:preview]
        results.append({
            "word": space.words[i],
            "corpus_frequency": int(space.counts[i]),
            "shift": round(float(shifts[i]), 4),
            "z_score": round((float(shifts[i]) - mean) / std, 3),
            "surprising_neighbors": surprising,  # strong here, absent normally
            "normal_neighbors": normal,          # what's normal in English
        })

    return {
        "ready": True,
        "vocab_size": n,
        "neighbours": k,
        "shift_mean": round(mean, 4),
        "shift_std": round(std, 4),
        "results": results,
    }
# ------------------------------------------------------------------ #
# Stage B — per-relation surprise for a single flagged word.
# ------------------------------------------------------------------ #
def relation_surprise(
    word: str,
    w2v: Word2VecEngine,
    background: BackgroundModel,
    min_count: int = 5,
    max_vocab: int = 3000,
    top_k: int = 15,
) -> dict:
    """
    For a single word, the neighbours that are strong in-corpus but weak/absent
    in general English — the concrete "pizza -> fitness" rows.

    We standardise each space's similarities to the anchor word into z-scores
    (dimensionless, relative to the same anchor), then `surprise = corpus_z -
    background_z`. Subtracting two within-space z-scores IS valid, unlike
    subtracting raw cross-space cosines.

    Args:
        word: the anchor word; lower-cased and stripped before lookup.
        w2v: corpus Word2Vec engine.
        background: pretrained general-English model.
        min_count, max_vocab: forwarded to the shared-space builder.
        top_k: maximum number of relations / normal neighbours to return.

    Returns:
        If the word is not in the shared space: `found=False`, a `reason`
        string and an empty `relations` list. Otherwise `found=True` with the
        word's corpus frequency, `relations` (highest surprise first) and
        `normal_neighbors` (its closest background-model neighbours). The
        anchor word itself never appears in either list.
    """
    word = word.lower().strip()
    space = get_shared_space(w2v, background, min_count, max_vocab)

    if word not in space.index:
        wv = w2v.model.wv
        in_corpus = word in wv
        in_bg = background.has(word)
        if in_corpus and not in_bg:
            reason = ("not a common English word (absent from the background model), "
                      "so it's treated as domain vocabulary — a name/jargon, not a code-word candidate")
        elif not in_corpus:
            reason = "not in the corpus vocabulary (or too rare)"
        # From here on the word is known to both models, so it was dropped by
        # one of the shared-space filters; report the one that actually applied.
        elif not _is_candidate_word(word):
            reason = ("a stopword, a very short token, or not a plain alphabetic word, "
                      "so it is excluded from the analysis")
        elif wv.get_vecattr(word, "count") < min_count:
            reason = "below the minimum corpus frequency"
        else:
            reason = "outside the most frequent shared words kept for analysis (max_vocab cap)"
        return {"word": word, "ready": True, "found": False, "reason": reason, "relations": []}

    n = len(space)
    i = space.index[word]
    c_sims = space.C @ space.C[i]
    b_sims = space.B @ space.B[i]

    def zscore(sims: np.ndarray) -> np.ndarray:
        m, s = sims.mean(), sims.std()
        return (sims - m) / (s if s > 1e-9 else 1e-9)

    c_z, b_z = zscore(c_sims), zscore(b_sims)
    surprise = c_z - b_z
    surprise[i] = -np.inf  # rank the anchor last

    # At most n - 1 other words exist; never report more than that, and never
    # a negative amount.
    k = max(0, min(top_k, n - 1))
    ranked = np.argsort(surprise)[::-1]
    # Drop the anchor explicitly: with top_k >= n it would otherwise be listed
    # as its own relation with surprise = -inf (which is not valid JSON).
    order = ranked[ranked != i][:k]

    relations = [{
        "neighbor": space.words[j],
        "corpus_sim": round(float(c_sims[j]), 4),  # raw cosines: display only
        "background_sim": round(float(b_sims[j]), 4),
        "corpus_z": round(float(c_z[j]), 3),
        "background_z": round(float(b_z[j]), 3),
        "surprise": round(float(surprise[j]), 3),
    } for j in (int(x) for x in order)]

    # Contrast: what this word's NORMAL neighbours are, in general English.
    # Skip the lookup when k == 0 so nothing is requested for "zero neighbours".
    normal_order = _topk_neighbours(b_sims, i, k) if k > 0 else []
    normal = [{"neighbor": space.words[j], "background_sim": round(float(b_sims[j]), 4)}
              for j in normal_order]

    return {
        "word": word,
        "ready": True,
        "found": True,
        "corpus_frequency": int(space.counts[i]),
        "relations": relations,
        "normal_neighbors": normal,
    }
# ------------------------------------------------------------------ #
# Stage C — contextual incongruence (the "zoom in"), via the transformer.
# ------------------------------------------------------------------ #
def _extract_entities(text: str, limit: int = 8) -> list[str]:
    """
    Collect name-like capitalised spans, then dates, from `text`.

    Matches are de-duplicated case-insensitively (first spelling wins) and
    returned in order of discovery — all entity spans before any date — cut to
    the first `limit` items.
    """
    ordered: list[str] = []
    seen_keys: set[str] = set()

    for match in _ENTITY_RE.finditer(text):
        span = match.group().strip()
        key = span.lower()
        # Skip single sentence-initial capitalised stopwords ("The", "And", ...)
        is_single_word = " " not in span
        if is_single_word and key in _STOPWORDS:
            continue
        if key in seen_keys:
            continue
        seen_keys.add(key)
        ordered.append(span)

    for match in _DATE_RE.finditer(text):
        date_text = match.group()
        key = date_text.lower()
        if key in seen_keys:
            continue
        seen_keys.add(key)
        ordered.append(date_text)

    return ordered[:limit]
def contextual_incongruence(
    engine: ContextualSimilarityEngine,
    keyword: str,
    canonical_meaning: Optional[str] = None,
    top_k: int = 10,
) -> dict:
    """
    Rank the occurrences of `keyword` by how far they sit from a reference usage.

    The reference vector is one of:
      - the encoding of `canonical_meaning`, when a non-blank gloss is supplied
        (e.g. "pizza, an Italian food"); or
      - the re-normalised mean of the embeddings of every chunk containing the
        keyword, i.e. its typical usage in THIS corpus.

    Each occurrence scores `incongruence = 1 - cos(chunk_embedding, reference)`.
    The `top_k` highest-scoring chunks are returned with their document id,
    chunk index, a text snippet around the first highlight, and co-occurring
    entities, alongside the median incongruence over all occurrences.
    """
    engine._ensure_index()
    contexts = engine.find_keyword_contexts(keyword)
    if not contexts:
        return {"keyword": keyword, "total_occurrences": 0, "occurrences": []}

    # Map every context back to its row in the engine's embedding matrix.
    rows = [engine.chunks.index(ctx.chunk) for ctx in contexts]
    embeds = engine.embeddings[rows]  # rows are L2-normalised (build_index)

    has_gloss = bool(canonical_meaning and canonical_meaning.strip())
    if has_gloss:
        encoded = engine.model.encode([canonical_meaning], normalize_embeddings=True,
                                      convert_to_numpy=True)
        ref = encoded[0].astype(np.float32)
        ref_label = canonical_meaning.strip()
        ref_kind = "gloss"
    else:
        centroid = embeds.mean(axis=0)
        norm = np.linalg.norm(centroid)
        divisor = norm if norm > 0 else 1e-12  # guard a degenerate zero centroid
        ref = (centroid / divisor).astype(np.float32)
        ref_label = "corpus-typical usage (centroid of all occurrences)"
        ref_kind = "centroid"

    incong = 1.0 - embeds @ ref  # cosine distance (both sides unit-norm)
    median_incong = float(np.median(incong))
    most_incongruent = np.argsort(incong)[::-1][:top_k]

    def _snippet_for(ctx) -> str:
        """Up to 120 characters either side of the first highlight, else the chunk's first 240."""
        text = ctx.chunk.text
        if not ctx.highlight_positions:
            return text[:240]
        start, end = ctx.highlight_positions[0]
        lo = max(0, start - 120)
        hi = min(len(text), end + 120)
        lead = "..." if lo > 0 else ""
        tail = "..." if hi < len(text) else ""
        return lead + text[lo:hi].strip() + tail

    occurrences = []
    for pos in map(int, most_incongruent):
        ctx = contexts[pos]
        snippet = _snippet_for(ctx)
        occurrences.append({
            "doc_id": ctx.chunk.doc_id,
            "chunk_index": ctx.chunk.chunk_index,
            "incongruence": round(float(incong[pos]), 4),
            "snippet": snippet,
            "entities": _extract_entities(ctx.chunk.text),
        })

    return {
        "keyword": keyword,
        "total_occurrences": len(contexts),
        "reference": ref_label,
        "reference_kind": ref_kind,
        "median_incongruence": round(median_incong, 4),
        "occurrences": occurrences,
    }