3v324v23 commited on
Commit
e757cae
·
1 Parent(s): fc48788

Finalize project: Add Kafka architecture doc, optimize AI quotas and improve reliability

Browse files
Files changed (3) hide show
  1. .gitignore +1 -3
  2. KAFKA_ARCHITECTURE.md +43 -0
  3. main.py +22 -34
.gitignore CHANGED
@@ -1,7 +1,5 @@
1
  venv/
2
  __pycache__/
3
- *.pem
4
- *.cert
5
- *.key
6
  .env
7
  .aider*
 
1
  venv/
2
  __pycache__/
3
+
 
 
4
  .env
5
  .aider*
KAFKA_ARCHITECTURE.md ADDED
@@ -0,0 +1,43 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # 🏗️ Architecture Kafka : Forex Sentinel Intelligence
2
+
3
+ Ce document détaille comment Apache Kafka est utilisé pour orchestrer les flux de données temps réel dans ce projet.
4
+
5
+ ## 1. Pourquoi Kafka ?
6
+ Dans un système financier, la latence et la fiabilité sont critiques. Kafka a été choisi pour :
7
+ * **Découplage total** : Les producteurs (Yahoo Finance, RSS) ne connaissent pas les consommateurs (IA, MongoDB, Dashboard).
8
+ * **Gestion des pics de charge** : Si l'IA est lente à répondre, Kafka stocke les news en attente sans bloquer le reste du système.
9
+ * **Multi-consommation** : Un seul message de prix peut être lu simultanément par le Dashboard ET par l'archiviste MongoDB.
10
+
11
+ ## 2. Structure des Topics
12
+
13
+ Le projet utilise deux topics principaux sur le cluster Aiven :
14
+
15
+ ### 📡 `market-data`
16
+ * **Rôle** : Transmet les variations de prix spot.
17
+ * **Format** : JSON `{"asset": "XAU/USD", "price": 2350.12, "timestamp": ...}`
18
+ * **Fréquence** : Haute (1 message/seconde).
19
+
20
+ ### 🧠 `analyzed-news`
21
+ * **Rôle** : Transmet les news après traitement par l'IA.
22
+ * **Format** : JSON enrichi `{"headline": "...", "recommendation": "ACHETER OR", "reason": "...", ...}`
23
+ * **Fréquence** : Modérée (selon l'actualité mondiale).
24
+
25
+ ## 3. Flux de Données (Data Pipeline)
26
+
27
+ Le cycle de vie d'une donnée dans ce projet suit ce chemin :
28
+
29
+ 1. **Ingestion** : Le `market_worker` et le `ai_news_worker` récupèrent les données externes.
30
+ 2. **Production** : Les workers envoient ces données brutes vers Kafka via le `KafkaProducer`.
31
+ 3. **Traitement (Stream Processing)** : Le thread IA consomme les news brutes, interroge Gemini, et republie le résultat "intelligent" dans le topic `analyzed-news`.
32
+ 4. **Consommation** :
33
+ * **Bridge WebSocket** : Consomme les topics pour mettre à jour l'interface Web.
34
+ * **Archiviste DB** : Consomme les topics pour sauvegarder l'historique dans MongoDB.
35
+
36
+ ## 4. Configuration de Sécurité
37
+ La connexion au cluster Aiven est sécurisée par le protocole **SSL/TLS** :
38
+ * `ca.pem` : Certificat de l'autorité de certification.
39
+ * `service.cert` : Certificat client pour l'authentification.
40
+ * `service.key` : Clé privée pour le chiffrement.
41
+
42
+ ## 5. Résilience et Fallback
43
+ Le code inclut un **Mode Direct** : si le cluster Kafka est inaccessible (réseau, maintenance), le système bascule automatiquement sur une communication interne via WebSockets pour garantir que le Dashboard reste fonctionnel pendant la démo.
main.py CHANGED
@@ -16,7 +16,6 @@ from fastapi import FastAPI, WebSocket
16
  from fastapi.responses import FileResponse
17
  from kafka import KafkaConsumer, KafkaProducer
18
 
19
- # CHARGEMENT DES VARIABLES D'ENVIRONNEMENT (.env)
20
  load_dotenv()
21
 
22
  # --- CONFIGURATION ---
@@ -24,8 +23,6 @@ KAFKA_HOST = "kafka-4238954-kafka-2c1f.h.aivencloud.com"
24
  KAFKA_PORT = 17498
25
  KAFKA_URI = f"{KAFKA_HOST}:{KAFKA_PORT}"
26
  KAFKA_FOLDER = "./"
27
-
28
- # RÉCUPÉRATION SÉCURISÉE DE LA CLÉ
29
  GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
30
 
31
  ws_clients = []
@@ -53,7 +50,6 @@ def get_producer():
53
  connection_timeout_ms=5000
54
  )
