Spaces:
Sleeping
Sleeping
| """ | |
| Configuration management for data lake access. | |
| Supports AWS Athena-based data lakes with configuration from | |
| CloudFormation stack outputs. | |
| """ | |
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

import boto3
from botocore.exceptions import ClientError
@dataclass
class DataLakeConfig:
    """
    Data lake configuration for AWS Athena-based data lakes.

    Configuration can be loaded from CloudFormation stack outputs or
    created directly with credentials.

    Attributes:
        stack_name: CloudFormation stack name (default: 'datalake-stack')
        database_name: Athena database name
        workgroup: Athena workgroup name (optional)
        s3_output_location: S3 location for query results (must end with /)
        region: AWS region
        profile: AWS profile name for credentials (optional)
        access_key_id: AWS access key ID (optional, for explicit credentials)
        secret_access_key: AWS secret access key (optional, for explicit
            credentials); omitted from ``repr()`` output
        device_filter: Optional device ID filter (e.g., 'device_001')
        message_filter: Optional message/rule filter (e.g., 'CAN_Message_001')
        cache_enabled: Enable schema caching
    """

    stack_name: str = "datalake-stack"
    database_name: Optional[str] = None
    workgroup: Optional[str] = None
    s3_output_location: Optional[str] = None
    region: str = "us-east-1"
    profile: Optional[str] = None
    access_key_id: Optional[str] = None
    # Kept out of the generated repr so the secret cannot leak into logs
    # or tracebacks that print the config object.
    secret_access_key: Optional[str] = field(default=None, repr=False)
    device_filter: Optional[str] = None
    message_filter: Optional[str] = None
    cache_enabled: bool = True

    @staticmethod
    def _ensure_trailing_slash(location: Optional[str]) -> Optional[str]:
        """Return *location* with a trailing '/' appended when it is non-empty and lacks one."""
        if location and not location.endswith('/'):
            return location + '/'
        return location

    @classmethod
    def from_cloudformation(
        cls,
        stack_name: str = "datalake-stack",
        region: Optional[str] = None,
        profile: Optional[str] = None,
    ) -> "DataLakeConfig":
        """
        Load config from CloudFormation stack outputs.

        Args:
            stack_name: CloudFormation stack name (default: 'datalake-stack')
            region: AWS region. If None, the stack is looked up in the
                session's default region (falling back to 'us-east-1') and
                the stack's 'Region' output, when present, becomes the
                configured region.
            profile: AWS profile name for credentials (optional)

        Returns:
            DataLakeConfig instance with values from stack outputs

        Raises:
            ClientError: If stack doesn't exist or can't be accessed
            ValueError: If the describe call returns no stacks
            KeyError: If required stack outputs are missing

        Expected CloudFormation stack outputs:
            - DatabaseName: Athena database name (required)
            - WorkGroup: Athena workgroup name (optional)
            - S3OutputLocation: S3 location for Athena query results (required)
            - Region: AWS region (optional, used only when no region argument is given)
        """
        session = boto3.Session(profile_name=profile)

        # Resolving the session's default region is best-effort: any failure
        # falls through to the hard-coded default, but interpreter-level
        # signals (KeyboardInterrupt, SystemExit) are no longer swallowed.
        try:
            session_region = session.region_name
        except Exception:
            session_region = None

        # Region used to reach CloudFormation itself.
        client_region = region or session_region or "us-east-1"
        cf_client = session.client('cloudformation', region_name=client_region)

        try:
            response = cf_client.describe_stacks(StackName=stack_name)
        except ClientError as e:
            # The underlying error stays available to callers via __cause__.
            raise ClientError(
                {
                    'Error': {
                        'Code': 'StackNotFound',
                        'Message': f"CloudFormation stack '{stack_name}' not found. "
                                   f"Make sure the stack exists and you have permissions to access it."
                    }
                },
                'DescribeStacks'
            ) from e

        if not response['Stacks']:
            raise ValueError(f"Stack '{stack_name}' not found")

        stack = response['Stacks'][0]
        outputs = {
            output['OutputKey']: output['OutputValue']
            for output in stack.get('Outputs', [])
        }

        # An explicit region argument wins; otherwise honor the stack's own
        # 'Region' output before falling back to the region that was queried.
        resolved_region = region or outputs.get('Region') or client_region

        # Required outputs
        database_name = outputs.get('DatabaseName')
        if not database_name:
            raise KeyError(
                f"Required output 'DatabaseName' not found in stack '{stack_name}'. "
                f"Available outputs: {list(outputs.keys())}"
            )

        s3_output_location = outputs.get('S3OutputLocation')
        if not s3_output_location:
            raise KeyError(
                f"Required output 'S3OutputLocation' not found in stack '{stack_name}'. "
                f"Available outputs: {list(outputs.keys())}"
            )

        # Optional outputs
        workgroup = outputs.get('WorkGroup')

        return cls(
            stack_name=stack_name,
            database_name=database_name,
            workgroup=workgroup,
            # Same normalization as from_credentials: location must end with '/'.
            s3_output_location=cls._ensure_trailing_slash(s3_output_location),
            region=resolved_region,
            profile=profile,
        )

    @classmethod
    def from_credentials(
        cls,
        database_name: str,
        workgroup: str,
        s3_output_location: str,
        region: str,
        access_key_id: str,
        secret_access_key: str,
    ) -> "DataLakeConfig":
        """
        Create config directly with AWS credentials.

        Args:
            database_name: Athena database name
            workgroup: Athena workgroup name
            s3_output_location: S3 location for query results; a trailing
                '/' is appended when missing
            region: AWS region
            access_key_id: AWS access key ID
            secret_access_key: AWS secret access key

        Returns:
            DataLakeConfig instance
        """
        return cls(
            database_name=database_name,
            workgroup=workgroup,
            s3_output_location=cls._ensure_trailing_slash(s3_output_location),
            region=region,
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
        )

    def get_boto3_session(self) -> "boto3.Session":
        """
        Get boto3 session with configured credentials, profile, and region.

        Explicit credentials are used only when both the access key ID and
        the secret access key are set; otherwise the session is built from
        the configured profile (which may be None).

        Returns:
            boto3.Session instance
        """
        if self.access_key_id and self.secret_access_key:
            # Use explicit credentials
            return boto3.Session(
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name=self.region,
            )
        # Use profile or IAM role
        return boto3.Session(profile_name=self.profile, region_name=self.region)