shaliz-kong committed on
Commit
94311f6
·
1 Parent(s): b0267d8

removed redis hammering

Browse files
Files changed (3) hide show
  1. app/main.py +12 -8
  2. app/mapper.py +3 -3
  3. app/tasks/analytics_worker.py +27 -14
app/main.py CHANGED
@@ -14,7 +14,7 @@ import pathlib
14
  import json
15
 
16
  # ─── Third-Party ──────────────────────────────────────────────────────────────
17
- # from fastapi import FastAPI, Depends, HTTPException, Request, Query, BackgroundTasks
18
  from fastapi.middleware.cors import CORSMiddleware
19
  from fastapi.responses import JSONResponse
20
  from contextlib import asynccontextmanager
@@ -280,12 +280,14 @@ async def continuous_kpi_refresh():
280
  """
281
  Auto-refresh KPIs every 5 minutes for active organizations.
282
  """
 
 
283
  while True:
284
  try:
285
  logger.debug("πŸ”„ KPI scheduler tick...")
286
 
287
  active_keys = event_hub.keys("entity:*")
288
- for key in active_keys:
289
  key_parts = safe_redis_decode(key).split(":")
290
  if len(key_parts) >= 3:
291
  org_id, source_id = key_parts[1], key_parts[2]
@@ -295,17 +297,19 @@ async def continuous_kpi_refresh():
295
  if event_hub.exists(cache_key):
296
  continue
297
 
298
- # Trigger computation (non-blocking)
 
 
 
 
299
  logger.info(f"⏰ Auto-triggering KPIs for {org_id}/{source_id}")
300
- # NOTE: Ensure trigger_kpi_computation is imported/defined
301
  await trigger_kpi_computation(org_id, source_id)
302
-
 
303
  except Exception as e:
304
  logger.error(f"❌ Scheduler error: {e}")
305
 
306
- await asyncio.sleep(300) # 5 minutes
307
-
308
-
309
  @app.get("/debug/stream-content")
