"""
Interview app — DRF API views.

Permission strategy:
  - Public (AllowAny): endpoints consumed by the student interview frontend.
  - Admin (IsAdminRole): endpoints consumed exclusively by the admin Dashboard.
  - Authenticated (IsAuthenticated): me_api — requires any valid JWT (student or admin).
"""
import logging
import os
import tempfile
import threading

from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from django.db.models import Count, Q
from django.db.models.functions import TruncMonth, ExtractDay
from datetime import datetime

from .models import Applicant, TranscriptionProviderConfig, LLMProviderConfig, SystemPromptConfig
from .engine import InterviewEngine
from .permissions import IsAdminRole

log = logging.getLogger("interview.views")

from .transcription_engine import transcribe_audio_file


def _parse_int(value):
    """Return ``int(value)``, or ``None`` when value is missing or not numeric."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _as_bool(value):
    """Coerce a request value to bool, treating strings like "false"/"0" as False."""
    if isinstance(value, str):
        return value.strip().lower() in ("1", "true", "t", "yes", "on")
    return bool(value)


def _serialize_interview(interview):
    """Build the JSON payload the dashboard uses for one Interview."""
    return {
        "id": interview.id,
        "title": interview.title,
        "date": str(interview.date),
        "timeIndex": interview.time_index,
        "durationMinutes": interview.duration_minutes,
        "active": interview.active,
        # One initial per applicant; "?" keeps the list aligned when an
        # applicant has no first name (indexing "" would raise IndexError).
        "users": [
            (applicant.first_name or "?")[0].upper()
            for applicant in interview.applicants.all()
        ],
    }


def _applicants_from_ids(raw_ids):
    """Resolve ids such as "EST-0012" or 12 to Applicant rows, skipping unknown ones."""
    found = []
    for raw_id in raw_ids or []:
        cleaned = str(raw_id).replace("EST-", "").lstrip('0') or "0"
        try:
            found.append(Applicant.objects.get(id=int(cleaned)))
        except (Applicant.DoesNotExist, ValueError):
            # Unknown or non-numeric ids are ignored rather than failing the request.
            continue
    return found


@api_view(["GET"])
@permission_classes([AllowAny])
def health(request):
    """Health check endpoint.

    Reports the state of the evaluation engine, the Qdrant connection and the
    Whisper model. Whisper is always reported as ``ok`` because an unloaded
    model is shown as "En Espera" rather than as a failure.
    """
    engine = InterviewEngine.get_instance()
    # Imported at call time so the current value of the module global is read.
    from .transcription_engine import _whisper_model

    engine_ok = engine.model is not None and engine.sentiment_classifier is not None
    whisper_status = "Operacional" if _whisper_model is not None else "En Espera"
    services = [
        {
            "label": "Motor de Evaluaciones",
            "status": "Operacional" if engine_ok else "Error",
            "ok": engine_ok,
        },
        {
            "label": "Base Vectorial (Qdrant)",
            "status": "Operacional" if engine.qdrant_ok else "Desconectado",
            "ok": engine.qdrant_ok,
        },
        {
            "label": "NLP de Voz (Whisper)",
            "status": whisper_status,
            "ok": True,
        },
    ]
    return Response({
        "status": "ok",
        "services": services,
    })


@api_view(["GET"])
@permission_classes([AllowAny])
def interview_start(request):
    """Return interview config to bootstrap the interview UI."""
    engine = InterviewEngine.get_instance()
    return Response({
        "total_questions": engine.get_total_questions(),
        "bienvenida_url": "videos/final_bienvenida.mp4",
        "despedida_url": "videos/final_despedida.mp4",
        "loop_url": "videos/loop_pingpong.mp4",
    })


@api_view(["GET"])
@permission_classes([AllowAny])
def interview_question(request, index: int):
    """Return a specific question by its 0-based index (404 when out of range)."""
    engine = InterviewEngine.get_instance()
    q = engine.get_question(index)
    if q is None:
        return Response(
            {"detail": "No more questions"},
            status=status.HTTP_404_NOT_FOUND,
        )
    return Response(q)


@api_view(["POST"])
@permission_classes([AllowAny])
def query_rag(request):
    """
    Accept user answer text, classify sentiment, search for the best
    feedback phrase via Qdrant (or local fallback), and return it.

    Responds 400 when ``text`` is missing, blank, or not a string.
    """
    raw_text = request.data.get("text")
    # A null or non-string "text" is treated as missing instead of raising
    # AttributeError on .strip().
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    last_feedback_id = request.data.get("last_feedback_id")
    if not text:
        return Response(
            {"detail": "Text required"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    engine = InterviewEngine.get_instance()
    result = engine.query_feedback(text, last_feedback_id=last_feedback_id)
    return Response(result)


@api_view(["POST"])
@permission_classes([AllowAny])
def transcribe_audio(request):
    """
    Accept an audio file upload, transcribe it with the currently active
    model, and return the transcribed text.

    Responds 400 when no ``audio`` file is uploaded and 500 when saving or
    transcribing fails. The temporary file is always removed.
    """
    audio_file = request.FILES.get("audio")
    if not audio_file:
        return Response(
            {"detail": "No audio file"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    tmp_path = None
    try:
        # Write the upload inside the try block so a failed write cannot leak
        # the temp file (it is created with delete=False).
        with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as tmp:
            tmp_path = tmp.name
            for chunk in audio_file.chunks():
                tmp.write(chunk)

        # Default to the active provider, falling back to whisper_local.
        provider_config = TranscriptionProviderConfig.objects.filter(is_active=True).first()
        provider = provider_config.provider_type if provider_config else "whisper_local"
        text, _, error = transcribe_audio_file(tmp_path, provider_type=provider)
        if error:
            raise RuntimeError(error)
        return Response({"text": text})
    except Exception as exc:
        log.exception("Transcription failed")
        return Response(
            {"detail": f"Transcription error: {exc}"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)


class DashboardStatsAPIView(APIView):
    """Admin dashboard statistics: totals, monthly admission rate and calendar data."""

    permission_classes = [IsAdminRole]

    def get(self, request, *args, **kwargs):
        """Return applicant statistics, optionally filtered by ``year``/``month``.

        Non-numeric ``year``/``month`` query parameters are ignored everywhere
        (filters, chart and calendar) instead of only in the filter step.
        """
        year = _parse_int(request.query_params.get('year'))
        month = _parse_int(request.query_params.get('month'))
        now = datetime.now()

        qs = Applicant.objects.all()
        if year is not None:
            qs = qs.filter(created_at__year=year)
        if month is not None:
            qs = qs.filter(created_at__month=month)

        total_applicants = qs.count()
        approved = qs.filter(status='approved').count()
        pending = qs.filter(status='registered').count()
        rejected = qs.filter(status='rejected').count()

        admission_rate = 0
        if total_applicants > 0:
            admission_rate = round((approved / total_applicants) * 100, 2)

        # Chart and calendar default to the current year/month when no valid
        # value was passed.
        chart_year = year if year is not None else now.year
        chart_month = month if month is not None else now.month

        chart_qs = Applicant.objects.filter(created_at__year=chart_year)
        # Group by month.
        monthly_stats = chart_qs.annotate(month=TruncMonth('created_at')).values('month').annotate(
            total=Count('id'),
            approved=Count('id', filter=Q(status='approved'))
        ).order_by('month')

        # Format into a fixed 12-month array.
        months_es = ['Ene', 'Feb', 'Mar', 'Abr', 'May', 'Jun', 'Jul', 'Ago', 'Sep', 'Oct', 'Nov', 'Dic']
        # Month number -> aggregated row, for quick lookup.
        stats_dict = {item['month'].month: item for item in monthly_stats if item['month']}

        chart_data = []
        for i, month_name in enumerate(months_es, 1):
            stat = stats_dict.get(i)
            rate = 0
            if stat and stat['total'] > 0:
                rate = round((stat['approved'] / stat['total']) * 100, 2)
            chart_data.append({
                'name': month_name,
                'value': rate,
                # Highlight the month that was explicitly requested, if any.
                'active': month == i,
            })

        # Years offered in the dropdown: from the earliest record to this year.
        earliest_record = Applicant.objects.order_by('created_at').first()
        start_year = earliest_record.created_at.year if earliest_record else now.year
        available_years = list(range(start_year, now.year + 1))

        # Approved applicants per day for the selected month and year.
        approved_in_month = Applicant.objects.filter(
            created_at__year=chart_year,
            created_at__month=chart_month,
            status='approved'
        ).annotate(day=ExtractDay('created_at')).values('day').annotate(count=Count('id'))
        calendar_data = {item['day']: item['count'] for item in approved_in_month}

        return Response({
            'total_applicants': total_applicants,
            'approved_pending': approved,
            'pending': pending,
            'rejected': rejected,
            'admission_rate': admission_rate,
            'chart_data': chart_data,
            'available_years': available_years,
            'calendar_data': calendar_data
        })


from .models import Interview


@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def interviews_api(request):
    """List interviews (GET) or schedule a new one (POST).

    GET accepts optional ``year``/``month``/``day`` filters; non-numeric values
    are ignored. POST responds 400 when ``timeIndex`` or ``durationMinutes``
    is not an integer. Unknown applicant ids are skipped.
    """
    if request.method == "GET":
        qs = Interview.objects.all()
        year = _parse_int(request.query_params.get('year'))
        month = _parse_int(request.query_params.get('month'))
        day = _parse_int(request.query_params.get('day'))
        if year is not None:
            qs = qs.filter(date__year=year)
        if month is not None:
            qs = qs.filter(date__month=month)
        if day is not None:
            qs = qs.filter(date__day=day)
        # Prefetch applicants so serialization does not run one query per interview.
        return Response([_serialize_interview(iv) for iv in qs.prefetch_related('applicants')])

    data = request.data
    title = data.get("title") or "Entrevista Técnica"
    date_str = data.get("date")
    try:
        time_index = int(data.get("timeIndex", 0))
        duration_minutes = int(data.get("durationMinutes", 60))
    except (TypeError, ValueError):
        return Response(
            {"error": "timeIndex and durationMinutes must be integers"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    applicant_ids = data.get("applicants", [])

    iv = Interview.objects.create(
        title=title,
        date=date_str,
        time_index=time_index,
        duration_minutes=duration_minutes,
        active=True
    )
    iv.applicants.add(*_applicants_from_ids(applicant_ids))
    iv.save()

    return Response(_serialize_interview(iv))


@api_view(["PATCH", "DELETE"])
@permission_classes([IsAdminRole])
def interview_detail_api(request, pk):
    """Update (PATCH) or delete (DELETE) one interview.

    Responds 404 when the interview does not exist and 400 when ``timeIndex``
    or ``durationMinutes`` is not an integer. When ``applicants`` is present
    in a PATCH, the applicant list is replaced.
    """
    try:
        iv = Interview.objects.get(pk=pk)
    except Interview.DoesNotExist:
        return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)

    if request.method == "DELETE":
        iv.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    data = request.data
    # Validate numeric fields before mutating anything.
    try:
        time_index = int(data["timeIndex"]) if "timeIndex" in data else None
        duration_minutes = int(data["durationMinutes"]) if "durationMinutes" in data else None
    except (TypeError, ValueError):
        return Response(
            {"error": "timeIndex and durationMinutes must be integers"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    if "title" in data:
        iv.title = data["title"]
    if "date" in data:
        iv.date = data["date"]
    if time_index is not None:
        iv.time_index = time_index
    if duration_minutes is not None:
        iv.duration_minutes = duration_minutes
    iv.save()

    if "applicants" in data:
        iv.applicants.clear()
        iv.applicants.add(*_applicants_from_ids(data["applicants"]))

    return Response(_serialize_interview(iv))


# Minimal "me" endpoint.
from rest_framework.permissions import IsAuthenticated
from rest_framework.decorators import permission_classes
from .models import Applicant
from django.utils import timezone
from django.views.decorators.csrf import ensure_csrf_cookie


@api_view(["GET"])
@ensure_csrf_cookie
@permission_classes([IsAuthenticated])
def me_api(request):
    """Return the current user's display name and their interview, if any.

    When test mode is active globally, or the user is one of the hard-coded
    test admin emails, the response is "spoofed": the most recent interview is
    returned with today's date, ``timeIndex`` 0 and a 1440-minute duration,
    and ``sessionStatus`` is always ``None``.
    """
    user = request.user
    app = Applicant.objects.filter(email__iexact=user.email).first()

    is_test_admin = user.email in ["admin@itca.edu.sv", "pruebas@itca.edu.sv"]

    # Check whether Test Admin Mode is active globally.
    from .models import TranscriptionProviderConfig, InterviewSession
    test_mode_active = TranscriptionProviderConfig.objects.filter(name="TEST_ADMIN_MODE", is_active=True).exists()

    is_spoofed = test_mode_active or is_test_admin

    if is_spoofed:
        # Fetch the most recent interview regardless of date to allow testing.
        iv = Interview.objects.filter(applicants=app).order_by('-date', '-time_index').first() if app else None

        if is_test_admin and not iv:
            # Dummy interview so a test admin always gets through.
            return Response({
                "name": "Simulación Admin",
                "testMode": test_mode_active,
                "sessionStatus": None,
                "interview": {
                    "title": "Entrevista Nuevo Ingreso",
                    "date": str(timezone.now().date()),
                    "timeIndex": 0,
                    "durationMinutes": 1440,
                    "id": 9999
                }
            })
    else:
        # Normal flow: the next interview scheduled for today or later.
        iv = Interview.objects.filter(applicants=app, date__gte=timezone.now().date()).order_by('date', 'time_index').first() if app else None

    session_status = None
    # Spoofed responses never report a session status.
    if app and iv and not is_spoofed:
        session = InterviewSession.objects.filter(applicant=app, interview=iv).order_by('-started_at').first()
        if session:
            session_status = session.status

    name_to_show = "Estudiante"
    if app and app.first_name and app.first_name.strip():
        name_to_show = f"{app.first_name} {app.last_name}".strip()
    elif user.first_name and user.first_name.strip():
        name_to_show = f"{user.first_name} {user.last_name}".strip()

    return Response({
        "name": name_to_show,
        "testMode": is_spoofed,
        "sessionStatus": session_status,
        "interview": {
            "title": iv.title,
            # Spoof date and times so the frontend bypasses its strict checks.
            "date": str(timezone.now().date()) if is_spoofed else str(iv.date),
            "timeIndex": 0 if is_spoofed else iv.time_index,
            "durationMinutes": 1440 if is_spoofed else iv.duration_minutes,
            "id": iv.id
        } if iv else None
    })


@api_view(["POST", "GET"])
@permission_classes([IsAdminRole])
def test_mode_toggle_api(request):
    """Read (GET) or set (POST) the global TEST_ADMIN_MODE flag.

    Enabling the flag creates the simulation User and Applicant for
    ``admin@itca.edu.sv`` when missing; disabling it deletes both.
    ``is_active`` is coerced to a real boolean so string values such as
    "false" are not treated as truthy.
    """
    from .models import TranscriptionProviderConfig
    from auth_api.models import User
    from .models import Applicant

    config, created = TranscriptionProviderConfig.objects.get_or_create(
        name="TEST_ADMIN_MODE",
        defaults={
            "provider_type": "system",
            "is_active": False,
            "endpoint": "",
            "api_key": ""
        }
    )

    if request.method == "POST":
        is_active = _as_bool(request.data.get("is_active", False))
        config.is_active = is_active
        config.save()

        email = "admin@itca.edu.sv"
        if is_active:
            if not User.objects.filter(username=email).exists():
                # NOTE(review): hard-coded, guessable password for a real login
                # account; consider a generated secret or an unusable password.
                User.objects.create_user(username=email, email=email, password='admin', role='user', first_name='Admin', last_name='Simulación')
            if not Applicant.objects.filter(email=email).exists():
                Applicant.objects.create(email=email, first_name='Admin', last_name='Simulación', status='approved')
        else:
            User.objects.filter(username=email).delete()
            Applicant.objects.filter(email=email).delete()

    return Response({"is_active": config.is_active})


# --- RAG EVALUATION ENDPOINTS ---
from .models import InterviewSession, QuestionResponse, EvaluationResult, EvaluationCategory
from .rag_evaluation import evaluate_session
from django.core.files.storage import default_storage
def _find_session(session_id):
    """Return the InterviewSession with this id, or None when it does not exist."""
    return InterviewSession.objects.filter(id=session_id).first()


def _session_not_found():
    """Build the 404 response shared by the session endpoints."""
    return Response({"error": "Sesión no encontrada"}, status=status.HTTP_404_NOT_FOUND)


@api_view(["POST"])
@permission_classes([AllowAny])
def session_start(request):
    """Create an InterviewSession and return its id.

    Uses ``applicant_id``/``interview_id`` from the body when both are given.
    Otherwise, for an authenticated user, resolves the applicant by email and
    picks their most recent interview. Responds 400 when the ids are missing
    or do not refer to existing rows.
    """
    app_id = request.data.get("applicant_id")
    iv_id = request.data.get("interview_id")

    if not app_id or not iv_id:
        if request.user.is_authenticated:
            app = Applicant.objects.filter(email__iexact=request.user.email).first()
            if app:
                app_id = app.id
                iv = Interview.objects.filter(applicants=app).order_by('-date').first()
                # The simulation admin may have no interview of its own.
                if not iv and app.email == "admin@itca.edu.sv":
                    iv = Interview.objects.first()
                if iv:
                    iv_id = iv.id

    ids_are_valid = False
    if app_id and iv_id:
        try:
            # Check both rows exist up front; otherwise create() fails with an
            # integrity error and the client gets a 500.
            ids_are_valid = (
                Applicant.objects.filter(id=app_id).exists()
                and Interview.objects.filter(id=iv_id).exists()
            )
        except (TypeError, ValueError):
            ids_are_valid = False

    if not ids_are_valid:
        return Response({"error": "Faltan IDs o no hay entrevistas disponibles"}, status=400)

    session = InterviewSession.objects.create(applicant_id=app_id, interview_id=iv_id)
    return Response({"session_id": session.id})


@api_view(["POST"])
@permission_classes([AllowAny])
def session_answer(request, session_id):
    """Store one question/answer pair (and optional audio) for a session.

    Responds 404 for an unknown session and 400 when ``index`` is not an
    integer.
    """
    session = _find_session(session_id)
    if session is None:
        return _session_not_found()

    try:
        # The index is interpolated into the storage path below, so it must be
        # a plain integer rather than arbitrary client text.
        idx = int(request.data.get("index", 0))
    except (TypeError, ValueError):
        return Response(
            {"error": "El índice de la pregunta debe ser un entero"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    text = request.data.get("text", "")
    transcription = request.data.get("transcription", "")

    # If an audio file was uploaded, store it (TODO: or pass it to Whisper).
    audio_url = ""
    if "audio" in request.FILES:
        f = request.FILES["audio"]
        path = default_storage.save(f"answers/session_{session_id}_{idx}.webm", f)
        audio_url = default_storage.url(path)

    QuestionResponse.objects.create(
        session=session,
        question_index=idx,
        question_text=text,
        audio_url=audio_url,
        transcription=transcription
    )
    return Response({"status": "ok"})


@api_view(["POST"])
@permission_classes([AllowAny])
def session_recording(request, session_id):
    """Store the uploaded ``video`` recording for a session (404 if unknown)."""
    session = _find_session(session_id)
    if session is None:
        return _session_not_found()

    if "video" in request.FILES:
        f = request.FILES["video"]
        path = default_storage.save(f"recordings/session_{session_id}.webm", f)
        session.recording_url = default_storage.url(path)
        session.save()
    return Response({"status": "ok"})


@api_view(["POST"])
@permission_classes([AllowAny])
def session_end(request, session_id):
    """Mark a session completed and start its evaluation in a background thread."""
    session = _find_session(session_id)
    if session is None:
        return _session_not_found()

    session.ended_at = timezone.now()
    session.status = 'completed'
    session.save()

    # Trigger evaluation. Ideally this would be a Celery task; a plain thread
    # keeps the setup simple.
    t = threading.Thread(target=evaluate_session, args=(session.id,))
    t.start()

    return Response({"status": "evaluating"})


# NOTE(review): unlike every other view here, this endpoint declares no
# @permission_classes, so it falls back to the project's DRF default. It
# deletes evaluation results — confirm the default is restrictive enough or
# add IsAdminRole explicitly.
@api_view(["POST"])
def session_reprocess(request, session_id):
    """Discard a session's evaluation results and re-run the evaluation.

    Responds 404 for an unknown session and 500 (with the error text) on any
    other failure.
    """
    try:
        session = _find_session(session_id)
        if session is None:
            return _session_not_found()

        EvaluationResult.objects.filter(session=session).delete()
        session.overall_score = None
        session.status = 'in_progress'
        session.save()

        # Trigger evaluation (reprocesses stored audio when present).
        t = threading.Thread(target=evaluate_session, args=(session.id,))
        t.start()

        return Response({"status": "evaluating"})
    except Exception as e:
        log.exception("Could not reprocess session %s", session_id)
        return Response({"error": str(e)}, status=500)


@api_view(["POST"])
@permission_classes([IsAdminRole])
def applicant_approve(request, applicant_id):
    """Approve an applicant and mark their latest session as evaluated at 100.

    Responds 404 when the applicant does not exist and 500 (with the error
    text) on any other failure.
    """
    try:
        app = Applicant.objects.get(id=applicant_id)
    except Applicant.DoesNotExist:
        return Response({"error": "Aspirante no encontrado"}, status=404)

    try:
        app.status = 'approved'
        app.save()

        session = InterviewSession.objects.filter(applicant_id=app.id).order_by('-started_at').first()
        if session:
            session.status = 'evaluated'
            session.overall_score = 100
            session.save()

        return Response({"status": "ok"})
    except Exception as e:
        log.exception("Could not approve applicant %s", applicant_id)
        return Response({"error": str(e)}, status=500)


@api_view(["GET"])
@permission_classes([AllowAny])
def session_evaluation(request, session_id):
    """Return the evaluation summary and per-category results for a session."""
    session = _find_session(session_id)
    if session is None:
        return _session_not_found()

    # select_related avoids one extra query per result for r.category.
    results = EvaluationResult.objects.filter(session=session).select_related('category')

    data = {
        "status": session.status,
        "overall_score": session.overall_score,
        "traits": session.dynamic_traits,
        "interviewer_notes": session.interviewer_notes,
        "hr_recommendation": session.hr_recommendation,
        "recording_url": session.recording_url,
        "categories": [
            {
                "name": r.category.name,
                "score": r.score,
                "feedback": r.feedback,
                "color": r.category.color
            } for r in results
        ]
    }
    return Response(data)


@api_view(["GET"])
@permission_classes([AllowAny])
def session_transcript(request, session_id):
    """Return the session as an alternating Interviewer/Candidate transcript.

    Start times are synthesized: each question is given 5 seconds and each
    answer its stored duration, or 45 seconds when that is missing or not
    positive.
    """
    session = _find_session(session_id)
    if session is None:
        return _session_not_found()

    responses = QuestionResponse.objects.filter(session=session).order_by('question_index')

    data = []
    # Mock timestamps for demo purposes when duration_seconds is missing.
    current_time = 0
    for r in responses:
        # Guard against None before comparing; None > 0 raises TypeError.
        duration = r.duration_seconds if r.duration_seconds and r.duration_seconds > 0 else 45
        data.append({
            "id": r.id,
            "speaker": "Interviewer",
            "text": r.question_text,
            "start_time": current_time,
            "duration": 5,
            "audio_url": None
        })
        current_time += 5
        data.append({
            "id": f"{r.id}_ans",
            "speaker": "Candidate",
            "text": r.transcription or "(Silencio / Sin respuesta transcrita)",
            "start_time": current_time,
            "duration": duration,
            "audio_url": r.audio_url if r.audio_url else None
        })
        current_time += duration

    return Response(data)


@api_view(["GET"])
@permission_classes([IsAdminRole])
def applicant_sessions(request, applicant_id):
    """List an applicant's interview sessions, newest first."""
    sessions = InterviewSession.objects.filter(applicant_id=applicant_id).order_by('-started_at')
    return Response([
        {
            "id": s.id,
            "date": s.started_at,
            "status": s.status,
            "overall_score": s.overall_score,
            "recording_url": s.recording_url
        }
        for s in sessions
    ])


