shaliz-kong committed on
Commit
d3d9d83
Β·
1 Parent(s): fa2291e

added entity type in KPI calculators

Browse files
app/engine/kpi_calculators/base.py CHANGED
@@ -25,28 +25,31 @@ class BaseKPICalculator(ABC):
25
  - Comprehensive error handling
26
  """
27
 
28
- def __init__(self, org_id: str, df: pd.DataFrame, source_id: Optional[str] = None):
29
  """
30
  βœ… Universal constructor - all parameters optional except org_id and df
31
-
32
  Args:
33
  org_id: Organization ID (required)
34
  df: DataFrame to analyze (required)
35
  source_id: Optional source identifier for tracking
 
36
  """
37
  if not org_id or df.empty:
38
  raise ValueError("org_id and non-empty df required")
39
-
40
  self.org_id = org_id
41
  self.source_id = source_id
42
  self.df = df.copy() # Defensive copy to prevent mutation
43
- self.schema = OrgSchema(org_id)
 
 
 
44
  self.llm = get_llm_service()
45
  self.computed_at = datetime.utcnow()
46
  self._cache: Dict[str, Any] = {} # In-memory cache for this run
47
-
48
- logger.info(f"[KPI] πŸ“Š {self.__class__.__name__} initialized for {org_id} ({len(df)} rows)")
49
 
 
50
  @abstractmethod
51
  async def compute_all(self) -> Dict[str, Any]:
52
  """
 
25
  - Comprehensive error handling
26
  """
27
 
28
def __init__(self, org_id: str, df: pd.DataFrame, source_id: Optional[str] = None, entity_type: str = "SALES"):
    """
    ✅ Universal constructor - all parameters optional except org_id and df

    Args:
        org_id: Organization ID (required)
        df: DataFrame to analyze (required)
        source_id: Optional source identifier for tracking
        entity_type: Entity type from Redis (e.g., "SALES", "INVENTORY")

    Raises:
        ValueError: If org_id is falsy or df is empty.
    """
    if not org_id or df.empty:
        raise ValueError("org_id and non-empty df required")

    self.org_id = org_id
    self.source_id = source_id
    self.df = df.copy()  # Defensive copy to prevent mutation
    self.entity_type = entity_type  # ✅ Store entity_type

    # ✅ FIXED: Pass entity_type to OrgSchema so per-entity column mappings apply
    self.schema = OrgSchema(org_id=org_id, entity_type=entity_type)
    self.llm = get_llm_service()
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
    # consider datetime.now(timezone.utc) — confirm callers tolerate tz-aware.
    self.computed_at = datetime.utcnow()
    self._cache: Dict[str, Any] = {}  # In-memory cache for this run

    logger.info(f"[KPI] 📊 {self.__class__.__name__} initialized for {org_id}/{entity_type} ({len(df)} rows)")
53
  @abstractmethod
54
  async def compute_all(self) -> Dict[str, Any]:
55
  """
app/engine/kpi_calculators/hospitality.py CHANGED
@@ -9,11 +9,12 @@ from app.schemas.org_schema import OrgSchema
9
  class HospitalityKPICalculator(BaseKPICalculator):
10
  """Restaurant & Hospitality KPI engine"""
11
 
12
- def __init__(self, org_id: str, df: pd.DataFrame, source_id: str):
13
- super().__init__(df)
14
  self.schema = OrgSchema(org_id)
15
  self.org_id = org_id
16
  self.source_id = source_id
 
17
  self._alias_columns()
18
 
19
  def _alias_columns(self):
 
9
  class HospitalityKPICalculator(BaseKPICalculator):
10
  """Restaurant & Hospitality KPI engine"""
11
 
