Peter Mutwiri committed on
Commit
472833f
·
0 Parent(s):

Clean snapshot

Browse files
.gitattributes ADDED
@@ -0,0 +1 @@
 
 
1
+ *.duckdb filter=lfs diff=lfs merge=lfs -text
.gitignore ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ node_modules
2
+ client-nextjs/googlecalendar.json
3
+ .env.local
4
+ analytics-service/.env.analytics
.vscode/settings.json ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ {
2
+ "python-envs.defaultEnvManager": "ms-python.python:system",
3
+ "python-envs.pythonProjects": []
4
+ }
Dockerfile ADDED
@@ -0,0 +1,40 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# ---- 1. base image ---------------------------------------------------------
FROM python:3.11-slim

# ---- 2. system dependencies for binary wheels ------------------------------
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    gcc \
    g++ \
    cmake \
    libgomp1 \
    libstdc++6 \
    ca-certificates \
    wget \
    unzip \
    && rm -rf /var/lib/apt/lists/*

# ---- 2½. DuckDB CLI (optional but handy for debugging) --------------------
RUN wget -q https://github.com/duckdb/duckdb/releases/download/v0.10.2/duckdb_cli-linux-amd64.zip && \
    unzip duckdb_cli-linux-amd64.zip -d /usr/local/bin && rm *.zip

# ---- 3. upgrade pip & enable pre-built wheels ------------------------------
RUN pip install --no-cache-dir --upgrade pip setuptools wheel

# ---- 4. install Python deps (+ DuckDB driver) ------------------------------
# duckdb is pinned to the same version as the CLI above.
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir --prefer-binary -r /tmp/requirements.txt && \
    pip install --no-cache-dir duckdb==0.10.2

# ---- 5. copy source --------------------------------------------------------
COPY . /app
WORKDIR /app

# ---- 5½. scheduler loop ----------------------------------------------------
# NOTE: redundant with `COPY . /app` above unless scheduler_loop.py is
# excluded by .dockerignore; kept for safety.
COPY scheduler_loop.py /app/scheduler_loop.py

# ---- 6. runtime env vars ---------------------------------------------------
# SECURITY: this is a development-only default. Override API_KEYS at
# deploy time; never bake production secrets into the image.
ENV API_KEYS=dev-analytics-key-123

# ---- 7. start both services -----------------------------------------------
# The scheduler runs in the background; `exec` makes uvicorn the main
# process so the container stops (and restarts) when the API dies and
# SIGTERM is delivered to uvicorn for a graceful shutdown.
CMD ["sh", "-c", "python /app/scheduler_loop.py & exec python -m uvicorn app.main:app --host 0.0.0.0 --port 8080"]
analytics-data/.schedules.json ADDED
@@ -0,0 +1,44 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ [
2
+ {
3
+ "id": "2bc54229-97ee-4101-b751-ea9f8edfa84d",
4
+ "orgId": "demo",
5
+ "frequency": "daily",
6
+ "analytics": [
7
+ "eda"
8
+ ],
9
+ "nextRun": "2025-10-16T13:50:46.392839"
10
+ },
11
+ {
12
+ "id": "5a73737e-5c28-496f-a941-0e060760ccb3",
13
+ "orgId": "23739e24-d3ae-4ecf-b32f-16e019a561bd",
14
+ "frequency": "daily",
15
+ "analytics": [
16
+ "eda",
17
+ "basket",
18
+ "forecast"
19
+ ],
20
+ "nextRun": "2025-10-16T13:50:46.398193"
21
+ },
22
+ {
23
+ "id": "4f99c560-8ff5-471e-9711-91b92b7be4b5",
24
+ "orgId": "23739e24-d3ae-4ecf-b32f-16e019a561bd",
25
+ "frequency": "daily",
26
+ "analytics": [
27
+ "eda",
28
+ "basket",
29
+ "forecast"
30
+ ],
31
+ "nextRun": "2025-10-16T13:50:46.402940"
32
+ },
33
+ {
34
+ "id": "1a03ea97-a085-4d3c-994a-54be9b8885f6",
35
+ "orgId": "23739e24-d3ae-4ecf-b32f-16e019a561bd",
36
+ "frequency": "daily",
37
+ "analytics": [
38
+ "eda",
39
+ "basket",
40
+ "forecast"
41
+ ],
42
+ "nextRun": "2025-10-16T13:50:46.407630"
43
+ }
44
+ ]
analytics-data/duckdb/23739e24-d3ae-4ecf-b32f-16e019a561bd.duckdb ADDED
Binary file (131 Bytes). View file
 
analytics-data/duckdb/demo.duckdb ADDED
Binary file (131 Bytes). View file
 
app/db.py ADDED
@@ -0,0 +1,127 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import duckdb, os, pathlib, json
2
+ from datetime import datetime
3
+ from typing import Any, Dict, List
4
+
5
+ DB_DIR = pathlib.Path("./data/duckdb")
6
+ DB_DIR.mkdir(parents=True, exist_ok=True)
7
+
8
+
9
def get_conn(org_id: str):
    """Open a read-write DuckDB connection for one organization.

    Each org is isolated in its own database file under DB_DIR.
    """
    path = str(DB_DIR / f"{org_id}.duckdb")
    return duckdb.connect(path, read_only=False)
13
+
14
+
15
+ # ------------------------------------------------------------
16
+ # 🔹 Backward-compatible table for raw JSON ingestion
17
+ # ------------------------------------------------------------
18
def ensure_raw_table(conn):
    """
    Maintains legacy compatibility for ingestion from webhooks / file uploads.

    Creates the `main` schema and the append-only raw_rows landing table
    (ingestion timestamp + raw JSON payload) if they do not already exist.
    """
    statements = (
        "CREATE SCHEMA IF NOT EXISTS main",
        """
        CREATE TABLE IF NOT EXISTS main.raw_rows(
            ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            row_data JSON
        )
        """,
    )
    for stmt in statements:
        conn.execute(stmt)
29
+
30
+
31
+ # ------------------------------------------------------------
32
+ # 🔹 Flexible dynamic schema table creation
33
+ # ------------------------------------------------------------
34
def ensure_table(conn, table_name: str, sample_record: Dict[str, Any]):
    """
    Ensure a DuckDB table exists with columns inferred from sample_record.

    Creates main.<table_name> with a surrogate id and ingestion timestamp,
    then adds any column present in sample_record that the table lacks.

    Raises:
        ValueError: if the table or a column name is not a plain Python
            identifier. Names are interpolated into SQL text, so this
            guards against SQL injection from attacker-controlled payloads.
    """
    if not table_name.isidentifier():
        raise ValueError(f"Unsafe table name: {table_name!r}")

    conn.execute("CREATE SCHEMA IF NOT EXISTS main")
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS main.{table_name} ("
        "id UUID DEFAULT uuid(), "
        "_ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )

    if not sample_record:
        return

    existing_cols = {r[0] for r in conn.execute(f"PRAGMA table_info('main.{table_name}')").fetchall()}

    for col, val in sample_record.items():
        if col in existing_cols:
            continue
        if not col.isidentifier():
            raise ValueError(f"Unsafe column name: {col!r}")
        dtype = infer_duckdb_type(val)
        print(f"[db] ➕ Adding new column '{col}:{dtype}' to main.{table_name}")
        conn.execute(f"ALTER TABLE main.{table_name} ADD COLUMN {col} {dtype}")
57
+
58
+
59
def infer_duckdb_type(value: Any) -> str:
    """Map a Python value to the DuckDB column type used for ingestion.

    bool is tested before int because bool is a subclass of int.
    Anything unrecognized falls back to VARCHAR.
    """
    type_map = (
        (bool, "BOOLEAN"),
        (int, "BIGINT"),
        (float, "DOUBLE"),
        (datetime, "TIMESTAMP"),
        ((dict, list), "JSON"),
    )
    for python_type, duck_type in type_map:
        if isinstance(value, python_type):
            return duck_type
    return "VARCHAR"
72
+
73
+
74
+ # ------------------------------------------------------------
75
+ # 🔹 Insert records with auto-schema
76
+ # ------------------------------------------------------------
77
def insert_records(conn, table_name: str, records: List[Dict[str, Any]]):
    """
    Insert records into main.<table_name>.

    Column order is taken from the first record; later records contribute
    None for keys they lack, and extra keys they carry are ignored.
    Assumes ensure_table() has already been called.

    Raises:
        ValueError: if the table or a column name is not a plain Python
            identifier (names are interpolated into SQL text, so this
            guards against SQL injection).
    """
    if not records:
        return

    cols = list(records[0].keys())
    if not table_name.isidentifier():
        raise ValueError(f"Unsafe table name: {table_name!r}")
    for col in cols:
        if not col.isidentifier():
            raise ValueError(f"Unsafe column name: {col!r}")

    placeholders = ", ".join(["?"] * len(cols))
    col_list = ", ".join(cols)
    insert_sql = f"INSERT INTO main.{table_name} ({col_list}) VALUES ({placeholders})"

    values = [tuple(r.get(c) for c in cols) for r in records]
    conn.executemany(insert_sql, values)
    print(f"[db] ✅ Inserted {len(records)} rows into {table_name}")
93
+
94
+
95
+ # ------------------------------------------------------------
96
+ # 🔹 Unified bootstrap entrypoint
97
+ # ------------------------------------------------------------
98
def bootstrap(org_id: str, payload: Dict[str, Any]):
    """
    Main ingestion entrypoint.

    Accepts either:
      - multi-table mode: {"tables": {name: [rows, ...], ...}}
      - single-table mode: [rows, ...] stored under the default 'sales' table
    The raw payload is always logged to main.raw_rows for lineage tracking.

    The connection is now closed in a finally block so a failure part-way
    through ingestion no longer leaks the DuckDB handle (the original left
    it open on any exception).
    """
    conn = get_conn(org_id)
    try:
        conn.execute("CREATE SCHEMA IF NOT EXISTS main")
        ensure_raw_table(conn)

        # Log raw payload for debugging / lineage
        conn.execute("INSERT INTO main.raw_rows (row_data) VALUES (?)", (json.dumps(payload),))

        if isinstance(payload, dict) and "tables" in payload:
            # multi-table mode
            for table_name, rows in payload["tables"].items():
                if not rows:
                    continue
                ensure_table(conn, table_name, rows[0])
                insert_records(conn, table_name, rows)
        elif isinstance(payload, list):
            # single-table mode (assume 'sales' as default)
            ensure_table(conn, "sales", payload[0])
            insert_records(conn, "sales", payload)
        else:
            print("[db] ⚠️ Unsupported payload shape")
    finally:
        conn.close()
app/deps.py ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
import os
from fastapi import HTTPException, Header

# Comma-separated allow-list; blank entries are dropped so an unset/empty
# API_KEYS env var accepts nothing (the original kept "" in the list).
API_KEYS = [k.strip() for k in os.getenv("API_KEYS", "").split(",") if k.strip()]

def verify_key(x_api_key: str = Header(None, convert_underscores=True)):  # header name is case-insensitive
    """FastAPI dependency requiring a valid X-API-Key header.

    Returns the key on success.

    Raises:
        HTTPException: 401 when the header is missing or not in API_KEYS.
    """
    # SECURITY: do not log the received key -- secrets must never reach
    # log output (the original printed both the key and the allow-list).
    if not x_api_key or x_api_key not in API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key
app/engine/analytics.py ADDED
@@ -0,0 +1,1193 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pandas as pd
2
+ import numpy as np
3
+ from prophet import Prophet
4
+ from datetime import datetime
5
+ import redis
6
+ import json
7
+ from sklearn.cluster import KMeans, DBSCAN
8
+ from sklearn.preprocessing import StandardScaler, MinMaxScaler
9
+ from sklearn.decomposition import PCA
10
+ from sklearn.ensemble import IsolationForest
11
+ from .json_utils import CustomJSONEncoder
12
+ from scipy import stats
13
+ from scipy.stats import pearsonr
14
+ from statsmodels.tsa.seasonal import seasonal_decompose
15
+ from statsmodels.tsa.stattools import adfuller
16
+ import networkx as nx
17
+ from sklearn.metrics import silhouette_score
18
+ from sklearn.feature_extraction.text import TfidfVectorizer
19
+ from .supermarket_metrics import supermarket_insights
20
+ from app.utils.detect_industry import is_supermarket # next snippet
21
+
22
+ class AnalyticsService:
23
+ def __init__(self):
24
+ self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
25
+ self.industry_metrics = {
26
+ 'retail': self._retail_metrics,
27
+ 'wholesale': self._wholesale_metrics,
28
+ 'supermarket': self._supermarket_metrics,
29
+ 'manufacturing': self._manufacturing_metrics,
30
+ 'healthcare': self._healthcare_metrics
31
+ }
32
+ self.cross_industry_analyzers = {
33
+ 'market_dynamics': self._analyze_market_dynamics,
34
+ 'supply_chain': self._analyze_supply_chain,
35
+ 'customer_insights': self._analyze_customer_insights,
36
+ 'operational_efficiency': self._analyze_operational_efficiency,
37
+ 'risk_assessment': self._analyze_risk_patterns,
38
+ 'sustainability': self._analyze_sustainability_metrics
39
+ }
40
+
41
+ def perform_eda(self, data, industry=None):
42
+ """
43
+ Perform enhanced Exploratory Data Analysis with cross-industry insights
44
+ """
45
+ if not data:
46
+ raise ValueError("Empty dataset provided")
47
+
48
+ df = pd.DataFrame(data)
49
+
50
+ if df.empty:
51
+ raise ValueError("Empty dataset provided")
52
+
53
+ # Validate numeric columns
54
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
55
+ if len(numeric_cols) == 0:
56
+ raise ValueError("Non-numeric values found in dataset")
57
+
58
+ # Convert date columns to datetime
59
+ date_columns = []
60
+ for col in df.columns:
61
+ if df[col].dtype == 'object':
62
+ try:
63
+ df[col] = pd.to_datetime(df[col])
64
+ date_columns.append(col)
65
+ except (ValueError, TypeError):
66
+ continue
67
+
68
+ # Get numeric columns excluding dates
69
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
70
+
71
+ # Advanced statistics and AI-ready features
72
+ analysis_results = {
73
+ 'basic_stats': df[numeric_cols].describe().to_dict() if len(numeric_cols) > 0 else {},
74
+ 'missing_values': df.isnull().sum().to_dict(),
75
+ 'columns': list(df.columns),
76
+ 'row_count': len(df),
77
+ 'correlation_matrix': df[numeric_cols].corr().to_dict() if len(numeric_cols) > 0 else {},
78
+ 'skewness': df[numeric_cols].skew().to_dict() if len(numeric_cols) > 0 else {},
79
+ 'kurtosis': df[numeric_cols].kurtosis().to_dict() if len(numeric_cols) > 0 else {},
80
+ 'outliers': self._detect_outliers(df),
81
+ 'distribution_tests': self._perform_distribution_tests(df),
82
+ 'dimensionality_reduction': self._perform_dimensionality_reduction(df),
83
+ 'temporal_patterns': self._analyze_temporal_patterns(df),
84
+ 'anomaly_detection': self._detect_anomalies(df),
85
+ 'feature_importance': self._calculate_feature_importance(df)
86
+ }
87
+ # --- supermarket auto-detection ---
88
+ if is_supermarket(df):
89
+ industry = 'supermarket'
90
+ results['supermarket_kpis'] = supermarket_insights(df)
91
+ # Add industry-specific metrics
92
+ if industry and industry.lower() in self.industry_metrics:
93
+ analysis_results['industry_metrics'] = self.industry_metrics[industry.lower()](df)
94
+
95
+ # Add cross-industry insights
96
+ analysis_results['cross_industry_insights'] = {}
97
+ for analyzer_name, analyzer_func in self.cross_industry_analyzers.items():
98
+ analysis_results['cross_industry_insights'][analyzer_name] = analyzer_func(df)
99
+
100
+ return analysis_results
101
+
102
+ def _detect_outliers(self, df):
103
+ """
104
+ Detect outliers using IQR method for numerical columns
105
+ """
106
+ outliers = {}
107
+ for column in df.select_dtypes(include=[np.number]).columns:
108
+ Q1 = df[column].quantile(0.25)
109
+ Q3 = df[column].quantile(0.75)
110
+ IQR = Q3 - Q1
111
+ outliers[column] = {
112
+ 'count': len(df[(df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR))]),
113
+ 'percentage': len(df[(df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR))]) / len(df) * 100
114
+ }
115
+ return outliers
116
+
117
+ def _perform_distribution_tests(self, df):
118
+ """
119
+ Perform distribution tests for numerical columns
120
+ """
121
+ tests = {}
122
+ for column in df.select_dtypes(include=[np.number]).columns:
123
+ shapiro_test = stats.shapiro(df[column].dropna())
124
+ tests[column] = {
125
+ 'shapiro_test': {
126
+ 'statistic': float(shapiro_test.statistic),
127
+ 'p_value': float(shapiro_test.pvalue)
128
+ }
129
+ }
130
+ return tests
131
+
132
+ def _perform_dimensionality_reduction(self, df):
133
+ """
134
+ Perform PCA for dimensional insights
135
+ """
136
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
137
+ if len(numeric_cols) < 2:
138
+ return {}
139
+
140
+ scaler = StandardScaler()
141
+ scaled_data = scaler.fit_transform(df[numeric_cols])
142
+ pca = PCA()
143
+ pca_result = pca.fit_transform(scaled_data)
144
+
145
+ return {
146
+ 'explained_variance_ratio': pca.explained_variance_ratio_.tolist(),
147
+ 'cumulative_variance_ratio': np.cumsum(pca.explained_variance_ratio_).tolist(),
148
+ 'n_components_95_variance': np.argmax(np.cumsum(pca.explained_variance_ratio_) >= 0.95) + 1
149
+ }
150
+
151
+ def _analyze_temporal_patterns(self, df):
152
+ """
153
+ Analyze temporal patterns and seasonality
154
+ """
155
+ date_cols = df.select_dtypes(include=['datetime64']).columns
156
+ if len(date_cols) == 0:
157
+ return None
158
+
159
+ patterns = {}
160
+ for date_col in date_cols:
161
+ df['year'] = df[date_col].dt.year
162
+ df['month'] = df[date_col].dt.month
163
+ df['day_of_week'] = df[date_col].dt.dayofweek
164
+
165
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
166
+ for metric in numeric_cols:
167
+ if metric not in ['year', 'month', 'day_of_week']:
168
+ patterns[f"{metric}_by_month"] = df.groupby('month')[metric].mean().to_dict()
169
+ patterns[f"{metric}_by_day_of_week"] = df.groupby('day_of_week')[metric].mean().to_dict()
170
+
171
+ return patterns
172
+
173
+ def _detect_anomalies(self, df):
174
+ """
175
+ Detect anomalies using multiple methods
176
+ """
177
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
178
+ if len(numeric_cols) == 0:
179
+ return None
180
+
181
+ scaler = StandardScaler()
182
+ scaled_data = scaler.fit_transform(df[numeric_cols])
183
+
184
+ isolation_forest = IsolationForest(random_state=42, contamination=0.1)
185
+ anomalies = isolation_forest.fit_predict(scaled_data)
186
+
187
+ return {
188
+ 'anomaly_percentage': float((anomalies == -1).mean() * 100),
189
+ 'anomaly_indices': np.where(anomalies == -1)[0].tolist()
190
+ }
191
+
192
+ def _calculate_feature_importance(self, df):
193
+ """
194
+ Calculate feature importance and relationships
195
+ """
196
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
197
+ if len(numeric_cols) < 2:
198
+ return None
199
+
200
+ importance = {}
201
+ for col in numeric_cols:
202
+ correlations = []
203
+ for other_col in numeric_cols:
204
+ if col != other_col:
205
+ # Check if either column is constant
206
+ if df[col].nunique() <= 1 or df[other_col].nunique() <= 1:
207
+ continue
208
+ try:
209
+ corr, _ = pearsonr(df[col].fillna(0), df[other_col].fillna(0))
210
+ if not np.isnan(corr): # Only add if correlation is valid
211
+ correlations.append((other_col, abs(corr)))
212
+ except ValueError:
213
+ continue # Skip if correlation can't be calculated
214
+
215
+ # Handle empty correlations case
216
+ correlation_values = [abs(c[1]) for c in correlations]
217
+ importance[col] = {
218
+ 'top_correlations': sorted(correlations, key=lambda x: abs(x[1]), reverse=True)[:3],
219
+ 'correlation_strength': float(np.mean(correlation_values)) if correlation_values else 0.0
220
+ }
221
+
222
+ return importance
223
+
224
+ def _retail_metrics(self, df):
225
+
226
+ """Calculate retail-specific metrics"""
227
+ if not all(col in df.columns for col in ['sales', 'inventory', 'customer_satisfaction']):
228
+ # Return default structure if required columns are missing
229
+ return {
230
+ 'sales_performance': {},
231
+ 'customer_behavior': {},
232
+ 'inventory': {}
233
+ }
234
+
235
+ metrics = {
236
+ 'sales_performance': {
237
+ 'total_sales': float(df['sales'].sum()) if 'sales' in df.columns else 0.0,
238
+ 'average_daily_sales': float(df['sales'].mean()) if 'sales' in df.columns else 0.0,
239
+ 'sales_growth': float((df['sales'].iloc[-1] / df['sales'].iloc[0] - 1) * 100) if 'sales' in df.columns else 0.0
240
+ },
241
+ 'inventory_turnover': {
242
+ 'rate': float(df['sales'].sum() / df['inventory'].mean()) if all(col in df.columns for col in ['sales', 'inventory']) else 0.0,
243
+ 'days_of_inventory': float(df['inventory'].mean() / (df['sales'].mean() / 30)) if all(col in df.columns for col in ['sales', 'inventory']) else 0.0
244
+ },
245
+ 'customer_metrics': {
246
+ 'satisfaction_score': float(df['customer_satisfaction'].mean()) if 'customer_satisfaction' in df.columns else 0.0,
247
+ 'satisfaction_trend': df['customer_satisfaction'].rolling(window=7).mean().to_dict() if 'customer_satisfaction' in df.columns else {}
248
+ }
249
+ }
250
+ return metrics
251
+
252
+ def _wholesale_metrics(self, df):
253
+ """
254
+ Calculate wholesale-specific metrics
255
+ """
256
+ metrics = {
257
+ 'order_analytics': {},
258
+ 'supplier_performance': {},
259
+ 'distribution': {}
260
+ }
261
+
262
+ if 'order_value' in df.columns:
263
+ metrics['order_analytics']['average_order_value'] = float(df['order_value'].mean())
264
+ metrics['order_analytics']['order_value_distribution'] = df['order_value'].quantile([0.25, 0.5, 0.75]).to_dict()
265
+
266
+ if 'supplier_id' in df.columns and 'delivery_time' in df.columns:
267
+ supplier_performance = df.groupby('supplier_id')['delivery_time'].agg(['mean', 'std']).to_dict()
268
+ metrics['supplier_performance'] = supplier_performance
269
+
270
+ return metrics
271
+
272
+ def _supermarket_metrics(self, df):
273
+ """
274
+ Calculate supermarket-specific metrics
275
+ """
276
+ metrics = {
277
+ 'category_performance': {},
278
+ 'basket_analysis': {},
279
+ 'promotion_impact': {}
280
+ }
281
+
282
+ if 'category' in df.columns and 'sales_amount' in df.columns:
283
+ category_sales = df.groupby('category')['sales_amount'].sum()
284
+ metrics['category_performance']['top_categories'] = category_sales.nlargest(5).to_dict()
285
+
286
+ if 'transaction_id' in df.columns and 'product_id' in df.columns:
287
+ # Simple basket analysis
288
+ transactions = df.groupby('transaction_id')['product_id'].count()
289
+ metrics['basket_analysis']['average_items_per_transaction'] = float(transactions.mean())
290
+
291
+ if 'promotion_flag' in df.columns and 'sales_amount' in df.columns:
292
+ promo_impact = df.groupby('promotion_flag')['sales_amount'].mean()
293
+ metrics['promotion_impact']['sales_lift'] = float(
294
+ (promo_impact.get(1, 0) - promo_impact.get(0, 0)) / promo_impact.get(0, 1) * 100
295
+ )
296
+
297
+ return metrics
298
+
299
+ def _manufacturing_metrics(self, df):
300
+
301
+
302
+ """Calculate manufacturing-specific metrics"""
303
+ production_col = 'production_volume' if 'production_volume' in df.columns else 'units_produced'
304
+ metrics = {
305
+ 'production_efficiency': {
306
+ 'volume': float(df[production_col].mean()),
307
+ 'trend': df[production_col].rolling(window=7).mean().to_dict()
308
+ },
309
+ 'quality_metrics': {
310
+ 'defect_rate': float(df['defect_rate'].mean()) if 'defect_rate' in df.columns else 0.0,
311
+ 'quality_trend': df['defect_rate'].rolling(window=7).mean().to_dict() if 'defect_rate' in df.columns else {}
312
+ },
313
+ 'quality_control': {
314
+ 'defects_per_unit': float(df['defect_rate'].mean()) if 'defect_rate' in df.columns else 0.0,
315
+ 'defect_trend': df['defect_rate'].rolling(window=7).mean().to_dict() if 'defect_rate' in df.columns else {}
316
+ },
317
+ 'equipment_utilization': {
318
+ 'rate': float((df[production_col] / df[production_col].max()).mean() * 100),
319
+ 'trend': df[production_col].rolling(window=7).mean().to_dict()
320
+ }
321
+ }
322
+ return metrics
323
+
324
+ def _healthcare_metrics(self, df):
325
+
326
+ """Calculate healthcare-specific metrics"""
327
+ metrics = {
328
+ 'patient_outcomes': {
329
+ 'satisfaction': float(df['patient_satisfaction'].mean()),
330
+ 'treatment_success': float(df['treatment_success_rate'].mean())
331
+ },
332
+ 'operational_efficiency': {
333
+ 'avg_wait_time': float(df['order_fulfillment_time'].mean()),
334
+ 'utilization_rate': float(df['production_volume'].mean() / df['production_volume'].max())
335
+ },
336
+ 'quality_of_care': {
337
+ 'satisfaction_trend': df['patient_satisfaction'].rolling(window=7).mean().to_dict(),
338
+ 'success_rate_trend': df['treatment_success_rate'].rolling(window=7).mean().to_dict()
339
+ }
340
+ }
341
+ return metrics
342
+
343
+ def forecast_timeseries(self, data, date_column, value_column):
344
+ """
345
+ Forecast time series data with support for edge cases
346
+ """
347
+ if not data:
348
+ raise ValueError("Empty dataset provided")
349
+
350
+ df = pd.DataFrame(data)
351
+ if date_column not in df.columns:
352
+ raise KeyError(f"Required column '{date_column}' not found")
353
+ if value_column not in df.columns:
354
+ raise KeyError(f"Required column '{value_column}' not found")
355
+
356
+ # Convert to datetime
357
+ try:
358
+ df[date_column] = pd.to_datetime(df[date_column])
359
+ except ValueError as exc:
360
+ raise ValueError("Invalid date format") from exc
361
+
362
+ # Handle missing values
363
+ has_missing = df[value_column].isnull().any()
364
+ if has_missing:
365
+ df[value_column] = df[value_column].interpolate(method='linear')
366
+
367
+ # Detect and handle outliers
368
+ Q1 = df[value_column].quantile(0.25)
369
+ Q3 = df[value_column].quantile(0.75)
370
+ IQR = Q3 - Q1
371
+ outlier_mask = (df[value_column] < (Q1 - 1.5 * IQR)) | (df[value_column] > (Q3 + 1.5 * IQR))
372
+ has_outliers = outlier_mask.any()
373
+
374
+ # Prepare data for Prophet
375
+ prophet_df = df.rename(columns={date_column: 'ds', value_column: 'y'})
376
+ model = Prophet(yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=True)
377
+ model.fit(prophet_df)
378
+
379
+ # Make future dataframe for forecasting
380
+ future = model.make_future_dataframe(periods=30)
381
+ forecast = model.predict(future)
382
+
383
+ result = {
384
+ 'forecast': forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].to_dict('records'),
385
+ 'components': {
386
+ 'trend': forecast['trend'].to_dict(),
387
+ 'yearly': forecast['yearly'].to_dict() if 'yearly' in forecast else {},
388
+ 'weekly': forecast['weekly'].to_dict() if 'weekly' in forecast else {},
389
+ 'daily': forecast['daily'].to_dict() if 'daily' in forecast else {}
390
+ }
391
+ }
392
+
393
+ if has_missing:
394
+ result['handling_missing_values'] = {'filled_indices': df[value_column].isnull().sum()}
395
+
396
+ if has_outliers:
397
+ result['outlier_impact'] = {
398
+ 'outlier_indices': outlier_mask[outlier_mask].index.tolist(),
399
+ 'outlier_values': df.loc[outlier_mask, value_column].tolist()
400
+ }
401
+
402
+ # Detect seasonality
403
+ decomposition = seasonal_decompose(df[value_column], period=7, extrapolate_trend='freq')
404
+ result['seasonality_components'] = {
405
+ 'trend': decomposition.trend.to_dict(),
406
+ 'seasonal': decomposition.seasonal.to_dict(),
407
+ 'residual': decomposition.resid.to_dict()
408
+ }
409
+
410
+
411
+
412
+
413
+ # Cache the forecast with timestamp to ensure freshness
414
+ timestamp = datetime.now().strftime('%Y%m%d%H')
415
+ cache_key = f"forecast_{date_column}_{value_column}_{timestamp}"
416
+ self.redis_client.set(cache_key, json.dumps(result, cls=CustomJSONEncoder))
417
+
418
+ return result
419
+
420
+ def get_cached_forecast(self, date_column, value_column):
421
+ """
422
+ Retrieve cached forecast results
423
+ """
424
+ timestamp = datetime.now().strftime('%Y%m%d%H')
425
+ cache_key = f"forecast_{date_column}_{value_column}_{timestamp}"
426
+ cached = self.redis_client.get(cache_key)
427
+
428
+ if cached:
429
+ return json.loads(cached)
430
+ return None
431
+
432
+ def _analyze_market_dynamics(self, df):
433
+ """
434
+ Analyze market dynamics across industries
435
+ """
436
+ metrics = {
437
+ 'market_trends': {},
438
+ 'competitive_analysis': {},
439
+ 'growth_patterns': {}
440
+ }
441
+
442
+ if 'revenue' in df.columns and 'date' in df.columns:
443
+ # Trend Analysis
444
+ df['month'] = pd.to_datetime(df['date']).dt.to_period('M')
445
+ monthly_revenue = df.groupby('month')['revenue'].sum()
446
+
447
+ # Calculate growth rates
448
+ metrics['growth_patterns']['monthly_growth'] = float(
449
+ ((monthly_revenue.iloc[-1] / monthly_revenue.iloc[0]) ** (1/len(monthly_revenue)) - 1) * 100
450
+ )
451
+
452
+ # Market volatility
453
+ mean_revenue = monthly_revenue.mean()
454
+ if mean_revenue > 0: # Avoid division by zero
455
+ metrics['market_trends']['volatility'] = float(monthly_revenue.std() / mean_revenue)
456
+ else:
457
+ metrics['market_trends']['volatility'] = 0.0
458
+
459
+ if 'competitor_price' in df.columns and 'price' in df.columns:
460
+
461
+ comp_price_mean = df['competitor_price'].mean()
462
+ if comp_price_mean > 0: # Avoid division by zero
463
+ metrics['competitive_analysis']['price_position'] = float(
464
+ (df['price'].mean() / comp_price_mean - 1) * 100
465
+ )
466
+ else:
467
+ metrics['competitive_analysis']['price_position'] = 0.0
468
+
469
+ return metrics
470
+
471
+ def _analyze_supply_chain(self, df):
472
+ """
473
+ Analyze supply chain metrics across industries
474
+ """
475
+ metrics = {
476
+ 'efficiency': {},
477
+ 'reliability': {},
478
+ 'cost_analysis': {}
479
+ }
480
+
481
+ # Supply Chain Network Analysis
482
+ if 'supplier_id' in df.columns and 'delivery_time' in df.columns:
483
+ supplier_performance = df.groupby('supplier_id').agg({
484
+ 'delivery_time': ['mean', 'std'],
485
+ 'order_value': ['sum', 'mean']
486
+ }).round(2)
487
+
488
+ metrics['reliability']['supplier_consistency'] = float(
489
+ 1 - (supplier_performance['delivery_time']['std'] / supplier_performance['delivery_time']['mean']).mean()
490
+ )
491
+
492
+ # Cost and Efficiency Analysis
493
+ if 'transportation_cost' in df.columns and 'order_value' in df.columns:
494
+ metrics['cost_analysis']['logistics_cost_ratio'] = float(
495
+ (df['transportation_cost'].sum() / df['order_value'].sum()) * 100
496
+ )
497
+
498
+ return metrics
499
+
500
+ def _analyze_customer_insights(self, df):
501
+ """
502
+ Cross-industry customer behavior analysis
503
+ """
504
+ insights = {
505
+ 'customer_segments': {},
506
+ 'behavior_patterns': {},
507
+ 'lifetime_value': {}
508
+ }
509
+
510
+ if 'customer_id' in df.columns and 'transaction_amount' in df.columns:
511
+ # Customer Segmentation using DBSCAN for more natural clustering
512
+ customer_features = df.groupby('customer_id').agg({
513
+ 'transaction_amount': ['sum', 'mean', 'count']
514
+ }).values
515
+
516
+ scaler = MinMaxScaler()
517
+ scaled_features = scaler.fit_transform(customer_features)
518
+
519
+ # Find optimal eps parameter for DBSCAN
520
+ dbscan = DBSCAN(eps=0.3, min_samples=5)
521
+ clusters = dbscan.fit_predict(scaled_features)
522
+
523
+ insights['customer_segments']['natural_segments'] = {
524
+ 'n_segments': len(np.unique(clusters[clusters >= 0])),
525
+ 'segment_sizes': pd.Series(clusters).value_counts().to_dict()
526
+ }
527
+
528
+ return insights
529
+
530
+ def _analyze_operational_efficiency(self, df):
531
+ """
532
+ Cross-industry operational efficiency analysis
533
+ """
534
+ metrics = {
535
+ 'process_efficiency': {},
536
+ 'resource_utilization': {},
537
+ 'bottleneck_analysis': {}
538
+ }
539
+
540
+ if 'process_time' in df.columns and 'output_quantity' in df.columns:
541
+ # Process Efficiency Analysis
542
+ metrics['process_efficiency']['throughput_rate'] = float(
543
+ df['output_quantity'].sum() / df['process_time'].sum()
544
+ )
545
+
546
+ # Calculate process stability
547
+ process_stability = 1 - (df['process_time'].std() / df['process_time'].mean())
548
+ metrics['process_efficiency']['stability_score'] = float(process_stability)
549
+
550
+ return metrics
551
+
552
+ def _analyze_risk_patterns(self, df):
553
+ """
554
+ Cross-industry risk pattern analysis
555
+ """
556
+ risk_metrics = {
557
+ 'operational_risk': {},
558
+ 'market_risk': {},
559
+ 'compliance_risk': {}
560
+ }
561
+
562
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
563
+ if len(numeric_cols) > 0:
564
+ # Use Isolation Forest for risk pattern detection
565
+ iso_forest = IsolationForest(contamination=0.1, random_state=42)
566
+ risk_scores = iso_forest.fit_predict(df[numeric_cols])
567
+
568
+ risk_metrics['operational_risk']['anomaly_percentage'] = float(
569
+ (risk_scores == -1).mean() * 100
570
+ )
571
+
572
+ return risk_metrics
573
+
574
+ def _analyze_sustainability_metrics(self, df):
575
+ """
576
+
577
+ Analyze sustainability metrics including environmental impact, resource utilization, and waste management
578
+ """
579
+ if not all(col in df.columns for col in ['energy_consumption', 'water_consumption', 'waste_generated']):
580
+ return {}
581
+
582
+ results = {
583
+ 'environmental_impact': {
584
+ 'carbon_footprint_trend': df['carbon_footprint'].rolling(window=7).mean().to_dict() if 'carbon_footprint' in df.columns else {},
585
+ 'total_emissions': float(df['energy_consumption'].sum() * 0.5)
586
+ },
587
+ 'resource_utilization': {
588
+ 'energy_efficiency': float(df['energy_consumption'].mean()),
589
+ 'water_efficiency': float(df['water_consumption'].mean())
590
+ },
591
+ 'waste_management': {
592
+ 'recycling_performance': float(df['recycling_rate'].mean()) if 'recycling_rate' in df.columns else 0.0,
593
+ 'waste_reduction_trend': df['waste_generated'].rolling(window=7).mean().to_dict()
594
+ }
595
+ }
596
+ return results
597
+
598
+ def prepare_ai_query_interface(self, df):
599
+ """
600
+ Prepare data for natural language analytics queries with enhanced semantic understanding
601
+ """
602
+ query_interface = {
603
+ 'semantic_mappings': {},
604
+ 'entity_relationships': {},
605
+ 'available_metrics': {},
606
+ 'temporal_context': {},
607
+ 'metric_relationships': {},
608
+ 'data_patterns': {},
609
+ 'suggested_queries': []
610
+ }
611
+
612
+ try:
613
+ # Create semantic mappings for textual columns
614
+ text_columns = df.select_dtypes(include=['object']).columns
615
+ vectorizer = TfidfVectorizer(max_features=1000)
616
+
617
+ for col in text_columns:
618
+ if df[col].str.len().mean() > 5: # Only process meaningful text fields
619
+ text_features = vectorizer.fit_transform(df[col].fillna('').astype(str))
620
+ query_interface['semantic_mappings'][col] = {
621
+ 'vocabulary': vectorizer.vocabulary_,
622
+ 'idf_values': vectorizer.idf_.tolist(),
623
+ 'top_terms': dict(zip(
624
+ vectorizer.get_feature_names_out(),
625
+ np.asarray(text_features.sum(axis=0)).ravel()
626
+ ))
627
+ }
628
+
629
+ # Map entity relationships and hierarchies
630
+ entity_columns = [col for col in df.columns if any(entity in col.lower()
631
+ for entity in ['id', 'category', 'type', 'name', 'class', 'group'])]
632
+
633
+ for col in entity_columns:
634
+ if df[col].dtype == 'object':
635
+ value_counts = df[col].value_counts()
636
+ unique_values = df[col].unique().tolist()
637
+
638
+ # Find potential hierarchical relationships
639
+ hierarchy = {}
640
+ if '_' in col or col.lower().endswith('_id'):
641
+ related_cols = [c for c in df.columns if col.split('_')[0] in c and c != col]
642
+ for rel_col in related_cols:
643
+ hierarchy[rel_col] = df.groupby(col)[rel_col].agg(list).to_dict()
644
+
645
+ query_interface['entity_relationships'][col] = {
646
+ 'unique_values': unique_values,
647
+ 'value_counts': value_counts.to_dict(),
648
+ 'hierarchy': hierarchy,
649
+ 'cardinality': len(unique_values)
650
+ }
651
+
652
+ # Document available metrics and their relationships
653
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
654
+ for col in numeric_cols:
655
+ stats = df[col].describe()
656
+ query_interface['available_metrics'][col] = {
657
+ 'min': float(stats['min']),
658
+ 'max': float(stats['max']),
659
+ 'mean': float(stats['mean']),
660
+ 'std': float(stats['std']),
661
+ 'quartiles': {
662
+ '25%': float(stats['25%']),
663
+ '50%': float(stats['50%']),
664
+ '75%': float(stats['75%'])
665
+ }
666
+ }
667
+
668
+ # Analyze metric relationships
669
+ correlations = {}
670
+ for other_col in numeric_cols:
671
+ if col != other_col:
672
+ corr = df[col].corr(df[other_col])
673
+ if abs(corr) > 0.3: # Only store meaningful correlations
674
+ correlations[other_col] = float(corr)
675
+
676
+ query_interface['metric_relationships'][col] = {
677
+ 'correlations': correlations,
678
+ 'trends': self._analyze_metric_trends(df, col)
679
+ }
680
+
681
+ # Add temporal context if available
682
+ date_cols = df.select_dtypes(include=['datetime64']).columns
683
+ if len(date_cols) == 0:
684
+ # Try to convert string columns that might contain dates
685
+ for col in df.columns:
686
+ if df[col].dtype == 'object':
687
+ try:
688
+ pd.to_datetime(df[col])
689
+ date_cols = date_cols.append(col)
690
+ except:
691
+ continue
692
+
693
+ for date_col in date_cols:
694
+ df[date_col] = pd.to_datetime(df[date_col])
695
+ temporal_stats = {
696
+ 'min_date': df[date_col].min().isoformat(),
697
+ 'max_date': df[date_col].max().isoformat(),
698
+ 'frequency': pd.infer_freq(df[date_col]),
699
+ 'temporal_patterns': {}
700
+ }
701
+
702
+ # Analyze temporal patterns
703
+ temporal_stats['temporal_patterns'] = {
704
+ 'daily_pattern': df.groupby(df[date_col].dt.dayofweek).size().to_dict(),
705
+ 'monthly_pattern': df.groupby(df[date_col].dt.month).size().to_dict(),
706
+ 'yearly_pattern': df.groupby(df[date_col].dt.year).size().to_dict()
707
+ }
708
+
709
+ query_interface['temporal_context'][date_col] = temporal_stats
710
+
711
+ # Identify data patterns and anomalies
712
+ query_interface['data_patterns'] = {
713
+ 'missing_patterns': df.isnull().sum().to_dict(),
714
+ 'unique_value_counts': df.nunique().to_dict(),
715
+ 'distribution_types': self._analyze_distributions(df)
716
+ }
717
+
718
+ # Generate suggested queries based on data characteristics
719
+ query_interface['suggested_queries'] = self._generate_suggested_queries(df)
720
+
721
+ # Add metadata about the dataset
722
+ query_interface['metadata'] = {
723
+ 'row_count': len(df),
724
+ 'column_count': len(df.columns),
725
+ 'memory_usage': df.memory_usage(deep=True).sum(),
726
+ 'data_types': df.dtypes.astype(str).to_dict()
727
+ }
728
+
729
+ except Exception as e:
730
+ query_interface['error'] = str(e)
731
+
732
+ return query_interface
733
+
734
+ def _analyze_metric_trends(self, df, column):
735
+ """Helper method to analyze trends in numeric columns"""
736
+ trends = {}
737
+ if 'date' in df.columns:
738
+ df['date'] = pd.to_datetime(df['date'])
739
+ time_series = df.groupby('date')[column].mean()
740
+ if len(time_series) > 2:
741
+ # Calculate trend
742
+ x = np.arange(len(time_series))
743
+ y = time_series.values
744
+ slope, intercept = np.polyfit(x, y, 1)
745
+ trends['slope'] = float(slope)
746
+ trends['trend_direction'] = 'increasing' if slope > 0 else 'decreasing'
747
+ trends['trend_strength'] = float(abs(slope) / time_series.mean())
748
+ return trends
749
+
750
+ def _analyze_distributions(self, df):
751
+ """Helper method to analyze value distributions"""
752
+ distributions = {}
753
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
754
+
755
+ for col in numeric_cols:
756
+ if df[col].nunique() > 5: # Skip columns with too few unique values
757
+ # Test for normality
758
+ _, p_value = stats.normaltest(df[col].dropna())
759
+ skewness = float(df[col].skew())
760
+ kurtosis = float(df[col].kurtosis())
761
+
762
+ distributions[col] = {
763
+ 'distribution_type': 'normal' if p_value > 0.05 else 'non_normal',
764
+ 'skewness': skewness,
765
+ 'kurtosis': kurtosis
766
+ }
767
+ return distributions
768
+
769
+ def _generate_suggested_queries(self, df):
770
+ """Helper method to generate relevant query suggestions"""
771
+ suggestions = []
772
+
773
+ # Add time-based queries if temporal data exists
774
+ if 'date' in df.columns:
775
+ suggestions.extend([
776
+ "Show the trend over time",
777
+ "Compare year-over-year growth",
778
+ "Find seasonal patterns"
779
+ ])
780
+
781
+ # Add metric-based queries
782
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
783
+ if len(numeric_cols) > 0:
784
+ suggestions.extend([
785
+ f"Analyze the distribution of {col}" for col in numeric_cols[:3]
786
+ ])
787
+
788
+ # Add categorical analysis queries
789
+ categorical_cols = df.select_dtypes(include=['object']).columns
790
+ if len(categorical_cols) > 0:
791
+ suggestions.extend([
792
+ f"Break down metrics by {col}" for col in categorical_cols[:3]
793
+ ])
794
+
795
+ return suggestions
796
+
797
+ def enhance_cross_industry_correlations(self, df):
798
+ """
799
+ Enhanced analysis of correlations across different industries
800
+ """
801
+ correlations = {
802
+ 'metric_correlations': {},
803
+ 'industry_patterns': {},
804
+ 'shared_trends': {}
805
+ }
806
+
807
+ if 'industry' in df.columns:
808
+ industries = df['industry'].unique()
809
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
810
+
811
+ # Calculate cross-industry metric correlations
812
+ for ind1 in industries:
813
+ for ind2 in industries:
814
+ if ind1 < ind2: # Avoid duplicate comparisons
815
+ ind1_data = df[df['industry'] == ind1][numeric_cols]
816
+ ind2_data = df[df['industry'] == ind2][numeric_cols]
817
+
818
+ if not ind1_data.empty and not ind2_data.empty:
819
+ common_metrics = set(ind1_data.columns) & set(ind2_data.columns)
820
+ for metric in common_metrics:
821
+ corr, p_value = pearsonr(
822
+ ind1_data[metric].fillna(0),
823
+ ind2_data[metric].fillna(0)
824
+ )
825
+ correlations['metric_correlations'][f"{ind1}_{ind2}_{metric}"] = {
826
+ 'correlation': float(corr),
827
+ 'p_value': float(p_value)
828
+ }
829
+
830
+ # Identify shared trends
831
+ if 'date' in df.columns:
832
+ for metric in numeric_cols:
833
+ industry_trends = {}
834
+ for industry in industries:
835
+ industry_data = df[df['industry'] == industry]
836
+ if not industry_data.empty:
837
+ trend = industry_data.groupby('date')[metric].mean()
838
+ if len(trend) > 0:
839
+ industry_trends[industry] = trend.to_dict()
840
+
841
+ correlations['shared_trends'][metric] = industry_trends
842
+
843
+ return correlations
844
+
845
    def perform_market_basket_analysis(self, df: pd.DataFrame, min_support: float = 0.01,
                                       min_confidence: float = 0.3, min_lift: float = 1.0) -> dict:
        """
        Perform advanced market basket analysis with support for multiple analytics dimensions.

        Args:
            df (pd.DataFrame): Input transaction data with required columns
            min_support (float): Minimum support threshold for frequent itemsets (default: 0.01)
            min_confidence (float): Minimum confidence threshold for rules (default: 0.3)
            min_lift (float): Minimum lift threshold for rules (default: 1.0)

        Returns:
            dict: Dictionary containing:
                - product_associations: Support, confidence, and lift metrics for product pairs
                - temporal_baskets: Time-based purchase patterns
                - product_clusters: Product groupings based on purchase behavior
                - customer_segments: Customer segments based on purchase patterns
                - performance_metrics: Key performance indicators

        Raises:
            ValueError: If required columns are missing or data validation fails
        """
        try:
            # Validate input data
            required_columns = ['transaction_id', 'product_id']
            if not all(col in df.columns for col in required_columns):
                raise ValueError(f"Missing required columns: {set(required_columns) - set(df.columns)}")

            if df.empty:
                raise ValueError("Empty dataframe provided")

            # Work with a copy of the dataframe
            df = df.copy()

            # Convert to basket format with optimization for large datasets
            # (one frozenset of product ids per transaction).
            baskets = (df.groupby('transaction_id')['product_id']
                       .agg(lambda x: frozenset(x.values))  # Using frozenset for better performance
                       .reset_index())

            total_transactions = len(baskets)

            # Calculate product frequencies using vectorized operations.
            # NOTE(review): this counts ROWS per product, which equals
            # per-transaction occurrences only when each product appears at
            # most once per transaction -- confirm inputs are pre-aggregated,
            # since these counts are the confidence/lift denominators below.
            product_freq = df.groupby('product_id').size().to_dict()

            # Generate product pairs efficiently (all unordered pairs within
            # each basket, sorted so (a, b) == (b, a)).
            pairs_data = []
            for products in baskets['product_id']:
                products_list = list(products)  # Convert frozenset to list once
                pairs_data.extend(
                    tuple(sorted([p1, p2]))
                    for i, p1 in enumerate(products_list)
                    for p2 in products_list[i+1:]
                )

            pair_freq = pd.Series(pairs_data).value_counts().to_dict()

            # Calculate association metrics with validation
            product_associations = {
                'support': {},
                'confidence': {},
                'lift': {},
                'metrics_distribution': {
                    'support': {'min': float('inf'), 'max': 0, 'mean': 0},
                    'confidence': {'min': float('inf'), 'max': 0, 'mean': 0},
                    'lift': {'min': float('inf'), 'max': 0, 'mean': 0}
                }
            }

            valid_rules = []
            for pair, freq in pair_freq.items():
                prod1, prod2 = pair
                # Fraction of transactions containing this pair.
                support = freq / total_transactions

                if support >= min_support:
                    # Confidence in both directions; the stronger one is kept.
                    confidence_1_2 = freq / product_freq[prod1]
                    confidence_2_1 = freq / product_freq[prod2]
                    max_confidence = max(confidence_1_2, confidence_2_1)

                    if max_confidence >= min_confidence:
                        # Lift > 1 means the pair co-occurs more than chance.
                        lift = (freq * total_transactions) / (product_freq[prod1] * product_freq[prod2])

                        if lift >= min_lift:
                            valid_rules.append({
                                'pair': pair,
                                'support': support,
                                'confidence': max_confidence,
                                'lift': lift
                            })

                            # Store metrics with string keys for JSON serialization
                            pair_key = f"({prod1}, {prod2})"
                            product_associations['support'][pair_key] = float(support)
                            product_associations['confidence'][pair_key] = float(max_confidence)
                            product_associations['lift'][pair_key] = float(lift)

                            # Update metrics distribution
                            for metric_type, value in [('support', support),
                                                       ('confidence', max_confidence),
                                                       ('lift', lift)]:
                                dist = product_associations['metrics_distribution'][metric_type]
                                dist['min'] = min(dist['min'], value)
                                dist['max'] = max(dist['max'], value)

            # Calculate means for distributions
            for metric_type in ['support', 'confidence', 'lift']:
                values = [rule[metric_type] for rule in valid_rules]
                if values:
                    product_associations['metrics_distribution'][metric_type]['mean'] = float(sum(values) / len(values))
                else:
                    # No rules passed the thresholds: reset min/max/mean.
                    product_associations['metrics_distribution'][metric_type] = {'min': 0, 'max': 0, 'mean': 0}

            # Enhanced temporal analysis (only when timestamps are present).
            temporal_patterns = self._analyze_temporal_patterns(df) if 'timestamp' in df.columns else {}

            # Enhanced product clustering (needs quantity data).
            product_clusters = self._perform_product_clustering(df) if 'quantity' in df.columns else {}

            # Customer segmentation (needs customer ids).
            customer_segments = self._analyze_customer_segments(df) if 'customer_id' in df.columns else {}

            # Performance metrics
            # NOTE(review): with min_lift >= 1.0 every stored rule has
            # lift >= 1, so 'weak_associations' can only count lift == 1.
            performance_metrics = {
                'total_transactions': total_transactions,
                'unique_products': len(product_freq),
                'avg_basket_size': float(df.groupby('transaction_id')['product_id'].count().mean()),
                'total_rules_found': len(valid_rules),
                'rules_distribution': {
                    'strong_associations': len([r for r in valid_rules if r['lift'] > 2]),
                    'moderate_associations': len([r for r in valid_rules if 1 < r['lift'] <= 2]),
                    'weak_associations': len([r for r in valid_rules if r['lift'] <= 1])
                }
            }

            return {
                'product_associations': product_associations,
                'temporal_baskets': temporal_patterns,
                'product_clusters': product_clusters,
                'customer_segments': customer_segments,
                'performance_metrics': performance_metrics
            }

        except Exception as e:
            print(f"Error in market basket analysis: {str(e)}")
            raise ValueError(f"Market basket analysis failed: {str(e)}") from e
+
990
+ def _analyze_temporal_patterns(self, df: pd.DataFrame) -> dict:
991
+ """Analyze temporal patterns in purchase behavior"""
992
+ patterns = {
993
+ 'daily_patterns': {},
994
+ 'weekly_patterns': {},
995
+ 'monthly_patterns': {},
996
+ 'hourly_patterns': {}
997
+ }
998
+
999
+ try:
1000
+ timestamps = pd.to_datetime(df['timestamp'])
1001
+
1002
+ for period, grouper in [
1003
+ ('hourly_patterns', timestamps.dt.hour),
1004
+ ('daily_patterns', timestamps.dt.day),
1005
+ ('weekly_patterns', timestamps.dt.dayofweek),
1006
+ ('monthly_patterns', timestamps.dt.month)
1007
+ ]:
1008
+ pattern_data = df.groupby(grouper).agg({
1009
+ 'product_id': ['count', 'nunique'],
1010
+ 'transaction_id': 'nunique',
1011
+ 'quantity': ['sum', 'mean'] if 'quantity' in df.columns else ['count']
1012
+ }).round(2)
1013
+
1014
+ patterns[period] = {
1015
+ 'transaction_count': pattern_data['transaction_id']['nunique'].to_dict(),
1016
+ 'product_count': pattern_data['product_id']['count'].to_dict(),
1017
+ 'unique_products': pattern_data['product_id']['nunique'].to_dict(),
1018
+ 'total_quantity': pattern_data['quantity']['sum'].to_dict() if 'quantity' in df.columns else {},
1019
+ 'avg_quantity': pattern_data['quantity']['mean'].to_dict() if 'quantity' in df.columns else {}
1020
+ }
1021
+
1022
+ except (ValueError, KeyError) as e:
1023
+ print(f"Error in temporal pattern analysis: {str(e)}")
1024
+ return patterns
1025
+
1026
+ return patterns
1027
+
1028
    def _perform_product_clustering(self, df: pd.DataFrame) -> dict:
        """Perform advanced product clustering analysis.

        Builds per-product demand features, standardizes them, chooses the
        KMeans cluster count (2..5) by silhouette score, and returns cluster
        assignments, per-cluster profiles and evaluation metrics. Returns {}
        when clustering is impossible or a handled error occurs.
        """
        try:
            # Create rich product features (quantity stats + distinct
            # transaction counts, one row per product).
            product_features = df.groupby('product_id').agg({
                'quantity': ['mean', 'std', 'sum', 'count'],
                'transaction_id': 'nunique'
            }).fillna(0)

            # Feature engineering: average units of this product per
            # distinct transaction that contains it.
            product_features['quantity_per_transaction'] = (
                product_features['quantity']['sum'] /
                product_features['transaction_id']['nunique']
            )

            # Prepare features for clustering: flatten MultiIndex column
            # labels into plain strings before scaling.
            features_for_clustering = product_features.copy()
            features_for_clustering.columns = [f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col
                                               for col in features_for_clustering.columns]

            if len(features_for_clustering) > 1:
                # Scale features to zero mean / unit variance.
                scaler = StandardScaler()
                scaled_features = scaler.fit_transform(features_for_clustering)

                # Determine optimal number of clusters by silhouette score.
                # NOTE(review): with exactly 2 products max_clusters is 1,
                # the range is empty and max(scores) raises ValueError
                # (caught below, returning {}) -- confirm that is intended.
                max_clusters = min(5, len(features_for_clustering) - 1)
                scores = []
                for k in range(2, max_clusters + 1):
                    kmeans = KMeans(n_clusters=k, random_state=42)
                    clusters = kmeans.fit_predict(scaled_features)
                    score = silhouette_score(scaled_features, clusters)
                    scores.append((k, score))

                # Use optimal number of clusters (highest silhouette).
                optimal_k = max(scores, key=lambda x: x[1])[0]
                kmeans = KMeans(n_clusters=optimal_k, random_state=42)
                clusters = kmeans.fit_predict(scaled_features)

                # Prepare cluster insights
                cluster_data = {
                    'cluster_assignments': {
                        prod: int(cluster) for prod, cluster in zip(product_features.index, clusters)
                    },
                    'cluster_profiles': {},
                    'evaluation_metrics': {
                        'silhouette_score': float(max(scores, key=lambda x: x[1])[1]),
                        'num_clusters': optimal_k
                    }
                }

                # Generate cluster profiles (aggregates per cluster).
                for cluster_id in range(optimal_k):
                    cluster_mask = clusters == cluster_id
                    cluster_data['cluster_profiles'][str(cluster_id)] = {
                        'size': int(sum(cluster_mask)),
                        'avg_quantity': float(product_features['quantity']['mean'][cluster_mask].mean()),
                        'avg_transactions': float(product_features['transaction_id']['nunique'][cluster_mask].mean()),
                        'total_quantity': float(product_features['quantity']['sum'][cluster_mask].sum()),
                        # Rows per distinct transaction within the cluster.
                        'purchase_frequency': float(
                            (product_features['quantity']['count'][cluster_mask].sum() /
                             product_features['transaction_id']['nunique'][cluster_mask].sum())
                        )
                    }

                return cluster_data

        except np.linalg.LinAlgError as e:
            print(f"Error in clustering computation: {str(e)}")
            return {}
        except (ValueError, KeyError) as e:
            print(f"Error in product clustering: {str(e)}")
            return {}

        # Fewer than two products: nothing to cluster.
        return {}
+
1104
+ def _analyze_customer_segments(self, df: pd.DataFrame) -> dict:
1105
+ """Analyze customer segments based on purchase behavior"""
1106
+ try:
1107
+ if 'customer_id' not in df.columns:
1108
+ return {}
1109
+
1110
+ customer_stats = df.groupby('customer_id').agg({
1111
+ 'transaction_id': 'nunique',
1112
+ 'product_id': ['nunique', 'count'],
1113
+ 'quantity': ['sum', 'mean'] if 'quantity' in df.columns else ['count', 'mean']
1114
+ })
1115
+
1116
+ # Calculate RFM scores
1117
+ if 'timestamp' in df.columns:
1118
+ current_date = pd.to_datetime(df['timestamp']).max()
1119
+ customer_stats['recency'] = df.groupby('customer_id')['timestamp'].max().apply(
1120
+ lambda x: (current_date - pd.to_datetime(x)).days
1121
+ )
1122
+
1123
+ # Segment customers
1124
+ stats_for_clustering = customer_stats.copy()
1125
+ stats_for_clustering.columns = [f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col
1126
+ for col in stats_for_clustering.columns]
1127
+
1128
+ if len(stats_for_clustering) > 1:
1129
+ scaler = StandardScaler()
1130
+ scaled_features = scaler.fit_transform(stats_for_clustering)
1131
+
1132
+ # Use DBSCAN for flexible cluster numbers
1133
+ dbscan = DBSCAN(eps=0.5, min_samples=3)
1134
+ clusters = dbscan.fit_predict(scaled_features)
1135
+
1136
+ return {
1137
+ 'customer_segments': {
1138
+ str(cust): int(cluster) for cust, cluster in zip(customer_stats.index, clusters)
1139
+ },
1140
+ 'segment_profiles': {
1141
+ str(segment): {
1142
+ 'size': int(sum(clusters == segment)),
1143
+ 'avg_transactions': float(customer_stats['transaction_id']['nunique'][clusters == segment].mean()),
1144
+ 'avg_products': float(customer_stats['product_id']['nunique'][clusters == segment].mean())
1145
+ }
1146
+ for segment in set(clusters) if segment != -1
1147
+ },
1148
+ 'segment_statistics': {
1149
+ 'num_segments': len(set(clusters) - {-1}),
1150
+ 'noise_points': int(sum(clusters == -1))
1151
+ }
1152
+ }
1153
+
1154
+ except Exception as e:
1155
+ print(f"Error in customer segmentation: {str(e)}")
1156
+ return {}
1157
+
1158
+ def _calculate_correlations(self, df: pd.DataFrame) -> dict:
1159
+ """Calculate correlations between numeric columns with detailed statistics"""
1160
+ correlations = {}
1161
+
1162
+ try:
1163
+ numeric_cols = df.select_dtypes(include=[np.number]).columns
1164
+ if len(numeric_cols) < 2:
1165
+ return correlations
1166
+
1167
+ # Calculate correlation matrix
1168
+ corr_matrix = df[numeric_cols].corr()
1169
+
1170
+ # Convert correlations to dictionary with additional metadata
1171
+ for col1 in numeric_cols:
1172
+ correlations[col1] = {}
1173
+ for col2 in numeric_cols:
1174
+ if col1 != col2:
1175
+ correlation = corr_matrix.loc[col1, col2]
1176
+ if not np.isnan(correlation):
1177
+ # Calculate p-value using pearsonr
1178
+ coef, p_value = pearsonr(df[col1].fillna(0), df[col2].fillna(0))
1179
+ correlations[col1][col2] = {
1180
+ 'coefficient': float(correlation),
1181
+ 'p_value': float(p_value),
1182
+ 'strength': 'strong' if abs(correlation) > 0.7
1183
+ else 'moderate' if abs(correlation) > 0.3
1184
+ else 'weak',
1185
+ 'direction': 'positive' if correlation > 0 else 'negative',
1186
+ 'sample_size': len(df)
1187
+ }
1188
+
1189
+ except Exception as e:
1190
+ print(f"Error calculating correlations: {str(e)}")
1191
+ return {}
1192
+
1193
+ return correlations
app/engine/json_utils.py ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # analytics-service/app/engine/json_utils.py
2
+ import json
3
+ from datetime import datetime, date
4
+ import numpy as np
5
+
6
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that understands datetimes and NumPy scalar/array types."""

    # (type(s), converter) pairs tried in order for unknown objects.
    _CONVERTERS = (
        ((datetime, date), lambda o: o.isoformat()),   # ISO-8601 strings
        ((np.integer, np.int64), int),                 # NumPy ints -> int
        ((np.floating, np.float64), float),            # NumPy floats -> float
        (np.ndarray, lambda o: o.tolist()),            # arrays -> (nested) lists
    )

    def default(self, obj):
        for types, convert in self._CONVERTERS:
            if isinstance(obj, types):
                return convert(obj)
        # Anything else: defer to the base class, which raises TypeError.
        return super().default(obj)
app/engine/supermarket_metrics.py ADDED
@@ -0,0 +1,129 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Supermarket-specific KPI generator – works with ANY POS export.
3
+ Handles: Square, Lightspeed, Shopify POS, NCR, Oracle MICROS, QuickBooks POS
4
+ """
5
+ import pandas as pd
6
+ import numpy as np
7
+ from datetime import datetime, timedelta
8
+ from typing import Dict, Any
9
+
10
# POS column alias map – covers 99 % of exports
# Maps each canonical field (key) to the column-name substrings that common
# POS vendors use for it. _find_col() scans these case-insensitively against
# the dataframe's columns and the first match wins, so more specific aliases
# should be listed first.
_ALIAS = {
    "sku": ["sku", "barcode", "item_code", "plu", "product_id"],
    "qty": ["qty", "quantity", "units", "stock", "quantity_on_hand"],
    "expiry": ["expiry_date", "exp", "best_before", "use_by", "expiration"],
    "promo": ["promo", "promotion", "discount_code", "campaign", "is_promo"],
    "sales": ["total_line", "net_amount", "line_total", "amount", "sales_amount"],
    "transaction": ["transaction_id", "receipt_no", "ticket_no", "order_id"],
    "store": ["store_id", "branch_code", "location_id", "outlet_id"],
    "category": ["category", "department", "cat", "sub_category"],
    "loss": ["loss_qty", "waste_qty", "shrinkage_qty", "damaged_qty"],
    "customer": ["customer_id", "loyalty_id", "phone"],
    "price": ["unit_price", "price", "sell_price"],
    "cost": ["cost_price", "supply_price", "unit_cost"],
}
25
+
26
+ def _find_col(df: pd.DataFrame, keys):
27
+ """Return first matching column or None."""
28
+ for k in keys:
29
+ for col in df.columns:
30
+ if k.lower() in col.lower():
31
+ return col
32
+ return None
33
+
34
def supermarket_insights(df: pd.DataFrame) -> Dict[str, Any]:
    """Return supermarket KPIs & alerts – zero config.

    Column names are resolved through the `_ALIAS` substring map so the same
    code handles exports from Square, Lightspeed, Shopify POS, etc. Every KPI
    whose source columns are missing degrades to 0 / empty instead of raising.
    """
    df = df.copy()
    df.columns = [c.lower().strip() for c in df.columns]

    # --- resolve columns via alias map ---
    sku_col = _find_col(df, _ALIAS["sku"])
    qty_col = _find_col(df, _ALIAS["qty"])
    expiry_col = _find_col(df, _ALIAS["expiry"])
    promo_col = _find_col(df, _ALIAS["promo"])
    sales_col = _find_col(df, _ALIAS["sales"])
    trans_col = _find_col(df, _ALIAS["transaction"])
    store_col = _find_col(df, _ALIAS["store"])
    cat_col = _find_col(df, _ALIAS["category"])
    loss_col = _find_col(df, _ALIAS["loss"])
    cust_col = _find_col(df, _ALIAS["customer"])
    price_col = _find_col(df, _ALIAS["price"])
    cost_col = _find_col(df, _ALIAS["cost"])

    # 1 STOCK COUNT & SKU BREADTH
    stock = int(df[qty_col].sum()) if qty_col else 0
    unique_sku = int(df[sku_col].nunique()) if sku_col else 0

    # 2 EXPIRY ALERTS -- rows whose expiry is within 7 days. Unparseable
    # dates become NaT and are excluded by the comparison; already-expired
    # rows (negative days) are included in the count.
    expiring_7d = 0
    if expiry_col:
        df[expiry_col] = pd.to_datetime(df[expiry_col], errors='coerce')
        expiring_7d = int((df[expiry_col] - datetime.now()).dt.days.le(7).sum())

    # 3 PROMO LIFT -- mean promo sales vs mean non-promo sales, in percent.
    lift = 0.0
    if promo_col and sales_col:
        base = df[df[promo_col].astype(str).str[0].isin(['0', 'F', 'f'])][sales_col].mean()
        promo = df[df[promo_col].astype(str).str[0].isin(['1', 'T', 't'])][sales_col].mean()
        # Guard NaN means (empty bucket) as well as a zero base; the old
        # `if base` check let NaN through and produced a NaN KPI.
        if pd.notna(base) and pd.notna(promo) and base:
            lift = float((promo - base) / base * 100)

    # 4 BASKET SIZE -- average sales value per transaction.
    avg_basket = 0.0
    if trans_col and sales_col:
        basket = df.groupby(trans_col)[sales_col].sum()
        avg_basket = float(basket.mean())

    # 5 SHRINKAGE % -- guard against a zero stock total (division by zero /
    # inf leak in the old version).
    shrink = 0.0
    if loss_col and qty_col:
        total_qty = df[qty_col].sum()
        if total_qty:
            shrink = float(df[loss_col].sum() / total_qty * 100)

    # 6 FAST MOVERS (top 5 SKUs by units)
    movers = {}
    if sku_col and qty_col:
        movers = (df.groupby(sku_col)[qty_col].sum()
                  .nlargest(5)
                  .to_dict())

    # 7 GROSS-MARGIN BY CATEGORY
    margin = {}
    if cat_col and price_col and cost_col:
        df['margin'] = (df[price_col] - df[cost_col]) / df[price_col] * 100
        margin = (df.groupby(cat_col)['margin'].mean()
                  .round(1)
                  .to_dict())

    # 8 CUSTOMER REACH
    unique_cust = int(df[cust_col].nunique()) if cust_col else 0

    # 9 STORE PERFORMANCE (if multi-outlet)
    store_perf = {}
    if store_col and sales_col:
        store_perf = (df.groupby(store_col)[sales_col].sum()
                      .round(0)
                      .to_dict())

    # 10 ALERTS
    alerts = []
    if expiring_7d:
        alerts.append({"type": "expiry", "severity": "high", "message": f"{expiring_7d} SKUs expire ≤7 days"})
    if shrink > 1:
        alerts.append({"type": "shrinkage", "severity": "med", "message": f"Shrinkage {shrink:.1f} %"})
    if lift < 0:
        alerts.append({"type": "promo", "severity": "low", "message": "Promo discount deeper than lift"})

    return {
        "supermarket_kpis": {
            "stock_on_hand": stock,
            "unique_sku": unique_sku,
            "expiring_next_7_days": expiring_7d,
            "promo_lift_pct": round(lift, 1),
            "avg_basket_kes": round(avg_basket, 2),
            "shrinkage_pct": round(shrink, 2),
            "unique_customers": unique_cust,
        },
        "fast_movers": movers,
        "category_margin_pct": margin,
        "store_sales": store_perf,
        "alerts": alerts,
    }
app/ingest.py ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
import json
from datetime import datetime

from app.db import get_conn, ensure_raw_table


def ingest_dict(org_id: str, payload: dict):
    """Append one raw event row for *org_id* as a JSON blob.

    Opens the org's database connection, ensures the raw_rows table exists,
    inserts the serialized payload, and always closes the connection.
    (The original file used `json`, `get_conn` and `ensure_raw_table`
    without importing them; imports mirror app/mapper.py.)
    """
    conn = get_conn(org_id)
    try:
        ensure_raw_table(conn)
        # Store the whole payload as JSON text; schema mapping happens later.
        conn.execute("INSERT INTO raw_rows(row_data) VALUES (?)", [json.dumps(payload)])
    finally:
        # Release the connection even if the insert fails.
        conn.close()
app/main.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from app.routers import ingress, reports, flags, datasources, scheduler, run, health, socket
from app.tasks.scheduler import start_scheduler
from app.deps import verify_key
from contextlib import asynccontextmanager
import os

# ---------- lifespan ----------
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the background job scheduler once at startup; anything placed
    # after `yield` would run at shutdown (nothing needed here).
    start_scheduler()
    yield

# ---------- app init ----------
app = FastAPI(
    title="MutSyncHub Analytics Engine",
    version="2.2",
    lifespan=lifespan
)

# ---------- Socket.IO Mount ----------
app.mount("/socket.io", socket.socket_app)

# ---------- Middleware (fix order) ----------
@app.middleware("http")
async def serialize_all_responses(request, call_next):
    """Ensure all responses are safely JSON-serializable."""
    # NOTE(review): call_next returns a Response object, never a plain dict,
    # so the isinstance branch below looks like dead code -- confirm whether
    # handlers were ever meant to return raw dicts through this path.
    response = await call_next(request)
    if isinstance(response, dict):
        return JSONResponse(content=jsonable_encoder(response))
    return response

# ---------- CORS Configuration ----------
origins = [
    "https://mut-sync-hub.vercel.app",  # live frontend
    "http://localhost:3000",  # local dev
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ---------- Routers ----------
# All business routers require the API key dependency; health and socket
# routes are registered without it.
app.include_router(health.router)  # public route (no key)
app.include_router(datasources.router, dependencies=[Depends(verify_key)])
app.include_router(reports.router, dependencies=[Depends(verify_key)])
app.include_router(flags.router, dependencies=[Depends(verify_key)])
app.include_router(scheduler.router, dependencies=[Depends(verify_key)])
app.include_router(run.router, dependencies=[Depends(verify_key)])
app.include_router(socket.router)

# ---------- Public Health Endpoint ----------
@app.get("/health")
def health_check():
    # Liveness probe for deployment infrastructure; no auth, no dependencies.
    return {"status": "ok", "service": "analytics-engine"}
app/mapper.py ADDED
@@ -0,0 +1,186 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os, json, duckdb, pandas as pd
2
+ from datetime import datetime
3
+ from app.db import get_conn, ensure_raw_table
4
+ from app.utils.detect_industry import _ALIAS
5
+
6
+
7
+ # ---------------------- Canonical schema base ---------------------- #
8
+ CANONICAL = {
9
+ "timestamp": ["timestamp", "date", "sale_date", "created_at"],
10
+ "product_id": ["sku", "barcode", "plu", "product_id", "item_code"],
11
+ "qty": ["qty", "quantity", "units", "pieces"],
12
+ "total": ["total", "amount", "line_total", "sales_amount"],
13
+ "store_id": ["store_id", "branch", "location", "outlet_id"],
14
+ "category": ["category", "department", "cat", "family"],
15
+ "promo_flag": ["promo", "promotion", "is_promo", "discount_code"],
16
+ "expiry_date":["expiry_date", "best_before", "use_by", "expiration"],
17
+ }
18
+
19
+ ALIAS_FILE = "./db/alias_memory.json"
20
+
21
def safe_str_transform(series: pd.Series) -> pd.Series:
    """Lower-case and strip a pandas Series/Index, but only for string dtypes.

    Non-string inputs (numeric, datetime, …) are returned untouched so
    callers never trip over ``.str`` accessor errors.
    """
    if not pd.api.types.is_string_dtype(series):
        return series
    return series.str.lower().str.strip()
26
# ---------------------- Alias memory helpers ---------------------- #
def load_dynamic_aliases() -> None:
    """Load learned aliases from ALIAS_FILE and merge them into CANONICAL.

    Unknown canonical keys are adopted wholesale; known keys only gain
    aliases they do not already contain. Any failure is logged, never raised.
    """
    if not os.path.exists(ALIAS_FILE):
        return
    try:
        with open(ALIAS_FILE) as fh:
            learned = json.load(fh)
        for canon, aliases in learned.items():
            if canon not in CANONICAL:
                CANONICAL[canon] = aliases
                continue
            known = CANONICAL[canon]
            for alias in aliases:
                if alias not in known:
                    known.append(alias)
    except Exception as exc:
        print(f"[mapper] ⚠️ failed to load alias memory: {exc}")
42
+
43
+
44
def save_dynamic_aliases() -> None:
    """Persist the full (merged) alias table so future runs remember it."""
    target_dir = os.path.dirname(ALIAS_FILE)
    os.makedirs(target_dir, exist_ok=True)
    with open(ALIAS_FILE, "w") as fh:
        json.dump(CANONICAL, fh, indent=2)
49
+
50
+
51
# ---------------------- Schema versioning helpers ---------------------- #
def ensure_schema_version(duck, df: pd.DataFrame) -> str:
    """
    Ensure schema versioning and track evolution.
    Returns the active canonical table name (e.g., main.canonical_v2).

    A schema's signature is the sorted list of DataFrame column names; any
    difference (added/removed/renamed column) starts a new canonical_v<N>.
    """
    duck.execute("CREATE SCHEMA IF NOT EXISTS main")
    duck.execute("""
        CREATE TABLE IF NOT EXISTS main.schema_versions (
            version INTEGER PRIMARY KEY,
            columns JSON,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    # Newest recorded schema, if any (row layout: version, columns, created_at).
    latest = duck.execute("SELECT * FROM main.schema_versions ORDER BY version DESC LIMIT 1").fetchone()
    new_signature = sorted(df.columns.tolist())

    if latest:
        latest_cols = sorted(json.loads(latest[1]))
        if latest_cols == new_signature:
            # Unchanged schema → keep appending to the current version table.
            return f"main.canonical_v{latest[0]}"
        else:
            # Schema drift → record and switch to the next version table.
            new_version = latest[0] + 1
            duck.execute("INSERT INTO main.schema_versions (version, columns) VALUES (?, ?)",
                         (new_version, json.dumps(new_signature)))
            print(f"[schema] → new version detected: canonical_v{new_version}")
            return f"main.canonical_v{new_version}"
    else:
        # First ever ingest for this org → version 1.
        duck.execute("INSERT INTO main.schema_versions (version, columns) VALUES (?, ?)",
                     (1, json.dumps(new_signature)))
        print("[schema] → initialized canonical_v1")
        return "main.canonical_v1"
84
+
85
+
86
def reconcile_latest_schema(duck):
    """
    Merge all canonical_v* tables into main.canonical_latest,
    preserving new columns and filling missing values with NULL.

    Fix: a plain ``UNION ALL`` matches columns by position and errors out
    when the versioned tables have different shapes — which is exactly the
    situation this function exists for. DuckDB's ``UNION ALL BY NAME``
    aligns columns by name and NULL-fills the gaps, matching the docstring.
    """
    tables = [r[0] for r in duck.execute("""
        SELECT table_name FROM information_schema.tables
        WHERE table_name LIKE 'canonical_v%'
    """).fetchall()]
    if not tables:
        return

    union_query = " UNION ALL BY NAME ".join(f"SELECT * FROM {t}" for t in tables)
    duck.execute("CREATE OR REPLACE TABLE main.canonical_latest AS " + union_query)
    print(f"[schema] ✅ reconciled {len(tables)} schema versions → canonical_latest")
101
+
102
+
103
+ # ---------------------- Canonify core logic ---------------------- #
104
def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
    """
    Normalize, version, and persist canonical data snapshot for org_id.

    Steps:
      1. Pull raw JSON rows from the org's DuckDB (last *hours_window* h;
         falls back to reading all rows when timestamp parsing fails).
      2. Rename columns to the canonical schema via substring alias matching,
         learning new aliases as a side effect (persisted to disk).
      3. Coerce datatypes, append to a versioned snapshot table and rebuild
         main.canonical_latest.

    Returns the canonical DataFrame (empty when there is nothing to process).

    NOTE(review): the connection returned by get_conn() is never closed
    here — confirm whether callers own its lifetime.
    """
    load_dynamic_aliases()
    conn = get_conn(org_id)
    ensure_raw_table(conn)

    # --------------------------
    # ⏱ Safe timestamp filtering
    # --------------------------
    try:
        rows = conn.execute(
            """
            SELECT row_data
            FROM raw_rows
            WHERE strptime(json_extract(row_data, '$.timestamp'), '%Y-%m-%d %H:%M:%S')
                  >= now() - INTERVAL ? HOUR
            """,
            (hours_window,)
        ).fetchall()
    except Exception as e:
        # A single row whose $.timestamp doesn't match the expected format
        # makes the whole query raise — degrade to an unfiltered read.
        print(f"[canonify] ⚠️ fallback to all rows due to timestamp parse error: {e}")
        rows = conn.execute("SELECT row_data FROM raw_rows").fetchall()

    if not rows:
        print("[canonify] no rows to process")
        return pd.DataFrame()

    # --------------------------
    # 🧩 DataFrame normalization
    # --------------------------
    raw = pd.DataFrame([json.loads(r[0]) for r in rows])
    # Lower-case/strip header names (no-op when columns aren't strings).
    raw.columns = safe_str_transform(raw.columns)

    # Flexible alias mapping: the first alias substring-match wins per column.
    mapping = {}
    for canon, aliases in CANONICAL.items():
        for col in raw.columns:
            if any(a in col for a in aliases):
                mapping[col] = canon
                break

    # 🧠 Learn new aliases dynamically: a column embedding a canonical name
    # (e.g. "order_timestamp") is remembered as an alias for future runs.
    for col in raw.columns:
        if col not in sum(CANONICAL.values(), []):
            for canon in CANONICAL.keys():
                if canon in col and col not in CANONICAL[canon]:
                    CANONICAL[canon].append(col)
                    save_dynamic_aliases()

    # Apply canonical renaming; keep only canonical columns when any matched,
    # otherwise pass the frame through untouched.
    renamed = raw.rename(columns=mapping)
    cols = [c for c in CANONICAL.keys() if c in renamed.columns]
    df = renamed[cols].copy() if cols else renamed.copy()

    # 🔢 Normalize datatypes (unparseable values become NaT / False / 0).
    if "timestamp" in df:
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    if "expiry_date" in df:
        df["expiry_date"] = pd.to_datetime(df["expiry_date"], errors="coerce").dt.date
    if "promo_flag" in df:
        df["promo_flag"] = df["promo_flag"].astype(str).isin({"1", "true", "t", "yes"})
    for col in ("qty", "total"):
        if col in df:
            df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)

    # --------------------------
    # 🪣 Schema versioning + storage
    # --------------------------
    os.makedirs("./db", exist_ok=True)
    duck = duckdb.connect(f"./db/{org_id}.duckdb")

    table_name = ensure_schema_version(duck, df)
    # DuckDB resolves the local variable `df` through its Python replacement
    # scan. NOTE(review): the INSERT matches columns by position — assumes an
    # existing version table has the same column order as df; confirm.
    duck.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM df LIMIT 0")
    duck.execute(f"INSERT INTO {table_name} SELECT * FROM df")

    # 🧩 Always refresh canonical_latest for unified analytics
    reconcile_latest_schema(duck)
    duck.close()

    print(f"[canonify] ✅ canonical snapshot updated for {org_id}")
    return df
app/redis_pool.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
# Shared Redis client for the whole service; defaults to the docker-compose
# service name and decodes responses to str.
import redis, os
redis_client = redis.from_url(os.getenv("REDIS_URL", "redis://redis:6379"), decode_responses=True)
app/routers/datasources.py ADDED
@@ -0,0 +1,117 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Query, Form, File, UploadFile, Depends, HTTPException
2
+ from fastapi.responses import JSONResponse
3
+ from pydantic import BaseModel
4
+ from typing import List, Any, Dict, Union
5
+ from app.deps import verify_key
6
+ from app.db import get_conn, ensure_raw_table, bootstrap
7
+ from app.mapper import canonify_df
8
+ from app.utils.detect_industry import detect_industry
9
+ from app.routers.socket import sio
10
+ import pandas as pd
11
+ import json
12
+
13
+ router = APIRouter(prefix="/api/v1", tags=["datasources"])
14
+
15
+
16
# =======================================================================
# 1️⃣ ORIGINAL UPLOAD ENDPOINT – handles CSV, POS plug-in, etc.
# =======================================================================
@router.post("/datasources")
async def create_source(
    orgId: str = Query(...),
    sourceId: str = Query(...),
    type: str = Query(...),          # source kind, e.g. FILE_IMPORT / API / WEBHOOK
    config: str = Form(...),         # JSON-encoded source configuration
    file: UploadFile = File(None),   # only used when type == FILE_IMPORT
    data: str = Form(None),          # JSON rows for non-file sources
    _: str = Depends(verify_key),
):
    """
    Keeps existing behavior – for CSV upload, POS plug-in, API push, etc.

    Inserts incoming rows into raw_rows, re-canonifies the org's data,
    detects the industry, and broadcasts a 3-row preview over Socket.IO.

    NOTE(review): `conn` is only closed on the success path — an exception
    during insert/canonify leaks the handle. `config_dict` is parsed but
    never used.
    """
    conn = get_conn(orgId)
    ensure_raw_table(conn)

    config_dict = json.loads(config)

    if type == "FILE_IMPORT" and file:
        # Stream the CSV in 1000-row chunks; each row is stored as JSON text.
        chunk_size = 1000
        for chunk in pd.read_csv(file.file, chunksize=chunk_size):
            for _, row in chunk.iterrows():
                conn.execute("INSERT INTO raw_rows (row_data) VALUES (?)", (row.to_json(),))
        file.file.seek(0)  # rewind in case the upload is read again downstream
    elif type in ["API", "DATABASE", "WEBHOOK", "POS_SYSTEM", "ERP", "CUSTOM"]:
        if not data:
            raise HTTPException(status_code=400, detail="Data required for non-file sources")
        records = json.loads(data)
        # Accept either a single object or a list of objects.
        records = records if isinstance(records, list) else [records]
        for row in records:
            conn.execute("INSERT INTO raw_rows (row_data) VALUES (?)", (json.dumps(row),))

    # Normalize, detect, and close connection
    df = canonify_df(orgId)
    industry, confidence = detect_industry(df)
    conn.close()

    # Live broadcast sample
    rows = df.head(3).to_dict("records")
    await sio.emit("datasource:new-rows", {"rows": rows}, room=orgId)

    return {
        "id": sourceId,
        "status": "listening" if type != "WEBHOOK" else "received",
        "industry": industry,
        "confidence": confidence,
        "recentRows": rows,
    }
67
+
68
+
69
# =======================================================================
# 2️⃣ SMART JSON ENDPOINT – fully schema-agnostic and multi-table aware
# =======================================================================
class JsonPayload(BaseModel):
    """Request body for POST /datasources/json."""
    # Free-form source configuration (not interpreted by this router).
    config: Dict[str, Any]
    data: Union[List[Any], Dict[str, Any]]  # flexible: list or { "tables": {...} }
75
+
76
+
77
@router.post("/datasources/json")
async def create_source_json(
    payload: JsonPayload,
    orgId: str = Query(...),
    sourceId: str = Query(...),
    type: str = Query(...),
    _: str = Depends(verify_key),
):
    """
    Accepts structured JSON (list or multi-table dict) from n8n, Render jobs, or APIs.
    Automatically evolves schemas, stores data, detects industry, and broadcasts live rows.

    Raises 400 when the payload carries no data, 500 on ingestion failure.
    """
    try:
        if not payload or not payload.data:
            raise HTTPException(status_code=400, detail="Missing payload data")

        # 💾 Flexible insertion – handles one or multiple tables
        bootstrap(orgId, payload.data)

        # 🧭 Canonical normalization (only if “sales” or compatible table exists)
        df = canonify_df(orgId)
        industry, confidence = detect_industry(df)

        # 🎯 Preview last few normalized rows
        rows = df.head(3).to_dict("records") if not df.empty else []
        await sio.emit("datasource:new-rows", {"rows": rows}, room=orgId)

        return JSONResponse(
            content={
                "id": sourceId,
                "status": "processed",
                "industry": industry,
                "confidence": confidence,
                "recentRows": rows,
                "message": "✅ Data ingested successfully",
            }
        )

    except HTTPException:
        # Fix: let intentional HTTP errors (e.g. the 400 above) propagate
        # instead of being swallowed and re-labelled as a 500 below.
        raise
    except Exception as e:
        print(f"[datasources/json] ❌ ingestion error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
app/routers/flags.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/routers/flags.py
2
+ from fastapi import APIRouter, Depends, HTTPException
3
+ import httpx
4
+ from app.deps import verify_key
5
+ import os
6
+
7
+ router = APIRouter(prefix="/flags", tags=["Feature Flags"])
8
+ NEXT_API = os.getenv("NEXT_API") # never hard-code localhost # internal Docker name (or env var)
9
+
10
@router.get("/{key}")
async def read_flag(key: str, _: str = Depends(verify_key)):
    """Proxy a feature-flag read to the Next.js internal API.

    Fix: the internal API key was hard-coded; it now comes from the
    environment (same default, so behavior is unchanged out of the box),
    and an unset NEXT_API fails fast with 503 instead of requesting
    "None/api/flags/...".
    """
    if not NEXT_API:
        raise HTTPException(503, "NEXT_API is not configured")
    headers = {"x-api-key": os.getenv("ANALYTICS_INTERNAL_KEY", "dev-analytics-key-123")}
    async with httpx.AsyncClient() as c:
        r = await c.get(f"{NEXT_API}/api/flags/{key}", headers=headers)
    if r.status_code == 404:
        raise HTTPException(404, "Flag not found")
    return r.json()
17
+
18
@router.put("/{key}")
async def set_flag(key: str, body: dict, _: str = Depends(verify_key)):
    """Proxy a feature-flag update to the Next.js internal API.

    Fix: internal API key read from the environment (same default as
    before); unset NEXT_API now yields a clear 503.
    """
    if not NEXT_API:
        raise HTTPException(503, "NEXT_API is not configured")
    headers = {"x-api-key": os.getenv("ANALYTICS_INTERNAL_KEY", "dev-analytics-key-123")}
    async with httpx.AsyncClient() as c:
        r = await c.put(f"{NEXT_API}/api/flags/{key}", json=body, headers=headers)
    return r.json()
app/routers/health.py ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter
2
+
3
+ router = APIRouter(tags=["health"])
4
+
5
@router.get("/health")
def health():
    """Liveness endpoint; always reports the service as up."""
    return dict(status="ok", service="analytics-engine")
app/routers/ingress.py ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # from fastapi import APIRouter, Depends
2
+ # from pydantic import BaseModel
3
+ # from app.deps import verify_key
4
+
5
+ # router = APIRouter(prefix="/api/v1", tags=["datasources"])
6
+
7
+ # class NewSource(BaseModel):
8
+ # orgId: str
9
+ # sourceId: str
10
+ # type: str
11
+ # config: dict
12
+
13
+ # @router.post("/datasources")
14
+ # def create_source(payload: NewSource, _: str = Depends(verify_key)):
15
+ # print("[analytics] new source", payload)
16
+ # return {"id": payload.sourceId, "status": "sync_queued"}
app/routers/reports.py ADDED
@@ -0,0 +1,117 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Analytics engine routes – DuckDB-backed, any-shape input.
3
+ Also exposes Neon-bridge endpoints so Next.js (Prisma) can store history.
4
+ """
5
+ from fastapi import APIRouter, Query, HTTPException
6
+ from pydantic import BaseModel
7
+ from datetime import datetime
8
+ import json
9
+
10
+ from app.mapper import canonify_df
11
+ from app.engine.analytics import AnalyticsService
12
+ from app.utils.detect_industry import detect_industry
13
+ from app.service.industry_svc import (
14
+ eda, forecast, basket, market_dynamics, supply_chain,
15
+ customer_insights, operational_efficiency, risk_assessment, sustainability
16
+ )
17
+
18
+ router = APIRouter(prefix="/analytics", tags=["Analytics"])
19
+
20
+ analytics = AnalyticsService()
21
+
22
+ # --------------------------------------------------
23
+ # 1 RUN ANALYTIC – real-time, any column names
24
+ # --------------------------------------------------
25
class RunAnalyticIn(BaseModel):
    """Request body for POST /analytics/run."""
    analytic: str                   # which analytic to execute
    dateColumn: str | None = None   # required for "forecast"
    valueColumn: str | None = None  # required for "forecast"
    minSupport: float = 0.01        # basket-analysis thresholds
    minConfidence: float = 0.3
    minLift: float = 1.0
+
33
@router.post("/run")
async def run_analytic(orgId: str, body: RunAnalyticIn):
    """
    1. Canonify last 6 h of raw rows (any shape)
    2. Compute chosen analytic
    3. Return shaped payload

    Raises 404 when no recent data exists; 400 for an unknown analytic or
    a forecast request missing its column names.
    """
    df = canonify_df(orgId)
    if df.empty:
        raise HTTPException(404, "No recent data found – please ingest or stream first.")

    data = df.to_dict("records")
    industry, _ = detect_industry(df)

    # Dispatch to the async service wrapper for the requested analytic.
    match body.analytic:
        case "eda":
            result = await eda(data, industry)
        case "forecast":
            if not body.dateColumn or not body.valueColumn:
                raise HTTPException(400, "dateColumn & valueColumn required")
            result = await forecast(data, body.dateColumn, body.valueColumn)
        case "basket":
            result = await basket(data, body.minSupport, body.minConfidence, body.minLift)
        case "market-dynamics":
            result = await market_dynamics(data)
        case "supply-chain":
            result = await supply_chain(data)
        case "customer-insights":
            result = await customer_insights(data)
        case "operational-efficiency":
            result = await operational_efficiency(data)
        case "risk-assessment":
            result = await risk_assessment(data)
        case "sustainability":
            result = await sustainability(data)
        case _:
            raise HTTPException(400, "Unknown analytic")

    return {"industry": industry, "data": result}
+ return {"industry": industry, "data": result}
72
+
73
+ # --------------------------------------------------
74
+ # 2 NEON BRIDGE – latest report for UI + push endpoint
75
+ # --------------------------------------------------
76
class PushReportIn(BaseModel):
    """Body for POST /analytics/report/push (mirrors the Neon schema)."""
    orgId: str
    type: str          # analytic type, e.g. "eda"
    results: dict      # shaped analytic payload
    lastRun: datetime
81
+
82
@router.get("/report/latest")
def latest_report(orgId: str = Query(...)):
    """
    Returns the newest KPI snapshot we have for this org
    (shape matches Neon schema so Next.js can forward 1-to-1).

    Raises 404 when no report has been logged yet.

    Fix: the DuckDB connection is now closed in a ``finally`` block so a
    failing query no longer leaks the handle.
    """
    from app.db import get_conn

    conn = get_conn(orgId)
    try:
        row = conn.execute("""
            SELECT analytic_type, results, ts
            FROM kpi_log
            WHERE org_id = ?
            ORDER BY ts DESC
            LIMIT 1
        """, [orgId]).fetchone()
    finally:
        conn.close()

    if not row:
        raise HTTPException(404, "No report yet")

    return {
        "orgId": orgId,
        "type": row[0],
        # results may be stored as a JSON string or already-decoded value
        "results": json.loads(row[1]) if isinstance(row[1], str) else row[1],
        "lastRun": row[2].isoformat(),
    }
109
+
110
@router.post("/report/push")
async def push_report(body: PushReportIn):
    """
    Internal endpoint – Next.js (Prisma) calls this to store history in Neon.
    Analytics container itself does **not** touch Prisma.

    Currently an acknowledgement-only stub: the body shape is validated by
    Pydantic and the identifiers are echoed back.
    """
    # optional: validate signature / api-key here if you want
    return {"status": "accepted", "orgId": body.orgId, "type": body.type}
app/routers/run.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Analytics engine routes – stateless, DuckDB-backed, any-shape input.
3
+ """
4
+ from fastapi import APIRouter, HTTPException
5
+ from pydantic import BaseModel
6
+ import pandas as pd
7
+
8
+ from app.mapper import canonify_df # NEW
9
+ from app.engine.analytics import AnalyticsService
10
+ from app.utils.detect_industry import detect_industry
11
+ from app.service.industry_svc import (
12
+ eda, forecast, basket, market_dynamics, supply_chain,
13
+ customer_insights, operational_efficiency, risk_assessment, sustainability
14
+ )
15
+
16
+ router = APIRouter(prefix="/analytics", tags=["Analytics"])
17
+
18
class RunAnalyticIn(BaseModel):
    """Request body for POST /analytics/run (duplicate of reports.py model)."""
    analytic: str                   # which analytic to execute
    dateColumn: str | None = None   # required for "forecast"
    valueColumn: str | None = None  # required for "forecast"
    minSupport: float = 0.01        # basket-analysis thresholds
    minConfidence: float = 0.3
    minLift: float = 1.0

@router.post("/run")
async def run_analytic(orgId: str, body: RunAnalyticIn):
    """
    1. Pull last 6 h of raw rows (any column names)
    2. Map -> canonical DataFrame
    3. Run chosen analytic
    4. Return shaped result

    NOTE(review): this router duplicates the prefix, model and /run route
    of app/routers/reports.py — main.py registers both, so two handlers
    exist for POST /analytics/run; confirm which one should win.
    """
    df = canonify_df(orgId)  # ← replaces pd.read_parquet
    if df.empty:
        raise HTTPException(404, "No recent data found – please ingest or stream first.")

    industry, _ = detect_industry(df)
    data = df.to_dict("records")

    # Dispatch to the async service wrapper for the requested analytic.
    match body.analytic:
        case "eda":
            result = await eda(data, industry)
        case "forecast":
            if not body.dateColumn or not body.valueColumn:
                raise HTTPException(400, "dateColumn & valueColumn required")
            result = await forecast(data, body.dateColumn, body.valueColumn)
        case "basket":
            result = await basket(data, body.minSupport, body.minConfidence, body.minLift)
        case "market-dynamics":
            result = await market_dynamics(data)
        case "supply-chain":
            result = await supply_chain(data)
        case "customer-insights":
            result = await customer_insights(data)
        case "operational-efficiency":
            result = await operational_efficiency(data)
        case "risk-assessment":
            result = await risk_assessment(data)
        case "sustainability":
            result = await sustainability(data)
        case _:
            raise HTTPException(400, "Unknown analytic")

    return {"industry": industry, "data": result}
app/routers/scheduler.py ADDED
@@ -0,0 +1,90 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ State-less scheduler REST facade.
3
+ Jobs are still executed by APScheduler; this router only
4
+ - persists schedules to /data/.schedules.json
5
+ - keeps APScheduler in sync
6
+ """
7
+ import json, uuid, os
8
+ from datetime import datetime
9
+ from typing import List
10
+ from fastapi import APIRouter, Query, HTTPException
11
+ from pydantic import BaseModel
12
+
13
+ router = APIRouter(prefix="/schedules", tags=["scheduler"])
14
+
15
+ SCHEDULE_FILE = "/data/.schedules.json"
16
+
17
+ # --------------------------------------------------
18
+ # models
19
+ # --------------------------------------------------
20
class ScheduleIn(BaseModel):
    """Client-supplied schedule definition."""
    orgId : str
    frequency: str        # daily | weekly | monthly
    analytics: List[str]  # analytic names to run on each tick

class ScheduleOut(ScheduleIn):
    """Stored schedule, enriched with server-assigned fields."""
    id : str
    nextRun : datetime
28
+
29
+ # --------------------------------------------------
30
+ # helpers
31
+ # --------------------------------------------------
32
def _load() -> List[dict]:
    """Read every persisted schedule; an absent store file means none."""
    try:
        with open(SCHEDULE_FILE) as fh:
            return json.load(fh)
    except FileNotFoundError:
        return []
37
+
38
def _save(obj: List[dict]):
    """Overwrite the schedule store with *obj* (non-JSON values via str)."""
    serialized = json.dumps(obj, indent=2, default=str)
    with open(SCHEDULE_FILE, "w") as fh:
        fh.write(serialized)
41
+
42
+ def _next_run(frequency: str) -> datetime:
43
+ from datetime import timedelta
44
+ now = datetime.utcnow()
45
+ if frequency == "daily": return now + timedelta(days=1)
46
+ if frequency == "weekly": return now + timedelta(weeks=1)
47
+ if frequency == "monthly": return now + timedelta(days=30)
48
+ return now
49
+
50
+ # --------------------------------------------------
51
+ # CRUD
52
+ # --------------------------------------------------
53
# ↓↓↓ ADD THIS LINE ↓↓↓
@router.get("/schedules", response_model=List[ScheduleOut])
def list_schedules_endpoint(orgId: str = Query(...)):
    """Alias route: the router prefix is already /schedules, so this
    answers GET /schedules/schedules. Delegates to list_schedules."""
    return list_schedules(orgId)
57
+
58
@router.get("", response_model=List[ScheduleOut])
def list_schedules(orgId: str = Query(...)):
    """Return every persisted schedule belonging to *orgId*."""
    return [record for record in _load() if record["orgId"] == orgId]
62
+
63
@router.post("", response_model=ScheduleOut)
def create_schedule(payload: ScheduleIn):
    """Persist a new schedule and register it with APScheduler."""
    new_id = str(uuid.uuid4())
    record = {
        "id"       : new_id,
        "orgId"    : payload.orgId,
        "frequency": payload.frequency,
        "analytics": payload.analytics,
        # stored as ISO string so the JSON file stays serializable
        "nextRun"  : _next_run(payload.frequency).isoformat(),
    }
    all_ = _load()
    all_.append(record)
    _save(all_)
    # sync to APScheduler (imported lazily — avoids import-time coupling)
    from app.tasks.scheduler import add_job_to_scheduler
    add_job_to_scheduler(record)
    return ScheduleOut(**record)
80
+
81
@router.delete("/{schedule_id}", status_code=204)
def delete_schedule(schedule_id: str):
    """Remove a schedule from the store and from APScheduler (404 if absent)."""
    existing = _load()
    remaining = [rec for rec in existing if rec["id"] != schedule_id]
    if len(remaining) == len(existing):
        raise HTTPException(404, "Schedule not found")
    _save(remaining)
    # keep the in-memory APScheduler state in sync with the store
    from app.tasks.scheduler import remove_job_from_scheduler
    remove_job_from_scheduler(schedule_id)
app/routers/socket.py ADDED
@@ -0,0 +1,54 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/routers/socket.py
2
+ import socketio
3
+ from fastapi import APIRouter, Depends, Path, Request
4
+ from fastapi.responses import PlainTextResponse
5
+ from app.deps import verify_key # your API-key guard
6
+
7
+ # 1️⃣ Socket.IO server
8
+ sio = socketio.AsyncServer(
9
+ async_mode="asgi",
10
+ cors_allowed_origins=[
11
+ "https://mut-sync-hub.vercel.app",
12
+ "http://localhost:3000",
13
+ ],
14
+ )
15
+
16
+ # 2️⃣ ASGI sub-app (mounted separately in main.py)
17
+ socket_app = socketio.ASGIApp(sio)
18
+
19
+ # 3️⃣ FastAPI router for REST routes (no prefix → /socket-push)
20
+ router = APIRouter(tags=["socket"])
21
+
22
# ---------- POST /socket-push/{org_id} ----------
@router.post("/socket-push/{org_id}")
async def socket_push(
    request: Request,
    org_id: str = Path(...),
    _: str = Depends(verify_key),
):
    """
    Receive top-N rows from n8n workflow and broadcast them
    live to all connected clients in the given org room.

    Fix: ``request`` was declared as ``Request = None``; had it ever
    defaulted, ``request.json()`` would raise AttributeError. It is now a
    required parameter injected by FastAPI, placed first so no
    non-default parameter follows a defaulted one.
    """
    payload = await request.json()
    rows = payload.get("rows", [])
    await sio.emit("datasource:new-rows", {"rows": rows}, room=org_id)
    print(f"[socket] 🔄 broadcasted {len(rows)} rows → room={org_id}")
    return {"status": "ok", "emitted": len(rows)}
38
+
39
# ---------- Health Check ----------
# NOTE(review): this router is included in the main app, which already
# serves GET /health from two other places — confirm the intended handler.
@router.get("/health")
async def health():
    """Plain-text liveness probe for the socket router."""
    return PlainTextResponse("ok")
43
+
44
# ---------- Socket.IO Events ----------
@sio.event
async def connect(sid, environ, auth):
    """On connect: stash the org id in the session and join its room.

    Clients that send no auth payload fall back to the "demo" room.
    """
    org_id = (auth or {}).get("orgId", "demo")
    await sio.save_session(sid, {"orgId": org_id})
    await sio.enter_room(sid, org_id)
    print(f"[socket] ✅ {sid} connected → room={org_id}")

@sio.event
async def disconnect(sid):
    """Log client disconnects."""
    print(f"[socket] ❌ {sid} disconnected")
app/service/industry_svc.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Pure async wrappers around AnalyticsService – no quota, no DB.
3
+ """
4
+ from typing import Any, Dict, List, Optional
5
+ import pandas as pd
6
+ from app.engine.analytics import AnalyticsService
7
+
8
+ analytics = AnalyticsService()
9
+
10
# ------------------------------------------------------------------
# 1 EDA – full exploratory + industry auto-detect
# ------------------------------------------------------------------
async def eda(data: List[Dict], industry: Optional[str] = None) -> Dict[str, Any]:
    """Run full exploratory analysis; industry may be pre-detected or None."""
    return analytics.perform_eda(data, industry)

# ------------------------------------------------------------------
# 2 FORECAST – Prophet 30-day forward
# ------------------------------------------------------------------
async def forecast(data: List[Dict], date_column: str, value_column: str) -> Dict[str, Any]:
    """Forecast value_column over date_column; both are caller-specified."""
    return analytics.forecast_timeseries(data, date_column, value_column)

# ------------------------------------------------------------------
# 3 BASKET – market basket analysis
# ------------------------------------------------------------------
async def basket(data: List[Dict],
                 min_support: float = 0.01,
                 min_confidence: float = 0.3,
                 min_lift: float = 1.0) -> Dict[str, Any]:
    """Association-rule mining with the given support/confidence/lift floors."""
    df = pd.DataFrame(data)
    return analytics.perform_market_basket_analysis(df, min_support, min_confidence, min_lift)

# ------------------------------------------------------------------
# 4 CROSS-INDUSTRY INSIGHTS – one per endpoint
# ------------------------------------------------------------------
# NOTE(review): these wrappers call underscore-prefixed (private)
# AnalyticsService methods — consider exposing public wrappers on the
# service instead.
async def market_dynamics(data: List[Dict]) -> Dict[str, Any]:
    """Market-dynamics insight over ad-hoc records."""
    df = pd.DataFrame(data)
    return analytics._analyze_market_dynamics(df)

async def supply_chain(data: List[Dict]) -> Dict[str, Any]:
    """Supply-chain insight over ad-hoc records."""
    df = pd.DataFrame(data)
    return analytics._analyze_supply_chain(df)

async def customer_insights(data: List[Dict]) -> Dict[str, Any]:
    """Customer insight over ad-hoc records."""
    df = pd.DataFrame(data)
    return analytics._analyze_customer_insights(df)

async def operational_efficiency(data: List[Dict]) -> Dict[str, Any]:
    """Operational-efficiency insight over ad-hoc records."""
    df = pd.DataFrame(data)
    return analytics._analyze_operational_efficiency(df)

async def risk_assessment(data: List[Dict]) -> Dict[str, Any]:
    """Risk-pattern insight over ad-hoc records."""
    df = pd.DataFrame(data)
    return analytics._analyze_risk_patterns(df)

async def sustainability(data: List[Dict]) -> Dict[str, Any]:
    """Sustainability-metrics insight over ad-hoc records."""
    df = pd.DataFrame(data)
    return analytics._analyze_sustainability_metrics(df)
app/service/live_ingest.py ADDED
@@ -0,0 +1,34 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json, pandas as pd, redis
2
+ from datetime import datetime
3
+ from app.engine.analytics import AnalyticsService
4
+ from app.redis_pool import redis_client
5
+
6
class LiveIngestService:
    """Buffers live 'sale' events for one org and periodically flushes an
    EDA snapshot into Redis (key ``live:<org_id>``, 300 s TTL)."""

    def __init__(self, org_id: str):
        self.org_id = org_id
        self.buffer: list[dict] = []        # pending raw sale payloads
        self.analytics = AnalyticsService()

    async def handle(self, msg: dict):
        """Accept one event; flush when the batch is large or stale."""
        if msg.get("event") != "sale":
            return
        self.buffer.append(msg["data"])
        if len(self.buffer) >= 100 or self._older_than_3s():
            await self._flush()

    async def _flush(self):
        """Run EDA on the buffered rows and cache the report in Redis."""
        if not self.buffer:
            return
        df = pd.DataFrame(self.buffer)
        # assumes each payload carries a parseable "timestamp" — TODO confirm
        df["timestamp"] = pd.to_datetime(df["timestamp"])
        industry = self._detect_industry(df)
        report = self.analytics.perform_eda(df.to_dict("records"), industry=industry)
        redis_client.setex(f"live:{self.org_id}", 300, json.dumps(report, default=str))
        self.buffer.clear()

    def _older_than_3s(self) -> bool:
        """True when the newest buffered event is more than 3 s old.

        Fix: the original used ``Timedelta.seconds``, which is only the
        seconds-within-a-day remainder (it wraps every 24 h and ignores
        whole days); ``total_seconds()`` measures real elapsed time.
        NOTE(review): assumes event timestamps are tz-aware UTC — a naive
        value would raise on subtraction; confirm the upstream format.
        """
        if not self.buffer:
            return False
        age = pd.Timestamp.utcnow() - pd.to_datetime(self.buffer[-1]["timestamp"])
        return age.total_seconds() > 3

    def _detect_industry(self, df: pd.DataFrame) -> str:
        """Heuristic industry guess from the columns present."""
        cols = set(df.columns)
        if {"product_id", "qty", "price", "total"}.issubset(cols):
            return "supermarket"
        if {"sku", "wholesale_price"}.issubset(cols):
            return "wholesale"
        return "retail"
app/tasks/ingest_worker.py ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import asyncio, json, redis, duckdb
3
+ from app.db import get_conn, ensure_raw_table
4
+ from app.ingest import ingest_dict
5
+
6
+ r = redis.from_url(os.getenv("REDIS_URL"))
7
+ STREAM_KEY = "pos_stream:{org_id}" # one stream per tenant
8
+
9
async def stream_consumer(org_id: str):
    """Continuously drain one tenant's Redis stream into DuckDB raw_rows.

    Fix: the original passed '$' to XREAD on every iteration, which means
    "only entries added after this call starts" — any entry that arrived
    while a previous batch was being processed was silently skipped. We
    now start at '$' once and advance a cursor to the last ingested ID.
    """
    conn = get_conn(org_id)
    ensure_raw_table(conn)
    stream = STREAM_KEY.format(org_id=org_id)
    last_id = "$"  # only entries newer than consumer start-up
    while True:
        msgs = r.xread({stream: last_id}, count=100, block=5000)
        if msgs:
            _, entries = msgs[0]
            for entry_id, data in entries:
                ingest_dict(org_id, json.loads(data[b'row']))
                last_id = entry_id  # resume after the newest processed entry
        await asyncio.sleep(1)  # 1 s micro-batch
app/tasks/kpi_logger.py ADDED
@@ -0,0 +1,44 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import duckdb
2
+ from app.db import get_conn, ensure_kpi_log
3
+ from app.mapper import canonify_df # gives uniform DF
4
+ from app.engine.analytics import AnalyticsService
5
+ from app.utils.detect_industry import detect_industry
6
+
7
+ analytics = AnalyticsService()
8
+
9
def log_kpis_and_purge(org_id: str) -> None:
    """
    1. Canonify last 6 h of raw rows
    2. Compute KPIs
    3. Insert into kpi_log (history)
    4. Delete raw rows older than 6 h

    Fix: the connection is now closed in a ``finally`` block, so a failure
    in canonify/EDA/insert no longer leaks the DuckDB handle.
    """
    conn = get_conn(org_id)
    try:
        ensure_kpi_log(conn)

        df = canonify_df(org_id)
        if df.empty:
            return

        industry, _ = detect_industry(df)
        # Only the supermarket KPI bundle is logged; other industries yield {}.
        kpis = analytics.perform_eda(df.to_dict("records"), industry).get("supermarket_kpis", {})

        conn.execute(
            """INSERT INTO kpi_log(daily_sales, daily_qty, avg_basket,
                                   shrinkage, promo_lift, stock)
               VALUES (?,?,?,?,?,?)""",
            [
                kpis.get("daily_sales", 0),
                kpis.get("daily_qty", 0),
                kpis.get("avg_basket", 0),
                kpis.get("shrinkage_pct", 0),
                kpis.get("promo_lift_pct", 0),
                kpis.get("stock_on_hand", 0),
            ],
        )

        # purge raw buffer outside the 6 h analytics window
        conn.execute("DELETE FROM raw_rows WHERE ingested_at < now() - INTERVAL 6 HOUR")
        conn.commit()
    finally:
        conn.close()
app/tasks/purge.py ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ from app.db import get_conn, ensure_raw_table
2
def purge_old_raw(org_id: str, hours=6):
    """Delete raw_rows older than *hours* from the tenant's DuckDB buffer.

    The original SQL used ``INTERVAL ? HOURS``, but DuckDB does not accept a
    bind parameter inside an interval literal; ``to_hours(?)`` builds the
    same interval from a plain integer parameter. The connection is also
    closed even when the DELETE raises.
    """
    conn = get_conn(org_id)
    try:
        conn.execute(
            "DELETE FROM raw_rows WHERE ingested_at < now() - to_hours(?)",
            [hours],
        )
        conn.commit()
    finally:
        conn.close()
app/tasks/scheduler.py ADDED
@@ -0,0 +1,137 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ State-less scheduler – caller (Next-js) orchestrates storage & quota.
3
+ Only duty: run analytics on cron, return JSON.
4
+ """
5
+ import asyncio
6
+ import pandas as pd
7
+ from apscheduler.schedulers.asyncio import AsyncIOScheduler
8
+ from app.engine.analytics import AnalyticsService
9
+ from app.service.industry_svc import (eda, forecast, basket, market_dynamics,
10
+ supply_chain, customer_insights,
11
+ operational_efficiency, risk_assessment,
12
+ sustainability)
13
+ from app.utils.detect_industry import detect_industry
14
+ from app.utils.email import send_pdf_email
15
+ import os
16
+ from datetime import datetime
17
+ import aiohttp
18
+
19
+ sched = AsyncIOScheduler()
20
+
21
+ # ------------------------------------------------------------------
22
+ # 1 RUN ONE ANALYTIC – pure logic, no DB
23
+ # ------------------------------------------------------------------
24
+ async def run_analytic_job(org_id: str, analytic_type: str, **kwargs) -> dict:
25
+ """
26
+ 1. Canonify last 6 h of raw rows (any column names) via DuckDB
27
+ 2. Compute chosen analytic
28
+ 3. Log KPIs + purge old raw data
29
+ 4. Return shaped payload
30
+ """
31
+ from app.mapper import canonify_df # NEW: any-shape → canonical
32
+ from app.tasks.kpi_logger import log_kpis_and_purge # NEW: history & tidy
33
+
34
+ df = canonify_df(org_id)
35
+ if df.empty:
36
+ return {"error": "No recent data found"}
37
+
38
+ data = df.to_dict("records")
39
+ industry, _ = detect_industry(df)
40
+
41
+ match analytic_type:
42
+ case "eda":
43
+ result = await eda(data, industry)
44
+ case "forecast":
45
+ result = await forecast(data, kwargs["date_col"], kwargs["value_col"])
46
+ case "basket":
47
+ result = await basket(data, 0.01, 0.3, 1.0)
48
+ case "market-dynamics":
49
+ result = await market_dynamics(data)
50
+ case "supply-chain":
51
+ result = await supply_chain(data)
52
+ case "customer-insights":
53
+ result = await customer_insights(data)
54
+ case "operational-efficiency":
55
+ result = await operational_efficiency(data)
56
+ case "risk-assessment":
57
+ result = await risk_assessment(data)
58
+ case "sustainability":
59
+ result = await sustainability(data)
60
+ case _:
61
+ return {"error": "Unknown analytic"}
62
+
63
+ # ---------- NEW – history + disk tidy ----------
64
+ log_kpis_and_purge(org_id) # inserts KPIs & deletes raw > 6 h
65
+ # -------------------------------------------------
66
+ async with aiohttp.ClientSession() as session:
67
+ await session.post(
68
+ f"{os.getenv('NEXT_PUBLIC_ORIGIN')}/analytics/report/sync",
69
+ json={
70
+ "orgId": org_id,
71
+ "type": analytic_type,
72
+ "results": result,
73
+ "lastRun": datetime.utcnow().isoformat(),
74
+ },
75
+ headers={"x-api-key": os.getenv("ANALYTICS_KEY")},
76
+ )
77
+ # fire-and-forget email (caller decides storage)
78
+ pdf_url = f"{os.getenv('PUBLIC_URL', '')}/api/reports/{org_id}/{analytic_type}.pdf"
79
+ asyncio.create_task(send_pdf_email(org_id, f"{analytic_type} report", {"pdf": pdf_url, "data": result}))
80
+
81
+ return {"orgId": org_id, "analytic": analytic_type, "industry": industry, "results": result, "timestamp": datetime.utcnow().isoformat()}
82
+
83
+ # ------------------------------------------------------------------
84
+ # 2 APScheduler glue – unchanged
85
+ # ------------------------------------------------------------------
86
def add_job_to_scheduler(schedule: dict):
    """Register one APScheduler cron job per analytic in *schedule*.

    Supported frequencies all fire at 06:00: daily, weekly (Monday),
    monthly (1st of the month). Unknown frequencies register nothing.
    """
    triggers = {
        "daily": {"hour": 6, "minute": 0},
        "weekly": {"day_of_week": 0, "hour": 6, "minute": 0},
        "monthly": {"day": 1, "hour": 6, "minute": 0},
    }
    org_id = schedule["orgId"]
    cron = triggers.get(schedule["frequency"])
    for analytic in schedule["analytics"]:
        job_id = f"{schedule['id']}_{analytic}"
        if cron is not None:
            sched.add_job(run_analytic_job, "cron",
                          args=[org_id, analytic], id=job_id, **cron)
101
+
102
def remove_job_from_scheduler(schedule_id: str):
    """Drop every scheduled job whose id belongs to *schedule_id*."""
    stale_ids = [job.id for job in sched.get_jobs()
                 if job.id.startswith(schedule_id)]
    for job_id in stale_ids:
        sched.remove_job(job_id)
106
+
107
+ # ------------------------------------------------------------------
108
+ # 3 ENV-loader – unchanged
109
+ # ------------------------------------------------------------------
110
async def load_schedules():
    """Load schedule definitions from the SCHEDULES env var and register them.

    Delegates registration to ``add_job_to_scheduler`` so the cron wiring
    lives in exactly one place — the two hand-rolled copies had already
    drifted (this one built job ids from org_id, the other from
    ``schedule['id']``; passing ``id=org_id`` keeps the ids identical to
    the original ``f"{org_id}_{analytic}"``).
    """
    import json

    try:
        schedules = json.loads(os.getenv("SCHEDULES", "[]"))
    except Exception:
        schedules = []
    if not isinstance(schedules, list):
        schedules = []  # a bare object/string in SCHEDULES is not a schedule list

    for sch in schedules:
        add_job_to_scheduler({
            "id": sch["orgId"],
            "orgId": sch["orgId"],
            "frequency": sch.get("frequency", "daily"),
            "analytics": sch.get("analytics", ["eda"]),
        })
131
+
132
+ # ------------------------------------------------------------------
133
+ # 4 STARTER
134
+ # ------------------------------------------------------------------
135
def start_scheduler():
    """Schedule env-driven job loading, then start APScheduler.

    Must be called from inside a running event loop —
    ``asyncio.create_task`` requires one.
    """
    loader = load_schedules()
    asyncio.create_task(loader)
    sched.start()
app/utils/detect_industry.py ADDED
@@ -0,0 +1,116 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Enterprise industry detector – POS-schema aware.
3
+ Works with exports from Square, Lightspeed, Shopify POS, NCR, Oracle MICROS,
4
+ QuickBooks POS, Clover, Revel, Toast, etc.
5
+ """
6
+ import pandas as pd
7
+ from typing import Tuple
8
+
9
# ------------------------------------------------------------------
# 1 COLUMN ALIAS MAP – covers 99 % of real-world POS exports
# ------------------------------------------------------------------
# industry -> canonical field -> aliases under which that field appears in
# real exports. detect_industry() scores an industry by the fraction of its
# canonical fields for which at least one alias matches a column name
# (substring match, case-insensitive — see _find_col).
_ALIAS = {
    "supermarket": {
        "sku": ["barcode", "item_code", "plu", "product_id", "product_code", "item_id",
                "sku", "goods_code", "article_number", "artnum", "sale_id", "item_barcode",
                "product_barcode", "item_sku", "goods_id", "inventory_id", "merchandise_code"],
        "qty": ["qty", "quantity", "units", "stock", "quantity_sold", "qty_sold",
                "item_count", "unit_count", "pieces", "pcs", "amount_sold",
                "sold_qty", "sales_qty", "sold_quantity", "transaction_qty"],
        "price": ["unit_price", "price", "sell_price", "unit_sell", "selling_price",
                  "item_price", "product_price", "rate", "unit_cost", "cost_price",
                  "retail_price", "sales_price", "price_each", "unit_rate"],
        "total": ["total", "total_line", "line_total", "net_amount", "amount", "sales_amount",
                  "value", "extended_price", "total_price", "gross_amount", "total_amount",
                  "line_value", "transaction_total", "subtotal", "total_sales"],
        "transaction": ["transaction_id", "receipt_no", "ticket_no", "order_id", "sale_id",
                        "tran_id", "trans_id", "receipt_number", "invoice_no", "bill_no",
                        "ticket_id", "session_id", "pos_transaction_id", "order_number"],
        "store": ["store_id", "branch_code", "location_id", "outlet_id", "shop_id",
                  "branch_id", "terminal_id", "pos_id", "workstation_id", "station_id",
                  "store_code", "site_id", "warehouse_id", "depot_id"],
        "category": ["category", "cat", "department", "class", "sub_category", "group_name",
                     "product_group", "family", "section", "division", "category_name",
                     "item_category", "product_category", "group_code"],
        "expiry": ["expiry_date", "exp", "best_before", "use_by", "expiration_date",
                   "exp_date", "best_before_date", "shelf_life_date", "valid_until",
                   "expires_on", "expiry", "expiration"],
        "promo": ["promo", "promotion", "discount_code", "campaign", "is_promo",
                  "promotion_code", "disc_code", "offer_code", "special_code",
                  "promo_flag", "promotion_flag", "discount_flag", "is_discount"],
        "loss": ["loss_qty", "waste_qty", "shrinkage_qty", "damaged_qty", "spoiled_qty",
                 "expired_qty", "write_off_qty", "shrinkage", "waste", "damaged",
                 "loss", "shrinkage_units", "waste_units", "damaged_units", "spoiled_units"],
    },
    "healthcare": {
        "patient": ["patient_id", "patient_no", "mrn", "medical_record_number"],
        "treatment": ["treatment_cost", "procedure_cost", "bill_amount", "invoice_amount"],
        "diagnosis": ["diagnosis_code", "icd_code", "condition"],
        "drug": ["drug_name", "medication", "prescription"],
    },
    "wholesale": {
        "sku": ["sku", "item_code"],
        "wholesale_price": ["wholesale_price", "bulk_price", "trade_price"],
        "moq": ["moq", "min_order_qty", "minimum_order"],
    },
    "manufacturing": {
        "production": ["production_volume", "units_produced", "output_qty"],
        "defect": ["defect_rate", "rejection_rate", "scrap_qty"],
        "machine": ["machine_id", "line_id", "station_id"],
    },
    "retail": {
        "product": ["product_name", "product_id"],
        "sale": ["sale_date", "sale_amount"],
    },
}
66
+
67
+ # ------------------------------------------------------------------
68
+ # 2 HELPER – find first matching column
69
+ # ------------------------------------------------------------------
70
+ def _find_col(df: pd.DataFrame, keys) -> str | None:
71
+ cols = {c.lower() for c in df.columns}
72
+ for k in keys:
73
+ if any(k.lower() in col for col in cols):
74
+ return k
75
+ return None
76
+
77
+ # ------------------------------------------------------------------
78
+ # 3 SCORER – returns (industry, confidence 0-1)
79
+ # ------------------------------------------------------------------
80
def detect_industry(df: pd.DataFrame) -> Tuple[str, float]:
    """
    Detect industry from any POS / ERP / healthcare CSV.
    Returns (industry, confidence_score in [0, 1]).
    """
    if df.empty:
        return "retail", 0.0

    # fraction of each industry's canonical field groups present in df
    scores = {}
    for industry, groups in _ALIAS.items():
        hits = sum(1 for group_keys in groups.values() if _find_col(df, group_keys))
        scores[industry] = hits / len(groups)  # normalised 0-1

    industry = max(scores, key=scores.get) if scores else "retail"
    confidence = scores.get(industry, 0.0)

    # Tie-breaker: supermarket columns are a strict superset of retail's, so
    # when *retail* wins with a score supermarket matched, prefer supermarket.
    # (The original applied this override unconditionally, demoting any other
    # winner — e.g. healthcare — whenever supermarket and retail tied at all.)
    if industry == "retail" and scores.get("supermarket", 0) == scores.get("retail", 0):
        industry = "supermarket"
        confidence = scores.get("supermarket", confidence)

    return industry, confidence
105
+
106
+ # ------------------------------------------------------------------
107
+ # 4 SINGLE-USE HELPER – supermarket boolean
108
+ # ------------------------------------------------------------------
109
+ def is_supermarket(df: pd.DataFrame) -> bool:
110
+ """
111
+ Fast yes/no wrapper for downstream code that only cares
112
+ whether we treat this as a supermarket data set.
113
+ """
114
+ industry, confidence = detect_industry(df)
115
+ # be conservative: only return True if we are *sure*
116
+ return industry == "supermarket" and confidence >= 0.6
app/utils/email.py ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ # analytics-service/app/utils/email.py
2
+ from typing import Any
3
+
4
async def send_pdf_email(*_: Any, **__: Any) -> None:
    """Stub – replace with real e-mail logic later.

    Declared ``async`` so the scheduler's fire-and-forget
    ``asyncio.create_task(send_pdf_email(...))`` works: ``create_task``
    rejects the plain ``None`` a sync stub would return.
    Accepts and ignores any arguments; always returns None.
    """
    return None
data/duckdb/.gitkeep ADDED
File without changes
data/duckdb/schedules.json ADDED
File without changes
fly.toml ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # fly.toml app configuration file generated for mutsynchub on 2025-11-06T14:44:31Z
2
+ #
3
+ # See https://fly.io/docs/reference/configuration/ for information about how to use this file.
4
+ #
5
+
6
+ app = 'mutsynchub'
7
+ primary_region = 'iad'
8
+
9
+ [build]
10
+
11
+ [http_service]
12
+ internal_port = 8080
13
+ force_https = true
14
+ auto_stop_machines = 'stop'
15
+ auto_start_machines = true
16
+ min_machines_running = 0
17
+ processes = ['app']
18
+
19
+ [[vm]]
20
+ memory = '1gb'
21
+ cpu_kind = 'shared'
22
+ cpus = 1
23
+ memory_mb = 1024
requirements.txt ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Analytics Service dependencies
2
+ apscheduler>=3.10
3
+ pyarrow>=15.0
4
+ redis>=5.0
5
+ pandas>=2.2
6
+ fastapi>=0.111
7
+ uvicorn[standard]>=0.29
8
+ prophet==1.1.5
9
+ numpy>=1.24
10
+ scikit-learn>=1.3
11
+ scipy>=1.10
12
+ statsmodels>=0.14
13
+ networkx>=3.0
14
+ sqlalchemy[asyncio]>=2.0
15
+ asyncpg>=0.29 # async postgres driver
16
+ numpy<2.0
17
+ requests>=2.31
18
+ huggingface_hub>=0.20.0
19
+ aiohttp>=3.9.0
20
+ httpx>=0.27.0
21
+ python-multipart==0.0.6
22
+ pycryptodome==3.20.0
23
+ python-socketio[asyncio]>=5.11.0
scheduler_loop.py ADDED
@@ -0,0 +1,42 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json, time, os, requests
2
+ from datetime import datetime, timedelta
3
+ from pathlib import Path
4
+
5
+ SCHEDULE_FILE = "/data/.schedules.json"
6
+ RUN_URL = "http://localhost:8000/analytics/run" # inside container
7
+
8
def tick():
    """Run every due schedule once and persist the updated nextRun stamps.

    Improvements: ``requests.post`` gets a timeout (a hung analytics
    endpoint would otherwise block the scheduler loop forever), and each
    analytic run is guarded individually so one failing POST does not
    skip the remaining analytics or the nextRun bump.
    """
    if not Path(SCHEDULE_FILE).exists():
        return
    with open(SCHEDULE_FILE) as f:
        schedules = json.load(f)

    now = datetime.utcnow().isoformat()
    for s in schedules:
        # ISO-8601 strings compare chronologically, so string <= is safe here
        if s["nextRun"] <= now:
            for analytic in s["analytics"]:
                try:
                    # call the same endpoint the UI uses
                    r = requests.post(
                        RUN_URL,
                        json={"analytic": analytic},
                        headers={"X-Data-Path": f"/data/{s['orgId']}/sales.parquet"},
                        timeout=300,
                    )
                    print(f"[scheduler] ran {analytic} for {s['orgId']} -> {r.status_code}")
                except requests.RequestException as e:
                    print(f"[scheduler] {analytic} for {s['orgId']} failed: {e}")
            # bump nextRun even on partial failure so we don't hot-loop
            s["nextRun"] = _next_run(s["frequency"]).isoformat()

    with open(SCHEDULE_FILE, "w") as f:
        json.dump(schedules, f, indent=2)
+
29
+ def _next_run(frequency: str) -> datetime:
30
+ now = datetime.utcnow()
31
+ if frequency == "daily": return now + timedelta(days=1)
32
+ if frequency == "weekly": return now + timedelta(weeks=1)
33
+ if frequency == "monthly": return now + timedelta(days=30)
34
+ return now
35
+
36
# Entry point: poll the schedule file forever at 1-minute granularity.
if __name__ == "__main__":
    while True:
        try:
            tick()
        except Exception as e:
            # Never let one bad tick kill the loop; log and keep polling.
            print("[scheduler] error:", e)
        time.sleep(60)  # 1-minute granularity