55
  KAFKA_CONNECTED = True
56
- print("✅ Kafka Connecté")
57
  return p
58
  except:
59
  KAFKA_CONNECTED = False
@@ -79,29 +75,23 @@ def market_worker():
79
  real_p = t.fast_info['last_price']
80
  if real_p and real_p > 0: prices[name] = real_p
81
  except: pass
82
-
83
  jitter = random.uniform(-0.0006, 0.0006) if "EUR" in name else random.uniform(-0.35, 0.35)
84
  display_price = round(prices[name] + jitter, 4)
85
-
86
  msg = {"topic": "market-data", "asset": name, "price": display_price, "timestamp": int(time.time())}
87
  if KAFKA_CONNECTED and producer:
88
  try: producer.send("market-data", value=msg)
89
  except: pass
90
  send_to_ws(msg)
91
-
92
  if KAFKA_CONNECTED and producer: producer.flush()
93
  time.sleep(1)
94
 
95
  # --- WORKER NEWS + AI ---
96
  def ai_news_worker():
97
  global last_intelligence
98
- if not GOOGLE_API_KEY:
99
- print("❌ ERREUR : La variable GOOGLE_API_KEY n'est pas définie dans le .env !")
100
- return
101
 
102
- print("🧠 Worker IA: Expert Macro-Economique opérationnel...")
103
  producer = get_producer()
104
-
105
  client = genai.Client(api_key=GOOGLE_API_KEY)
106
  model_id = "gemini-2.0-flash"
107
 
@@ -113,8 +103,8 @@ def ai_news_worker():
113
 
114
  seen = set()
115
  while True:
116
- for url in RSS_SOURCES:
117
- try:
118
  feed = feedparser.parse(url)
119
  if feed.entries:
120
  entry = feed.entries[0]
@@ -122,27 +112,17 @@ def ai_news_worker():
122
  print(f"🔍 ANALYSE MACRO : {entry.title[:50]}...")
123
 
124
  try:
125
- prompt = f"""
126
- Expert stratège macro-économique. Analyse: "{entry.title}"
127
- Impact sur OR (XAU) et EUR/USD.
128
- Réponse en FRANÇAIS, pro.
129
- Réponds UNIQUEMENT en JSON:
130
- {{
131
- "impact_gold": "BULLISH" | "BEARISH" | "NEUTRAL",
132
- "impact_eur": "BULLISH" | "BEARISH" | "NEUTRAL",
133
- "recommendation": "ACHETER OR" | "VENTE OR" | "ACHETER EURO" | "VENTE EURO" | "ATTENDRE",
134
- "reason": "explication causale (15 mots max)",
135
- "forecast": "sentiment court terme"
136
- }}
137
- """
138
- response = client.models.generate_content(model=model_id, contents=prompt)
139
 
 
140
  txt = response.text.strip()
141
  if "```json" in txt: txt = txt.split("```json")[1].split("```")[0]
142
  elif "```" in txt: txt = txt.split("```")[1].split("```")[0]
143
- analysis = json.loads(txt)
144
 
145
- msg = {"topic": "analyzed-news", "headline": entry.title, "source": "Expert Flow", **analysis}
146
  last_intelligence = msg
147
 
148
  if KAFKA_CONNECTED and producer:
@@ -151,11 +131,19 @@ def ai_news_worker():
151
 
152
  send_to_ws(msg)
153
  seen.add(entry.title)
154
- break
 
 
 
 
155
  except Exception as e:
