"""
semanticmatcher.py
====================
Deterministic semantic string matcher for short strings (10–12 words).
Algorithm: Weighted ensemble of three independent signals
1. Lexical Jaccard - lemmatized token overlap (weight: 0.20)
2. Synonym Jaccard - WordNet-expanded token overlap (weight: 0.25)
3. Semantic Cosine - sentence-transformers embedding similarity (weight: 0.55)
All three layers are fully deterministic: same inputs → same score, always.
Install dependencies:
    pip install nltk numpy sentence-transformers
    python -m nltk.downloader wordnet omw-1.4 stopwords punkt punkt_tab averaged_perceptron_tagger_eng
"""
import re
import string
from functools import lru_cache
import nltk
import numpy as np
from nltk.corpus import wordnet, stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer
# ── Config ────────────────────────────────────────────────────────────────────
WEIGHTS = {
"lexical": 0.20, # Plain lemma overlap
"synonym": 0.25, # WordNet-expanded overlap
"semantic": 0.55, # Embedding cosine similarity
}
MATCH_THRESHOLD = 0.72   # Score ≥ this → strings "mean the same thing"
STRONG_THRESHOLD = 0.88  # Score ≥ this → high-confidence match
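# Example of how a final score is assembled (illustrative layer values):
#   0.20 * 0.40 (lexical) + 0.25 * 0.55 (synonym) + 0.55 * 0.85 (semantic) = 0.685
#   0.685 < MATCH_THRESHOLD, so this pair would be classified "no_match".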
# Embedding model: deterministic, no sampling
_EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
# ── Lazy singletons ───────────────────────────────────────────────────────────
_model: SentenceTransformer | None = None
_lemmatizer: WordNetLemmatizer | None = None
_stop_words: set[str] | None = None
def _get_model() -> SentenceTransformer:
global _model
if _model is None:
_model = SentenceTransformer(_EMBEDDING_MODEL_NAME)
return _model
def _get_lemmatizer() -> WordNetLemmatizer:
global _lemmatizer
if _lemmatizer is None:
_lemmatizer = WordNetLemmatizer()
return _lemmatizer
def _get_stopwords() -> set[str]:
global _stop_words
if _stop_words is None:
_stop_words = set(stopwords.words("english"))
return _stop_words
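# Note: constructing the SentenceTransformer triggers a one-time model download
# and a slow load, so each resource above is created once per process and reused.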
# ── Text preprocessing ────────────────────────────────────────────────────────
def _get_wordnet_pos(treebank_tag: str) -> str:
"""Map POS treebank tag to WordNet POS constant for better lemmatization."""
if treebank_tag.startswith("J"):
return wordnet.ADJ
elif treebank_tag.startswith("V"):
return wordnet.VERB
elif treebank_tag.startswith("R"):
return wordnet.ADV
return wordnet.NOUN
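# e.g. "VBD" -> wordnet.VERB, "JJ" -> wordnet.ADJ; unrecognized tags fall back
# to NOUN, which matches WordNetLemmatizer's own default part of speech.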
def normalize(text: str) -> str:
"""Lowercase, strip punctuation, collapse whitespace."""
text = text.lower()
text = text.translate(str.maketrans("", "", string.punctuation))
text = re.sub(r"\s+", " ", text).strip()
return text
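# Example (illustrative): normalize("  The CAT, sat!! ") -> "the cat sat"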
def tokenize_and_lemmatize(text: str) -> list[str]:
"""Tokenize, POS-tag, lemmatize, and remove stopwords."""
lemmatizer = _get_lemmatizer()
stop_words = _get_stopwords()
tokens = nltk.word_tokenize(normalize(text))
pos_tags = nltk.pos_tag(tokens)
lemmas = [
lemmatizer.lemmatize(word, _get_wordnet_pos(pos))
for word, pos in pos_tags
if word not in stop_words and word.isalpha()
]
return lemmas
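# Example (illustrative; exact output depends on the installed POS tagger):
#   tokenize_and_lemmatize("The cats were sitting") -> ["cat", "sit"]
#   ("the" and "were" are stopwords; "cats" -> "cat", "sitting" -> "sit")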
# ── WordNet synonym expansion ─────────────────────────────────────────────────
@lru_cache(maxsize=512)
def _synonyms(word: str) -> frozenset[str]:
"""Return all WordNet lemma names for a word (including the word itself)."""
syns: set[str] = {word}
for synset in wordnet.synsets(word):
for lemma in synset.lemmas(): # type: ignore
syns.add(lemma.name().replace("_", " ").lower())
return frozenset(syns)
def expand_with_synonyms(tokens: list[str]) -> set[str]:
"""Expand a token list to include all WordNet synonyms."""
expanded: set[str] = set()
for token in tokens:
expanded.update(_synonyms(token))
return expanded
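# Example (illustrative; contents depend on the installed WordNet version):
#   expand_with_synonyms(["car"]) includes "car", "auto", "automobile",
#   "motorcar", plus lemmas from the other senses of "car".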
# ── Similarity metrics ────────────────────────────────────────────────────────
def jaccard(set_a: set[str], set_b: set[str]) -> float:
"""Jaccard similarity: |A ∩ B| / |A βˆͺ B|"""
if not set_a and not set_b:
return 1.0
intersection = set_a & set_b
union = set_a | set_b
return len(intersection) / len(union)
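# Worked example: jaccard({"cat", "sit", "mat"}, {"cat", "mat", "rug"})
#   = |{"cat", "mat"}| / |{"cat", "mat", "rug", "sit"}| = 2 / 4 = 0.5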
def cosine_similarity(vec_a: np.ndarray, vec_b: np.ndarray) -> float:
"""Cosine similarity between two L2-normalized vectors."""
norm_a = np.linalg.norm(vec_a)
norm_b = np.linalg.norm(vec_b)
if norm_a == 0 or norm_b == 0:
return 0.0
return float(np.dot(vec_a, vec_b) / (norm_a * norm_b))
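# Note: _layer_semantic() requests unit-norm embeddings, so for that caller this
# reduces to a plain dot product; the zero-norm guard keeps the helper safe for
# arbitrary vectors.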
# ── Core matcher ──────────────────────────────────────────────────────────────
class SemanticMatcher:
"""
Deterministic semantic matcher for short strings.
Usage:
matcher = SemanticMatcher()
result = matcher.match("The cat sat on the mat",
"A cat was sitting on the mat")
print(result)
"""
def __init__(
self,
match_threshold: float = MATCH_THRESHOLD,
strong_threshold: float = STRONG_THRESHOLD,
weights: dict[str, float] | None = None,
):
self.match_threshold = match_threshold
self.strong_threshold = strong_threshold
self.weights = weights or WEIGHTS
self.confidence_level: str = "no_match"
total = sum(self.weights.values())
assert abs(total - 1.0) < 1e-6, f"Weights must sum to 1.0 (got {total:.4f})"
    # ── Internal layers ─────────────────────────────────────────────────────
def _layer_lexical(self, tokens_a: list[str], tokens_b: list[str]) -> float:
return jaccard(set(tokens_a), set(tokens_b))
def _layer_synonym(self, tokens_a: list[str], tokens_b: list[str]) -> float:
expanded_a = expand_with_synonyms(tokens_a)
expanded_b = expand_with_synonyms(tokens_b)
return jaccard(expanded_a, expanded_b)
def _layer_semantic(self, text_a: str, text_b: str) -> float:
model = _get_model()
# encode() is deterministic: no sampling, fixed weights
embeddings = model.encode(
[normalize(text_a), normalize(text_b)],
convert_to_numpy=True,
normalize_embeddings=True,
)
return cosine_similarity(embeddings[0], embeddings[1]) # type: ignore
    # ── Public API ──────────────────────────────────────────────────────────
def matchscore(self, text_a: str, text_b: str) -> float:
"""
        Compare two strings and return their weighted ensemble similarity score.
        Returns a float between 0.0 and 1.0, where 1.0 indicates a perfect match.
        As a side effect, records the confidence level for confidence().
"""
# Fast-path: normalized exact match
if normalize(text_a) == normalize(text_b):
self.confidence_level = "strong"
return 1.0
tokens_a = tokenize_and_lemmatize(text_a)
tokens_b = tokenize_and_lemmatize(text_b)
layer_scores = {
"lexical": self._layer_lexical(tokens_a, tokens_b),
"synonym": self._layer_synonym(tokens_a, tokens_b),
"semantic": self._layer_semantic(text_a, text_b),
}
score = sum(self.weights[k] * v for k, v in layer_scores.items())
if score >= self.strong_threshold:
self.confidence_level = "strong"
elif score >= self.match_threshold:
self.confidence_level = "moderate"
else:
self.confidence_level = "no_match"
return score
def match(self, text_a: str, text_b: str) -> bool:
"""Return True if the two texts are considered a match based on the score."""
score = self.matchscore(text_a, text_b)
return score >= self.match_threshold
def confidence(self) -> str:
"""Return 'strong' if score β‰₯ strong_threshold, else 'moderate' or 'no_match'."""
return self.confidence_level
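
# Minimal usage sketch (illustrative inputs; actual scores depend on the local
# model weights and WordNet data, so the printed values will vary slightly):
if __name__ == "__main__":
    matcher = SemanticMatcher()
    pairs = [
        ("The cat sat on the mat", "A cat was sitting on the mat"),
        ("Transfer ownership to the new admin", "Reassign contract ownership"),
        ("The cat sat on the mat", "Quarterly revenue grew by 12%"),
    ]
    for a, b in pairs:
        score = matcher.matchscore(a, b)
        print(f"{score:.3f}  {matcher.confidence():<9}  {a!r}  vs  {b!r}")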