import requests


def ping_llm_provider(provider_type, api_key, model, endpoint=""):
    """Check that an LLM provider configuration looks usable.

    Returns a ``(ok, message)`` tuple; ``message`` is ``"ok"`` on success and a
    user-facing error otherwise. Unknown provider types are reported as ok.
    Network and other unexpected errors are reported as a failure rather than
    raised.
    """
    try:
        if api_key:
            api_key = api_key.strip()

        if provider_type == "openai":
            base_url = endpoint.strip() if endpoint else "https://api.openai.com/v1"
            if base_url.endswith("/"):
                base_url = base_url[:-1]
            ping_url = f"{base_url}/models" if not base_url.endswith("/models") else base_url

            headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
            res = requests.get(ping_url, headers=headers, timeout=5)
            if res.status_code != 200:
                return False, f"OpenAI error (Code {res.status_code}): Verifica tu API Key o el Endpoint."
            return True, "ok"

        elif provider_type == "anthropic":
            # No network call: only the key prefix is checked. A missing key
            # gets the same message instead of an AttributeError.
            if not api_key or not api_key.startswith("sk-ant"):
                return False, "La API Key de Anthropic no parece válida (debe empezar con sk-ant)."
            return True, "ok"

        elif provider_type == "ollama":
            # For Ollama the server URL is carried in the api_key field; the
            # endpoint argument is not consulted.
            endpoint = api_key if api_key else "http://127.0.0.1:11434"
            if endpoint.endswith('/'):
                endpoint = endpoint[:-1]
            res = requests.get(f"{endpoint}/api/tags", timeout=5)
            if res.status_code != 200:
                return False, "Ollama error: No se pudo conectar al endpoint local."
            # Check that the requested model has been pulled.
            data = res.json()
            models_list = [m.get("name", "") for m in data.get("models", [])]
            if model and not any(model in name for name in models_list):
                return False, f"Ollama está conectado, pero el modelo '{model}' no está descargado."
            return True, "ok"

        elif provider_type == "talentai":
            if not endpoint:
                return False, "El endpoint del Space es requerido (ej: https://tu-space.hf.space/v1)."
            ep = endpoint.strip().rstrip("/")
            # llama-cpp-python exposes an OpenAI-compatible /v1/models.
            ping_url = f"{ep}/models"
            headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
            try:
                res = requests.get(ping_url, headers=headers, timeout=10)
                if res.status_code != 200:
                    return False, f"TalentAI error (Code {res.status_code}): No se pudo conectar al Space. Verifica que el Space esté activo y la URL sea correcta."
            except requests.exceptions.ConnectionError:
                return False, "TalentAI: No se pudo conectar al Space. Verifica que esté activo."
            except requests.exceptions.Timeout:
                return False, "TalentAI: Timeout al conectar con el Space. Puede estar iniciando (cold start)."
            return True, "ok"

        elif provider_type == "nvidia":
            if not api_key:
                return False, "La API Key de NVIDIA es requerida."
            payload = {
                "model": model or "nvidia/nemotron-4-340b-instruct",
                "messages": [{"role": "user", "content": "test"}],
                "max_tokens": 5
            }
            res = requests.post(
                "https://integrate.api.nvidia.com/v1/chat/completions",
                headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
                json=payload,
                timeout=10
            )
            if res.status_code != 200:
                err = res.text
                try:
                    err = res.json()
                except ValueError:
                    # Body is not JSON; keep the raw text.
                    pass
                return False, f"NVIDIA API error (Code {res.status_code}): {err}"
            return True, "ok"

    except Exception as e:
        return False, f"Fallo de conexión: {str(e)}"

    return True, "ok"


