"""
Batch processing utilities for scalable data lake analysis.

Provides patterns for aggregating, analyzing, and exporting data across
the entire data lake or subsets thereof.
"""

from typing import Callable, Dict, Any, Optional
import pandas as pd
from .query import DataLakeQuery
from .logger import setup_logger

logger = setup_logger(__name__)


class BatchProcessor:
    """
    Batch processing utilities for scalable data lake analysis.
    
    Provides high-level patterns for common analysis tasks across
    multiple devices and messages.
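    
    Example:
        >>> # Illustrative sketch; assumes a DataLakeQuery wired to your data lake
        >>> processor = BatchProcessor(query)
        >>> results = processor.aggregate_by_device_message(processor.compute_statistics)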
    """
    
    def __init__(self, query: DataLakeQuery):
        """
        Initialize batch processor.
        
        Args:
            query: DataLakeQuery instance
        """
        self.query = query
        logger.info("Initialized BatchProcessor")
    
    def aggregate_by_device_message(
        self,
        aggregation_func: Callable[[pd.DataFrame], Dict[str, Any]],
        device_filter: Optional[str] = None,
        message_filter: Optional[str] = None,
    ) -> Dict[str, Dict[str, Any]]:
        """
        Apply aggregation function to each device/message combination.
        
        Pattern for scalable analysis across entire data lake. Processes
        each device/message combination separately to manage memory.
        
        Args:
            aggregation_func: Function (df) -> dict of metrics/statistics
            device_filter: Device regex filter (applied via catalog)
            message_filter: Message regex filter
        
        Returns:
            Nested dict: {device_id: {message: aggregation_result}}
        
        Example:
            >>> def compute_stats(df):
            ...     return {
            ...         'count': len(df),
            ...         'rpm_mean': df['RPM'].mean() if 'RPM' in df else None
            ...     }
            >>> results = processor.aggregate_by_device_message(compute_stats)
        """
        results: Dict[str, Dict[str, Any]] = {}
        
        # Get filtered device list
        devices = self.query.catalog.list_devices(device_filter)
        
        for device in devices:
            messages = self.query.catalog.list_messages(device, message_filter)
            for message in messages:
                device_results = results.setdefault(device, {})
                try:
                    # Read data for this device/message combination
                    df = self.query.read_device_message(device, message)
                    device_results[message] = aggregation_func(df)
                except Exception as e:
                    logger.error(f"Aggregation failed for {device}/{message}: {e}")
                    device_results[message] = {"error": str(e)}
        
        logger.info(f"Aggregation completed for {len(results)} devices")
        return results
    
    def export_to_csv(
        self,
        device_id: str,
        message: str,
        output_path: str,
        date_range: Optional[tuple[str, str]] = None,
        limit: Optional[int] = None,
    ) -> None:
        """
        Export device/message data to CSV.
        
        Args:
            device_id: Device identifier
            message: Message name
            output_path: Output CSV file path
            date_range: Optional (start_date, end_date) tuple
            limit: Optional row limit
        
        Raises:
            Exception: Propagated if the query or CSV write fails.
        
        Note:
            If the query returns no rows, a warning is logged and no
            file is written.
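        
        Example:
            >>> # Illustrative only; device ID, message, and path are hypothetical
            >>> processor.export_to_csv(
            ...     device_id="device_001",
            ...     message="EngineData",
            ...     output_path="engine_data.csv",
            ...     date_range=("2024-01-01", "2024-01-31"),
            ... )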
        """
        logger.info(f"Exporting {device_id}/{message} to {output_path}")
        df = self.query.read_device_message(
            device_id=device_id,
            message=message,
            date_range=date_range,
            limit=limit,
        )
        
        if df.empty:
            logger.warning(f"No data to export for {device_id}/{message}")
            return
        
        df.to_csv(output_path, index=False)
        logger.info(f"Exported {len(df)} rows to {output_path}")
    
    def export_to_parquet(
        self,
        device_id: str,
        message: str,
        output_path: str,
        date_range: Optional[tuple[str, str]] = None,
    ) -> None:
        """
        Export device/message data to Parquet file.
        
        Args:
            device_id: Device identifier
            message: Message name
            output_path: Output Parquet file path
            date_range: Optional (start_date, end_date) tuple
        
        Raises:
            Exception: Propagated if the query or Parquet write fails.
        
        Note:
            If the query returns no rows, a warning is logged and no
            file is written.
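        
        Example:
            >>> # Illustrative only; arguments are hypothetical
            >>> processor.export_to_parquet(
            ...     device_id="device_001",
            ...     message="EngineData",
            ...     output_path="engine_data.parquet",
            ... )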
        """
        logger.info(f"Exporting {device_id}/{message} to {output_path}")
        df = self.query.read_device_message(
            device_id=device_id,
            message=message,
            date_range=date_range,
        )
        
        if df.empty:
            logger.warning(f"No data to export for {device_id}/{message}")
            return
        
        df.to_parquet(output_path, index=False, compression='snappy')
        logger.info(f"Exported {len(df)} rows to {output_path}")
    
    def compute_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Compute basic statistics for aggregation.
        
        Args:
            df: Input DataFrame
        
        Returns:
            Dict with a row count plus mean, min, max, std, and null count
            for each numeric column
        
        Note:
            Skips timestamp column 't' in statistics computation.
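        
        Example:
            >>> # Illustrative; column names are hypothetical
            >>> df = pd.DataFrame({'t': [0.0, 1.0], 'RPM': [900.0, 1100.0]})
            >>> stats = processor.compute_statistics(df)
            >>> stats['RPM_mean']
            1000.0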
        """
        stats: Dict[str, Any] = {"count": len(df)}
        
        if df.empty:
            return stats
        
        # Compute statistics for numeric columns of any dtype width,
        # excluding the timestamp column 't'
        numeric_cols = df.select_dtypes(include=['number']).columns
        numeric_cols = [c for c in numeric_cols if c != 't']
        
        for col in numeric_cols:
            try:
                stats[f"{col}_mean"] = float(df[col].mean())
                stats[f"{col}_min"] = float(df[col].min())
                stats[f"{col}_max"] = float(df[col].max())
                stats[f"{col}_std"] = float(df[col].std())
                stats[f"{col}_null_count"] = int(df[col].isna().sum())
            except Exception as e:
                logger.warning(f"Failed to compute stats for {col}: {e}")
        
        return stats
    
    def find_anomalies(
        self,
        device_id: str,
        message: str,
        signal_name: str,
        threshold_std: float = 3.0,
    ) -> pd.DataFrame:
        """
        Find anomalous values in a signal using z-score method.
        
        Args:
            device_id: Device identifier
            message: Message name
            signal_name: Signal column name
            threshold_std: Number of standard deviations for anomaly threshold
        
        Returns:
            DataFrame with anomalous records; empty if no data is found,
            the signal is missing, or its standard deviation is zero or
            undefined
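        
        Example:
            >>> # Illustrative; device, message, and signal names are hypothetical
            >>> outliers = processor.find_anomalies(
            ...     device_id="device_001",
            ...     message="EngineData",
            ...     signal_name="RPM",
            ...     threshold_std=3.0,
            ... )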
        """
        df = self.query.read_device_message(
            device_id=device_id,
            message=message,
            columns=['t', signal_name],
        )
        
        if df.empty or signal_name not in df.columns:
            logger.warning(f"No data or signal not found: {signal_name}")
            return pd.DataFrame()
        
        # Compute z-scores
        mean = df[signal_name].mean()
        std = df[signal_name].std()
        
        # A single-row DataFrame yields NaN std; guard against both cases
        if std == 0 or pd.isna(std):
            logger.warning(f"Zero or undefined standard deviation for {signal_name}")
            return pd.DataFrame()
        
        z_scores = (df[signal_name] - mean) / std
        anomalies = df[abs(z_scores) > threshold_std].copy()
        
        logger.info(f"Found {len(anomalies)} anomalies in {signal_name} "
                   f"(threshold: {threshold_std} std)")
        
        return anomalies