# Web-viewer residue captured with the file (not part of the program):
# QuentinL52's picture | Update main.py | f00b750 verified | raw | history blame | 9.24 kB
import tempfile
import requests
import os
import logging
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
# Redirect HOME to /tmp for this process.
# NOTE(review): presumably needed because the default home directory is not
# writable in the deployment container - confirm.
os.environ.update(HOME="/tmp")

# Logging setup: root logger at INFO, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Optional feature imports.
# Each feature module is imported defensively: if the import fails for any
# reason, the failure is logged, the matching *_AVAILABLE flag is set to False
# and the imported names are bound to None so later references do not raise
# NameError.

# --- CV parsing ---
try:
    from src.cv_parsing_agents import CvParserAgent, create_fallback_cv_data
except Exception as e:
    CV_PARSING_AVAILABLE = False
    CvParserAgent = None
    create_fallback_cv_data = None
    logger.error(f"❌ CV Parsing indisponible: {e}")
else:
    CV_PARSING_AVAILABLE = True
    logger.info("✅ CV Parsing disponible")

# --- Interview simulator ---
try:
    from src.interview_simulator.entretient_version_prod import InterviewProcessor
except Exception as e:
    INTERVIEW_AVAILABLE = False
    InterviewProcessor = None
    logger.error(f"❌ Interview Simulator indisponible: {e}")
else:
    INTERVIEW_AVAILABLE = True
    logger.info("✅ Interview Simulator disponible")

# --- Scoring engine ---
try:
    from src.scoring_engine import ContextualScoringEngine
except Exception as e:
    SCORING_AVAILABLE = False
    ContextualScoringEngine = None
    logger.error(f"❌ Scoring Engine indisponible: {e}")
else:
    SCORING_AVAILABLE = True
    logger.info("✅ Scoring Engine disponible")
