"""
Interview Engine — Core RAG + sentiment analysis logic.
Ported from the TalkingRAG FastAPI main.py into a Django-compatible singleton class.
"""
|
|
| from __future__ import annotations |
|
|
| import logging |
| import math |
| import re |
| from collections import OrderedDict |
| from functools import lru_cache |
| from typing import Optional |
| import numpy as np |
|
|
| from sentence_transformers import SentenceTransformer |
|
|
| from .phrases_data import setup_data |
|
|
import os

# Module-level logger, registered under a fixed name rather than __name__.
log = logging.getLogger("interview.engine")

# --- Qdrant connection -------------------------------------------------------
# Host, port and API key are read from the environment once, at import time.
QDRANT_HOST = os.environ.get("QDRANT_HOST", "127.0.0.1")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", 6333))
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", "")
COLLECTION_NAME = "entrevista_itca"

# --- Models ------------------------------------------------------------------
MODEL_NAME = "all-MiniLM-L6-v2"
SENTIMENT_MODEL = "nlptown/bert-base-multilingual-uncased-sentiment"

# --- Search / cache tuning ---------------------------------------------------
# Minimum Qdrant hit score for a feedback phrase to be accepted.
SCORE_THRESHOLD = 0.40
# Sizes of the lru_cache instances on the engine's feedback / sentiment lookups.
CACHE_MAX_SIZE = 256
SENTIMENT_CACHE_MAX_SIZE = 512
|
|
| |
| |
| |
# Replies that amount to "I don't know / I won't answer". They are folded into
# NEGATIVE_SHORT_PATTERNS below, so the heuristic treats them as negative.
# Each phrase is listed with and without its accent because normalize_text()
# lower-cases input but does not strip diacritics.
NEGATIVE_NON_ANSWER_PATTERNS = (
    "no se", "no sé", "no lo se", "no lo sé", "ns", "nose", "no tengo idea",
    "no sabria", "no sabría", "paso", "no quiero responder", "no entendi",
    "no entendí", "no entiendo", "no responde", "sin comentarios", "n/a",
)

# Short replies classified as negative without running the sentiment model.
NEGATIVE_SHORT_PATTERNS = (
    "no", "no realmente", "no, realmente no", "para nada", "nunca",
    "claro que no", "no me gusta", "no creo", "no estoy seguro",
    "no estoy segura", "tampoco",
) + NEGATIVE_NON_ANSWER_PATTERNS

# Short replies classified as positive without running the sentiment model.
POSITIVE_SHORT_PATTERNS = (
    "si", "sí", "claro", "claro que si", "claro que sí", "por supuesto",
    "definitivamente", "totalmente", "exacto", "obvio", "obviamente",
    "siempre", "me encanta", "me gusta mucho", "sin duda", "asi es", "así es",
)
|
|
|
|
| |
def normalize_text(text: str) -> str:
    """Return *text* lower-cased with whitespace runs collapsed to one space.

    Leading and trailing whitespace is removed as a side effect of splitting
    on whitespace and re-joining the tokens.
    """
    tokens = text.lower().split()
    return " ".join(tokens)
def _numeric_id_key(phrase: dict) -> int:
    """Sort key: the first run of digits in ``phrase["id"]`` as an int.

    Ids that contain no digits sort as 0.
    """
    found = re.search(r"(\d+)", phrase["id"])
    if found is None:
        return 0
    return int(found.group(1))
