| """ | |
| semanticmatcher.py | |
| ==================== | |
| Deterministic semantic string matcher for short strings (10β12 words). | |
| Algorithm: Weighted ensemble of three independent signals | |
| 1. Lexical Jaccard β lemmatized token overlap (weight: 0.20) | |
| 2. Synonym Jaccard β WordNet-expanded token overlap (weight: 0.25) | |
| 3. Semantic Cosine β sentence-transformers embedding similarity (weight: 0.55) | |
| All three layers are fully deterministic: same inputs β same score, always. | |
| Install dependencies: | |
| python -m nltk.downloader wordnet omw-1.4 stopwords punkt punkt_tab averaged_perceptron_tagger_eng | |
| """ | |
import re
import string
from functools import lru_cache

import nltk
import numpy as np
from nltk.corpus import stopwords, wordnet
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer

# ── Config ────────────────────────────────────────────────────────────────────
WEIGHTS = {
    "lexical": 0.20,   # Plain lemma overlap
    "synonym": 0.25,   # WordNet-expanded overlap
    "semantic": 0.55,  # Embedding cosine similarity
}

MATCH_THRESHOLD = 0.72   # Score ≥ this → strings "mean the same thing"
STRONG_THRESHOLD = 0.88  # Score ≥ this → high-confidence match

# Embedding model: deterministic, no sampling
_EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"

# ── Lazy singletons ───────────────────────────────────────────────────────────
_model: SentenceTransformer | None = None
_lemmatizer: WordNetLemmatizer | None = None
_stop_words: set[str] | None = None


def _get_model() -> SentenceTransformer:
    global _model
    if _model is None:
        _model = SentenceTransformer(_EMBEDDING_MODEL_NAME)
    return _model


def _get_lemmatizer() -> WordNetLemmatizer:
    global _lemmatizer
    if _lemmatizer is None:
        _lemmatizer = WordNetLemmatizer()
    return _lemmatizer


def _get_stopwords() -> set[str]:
    global _stop_words
    if _stop_words is None:
        _stop_words = set(stopwords.words("english"))
    return _stop_words

# ── Text preprocessing ────────────────────────────────────────────────────────
def _get_wordnet_pos(treebank_tag: str) -> str:
    """Map a Penn Treebank POS tag to a WordNet POS constant for better lemmatization."""
    if treebank_tag.startswith("J"):
        return wordnet.ADJ
    elif treebank_tag.startswith("V"):
        return wordnet.VERB
    elif treebank_tag.startswith("R"):
        return wordnet.ADV
    return wordnet.NOUN


def normalize(text: str) -> str:
    """Lowercase, strip punctuation, collapse whitespace."""
    text = text.lower()
    text = text.translate(str.maketrans("", "", string.punctuation))
    text = re.sub(r"\s+", " ", text).strip()
    return text

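
# Example: normalize("  The CAT, sat!!  ") -> "the cat sat"
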
def tokenize_and_lemmatize(text: str) -> list[str]:
    """Tokenize, POS-tag, lemmatize, and remove stopwords."""
    lemmatizer = _get_lemmatizer()
    stop_words = _get_stopwords()
    tokens = nltk.word_tokenize(normalize(text))
    pos_tags = nltk.pos_tag(tokens)
    lemmas = [
        lemmatizer.lemmatize(word, _get_wordnet_pos(pos))
        for word, pos in pos_tags
        if word not in stop_words and word.isalpha()
    ]
    return lemmas

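
# Illustrative example (requires the NLTK data listed in the module docstring):
#   tokenize_and_lemmatize("The cats were sitting quietly")
#   -> ["cat", "sit", "quietly"]
# Stopwords ("the", "were") are dropped; POS-aware lemmatization maps
# "cats" -> "cat" (noun) and "sitting" -> "sit" (verb).
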
# ── WordNet synonym expansion ─────────────────────────────────────────────────
@lru_cache(maxsize=None)
def _synonyms(word: str) -> frozenset[str]:
    """Return all WordNet lemma names for a word (including the word itself).

    Cached: WordNet lookups are pure, so memoizing the frozenset result is
    safe and avoids repeated synset traversals for common tokens.
    """
    syns: set[str] = {word}
    for synset in wordnet.synsets(word):
        for lemma in synset.lemmas():  # type: ignore
            syns.add(lemma.name().replace("_", " ").lower())
    return frozenset(syns)


def expand_with_synonyms(tokens: list[str]) -> set[str]:
    """Expand a token list to include all WordNet synonyms."""
    expanded: set[str] = set()
    for token in tokens:
        expanded.update(_synonyms(token))
    return expanded

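
# Illustrative example: wordnet.synsets("happy") yields lemmas such as "glad"
# and "felicitous", so after expansion the token sets for "happy" and "glad"
# overlap even though the surface tokens differ:
#   expand_with_synonyms(["happy"]) & expand_with_synonyms(["glad"])  # non-empty
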
# ── Similarity metrics ────────────────────────────────────────────────────────
def jaccard(set_a: set[str], set_b: set[str]) -> float:
    """Jaccard similarity: |A ∩ B| / |A ∪ B|."""
    if not set_a and not set_b:
        return 1.0
    intersection = set_a & set_b
    union = set_a | set_b
    return len(intersection) / len(union)

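
# Example: jaccard({"cat", "mat"}, {"cat", "rug"}) == 1/3, since one shared
# token ("cat") out of three distinct tokens.
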
def cosine_similarity(vec_a: np.ndarray, vec_b: np.ndarray) -> float:
    """Cosine similarity between two vectors, guarding against zero vectors."""
    norm_a = np.linalg.norm(vec_a)
    norm_b = np.linalg.norm(vec_b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(vec_a, vec_b) / (norm_a * norm_b))

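
# Note: the embeddings produced in _layer_semantic are already L2-normalized
# (normalize_embeddings=True), so for those inputs the cosine reduces to a
# plain dot product; the norm division here covers the general case.
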
# ── Core matcher ──────────────────────────────────────────────────────────────
class SemanticMatcher:
    """
    Deterministic semantic matcher for short strings.

    Usage:
        matcher = SemanticMatcher()
        result = matcher.match("The cat sat on the mat",
                               "A cat was sitting on the mat")
        print(result)
    """

    def __init__(
        self,
        match_threshold: float = MATCH_THRESHOLD,
        strong_threshold: float = STRONG_THRESHOLD,
        weights: dict[str, float] | None = None,
    ):
        self.match_threshold = match_threshold
        self.strong_threshold = strong_threshold
        self.weights = weights or WEIGHTS
        self.confidence_level: str = "no_match"
        # Validate eagerly (a plain assert would vanish under `python -O`)
        total = sum(self.weights.values())
        if abs(total - 1.0) > 1e-6:
            raise ValueError(f"Weights must sum to 1.0 (got {total:.4f})")

    # ── Internal layers ──────────────────────────────────────────────────────
    def _layer_lexical(self, tokens_a: list[str], tokens_b: list[str]) -> float:
        return jaccard(set(tokens_a), set(tokens_b))

    def _layer_synonym(self, tokens_a: list[str], tokens_b: list[str]) -> float:
        expanded_a = expand_with_synonyms(tokens_a)
        expanded_b = expand_with_synonyms(tokens_b)
        return jaccard(expanded_a, expanded_b)

    def _layer_semantic(self, text_a: str, text_b: str) -> float:
        model = _get_model()
        # encode() is deterministic here: no sampling, fixed weights
        embeddings = model.encode(
            [normalize(text_a), normalize(text_b)],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
        return cosine_similarity(embeddings[0], embeddings[1])  # type: ignore

    # ── Public API ────────────────────────────────────────────────────────────
    def matchscore(self, text_a: str, text_b: str) -> float:
        """
        Compare two strings and return a match score between 0.0 and 1.0,
        where 1.0 indicates a perfect match. Also updates confidence_level.
        """
        # Fast path: normalized exact match
        if normalize(text_a) == normalize(text_b):
            self.confidence_level = "strong"
            return 1.0
        tokens_a = tokenize_and_lemmatize(text_a)
        tokens_b = tokenize_and_lemmatize(text_b)
        layer_scores = {
            "lexical": self._layer_lexical(tokens_a, tokens_b),
            "synonym": self._layer_synonym(tokens_a, tokens_b),
            "semantic": self._layer_semantic(text_a, text_b),
        }
        score = sum(self.weights[k] * v for k, v in layer_scores.items())
        if score >= self.strong_threshold:
            self.confidence_level = "strong"
        elif score >= self.match_threshold:
            self.confidence_level = "moderate"
        else:
            self.confidence_level = "no_match"
        return score
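
    # Worked example of the blend: with the default weights, layer scores of
    # lexical=0.50, synonym=0.60, semantic=0.80 combine to
    #   0.20*0.50 + 0.25*0.60 + 0.55*0.80 = 0.69,
    # which falls below MATCH_THRESHOLD (0.72), so the pair is not a match.
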
    def match(self, text_a: str, text_b: str) -> bool:
        """Return True if the two texts are considered a match based on the score."""
        score = self.matchscore(text_a, text_b)
        return score >= self.match_threshold

    def confidence(self) -> str:
        """Return the confidence level of the most recent matchscore() call:
        'strong' if score ≥ strong_threshold, else 'moderate' or 'no_match'.
        """
        return self.confidence_level
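
# ── Demo ──────────────────────────────────────────────────────────────────────
# Minimal usage sketch. The exact score depends on the downloaded
# "all-MiniLM-L6-v2" weights, but it is stable across runs on a given setup.
if __name__ == "__main__":
    matcher = SemanticMatcher()
    a = "The cat sat on the mat"
    b = "A cat was sitting on the mat"
    score = matcher.matchscore(a, b)
    print(f"score={score:.3f}  match={score >= matcher.match_threshold}  "
          f"confidence={matcher.confidence()}")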