12
def __init__(self, org_id: str, df: pd.DataFrame, source_id: Optional[str] = None, entity_type: str = "SALES"):
    """
    Restaurant & Hospitality calculator constructor.

    Args:
        org_id: Organization ID (required)
        df: Transaction DataFrame (required)
        source_id: Optional source identifier for tracking
        entity_type: Entity type from Redis (e.g., "SALES", "INVENTORY")
    """
    # The base class validates inputs and sets org_id / source_id /
    # entity_type, and builds an entity-aware OrgSchema.  Do NOT rebuild
    # it here with OrgSchema(org_id) — that silently drops entity_type
    # and defeats the purpose of threading it through the constructor.
    super().__init__(org_id=org_id, df=df, source_id=source_id, entity_type=entity_type)
    self._alias_columns()
19
 
20
  def _alias_columns(self):
app/engine/kpi_calculators/registry.py CHANGED
@@ -1,6 +1,9 @@
1
  """
2
  🏭 KPI Calculator Factory Registry
3
  Enterprise Pattern: Zero-bias, fault-tolerant, async-ready
 
 
 
4
  """
5
 
6
  import logging
@@ -27,7 +30,8 @@ def get_kpi_calculator(
27
  industry: str,
28
  org_id: str,
29
  df: pd.DataFrame,
30
- source_id: Optional[str] = None
 
31
  ) -> Any:
32
  """
33
  🎯 Factory - gets calculator for any industry with fault tolerance
@@ -37,12 +41,14 @@ def get_kpi_calculator(
37
  org_id: Organization ID
38
  df: DataFrame to analyze
39
  source_id: Optional source identifier
 
40
 
41
  Returns:
42
  Instantiated calculator class
43
 
44
  Raises:
45
  ValueError: If df is empty or org_id missing
 
46
  """
47
  if not org_id or df.empty:
48
  raise ValueError("org_id and non-empty df required")
