# CANedge Data Lake Python SDK

Production-ready Python package for querying and analyzing CAN/LIN data lakes created from CSS Electronics CANedge MDF4 logs using AWS Athena.

## Features

- **AWS Athena Integration**: Query Parquet data using SQL via Athena
- **CloudFormation Configuration**: Automatic configuration from CloudFormation stack outputs
- **Scalable**: Leverage Athena's distributed query engine for large datasets
- **Type-safe**: Full type hints and docstrings
- **Well-architected**: Clean module design with logging and error handling

## Installation

```bash
# Clone or download the project
cd CSS

# Install in development mode
pip install -e .

# Or install from requirements
pip install -r requirements.txt
```

## Prerequisites

1. **AWS Account** with:
   - CloudFormation stack named `datalake-stack` (or specify a custom name)
   - Athena database configured
   - S3 bucket with Parquet data
   - AWS Glue catalog with table definitions
2. **CloudFormation Stack Outputs**: Your `datalake-stack` must have the following outputs:
   - `DatabaseName`: Athena database name
   - `S3OutputLocation`: S3 location for Athena query results (e.g., `s3://bucket/athena-results/`)
   - `WorkGroup`: (Optional) Athena workgroup name
   - `Region`: (Optional) AWS region
3. **AWS Credentials**:
   - AWS CLI configured: `aws configure`
   - Or IAM role (for EC2/ECS/Lambda)
   - Or environment variables

## Quick Start

### Option 1: Using Explicit Credentials (Recommended for Testing)

```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
from datalake.query import DataLakeQuery

# Load config with explicit credentials
config = DataLakeConfig.from_credentials(
    database_name="dbparquetdatalake05",
    workgroup="athenaworkgroup-datalake05",
    s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
    region="eu-north-1",
    access_key_id="YOUR_ACCESS_KEY_ID",
    secret_access_key="YOUR_SECRET_ACCESS_KEY",
)

# Initialize Athena and catalog
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
query = DataLakeQuery(athena, catalog)

# List devices
devices = catalog.list_devices()
print(f"Devices: {devices}")

# Query data
df = query.read_device_message(
    device_id="device_001",
    message="EngineData",
    date_range=("2024-01-15", "2024-01-20"),
    limit=1000,
)
print(f"Loaded {len(df)} records")
```

### Option 2: Using CloudFormation Stack

```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
from datalake.query import DataLakeQuery

# Load config from CloudFormation stack
config = DataLakeConfig.from_cloudformation(
    stack_name="datalake-stack",
    region=None,   # Auto-detect from stack or use default
    profile=None,  # Use default profile or IAM role
)

# Initialize Athena and catalog
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
query = DataLakeQuery(athena, catalog)
```

## Configuration

### Option 1: Using Explicit Credentials

For direct access with AWS credentials:

```python
config = DataLakeConfig.from_credentials(
    database_name="dbparquetdatalake05",
    workgroup="athenaworkgroup-datalake05",
    s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
    region="eu-north-1",
    access_key_id="YOUR_ACCESS_KEY_ID",
    secret_access_key="YOUR_SECRET_KEY",
)
```

**Parameters:**

- `database_name`: Athena database name
- `workgroup`: Athena workgroup name
- `s3_output_location`: S3 path for query results (must end with `/`)
- `region`: AWS region
- `access_key_id`: AWS access key ID
- `secret_access_key`: AWS secret access key

### Option 2: Using CloudFormation Stack

Your CloudFormation stack (`datalake-stack`) should output:

```yaml
Outputs:
  DatabaseName:
    Description: Athena database name
    Value: canedge_datalake
  S3OutputLocation:
    Description: S3 location for Athena query results
    Value: s3://my-bucket/athena-results/
  WorkGroup:
    Description: Athena workgroup name (optional)
    Value: primary
  Region:
    Description: AWS region
    Value: us-east-1
```

### Loading Configuration

```python
from datalake.config import DataLakeConfig

# Load from CloudFormation stack (default: 'datalake-stack')
config = DataLakeConfig.from_cloudformation()

# Or specify a custom stack name
config = DataLakeConfig.from_cloudformation(
    stack_name="my-custom-stack",
    region="us-east-1",    # Optional: override region
    profile="myprofile",   # Optional: use named AWS profile
)
```

## Data Lake Structure

### Athena Database Organization

The data lake is organized in Athena as follows:

- **Database**: Contains all tables (from CloudFormation output `DatabaseName`)
- **Tables**: Named by device and message (e.g., `device_001_EngineData`)
- **Partitions**: Date-based partitioning for efficient queries
- **Schema**: Each table has a `t` (timestamp) column plus signal columns from DBC files

### Table Naming Convention

Tables are typically named:

- `{device_id}_{message_rule}` (e.g., `device_001_EngineData`)
- Or `{device_id}__{message_rule}` (double underscore)
- The catalog automatically detects the pattern
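The underscore detection described above can be sketched as a small helper. `parse_table_name` is a hypothetical illustration, not an SDK function, and it assumes exactly the two naming patterns listed:

```python
def parse_table_name(table: str) -> tuple[str, str]:
    """Split an Athena table name into (device_id, message_rule).

    Hypothetical helper: prefers the double-underscore form, then
    falls back to the last single underscore as the separator.
    """
    if "__" in table:
        device, message = table.split("__", 1)
    else:
        # e.g. 'device_001_EngineData' -> ('device_001', 'EngineData')
        device, _, message = table.rpartition("_")
    return device, message

print(parse_table_name("device_001__EngineData"))  # ('device_001', 'EngineData')
print(parse_table_name("device_001_EngineData"))   # ('device_001', 'EngineData')
```

Preferring the double-underscore split first avoids ambiguity when the device ID itself contains underscores.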
## Usage Patterns

### 1. Explore Data Lake

```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog

config = DataLakeConfig.from_cloudformation()
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)

# List all tables
tables = catalog.list_tables()
print(f"Tables: {tables}")

# List devices
devices = catalog.list_devices()
print(f"Devices: {devices}")

# List messages for a device
messages = catalog.list_messages("device_001")
print(f"Messages: {messages}")

# Get schema
schema = catalog.get_schema("device_001", "EngineData")
print(f"Columns: {list(schema.keys())}")

# List partitions (dates)
partitions = catalog.list_partitions("device_001", "EngineData")
print(f"Dates: {partitions}")
```

### 2. Query Data

```python
from datalake.query import DataLakeQuery

query = DataLakeQuery(athena, catalog)

# Read data for a device/message
df = query.read_device_message(
    device_id="device_001",
    message="EngineData",
    date_range=("2024-01-15", "2024-01-20"),
    columns=["t", "RPM", "Temperature"],
    limit=10000,
)
print(f"Loaded {len(df)} records")
```

### 3. Time Series Query

```python
import pandas as pd

# Query a single signal over a time window
df_ts = query.time_series_query(
    device_id="device_001",
    message="EngineData",
    signal_name="RPM",
    start_time=1000000000000000,  # microseconds
    end_time=2000000000000000,
    limit=10000,
)

# Convert timestamp and inspect
df_ts['timestamp'] = pd.to_datetime(df_ts['t'], unit='us')
print(df_ts[['timestamp', 'RPM']].head())
```
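Because `time_series_query` returns raw microsecond timestamps, resampling is a common next step. A minimal sketch, using a synthetic frame in place of the real `df_ts` (the `t`/`RPM` columns mirror the schema above; the values are made up):

```python
import pandas as pd

# Synthetic stand-in for the df_ts returned by time_series_query():
# 't' is a microsecond epoch timestamp, 'RPM' a decoded signal (10 Hz here).
df_ts = pd.DataFrame({
    "t": [1_000_000_000_000_000 + i * 100_000 for i in range(50)],
    "RPM": [800 + i * 10 for i in range(50)],
})

# Convert to datetime, index by time, and downsample to 1-second means
df_ts["timestamp"] = pd.to_datetime(df_ts["t"], unit="us")
rpm_1s = df_ts.set_index("timestamp")["RPM"].resample("1s").mean()
print(rpm_1s)
```

Downsampling before plotting or exporting keeps in-memory frames small even when the Athena result covers long time windows.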
### 4. Custom SQL Queries

```python
# Execute custom SQL
# Note: use path-based filtering for date ranges
# Data structure: {device_id}/{message}/{year}/{month}/{day}/file.parquet
sql = """
SELECT
    COUNT(*) as record_count,
    AVG(RPM) as avg_rpm,
    MAX(Temperature) as max_temp
FROM canedge_datalake.device_001_EngineData
WHERE try_cast(element_at(split("$path", '/'), -4) AS INTEGER) = 2024
  AND try_cast(element_at(split("$path", '/'), -3) AS INTEGER) >= 1
  AND try_cast(element_at(split("$path", '/'), -2) AS INTEGER) >= 15
"""
df = query.execute_sql(sql)
print(df)
```

### 5. Aggregation Queries

```python
# Use the built-in aggregation method
# For date filtering, use path-based extraction
path_year = "try_cast(element_at(split(\"$path\", '/'), -4) AS INTEGER)"
path_month = "try_cast(element_at(split(\"$path\", '/'), -3) AS INTEGER)"
path_day = "try_cast(element_at(split(\"$path\", '/'), -2) AS INTEGER)"
where_clause = f"{path_year} = 2024 AND {path_month} >= 1 AND {path_day} >= 15"

df_agg = query.aggregate(
    device_id="device_001",
    message="EngineData",
    aggregation="COUNT(*) as count, AVG(RPM) as avg_rpm, MIN(RPM) as min_rpm",
    where_clause=where_clause,
)
print(df_agg)
```
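The `$path` expressions in sections 4 and 5 are easy to mistype by hand, so it can help to generate them. `path_part` and `date_where_clause` below are hypothetical helpers (not SDK functions) that assume the `{device_id}/{message}/{year}/{month}/{day}/file.parquet` layout described above:

```python
def path_part(depth: int) -> str:
    """Athena SQL expression extracting a numeric component of "$path".

    Hypothetical helper: with the layout
    {device_id}/{message}/{year}/{month}/{day}/file.parquet,
    depth -4 is the year, -3 the month, -2 the day.
    """
    return f"try_cast(element_at(split(\"$path\", '/'), {depth}) AS INTEGER)"

def date_where_clause(year: int, month: int, day: int) -> str:
    """Build the path-based date filter used in the examples above."""
    return (
        f"{path_part(-4)} = {year} "
        f"AND {path_part(-3)} >= {month} "
        f"AND {path_part(-2)} >= {day}"
    )

print(date_where_clause(2024, 1, 15))
```

The generated string can be passed directly as the `where_clause` argument of `query.aggregate`.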
### 6. Batch Processing

```python
from datalake.batch import BatchProcessor

processor = BatchProcessor(query)

# Compute statistics across all data
stats = processor.aggregate_by_device_message(
    aggregation_func=processor.compute_statistics,
    message_filter="Engine.*",
)
for device, messages in stats.items():
    for message, metrics in messages.items():
        print(f"{device}/{message}: {metrics['count']} records")

# Export to CSV
processor.export_to_csv(
    device_id="device_001",
    message="EngineData",
    output_path="engine_export.csv",
    limit=100000,
)
```

## Running Examples

```bash
# Test the connection first
python test_connection.py

# Explore data lake structure
python examples/explore_example.py

# Query and analyze data
python examples/query_example.py

# Batch processing
python examples/batch_example.py
```

**Note:** All examples use explicit credentials. Update them with your actual credentials, or modify them to use the CloudFormation stack.

## CloudFormation Stack Requirements

### Required Stack Outputs

1. **DatabaseName** (required)
   - Athena database name containing your tables
   - Example: `canedge_datalake`
2. **S3OutputLocation** (required)
   - S3 bucket/path for Athena query results
   - Must end with `/`
   - Example: `s3://my-bucket/athena-results/`
   - Athena must have write permissions to it
3. **WorkGroup** (optional)
   - Athena workgroup name
   - If not provided, the default workgroup is used
4. **Region** (optional)
   - AWS region
   - If not provided, uses the default region or the stack region
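The mapping from stack outputs to configuration can be sketched as follows. `outputs_to_config` is a hypothetical illustration of what `DataLakeConfig.from_cloudformation` presumably does internally; in a live session you would fetch `outputs` via `boto3.client("cloudformation").describe_stacks(StackName="datalake-stack")["Stacks"][0]["Outputs"]`:

```python
# Required output keys per the table above; WorkGroup and Region are optional.
REQUIRED = {"DatabaseName", "S3OutputLocation"}

def outputs_to_config(outputs: list[dict]) -> dict:
    """Map CloudFormation stack outputs to a flat config dict,
    raising if a required output is missing (hypothetical sketch)."""
    cfg = {o["OutputKey"]: o["OutputValue"] for o in outputs}
    missing = REQUIRED - cfg.keys()
    if missing:
        raise ValueError(f"Required output not found: {sorted(missing)}")
    return cfg

# Sample describe-stacks style output list
sample = [
    {"OutputKey": "DatabaseName", "OutputValue": "canedge_datalake"},
    {"OutputKey": "S3OutputLocation", "OutputValue": "s3://my-bucket/athena-results/"},
    {"OutputKey": "WorkGroup", "OutputValue": "primary"},
]
print(outputs_to_config(sample)["DatabaseName"])  # canedge_datalake
```

Failing fast on missing required outputs surfaces the same "Required output not found" condition described in Troubleshooting.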
### Example CloudFormation Template

```yaml
Resources:
  AthenaDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: canedge_datalake

Outputs:
  DatabaseName:
    Description: Athena database name
    Value: canedge_datalake
    Export:
      Name: !Sub "${AWS::StackName}-DatabaseName"
  S3OutputLocation:
    Description: S3 location for Athena query results
    Value: !Sub "s3://${ResultsBucket}/athena-results/"
    Export:
      Name: !Sub "${AWS::StackName}-S3OutputLocation"
  WorkGroup:
    Description: Athena workgroup name
    Value: primary
    Export:
      Name: !Sub "${AWS::StackName}-WorkGroup"
  Region:
    Description: AWS region
    Value: !Ref AWS::Region
    Export:
      Name: !Sub "${AWS::StackName}-Region"
```

## Performance Notes

- **Athena Query Limits**: Use the `limit` parameter to control result size
- **Partition Pruning**: Date-based queries automatically use partition pruning
- **Query Costs**: Athena charges per TB scanned; use column selection and filters
- **Result Caching**: Athena caches query results for 24 hours
- **Concurrent Queries**: Athena supports multiple concurrent queries

## Troubleshooting

**"Stack not found"**

- Verify the stack name: `aws cloudformation describe-stacks --stack-name datalake-stack`
- Check AWS credentials and region
- Ensure you have CloudFormation read permissions

**"Required output not found"**

- Verify stack outputs: `aws cloudformation describe-stacks --stack-name datalake-stack --query 'Stacks[0].Outputs'`
- Ensure the `DatabaseName` and `S3OutputLocation` outputs exist

**"Query execution failed"**

- Check Athena permissions (Glue catalog access, S3 read permissions)
- Verify the table names exist in the database
- Check that the S3 output location has write permissions

**"Table not found"**

- List tables with `catalog.list_tables()` to see what is available
- Verify the table naming convention matches the expected pattern
- Check the Glue catalog for table definitions

## License

MIT
## References

- [CSS Electronics CANedge Documentation](https://www.csselectronics.com/pages/can-bus-logger-canedge)
- [AWS Athena Documentation](https://docs.aws.amazon.com/athena/)
- [AWS Glue Catalog](https://docs.aws.amazon.com/glue/latest/dg/catalog-and-crawler.html)