File size: 6,939 Bytes
e869d90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""
Configuration management for data lake access.

Supports AWS Athena-based data lakes with configuration from
CloudFormation stack outputs.
"""

from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import boto3
from botocore.exceptions import ClientError


@dataclass
class DataLakeConfig:
    """
    Data lake configuration for AWS Athena-based data lakes.
    
    Configuration can be loaded from CloudFormation stack outputs or
    created directly with credentials.
    
    Attributes:
        stack_name: CloudFormation stack name (default: 'datalake-stack')
        database_name: Athena database name
        workgroup: Athena workgroup name (optional)
        s3_output_location: S3 location for query results (must end with /)
        region: AWS region
        profile: AWS profile name for credentials (optional)
        access_key_id: AWS access key ID (optional, for explicit credentials)
        secret_access_key: AWS secret access key (optional, for explicit credentials)
        device_filter: Optional device ID filter (e.g., 'device_001')
        message_filter: Optional message/rule filter (e.g., 'CAN_Message_001')
        cache_enabled: Enable schema caching
    """
    stack_name: str = "datalake-stack"
    database_name: Optional[str] = None
    workgroup: Optional[str] = None
    s3_output_location: Optional[str] = None
    region: str = "us-east-1"
    profile: Optional[str] = None
    access_key_id: Optional[str] = None
    secret_access_key: Optional[str] = None
    device_filter: Optional[str] = None
    message_filter: Optional[str] = None
    cache_enabled: bool = True
    
    @staticmethod
    def _ensure_trailing_slash(location: Optional[str]) -> Optional[str]:
        """Return *location* guaranteed to end with '/' (Athena requires it).
        
        None and empty strings are passed through unchanged.
        """
        if location and not location.endswith('/'):
            return location + '/'
        return location
    
    @classmethod
    def from_cloudformation(
        cls,
        stack_name: str = "datalake-stack",
        region: Optional[str] = None,
        profile: Optional[str] = None,
    ) -> "DataLakeConfig":
        """
        Load config from CloudFormation stack outputs.
        
        Args:
            stack_name: CloudFormation stack name (default: 'datalake-stack')
            region: AWS region (if None, the stack's 'Region' output is used,
                falling back to the session default, then 'us-east-1')
            profile: AWS profile name for credentials (optional)
        
        Returns:
            DataLakeConfig instance with values from stack outputs
        
        Raises:
            ClientError: If stack doesn't exist or can't be accessed
            KeyError: If required stack outputs are missing
            ValueError: If the API response contains no stacks
        
        Expected CloudFormation stack outputs:
        - DatabaseName: Athena database name (required)
        - WorkGroup: Athena workgroup name (optional)
        - S3OutputLocation: S3 location for Athena query results (required)
        - Region: AWS region (optional, will use provided region or default)
        """
        # Remember whether the caller pinned a region explicitly: only then
        # may it override the stack's own 'Region' output later on.
        requested_region = region
        session = boto3.Session(profile_name=profile)
        # A region is always needed to build the client; session.region_name
        # is a plain attribute (no exception handling required) and may be None.
        client_region = requested_region or session.region_name or "us-east-1"
        cf_client = session.client('cloudformation', region_name=client_region)
        
        try:
            response = cf_client.describe_stacks(StackName=stack_name)
        except ClientError as e:
            # Re-raise with a friendlier message; chain the original for context.
            raise ClientError(
                {
                    'Error': {
                        'Code': 'StackNotFound',
                        'Message': f"CloudFormation stack '{stack_name}' not found. "
                                 f"Make sure the stack exists and you have permissions to access it."
                    }
                },
                'DescribeStacks'
            ) from e
        
        if not response['Stacks']:
            raise ValueError(f"Stack '{stack_name}' not found")
        
        stack = response['Stacks'][0]
        outputs = {output['OutputKey']: output['OutputValue'] 
                   for output in stack.get('Outputs', [])}
        
        # Resolve the final region. When the caller did not pass one, honor
        # the stack's 'Region' output before falling back to defaults.
        # (Previously this branch was dead code because 'region' had already
        # been defaulted unconditionally above.)
        if requested_region:
            region = requested_region
        else:
            region = outputs.get('Region') or session.region_name or "us-east-1"
        
        # Required outputs
        database_name = outputs.get('DatabaseName')
        if not database_name:
            raise KeyError(
                f"Required output 'DatabaseName' not found in stack '{stack_name}'. "
                f"Available outputs: {list(outputs.keys())}"
            )
        
        s3_output_location = outputs.get('S3OutputLocation')
        if not s3_output_location:
            raise KeyError(
                f"Required output 'S3OutputLocation' not found in stack '{stack_name}'. "
                f"Available outputs: {list(outputs.keys())}"
            )
        # Normalize to a trailing slash, consistent with from_credentials()
        # and the documented "must end with /" contract.
        s3_output_location = cls._ensure_trailing_slash(s3_output_location)
        
        # Optional outputs
        workgroup = outputs.get('WorkGroup')
        
        return cls(
            stack_name=stack_name,
            database_name=database_name,
            workgroup=workgroup,
            s3_output_location=s3_output_location,
            region=region,
            profile=profile,
        )
    
    @classmethod
    def from_credentials(
        cls,
        database_name: str,
        workgroup: str,
        s3_output_location: str,
        region: str,
        access_key_id: str,
        secret_access_key: str,
    ) -> "DataLakeConfig":
        """
        Create config directly with AWS credentials.
        
        Args:
            database_name: Athena database name
            workgroup: Athena workgroup name
            s3_output_location: S3 location for query results (must end with /;
                a trailing slash is appended automatically if missing)
            region: AWS region
            access_key_id: AWS access key ID
            secret_access_key: AWS secret access key
        
        Returns:
            DataLakeConfig instance
        """
        return cls(
            database_name=database_name,
            workgroup=workgroup,
            s3_output_location=cls._ensure_trailing_slash(s3_output_location),
            region=region,
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
        )
    
    def get_boto3_session(self) -> boto3.Session:
        """
        Get boto3 session with configured credentials, profile, and region.
        
        Explicit credentials (access_key_id + secret_access_key) take
        precedence; otherwise the named profile (or the ambient IAM role,
        when profile is None) is used.
        
        Returns:
            boto3.Session instance
        """
        if self.access_key_id and self.secret_access_key:
            # Use explicit credentials
            return boto3.Session(
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name=self.region,
            )
        else:
            # Use profile or IAM role
            return boto3.Session(profile_name=self.profile, region_name=self.region)