|
|
import fasttext |
|
|
import streamlit as st |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from gensim.models import Word2Vec |
|
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
import plotly.express as px |
|
|
import plotly.graph_objects as go |
|
|
from collections import Counter |
|
|
import os |
|
|
import glob |
|
|
|
|
|
|
|
|
class UnifiedVectorModel:
    """Uniform wrapper over a gensim Word2Vec model ('w2v') or a native
    fastText model ('ft').

    Exposes a small common interface regardless of backend: `word in model`,
    `model[word]`, `most_similar`, `similar_by_vector`, plus cached
    vocabulary arrays (`vectors`, `index_to_key`).
    """

    def __init__(self, backend_model, model_type="w2v"):
        """
        Args:
            backend_model: gensim Word2Vec instance (model_type='w2v') or
                a fasttext model object (model_type='ft').
            model_type: 'w2v' or 'ft' (case-insensitive).

        Raises:
            ValueError: if model_type is neither 'w2v' nor 'ft'.
        """
        self.model = backend_model
        self.model_type = model_type.lower()

        if self.model_type == "w2v":
            self.wv = backend_model.wv
            self.key_to_index = self.wv.key_to_index
            self.vector_size = self.wv.vector_size
            self._words = set(self.wv.key_to_index.keys())
        elif self.model_type == "ft":
            self.key_to_index = {word: i for i, word in enumerate(backend_model.get_words())}
            self.vector_size = backend_model.get_dimension()
            self._words = set(self.key_to_index.keys())
        else:
            raise ValueError("model_type must be 'w2v' or 'ft'")

    def __contains__(self, word):
        """Return True if *word* is in the vocabulary."""
        return word in self._words

    def __getitem__(self, word):
        """Return the embedding vector for *word*.

        'w2v' raises KeyError on OOV words; 'ft' synthesizes a subword
        vector and never raises.
        """
        if self.model_type == "w2v":
            return self.wv[word]
        return self.model.get_word_vector(word)

    @staticmethod
    def _cosine_sims(query, matrix):
        """Cosine similarity of one query vector against each row of *matrix*.

        Pure numpy replacement for sklearn's cosine_similarity for the
        1-vs-many case; zero-norm vectors yield similarity 0 (matching
        sklearn's normalize() behavior).
        """
        query = np.asarray(query, dtype=float)
        matrix = np.asarray(matrix, dtype=float)
        q_norm = np.linalg.norm(query)
        denom = np.linalg.norm(matrix, axis=1) * (q_norm if q_norm else 1.0)
        denom[denom == 0] = 1.0  # avoid division by zero; those sims become 0
        return matrix @ query / denom

    def most_similar(self, positive=None, negative=None, topn=10):
        """Return up to *topn* (word, similarity) pairs nearest to
        sum(positive) - sum(negative).

        Either argument may be a single word string (gensim allows this;
        previously the 'ft' branch iterated a string's *characters*).
        Query words are excluded from the result.  Returns [] on failure
        or when the query vector is zero.
        """
        # Normalize arguments to lists of words.
        if not positive:
            positive = []
        elif isinstance(positive, str):
            positive = [positive]
        if not negative:
            negative = []
        elif isinstance(negative, str):
            negative = [negative]

        try:
            if self.model_type == "w2v":
                return self.wv.most_similar(positive=positive, negative=negative, topn=topn)

            # fastText branch: build the query vector explicitly, skipping
            # out-of-vocabulary words.
            vec = np.zeros(self.vector_size)
            for w in positive:
                if w in self:
                    vec += self[w]
            for w in negative:
                if w in self:
                    vec -= self[w]

            if np.allclose(vec, 0):
                return []

            # Use the cached vocabulary matrix instead of rebuilding it
            # on every call (the old code re-embedded the whole vocab here).
            words = self.index_to_key
            sims = self._cosine_sims(vec, self.vectors)
            # Over-fetch so that excluding query words cannot shrink the result.
            best = np.argsort(sims)[::-1][:topn + len(positive) + len(negative)]

            result = []
            for i in best:
                word = words[i]
                if word not in positive and word not in negative:
                    result.append((word, float(sims[i])))
                    if len(result) >= topn:
                        break
            return result

        except Exception as e:
            print(f"Error in most_similar: {e}")
            return []

    def similar_by_vector(self, vector, topn=10):
        """Return the *topn* (word, similarity) pairs nearest to *vector*."""
        words = self.index_to_key
        sims = self._cosine_sims(vector, self.vectors)
        best = np.argsort(sims)[::-1][:topn]
        return [(words[i], float(sims[i])) for i in best]

    def get_words(self):
        """Return the vocabulary as a list of words."""
        return list(self._words)

    @property
    def index_to_key(self):
        """Vocabulary as a list; order matches the rows of `vectors`."""
        if not hasattr(self, '_index_to_key'):
            self._index_to_key = list(self._words)
        return self._index_to_key

    @property
    def vectors(self):
        """All vocabulary vectors as a 2-D array, built lazily and cached.

        Row i corresponds to index_to_key[i] (both caches are derived from
        the same list, so the ordering is guaranteed consistent).
        """
        if not hasattr(self, '_cached_vectors'):
            words = self.index_to_key
            self._cached_words = words  # kept for backward compatibility
            self._cached_vectors = np.array([self[w] for w in words])
        return self._cached_vectors
