| """ |
| Interview app — DRF API views. |
| |
| Permission strategy: |
| - Public (AllowAny): endpoints consumed by the student interview frontend. |
| - Admin (IsAdminRole): endpoints consumed exclusively by the admin Dashboard. |
| - Authenticated (IsAuthenticated): me_api — requires any valid JWT (student or admin). |
| """ |
|
|
| import logging |
| import os |
| import tempfile |
| import threading |
|
|
| from rest_framework import status |
| from rest_framework.decorators import api_view, permission_classes |
| from rest_framework.permissions import AllowAny, IsAuthenticated |
| from rest_framework.response import Response |
| from rest_framework.views import APIView |
| from django.db.models import Count, Q |
| from django.db.models.functions import TruncMonth, ExtractDay |
| from datetime import datetime |
|
|
| from .models import Applicant, TranscriptionProviderConfig, LLMProviderConfig, SystemPromptConfig |
| from .engine import InterviewEngine |
| from .permissions import IsAdminRole |
|
|
| log = logging.getLogger("interview.views") |
|
|
| from .transcription_engine import transcribe_audio_file |
|
|
|
|
@api_view(["GET"])
@permission_classes([AllowAny])
def health(request):
    """Report the status of the services the interview backend relies on.

    The response always carries ``"status": "ok"`` at the top level plus a
    ``services`` list with one ``label`` / ``status`` / ``ok`` entry for the
    evaluation engine, the Qdrant vector store and the Whisper model.
    """
    engine = InterviewEngine.get_instance()

    # Imported at call time so the module global is read as it is right now.
    from .transcription_engine import _whisper_model

    # The engine counts as ready only when both of its models are present.
    engine_ready = not (engine.model is None or engine.sentiment_classifier is None)
    qdrant_ready = engine.qdrant_ok
    whisper_loaded = _whisper_model is not None

    evaluation_service = {
        "label": "Motor de Evaluaciones",
        "status": "Operacional" if engine_ready else "Error",
        "ok": engine_ready,
    }
    vector_service = {
        "label": "Base Vectorial (Qdrant)",
        "status": "Operacional" if qdrant_ready else "Desconectado",
        "ok": qdrant_ready,
    }
    # A Whisper model that is not loaded yet is shown as "En Espera" and is
    # still reported with ok=True.
    voice_service = {
        "label": "NLP de Voz (Whisper)",
        "status": "Operacional" if whisper_loaded else "En Espera",
        "ok": True,
    }

    payload = {
        "status": "ok",
        "services": [evaluation_service, vector_service, voice_service],
    }
    return Response(payload)
|
|
|
|
@api_view(["GET"])
@permission_classes([AllowAny])
def interview_start(request):
    """Return the configuration the interview UI needs to boot.

    The payload holds the total number of questions reported by the engine
    and the relative paths of the welcome, farewell and loop videos.
    """
    engine = InterviewEngine.get_instance()
    bootstrap = {
        "total_questions": engine.get_total_questions(),
        "bienvenida_url": "videos/final_bienvenida.mp4",
        "despedida_url": "videos/final_despedida.mp4",
        "loop_url": "videos/loop_pingpong.mp4",
    }
    return Response(bootstrap)