@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_llm(request):
    """List (GET) or save (POST) LLM provider configurations.

    On POST every active entry is validated with ``ping_llm_provider`` before
    anything is written, so a failing entry returns 400 without leaving
    earlier entries half-saved. Active entries for openai/anthropic/nvidia
    that have no API key are skipped entirely (neither validated nor saved).
    """
    if request.method == "GET":
        configs = LLMProviderConfig.objects.all()
        return Response([{"id": c.id, "name": c.name, "type": c.provider_type, "model": c.model, "api_key": c.api_key, "endpoint": c.endpoint, "active": c.is_active} for c in configs])

    # First pass: validate, collecting the entries that should be saved.
    accepted = []
    for data in request.data.get("configs", []):
        if data.get("active", False):
            provider_type = data.get("type", "")
            api_key = (data.get("api_key", "") or "").strip()
            endpoint = (data.get("endpoint", "") or "").strip()

            # Providers that need a key but do not have one yet are skipped so
            # they do not block saving the other providers.
            requires_key = provider_type in ("openai", "anthropic", "nvidia")
            if requires_key and not api_key:
                continue

            # ollama/talentai are validated against their endpoint/key.
            is_valid, error_msg = ping_llm_provider(provider_type, api_key, data.get("model", ""), endpoint)
            if not is_valid:
                return Response({"status": "error", "message": error_msg}, status=400)
        accepted.append(data)

    # Second pass: persist.
    for data in accepted:
        c, _ = LLMProviderConfig.objects.get_or_create(
            id=data.get("id"),
            defaults={
                "name": data.get("name"),
                "provider_type": data.get("type", "")
            }
        )

        api_key = data.get("api_key", c.api_key)
        if isinstance(api_key, str):
            api_key = api_key.strip()

        endpoint = data.get("endpoint", c.endpoint)
        if isinstance(endpoint, str):
            endpoint = endpoint.strip()

        c.api_key = api_key
        c.endpoint = endpoint
        c.model = data.get("model", c.model)
        c.is_active = data.get("active", c.is_active)
        c.save()
    return Response({"status": "ok"})


