testing / interview /engine.py
Danielsz's picture
deploy
16e8be2
Raw
History Blame Contribute Delete
14.4 kB
"""
Interview Engine — Core RAG + sentiment analysis logic.
Ported from the TalkingRAG FastAPI main.py into a Django-compatible singleton class.
"""
from __future__ import annotations
import logging
import math
import re
from collections import OrderedDict
from functools import lru_cache
from typing import Optional
import numpy as np
from sentence_transformers import SentenceTransformer
from .phrases_data import setup_data
# Module-level logger used by everything in this file.
log = logging.getLogger("interview.engine")
import os
# ─────────────────────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────────────────────
# Qdrant connection settings; each is read from the environment variable of
# the same name and falls back to the default shown here.
QDRANT_HOST = os.getenv("QDRANT_HOST", "127.0.0.1")
QDRANT_PORT = int(os.getenv("QDRANT_PORT", 6333))
# An empty API key means the client is created without authentication.
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", "")
# Collection that must already exist in Qdrant for vector search to be enabled.
COLLECTION_NAME = "entrevista_itca"
# Model name handed to SentenceTransformer for embedding answers and feedback.
MODEL_NAME = "all-MiniLM-L6-v2"
# Model name handed to the transformers "sentiment-analysis" pipeline.
SENTIMENT_MODEL = "nlptown/bert-base-multilingual-uncased-sentiment"
# Minimum Qdrant score a search hit needs before it is accepted as feedback.
SCORE_THRESHOLD = 0.40
# Maximum number of cached query_feedback results.
CACHE_MAX_SIZE = 256
# Maximum number of cached classify_sentiment results.
SENTIMENT_CACHE_MAX_SIZE = 512
# ─────────────────────────────────────────────────────────────
# HEURISTIC PATTERNS
# ─────────────────────────────────────────────────────────────
# Short Spanish replies that mean "I don't know / I won't answer".
# Accented and unaccented spellings are both listed because normalize_text()
# lower-cases input but does not strip diacritics.
# NOTE: the accented entries were previously stored as mojibake (UTF-8 bytes
# decoded with a Greek code page), so they could never match real input.
NEGATIVE_NON_ANSWER_PATTERNS = (
    "no se", "no sé", "no lo se", "no lo sé", "ns", "nose", "no tengo idea",
    "no sabria", "no sabría", "paso", "no quiero responder", "no entendi",
    "no entendí", "no entiendo", "no responde", "sin comentarios", "n/a",
)
# Short replies treated as negative; includes every non-answer above.
NEGATIVE_SHORT_PATTERNS = (
    "no", "no realmente", "no, realmente no", "para nada", "nunca",
    "claro que no", "no me gusta", "no creo", "no estoy seguro",
    "no estoy segura", "tampoco",
) + NEGATIVE_NON_ANSWER_PATTERNS
# Short replies treated as positive.
POSITIVE_SHORT_PATTERNS = (
    "si", "sí", "claro", "claro que si", "claro que sí", "por supuesto",
    "definitivamente", "totalmente", "exacto", "obvio", "obviamente",
    "siempre", "me encanta", "me gusta mucho", "sin duda", "asi es", "así es",
)
# ponytail: removed the oversized hand-rolled LRUCache; functools' @lru_cache is used instead.
def normalize_text(text: str) -> str:
    """Lower-case *text* and collapse every run of whitespace into one space.

    Leading and trailing whitespace is removed as part of the collapse.
    """
    lowered = text.lower()
    tokens = lowered.strip().split()
    return " ".join(tokens)
