testing / interview /views.py
Danielsz's picture
Allow saving LLM config with missing required keys
c30c945
Raw
History Blame Contribute Delete
42.3 kB
"""
Interview app — DRF API views.
Permission strategy:
- Public (AllowAny): endpoints consumed by the student interview frontend.
- Admin (IsAdminRole): endpoints consumed exclusively by the admin Dashboard.
- Authenticated (IsAuthenticated): me_api — requires any valid JWT (student or admin).
"""
import logging
import os
import tempfile
import threading
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from django.db.models import Count, Q
from django.db.models.functions import TruncMonth, ExtractDay
from datetime import datetime
from .models import Applicant, TranscriptionProviderConfig, LLMProviderConfig, SystemPromptConfig
from .engine import InterviewEngine
from .permissions import IsAdminRole
log = logging.getLogger("interview.views")
from .transcription_engine import transcribe_audio_file
@api_view(["GET"])
@permission_classes([AllowAny])
def health(request):
"""Health check endpoint."""
engine = InterviewEngine.get_instance()
from .transcription_engine import _whisper_model
whisper_status = "Operacional" if _whisper_model is not None else "En Espera"
services = [
{
"label": "Motor de Evaluaciones",
"status": "Operacional" if engine.model is not None and engine.sentiment_classifier is not None else "Error",
"ok": engine.model is not None and engine.sentiment_classifier is not None
},
{
"label": "Base Vectorial (Qdrant)",
"status": "Operacional" if engine.qdrant_ok else "Desconectado",
"ok": engine.qdrant_ok
},
{
"label": "NLP de Voz (Whisper)",
"status": whisper_status,
"ok": True
}
]
return Response({
"status": "ok",
"services": services
})
@api_view(["GET"])
@permission_classes([AllowAny])
def interview_start(request):
"""Return interview config to bootstrap the interview UI."""
engine = InterviewEngine.get_instance()
return Response({
"total_questions": engine.get_total_questions(),
"bienvenida_url": "videos/final_bienvenida.mp4",
"despedida_url": "videos/final_despedida.mp4",
"loop_url": "videos/loop_pingpong.mp4",
})
@api_view(["GET"])
@permission_classes([AllowAny])
def interview_question(request, index: int):
"""Return a specific question by its 0-based index."""
engine = InterviewEngine.get_instance()
q = engine.get_question(index)
if q is None:
return Response(
{"detail": "No more questions"},
status=status.HTTP_404_NOT_FOUND,
)
return Response(q)
@api_view(["POST"])
@permission_classes([AllowAny])
def query_rag(request):
"""
Accept user answer text, classify sentiment, search for the best
feedback phrase via Qdrant (or local fallback), and return it.
"""
text = request.data.get("text", "").strip()
last_feedback_id = request.data.get("last_feedback_id")
if not text:
return Response(
{"detail": "Text required"},
status=status.HTTP_400_BAD_REQUEST,
)
engine = InterviewEngine.get_instance()
result = engine.query_feedback(text, last_feedback_id=last_feedback_id)
return Response(result)
@api_view(["POST"])
@permission_classes([AllowAny])
def transcribe_audio(request):
"""
Accept an audio file upload, transcribe it with the currently active model,
and return the transcribed text.
"""
audio_file = request.FILES.get("audio")
if not audio_file:
return Response(
{"detail": "No audio file"},
status=status.HTTP_400_BAD_REQUEST,
)
# Write uploaded audio to a temporary file
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as tmp:
for chunk in audio_file.chunks():
tmp.write(chunk)
tmp_path = tmp.name
try:
# Por defecto usar el activo o fallback a whisper_local
provider_config = TranscriptionProviderConfig.objects.filter(is_active=True).first()
provider = provider_config.provider_type if provider_config else "whisper_local"
text, _, error = transcribe_audio_file(tmp_path, provider_type=provider)
if error:
raise RuntimeError(error)
return Response({"text": text})
except Exception as exc:
log.exception("Transcription failed")
return Response(
{"detail": f"Transcription error: {exc}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
finally:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
class DashboardStatsAPIView(APIView):
permission_classes = [IsAdminRole]
def get(self, request, *args, **kwargs):
year = request.query_params.get('year')
month = request.query_params.get('month')
qs = Applicant.objects.all()
if year:
try:
qs = qs.filter(created_at__year=int(year))
except ValueError:
pass
if month:
try:
qs = qs.filter(created_at__month=int(month))
except ValueError:
pass
total_applicants = qs.count()
approved = qs.filter(status='approved').count()
pending = qs.filter(status='registered').count()
rejected = qs.filter(status='rejected').count()
admission_rate = 0
if total_applicants > 0:
admission_rate = round((approved / total_applicants) * 100, 2)
# For the chart
# If no year is passed, default to current year
chart_year = int(year) if year else datetime.now().year
chart_month = int(month) if month else datetime.now().month
chart_qs = Applicant.objects.filter(created_at__year=chart_year)
# Group by month
monthly_stats = chart_qs.annotate(month=TruncMonth('created_at')).values('month').annotate(
total=Count('id'),
approved=Count('id', filter=Q(status='approved'))
).order_by('month')
# Format into standard 12 months array
months_es = ['Ene', 'Feb', 'Mar', 'Abr', 'May', 'Jun', 'Jul', 'Ago', 'Sep', 'Oct', 'Nov', 'Dic']
chart_data = []
# Build dictionary for quick lookup
stats_dict = {item['month'].month: item for item in monthly_stats if item['month']}
for i, month_name in enumerate(months_es, 1):
stat = stats_dict.get(i)
rate = 0
if stat and stat['total'] > 0:
rate = round((stat['approved'] / stat['total']) * 100, 2)
# if month param is passed, we can highlight it
active = False
if month and int(month) == i:
active = True
chart_data.append({
'name': month_name,
'value': rate,
'active': active
})
# Dynamically get available years from the DB to populate the Year Dropdown
# Start from the earliest record or current year
earliest_record = Applicant.objects.order_by('created_at').first()
start_year = earliest_record.created_at.year if earliest_record else datetime.now().year
current_year = datetime.now().year
available_years = list(range(start_year, current_year + 1))
# Calendar data for the selected month and year
approved_in_month = Applicant.objects.filter(
created_at__year=chart_year,
created_at__month=chart_month,
status='approved'
).annotate(day=ExtractDay('created_at')).values('day').annotate(count=Count('id'))
calendar_data = {item['day']: item['count'] for item in approved_in_month}
return Response({
'total_applicants': total_applicants,
'approved_pending': approved,
'pending': pending,
'rejected': rejected,
'admission_rate': admission_rate,
'chart_data': chart_data,
'available_years': available_years,
'calendar_data': calendar_data
})
from .models import Interview
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def interviews_api(request):
if request.method == "GET":
year = request.query_params.get('year')
month = request.query_params.get('month')
day = request.query_params.get('day')
qs = Interview.objects.all()
if year: qs = qs.filter(date__year=int(year))
if month: qs = qs.filter(date__month=int(month))
if day: qs = qs.filter(date__day=int(day))
data = []
for i in qs:
data.append({
"id": i.id,
"title": i.title,
"date": str(i.date),
"timeIndex": i.time_index,
"durationMinutes": i.duration_minutes,
"active": i.active,
"users": [app.first_name[0].upper() for app in i.applicants.all()]
})
return Response(data)
data = request.data
title = data.get("title") or "Entrevista Técnica"
date_str = data.get("date")
time_index = int(data.get("timeIndex", 0))
duration_minutes = int(data.get("durationMinutes", 60))
applicant_ids = data.get("applicants", [])
iv = Interview.objects.create(
title=title,
date=date_str,
time_index=time_index,
duration_minutes=duration_minutes,
active=True
)
for app_id in applicant_ids:
app_id_clean = str(app_id).replace("EST-", "").lstrip('0') or "0"
try:
app = Applicant.objects.get(id=int(app_id_clean))
iv.applicants.add(app)
except Applicant.DoesNotExist:
pass
iv.save()
return Response({"id": iv.id, "title": iv.title, "date": str(iv.date), "timeIndex": iv.time_index, "durationMinutes": iv.duration_minutes, "active": iv.active, "users": [app.first_name[0].upper() for app in iv.applicants.all()]})
@api_view(["PATCH", "DELETE"])
@permission_classes([IsAdminRole])
def interview_detail_api(request, pk):
try:
iv = Interview.objects.get(pk=pk)
except Interview.DoesNotExist:
return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)
if request.method == "DELETE":
iv.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
if request.method == "PATCH":
data = request.data
if "title" in data: iv.title = data["title"]
if "date" in data: iv.date = data["date"]
if "timeIndex" in data: iv.time_index = int(data["timeIndex"])
if "durationMinutes" in data: iv.duration_minutes = int(data["durationMinutes"])
iv.save()
if "applicants" in data:
iv.applicants.clear()
for app_id in data["applicants"]:
app_id_clean = str(app_id).replace("EST-", "").lstrip('0') or "0"
try:
app = Applicant.objects.get(id=int(app_id_clean))
iv.applicants.add(app)
except Applicant.DoesNotExist:
pass
return Response({
"id": iv.id, "title": iv.title, "date": str(iv.date),
"timeIndex": iv.time_index, "durationMinutes": iv.duration_minutes,
"active": iv.active,
"users": [app.first_name[0].upper() for app in iv.applicants.all()]
})
# ponytail: ultra minimal me endpoint
from rest_framework.permissions import IsAuthenticated
from rest_framework.decorators import permission_classes
from .models import Applicant
from django.utils import timezone
from django.views.decorators.csrf import ensure_csrf_cookie
@api_view(["GET"])
@ensure_csrf_cookie
@permission_classes([IsAuthenticated])
def me_api(request):
user = request.user
app = Applicant.objects.filter(email__iexact=user.email).first()
is_test_admin = user.email in ["admin@itca.edu.sv", "pruebas@itca.edu.sv"]
# Check if Test Admin Mode is active globally
from .models import TranscriptionProviderConfig, InterviewSession
test_mode_active = TranscriptionProviderConfig.objects.filter(name="TEST_ADMIN_MODE", is_active=True).exists()
is_spoofed = test_mode_active or is_test_admin
if is_spoofed:
# Fetch the most recent interview regardless of date to allow testing
iv = Interview.objects.filter(applicants=app).order_by('-date', '-time_index').first() if app else None
if is_test_admin and not iv:
# Create a dummy interview response so admin always bypasses
return Response({
"name": "Simulación Admin",
"testMode": test_mode_active,
"sessionStatus": None,
"interview": {
"title": "Entrevista Nuevo Ingreso",
"date": str(timezone.now().date()),
"timeIndex": 0,
"durationMinutes": 1440,
"id": 9999
}
})
else:
# Normal flow: fetch today's or future interviews
iv = Interview.objects.filter(applicants=app, date__gte=timezone.now().date()).order_by('date', 'time_index').first() if app else None
session_status = None
if app and iv:
session = InterviewSession.objects.filter(applicant=app, interview=iv).order_by('-started_at').first()
if session:
session_status = session.status
if is_spoofed:
session_status = None
name_to_show = "Estudiante"
if app and app.first_name and app.first_name.strip():
name_to_show = f"{app.first_name} {app.last_name}".strip()
elif user.first_name and user.first_name.strip():
name_to_show = f"{user.first_name} {user.last_name}".strip()
return Response({
"name": name_to_show,
"testMode": is_spoofed,
"sessionStatus": session_status,
"interview": {
"title": iv.title,
# Spoof date and times so the frontend bypasses its strict checks
"date": str(timezone.now().date()) if is_spoofed else str(iv.date),
"timeIndex": 0 if is_spoofed else iv.time_index,
"durationMinutes": 1440 if is_spoofed else iv.duration_minutes,
"id": iv.id
} if iv else None
})
@api_view(["POST", "GET"])
@permission_classes([IsAdminRole])
def test_mode_toggle_api(request):
from .models import TranscriptionProviderConfig
from auth_api.models import User
from .models import Applicant
config, created = TranscriptionProviderConfig.objects.get_or_create(
name="TEST_ADMIN_MODE",
defaults={
"provider_type": "system",
"is_active": False,
"endpoint": "",
"api_key": ""
}
)
if request.method == "POST":
is_active = request.data.get("is_active", False)
config.is_active = is_active
config.save()
email = "admin@itca.edu.sv"
if is_active:
if not User.objects.filter(username=email).exists():
User.objects.create_user(username=email, email=email, password='admin', role='user', first_name='Admin', last_name='Simulación')
if not Applicant.objects.filter(email=email).exists():
Applicant.objects.create(email=email, first_name='Admin', last_name='Simulación', status='approved')
else:
User.objects.filter(username=email).delete()
Applicant.objects.filter(email=email).delete()
return Response({"is_active": config.is_active})
# --- RAG EVALUATION ENDPOINTS (Ponytail Ultra) ---
from .models import InterviewSession, QuestionResponse, EvaluationResult, EvaluationCategory
from .rag_evaluation import evaluate_session
from django.core.files.storage import default_storage
@api_view(["POST"])
@permission_classes([AllowAny])
def session_start(request):
app_id = request.data.get("applicant_id")
iv_id = request.data.get("interview_id")
if not app_id or not iv_id:
if request.user.is_authenticated:
app = Applicant.objects.filter(email__iexact=request.user.email).first()
if app:
app_id = app.id
iv = Interview.objects.filter(applicants=app).order_by('-date').first()
if not iv and app.email == "admin@itca.edu.sv":
iv = Interview.objects.first()
if iv:
iv_id = iv.id
if not app_id or not iv_id:
return Response({"error": "Faltan IDs o no hay entrevistas disponibles"}, status=400)
session = InterviewSession.objects.create(applicant_id=app_id, interview_id=iv_id)
return Response({"session_id": session.id})
@api_view(["POST"])
@permission_classes([AllowAny])
def session_answer(request, session_id):
session = InterviewSession.objects.get(id=session_id)
idx = request.data.get("index", 0)
text = request.data.get("text", "")
transcription = request.data.get("transcription", "")
# Si viene un archivo de audio, guardarlo (TODO: o pasarlo a Whisper)
audio_url = ""
if "audio" in request.FILES:
f = request.FILES["audio"]
path = default_storage.save(f"answers/session_{session_id}_{idx}.webm", f)
audio_url = default_storage.url(path)
QuestionResponse.objects.create(
session=session,
question_index=idx,
question_text=text,
audio_url=audio_url,
transcription=transcription
)
return Response({"status": "ok"})
@api_view(["POST"])
@permission_classes([AllowAny])
def session_recording(request, session_id):
session = InterviewSession.objects.get(id=session_id)
if "video" in request.FILES:
f = request.FILES["video"]
path = default_storage.save(f"recordings/session_{session_id}.webm", f)
session.recording_url = default_storage.url(path)
session.save()
return Response({"status": "ok"})
@api_view(["POST"])
@permission_classes([AllowAny])
def session_end(request, session_id):
session = InterviewSession.objects.get(id=session_id)
session.ended_at = timezone.now()
session.status = 'completed'
session.save()
# ponytail: Disparar evaluación (idealmente en tarea Celery, pero aquí síncrono para simplicidad ultra)
import threading
t = threading.Thread(target=evaluate_session, args=(session.id,))
t.start()
return Response({"status": "evaluating"})
@api_view(["POST"])
def session_reprocess(request, session_id):
try:
session = InterviewSession.objects.get(id=session_id)
EvaluationResult.objects.filter(session=session).delete()
session.overall_score = None
session.status = 'in_progress'
session.save()
# ponytail: Disparar evaluación (reprocesa audios si existen)
import threading
t = threading.Thread(target=evaluate_session, args=(session.id,))
t.start()
return Response({"status": "evaluating"})
except Exception as e:
return Response({"error": str(e)}, status=500)
@api_view(["POST"])
@permission_classes([IsAdminRole])
def applicant_approve(request, applicant_id):
try:
app = Applicant.objects.get(id=applicant_id)
app.status = 'approved'
app.save()
session = InterviewSession.objects.filter(applicant_id=app.id).order_by('-started_at').first()
if session:
session.status = 'evaluated'
session.overall_score = 100
session.save()
return Response({"status": "ok"})
except Exception as e:
return Response({"error": str(e)}, status=500)
@api_view(["GET"])
@permission_classes([AllowAny])
def session_evaluation(request, session_id):
session = InterviewSession.objects.get(id=session_id)
results = EvaluationResult.objects.filter(session=session)
data = {
"status": session.status,
"overall_score": session.overall_score,
"traits": session.dynamic_traits,
"interviewer_notes": session.interviewer_notes,
"hr_recommendation": session.hr_recommendation,
"recording_url": session.recording_url,
"categories": [
{
"name": r.category.name,
"score": r.score,
"feedback": r.feedback,
"color": r.category.color
} for r in results
]
}
return Response(data)
@api_view(["GET"])
@permission_classes([AllowAny])
def session_transcript(request, session_id):
session = InterviewSession.objects.get(id=session_id)
responses = QuestionResponse.objects.filter(session=session).order_by('question_index')
data = []
# Mock timestamps for demo purposes if duration_seconds is missing
current_time = 0
for r in responses:
duration = r.duration_seconds if r.duration_seconds > 0 else 45
data.append({
"id": r.id,
"speaker": "Interviewer",
"text": r.question_text,
"start_time": current_time,
"duration": 5,
"audio_url": None
})
current_time += 5
data.append({
"id": f"{r.id}_ans",
"speaker": "Candidate",
"text": r.transcription or "(Silencio / Sin respuesta transcrita)",
"start_time": current_time,
"duration": duration,
"audio_url": r.audio_url if r.audio_url else None
})
current_time += duration
return Response(data)
@api_view(["GET"])
@permission_classes([IsAdminRole])
def applicant_sessions(request, applicant_id):
sessions = InterviewSession.objects.filter(applicant_id=applicant_id).order_by('-started_at')
data = []
for s in sessions:
data.append({
"id": s.id,
"date": s.started_at,
"status": s.status,
"overall_score": s.overall_score,
"recording_url": s.recording_url
})
return Response(data)
import requests
def ping_llm_provider(provider_type, api_key, model, endpoint=""):
try:
if api_key: api_key = api_key.strip()
if provider_type == "openai":
base_url = endpoint.strip() if endpoint else "https://api.openai.com/v1"
if base_url.endswith("/"): base_url = base_url[:-1]
ping_url = f"{base_url}/models" if not base_url.endswith("/models") else base_url
headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
res = requests.get(ping_url, headers=headers, timeout=5)
if res.status_code != 200:
return False, f"OpenAI error (Code {res.status_code}): Verifica tu API Key o el Endpoint."
return True, "ok"
elif provider_type == "anthropic":
if not api_key.startswith("sk-ant"):
return False, "La API Key de Anthropic no parece válida (debe empezar con sk-ant)."
return True, "ok"
elif provider_type == "ollama":
endpoint = api_key if api_key else "http://127.0.0.1:11434"
if endpoint.endswith('/'): endpoint = endpoint[:-1]
res = requests.get(f"{endpoint}/api/tags", timeout=5)
if res.status_code != 200:
return False, f"Ollama error: No se pudo conectar al endpoint local."
# Verificar si el modelo está descargado
data = res.json()
models_list = [m.get("name", "") for m in data.get("models", [])]
if model and not any(model in name for name in models_list):
return False, f"Ollama está conectado, pero el modelo '{model}' no está descargado."
return True, "ok"
elif provider_type == "talentai":
if not endpoint:
return False, "El endpoint del Space es requerido (ej: https://tu-space.hf.space/v1)."
ep = endpoint.strip().rstrip("/")
# llama-cpp-python expone /v1/models compatible con OpenAI
ping_url = f"{ep}/models"
headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
try:
res = requests.get(ping_url, headers=headers, timeout=10)
if res.status_code != 200:
return False, f"TalentAI error (Code {res.status_code}): No se pudo conectar al Space. Verifica que el Space esté activo y la URL sea correcta."
except requests.exceptions.ConnectionError:
return False, "TalentAI: No se pudo conectar al Space. Verifica que esté activo."
except requests.exceptions.Timeout:
return False, "TalentAI: Timeout al conectar con el Space. Puede estar iniciando (cold start)."
return True, "ok"
elif provider_type == "nvidia":
if not api_key:
return False, "La API Key de NVIDIA es requerida."
payload = {
"model": model or "nvidia/nemotron-4-340b-instruct",
"messages": [{"role": "user", "content": "test"}],
"max_tokens": 5
}
res = requests.post(
"https://integrate.api.nvidia.com/v1/chat/completions",
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
json=payload,
timeout=10
)
if res.status_code != 200:
err = res.text
try:
err = res.json()
except: pass
return False, f"NVIDIA API error (Code {res.status_code}): {err}"
return True, "ok"
except Exception as e:
return False, f"Fallo de conexión: {str(e)}"
return True, "ok"
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_llm(request):
if request.method == "GET":
configs = LLMProviderConfig.objects.all()
return Response([{"id": c.id, "name": c.name, "type": c.provider_type, "model": c.model, "api_key": c.api_key, "endpoint": c.endpoint, "active": c.is_active} for c in configs])
# POST to update
for data in request.data.get("configs", []):
if data.get("active", False):
provider_type = data.get("type", "")
api_key = (data.get("api_key", "") or "").strip()
endpoint = (data.get("endpoint", "") or "").strip()
# Skip validation for providers that don't have required credentials yet.
# They'll fail at runtime, but shouldn't block saving other providers.
requires_key = provider_type in ("openai", "anthropic", "nvidia")
if requires_key and not api_key:
continue
# For ollama/talentai: validate if they have endpoint/key configured
is_valid, error_msg = ping_llm_provider(provider_type, api_key, data.get("model", ""), endpoint)
if not is_valid:
return Response({"status": "error", "message": error_msg}, status=400)
c, _ = LLMProviderConfig.objects.get_or_create(
id=data.get("id"),
defaults={
"name": data.get("name"),
"provider_type": data.get("type", "")
}
)
api_key = data.get("api_key", c.api_key)
if isinstance(api_key, str): api_key = api_key.strip()
endpoint = data.get("endpoint", c.endpoint)
if isinstance(endpoint, str): endpoint = endpoint.strip()
c.api_key = api_key
c.endpoint = endpoint
c.model = data.get("model", c.model)
c.is_active = data.get("active", c.is_active)
c.save()
return Response({"status": "ok"})
@api_view(["POST"])
@permission_classes([IsAdminRole])
def settings_llm_test(request):
provider_type = request.data.get("type", "")
api_key = request.data.get("api_key", "")
endpoint = request.data.get("endpoint", "")
model = request.data.get("model", "")
if not model and provider_type == 'talentai':
model = "qwen3-vl-4b-instruct.Q4_K_M.gguf"
prompt = request.data.get("prompt", "Hola, ¿cómo estás?")
messages = [{"role": "user", "content": prompt}]
import litellm
try:
if provider_type == 'ollama':
ep = api_key if api_key else "http://127.0.0.1:11434"
response = litellm.completion(
model=f"ollama/{model}",
messages=messages,
api_base=ep
)
elif provider_type == 'talentai':
ep = endpoint.strip().rstrip("/") if endpoint else ""
if not ep:
return Response({"status": "error", "message": "El endpoint del Space de TalentAI es requerido."}, status=400)
response = litellm.completion(
model=f"openai/{model}",
messages=messages,
api_base=ep,
api_key=api_key if api_key else "no-key-needed",
custom_llm_provider="openai"
)
elif provider_type == 'openai':
response = litellm.completion(
model=model,
messages=messages,
api_key=api_key
)
elif provider_type == 'anthropic':
response = litellm.completion(
model=model,
messages=messages,
api_key=api_key
)
elif provider_type == 'nvidia':
import openai
client = openai.OpenAI(
base_url="https://integrate.api.nvidia.com/v1",
api_key=api_key
)
response = client.chat.completions.create(
model=model,
messages=messages,
max_tokens=200
)
return Response({"response": response.choices[0].message.content})
else:
response = litellm.completion(
model=model,
messages=messages,
api_key=api_key,
api_base=endpoint if endpoint else None
)
return Response({"response": response.choices[0].message.content})
except Exception as e:
return Response({"status": "error", "message": str(e)}, status=400)
from .models import TranscriptionProviderConfig
import threading
import logging
logger = logging.getLogger(__name__)
def download_model_in_background(model_id):
try:
from huggingface_hub import snapshot_download
logger.warning(f"===== INICIANDO DESCARGA EN SEGUNDO PLANO DE: {model_id} =====")
print(f"===== INICIANDO DESCARGA EN SEGUNDO PLANO DE: {model_id} =====", flush=True)
snapshot_download(repo_id=model_id)
logger.warning(f"===== DESCARGA EXITOSA: {model_id} =====")
print(f"===== DESCARGA EXITOSA: {model_id} =====", flush=True)
except Exception as e:
logger.error(f"===== ERROR DESCARGANDO {model_id}: {e} =====")
print(f"===== ERROR DESCARGANDO {model_id}: {e} =====", flush=True)
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_transcription(request):
if request.method == "GET":
configs = TranscriptionProviderConfig.objects.all()
return Response([{
"id": c.id, "name": c.name, "type": c.provider_type,
"active": c.is_active, "model": c.model, "role": c.role
} for c in configs])
for data in request.data.get("configs", []):
c, _ = TranscriptionProviderConfig.objects.get_or_create(id=data.get("id"), defaults={"name": data.get("name")})
c.provider_type = data.get("type", c.provider_type)
c.api_key = data.get("api_key", c.api_key)
c.endpoint = data.get("endpoint", c.endpoint)
c.is_active = data.get("active", c.is_active)
c.model = data.get("model", c.model)
c.role = data.get("role", c.role)
c.save()
if c.is_active and c.provider_type in ["nemotron", "whisper_local"]:
model_to_dl = c.model or ("nvidia/nemotron-3.5-asr-streaming-0.6b" if c.provider_type == "nemotron" else "openai/whisper-base")
if model_to_dl:
threading.Thread(target=download_model_in_background, args=(model_to_dl,)).start()
return Response({"status": "ok"})
@api_view(["POST"])
@permission_classes([IsAdminRole])
def test_transcription_benchmark(request):
"""
Recibe un audio de prueba y el tipo de proveedor (ej: nemotron, whisper_local)
Devuelve la transcripción y el tiempo de latencia/benchmark.
"""
audio_file = request.FILES.get("audio")
provider_type = request.data.get("provider_type")
if not audio_file or not provider_type:
return Response({"detail": "Falta audio o provider_type"}, status=status.HTTP_400_BAD_REQUEST)
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
for chunk in audio_file.chunks():
tmp.write(chunk)
tmp_path = tmp.name
try:
from .transcription_engine import transcribe_audio_file
text, time_ms, error = transcribe_audio_file(tmp_path, provider_type=provider_type)
if error:
return Response({"detail": f"Error del motor: {error}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response({
"text": text,
"time_ms": time_ms
})
except Exception as e:
log.exception(f"Error benchmark {provider_type}")
return Response({"detail": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
finally:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_categories(request):
if request.method == "GET":
cats = EvaluationCategory.objects.all().order_by("order")
return Response([{"id": c.id, "name": c.name, "weight": c.weight, "color": c.color, "active": c.active} for c in cats])
# POST to update all
EvaluationCategory.objects.all().delete() # ponytail: recreate all for simplicity
for idx, data in enumerate(request.data.get("categories", [])):
EvaluationCategory.objects.create(
name=data["name"],
weight=float(data.get("weight", 1.0)),
color=data.get("color", "#0F766E"),
active=data.get("active", True),
order=idx
)
return Response({"status": "ok"})
from .models import SystemPromptConfig
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_prompt(request):
config, _ = SystemPromptConfig.objects.get_or_create(id=1)
if request.method == "GET":
return Response({"prompt_text": config.prompt_text})
config.prompt_text = request.data.get("prompt_text", config.prompt_text)
config.save()
return Response({"status": "ok", "prompt_text": config.prompt_text})
from .models import Organization, Group
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def organizations_api(request):
if request.method == "GET":
orgs = Organization.objects.all()
data = []
for o in orgs:
data.append({
"id": str(o.id),
"type": o.type,
"career": o.career,
"groups": o.groups.count(),
"candidates": Applicant.objects.filter(organization=o).count(),
"approved": Applicant.objects.filter(organization=o, status='approved').count(),
"gradientIndex": o.gradient_index
})
return Response(data)
data = request.data
org = Organization.objects.create(
type=data.get("type", "Ingeniería"),
career=data.get("career", ""),
gradient_index=data.get("gradientIndex", 0)
)
groups_count = data.get("groups", 0)
for i in range(groups_count):
Group.objects.create(name=f"Grupo {i+1}", organization=org)
return Response({
"id": str(org.id),
"type": org.type,
"career": org.career,
"groups": groups_count,
"candidates": 0,
"approved": 0,
"gradientIndex": org.gradient_index
})
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def applicants_api(request):
if request.method == "GET":
apps = Applicant.objects.all().order_by('-id')
data = []
for a in apps:
status_display = a.get_status_display()
# Si sigue registrado, verificar si ya hizo alguna sesión de entrevista
if a.status == 'registered':
latest_session = InterviewSession.objects.filter(applicant_id=a.id).order_by('-started_at').first()
if latest_session:
if latest_session.status == 'evaluated':
if latest_session.overall_score is not None and latest_session.overall_score >= 70:
status_display = 'Aprobado'
else:
status_display = 'Reprobado'
elif latest_session.status == 'error':
status_display = 'Error en Evaluación'
else:
status_display = 'En Proceso'
data.append({
"id": str(a.id),
"name": f"{a.first_name} {a.last_name}",
"email": a.email,
"status": status_display,
"modality": a.modality,
"career": a.organization.career if a.organization else "N/A",
"photo": f"https://ui-avatars.com/api/?name={a.first_name}+{a.last_name}&background=random"
})
return Response(data)
data = request.data
names = data.get("name", "").split(" ", 1)
first_name = names[0]
last_name = names[1] if len(names) > 1 else ""
org = Organization.objects.filter(career=data.get("carrera")).first()
email = data.get("email", "").strip()
if not email:
return Response({"error": "El correo electrónico es un campo obligatorio."}, status=400)
try:
from auth_api.models import User
if not User.objects.filter(username=email).exists():
password = data.get("password", "Entrevista2026")
User.objects.create_user(
username=email,
email=email,
password=password,
role='user',
first_name=first_name,
last_name=last_name
)
a = Applicant.objects.create(
first_name=first_name,
last_name=last_name,
email=email,
organization=org,
modality="Virtual",
status="registered"
)
except Exception as e:
return Response({"error": "Ese correo ya está registrado u ocurrió un error: " + str(e)}, status=400)
return Response({
"id": str(a.id),
"name": f"{a.first_name} {a.last_name}",
"email": a.email,
"status": a.get_status_display(),
"modality": a.modality,
"career": a.organization.career if a.organization else "N/A",
"photo": f"https://ui-avatars.com/api/?name={a.first_name}+{a.last_name}&background=random"
})
@api_view(["GET", "PUT", "DELETE"])
@permission_classes([IsAdminRole])
def applicant_detail_api(request, pk):
try:
app = Applicant.objects.get(pk=pk)
except Applicant.DoesNotExist:
return Response({"error": "Aspirante no encontrado"}, status=404)
if request.method == "GET":
return Response({
"id": app.id,
"name": f"{app.first_name} {app.last_name}",
"email": app.email,
"status": app.get_status_display(),
"date": app.created_at.strftime("%d %b %Y"),
"career": app.organization.career if app.organization else "Sin Carrera",
})
if request.method == "DELETE":
email = app.email
app.delete()
from auth_api.models import User
User.objects.filter(username=email).delete()
return Response({"success": True})
if request.method == "PUT":
data = request.data
if "name" in data:
names = data["name"].split(" ", 1)
app.first_name = names[0]
app.last_name = names[1] if len(names) > 1 else ""
old_email = app.email
new_email = data.get("email", old_email).strip()
from auth_api.models import User
if new_email != old_email and User.objects.filter(username=new_email).exists():
return Response({"error": "Ese correo ya está en uso por otro usuario."}, status=400)
if "carrera" in data:
org = Organization.objects.filter(career=data["carrera"]).first()
if org:
app.organization = org
app.email = new_email
app.save()
user = User.objects.filter(username=old_email).first()
if user:
user.username = new_email
user.email = new_email
user.first_name = app.first_name
user.last_name = app.last_name
if data.get("password"):
user.set_password(data["password"])
user.save()
return Response({
"id": str(app.id),
"name": f"{app.first_name} {app.last_name}",
"email": app.email,
"status": app.get_status_display(),
"modality": app.modality,
"career": app.organization.career if app.organization else "N/A",
"photo": f"https://ui-avatars.com/api/?name={app.first_name}+{app.last_name}&background=random"
})