interview_agents_api / tasks /worker_celery.py
QuentinL52's picture
Upload 39 files
a8ee0db verified
raw
history blame
3.2 kB
import os
import json
from celery import Celery
from crewai import Crew, Process
from src.deep_learning_analyzer import MultiModelInterviewAnalyzer
from src.rag_handler import RAGHandler
from src.crew.agents import report_generator_agent
from src.crew.tasks import generate_report_task
celery_app = Celery(
'worker_celery', # Nom de l'application Celery
broker=os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"),
backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0"),
include=['tasks.worker_celery'] # Indique à Celery où trouver les tâches
)
celery_app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Europe/Paris',
enable_utc=True,
)
@celery_app.task(name="tasks.run_interview_analysis")
def run_interview_analysis_task(conversation_history: list, job_description_text: list):
"""
Tâche Celery qui exécute l'analyse complète de l'entretien en arrière-plan.
Args:
conversation_history (list): L'historique complet de la conversation de l'entretien.
job_description_text (list): La description du poste sous forme de liste de textes.
Returns:
str: Le rapport final généré par le crew d'agents, au format string (potentiellement JSON).
"""
print(f"Début de l'analyse pour un entretien de {len(conversation_history)} messages.")
print("Étape 1/3: Exécution de l'analyse par Deep Learning...")
analyzer = MultiModelInterviewAnalyzer()
structured_analysis = analyzer.run_full_analysis(conversation_history, job_description_text)
print("Analyse DL terminée.")
print("Étape 2/3: Enrichissement avec le RAG...")
rag_handler = RAGHandler()
rag_feedback = []
if structured_analysis.get("intent_analysis"):
for intent in structured_analysis["intent_analysis"]:
query = f"Conseils pour un candidat qui cherche à {intent['labels'][0]}"
rag_feedback.extend(rag_handler.get_relevant_feedback(query))
if structured_analysis.get("sentiment_analysis"):
for sentiment_group in structured_analysis["sentiment_analysis"]:
for sentiment in sentiment_group:
if sentiment['label'] == 'stress' and sentiment['score'] > 0.6:
rag_feedback.extend(rag_handler.get_relevant_feedback("gestion du stress en entretien"))
unique_feedback = list(set(rag_feedback))
print("Enrichissement RAG terminé.")
print("Étape 3/3: Lancement du CrewAI pour la génération du rapport...")
interview_crew = Crew(
agents=[report_generator_agent],
tasks=[generate_report_task],
process=Process.sequential,
verbose=False, # Mettre à True pour un débuggage détaillé du crew
telemetry=False
)
final_report = interview_crew.kickoff(inputs={
'structured_analysis_data': json.dumps(structured_analysis, indent=2, ensure_ascii=False),
'rag_contextual_feedback': "\n- ".join(unique_feedback)
})
print("Rapport final généré. Tâche terminée.")
return final_report