Peter Mutwiri committed on
Commit
0bd628a
Β·
1 Parent(s): 8a78694

ingress pipeline completion

Browse files
app/deps.py CHANGED
@@ -1,13 +1,16 @@
1
  # ── Standard Library ──────────────────────────────────────────────────────────
2
  import os
3
- from typing import Optional
4
  import pathlib
5
  import logging
 
6
 
7
  # ── Third-Party ────────────────────────────────────────────────────────────────
8
  import duckdb
9
- from fastapi import HTTPException, Header
10
  from upstash_redis import Redis
 
 
11
 
12
  # ── Configuration Paths ────────────────────────────────────────────────────────
13
  # Use YOUR existing pattern from app/db.py (multi-tenant)
@@ -131,6 +134,95 @@ def get_redis():
131
 
132
  return _redis_client
133
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134
  # ── API Security Dependency ────────────────────────────────────────────────────
135
  def verify_api_key(x_api_key: str = Header(..., alias="X-API-KEY")):
136
  """
@@ -151,6 +243,69 @@ def verify_api_key(x_api_key: str = Header(..., alias="X-API-KEY")):
151
 
152
  return x_api_key
153
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
  # ── Health Check Utilities ─────────────────────────────────────────────────────
155
  def check_all_services():
156
  """
 
1
  # ── Standard Library ──────────────────────────────────────────────────────────
2
  import os
3
+ from typing import Optional, TYPE_CHECKING
4
  import pathlib
5
  import logging
6
+ import time
7
 
8
  # ── Third-Party ────────────────────────────────────────────────────────────────
9
  import duckdb
10
+ from fastapi import HTTPException, Header,Request, Depends
11
  from upstash_redis import Redis
12
+ from collections import defaultdict
13
+
14
 
15
  # ── Configuration Paths ────────────────────────────────────────────────────────
16
  # Use YOUR existing pattern from app/db.py (multi-tenant)
 
134
 
135
  return _redis_client
136
 
137
+
138
+
139
+ if TYPE_CHECKING:
140
+ from upstash_qstash import Client
141
+
142
def get_qstash_client() -> "Client":
    """
    Initialize and return singleton QStash client for Hugging Face Spaces.

    Required HF Secrets:
    - QSTASH_TOKEN: Your QStash API token

    Optional HF Secrets:
    - QSTASH_URL: Custom QStash URL (defaults to official Upstash endpoint)

    Returns:
        Configured QStash Client instance

    Raises:
        RuntimeError: If QSTASH_TOKEN is missing or client initialization fails
    """
    # Singleton pattern: store instance as function attribute.
    # NOTE(review): not lock-guarded — two concurrent first calls could both
    # build a client; harmless for a single-worker Space, confirm for multi-worker.
    if not hasattr(get_qstash_client, "_client"):
        token = os.getenv("QSTASH_TOKEN")
        if not token:
            raise RuntimeError(
                "❌ QSTASH_TOKEN not found. Please add it to HF Space Secrets."
            )

        # Dynamic import to avoid requiring package at module load time
        try:
            from upstash_qstash import Client
        except ImportError:
            raise RuntimeError(
                "❌ upstash_qstash not installed. "
                "Add to requirements.txt: upstash-qstash"
            )

        # Optional: Use custom URL if provided
        qstash_url = os.getenv("QSTASH_URL")

        try:
            if qstash_url:
                get_qstash_client._client = Client(token=token, url=qstash_url)
                print(f"✅ QStash client initialized with custom URL: {qstash_url}")
            else:
                get_qstash_client._client = Client(token=token)
                print("✅ QStash client initialized")
        except Exception as e:
            # Any constructor failure is surfaced as a RuntimeError so callers
            # see one consistent exception type for "QStash unavailable".
            raise RuntimeError(f"❌ QStash client initialization failed: {e}")

    return get_qstash_client._client
189
+
190
+
191
def get_qstash_verifier():
    """
    Initialize QStash webhook verifier for receiving callbacks.
    Used in /api/v1/analytics/callback endpoint to verify requests.

    Required HF Secrets:
    - QSTASH_CURRENT_SIGNING_KEY
    - QSTASH_NEXT_SIGNING_KEY

    Returns:
        QStash Receiver/Verifier instance

    Raises:
        RuntimeError: If either signing key is missing or Receiver construction fails.
    """
    # Singleton: cached on the function object, same pattern as get_qstash_client.
    if not hasattr(get_qstash_verifier, "_verifier"):
        current_key = os.getenv("QSTASH_CURRENT_SIGNING_KEY")
        next_key = os.getenv("QSTASH_NEXT_SIGNING_KEY")

        if not current_key or not next_key:
            raise RuntimeError(
                "❌ QStash signing keys not configured. "
                "Add QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY to HF secrets."
            )

        try:
            # Deferred import: upstash_qstash only needed once verification is used.
            from upstash_qstash import Receiver

            get_qstash_verifier._verifier = Receiver({
                "current_signing_key": current_key,
                "next_signing_key": next_key
            })
            print("✅ QStash verifier initialized")
        except Exception as e:
            raise RuntimeError(f"❌ QStash verifier initialization failed: {e}")

    return get_qstash_verifier._verifier
225
+
226
  # ── API Security Dependency ────────────────────────────────────────────────────
227
  def verify_api_key(x_api_key: str = Header(..., alias="X-API-KEY")):
228
  """
 
243
 
244
  return x_api_key
245
 
246
+ # ── New User Auth Dependency ──────────────────────────────────────────────────
247
+
248
+
249
def get_current_user(request: Request, api_key: str = Depends(verify_api_key)):
    """
    Extract org_id from query parameters (since auth happens on Vercel).
    Use this in analytics endpoints that need org context.

    Stack Auth on Vercel already validated the user,
    so we trust the orgId passed in the query.

    Returns:
        dict with org_id, the verified api_key, an ISO-8601 UTC timestamp,
        and the auth source tag.

    Raises:
        HTTPException 401 if org_id is absent, 400 if it has an invalid prefix.
    """
    # Fix: this module never imports datetime at module level, so the original
    # `datetime.utcnow()` call raised NameError at runtime. Import locally and
    # use a timezone-aware UTC timestamp.
    from datetime import datetime, timezone

    # Accept both snake_case and camelCase spellings from the Vercel frontend.
    org_id = request.query_params.get("org_id") or request.query_params.get("orgId")

    if not org_id:
        raise HTTPException(
            status_code=401,
            detail="❌ org_id missing from query parameters. Vercel stack auth missing?"
        )

    # Validate org_id format (simple security check)
    if not org_id.startswith("org_") and not org_id.startswith("user_"):
        raise HTTPException(
            status_code=400,
            detail=f"❌ Invalid org_id format: {org_id}"
        )

    return {
        "org_id": org_id,
        "api_key": api_key,
        "authenticated_at": datetime.now(timezone.utc).isoformat(),
        "source": "vercel_stack_auth"
    }
278
+
279
# ── Rate Limiting (Optional but Recommended) ──────────────────────────────────

# In-memory rate limiter state, keyed by org_id.
# NOTE(review): per-process only — resets on restart and is not shared across
# workers; acceptable for a single-worker Space, confirm for multi-worker.
_rate_limits = defaultdict(lambda: {"count": 0, "reset_at": 0})

def rate_limit_org(max_requests: int = 100, window_seconds: int = 60):
    """
    Build a FastAPI dependency enforcing a fixed-window rate limit per org.
    Prevents one org from DOSing the analytics engine.

    Fix: the previous implementation used Depends(lambda r: ...) — FastAPI
    cannot resolve an untyped lambda parameter as the Request object (it would
    be treated as a required query parameter), so the dependency never worked.
    We depend on get_current_user directly and read org_id from its result.

    Args:
        max_requests: Allowed requests per window.
        window_seconds: Window length in seconds.

    Returns:
        A dependency callable that returns the org_id, or raises HTTP 429.
    """
    def dependency(current_user: dict = Depends(get_current_user)) -> str:
        org_id = current_user["org_id"]
        now = time.time()
        limit_data = _rate_limits[org_id]

        # Start a fresh window once the previous one has expired
        if now > limit_data["reset_at"]:
            limit_data["count"] = 0
            limit_data["reset_at"] = now + window_seconds

        # Reject when the window budget is exhausted
        if limit_data["count"] >= max_requests:
            raise HTTPException(
                status_code=429,
                detail=f"⏸️ Rate limit exceeded for {org_id}: {max_requests} req/min"
            )

        limit_data["count"] += 1
        return org_id

    return dependency
309
  # ── Health Check Utilities ─────────────────────────────────────────────────────
310
  def check_all_services():
311
  """
