""" Example: Query and analyze data from the Athena data lake. This script demonstrates how to read data for specific devices/messages, perform time series queries, and filter by date ranges using SQL. """ import sys from pathlib import Path _project_root = Path(__file__).resolve().parent.parent.parent if str(_project_root) not in sys.path: sys.path.insert(0, str(_project_root)) from src.datalake.config import DataLakeConfig from src.datalake.athena import AthenaQuery from src.datalake.catalog import DataLakeCatalog from src.datalake.query import DataLakeQuery import pandas as pd def main(): """Query and analyze data.""" # Setup # Load config with explicit credentials config = DataLakeConfig.from_credentials( database_name="dbparquetdatalake05", workgroup="athenaworkgroup-datalake05", s3_output_location="s3://canedge-raw-data-parquet/athena-results/", region="eu-north-1", access_key_id="AKIARJQJFFVASPMSGNNY", secret_access_key="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", ) athena = AthenaQuery(config) catalog = DataLakeCatalog(athena, config) query = DataLakeQuery(athena, catalog) # Get first available device and message try: devices = catalog.list_devices() if not devices: print("No devices found in data lake") return device_id = devices[0] messages = catalog.list_messages(device_id) if not messages: print(f"No messages found for device {device_id}") return message = messages[0] except Exception as e: print(f"Error discovering devices/messages: {e}") return print("=" * 60) print("Querying Data Lake (Athena)") print("=" * 60) print(f"Device: {device_id}") print(f"Message: {message}") print() # Example 1: Read all data for device/message print("Example 1: Read all data") print("-" * 60) try: df = query.read_device_message( device_id=device_id, message=message, columns=["t"], # Only read timestamp initially to check structure limit=100, # Limit for example ) print(f"Loaded {len(df)} records") if not df.empty: print(f"Columns: {list(df.columns)}") if 't' in df.columns: print(f"Time range: {df['t'].min()} to {df['t'].max()} microseconds") print(f"Sample data:") print(df.head()) except Exception as e: print(f"Error reading data: {e}") print() # Example 2: Read with date range print("Example 2: Read with date range") print("-" * 60) try: partitions = catalog.list_partitions(device_id, message) if partitions: start_date = partitions[0] end_date = partitions[-1] if len(partitions) > 1 else partitions[0] print(f"Date range: {start_date} to {end_date}") df_date = query.read_date_range( device_id=device_id, message=message, start_date=start_date, end_date=end_date, limit=100, ) print(f"Loaded {len(df_date)} records for date range") except Exception as e: print(f"Error reading date range: {e}") print() # Example 3: Time series query (if signal columns exist) print("Example 3: Time series query") print("-" * 60) try: schema = catalog.get_schema(device_id, message) if schema: # Find first signal column (not 't') signal_cols = [c for c in schema.keys() if c != 't' and c.lower() != 'date'] if signal_cols: signal_name = signal_cols[0] print(f"Querying signal: {signal_name}") df_ts = query.time_series_query( device_id=device_id, message=message, signal_name=signal_name, limit=100, ) if not df_ts.empty: print(f"Time series: {len(df_ts)} records") # Convert timestamp to datetime for display if 't' in df_ts.columns: df_ts['timestamp'] = pd.to_datetime(df_ts['t'], unit='us') print(df_ts[['timestamp', signal_name]].head()) # Basic statistics print(f"\nStatistics for {signal_name}:") print(f" Mean: {df_ts[signal_name].mean():.2f}") print(f" Min: {df_ts[signal_name].min():.2f}") print(f" Max: {df_ts[signal_name].max():.2f}") except Exception as e: print(f"Error in time series query: {e}") print() # Example 4: Custom SQL query print("Example 4: Custom SQL query") print("-" * 60) try: table_name = catalog.get_table_name(device_id, message) custom_sql = f""" SELECT COUNT(*) as record_count, MIN(t) as min_time, MAX(t) as max_time FROM {config.database_name}.{table_name} LIMIT 1 """ df_custom = query.execute_sql(custom_sql) print("Custom query results:") print(df_custom) except Exception as e: print(f"Error in custom SQL query: {e}") print() # Example 5: Aggregation query print("Example 5: Aggregation query") print("-" * 60) try: partitions = catalog.list_partitions(device_id, message) if partitions: # Filter by date using path-based extraction # Data structure: {device_id}/{message}/{year}/{month}/{day}/file.parquet target_date = partitions[0] date_parts = target_date.split('-') if len(date_parts) == 3: year, month, day = date_parts # Use path-based filtering consistent with data architecture path_year = "try_cast(element_at(split(\"$path\", '/'), -4) AS INTEGER)" path_month = "try_cast(element_at(split(\"$path\", '/'), -3) AS INTEGER)" path_day = "try_cast(element_at(split(\"$path\", '/'), -2) AS INTEGER)" where_clause = f"{path_year} = {year} AND {path_month} = {month} AND {path_day} = {day}" else: where_clause = None df_agg = query.aggregate( device_id=device_id, message=message, aggregation="COUNT(*) as count, AVG(t) as avg_time", where_clause=where_clause, ) print("Aggregation results:") print(df_agg) except Exception as e: print(f"Error in aggregation query: {e}") if __name__ == "__main__": main()