import os import json from celery import Celery from crewai import Crew, Process from dotenv import load_dotenv # Charger les variables d'environnement depuis un fichier .env load_dotenv() # --- Import des modules de votre application --- from src.deep_learning_analyzer import MultiModelInterviewAnalyzer from src.rag_handler import RAGHandler from src.crew.agents import report_generator_agent from src.crew.tasks import generate_report_task # --- Configuration de Celery avec Upstash --- # On récupère les URLs et le token depuis les variables d'environnement UPSTASH_REDIS_URL = os.environ.get("UPSTASH_REDIS_URL") UPSTASH_REDIS_TOKEN = os.environ.get("UPSTASH_REDIS_TOKEN") # On formate l'URL pour Celery en incluant le token # Format: rediss://:@ broker_url = f"rediss://:{UPSTASH_REDIS_TOKEN}@{UPSTASH_REDIS_URL.replace('https://', '')}" celery_app = Celery( 'worker_celery', broker=broker_url, backend=broker_url, broker_connection_retry_on_startup=True, include=['tasks.worker_celery'] ) celery_app.conf.update( task_serializer='json', result_serializer='json', accept_content=['json'], timezone='Europe/Paris', enable_utc=True, # Configuration SSL nécessaire pour Upstash broker_use_ssl={ 'ssl_cert_reqs': 'CERT_NONE' }, redis_backend_use_ssl={ 'ssl_cert_reqs': 'CERT_NONE' } ) # --- La tâche reste la même --- @celery_app.task(name="tasks.run_interview_analysis") def run_interview_analysis_task(conversation_history: list, job_description_text: list): """ Tâche Celery qui exécute l'analyse complète de l'entretien en arrière-plan. """ print("Début de l'analyse via le worker Celery...") # Étape 1: Analyse DL analyzer = MultiModelInterviewAnalyzer() structured_analysis = analyzer.run_full_analysis(conversation_history, job_description_text) # Étape 2: Enrichissement RAG rag_handler = RAGHandler() rag_feedback = [] if structured_analysis.get("intent_analysis"): for intent in structured_analysis["intent_analysis"]: query = f"Conseils pour un candidat qui cherche à {intent['labels'][0]}" rag_feedback.extend(rag_handler.get_relevant_feedback(query)) if structured_analysis.get("sentiment_analysis"): for sentiment_group in structured_analysis["sentiment_analysis"]: for sentiment in sentiment_group: if sentiment['label'] == 'stress' and sentiment['score'] > 0.6: rag_feedback.extend(rag_handler.get_relevant_feedback("gestion du stress en entretien")) unique_feedback = list(set(rag_feedback)) # Étape 3: Génération du rapport avec CrewAI interview_crew = Crew( agents=[report_generator_agent], tasks=[generate_report_task], process=Process.sequential, verbose=False, telemetry=False ) final_report = interview_crew.kickoff(inputs={ 'structured_analysis_data': json.dumps(structured_analysis, indent=2, ensure_ascii=False), 'rag_contextual_feedback': "\n- ".join(unique_feedback) }) print("Rapport final généré. Tâche terminée.") return final_report