File size: 11,521 Bytes
053a22c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad0abee
053a22c
ad0abee
 
 
 
 
 
e5d14d0
ad0abee
e5d14d0
053a22c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""
Semantic distance scoring for creativity research.

Ported from the open-creativity-scoring library (https://github.com/massivetexts/open-scoring).
Computes originality scores by measuring cosine distance between word embeddings
of a prompt and response in embedding space.
"""

import os
import subprocess
import logging

import numpy as np
import pandas as pd
from gensim.models import KeyedVectors
from sklearn.preprocessing import MinMaxScaler
from huggingface_hub import hf_hub_download

logger = logging.getLogger(__name__)

# Available models with their HF repos and scaling parameters
MODELS = {
    "motes_100k": {
        "repo": "massivetexts/motes-embeddings-100k",
        "files": ["all_weighted_10-12_100k.kv", "all_weighted_10-12_100k.kv.vectors.npy"],
        "main_file": "all_weighted_10-12_100k.kv",
        "description": "MOTES children's embeddings (ages 10–12, 100k vocab)",
        "scaling": {"min": 0.5033, "max": 0.8955},
        "hosted": True,
    },
    "glove_840B": {
        "repo": "massivetexts/glove-840b-gensim",
        "files": ["glove.840B-300d.wv", "glove.840B-300d.wv.vectors.npy"],
        "main_file": "glove.840B-300d.wv",
        "description": "GloVe 840B 300d (Pennington et al. 2014) — general English vocabulary",
        "scaling": {"min": 0.6456, "max": 0.9610},
        "hosted": True,
    },
}

DEFAULT_MODEL = "motes_100k"

# Default scaling (used when no model-specific scaling is set)
DEFAULT_SCALING = MODELS[DEFAULT_MODEL]["scaling"]

# Path to IDF values
IDF_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets", "idf-vals.parquet")


def ensure_spacy_model():
    """Download spaCy en_core_web_sm if not already installed."""
    try:
        import spacy
        spacy.load("en_core_web_sm")
    except OSError:
        subprocess.run(
            ["python", "-m", "spacy", "download", "en_core_web_sm"],
            check=True,
            capture_output=True,
        )


def download_model(model_name=None, progress_callback=None):
    """Download model files from Hugging Face Hub. Returns path to main .wv/.kv file.

    Args:
        model_name: Key from MODELS dict (e.g., 'glove_840B', 'motes_100k').
                    Defaults to DEFAULT_MODEL.
        progress_callback: Optional callback(progress, message) for UI updates.
    """
    if model_name is None:
        model_name = DEFAULT_MODEL

    if model_name not in MODELS:
        raise ValueError(f"Unknown model: {model_name}. Available: {list(MODELS.keys())}")

    model_info = MODELS[model_name]

    if progress_callback:
        progress_callback(0, f"Downloading {model_name} from Hugging Face Hub...")

    paths = {}
    for i, filename in enumerate(model_info["files"]):
        path = hf_hub_download(
            repo_id=model_info["repo"],
            filename=filename,
            repo_type="model",
        )
        paths[filename] = path
        if progress_callback:
            progress_callback((i + 1) / len(model_info["files"]), f"Downloaded {filename}")

    return paths[model_info["main_file"]]


class SemanticScorer:
    """Scores originality of divergent thinking responses using semantic distance.

    Measures cosine similarity between word embeddings of the prompt object
    and the response, then subtracts from 1 to get a distance score.
    Higher scores = more original (more distant in semantic space).
    """

    def __init__(self, model_name=None):
        self._model = None
        self._idf_ref = None
        self._default_idf = None
        self._nlp = None
        self._inflect_engine = None
        self._scaler = None
        self._model_name = model_name or DEFAULT_MODEL

        # Set up normalization scaler using model-specific scaling
        scaling = MODELS.get(self._model_name, MODELS[DEFAULT_MODEL])["scaling"]
        self._scaler = MinMaxScaler(feature_range=(1.0, 7.0), clip=True)
        self._scaler.fit(np.array([[scaling["min"]], [scaling["max"]]]))

    def _ensure_nlp(self):
        """Lazy-load spaCy model."""
        if self._nlp is None:
            import spacy
            import inflect
            ensure_spacy_model()
            self._nlp = spacy.load("en_core_web_sm")
            self._inflect_engine = inflect.engine()

    @property
    def nlp(self):
        self._ensure_nlp()
        return self._nlp

    @property
    def p(self):
        self._ensure_nlp()
        return self._inflect_engine

    @property
    def idf(self):
        """Load IDF scores from parquet file.

        Uses page-level scores from:
        Organisciak, P. 2016. Term Frequencies for 235k Language and Literature Texts.
            http://hdl.handle.net/2142/89515.
        """
        if self._idf_ref is None:
            idf_df = pd.read_parquet(IDF_PATH)
            self._idf_ref = idf_df["IPF"].to_dict()
            self._default_idf = idf_df.iloc[10000]["IPF"]
        return self._idf_ref

    @property
    def default_idf(self):
        if self._default_idf is None:
            _ = self.idf  # triggers load
        return self._default_idf

    def load_model(self, model_path, mmap="r"):
        """Load a gensim KeyedVectors model."""
        self._model = KeyedVectors.load(model_path, mmap=mmap)

    def _get_phrase_vecs(self, phrase, stopword=False, term_weighting=False, exclude=None):
        """Return stacked array of model vectors for words in phrase.

        Args:
            phrase: Text string or spaCy Doc
            stopword: If True, skip stopwords
            term_weighting: If True, compute IDF weights
            exclude: List of words to skip (lowercased)

        Returns:
            Tuple of (vectors array, weights list)
        """
        import spacy

        if exclude is None:
            exclude = []

        arrlist = []
        weights = []

        if not isinstance(phrase, spacy.tokens.doc.Doc):
            phrase = self.nlp(phrase[: self.nlp.max_length], disable=["parser", "ner", "lemmatizer"])

        exclude_lower = [x.lower() for x in exclude]
        for word in phrase:
            if stopword and word.is_stop:
                continue
            elif word.lower_ in exclude_lower:
                continue
            else:
                try:
                    vec = self._model[word.lower_]
                    arrlist.append(vec)
                except KeyError:
                    continue

                if term_weighting:
                    weight = self.idf.get(word.lower_, self.default_idf)
                    weights.append(weight)

        if len(arrlist):
            vecs = np.vstack(arrlist)
            return vecs, weights
        else:
            return [], []

    def originality(self, target, response, stopword=False, term_weighting=False,
                    flip=True, exclude_target=False):
        """Score originality as semantic distance between target prompt and response.

        Args:
            target: The prompt/object (e.g., "brick")
            response: The creative response (e.g., "modern art sculpture")
            stopword: Remove stopwords before scoring
            term_weighting: Weight words by IDF
            flip: If True, return 1 - similarity (higher = more original)
            exclude_target: If True, exclude prompt words from response

        Returns:
            Float originality score, or None if scoring fails
        """
        if self._model is None:
            raise RuntimeError("No model loaded. Call load_model() first.")

        exclude_words = []
        if exclude_target:
            exclude_words = target.split()
            for word in list(exclude_words):
                try:
                    sense = self.p.plural(word.lower())
                    if isinstance(sense, str) and len(sense) and sense not in exclude_words:
                        exclude_words.append(sense)
                except Exception:
                    pass

        vecs, weights = self._get_phrase_vecs(
            response, stopword, term_weighting, exclude=exclude_words
        )

        if len(vecs) == 0:
            return None

        if " " in target:
            target_vecs = self._get_phrase_vecs(target, stopword, term_weighting)[0]
            if len(target_vecs) == 0:
                return None
            targetvec = target_vecs.sum(0)
        else:
            try:
                targetvec = self._model[target.lower()]
            except KeyError:
                return None

        scores = self._model.cosine_similarities(targetvec, vecs)

        if len(scores) and not term_weighting:
            s = np.mean(scores)
        elif len(scores):
            s = np.average(scores, weights=weights)
        else:
            return None

        if flip:
            s = 1 - s
        return float(s)

    def elaboration(self, phrase, method="whitespace"):
        """Score elaboration (response length/complexity).

        Args:
            phrase: The response text
            method: One of 'whitespace', 'stoplist', 'idf', 'pos'

        Returns:
            Numeric elaboration score
        """
        if method == "whitespace":
            return len(phrase.split())

        doc = self.nlp(phrase[: self.nlp.max_length], disable=["parser", "ner", "lemmatizer"])

        if method == "stoplist":
            return len([w for w in doc if not (w.is_stop or w.is_punct)])
        elif method == "idf":
            weights = []
            for word in doc:
                if word.is_punct:
                    continue
                weights.append(self.idf.get(word.lower_, self.default_idf))
            return sum(weights)
        elif method == "pos":
            doc = self.nlp(phrase[: self.nlp.max_length], disable=["ner", "lemmatizer"])
            return len([w for w in doc if w.pos_ in ["NOUN", "VERB", "ADJ", "ADV", "PROPN"] and not w.is_punct])
        else:
            raise ValueError(f"Unknown elaboration method: {method}")

    def score_batch(self, df, stopword=False, term_weighting=False,
                    exclude_target=False, normalize=False, elab_method=None):
        """Score a DataFrame of prompt-response pairs.

        Args:
            df: DataFrame with 'prompt' and 'response' columns
            stopword: Remove stopwords
            term_weighting: Weight by IDF
            exclude_target: Exclude prompt words from response
            normalize: Scale to 1-7 range
            elab_method: Elaboration method or None

        Returns:
            DataFrame with 'originality' (and optionally 'elaboration') columns added
        """
        df = df.copy()
        df["originality"] = df.apply(
            lambda x: self.originality(
                x["prompt"], x["response"],
                stopword=stopword,
                term_weighting=term_weighting,
                exclude_target=exclude_target,
            ),
            axis=1,
        )

        if normalize:
            valid_mask = df["originality"].notna()
            if valid_mask.any():
                df.loc[valid_mask, "originality"] = self._scaler.transform(
                    df.loc[valid_mask, "originality"].values.reshape(-1, 1)
                )[:, 0]
            df["originality"] = df["originality"].round(1)
        else:
            df["originality"] = df["originality"].round(4)

        if elab_method and elab_method != "none":
            df["elaboration"] = df["response"].apply(
                lambda x: self.elaboration(x, method=elab_method)
            )

        return df