QuentinL52 commited on
Commit
795f43a
·
verified ·
1 Parent(s): bf83549

Update tasks/worker_celery.py

Browse files
Files changed (1) hide show
  1. tasks/worker_celery.py +90 -71
tasks/worker_celery.py CHANGED
@@ -1,72 +1,91 @@
1
- import os
2
- import json
3
- from celery import Celery
4
- from crewai import Crew, Process
5
- from src.deep_learning_analyzer import MultiModelInterviewAnalyzer
6
- from src.rag_handler import RAGHandler
7
- from src.crew.agents import report_generator_agent
8
- from src.crew.tasks import generate_report_task
9
-
10
- celery_app = Celery(
11
- 'worker_celery', # Nom de l'application Celery
12
- broker=os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"),
13
- backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0"),
14
- include=['tasks.worker_celery'] # Indique à Celery où trouver les tâches
15
- )
16
-
17
- celery_app.conf.update(
18
- task_serializer='json',
19
- result_serializer='json',
20
- accept_content=['json'],
21
- timezone='Europe/Paris',
22
- enable_utc=True,
23
- )
24
-
25
- @celery_app.task(name="tasks.run_interview_analysis")
26
- def run_interview_analysis_task(conversation_history: list, job_description_text: list):
27
- """
28
- Tâche Celery qui exécute l'analyse complète de l'entretien en arrière-plan.
29
-
30
- Args:
31
- conversation_history (list): L'historique complet de la conversation de l'entretien.
32
- job_description_text (list): La description du poste sous forme de liste de textes.
33
-
34
- Returns:
35
- str: Le rapport final généré par le crew d'agents, au format string (potentiellement JSON).
36
- """
37
- print(f"Début de l'analyse pour un entretien de {len(conversation_history)} messages.")
38
- print("Étape 1/3: Exécution de l'analyse par Deep Learning...")
39
- analyzer = MultiModelInterviewAnalyzer()
40
- structured_analysis = analyzer.run_full_analysis(conversation_history, job_description_text)
41
- print("Analyse DL terminée.")
42
- print("Étape 2/3: Enrichissement avec le RAG...")
43
- rag_handler = RAGHandler()
44
- rag_feedback = []
45
-
46
- if structured_analysis.get("intent_analysis"):
47
- for intent in structured_analysis["intent_analysis"]:
48
- query = f"Conseils pour un candidat qui cherche à {intent['labels'][0]}"
49
- rag_feedback.extend(rag_handler.get_relevant_feedback(query))
50
-
51
- if structured_analysis.get("sentiment_analysis"):
52
- for sentiment_group in structured_analysis["sentiment_analysis"]:
53
- for sentiment in sentiment_group:
54
- if sentiment['label'] == 'stress' and sentiment['score'] > 0.6:
55
- rag_feedback.extend(rag_handler.get_relevant_feedback("gestion du stress en entretien"))
56
- unique_feedback = list(set(rag_feedback))
57
- print("Enrichissement RAG terminé.")
58
- print("Étape 3/3: Lancement du CrewAI pour la génération du rapport...")
59
- interview_crew = Crew(
60
- agents=[report_generator_agent],
61
- tasks=[generate_report_task],
62
- process=Process.sequential,
63
- verbose=False, # Mettre à True pour un débuggage détaillé du crew
64
- telemetry=False
65
- )
66
- final_report = interview_crew.kickoff(inputs={
67
- 'structured_analysis_data': json.dumps(structured_analysis, indent=2, ensure_ascii=False),
68
- 'rag_contextual_feedback': "\n- ".join(unique_feedback)
69
- })
70
-
71
- print("Rapport final généré. Tâche terminée.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
  return final_report
 
1
+ import os
2
+ import json
3
+ from celery import Celery
4
+ from crewai import Crew, Process
5
+ from dotenv import load_dotenv
6
+
7
+ # Charger les variables d'environnement depuis un fichier .env
8
+ load_dotenv()
9
+
10
+ # --- Import des modules de votre application ---
11
+ from src.deep_learning_analyzer import MultiModelInterviewAnalyzer
12
+ from src.rag_handler import RAGHandler
13
+ from src.crew.agents import report_generator_agent
14
+ from src.crew.tasks import generate_report_task
15
+
16
+ # --- Configuration de Celery avec Upstash ---
17
+ # On récupère les URLs et le token depuis les variables d'environnement
18
+ UPSTASH_REDIS_URL = os.environ.get("UPSTASH_REDIS_URL")
19
+ UPSTASH_REDIS_TOKEN = os.environ.get("UPSTASH_REDIS_TOKEN")
20
+
21
+ # On formate l'URL pour Celery en incluant le token
22
+ # Format: rediss://:<token>@<url_sans_https>
23
+ broker_url = f"rediss://:{UPSTASH_REDIS_TOKEN}@{UPSTASH_REDIS_URL.replace('https://', '')}"
24
+
25
+ celery_app = Celery(
26
+ 'worker_celery',
27
+ broker=broker_url,
28
+ backend=broker_url,
29
+ broker_connection_retry_on_startup=True,
30
+ include=['tasks.worker_celery']
31
+ )
32
+
33
+ celery_app.conf.update(
34
+ task_serializer='json',
35
+ result_serializer='json',
36
+ accept_content=['json'],
37
+ timezone='Europe/Paris',
38
+ enable_utc=True,
39
+ # Configuration SSL nécessaire pour Upstash
40
+ broker_use_ssl={
41
+ 'ssl_cert_reqs': 'CERT_NONE'
42
+ },
43
+ redis_backend_use_ssl={
44
+ 'ssl_cert_reqs': 'CERT_NONE'
45
+ }
46
+ )
47
+
48
+ # --- La tâche reste la même ---
49
+ @celery_app.task(name="tasks.run_interview_analysis")
50
+ def run_interview_analysis_task(conversation_history: list, job_description_text: list):
51
+ """
52
+ Tâche Celery qui exécute l'analyse complète de l'entretien en arrière-plan.
53
+ """
54
+ print("Début de l'analyse via le worker Celery...")
55
+
56
+ # Étape 1: Analyse DL
57
+ analyzer = MultiModelInterviewAnalyzer()
58
+ structured_analysis = analyzer.run_full_analysis(conversation_history, job_description_text)
59
+
60
+ # Étape 2: Enrichissement RAG
61
+ rag_handler = RAGHandler()
62
+ rag_feedback = []
63
+ if structured_analysis.get("intent_analysis"):
64
+ for intent in structured_analysis["intent_analysis"]:
65
+ query = f"Conseils pour un candidat qui cherche à {intent['labels'][0]}"
66
+ rag_feedback.extend(rag_handler.get_relevant_feedback(query))
67
+
68
+ if structured_analysis.get("sentiment_analysis"):
69
+ for sentiment_group in structured_analysis["sentiment_analysis"]:
70
+ for sentiment in sentiment_group:
71
+ if sentiment['label'] == 'stress' and sentiment['score'] > 0.6:
72
+ rag_feedback.extend(rag_handler.get_relevant_feedback("gestion du stress en entretien"))
73
+
74
+ unique_feedback = list(set(rag_feedback))
75
+
76
+ # Étape 3: Génération du rapport avec CrewAI
77
+ interview_crew = Crew(
78
+ agents=[report_generator_agent],
79
+ tasks=[generate_report_task],
80
+ process=Process.sequential,
81
+ verbose=False,
82
+ telemetry=False
83
+ )
84
+
85
+ final_report = interview_crew.kickoff(inputs={
86
+ 'structured_analysis_data': json.dumps(structured_analysis, indent=2, ensure_ascii=False),
87
+ 'rag_contextual_feedback': "\n- ".join(unique_feedback)
88
+ })
89
+
90
+ print("Rapport final généré. Tâche terminée.")
91
  return final_report