"""FastAPI entry point for the AIrh Interview Assistant.

Exposes endpoints for CV parsing, interview simulation and asynchronous
analysis (delegated to a remote Celery-backed HTTP API). The project-local
services are imported defensively so the API can still start, in a degraded
mode, when one of them fails to import.
"""

import logging
import os
import tempfile
from typing import Any, Dict, List, Optional

import requests
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

# Set before the project-local imports below run.
# NOTE(review): presumably needed because the default HOME is not writable in
# the deployment container - confirm.
os.environ['HOME'] = '/tmp'

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Optional project-local imports: any failure is logged, the matching
# *_AVAILABLE flag is cleared and the imported names are set to None so the
# endpoints can test for availability.
try:
    from src.cv_parsing_agents import CvParserAgent, create_fallback_cv_data
    CV_PARSING_AVAILABLE = True
    logger.info("✅ CV Parsing disponible")
except Exception as e:
    logger.error(f"❌ CV Parsing indisponible: {e}")
    CV_PARSING_AVAILABLE = False
    CvParserAgent = None
    create_fallback_cv_data = None

try:
    from src.interview_simulator.entretient_version_prod import InterviewProcessor
    INTERVIEW_AVAILABLE = True
    logger.info("✅ Interview Simulator disponible")
except Exception as e:
    logger.error(f"❌ Interview Simulator indisponible: {e}")
    INTERVIEW_AVAILABLE = False
    InterviewProcessor = None

try:
    from src.scoring_engine import ContextualScoringEngine
    SCORING_AVAILABLE = True
    logger.info("✅ Scoring Engine disponible")
except Exception as e:
    logger.error(f"❌ Scoring Engine indisponible: {e}")
    SCORING_AVAILABLE = False
    ContextualScoringEngine = None