|
|
|
|
@api_view(["GET"])
@permission_classes([AllowAny])
def interview_question(request, index: int):
    """Return the question stored at the given 0-based ``index``.

    Responds with 404 and ``{"detail": "No more questions"}`` when the engine
    returns ``None`` for that index.
    """
    question = InterviewEngine.get_instance().get_question(index)
    if question is not None:
        return Response(question)

    not_found = {"detail": "No more questions"}
    return Response(not_found, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
@api_view(["POST"])
@permission_classes([AllowAny])
def query_rag(request):
    """Return a feedback phrase for the answer text sent by the client.

    Request body:
        text: the user's answer. Required, must be a non-blank string.
        last_feedback_id: optional value forwarded unchanged to the engine.

    Returns:
        Whatever ``InterviewEngine.query_feedback`` returns, or a 400 with
        ``{"detail": "Text required"}`` when ``text`` is missing, blank or
        not a string.
    """
    raw_text = request.data.get("text")
    # A JSON null or a non-string value used to crash on ``.strip()``;
    # treat it the same way as missing text.
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    if not text:
        return Response(
            {"detail": "Text required"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    last_feedback_id = request.data.get("last_feedback_id")
    engine = InterviewEngine.get_instance()
    result = engine.query_feedback(text, last_feedback_id=last_feedback_id)
    return Response(result)
|
|
|
|
@api_view(["POST"])
@permission_classes([AllowAny])
def transcribe_audio(request):
    """Transcribe an uploaded audio file with the active provider.

    Request:
        audio: multipart file upload (required).

    Returns:
        ``{"text": ...}`` on success, 400 when no file was sent, or 500 with
        ``{"detail": "Transcription error: ..."}`` when anything fails.
    """
    audio_file = request.FILES.get("audio")
    if not audio_file:
        return Response(
            {"detail": "No audio file"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    tmp_path = None
    try:
        # The temp file is created inside the ``try`` so that it is removed
        # even when writing the upload to disk fails half-way.
        with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as tmp:
            tmp_path = tmp.name
            for chunk in audio_file.chunks():
                tmp.write(chunk)

        # "TEST_ADMIN_MODE" is a feature flag stored in the same table by
        # test_mode_toggle_api (provider_type "system"); it must never be
        # picked as the transcription provider.
        provider_config = (
            TranscriptionProviderConfig.objects
            .filter(is_active=True)
            .exclude(name="TEST_ADMIN_MODE")
            .first()
        )
        provider = provider_config.provider_type if provider_config else "whisper_local"

        text, _, error = transcribe_audio_file(tmp_path, provider_type=provider)
        if error:
            raise RuntimeError(error)

        return Response({"text": text})
    except Exception as exc:
        log.exception("Transcription failed")
        # NOTE(review): this public endpoint echoes the exception text to the
        # client; kept as-is because the frontend may display it.
        return Response(
            {"detail": f"Transcription error: {exc}"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
|
|
|
|
class DashboardStatsAPIView(APIView):
    """Admin-only aggregate statistics about applicants for the Dashboard.

    Query parameters (both optional): ``year`` and ``month``. A value that is
    not a valid integer is treated as if it had not been sent.
    """

    permission_classes = [IsAdminRole]

    @staticmethod
    def _int_or_none(raw):
        """Return ``int(raw)``, or ``None`` when *raw* is missing or not numeric."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None

    def get(self, request, *args, **kwargs):
        """Build the dashboard payload.

        Returns a JSON object with the applicant totals for the requested
        period, the admission rate, a 12-entry monthly admission-rate chart
        for the chart year, the list of years that have data, and the number
        of approved applicants per day of the chart month.
        """
        # Parse once: the filters, the chart and the calendar all share the
        # same values, so an invalid parameter can no longer crash later on.
        year = self._int_or_none(request.query_params.get('year'))
        month = self._int_or_none(request.query_params.get('month'))
        now = datetime.now()

        qs = Applicant.objects.all()
        if year is not None:
            qs = qs.filter(created_at__year=year)
        if month is not None:
            qs = qs.filter(created_at__month=month)

        total_applicants = qs.count()
        approved = qs.filter(status='approved').count()
        pending = qs.filter(status='registered').count()
        rejected = qs.filter(status='rejected').count()

        admission_rate = 0
        if total_applicants > 0:
            admission_rate = round((approved / total_applicants) * 100, 2)

        # The chart and the calendar fall back to the current year / month
        # when the corresponding parameter was not supplied.
        chart_year = year if year is not None else now.year
        chart_month = month if month is not None else now.month

        monthly_stats = (
            Applicant.objects.filter(created_at__year=chart_year)
            .annotate(month=TruncMonth('created_at'))
            .values('month')
            .annotate(
                total=Count('id'),
                approved=Count('id', filter=Q(status='approved')),
            )
            .order_by('month')
        )
        stats_by_month = {
            item['month'].month: item for item in monthly_stats if item['month']
        }

        months_es = ['Ene', 'Feb', 'Mar', 'Abr', 'May', 'Jun',
                     'Jul', 'Ago', 'Sep', 'Oct', 'Nov', 'Dic']
        chart_data = []
        for number, month_name in enumerate(months_es, 1):
            stat = stats_by_month.get(number)
            rate = 0
            if stat and stat['total'] > 0:
                rate = round((stat['approved'] / stat['total']) * 100, 2)
            chart_data.append({
                'name': month_name,
                'value': rate,
                # Only an explicitly requested month is highlighted.
                'active': month == number,
            })

        # Years offered by the selector: from the oldest applicant to now.
        earliest_record = Applicant.objects.order_by('created_at').first()
        start_year = earliest_record.created_at.year if earliest_record else now.year
        available_years = list(range(start_year, now.year + 1))

        approved_in_month = (
            Applicant.objects.filter(
                created_at__year=chart_year,
                created_at__month=chart_month,
                status='approved',
            )
            .annotate(day=ExtractDay('created_at'))
            .values('day')
            .annotate(count=Count('id'))
        )
        calendar_data = {item['day']: item['count'] for item in approved_in_month}

        return Response({
            'total_applicants': total_applicants,
            'approved_pending': approved,
            'pending': pending,
            'rejected': rejected,
            'admission_rate': admission_rate,
            'chart_data': chart_data,
            'available_years': available_years,
            'calendar_data': calendar_data,
        })
|
|
| from .models import Interview |
|
|
def _interview_applicant_pk(raw_id):
    """Turn an id such as ``"EST-0042"``, ``"42"`` or ``42`` into an int.

    Returns ``None`` when nothing numeric is left after removing ``EST-``.
    """
    try:
        return int(str(raw_id).replace("EST-", ""))
    except ValueError:
        return None


def _interview_to_dict(iv):
    """Serialize an Interview into the shape used by the Dashboard."""
    return {
        "id": iv.id,
        "title": iv.title,
        "date": str(iv.date),
        "timeIndex": iv.time_index,
        "durationMinutes": iv.duration_minutes,
        "active": iv.active,
        # One upper-case initial per applicant; "?" stands in for an empty
        # first name, which previously raised IndexError.
        "users": [(a.first_name or "?")[0].upper() for a in iv.applicants.all()],
    }


@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def interviews_api(request):
    """List interviews (GET) or create one (POST). Admin only.

    GET query parameters (all optional): ``year``, ``month``, ``day`` filter
    on the interview date. A non-numeric value yields a 400.

    POST body: ``title``, ``date``, ``timeIndex``, ``durationMinutes`` and
    ``applicants`` (a list of ids, optionally prefixed with ``EST-``).
    Applicant ids that are not numeric or do not exist are ignored.

    Returns:
        A list of serialized interviews (GET) or the created one (POST).
    """
    from django.core.exceptions import ValidationError

    if request.method == "GET":
        # Prefetch so that serializing N interviews does not run N queries.
        qs = Interview.objects.all().prefetch_related("applicants")
        date_filters = (
            ("year", "date__year"),
            ("month", "date__month"),
            ("day", "date__day"),
        )
        for param, lookup in date_filters:
            raw = request.query_params.get(param)
            if not raw:
                continue
            try:
                qs = qs.filter(**{lookup: int(raw)})
            except ValueError:
                return Response(
                    {"error": f"Invalid {param}"},
                    status=status.HTTP_400_BAD_REQUEST,
                )
        return Response([_interview_to_dict(iv) for iv in qs])

    data = request.data
    try:
        time_index = int(data.get("timeIndex", 0))
        duration_minutes = int(data.get("durationMinutes", 60))
    except (TypeError, ValueError):
        return Response(
            {"error": "timeIndex and durationMinutes must be integers"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # Resolve the applicant ids before anything is written.
    wanted_pks = {
        pk
        for pk in map(_interview_applicant_pk, data.get("applicants") or [])
        if pk is not None
    }

    try:
        iv = Interview.objects.create(
            title=data.get("title") or "Entrevista Técnica",
            date=data.get("date"),
            time_index=time_index,
            duration_minutes=duration_minutes,
            active=True,
        )
    except ValidationError:
        # Raised by Django when ``date`` is not a parseable date string.
        return Response({"error": "Invalid date"}, status=status.HTTP_400_BAD_REQUEST)

    if wanted_pks:
        iv.applicants.add(*Applicant.objects.filter(id__in=wanted_pks))

    return Response(_interview_to_dict(iv))
|
|
@api_view(["PATCH", "DELETE"])
@permission_classes([IsAdminRole])
def interview_detail_api(request, pk):
    """Update (PATCH) or delete (DELETE) a single interview. Admin only.

    PATCH accepts any subset of ``title``, ``date``, ``timeIndex``,
    ``durationMinutes`` and ``applicants``. When ``applicants`` is present the
    interview's applicant list is replaced by the ids that resolve to existing
    applicants; ids that are not numeric or do not exist are ignored.

    Returns:
        404 when the interview does not exist, 204 after a delete, 400 when a
        numeric field cannot be parsed, otherwise the serialized interview.
    """
    try:
        iv = Interview.objects.get(pk=pk)
    except Interview.DoesNotExist:
        return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)

    if request.method == "DELETE":
        iv.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    data = request.data

    # Parse everything that can fail before touching the database.
    try:
        numeric_updates = {
            attr: int(data[key])
            for key, attr in (("timeIndex", "time_index"),
                              ("durationMinutes", "duration_minutes"))
            if key in data
        }
    except (TypeError, ValueError):
        return Response(
            {"error": "timeIndex and durationMinutes must be integers"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    replacement_pks = None
    if "applicants" in data:
        replacement_pks = set()
        for raw_id in data["applicants"] or []:
            try:
                replacement_pks.add(int(str(raw_id).replace("EST-", "")))
            except ValueError:
                # Not a numeric id: skipped, like an unknown applicant.
                continue

    if "title" in data:
        iv.title = data["title"]
    if "date" in data:
        iv.date = data["date"]
    for attr, value in numeric_updates.items():
        setattr(iv, attr, value)
    iv.save()

    if replacement_pks is not None:
        # ``set`` swaps the whole relation in one call instead of clearing it
        # first and re-adding applicants one by one.
        iv.applicants.set(Applicant.objects.filter(id__in=replacement_pks))

    return Response({
        "id": iv.id,
        "title": iv.title,
        "date": str(iv.date),
        "timeIndex": iv.time_index,
        "durationMinutes": iv.duration_minutes,
        "active": iv.active,
        # "?" stands in for an empty first name, which previously raised
        # IndexError.
        "users": [(a.first_name or "?")[0].upper() for a in iv.applicants.all()],
    })
|
|
| |
| from rest_framework.permissions import IsAuthenticated |
| from rest_framework.decorators import permission_classes |
| from .models import Applicant |
|
|
| from django.utils import timezone |
| from django.views.decorators.csrf import ensure_csrf_cookie |
|
|
@api_view(["GET"])
@ensure_csrf_cookie
@permission_classes([IsAuthenticated])
def me_api(request):
    """Return the display name and interview of the authenticated user.

    The user is matched to an Applicant by e-mail (case-insensitive).
    "Spoofed" mode is on when the ``TEST_ADMIN_MODE`` config is active or the
    user is one of the two hard-coded test admin e-mails. In that mode the
    most recent interview is returned regardless of its date, it is presented
    as happening today with a 1440-minute window, and no session status is
    reported. Otherwise the earliest interview dated today or later is used.

    Returns:
        ``{"name", "testMode", "sessionStatus", "interview"}`` where
        ``interview`` is ``None`` when the user has none.
    """
    from .models import TranscriptionProviderConfig, InterviewSession

    user = request.user
    app = Applicant.objects.filter(email__iexact=user.email).first()

    is_test_admin = user.email in ["admin@itca.edu.sv", "pruebas@itca.edu.sv"]
    test_mode_active = TranscriptionProviderConfig.objects.filter(
        name="TEST_ADMIN_MODE", is_active=True
    ).exists()
    is_spoofed = test_mode_active or is_test_admin

    # NOTE(review): ``timezone.now().date()`` is the UTC date when USE_TZ is
    # on; confirm that is the intended "today" for interview scheduling.
    today = timezone.now().date()

    iv = None
    if is_spoofed:
        if app:
            iv = (
                Interview.objects.filter(applicants=app)
                .order_by('-date', '-time_index')
                .first()
            )
        if is_test_admin and not iv:
            # A test admin without any interview gets a synthetic one.
            return Response({
                "name": "Simulación Admin",
                "testMode": test_mode_active,
                "sessionStatus": None,
                "interview": {
                    "title": "Entrevista Nuevo Ingreso",
                    "date": str(today),
                    "timeIndex": 0,
                    "durationMinutes": 1440,
                    "id": 9999
                }
            })
    elif app:
        iv = (
            Interview.objects.filter(applicants=app, date__gte=today)
            .order_by('date', 'time_index')
            .first()
        )

    # The session status is never reported in spoofed mode, so the lookup is
    # skipped there instead of being run and then thrown away.
    session_status = None
    if app and iv and not is_spoofed:
        session = (
            InterviewSession.objects.filter(applicant=app, interview=iv)
            .order_by('-started_at')
            .first()
        )
        if session:
            session_status = session.status

    # Prefer the applicant's name, then the account's, then a generic label.
    name_to_show = "Estudiante"
    if app and app.first_name and app.first_name.strip():
        name_to_show = f"{app.first_name} {app.last_name}".strip()
    elif user.first_name and user.first_name.strip():
        name_to_show = f"{user.first_name} {user.last_name}".strip()

    interview_payload = None
    if iv:
        interview_payload = {
            "title": iv.title,
            "date": str(today) if is_spoofed else str(iv.date),
            "timeIndex": 0 if is_spoofed else iv.time_index,
            "durationMinutes": 1440 if is_spoofed else iv.duration_minutes,
            "id": iv.id
        }

    return Response({
        "name": name_to_show,
        "testMode": is_spoofed,
        "sessionStatus": session_status,
        "interview": interview_payload
    })
|
|
@api_view(["POST", "GET"])
@permission_classes([IsAdminRole])
def test_mode_toggle_api(request):
    """Read (GET) or set (POST) the ``TEST_ADMIN_MODE`` flag. Admin only.

    The flag is stored as a TranscriptionProviderConfig row named
    ``TEST_ADMIN_MODE``, created on first access. Enabling it makes sure a
    ``admin@itca.edu.sv`` user and applicant exist; disabling it deletes
    both.

    POST body:
        is_active: boolean (the strings "1", "true", "yes" and "on" are also
        accepted as true; anything else is false).

    Returns:
        ``{"is_active": <bool>}`` with the stored value.
    """
    from .models import TranscriptionProviderConfig
    from auth_api.models import User
    from .models import Applicant

    config, created = TranscriptionProviderConfig.objects.get_or_create(
        name="TEST_ADMIN_MODE",
        defaults={
            "provider_type": "system",
            "is_active": False,
            "endpoint": "",
            "api_key": ""
        }
    )
    if request.method == "POST":
        # Coerce to a real bool so that the stored flag and the account
        # provisioning below can never disagree (the string "false" is
        # truthy in Python).
        raw_flag = request.data.get("is_active", False)
        if isinstance(raw_flag, str):
            is_active = raw_flag.strip().lower() in ("1", "true", "yes", "on")
        else:
            is_active = bool(raw_flag)

        config.is_active = is_active
        config.save()

        email = "admin@itca.edu.sv"
        if is_active:
            if not User.objects.filter(username=email).exists():
                # NOTE(review): hard-coded, trivially guessable password for
                # the simulation account; kept because the login flow may
                # depend on it. Consider a generated or configured secret.
                User.objects.create_user(username=email, email=email, password='admin', role='user', first_name='Admin', last_name='Simulación')
            if not Applicant.objects.filter(email=email).exists():
                Applicant.objects.create(email=email, first_name='Admin', last_name='Simulación', status='approved')
        else:
            # NOTE(review): this also deletes an account / applicant with
            # this e-mail that existed before test mode was enabled.
            User.objects.filter(username=email).delete()
            Applicant.objects.filter(email=email).delete()

    return Response({"is_active": config.is_active})
|
|
| |
| from .models import InterviewSession, QuestionResponse, EvaluationResult, EvaluationCategory |
| from .rag_evaluation import evaluate_session |
| from django.core.files.storage import default_storage |
|
|
@api_view(["POST"])
@permission_classes([AllowAny])
def session_start(request):
    """Create an InterviewSession and return its id.

    Request body (both optional): ``applicant_id`` and ``interview_id``.
    When either is missing and the request is authenticated, the applicant is
    looked up by the user's e-mail and their most recent interview is used;
    ``admin@itca.edu.sv`` falls back to the first interview in the table.

    Returns:
        ``{"session_id": ...}``, 400 when no ids could be determined, or 404
        when the ids do not match an existing applicant and interview.
    """
    app_id = request.data.get("applicant_id")
    iv_id = request.data.get("interview_id")

    needs_lookup = not app_id or not iv_id
    if needs_lookup and request.user.is_authenticated:
        app = Applicant.objects.filter(email__iexact=request.user.email).first()
        if app:
            app_id = app.id
            iv = Interview.objects.filter(applicants=app).order_by('-date').first()
            if not iv and app.email == "admin@itca.edu.sv":
                iv = Interview.objects.first()
            if iv:
                iv_id = iv.id

    if not app_id or not iv_id:
        return Response({"error": "Faltan IDs o no hay entrevistas disponibles"}, status=400)

    # Client-supplied ids used to reach ``create`` unchecked and fail there
    # with a server error; verify them first.
    try:
        ids_exist = (
            Applicant.objects.filter(pk=app_id).exists()
            and Interview.objects.filter(pk=iv_id).exists()
        )
    except (TypeError, ValueError):
        ids_exist = False
    if not ids_exist:
        return Response({"error": "Aspirante o entrevista no encontrados"}, status=404)

    session = InterviewSession.objects.create(applicant_id=app_id, interview_id=iv_id)
    return Response({"session_id": session.id})
|
|
@api_view(["POST"])
@permission_classes([AllowAny])
def session_answer(request, session_id):
    """Store one answer of an interview session.

    Request:
        index: question index (integer, defaults to 0).
        text: the question text.
        transcription: the transcribed answer.
        audio: optional file upload, saved through the default storage.

    Returns:
        ``{"status": "ok"}``, 404 when the session does not exist, or 400
        when ``index`` is not an integer.
    """
    try:
        session = InterviewSession.objects.get(id=session_id)
    except InterviewSession.DoesNotExist:
        return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)

    # ``index`` ends up inside a storage path, so it must be a plain integer.
    try:
        idx = int(request.data.get("index", 0))
    except (TypeError, ValueError):
        return Response(
            {"error": "index must be an integer"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    text = request.data.get("text", "")
    transcription = request.data.get("transcription", "")

    audio_url = ""
    if "audio" in request.FILES:
        upload = request.FILES["audio"]
        path = default_storage.save(f"answers/session_{session_id}_{idx}.webm", upload)
        audio_url = default_storage.url(path)

    QuestionResponse.objects.create(
        session=session,
        question_index=idx,
        question_text=text,
        audio_url=audio_url,
        transcription=transcription
    )
    return Response({"status": "ok"})
|
|
@api_view(["POST"])
@permission_classes([AllowAny])
def session_recording(request, session_id):
    """Attach the uploaded ``video`` file to a session as its recording.

    The file is saved through the default storage and its URL is written to
    ``recording_url``. Nothing is changed when no ``video`` file is sent.

    Returns:
        ``{"status": "ok"}``, or 404 when the session does not exist.
    """
    try:
        session = InterviewSession.objects.get(id=session_id)
    except InterviewSession.DoesNotExist:
        return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)

    if "video" in request.FILES:
        upload = request.FILES["video"]
        path = default_storage.save(f"recordings/session_{session_id}.webm", upload)
        session.recording_url = default_storage.url(path)
        session.save()
    return Response({"status": "ok"})
|
|
@api_view(["POST"])
@permission_classes([AllowAny])
def session_end(request, session_id):
    """Mark a session as completed and start its evaluation.

    Sets ``ended_at`` to the current time and ``status`` to ``'completed'``,
    then runs ``evaluate_session`` in a separate thread so the response is
    not held up by the evaluation.

    Returns:
        ``{"status": "evaluating"}``, or 404 when the session does not exist.
    """
    try:
        session = InterviewSession.objects.get(id=session_id)
    except InterviewSession.DoesNotExist:
        return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)

    session.ended_at = timezone.now()
    session.status = 'completed'
    session.save()

    worker = threading.Thread(target=evaluate_session, args=(session.id,))
    worker.start()

    return Response({"status": "evaluating"})
|
|
# NOTE(review): unlike the other views in this module, this one declares no
# permission class and so relies on the project-wide DRF default. It deletes
# evaluation results; confirm whether it should be restricted to IsAdminRole.
@api_view(["POST"])
def session_reprocess(request, session_id):
    """Discard a session's evaluation and run it again.

    Deletes the session's EvaluationResult rows, clears ``overall_score``,
    sets ``status`` back to ``'in_progress'`` and runs ``evaluate_session``
    in a separate thread.

    Returns:
        ``{"status": "evaluating"}``, 404 when the session does not exist, or
        500 with ``{"error": ...}`` on any other failure.
    """
    try:
        session = InterviewSession.objects.get(id=session_id)
    except InterviewSession.DoesNotExist:
        # A missing session is a client error, not a server failure.
        return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)

    try:
        EvaluationResult.objects.filter(session=session).delete()
        session.overall_score = None
        session.status = 'in_progress'
        session.save()

        worker = threading.Thread(target=evaluate_session, args=(session.id,))
        worker.start()

        return Response({"status": "evaluating"})
    except Exception as e:
        log.exception("Session reprocess failed")
        return Response({"error": str(e)}, status=500)
|
|
@api_view(["POST"])
@permission_classes([IsAdminRole])
def applicant_approve(request, applicant_id):
    """Approve an applicant manually. Admin only.

    Sets the applicant's status to ``'approved'``. If the applicant has any
    session, the most recently started one is marked ``'evaluated'`` with an
    ``overall_score`` of 100.

    Returns:
        ``{"status": "ok"}``, 404 when the applicant does not exist, or 500
        with ``{"error": ...}`` on any other failure.
    """
    try:
        app = Applicant.objects.get(id=applicant_id)
    except Applicant.DoesNotExist:
        # A missing applicant is a client error, not a server failure.
        return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)

    try:
        app.status = 'approved'
        app.save()

        session = (
            InterviewSession.objects.filter(applicant_id=app.id)
            .order_by('-started_at')
            .first()
        )
        if session:
            session.status = 'evaluated'
            session.overall_score = 100
            session.save()

        return Response({"status": "ok"})
    except Exception as e:
        log.exception("Applicant approval failed")
        return Response({"error": str(e)}, status=500)
|
|
|
|
@api_view(["GET"])
@permission_classes([AllowAny])
def session_evaluation(request, session_id):
    """Return the evaluation of a session.

    Returns:
        The session's status, overall score, traits, interviewer notes, HR
        recommendation and recording URL, plus one ``name`` / ``score`` /
        ``feedback`` / ``color`` entry per evaluation result; or 404 when the
        session does not exist.
    """
    try:
        session = InterviewSession.objects.get(id=session_id)
    except InterviewSession.DoesNotExist:
        return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)

    # Join the category so reading its name and color below does not issue
    # one query per result.
    results = EvaluationResult.objects.filter(session=session).select_related("category")
    categories = [
        {
            "name": result.category.name,
            "score": result.score,
            "feedback": result.feedback,
            "color": result.category.color
        }
        for result in results
    ]
    return Response({
        "status": session.status,
        "overall_score": session.overall_score,
        "traits": session.dynamic_traits,
        "interviewer_notes": session.interviewer_notes,
        "hr_recommendation": session.hr_recommendation,
        "recording_url": session.recording_url,
        "categories": categories
    })
|
|
@api_view(["GET"])
@permission_classes([AllowAny])
def session_transcript(request, session_id):
    """Return a session's questions and answers as a timeline.

    Every stored response produces two entries: the interviewer's question
    (fixed 5-second duration, no audio) followed by the candidate's answer.
    ``start_time`` is the running sum of the preceding durations.

    Returns:
        A list of ``id`` / ``speaker`` / ``text`` / ``start_time`` /
        ``duration`` / ``audio_url`` entries ordered by question index, or
        404 when the session does not exist.
    """
    try:
        session = InterviewSession.objects.get(id=session_id)
    except InterviewSession.DoesNotExist:
        return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)

    responses = QuestionResponse.objects.filter(session=session).order_by('question_index')

    question_seconds = 5
    default_answer_seconds = 45
    timeline = []
    elapsed = 0
    for r in responses:
        # A missing or non-positive duration falls back to the default.
        answer_seconds = r.duration_seconds
        if not answer_seconds or answer_seconds <= 0:
            answer_seconds = default_answer_seconds

        timeline.append({
            "id": r.id,
            "speaker": "Interviewer",
            "text": r.question_text,
            "start_time": elapsed,
            "duration": question_seconds,
            "audio_url": None
        })
        elapsed += question_seconds

        timeline.append({
            "id": f"{r.id}_ans",
            "speaker": "Candidate",
            "text": r.transcription or "(Silencio / Sin respuesta transcrita)",
            "start_time": elapsed,
            "duration": answer_seconds,
            "audio_url": r.audio_url if r.audio_url else None
        })
        elapsed += answer_seconds

    return Response(timeline)
|
|
@api_view(["GET"])
@permission_classes([IsAdminRole])
def applicant_sessions(request, applicant_id):
    """List an applicant's interview sessions, most recently started first.

    Admin only. Each entry holds the session ``id``, its start ``date``,
    ``status``, ``overall_score`` and ``recording_url``.
    """
    ordered = (
        InterviewSession.objects
        .filter(applicant_id=applicant_id)
        .order_by('-started_at')
    )
    summaries = [
        {
            "id": session.id,
            "date": session.started_at,
            "status": session.status,
            "overall_score": session.overall_score,
            "recording_url": session.recording_url
        }
        for session in ordered
    ]
    return Response(summaries)
| import requests |
|
|
def ping_llm_provider(provider_type, api_key, model, endpoint=""):
    """Check that an LLM provider configuration looks usable.

    Args:
        provider_type: ``"openai"``, ``"anthropic"``, ``"ollama"``,
            ``"talentai"`` or ``"nvidia"``. Any other value is accepted
            without a check.
        api_key: the provider's API key. For ``"ollama"`` this field carries
            the server's base URL instead (the same convention is used by
            ``settings_llm_test``). ``None`` is treated as empty.
        model: model name; used by the Ollama and NVIDIA checks.
        endpoint: base URL for ``"openai"`` (optional) and ``"talentai"``
            (required).

    Returns:
        ``(True, "ok")`` when the check passes, otherwise ``(False, message)``
        with a user-facing explanation. Never raises: unexpected errors are
        reported as ``(False, "Fallo de conexión: ...")``.
    """
    # Normalize once so that ``None`` or padded values behave like clean ones
    # in every branch (a ``None`` key used to crash the Anthropic check).
    api_key = api_key.strip() if isinstance(api_key, str) else ""
    endpoint = endpoint.strip() if isinstance(endpoint, str) else ""
    auth_headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}

    try:
        if provider_type == "openai":
            base_url = (endpoint or "https://api.openai.com/v1").rstrip("/")
            ping_url = base_url if base_url.endswith("/models") else f"{base_url}/models"
            res = requests.get(ping_url, headers=auth_headers, timeout=5)
            if res.status_code != 200:
                return False, f"OpenAI error (Code {res.status_code}): Verifica tu API Key o el Endpoint."

        elif provider_type == "anthropic":
            # Format check only; no request is sent.
            if not api_key.startswith("sk-ant"):
                return False, "La API Key de Anthropic no parece válida (debe empezar con sk-ant)."

        elif provider_type == "ollama":
            base_url = (api_key or "http://127.0.0.1:11434").rstrip("/")
            res = requests.get(f"{base_url}/api/tags", timeout=5)
            if res.status_code != 200:
                return False, "Ollama error: No se pudo conectar al endpoint local."

            installed = [m.get("name", "") for m in res.json().get("models", [])]
            if model and not any(model in name for name in installed):
                return False, f"Ollama está conectado, pero el modelo '{model}' no está descargado."

        elif provider_type == "talentai":
            if not endpoint:
                return False, "El endpoint del Space es requerido (ej: https://tu-space.hf.space/v1)."
            ping_url = f"{endpoint.rstrip('/')}/models"
            try:
                res = requests.get(ping_url, headers=auth_headers, timeout=10)
            except requests.exceptions.ConnectionError:
                return False, "TalentAI: No se pudo conectar al Space. Verifica que esté activo."
            except requests.exceptions.Timeout:
                return False, "TalentAI: Timeout al conectar con el Space. Puede estar iniciando (cold start)."
            if res.status_code != 200:
                return False, f"TalentAI error (Code {res.status_code}): No se pudo conectar al Space. Verifica que el Space esté activo y la URL sea correcta."

        elif provider_type == "nvidia":
            if not api_key:
                return False, "La API Key de NVIDIA es requerida."
            payload = {
                "model": model or "nvidia/nemotron-4-340b-instruct",
                "messages": [{"role": "user", "content": "test"}],
                "max_tokens": 5
            }
            res = requests.post(
                "https://integrate.api.nvidia.com/v1/chat/completions",
                headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
                json=payload,
                timeout=10
            )
            if res.status_code != 200:
                # Prefer the parsed JSON error body; fall back to raw text.
                try:
                    err = res.json()
                except ValueError:
                    err = res.text
                return False, f"NVIDIA API error (Code {res.status_code}): {err}"
    except Exception as e:
        return False, f"Fallo de conexión: {str(e)}"

    return True, "ok"
|
|
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_llm(request):
    """List (GET) or save (POST) the LLM provider configurations. Admin only.

    POST body: ``{"configs": [...]}`` where each item may carry ``id``,
    ``name``, ``type``, ``model``, ``api_key``, ``endpoint`` and ``active``.

    Every active config is checked with ``ping_llm_provider`` before anything
    is written, so a failing check leaves the stored configuration untouched.
    An active config of a provider that needs a key (openai, anthropic,
    nvidia) but has none is skipped and not saved.

    Returns:
        GET: the list of stored configs (including their API keys).
        POST: ``{"status": "ok"}``, or 400 with
        ``{"status": "error", "message": ...}`` when a check fails.
    """
    if request.method == "GET":
        configs = LLMProviderConfig.objects.all()
        return Response([{"id": c.id, "name": c.name, "type": c.provider_type, "model": c.model, "api_key": c.api_key, "endpoint": c.endpoint, "active": c.is_active} for c in configs])

    # Pass 1: validate. Nothing is written until every active config passes.
    accepted = []
    for data in request.data.get("configs", []):
        if data.get("active", False):
            provider_type = data.get("type", "")
            api_key = (data.get("api_key", "") or "").strip()
            endpoint = (data.get("endpoint", "") or "").strip()

            requires_key = provider_type in ("openai", "anthropic", "nvidia")
            if requires_key and not api_key:
                continue

            is_valid, error_msg = ping_llm_provider(provider_type, api_key, data.get("model", ""), endpoint)
            if not is_valid:
                return Response({"status": "error", "message": error_msg}, status=400)
        accepted.append(data)

    # Pass 2: persist.
    for data in accepted:
        c, _ = LLMProviderConfig.objects.get_or_create(
            id=data.get("id"),
            defaults={
                "name": data.get("name"),
                "provider_type": data.get("type", "")
            }
        )
        api_key = data.get("api_key", c.api_key)
        if isinstance(api_key, str):
            api_key = api_key.strip()
        endpoint = data.get("endpoint", c.endpoint)
        if isinstance(endpoint, str):
            endpoint = endpoint.strip()

        c.api_key = api_key
        c.endpoint = endpoint
        c.model = data.get("model", c.model)
        c.is_active = data.get("active", c.is_active)
        c.save()

    return Response({"status": "ok"})
|
|
|
|
@api_view(["POST"])
@permission_classes([IsAdminRole])
def settings_llm_test(request):
    """Send a test prompt to an LLM provider and return its reply. Admin only.

    Request body:
        type: provider type (ollama, talentai, openai, anthropic, nvidia or
            anything else LiteLLM understands).
        api_key: API key; for ``ollama`` this field carries the base URL.
        endpoint: base URL (required for ``talentai``).
        model: model name (``talentai`` has a built-in default).
        prompt: text to send (defaults to a short greeting).

    Returns:
        ``{"response": <text>}``, or 400 with
        ``{"status": "error", "message": ...}`` on any failure.
    """
    def _clean(field):
        # Strip padding the same way settings_llm and ping_llm_provider do,
        # so a config that saves successfully also tests successfully.
        value = request.data.get(field, "")
        return value.strip() if isinstance(value, str) else ""

    provider_type = request.data.get("type", "")
    api_key = _clean("api_key")
    endpoint = _clean("endpoint")
    model = request.data.get("model", "")
    if not model and provider_type == 'talentai':
        model = "qwen3-vl-4b-instruct.Q4_K_M.gguf"
    prompt = request.data.get("prompt", "Hola, ¿cómo estás?")

    messages = [{"role": "user", "content": prompt}]

    try:
        if provider_type == 'nvidia':
            # NVIDIA is called through the OpenAI client directly.
            import openai
            client = openai.OpenAI(
                base_url="https://integrate.api.nvidia.com/v1",
                api_key=api_key
            )
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=200
            )
        else:
            # Imported inside the ``try`` so a missing package is reported as
            # a regular error response.
            import litellm

            if provider_type == 'ollama':
                call_args = {
                    "model": f"ollama/{model}",
                    "api_base": api_key if api_key else "http://127.0.0.1:11434",
                }
            elif provider_type == 'talentai':
                ep = endpoint.rstrip("/")
                if not ep:
                    return Response({"status": "error", "message": "El endpoint del Space de TalentAI es requerido."}, status=400)
                call_args = {
                    "model": f"openai/{model}",
                    "api_base": ep,
                    "api_key": api_key if api_key else "no-key-needed",
                    "custom_llm_provider": "openai",
                }
            elif provider_type in ('openai', 'anthropic'):
                call_args = {"model": model, "api_key": api_key}
            else:
                call_args = {
                    "model": model,
                    "api_key": api_key,
                    "api_base": endpoint if endpoint else None,
                }
            response = litellm.completion(messages=messages, **call_args)

        return Response({"response": response.choices[0].message.content})
    except Exception as e:
        return Response({"status": "error", "message": str(e)}, status=400)
|
|
| from .models import TranscriptionProviderConfig |
| import threading |
| import logging |
| logger = logging.getLogger(__name__) |
|
|
def download_model_in_background(model_id):
    """Download a Hugging Face repository snapshot, reporting progress.

    Meant to be used as a thread target. Start, success and failure are each
    written to the module logger and to stdout; any exception is caught and
    reported rather than raised.
    """
    def _announce(log_method, message):
        # Every message goes to the logger and, unbuffered, to stdout.
        log_method(message)
        print(message, flush=True)

    try:
        from huggingface_hub import snapshot_download

        _announce(logger.warning, f"===== INICIANDO DESCARGA EN SEGUNDO PLANO DE: {model_id} =====")
        snapshot_download(repo_id=model_id)
        _announce(logger.warning, f"===== DESCARGA EXITOSA: {model_id} =====")
    except Exception as e:
        _announce(logger.error, f"===== ERROR DESCARGANDO {model_id}: {e} =====")
|
|
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_transcription(request):
    """List (GET) or save (POST) the transcription provider configs. Admin only.

    POST body: ``{"configs": [...]}``; each item may carry ``id``, ``name``,
    ``type``, ``api_key``, ``endpoint``, ``active``, ``model`` and ``role``.
    Fields that are absent keep their stored value. After saving an active
    ``nemotron`` or ``whisper_local`` config, its model is downloaded in a
    background thread.
    """
    if request.method == "GET":
        return Response([
            {
                "id": cfg.id, "name": cfg.name, "type": cfg.provider_type,
                "active": cfg.is_active, "model": cfg.model, "role": cfg.role
            }
            for cfg in TranscriptionProviderConfig.objects.all()
        ])

    # (model attribute, payload key) pairs, applied in this order.
    field_map = (
        ("provider_type", "type"),
        ("api_key", "api_key"),
        ("endpoint", "endpoint"),
        ("is_active", "active"),
        ("model", "model"),
        ("role", "role"),
    )

    for payload in request.data.get("configs", []):
        cfg, _ = TranscriptionProviderConfig.objects.get_or_create(
            id=payload.get("id"),
            defaults={"name": payload.get("name")},
        )
        for attr, key in field_map:
            setattr(cfg, attr, payload.get(key, getattr(cfg, attr)))
        cfg.save()

        # Only active local providers need their model fetched.
        if not (cfg.is_active and cfg.provider_type in ["nemotron", "whisper_local"]):
            continue

        if cfg.provider_type == "nemotron":
            fallback_model = "nvidia/nemotron-3.5-asr-streaming-0.6b"
        else:
            fallback_model = "openai/whisper-base"
        model_to_dl = cfg.model or fallback_model
        if model_to_dl:
            downloader = threading.Thread(
                target=download_model_in_background, args=(model_to_dl,)
            )
            downloader.start()

    return Response({"status": "ok"})
|
|
@api_view(["POST"])
@permission_classes([IsAdminRole])
def test_transcription_benchmark(request):
    """Transcribe a test audio with a given provider and report the timing.

    Request:
        audio: multipart file upload (required).
        provider_type: provider to use, e.g. ``nemotron`` or
            ``whisper_local`` (required).

    Returns:
        ``{"text": ..., "time_ms": ...}``, 400 when a field is missing, or
        500 with ``{"detail": ...}`` when the engine reports or raises an
        error.
    """
    audio_file = request.FILES.get("audio")
    provider_type = request.data.get("provider_type")

    if not audio_file or not provider_type:
        return Response({"detail": "Falta audio o provider_type"}, status=status.HTTP_400_BAD_REQUEST)

    tmp_path = None
    try:
        # The temp file is created inside the ``try`` so that it is removed
        # even when writing the upload to disk fails half-way.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
            for chunk in audio_file.chunks():
                tmp.write(chunk)

        from .transcription_engine import transcribe_audio_file
        text, time_ms, error = transcribe_audio_file(tmp_path, provider_type=provider_type)
        if error:
            return Response({"detail": f"Error del motor: {error}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        return Response({
            "text": text,
            "time_ms": time_ms
        })
    except Exception as e:
        log.exception(f"Error benchmark {provider_type}")
        return Response({"detail": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
|
|
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_categories(request):
    """List (GET) or replace (POST) the evaluation categories. Admin only.

    POST body: ``{"categories": [...]}``; each item needs ``name`` and may
    carry ``weight`` (default 1.0), ``color`` (default ``#0F766E``) and
    ``active`` (default true). The stored categories are replaced by the
    given list, in the given order.

    The payload is validated before anything is deleted and the replacement
    runs in a single transaction, so a bad item can no longer leave the table
    empty or half-filled.

    Returns:
        GET: the categories ordered by ``order``.
        POST: ``{"status": "ok"}``, or 400 when an item is malformed.
    """
    from django.db import transaction

    if request.method == "GET":
        cats = EvaluationCategory.objects.all().order_by("order")
        return Response([{"id": c.id, "name": c.name, "weight": c.weight, "color": c.color, "active": c.active} for c in cats])

    try:
        rows = [
            {
                "name": item["name"],
                "weight": float(item.get("weight", 1.0)),
                "color": item.get("color", "#0F766E"),
                "active": item.get("active", True),
                "order": position,
            }
            for position, item in enumerate(request.data.get("categories", []))
        ]
    except (AttributeError, KeyError, TypeError, ValueError):
        return Response(
            {"error": "Invalid categories payload"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    with transaction.atomic():
        EvaluationCategory.objects.all().delete()
        for row in rows:
            EvaluationCategory.objects.create(**row)

    return Response({"status": "ok"})
|
|
| from .models import SystemPromptConfig |
|
|
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_prompt(request):
    """Read (GET) or update (POST) the system prompt. Admin only.

    The prompt lives in the SystemPromptConfig row with ``id=1``, which is
    created on first access. A POST without ``prompt_text`` keeps the stored
    text.
    """
    config, _ = SystemPromptConfig.objects.get_or_create(id=1)

    if request.method != "GET":
        new_text = request.data.get("prompt_text", config.prompt_text)
        config.prompt_text = new_text
        config.save()
        return Response({"status": "ok", "prompt_text": config.prompt_text})

    return Response({"prompt_text": config.prompt_text})
|
|
| from .models import Organization, Group |
|
|
@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def organizations_api(request):
    """List (GET) or create (POST) organizations. Admin only.

    POST body: ``type`` (default ``"Ingeniería"``), ``career``,
    ``gradientIndex`` and ``groups`` — the number of groups to create,
    named ``Grupo 1`` … ``Grupo N``.

    Returns:
        GET: one entry per organization with its group, candidate and
        approved counts.
        POST: the created organization, or 400 when ``groups`` is not an
        integer.
    """
    if request.method == "GET":
        # NOTE(review): this runs three count queries per organization;
        # consider annotating the queryset if the list grows.
        return Response([
            {
                "id": str(o.id),
                "type": o.type,
                "career": o.career,
                "groups": o.groups.count(),
                "candidates": Applicant.objects.filter(organization=o).count(),
                "approved": Applicant.objects.filter(organization=o, status='approved').count(),
                "gradientIndex": o.gradient_index
            }
            for o in Organization.objects.all()
        ])

    data = request.data

    # Validate before creating anything: a string or null ``groups`` used to
    # crash in ``range`` after the organization had already been saved.
    try:
        groups_count = max(int(data.get("groups", 0) or 0), 0)
    except (TypeError, ValueError):
        return Response(
            {"error": "groups must be an integer"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    org = Organization.objects.create(
        type=data.get("type", "Ingeniería"),
        career=data.get("career", ""),
        gradient_index=data.get("gradientIndex", 0)
    )
    for number in range(1, groups_count + 1):
        Group.objects.create(name=f"Grupo {number}", organization=org)

    return Response({
        "id": str(org.id),
        "type": org.type,
        "career": org.career,
        "groups": groups_count,
        "candidates": 0,
        "approved": 0,
        "gradientIndex": org.gradient_index
    })
|
|
def _applicant_status_label(applicant):
    """Return the status text shown for *applicant* in the admin list.

    Applicants whose status is not ``'registered'`` use the model's own
    status display. For ``'registered'`` applicants the most recently started
    interview session, if any, decides the label instead.
    """
    label = applicant.get_status_display()
    if applicant.status != 'registered':
        return label

    latest_session = (
        InterviewSession.objects
        .filter(applicant_id=applicant.id)
        .order_by('-started_at')
        .first()
    )
    if not latest_session:
        return label

    if latest_session.status == 'evaluated':
        score = latest_session.overall_score
        # A missing score counts as failed; 70 is the pass mark used here.
        return 'Aprobado' if score is not None and score >= 70 else 'Reprobado'
    if latest_session.status == 'error':
        return 'Error en Evaluación'
    return 'En Proceso'


def _applicant_list_item(applicant, status_display):
    """Build the JSON-ready dict describing *applicant* for the dashboard."""
    from urllib.parse import quote_plus

    # URL-encode the name so spaces, accents, '&' or '#' inside it cannot
    # break or alter the avatar query string. Plain names still render as
    # "First+Last", exactly as before.
    avatar_name = quote_plus(f"{applicant.first_name} {applicant.last_name}")
    return {
        "id": str(applicant.id),
        "name": f"{applicant.first_name} {applicant.last_name}",
        "email": applicant.email,
        "status": status_display,
        "modality": applicant.modality,
        "career": applicant.organization.career if applicant.organization else "N/A",
        "photo": f"https://ui-avatars.com/api/?name={avatar_name}&background=random",
    }


@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def applicants_api(request):
    """List all applicants (GET) or register a new one (POST).

    GET returns every applicant, newest id first, each with ``id``, ``name``,
    ``email``, ``status``, ``modality``, ``career`` and ``photo``.

    POST reads ``name``, ``email``, ``carrera`` and an optional ``password``
    from the request body. The name is split on its first space into first
    and last name. A login ``User`` whose username is the email is created
    when none exists yet, followed by the ``Applicant`` row. The response has
    the same shape as one GET entry.

    Returns HTTP 400 when the email is missing/blank or when creating the
    records fails.
    """
    if request.method == "GET":
        # select_related loads the organization in the same query instead of
        # one extra query per applicant when "career" is read.
        applicants = Applicant.objects.select_related("organization").order_by('-id')
        return Response([
            _applicant_list_item(a, _applicant_status_label(a)) for a in applicants
        ])

    from django.db import transaction

    data = request.data

    # Tolerate null / non-string values instead of failing on .strip()/.split().
    email = str(data.get("email") or "").strip()
    if not email:
        return Response({"error": "El correo electrónico es un campo obligatorio."}, status=400)

    first_name, _, last_name = str(data.get("name") or "").partition(" ")
    org = Organization.objects.filter(career=data.get("carrera")).first()

    try:
        from auth_api.models import User

        # Create the login account and the applicant together: without the
        # transaction, a failure on the second insert left an orphaned User.
        with transaction.atomic():
            if not User.objects.filter(username=email).exists():
                # NOTE(review): every account created without an explicit
                # password shares this hard-coded default; consider a
                # generated password or a forced reset.
                User.objects.create_user(
                    username=email,
                    email=email,
                    password=data.get("password", "Entrevista2026"),
                    role='user',
                    first_name=first_name,
                    last_name=last_name,
                )

            a = Applicant.objects.create(
                first_name=first_name,
                last_name=last_name,
                email=email,
                organization=org,
                modality="Virtual",
                status="registered",
            )
    except Exception as e:
        # Broad on purpose (any failure becomes a 400 for the dashboard), but
        # record the traceback so the cause is not lost.
        log.exception("applicants_api: failed to create applicant")
        return Response({"error": "Ese correo ya está registrado u ocurrió un error: " + str(e)}, status=400)

    return Response(_applicant_list_item(a, a.get_status_display()))
|
|
@api_view(["GET", "PUT", "DELETE"])
@permission_classes([IsAdminRole])
def applicant_detail_api(request, pk):
    """Retrieve (GET), update (PUT) or delete (DELETE) a single applicant.

    The applicant's login ``User`` is located by ``username == applicant
    email``, so PUT and DELETE change both records inside one transaction to
    keep that link consistent.

    GET returns ``id``, ``name``, ``email``, ``status``, ``date`` (creation
    date formatted as ``"%d %b %Y"``) and ``career``.

    PUT accepts optional ``name``, ``email``, ``carrera`` and ``password``.
    Keys that are absent or null leave the stored value unchanged, and an
    unknown ``carrera`` is ignored. It returns the updated applicant.

    DELETE removes the applicant and any user whose username is the
    applicant's email, returning ``{"success": True}``.

    Returns HTTP 404 when no applicant has primary key ``pk``, and HTTP 400
    on PUT when the supplied email is blank or already used by another user.
    """
    try:
        app = Applicant.objects.get(pk=pk)
    except Applicant.DoesNotExist:
        return Response({"error": "Aspirante no encontrado"}, status=404)

    if request.method == "GET":
        return Response({
            # Unlike the PUT response, the id is returned as stored (not
            # stringified); kept as-is for existing clients.
            "id": app.id,
            "name": f"{app.first_name} {app.last_name}",
            "email": app.email,
            "status": app.get_status_display(),
            "date": app.created_at.strftime("%d %b %Y"),
            "career": app.organization.career if app.organization else "Sin Carrera",
        })

    from django.db import transaction

    if request.method == "DELETE":
        from auth_api.models import User

        email = app.email
        # Remove both records or neither.
        with transaction.atomic():
            app.delete()
            User.objects.filter(username=email).delete()
        return Response({"success": True})

    if request.method == "PUT":
        from urllib.parse import quote_plus

        from auth_api.models import User

        data = request.data

        raw_name = data.get("name")
        if raw_name is not None:
            # Split on the first space only: "Ana Maria Lopez" -> "Ana", "Maria Lopez".
            app.first_name, _, app.last_name = str(raw_name).partition(" ")

        old_email = app.email
        raw_email = data.get("email")
        if raw_email is None:
            new_email = old_email
        else:
            new_email = str(raw_email).strip()
            if not new_email:
                # Creation rejects a blank email; allowing it here would also
                # blank the linked user's username.
                return Response({"error": "El correo electrónico es un campo obligatorio."}, status=400)

        if new_email != old_email and User.objects.filter(username=new_email).exists():
            return Response({"error": "Ese correo ya está en uso por otro usuario."}, status=400)

        if "carrera" in data:
            org = Organization.objects.filter(career=data["carrera"]).first()
            if org:
                app.organization = org

        app.email = new_email

        # Save applicant and user together: if the user update failed after
        # the applicant was saved, the email/username link would be broken.
        with transaction.atomic():
            app.save()

            user = User.objects.filter(username=old_email).first()
            if user:
                user.username = new_email
                user.email = new_email
                user.first_name = app.first_name
                user.last_name = app.last_name
                if data.get("password"):
                    user.set_password(data["password"])
                user.save()

        # URL-encode the name so special characters cannot break the avatar
        # query string; plain names still render as "First+Last".
        avatar_name = quote_plus(f"{app.first_name} {app.last_name}")
        return Response({
            "id": str(app.id),
            "name": f"{app.first_name} {app.last_name}",
            "email": app.email,
            "status": app.get_status_display(),
            "modality": app.modality,
            "career": app.organization.career if app.organization else "N/A",
            "photo": f"https://ui-avatars.com/api/?name={avatar_name}&background=random",
        })
|
|