Peter Mutwiri committed on
Commit
6b15a3d
·
1 Parent(s): 7b246f4

corrected datasource and canonify

Browse files
Files changed (2) hide show
  1. app/mapper.py +13 -9
  2. app/routers/datasources.py +30 -16
app/mapper.py CHANGED
@@ -490,14 +490,18 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
490
 
491
  # After line: print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms")
492
  if not df.empty:
493
- redis.publish(
494
- f"analytics_trigger:{org_id}:{source_id}",
495
- json.dumps({
496
- "type": "kpi_compute",
497
- "entity_type": entity_type,
498
- "industry": industry
499
- })
500
- )
501
- print(f"[canonify] πŸš€ Triggered analytics for {source_id}")
 
 
 
 
502
 
503
  return df, industry, industry_confidence
 
490
 
491
  # After line: print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms")
492
  if not df.empty:
493
+ # At the end of the canonify pipeline
494
+ try:
495
+ redis.publish(
496
+ f"analytics_trigger:{org_id}:{source_id}",
497
+ json.dumps({
498
+ "type": "kpi_compute",
499
+ "entity_type": entity_type,
500
+ "industry": industry
501
+ })
502
+ )
503
+ print(f"[canonify] πŸš€ Triggered analytics for {source_id}")
504
+ except Exception as e:
505
+ print(f"[canonify] ⚠️ Analytics trigger failed (non-critical): {e}")
506
 
507
  return df, industry, industry_confidence
app/routers/datasources.py CHANGED
@@ -3,7 +3,7 @@ from typing import Dict, Any, List, Union
3
  from fastapi.responses import JSONResponse
4
  from pydantic import BaseModel
5
  from typing import List, Any, Dict, Union
6
- from app.deps import verify_api_key,get_current_user
7
  from app.db import get_conn, ensure_raw_table, bootstrap
8
  from app.mapper import canonify_df
9
  from app.routers.socket import sio
@@ -80,22 +80,36 @@ async def create_source_json(
80
  preview_df[col] = preview_df[col].astype(str)
81
 
82
  preview_rows = preview_df.to_dict("records") if not preview_df.empty else []
83
- if is_qstash_available():
84
- try:
85
- client = get_qstash_client()
86
- result = client.message.publish(
87
- url=f"{settings.APP_URL}/api/v1/analytics/callback",
88
- body={"org_id": org_id, "source_id": source_id}
89
- )
90
- logger.info(f"πŸ“€ QStash queued: {result.message_id}")
91
- except Exception as e:
92
- logger.warning(f"⚠️ QStash failed, using Redis: {e}")
93
- # Fallback to Redis publish
94
- redis.publish(...)
95
- else:
96
- # βœ… Fallback when QStash not configured
97
- redis.publish(...)
 
 
 
 
 
 
 
 
 
 
98
  logger.info("πŸ“‘ Redis fallback for analytics trigger")
 
 
 
 
99
  # 4. πŸ“‘ Broadcast to connected dashboards
100
  await sio.emit(
101
  "datasource:new-rows",
 
3
  from fastapi.responses import JSONResponse
4
  from pydantic import BaseModel
5
  from typing import List, Any, Dict, Union
6
+ from app.deps import verify_api_key,get_current_user,APP_URL
7
  from app.db import get_conn, ensure_raw_table, bootstrap
8
  from app.mapper import canonify_df
9
  from app.routers.socket import sio
 
80
  preview_df[col] = preview_df[col].astype(str)
81
 
82
  preview_rows = preview_df.to_dict("records") if not preview_df.empty else []
83
+
84
+ try:
85
+ # Use APP_URL from deps (no need for settings)
86
+ trigger_data = {
87
+ "type": "kpi_compute",
88
+ "org_id": org_id,
89
+ "source_id": source_id
90
+ }
91
+
92
+ # Try QStash if available
93
+ if is_qstash_available():
94
+ try:
95
+ from app.qstash_client import get_qstash_client
96
+ client = get_qstash_client()
97
+ result = client.message.publish(
98
+ url=f"{APP_URL}/api/v1/analytics/callback",
99
+ body=trigger_data
100
+ )
101
+ logger.info(f"πŸ“€ QStash queued: {result.message_id}")
102
+ except Exception as e:
103
+ logger.warning(f"⚠️ QStash failed, using Redis: {e}")
104
+ redis.publish(f"analytics_trigger:{org_id}:{source_id}", json.dumps(trigger_data))
105
+ else:
106
+ # βœ… Fallback: Direct Redis publish
107
+ redis.publish(f"analytics_trigger:{org_id}:{source_id}", json.dumps(trigger_data))
108
  logger.info("πŸ“‘ Redis fallback for analytics trigger")
109
+
110
+ except Exception as e:
111
+ logger.warning(f"⚠️ Analytics trigger failed (non-critical): {e}")
112
+ # βœ… DON'T raise - ingestion should succeed even if analytics fails
113
  # 4. πŸ“‘ Broadcast to connected dashboards
114
  await sio.emit(
115
  "datasource:new-rows",