# CANedge Data Lake Python SDK

Production-ready Python package for querying and analyzing CAN/LIN data lakes created from CSS Electronics CANedge MDF4 logs using AWS Athena.

## Features

- **AWS Athena Integration**: Query Parquet data using SQL via Athena
- **CloudFormation Configuration**: Automatic configuration from CloudFormation stack outputs
- **Scalable**: Leverage Athena's distributed query engine for large datasets
- **Type-safe**: Full type hints and docstrings
- **Well-architected**: Clean module design with logging and error handling
## Installation

```bash
# Clone or download the project
cd CSS

# Install in development mode
pip install -e .

# Or install from requirements
pip install -r requirements.txt
```
## Prerequisites

1. **AWS Account** with:
   - A CloudFormation stack named `datalake-stack` (or specify a custom name)
   - An Athena database configured
   - An S3 bucket with Parquet data
   - An AWS Glue catalog with table definitions
2. **CloudFormation Stack Outputs** — your `datalake-stack` must have the following outputs:
   - `DatabaseName`: Athena database name
   - `S3OutputLocation`: S3 location for Athena query results (e.g., `s3://bucket/athena-results/`)
   - `WorkGroup`: (Optional) Athena workgroup name
   - `Region`: (Optional) AWS region
3. **AWS Credentials**, via one of:
   - The AWS CLI: `aws configure`
   - An IAM role (for EC2/ECS/Lambda)
   - Environment variables
## Quick Start

### Option 1: Using Explicit Credentials (Recommended for Testing)

```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
from datalake.query import DataLakeQuery

# Load config with explicit credentials
config = DataLakeConfig.from_credentials(
    database_name="dbparquetdatalake05",
    workgroup="athenaworkgroup-datalake05",
    s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
    region="eu-north-1",
    access_key_id="YOUR_ACCESS_KEY_ID",
    secret_access_key="YOUR_SECRET_ACCESS_KEY",
)

# Initialize Athena and catalog
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
query = DataLakeQuery(athena, catalog)

# List devices
devices = catalog.list_devices()
print(f"Devices: {devices}")

# Query data
df = query.read_device_message(
    device_id="device_001",
    message="EngineData",
    date_range=("2024-01-15", "2024-01-20"),
    limit=1000,
)
print(f"Loaded {len(df)} records")
```
### Option 2: Using CloudFormation Stack

```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
from datalake.query import DataLakeQuery

# Load config from the CloudFormation stack
config = DataLakeConfig.from_cloudformation(
    stack_name="datalake-stack",
    region=None,   # Auto-detect from stack or use default
    profile=None,  # Use default profile or IAM role
)

# Initialize Athena and catalog
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
query = DataLakeQuery(athena, catalog)
```
## Configuration

### Option 1: Using Explicit Credentials

For direct access with AWS credentials:

```python
config = DataLakeConfig.from_credentials(
    database_name="dbparquetdatalake05",
    workgroup="athenaworkgroup-datalake05",
    s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
    region="eu-north-1",
    access_key_id="YOUR_ACCESS_KEY_ID",
    secret_access_key="YOUR_SECRET_ACCESS_KEY",
)
```

**Parameters:**

- `database_name`: Athena database name
- `workgroup`: Athena workgroup name
- `s3_output_location`: S3 path for query results (must end with `/`)
- `region`: AWS region
- `access_key_id`: AWS access key ID
- `secret_access_key`: AWS secret access key
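The `s3_output_location` constraint (must end with `/`) is easy to get wrong. A minimal validation sketch — `validate_s3_output_location` is a hypothetical helper for illustration, not part of the SDK:

```python
def validate_s3_output_location(location: str) -> str:
    """Validate an Athena query-result location of the form s3://bucket/prefix/."""
    if not location.startswith("s3://"):
        raise ValueError(f"Expected an s3:// URI, got: {location}")
    if not location.endswith("/"):
        raise ValueError(f"S3 output location must end with '/': {location}")
    return location

print(validate_s3_output_location("s3://canedge-raw-data-parquet/athena-results/"))
```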
### Option 2: Using CloudFormation Stack

Configuration can also be loaded from CloudFormation. Your stack (`datalake-stack`) should define these outputs:
```yaml
Outputs:
  DatabaseName:
    Description: Athena database name
    Value: canedge_datalake
  S3OutputLocation:
    Description: S3 location for Athena query results
    Value: s3://my-bucket/athena-results/
  WorkGroup:
    Description: Athena workgroup name (optional)
    Value: primary
  Region:
    Description: AWS region
    Value: us-east-1
```

### Loading Configuration

```python
from datalake.config import DataLakeConfig

# Load from CloudFormation stack (default: 'datalake-stack')
config = DataLakeConfig.from_cloudformation()

# Or specify a custom stack name
config = DataLakeConfig.from_cloudformation(
    stack_name="my-custom-stack",
    region="us-east-1",   # Optional: override region
    profile="myprofile",  # Optional: use named AWS profile
)
```
## Data Lake Structure

### Athena Database Organization

The data lake is organized in Athena as follows:

- **Database**: Contains all tables (from the CloudFormation output `DatabaseName`)
- **Tables**: Named by device and message (e.g., `device_001_EngineData`)
- **Partitions**: Date-based partitioning for efficient queries
- **Schema**: Each table has a `t` (timestamp) column plus the signal columns from the DBC files

### Table Naming Convention

Tables are typically named:

- `{device_id}_{message_rule}` (e.g., `device_001_EngineData`)
- Or `{device_id}__{message_rule}` (double underscore)

The catalog automatically detects which pattern is in use.
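The convention can be parsed with a small helper. This is a sketch — `split_table_name` is a hypothetical name, and it assumes device IDs contain exactly one underscore (e.g. `device_001`); the SDK's catalog handles this internally:

```python
def split_table_name(table: str) -> tuple[str, str]:
    """Split an Athena table name into (device_id, message_rule)."""
    if "__" in table:
        # Double-underscore convention: unambiguous split
        device_id, message = table.split("__", 1)
    else:
        # Single-underscore convention: assume the device ID is the
        # first two underscore-separated parts (e.g. 'device_001')
        parts = table.split("_")
        device_id, message = "_".join(parts[:2]), "_".join(parts[2:])
    return device_id, message

print(split_table_name("device_001_EngineData"))   # ('device_001', 'EngineData')
print(split_table_name("device_001__EngineData"))  # ('device_001', 'EngineData')
```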
## Usage Patterns

### 1. Explore the Data Lake

```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog

config = DataLakeConfig.from_cloudformation()
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)

# List all tables
tables = catalog.list_tables()
print(f"Tables: {tables}")

# List devices
devices = catalog.list_devices()
print(f"Devices: {devices}")

# List messages for a device
messages = catalog.list_messages("device_001")
print(f"Messages: {messages}")

# Get a table schema
schema = catalog.get_schema("device_001", "EngineData")
print(f"Columns: {list(schema.keys())}")

# List partitions (dates)
partitions = catalog.list_partitions("device_001", "EngineData")
print(f"Dates: {partitions}")
```
### 2. Query Data

```python
from datalake.query import DataLakeQuery

query = DataLakeQuery(athena, catalog)

# Read data for a device/message over a date range
df = query.read_device_message(
    device_id="device_001",
    message="EngineData",
    date_range=("2024-01-15", "2024-01-20"),
    columns=["t", "RPM", "Temperature"],
    limit=10000,
)
print(f"Loaded {len(df)} records")
```
### 3. Time Series Query

```python
import pandas as pd

# Query a single signal over a time window
df_ts = query.time_series_query(
    device_id="device_001",
    message="EngineData",
    signal_name="RPM",
    start_time=1000000000000000,  # microseconds
    end_time=2000000000000000,
    limit=10000,
)

# Convert the timestamp and inspect
df_ts['timestamp'] = pd.to_datetime(df_ts['t'], unit='us')
print(df_ts[['timestamp', 'RPM']].head())
```
### 4. Custom SQL Queries

```python
# Execute custom SQL.
# Note: use path-based filtering for date ranges.
# Data layout: {device_id}/{message}/{year}/{month}/{day}/file.parquet
sql = """
SELECT
    COUNT(*) as record_count,
    AVG(RPM) as avg_rpm,
    MAX(Temperature) as max_temp
FROM canedge_datalake.device_001_EngineData
WHERE try_cast(element_at(split("$path", '/'), -4) AS INTEGER) = 2024
  AND try_cast(element_at(split("$path", '/'), -3) AS INTEGER) >= 1
  AND try_cast(element_at(split("$path", '/'), -2) AS INTEGER) >= 15
"""
df = query.execute_sql(sql)
print(df)
```
### 5. Aggregation Queries

```python
# Use the built-in aggregation method.
# For date filtering, use path-based extraction:
path_year = "try_cast(element_at(split(\"$path\", '/'), -4) AS INTEGER)"
path_month = "try_cast(element_at(split(\"$path\", '/'), -3) AS INTEGER)"
path_day = "try_cast(element_at(split(\"$path\", '/'), -2) AS INTEGER)"
where_clause = f"{path_year} = 2024 AND {path_month} >= 1 AND {path_day} >= 15"

df_agg = query.aggregate(
    device_id="device_001",
    message="EngineData",
    aggregation="COUNT(*) as count, AVG(RPM) as avg_rpm, MIN(RPM) as min_rpm",
    where_clause=where_clause,
)
print(df_agg)
```
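The path-expression boilerplate above can be wrapped in a small builder. A sketch — `path_date_filter` is a hypothetical helper, and it assumes the `{device_id}/{message}/{year}/{month}/{day}/file.parquet` layout described earlier:

```python
def path_date_filter(year: int, month_from: int = 1, day_from: int = 1) -> str:
    """Build a WHERE fragment filtering on the year/month/day path components."""
    # Template for extracting an integer path component relative to the file name
    col = 'try_cast(element_at(split("$path", \'/\'), {pos}) AS INTEGER)'
    return (
        f"{col.format(pos=-4)} = {year} "
        f"AND {col.format(pos=-3)} >= {month_from} "
        f"AND {col.format(pos=-2)} >= {day_from}"
    )

print(path_date_filter(2024, month_from=1, day_from=15))
```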
### 6. Batch Processing

```python
from datalake.batch import BatchProcessor

processor = BatchProcessor(query)

# Compute statistics across all data
stats = processor.aggregate_by_device_message(
    aggregation_func=processor.compute_statistics,
    message_filter="Engine.*",
)
for device, messages in stats.items():
    for message, metrics in messages.items():
        print(f"{device}/{message}: {metrics['count']} records")

# Export to CSV
processor.export_to_csv(
    device_id="device_001",
    message="EngineData",
    output_path="engine_export.csv",
    limit=100000,
)
```
## Running Examples

```bash
# Test the connection first
python test_connection.py

# Explore the data lake structure
python examples/explore_example.py

# Query and analyze data
python examples/query_example.py

# Batch processing
python examples/batch_example.py
```

**Note:** All examples use explicit credentials. Update them with your actual credentials, or modify them to use the CloudFormation stack.
## CloudFormation Stack Requirements

### Required Stack Outputs

1. **DatabaseName** (required)
   - Athena database name containing your tables
   - Example: `canedge_datalake`
2. **S3OutputLocation** (required)
   - S3 bucket/path for Athena query results
   - Must end with `/`
   - Example: `s3://my-bucket/athena-results/`
   - Athena must have write permissions to this location
3. **WorkGroup** (optional)
   - Athena workgroup name
   - If not provided, the default workgroup is used
4. **Region** (optional)
   - AWS region
   - If not provided, the default region or the stack's region is used
### Example CloudFormation Template

```yaml
Resources:
  AthenaDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: canedge_datalake

Outputs:
  DatabaseName:
    Description: Athena database name
    Value: canedge_datalake
    Export:
      Name: !Sub "${AWS::StackName}-DatabaseName"
  S3OutputLocation:
    Description: S3 location for Athena query results
    Value: !Sub "s3://${ResultsBucket}/athena-results/"
    Export:
      Name: !Sub "${AWS::StackName}-S3OutputLocation"
  WorkGroup:
    Description: Athena workgroup name
    Value: primary
    Export:
      Name: !Sub "${AWS::StackName}-WorkGroup"
  Region:
    Description: AWS region
    Value: !Ref AWS::Region
    Export:
      Name: !Sub "${AWS::StackName}-Region"
```
## Performance Notes

- **Athena Query Limits**: Use the `limit` parameter to control result size
- **Partition Pruning**: Date-based queries automatically use partition pruning
- **Query Costs**: Athena charges per TB of data scanned, so select only the columns you need and filter aggressively
- **Result Caching**: Athena can reuse recent query results when result reuse is enabled on the workgroup
- **Concurrent Queries**: Athena supports multiple concurrent queries
## Troubleshooting

**"Stack not found"**

- Verify the stack name: `aws cloudformation describe-stacks --stack-name datalake-stack`
- Check AWS credentials and region
- Ensure you have CloudFormation read permissions

**"Required output not found"**

- Verify the stack outputs: `aws cloudformation describe-stacks --stack-name datalake-stack --query 'Stacks[0].Outputs'`
- Ensure the `DatabaseName` and `S3OutputLocation` outputs exist
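The same check can be done programmatically. The sketch below validates an `Outputs` list shaped like the `describe-stacks` response (sample data shown; in practice you would feed it the output of `boto3`'s `describe_stacks` call; `check_stack_outputs` is a hypothetical helper):

```python
def check_stack_outputs(outputs, required=("DatabaseName", "S3OutputLocation")):
    """Map CloudFormation outputs to a dict and verify the required keys exist."""
    found = {o["OutputKey"]: o["OutputValue"] for o in outputs}
    missing = [key for key in required if key not in found]
    if missing:
        raise KeyError(f"Required stack outputs not found: {missing}")
    return found

# Sample shaped like the 'aws cloudformation describe-stacks' Outputs list
sample = [
    {"OutputKey": "DatabaseName", "OutputValue": "canedge_datalake"},
    {"OutputKey": "S3OutputLocation", "OutputValue": "s3://my-bucket/athena-results/"},
]
print(check_stack_outputs(sample))
```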
**"Query execution failed"**

- Check Athena permissions (Glue catalog access, S3 read permissions)
- Verify that the table names exist in the database
- Check that the S3 output location has write permissions

**"Table not found"**

- Run `catalog.list_tables()` to see the available tables
- Verify the table naming convention matches the expected pattern
- Check the Glue catalog for table definitions
## License

MIT

## References

- [CSS Electronics CANedge Documentation](https://www.csselectronics.com/pages/can-bus-logger-canedge)
- [AWS Athena Documentation](https://docs.aws.amazon.com/athena/)
- [AWS Glue Catalog](https://docs.aws.amazon.com/glue/latest/dg/catalog-and-crawler.html)