@api_view(["POST"])
@permission_classes([IsAdminRole])
def settings_llm_test(request):
    """Send a test prompt to the given provider and return the model's reply.

    Any provider error is returned as a 400 with the error text.
    """
    provider_type = request.data.get("type", "")
    api_key = request.data.get("api_key", "")
    endpoint = request.data.get("endpoint", "")
    model = request.data.get("model", "")
    if not model and provider_type == 'talentai':
        model = "qwen3-vl-4b-instruct.Q4_K_M.gguf"
    prompt = request.data.get("prompt", "Hola, ¿cómo estás?")

    messages = [{"role": "user", "content": prompt}]

    import litellm
    try:
        if provider_type == 'ollama':
            # For Ollama the server URL is carried in the api_key field.
            ep = api_key if api_key else "http://127.0.0.1:11434"
            response = litellm.completion(
                model=f"ollama/{model}",
                messages=messages,
                api_base=ep
            )
        elif provider_type == 'talentai':
            ep = endpoint.strip().rstrip("/") if endpoint else ""
            if not ep:
                return Response({"status": "error", "message": "El endpoint del Space de TalentAI es requerido."}, status=400)
            response = litellm.completion(
                model=f"openai/{model}",
                messages=messages,
                api_base=ep,
                api_key=api_key if api_key else "no-key-needed",
                custom_llm_provider="openai"
            )
        elif provider_type == 'openai':
            response = litellm.completion(
                model=model,
                messages=messages,
                api_key=api_key
            )
        elif provider_type == 'anthropic':
            response = litellm.completion(
                model=model,
                messages=messages,
                api_key=api_key
            )
        elif provider_type == 'nvidia':
            import openai
            client = openai.OpenAI(
                base_url="https://integrate.api.nvidia.com/v1",
                api_key=api_key
            )
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=200
            )
            return Response({"response": response.choices[0].message.content})
        else:
            response = litellm.completion(
                model=model,
                messages=messages,
                api_key=api_key,
                api_base=endpoint if endpoint else None
            )

        return Response({"response": response.choices[0].message.content})
    except Exception as e:
        return Response({"status": "error", "message": str(e)}, status=400)


