"""Text Embeddings Module V2 - Aspect-based Prototype Extraction""" import os import json import logging import numpy as np from datetime import datetime from typing import Dict, Tuple, List, Optional from sentence_transformers import SentenceTransformer logger = logging.getLogger(__name__) # Default aspect seeds (built-in fallback) DEFAULT_ASPECT_SEEDS = { "leadership": [ "led a team", "was team lead", "managed a project", "supervised interns", "coordinated a cross-functional team", "organized the club", "president of the society", "captain of the team", "ran weekly standups", "delegated tasks", "mentored junior members", "headed the project", "oversaw project timelines", "chaired the committee", "led end-to-end delivery", "directed project milestones", "led a 5-person team", "managed stakeholders", "took ownership of the initiative", "led code reviews", "organized campus events", "led product demo sessions", "led recruitment for volunteers", "managed vendor relationships", "spearheaded the outreach program" ], "technical_skills": [ "developed a web API", "implemented RESTful services", "coded in python", "built machine learning models", "trained neural networks", "implemented data pipelines", "used pandas for ETL", "designed database schemas", "built microservices", "deployed models using docker", "worked with FastAPI", "implemented CI/CD", "wrote unit tests", "optimized SQL queries", "used scikit-learn", "developed recommendation systems", "built feature engineering pipelines", "deployed to cloud", "developed ETL jobs", "worked with Kafka", "implemented caching layers", "used TensorFlow or PyTorch", "built backend services", "wrote production-grade code", "integrated third-party APIs" ], "problem_solving": [ "solved complex problem", "debugged production issues", "optimized an algorithm", "reduced latency of service", "designed a scalable solution", "investigated root cause", "improved system reliability", "created a novel solution", "troubleshot integration issues", "automated manual tasks", "reduced memory usage", "resolved data pipeline failures", "refactored critical code", "handled edge cases", "iterated on prototypes", "performed A/B testing to decide", "diagnosed performance bottlenecks", "designed fallback strategies", "resolved deployment failures", "created monitoring & alerts" ], "internships_experience": [ "summer internship", "industrial training", "interned at", "worked as an intern", "internship project", "internship in data science", "interned at a startup", "completed internship at", "interned with the engineering team", "intern experience", "interned at an e-commerce company", "industrial internship", "co-op placement", "paid internship", "research internship", "interned as a software engineer", "on-the-job training", "worked under mentor", "internship-driven project", "corporate internship" ], "communication": [ "presented to stakeholders", "gave a presentation", "wrote documentation", "authored reports", "explained results to non-technical", "public speaking", "delivered demo", "prepared slides", "wrote user guides", "communicated with clients", "collaborated across teams", "conducted knowledge transfer", "wrote clear emails", "explained technical concepts", "presented project outcomes", "led demo sessions", "created onboarding docs", "contributed to team discussions", "led workshops", "hosted training sessions" ], "teamwork": [ "collaborated with team", "worked in a cross-functional team", "paired programming", "contributed to group project", "supported teammates", "collaborated on design", "worked with designers and PMs", "helped teammates debug", "co-authored project", "mentored peers", "shared responsibilities", "worked effectively in group", "contributed in agile team", "participated in sprints", "assisted in integration" ], "project_execution": [ "delivered project on time", "met project deadlines", "managed milestones", "handled project planning", "released production features", "coordinated deployment", "delivered MVP", "tracked KPIs", "managed scope", "created project timeline", "ran retrospectives", "managed feature rollout", "ensured on-time delivery", "performed release validations", "deployed analytics dashboard", "iterated based on feedback" ], "initiative": [ "initiated a project", "proposed a new idea", "took initiative", "started a side project", "built a proof of concept", "started a campus chapter", "created an automation", "improved an existing process", "volunteered to lead", "identified improvement areas", "launched a mini-product", "ran a pilot program", "created onboarding scripts", "led process improvements", "started a mentoring circle" ], "learning_agility": [ "quick learner", "self-taught", "learned new framework", "picked up new language", "adapted to new tech", "completed online courses", "upskilled via projects", "transitioned domains", "learned on the job", "rapidly onboarded", "attended workshops", "completed bootcamp", "took certification courses", "learned through documentation", "scaled knowledge quickly", "adapted to changing scope" ], "career_alignment": [ "career goal is", "aspire to become", "interested in data science", "pursue a role in product", "long-term goal", "want to specialize in", "career objective", "planning to pursue masters", "aim to work in industry", "seek product management roles", "interested in research", "want to join a startup", "targeting roles in ML engineering", "aiming for consulting roles", "career path is focused on" ] } # Question to aspects mapping QUESTION_ASPECT_MAP = { "text_q1": ["technical_skills", "problem_solving", "learning_agility", "initiative", "communication"], "text_q2": ["career_alignment", "learning_agility", "initiative", "communication"], "text_q3": ["leadership", "teamwork", "project_execution", "internships_experience", "communication"] } class TextModuleV2: """Enhanced text scoring using aspect-based prototypes with all-mpnet-base-v2""" def __init__(self, model_name: str = None, seeds_path: str = "./aspect_seeds.json", centroids_path: str = "./aspect_centroids.npz"): # Config: allow model override via env or param self.model_name = model_name or os.getenv('ASPECT_MODEL_NAME', 'all-mpnet-base-v2') self.seeds_path = seeds_path self.centroids_path = centroids_path # Load model logger.info(f"Loading sentence transformer model: {self.model_name}") self.model = SentenceTransformer(self.model_name, device='cpu') # Load seeds self.aspect_seeds = self._load_seeds() # Load or build centroids self.centroids = self._load_or_build_centroids() logger.info(f"TextModuleV2 initialized with {len(self.aspect_seeds)} aspects") def _load_seeds(self) -> Dict[str, List[str]]: """Load aspect seeds from JSON or use defaults""" if os.path.exists(self.seeds_path): try: with open(self.seeds_path, 'r', encoding='utf-8') as f: seeds = json.load(f) logger.info(f"Loaded aspect seeds from {self.seeds_path}") return seeds except Exception as e: logger.warning(f"Failed to load seeds from {self.seeds_path}: {e}. Using defaults.") return DEFAULT_ASPECT_SEEDS.copy() def _load_or_build_centroids(self) -> Dict[str, np.ndarray]: """Load cached centroids or build from seeds""" if os.path.exists(self.centroids_path): try: data = np.load(self.centroids_path) centroids = {key: data[key] for key in data.files} logger.info(f"Loaded centroids from {self.centroids_path}") return centroids except Exception as e: logger.warning(f"Failed to load centroids: {e}. Rebuilding.") return self.build_prototypes(self.aspect_seeds, self.model) def build_prototypes(self, aspect_seeds: Dict[str, List[str]], model: SentenceTransformer) -> Dict[str, np.ndarray]: """Build centroid prototypes from seed phrases""" logger.info("Building aspect centroids...") centroids = {} for aspect, seeds in aspect_seeds.items(): if not seeds: logger.warning(f"Aspect '{aspect}' has no seeds, skipping") continue # Encode seeds (CPU, convert_to_tensor=False) embeddings = model.encode(seeds, convert_to_tensor=False, show_progress_bar=False) embeddings = np.array(embeddings, dtype=np.float32) # Compute centroid centroid = np.mean(embeddings, axis=0) centroid = centroid / np.linalg.norm(centroid) # Normalize centroids[aspect] = centroid # Save centroids try: np.savez(self.centroids_path, **centroids) logger.info(f"Saved centroids to {self.centroids_path}") except Exception as e: logger.error(f"Failed to save centroids: {e}") return centroids def score_text_aspects(self, text: str, centroids: Dict[str, np.ndarray], top_k: int = 3) -> Tuple[Dict[str, float], Dict[str, List[str]], float]: """ Score text against aspect centroids Returns: (aspect_scores, chunk_assignments, confidence) """ if not text or len(text) < 20: return {}, {}, 0.0 # Split into chunks (sentences or 50-word windows) chunks = self._split_text(text) if not chunks: return {}, {}, 0.0 # Encode chunks chunk_embeddings = self.model.encode(chunks, convert_to_tensor=False, show_progress_bar=False) chunk_embeddings = np.array(chunk_embeddings, dtype=np.float32) # Score each aspect aspect_scores = {} chunk_assignments = {aspect: [] for aspect in centroids.keys()} for aspect, centroid in centroids.items(): # Compute cosine similarities sims = np.dot(chunk_embeddings, centroid) / ( np.linalg.norm(chunk_embeddings, axis=1) * np.linalg.norm(centroid) + 1e-8 ) # Scoring formula: 0.6 * max_sim + 0.4 * mean_topk max_sim = np.max(sims) topk_sims = np.partition(sims, -min(top_k, len(sims)))[-top_k:] mean_topk = np.mean(topk_sims) # Map from [-1,1] to [0,1] raw_score = 0.6 * max_sim + 0.4 * mean_topk normalized_score = (raw_score + 1) / 2 aspect_scores[aspect] = float(np.clip(normalized_score, 0, 1)) # Assign chunks with sim > threshold threshold = 0.3 for i, sim in enumerate(sims): if sim > threshold: chunk_assignments[aspect].append(chunks[i]) # Calculate confidence confidence = self._calculate_aspect_confidence(text, aspect_scores) return aspect_scores, chunk_assignments, confidence def _split_text(self, text: str) -> List[str]: """Split text into scorable chunks""" import re # Split by sentences sentences = re.split(r'[.!?]+', text) chunks = [s.strip() for s in sentences if len(s.strip()) > 20] # If too few sentences, use sliding window if len(chunks) < 3: words = text.split() window_size = 50 step = 25 chunks = [] for i in range(0, max(1, len(words) - window_size + 1), step): chunk = ' '.join(words[i:i+window_size]) if len(chunk) > 20: chunks.append(chunk) return chunks[:20] # Limit to 20 chunks def _calculate_aspect_confidence(self, text: str, aspect_scores: Dict[str, float]) -> float: """Calculate confidence based on text quality and score distribution""" if not aspect_scores: return 0.0 # Text length factor word_count = len(text.split()) length_factor = min(word_count / 150, 1.0) # Score variance factor (higher variance = more confident signal) scores = list(aspect_scores.values()) score_std = np.std(scores) variance_factor = min(score_std * 2, 1.0) # Max score factor max_score = max(scores) confidence = 0.4 * length_factor + 0.3 * variance_factor + 0.3 * max_score return float(np.clip(confidence, 0, 1)) def score(self, text_responses: Dict[str, str]) -> Tuple[float, float, Dict]: """ Main scoring function - backward compatible interface Returns: (score, confidence, features) """ text_q1 = text_responses.get('text_q1', '') text_q2 = text_responses.get('text_q2', '') text_q3 = text_responses.get('text_q3', '') # Score each question with relevant aspects q1_aspects = QUESTION_ASPECT_MAP['text_q1'] q2_aspects = QUESTION_ASPECT_MAP['text_q2'] q3_aspects = QUESTION_ASPECT_MAP['text_q3'] q1_centroids = {k: self.centroids[k] for k in q1_aspects if k in self.centroids} q2_centroids = {k: self.centroids[k] for k in q2_aspects if k in self.centroids} q3_centroids = {k: self.centroids[k] for k in q3_aspects if k in self.centroids} q1_scores, _, q1_conf = self.score_text_aspects(text_q1, q1_centroids) q2_scores, _, q2_conf = self.score_text_aspects(text_q2, q2_centroids) q3_scores, _, q3_conf = self.score_text_aspects(text_q3, q3_centroids) # Aggregate features features = {} # Technical skills from Q1 features['technical_skills'] = q1_scores.get('technical_skills', 0.3) features['problem_solving'] = q1_scores.get('problem_solving', 0.3) # Career alignment from Q2 features['career_alignment'] = q2_scores.get('career_alignment', 0.3) features['learning_agility'] = max( q1_scores.get('learning_agility', 0.3), q2_scores.get('learning_agility', 0.3) ) # Leadership from Q3 features['leadership_score'] = q3_scores.get('leadership', 0.3) features['teamwork'] = q3_scores.get('teamwork', 0.3) features['internships_experience'] = q3_scores.get('internships_experience', 0.3) # Communication (averaged across all) comm_scores = [ q1_scores.get('communication', 0.3), q2_scores.get('communication', 0.3), q3_scores.get('communication', 0.3) ] features['communication'] = np.mean(comm_scores) # Writing quality (heuristic) features['writing_quality'] = self._assess_writing_quality(text_q1) # Content depth features['content_depth'] = self._assess_content_depth(text_q1, text_q2, text_q3) # Calculate overall score (weighted combination) text_score = ( features['technical_skills'] * 0.15 + features['problem_solving'] * 0.10 + features['leadership_score'] * 0.20 + features['career_alignment'] * 0.10 + features['communication'] * 0.15 + features['teamwork'] * 0.10 + features['learning_agility'] * 0.10 + features['content_depth'] * 0.10 ) # Overall confidence confidence = np.mean([q1_conf, q2_conf, q3_conf]) return text_score, confidence, features def _assess_writing_quality(self, text: str) -> float: """Heuristic writing quality assessment""" if not text or len(text) < 50: return 0.2 score = 0.5 word_count = len(text.split()) if 150 <= word_count <= 300: score += 0.3 elif 100 <= word_count < 150 or 300 < word_count <= 400: score += 0.2 else: score += 0.1 import re sentences = re.split(r'[.!?]+', text) if len(sentences) >= 5: score += 0.1 if text[0].isupper(): score += 0.05 words = text.lower().split() unique_ratio = len(set(words)) / len(words) if words else 0 if unique_ratio > 0.6: score += 0.05 return min(score, 1.0) def _assess_content_depth(self, text_q1: str, text_q2: str, text_q3: str) -> float: """Assess content depth""" total_words = len(text_q1.split()) + len(text_q2.split()) + len(text_q3.split()) if total_words >= 450: return 1.0 elif total_words >= 300: return 0.8 elif total_words >= 200: return 0.6 elif total_words >= 100: return 0.4 else: return 0.2 def explain(self, features: Dict) -> Dict: """Generate explanations""" explanations = { 'highlights': [], 'suggestions': [] } if features.get('technical_skills', 0) > 0.7: explanations['highlights'].append("Strong technical skills demonstrated") if features.get('leadership_score', 0) > 0.7: explanations['highlights'].append("Clear leadership experience") if features.get('career_alignment', 0) > 0.7: explanations['highlights'].append("Well-defined career goals") if features.get('communication', 0) > 0.7: explanations['highlights'].append("Excellent communication skills") if features.get('writing_quality', 0) < 0.5: explanations['suggestions'].append("Provide more detailed responses (150-300 words each)") if features.get('leadership_score', 0) < 0.5: explanations['suggestions'].append("Highlight leadership roles with specific examples") if features.get('technical_skills', 0) < 0.5: explanations['suggestions'].append("Describe technical projects and skills in detail") return explanations # Admin functions def get_aspect_seeds(self) -> Dict[str, List[str]]: """Return current loaded seeds""" return self.aspect_seeds.copy() def update_aspect_seeds(self, new_seeds: Dict[str, List[str]], persist: bool = True) -> Dict: """ Update aspect seeds and recompute centroids Returns: stats dict """ # Validate if not isinstance(new_seeds, dict): raise ValueError("new_seeds must be a dict") for key, seeds in new_seeds.items(): if not isinstance(key, str): raise ValueError(f"Aspect key must be string, got {type(key)}") if not isinstance(seeds, list) or not seeds: raise ValueError(f"Seeds for '{key}' must be non-empty list") if not all(isinstance(s, str) for s in seeds): raise ValueError(f"All seeds for '{key}' must be strings") # Update seeds self.aspect_seeds = new_seeds.copy() # Recompute centroids logger.info("Recomputing centroids after seed update") self.centroids = self.build_prototypes(self.aspect_seeds, self.model) # Persist if persist: try: with open(self.seeds_path, 'w', encoding='utf-8') as f: json.dump(new_seeds, f, indent=2, ensure_ascii=False) logger.info(f"Persisted new seeds to {self.seeds_path}") except Exception as e: logger.error(f"Failed to persist seeds: {e}") # Stats stats = { "num_aspects": len(new_seeds), "avg_seed_count": np.mean([len(seeds) for seeds in new_seeds.values()]), "timestamp": datetime.utcnow().isoformat() + 'Z' } logger.info(f"Aspect seeds updated: {stats}") return stats def suggest_seed_expansions(self, corpus_texts: List[str], aspect_key: str, top_n: int = 20) -> List[str]: """ Suggest seed expansions from corpus Uses TF-IDF + cosine similarity for lightweight extraction """ if aspect_key not in self.centroids: return [] centroid = self.centroids[aspect_key] # Extract candidate phrases from corpus from collections import Counter import re candidates = [] for text in corpus_texts[:100]: # Limit corpus # Extract 2-5 word n-grams words = text.lower().split() for n in range(2, 6): for i in range(len(words) - n + 1): phrase = ' '.join(words[i:i+n]) if len(phrase) > 10 and not re.search(r'\d{3,}', phrase): candidates.append(phrase) # Count frequency phrase_counts = Counter(candidates) top_candidates = [phrase for phrase, _ in phrase_counts.most_common(200)] if not top_candidates: return [] # Encode and rank by similarity candidate_embeddings = self.model.encode(top_candidates, convert_to_tensor=False, show_progress_bar=False) candidate_embeddings = np.array(candidate_embeddings, dtype=np.float32) sims = np.dot(candidate_embeddings, centroid) / ( np.linalg.norm(candidate_embeddings, axis=1) * np.linalg.norm(centroid) + 1e-8 ) # Return top_n top_indices = np.argsort(sims)[-top_n:][::-1] suggestions = [top_candidates[i] for i in top_indices] return suggestions def get_relevant_aspects_for_question(question_id: str) -> List[str]: """Get relevant aspect keys for a question""" return QUESTION_ASPECT_MAP.get(question_id, []) # Flask admin blueprint def register_admin_seed_endpoint(app, text_module: TextModuleV2): """Register admin endpoints for seed management""" from flask import Blueprint, request, jsonify admin_bp = Blueprint('admin_aspects', __name__, url_prefix='/admin') def check_admin_token(): token = request.headers.get('X-Admin-Token') expected = os.getenv('ADMIN_SEED_TOKEN', 'admin-secret-token') if token != expected: return jsonify({'error': 'Unauthorized'}), 401 return None @admin_bp.route('/aspect-seeds', methods=['GET']) def get_seeds(): """Get current aspect seeds""" auth_err = check_admin_token() if auth_err: return auth_err seeds = text_module.get_aspect_seeds() return jsonify({ 'success': True, 'seeds': seeds, 'num_aspects': len(seeds) }) @admin_bp.route('/aspect-seeds', methods=['POST']) def update_seeds(): """Update aspect seeds""" auth_err = check_admin_token() if auth_err: return auth_err data = request.json new_seeds = data.get('seeds') persist = data.get('persist', True) if not new_seeds: return jsonify({'error': 'Missing seeds field'}), 400 try: stats = text_module.update_aspect_seeds(new_seeds, persist=persist) return jsonify({ 'success': True, 'message': 'Aspect seeds updated successfully', 'stats': stats }) except Exception as e: logger.error(f"Failed to update seeds: {e}") return jsonify({'error': str(e)}), 400 app.register_blueprint(admin_bp) logger.info("Registered admin aspect-seed endpoints at /admin/aspect-seeds")