QuentinL52's picture
Upload 39 files
a8ee0db verified
raw
history blame
6.55 kB
import tempfile
from fastapi import FastAPI, UploadFile, File, HTTPException, Body
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel, Field
from typing import List, Dict, Any
from datetime import datetime
from pymongo import MongoClient
from bson.objectid import ObjectId
import uvicorn
import os
import logging
from celery.result import AsyncResult
from tasks.worker_celery import run_interview_analysis_task
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from src.cv_parsing_agents import CvParserAgent
from src.interview_simulator.entretient_version_prod import InterviewProcessor
from src.scoring_engine import ContextualScoringEngine
from src.rag_handler import RAGHandler
app = FastAPI(
title="API d'IA pour la RH",
description="Une API pour le parsing de CV et la simulation d'entretiens.",
version="1.2.0"
)
# Initialisation des services au démarrage
try:
logger.info("Initialisation du RAG Handler...")
rag_handler = RAGHandler()
if rag_handler.vector_store:
logger.info(f"Vector store chargé avec {rag_handler.vector_store.index.ntotal} vecteurs.")
else:
logger.warning("Le RAG Handler n'a pas pu être initialisé (pas de documents ?). Le feedback contextuel sera désactivé.")
except Exception as e:
logger.error(f"Erreur critique lors de l'initialisation du RAG Handler: {e}", exc_info=True)
rag_handler = None
# Configuration MongoDB
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
client = MongoClient(MONGO_URI)
db = client.hr_ai_system
feedback_collection = db.interview_feedbacks
class InterviewRequest(BaseModel):
user_id: str = Field(..., example="google_user_12345")
job_offer_id: str = Field(..., example="job_offer_abcde")
cv_document: Dict[str, Any] = Field(..., example={"candidat": {"nom": "John Doe", "compétences": {"hard_skills": ["Python", "FastAPI"]}}})
job_offer: Dict[str, Any] = Field(..., example={"poste": "Développeur Python", "description": "Recherche développeur expérimenté..."})
messages: List[Dict[str, Any]]
conversation_history: List[Dict[str, Any]]
class HealthCheck(BaseModel):
status: str = Field(default="ok", example="ok")
@app.get("/", tags=["Status"], summary="Vérification de l'état de l'API")
def read_root() -> HealthCheck:
"""Vérifie que l'API est en cours d'exécution."""
return HealthCheck(status="ok")
# --- Endpoint du parser de CV ---
@app.post("/parse-cv/", tags=["CV Parsing"], summary="Analyser un CV au format PDF avec scoring contextuel")
async def parse_cv_endpoint(file: UploadFile = File(...)):
if file.content_type != "application/pdf":
raise HTTPException(status_code=400, detail="Le fichier doit être au format PDF.")
tmp_path = None
try:
contents = await file.read()
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
tmp.write(contents)
tmp.flush()
tmp_path = tmp.name
logger.info(f"Début du parsing du CV temporaire : {tmp_path}")
cv_agent = CvParserAgent(pdf_path=tmp_path)
parsed_data = await run_in_threadpool(cv_agent.process)
if not parsed_data:
raise HTTPException(status_code=500, detail="Échec du parsing du CV.")
logger.info("Parsing du CV réussi. Lancement du scoring contextuel.")
scoring_engine = ContextualScoringEngine(parsed_data)
scored_skills_data = await run_in_threadpool(scoring_engine.calculate_scores)
if parsed_data.get("candidat"):
parsed_data["candidat"].update(scored_skills_data)
else:
parsed_data.update(scored_skills_data)
logger.info("Scoring terminé. Retour de la réponse complète.")
return parsed_data
except Exception as e:
logger.error(f"Erreur lors du parsing ou du scoring du CV : {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Erreur interne du serveur : {e}")
finally:
if tmp_path and os.path.exists(tmp_path):
try:
os.remove(tmp_path)
logger.info(f"Fichier temporaire supprimé : {tmp_path}")
except Exception as cleanup_error:
logger.warning(f"Erreur lors de la suppression du fichier temporaire : {cleanup_error}")
# --- Endpoint de simulation d'entretien ---
@app.post("/simulate-interview/", tags=["Simulation d'Entretien"], summary="Gérer une conversation d'entretien")
async def simulate_interview_endpoint(request: InterviewRequest):
try:
processor = InterviewProcessor(
cv_document=request.cv_document,
job_offer=request.job_offer,
conversation_history=request.conversation_history
)
ai_response_object = await run_in_threadpool(processor.run, messages=request.messages)
# On retourne juste la réponse de l'assistant pour le chat
return {"response": ai_response_object["messages"][-1].content}
except Exception as e:
logger.error(f"Erreur interne dans /simulate-interview/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Erreur interne du serveur : {e}")
# --- Endpoints pour l'analyse asynchrone ---
class AnalysisRequest(BaseModel):
conversation_history: List[Dict[str, Any]]
job_description_text: str
@app.post("/trigger-analysis/", tags=["Analyse Asynchrone"], status_code=202)
def trigger_analysis(request: AnalysisRequest):
"""
Déclenche l'analyse de l'entretien en tâche de fond.
Retourne immédiatement un ID de tâche.
"""
task = run_interview_analysis_task.delay(
request.conversation_history,
[request.job_description_text]
)
return {"task_id": task.id}
@app.get("/analysis-status/{task_id}", tags=["Analyse Asynchrone"])
def get_analysis_status(task_id: str):
"""
Vérifie le statut de la tâche d'analyse.
Si terminée, retourne le résultat.
"""
task_result = AsyncResult(task_id)
if task_result.ready():
if task_result.successful():
return {
"status": "SUCCESS",
"result": task_result.get()
}
else:
return {"status": "FAILURE", "error": str(task_result.info)}
else:
return {"status": "PENDING"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
## la bonne version de l'API est celle-ci, avec les imports et la structure de base.