Spaces:
Sleeping
Sleeping
"""
Athena data-lake query example.

Demonstrates reading data for a particular device/message pair, running a
time-series query, and narrowing results to a date range with SQL.
"""
from pathlib import Path
import sys

# Put the directory three levels above this file at the front of sys.path so
# the ``src.datalake`` imports below resolve when the script is run directly.
# This has to run before those imports.
_project_root = Path(__file__).resolve().parent.parent.parent
if str(_project_root) not in sys.path:
    sys.path.insert(0, str(_project_root))

import pandas as pd

from src.datalake.config import DataLakeConfig
from src.datalake.athena import AthenaQuery
from src.datalake.catalog import DataLakeCatalog
from src.datalake.query import DataLakeQuery
def _build_config():
    """Build the data-lake configuration without embedding secrets in source.

    Credentials are read from the ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` environment variables. The non-secret settings
    default to the values this example was written against and may be
    overridden with ``DATALAKE_DATABASE``, ``DATALAKE_WORKGROUP``,
    ``DATALAKE_S3_OUTPUT`` and ``AWS_REGION``.

    NOTE: an access key pair used to be hard-coded here; treat it as exposed
    and rotate it.

    Returns:
        A ``DataLakeConfig``, or ``None`` (after printing a message) when the
        credentials are not set.
    """
    import os

    access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if not access_key_id or not secret_access_key:
        print(
            "Set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment "
            "variables to run this example"
        )
        return None

    return DataLakeConfig.from_credentials(
        database_name=os.environ.get("DATALAKE_DATABASE", "dbparquetdatalake05"),
        workgroup=os.environ.get("DATALAKE_WORKGROUP", "athenaworkgroup-datalake05"),
        s3_output_location=os.environ.get(
            "DATALAKE_S3_OUTPUT", "s3://canedge-raw-data-parquet/athena-results/"
        ),
        region=os.environ.get("AWS_REGION", "eu-north-1"),
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )


def _discover_target(catalog):
    """Return ``(device_id, message)`` for the first device and its first message.

    Prints a message and returns ``None`` when no device/message exists or
    the catalog lookup raises.
    """
    try:
        devices = catalog.list_devices()
        if not devices:
            print("No devices found in data lake")
            return None
        device_id = devices[0]
        messages = catalog.list_messages(device_id)
        if not messages:
            print(f"No messages found for device {device_id}")
            return None
        return device_id, messages[0]
    except Exception as e:
        print(f"Error discovering devices/messages: {e}")
        return None


def _print_section(title):
    """Print an example title followed by a 60-character dashed rule."""
    print(title)
    print("-" * 60)


def _example_read_all(query, device_id, message):
    """Example 1: read a 100-row sample of the timestamp column."""
    _print_section("Example 1: Read all data")
    try:
        df = query.read_device_message(
            device_id=device_id,
            message=message,
            columns=["t"],  # Only read timestamp initially to check structure
            limit=100,  # Limit for example
        )
        print(f"Loaded {len(df)} records")
        if not df.empty:
            print(f"Columns: {list(df.columns)}")
            if "t" in df.columns:
                print(f"Time range: {df['t'].min()} to {df['t'].max()} microseconds")
            print("Sample data:")
            print(df.head())
    except Exception as e:
        print(f"Error reading data: {e}")


def _example_date_range(query, catalog, device_id, message):
    """Example 2: read a 100-row sample between the first and last partitions."""
    _print_section("Example 2: Read with date range")
    try:
        partitions = catalog.list_partitions(device_id, message)
        if partitions:
            # NOTE(review): assumes list_partitions returns dates in ascending
            # order — confirm, otherwise the range may come out inverted.
            start_date, end_date = partitions[0], partitions[-1]
            print(f"Date range: {start_date} to {end_date}")
            df_date = query.read_date_range(
                device_id=device_id,
                message=message,
                start_date=start_date,
                end_date=end_date,
                limit=100,
            )
            print(f"Loaded {len(df_date)} records for date range")
    except Exception as e:
        print(f"Error reading date range: {e}")


