| | import fasttext
|
| | import streamlit as st
|
| | import numpy as np
|
| | import pandas as pd
|
| | from gensim.models import Word2Vec
|
| | from sklearn.metrics.pairwise import cosine_similarity
|
| | import plotly.express as px
|
| | import plotly.graph_objects as go
|
| | from collections import Counter
|
| | import os
|
| | import glob
|
| |
|
| |
|
| | class UnifiedVectorModel:
|
| | def __init__(self, backend_model, model_type="w2v"):
|
| | self.model = backend_model
|
| | self.model_type = model_type.lower()
|
| |
|
| | if self.model_type == "w2v":
|
| | self.wv = backend_model.wv
|
| | self.key_to_index = self.wv.key_to_index
|
| | self.vector_size = self.wv.vector_size
|
| | self._words = set(self.wv.key_to_index.keys())
|
| |
|
| | elif self.model_type == "ft":
|
| |
|
| | self.key_to_index = {word: i for i, word in enumerate(backend_model.get_words())}
|
| | self.vector_size = backend_model.get_dimension()
|
| | self._words = set(self.key_to_index.keys())
|
| | else:
|
| | raise ValueError("model_type must be 'w2v' or 'ft'")
|
| |
|
| | def __contains__(self, word):
|
| | return word in self._words
|
| |
|
| | def __getitem__(self, word):
|
| | if self.model_type == "w2v":
|
| | return self.wv[word]
|
| | elif self.model_type == "ft":
|
| | return self.model.get_word_vector(word)
|
| |
|
| | def most_similar(self, positive=None, negative=None, topn=10):
|
| | from sklearn.metrics.pairwise import cosine_similarity
|
| |
|
| | if not positive:
|
| | positive = []
|
| | if not negative:
|
| | negative = []
|
| |
|
| | try:
|
| | if self.model_type == "w2v":
|
| | return self.wv.most_similar(positive=positive, negative=negative, topn=topn)
|
| |
|
| | elif self.model_type == "ft":
|
| | vec = np.zeros(self.vector_size)
|
| | for w in positive:
|
| | if w in self:
|
| | vec += self[w]
|
| | else:
|
| | continue
|
| | for w in negative:
|
| | if w in self:
|
| | vec -= self[w]
|
| | else:
|
| | continue
|
| |
|
| | if np.allclose(vec, 0):
|
| | return []
|
| |
|
| | words = list(self._words)
|
| | vectors = np.array([self[w] for w in words])
|
| |
|
| | sims = cosine_similarity([vec], vectors)[0]
|
| | best = np.argsort(sims)[::-1][:topn + len(positive) + len(negative)]
|
| |
|
| | result = []
|
| | for i in best:
|
| | word = words[i]
|
| | if word not in positive and word not in negative:
|
| | result.append((word, float(sims[i])))
|
| | if len(result) >= topn:
|
| | break
|
| | return result
|
| |
|
| | except Exception as e:
|
| | print(f"Error in most_similar: {e}")
|
| | return []
|
| |
|
| | def similar_by_vector(self, vector, topn=10):
|
| | from sklearn.metrics.pairwise import cosine_similarity
|
| |
|
| | words = list(self._words)
|
| | vectors = np.array([self[w] for w in words])
|
| | sims = cosine_similarity([vector], vectors)[0]
|
| | best = np.argsort(sims)[::-1][:topn]
|
| |
|
| | return [(words[i], float(sims[i])) for i in best]
|
| |
|
| | def get_words(self):
|
| | return list(self._words)
|
| |
|
| | @property
|
| | def vectors(self):
|
| | if not hasattr(self, '_cached_vectors'):
|
| | words = list(self._words)
|
| | self._cached_words = words
|
| | self._cached_vectors = np.array([self[w] for w in words])
|
| | return self._cached_vectors
|
| |
|
| | @property
|
| | def index_to_key(self):
|
| | if not hasattr(self, '_index_to_key'):
|
| | self._index_to_key = list(self._words)
|
| | return self._index_to_key
|
| |
|
| |
|
| | @st.cache_resource
|
| | def load_model(model_path):
|
| | try:
|
| | if model_path.endswith(".model"):
|
| | raw_model = Word2Vec.load(model_path)
|
| | current_model = UnifiedVectorModel(raw_model, model_type="w2v")
|
| |
|
| | elif model_path.endswith(".bin"):
|
| | raw_model = fasttext.load_model(model_path)
|
| | current_model = UnifiedVectorModel(raw_model, model_type="ft")
|
| | else:
|
| | raise ValueError(f"wrong path format")
|
| | return current_model
|
| | except Exception as e:
|
| | st.error(f"error loading model {model_path}: {e}")
|
| | return None
|
| |
|
| |
|
| | MODELS_DIR = "models"
|
| |
|
| | if not os.path.exists(MODELS_DIR):
|
| | st.error(f"Folder `{MODELS_DIR}` not found.")
|
| | st.stop()
|
| |
|
| | model_files = []
|
| | for ext in ["*.bin", "*.model", "*.vec"]:
|
| | model_files.extend(glob.glob(os.path.join(MODELS_DIR, ext)))
|
| | model_files = [f for f in model_files if os.path.isfile(f)]
|
| | model_names = [os.path.basename(f) for f in model_files]
|
| |
|
| | if len(model_names) == 0:
|
| | st.error(f"No models in folder `{MODELS_DIR}` (.bin, .model, .vec).")
|
| | st.info("Supported formats: Word2Vec (binary/text), FastText.")
|
| | st.stop()
|
| |
|
| | selected_model_name = st.sidebar.selectbox(
|
| | "Choose pretrained model",
|
| | model_names
|
| | )
|
| |
|
| | selected_model_path = os.path.join(MODELS_DIR, selected_model_name)
|
| |
|
| | st.sidebar.info(f"loading: `{selected_model_name}`")
|
| |
|
| | model = load_model(selected_model_path)
|
| |
|
| | if model is None:
|
| | st.stop()
|
| | else:
|
| | st.sidebar.success(f"Model '{selected_model_name}' loaded")
|
| | st.sidebar.write(f"Voc size: {len(model.key_to_index):,}")
|
| | st.sidebar.write(f"Vector size: {model.vector_size}")
|
| |
|
| | def analogy_accuracy(model, file_name):
|
| | right = 0
|
| | count = 0
|
| | results = []
|
| | with open(file_name, encoding='utf-8') as file:
|
| | for line in file:
|
| | words = line.strip().split()
|
| | if len(words) != 4:
|
| | continue
|
| | try:
|
| | most_similar = model.most_similar(positive=[words[0], words[2]], negative=[words[1]], topn=10)
|
| | predicted = [x[0] for x in most_similar]
|
| | correct = words[3]
|
| | if correct in predicted:
|
| | rank = predicted.index(correct) + 1
|
| | right += 1
|
| | else:
|
| | rank = None
|
| | count += 1
|
| | results.append({
|
| | "query": f"{words[0]} - {words[1]} + {words[2]}",
|
| | "target": correct,
|
| | "predicted": predicted[0],
|
| | "rank": rank,
|
| | "in_top10": bool(rank)
|
| | })
|
| | except KeyError as e:
|
| | continue
|
| | accuracy = right / count if count > 0 else 0
|
| | return accuracy, results
|
| |
|
| |
|
| | def avg_similarity(model, file_name):
|
| | res = []
|
| | with open(file_name, encoding='utf-8') as file:
|
| | for line in file:
|
| | words = line.strip().split()
|
| | try:
|
| | vectors = [model[word] for word in words]
|
| | except KeyError:
|
| | continue
|
| | sims = cosine_similarity(vectors)
|
| | for i in range(len(words) - 1):
|
| | for j in range(i + 1, len(words)):
|
| | res.append(sims[i][j])
|
| | return sum(res) / len(res) if res else 0
|
| |
|
| |
|
| | def projection(word_vec, axis):
|
| | axis_norm = axis / np.linalg.norm(axis)
|
| | return np.dot(word_vec, axis_norm)
|
| |
|
| |
|
| | def get_projection_row(model, axis):
|
| | words = list(model.key_to_index.keys())
|
| | projections = [(word, projection(model[word], axis)) for word in words]
|
| | projections = sorted(projections, key=lambda x: x[1])
|
| | return projections
|
| |
|
| |
|
| | st.title("Vector embeddings")
|
| |
|
| | tab1, tab2, tab3, tab4, tab5 = st.tabs([
|
| | "Vector ariphmetics",
|
| | "Semantic consistency",
|
| | "Semantic axis",
|
| | "Distribution analysis",
|
| | "Report"
|
| | ])
|
| |
|
| | with tab1:
|
| | st.header("Vector ariphmetics")
|
| | expr = st.text_input("Insert expression", value="рубль - россия + сша")
|
| |
|
| | if st.button("Compute"):
|
| | words = expr.replace('+', ' + ').replace('-', ' - ').split()
|
| | positive, negative = [], []
|
| | current = 'pos'
|
| |
|
| | for w in words:
|
| | if w == '+':
|
| | current = 'pos'
|
| | elif w == '-':
|
| | current = 'neg'
|
| | else:
|
| | (positive if current == 'pos' else negative).append(w)
|
| |
|
| | missing = [w for w in positive + negative if w not in model]
|
| | if missing:
|
| | st.warning(f"Words not found in voc: {', '.join(missing)}")
|
| | st.stop()
|
| |
|
| | try:
|
| | similar = model.most_similar(
|
| | positive=positive,
|
| | negative=negative,
|
| | topn=10
|
| | )
|
| |
|
| | st.write("### Result:")
|
| | result_words = [f"{w} ({s:.3f})" for w, s in similar]
|
| | st.write("Nearest words: " + ", ".join(result_words))
|
| |
|
| | st.write("### In-between steps")
|
| |
|
| | cum_vec = np.zeros(model.vector_size)
|
| |
|
| | steps_data = []
|
| |
|
| | for i in range(len(positive)):
|
| | cum_vec += model[w]
|
| | nearest = model.most_similar(positive=positive[:i + 1], topn=1)
|
| | steps_data.append({
|
| | "step": f"+ {positive[i]}",
|
| | "nearest word": nearest[0][0],
|
| | "similarity": nearest[0][1]
|
| | })
|
| |
|
| | for i in range(len(negative)):
|
| | cum_vec -= model[w]
|
| | nearest = model.most_similar(positive=positive, negative=negative[:i + 1], topn=1)
|
| | steps_data.append({
|
| | "step": f"- {negative[i]}",
|
| | "nearest word": nearest[0][0],
|
| | "similarity": nearest[0][1]
|
| | })
|
| |
|
| | df_steps = pd.DataFrame(steps_data)
|
| | st.dataframe(df_steps[["step", "nearest word", "similarity"]])
|
| |
|
| | result_word = similar[0][0]
|
| | fig = px.scatter(
|
| | x=[cum_vec[0]], y=[cum_vec[1]],
|
| | text=[result_word],
|
| | title="Result (first 2 components)"
|
| | )
|
| | fig.update_traces(textposition='top center', marker=dict(size=12, color='red'))
|
| | st.plotly_chart(fig)
|
| |
|
| | except Exception as e:
|
| | st.error(f"Error computing: {e}")
|
| |
|
| | with tab2:
|
| | st.header("Similarity calculator")
|
| | col1, col2 = st.columns(2)
|
| | with col1:
|
| | word1 = st.text_input("word 1", value="мужчина")
|
| | with col2:
|
| | word2 = st.text_input("word 2", value="женщина")
|
| |
|
| | if st.button("Compute similarity"):
|
| | try:
|
| | v1, v2 = model[word1], model[word2]
|
| | sim = cosine_similarity([v1], [v2])[0][0]
|
| | st.metric("Cosine similarity", f"{sim:.4f}")
|
| |
|
| | st.write("### Nearest neighbors graph")
|
| | neighbors = model.most_similar(word1, topn=5) + model.most_similar(word2, topn=5)
|
| | nodes = list(set([word1, word2] + [n[0] for n in neighbors]))
|
| | edges = [(word1, n[0]) for n in model.most_similar(word1, topn=5)] + \
|
| | [(word2, n[0]) for n in model.most_similar(word2, topn=5)]
|
| |
|
| | G = go.Figure()
|
| | pos = np.random.rand(len(nodes), 2) * 2 - 1
|
| | node_x = pos[:, 0]
|
| | node_y = pos[:, 1]
|
| |
|
| | for edge in edges:
|
| | x0, y0 = pos[nodes.index(edge[0])]
|
| | x1, y1 = pos[nodes.index(edge[1])]
|
| | G.add_trace(go.Scatter(x=[x0, x1], y=[y0, y1], mode='lines', line=dict(width=1, color='gray'), showlegend=False))
|
| |
|
| | G.add_trace(go.Scatter(x=node_x, y=node_y, mode='text+markers',
|
| | marker=dict(size=10, color='lightblue'),
|
| | text=nodes, textposition="top center"))
|
| | G.update_layout(title="Semantic links graph", showlegend=False)
|
| | st.plotly_chart(G)
|
| |
|
| | except KeyError as e:
|
| | st.error(f"Word not found: {e}")
|
| |
|
| | with tab3:
|
| | st.header("Semantic axis projection")
|
| | col1, col2 = st.columns(2)
|
| | with col1:
|
| | pos_axis = st.text_input("positive", value="мужчина")
|
| | with col2:
|
| | neg_axis = st.text_input("negative", value="женщина")
|
| |
|
| | if st.button("Build axis"):
|
| | try:
|
| | pos_vec = model[pos_axis]
|
| | neg_vec = model[neg_axis]
|
| | axis = pos_vec - neg_vec
|
| |
|
| | projections = get_projection_row(model, axis)
|
| | top_pos = projections[-10:][::-1]
|
| | top_neg = projections[:10]
|
| |
|
| | st.write(f"Axis: **{pos_axis} – {neg_axis}**")
|
| | st.write("### Top 10 positive:")
|
| | st.write(", ".join([f"{w} ({p:.3f})" for w, p in top_pos]))
|
| |
|
| | st.write("### Top 10 negative:")
|
| | st.write(", ".join([f"{w} ({p:.3f})" for w, p in top_neg]))
|
| |
|
| | df_proj = pd.DataFrame(top_pos + top_neg, columns=["word", "projection"])
|
| | fig = px.bar(df_proj, x="projection", y="word", orientation='h', title=f"Projection on axis: {pos_axis}–{neg_axis}")
|
| | st.plotly_chart(fig)
|
| |
|
| | except KeyError as e:
|
| | st.error(f"Error: {e}")
|
| |
|
| | with tab4:
|
| | st.header("Distance distribution analysis")
|
| | all_vectors = model.vectors
|
| | sample = all_vectors[np.random.choice(all_vectors.shape[0], 1000, replace=False)]
|
| |
|
| | dists = cosine_similarity(sample)
|
| | np.fill_diagonal(dists, 0)
|
| | flat_dists = dists.flatten()
|
| | flat_dists = flat_dists[flat_dists > 0]
|
| |
|
| | fig = px.histogram(flat_dists, nbins=50, title="Cosine similarity distribution between random words")
|
| | st.plotly_chart(fig)
|
| |
|
| | st.metric("Mean similarity", f"{np.mean(flat_dists):.3f}")
|
| | st.metric("Std deviation", f"{np.std(flat_dists):.3f}")
|
| |
|
| | with tab5:
|
| | st.header("Report")
|
| |
|
| | st.subheader("1. Analogy rate")
|
| | analogies_file = "data/analogy.txt"
|
| | if os.path.exists(analogies_file):
|
| | acc, results = analogy_accuracy(model, analogies_file)
|
| | st.metric("Analogy accuracy (in top 10)", f"{acc:.2%}")
|
| | st.dataframe(pd.DataFrame(results))
|
| | else:
|
| | st.warning("File `analogy.txt` not found.")
|
| |
|
| | st.subheader("2. Average synonyms similarity")
|
| | sim_file = "data/synonyms.txt"
|
| | if os.path.exists(sim_file):
|
| | avg_sim = avg_similarity(model, sim_file)
|
| | st.metric("Average similarity", f"{avg_sim:.4f}")
|
| | else:
|
| | st.warning("File `similarity_words.txt` not found.")
|
| |
|
| | st.subheader("3. Average antonyms similarity")
|
| | sim_file = "data/antonyms.txt"
|
| | if os.path.exists(sim_file):
|
| | avg_sim = avg_similarity(model, sim_file)
|
| | st.metric("Average similarity", f"{avg_sim:.4f}")
|
| | else:
|
| | st.warning("File `similarity_words.txt` not found.")
|
| |
|
| | st.subheader("4. Heatmap for nearest words")
|
| | query_words = st.text_input("Enter words", value="мужчина женщина мальчик девочка").split()
|
| | if st.button("Build heatmap"):
|
| | try:
|
| | vectors = [model[w] for w in query_words]
|
| | sims = cosine_similarity(vectors)
|
| | fig = px.imshow(sims, x=query_words, y=query_words, color_continuous_scale="Blues", title="Similarity heatmap")
|
| | st.plotly_chart(fig)
|
| | except KeyError as e:
|
| | st.error(f"Error: {e}")
|
| |
|
| | st.subheader("5. 2D projection")
|
| | sample_words = st.text_input("Input words", value="мужчина женщина мальчик девочка")
|
| | word_list = sample_words.split()
|
| | if st.button("Show clusters"):
|
| | try:
|
| | from sklearn.manifold import TSNE
|
| | vectors = np.array([model[w] for w in word_list])
|
| | tsne = TSNE(n_components=2, perplexity=len(vectors) - 1, random_state=42)
|
| | embedded = tsne.fit_transform(vectors)
|
| |
|
| | fig = px.scatter(x=embedded[:, 0], y=embedded[:, 1], text=word_list, title="words projection")
|
| | fig.update_traces(textposition='top center')
|
| | st.plotly_chart(fig)
|
| | except KeyError as e:
|
| | st.error(f"Word not found: {e}") |