"""
Word2Vec Baseline (gensim)

Trains a Word2Vec model on your corpus and provides the same interface
as the transformer engine, so you can compare results side by side.

Key limitation: Word2Vec gives ONE static vector per word regardless of
context: "pizza" always has the same embedding whether the sentence is about
food or school. The only contextual signal comes from averaging the word
vectors in a sentence.

Usage:
    w2v = Word2VecEngine()
    w2v.add_document("doc1", text)
    w2v.build_index()  # trains Word2Vec on your corpus
    results = w2v.query("a place where children learn", top_k=5)
    score = w2v.compare_texts("pizza gives me homework", "school gives me homework")
"""

import json
import re
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import numpy as np
from gensim.models import Word2Vec

logger = logging.getLogger(__name__)


@dataclass
class W2VResult:
    """A single similarity result."""
    text: str
    doc_id: str
    score: float
    rank: int


class Word2VecEngine:
    """
    Word2Vec baseline for comparison with the transformer engine.

    Trains Word2Vec on your corpus, represents sentences as averaged
    word vectors, and uses cosine similarity for matching.
    """

    def __init__(
        self,
        vector_size: int = 100,
        window: int = 5,
        min_count: int = 1,
        epochs: int = 50,
        sg: int = 1,
    ):
        """
        Args:
            vector_size: Dimensionality of word vectors.
            window: Context window size.
            min_count: Ignore words with frequency below this.
            epochs: Training epochs.
            sg: 1 for skip-gram, 0 for CBOW.
        """
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.epochs = epochs
        self.sg = sg

        self.model: Optional[Word2Vec] = None
        self.sentences: list[str] = []
        self.sentence_docs: list[str] = []
        self.sentence_vecs: Optional[np.ndarray] = None

    def add_document(self, doc_id: str, text: str) -> int:
        """Add a document. Returns number of sentences extracted."""
        sents = self._split_sentences(text)
        self.sentences.extend(sents)
        self.sentence_docs.extend([doc_id] * len(sents))
        return len(sents)

    def build_index(self) -> dict:
        """Train Word2Vec on the corpus and compute sentence vectors."""
        if not self.sentences:
            raise RuntimeError("Cannot build index: no documents were added.")

        tokenized = [self._tokenize(s) for s in self.sentences]

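        # Skip-gram (sg=1) is the default here; it is generally reported to
        # handle small corpora and rare words better than CBOW, which matters
        # for a from-scratch model trained only on the user's documents.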
        self.model = Word2Vec(
            sentences=tokenized,
            vector_size=self.vector_size,
            window=self.window,
            min_count=self.min_count,
            epochs=self.epochs,
            sg=self.sg,
            workers=4,
        )

        # Compute sentence vectors (average of word vectors)
        vecs = [self._sentence_vector(tokens) for tokens in tokenized]
        self.sentence_vecs = np.array(vecs, dtype=np.float32)

        vocab_size = len(self.model.wv)
        logger.info(f"Word2Vec trained: {vocab_size} words, {len(self.sentences)} sentences")
        return {
            "vocab_size": vocab_size,
            "sentences": len(self.sentences),
            "vector_size": self.vector_size,
        }

    def compare_texts(self, text_a: str, text_b: str) -> float:
        """Cosine similarity between two texts (averaged word vectors)."""
        if self.model is None:
            raise RuntimeError("Cannot compare: call build_index() first.")
        vec_a = self._sentence_vector(self._tokenize(text_a))
        vec_b = self._sentence_vector(self._tokenize(text_b))
        return float(self._cosine(vec_a, vec_b))

    def query(self, text: str, top_k: int = 10) -> list[W2VResult]:
        """Find most similar sentences to a query."""
        if self.model is None or self.sentence_vecs is None:
            raise RuntimeError("Cannot query: call build_index() first.")

        query_vec = self._sentence_vector(self._tokenize(text))
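        # Cosine similarity against every stored sentence vector: take raw dot
        # products, then divide by the product of norms; zero norms (e.g. an
        # all-OOV query vector) are clamped so we never divide by zero.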
        scores = self.sentence_vecs @ query_vec
        norms = np.linalg.norm(self.sentence_vecs, axis=1) * np.linalg.norm(query_vec)
        norms[norms == 0] = 1e-10
        scores = scores / norms

        top_idx = np.argsort(scores)[::-1][:top_k]
        return [
            W2VResult(
                text=self.sentences[i],
                doc_id=self.sentence_docs[i],
                score=float(scores[i]),
                rank=rank + 1,
            )
            for rank, i in enumerate(top_idx)
        ]

    def most_similar_words(self, word: str, top_k: int = 10) -> list[tuple[str, float]]:
        """Find words most similar to a given word (static, no context)."""
        word = word.lower()
        if self.model is None or word not in self.model.wv:
            return []
        return self.model.wv.most_similar(word, topn=top_k)

    def word_similarity(self, word_a: str, word_b: str) -> float:
        """Cosine similarity between two individual words."""
        a, b = word_a.lower(), word_b.lower()
        if self.model is None or a not in self.model.wv or b not in self.model.wv:
            return 0.0
        return float(self.model.wv.similarity(a, b))

    # ------------------------------------------------------------------ #
    #  Persistence
    # ------------------------------------------------------------------ #

    def save(self, directory: str) -> dict:
        """Save trained Word2Vec state to disk for later restore."""
        if self.model is None:
            raise RuntimeError("Cannot save: model has not been trained yet.")

        save_dir = Path(directory)
        save_dir.mkdir(parents=True, exist_ok=True)

        self.model.save(str(save_dir / "w2v.model"))
        np.save(save_dir / "sentence_vecs.npy", self.sentence_vecs)

        meta = {
            "vector_size": self.vector_size,
            "window": self.window,
            "min_count": self.min_count,
            "epochs": self.epochs,
            "sg": self.sg,
            "num_sentences": len(self.sentences),
            "vocab_size": len(self.model.wv),
        }
        with open(save_dir / "w2v_meta.json", "w") as f:
            json.dump(meta, f, indent=2)

        # Save sentences and their doc mappings
        with open(save_dir / "w2v_sentences.json", "w") as f:
            json.dump({"sentences": self.sentences, "sentence_docs": self.sentence_docs}, f)

        logger.info("Word2Vec saved to %s: %d sentences, %d vocab",
                     directory, len(self.sentences), len(self.model.wv))
        return meta

    @classmethod
    def load(cls, directory: str) -> "Word2VecEngine":
        """Load a previously saved Word2Vec state from disk."""
        save_dir = Path(directory)
        if not (save_dir / "w2v_meta.json").is_file():
            raise FileNotFoundError(f"No saved Word2Vec state at {directory}")

        with open(save_dir / "w2v_meta.json") as f:
            meta = json.load(f)

        engine = cls(
            vector_size=meta["vector_size"],
            window=meta["window"],
            min_count=meta["min_count"],
            epochs=meta["epochs"],
            sg=meta["sg"],
        )

        engine.model = Word2Vec.load(str(save_dir / "w2v.model"))
        engine.sentence_vecs = np.load(save_dir / "sentence_vecs.npy")

        with open(save_dir / "w2v_sentences.json") as f:
            data = json.load(f)
            engine.sentences = data["sentences"]
            engine.sentence_docs = data["sentence_docs"]

        logger.info("Word2Vec loaded from %s: %d sentences, %d vocab",
                     directory, len(engine.sentences), len(engine.model.wv))
        return engine

    @staticmethod
    def has_saved_state(directory: str) -> bool:
        """Check if a saved Word2Vec state exists at the given directory."""
        return (Path(directory) / "w2v_meta.json").is_file()

    # ------------------------------------------------------------------ #
    #  Internal helpers
    # ------------------------------------------------------------------ #

    def _sentence_vector(self, tokens: list[str]) -> np.ndarray:
        """Average word vectors for a sentence."""
        vecs = [self.model.wv[t] for t in tokens if t in self.model.wv]
        if not vecs:
            return np.zeros(self.vector_size, dtype=np.float32)
        return np.mean(vecs, axis=0).astype(np.float32)

    @staticmethod
    def _cosine(a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity of two vectors; 0.0 if either has zero norm."""
        dot = np.dot(a, b)
        norm = np.linalg.norm(a) * np.linalg.norm(b)
        return float(dot / norm) if norm > 0 else 0.0

    @staticmethod
    def _tokenize(text: str) -> list[str]:
        """Lowercase and keep purely alphabetic tokens; digits and punctuation are dropped."""
        return re.findall(r"\b[a-z]+\b", text.lower())

    @staticmethod
    def _split_sentences(text: str) -> list[str]:
        """Split on terminal punctuation; drop fragments shorter than four words."""
        parts = re.split(r"(?<=[.!?])\s+", text.strip())
        return [s.strip() for s in parts if len(s.split()) >= 4]
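

if __name__ == "__main__":
    # Illustrative smoke test, not part of the engine interface: the sample
    # texts and hyperparameters below are made-up assumptions chosen only to
    # exercise the API end to end; real corpora should be much larger.
    logging.basicConfig(level=logging.INFO)

    engine = Word2VecEngine(vector_size=50, epochs=25)
    engine.add_document(
        "school_doc",
        "The children walk to school every morning. The school gives them "
        "homework after class. They learn reading and math at school.",
    )
    engine.add_document(
        "food_doc",
        "We ordered a large pizza for dinner. The pizza had extra cheese on "
        "top. Everyone agreed the pizza tasted great.",
    )
    engine.build_index()

    for r in engine.query("a place where children learn", top_k=3):
        print(f"{r.rank}. [{r.score:.3f}] ({r.doc_id}) {r.text}")

    # Static embeddings: "pizza" has one vector in every context, so these two
    # sentences look more similar than a contextual model would judge them.
    print(engine.compare_texts("pizza gives me homework", "school gives me homework"))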