def _example_time_series(query, catalog, device_id, message):
    """Example 3: query the first signal column and print summary statistics."""
    _print_section("Example 3: Time series query")
    try:
        schema = catalog.get_schema(device_id, message)
        if not schema:
            return
        # A signal column is any column other than the timestamp 't' and 'date'.
        signal_cols = [c for c in schema.keys() if c != "t" and c.lower() != "date"]
        if not signal_cols:
            return
        signal_name = signal_cols[0]
        print(f"Querying signal: {signal_name}")
        df_ts = query.time_series_query(
            device_id=device_id,
            message=message,
            signal_name=signal_name,
            limit=100,
        )
        if df_ts.empty:
            return

        print(f"Time series: {len(df_ts)} records")
        if "t" in df_ts.columns:
            # 't' is interpreted as microseconds, matching Example 1's label.
            df_ts["timestamp"] = pd.to_datetime(df_ts["t"], unit="us")
            print(df_ts[["timestamp", signal_name]].head())

        values = df_ts[signal_name]
        # Format everything before printing so a non-numeric signal does not
        # leave a dangling "Statistics" header followed by an error.
        try:
            summary = [
                f" Mean: {values.mean():.2f}",
                f" Min: {values.min():.2f}",
                f" Max: {values.max():.2f}",
            ]
        except (TypeError, ValueError):
            print(f"\nNo numeric statistics for non-numeric signal {signal_name}")
        else:
            print(f"\nStatistics for {signal_name}:")
            for line in summary:
                print(line)
    except Exception as e:
        print(f"Error in time series query: {e}")


def _example_custom_sql(query, catalog, database_name, device_id, message):
    """Example 4: run hand-written SQL for record count and time bounds."""
    _print_section("Example 4: Custom SQL query")
    try:
        # The table name comes from the catalog, not from user input.
        table_name = catalog.get_table_name(device_id, message)
        custom_sql = f"""
        SELECT COUNT(*) as record_count,
               MIN(t) as min_time,
               MAX(t) as max_time
        FROM {database_name}.{table_name}
        LIMIT 1
        """
        df_custom = query.execute_sql(custom_sql)
        print("Custom query results:")
        print(df_custom)
    except Exception as e:
        print(f"Error in custom SQL query: {e}")


def _partition_where_clause(partition):
    """Build a WHERE clause selecting rows stored under one ``YYYY-MM-DD`` partition.

    Rows are matched on the year/month/day directories of their object path,
    following the layout ``{device_id}/{message}/{year}/{month}/{day}/file.parquet``.

    Returns:
        The clause string, or ``None`` when ``partition`` is not three
        dash-separated integers (the caller then applies no filter).
    """
    date_parts = partition.split("-")
    if len(date_parts) != 3:
        return None
    try:
        # int() guarantees only plain numbers are spliced into the SQL text.
        year, month, day = (int(part) for part in date_parts)
    except ValueError:
        return None
    path_year = "try_cast(element_at(split(\"$path\", '/'), -4) AS INTEGER)"
    path_month = "try_cast(element_at(split(\"$path\", '/'), -3) AS INTEGER)"
    path_day = "try_cast(element_at(split(\"$path\", '/'), -2) AS INTEGER)"
    return f"{path_year} = {year} AND {path_month} = {month} AND {path_day} = {day}"


def _example_aggregation(query, catalog, device_id, message):
    """Example 5: count rows and average 't' for the first partition."""
    _print_section("Example 5: Aggregation query")
    try:
        partitions = catalog.list_partitions(device_id, message)
        if partitions:
            where_clause = _partition_where_clause(partitions[0])
            df_agg = query.aggregate(
                device_id=device_id,
                message=message,
                aggregation="COUNT(*) as count, AVG(t) as avg_time",
                where_clause=where_clause,
            )
            print("Aggregation results:")
            print(df_agg)
    except Exception as e:
        print(f"Error in aggregation query: {e}")


def main():
    """Query and analyze data.

    Runs five examples against the first device/message found in the data
    lake. Each example catches and reports its own errors, so one failure
    does not stop the rest.
    """
    config = _build_config()
    if config is None:
        return
    athena = AthenaQuery(config)
    catalog = DataLakeCatalog(athena, config)
    query = DataLakeQuery(athena, catalog)

    target = _discover_target(catalog)
    if target is None:
        return
    device_id, message = target

    print("=" * 60)
    print("Querying Data Lake (Athena)")
    print("=" * 60)
    print(f"Device: {device_id}")
    print(f"Message: {message}")
    print()

    _example_read_all(query, device_id, message)
    print()
    _example_date_range(query, catalog, device_id, message)
    print()
    _example_time_series(query, catalog, device_id, message)
    print()
    _example_custom_sql(query, catalog, config.database_name, device_id, message)
    print()
    _example_aggregation(query, catalog, device_id, message)
# Run the examples only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()