|
|
|
|
|
|
|
|
@st.cache_resource
def load_model(model_path):
    """Load a model from *model_path* and wrap it in UnifiedVectorModel.

    '.bin' files are loaded as fastText models, '.model' files as gensim
    Word2Vec models.  On any failure (unknown extension included) a
    Streamlit error is shown and None is returned.
    """
    try:
        if model_path.endswith(".bin"):
            return UnifiedVectorModel(fasttext.load_model(model_path), model_type="ft")
        if model_path.endswith(".model"):
            return UnifiedVectorModel(Word2Vec.load(model_path), model_type="w2v")
        raise ValueError("wrong path format")
    except Exception as e:
        st.error(f"error loading model {model_path}: {e}")
        return None
|
|
|
|
|
|
|
|
# ---- Model discovery & selection (sidebar) ----
MODELS_DIR = "models"

if not os.path.exists(MODELS_DIR):
    st.error(f"Folder `{MODELS_DIR}` not found.")
    st.stop()

# Scan only extensions load_model() can actually open (.bin -> fastText,
# .model -> gensim Word2Vec).  BUGFIX: "*.vec" used to be scanned too, but
# load_model() rejects that extension, so offering such files in the
# selectbox guaranteed a load error.
model_files = []
for ext in ["*.bin", "*.model"]:
    model_files.extend(glob.glob(os.path.join(MODELS_DIR, ext)))
model_files = [f for f in model_files if os.path.isfile(f)]
model_names = [os.path.basename(f) for f in model_files]

if len(model_names) == 0:
    st.error(f"No models in folder `{MODELS_DIR}` (.bin, .model).")
    st.info("Supported formats: Word2Vec (.model), fastText (.bin).")
    st.stop()

selected_model_name = st.sidebar.selectbox(
    "Choose pretrained model",
    model_names
)

selected_model_path = os.path.join(MODELS_DIR, selected_model_name)

st.sidebar.info(f"loading: `{selected_model_name}`")

model = load_model(selected_model_path)

# load_model() already reported the error; just halt the script.
if model is None:
    st.stop()
else:
    st.sidebar.success(f"Model '{selected_model_name}' loaded")
    st.sidebar.write(f"Voc size: {len(model.key_to_index):,}")
    st.sidebar.write(f"Vector size: {model.vector_size}")
|
|
|
|
|
def analogy_accuracy(model, file_name):
    """Score *model* on a 4-word analogy file.

    Each line of *file_name* holds four whitespace-separated words
    ``a b c d``, scored as the analogy ``a - b + c ≈ d``.  A line counts
    as correct when ``d`` appears in the model's top-10 candidates.

    Args:
        model: object with a gensim-style most_similar(positive, negative, topn).
        file_name: path to a UTF-8 text file of analogy quadruples.

    Returns:
        (accuracy, results): accuracy is the fraction of scored lines whose
        target landed in the top 10 (0 if nothing was scored); results is a
        list of per-line dicts with query/target/predicted/rank/in_top10.
    """
    right = 0
    count = 0
    results = []
    with open(file_name, encoding='utf-8') as file:
        for line in file:
            words = line.strip().split()
            if len(words) != 4:
                continue  # skip malformed lines silently
            try:
                most_similar = model.most_similar(positive=[words[0], words[2]], negative=[words[1]], topn=10)
            except KeyError:
                continue  # a query word is out of vocabulary
            if not most_similar:
                # BUGFIX: the model may return no candidates (e.g. zero
                # query vector on the fastText path); previously
                # predicted[0] raised IndexError here.
                continue
            predicted = [x[0] for x in most_similar]
            correct = words[3]
            if correct in predicted:
                rank = predicted.index(correct) + 1
                right += 1
            else:
                rank = None
            count += 1
            results.append({
                "query": f"{words[0]} - {words[1]} + {words[2]}",
                "target": correct,
                "predicted": predicted[0],
                "rank": rank,
                "in_top10": rank is not None
            })
    accuracy = right / count if count > 0 else 0
    return accuracy, results
