ocs-semantic-scoring / scoring.py
Peter Organisciak
Restore GloVe to dropdown; files uploading to HF
e5d14d0
"""
Semantic distance scoring for creativity research.
Ported from the open-creativity-scoring library (https://github.com/massivetexts/open-scoring).
Computes originality scores by measuring cosine distance between word embeddings
of a prompt and response in embedding space.
"""
import os
import subprocess
import logging
import numpy as np
import pandas as pd
from gensim.models import KeyedVectors
from sklearn.preprocessing import MinMaxScaler
from huggingface_hub import hf_hub_download
logger = logging.getLogger(__name__)
# Available models with their HF repos and scaling parameters
MODELS = {
"motes_100k": {
"repo": "massivetexts/motes-embeddings-100k",
"files": ["all_weighted_10-12_100k.kv", "all_weighted_10-12_100k.kv.vectors.npy"],
"main_file": "all_weighted_10-12_100k.kv",
"description": "MOTES children's embeddings (ages 10–12, 100k vocab)",
"scaling": {"min": 0.5033, "max": 0.8955},
"hosted": True,
},
"glove_840B": {
"repo": "massivetexts/glove-840b-gensim",
"files": ["glove.840B-300d.wv", "glove.840B-300d.wv.vectors.npy"],
"main_file": "glove.840B-300d.wv",
"description": "GloVe 840B 300d (Pennington et al. 2014) — general English vocabulary",
"scaling": {"min": 0.6456, "max": 0.9610},
"hosted": True,
},
}
DEFAULT_MODEL = "motes_100k"
# Default scaling (used when no model-specific scaling is set)
DEFAULT_SCALING = MODELS[DEFAULT_MODEL]["scaling"]
# Path to IDF values
IDF_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets", "idf-vals.parquet")
def ensure_spacy_model():
"""Download spaCy en_core_web_sm if not already installed."""
try:
import spacy
spacy.load("en_core_web_sm")
except OSError:
subprocess.run(
["python", "-m", "spacy", "download", "en_core_web_sm"],
check=True,
capture_output=True,
)
def download_model(model_name=None, progress_callback=None):
"""Download model files from Hugging Face Hub. Returns path to main .wv/.kv file.
Args:
model_name: Key from MODELS dict (e.g., 'glove_840B', 'motes_100k').
Defaults to DEFAULT_MODEL.
progress_callback: Optional callback(progress, message) for UI updates.
"""
if model_name is None:
model_name = DEFAULT_MODEL
if model_name not in MODELS:
raise ValueError(f"Unknown model: {model_name}. Available: {list(MODELS.keys())}")
model_info = MODELS[model_name]
if progress_callback:
progress_callback(0, f"Downloading {model_name} from Hugging Face Hub...")
paths = {}
for i, filename in enumerate(model_info["files"]):
path = hf_hub_download(
repo_id=model_info["repo"],
filename=filename,
repo_type="model",
)
paths[filename] = path
if progress_callback:
progress_callback((i + 1) / len(model_info["files"]), f"Downloaded {filename}")
return paths[model_info["main_file"]]
class SemanticScorer:
"""Scores originality of divergent thinking responses using semantic distance.
Measures cosine similarity between word embeddings of the prompt object
and the response, then subtracts from 1 to get a distance score.
Higher scores = more original (more distant in semantic space).
"""
def __init__(self, model_name=None):
self._model = None
self._idf_ref = None
self._default_idf = None
self._nlp = None
self._inflect_engine = None
self._scaler = None
self._model_name = model_name or DEFAULT_MODEL
# Set up normalization scaler using model-specific scaling
scaling = MODELS.get(self._model_name, MODELS[DEFAULT_MODEL])["scaling"]
self._scaler = MinMaxScaler(feature_range=(1.0, 7.0), clip=True)
self._scaler.fit(np.array([[scaling["min"]], [scaling["max"]]]))
def _ensure_nlp(self):
"""Lazy-load spaCy model."""
if self._nlp is None:
import spacy
import inflect
ensure_spacy_model()
self._nlp = spacy.load("en_core_web_sm")
self._inflect_engine = inflect.engine()
@property
def nlp(self):
self._ensure_nlp()
return self._nlp
@property
def p(self):
self._ensure_nlp()
return self._inflect_engine
@property
def idf(self):
"""Load IDF scores from parquet file.
Uses page-level scores from:
Organisciak, P. 2016. Term Frequencies for 235k Language and Literature Texts.
http://hdl.handle.net/2142/89515.
"""
if self._idf_ref is None:
idf_df = pd.read_parquet(IDF_PATH)
self._idf_ref = idf_df["IPF"].to_dict()
self._default_idf = idf_df.iloc[10000]["IPF"]
return self._idf_ref
@property
def default_idf(self):
if self._default_idf is None:
_ = self.idf # triggers load
return self._default_idf
def load_model(self, model_path, mmap="r"):
"""Load a gensim KeyedVectors model."""
self._model = KeyedVectors.load(model_path, mmap=mmap)
def _get_phrase_vecs(self, phrase, stopword=False, term_weighting=False, exclude=None):
"""Return stacked array of model vectors for words in phrase.
Args:
phrase: Text string or spaCy Doc
stopword: If True, skip stopwords
term_weighting: If True, compute IDF weights
exclude: List of words to skip (lowercased)
Returns:
Tuple of (vectors array, weights list)
"""
import spacy
if exclude is None:
exclude = []
arrlist = []
weights = []
if not isinstance(phrase, spacy.tokens.doc.Doc):
phrase = self.nlp(phrase[: self.nlp.max_length], disable=["parser", "ner", "lemmatizer"])
exclude_lower = [x.lower() for x in exclude]
for word in phrase:
if stopword and word.is_stop:
continue
elif word.lower_ in exclude_lower:
continue
else:
try:
vec = self._model[word.lower_]
arrlist.append(vec)
except KeyError:
continue
if term_weighting:
weight = self.idf.get(word.lower_, self.default_idf)
weights.append(weight)
if len(arrlist):
vecs = np.vstack(arrlist)
return vecs, weights
else:
return [], []
def originality(self, target, response, stopword=False, term_weighting=False,
flip=True, exclude_target=False):
"""Score originality as semantic distance between target prompt and response.
Args:
target: The prompt/object (e.g., "brick")
response: The creative response (e.g., "modern art sculpture")
stopword: Remove stopwords before scoring
term_weighting: Weight words by IDF
flip: If True, return 1 - similarity (higher = more original)
exclude_target: If True, exclude prompt words from response
Returns:
Float originality score, or None if scoring fails
"""
if self._model is None:
raise RuntimeError("No model loaded. Call load_model() first.")
exclude_words = []
if exclude_target:
exclude_words = target.split()
for word in list(exclude_words):
try:
sense = self.p.plural(word.lower())
if isinstance(sense, str) and len(sense) and sense not in exclude_words:
exclude_words.append(sense)
except Exception:
pass
vecs, weights = self._get_phrase_vecs(
response, stopword, term_weighting, exclude=exclude_words
)
if len(vecs) == 0:
return None
if " " in target:
target_vecs = self._get_phrase_vecs(target, stopword, term_weighting)[0]
if len(target_vecs) == 0:
return None
targetvec = target_vecs.sum(0)
else:
try:
targetvec = self._model[target.lower()]
except KeyError:
return None
scores = self._model.cosine_similarities(targetvec, vecs)
if len(scores) and not term_weighting:
s = np.mean(scores)
elif len(scores):
s = np.average(scores, weights=weights)
else:
return None
if flip:
s = 1 - s
return float(s)
def elaboration(self, phrase, method="whitespace"):
"""Score elaboration (response length/complexity).
Args:
phrase: The response text
method: One of 'whitespace', 'stoplist', 'idf', 'pos'
Returns:
Numeric elaboration score
"""
if method == "whitespace":
return len(phrase.split())
doc = self.nlp(phrase[: self.nlp.max_length], disable=["parser", "ner", "lemmatizer"])
if method == "stoplist":
return len([w for w in doc if not (w.is_stop or w.is_punct)])
elif method == "idf":
weights = []
for word in doc:
if word.is_punct:
continue
weights.append(self.idf.get(word.lower_, self.default_idf))
return sum(weights)
elif method == "pos":
doc = self.nlp(phrase[: self.nlp.max_length], disable=["ner", "lemmatizer"])
return len([w for w in doc if w.pos_ in ["NOUN", "VERB", "ADJ", "ADV", "PROPN"] and not w.is_punct])
else:
raise ValueError(f"Unknown elaboration method: {method}")
def score_batch(self, df, stopword=False, term_weighting=False,
exclude_target=False, normalize=False, elab_method=None):
"""Score a DataFrame of prompt-response pairs.
Args:
df: DataFrame with 'prompt' and 'response' columns
stopword: Remove stopwords
term_weighting: Weight by IDF
exclude_target: Exclude prompt words from response
normalize: Scale to 1-7 range
elab_method: Elaboration method or None
Returns:
DataFrame with 'originality' (and optionally 'elaboration') columns added
"""
df = df.copy()
df["originality"] = df.apply(
lambda x: self.originality(
x["prompt"], x["response"],
stopword=stopword,
term_weighting=term_weighting,
exclude_target=exclude_target,
),
axis=1,
)
if normalize:
valid_mask = df["originality"].notna()
if valid_mask.any():
df.loc[valid_mask, "originality"] = self._scaler.transform(
df.loc[valid_mask, "originality"].values.reshape(-1, 1)
)[:, 0]
df["originality"] = df["originality"].round(1)
else:
df["originality"] = df["originality"].round(4)
if elab_method and elab_method != "none":
df["elaboration"] = df["response"].apply(
lambda x: self.elaboration(x, method=elab_method)
)
return df