|
|
|
|
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Return the cosine of the angle between vectors *a* and *b*.

    Returns 0.0 when either vector is empty or has zero magnitude. The
    zero-magnitude guard avoids a division by zero that would otherwise
    yield NaN (and a NumPy runtime warning), which would poison any
    ``score > best`` comparison made by the caller.
    """
    if not len(a) or not len(b):
        return 0.0

    magnitude = np.linalg.norm(a) * np.linalg.norm(b)
    if magnitude == 0.0:
        return 0.0

    return float(np.dot(a, b) / magnitude)
|
|
|
|
def _heuristic_sentiment(normalized: str) -> Optional[str]:
    """Classify a short, already-normalized reply without the ML model.

    Args:
        normalized: Text as produced by ``normalize_text``.

    Returns:
        ``"negative"`` or ``"positive"`` when the reply is exactly one of the
        known short patterns, or when it has at most five words and contains
        one of them as a whole word/phrase; ``None`` otherwise. Negative
        patterns are checked before positive ones in both stages.
    """
    if normalized in NEGATIVE_SHORT_PATTERNS:
        return "negative"
    if normalized in POSITIVE_SHORT_PATTERNS:
        return "positive"

    # Longer replies are left to the caller (which falls back to the model).
    if len(normalized.split()) > 5:
        return None

    def contains_phrase(phrase: str) -> bool:
        # Match only on word boundaries. Plain substring containment would
        # fire on "no" inside "buenos", "ns" inside "pienso" or "si" inside
        # "casi" and mislabel the reply.
        pattern = rf"(?<!\w){re.escape(phrase)}(?!\w)"
        return re.search(pattern, normalized) is not None

    if any(contains_phrase(p) for p in NEGATIVE_SHORT_PATTERNS):
        return "negative"
    if any(contains_phrase(p) for p in POSITIVE_SHORT_PATTERNS):
        return "positive"
    return None
|
|
|
|
| |
| |
| |
class InterviewEngine:
    """Process-wide engine that serves interview phrases and picks feedback.

    On construction it loads the sentence-embedding model and the sentiment
    classifier, splits the phrase catalogue returned by ``setup_data()`` into
    greetings, questions and feedback, pre-computes embeddings for the
    feedback phrases, and tries to connect to Qdrant. When Qdrant cannot be
    used, feedback is selected by a local cosine-similarity search instead.

    Obtain the engine through :meth:`get_instance` so the models are loaded
    only once.
    """

    # Lazily created shared instance; see get_instance().
    _instance: Optional["InterviewEngine"] = None

    # Phrase-id prefix -> sub-folder of "videos/" that holds the matching clip.
    _VIDEO_FOLDERS = (
        ("positivas_", "positivas"),
        ("negativos_", "negativos"),
        ("preguntas_", "preguntas"),
    )

    def __init__(self):
        """Load models and phrase data, then probe the Qdrant backend.

        A Qdrant failure is not fatal: it is logged as a warning and
        ``self.qdrant_ok`` stays ``False`` so searches use the local index.
        """
        log.info("Initializing InterviewEngine...")

        log.info("Loading embedding model '%s'...", MODEL_NAME)
        self.model = SentenceTransformer(MODEL_NAME)

        log.info("Loading sentiment classifier '%s'...", SENTIMENT_MODEL)
        # Imported here rather than at module level, as in the original code.
        from transformers import pipeline

        self.sentiment_classifier = pipeline(
            "sentiment-analysis", model=SENTIMENT_MODEL
        )

        all_phrases = setup_data()

        self.greetings = [p for p in all_phrases if p["category"] == "greeting"]
        self.questions = sorted(
            (p for p in all_phrases if p["category"] == "questions"),
            key=_numeric_id_key,
        )
        self.feedback_all = [p for p in all_phrases if p["category"] == "feedback"]
        self.feedback_positive = [
            p for p in self.feedback_all if p.get("sentiment") == "positive"
        ]
        self.feedback_negative = [
            p for p in self.feedback_all if p.get("sentiment") == "negative"
        ]

        # Last-resort phrase per sentiment: the first phrase of that sentiment,
        # else the first feedback phrase of any sentiment, else an empty dict.
        any_feedback = self.feedback_all[0] if self.feedback_all else {}
        self.default_feedback_by_sentiment = {
            "positive": (
                self.feedback_positive[0] if self.feedback_positive else any_feedback
            ),
            "negative": (
                self.feedback_negative[0] if self.feedback_negative else any_feedback
            ),
        }

        # (payload, vector) pairs used by the local similarity search.
        self.feedback_encoded_pos = self._encode_phrases(self.feedback_positive)
        self.feedback_encoded_neg = self._encode_phrases(self.feedback_negative)

        self.qdrant_client = None
        self.qdrant_ok = False
        self._connect_qdrant()

        log.info(
            "InterviewEngine ready — %d questions, %d feedback phrases.",
            len(self.questions),
            len(self.feedback_all),
        )

    def _encode_phrases(self, phrases: list) -> list:
        """Embed each phrase's ``text`` and pair it with its payload dict."""
        if not phrases:
            return []
        vectors = self.model.encode(
            [p["text"] for p in phrases],
            batch_size=64,
            show_progress_bar=False,
        )
        return list(zip(phrases, [v.tolist() for v in vectors]))

    def _connect_qdrant(self) -> None:
        """Connect to Qdrant and set ``self.qdrant_ok`` if the collection exists.

        Any exception (missing package, unreachable server, missing
        collection) is logged and leaves ``self.qdrant_ok`` as ``False``.
        """
        try:
            from qdrant_client import QdrantClient

            log.info("Connecting to Qdrant at %s:%s...", QDRANT_HOST, QDRANT_PORT)
            if QDRANT_API_KEY:
                self.qdrant_client = QdrantClient(
                    url=QDRANT_HOST, port=QDRANT_PORT, api_key=QDRANT_API_KEY
                )
            else:
                self.qdrant_client = QdrantClient(QDRANT_HOST, port=QDRANT_PORT)

            collections = [
                c.name for c in self.qdrant_client.get_collections().collections
            ]
            if COLLECTION_NAME not in collections:
                raise ValueError(f"Collection '{COLLECTION_NAME}' not found")

            # Best effort: index the two payload fields used as search filters.
            # A failure here (presumably an already-existing index — confirm)
            # is only logged at debug level and does not disable Qdrant.
            try:
                from qdrant_client.models import PayloadSchemaType

                for field_name in ("category", "sentiment"):
                    self.qdrant_client.create_payload_index(
                        collection_name=COLLECTION_NAME,
                        field_name=field_name,
                        field_schema=PayloadSchemaType.KEYWORD,
                    )
            except Exception as e:
                log.debug("Payload indexes creation check: %s", e)

            self.qdrant_ok = True
            log.info("Qdrant connected — collection '%s' ready.", COLLECTION_NAME)
        except Exception as exc:
            self.qdrant_ok = False
            log.warning(
                "Qdrant NOT available. Using local search. Detail: %s", exc
            )

    @classmethod
    def get_instance(cls) -> "InterviewEngine":
        """Return the shared engine, constructing it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def get_greeting(self) -> dict:
        """Return a copy of the first greeting phrase, or ``{}`` if none exist.

        A copy is returned (as ``get_question`` does) so callers cannot mutate
        the engine's phrase catalogue.
        """
        return dict(self.greetings[0]) if self.greetings else {}

    def get_question(self, index: int) -> Optional[dict]:
        """Return a copy of question *index* with a ``video_url`` key added.

        Returns ``None`` when *index* is outside ``0 .. total - 1``.
        """
        if not 0 <= index < len(self.questions):
            return None

        question = dict(self.questions[index])
        clip_name = question.get("id", "")
        # Strip the prefix only at the start of the id; str.replace would also
        # remove it from the middle of an id that does not begin with it.
        if clip_name.startswith("preguntas_"):
            clip_name = clip_name[len("preguntas_"):]
        question["video_url"] = f"videos/preguntas/{clip_name}.mp4"
        return question

    def get_total_questions(self) -> int:
        """Return how many question phrases are loaded."""
        return len(self.questions)

    @lru_cache(maxsize=SENTIMENT_CACHE_MAX_SIZE)
    def classify_sentiment(self, text: str) -> tuple[str, str]:
        """Classify *text* as ``"positive"`` or ``"negative"``.

        Returns:
            ``(sentiment, reason)``. *reason* is ``"heuristic"`` when a short
            pattern matched, otherwise ``"llm:<label>:<score>"`` built from the
            classifier output.

        Results are memoised per ``(self, text)`` by ``lru_cache``.
        """
        normalized = normalize_text(text)

        heuristic = _heuristic_sentiment(normalized)
        if heuristic:
            return heuristic, "heuristic"

        result = self.sentiment_classifier(text, truncation=True)[0]
        # The label's first space-separated token is parsed as an integer
        # rating (labels presumably look like "4 stars" — confirm against the
        # model). Ratings of 1-2 map to negative, anything higher to positive.
        score = int(result["label"].split(" ")[0])
        mapped = "negative" if score <= 2 else "positive"
        return mapped, f"llm:{result['label']}:{result['score']:.2f}"

    def _local_best_feedback(
        self,
        query_vector: list[float],
        filter_sentiment: str,
        exclude_id: Optional[str] = None,
    ) -> dict:
        """Pick the most similar pre-encoded feedback phrase for a sentiment.

        Args:
            query_vector: Embedding of the user's answer.
            filter_sentiment: ``"positive"`` selects the positive phrases; any
                other value selects the negative ones.
            exclude_id: Optional phrase id to avoid (typically the feedback
                shown last). It is only avoided when another candidate exists;
                otherwise the overall best match is returned anyway.

        Returns:
            The chosen payload dict, or the per-sentiment default when no
            candidate scores above -1.0.
        """
        candidates = (
            self.feedback_encoded_pos
            if filter_sentiment == "positive"
            else self.feedback_encoded_neg
        )

        best_score, best_payload = -1.0, None
        fresh_score, fresh_payload = -1.0, None  # best match that is not exclude_id

        for payload, vec in candidates:
            score = cosine_similarity(query_vector, vec)
            if score > best_score:
                best_score, best_payload = score, payload
            is_fresh = exclude_id is None or payload.get("id") != exclude_id
            if is_fresh and score > fresh_score:
                fresh_score, fresh_payload = score, payload

        chosen = fresh_payload or best_payload
        if not chosen:
            chosen = self.default_feedback_by_sentiment.get(
                filter_sentiment,
                self.feedback_all[0] if self.feedback_all else {},
            )
        return chosen

    @staticmethod
    def _feedback_video_url(phrase_id: str) -> str:
        """Map a phrase id to its clip path under ``videos/``."""
        for prefix, folder in InterviewEngine._VIDEO_FOLDERS:
            if phrase_id.startswith(prefix):
                return f"videos/{folder}/{phrase_id[len(prefix):]}.mp4"
        return f"videos/{phrase_id}.mp4"

    @lru_cache(maxsize=CACHE_MAX_SIZE)
    def query_feedback(self, text: str, last_feedback_id: Optional[str] = None) -> dict:
        """Return the feedback phrase that best matches the user's answer.

        The answer is embedded and its sentiment classified. Qdrant (when
        available) is searched for feedback phrases of that sentiment; the
        first of the top three hits scoring at least ``SCORE_THRESHOLD`` whose
        id differs from *last_feedback_id* is used, falling back to the top
        hit if it meets the threshold. If Qdrant yields nothing, the local
        similarity search is used, also avoiding *last_feedback_id*.

        Args:
            text: The user's answer.
            last_feedback_id: Id of the feedback shown previously, if any.

        Returns:
            ``{"id": ..., "text": ..., "video_url": ...}``.

        NOTE: results are memoised by ``lru_cache`` keyed on the arguments, and
        the returned dict is the cached object itself, so callers must treat
        it as read-only.
        """
        query_vector = self.model.encode(text).tolist()
        mapped_sentiment, _reason = self.classify_sentiment(text)

        payload = None

        if self.qdrant_ok:
            try:
                from qdrant_client.models import (
                    FieldCondition,
                    Filter,
                    MatchValue,
                )

                response = self.qdrant_client.query_points(
                    collection_name=COLLECTION_NAME,
                    query=query_vector,
                    query_filter=Filter(
                        must=[
                            FieldCondition(
                                key="category",
                                match=MatchValue(value="feedback"),
                            ),
                            FieldCondition(
                                key="sentiment",
                                match=MatchValue(value=mapped_sentiment),
                            ),
                        ]
                    ),
                    limit=3,
                )
                hits = response.points
                for hit in hits:
                    if (
                        hit.score >= SCORE_THRESHOLD
                        and hit.payload.get("id") != last_feedback_id
                    ):
                        payload = hit.payload
                        break

                # Every acceptable hit repeats the last feedback: repeat it
                # rather than return nothing from Qdrant.
                if payload is None and hits and hits[0].score >= SCORE_THRESHOLD:
                    payload = hits[0].payload
            except Exception as exc:
                log.warning("Qdrant search error: %s", exc)

        if payload is None:
            payload = self._local_best_feedback(
                query_vector,
                filter_sentiment=mapped_sentiment,
                exclude_id=last_feedback_id,
            )

        # `or ""` also covers a payload whose "id" is explicitly None.
        phrase_id = payload.get("id") or ""

        return {
            "id": phrase_id,
            "text": payload.get("text"),
            "video_url": self._feedback_video_url(phrase_id),
        }
|
|
| |
| |
# Shared engine exposed at module level. It is created when this module is
# imported, which runs InterviewEngine.__init__ (model loading, Qdrant probe).
engine: InterviewEngine = InterviewEngine.get_instance()
|
|