| """ | |
| Batch processing utilities for scalable data lake analysis. | |
| Provides patterns for aggregating, analyzing, and exporting data across | |
| the entire data lake or subsets thereof. | |
| """ | |
| from typing import Callable, Dict, Any, Optional | |
| import pandas as pd | |
| from .query import DataLakeQuery | |
| from .logger import setup_logger | |
| logger = setup_logger(__name__) | |


class BatchProcessor:
    """
    Batch processing utilities for scalable data lake analysis.

    Provides high-level patterns for common analysis tasks across
    multiple devices and messages.
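
    Example (illustrative; how ``DataLakeQuery`` is constructed depends on
    your data lake configuration, so its arguments are elided):
        >>> query = DataLakeQuery(...)
        >>> processor = BatchProcessor(query)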
| """ | |

    def __init__(self, query: DataLakeQuery):
        """
        Initialize batch processor.

        Args:
            query: DataLakeQuery instance
        """
        self.query = query
        logger.info("Initialized BatchProcessor")

    def aggregate_by_device_message(
        self,
        aggregation_func: Callable[[pd.DataFrame], Dict[str, Any]],
        device_filter: Optional[str] = None,
        message_filter: Optional[str] = None,
    ) -> Dict[str, Dict[str, Any]]:
        """
        Apply an aggregation function to each device/message combination.

        Pattern for scalable analysis across the entire data lake. Processes
        each device/message combination separately to keep memory usage
        bounded.

        Args:
            aggregation_func: Function (df) -> dict of metrics/statistics
            device_filter: Device regex filter (applied via catalog)
            message_filter: Message regex filter

        Returns:
            Nested dict: {device_id: {message: aggregation_result}}

        Example:
            >>> def compute_stats(df):
            ...     return {
            ...         'count': len(df),
            ...         'rpm_mean': df['RPM'].mean() if 'RPM' in df else None
            ...     }
            >>> results = processor.aggregate_by_device_message(compute_stats)
        """
        results: Dict[str, Dict[str, Any]] = {}

        # Get filtered device list
        devices = self.query.catalog.list_devices(device_filter)

        for device in devices:
            messages = self.query.catalog.list_messages(device, message_filter)
            for message in messages:
                try:
                    # Read data for this device/message
                    df = self.query.read_device_message(device, message)
                    results.setdefault(device, {})[message] = aggregation_func(df)
                except Exception as e:
                    # Record the failure instead of aborting the whole batch
                    logger.error(f"Aggregation failed for {device}/{message}: {e}")
                    results.setdefault(device, {})[message] = {"error": str(e)}

        logger.info(f"Aggregation completed for {len(results)} devices")
        return results

    def export_to_csv(
        self,
        device_id: str,
        message: str,
        output_path: str,
        date_range: Optional[tuple[str, str]] = None,
        limit: Optional[int] = None,
    ) -> None:
        """
        Export device/message data to CSV.

        Args:
            device_id: Device identifier
            message: Message name
            output_path: Output CSV file path
            date_range: Optional (start_date, end_date) tuple
            limit: Optional row limit

        Raises:
            Exception: If export fails
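
        Example (illustrative; the device ID, message name, and output path
        are hypothetical):
            >>> processor.export_to_csv(
            ...     device_id="device_001",
            ...     message="EngineData",
            ...     output_path="engine_data.csv",
            ...     date_range=("2024-01-01", "2024-01-31"),
            ...     limit=10000,
            ... )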
| """ | |
| logger.info(f"Exporting {device_id}/{message} to {output_path}") | |
| df = self.query.read_device_message( | |
| device_id=device_id, | |
| message=message, | |
| date_range=date_range, | |
| limit=limit, | |
| ) | |
| if df.empty: | |
| logger.warning(f"No data to export for {device_id}/{message}") | |
| return | |
| df.to_csv(output_path, index=False) | |
| logger.info(f"Exported {len(df)} rows to {output_path}") | |

    def export_to_parquet(
        self,
        device_id: str,
        message: str,
        output_path: str,
        date_range: Optional[tuple[str, str]] = None,
    ) -> None:
        """
        Export device/message data to a Parquet file.

        Args:
            device_id: Device identifier
            message: Message name
            output_path: Output Parquet file path
            date_range: Optional (start_date, end_date) tuple

        Raises:
            Exception: If export fails
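
        Example (illustrative; the identifiers and output path are
        hypothetical):
            >>> processor.export_to_parquet(
            ...     device_id="device_001",
            ...     message="EngineData",
            ...     output_path="engine_data.parquet",
            ... )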
| """ | |
| logger.info(f"Exporting {device_id}/{message} to {output_path}") | |
| df = self.query.read_device_message( | |
| device_id=device_id, | |
| message=message, | |
| date_range=date_range, | |
| ) | |
| if df.empty: | |
| logger.warning(f"No data to export for {device_id}/{message}") | |
| return | |
| df.to_parquet(output_path, index=False, compression='snappy') | |
| logger.info(f"Exported {len(df)} rows to {output_path}") | |

    def compute_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Compute basic statistics for aggregation.

        Args:
            df: Input DataFrame

        Returns:
            Dict with count, plus mean, min, max, std, and null count for
            each numeric column

        Note:
            Skips timestamp column 't' in statistics computation.
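
        Example (illustrative; the 'RPM' column is hypothetical):
            >>> df = pd.DataFrame({'t': [0.0, 1.0], 'RPM': [800.0, 950.0]})
            >>> stats = processor.compute_statistics(df)
            >>> stats['count'], stats['RPM_mean']
            (2, 875.0)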
| """ | |
        stats: Dict[str, Any] = {"count": len(df)}

        if df.empty:
            return stats

        # Compute statistics for all numeric columns (excluding timestamp)
        numeric_cols = df.select_dtypes(include='number').columns
        numeric_cols = [c for c in numeric_cols if c != 't']

        for col in numeric_cols:
            try:
                stats[f"{col}_mean"] = float(df[col].mean())
                stats[f"{col}_min"] = float(df[col].min())
                stats[f"{col}_max"] = float(df[col].max())
                stats[f"{col}_std"] = float(df[col].std())
                stats[f"{col}_null_count"] = int(df[col].isna().sum())
            except Exception as e:
                logger.warning(f"Failed to compute stats for {col}: {e}")

        return stats

    def find_anomalies(
        self,
        device_id: str,
        message: str,
        signal_name: str,
        threshold_std: float = 3.0,
    ) -> pd.DataFrame:
        """
        Find anomalous values in a signal using the z-score method.

        Args:
            device_id: Device identifier
            message: Message name
            signal_name: Signal column name
            threshold_std: Number of standard deviations for anomaly threshold

        Returns:
            DataFrame with anomalous records (empty if the signal is missing
            or has zero variance)
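
        Example (illustrative; the identifiers and signal name are
        hypothetical):
            >>> anomalies = processor.find_anomalies(
            ...     "device_001", "EngineData", "RPM", threshold_std=3.0
            ... )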
| """ | |
        df = self.query.read_device_message(
            device_id=device_id,
            message=message,
            columns=['t', signal_name],
        )

        if df.empty or signal_name not in df.columns:
            logger.warning(f"No data or signal not found: {signal_name}")
            return pd.DataFrame()

        # Compute z-scores
        mean = df[signal_name].mean()
        std = df[signal_name].std()

        if std == 0 or pd.isna(std):
            logger.warning(f"Zero or undefined standard deviation for {signal_name}")
            return pd.DataFrame()

        z_scores = (df[signal_name] - mean) / std
        anomalies = df[z_scores.abs() > threshold_std].copy()

        logger.info(f"Found {len(anomalies)} anomalies in {signal_name} "
                    f"(threshold: {threshold_std} std)")
        return anomalies