|
|
|
|
|
|
|
|
def avg_similarity(model, file_name):
    """Average pairwise cosine similarity over word groups.

    Each line of *file_name* is a whitespace-separated group of words;
    every unordered pair inside a group contributes one similarity.
    A line containing any out-of-vocabulary word is skipped entirely.
    Returns 0 when no pair could be scored.
    """
    pair_sims = []
    with open(file_name, encoding='utf-8') as fh:
        for raw_line in fh:
            group = raw_line.strip().split()
            try:
                group_vectors = [model[w] for w in group]
            except KeyError:
                continue  # whole line dropped on first OOV word
            sim_matrix = cosine_similarity(group_vectors)
            size = len(group)
            for i in range(size - 1):
                # upper triangle only: each unordered pair counted once
                pair_sims.extend(sim_matrix[i][j] for j in range(i + 1, size))
    return sum(pair_sims) / len(pair_sims) if pair_sims else 0
|
|
|
|
|
|
|
|
def projection(word_vec, axis):
    """Scalar projection of *word_vec* onto the direction of *axis*.

    Equivalent to dot(word_vec, axis / ||axis||).
    """
    unit_axis = axis / np.linalg.norm(axis)
    return np.dot(word_vec, unit_axis)
|
|
|
|
|
|
|
|
def get_projection_row(model, axis):
    """Project every vocabulary word onto *axis* and sort ascending.

    Returns a list of (word, scalar_projection) pairs ordered from the
    most negative projection to the most positive one.
    """
    # Inlined scalar-projection helper: dot with the unit axis vector.
    unit_axis = axis / np.linalg.norm(axis)
    scored = [(word, np.dot(model[word], unit_axis)) for word in model.key_to_index.keys()]
    scored.sort(key=lambda pair: pair[1])
    return scored
|
|
|
|
|
|
|
|
# Page title and the five top-level tabs of the app.
st.title("Vector embeddings")

# NOTE(review): the tab labels below are user-facing runtime strings;
# "ariphmetics" is a misspelling of "arithmetics" left untouched here.
tab1, tab2, tab3, tab4, tab5 = st.tabs([
    "Vector ariphmetics",
    "Semantic consistency",
    "Semantic axis",
    "Distribution analysis",
    "Report"
])
|
|
|
|
|
with tab1:
    # Vector arithmetic tab: evaluate expressions like "a - b + c" and show
    # the nearest vocabulary words plus the intermediate steps.
    st.header("Vector ariphmetics")
    expr = st.text_input("Insert expression", value="рубль - россия + сша")

    if st.button("Compute"):
        # Tokenize: pad the operators with spaces so "a-b+c" splits cleanly.
        words = expr.replace('+', ' + ').replace('-', ' - ').split()
        positive, negative = [], []
        current = 'pos'  # the first term is implicitly positive

        for w in words:
            if w == '+':
                current = 'pos'
            elif w == '-':
                current = 'neg'
            else:
                (positive if current == 'pos' else negative).append(w)

        # Refuse to compute if any term is out of vocabulary.
        missing = [w for w in positive + negative if w not in model]
        if missing:
            st.warning(f"Words not found in voc: {', '.join(missing)}")
            st.stop()

        try:
            similar = model.most_similar(
                positive=positive,
                negative=negative,
                topn=10
            )

            st.write("### Result:")
            result_words = [f"{w} ({s:.3f})" for w, s in similar]
            st.write("Nearest words: " + ", ".join(result_words))

            st.write("### In-between steps")

            # Accumulate the expression term by term, showing the nearest
            # word after each partial sum.
            cum_vec = np.zeros(model.vector_size)

            steps_data = []

            for i in range(len(positive)):
                # BUGFIX: was `cum_vec += model[w]` — `w` was the stale
                # token left over from the parsing loop above, so every
                # step added the same (last-parsed) vector.
                cum_vec += model[positive[i]]
                nearest = model.most_similar(positive=positive[:i + 1], topn=1)
                if nearest:  # guard: fastText path may return []
                    steps_data.append({
                        "step": f"+ {positive[i]}",
                        "nearest word": nearest[0][0],
                        "similarity": nearest[0][1]
                    })

            for i in range(len(negative)):
                # BUGFIX: same stale-variable bug for the negative terms.
                cum_vec -= model[negative[i]]
                nearest = model.most_similar(positive=positive, negative=negative[:i + 1], topn=1)
                if nearest:
                    steps_data.append({
                        "step": f"- {negative[i]}",
                        "nearest word": nearest[0][0],
                        "similarity": nearest[0][1]
                    })

            df_steps = pd.DataFrame(steps_data)
            st.dataframe(df_steps[["step", "nearest word", "similarity"]])

            result_word = similar[0][0]
            # Quick visual: plots only the first two raw components of the
            # accumulated vector (not a real 2-D projection).
            fig = px.scatter(
                x=[cum_vec[0]], y=[cum_vec[1]],
                text=[result_word],
                title="Result (first 2 components)"
            )
            fig.update_traces(textposition='top center', marker=dict(size=12, color='red'))
            st.plotly_chart(fig)

        except Exception as e:
            st.error(f"Error computing: {e}")
