"""FastAPI service exposing CV parsing, interview simulation and feedback retrieval.

Heavy project components (analysis models, CV parser, interview simulator,
scoring engine) are imported defensively at start-up: a failed import only
disables the matching feature flag instead of preventing the API from starting.
Post-interview analysis runs as a background task and its result is exchanged
through one JSON file per user under ``FEEDBACK_DIR``.
"""
import tempfile
import requests
import os
import logging
import json
from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional

# Directory holding one "<user_id>.json" feedback file per user.
FEEDBACK_DIR = '/tmp/feedbacks'

# NOTE(review): HOME is redirected to /tmp before the project imports below,
# presumably so libraries that write under $HOME get a writable location --
# confirm against the deployment environment.
os.environ['HOME'] = '/tmp'
os.makedirs(FEEDBACK_DIR, exist_ok=True)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Optional components -----------------------------------------------------
# Each group is loaded in its own try/except so that one missing dependency
# only switches off the corresponding *_AVAILABLE flag.
try:
    from src.deep_learning_analyzer import MultiModelInterviewAnalyzer
    from src.rag_handler import get_rag_handler
    from src.crew.crew_pool import run_interview_analysis
    analyzer_model = MultiModelInterviewAnalyzer()
    rag_handler_instance = get_rag_handler()
    MODELS_AVAILABLE = True
    logger.info("✅ Modèles d'analyse et RAG pré-chargés avec succès")
except Exception as e:
    logger.error(f"❌ Erreur lors du pré-chargement des modèles: {e}")
    MODELS_AVAILABLE = False
    analyzer_model = None
    rag_handler_instance = None
    run_interview_analysis = None

try:
    from src.cv_parsing_agents import CvParserAgent, create_fallback_cv_data
    CV_PARSING_AVAILABLE = True
    logger.info("✅ CV Parsing disponible")
except Exception as e:
    logger.error(f"❌ CV Parsing indisponible: {e}")
    CV_PARSING_AVAILABLE = False
    CvParserAgent = None
    create_fallback_cv_data = None

try:
    from src.interview_simulator.entretient_version_prod import InterviewProcessor
    INTERVIEW_AVAILABLE = True
    logger.info("✅ Interview Simulator disponible")
except Exception as e:
    logger.error(f"❌ Interview Simulator indisponible: {e}")
    INTERVIEW_AVAILABLE = False
    InterviewProcessor = None

try:
    from src.scoring_engine import ContextualScoringEngine
    SCORING_AVAILABLE = True
    logger.info("✅ Scoring Engine disponible")
except Exception as e:
    logger.error(f"❌ Scoring Engine indisponible: {e}")
    SCORING_AVAILABLE = False
    ContextualScoringEngine = None

