from typing import Dict, List, Optional, Tuple

import numpy as np
from loguru import logger

from services.word_service import WordEmbeddingService


class StudyService:
    """Analysis helpers built on top of a ``WordEmbeddingService``.

    Every method looks words up through ``self.word_service`` and treats a
    ``None`` result from ``get_vector`` as "word not available".

    NOTE(review): the code assumes ``get_vector`` returns a NumPy array (it
    uses array arithmetic and ``.tolist()`` on the result) -- confirm against
    ``WordEmbeddingService``.
    """

    def __init__(self, word_service: WordEmbeddingService):
        self.word_service = word_service

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _accumulate(total, vector, weight=1.0):
        """Return ``total + weight * vector``, starting a new sum when ``total`` is None.

        Accumulating lazily avoids hard-coding the embedding dimension; the
        sum is kept in float64 like the ``np.zeros`` buffer it replaces.
        """
        contribution = weight * np.asarray(vector, dtype=np.float64)
        return contribution if total is None else total + contribution

    @staticmethod
    def _cosine_similarity(vec_a, vec_b) -> float:
        """Return the cosine similarity of two vectors, or 0.0 if either has zero norm."""
        denominator = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
        if denominator == 0:
            # A zero vector has no direction; report "no similarity" instead
            # of letting 0/0 produce NaN.
            return 0.0
        return float(np.dot(vec_a, vec_b) / denominator)

    async def _lookup_vectors(self, words: List[str]) -> Tuple[List[str], list]:
        """Return ``(found_words, vectors)`` for the words that have a vector, in input order."""
        found_words = []
        vectors = []
        for word in words:
            vector = await self.word_service.get_vector(word)
            if vector is not None:
                found_words.append(word)
                vectors.append(vector)
        return found_words, vectors

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def analyze_word_neighborhood(self, word: str, n_neighbors: int = 20) -> Dict:
        """Get detailed analysis of a word's semantic neighborhood.

        Args:
            word: The word to analyze.
            n_neighbors: Number of similar words requested from the service.

        Returns:
            A dict with keys ``word``, ``in_vocabulary``, ``similar_words``
            and ``vector_norm``. If any exception occurs, the exception is
            logged and an "empty" result (not in vocabulary, no neighbors) is
            returned instead.
        """
        try:
            vector = await self.word_service.get_vector(word)
            similar_words = await self.word_service.get_most_similar_words(
                word, n=n_neighbors
            )
            return {
                "word": word,
                "in_vocabulary": vector is not None,
                "similar_words": similar_words,
                "vector_norm": float(np.linalg.norm(vector)) if vector is not None else None,
            }
        except Exception as e:
            logger.exception(f"Error analyzing word neighborhood: {e}")
            return {
                "word": word,
                "in_vocabulary": False,
                "similar_words": [],
                "vector_norm": None,
            }

    async def analyze_concept(
        self,
        positive_words: List[str],
        negative_words: Optional[List[str]] = None,
        n_results: int = 10,
    ) -> Dict:
        """Analyze a concept defined by positive and negative words.

        The concept vector is the sum of the positive word vectors minus the
        sum of the negative word vectors, normalized to unit length.
        Example: "roi - homme + femme = reine".

        Args:
            positive_words: Words whose vectors are added.
            negative_words: Words whose vectors are subtracted (default: none).
            n_results: Number of similar words requested from the service.

        Returns:
            A dict with keys ``concept``, ``similar_words`` and
            ``vector_norm``. When no usable concept vector can be built (no
            word found, the vectors cancel out, or an exception occurs),
            ``similar_words`` is empty and ``vector_norm`` is None.
        """
        negative_words = negative_words or []
        concept = {"positive_words": positive_words, "negative_words": negative_words}
        fallback = {"concept": concept, "similar_words": [], "vector_norm": None}

        try:
            concept_vec = None
            for word in positive_words:
                vector = await self.word_service.get_vector(word)
                if vector is not None:
                    concept_vec = self._accumulate(concept_vec, vector)
            for word in negative_words:
                vector = await self.word_service.get_vector(word)
                if vector is not None:
                    concept_vec = self._accumulate(concept_vec, vector, -1.0)

            norm = 0.0 if concept_vec is None else float(np.linalg.norm(concept_vec))
            if norm == 0:
                # Normalizing would divide by zero and send a NaN vector to
                # the similarity search.
                logger.warning("Concept vector is empty or has zero norm; nothing to search")
                return fallback

            concept_vec = concept_vec / norm
            similar_words = await self.word_service.get_similar_by_vector(
                concept_vec, n=n_results
            )
            return {
                "concept": concept,
                "similar_words": similar_words,
                "vector_norm": float(np.linalg.norm(concept_vec)),
            }
        except Exception as e:
            logger.exception(f"Error analyzing concept: {e}")
            return fallback

    async def get_phrase_vector(self, words: List[str]) -> Optional[List[float]]:
        """Compute the averaged embedding for a phrase (list of words).

        Words without a vector are skipped.

        Returns:
            The element-wise mean of the found vectors as a list, or None
            when no word has a vector.
        """
        _, vectors = await self._lookup_vectors(words)
        if not vectors:
            return None
        return np.mean(vectors, axis=0).tolist()

    async def cluster_words(self, words: List[str], n_clusters: int = 3) -> Dict:
        """Cluster the embeddings of the given words using K-Means.

        Args:
            words: Words to cluster; words without a vector are ignored.
            n_clusters: Requested number of clusters. It is clamped to the
                range ``[1, number of found words]`` because K-Means cannot
                produce more clusters than it has samples.

        Returns:
            ``{"clusters": {label: [words]}, "centroids": [[...], ...]}``, or
            ``{"error": ...}`` when no word has a vector.
        """
        # Imported lazily, as in the original, so scikit-learn is only
        # required when clustering is actually used.
        from sklearn.cluster import KMeans

        valid_words, vectors = await self._lookup_vectors(words)
        if not vectors:
            return {"error": "No valid vectors found."}

        effective_clusters = max(1, min(n_clusters, len(vectors)))
        kmeans = KMeans(n_clusters=effective_clusters, random_state=42)
        labels = kmeans.fit_predict(np.array(vectors))

        clusters = {}
        for word, label in zip(valid_words, labels):
            clusters.setdefault(int(label), []).append(word)
        return {"clusters": clusters, "centroids": kmeans.cluster_centers_.tolist()}

    async def find_outlier(self, words: List[str]) -> Dict:
        """Identify the outlier in a list of words (the one least similar to the rest).

        For every found word, the average cosine similarity to all other
        found words is computed; the word with the lowest average is the
        outlier.

        Returns:
            ``{"outlier": word, "average_similarities": {word: float}}``, or
            ``{"error": ...}`` when fewer than two words have a vector.
        """
        valid_words, vectors = await self._lookup_vectors(words)
        if len(vectors) < 2:
            return {"error": "Not enough valid words to determine an outlier."}

        similarities = []
        for i, vec in enumerate(vectors):
            others = [
                self._cosine_similarity(vec, other_vec)
                for j, other_vec in enumerate(vectors)
                if j != i
            ]
            # len(others) >= 1 is guaranteed by the size check above.
            similarities.append(sum(others) / len(others))

        outlier_index = int(np.argmin(similarities))
        return {
            "outlier": valid_words[outlier_index],
            "average_similarities": dict(zip(valid_words, similarities)),
        }

    async def distance_distribution(self, word: str, sample_size: int = 1000) -> Dict:
        """Summarize cosine similarities between a word and a random vocabulary sample.

        The sample is drawn without replacement from the keys of
        ``self.word_service.vocab_vectors``.

        Args:
            word: Target word.
            sample_size: Maximum number of vocabulary words to sample.

        Returns:
            ``{"word": ..., "similarity_distribution": {min, max, mean, std}}``,
            or ``{"error": ...}`` when the target word has no vector or there
            is nothing to sample.
        """
        target_vector = await self.word_service.get_vector(word)
        if target_vector is None:
            return {"error": "Target word not found in vocabulary."}

        vocab_vectors = self.word_service.vocab_vectors
        all_words = list(vocab_vectors.keys())
        n_samples = min(sample_size, len(all_words))
        if n_samples <= 0:
            # np.min/np.max raise on an empty sequence.
            return {"error": "No vocabulary words available to sample."}

        # Sample indices rather than the words themselves so the vocabulary
        # is not converted into a NumPy string array.
        sampled_indices = np.random.choice(len(all_words), size=n_samples, replace=False)
        similarities = [
            self._cosine_similarity(target_vector, vocab_vectors[all_words[index]])
            for index in sampled_indices
        ]

        return {
            "word": word,
            "similarity_distribution": {
                "min": float(np.min(similarities)),
                "max": float(np.max(similarities)),
                "mean": float(np.mean(similarities)),
                "std": float(np.std(similarities)),
            },
        }

    async def interpolate_words(self, word1: str, word2: str, steps: int = 5) -> Dict:
        """Walk linearly from one word vector to another.

        Generates ``steps + 1`` evenly spaced vectors from ``word1`` (step 0)
        to ``word2`` (step ``steps``) and retrieves the closest word for each.

        Returns:
            ``{"interpolations": [{"step", "vector", "closest_word"}, ...]}``,
            or ``{"error": ...}`` when a word has no vector or ``steps < 1``.
        """
        if steps < 1:
            # ratio = i / steps would divide by zero.
            return {"error": "steps must be at least 1."}

        vec1 = await self.word_service.get_vector(word1)
        vec2 = await self.word_service.get_vector(word2)
        if vec1 is None or vec2 is None:
            return {"error": "One or both words not found in vocabulary."}

        interpolations = []
        for i in range(steps + 1):
            ratio = i / steps
            interp_vec = (1 - ratio) * vec1 + ratio * vec2
            similar = await self.word_service.get_similar_by_vector(interp_vec, n=1)
            interpolations.append({
                "step": i,
                "vector": interp_vec.tolist(),
                "closest_word": similar[0] if similar else None,
            })
        return {"interpolations": interpolations}

    async def combine_word_vectors(
        self,
        positive: List[Tuple[str, float]],
        negative: List[Tuple[str, float]],
    ) -> Optional[List[float]]:
        """Combine word vectors given weighted positive and negative contributions.

        Args:
            positive: ``(word, weight)`` tuples whose weighted vectors are added.
            negative: ``(word, weight)`` tuples whose weighted vectors are subtracted.

        Returns:
            The combined vector normalized to unit length, as a list, or None
            when no word has a vector or the combination has zero norm.
        """
        combined_vec = None
        for word, weight in positive:
            vector = await self.word_service.get_vector(word)
            if vector is not None:
                combined_vec = self._accumulate(combined_vec, vector, weight)
        for word, weight in negative:
            vector = await self.word_service.get_vector(word)
            if vector is not None:
                combined_vec = self._accumulate(combined_vec, vector, -weight)

        if combined_vec is None:
            return None
        norm = np.linalg.norm(combined_vec)
        if norm == 0:
            return None
        return (combined_vec / norm).tolist()

    async def analyze_analogy(
        self, word1: str, word2: str, word3: str, n_results: int = 10
    ) -> Dict:
        """Analyze word analogies (a:b :: c:?).

        The query vector is ``vec(word2) - vec(word1) + vec(word3)``,
        normalized to unit length.
        Example: paris:france :: berlin:? (should find "allemagne").

        Returns:
            A dict with keys ``analogy`` and ``similar_words``; on failure
            ``similar_words`` is empty and an ``error`` message is included.
        """
        analogy_label = f"{word1}:{word2} :: {word3}:?"
        try:
            vec1 = await self.word_service.get_vector(word1)
            vec2 = await self.word_service.get_vector(word2)
            vec3 = await self.word_service.get_vector(word3)

            # Explicit None checks: truth-testing a NumPy array is ambiguous.
            if vec1 is None or vec2 is None or vec3 is None:
                return {
                    "analogy": analogy_label,
                    "similar_words": [],
                    "error": "One or more words not found in vocabulary",
                }

            analogy_vec = vec2 - vec1 + vec3
            norm = np.linalg.norm(analogy_vec)
            if norm == 0:
                # Normalizing would divide by zero and yield a NaN query.
                return {
                    "analogy": analogy_label,
                    "similar_words": [],
                    "error": "Analogy vector has zero norm",
                }
            analogy_vec = analogy_vec / norm

            similar_words = await self.word_service.get_similar_by_vector(
                analogy_vec, n=n_results
            )
            return {"analogy": analogy_label, "similar_words": similar_words}
        except Exception as e:
            logger.exception(f"Error analyzing analogy: {e}")
            return {"analogy": analogy_label, "similar_words": [], "error": str(e)}

    async def analyze_semantic_field(self, words: List[str], n_neighbors: int = 5) -> Dict:
        """Analyze the semantic field created by a group of words.

        For every word that has a vector, its nearest neighbors and vector
        norm are collected. The mean of those vectors (the field's center) is
        then used to look up "center word" candidates.

        Returns:
            A dict with keys ``words``, ``center_word_candidates`` and
            ``valid_words_count``; if an exception occurs the lists are
            empty, the count is 0 and an ``error`` message is included.
        """
        try:
            results = []
            vector_sum = None
            valid_vectors = 0

            for word in words:
                vector = await self.word_service.get_vector(word)
                if vector is None:
                    continue
                vector_sum = self._accumulate(vector_sum, vector)
                valid_vectors += 1

                similar = await self.word_service.get_most_similar_words(
                    word, n=n_neighbors
                )
                results.append({
                    "word": word,
                    "similar_words": similar,
                    "vector_norm": float(np.linalg.norm(vector)),
                })

            if valid_vectors > 0:
                center_vector = vector_sum / valid_vectors
                center_similar = await self.word_service.get_similar_by_vector(
                    center_vector, n=n_neighbors
                )
            else:
                center_similar = []

            return {
                "words": results,
                "center_word_candidates": center_similar,
                "valid_words_count": valid_vectors,
            }
        except Exception as e:
            logger.exception(f"Error analyzing semantic field: {e}")
            return {
                "words": [],
                "center_word_candidates": [],
                "valid_words_count": 0,
                "error": str(e),
            }

    async def get_word_vectors(self, words: List[str]) -> Dict:
        """Retrieve the vector representations for a list of words.

        Returns:
            ``{"data": [{"word": ..., "vector": list or None}, ...]}`` with
            one entry per input word (``None`` when the word has no vector),
            or ``{"error": ...}`` if an exception occurs. The data can then
            be sent to an external visualization service.
        """
        try:
            data = []
            for word in words:
                vector = await self.word_service.get_vector(word)
                data.append({
                    "word": word,
                    "vector": vector.tolist() if vector is not None else None,
                })
            return {"data": data}
        except Exception as e:
            logger.exception(f"Error retrieving word vectors: {e}")
            return {"error": str(e)}