# FCT/services/text_module_v2.py
# Parthnuwal7
# Adding analytical content
# 3d015cd
"""Text Embeddings Module V2 - Aspect-based Prototype Extraction"""
import os
import json
import logging
import numpy as np
from datetime import datetime
from typing import Dict, Tuple, List, Optional
from sentence_transformers import SentenceTransformer
# Module-level logger shared by the scorer class and the admin endpoints.
logger = logging.getLogger(__name__)
# Default aspect seeds (built-in fallback).
# Maps an aspect key to a list of short seed phrases. TextModuleV2._load_seeds
# returns a copy of this dict when the seeds JSON file is missing or cannot be
# read, and TextModuleV2.build_prototypes averages the embeddings of each
# aspect's phrases into one normalized centroid per aspect.
# The aspect keys are referenced by QUESTION_ASPECT_MAP and by
# TextModuleV2.score, so renaming a key here requires updating both.
DEFAULT_ASPECT_SEEDS = {
# People/project leadership phrases.
"leadership": [
"led a team", "was team lead", "managed a project", "supervised interns",
"coordinated a cross-functional team", "organized the club", "president of the society",
"captain of the team", "ran weekly standups", "delegated tasks", "mentored junior members",
"headed the project", "oversaw project timelines", "chaired the committee",
"led end-to-end delivery", "directed project milestones", "led a 5-person team",
"managed stakeholders", "took ownership of the initiative", "led code reviews",
"organized campus events", "led product demo sessions", "led recruitment for volunteers",
"managed vendor relationships", "spearheaded the outreach program"
],
# Hands-on engineering / ML phrases.
"technical_skills": [
"developed a web API", "implemented RESTful services", "coded in python",
"built machine learning models", "trained neural networks", "implemented data pipelines",
"used pandas for ETL", "designed database schemas", "built microservices",
"deployed models using docker", "worked with FastAPI", "implemented CI/CD",
"wrote unit tests", "optimized SQL queries", "used scikit-learn",
"developed recommendation systems", "built feature engineering pipelines",
"deployed to cloud", "developed ETL jobs", "worked with Kafka",
"implemented caching layers", "used TensorFlow or PyTorch", "built backend services",
"wrote production-grade code", "integrated third-party APIs"
],
# Debugging, optimization and design-under-constraints phrases.
"problem_solving": [
"solved complex problem", "debugged production issues", "optimized an algorithm",
"reduced latency of service", "designed a scalable solution", "investigated root cause",
"improved system reliability", "created a novel solution", "troubleshot integration issues",
"automated manual tasks", "reduced memory usage", "resolved data pipeline failures",
"refactored critical code", "handled edge cases", "iterated on prototypes",
"performed A/B testing to decide", "diagnosed performance bottlenecks",
"designed fallback strategies", "resolved deployment failures", "created monitoring & alerts"
],
# Internship / industrial training phrases.
"internships_experience": [
"summer internship", "industrial training", "interned at", "worked as an intern",
"internship project", "internship in data science", "interned at a startup",
"completed internship at", "interned with the engineering team", "intern experience",
"interned at an e-commerce company", "industrial internship", "co-op placement",
"paid internship", "research internship", "interned as a software engineer",
"on-the-job training", "worked under mentor", "internship-driven project",
"corporate internship"
],
# Presenting, writing and explaining phrases.
"communication": [
"presented to stakeholders", "gave a presentation", "wrote documentation",
"authored reports", "explained results to non-technical", "public speaking",
"delivered demo", "prepared slides", "wrote user guides", "communicated with clients",
"collaborated across teams", "conducted knowledge transfer", "wrote clear emails",
"explained technical concepts", "presented project outcomes", "led demo sessions",
"created onboarding docs", "contributed to team discussions", "led workshops",
"hosted training sessions"
],
# Collaboration phrases.
"teamwork": [
"collaborated with team", "worked in a cross-functional team", "paired programming",
"contributed to group project", "supported teammates", "collaborated on design",
"worked with designers and PMs", "helped teammates debug", "co-authored project",
"mentored peers", "shared responsibilities", "worked effectively in group",
"contributed in agile team", "participated in sprints", "assisted in integration"
],
# Delivery / planning phrases.
"project_execution": [
"delivered project on time", "met project deadlines", "managed milestones",
"handled project planning", "released production features", "coordinated deployment",
"delivered MVP", "tracked KPIs", "managed scope", "created project timeline",
"ran retrospectives", "managed feature rollout", "ensured on-time delivery",
"performed release validations", "deployed analytics dashboard", "iterated based on feedback"
],
# Self-started work phrases.
"initiative": [
"initiated a project", "proposed a new idea", "took initiative", "started a side project",
"built a proof of concept", "started a campus chapter", "created an automation",
"improved an existing process", "volunteered to lead", "identified improvement areas",
"launched a mini-product", "ran a pilot program", "created onboarding scripts",
"led process improvements", "started a mentoring circle"
],
# Learning and adapting phrases.
"learning_agility": [
"quick learner", "self-taught", "learned new framework", "picked up new language",
"adapted to new tech", "completed online courses", "upskilled via projects",
"transitioned domains", "learned on the job", "rapidly onboarded", "attended workshops",
"completed bootcamp", "took certification courses", "learned through documentation",
"scaled knowledge quickly", "adapted to changing scope"
],
# Stated career goals / direction phrases.
"career_alignment": [
"career goal is", "aspire to become", "interested in data science",
"pursue a role in product", "long-term goal", "want to specialize in",
"career objective", "planning to pursue masters", "aim to work in industry",
"seek product management roles", "interested in research", "want to join a startup",
"targeting roles in ML engineering", "aiming for consulting roles",
"career path is focused on"
]
}
# Question to aspects mapping.
# Keys are the answer ids read by TextModuleV2.score ('text_q1'..'text_q3');
# values are the aspect keys whose centroids that answer is scored against.
# Aspect names must match keys of the loaded seeds/centroids; names without a
# centroid are silently skipped by score().
# NOTE(review): 'project_execution' and 'initiative' are scored here but
# score() does not copy them into its returned features — confirm whether
# that is intentional.
QUESTION_ASPECT_MAP = {
"text_q1": ["technical_skills", "problem_solving", "learning_agility", "initiative", "communication"],
"text_q2": ["career_alignment", "learning_agility", "initiative", "communication"],
"text_q3": ["leadership", "teamwork", "project_execution", "internships_experience", "communication"]
}
class TextModuleV2:
    """Enhanced text scoring using aspect-based prototypes with all-mpnet-base-v2.

    Each aspect (e.g. "leadership") is represented by the normalized mean
    embedding ("centroid") of its seed phrases. Free-text answers are split
    into chunks, embedded with the same model, and compared to the centroids
    with cosine similarity.
    """

    # Texts shorter than this many characters are not scored at all.
    MIN_TEXT_CHARS = 20
    # Chunks must be strictly longer than this many characters to be kept.
    MIN_CHUNK_CHARS = 20
    # Sliding-window fallback: window length and stride, both in words.
    WINDOW_SIZE = 50
    WINDOW_STEP = 25
    # Upper bound on the number of chunks embedded per text.
    MAX_CHUNKS = 20
    # A chunk is attributed to an aspect when its cosine similarity exceeds this.
    ASSIGNMENT_THRESHOLD = 0.3
    # Feature value used when an aspect could not be scored for a question.
    DEFAULT_FEATURE_SCORE = 0.3

    def __init__(self, model_name: Optional[str] = None, seeds_path: str = "./aspect_seeds.json",
                 centroids_path: str = "./aspect_centroids.npz"):
        """Load the embedding model, the aspect seeds and the aspect centroids.

        Args:
            model_name: SentenceTransformer model id. Falls back to the
                ASPECT_MODEL_NAME environment variable, then to
                'all-mpnet-base-v2'.
            seeds_path: JSON file with aspect seeds (optional on disk).
            centroids_path: ``.npz`` cache of precomputed centroids.
        """
        # Config: allow model override via env or param
        self.model_name = model_name or os.getenv('ASPECT_MODEL_NAME', 'all-mpnet-base-v2')
        self.seeds_path = seeds_path
        self.centroids_path = centroids_path

        logger.info("Loading sentence transformer model: %s", self.model_name)
        self.model = SentenceTransformer(self.model_name, device='cpu')

        self.aspect_seeds = self._load_seeds()
        self.centroids = self._load_or_build_centroids()
        logger.info("TextModuleV2 initialized with %d aspects", len(self.aspect_seeds))

    def _load_seeds(self) -> Dict[str, List[str]]:
        """Load aspect seeds from ``seeds_path``, or fall back to the defaults.

        Returns:
            Mapping of aspect key to seed phrases. The default mapping is
            copied (including its lists) so later edits never touch the
            module-level constant.
        """
        if os.path.exists(self.seeds_path):
            try:
                with open(self.seeds_path, 'r', encoding='utf-8') as f:
                    seeds = json.load(f)
                if not isinstance(seeds, dict):
                    raise ValueError("seeds file must contain a JSON object")
                logger.info("Loaded aspect seeds from %s", self.seeds_path)
                return seeds
            except (OSError, ValueError) as e:
                # json.JSONDecodeError is a ValueError subclass.
                logger.warning("Failed to load seeds from %s: %s. Using defaults.",
                               self.seeds_path, e)
        return {aspect: list(phrases) for aspect, phrases in DEFAULT_ASPECT_SEEDS.items()}

    def _load_or_build_centroids(self) -> Dict[str, np.ndarray]:
        """Load cached centroids, rebuilding them when missing, unreadable or stale.

        The cache is considered stale when its aspect keys differ from the
        aspects that currently have seeds; otherwise newly added aspects would
        silently never be scored.
        """
        if os.path.exists(self.centroids_path):
            try:
                # Context manager closes the underlying zip file handle.
                with np.load(self.centroids_path) as data:
                    centroids = {key: data[key] for key in data.files}
            except Exception as e:  # cache boundary: any failure means "rebuild"
                logger.warning("Failed to load centroids: %s. Rebuilding.", e)
            else:
                expected = {aspect for aspect, seeds in self.aspect_seeds.items() if seeds}
                if set(centroids) == expected:
                    logger.info("Loaded centroids from %s", self.centroids_path)
                    return centroids
                logger.warning("Cached centroids in %s do not match the loaded aspects. Rebuilding.",
                               self.centroids_path)
        return self.build_prototypes(self.aspect_seeds, self.model)

    def build_prototypes(self, aspect_seeds: Dict[str, List[str]],
                         model: "SentenceTransformer", save: bool = True) -> Dict[str, np.ndarray]:
        """Build one unit-length centroid per aspect from its seed phrases.

        Args:
            aspect_seeds: Mapping of aspect key to seed phrases. Aspects with
                no seeds are skipped with a warning.
            model: Object exposing ``encode(list_of_str, ...)``.
            save: When true (default) the centroids are also written to
                ``centroids_path`` on a best-effort basis.

        Returns:
            Mapping of aspect key to float32 centroid vector.
        """
        logger.info("Building aspect centroids...")
        centroids = {}
        for aspect, seeds in aspect_seeds.items():
            if not seeds:
                logger.warning("Aspect '%s' has no seeds, skipping", aspect)
                continue
            embeddings = np.asarray(
                model.encode(seeds, convert_to_tensor=False, show_progress_bar=False),
                dtype=np.float32,
            )
            centroid = embeddings.mean(axis=0)
            norm = np.linalg.norm(centroid)
            # Guard against a degenerate all-zero mean (would produce NaNs).
            if norm > 0:
                centroid = centroid / norm
            centroids[aspect] = centroid

        if save:
            try:
                np.savez(self.centroids_path, **centroids)
                logger.info("Saved centroids to %s", self.centroids_path)
            except Exception as e:  # caching is best-effort; scoring still works
                logger.error("Failed to save centroids: %s", e)
        return centroids

    def score_text_aspects(self, text: str, centroids: Dict[str, np.ndarray],
                           top_k: int = 3) -> Tuple[Dict[str, float], Dict[str, List[str]], float]:
        """Score a text against aspect centroids.

        Per aspect the score is ``0.6 * max_sim + 0.4 * mean(top_k sims)``
        over the text's chunks, mapped from [-1, 1] to [0, 1].

        Args:
            text: Free-text answer.
            centroids: Aspect key to centroid vector.
            top_k: Number of best chunks averaged; clamped to
                ``1..number_of_chunks``.

        Returns:
            ``(aspect_scores, chunk_assignments, confidence)``. All three are
            empty/zero when the text is too short or yields no chunks.
        """
        if not text or len(text) < self.MIN_TEXT_CHARS:
            return {}, {}, 0.0

        chunks = self._split_text(text)
        if not chunks:
            return {}, {}, 0.0

        chunk_embeddings = np.asarray(
            self.model.encode(chunks, convert_to_tensor=False, show_progress_bar=False),
            dtype=np.float32,
        )
        # Chunk norms do not depend on the aspect, so compute them once.
        chunk_norms = np.linalg.norm(chunk_embeddings, axis=1)
        k = max(1, min(top_k, len(chunks)))

        aspect_scores = {}
        chunk_assignments = {}
        for aspect, centroid in centroids.items():
            sims = np.dot(chunk_embeddings, centroid) / (
                chunk_norms * np.linalg.norm(centroid) + 1e-8
            )
            max_sim = float(np.max(sims))
            mean_topk = float(np.mean(np.sort(sims)[-k:]))
            raw_score = 0.6 * max_sim + 0.4 * mean_topk
            # Map from [-1,1] to [0,1]
            aspect_scores[aspect] = float(np.clip((raw_score + 1) / 2, 0, 1))
            chunk_assignments[aspect] = [
                chunk for chunk, sim in zip(chunks, sims) if sim > self.ASSIGNMENT_THRESHOLD
            ]

        confidence = self._calculate_aspect_confidence(text, aspect_scores)
        return aspect_scores, chunk_assignments, confidence

    def _split_text(self, text: str) -> List[str]:
        """Split text into scorable chunks.

        Sentences (split on ``.``, ``!``, ``?``) longer than MIN_CHUNK_CHARS
        are used when there are at least three of them. Otherwise the text is
        cut into overlapping word windows that together cover every word,
        including the tail. At most MAX_CHUNKS chunks are returned.
        """
        import re

        stripped_sentences = (part.strip() for part in re.split(r'[.!?]+', text))
        chunks = [s for s in stripped_sentences if len(s) > self.MIN_CHUNK_CHARS]

        # If too few sentences, use sliding window
        if len(chunks) < 3:
            words = text.split()
            chunks = []
            for start in range(0, len(words), self.WINDOW_STEP):
                window = ' '.join(words[start:start + self.WINDOW_SIZE])
                if len(window) > self.MIN_CHUNK_CHARS:
                    chunks.append(window)
                # Stop once a window has reached the last word; stopping on
                # the window grid instead would drop the trailing words.
                if start + self.WINDOW_SIZE >= len(words):
                    break
        return chunks[:self.MAX_CHUNKS]

    def _calculate_aspect_confidence(self, text: str, aspect_scores: Dict[str, float]) -> float:
        """Combine text length, score spread and best score into a [0, 1] confidence.

        ``0.4 * min(words / 150, 1) + 0.3 * min(2 * std(scores), 1) + 0.3 * max(scores)``;
        0.0 when there are no scores.
        """
        if not aspect_scores:
            return 0.0
        scores = list(aspect_scores.values())
        length_factor = min(len(text.split()) / 150, 1.0)
        # Higher spread between aspects = more discriminative signal.
        variance_factor = min(float(np.std(scores)) * 2, 1.0)
        confidence = 0.4 * length_factor + 0.3 * variance_factor + 0.3 * max(scores)
        return float(np.clip(confidence, 0, 1))

    def score(self, text_responses: Dict[str, str]) -> Tuple[float, float, Dict]:
        """Main scoring function - backward compatible interface.

        Args:
            text_responses: Answers keyed by 'text_q1', 'text_q2', 'text_q3'.
                Missing or ``None`` answers are treated as empty strings.

        Returns:
            ``(score, confidence, features)`` as plain Python floats and a
            feature dict. Aspects that could not be scored fall back to
            DEFAULT_FEATURE_SCORE.
        """
        default = self.DEFAULT_FEATURE_SCORE
        answers = {
            qid: (text_responses.get(qid) or '')
            for qid in ('text_q1', 'text_q2', 'text_q3')
        }

        # Score each question against its relevant aspects only.
        per_question = {}
        confidences = []
        for qid, answer in answers.items():
            relevant = {
                aspect: self.centroids[aspect]
                for aspect in QUESTION_ASPECT_MAP[qid]
                if aspect in self.centroids
            }
            aspect_scores, _, conf = self.score_text_aspects(answer, relevant)
            per_question[qid] = aspect_scores
            confidences.append(conf)
        q1, q2, q3 = per_question['text_q1'], per_question['text_q2'], per_question['text_q3']

        features = {
            # Technical skills from Q1
            'technical_skills': q1.get('technical_skills', default),
            'problem_solving': q1.get('problem_solving', default),
            # Career alignment from Q2
            'career_alignment': q2.get('career_alignment', default),
            'learning_agility': max(q1.get('learning_agility', default),
                                    q2.get('learning_agility', default)),
            # Leadership from Q3
            'leadership_score': q3.get('leadership', default),
            'teamwork': q3.get('teamwork', default),
            'internships_experience': q3.get('internships_experience', default),
            # Communication (averaged across all)
            'communication': float(np.mean([
                q.get('communication', default) for q in (q1, q2, q3)
            ])),
        }
        # Writing quality (heuristic, first answer only)
        features['writing_quality'] = self._assess_writing_quality(answers['text_q1'])
        features['content_depth'] = self._assess_content_depth(
            answers['text_q1'], answers['text_q2'], answers['text_q3']
        )

        # Overall score: weights sum to 1.0
        text_score = (
            features['technical_skills'] * 0.15 +
            features['problem_solving'] * 0.10 +
            features['leadership_score'] * 0.20 +
            features['career_alignment'] * 0.10 +
            features['communication'] * 0.15 +
            features['teamwork'] * 0.10 +
            features['learning_agility'] * 0.10 +
            features['content_depth'] * 0.10
        )
        confidence = float(np.mean(confidences))
        return float(text_score), confidence, features

    def _assess_writing_quality(self, text: str) -> float:
        """Heuristic writing quality in [0.2, 1.0].

        Starts at 0.5 and adds bonuses for word count (best at 150-300
        words), at least five non-empty sentences, a capitalised first
        character and a unique-word ratio above 0.6. Texts with fewer than
        50 non-whitespace-trimmed characters score 0.2.
        """
        import re

        stripped = (text or '').strip()
        if len(stripped) < 50:
            return 0.2

        score = 0.5
        word_count = len(stripped.split())
        if 150 <= word_count <= 300:
            score += 0.3
        elif 100 <= word_count < 150 or 300 < word_count <= 400:
            score += 0.2
        else:
            score += 0.1

        # re.split yields an empty trailing fragment after the final period;
        # only fragments with content count as sentences.
        sentence_count = sum(1 for part in re.split(r'[.!?]+', stripped) if part.strip())
        if sentence_count >= 5:
            score += 0.1
        if stripped[0].isupper():
            score += 0.05

        words = stripped.lower().split()
        unique_ratio = len(set(words)) / len(words) if words else 0
        if unique_ratio > 0.6:
            score += 0.05
        return min(score, 1.0)

    def _assess_content_depth(self, text_q1: str, text_q2: str, text_q3: str) -> float:
        """Map the combined word count of the three answers to a depth score.

        >=450 words -> 1.0, >=300 -> 0.8, >=200 -> 0.6, >=100 -> 0.4,
        otherwise 0.2. ``None`` answers count as empty.
        """
        total_words = sum(len((answer or '').split()) for answer in (text_q1, text_q2, text_q3))
        for minimum_words, depth in ((450, 1.0), (300, 0.8), (200, 0.6), (100, 0.4)):
            if total_words >= minimum_words:
                return depth
        return 0.2

    def explain(self, features: Dict) -> Dict:
        """Turn feature scores into human-readable highlights and suggestions.

        Returns:
            ``{'highlights': [...], 'suggestions': [...]}``. A highlight is
            emitted for a feature above 0.7, a suggestion for one below 0.5;
            missing features are treated as 0.
        """
        highlight_rules = (
            ('technical_skills', "Strong technical skills demonstrated"),
            ('leadership_score', "Clear leadership experience"),
            ('career_alignment', "Well-defined career goals"),
            ('communication', "Excellent communication skills"),
        )
        suggestion_rules = (
            ('writing_quality', "Provide more detailed responses (150-300 words each)"),
            ('leadership_score', "Highlight leadership roles with specific examples"),
            ('technical_skills', "Describe technical projects and skills in detail"),
        )
        return {
            'highlights': [msg for key, msg in highlight_rules if features.get(key, 0) > 0.7],
            'suggestions': [msg for key, msg in suggestion_rules if features.get(key, 0) < 0.5],
        }

    # Admin functions
    def get_aspect_seeds(self) -> Dict[str, List[str]]:
        """Return a copy of the current seeds; mutating it does not affect the module."""
        return {aspect: list(phrases) for aspect, phrases in self.aspect_seeds.items()}

    def update_aspect_seeds(self, new_seeds: Dict[str, List[str]],
                            persist: bool = True) -> Dict:
        """Replace the aspect seeds and recompute the centroids.

        Args:
            new_seeds: Non-empty mapping of aspect key (str) to a non-empty
                list of seed phrases (str).
            persist: When true, the seeds JSON and the centroid cache are
                written to disk (best effort). When false, only in-memory
                state changes, so a restart restores the previous seeds.

        Returns:
            Stats dict with ``num_aspects``, ``avg_seed_count`` and a UTC
            ``timestamp`` in ISO format with a trailing 'Z'.

        Raises:
            ValueError: If ``new_seeds`` is not a valid seed mapping.
        """
        from datetime import timezone

        if not isinstance(new_seeds, dict):
            raise ValueError("new_seeds must be a dict")
        if not new_seeds:
            raise ValueError("new_seeds must contain at least one aspect")
        for key, seeds in new_seeds.items():
            if not isinstance(key, str):
                raise ValueError(f"Aspect key must be string, got {type(key)}")
            if not isinstance(seeds, list) or not seeds:
                raise ValueError(f"Seeds for '{key}' must be non-empty list")
            if not all(isinstance(s, str) for s in seeds):
                raise ValueError(f"All seeds for '{key}' must be strings")

        # Own copies of the lists so later caller mutations cannot leak in.
        validated = {key: list(seeds) for key, seeds in new_seeds.items()}

        # Build first, assign second: if encoding fails the module keeps its
        # previous, mutually consistent seeds and centroids.
        logger.info("Recomputing centroids after seed update")
        centroids = self.build_prototypes(validated, self.model, save=persist)
        self.aspect_seeds = validated
        self.centroids = centroids

        if persist:
            try:
                with open(self.seeds_path, 'w', encoding='utf-8') as f:
                    json.dump(validated, f, indent=2, ensure_ascii=False)
                logger.info("Persisted new seeds to %s", self.seeds_path)
            except (OSError, ValueError) as e:
                logger.error("Failed to persist seeds: %s", e)

        stats = {
            "num_aspects": len(validated),
            "avg_seed_count": float(np.mean([len(seeds) for seeds in validated.values()])),
            # Same textual format as the former datetime.utcnow() output.
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'
        }
        logger.info("Aspect seeds updated: %s", stats)
        return stats

    def suggest_seed_expansions(self, corpus_texts: List[str], aspect_key: str,
                                top_n: int = 20) -> List[str]:
        """Suggest new seed phrases for an aspect from a corpus.

        Candidate 2-5 word n-grams are taken from the first 100 texts, the
        200 most frequent ones are embedded, and the ``top_n`` closest to the
        aspect centroid (cosine similarity) are returned, best first.

        Returns:
            Suggested phrases, or an empty list when the aspect is unknown,
            ``top_n`` is not positive, or no candidate was found.
        """
        from collections import Counter
        import re

        if top_n <= 0 or aspect_key not in self.centroids:
            return []
        centroid = self.centroids[aspect_key]

        long_number = re.compile(r'\d{3,}')
        phrase_counts = Counter()
        for text in corpus_texts[:100]:  # Limit corpus
            words = text.lower().split()
            for n in range(2, 6):
                for i in range(len(words) - n + 1):
                    phrase = ' '.join(words[i:i + n])
                    # Skip very short phrases and ones carrying ids/years.
                    if len(phrase) > 10 and not long_number.search(phrase):
                        phrase_counts[phrase] += 1

        top_candidates = [phrase for phrase, _ in phrase_counts.most_common(200)]
        if not top_candidates:
            return []

        candidate_embeddings = np.asarray(
            self.model.encode(top_candidates, convert_to_tensor=False, show_progress_bar=False),
            dtype=np.float32,
        )
        sims = np.dot(candidate_embeddings, centroid) / (
            np.linalg.norm(candidate_embeddings, axis=1) * np.linalg.norm(centroid) + 1e-8
        )
        top_indices = np.argsort(sims)[-top_n:][::-1]
        return [top_candidates[i] for i in top_indices]