# --- Application -------------------------------------------------------------
app = FastAPI(
    title="AIrh Interview Assistant",
    description="API pour l'analyse de CV et la simulation d'entretiens d'embauche avec analyse asynchrone.",
    version="2.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm this is intended for the production deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class InterviewRequest(BaseModel):
    """Request body of ``/simulate-interview/``."""

    user_id: str = Field(..., example="user_12345")
    job_offer_id: str = Field(..., example="job_offer_abcde")
    cv_document: Dict[str, Any]
    job_offer: Dict[str, Any]
    # New messages for this turn; passed to InterviewProcessor.run().
    messages: List[Dict[str, Any]]
    # Earlier turns; passed to the InterviewProcessor constructor.
    conversation_history: List[Dict[str, Any]]


class Feedback(BaseModel):
    """Response body of ``/get-feedback/{user_id}``."""

    # Values written by this module: "processing", "completed" or "error".
    status: str
    feedback_data: Optional[Dict[str, Any]] = None


class HealthCheck(BaseModel):
    """Response body of the health-check endpoint."""

    status: str = "ok"
    services: Dict[str, bool] = Field(default_factory=dict)
    message: str = "API AIrh fonctionnelle"


def _feedback_path(user_id: str) -> str:
    """Return the feedback file path for *user_id*.

    Raises:
        ValueError: if *user_id* is empty or contains a path separator or NUL
            byte, which would let a client-supplied id escape ``FEEDBACK_DIR``.
    """
    if not user_id or any(ch in user_id for ch in ("/", "\\", "\x00")):
        raise ValueError(f"Invalid user_id: {user_id!r}")
    return os.path.join(FEEDBACK_DIR, f"{user_id}.json")


def _write_feedback(user_id: str, payload: Dict[str, Any]) -> None:
    """Serialise *payload* as JSON into the feedback file of *user_id*.

    The data is written to a sibling temporary file and then moved into place
    with ``os.replace`` so that a concurrent ``get_feedback`` poll never reads
    a half-written file.
    """
    final_path = _feedback_path(user_id)
    partial_path = f"{final_path}.tmp"
    with open(partial_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=4)
    os.replace(partial_path, final_path)


def _normalize_feedback(data: Dict[str, Any]) -> Dict[str, Any]:
    """Make a stored feedback payload compatible with the ``Feedback`` model.

    ``Feedback.feedback_data`` must be a dict or ``None``. Files may hold
    another JSON type there (a plain error string, or a non-dict report), so
    any such value is wrapped in a single-key dict instead of failing
    validation.
    """
    stored = data.get("feedback_data")
    if stored is None or isinstance(stored, dict):
        return data
    key = "error" if data.get("status") == "error" else "result"
    return {**data, "feedback_data": {key: stored}}


def analysis_in_background(user_id: str, conversation_history: list, job_description_text: str):
    """Analyse a finished interview and persist the outcome for *user_id*.

    Intended to run as a background task. On success the feedback file
    receives ``{"status": "completed", "feedback_data": <report>}``; on any
    failure it receives ``{"status": "error", "feedback_data": {"error": ...}}``.

    Args:
        user_id: Identifier used to name the feedback file.
        conversation_history: Full list of interview messages to analyse.
        job_description_text: Job description forwarded to the analysis.
    """
    logger.info(f"Démarrage de l'analyse en arrière-plan pour l'utilisateur: {user_id}")
    try:
        if not MODELS_AVAILABLE:
            raise RuntimeError("Les modèles d'analyse ne sont pas disponibles.")

        report = run_interview_analysis(
            conversation_history,
            job_description_text,
            analyzer_model,
            rag_handler_instance
        )
        _write_feedback(user_id, {"status": "completed", "feedback_data": report})
        logger.info(f"✅ Analyse terminée et sauvegardée pour l'utilisateur: {user_id}")
    except Exception as e:
        logger.error(f"❌ Erreur durant l'analyse en arrière-plan pour {user_id}: {e}")
        # Store the message inside a dict: the Feedback model rejects a bare
        # string, which would turn every failed analysis into a 500 on read.
        _write_feedback(user_id, {"status": "error", "feedback_data": {"error": str(e)}})


# NOTE: FastAPI publishes route-handler docstrings as OpenAPI descriptions, so
# they are kept in their original language; explanations are in # comments.

# Reports which optional components were loaded successfully at start-up.
@app.get("/", response_model=HealthCheck, tags=["Status"])
async def health_check():
    """Health check de l'API."""
    services = {
        "models_loaded": MODELS_AVAILABLE,
        "cv_parsing": CV_PARSING_AVAILABLE,
        "interview_simulation": INTERVIEW_AVAILABLE,
        "scoring_engine": SCORING_AVAILABLE
    }
    return HealthCheck(services=services)


# Stores the uploaded PDF in a temporary file, runs the CV parser on it in a
# worker thread, optionally enriches the "candidat" entry with scores, and
# always removes the temporary file afterwards.
@app.post("/parse-cv/", tags=["CV Parsing"])
async def parse_cv(file: UploadFile = File(...)):
    """Analyse un CV PDF et extrait les informations structurées."""
    if not CV_PARSING_AVAILABLE:
        # NOTE(review): create_fallback_cv_data is called without arguments
        # here but with a path further down -- confirm its signature. As the
        # loader above sets it to None whenever CV parsing is unavailable,
        # this branch returns the error dict.
        return create_fallback_cv_data() if create_fallback_cv_data else {"error": "Service de parsing indisponible"}

    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Fichier PDF requis")

    tmp_path = None
    try:
        contents = await file.read()
        # delete=False: the parser re-opens the file by path, so it must
        # outlive this `with`; the `finally` clause removes it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
            tmp.write(contents)
            tmp_path = tmp.name

        cv_agent = CvParserAgent(pdf_path=tmp_path)
        parsed_data = await run_in_threadpool(cv_agent.process)

        if not parsed_data and create_fallback_cv_data:
            parsed_data = create_fallback_cv_data(tmp_path)

        if SCORING_AVAILABLE and ContextualScoringEngine and parsed_data:
            # Scoring is best-effort: a failure is logged and the unscored
            # parsing result is still returned.
            try:
                scoring_engine = ContextualScoringEngine(parsed_data)
                scored_data = await run_in_threadpool(scoring_engine.calculate_scores)
                if parsed_data.get("candidat"):
                    parsed_data["candidat"].update(scored_data)
            except Exception as e:
                logger.warning(f"Scoring échoué: {e}")

        return parsed_data
    except Exception as e:
        logger.error(f"Erreur parsing CV: {e}")
        if create_fallback_cv_data:
            return create_fallback_cv_data(tmp_path)
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)