def _numeric_id_key(phrase: dict) -> int:
    """Sort key: the first run of digits found in ``phrase["id"]``, or 0 if none.

    Raises:
        KeyError: if *phrase* has no ``"id"`` entry.
    """
    found = re.search(r"(\d+)", phrase["id"])
    if found is None:
        return 0
    return int(found.group(1))
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Return the cosine similarity between vectors *a* and *b*.

    Returns 0.0 when either vector is empty or has zero magnitude, so callers
    never receive NaN from a 0/0 division.

    Raises:
        ValueError: propagated from NumPy when the vectors differ in length.
    """
    if not len(a) or not len(b):
        return 0.0
    magnitude = float(np.linalg.norm(a) * np.linalg.norm(b))
    if magnitude == 0.0:
        # An all-zero vector has no direction; treat it as "not similar".
        return 0.0
    return float(np.dot(a, b) / magnitude)
def _contains_phrase(text: str, phrase: str) -> bool:
    """Return True when *phrase* occurs in *text* as whole words, not inside a longer word."""
    pattern = rf"(?<!\w){re.escape(phrase)}(?!\w)"
    return re.search(pattern, text) is not None


def _heuristic_sentiment(normalized: str) -> Optional[str]:
    """Classify a short answer with fixed phrase lists, without running the model.

    Args:
        normalized: answer text already passed through ``normalize_text``.

    Returns:
        ``"negative"`` or ``"positive"`` when the text equals a known phrase,
        or when it has at most five words and contains a known phrase;
        ``None`` when the heuristic cannot decide. Negative phrases are
        checked before positive ones.
    """
    if normalized in NEGATIVE_SHORT_PATTERNS:
        return "negative"
    if normalized in POSITIVE_SHORT_PATTERNS:
        return "positive"

    word_count = len(normalized.split())
    if word_count <= 5:
        # Match on word boundaries: a plain substring test would find "no"
        # inside "conozco" or "si" inside "música" and misclassify the answer.
        if any(_contains_phrase(normalized, p) for p in NEGATIVE_SHORT_PATTERNS):
            return "negative"
        if any(_contains_phrase(normalized, p) for p in POSITIVE_SHORT_PATTERNS):
            return "positive"
    return None
# ─────────────────────────────────────────────────────────────
# INTERVIEW ENGINE (Singleton)
# ─────────────────────────────────────────────────────────────
class InterviewEngine:
    """Interview engine: embeddings, sentiment classification and feedback lookup.

    Construction loads the sentence-embedding model and the sentiment
    pipeline, splits the phrases returned by ``setup_data()`` into greetings,
    questions and feedback, pre-encodes the feedback texts and tries to
    connect to Qdrant. When Qdrant is unavailable, or the collection is
    missing, feedback search falls back to an in-memory cosine search.

    Use :meth:`get_instance` instead of the constructor so that the models
    are loaded only once per process.
    """

    _instance: Optional["InterviewEngine"] = None

    def __init__(self):
        log.info("Initializing InterviewEngine...")

        # Embedding model
        log.info("Loading embedding model '%s'...", MODEL_NAME)
        self.model = SentenceTransformer(MODEL_NAME)

        # Sentiment classifier (transformers is imported only when an engine is built)
        log.info("Loading sentiment classifier '%s'...", SENTIMENT_MODEL)
        from transformers import pipeline

        self.sentiment_classifier = pipeline(
            "sentiment-analysis", model=SENTIMENT_MODEL
        )

        # Phrase catalogue, split by category
        all_phrases = setup_data()
        self.greetings = [p for p in all_phrases if p["category"] == "greeting"]
        self.questions = sorted(
            [p for p in all_phrases if p["category"] == "questions"],
            key=_numeric_id_key,
        )
        self.feedback_all = [p for p in all_phrases if p["category"] == "feedback"]
        self.feedback_positive = [
            p for p in self.feedback_all if p.get("sentiment") == "positive"
        ]
        self.feedback_negative = [
            p for p in self.feedback_all if p.get("sentiment") == "negative"
        ]

        # Last-resort feedback per sentiment: first phrase of that sentiment,
        # else the first feedback phrase of any kind, else an empty dict.
        any_feedback = self.feedback_all[0] if self.feedback_all else {}
        self.default_feedback_by_sentiment = {
            "positive": (
                self.feedback_positive[0] if self.feedback_positive else any_feedback
            ),
            "negative": (
                self.feedback_negative[0] if self.feedback_negative else any_feedback
            ),
        }

        # Pre-encoded feedback vectors for the local (non-Qdrant) search
        self.feedback_encoded_pos = self._encode_phrases(self.feedback_positive)
        self.feedback_encoded_neg = self._encode_phrases(self.feedback_negative)

        # Qdrant connection (optional)
        self.qdrant_client = None
        self.qdrant_ok = False
        self._connect_qdrant()

        log.info(
            "InterviewEngine ready — %d questions, %d feedback phrases.",
            len(self.questions),
            len(self.feedback_all),
        )

    def _encode_phrases(self, phrases: list) -> list:
        """Return ``(phrase, vector)`` pairs for *phrases*; ``[]`` when there are none."""
        if not phrases:
            return []
        vectors = self.model.encode(
            [p["text"] for p in phrases],
            batch_size=64,
            show_progress_bar=False,
        )
        return list(zip(phrases, [v.tolist() for v in vectors]))

    def _connect_qdrant(self) -> None:
        """Try to connect to Qdrant and record the outcome in ``self.qdrant_ok``.

        Any ``Exception`` (missing package, unreachable server, missing
        collection) is logged as a warning instead of being raised, leaving
        the engine in local-search mode.
        """
        try:
            from qdrant_client import QdrantClient

            log.info("Connecting to Qdrant at %s:%s...", QDRANT_HOST, QDRANT_PORT)
            if QDRANT_API_KEY:
                self.qdrant_client = QdrantClient(
                    url=QDRANT_HOST, port=QDRANT_PORT, api_key=QDRANT_API_KEY
                )
            else:
                self.qdrant_client = QdrantClient(QDRANT_HOST, port=QDRANT_PORT)

            collections = [
                c.name for c in self.qdrant_client.get_collections().collections
            ]
            if COLLECTION_NAME not in collections:
                raise ValueError(f"Collection '{COLLECTION_NAME}' not found")

            # ponytail: ensure payload keyword indexes exist for category/sentiment.
            # Best effort: a failure here is only logged at debug level and
            # does not disable Qdrant.
            try:
                from qdrant_client.models import PayloadSchemaType

                self.qdrant_client.create_payload_index(
                    collection_name=COLLECTION_NAME,
                    field_name="category",
                    field_schema=PayloadSchemaType.KEYWORD,
                )
                self.qdrant_client.create_payload_index(
                    collection_name=COLLECTION_NAME,
                    field_name="sentiment",
                    field_schema=PayloadSchemaType.KEYWORD,
                )
            except Exception as e:
                log.debug("Payload indexes creation check: %s", e)

            self.qdrant_ok = True
            log.info("Qdrant connected — collection '%s' ready.", COLLECTION_NAME)
        except Exception as exc:
            self.qdrant_ok = False
            log.warning(
                "Qdrant NOT available. Using local search. Detail: %s", exc
            )

    @classmethod
    def get_instance(cls) -> "InterviewEngine":
        """Return the shared engine, building it on the first call."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    # ── Data accessors ──────────────────────────────────────
    def get_greeting(self) -> dict:
        """Return the first greeting phrase, or ``{}`` when none is defined.

        No ``video_url`` is added; the phrase is returned as stored.
        """
        return self.greetings[0] if self.greetings else {}

    def get_question(self, index: int) -> Optional[dict]:
        """Return a copy of question *index* with a ``video_url``, or ``None`` if out of range."""
        if not 0 <= index < len(self.questions):
            return None
        question = dict(self.questions[index])
        phrase_id = question.get("id", "")
        # Strip the category prefix to get the actual file name,
        # e.g. "preguntas_final_audio_0" -> "final_audio_0".
        actual_filename = phrase_id.replace("preguntas_", "", 1)
        question["video_url"] = f"videos/preguntas/{actual_filename}.mp4"
        return question

    def get_total_questions(self) -> int:
        """Return how many questions are loaded."""
        return len(self.questions)

    # ── Sentiment classification ────────────────────────────
    @lru_cache(maxsize=SENTIMENT_CACHE_MAX_SIZE)
    def classify_sentiment(self, text: str) -> tuple[str, str]:
        """Classify *text* as ``"positive"`` or ``"negative"``.

        The phrase heuristic runs first on the normalized text; the model is
        consulted only when the heuristic is undecided.

        Returns:
            ``(sentiment, reason)`` where *reason* is ``"heuristic"`` or
            ``"llm:<label>:<score>"``.
        """
        heuristic = _heuristic_sentiment(normalize_text(text))
        if heuristic:
            return heuristic, "heuristic"

        # Model fallback. The label must start with an integer rating followed
        # by a space (presumably "1 star" .. "5 stars" — confirm against the
        # model card); ratings of 1-2 are negative, 3 and above positive.
        result = self.sentiment_classifier(text, truncation=True)[0]
        stars = int(result["label"].split(" ")[0])
        mapped = "negative" if stars <= 2 else "positive"
        return mapped, f"llm:{result['label']}:{result['score']:.2f}"

    # ── Local fallback search ───────────────────────────────
    def _local_best_feedback(
        self,
        query_vector: list[float],
        filter_sentiment: str,
        exclude_id: Optional[str] = None,
    ) -> dict:
        """Return the pre-encoded feedback phrase most similar to *query_vector*.

        Args:
            query_vector: embedding of the user's answer.
            filter_sentiment: ``"positive"`` selects positive feedback; any
                other value selects negative feedback.
            exclude_id: id of the feedback played last. When another
                candidate exists it is preferred, so the same clip is not
                repeated; ``None`` disables the check.

        Returns:
            The chosen phrase dict, or the per-sentiment default when there
            are no usable candidates.
        """
        candidates = (
            self.feedback_encoded_pos
            if filter_sentiment == "positive"
            else self.feedback_encoded_neg
        )
        best_score, best_payload = -1.0, None
        fresh_score, fresh_payload = -1.0, None
        for payload, vec in candidates:
            score = cosine_similarity(query_vector, vec)
            if score > best_score:
                best_score, best_payload = score, payload
            is_repeat = exclude_id is not None and payload.get("id") == exclude_id
            if not is_repeat and score > fresh_score:
                fresh_score, fresh_payload = score, payload

        # Prefer a non-repeated phrase; fall back to the overall best when the
        # excluded phrase was the only candidate.
        chosen = fresh_payload or best_payload
        if not chosen:
            chosen = self.default_feedback_by_sentiment.get(
                filter_sentiment,
                self.feedback_all[0] if self.feedback_all else {},
            )
        return chosen

    @staticmethod
    def _feedback_video_url(phrase_id: str) -> str:
        """Map a phrase id to its video path.

        Ids starting with ``positivas_``, ``negativos_`` or ``preguntas_``
        map to ``videos/<folder>/<rest>.mp4`` (e.g. ``positivas_final_audio_5``
        -> ``videos/positivas/final_audio_5.mp4``); anything else maps to
        ``videos/<id>.mp4``.
        """
        for folder in ("positivas", "negativos", "preguntas"):
            prefix = f"{folder}_"
            if phrase_id.startswith(prefix):
                return f"videos/{folder}/{phrase_id[len(prefix):]}.mp4"
        return f"videos/{phrase_id}.mp4"

    # ── RAG query ───────────────────────────────────────────
    def query_feedback(self, text: str, last_feedback_id: Optional[str] = None) -> dict:
        """Return the best feedback for the user's answer *text*.

        Classifies the sentiment, searches Qdrant (or the local vectors when
        Qdrant is unavailable or yields nothing above ``SCORE_THRESHOLD``) and
        avoids repeating *last_feedback_id* when an alternative exists.

        Returns:
            A new dict with ``id``, ``text`` and ``video_url``. Results are
            cached per ``(text, last_feedback_id)``; a copy is returned so
            callers may mutate it without corrupting the cache.
        """
        return dict(self._query_feedback_cached(text, last_feedback_id))

    @lru_cache(maxsize=CACHE_MAX_SIZE)
    def _query_feedback_cached(self, text: str, last_feedback_id: Optional[str]) -> dict:
        """Cached implementation behind :meth:`query_feedback`."""
        query_vector = self.model.encode(text).tolist()
        mapped_sentiment, _reason = self.classify_sentiment(text)
        payload = None

        # Try Qdrant first
        if self.qdrant_ok:
            try:
                from qdrant_client.models import (
                    FieldCondition,
                    Filter,
                    MatchValue,
                )

                response = self.qdrant_client.query_points(
                    collection_name=COLLECTION_NAME,
                    query=query_vector,
                    query_filter=Filter(
                        must=[
                            FieldCondition(
                                key="category",
                                match=MatchValue(value="feedback"),
                            ),
                            FieldCondition(
                                key="sentiment",
                                match=MatchValue(value=mapped_sentiment),
                            ),
                        ]
                    ),
                    limit=3,
                )
                results = response.points
                for res in results:
                    if (
                        res.score >= SCORE_THRESHOLD
                        and res.payload.get("id") != last_feedback_id
                    ):
                        payload = res.payload
                        break
                # Fallback if the only good result was the one we just played
                if payload is None and results and results[0].score >= SCORE_THRESHOLD:
                    payload = results[0].payload
            except Exception as exc:
                # A failed search is not fatal: log it and use the local search.
                log.warning("Qdrant search error: %s", exc)

        # Local fallback
        if payload is None:
            payload = self._local_best_feedback(
                query_vector,
                filter_sentiment=mapped_sentiment,
                exclude_id=last_feedback_id,
            )

        phrase_id = payload.get("id", "")
        return {
            "id": phrase_id,
            "text": payload.get("text"),
            "video_url": self._feedback_video_url(phrase_id),
        }
# Module-level singleton so callers can simply import `engine`.
# NOTE: this executes when the module is imported; on the first import it runs
# InterviewEngine.__init__ (model loading and the Qdrant connection attempt).
# ponytail: get singleton instance to avoid double loading
engine = InterviewEngine.get_instance()