310
  def debug_stream(
311
  org_id: str = Query(...),
 
14
  import json
15
 
16
  # ─── Third-Party ──────────────────────────────────────────────────────────────
17
+ from fastapi import FastAPI, Depends, HTTPException, Request, Query, BackgroundTasks
18
  from fastapi.middleware.cors import CORSMiddleware
19
  from fastapi.responses import JSONResponse
20
  from contextlib import asynccontextmanager
 
280
  """
281
  Auto-refresh KPIs every 5 minutes for active organizations.
282
  """
283
+ await asyncio.sleep(10) # Let app startup complete
284
+
285
  while True:
286
  try:
287
  logger.debug("πŸ”„ KPI scheduler tick...")
288
 
289
  active_keys = event_hub.keys("entity:*")
290
+ for key in active_keys[:10]: # Max 10 per batch
291
  key_parts = safe_redis_decode(key).split(":")
292
  if len(key_parts) >= 3:
293
  org_id, source_id = key_parts[1], key_parts[2]
 
297
  if event_hub.exists(cache_key):
298
  continue
299
 
300
+ # Skip if worker already running
301
+ if event_hub.exists(f"worker:lock:{org_id}:{source_id}"):
302
+ continue
303
+
304
+ # Trigger computation
305
  logger.info(f"⏰ Auto-triggering KPIs for {org_id}/{source_id}")
 
306
  await trigger_kpi_computation(org_id, source_id)
307
+ await asyncio.sleep(1) # 1s gap between triggers
308
+
309
  except Exception as e:
310
  logger.error(f"❌ Scheduler error: {e}")
311
 
312
+ await asyncio.sleep(300) # ⭐ CRITICAL: Sleep 5 minutes between cycles
 
 
313
  @app.get("/debug/stream-content")
314
  def debug_stream(
315
  org_id: str = Query(...),
app/mapper.py CHANGED
@@ -1,5 +1,5 @@
1
  import os
2
- # import json
3
  import pandas as pd
4
  import numpy as np
5
  from datetime import datetime, timedelta
@@ -7,8 +7,8 @@ from concurrent.futures import ThreadPoolExecutor
7
  import time
8
 
9
  from app.db import get_conn, ensure_raw_table, transactional_conn, ensure_schema_versions_table
10
- # from app.hybrid_entity_detector import hybrid_detect_entity_type
11
- # from app.core.event_hub import event_hub
12
 
13
  # ---------------------- Canonical Schema ---------------------- #
14
  CANONICAL = {
 
1
  import os
2
+ import json
3
  import pandas as pd
4
  import numpy as np
5
  from datetime import datetime, timedelta
 
7
  import time
8
 
9
  from app.db import get_conn, ensure_raw_table, transactional_conn, ensure_schema_versions_table
10
+ from app.hybrid_entity_detector import hybrid_detect_entity_type
11
+ from app.core.event_hub import event_hub
12
 
13
  # ---------------------- Canonical Schema ---------------------- #
14
  CANONICAL = {
app/tasks/analytics_worker.py CHANGED
@@ -547,27 +547,40 @@ class WorkerManager:
547
  self._shutdown = False
548
 
549
  async def start_listener(self):
550
- """🎧 Listen to Redis pubsub for triggers"""
 
 
 
551
  pubsub = event_hub.redis.pubsub()
552
  pubsub.psubscribe("analytics_trigger:*")
553
-
554
- logger.info("🎧 Worker Manager: Einstein+Elon mode ENGAGED")
555
-
 
 
 
556
  try:
557
- while not self._shutdown:
558
- # Wait for message with timeout (prevents blocking forever)
559
- message = pubsub.get_message(timeout=5.0)
560
-
561
- if message and message["type"] == "pmessage":
562
  await self._handle_trigger(message)
563
-
564
- # Cleanup completed tasks
565
- self._cleanup_completed_workers()
566
-
 
 
567
  except asyncio.CancelledError:
568
- logger.info("[MANAGER] πŸ›‘ Listener cancelled")
 
 
 
 
 
 
569
  finally:
570
  pubsub.close()
 
571
 
572
  async def _handle_trigger(self, message: Dict[str, Any]):
573
  """Process a single trigger message"""
 
547
  self._shutdown = False
548
 
549
  async def start_listener(self):
550
+ """
551
+ 🎧 TRUE Redis pubsub listener - push-based, efficient, resilient
552
+ Uses async listen() instead of polling get_message()
553
+ """
554
  pubsub = event_hub.redis.pubsub()
555
  pubsub.psubscribe("analytics_trigger:*")
556
+
557
+ logger.info("🎧 Worker Manager: Einstein+Elon mode ENGAGED (true pubsub)")
558
+
559
+ # Track last cleanup time to avoid running it every iteration
560
+ last_cleanup = time.time()
561
+
562
  try:
563
+ # Use async listen() for push-based notifications (Zero Redis ops when idle)
564
+ async for message in pubsub.listen():
565
+ if message["type"] == "pmessage":
 
 
566
  await self._handle_trigger(message)
567
+
568
+ # Cleanup every 10 seconds (not every message)
569
+ if time.time() - last_cleanup > 10:
570
+ self._cleanup_completed_workers()
571
+ last_cleanup = time.time()
572
+
573
  except asyncio.CancelledError:
574
+ logger.info("[MANAGER] πŸ›‘ Listener cancelled by shutdown")
575
+ except Exception as e:
576
+ logger.error(f"[MANAGER] ❌ Listener crashed: {e}", exc_info=True)
577
+ # Auto-restart on failure (prevents silent death)
578
+ logger.info("[MANAGER] πŸ”„ Attempting to restart listener in 5s...")
579
+ await asyncio.sleep(5)
580
+ asyncio.create_task(self.start_listener(), name="worker-manager-restart")
581
  finally:
582
  pubsub.close()
583
+ logger.info("[MANAGER] πŸ”Œ Pubsub connection closed")
584
 
585
  async def _handle_trigger(self, message: Dict[str, Any]):
586
  """Process a single trigger message"""