# Runs one interview turn in a worker thread. When the reply contains the
# closing sentence, a "processing" status is stored and the analysis is
# scheduled as a background task.
@app.post("/simulate-interview/", tags=["Interview"])
async def simulate_interview(request: InterviewRequest, background_tasks: BackgroundTasks):
    """
    Gère une conversation d'entretien.
    Si la conversation se termine, lance une analyse en arrière-plan.
    """
    if not INTERVIEW_AVAILABLE or not MODELS_AVAILABLE:
        raise HTTPException(status_code=503, detail="Service de simulation ou modèles indisponibles")

    # user_id comes from the request body and names a file on disk: reject
    # ids that could escape the feedback directory before doing any work.
    try:
        _feedback_path(request.user_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Identifiant utilisateur invalide.") from None

    try:
        processor = InterviewProcessor(
            cv_document=request.cv_document,
            job_offer=request.job_offer,
            conversation_history=request.conversation_history
        )
        result = await run_in_threadpool(processor.run, messages=request.messages)
        response_content = result["messages"][-1].content

        # Trigger the analysis once the interview is over.
        if "nous allons maintenant passer a l'analyse" in response_content.lower():
            logger.info(f"Fin d'entretien détectée pour {request.user_id}. Lancement de l'analyse en arrière-plan.")

            # Save an initial status so that polling clients see "processing".
            _write_feedback(request.user_id, {"status": "processing"})

            job_description = request.job_offer.get('description', '')
            background_tasks.add_task(
                analysis_in_background,
                request.user_id,
                request.conversation_history + request.messages,
                job_description
            )

        return {"response": response_content}
    except Exception as e:
        logger.error(f"Erreur simulation entretien: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


# Returns the stored feedback file of the user: 400 for an unusable id,
# 404 when no file exists yet, 500 when the file cannot be read or validated.
@app.get("/get-feedback/{user_id}", response_model=Feedback, tags=["Analysis"])
async def get_feedback(user_id: str):
    """Récupère le résultat de l'analyse post-entretien."""
    try:
        feedback_path = _feedback_path(user_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Identifiant utilisateur invalide.") from None

    try:
        with open(feedback_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return Feedback(**_normalize_feedback(data))
    except FileNotFoundError:
        # Opening directly instead of checking os.path.exists() first avoids a
        # race with the file appearing or disappearing between the two calls.
        raise HTTPException(status_code=404, detail="Feedback non trouvé ou non encore traité.") from None
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur à la lecture du feedback: {e}") from e


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)