"""
Configuration management for data lake access.
Supports AWS Athena-based data lakes with configuration from
CloudFormation stack outputs.
"""
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import boto3
from botocore.exceptions import ClientError
@dataclass
class DataLakeConfig:
    """
    Data lake configuration for AWS Athena-based data lakes.

    Configuration can be loaded from CloudFormation stack outputs or
    created directly with credentials.

    Attributes:
        stack_name: CloudFormation stack name (default: 'datalake-stack')
        database_name: Athena database name
        workgroup: Athena workgroup name (optional)
        s3_output_location: S3 location for query results (must end with /)
        region: AWS region
        profile: AWS profile name for credentials (optional)
        access_key_id: AWS access key ID (optional, for explicit credentials)
        secret_access_key: AWS secret access key (optional, for explicit credentials)
        device_filter: Optional device ID filter (e.g., 'device_001')
        message_filter: Optional message/rule filter (e.g., 'CAN_Message_001')
        cache_enabled: Enable schema caching
    """

    stack_name: str = "datalake-stack"
    database_name: Optional[str] = None
    workgroup: Optional[str] = None
    s3_output_location: Optional[str] = None
    region: str = "us-east-1"
    profile: Optional[str] = None
    access_key_id: Optional[str] = None
    secret_access_key: Optional[str] = None
    device_filter: Optional[str] = None
    message_filter: Optional[str] = None
    cache_enabled: bool = True

    @staticmethod
    def _normalize_s3_location(location: Optional[str]) -> Optional[str]:
        """
        Return *location* with a trailing '/' appended if it lacks one.

        Athena query-result locations are documented on this class as
        "must end with /"; empty/None values pass through unchanged.
        """
        if location and not location.endswith('/'):
            return location + '/'
        return location

    @classmethod
    def from_cloudformation(
        cls,
        stack_name: str = "datalake-stack",
        region: Optional[str] = None,
        profile: Optional[str] = None,
    ) -> "DataLakeConfig":
        """
        Load config from CloudFormation stack outputs.

        Args:
            stack_name: CloudFormation stack name (default: 'datalake-stack')
            region: AWS region (if None, will try to get from stack or use default)
            profile: AWS profile name for credentials (optional)

        Returns:
            DataLakeConfig instance with values from stack outputs

        Raises:
            ClientError: If stack doesn't exist or can't be accessed
            KeyError: If required stack outputs are missing
            ValueError: If the DescribeStacks response contains no stacks

        Expected CloudFormation stack outputs:
            - DatabaseName: Athena database name (required)
            - WorkGroup: Athena workgroup name (optional)
            - S3OutputLocation: S3 location for Athena query results (required)
            - Region: AWS region (optional; honored only when no region
              argument was passed by the caller)
        """
        session = boto3.Session(profile_name=profile)

        # Remember whether the caller pinned a region explicitly: only an
        # explicit argument may override the stack's 'Region' output below.
        region_was_explicit = region is not None
        if not region_was_explicit:
            try:
                region = session.region_name or "us-east-1"
            except Exception:
                # Defensive: resolving the session config can fail
                # (e.g. malformed AWS config file); fall back to default.
                region = "us-east-1"
        cf_client = session.client('cloudformation', region_name=region)

        try:
            response = cf_client.describe_stacks(StackName=stack_name)
        except ClientError as e:
            # Re-raise with a friendlier message; the original error
            # (e.g. AccessDenied vs. ValidationError) is chained via 'from e'.
            raise ClientError(
                {
                    'Error': {
                        'Code': 'StackNotFound',
                        'Message': f"CloudFormation stack '{stack_name}' not found. "
                                   f"Make sure the stack exists and you have permissions to access it."
                    }
                },
                'DescribeStacks'
            ) from e

        if not response['Stacks']:
            raise ValueError(f"Stack '{stack_name}' not found")

        stack = response['Stacks'][0]
        outputs = {output['OutputKey']: output['OutputValue']
                   for output in stack.get('Outputs', [])}

        # Honor the stack's 'Region' output when the caller did not pass one.
        # (Previously this check ran after region had already been defaulted,
        # so the stack output could never take effect.)
        if not region_was_explicit:
            region = outputs.get('Region', region)

        # Required outputs
        database_name = outputs.get('DatabaseName')
        if not database_name:
            raise KeyError(
                f"Required output 'DatabaseName' not found in stack '{stack_name}'. "
                f"Available outputs: {list(outputs.keys())}"
            )

        s3_output_location = outputs.get('S3OutputLocation')
        if not s3_output_location:
            raise KeyError(
                f"Required output 'S3OutputLocation' not found in stack '{stack_name}'. "
                f"Available outputs: {list(outputs.keys())}"
            )

        # Optional outputs
        workgroup = outputs.get('WorkGroup')

        return cls(
            stack_name=stack_name,
            database_name=database_name,
            workgroup=workgroup,
            # Normalize for consistency with from_credentials().
            s3_output_location=cls._normalize_s3_location(s3_output_location),
            region=region,
            profile=profile,
        )

    @classmethod
    def from_credentials(
        cls,
        database_name: str,
        workgroup: str,
        s3_output_location: str,
        region: str,
        access_key_id: str,
        secret_access_key: str,
    ) -> "DataLakeConfig":
        """
        Create config directly with AWS credentials.

        Args:
            database_name: Athena database name
            workgroup: Athena workgroup name
            s3_output_location: S3 location for query results (must end with /)
            region: AWS region
            access_key_id: AWS access key ID
            secret_access_key: AWS secret access key

        Returns:
            DataLakeConfig instance
        """
        return cls(
            database_name=database_name,
            workgroup=workgroup,
            # Ensure S3 output location ends with /
            s3_output_location=cls._normalize_s3_location(s3_output_location),
            region=region,
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
        )

    def get_boto3_session(self) -> boto3.Session:
        """
        Get boto3 session with configured credentials, profile, and region.

        Explicit access keys (when both are set) take precedence over the
        profile; otherwise the profile (or ambient IAM role) is used.

        Returns:
            boto3.Session instance
        """
        if self.access_key_id and self.secret_access_key:
            # Use explicit credentials
            return boto3.Session(
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name=self.region,
            )
        else:
            # Use profile or IAM role
            return boto3.Session(profile_name=self.profile, region_name=self.region)