3v324v23 commited on
Commit
0166753
·
1 Parent(s): f3a9584

Enhance WebSocket connection handling and improve Kafka error logging

Browse files
Files changed (3) hide show
  1. ..env.swp +0 -0
  2. index.html +3 -1
  3. main.py +79 -18
..env.swp ADDED
Binary file (1.02 kB). View file
 
index.html CHANGED
@@ -144,7 +144,9 @@
144
 
145
  <script>
146
  const wsHost = window.location.hostname || '127.0.0.1';
147
- const socket = new WebSocket(`ws://${wsHost}:8000/ws`);
 
 
148
  let lastPrices = { 'XAU/USD': 0, 'EUR/USD': 0 };
149
 
150
  socket.onmessage = function(event) {
 
144
 
145
  <script>
146
  const wsHost = window.location.hostname || '127.0.0.1';
147
+ const wsPort = window.location.port || '7860';
148
+ const wsProtocol = window.location.protocol === 'https:' ? 'wss' : 'ws';
149
+ const socket = new WebSocket(`${wsProtocol}://${wsHost}:${wsPort}/ws`);
150
  let lastPrices = { 'XAU/USD': 0, 'EUR/USD': 0 };
151
 
152
  socket.onmessage = function(event) {
main.py CHANGED
@@ -49,14 +49,25 @@ last_intelligence = {
49
  }
50
 
51
  def get_kafka_ip():
52
- try: return socket.gethostbyname(KAFKA_HOST)
53
- except: return KAFKA_HOST
 
 
 
 
54
 
55
  def get_producer():
56
  global KAFKA_CONNECTED
 
 
 
 
 
57
  ip = get_kafka_ip()
58
  target = f"{ip}:{KAFKA_PORT}"
 
59
  try:
 
60
  p = KafkaProducer(
61
  bootstrap_servers=target,
62
  security_protocol="SSL",
@@ -67,9 +78,13 @@ def get_producer():
67
  request_timeout_ms=5000,
68
  connection_timeout_ms=5000
69
  )
 
 
70
  KAFKA_CONNECTED = True
 
71
  return p
72
- except:
 
73
  KAFKA_CONNECTED = False
74
  return None
75
 
@@ -109,29 +124,43 @@ def get_mock_analysis(headline):
109
 
110
  # --- WORKER MARKET ---
111
  def market_worker():
 
112
  print("📈 Worker Market actif")
113
  producer = get_producer()
114
  assets = {"GC=F": "XAU/USD", "EURUSD=X": "EUR/USD"}
115
  prices = {"XAU/USD": 2350.0, "EUR/USD": 1.0850}
116
  while True:
 
 
 
117
  for ticker, name in assets.items():
118
  try:
119
  t = yf.Ticker(ticker); real_p = t.fast_info['last_price']
120
  if real_p > 0: prices[name] = real_p
121
- except: pass
 
122
  jitter = random.uniform(-0.001, 0.001) if "EUR" in name else random.uniform(-0.5, 0.5)
123
  display_price = round(prices[name] + jitter, 4)
124
  msg = {"topic": "market-data", "asset": name, "price": display_price, "timestamp": int(time.time())}
125
  if KAFKA_CONNECTED and producer:
126
- try: producer.send("market-data", value=msg)
127
- except: pass
 
 
 
128
  send_to_ws(msg)
129
- if KAFKA_CONNECTED and producer: producer.flush()
 
 
 
 
 
 
130
  time.sleep(1)
131
 
132
  # --- WORKER NEWS + AI ---
133
  def ai_news_worker():
134
- global last_intelligence
135
  print("🧠 Worker IA (Hybride) actif")
136
 
137
  if not GOOGLE_API_KEY:
@@ -150,6 +179,9 @@ def ai_news_worker():
150
 
151
  seen = set()
152
  while True:
 
 
 
153
  for url in RSS_SOURCES:
154
  try:
155
  feed = feedparser.parse(url)
@@ -166,35 +198,64 @@ def ai_news_worker():
166
  analysis = json.loads(txt)
167
  print("✨ Gemini OK")
168
  except Exception as e:
169
- print(f"⚠️ Mode Secours activé (Cause: {str(e)[:50]}...)")
170
  analysis = get_mock_analysis(entry.title)
171
 
172
  msg = {"topic": "analyzed-news", "headline": entry.title, "source": url.split('.')[1] if '.' in url else "RSS", **analysis}
173
  last_intelligence = msg
174
  if KAFKA_CONNECTED and producer:
175
- try: producer.send("analyzed-news", value=msg); producer.flush()
176
- except: pass
 
 
 
 
177
  send_to_ws(msg)
178
  seen.add(entry.title)
179
  time.sleep(90)
180
  break
181
- except: continue
 
 
182
  time.sleep(30)
183
 
184
  # --- WORKER DB ---
185
  def db_worker():
186
- if not MONGO_URI: return
 
 
 
 
187
  try:
188
  client = MongoClient(MONGO_URI)
189
  db = client['dji']
190
- if KAFKA_CONNECTED:
191
- consumer = KafkaConsumer("market-data", "analyzed-news", bootstrap_servers=KAFKA_URI, security_protocol="SSL",
192
- ssl_cafile="ca.pem", ssl_certfile="service.cert",
193
- ssl_keyfile="service.key", value_deserializer=lambda x: json.loads(x.decode('utf-8')))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
194
  for message in consumer:
195
  coll = db['market_history'] if message.topic == "market-data" else db['news_history']
196
  coll.insert_one(message.value)
197
- except: pass
 
 
 
198
 
199
  @asynccontextmanager
200
  async def lifespan(app: FastAPI):
 
49
  }
50
 
51
  def get_kafka_ip():
52
+ try:
53
+ return socket.gethostbyname(KAFKA_HOST)
54
+ except Exception as e:
55
+ print(f"⚠️ Résolution DNS Kafka échouée : {e}")
56
+ return KAFKA_HOST
57
+
58
 
59
  def get_producer():
60
  global KAFKA_CONNECTED
61
+ if not os.path.exists("ca.pem") or not os.path.exists("service.cert") or not os.path.exists("service.key"):
62
+ print("❌ Certificats Kafka manquants (ca.pem/service.cert/service.key). Kafka désactivé.")
63
+ KAFKA_CONNECTED = False
64
+ return None
65
+
66
  ip = get_kafka_ip()
67
  target = f"{ip}:{KAFKA_PORT}"
68
+
69
  try:
70
+ print(f"🔌 Tentative de connexion à Kafka {target}...")
71
  p = KafkaProducer(
72
  bootstrap_servers=target,
73
  security_protocol="SSL",
 
78
  request_timeout_ms=5000,
79
  connection_timeout_ms=5000
80
  )
81
+ # Force la connexion pour vérifier immédiatement
82
+ p.bootstrap_connected()
83
  KAFKA_CONNECTED = True
84
+ print("✅ Connecté à Kafka")
85
  return p
86
+ except Exception as e:
87
+ print(f"❌ Échec connexion Kafka : {type(e).__name__} {e}")
88
  KAFKA_CONNECTED = False
89
  return None
90
 
 
124
 
125
  # --- WORKER MARKET ---
126
  def market_worker():
127
+ global KAFKA_CONNECTED
128
  print("📈 Worker Market actif")
129
  producer = get_producer()
130
  assets = {"GC=F": "XAU/USD", "EURUSD=X": "EUR/USD"}
131
  prices = {"XAU/USD": 2350.0, "EUR/USD": 1.0850}
132
  while True:
133
+ if not KAFKA_CONNECTED or producer is None:
134
+ producer = get_producer()
135
+
136
  for ticker, name in assets.items():
137
  try:
138
  t = yf.Ticker(ticker); real_p = t.fast_info['last_price']
139
  if real_p > 0: prices[name] = real_p
140
+ except Exception as e:
141
+ print(f"⚠️ market_worker - yfinance erreur: {e}")
142
  jitter = random.uniform(-0.001, 0.001) if "EUR" in name else random.uniform(-0.5, 0.5)
143
  display_price = round(prices[name] + jitter, 4)
144
  msg = {"topic": "market-data", "asset": name, "price": display_price, "timestamp": int(time.time())}
145
  if KAFKA_CONNECTED and producer:
146
+ try:
147
+ producer.send("market-data", value=msg)
148
+ except Exception as e:
149
+ print(f"⚠️ market_worker - Kafka send fail: {e}")
150
+ KAFKA_CONNECTED = False
151
  send_to_ws(msg)
152
+
153
+ if KAFKA_CONNECTED and producer:
154
+ try:
155
+ producer.flush()
156
+ except Exception as e:
157
+ print(f"⚠️ market_worker - Kafka flush fail: {e}")
158
+ KAFKA_CONNECTED = False
159
  time.sleep(1)
160
 
161
  # --- WORKER NEWS + AI ---
162
  def ai_news_worker():
163
+ global last_intelligence, KAFKA_CONNECTED
164
  print("🧠 Worker IA (Hybride) actif")
165
 
166
  if not GOOGLE_API_KEY:
 
179
 
180
  seen = set()
181
  while True:
182
+ if not KAFKA_CONNECTED or producer is None:
183
+ producer = get_producer()
184
+
185
  for url in RSS_SOURCES:
186
  try:
187
  feed = feedparser.parse(url)
 
198
  analysis = json.loads(txt)
199
  print("✨ Gemini OK")
200
  except Exception as e:
201
+ print(f"⚠️ Mode Secours activé (Cause: {str(e)[:100]}...)")
202
  analysis = get_mock_analysis(entry.title)
203
 
204
  msg = {"topic": "analyzed-news", "headline": entry.title, "source": url.split('.')[1] if '.' in url else "RSS", **analysis}
205
  last_intelligence = msg
206
  if KAFKA_CONNECTED and producer:
207
+ try:
208
+ producer.send("analyzed-news", value=msg)
209
+ producer.flush()
210
+ except Exception as e:
211
+ print(f"⚠️ ai_news_worker - Kafka send/flush fail: {e}")
212
+ KAFKA_CONNECTED = False
213
  send_to_ws(msg)
214
  seen.add(entry.title)
215
  time.sleep(90)
216
  break
217
+ except Exception as e:
218
+ print(f"⚠️ ai_news_worker - feed error {url}: {e}")
219
+ continue
220
  time.sleep(30)
221
 
222
  # --- WORKER DB ---
223
  def db_worker():
224
+ global KAFKA_CONNECTED
225
+ if not MONGO_URI:
226
+ print("⚠️ db_worker : MONGO_URI manquant, DB désactivée")
227
+ return
228
+
229
  try:
230
  client = MongoClient(MONGO_URI)
231
  db = client['dji']
232
+ except Exception as e:
233
+ print(f" db_worker : échec connexion MongoDB {e}")
234
+ return
235
+
236
+ while True:
237
+ if not KAFKA_CONNECTED:
238
+ time.sleep(5)
239
+ continue
240
+
241
+ try:
242
+ consumer = KafkaConsumer(
243
+ "market-data", "analyzed-news",
244
+ bootstrap_servers=KAFKA_URI,
245
+ security_protocol="SSL",
246
+ ssl_cafile="ca.pem",
247
+ ssl_certfile="service.cert",
248
+ ssl_keyfile="service.key",
249
+ value_deserializer=lambda x: json.loads(x.decode('utf-8'))
250
+ )
251
+ print("✅ db_worker : KafkaConsumer démarré")
252
  for message in consumer:
253
  coll = db['market_history'] if message.topic == "market-data" else db['news_history']
254
  coll.insert_one(message.value)
255
+ except Exception as e:
256
+ print(f"⚠️ db_worker : KafkaConsumer erreur {e}")
257
+ KAFKA_CONNECTED = False
258
+ time.sleep(5)
259
 
260
  @asynccontextmanager
261
  async def lifespan(app: FastAPI):