from .models import TranscriptionProviderConfig
import threading
import logging

logger = logging.getLogger(__name__)


def download_model_in_background(model_id):
    """Download a Hugging Face model snapshot, logging and printing progress.

    Intended as a thread target: every exception is caught and reported
    instead of propagating.
    """
    try:
        from huggingface_hub import snapshot_download
        logger.warning("===== INICIANDO DESCARGA EN SEGUNDO PLANO DE: %s =====", model_id)
        print(f"===== INICIANDO DESCARGA EN SEGUNDO PLANO DE: {model_id} =====", flush=True)
        snapshot_download(repo_id=model_id)
        logger.warning("===== DESCARGA EXITOSA: %s =====", model_id)
        print(f"===== DESCARGA EXITOSA: {model_id} =====", flush=True)
    except Exception as e:
        logger.error("===== ERROR DESCARGANDO %s: %s =====", model_id, e)
        print(f"===== ERROR DESCARGANDO {model_id}: {e} =====", flush=True)


@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_transcription(request):
    """List (GET) or save (POST) transcription provider configurations.

    Saving an active ``nemotron`` or ``whisper_local`` provider starts a
    background download of its model.
    """
    if request.method == "GET":
        configs = TranscriptionProviderConfig.objects.all()
        return Response([{
            "id": c.id,
            "name": c.name,
            "type": c.provider_type,
            "active": c.is_active,
            "model": c.model,
            "role": c.role
        } for c in configs])

    for data in request.data.get("configs", []):
        c, _ = TranscriptionProviderConfig.objects.get_or_create(id=data.get("id"), defaults={"name": data.get("name")})
        c.provider_type = data.get("type", c.provider_type)
        c.api_key = data.get("api_key", c.api_key)
        c.endpoint = data.get("endpoint", c.endpoint)
        c.is_active = data.get("active", c.is_active)
        c.model = data.get("model", c.model)
        c.role = data.get("role", c.role)
        c.save()

        if c.is_active and c.provider_type in ["nemotron", "whisper_local"]:
            # Fall back to the provider's default model when none is set.
            model_to_dl = c.model or ("nvidia/nemotron-3.5-asr-streaming-0.6b" if c.provider_type == "nemotron" else "openai/whisper-base")
            if model_to_dl:
                threading.Thread(target=download_model_in_background, args=(model_to_dl,)).start()

    return Response({"status": "ok"})


