"""
semanticmatcher.py
====================
Deterministic semantic string matcher for short strings (10–12 words).

Algorithm: Weighted ensemble of three independent signals
  1. Lexical Jaccard     – lemmatized token overlap (weight: 0.20)
  2. Synonym Jaccard     – WordNet-expanded token overlap (weight: 0.25)
  3. Semantic Cosine     – sentence-transformers embedding similarity (weight: 0.55)

All three layers are fully deterministic: same inputs → same score, always.

Install dependencies:
  pip install nltk numpy sentence-transformers

Download the required NLTK data:
  python -m nltk.downloader wordnet omw-1.4 stopwords punkt punkt_tab averaged_perceptron_tagger_eng
"""

import re
import string
from functools import lru_cache

import nltk
import numpy as np
from nltk.corpus import wordnet, stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer

# ── Config ────────────────────────────────────────────────────────────────────

WEIGHTS = {
    "lexical":  0.20,   # Plain lemma overlap
    "synonym":  0.25,   # WordNet-expanded overlap
    "semantic": 0.55,   # Embedding cosine similarity
}

MATCH_THRESHOLD   = 0.72   # Score ≥ this → strings "mean the same thing"
STRONG_THRESHOLD  = 0.88   # Score ≥ this → high-confidence match
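
# Worked example of the weighted combination (illustrative numbers, not a
# measured run): layer scores lexical=0.50, synonym=0.60, semantic=0.90 give
#   0.20*0.50 + 0.25*0.60 + 0.55*0.90 = 0.745
# which clears MATCH_THRESHOLD (0.72) but not STRONG_THRESHOLD (0.88), i.e.
# a moderate-confidence match.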

# Embedding model: deterministic, no sampling
_EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"

# ── Lazy singletons ───────────────────────────────────────────────────────────

_model: SentenceTransformer | None = None
_lemmatizer: WordNetLemmatizer | None = None
_stop_words: set[str] | None = None


def _get_model() -> SentenceTransformer:
    global _model
    if _model is None:
        _model = SentenceTransformer(_EMBEDDING_MODEL_NAME)
    return _model


def _get_lemmatizer() -> WordNetLemmatizer:
    global _lemmatizer
    if _lemmatizer is None:
        _lemmatizer = WordNetLemmatizer()
    return _lemmatizer


def _get_stopwords() -> set[str]:
    global _stop_words
    if _stop_words is None:
        _stop_words = set(stopwords.words("english"))
    return _stop_words


# ── Text preprocessing ────────────────────────────────────────────────────────

def _get_wordnet_pos(treebank_tag: str) -> str:
    """Map POS treebank tag to WordNet POS constant for better lemmatization."""
    if treebank_tag.startswith("J"):
        return wordnet.ADJ
    elif treebank_tag.startswith("V"):
        return wordnet.VERB
    elif treebank_tag.startswith("R"):
        return wordnet.ADV
    return wordnet.NOUN
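
# e.g. "sat" tagged "VBD" maps to wordnet.VERB, so it lemmatizes to "sit"
# rather than being left unchanged by the default noun lemmatization.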


def normalize(text: str) -> str:
    """Lowercase, strip punctuation, collapse whitespace."""
    text = text.lower()
    text = text.translate(str.maketrans("", "", string.punctuation))
    text = re.sub(r"\s+", " ", text).strip()
    return text
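
# e.g. normalize("  The CAT, sat!  ") == "the cat sat"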


def tokenize_and_lemmatize(text: str) -> list[str]:
    """Tokenize, POS-tag, lemmatize, and remove stopwords."""
    lemmatizer = _get_lemmatizer()
    stop_words = _get_stopwords()

    tokens = nltk.word_tokenize(normalize(text))
    pos_tags = nltk.pos_tag(tokens)

    lemmas = [
        lemmatizer.lemmatize(word, _get_wordnet_pos(pos))
        for word, pos in pos_tags
        if word not in stop_words and word.isalpha()
    ]
    return lemmas
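
# Illustrative example (exact output depends on the installed NLTK tagger):
#   tokenize_and_lemmatize("The cats were running") -> ["cat", "run"]
# "the" and "were" are dropped as stopwords; "cats"/"running" lemmatize via
# their POS tags to "cat"/"run".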


# ── WordNet synonym expansion ─────────────────────────────────────────────────

@lru_cache(maxsize=512)
def _synonyms(word: str) -> frozenset[str]:
    """Return all WordNet lemma names for a word (including the word itself)."""
    syns: set[str] = {word}
    for synset in wordnet.synsets(word):
        for lemma in synset.lemmas(): # type: ignore
            syns.add(lemma.name().replace("_", " ").lower())
    return frozenset(syns)


def expand_with_synonyms(tokens: list[str]) -> set[str]:
    """Expand a token list to include all WordNet synonyms."""
    expanded: set[str] = set()
    for token in tokens:
        expanded.update(_synonyms(token))
    return expanded
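
# Illustrative expansion (exact synsets depend on the installed WordNet data):
#   expand_with_synonyms(["car"]) includes {"car", "auto", "automobile", "motorcar"}
# so "car" and "automobile" overlap in the synonym layer even though they
# never match lexically.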


# ── Similarity metrics ────────────────────────────────────────────────────────

def jaccard(set_a: set[str], set_b: set[str]) -> float:
    """Jaccard similarity: |A ∩ B| / |A βˆͺ B|"""
    if not set_a and not set_b:
        return 1.0
    intersection = set_a & set_b
    union = set_a | set_b
    return len(intersection) / len(union)
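
# Quick check: jaccard({"cat", "sit", "mat"}, {"cat", "mat", "dog"}) == 2/4 == 0.5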


def cosine_similarity(vec_a: np.ndarray, vec_b: np.ndarray) -> float:
    """Cosine similarity between two L2-normalized vectors."""
    norm_a = np.linalg.norm(vec_a)
    norm_b = np.linalg.norm(vec_b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(vec_a, vec_b) / (norm_a * norm_b))


# ── Core matcher ──────────────────────────────────────────────────────────────

class SemanticMatcher:
    """
    Deterministic semantic matcher for short strings.

    Usage:
        matcher = SemanticMatcher()
        result  = matcher.match("The cat sat on the mat",
                                "A cat was sitting on the mat")
        print(result)
    """

    def __init__(
        self,
        match_threshold: float = MATCH_THRESHOLD,
        strong_threshold: float = STRONG_THRESHOLD,
        weights: dict[str, float] | None = None,
    ):
        self.match_threshold  = match_threshold
        self.strong_threshold = strong_threshold
        self.weights = weights or WEIGHTS
        self.confidence_level: str = "no_match"

        total = sum(self.weights.values())
        if abs(total - 1.0) > 1e-6:
            # A plain exception (rather than assert) survives `python -O`.
            raise ValueError(f"Weights must sum to 1.0 (got {total:.4f})")

    # ── Internal scoring layers ─────────────────────────────────────────────

    def _layer_lexical(self, tokens_a: list[str], tokens_b: list[str]) -> float:
        return jaccard(set(tokens_a), set(tokens_b))

    def _layer_synonym(self, tokens_a: list[str], tokens_b: list[str]) -> float:
        expanded_a = expand_with_synonyms(tokens_a)
        expanded_b = expand_with_synonyms(tokens_b)
        return jaccard(expanded_a, expanded_b)

    def _layer_semantic(self, text_a: str, text_b: str) -> float:
        model = _get_model()
        # encode() is deterministic: no sampling, fixed weights
        embeddings = model.encode(
            [normalize(text_a), normalize(text_b)],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
        # Clamp at 0.0: cosine can dip slightly negative for unrelated texts,
        # which would otherwise pull the weighted ensemble score below 0.
        return max(0.0, cosine_similarity(embeddings[0], embeddings[1]))  # type: ignore

    # ── Public API ──────────────────────────────────────────────────────────

    def matchscore(self, text_a: str, text_b: str) -> float:
        """
        Compare two strings and return a score of whether they are matching.

        Returns a float between 0.0 and 1.0, where 1.0 indicates a perfect match.
        """
        # Fast-path: normalized exact match
        if normalize(text_a) == normalize(text_b):
            self.confidence_level = "strong"
            return 1.0

        tokens_a = tokenize_and_lemmatize(text_a)
        tokens_b = tokenize_and_lemmatize(text_b)

        layer_scores = {
            "lexical":  self._layer_lexical(tokens_a, tokens_b),
            "synonym":  self._layer_synonym(tokens_a, tokens_b),
            "semantic": self._layer_semantic(text_a, text_b),
        }

        score = sum(self.weights[k] * v for k, v in layer_scores.items())
        if score >= self.strong_threshold:
            self.confidence_level = "strong"
        elif score >= self.match_threshold:
            self.confidence_level = "moderate"
        else:
            self.confidence_level = "no_match"
        return score

    def match(self, text_a: str, text_b: str) -> bool:
        """Return True if the two texts are considered a match based on the score."""
        score = self.matchscore(text_a, text_b)
        return score >= self.match_threshold

    def confidence(self) -> str:
        """Return 'strong' if score β‰₯ strong_threshold, else 'moderate' or 'no_match'."""
        return self.confidence_level
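

# ── Demo ──────────────────────────────────────────────────────────────────────
# Minimal usage sketch; the example strings (and the scores they produce) are
# illustrative, not measured output.

if __name__ == "__main__":
    matcher = SemanticMatcher()

    pairs = [
        ("The cat sat on the mat", "A cat was sitting on the mat"),
        ("The cat sat on the mat", "Quarterly revenue grew by ten percent"),
    ]
    for a, b in pairs:
        score = matcher.matchscore(a, b)
        matched = "MATCH" if score >= matcher.match_threshold else "no match"
        print(f"{score:.3f}  {matched:8s}  ({matcher.confidence()})  {a!r} vs {b!r}")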