# CANedge Data Lake Python SDK
Production-ready Python package for querying and analyzing CAN/LIN data lakes created from CSS Electronics CANedge MDF4 logs using AWS Athena.
## Features
- **AWS Athena Integration**: Query Parquet data using SQL via Athena
- **CloudFormation Configuration**: Automatic configuration from CloudFormation stack outputs
- **Scalable**: Leverage Athena's distributed query engine for large datasets
- **Type-safe**: Full type hints and docstrings
- **Well-architected**: Clean module design with logging and error handling
## Installation
```bash
# Clone or download project
cd CSS
# Install in development mode
pip install -e .
# Or install from requirements
pip install -r requirements.txt
```
## Prerequisites
1. **AWS Account** with:
   - CloudFormation stack named `datalake-stack` (or specify a custom name)
   - Athena database configured
   - S3 bucket with Parquet data
   - AWS Glue catalog with table definitions
2. **CloudFormation Stack Outputs**:
   Your `datalake-stack` must have the following outputs:
   - `DatabaseName`: Athena database name
   - `S3OutputLocation`: S3 location for Athena query results (e.g., `s3://bucket/athena-results/`)
   - `WorkGroup`: (Optional) Athena workgroup name
   - `Region`: (Optional) AWS region
3. **AWS Credentials**:
   - AWS CLI configured: `aws configure`
   - Or an IAM role (for EC2/ECS/Lambda)
   - Or environment variables
## Quick Start
### Option 1: Using Explicit Credentials (Recommended for Testing)
```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
from datalake.query import DataLakeQuery
# Load config with explicit credentials
config = DataLakeConfig.from_credentials(
    database_name="dbparquetdatalake05",
    workgroup="athenaworkgroup-datalake05",
    s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
    region="eu-north-1",
    access_key_id="YOUR_ACCESS_KEY_ID",
    secret_access_key="YOUR_SECRET_ACCESS_KEY",
)
# Initialize Athena and catalog
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
query = DataLakeQuery(athena, catalog)
# List devices
devices = catalog.list_devices()
print(f"Devices: {devices}")
# Query data
df = query.read_device_message(
    device_id="device_001",
    message="EngineData",
    date_range=("2024-01-15", "2024-01-20"),
    limit=1000
)
print(f"Loaded {len(df)} records")
```
### Option 2: Using CloudFormation Stack
```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
from datalake.query import DataLakeQuery
# Load config from CloudFormation stack
config = DataLakeConfig.from_cloudformation(
    stack_name="datalake-stack",
    region=None,   # Auto-detect from stack or use default
    profile=None,  # Use default profile or IAM role
)
# Initialize Athena and catalog
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
query = DataLakeQuery(athena, catalog)
```
## Configuration
### Option 1: Using Explicit Credentials
For direct access with AWS credentials:
```python
config = DataLakeConfig.from_credentials(
    database_name="dbparquetdatalake05",
    workgroup="athenaworkgroup-datalake05",
    s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
    region="eu-north-1",
    access_key_id="YOUR_ACCESS_KEY_ID",
    secret_access_key="YOUR_SECRET_ACCESS_KEY",
)
```
**Parameters:**
- `database_name`: Athena database name
- `workgroup`: Athena workgroup name
- `s3_output_location`: S3 path for query results (must end with `/`)
- `region`: AWS region
- `access_key_id`: AWS access key ID
- `secret_access_key`: AWS secret access key
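The trailing-slash requirement on `s3_output_location` is easy to miss. A small validation helper (illustrative only, not part of the SDK) can catch it before the first query fails:

```python
def validate_s3_output_location(path: str) -> str:
    """Validate an Athena S3 output location and return it unchanged.

    Illustrative helper; the SDK performs its own checks internally.
    """
    if not path.startswith("s3://"):
        raise ValueError(f"Expected an s3:// URI, got: {path!r}")
    if not path.endswith("/"):
        raise ValueError(f"S3 output location must end with '/': {path!r}")
    return path

validate_s3_output_location("s3://canedge-raw-data-parquet/athena-results/")
```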
### Option 2: Using CloudFormation Stack
Your CloudFormation stack (`datalake-stack`) should output:
```yaml
Outputs:
  DatabaseName:
    Description: Athena database name
    Value: canedge_datalake
  S3OutputLocation:
    Description: S3 location for Athena query results
    Value: s3://my-bucket/athena-results/
  WorkGroup:
    Description: Athena workgroup name (optional)
    Value: primary
  Region:
    Description: AWS region
    Value: us-east-1
```
### Loading Configuration
```python
from datalake.config import DataLakeConfig
# Load from CloudFormation stack (default: 'datalake-stack')
config = DataLakeConfig.from_cloudformation()
# Or specify custom stack name
config = DataLakeConfig.from_cloudformation(
    stack_name="my-custom-stack",
    region="us-east-1",   # Optional: override region
    profile="myprofile",  # Optional: use named AWS profile
)
```
## Data Lake Structure
### Athena Database Organization
The data lake is organized in Athena with:
- **Database**: Contains all tables (from CloudFormation output `DatabaseName`)
- **Tables**: Named by device and message (e.g., `device_001_EngineData`)
- **Partitions**: Date-based partitioning for efficient queries
- **Schema**: Each table has columns: `t` (timestamp), signal columns from DBC files
### Table Naming Convention
Tables are typically named:
- `{device_id}_{message_rule}` (e.g., `device_001_EngineData`)
- Or `{device_id}__{message_rule}` (double underscore)
- The catalog automatically detects the pattern
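Because both single- and double-underscore separators occur in practice, splitting a table name back into its device and message parts can be sketched as follows (a hypothetical helper mirroring the kind of detection `DataLakeCatalog` performs, not the SDK's actual code):

```python
def split_table_name(table: str) -> tuple:
    """Split an Athena table name into (device_id, message_rule).

    Prefers the double-underscore separator when present; otherwise
    assumes the last single underscore separates device and message.
    Hypothetical helper for illustration.
    """
    if "__" in table:
        device, message = table.split("__", 1)
    else:
        device, message = table.rsplit("_", 1)
    return device, message

print(split_table_name("device_001__EngineData"))  # ('device_001', 'EngineData')
print(split_table_name("device_001_EngineData"))   # ('device_001', 'EngineData')
```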
## Usage Patterns
### 1. Explore Data Lake
```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
config = DataLakeConfig.from_cloudformation()
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
# List all tables
tables = catalog.list_tables()
print(f"Tables: {tables}")
# List devices
devices = catalog.list_devices()
print(f"Devices: {devices}")
# List messages for device
messages = catalog.list_messages("device_001")
print(f"Messages: {messages}")
# Get schema
schema = catalog.get_schema("device_001", "EngineData")
print(f"Columns: {list(schema.keys())}")
# List partitions (dates)
partitions = catalog.list_partitions("device_001", "EngineData")
print(f"Dates: {partitions}")
```
### 2. Query Data
```python
from datalake.query import DataLakeQuery
query = DataLakeQuery(athena, catalog)
# Read all data for device/message
df = query.read_device_message(
    device_id="device_001",
    message="EngineData",
    date_range=("2024-01-15", "2024-01-20"),
    columns=["t", "RPM", "Temperature"],
    limit=10000
)
print(f"Loaded {len(df)} records")
```
### 3. Time Series Query
```python
# Query single signal over time window
df_ts = query.time_series_query(
    device_id="device_001",
    message="EngineData",
    signal_name="RPM",
    start_time=1000000000000000,  # microseconds
    end_time=2000000000000000,
    limit=10000
)
# Convert microsecond timestamps to datetime for inspection
import pandas as pd
df_ts['timestamp'] = pd.to_datetime(df_ts['t'], unit='us')
print(df_ts[['timestamp', 'RPM']].head())
```
### 4. Custom SQL Queries
```python
# Execute custom SQL
# Note: use path-based filtering for date ranges. The data is laid out as
# {device_id}/{message}/{year}/{month}/{day}/file.parquet, so dates can be
# filtered by extracting path components. Comparisons apply per component,
# so this example covers January 15-31, 2024.
sql = """
SELECT
    COUNT(*) as record_count,
    AVG(RPM) as avg_rpm,
    MAX(Temperature) as max_temp
FROM canedge_datalake.device_001_EngineData
WHERE try_cast(element_at(split("$path", '/'), -4) AS INTEGER) = 2024
  AND try_cast(element_at(split("$path", '/'), -3) AS INTEGER) = 1
  AND try_cast(element_at(split("$path", '/'), -2) AS INTEGER) >= 15
"""
df = query.execute_sql(sql)
print(df)
```
### 5. Aggregation Queries
```python
# Use built-in aggregation method
# For date filtering, use path-based extraction (here: January 15-31, 2024)
path_year = "try_cast(element_at(split(\"$path\", '/'), -4) AS INTEGER)"
path_month = "try_cast(element_at(split(\"$path\", '/'), -3) AS INTEGER)"
path_day = "try_cast(element_at(split(\"$path\", '/'), -2) AS INTEGER)"
where_clause = f"{path_year} = 2024 AND {path_month} = 1 AND {path_day} >= 15"
df_agg = query.aggregate(
    device_id="device_001",
    message="EngineData",
    aggregation="COUNT(*) as count, AVG(RPM) as avg_rpm, MIN(RPM) as min_rpm",
    where_clause=where_clause
)
print(df_agg)
```
### 6. Batch Processing
```python
from datalake.batch import BatchProcessor
processor = BatchProcessor(query)
# Compute statistics across all data
stats = processor.aggregate_by_device_message(
    aggregation_func=processor.compute_statistics,
    message_filter="Engine.*"
)
for device, messages in stats.items():
    for message, metrics in messages.items():
        print(f"{device}/{message}: {metrics['count']} records")
# Export to CSV
processor.export_to_csv(
    device_id="device_001",
    message="EngineData",
    output_path="engine_export.csv",
    limit=100000
)
```
## Running Examples
```bash
# Test connection first
python test_connection.py
# Explore data lake structure
python examples/explore_example.py
# Query and analyze data
python examples/query_example.py
# Batch processing
python examples/batch_example.py
```
**Note:** All examples use explicit credentials. Update them with your actual credentials or modify to use CloudFormation stack.
## CloudFormation Stack Requirements
### Required Stack Outputs
1. **DatabaseName** (required)
   - Athena database name containing your tables
   - Example: `canedge_datalake`
2. **S3OutputLocation** (required)
   - S3 bucket/path for Athena query results
   - Must end with `/`
   - Example: `s3://my-bucket/athena-results/`
   - Athena must have write permissions to this location
3. **WorkGroup** (optional)
   - Athena workgroup name
   - If not provided, the default workgroup is used
4. **Region** (optional)
   - AWS region
   - If not provided, the default region or the stack's region is used
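CloudFormation returns stack outputs as a list of `{OutputKey, OutputValue}` dictionaries. Turning that list into the config fields and enforcing the required outputs above can be sketched like this (a hypothetical helper, approximating what `from_cloudformation` presumably does internally):

```python
def parse_stack_outputs(outputs: list) -> dict:
    """Flatten CloudFormation's Outputs list into a dict and verify
    that the required output keys are present.

    Illustrative helper, not the SDK's actual implementation.
    """
    flat = {o["OutputKey"]: o["OutputValue"] for o in outputs}
    for required in ("DatabaseName", "S3OutputLocation"):
        if required not in flat:
            raise KeyError(f"Required output not found: {required}")
    return flat

outputs = [
    {"OutputKey": "DatabaseName", "OutputValue": "canedge_datalake"},
    {"OutputKey": "S3OutputLocation", "OutputValue": "s3://my-bucket/athena-results/"},
]
print(parse_stack_outputs(outputs))
```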
### Example CloudFormation Template
```yaml
Resources:
  AthenaDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: canedge_datalake
Outputs:
  DatabaseName:
    Description: Athena database name
    Value: canedge_datalake
    Export:
      Name: !Sub "${AWS::StackName}-DatabaseName"
  S3OutputLocation:
    Description: S3 location for Athena query results
    Value: !Sub "s3://${ResultsBucket}/athena-results/"
    Export:
      Name: !Sub "${AWS::StackName}-S3OutputLocation"
  WorkGroup:
    Description: Athena workgroup name
    Value: primary
    Export:
      Name: !Sub "${AWS::StackName}-WorkGroup"
  Region:
    Description: AWS region
    Value: !Ref AWS::Region
    Export:
      Name: !Sub "${AWS::StackName}-Region"
```
## Performance Notes
- **Athena Query Limits**: Use `limit` parameter to control result size
- **Partition Pruning**: Date-based queries automatically use partition pruning
- **Query Costs**: Athena charges per TB of data scanned; select only the columns you need and filter early to reduce scan size
- **Result Reuse**: Athena can reuse recent query results (when query result reuse is enabled), avoiding repeated scans
- **Concurrent Queries**: Athena supports multiple concurrent queries
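The cost notes above can be made concrete with a tiny query builder that always applies column pruning and a row cap (an illustrative sketch, not an SDK API; on columnar Parquet, selecting named columns instead of `*` directly reduces the bytes Athena scans):

```python
def build_scan_limited_sql(database: str, table: str,
                           columns: list, limit: int) -> str:
    """Build a SELECT that scans only the named columns and caps rows.

    Illustrative helper: column pruning and LIMIT keep Athena's
    per-TB-scanned cost down on Parquet data.
    """
    col_list = ", ".join(columns) if columns else "*"
    return f"SELECT {col_list} FROM {database}.{table} LIMIT {limit}"

sql = build_scan_limited_sql("canedge_datalake", "device_001_EngineData",
                             ["t", "RPM"], 10000)
print(sql)
```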
## Troubleshooting
**"Stack not found"**
- Verify stack name: `aws cloudformation describe-stacks --stack-name datalake-stack`
- Check AWS credentials and region
- Ensure you have CloudFormation read permissions
**"Required output not found"**
- Verify stack outputs: `aws cloudformation describe-stacks --stack-name datalake-stack --query 'Stacks[0].Outputs'`
- Ensure `DatabaseName` and `S3OutputLocation` outputs exist
**"Query execution failed"**
- Check Athena permissions (Glue catalog access, S3 read permissions)
- Verify table names exist in the database
- Check S3 output location has write permissions
**"Table not found"**
- List tables: `catalog.list_tables()` to see available tables
- Verify table naming convention matches expected pattern
- Check Glue catalog for table definitions
## License
MIT
## References
- [CSS Electronics CANedge Documentation](https://www.csselectronics.com/pages/can-bus-logger-canedge)
- [AWS Athena Documentation](https://docs.aws.amazon.com/athena/)
- [AWS Glue Catalog](https://docs.aws.amazon.com/glue/latest/dg/catalog-and-crawler.html)