@api_view(["POST"])
@permission_classes([IsAdminRole])
def test_transcription_benchmark(request):
    """
    Receive a test audio file and a provider type (e.g. nemotron,
    whisper_local) and return the transcription plus the time it took.

    Responds 400 when ``audio`` or ``provider_type`` is missing and 500 when
    the engine reports an error or raises. The temporary file is always
    removed.
    """
    audio_file = request.FILES.get("audio")
    provider_type = request.data.get("provider_type")

    if not audio_file or not provider_type:
        return Response({"detail": "Falta audio o provider_type"}, status=status.HTTP_400_BAD_REQUEST)

    tmp_path = None
    try:
        # Write the upload inside the try block so a failed write cannot leak
        # the temp file (it is created with delete=False).
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
            for chunk in audio_file.chunks():
                tmp.write(chunk)

        from .transcription_engine import transcribe_audio_file
        text, time_ms, error = transcribe_audio_file(tmp_path, provider_type=provider_type)

        if error:
            return Response({"detail": f"Error del motor: {error}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        return Response({
            "text": text,
            "time_ms": time_ms
        })
    except Exception as e:
        log.exception("Error benchmark %s", provider_type)
        return Response({"detail": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)


@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_categories(request):
    """List (GET) or replace (POST) the evaluation categories.

    POST recreates every category from the payload. The payload is validated
    first and the delete-and-recreate runs in one transaction, so a malformed
    entry returns 400 and leaves the existing categories untouched.
    """
    if request.method == "GET":
        cats = EvaluationCategory.objects.all().order_by("order")
        return Response([{"id": c.id, "name": c.name, "weight": c.weight, "color": c.color, "active": c.active} for c in cats])

    from django.db import transaction

    # Build all rows before touching the database.
    try:
        new_categories = [
            EvaluationCategory(
                name=data["name"],
                weight=float(data.get("weight", 1.0)),
                color=data.get("color", "#0F766E"),
                active=data.get("active", True),
                order=idx
            )
            for idx, data in enumerate(request.data.get("categories", []))
        ]
    except (AttributeError, KeyError, TypeError, ValueError) as exc:
        return Response({"status": "error", "message": f"Categoría inválida: {exc}"}, status=400)

    # Recreate everything for simplicity.
    # NOTE(review): if EvaluationResult.category cascades on delete, this also
    # removes stored evaluation results — confirm against the model.
    with transaction.atomic():
        EvaluationCategory.objects.all().delete()
        for category in new_categories:
            category.save()
    return Response({"status": "ok"})


from .models import SystemPromptConfig


@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def settings_prompt(request):
    """Read (GET) or update (POST) the single system prompt (row id 1)."""
    config, _ = SystemPromptConfig.objects.get_or_create(id=1)
    if request.method == "GET":
        return Response({"prompt_text": config.prompt_text})

    config.prompt_text = request.data.get("prompt_text", config.prompt_text)
    config.save()
    return Response({"status": "ok", "prompt_text": config.prompt_text})


from .models import Organization, Group


@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def organizations_api(request):
    """List organizations with their counts (GET) or create one (POST).

    POST creates ``groups`` groups named "Grupo 1..N"; it responds 400 when
    ``groups`` is not an integer, and a negative value is treated as 0.
    """
    if request.method == "GET":
        # Two grouped queries replace three count queries per organization.
        # order_by() clears any default ordering so it cannot affect grouping.
        applicant_counts = {
            row['organization']: row
            for row in Applicant.objects.order_by().values('organization').annotate(
                total=Count('id'),
                approved=Count('id', filter=Q(status='approved')),
            )
        }
        group_counts = {
            row['organization']: row['total']
            for row in Group.objects.order_by().values('organization').annotate(total=Count('id'))
        }

        data = []
        for o in Organization.objects.all():
            counts = applicant_counts.get(o.id)
            data.append({
                "id": str(o.id),
                "type": o.type,
                "career": o.career,
                "groups": group_counts.get(o.id, 0),
                "candidates": counts['total'] if counts else 0,
                "approved": counts['approved'] if counts else 0,
                "gradientIndex": o.gradient_index
            })
        return Response(data)

    data = request.data
    try:
        groups_count = max(0, int(data.get("groups", 0) or 0))
    except (TypeError, ValueError):
        return Response({"error": "El número de grupos debe ser un entero."}, status=400)

    org = Organization.objects.create(
        type=data.get("type", "Ingeniería"),
        career=data.get("career", ""),
        gradient_index=data.get("gradientIndex", 0)
    )
    for i in range(groups_count):
        Group.objects.create(name=f"Grupo {i+1}", organization=org)

    return Response({
        "id": str(org.id),
        "type": org.type,
        "career": org.career,
        "groups": groups_count,
        "candidates": 0,
        "approved": 0,
        "gradientIndex": org.gradient_index
    })


@api_view(["GET", "POST"])
@permission_classes([IsAdminRole])
def applicants_api(request):
    """List applicants (GET) or register a new one with a login user (POST).

    In the GET listing, an applicant still in ``registered`` status shows a
    status derived from their latest interview session. POST requires
    ``email``; the User and Applicant are created in one transaction so a
    failure cannot leave a User without its Applicant.
    """
    if request.method == "GET":
        # select_related avoids one extra query per row for a.organization.
        apps = Applicant.objects.all().select_related('organization').order_by('-id')
        data = []
        for a in apps:
            status_display = a.get_status_display()
            # While still "registered", check whether an interview session exists.
            if a.status == 'registered':
                latest_session = InterviewSession.objects.filter(applicant_id=a.id).order_by('-started_at').first()
                if latest_session:
                    if latest_session.status == 'evaluated':
                        if latest_session.overall_score is not None and latest_session.overall_score >= 70:
                            status_display = 'Aprobado'
                        else:
                            status_display = 'Reprobado'
                    elif latest_session.status == 'error':
                        status_display = 'Error en Evaluación'
                    else:
                        status_display = 'En Proceso'

            data.append({
                "id": str(a.id),
                "name": f"{a.first_name} {a.last_name}",
                "email": a.email,
                "status": status_display,
                "modality": a.modality,
                "career": a.organization.career if a.organization else "N/A",
                "photo": f"https://ui-avatars.com/api/?name={a.first_name}+{a.last_name}&background=random"
            })
        return Response(data)

    from django.db import transaction

    data = request.data
    # Treat a null name/email like a missing one instead of raising.
    names = str(data.get("name") or "").split(" ", 1)
    first_name = names[0]
    last_name = names[1] if len(names) > 1 else ""

    org = Organization.objects.filter(career=data.get("carrera")).first()
    email = str(data.get("email") or "").strip()

    if not email:
        return Response({"error": "El correo electrónico es un campo obligatorio."}, status=400)

    try:
        from auth_api.models import User
        with transaction.atomic():
            if not User.objects.filter(username=email).exists():
                # NOTE(review): shared default password for every applicant
                # created without one; consider generating it.
                password = data.get("password", "Entrevista2026")
                User.objects.create_user(
                    username=email,
                    email=email,
                    password=password,
                    role='user',
                    first_name=first_name,
                    last_name=last_name
                )

            a = Applicant.objects.create(
                first_name=first_name,
                last_name=last_name,
                email=email,
                organization=org,
                modality="Virtual",
                status="registered"
            )
    except Exception as e:
        return Response({"error": "Ese correo ya está registrado u ocurrió un error: " + str(e)}, status=400)

    return Response({
        "id": str(a.id),
        "name": f"{a.first_name} {a.last_name}",
        "email": a.email,
        "status": a.get_status_display(),
        "modality": a.modality,
        "career": a.organization.career if a.organization else "N/A",
        "photo": f"https://ui-avatars.com/api/?name={a.first_name}+{a.last_name}&background=random"
    })
@api_view(["GET", "PUT", "DELETE"])
@permission_classes([IsAdminRole])
def applicant_detail_api(request, pk):
    """Retrieve (GET), update (PUT) or delete (DELETE) one applicant.

    Responds 404 when the applicant does not exist. PUT responds 400 when the
    submitted email is blank or already used by another user. The applicant
    and its login User (matched by username == email) are updated or deleted
    together in one transaction.
    """
    try:
        app = Applicant.objects.get(pk=pk)
    except Applicant.DoesNotExist:
        return Response({"error": "Aspirante no encontrado"}, status=404)

    if request.method == "GET":
        return Response({
            "id": app.id,
            "name": f"{app.first_name} {app.last_name}",
            "email": app.email,
            "status": app.get_status_display(),
            "date": app.created_at.strftime("%d %b %Y"),
            "career": app.organization.career if app.organization else "Sin Carrera",
        })

    from django.db import transaction
    from auth_api.models import User

    if request.method == "DELETE":
        email = app.email
        # Delete both rows or neither.
        with transaction.atomic():
            app.delete()
            User.objects.filter(username=email).delete()
        return Response({"success": True})

    if request.method == "PUT":
        data = request.data
        if "name" in data:
            # A null name is treated as empty instead of raising on .split().
            names = str(data["name"] or "").split(" ", 1)
            app.first_name = names[0]
            app.last_name = names[1] if len(names) > 1 else ""

        old_email = app.email
        raw_email = data.get("email")
        # A missing or null email keeps the current one.
        new_email = old_email if raw_email is None else str(raw_email).strip()
        if raw_email is not None and not new_email:
            # A blank email would be written to both Applicant.email and User.username.
            return Response({"error": "El correo electrónico es un campo obligatorio."}, status=400)

        if new_email != old_email and User.objects.filter(username=new_email).exists():
            return Response({"error": "Ese correo ya está en uso por otro usuario."}, status=400)

        if "carrera" in data:
            org = Organization.objects.filter(career=data["carrera"]).first()
            if org:
                app.organization = org

        app.email = new_email
        # Keep the applicant and its login user in sync: both saves succeed or neither.
        with transaction.atomic():
            app.save()

            user = User.objects.filter(username=old_email).first()
            if user:
                user.username = new_email
                user.email = new_email
                user.first_name = app.first_name
                user.last_name = app.last_name
                if data.get("password"):
                    user.set_password(data["password"])
                user.save()

        return Response({
            "id": str(app.id),
            "name": f"{app.first_name} {app.last_name}",
            "email": app.email,
            "status": app.get_status_display(),
            "modality": app.modality,
            "career": app.organization.career if app.organization else "N/A",
            "photo": f"https://ui-avatars.com/api/?name={app.first_name}+{app.last_name}&background=random"
        })