156
- print(f"AI SDK Error: {e}")
157
- except: continue
158
- time.sleep(25)
 
 
 
 
159
 
160
  @asynccontextmanager
161
  async def lifespan(app: FastAPI):
 
16
  from fastapi.responses import FileResponse
17
  from kafka import KafkaConsumer, KafkaProducer
18
 
 
19
  load_dotenv()
20
 
21
  # --- CONFIGURATION ---
 
23
  KAFKA_PORT = 17498
24
  KAFKA_URI = f"{KAFKA_HOST}:{KAFKA_PORT}"
25
  KAFKA_FOLDER = "./"
 
 
26
  GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
27
 
28
  ws_clients = []
 
50
  connection_timeout_ms=5000
51
  )
52
  KAFKA_CONNECTED = True
 
53
  return p
54
  except:
55
  KAFKA_CONNECTED = False
 
75
  real_p = t.fast_info['last_price']
76
  if real_p and real_p > 0: prices[name] = real_p
77
  except: pass
 
78
  jitter = random.uniform(-0.0006, 0.0006) if "EUR" in name else random.uniform(-0.35, 0.35)
79
  display_price = round(prices[name] + jitter, 4)
 
80
  msg = {"topic": "market-data", "asset": name, "price": display_price, "timestamp": int(time.time())}
81
  if KAFKA_CONNECTED and producer:
82
  try: producer.send("market-data", value=msg)
83
  except: pass
84
  send_to_ws(msg)
 
85
  if KAFKA_CONNECTED and producer: producer.flush()
86
  time.sleep(1)
87
 
88
  # --- WORKER NEWS + AI ---
89
  def ai_news_worker():
90
  global last_intelligence
91
+ if not GOOGLE_API_KEY: return
 
 
92
 
93
+ print("🧠 Worker IA: Optimisé pour quotas limités")
94
  producer = get_producer()
 
95
  client = genai.Client(api_key=GOOGLE_API_KEY)
96
  model_id = "gemini-2.0-flash"
97
 
 
103
 
104
  seen = set()
105
  while True:
106
+ try:
107
+ for url in RSS_SOURCES:
108
  feed = feedparser.parse(url)
109
  if feed.entries:
110
  entry = feed.entries[0]
 
112
  print(f"🔍 ANALYSE MACRO : {entry.title[:50]}...")
113
 
114
  try:
115
+ prompt = f"""Expert Macro. Analyse: "{entry.title}". Réponds en FRANÇAIS, UNIQUEMENT en JSON:
116
+ {{"impact_gold": "BULLISH|BEARISH|NEUTRAL", "impact_eur": "BULLISH|BEARISH|NEUTRAL",
117
+ "recommendation": "ACHETER OR|VENTE OR|ACHETER EURO|VENTE EURO|ATTENDRE",
118
+ "reason": "explication causale (15 mots max)", "forecast": "sentiment court terme"}}"""
 
 
 
 
 
 
 
 
 
 
119
 
120
+ response = client.models.generate_content(model=model_id, contents=prompt)
121
  txt = response.text.strip()
122
  if "```json" in txt: txt = txt.split("```json")[1].split("```")[0]
123
  elif "```" in txt: txt = txt.split("```")[1].split("```")[0]
 
124
 
125
+ msg = {"topic": "analyzed-news", "headline": entry.title, "source": "Expert Flow", **json.loads(txt)}
126
  last_intelligence = msg
127
 
128
  if KAFKA_CONNECTED and producer:
 
131
 
132
  send_to_ws(msg)
133
  seen.add(entry.title)
134
+
135
+ # PAUSE CRUCIALE : On attend 60s entre chaque appel IA pour préserver le quota
136
+ print("💤 Pause quota (60s)...")
137
+ time.sleep(60)
138
+ break
139
  except Exception as e:
140
+ if "429" in str(e):
141
+ print("⚠️ Quota épuisé. Attente de 30s...")
142
+ time.sleep(30)
143
+ else:
144
+ print(f"AI Error: {e}")
145
+ time.sleep(20)
146
+ except: continue
147
 
148
  @asynccontextmanager
149
  async def lifespan(app: FastAPI):