File size: 8,236 Bytes
ec9186e
 
 
 
 
0bd628a
ec9186e
ae09122
ec9186e
 
 
 
ae09122
ec9186e
 
 
 
0bd628a
 
ec9186e
 
 
 
 
 
 
0bd628a
d3d9d83
ec9186e
 
d3d9d83
ec9186e
 
 
 
d3d9d83
ec9186e
 
 
d3d9d83
0bd628a
 
ec9186e
d3d9d83
 
 
 
ec9186e
 
 
0bd628a
d3d9d83
0bd628a
ec9186e
 
 
 
 
 
 
0bd628a
 
ec9186e
 
 
 
 
 
 
ae09122
ec9186e
 
 
 
 
 
 
 
 
 
ae09122
0bd628a
ec9186e
ae09122
 
ec9186e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0bd628a
ec9186e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""
πŸ›‘οΈ Universal Base KPI Calculator
Enterprise Pattern: Async, fault-tolerant, LLM-guarded, schema-aware
"""

import pandas as pd
import logging
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from datetime import datetime
import asyncio
import json
from app.schemas.org_schema import OrgSchema
from app.service.llm_service import get_llm_service

logger = logging.getLogger(__name__)


class BaseKPICalculator(ABC):
    """
    🏛️ Enterprise Base Class
    - Async-ready
    - LLM-guarded (won't crash if LLM not loaded)
    - Schema-aware with dynamic mapping
    - Comprehensive error handling
    """

    # Supported aggregation operations, each wrapped so results are plain
    # Python scalars (not numpy types) regardless of which code path runs.
    # Used by both the primary and fallback paths of _safe_calc so casting
    # and unknown-operation handling stay consistent.
    _OPERATIONS = {
        "nunique": lambda s: int(s.nunique()),
        "count": lambda s: int(s.count()),
        "sum": lambda s: float(s.sum()),
        "mean": lambda s: float(s.mean()),
        "max": lambda s: float(s.max()),
        "min": lambda s: float(s.min()),
        "std": lambda s: float(s.std()),
    }

    def __init__(self, org_id: str, df: pd.DataFrame, source_id: Optional[str] = None, entity_type: str = "SALES"):
        """
        ✅ Universal constructor - all parameters optional except org_id and df

        Args:
            org_id: Organization ID (required)
            df: DataFrame to analyze (required, must be non-empty)
            source_id: Optional source identifier for tracking
            entity_type: Entity type from Redis (e.g., "SALES", "INVENTORY")

        Raises:
            ValueError: If org_id is falsy or df is None/empty.
        """
        # FIX: also check `df is None` — previously `df.empty` on None raised
        # AttributeError instead of the intended ValueError.
        if not org_id or df is None or df.empty:
            raise ValueError("org_id and non-empty df required")

        self.org_id = org_id
        self.source_id = source_id
        self.df = df.copy()  # Defensive copy to prevent mutation
        self.entity_type = entity_type  # ✅ Store entity_type

        # ✅ FIXED: Pass entity_type to OrgSchema
        self.schema = OrgSchema(org_id=org_id, entity_type=entity_type)
        self.llm = get_llm_service()
        # NOTE(review): naive UTC timestamp; datetime.utcnow() is deprecated
        # in Python 3.12+. Switching to datetime.now(timezone.utc) would make
        # this tz-aware — confirm subclasses don't mix it with naive datetimes.
        self.computed_at = datetime.utcnow()
        self._cache: Dict[str, Any] = {}  # In-memory cache for this run

        logger.info(f"[KPI] 📊 {self.__class__.__name__} initialized for {org_id}/{entity_type} ({len(df)} rows)")

    @abstractmethod
    async def compute_all(self) -> Dict[str, Any]:
        """
        🎯 Main entry point - **MUST BE ASYNC** for LLM calls
        
        Returns:
            Complete KPI dictionary with metadata
        """
        pass
    
    def _safe_calc(
        self, 
        semantic_field: str, 
        operation: str, 
        default: Any = 0.0,
        fallback_field: Optional[str] = None
    ) -> Any:
        """
        🔒 **Enterprise-safe calculation** with multiple fallback strategies
        
        Args:
            semantic_field: Semantic field name (e.g., "total")
            operation: pandas operation ("sum", "mean", "nunique", etc.)
            default: Default value if calculation fails
            fallback_field: Secondary field to try if primary fails
        
        Returns:
            Scalar result (plain int/float) or default
        """
        try:
            # Resolve the operation once up front; an unknown operation is a
            # programming error on the caller's side, so warn and bail early.
            op = self._OPERATIONS.get(operation)
            if op is None:
                logger.warning(f"[KPI] Unknown operation: {operation}")
                return default

            # Primary field resolution via the org schema mapping
            actual_col = self.schema.get_column(semantic_field)
            if actual_col and actual_col in self.df.columns:
                return op(self.df[actual_col])

            # Fallback field if provided — uses the same casting wrapper as
            # the primary path (previously returned raw numpy scalars).
            if fallback_field and fallback_field in self.df.columns:
                logger.info(f"[KPI] Fallback to {fallback_field} for {semantic_field}")
                return op(self.df[fallback_field])

            logger.warning(f"[KPI] Field '{semantic_field}' not found, returning default: {default}")
            return default

        except Exception as e:
            # Best-effort contract: KPI computation must never crash the run.
            logger.error(f"[KPI] Calculation failed for '{semantic_field}.{operation}': {e}")
            return default
    
    def _cache_value(self, key: str, value: Any, ttl: int = 3600):
        """
        💾 Cache value in Redis for cross-worker sharing
        
        Args:
            key: Cache key (will be prefixed with org_id)
            value: Value to cache (must be JSON-serializable)
            ttl: Time-to-live in seconds
        """
        try:
            # Imported lazily so the calculator stays usable when the event
            # hub (Redis) isn't available; failures degrade to a warning.
            from app.core.event_hub import event_hub
            cache_key = f"kpi_cache:{self.org_id}:{key}"
            event_hub.setex(cache_key, ttl, json.dumps(value))
        except Exception as e:
            logger.warning(f"[KPI] Cache write failed: {e}")
    
    def _get_cached_value(self, key: str, default: Any = None) -> Any:
        """
        📖 Retrieve cached value from Redis
        
        Args:
            key: Cache key (without prefix)
            default: Default value if cache miss
            
        Returns:
            Cached value or default
        """
        try:
            from app.core.event_hub import event_hub
            cache_key = f"kpi_cache:{self.org_id}:{key}"
            data = event_hub.get_key(cache_key)
            
            if data:
                return json.loads(data)
            return default
            
        except Exception as e:
            logger.warning(f"[KPI] Cache read failed: {e}")
            return default
    
    def _calculate_growth(self, current: float, previous: float) -> float:
        """
        📈 Safe growth calculation with divide-by-zero protection
        
        Args:
            current: Current period value
            previous: Previous period value
            
        Returns:
            Growth percentage, or 0.0 if previous is zero/negative/None.
        """
        try:
            # Only a strictly positive previous value yields a meaningful
            # growth percentage; everything else maps to 0.0 by design.
            if previous and previous > 0:
                return float((current - previous) / previous * 100)
            return 0.0
        except Exception:
            return 0.0
    
    async def _llm_generate_safe(self, prompt: str, max_tokens: int = 50) -> Optional[str]:
        """
        🤖 **LLM-guarded generation** - won't crash if LLM not ready
        
        Args:
            prompt: Prompt for LLM
            max_tokens: Max tokens to generate
            
        Returns:
            Generated text or None if LLM unavailable
        """
        try:
            if not self.llm.is_ready():
                logger.warning("[KPI] LLM not ready, skipping AI tier")
                return None

            # Run the (blocking) generate call off the event loop.
            return await asyncio.to_thread(
                self.llm.generate, 
                prompt, 
                max_tokens=max_tokens
            )
        except Exception as e:
            logger.warning(f"[KPI] LLM generation failed: {e}")
            return None
    
    def _validate_data_quality(self) -> List[Dict[str, Any]]:
        """
        🔍 **Enterprise data quality check**
        
        Returns:
            List of quality issues with severity levels
        """
        issues: List[Dict[str, Any]] = []
        
        # Check for missing timestamps (>10% missing escalates to "high")
        if 'timestamp' in self.df.columns:
            missing_ts = self.df['timestamp'].isna().sum()
            if missing_ts > 0:
                issues.append({
                    "field": "timestamp",
                    "issue": "missing_values",
                    "count": int(missing_ts),
                    "severity": "high" if missing_ts > len(self.df) * 0.1 else "medium"
                })
        
        # Check for negative totals (e.g. refunds or bad source data)
        if 'total' in self.df.columns:
            negative_sales = (self.df['total'] < 0).sum()
            if negative_sales > 0:
                issues.append({
                    "field": "total",
                    "issue": "negative_values",
                    "count": int(negative_sales),
                    "severity": "medium"
                })
        
        return issues