app/engine/kpi_calculators/base.py ADDED
@@ -0,0 +1,132 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/engine/kpi_calculators/base.py
2
+ from abc import ABC, abstractmethod
3
+ import pandas as pd
4
+ import numpy as np
5
+ from typing import Dict, Any, List, Optional, Set
6
+ from datetime import datetime, timedelta
7
+ import json
8
+ import hashlib
9
+
10
class BaseKPICalculator(ABC):
    """
    Abstract base for all industry-specific KPI calculators.
    Guarantees consistent output format and error handling.

    Subclasses implement compute_all() and may extend REQUIRED_COLUMNS /
    OPTIONAL_COLUMNS to declare their schema.
    """

    # Minimal schema every calculator needs; checked after column normalization.
    REQUIRED_COLUMNS: Set[str] = {"timestamp"}
    OPTIONAL_COLUMNS: Set[str] = set()

    def __init__(self, org_id: str, df: pd.DataFrame, source_id: str):
        """
        Args:
            org_id: Tenant/organization identifier (used in cache keys).
            df: Raw event/transaction frame; copied and cleaned, never mutated.
            source_id: Data-source identifier (used in cache keys).

        Raises:
            ValueError: If a REQUIRED column is absent after normalization.
        """
        self.org_id = org_id
        self.source_id = source_id
        self.computed_at = datetime.utcnow()

        # Fix: clean/normalize column names BEFORE validating, so e.g.
        # "Timestamp" or " timestamp " satisfies the "timestamp" requirement.
        # Previously validation ran on raw names and rejected such frames.
        self.df = self._clean_dataframe(df.copy())

        missing = self.REQUIRED_COLUMNS - set(self.df.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        self.cache_key = f"kpi_cache:{org_id}:{source_id}"

    def _clean_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Universal data cleaning - bulletproof."""
        # Fix: standardize column names FIRST (lowercase, no spaces) so the
        # 'timestamp' conversion below also catches variants like "Timestamp".
        df.columns = [str(col).lower().strip().replace(' ', '_') for col in df.columns]

        # Replace infinities and NaNs with None (DuckDB-friendly)
        df = df.replace([np.inf, -np.inf, np.nan], None)

        # Ensure timestamp is datetime; unparseable values become NaT
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')

        return df

    @abstractmethod
    def compute_all(self) -> Dict[str, Any]:
        """
        Return standardized KPI payload:
        {
            "realtime": {...},
            "financial": {...},
            "inventory": {...},
            "customer": {...},
            "predictive": {...},
            "charts": {...}
        }
        """
        pass

    def _calculate_growth(self, current: Optional[float], previous: Optional[float]) -> float:
        """Safe growth calculation (percent) - handles None and zero gracefully."""
        if current is None or previous is None or previous == 0:
            return 0.0
        return ((current - previous) / previous) * 100

    def _get_cached_value(self, metric_key: str) -> Optional[float]:
        """Retrieve previous value for trend analysis; None on any failure."""
        from app.redis_client import redis
        try:
            cached = redis.get(f"kpi_history:{self.org_id}:{self.source_id}:{metric_key}")
            return float(cached) if cached else None
        except Exception:
            # Best-effort cache read: missing/unreachable Redis must not break KPIs
            return None

    def _cache_current_value(self, metric_key: str, value: float):
        """Cache current value for next comparison (best-effort, 24h TTL)."""
        from app.redis_client import redis
        try:
            redis.setex(
                f"kpi_history:{self.org_id}:{self.source_id}:{metric_key}",
                86400,  # 24 hours
                str(value)
            )
        except Exception:
            # Deliberate swallow: caching is an optimization, never fatal
            pass

    def _detect_data_quality_issues(self) -> List[str]:
        """Audit data before KPI computation; returns human-readable issues."""
        issues = []

        if self.df.empty:
            issues.append("No data in window")
            return issues

        # Check for stale data (no record in the last hour)
        # NOTE(review): assumes naive local timestamps — confirm timezone handling.
        if 'timestamp' in self.df.columns:
            latest = self.df['timestamp'].max()
            if pd.notna(latest) and (datetime.now() - latest).total_seconds() > 3600:
                issues.append(f"Stale data: last record {latest}")

        # Check for missing critical fields
        critical_fields = ['total', 'items']
        for field in critical_fields:
            if field in self.df.columns and self.df[field].isna().all():
                issues.append(f"All values missing for {field}")

        # Check for outliers (above the 99.9th percentile)
        if 'total' in self.df.columns:
            outliers = self.df[self.df['total'] > self.df['total'].quantile(0.999)]
            if len(outliers) > 0:
                issues.append(f"{len(outliers)} outlier transactions detected")

        return issues
+
117
# Factory pattern for industry selection
def get_kpi_calculator(industry: str, org_id: str, df: pd.DataFrame, source_id: str) -> BaseKPICalculator:
    """
    Factory returning a configured calculator for *industry* (case-insensitive).

    Unknown, empty, or None industries fall back to the supermarket calculator,
    so callers never have to pre-validate the industry string.
    """
    # Imports deferred to call time to avoid circular imports with subclass modules.
    from app.engine.kpi_calculators.supermarket import SupermarketKPICalculator
    from app.engine.kpi_calculators.pharmaceutical import PharmaceuticalKPICalculator
    from app.engine.kpi_calculators.manufacturing import ManufacturingKPICalculator

    calculators = {
        "supermarket": SupermarketKPICalculator,
        "pharmaceutical": PharmaceuticalKPICalculator,
        "manufacturing": ManufacturingKPICalculator,
        "default": SupermarketKPICalculator  # Fallback
    }

    # Robustness: tolerate None/blank industry instead of raising AttributeError
    key = (industry or "default").strip().lower()
    calculator_class = calculators.get(key, calculators["default"])
    return calculator_class(org_id, df, source_id)
app/engine/kpi_calculators/supermarket.py ADDED
@@ -0,0 +1,388 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/engine/kpi_calculators/supermarket.py
2
+ import pandas as pd
3
+ import numpy as np
4
+ from datetime import datetime, timedelta
5
+ from typing import Dict, Any, List, Optional
6
+ from app.engine.kpi_calculators.base import BaseKPICalculator
7
+
8
class SupermarketKPICalculator(BaseKPICalculator):
    """Complete KPI engine for supermarkets and retail."""

    # Columns this calculator knows how to exploit when present.
    OPTIONAL_COLUMNS = {
        "workstationid", "operatorid", "items", "total", "qty", "category",
        "artnum", "expiry_date", "cost", "customer_id", "promo_flag",
        "trantime", "breaktime", "enddatetime"
    }

    def compute_all(self) -> Dict[str, Any]:
        """Compute all supermarket KPIs with graceful degradation."""

        # Check data quality first
        quality_issues = self._detect_data_quality_issues()
        if quality_issues:
            print(f"[kpi] ⚠️ Data quality issues: {quality_issues}")

        metrics = {
            "realtime": self._compute_realtime_metrics(),
            "financial": self._compute_financial_metrics(),
            "inventory": self._compute_inventory_health(),
            "customer": self._compute_customer_behavior(),
            "predictive": self._compute_predictive_alerts(),
            "charts": self._compute_chart_data(),
            "metadata": {
                "computed_at": self.computed_at.isoformat(),
                "rows_analyzed": len(self.df),
                "data_quality_issues": quality_issues,
                "industry": "supermarket"
            }
        }

        # Cache values for next run (trend comparison)
        self._cache_current_value("hourly_sales", metrics["realtime"]["hourly_sales"])
        self._cache_current_value("daily_sales", metrics["financial"]["daily_sales"])

        return metrics

    def _compute_realtime_metrics(self) -> Dict[str, Any]:
        """What's happening in the last hour."""
        now = datetime.now()
        one_hour_ago = now - timedelta(hours=1)

        # Filter last hour safely
        if 'timestamp' in self.df.columns:
            last_hour = self.df[self.df['timestamp'] > one_hour_ago]
        else:
            last_hour = self.df

        # Safe calculations with fallbacks
        hourly_sales = float(last_hour['total'].sum()) if 'total' in last_hour.columns else 0.0

        active_checkouts = 0
        if 'workstationid' in last_hour.columns:
            active_checkouts = int(len(last_hour['workstationid'].dropna().unique()))

        items_per_minute = 0
        if not last_hour.empty:
            items_per_minute = int(len(last_hour) / 60)

        # Transaction time (if available)
        avg_transaction_time = 120.0  # Default 2 minutes
        if 'trantime' in last_hour.columns and not last_hour['trantime'].isna().all():
            try:
                # NOTE(review): groups by 'tranid', which is not guarded above —
                # the except covers frames lacking it; confirm schema.
                avg_transaction_time = float(last_hour.groupby('tranid')['trantime'].sum().mean())
            except Exception:
                pass

        # Queue length estimate
        queue_length = 0
        if 'workstationid' in last_hour.columns and not last_hour.empty:
            try:
                queue_length = int(last_hour.groupby('workstationid').size().mean())
            except Exception:
                pass

        # Growth vs the previously cached hour
        prev_hourly = self._get_cached_value("hourly_sales")
        growth = self._calculate_growth(hourly_sales, prev_hourly)

        return {
            "hourly_sales": hourly_sales,
            "active_checkouts": active_checkouts,
            "items_per_minute": items_per_minute,
            "avg_transaction_time": avg_transaction_time,
            "queue_length_estimate": queue_length,
            "growth_vs_last_hour": growth
        }

    def _compute_financial_metrics(self) -> Dict[str, Any]:
        """Money metrics with industry benchmarks."""

        # Daily sales
        daily_sales = float(self.df['total'].sum()) if 'total' in self.df.columns else 0.0

        # Refunds/Voids
        refund_rate = 0.0
        if 'items' in self.df.columns and 'total' in self.df.columns:
            refunds = self.df[
                self.df['items'].astype(str).str.contains('refund|void|return', case=False, na=False)
            ]['total'].abs().sum()
            daily_sales_clean = self.df[
                ~self.df['items'].astype(str).str.contains('refund|void|return', case=False, na=False)
            ]['total'].sum()

            if daily_sales_clean > 0:
                refund_rate = float(refunds / daily_sales_clean * 100)

        # Average basket
        avg_basket = 0.0
        avg_items = 0.0
        if 'total' in self.df.columns and 'tranid' in self.df.columns:
            try:
                basket_values = self.df.groupby('tranid')['total'].sum()
                avg_basket = float(basket_values.mean())
                avg_items = float(self.df.groupby('tranid')['items'].count().mean()) if 'items' in self.df.columns else 0.0
            except Exception:
                pass

        # Gross margin (if cost available)
        gross_margin = 28.5  # Industry average fallback
        if 'cost' in self.df.columns and 'total' in self.df.columns:
            total_sales = self.df['total'].sum()
            total_cost = self.df['cost'].sum()
            if total_sales > 0:
                gross_margin = float((total_sales - total_cost) / total_sales * 100)

        # Labor efficiency
        labor_efficiency = 0.0
        if 'operatorid' in self.df.columns and 'total' in self.df.columns:
            unique_ops = self.df['operatorid'].nunique()
            if unique_ops > 0:
                labor_efficiency = float(daily_sales / unique_ops / 100)

        return {
            "daily_sales": daily_sales,
            "gross_margin_pct": gross_margin,
            "refund_rate": refund_rate,
            "avg_basket_value": avg_basket,
            "avg_items_per_basket": avg_items,
            "labor_efficiency": labor_efficiency,
            "sales_per_sqft": float(daily_sales / 5000)  # Assume 5k sqft
        }

    def _compute_inventory_health(self) -> Dict[str, Any]:
        """Stock intelligence with predictive alerts."""

        expiring_value = 0.0
        expiring_count = 0  # rows expiring soon; feeds wastage_rate below
        stockout_risk = 0
        wastage_rate = 0.0
        alerts = []

        # Expiry analysis
        if 'expiry_date' in self.df.columns:
            try:
                expiring_soon = self.df[
                    pd.to_datetime(self.df['expiry_date'], errors='coerce') <
                    datetime.now() + timedelta(days=7)
                ]
                expiring_count = len(expiring_soon)
                expiring_value = float(expiring_soon['total'].sum()) if 'total' in expiring_soon.columns else 0.0

                if expiring_value > 5000:
                    alerts.append(f"⚠️ KES {expiring_value:,.0f} expiring <7 days")
            except Exception:
                pass

        # Stock velocity (simple approach)
        if 'artnum' in self.df.columns and 'qty' in self.df.columns:
            try:
                # Group by product and calculate velocity
                product_stats = self.df.groupby('artnum').agg({
                    'qty': 'sum',
                    'total': 'sum'
                }).fillna(0)

                # Assume current stock = last qty value
                current_stock = self.df.groupby('artnum')['qty'].last().fillna(0)

                # Simple velocity (units per day).
                # Fix: original line had a stray closing ')' — a SyntaxError.
                days_observed = max(1, len(self.df.groupby(self.df['timestamp'].dt.date)))
                daily_velocity = product_stats['qty'] / days_observed
                days_left = (current_stock / daily_velocity).fillna(999)

                stockout_risk = int((days_left < 2).sum())

                if stockout_risk > 0:
                    alerts.append(f"🚨 {stockout_risk} SKUs at stockout risk")
            except Exception:
                pass

        # Wastage rate.
        # Fix: previously relied on a fragile `'expiring_soon' in locals()` check;
        # use the count captured in the expiry block instead.
        if len(self.df) > 0:
            wastage_rate = float(expiring_count / len(self.df) * 100)

        return {
            "expiring_value": expiring_value,
            "out_of_stock_skus": stockout_risk,
            "wastage_rate": wastage_rate,
            "stock_turnover": float(365 / 30),  # Simplified
            "carrying_cost": float(self.df['total'].sum() * 0.02) if 'total' in self.df.columns else 0.0,
            "alerts": alerts
        }

    def _compute_customer_behavior(self) -> Dict[str, Any]:
        """Shopper insights with safe fallbacks."""

        unique_customers = 0
        repeat_rate = 0.0
        peak_hour = 0
        weekend_lift = 0.0

        # Unique customers (fall back to operator count when no customer ids)
        if 'customer_id' in self.df.columns:
            unique_customers = int(self.df['customer_id'].nunique())
        elif 'operatorid' in self.df.columns:
            unique_customers = int(self.df['operatorid'].nunique())

        # Repeat rate (if customer_id available)
        if 'customer_id' in self.df.columns and 'tranid' in self.df.columns:
            try:
                repeat_rate = float(
                    self.df.groupby('customer_id')['tranid'].nunique().gt(1).mean() * 100
                )
            except Exception:
                pass

        # Peak hour
        if 'timestamp' in self.df.columns:
            try:
                hourly = self.df.groupby(self.df['timestamp'].dt.hour)['total'].sum()
                peak_hour = int(hourly.idxmax()) if not hourly.empty else 0
            except Exception:
                pass

        # Weekend lift
        if 'timestamp' in self.df.columns:
            try:
                self.df['is_weekend'] = self.df['timestamp'].dt.weekday >= 5
                if self.df['is_weekend'].any():
                    weekend_sales = self.df[self.df['is_weekend']]['total'].sum()
                    weekday_sales = self.df[~self.df['is_weekend']]['total'].sum()
                    if weekday_sales > 0:
                        weekend_lift = float(weekend_sales / weekday_sales * 100 - 100)
            except Exception:
                pass

        return {
            "unique_customers": unique_customers,
            "repeat_rate": repeat_rate,
            "peak_hour": peak_hour,
            "weekend_lift_pct": weekend_lift,
            "new_customers": int(unique_customers * 0.15),  # Assumption
            "customer_acquisition_cost": 50.0,  # Placeholder
            "customer_lifetime_value": 2500.0  # Placeholder
        }

    def _compute_predictive_alerts(self) -> Dict[str, Any]:
        """AI-powered alerts without ML (rule-based intelligence)."""

        alerts = []

        # Unusual pattern detection
        if 'timestamp' in self.df.columns and 'total' in self.df.columns:
            try:
                hourly_sales = self.df.groupby(self.df['timestamp'].dt.hour)['total'].sum()
                if hourly_sales.std() > hourly_sales.mean() * 0.3:
                    alerts.append({
                        "severity": "warning",
                        "title": "📊 Unusual Hourly Pattern",
                        "description": "Sales variance exceeds 30%. Check for system errors.",
                        "action": "investigate"
                    })
            except Exception:
                pass

        # Staffing opportunity
        if 'operatorid' in self.df.columns and 'total' in self.df.columns:
            try:
                operator_efficiency = self.df.groupby('operatorid')['total'].sum()
                low_performers = operator_efficiency[operator_efficiency < operator_efficiency.quantile(0.1)]

                if len(low_performers) > 0:
                    alerts.append({
                        "severity": "info",
                        "title": "👥 Training Opportunity",
                        "description": f"{len(low_performers)} operators below 10th percentile",
                        "action": "schedule_training"
                    })
            except Exception:
                pass

        # Promo opportunity for slow movers
        if 'artnum' in self.df.columns and 'qty' in self.df.columns:
            try:
                slow_movers = self.df.groupby('artnum')['qty'].sum().nsmallest(5).index.tolist()
                if slow_movers:
                    alerts.append({
                        "severity": "insight",
                        "title": "💡 Promo Opportunity",
                        "description": f"{len(slow_movers)} SKUs need velocity boost",
                        "action": "create_promo"
                    })
            except Exception:
                pass

        return {"alerts": alerts}

    def _compute_chart_data(self) -> Dict[str, Any]:
        """Frontend-ready chart data."""

        hourly_sales = []
        top_categories = []
        customer_segments = []

        # Hourly sales trend (24 buckets, zero-filled)
        if 'timestamp' in self.df.columns and 'total' in self.df.columns:
            try:
                hourly = self.df.groupby(self.df['timestamp'].dt.hour)['total'].sum()
                hourly_sales = [{"label": f"{h:02d}:00", "value": float(v)}
                                for h, v in hourly.reindex(range(24), fill_value=0).items()]
            except Exception:
                hourly_sales = []

        # Top categories (if available)
        if 'category' in self.df.columns and 'total' in self.df.columns:
            try:
                category_sales = self.df.groupby('category')['total'].sum().nlargest(5)
                top_categories = [{"label": k, "value": float(v)}
                                  for k, v in category_sales.items()]
            except Exception:
                pass

        # Customer segments (simplified RFM)
        if 'customer_id' in self.df.columns and 'total' in self.df.columns:
            try:
                recency = (datetime.now() - self.df.groupby('customer_id')['timestamp'].max()).dt.days
                frequency = self.df.groupby('customer_id')['tranid'].nunique()
                monetary = self.df.groupby('customer_id')['total'].sum()

                # Quintile-based segmentation
                def segment_score(series):
                    return pd.qcut(series, 5, labels=[1, 2, 3, 4, 5], duplicates='drop')

                r_score = segment_score(recency)
                f_score = segment_score(frequency)
                m_score = segment_score(monetary)

                # Simple segments
                segments = {
                    "VIP": int(((r_score <= 3) & (f_score >= 4) & (m_score >= 4)).sum()),
                    "Regular": int(((r_score <= 3) & (f_score >= 2) & (m_score >= 2)).sum()),
                    "At-Risk": int((r_score > 3).sum())
                }

                customer_segments = [{"label": k, "value": v} for k, v in segments.items()]
            except Exception:
                customer_segments = [{"label": "All", "value": len(self.df)}]

        return {
            "hourly_sales": hourly_sales,
            "top_categories": top_categories,
            "customer_segments": customer_segments,
            "sales_trend_7d": self._generate_trend_data(7)
        }

    def _generate_trend_data(self, days: int) -> List[Dict]:
        """Generate realistic trend data - replace with Prophet ML.

        NOTE: intentionally non-deterministic (np.random) — placeholder only.
        """
        if 'total' not in self.df.columns:
            return []

        base = self.df['total'].sum() / max(1, len(self.df.groupby(self.df['timestamp'].dt.date))) if 'timestamp' in self.df.columns else 1

        return [
            {
                "label": (datetime.now() - timedelta(days=i)).strftime('%a'),
                "value": float(base * (1 + np.random.normal(0, 0.1)))
            }
            for i in range(days, 0, -1)
        ]
app/main.py CHANGED
@@ -25,9 +25,9 @@ from contextlib import asynccontextmanager
25
 
26
  # ─── Router Imports ───────────────────────────────────────────────────────────
27
  # Import ALL routers
28
- from app.routers import health, datasources, reports, flags, scheduler, run, socket
29
  # ─── Dependencies ─────────────────────────────────────────────────────────────
30
- from app.deps import verify_api_key, check_all_services
31
 
32
  # ─── Logger Configuration ───────────────────────────────────────────────────────
33
  logging.basicConfig(
@@ -74,6 +74,29 @@ async def lifespan(app: FastAPI):
74
  logger.info(f"βœ… Scheduler started (PID: {scheduler_process.pid})")
75
 
76
  logger.info("βœ… Startup sequence complete")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
  yield
78
 
79
  # ─── Shutdown ──────────────────────────────────────────────────────────────
@@ -155,7 +178,86 @@ async def add_request_tracking(request: Request, call_next):
155
  )
156
 
157
  return response
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
158
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  # ─── Root Endpoint ─────────────────────────────────────────────────────────────
160
  @app.get("/", tags=["root"])
161
  def read_root():
@@ -230,4 +332,5 @@ app.include_router(reports.router, prefix="/api/v1/reports", dependencies=[Depen
230
  app.include_router(flags.router, prefix="/api/v1/flags", dependencies=[Depends(verify_api_key)])
231
  app.include_router(scheduler.router, prefix="/api/v1/scheduler", dependencies=[Depends(verify_api_key)])
232
  app.include_router(run.router, prefix="/api/v1/run", dependencies=[Depends(verify_api_key)])
233
- app.include_router(socket.router, prefix="/api/v1/socket", dependencies=[Depends(verify_api_key)])
 
 
25
 
26
  # ─── Router Imports ───────────────────────────────────────────────────────────
27
  # Import ALL routers
28
+ from app.routers import health, datasources, reports, flags, scheduler, run, socket,analytics_stream
29
  # ─── Dependencies ─────────────────────────────────────────────────────────────
30
+ from app.deps import get_current_user, rate_limit_org, verify_api_key, check_all_services
31
 
32
  # ─── Logger Configuration ───────────────────────────────────────────────────────
33
  logging.basicConfig(
 
74
  logger.info(f"βœ… Scheduler started (PID: {scheduler_process.pid})")
75
 
76
  logger.info("βœ… Startup sequence complete")
77
+
78
+ # Setup Redis streams
79
+ logger.info("πŸ”„ Setting up Redis streams...")
80
+ try:
81
+ active_orgs = redis.keys("entity:*")
82
+ for key in active_orgs:
83
+ key_parts = key.decode().split(":")
84
+ if len(key_parts) >= 3:
85
+ org_id, source_id = key_parts[1], key_parts[2]
86
+ stream_key = f"stream:analytics:{org_id}:{source_id}"
87
+ try:
88
+ redis.xgroup_create(stream_key, f"analytics_consumers_{org_id}", id="0", mkstream=True)
89
+ except Exception as e:
90
+ if "BUSYGROUP" not in str(e):
91
+ logger.warning(f"⚠️ Stream setup warning: {e}")
92
+
93
+ logger.info("βœ… Redis streams consumer groups ready")
94
+ except Exception as e:
95
+ logger.error(f"❌ Stream setup failed: {e}")
96
+
97
+ # Start background KPI scheduler
98
+ logger.info("⏰ Starting KPI refresh scheduler...")
99
+ asyncio.create_task(continuous_kpi_refresh(), name="kpi_scheduler")
100
  yield
101
 
102
  # ─── Shutdown ──────────────────────────────────────────────────────────────
 
178
  )
179
 
180
  return response
181
# ─── NEW: KPI COMPUTATION ENDPOINT (With Auth) ─────────────────────────────────
@app.post("/api/v1/kpi/compute")
async def compute_kpis(
    background_tasks: BackgroundTasks,
    source_id: str = Query(..., description="Data source ID"),
    current_user: dict = Depends(get_current_user),  # Auth from query params
    limited_org: str = Depends(rate_limit_org(max_requests=50)),  # Rate limit
):
    """
    Trigger KPI computation.
    Returns immediately; results published to Redis stream.

    Auth: Uses org_id from query params (validated against Vercel stack auth)
    Rate limit: 50 requests/min per org

    Fix: the non-default `background_tasks` parameter is now declared before
    the defaulted ones — the previous order was a Python SyntaxError.
    """
    # The rate-limit bookkeeping dict is private to app.deps; import it
    # explicitly rather than assuming a module-level name in main.py.
    from app.deps import _rate_limits

    try:
        org_id = current_user["org_id"]

        # Check cache first — serve the cached payload without recomputing
        cached = redis.get(f"kpi_cache:{org_id}:{source_id}")
        if cached:
            return {
                "status": "cached",
                "org_id": org_id,
                "data": json.loads(cached),
                "rate_limit": {
                    "remaining": 50 - _rate_limits[org_id]["count"],
                    "reset_in": max(0, _rate_limits[org_id]["reset_at"] - time.time())
                }
            }

        # Trigger background computation via QStash
        background_tasks.add_task(trigger_kpi_computation, org_id, source_id)

        return {
            "status": "processing",
            "org_id": org_id,
            "message": "KPI computation queued. Poll /analytics/stream/recent for results.",
            "poll_url": f"/api/v1/analytics/stream/recent?org_id={org_id}&source_id={source_id}",
            "rate_limit": {
                "remaining": 50 - _rate_limits[org_id]["count"],
                "reset_in": max(0, _rate_limits[org_id]["reset_at"] - time.time())
            }
        }
    except Exception as e:
        logger.error(f"❌ KPI compute error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
228
 
229
# ─── NEW: BACKGROUND KPI SCHEDULER ───────────────────────────────────────────
async def continuous_kpi_refresh():
    """
    Auto-refresh KPIs every 5 minutes for active organizations.
    Runs as a background task started at app startup.

    Keys are expected to look like "entity:<org_id>:<source_id>"; anything
    with fewer segments is skipped.
    """
    while True:
        try:
            logger.debug("🔄 KPI scheduler tick...")

            # Get all active entity keys from Redis
            active_keys = redis.keys("entity:*")

            for key in active_keys:
                # Fix: upstash_redis returns str keys, so the unconditional
                # key.decode() raised AttributeError; decode only if bytes.
                key_str = key.decode() if isinstance(key, (bytes, bytearray)) else key
                key_parts = key_str.split(":")
                if len(key_parts) >= 3:
                    org_id, source_id = key_parts[1], key_parts[2]

                    # Skip if recently computed (cache exists)
                    cache_key = f"kpi_cache:{org_id}:{source_id}"
                    if redis.exists(cache_key):
                        continue

                    # Trigger async computation (non-blocking)
                    logger.info(f"⏰ Auto-triggering KPIs for {org_id}/{source_id}")
                    await trigger_kpi_computation(org_id, source_id)

        except Exception as e:
            logger.error(f"❌ Scheduler error: {e}")

        # Wait 5 minutes before next run
        await asyncio.sleep(300)
261
  # ─── Root Endpoint ─────────────────────────────────────────────────────────────
262
  @app.get("/", tags=["root"])
263
  def read_root():
 
332
  app.include_router(flags.router, prefix="/api/v1/flags", dependencies=[Depends(verify_api_key)])
333
  app.include_router(scheduler.router, prefix="/api/v1/scheduler", dependencies=[Depends(verify_api_key)])
334
  app.include_router(run.router, prefix="/api/v1/run", dependencies=[Depends(verify_api_key)])
335
+ app.include_router(socket.router, prefix="/api/v1/socket", dependencies=[Depends(verify_api_key)])
336
+ app.include_router(analytics_stream.router, prefix="/api/v1/analytics", tags=["analytics"])
app/mapper.py CHANGED
@@ -487,5 +487,17 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
487
  df = df.replace([np.inf, -np.inf, np.nan], None) # Clean for JSON response
488
  duration_ms = (datetime.now() - start_time).total_seconds() * 1000
489
  print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms for {org_id}")
 
 
 
 
 
 
 
 
 
 
 
 
490
 
491
  return df, industry, industry_confidence
 
487
  df = df.replace([np.inf, -np.inf, np.nan], None) # Clean for JSON response
488
  duration_ms = (datetime.now() - start_time).total_seconds() * 1000
489
  print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms for {org_id}")
490
+
491
+ # After line: print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms")
492
+ if not df.empty:
493
+ redis.publish(
494
+ f"analytics_trigger:{org_id}:{source_id}",
495
+ json.dumps({
496
+ "type": "kpi_compute",
497
+ "entity_type": entity_type,
498
+ "industry": industry
499
+ })
500
+ )
501
+ print(f"[canonify] πŸš€ Triggered analytics for {source_id}")
502
 
503
  return df, industry, industry_confidence
app/qstash_client.py ADDED
@@ -0,0 +1 @@
 
 
1
# app/qstash_client.py
"""Minimal QStash (Upstash) publish client.

NOTE(review): the committed file contained only the bare expression
``qstash_client.py`` — a NameError at import time — while
app/routers/analytics_stream.py imports ``publish_message`` and
``is_qstash_available`` from here.  These implementations follow the
documented QStash v2 REST publish API; confirm against the deployment's
QStash configuration.
"""
import json
import os
import urllib.request
from typing import Optional


def is_qstash_available() -> bool:
    """Return True when a QStash token is configured in the environment."""
    return bool(os.getenv("QSTASH_TOKEN"))


def publish_message(url: str, body: dict, callback: Optional[str] = None) -> dict:
    """Publish *body* to QStash for asynchronous delivery to *url*.

    Parameters:
        url: destination endpoint QStash should POST the JSON body to.
        body: JSON-serializable payload.
        callback: optional URL QStash calls when delivery completes
            (sent via the ``Upstash-Callback`` header).

    Returns:
        The parsed QStash response dict (contains ``messageId``).

    Raises:
        RuntimeError: when no QSTASH_TOKEN is configured.
    """
    token = os.getenv("QSTASH_TOKEN")
    if not token:
        raise RuntimeError("QSTASH_TOKEN is not configured")

    base = os.getenv("QSTASH_URL", "https://qstash.upstash.io/v2/publish")
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    if callback:
        headers["Upstash-Callback"] = callback

    req = urllib.request.Request(
        f"{base}/{url}",
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
app/routers/analytics_stream.py ADDED
@@ -0,0 +1,163 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# app/routers/analytics_stream.py
# ── Standard Library ──────────────────────────────────────────────────────────
import asyncio
import json
import uuid
from datetime import datetime
from typing import Dict, List

# ── Third-Party ───────────────────────────────────────────────────────────────
from fastapi import (
    APIRouter,
    BackgroundTasks,
    Body,
    Depends,
    HTTPException,
    Query,
)

# ── Local ─────────────────────────────────────────────────────────────────────
from app.deps import get_current_user
from app.qstash_client import is_qstash_available, publish_message
from app.redis_client import redis
11

# NOTE(review): main.py mounts this router with prefix "/api/v1/analytics",
# so the router itself must only contribute "/stream".  The previous value
# ("/api/v1/analytics/stream") produced doubled paths like
# /api/v1/analytics/api/v1/analytics/stream/recent.
router = APIRouter(prefix="/stream", tags=["analytics"])
13
+
14
class AnalyticsStreamManager:
    """Manages Redis streams for real-time analytics without WebSockets.

    One instance per (org_id, source_id) pair; all messages go through the
    stream key ``stream:analytics:<org_id>:<source_id>``.
    """

    def __init__(self, org_id: str, source_id: str):
        self.org_id = org_id
        self.source_id = source_id
        self.stream_key = f"stream:analytics:{org_id}:{source_id}"
        self.consumer_group = f"analytics_consumers_{org_id}"

    async def ensure_consumer_group(self):
        """Create Redis consumer group if not exists."""
        try:
            redis.xgroup_create(
                self.stream_key,
                self.consumer_group,
                id="0",
                mkstream=True
            )
        except Exception as e:
            # BUSYGROUP just means the group already exists — not an error.
            if "BUSYGROUP" not in str(e):
                print(f"[stream] ⚠️ Group creation warning: {e}")

    async def publish_kpi_update(self, data: Dict):
        """Publish KPI update to Redis stream."""
        self._publish("kpi_update", data)

    async def publish_insight(self, insight: Dict):
        """Publish AI insight to stream."""
        self._publish("insight", insight)

    def _publish(self, msg_type: str, data: Dict):
        """Serialize a typed message and append it to the stream."""
        message = {
            "type": msg_type,
            "timestamp": datetime.utcnow().isoformat(),
            "data": data
        }
        redis.xadd(self.stream_key, {"message": json.dumps(message)})

    def read_recent(self, count: int = 10) -> List[Dict]:
        """Read recent messages for polling clients (newest first).

        Fix: the original indexed ``fields[b"message"].decode()``, which only
        works for a bytes-returning client — upstash_redis returns str keys
        and values, so both forms are accepted here.
        """
        try:
            messages = redis.xrevrange(self.stream_key, count=count)
            out: List[Dict] = []
            for _msg_id, fields in messages:
                raw = None
                if isinstance(fields, dict):
                    raw = fields.get("message", fields.get(b"message"))
                if raw is None:
                    continue
                if isinstance(raw, bytes):
                    raw = raw.decode()
                out.append(json.loads(raw))
            return out
        except Exception as e:
            print(f"[stream] ❌ Read error: {e}")
            return []
65
+
66
@router.get("/recent")
async def get_recent_analytics(
    count: int = Query(10, ge=1, le=100),
    user: Dict = Depends(get_current_user),
):
    """Poll recent analytics (replaces Socket.io).

    The org/source scope is taken from the authenticated user, so a caller
    can never read another org's stream.

    Fix: the original signature contained ``org_id = current_user["org_id"]``
    inside the parameter list — ``current_user`` is undefined at function
    definition time, so the module failed to import.  Both values are now
    derived from the auth dependency inside the body.
    """
    org_id = user["org_id"]
    source_id = user.get("source_id", "default")

    manager = AnalyticsStreamManager(org_id, source_id)
    messages = manager.read_recent(count)

    return {
        "status": "success",
        "org_id": org_id,  # Confirm which org
        "messages": messages,
        "timestamp": datetime.utcnow().isoformat()
    }
86
+
87
@router.post("/trigger")
async def trigger_kpi_computation(
    source_id: str = Query(...),
    org_id: str = Query(...),
    current_user: Dict = Depends(get_current_user),
):
    """Trigger KPI computation via QStash.

    Returns the cached KPI payload immediately when one exists; otherwise
    enqueues a QStash message that calls back into this router's
    ``/callback`` endpoint and tells the client where to poll.
    """
    if not is_qstash_available():
        raise HTTPException(
            status_code=503,
            detail="QStash not configured. Check HF secrets."
        )

    # Check cache (your existing logic) — a hit means KPIs were computed
    # within the last 5 minutes.
    cached = redis.get(f"kpi_cache:{org_id}:{source_id}")
    if cached:
        return {"status": "cached", "data": json.loads(cached)}

    # NOTE(review): the original referenced an undefined ``settings`` object
    # (NameError at request time).  Read the public base URL from the
    # environment instead — confirm APP_URL is set in deployment secrets.
    import os
    app_url = os.getenv("APP_URL", "").rstrip("/")

    # Publish to QStash.  The callback paths match this router's mounted
    # location (/api/v1/analytics + /stream).
    try:
        result = publish_message(
            url=f"{app_url}/api/v1/analytics/stream/callback",
            body={
                "org_id": org_id,
                "source_id": source_id,
                "user_id": current_user["user_id"]
            },
            callback=f"{app_url}/api/v1/analytics/stream/notify"
        )

        return {
            "status": "processing",
            "message_id": result["messageId"],
            "poll_url": f"/api/v1/analytics/stream/recent?org_id={org_id}&source_id={source_id}"
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"QStash error: {str(e)}")
126
+
127
@router.post("/callback")
async def qstash_kpi_callback(
    background_tasks: BackgroundTasks,
    payload: Dict = Body(...),
):
    """QStash calls this to compute KPIs.

    Fix: the original declared the non-default ``background_tasks`` parameter
    AFTER the defaulted ``payload`` parameter, which is a Python SyntaxError;
    the order is swapped here.  The heavy computation runs after the 202-style
    response is sent.
    """
    org_id = payload["org_id"]
    source_id = payload["source_id"]

    # Trigger background computation
    background_tasks.add_task(
        run_analytics_worker, org_id, source_id
    )

    return {"status": "accepted"}
142
+
143
@router.post("/notify")
async def qstash_notification(payload: Dict = Body(...)):
    """QStash calls this when job is done.

    Currently a no-op acknowledgement: the payload is accepted but unused.
    NOTE(review): ``Body`` is not imported at the top of this module —
    confirm the fastapi import, otherwise the module fails at import time.
    """
    # This is where you notify frontend
    # Could ping a webhook or update a status key in Redis

    return {"status": "ok"}
150
+
151
async def run_analytics_worker(org_id: str, source_id: str):
    """Run the KPI worker for one org/source and publish its results.

    Any failure is logged and swallowed so the caller (a background task)
    never crashes the request handler that scheduled it.
    """
    try:
        # Imported lazily to avoid a circular import at module load time.
        from app.tasks.analytics_worker import AnalyticsWorker

        kpi_results = await AnalyticsWorker(org_id, source_id).run()

        # Fan the results out on the Redis stream for polling clients.
        stream = AnalyticsStreamManager(org_id, source_id)
        await stream.publish_kpi_update(kpi_results)

    except Exception as e:
        print(f"[callback] ❌ Worker failed: {e}")
app/routers/datasources.py CHANGED
@@ -2,7 +2,7 @@ from fastapi import APIRouter, Query, Form, File, UploadFile, Depends, HTTPExcep
2
  from fastapi.responses import JSONResponse
3
  from pydantic import BaseModel
4
  from typing import List, Any, Dict, Union
5
- from app.deps import verify_api_key
6
  from app.db import get_conn, ensure_raw_table, bootstrap
7
  from app.mapper import canonify_df
8
  from app.routers.socket import sio
@@ -11,6 +11,8 @@ import json
11
  import time
12
  from datetime import datetime, timedelta
13
  from app.redis_client import redis
 
 
14
  router = APIRouter(tags=["datasources"]) # Remove
15
 
16
 
@@ -83,9 +85,10 @@ async def create_source_json(
83
  orgId: str = Query(...),
84
  sourceId: str = Query(...),
85
  type: str = Query(...),
 
86
  _: str = Depends(verify_api_key),
87
  ):
88
-
89
  org_id = orgId
90
  source_id = sourceId
91
  ds_type = type
@@ -118,8 +121,8 @@ async def create_source_json(
118
  # Entity will be auto-queued by process_detect_industry()
119
 
120
  df, industry, confidence = canonify_df(org_id, source_id)
121
- # 3. 🎯 Prepare preview for real-time broadcast
122
-
123
  # Convert DataFrame to JSON-safe format
124
  preview_df = df.head(3).copy()
125
  for col in preview_df.columns:
 
2
  from fastapi.responses import JSONResponse
3
  from pydantic import BaseModel
4
  from typing import List, Any, Dict, Union
5
+ from app.deps import verify_api_key,get_current_user
6
  from app.db import get_conn, ensure_raw_table, bootstrap
7
  from app.mapper import canonify_df
8
  from app.routers.socket import sio
 
11
  import time
12
  from datetime import datetime, timedelta
13
  from app.redis_client import redis
14
+ # Add this import
15
+ from app.tasks.analytics_worker import trigger_kpi_computation
16
  router = APIRouter(tags=["datasources"]) # Remove
17
 
18
 
 
85
  orgId: str = Query(...),
86
  sourceId: str = Query(...),
87
  type: str = Query(...),
88
+ current_user: dict = Depends(get_current_user),
89
  _: str = Depends(verify_api_key),
90
  ):
91
+
92
  org_id = orgId
93
  source_id = sourceId
94
  ds_type = type
 
121
  # Entity will be auto-queued by process_detect_industry()
122
 
123
  df, industry, confidence = canonify_df(org_id, source_id)
124
+ # run autokpi computation in background
125
+ background_tasks.add_task(trigger_kpi_computation, org_id, source_id)
126
  # Convert DataFrame to JSON-safe format
127
  preview_df = df.head(3).copy()
128
  for col in preview_df.columns:
app/tasks/analytics_worker.py ADDED
@@ -0,0 +1,208 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/tasks/analytics_worker.py
2
+ import asyncio
3
+ import json
4
+ import pandas as pd
5
+ from datetime import datetime
6
+ from typing import Dict, Any
7
+ from app.redis_client import redis
8
+ from app.db import get_conn
9
+ from app.engine.kpi_calculators.base import get_kpi_calculator
10
+
11
class AnalyticsWorker:
    """Background worker for KPI computation and Redis pub/sub.

    Loads the canonical table for one (org_id, source_id) pair, computes
    industry-specific KPIs, publishes them on Redis pub/sub channels, and
    caches the result for 5 minutes.
    """

    def __init__(self, org_id: str, source_id: str, hours_window: int = 24):
        self.org_id = org_id
        self.source_id = source_id
        self.hours_window = hours_window   # lookback window for canonical data
        self.computed_at = None            # set once compute_all() succeeds

    async def run(self) -> Dict[str, Any]:
        """Async KPI computation with error handling.

        Returns the KPI results dict, or ``{"error": ...}`` on any failure.
        """
        start_time = datetime.now()

        try:
            # 1. Load data
            df = await self._load_dataframe()
            if df.empty:
                await self._publish_status("no_data")
                return {"error": "No data in window"}

            # 2. Get industry
            industry = await self._get_industry()
            if not industry or industry == "UNKNOWN":
                await self._publish_status("unknown_industry")
                return {"error": "Industry unknown"}

            # 3. Compute KPIs off the event loop (compute_all is CPU-bound)
            calculator = get_kpi_calculator(industry, self.org_id, df, self.source_id)
            results = await asyncio.to_thread(calculator.compute_all)

            self.computed_at = datetime.now()

            # 4. Publish to Redis
            await self._publish_results(results)

            # 5. Cache for 5 minutes so pollers/schedulers skip recompute
            cache_ttl = 300  # 5 min
            redis.setex(
                f"kpi_cache:{self.org_id}:{self.source_id}",
                cache_ttl,
                json.dumps(results)
            )

            duration = (self.computed_at - start_time).total_seconds()
            print(f"[worker] ✅ {self.org_id}/{self.source_id} computed in {duration:.2f}s")

            return results

        except Exception as e:
            error_msg = f"KPI computation failed: {str(e)}"
            print(f"[worker] ❌ {self.org_id}/{self.source_id}: {error_msg}")
            await self._publish_error(error_msg)
            return {"error": error_msg}

    async def _load_dataframe(self) -> pd.DataFrame:
        """Load from DuckDB without blocking the event loop."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._sync_load_dataframe)

    def _sync_load_dataframe(self) -> pd.DataFrame:
        """
        Synchronous DB loading with canonical table readiness check.
        Waits up to 30 seconds for the table to exist and contain data.
        Returns an empty DataFrame on any failure (caller treats that as
        "no data").
        """
        # Fix: the module never imported these two names, so every call to
        # this method raised NameError — import them locally.
        import time
        from datetime import timedelta

        conn = None
        MAX_WAIT = 30       # seconds to wait for the canonical table
        RETRY_INTERVAL = 2  # seconds between readiness probes

        try:
            # Entity type (and hence table name) comes from Redis.
            entity_key = f"entity:{self.org_id}:{self.source_id}"
            entity_info = redis.get(entity_key)

            if not entity_info:
                print(f"[worker] ⚠️ No entity info in Redis: {entity_key}")
                return pd.DataFrame()

            try:
                entity_type = json.loads(entity_info)['entity_type']
                if entity_type == "UNKNOWN":
                    print(f"[worker] ⚠️ Entity type is UNKNOWN, skipping")
                    return pd.DataFrame()
            except (json.JSONDecodeError, KeyError) as e:
                print(f"[worker] ❌ Invalid entity info: {e}")
                return pd.DataFrame()

            table_name = f"main.{entity_type}_canonical"
            cutoff_time = datetime.now() - timedelta(hours=self.hours_window)

            conn = get_conn(self.org_id)

            # Wait for table readiness (ingestion may still be writing it).
            start_time = time.time()
            elapsed = 0

            while elapsed < MAX_WAIT:
                try:
                    # Row count probes both table existence and data presence.
                    count_query = f"SELECT COUNT(*) FROM {table_name} WHERE timestamp >= ?"
                    row_count = conn.execute(count_query, [cutoff_time]).fetchone()[0]

                    if row_count > 0:
                        print(f"[worker] ✅ Table ready: {row_count} rows in {table_name} (waited {elapsed:.1f}s)")
                        break
                    else:
                        print(f"[worker] ⏳ Table exists but no data yet (waited {elapsed:.1f}s)")

                except Exception as e:
                    error_msg = str(e).lower()
                    if "does not exist" in error_msg or "catalog error" in error_msg:
                        print(f"[worker] ⏳ Table doesn't exist yet (waited {elapsed:.1f}s)")
                    else:
                        print(f"[worker] ⚠️ Unexpected error: {e} (waited {elapsed:.1f}s)")

                time.sleep(RETRY_INTERVAL)
                elapsed = time.time() - start_time

            else:
                # while-else: the loop exhausted without `break` → timed out.
                print(f"[worker] ❌ Timeout after {MAX_WAIT}s: {table_name}")
                return pd.DataFrame()

            # Load all rows inside the lookback window, newest first.
            query = f"SELECT * FROM {table_name} WHERE timestamp >= ? ORDER BY timestamp DESC"
            df = conn.execute(query, [cutoff_time]).df()

            print(f"[worker] 📊 Loaded {len(df)} rows × {len(df.columns)} cols")
            return df

        except Exception as e:
            print(f"[worker] ❌ Fatal error: {e}")
            return pd.DataFrame()

        finally:
            if conn:
                try:
                    conn.close()
                    print(f"[worker] 🔒 Connection closed for {self.org_id}")
                except Exception as e:
                    print(f"[worker] ⚠️ Error closing connection: {e}")

    async def _get_industry(self) -> str:
        """Get industry from Redis cache; fall back to 'supermarket'."""
        try:
            industry_key = f"industry:{self.org_id}:{self.source_id}"
            data = redis.get(industry_key)
            if data:
                return json.loads(data).get('industry', 'supermarket').lower()
            return "supermarket"
        except Exception:
            # Fix: was a bare `except:` — narrowed so KeyboardInterrupt /
            # SystemExit are not swallowed.
            return "supermarket"

    async def _publish_results(self, results: Dict[str, Any]):
        """Publish KPIs and insights to Redis pub/sub."""

        # Main KPI channel
        kpi_channel = f"analytics:{self.org_id}:{self.source_id}:kpi"
        kpi_message = {
            "type": "kpi_update",
            "timestamp": self.computed_at.isoformat(),
            "data": results
        }
        redis.publish(kpi_channel, json.dumps(kpi_message))

        # Separate insight channel — one message per predictive alert.
        insight_channel = f"analytics:{self.org_id}:{self.source_id}:insights"
        for alert in results.get('predictive', {}).get('alerts', []):
            insight_message = {
                "type": "insight",
                "timestamp": self.computed_at.isoformat(),
                "data": alert
            }
            redis.publish(insight_channel, json.dumps(insight_message))

        print(f"[worker] 📤 Published to {kpi_channel}")

    async def _publish_status(self, status: str):
        """Publish a system status update (e.g. no_data, unknown_industry)."""
        channel = f"analytics:{self.org_id}:{self.source_id}:status"
        redis.publish(channel, json.dumps({
            "type": "status",
            "status": status,
            "timestamp": datetime.now().isoformat()
        }))

    async def _publish_error(self, message: str):
        """Publish an error to the status channel."""
        channel = f"analytics:{self.org_id}:{self.source_id}:status"
        redis.publish(channel, json.dumps({
            "type": "error",
            "message": message,
            "timestamp": datetime.now().isoformat()
        }))
+ }))
203
+
204
+ # Helper for triggering worker
205
+ async def trigger_kpi_computation(org_id: str, source_id: str):
206
+ """Non-blocking KPI trigger"""
207
+ worker = AnalyticsWorker(org_id, source_id)
208
+ asyncio.create_task(worker.run(), name=f"kpi-{org_id}-{source_id}")