def get_relevant_aspects_for_question(question_id: str) -> List[str]:
    """Get relevant aspect keys for a question.

    Args:
        question_id: Question key such as 'text_q1'.

    Returns:
        A new list of aspect keys (empty for unknown questions). A copy is
        returned so callers cannot mutate the module-level mapping.
    """
    return list(QUESTION_ASPECT_MAP.get(question_id, ()))
# Flask admin blueprint
def register_admin_seed_endpoint(app, text_module: "TextModuleV2"):
    """Register admin endpoints for seed management.

    Adds ``GET /admin/aspect-seeds`` and ``POST /admin/aspect-seeds`` to
    ``app``. Both require the ``X-Admin-Token`` header to equal the
    ADMIN_SEED_TOKEN environment variable.

    Args:
        app: Flask application to register the blueprint on.
        text_module: Scorer whose seeds are read and updated.
    """
    import hmac
    from flask import Blueprint, request, jsonify

    admin_bp = Blueprint('admin_aspects', __name__, url_prefix='/admin')

    # SECURITY NOTE(review): the built-in fallback token is kept for backward
    # compatibility, but it is a publicly known credential. Deployments must
    # set ADMIN_SEED_TOKEN; consider failing closed once callers are migrated.
    if not os.getenv('ADMIN_SEED_TOKEN'):
        logger.warning(
            "ADMIN_SEED_TOKEN is not set; admin aspect-seed endpoints fall back to the "
            "built-in default token. Set ADMIN_SEED_TOKEN before exposing this service."
        )

    def check_admin_token():
        """Return an error response tuple when the request is not authorized, else None."""
        token = request.headers.get('X-Admin-Token') or ''
        expected = os.getenv('ADMIN_SEED_TOKEN', 'admin-secret-token')
        # Reject empty tokens outright so an empty env value can never match,
        # and compare in constant time to avoid leaking the token via timing.
        if not token or not hmac.compare_digest(token.encode('utf-8'), expected.encode('utf-8')):
            return jsonify({'error': 'Unauthorized'}), 401
        return None

    @admin_bp.route('/aspect-seeds', methods=['GET'])
    def get_seeds():
        """Get current aspect seeds"""
        auth_err = check_admin_token()
        if auth_err is not None:
            return auth_err
        seeds = text_module.get_aspect_seeds()
        return jsonify({
            'success': True,
            'seeds': seeds,
            'num_aspects': len(seeds)
        })

    @admin_bp.route('/aspect-seeds', methods=['POST'])
    def update_seeds():
        """Update aspect seeds"""
        auth_err = check_admin_token()
        if auth_err is not None:
            return auth_err

        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        new_seeds = data.get('seeds')
        persist = data.get('persist', True)
        if not new_seeds:
            return jsonify({'error': 'Missing seeds field'}), 400

        try:
            stats = text_module.update_aspect_seeds(new_seeds, persist=persist)
        except ValueError as e:
            # Validation problems are the client's fault.
            logger.error(f"Failed to update seeds: {e}")
            return jsonify({'error': str(e)}), 400
        except Exception:
            # Anything else is a server-side failure; do not leak internals.
            logger.exception("Failed to update seeds")
            return jsonify({'error': 'Internal error while updating seeds'}), 500
        return jsonify({
            'success': True,
            'message': 'Aspect seeds updated successfully',
            'stats': stats
        })

    app.register_blueprint(admin_bp)
    logger.info("Registered admin aspect-seed endpoints at /admin/aspect-seeds")