shaliz-kong commited on
Commit
5407eb5
·
1 Parent(s): c80e93b

added dataframe load time so that reddis dont time out

Browse files
Files changed (1) hide show
  1. app/tasks/analytics_worker.py +50 -31
app/tasks/analytics_worker.py CHANGED
@@ -205,9 +205,12 @@ class AnalyticsWorker:
205
  return await asyncio.to_thread(self._sync_load_dataframe)
206
 
207
  def _sync_load_dataframe(self) -> pd.DataFrame:
208
- """Wait for table + data with exponential backoff"""
 
 
 
209
  conn = None
210
- MAX_WAIT = 30
211
  INITIAL_RETRY = 0.5
212
 
213
  try:
@@ -218,49 +221,65 @@ class AnalyticsWorker:
218
 
219
  self._entity_type = entity_info["entity_type"]
220
  table_name = f"main.{self._entity_type}_canonical"
221
- cutoff = datetime.now() - timedelta(hours=self.hours_window)
222
-
223
  conn = get_conn(self.org_id)
224
  retry_delay = INITIAL_RETRY
225
  start = time.time()
226
 
227
  while (time.time() - start) < MAX_WAIT:
228
  try:
229
- count = conn.execute(
230
- f"SELECT COUNT(*) FROM {table_name} WHERE timestamp >= ?",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
231
  [cutoff]
232
- ).fetchone()[0]
 
 
 
 
233
 
234
- if count > 0:
235
- logger.info(f"[LOAD] Table ready: {count} rows (waited {(time.time() - start):.1f}s)")
236
- break
 
 
237
 
238
- logger.debug(f"[LOAD] Table empty, retrying in {retry_delay}s...")
 
 
 
 
 
239
 
240
  except Exception as e:
241
- if "does not exist" in str(e).lower():
242
- logger.debug(f"[LOAD] Table doesn't exist yet, retrying...")
243
- else:
244
- logger.warning(f"[LOAD] Error: {e}")
245
-
246
- time.sleep(retry_delay)
247
- retry_delay = min(retry_delay * 1.5, 5.0)
248
-
249
- else:
250
- logger.error(f"[LOAD] Timeout after {MAX_WAIT}s")
251
- return pd.DataFrame()
252
-
253
- # Load the data
254
- df = conn.execute(
255
- f"SELECT * FROM {table_name} WHERE timestamp >= ? ORDER BY timestamp DESC",
256
- [cutoff]
257
- ).df()
258
 
259
- logger.info(f"[LOAD] Success: {len(df)} rows × {len(df.columns)} cols")
260
- return df
261
 
262
  except Exception as e:
263
- logger.error(f"[LOAD] Fatal: {e}", exc_info=True)
264
  return pd.DataFrame()
265
  finally:
266
  if conn:
 
205
  return await asyncio.to_thread(self._sync_load_dataframe)
206
 
207
  def _sync_load_dataframe(self) -> pd.DataFrame:
208
+ """
209
+ Load data with resilience: check table, check data, apply filters
210
+ Returns data even if time window is empty
211
+ """
212
  conn = None
213
+ MAX_WAIT = 10 # Reduced from 30s for faster feedback
214
  INITIAL_RETRY = 0.5
215
 
216
  try:
 
221
 
222
  self._entity_type = entity_info["entity_type"]
223
  table_name = f"main.{self._entity_type}_canonical"
 
 
224
  conn = get_conn(self.org_id)
225
  retry_delay = INITIAL_RETRY
226
  start = time.time()
227
 
228
  while (time.time() - start) < MAX_WAIT:
229
  try:
230
+ # Check 1: Does table exist?
231
+ table_exists = conn.execute(
232
+ "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?",
233
+ [table_name.split('.')[1]]
234
+ ).fetchone()[0] > 0
235
+
236
+ if not table_exists:
237
+ logger.debug(f"[LOAD] Table {table_name} doesn't exist yet")
238
+ time.sleep(retry_delay)
239
+ continue
240
+
241
+ # Check 2: Does ANY data exist? (no timestamp filter)
242
+ count = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
243
+
244
+ if count == 0:
245
+ logger.debug(f"[LOAD] Table exists but is empty, waiting for data...")
246
+ time.sleep(retry_delay)
247
+ continue
248
+
249
+ # Check 3: Try timestamp filter
250
+ cutoff = datetime.now() - timedelta(hours=self.hours_window)
251
+ df = conn.execute(
252
+ f"SELECT * FROM {table_name} WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT 10000",
253
  [cutoff]
254
+ ).df()
255
+
256
+ if not df.empty:
257
+ logger.info(f"[LOAD] Success: {len(df)} rows × {len(df.columns)} cols (time-filtered)")
258
+ return df
259
 
260
+ # If timestamp filter is empty, return recent data anyway
261
+ logger.warning(f"[LOAD] No data in {self.hours_window}h window, returning recent rows")
262
+ df = conn.execute(
263
+ f"SELECT * FROM {table_name} ORDER BY timestamp DESC LIMIT 1000"
264
+ ).df()
265
 
266
+ if not df.empty:
267
+ logger.info(f"[LOAD] Success: {len(df)} rows × {len(df.columns)} cols (recent data)")
268
+ return df
269
+
270
+ time.sleep(retry_delay)
271
+ retry_delay = min(retry_delay * 1.5, 5.0)
272
 
273
  except Exception as e:
274
+ logger.warning(f"[LOAD] Error (retry in {retry_delay}s): {e}")
275
+ time.sleep(retry_delay)
276
+ retry_delay = min(retry_delay * 1.5, 5.0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
277
 
278
+ logger.error(f"[LOAD] Timeout after {MAX_WAIT}s - table exists but no data available")
279
+ return pd.DataFrame()
280
 
281
  except Exception as e:
282
+ logger.error(f"[LOAD] Fatal error: {e}", exc_info=True)
283
  return pd.DataFrame()
284
  finally:
285
  if conn: