"""
Example: Explore data lake structure using Athena.

This script demonstrates how to discover devices, messages, dates, and schemas
in the CANedge Athena data lake.

AWS credentials are read from the standard ``AWS_ACCESS_KEY_ID`` and
``AWS_SECRET_ACCESS_KEY`` environment variables; they must never be committed
to source control.
"""

import os
import sys
from pathlib import Path

# Make the project root importable when this example is run as a script
# (the script lives three directory levels below the root).
_project_root = Path(__file__).resolve().parent.parent.parent
if str(_project_root) not in sys.path:
    sys.path.insert(0, str(_project_root))

# These imports depend on the sys.path tweak above, hence their position.
from src.datalake.config import DataLakeConfig  # noqa: E402
from src.datalake.athena import AthenaQuery  # noqa: E402
from src.datalake.catalog import DataLakeCatalog  # noqa: E402

# Non-secret Athena connection settings used by this example.
DATABASE_NAME = "dbparquetdatalake05"
WORKGROUP = "athenaworkgroup-datalake05"
S3_OUTPUT_LOCATION = "s3://canedge-raw-data-parquet/athena-results/"
REGION = "eu-north-1"

# How many schema column names to show before summarising the remainder.
SCHEMA_PREVIEW_COLUMNS = 5


def _load_config():
    """Build the DataLakeConfig, taking AWS credentials from the environment.

    Returns:
        A DataLakeConfig, or None (after printing an error to stderr) when
        either credential environment variable is missing or empty.
    """
    # NOTE(security): an access key pair used to be hard-coded here. Any key
    # that was ever committed must be treated as leaked and revoked in IAM.
    access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if not access_key_id or not secret_access_key:
        print(
            "Error: set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY "
            "in the environment before running this example.",
            file=sys.stderr,
        )
        return None
    return DataLakeConfig.from_credentials(
        database_name=DATABASE_NAME,
        workgroup=WORKGROUP,
        s3_output_location=S3_OUTPUT_LOCATION,
        region=REGION,
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )


def _print_schema(catalog, device_id, message):
    """Print the column count and a preview of column names for one message.

    Errors are reported and swallowed so that one bad message does not stop
    the exploration of the remaining messages.
    """
    try:
        schema = catalog.get_schema(device_id, message)
        if schema:
            column_names = list(schema.keys())
            print(f" Schema: {len(schema)} column(s)")
            print(f" Columns: {', '.join(column_names[:SCHEMA_PREVIEW_COLUMNS])}")
            if len(schema) > SCHEMA_PREVIEW_COLUMNS:
                print(f" ... and {len(schema) - SCHEMA_PREVIEW_COLUMNS} more")
    except Exception as e:
        print(f" Error getting schema: {e}")


def _print_partitions(catalog, device_id, message):
    """Print how many date partitions exist for one message and their range.

    The first and last entries returned by ``list_partitions`` are reported as
    the date range; this presumably relies on the catalog returning them in
    sorted order — TODO confirm against DataLakeCatalog.
    """
    try:
        partitions = catalog.list_partitions(device_id, message)
        if partitions:
            print(f" Partitions: {len(partitions)} date(s)")
            print(f" Date range: {partitions[0]} to {partitions[-1]}")
    except Exception as e:
        print(f" Could not list partitions: {e}")


def main():
    """Explore data lake structure.

    Prints the tables in the configured Athena database, then the devices
    found, and finally, for the first device only, each message together with
    a schema preview and its date-partition range. Returns early if the
    credentials are missing or if listing tables or devices fails.
    """
    config = _load_config()
    if config is None:
        return

    # Initialize Athena and catalog
    athena = AthenaQuery(config)
    catalog = DataLakeCatalog(athena, config)

    print("=" * 60)
    print("Exploring Data Lake (Athena)")
    print("=" * 60)
    print(f"Database: {config.database_name}")
    print(f"Region: {config.region}")
    print(f"Workgroup: {config.workgroup}")
    print()

    # List all tables
    try:
        tables = catalog.list_tables()
        print(f"Found {len(tables)} table(s) in database")
        if tables:
            print(f"Sample tables: {tables[:10]}")
        print()
    except Exception as e:
        print(f"Error listing tables: {e}")
        return

    # List devices
    try:
        devices = catalog.list_devices(device_filter=config.device_filter)
        print(f"Found {len(devices)} device(s):")
        for device in devices:
            print(f" - {device}")
    except Exception as e:
        print(f"Error listing devices: {e}")
        return

    if not devices:
        return

    # Only the first device is explored in detail to keep the output short.
    device_id = devices[0]
    print(f"\nMessages for device '{device_id}':")
    try:
        messages = catalog.list_messages(device_id, message_filter=config.message_filter)
        for message in messages:
            print(f" - {message}")
            _print_schema(catalog, device_id, message)
            _print_partitions(catalog, device_id, message)
            print()
    except Exception as e:
        print(f"Error listing messages: {e}")


if __name__ == "__main__":
    main()