DivYonko commited on
Commit
2965fd0
·
1 Parent(s): c5b07c4

fix: store backlog immediately without ML inference, add per-message error logging

Browse files
Files changed (1) hide show
  1. app.py +47 -14
app.py CHANGED
@@ -196,10 +196,12 @@ def _scraper_thread_fn(video_id: str, redis_key: str, stop_event: threading.Even
196
  # Step 2: poll for messages
197
  page_token = None
198
  seen_ids: set = set() # avoid reprocessing messages on first page
 
199
 
200
  while not stop_event.is_set():
201
  messages, page_token, poll_ms = _fetch_chat_messages(live_chat_id, api_key, page_token)
202
 
 
203
  for item in messages:
204
  if stop_event.is_set():
205
  break
@@ -210,7 +212,6 @@ def _scraper_thread_fn(video_id: str, redis_key: str, stop_event: threading.Even
210
  seen_ids.add(msg_id)
211
 
212
  snippet = item.get("snippet", {})
213
- # only process text messages
214
  if snippet.get("type") != "textMessageEvent":
215
  continue
216
 
@@ -220,19 +221,51 @@ def _scraper_thread_fn(video_id: str, redis_key: str, stop_event: threading.Even
220
  if not text:
221
  continue
222
 
223
- sentiment, s_conf = _safe_sentiment(text)
224
- topic, t_conf = _safe_topic(text)
225
-
226
- message_data = {
227
- "author": author,
228
- "text": text,
229
- "sentiment": sentiment,
230
- "confidence": round(s_conf, 3),
231
- "topic": topic,
232
- "topic_conf": round(t_conf, 3),
233
- "time": datetime.now().isoformat(),
234
- }
235
- store_rpush(redis_key, json.dumps(message_data))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
236
 
237
  # keep seen_ids from growing unbounded
238
  if len(seen_ids) > 5000:
 
196
  # Step 2: poll for messages
197
  page_token = None
198
  seen_ids: set = set() # avoid reprocessing messages on first page
199
+ is_first_page = True # skip ML on backlog to avoid startup delay
200
 
201
  while not stop_event.is_set():
202
  messages, page_token, poll_ms = _fetch_chat_messages(live_chat_id, api_key, page_token)
203
 
204
+ new_msgs = []
205
  for item in messages:
206
  if stop_event.is_set():
207
  break
 
212
  seen_ids.add(msg_id)
213
 
214
  snippet = item.get("snippet", {})
 
215
  if snippet.get("type") != "textMessageEvent":
216
  continue
217
 
 
221
  if not text:
222
  continue
223
 
224
+ new_msgs.append((msg_id, text, author))
225
+
226
+ # On the first page (backlog), store messages with placeholder sentiment
227
+ # so the UI shows something immediately, then process ML on subsequent pages
228
+ if is_first_page and new_msgs:
229
+ logger.info("First page: storing %d backlog messages with placeholder sentiment", len(new_msgs))
230
+ for _, text, author in new_msgs:
231
+ message_data = {
232
+ "author": author,
233
+ "text": text,
234
+ "sentiment": "Neutral",
235
+ "confidence": 0.5,
236
+ "topic": "General",
237
+ "topic_conf": 0.5,
238
+ "time": datetime.now().isoformat(),
239
+ }
240
+ store_rpush(redis_key, json.dumps(message_data))
241
+ logger.info("Backlog stored: %d messages now in store", store_llen(redis_key))
242
+ is_first_page = False
243
+ else:
244
+ # Normal processing with full ML inference
245
+ for _, text, author in new_msgs:
246
+ if stop_event.is_set():
247
+ break
248
+ try:
249
+ sentiment, s_conf = _safe_sentiment(text)
250
+ topic, t_conf = _safe_topic(text)
251
+ except Exception as exc:
252
+ logger.error("ML inference failed for text=%r: %s", text[:50], exc)
253
+ sentiment, s_conf = "Neutral", 0.5
254
+ topic, t_conf = "General", 0.5
255
+
256
+ message_data = {
257
+ "author": author,
258
+ "text": text,
259
+ "sentiment": sentiment,
260
+ "confidence": round(s_conf, 3),
261
+ "topic": topic,
262
+ "topic_conf": round(t_conf, 3),
263
+ "time": datetime.now().isoformat(),
264
+ }
265
+ store_rpush(redis_key, json.dumps(message_data))
266
+
267
+ if new_msgs:
268
+ logger.info("Processed %d new messages, store size=%d", len(new_msgs), store_llen(redis_key))
269
 
270
  # keep seen_ids from growing unbounded
271
  if len(seen_ids) > 5000: