# NOTE(review): removed stray upload-page metadata ("arash7920's picture /
# Upload 38 files / e869d90 verified") — it was not valid Python source.
"""
Configuration management for data lake access.
Supports AWS Athena-based data lakes with configuration from
CloudFormation stack outputs.
"""
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import boto3
from botocore.exceptions import ClientError
@dataclass
class DataLakeConfig:
    """
    Data lake configuration for AWS Athena-based data lakes.

    Configuration can be loaded from CloudFormation stack outputs or
    created directly with credentials.

    Attributes:
        stack_name: CloudFormation stack name (default: 'datalake-stack')
        database_name: Athena database name
        workgroup: Athena workgroup name (optional)
        s3_output_location: S3 location for query results (must end with /)
        region: AWS region
        profile: AWS profile name for credentials (optional)
        access_key_id: AWS access key ID (optional, for explicit credentials)
        secret_access_key: AWS secret access key (optional, for explicit credentials)
        device_filter: Optional device ID filter (e.g., 'device_001')
        message_filter: Optional message/rule filter (e.g., 'CAN_Message_001')
        cache_enabled: Enable schema caching
    """
    stack_name: str = "datalake-stack"
    database_name: Optional[str] = None
    workgroup: Optional[str] = None
    s3_output_location: Optional[str] = None
    region: str = "us-east-1"
    profile: Optional[str] = None
    access_key_id: Optional[str] = None
    secret_access_key: Optional[str] = None
    device_filter: Optional[str] = None
    message_filter: Optional[str] = None
    cache_enabled: bool = True

    @classmethod
    def from_cloudformation(
        cls,
        stack_name: str = "datalake-stack",
        region: Optional[str] = None,
        profile: Optional[str] = None,
    ) -> "DataLakeConfig":
        """
        Load config from CloudFormation stack outputs.

        Args:
            stack_name: CloudFormation stack name (default: 'datalake-stack')
            region: AWS region (if None, will try to get from stack or use default)
            profile: AWS profile name for credentials (optional)

        Returns:
            DataLakeConfig instance with values from stack outputs

        Raises:
            ClientError: If stack doesn't exist or can't be accessed
            ValueError: If DescribeStacks returns an empty stack list
            KeyError: If required stack outputs are missing

        Expected CloudFormation stack outputs:
            - DatabaseName: Athena database name (required)
            - WorkGroup: Athena workgroup name (optional)
            - S3OutputLocation: S3 location for Athena query results (required)
            - Region: AWS region (optional, will use provided region or default)
        """
        session = boto3.Session(profile_name=profile)

        # Remember whether the caller pinned a region: only when they did not
        # may the stack's own 'Region' output override it later.  The
        # DescribeStacks call itself needs *some* region, so fall back to the
        # session default (may be None when no AWS config exists).
        region_provided = region is not None
        client_region = region or session.region_name or "us-east-1"
        cf_client = session.client('cloudformation', region_name=client_region)

        try:
            response = cf_client.describe_stacks(StackName=stack_name)
        except ClientError as e:
            # Re-raise with a friendlier message; `from e` preserves the
            # original error (e.g. AccessDenied vs. ValidationError) for
            # debugging.
            raise ClientError(
                {
                    'Error': {
                        'Code': 'StackNotFound',
                        'Message': f"CloudFormation stack '{stack_name}' not found. "
                                   f"Make sure the stack exists and you have permissions to access it."
                    }
                },
                'DescribeStacks'
            ) from e

        if not response['Stacks']:
            raise ValueError(f"Stack '{stack_name}' not found")

        stack = response['Stacks'][0]
        # A stack with no outputs section yields an empty mapping.
        outputs = {output['OutputKey']: output['OutputValue']
                   for output in stack.get('Outputs', [])}

        # Honor the stack's advertised 'Region' output, but only when the
        # caller did not explicitly choose one.  (Previously the region was
        # defaulted before the stack was read, so this output was never used.)
        if not region_provided:
            region = outputs.get('Region', client_region)

        # Required outputs
        database_name = outputs.get('DatabaseName')
        if not database_name:
            raise KeyError(
                f"Required output 'DatabaseName' not found in stack '{stack_name}'. "
                f"Available outputs: {list(outputs.keys())}"
            )

        s3_output_location = outputs.get('S3OutputLocation')
        if not s3_output_location:
            raise KeyError(
                f"Required output 'S3OutputLocation' not found in stack '{stack_name}'. "
                f"Available outputs: {list(outputs.keys())}"
            )
        # Normalize to a trailing '/' so Athena treats it as a prefix —
        # keeps this path consistent with from_credentials().
        if not s3_output_location.endswith('/'):
            s3_output_location = s3_output_location + '/'

        # Optional outputs
        workgroup = outputs.get('WorkGroup')

        return cls(
            stack_name=stack_name,
            database_name=database_name,
            workgroup=workgroup,
            s3_output_location=s3_output_location,
            region=region,
            profile=profile,
        )

    @classmethod
    def from_credentials(
        cls,
        database_name: str,
        workgroup: str,
        s3_output_location: str,
        region: str,
        access_key_id: str,
        secret_access_key: str,
    ) -> "DataLakeConfig":
        """
        Create config directly with AWS credentials.

        Args:
            database_name: Athena database name
            workgroup: Athena workgroup name
            s3_output_location: S3 location for query results (must end with /)
            region: AWS region
            access_key_id: AWS access key ID
            secret_access_key: AWS secret access key

        Returns:
            DataLakeConfig instance
        """
        # Ensure S3 output location ends with / (Athena prefix convention).
        if s3_output_location and not s3_output_location.endswith('/'):
            s3_output_location = s3_output_location + '/'

        return cls(
            database_name=database_name,
            workgroup=workgroup,
            s3_output_location=s3_output_location,
            region=region,
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
        )

    def get_boto3_session(self) -> "boto3.Session":
        """
        Get boto3 session with configured credentials, profile, and region.

        Explicit credentials take precedence over profile/IAM-role lookup.

        Returns:
            boto3.Session instance
        """
        # Return annotation is a string (lazy) so the class can be defined
        # even in environments where boto3 is absent at import time.
        if self.access_key_id and self.secret_access_key:
            # Use explicit credentials
            return boto3.Session(
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name=self.region,
            )
        else:
            # Use profile or IAM role (profile=None falls through to the
            # default credential chain)
            return boto3.Session(profile_name=self.profile, region_name=self.region)