|
|
|
|
|
with tab2:
    # Similarity calculator tab: cosine similarity between two words plus a
    # small neighbor graph around both of them.
    st.header("Similarity calculator")
    col1, col2 = st.columns(2)
    with col1:
        word1 = st.text_input("word 1", value="мужчина")
    with col2:
        word2 = st.text_input("word 2", value="женщина")

    if st.button("Compute similarity"):
        try:
            # model[...] raises KeyError for OOV words on the w2v backend;
            # that is the case the except below handles.
            v1, v2 = model[word1], model[word2]
            sim = cosine_similarity([v1], [v2])[0][0]
            st.metric("Cosine similarity", f"{sim:.4f}")

            st.write("### Nearest neighbors graph")
            # NOTE(review): most_similar is called twice per word here (once
            # for nodes, once for edges) — redundant but harmless.
            neighbors = model.most_similar(word1, topn=5) + model.most_similar(word2, topn=5)
            nodes = list(set([word1, word2] + [n[0] for n in neighbors]))
            edges = [(word1, n[0]) for n in model.most_similar(word1, topn=5)] + \
                    [(word2, n[0]) for n in model.most_similar(word2, topn=5)]

            G = go.Figure()
            # Node layout is purely random in [-1, 1]^2 — positions carry no
            # semantic meaning, only the edges do.
            pos = np.random.rand(len(nodes), 2) * 2 - 1
            node_x = pos[:, 0]
            node_y = pos[:, 1]

            # One scatter trace per edge (a gray line segment).
            for edge in edges:
                x0, y0 = pos[nodes.index(edge[0])]
                x1, y1 = pos[nodes.index(edge[1])]
                G.add_trace(go.Scatter(x=[x0, x1], y=[y0, y1], mode='lines', line=dict(width=1, color='gray'), showlegend=False))

            # Single trace with all labeled node markers on top of the edges.
            G.add_trace(go.Scatter(x=node_x, y=node_y, mode='text+markers',
                                   marker=dict(size=10, color='lightblue'),
                                   text=nodes, textposition="top center"))
            G.update_layout(title="Semantic links graph", showlegend=False)
            st.plotly_chart(G)

        except KeyError as e:
            st.error(f"Word not found: {e}")
|
|
|
|
|
with tab3:
    # Semantic-axis tab: project the vocabulary onto the direction defined
    # by the difference of two anchor words and show both extremes.
    st.header("Semantic axis projection")
    left_col, right_col = st.columns(2)
    with left_col:
        axis_pos_word = st.text_input("positive", value="мужчина")
    with right_col:
        axis_neg_word = st.text_input("negative", value="женщина")

    if st.button("Build axis"):
        try:
            # Axis direction = positive anchor minus negative anchor.
            axis_vec = model[axis_pos_word] - model[axis_neg_word]

            ranked = get_projection_row(model, axis_vec)  # ascending order
            extreme_pos = ranked[-10:][::-1]  # 10 largest, shown descending
            extreme_neg = ranked[:10]         # 10 smallest, shown ascending

            st.write(f"Axis: **{axis_pos_word} – {axis_neg_word}**")
            st.write("### Top 10 positive:")
            st.write(", ".join(f"{w} ({p:.3f})" for w, p in extreme_pos))

            st.write("### Top 10 negative:")
            st.write(", ".join(f"{w} ({p:.3f})" for w, p in extreme_neg))

            df_proj = pd.DataFrame(extreme_pos + extreme_neg, columns=["word", "projection"])
            fig = px.bar(
                df_proj,
                x="projection",
                y="word",
                orientation='h',
                title=f"Projection on axis: {axis_pos_word}–{axis_neg_word}"
            )
            st.plotly_chart(fig)

        except KeyError as e:
            st.error(f"Error: {e}")
