File size: 3,512 Bytes
e869d90
 
 
 
 
 
 
29854ee
 
 
 
 
 
 
 
 
 
e869d90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""
Example: Explore data lake structure using Athena.

This script demonstrates how to discover devices, messages, dates,
and schemas in the CANedge Athena data lake.
"""

import sys
from pathlib import Path

# Make the `src` package importable when this script is run directly:
# take the directory three levels above this file (presumably the
# repository root -- confirm against the project layout) and put it at
# the front of sys.path, unless it is already listed.
# This must run before the `from src.datalake...` imports below.
_project_root = Path(__file__).resolve().parent.parent.parent
if str(_project_root) not in sys.path:
    sys.path.insert(0, str(_project_root))

from src.datalake.config import DataLakeConfig
from src.datalake.athena import AthenaQuery
from src.datalake.catalog import DataLakeCatalog


def main():
    """Explore the data lake structure and print a summary to stdout.

    Builds a ``DataLakeConfig`` from the fixed Athena settings below plus
    AWS credentials taken from the environment, then prints:

    * the number of tables in the database (and up to 10 table names),
    * every device returned by the catalog,
    * for the first device only: each message, a short schema summary and
      the partition (date) range.

    Environment variables:
        AWS_ACCESS_KEY_ID: passed to the config as ``access_key_id``.
        AWS_SECRET_ACCESS_KEY: passed to the config as ``secret_access_key``.

    Returns:
        None. Failures are reported with a printed message rather than
        raised; a failure to read credentials, list tables or list devices
        stops the exploration early.
    """
    import os  # local import so this block does not depend on file-level imports

    # SECURITY: credentials must never be committed to source control.
    # They are read from the standard AWS environment variables instead.
    # Any key pair that was previously hard-coded here should be treated
    # as compromised and rotated.
    access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    missing = [
        name
        for name, value in (
            ("AWS_ACCESS_KEY_ID", access_key_id),
            ("AWS_SECRET_ACCESS_KEY", secret_access_key),
        )
        if not value
    ]
    if missing:
        print(f"Error: missing required environment variable(s): {', '.join(missing)}")
        return

    config = DataLakeConfig.from_credentials(
        database_name="dbparquetdatalake05",
        workgroup="athenaworkgroup-datalake05",
        s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
        region="eu-north-1",
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )

    # Initialize Athena and catalog
    athena = AthenaQuery(config)
    catalog = DataLakeCatalog(athena, config)

    # Banner with the connection settings in use
    print("=" * 60)
    print("Exploring Data Lake (Athena)")
    print("=" * 60)
    print(f"Database: {config.database_name}")
    print(f"Region: {config.region}")
    print(f"Workgroup: {config.workgroup}")
    print()

    # List all tables; without them there is nothing further to explore.
    try:
        tables = catalog.list_tables()
        print(f"Found {len(tables)} table(s) in database")
        if tables:
            print(f"Sample tables: {tables[:10]}")
        print()
    except Exception as e:
        print(f"Error listing tables: {e}")
        return

    # List devices
    try:
        devices = catalog.list_devices(device_filter=config.device_filter)
        print(f"Found {len(devices)} device(s):")
        for device in devices:
            print(f"  - {device}")
    except Exception as e:
        print(f"Error listing devices: {e}")
        return

    if not devices:
        return

    # Only the first device is inspected in detail to keep the example short.
    device_id = devices[0]
    print(f"\nMessages for device '{device_id}':")
    try:
        messages = catalog.list_messages(device_id, message_filter=config.message_filter)

        for message in messages:
            print(f"  - {message}")
            _print_schema_summary(catalog, device_id, message)
            _print_partition_summary(catalog, device_id, message)
            print()
    except Exception as e:
        print(f"Error listing messages: {e}")


def _print_schema_summary(catalog, device_id, message):
    """Print the column count and the first five column names of a message's schema.

    Nothing is printed for an empty/None schema; a lookup error is reported
    and swallowed so the remaining messages are still processed.
    """
    try:
        schema = catalog.get_schema(device_id, message)

        if schema:
            print(f"    Schema: {len(schema)} column(s)")
            print(f"    Columns: {', '.join(list(schema.keys())[:5])}")
            if len(schema) > 5:
                print(f"             ... and {len(schema) - 5} more")
    except Exception as e:
        print(f"    Error getting schema: {e}")


def _print_partition_summary(catalog, device_id, message):
    """Print how many partitions (dates) a message has and their first/last value.

    Nothing is printed when there are no partitions; a lookup error is
    reported and swallowed (best effort).
    """
    try:
        partitions = catalog.list_partitions(device_id, message)
        if partitions:
            print(f"    Partitions: {len(partitions)} date(s)")
            print(f"    Date range: {partitions[0]} to {partitions[-1]}")
    except Exception as e:
        print(f"    Could not list partitions: {e}")


if __name__ == "__main__":
    main()