""" Batch processing utilities for scalable data lake analysis. Provides patterns for aggregating, analyzing, and exporting data across the entire data lake or subsets thereof. """ from typing import Callable, Dict, Any, Optional import pandas as pd from .query import DataLakeQuery from .logger import setup_logger logger = setup_logger(__name__) class BatchProcessor: """ Batch processing utilities for scalable data lake analysis. Provides high-level patterns for common analysis tasks across multiple devices and messages. """ def __init__(self, query: DataLakeQuery): """ Initialize batch processor. Args: query: DataLakeQuery instance """ self.query = query logger.info("Initialized BatchProcessor") def aggregate_by_device_message( self, aggregation_func: Callable[[pd.DataFrame], Dict[str, Any]], device_filter: Optional[str] = None, message_filter: Optional[str] = None, ) -> Dict[str, Dict[str, Any]]: """ Apply aggregation function to each device/message combination. Pattern for scalable analysis across entire data lake. Processes each device/message combination separately to manage memory. Args: aggregation_func: Function (df) -> dict of metrics/statistics device_filter: Device regex filter (applied via catalog) message_filter: Message regex filter Returns: Nested dict: {device_id: {message: aggregation_result}} Example: >>> def compute_stats(df): ... return { ... 'count': len(df), ... 'rpm_mean': df['RPM'].mean() if 'RPM' in df else None ... } >>> results = processor.aggregate_by_device_message(compute_stats) """ results: Dict[str, Dict[str, Any]] = {} # Get filtered device list devices = self.query.catalog.list_devices(device_filter) for device in devices: messages = self.query.catalog.list_messages(device, message_filter) for message in messages: try: # Read data for this device/message df = self.query.read_device_message(device, message) if device not in results: results[device] = {} results[device][message] = aggregation_func(df) except Exception as e: logger.error(f"Aggregation failed for {device}/{message}: {e}") if device not in results: results[device] = {} results[device][message] = {"error": str(e)} logger.info(f"Aggregation completed for {len(results)} devices") return results def export_to_csv( self, device_id: str, message: str, output_path: str, date_range: Optional[tuple[str, str]] = None, limit: Optional[int] = None, ) -> None: """ Export device/message data to CSV. Args: device_id: Device identifier message: Message name output_path: Output CSV file path date_range: Optional (start_date, end_date) tuple limit: Optional row limit Raises: Exception: If export fails """ logger.info(f"Exporting {device_id}/{message} to {output_path}") df = self.query.read_device_message( device_id=device_id, message=message, date_range=date_range, limit=limit, ) if df.empty: logger.warning(f"No data to export for {device_id}/{message}") return df.to_csv(output_path, index=False) logger.info(f"Exported {len(df)} rows to {output_path}") def export_to_parquet( self, device_id: str, message: str, output_path: str, date_range: Optional[tuple[str, str]] = None, ) -> None: """ Export device/message data to Parquet file. 

        Args:
            device_id: Device identifier
            message: Message name
            output_path: Output Parquet file path
            date_range: Optional (start_date, end_date) tuple

        Raises:
            Exception: If the export fails
        """
        logger.info(f"Exporting {device_id}/{message} to {output_path}")

        df = self.query.read_device_message(
            device_id=device_id,
            message=message,
            date_range=date_range,
        )

        # Nothing to write for an empty result
        if df.empty:
            logger.warning(f"No data to export for {device_id}/{message}")
            return

        df.to_parquet(output_path, index=False, compression='snappy')
        logger.info(f"Exported {len(df)} rows to {output_path}")

    def compute_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Compute basic statistics for aggregation.

        Args:
            df: Input DataFrame

        Returns:
            Dict with count, mean, min, max, std, and null count for each
            numeric column

        Note:
            Skips the timestamp column 't' in statistics computation.
        """
        stats: Dict[str, Any] = {"count": len(df)}

        if df.empty:
            return stats

        # Compute statistics for numeric columns, excluding the timestamp
        numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
        numeric_cols = [c for c in numeric_cols if c != 't']

        for col in numeric_cols:
            try:
                stats[f"{col}_mean"] = float(df[col].mean())
                stats[f"{col}_min"] = float(df[col].min())
                stats[f"{col}_max"] = float(df[col].max())
                stats[f"{col}_std"] = float(df[col].std())
                stats[f"{col}_null_count"] = int(df[col].isna().sum())
            except Exception as e:
                logger.warning(f"Failed to compute stats for {col}: {e}")

        return stats

    def find_anomalies(
        self,
        device_id: str,
        message: str,
        signal_name: str,
        threshold_std: float = 3.0,
    ) -> pd.DataFrame:
        """
        Find anomalous values in a signal using the z-score method.

        Args:
            device_id: Device identifier
            message: Message name
            signal_name: Signal column name
            threshold_std: Number of standard deviations for the anomaly
                threshold

        Returns:
            DataFrame with the anomalous records
        """
        df = self.query.read_device_message(
            device_id=device_id,
            message=message,
            columns=['t', signal_name],
        )

        if df.empty or signal_name not in df.columns:
            logger.warning(f"No data or signal not found: {signal_name}")
            return pd.DataFrame()

        # Compute z-scores against the signal's own mean and spread
        mean = df[signal_name].mean()
        std = df[signal_name].std()

        # Guard against constant signals (std == 0) and single-sample frames
        # (std is NaN), either of which makes the z-scores meaningless
        if std == 0 or pd.isna(std):
            logger.warning(f"Zero or undefined standard deviation for {signal_name}")
            return pd.DataFrame()

        z_scores = (df[signal_name] - mean) / std
        anomalies = df[z_scores.abs() > threshold_std].copy()

        logger.info(f"Found {len(anomalies)} anomalies in {signal_name} "
                    f"(threshold: {threshold_std} std)")

        return anomalies
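

# Minimal usage sketch, kept behind a __main__ guard so it stays out of the
# library path. It shows the intended call pattern only: how DataLakeQuery is
# constructed is deployment-specific and not defined in this module, and the
# device, message, and signal names below ("device_01", "EngineData", "RPM")
# are hypothetical. Because this module uses relative imports, run it as a
# module, e.g. `python -m <package>.batch`.
if __name__ == "__main__":
    query = DataLakeQuery()  # assumption: adjust construction for your deployment
    processor = BatchProcessor(query)

    # Summary statistics for every device/message combination in the lake
    results = processor.aggregate_by_device_message(processor.compute_statistics)
    print(f"Aggregated {len(results)} devices")

    # Export one slice and scan one signal for 3-sigma outliers
    processor.export_to_parquet("device_01", "EngineData", "engine.parquet")
    anomalies = processor.find_anomalies("device_01", "EngineData", "RPM")
    print(f"{len(anomalies)} anomalies found in RPM")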