|
|
|
|
|
with tab4:
    # Distribution tab: histogram of pairwise cosine similarities over a
    # random sample of vocabulary vectors.
    st.header("Distance distribution analysis")
    all_vectors = model.vectors
    # BUGFIX: np.random.choice(n, 1000, replace=False) raises ValueError
    # when the vocabulary holds fewer than 1000 words — clamp the sample.
    n_rows = all_vectors.shape[0]
    sample_size = min(1000, n_rows)
    sample = all_vectors[np.random.choice(n_rows, sample_size, replace=False)]

    dists = cosine_similarity(sample)
    # BUGFIX: take the strictly-upper-triangle entries.  This removes the
    # self-similarity diagonal and double-counted pairs without discarding
    # genuinely negative or zero similarities (the previous `> 0` filter
    # silently biased the whole distribution upward).
    upper = np.triu_indices(sample_size, k=1)
    flat_dists = dists[upper]

    fig = px.histogram(flat_dists, nbins=50, title="Cosine similarity distribution between random words")
    st.plotly_chart(fig)

    st.metric("Mean similarity", f"{np.mean(flat_dists):.3f}")
    st.metric("Std deviation", f"{np.std(flat_dists):.3f}")
|
|
|
|
|
with tab5:
    # Report tab: batch evaluation of the model on bundled data files plus
    # two interactive visualizations.
    st.header("Report")

    # --- 1. Analogy benchmark -------------------------------------------
    st.subheader("1. Analogy rate")
    analogies_file = "data/analogy.txt"
    if os.path.exists(analogies_file):
        acc, results = analogy_accuracy(model, analogies_file)
        st.metric("Analogy accuracy (in top 10)", f"{acc:.2%}")
        st.dataframe(pd.DataFrame(results))
    else:
        st.warning("File `analogy.txt` not found.")

    # --- 2. Synonyms ----------------------------------------------------
    st.subheader("2. Average synonyms similarity")
    sim_file = "data/synonyms.txt"
    if os.path.exists(sim_file):
        avg_sim = avg_similarity(model, sim_file)
        st.metric("Average similarity", f"{avg_sim:.4f}")
    else:
        # BUGFIX: the warning previously named `similarity_words.txt`,
        # which is not the file this section reads.
        st.warning("File `synonyms.txt` not found.")

    # --- 3. Antonyms ----------------------------------------------------
    st.subheader("3. Average antonyms similarity")
    sim_file = "data/antonyms.txt"
    if os.path.exists(sim_file):
        avg_sim = avg_similarity(model, sim_file)
        st.metric("Average similarity", f"{avg_sim:.4f}")
    else:
        # BUGFIX: same wrong filename as in section 2.
        st.warning("File `antonyms.txt` not found.")

    # --- 4. Heatmap -----------------------------------------------------
    st.subheader("4. Heatmap for nearest words")
    query_words = st.text_input("Enter words", value="мужчина женщина мальчик девочка").split()
    if st.button("Build heatmap"):
        try:
            vectors = [model[w] for w in query_words]
            sims = cosine_similarity(vectors)
            fig = px.imshow(sims, x=query_words, y=query_words, color_continuous_scale="Blues", title="Similarity heatmap")
            st.plotly_chart(fig)
        except KeyError as e:
            st.error(f"Error: {e}")

    # --- 5. t-SNE scatter -----------------------------------------------
    st.subheader("5. 2D projection")
    sample_words = st.text_input("Input words", value="мужчина женщина мальчик девочка")
    word_list = sample_words.split()
    if st.button("Show clusters"):
        if len(word_list) < 2:
            # BUGFIX: with a single word, perplexity = len - 1 = 0 made
            # TSNE raise an uncaught ValueError (only KeyError was handled).
            st.warning("Enter at least two words.")
        else:
            try:
                from sklearn.manifold import TSNE
                vectors = np.array([model[w] for w in word_list])
                # perplexity must be strictly below the number of samples
                tsne = TSNE(n_components=2, perplexity=len(vectors) - 1, random_state=42)
                embedded = tsne.fit_transform(vectors)

                fig = px.scatter(x=embedded[:, 0], y=embedded[:, 1], text=word_list, title="words projection")
                fig.update_traces(textposition='top center')
                st.plotly_chart(fig)
            except KeyError as e:
                st.error(f"Word not found: {e}")