# FastAPI application object; interactive docs are served at /docs and /redoc.
app = FastAPI(
    title="AIrh Interview Assistant",
    description="API pour l'analyse de CV et la simulation d'entretiens d'embauche",
    version="1.3.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS configuration (the original comment says this targets HF Spaces).
# NOTE(review): every origin, method and header is allowed *and* credentials
# are enabled. Confirm this is intentional before exposing the API publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Base URL of the Celery-backed API; the CELERY_API_URL environment variable
# overrides the default below.
CELERY_API_URL = os.getenv("CELERY_API_URL", "https://celery-7as1.onrender.com")
# Modèles Pydantic
# Request body of POST /simulate-interview/.
# (Documented with comments rather than a docstring so the generated schema
# description is left unchanged.)
class InterviewRequest(BaseModel):
    # Identifiers; the handler in this file does not read them.
    user_id: str = Field(..., example="user_12345")
    job_offer_id: str = Field(..., example="job_offer_abcde")

    # Forwarded as-is to the InterviewProcessor constructor.
    cv_document: Dict[str, Any]
    job_offer: Dict[str, Any]

    # Passed to InterviewProcessor.run(messages=...).
    messages: List[Dict[str, Any]]

    # Forwarded as-is to the InterviewProcessor constructor.
    conversation_history: List[Dict[str, Any]]
# Request body of POST /trigger-analysis/; the handler serialises it and posts
# it unchanged to the Celery API.
class AnalysisRequest(BaseModel):
    conversation_history: List[Dict[str, Any]]
    job_description_text: str

    # Optional; omitted values are sent as None.
    candidate_id: Optional[str] = None
# Response model shared by /trigger-analysis/ and /analysis-status/{task_id}.
class TaskResponse(BaseModel):
    task_id: str
    status: str

    # Filled from the upstream "result" field by the status endpoint;
    # left as None when an analysis is triggered.
    result: Any = None

    # Human-readable note (start confirmation or upstream progress text).
    message: Optional[str] = None
# Response model of GET / (health check).
class HealthCheck(BaseModel):
    status: str = "ok"

    # Set by the handler to "connected", "error" or "disconnected".
    celery_api_status: Optional[str] = None

    # Feature name -> availability flag; a fresh dict per instance.
    services: Dict[str, bool] = Field(default_factory=dict)

    message: str = "API AIrh fonctionnelle"
# Endpoints
@app.get("/", response_model=HealthCheck, tags=["Status"])
async def health_check():
    """Health check de l'API avec test de connectivité Celery."""
    # The docstring above is kept verbatim: FastAPI exposes it as the
    # endpoint description in the generated OpenAPI docs.
    #
    # Purpose: report which optional features were imported successfully and
    # whether the Celery API root answers with HTTP 200.
    # Returns: a HealthCheck whose `celery_api_status` is "connected"
    # (HTTP 200), "error" (any other status code) or "disconnected" (the
    # request itself failed). This endpoint never raises because of Celery.

    # `requests` is synchronous; run it in the thread pool so a slow or
    # unreachable Celery API cannot stall the event loop for up to 5 seconds.
    try:
        response = await run_in_threadpool(
            requests.get, f"{CELERY_API_URL}/", timeout=5
        )
    except Exception as exc:
        # Deliberately broad: a health check must degrade, not fail.
        logger.warning(f"Celery API injoignable: {exc}")
        celery_status = "disconnected"
    else:
        celery_status = "connected" if response.status_code == 200 else "error"

    services = {
        "cv_parsing": CV_PARSING_AVAILABLE,
        "interview_simulation": INTERVIEW_AVAILABLE,
        "scoring_engine": SCORING_AVAILABLE,
        "celery_api": celery_status == "connected",
    }
    return HealthCheck(celery_api_status=celery_status, services=services)
@app.post("/parse-cv/", tags=["CV Parsing"])
async def parse_cv(file: UploadFile = File(...)):
    """Analyse un CV PDF et extrait les informations structurées."""
    # The docstring above is kept verbatim: FastAPI exposes it as the
    # endpoint description in the generated OpenAPI docs.
    #
    # Purpose: store the uploaded PDF in a temporary file, run CvParserAgent
    # on it and, when the scoring engine is available, merge its scores into
    # the "candidat" section of the parsed data.
    # Parameters:
    #   file - uploaded document; its content type must be "application/pdf".
    # Returns: the parsed (and possibly scored) CV data, or fallback data
    #   when parsing is unavailable, yields nothing, or fails.
    # Raises:
    #   HTTPException(400) - the upload is not declared as a PDF.
    #   HTTPException(500) - parsing failed and no fallback builder exists.

    if not CV_PARSING_AVAILABLE:
        # Fallback when the parsing module could not be imported.
        return create_fallback_cv_data() if create_fallback_cv_data else {
            "error": "Service de parsing de CV temporairement indisponible",
            "candidat": {
                "informations_personnelles": {"nom": "Test User"},
                "compétences": {"hard_skills": [], "soft_skills": []},
            },
        }

    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Fichier PDF requis")

    tmp_path = None
    try:
        # Persist the upload so the parser can open it by path.
        contents = await file.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
            # Record the path *before* writing: the file already exists on
            # disk (delete=False), so `finally` must be able to remove it
            # even if the write fails.
            tmp_path = tmp.name
            tmp.write(contents)

        # Parse the CV off the event loop.
        cv_agent = CvParserAgent(pdf_path=tmp_path)
        parsed_data = await run_in_threadpool(cv_agent.process)
        if not parsed_data and create_fallback_cv_data:
            parsed_data = create_fallback_cv_data(tmp_path)

        # Optional scoring; a scoring failure must not fail the request.
        if SCORING_AVAILABLE and ContextualScoringEngine and parsed_data:
            try:
                scoring_engine = ContextualScoringEngine(parsed_data)
                scored_data = await run_in_threadpool(scoring_engine.calculate_scores)
                if parsed_data.get("candidat"):
                    parsed_data["candidat"].update(scored_data)
            except Exception as e:
                logger.warning(f"Scoring échoué: {e}")

        return parsed_data
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Erreur parsing CV: {e}")
        if create_fallback_cv_data:
            return create_fallback_cv_data(tmp_path)
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError as cleanup_error:
                # Best effort: report the leftover file instead of hiding it.
                logger.warning(
                    f"Suppression du fichier temporaire impossible ({tmp_path}): {cleanup_error}"
                )
@app.post("/simulate-interview/", tags=["Interview"])
async def simulate_interview(request: InterviewRequest):
    """Gère une conversation d'entretien d'embauche."""
    # The docstring above is kept verbatim: FastAPI exposes it as the
    # endpoint description in the generated OpenAPI docs.
    #
    # Purpose: run one turn of the interview simulation and return the
    # content of the last message produced by the processor.
    # Raises:
    #   HTTPException(503) - the interview module could not be imported.
    #   HTTPException(500) - any failure while building or running the
    #                        processor (detail is the exception text).
    if not INTERVIEW_AVAILABLE:
        raise HTTPException(
            status_code=503,
            detail="Service de simulation d'entretien indisponible",
        )

    try:
        interview = InterviewProcessor(
            cv_document=request.cv_document,
            job_offer=request.job_offer,
            conversation_history=request.conversation_history,
        )
        # The processor is synchronous, so it runs in the thread pool.
        outcome = await run_in_threadpool(interview.run, messages=request.messages)
        last_message = outcome["messages"][-1]
        return {"response": last_message.content}
    except Exception as exc:
        logger.error(f"Erreur simulation entretien: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/trigger-analysis/", response_model=TaskResponse, status_code=202, tags=["Analysis"])
async def trigger_analysis(request: AnalysisRequest):
    """Déclenche une analyse asynchrone via l'API Celery."""
    # The docstring above is kept verbatim: FastAPI exposes it as the
    # endpoint description in the generated OpenAPI docs.
    #
    # Purpose: forward the request body to `{CELERY_API_URL}/trigger-analysis`
    # and relay the task id and status it returns.
    # Returns: TaskResponse built from the upstream "task_id" and "status".
    # Raises:
    #   HTTPException(503) - upstream answered with a status other than 202,
    #                        or the HTTP request itself failed.
    #   HTTPException(500) - any other failure (e.g. a missing key in the
    #                        upstream payload).
    try:
        # `requests` is synchronous; run it in the thread pool so the event
        # loop is not blocked for up to 30 seconds.
        response = await run_in_threadpool(
            requests.post,
            f"{CELERY_API_URL}/trigger-analysis",
            json=request.dict(),
            headers={"Content-Type": "application/json"},
            timeout=30,
        )
        if response.status_code != 202:
            raise HTTPException(status_code=503, detail="Service d'analyse indisponible")

        data = response.json()
        return TaskResponse(
            task_id=data["task_id"],
            status=data["status"],
            message="Analyse démarrée",
        )
    except HTTPException:
        # Without this clause the 503 raised above would be caught by the
        # generic handler below and turned into a 500.
        raise
    except requests.RequestException:
        raise HTTPException(status_code=503, detail="API Celery inaccessible")
    except Exception as e:
        logger.exception(f"Erreur déclenchement analyse: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/analysis-status/{task_id}", response_model=TaskResponse, tags=["Analysis"])
async def get_analysis_status(task_id: str):
    """Récupère le statut d'une analyse."""
    # The docstring above is kept verbatim: FastAPI exposes it as the
    # endpoint description in the generated OpenAPI docs.
    #
    # Purpose: query `{CELERY_API_URL}/task-status/<task_id>` and relay the
    # status, result and progress text it returns.
    # Parameters:
    #   task_id - identifier taken from the URL path.
    # Returns: TaskResponse; `message` is the upstream "progress" value, or
    #   "Statut récupéré" when that key is absent.
    # Raises:
    #   HTTPException(503) - upstream answered with a status other than 200,
    #                        or the HTTP request itself failed.
    #   HTTPException(500) - any other failure (e.g. no "status" key).
    from urllib.parse import quote

    # task_id is caller-controlled: percent-encode it so characters such as
    # "?", "#" or "/" cannot alter the upstream URL.
    status_url = f"{CELERY_API_URL}/task-status/{quote(task_id, safe='')}"
    try:
        # `requests` is synchronous; run it in the thread pool so the event
        # loop is not blocked for up to 10 seconds.
        response = await run_in_threadpool(requests.get, status_url, timeout=10)
        if response.status_code != 200:
            raise HTTPException(status_code=503, detail="Service d'analyse indisponible")

        data = response.json()
        return TaskResponse(
            task_id=task_id,
            status=data["status"],
            result=data.get("result"),
            message=data.get("progress", "Statut récupéré"),
        )
    except HTTPException:
        # Without this clause the 503 raised above would be caught by the
        # generic handler below and turned into a 500.
        raise
    except requests.RequestException:
        raise HTTPException(status_code=503, detail="API Celery inaccessible")
    except Exception as e:
        logger.exception(f"Erreur récupération statut analyse: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Endpoint de debug pour HF Spaces
@app.get("/debug", tags=["Debug"])
async def debug_info():
    """Informations de debug pour le déploiement."""
    # The docstring above is kept verbatim: FastAPI exposes it as the
    # endpoint description in the generated OpenAPI docs.
    #
    # Purpose: return a snapshot of selected environment variables, the
    # feature availability flags, and whether two cache directories exist.
    environment = {
        "HF_HOME": os.environ.get("HF_HOME"),
        "CELERY_API_URL": CELERY_API_URL,
        "PYTHONPATH": os.environ.get("PYTHONPATH"),
    }
    services = {
        "cv_parsing": CV_PARSING_AVAILABLE,
        "interview_simulation": INTERVIEW_AVAILABLE,
        "scoring_engine": SCORING_AVAILABLE,
    }
    # Directory path -> True when it exists on disk.
    cache_dirs = {
        directory: os.path.exists(directory)
        for directory in ("/tmp/cache", "/app/cache")
    }
    return {
        "environment": environment,
        "services": services,
        "cache_dirs": cache_dirs,
    }
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    import uvicorn

    bind_host = "0.0.0.0"
    bind_port = 7860
    uvicorn.run(app, host=bind_host, port=bind_port)