# FastAPI application
app = FastAPI(
    title="AIrh Interview Assistant",
    description="API pour l'analyse de CV et la simulation d'entretiens d'embauche",
    version="1.3.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# CORS configuration for HF Spaces.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive; consider listing the allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Base URL of the Celery HTTP API (overridable through the environment)
CELERY_API_URL = os.getenv("CELERY_API_URL", "https://celery-7as1.onrender.com")


# Pydantic models
# (documented with comments rather than docstrings so that the generated
# OpenAPI schema stays unchanged)

# Request body of POST /simulate-interview/.
class InterviewRequest(BaseModel):
    user_id: str = Field(..., example="user_12345")
    job_offer_id: str = Field(..., example="job_offer_abcde")
    cv_document: Dict[str, Any]
    job_offer: Dict[str, Any]
    messages: List[Dict[str, Any]]
    conversation_history: List[Dict[str, Any]]


# Request body of POST /trigger-analysis/; forwarded as JSON to the Celery API.
class AnalysisRequest(BaseModel):
    conversation_history: List[Dict[str, Any]]
    job_description_text: str
    candidate_id: Optional[str] = None


# Response body shared by the two analysis endpoints.
class TaskResponse(BaseModel):
    task_id: str
    status: str
    result: Any = None
    message: Optional[str] = None


# Response body of GET /.
class HealthCheck(BaseModel):
    status: str = "ok"
    celery_api_status: Optional[str] = None
    services: Dict[str, bool] = Field(default_factory=dict)
    message: str = "API AIrh fonctionnelle"


# Endpoints

@app.get(
    "/",
    response_model=HealthCheck,
    tags=["Status"],
    description="Health check de l'API avec test de connectivité Celery.",
)
async def health_check():
    """Report API health, including a connectivity probe of the Celery API.

    Returns:
        A ``HealthCheck`` whose ``celery_api_status`` is ``"connected"`` (the
        probe answered HTTP 200), ``"error"`` (any other status code) or
        ``"disconnected"`` (the probe raised), plus the availability flag of
        each service.
    """
    try:
        # requests is blocking: run it in the thread pool so the event loop
        # keeps serving other requests while the probe is pending.
        response = await run_in_threadpool(
            requests.get, f"{CELERY_API_URL}/", timeout=5
        )
        celery_status = "connected" if response.status_code == 200 else "error"
    except Exception:
        # Best-effort probe: a failure here must never fail the health check.
        celery_status = "disconnected"

    services = {
        "cv_parsing": CV_PARSING_AVAILABLE,
        "interview_simulation": INTERVIEW_AVAILABLE,
        "scoring_engine": SCORING_AVAILABLE,
        "celery_api": celery_status == "connected"
    }

    return HealthCheck(
        celery_api_status=celery_status,
        services=services
    )


@app.post(
    "/parse-cv/",
    tags=["CV Parsing"],
    description="Analyse un CV PDF et extrait les informations structurées.",
)
async def parse_cv(file: UploadFile = File(...)):
    """Parse an uploaded PDF CV and return the extracted data.

    Args:
        file: The uploaded CV; its content type must be ``application/pdf``.

    Returns:
        The parser output, merged with the scoring engine output under the
        ``"candidat"`` key when scoring is available and succeeds. Fallback
        data is returned when the parsing service is unavailable, returns
        nothing, or fails.

    Raises:
        HTTPException: 400 when the upload is not a PDF; 500 when parsing
            fails and no fallback generator is available.
    """
    if not CV_PARSING_AVAILABLE:
        # Fallback when the parsing service is unavailable
        return create_fallback_cv_data() if create_fallback_cv_data else {
            "error": "Service de parsing de CV temporairement indisponible",
            "candidat": {
                "informations_personnelles": {"nom": "Test User"},
                "compétences": {"hard_skills": [], "soft_skills": []}
            }
        }

    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Fichier PDF requis")

    tmp_path = None
    try:
        # Persist the upload to a temporary file, since the parser is given a
        # path rather than the raw bytes.
        contents = await file.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
            tmp.write(contents)
            tmp_path = tmp.name

        # Parse the CV outside the event loop
        cv_agent = CvParserAgent(pdf_path=tmp_path)
        parsed_data = await run_in_threadpool(cv_agent.process)

        if not parsed_data and create_fallback_cv_data:
            parsed_data = create_fallback_cv_data(tmp_path)

        # Optional scoring: a scoring failure must not fail the whole request
        if SCORING_AVAILABLE and ContextualScoringEngine and parsed_data:
            try:
                scoring_engine = ContextualScoringEngine(parsed_data)
                scored_data = await run_in_threadpool(scoring_engine.calculate_scores)
                if parsed_data.get("candidat"):
                    parsed_data["candidat"].update(scored_data)
            except Exception as e:
                logger.warning(f"Scoring échoué: {e}")

        return parsed_data

    except Exception as e:
        logger.error(f"Erreur parsing CV: {e}")
        if create_fallback_cv_data:
            return create_fallback_cv_data(tmp_path)
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always remove the temporary file. A file that is already gone is
        # fine; any other failure is logged instead of silently dropped.
        if tmp_path:
            try:
                os.remove(tmp_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(
                    f"Suppression du fichier temporaire échouée: {cleanup_error}"
                )


@app.post(
    "/simulate-interview/",
    tags=["Interview"],
    description="Gère une conversation d'entretien d'embauche.",
)
async def simulate_interview(request: InterviewRequest):
    """Run one turn of the job-interview simulation.

    Args:
        request: CV document, job offer, conversation history and the
            messages handed to the interview processor.

    Returns:
        ``{"response": <content of the last message produced>}``.

    Raises:
        HTTPException: 503 when the interview service failed to import;
            500 when the processor raises.
    """
    if not INTERVIEW_AVAILABLE:
        raise HTTPException(
            status_code=503,
            detail="Service de simulation d'entretien indisponible"
        )

    try:
        processor = InterviewProcessor(
            cv_document=request.cv_document,
            job_offer=request.job_offer,
            conversation_history=request.conversation_history
        )
        result = await run_in_threadpool(processor.run, messages=request.messages)
        return {"response": result["messages"][-1].content}

    except Exception as e:
        logger.error(f"Erreur simulation entretien: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post(
    "/trigger-analysis/",
    response_model=TaskResponse,
    status_code=202,
    tags=["Analysis"],
    description="Déclenche une analyse asynchrone via l'API Celery.",
)
async def trigger_analysis(request: AnalysisRequest):
    """Start an asynchronous analysis through the Celery API.

    Args:
        request: The analysis payload, forwarded as JSON.

    Returns:
        A ``TaskResponse`` carrying the task id and status returned by the
        Celery API.

    Raises:
        HTTPException: 503 when the Celery API is unreachable or does not
            answer 202; 500 on any other failure (e.g. a malformed payload).
    """
    try:
        # Blocking HTTP call: keep it off the event loop.
        response = await run_in_threadpool(
            requests.post,
            f"{CELERY_API_URL}/trigger-analysis",
            json=request.dict(),
            headers={"Content-Type": "application/json"},
            timeout=30
        )

        if response.status_code == 202:
            data = response.json()
            return TaskResponse(
                task_id=data["task_id"],
                status=data["status"],
                message="Analyse démarrée"
            )
        else:
            raise HTTPException(status_code=503, detail="Service d'analyse indisponible")

    except HTTPException:
        # Let the 503 raised above through; without this clause the generic
        # handler below would turn it into a 500.
        raise
    except requests.RequestException:
        raise HTTPException(status_code=503, detail="API Celery inaccessible")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get(
    "/analysis-status/{task_id}",
    response_model=TaskResponse,
    tags=["Analysis"],
    description="Récupère le statut d'une analyse.",
)
async def get_analysis_status(task_id: str):
    """Fetch the status of a previously triggered analysis.

    Args:
        task_id: Identifier of the task, as returned by ``trigger_analysis``.

    Returns:
        A ``TaskResponse`` built from the Celery API answer; ``message`` is
        the reported ``progress`` when present.

    Raises:
        HTTPException: 503 when the Celery API is unreachable or does not
            answer 200; 500 on any other failure (e.g. a malformed payload).
    """
    try:
        # Blocking HTTP call: keep it off the event loop.
        response = await run_in_threadpool(
            requests.get, f"{CELERY_API_URL}/task-status/{task_id}", timeout=10
        )

        if response.status_code == 200:
            data = response.json()
            return TaskResponse(
                task_id=task_id,
                status=data["status"],
                result=data.get("result"),
                message=data.get("progress", "Statut récupéré")
            )
        else:
            raise HTTPException(status_code=503, detail="Service d'analyse indisponible")

    except HTTPException:
        # Let the 503 raised above through; without this clause the generic
        # handler below would turn it into a 500.
        raise
    except requests.RequestException:
        raise HTTPException(status_code=503, detail="API Celery inaccessible")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# Debug endpoint for HF Spaces.
# NOTE(review): this exposes environment details without authentication;
# confirm that is acceptable for the deployment.
@app.get(
    "/debug",
    tags=["Debug"],
    description="Informations de debug pour le déploiement.",
)
async def debug_info():
    """Return deployment diagnostics.

    Returns:
        A dict with selected environment values, the service availability
        flags and whether the two cache directories exist.
    """
    return {
        "environment": {
            "HF_HOME": os.getenv("HF_HOME"),
            "CELERY_API_URL": CELERY_API_URL,
            "PYTHONPATH": os.getenv("PYTHONPATH")
        },
        "services": {
            "cv_parsing": CV_PARSING_AVAILABLE,
            "interview_simulation": INTERVIEW_AVAILABLE,
            "scoring_engine": SCORING_AVAILABLE
        },
        "cache_dirs": {
            "/tmp/cache": os.path.exists("/tmp/cache"),
            "/app/cache": os.path.exists("/app/cache")
        }
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)