@@ -51,23 +57,57 @@ def get_kpi_calculator(
51
  industry_key = industry.lower().strip() if industry else "default"
52
  calculator_class = KPI_CALCULATORS.get(industry_key, KPI_CALCULATORS["default"])
53
 
54
- logger.info(f"[KPI] 🎯 Selected {calculator_class.__name__} for industry: '{industry_key}'")
55
 
56
- # βœ… **Universal constructor** - handles both signatures
57
  try:
58
- # Try with source_id (new pattern)
59
- return calculator_class(org_id=org_id, df=df, source_id=source_id)
60
- except TypeError:
61
- # Fallback to legacy signature
62
- logger.warning(f"[KPI] {calculator_class.__name__} doesn't accept source_id, using legacy signature")
63
- return calculator_class(org_id=org_id, df=df)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
 
65
  # Async version for non-blocking instantiation
66
  async def get_kpi_calculator_async(
67
  industry: str,
68
  org_id: str,
69
  df: pd.DataFrame,
70
- source_id: Optional[str] = None
 
71
  ) -> Any:
72
- """Non-blocking factory (for async contexts)"""
73
- return await asyncio.to_thread(get_kpi_calculator, industry, org_id, df, source_id)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
  🏭 KPI Calculator Factory Registry
3
  Enterprise Pattern: Zero-bias, fault-tolerant, async-ready
4
+ - Supports dynamic entity_type injection from Redis
5
+ - Backward compatible with legacy calculators
6
+ - Async interface for non-blocking instantiation
7
  """
8
 
9
  import logging
 
30
def get_kpi_calculator(
    industry: str,
    org_id: str,
    df: pd.DataFrame,
    source_id: Optional[str] = None,
    entity_type: str = "SALES",  # ✅ NEW: Injected from Redis
) -> Any:
    """
    🎯 Factory - gets calculator for any industry with fault tolerance

    Args:
        industry: Industry key (case-insensitive); unknown keys fall back to "default"
        org_id: Organization ID
        df: DataFrame to analyze
        source_id: Optional source identifier
        entity_type: Entity type from Redis (e.g., "SALES", "INVENTORY")

    Returns:
        Instantiated calculator class

    Raises:
        ValueError: If df is empty or org_id missing
        TypeError: If calculator instantiation fails for a reason other
            than a legacy signature missing entity_type
    """
    if not org_id or df.empty:
        raise ValueError("org_id and non-empty df required")

    industry_key = industry.lower().strip() if industry else "default"
    calculator_class = KPI_CALCULATORS.get(industry_key, KPI_CALCULATORS["default"])

    logger.info(f"[KPI] 🎯 {calculator_class.__name__} for {org_id}/{entity_type} ({industry_key})")

    # ✅ **Universal constructor** - handles all signature variations
    try:
        # Modern signature with entity_type
        return calculator_class(
            org_id=org_id,
            df=df,
            source_id=source_id,
            entity_type=entity_type,
        )
    except TypeError as e:
        # Guard clause: only fall back when the failure is specifically a
        # legacy signature that lacks entity_type; anything else re-raises.
        if "entity_type" not in str(e):
            logger.error(f"[KPI] Unexpected instantiation error: {e}")
            raise
        # Legacy calculator without entity_type support
        logger.warning(f"[KPI] {calculator_class.__name__} legacy signature: {e}")
        try:
            return calculator_class(org_id=org_id, df=df, source_id=source_id)
        except TypeError:
            # Ultra-legacy: only org_id and df
            logger.warning(f"[KPI] {calculator_class.__name__} ultra-legacy signature")
            return calculator_class(org_id=org_id, df=df)
 
86
  # Async version for non-blocking instantiation
87
async def get_kpi_calculator_async(
    industry: str,
    org_id: str,
    df: pd.DataFrame,
    source_id: Optional[str] = None,
    entity_type: str = "SALES",  # ✅ NEW: Async version also accepts entity_type
) -> Any:
    """
    🎯 Async factory - non-blocking calculator instantiation

    Delegates to the synchronous factory on a worker thread so calculator
    construction (which copies the DataFrame) never blocks the event loop.

    Args:
        Same as get_kpi_calculator

    Returns:
        Instantiated calculator class

    Usage:
        calculator = await get_kpi_calculator_async(...)
    """
    return await asyncio.to_thread(
        get_kpi_calculator, industry, org_id, df, source_id, entity_type
    )
app/engine/kpi_calculators/retail.py CHANGED
@@ -9,11 +9,12 @@ from app.schemas.org_schema import OrgSchema
9
  class RetailKPICalculator(BaseKPICalculator):
10
  """Retail KPI engine for general retail businesses"""
11
 
12
- def __init__(self, org_id: str, df: pd.DataFrame, source_id: str):
13
- super().__init__(df)
14
  self.schema = OrgSchema(org_id)
15
  self.org_id = org_id
16
  self.source_id = source_id
 
17
  self._alias_columns()
18
 
19
  def _alias_columns(self):
 
9
  class RetailKPICalculator(BaseKPICalculator):
10
  """Retail KPI engine for general retail businesses"""
11
 
12
def __init__(self, org_id: str, df: pd.DataFrame, source_id: Optional[str] = None, entity_type: str = "SALES"):
    """
    Retail calculator constructor.

    Args:
        org_id: Organization ID (required)
        df: Transaction DataFrame (required)
        source_id: Optional source identifier for tracking
        entity_type: Entity type from Redis (e.g., "SALES", "INVENTORY")
    """
    # The base class validates inputs, stores org_id / source_id /
    # entity_type, and builds an entity-aware OrgSchema.  Rebuilding the
    # schema here with OrgSchema(org_id) would silently discard
    # entity_type, so the base-supplied schema is kept as-is.
    super().__init__(org_id=org_id, df=df, source_id=source_id, entity_type=entity_type)
    self._alias_columns()
19
 
20
  def _alias_columns(self):
app/engine/kpi_calculators/supermarket.py CHANGED
@@ -26,21 +26,19 @@ class SupermarketKPICalculator(BaseKPICalculator):
26
  - Predictive alerts
27
  """
28
 
29
- def __init__(self, org_id: str, df: pd.DataFrame, source_id: Optional[str] = None):
30
- """
31
- βœ… **Fixed constructor** - matches BaseKPICalculator signature
32
-
33
- Args:
34
- org_id: Organization ID
35
- df: Transaction DataFrame
36
- source_id: Optional source identifier
37
- """
38
- super().__init__(org_id=org_id, df=df, source_id=source_id)
39
 
40
- # Dynamic schema aliasing for cleaner code
41
  self._apply_schema_aliases()
42
-
43
- logger.info(f"[KPI] πŸ›’ Supermarket calculator ready with {len(self.df)} transactions")
44
 
45
  def _apply_schema_aliases(self):
46
  """
 
26
  - Predictive alerts
27
  """
28
 
29
def __init__(self, org_id: str, df: pd.DataFrame, source_id: Optional[str] = None, entity_type: str = "SALES"):
    """
    ✅ Constructor matching the BaseKPICalculator signature.

    Args:
        org_id: Organization ID
        df: Transaction DataFrame
        source_id: Optional source identifier
        entity_type: Entity type from Redis (e.g., "SALES", "INVENTORY")
    """
    # ✅ FIXED: Pass entity_type up the chain so the base class builds an
    # entity-aware OrgSchema.
    super().__init__(
        org_id=org_id,
        df=df,
        source_id=source_id,
        entity_type=entity_type,
    )

    # Dynamic schema aliasing for cleaner code
    self._apply_schema_aliases()
    logger.info(f"[KPI] 🛒 Supermarket calculator ready for {entity_type}")
 
42
 
43
  def _apply_schema_aliases(self):
44
  """
app/tasks/analytics_worker.py CHANGED
@@ -108,15 +108,20 @@ class AnalyticsWorker:
108
  name=f"embed-{self.org_id}-{self.source_id}"
109
  )
110
 
 
 
 
111
  # 🎯 STEP 7: Compute KPIs (CPU-bound, run in thread pool)
112
  industry = await self._get_industry()
113
- calculator = get_kpi_calculator(industry, self.org_id, df, self.source_id)
 
 
 
 
 
 
114
  results = await asyncio.to_thread(calculator.compute_all)
115
 
116
- self.computed_at = datetime.now()
117
- duration = (self.computed_at - start_time).total_seconds()
118
- logger.info(f"[WORKER] βœ… KPIs computed in {duration:.2f}s")
119
-
120
  # 🎯 STEP 8: Publish results (atomic pipeline)
121
  await self._publish(results)
122
 
@@ -133,6 +138,7 @@ class AnalyticsWorker:
133
  except asyncio.TimeoutError:
134
  logger.warning("[WORKER] ⚠️ Embedding timeout, but KPIs published")
135
 
 
136
  logger.info(f"[WORKER] 🎯 COMPLETE: {worker_id} in {duration:.2f}s")
137
  return results
138
 
 
108
  name=f"embed-{self.org_id}-{self.source_id}"
109
  )
110
 
111
+ # 🎯 STEP 7: Compute KPIs (CPU-bound, run in thread pool)
112
+ # REPLACE the KPI calculation block
113
+
114
  # 🎯 STEP 7: Compute KPIs (CPU-bound, run in thread pool)
115
  industry = await self._get_industry()
116
+ calculator = await get_kpi_calculator( # βœ… Make it async
117
+ industry=industry,
118
+ org_id=self.org_id,
119
+ df=df,
120
+ source_id=self.source_id,
121
+ entity_type=self._entity_type # βœ… Pass Redis value
122
+ )
123
  results = await asyncio.to_thread(calculator.compute_all)
124
 
 
 
 
 
125
  # 🎯 STEP 8: Publish results (atomic pipeline)
126
  await self._publish(results)
127
 
 
138
  except asyncio.TimeoutError:
139
  logger.warning("[WORKER] ⚠️ Embedding timeout, but KPIs published")
140
 
141
+ duration = (datetime.now() - start_time).total_seconds()
142
  logger.info(f"[WORKER] 🎯 COMPLETE: {worker_id} in {duration:.2f}s")
143
  return results
144