{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# CANedge Data Lake Explorer\n",
    "\n",
    "This notebook helps you explore and analyze your CANedge data lake using AWS Athena.\n",
    "\n",
    "## Setup\n",
    "\n",
    "First, let's configure the connection and test it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "✓ Libraries imported successfully\n"
     ]
    }
   ],
   "source": [
    "# Import required libraries\n",
    "import os\n",
    "\n",
    "import pandas as pd\n",
    "import matplotlib.pyplot as plt\n",
    "import seaborn as sns\n",
    "from datalake.config import DataLakeConfig\n",
    "from datalake.athena import AthenaQuery\n",
    "from datalake.catalog import DataLakeCatalog\n",
    "from datalake.query import DataLakeQuery\n",
    "from datalake.batch import BatchProcessor\n",
    "\n",
    "# Set up plotting\n",
    "%matplotlib inline\n",
    "plt.style.use('seaborn-v0_8')\n",
    "sns.set_palette(\"husl\")\n",
    "\n",
    "print(\"✓ Libraries imported successfully\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "✓ Configuration loaded\n",
      " Database: dbparquetdatalake05\n",
      " Workgroup: athenaworkgroup-datalake05\n",
      " Region: eu-north-1\n"
     ]
    }
   ],
   "source": [
    "# Configure the Athena connection.\n",
    "#\n",
    "# Credentials are read from the environment so that no secret is stored in the\n",
    "# notebook. Export AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY before starting\n",
    "# the kernel.\n",
    "#\n",
    "# SECURITY: an access key pair used to be hardcoded in this cell. Treat that key\n",
    "# pair as compromised and rotate it in AWS IAM.\n",
    "required_env_vars = (\"AWS_ACCESS_KEY_ID\", \"AWS_SECRET_ACCESS_KEY\")\n",
    "missing_env_vars = [name for name in required_env_vars if not os.environ.get(name)]\n",
    "if missing_env_vars:\n",
    "    # Fail loudly here rather than with an opaque authentication error later.\n",
    "    raise RuntimeError(\n",
    "        \"Missing required environment variable(s): \" + \", \".join(missing_env_vars)\n",
    "    )\n",
    "\n",
    "config = DataLakeConfig.from_credentials(\n",
    "    database_name=\"dbparquetdatalake05\",\n",
    "    workgroup=\"athenaworkgroup-datalake05\",\n",
    "    s3_output_location=\"s3://canedge-raw-data-parquet/athena-results/\",\n",
    "    region=\"eu-north-1\",\n",
    "    access_key_id=os.environ[\"AWS_ACCESS_KEY_ID\"],\n",
    "    secret_access_key=os.environ[\"AWS_SECRET_ACCESS_KEY\"],\n",
    ")\n",
    "\n",
    "print(f\"✓ Configuration loaded\")\n",
    "print(f\" Database: {config.database_name}\")\n",
    "print(f\" Workgroup: {config.workgroup}\")\n",
    "print(f\" Region: {config.region}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type":
"stream", "text": [ "2026-01-25 16:42:53,113 - datalake.athena - INFO - Initialized Athena client for database: dbparquetdatalake05\n", "2026-01-25 16:42:53,113 - datalake.catalog - INFO - Initialized catalog for database: dbparquetdatalake05\n", "2026-01-25 16:42:53,114 - datalake.query - INFO - Initialized DataLakeQuery\n", "2026-01-25 16:42:53,114 - datalake.batch - INFO - Initialized BatchProcessor\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "✓ Athena client and catalog initialized\n" ] } ], "source": [ "# Initialize Athena and catalog\n", "athena = AthenaQuery(config)\n", "catalog = DataLakeCatalog(athena, config)\n", "query = DataLakeQuery(athena, catalog)\n", "processor = BatchProcessor(query)\n", "\n", "print(\"✓ Athena client and catalog initialized\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Test Connection\n", "\n", "Let's verify the connection works by listing tables." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2026-01-25 16:43:00,494 - datalake.athena - INFO - Query started with execution ID: fb177297-ccc0-4c3d-b0ee-44078f0d3fa8\n", "2026-01-25 16:43:01,953 - datalake.athena - INFO - Query fb177297-ccc0-4c3d-b0ee-44078f0d3fa8 completed successfully\n", "2026-01-25 16:43:02,379 - datalake.athena - INFO - Retrieved 77 rows from query fb177297-ccc0-4c3d-b0ee-44078f0d3fa8\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "✓ Connection successful!\n", " Found 77 tables in database\n", "\n", " First 10 tables:\n", " tab_name\n", "0 tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m03\n", "1 tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m04\n", "2 tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m05\n", "3 tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m06\n", "4 tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m07\n", "5 tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m0c\n", "6 tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m0d\n", "7 tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m0e\n", "8 
tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m0f\n", "9 tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m10\n" ] } ], "source": [ "# Test connection with a simple query\n", "try:\n", " test_query = f\"SHOW TABLES IN {config.database_name}\"\n", " df_tables = athena.query_to_dataframe(test_query, timeout=60)\n", " print(f\"✓ Connection successful!\")\n", " print(f\" Found {len(df_tables)} tables in database\")\n", " if not df_tables.empty:\n", " print(f\"\\n First 10 tables:\")\n", " print(df_tables.head(10))\n", "except Exception as e:\n", " print(f\"✗ Connection failed: {e}\")\n", " import traceback\n", " traceback.print_exc()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Explore Data Lake Structure\n", "\n", "Discover devices, messages, and available data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2026-01-25 16:45:30,372 - datalake.athena - INFO - Query started with execution ID: f341e52d-c3ea-4baf-b805-eb1f327b1d1c\n", "2026-01-25 16:45:31,482 - datalake.athena - INFO - Query f341e52d-c3ea-4baf-b805-eb1f327b1d1c completed successfully\n", "2026-01-25 16:45:31,613 - datalake.athena - INFO - Retrieved 78 rows from query f341e52d-c3ea-4baf-b805-eb1f327b1d1c\n", "2026-01-25 16:45:31,614 - datalake.catalog - INFO - Found 78 tables in database\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Total tables: 78\n", "\n", "Sample tables:\n", " - tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m00\n", " - tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m03\n", " - tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m04\n", " - tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m05\n", " - tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m06\n", " - tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m07\n", " - tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m0c\n", " - tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m0d\n", " - tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m0e\n", " - tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m0f\n" ] } ], "source": [ "# List all tables\n", "tables = 
catalog.list_tables()\n", "print(f\"Total tables: {len(tables)}\")\n", "print(f\"\\nSample tables:\")\n", "for table in tables[:10]:\n", " print(f\" - {table}\")" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2026-01-25 21:21:04,434 - datalake.athena - INFO - Query started with execution ID: 4f1cfb71-2b52-4226-bd01-412f44cf23e3\n", "2026-01-25 21:21:05,589 - datalake.athena - INFO - Query 4f1cfb71-2b52-4226-bd01-412f44cf23e3 completed successfully\n", "2026-01-25 21:21:05,720 - datalake.athena - INFO - Retrieved 78 rows from query 4f1cfb71-2b52-4226-bd01-412f44cf23e3\n", "2026-01-25 21:21:05,721 - datalake.catalog - INFO - Found 78 tables in database\n", "2026-01-25 21:21:05,721 - datalake.catalog - INFO - Found 1 device(s)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Found 1 device(s):\n", " - tbl\n" ] } ], "source": [ "# Discover devices\n", "devices = catalog.list_devices()\n", "print(f\"Found {len(devices)} device(s):\")\n", "for device in devices:\n", " print(f\" - {device}\")" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2026-01-25 21:21:12,744 - datalake.athena - INFO - Query started with execution ID: 3e5558cf-432c-4beb-8217-97bcbbf71694\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\n", "Exploring device: tbl\n", "============================================================\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2026-01-25 21:21:13,885 - datalake.athena - INFO - Query 3e5558cf-432c-4beb-8217-97bcbbf71694 completed successfully\n", "2026-01-25 21:21:14,016 - datalake.athena - INFO - Retrieved 78 rows from query 3e5558cf-432c-4beb-8217-97bcbbf71694\n", "2026-01-25 21:21:14,017 - datalake.catalog - INFO - Found 78 tables in database\n", "2026-01-25 21:21:14,017 - datalake.catalog - INFO - Found 78 messages for device 
tbl\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Found 78 message(s):\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m00\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m03\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m04\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m05\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m06\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m07\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m0c\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m0d\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m0e\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m0f\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m10\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m11\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m1f\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m2e\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m2f\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m33\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m34\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m35\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m43\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m44\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m49\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m55\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m56\n", " - 97a4aaf4_can1_obd2_s_m41_s01pid_m5c\n", " - 97a4aaf4_can9_gnssaltitude\n", " - 97a4aaf4_can9_gnssdistance\n", " - 97a4aaf4_can9_gnsspos\n", " - 97a4aaf4_can9_gnssspeed\n", " - 97a4aaf4_can9_gnssstatus\n", " - 97a4aaf4_can9_gnsstime\n", " - 97a4aaf4_can9_heartbeat\n", " - 97a4aaf4_can9_imudata\n", " - 97a4aaf4_can9_timecalendar\n", " - 97a4aaf4_can9_timeexternal\n", " - 97a4aaf4_messages\n", " - aggregations_devicemeta\n", " - aggregations_tripsummary\n", " - b8280fd1_can9_gnssaltitude\n", " - b8280fd1_can9_gnssdistance\n", " - b8280fd1_can9_gnsspos\n", " - b8280fd1_can9_gnssspeed\n", " - b8280fd1_can9_gnssstatus\n", " - b8280fd1_can9_gnsstime\n", " - b8280fd1_can9_heartbeat\n", " - b8280fd1_can9_imudata\n", " - b8280fd1_can9_timecalendar\n", " - b8280fd1_can9_timeexternal\n", " - b8280fd1_messages\n", " - f1da612a_can1_obd2_s_m41_s01pid_m03\n", " - 
f1da612a_can1_obd2_s_m41_s01pid_m04\n", " - f1da612a_can1_obd2_s_m41_s01pid_m05\n", " - f1da612a_can1_obd2_s_m41_s01pid_m06\n", " - f1da612a_can1_obd2_s_m41_s01pid_m07\n", " - f1da612a_can1_obd2_s_m41_s01pid_m0c\n", " - f1da612a_can1_obd2_s_m41_s01pid_m0d\n", " - f1da612a_can1_obd2_s_m41_s01pid_m0e\n", " - f1da612a_can1_obd2_s_m41_s01pid_m0f\n", " - f1da612a_can1_obd2_s_m41_s01pid_m10\n", " - f1da612a_can1_obd2_s_m41_s01pid_m1f\n", " - f1da612a_can1_obd2_s_m41_s01pid_m2e\n", " - f1da612a_can1_obd2_s_m41_s01pid_m33\n", " - f1da612a_can1_obd2_s_m41_s01pid_m34\n", " - f1da612a_can1_obd2_s_m41_s01pid_m35\n", " - f1da612a_can1_obd2_s_m41_s01pid_m43\n", " - f1da612a_can1_obd2_s_m41_s01pid_m44\n", " - f1da612a_can1_obd2_s_m41_s01pid_m49\n", " - f1da612a_can1_obd2_s_m41_s01pid_m5c\n", " - f1da612a_can9_gnssaltitude\n", " - f1da612a_can9_gnssdistance\n", " - f1da612a_can9_gnsspos\n", " - f1da612a_can9_gnssspeed\n", " - f1da612a_can9_gnssstatus\n", " - f1da612a_can9_gnsstime\n", " - f1da612a_can9_heartbeat\n", " - f1da612a_can9_imudata\n", " - f1da612a_can9_timecalendar\n", " - f1da612a_can9_timeexternal\n", " - f1da612a_messages\n" ] } ], "source": [ "# Explore messages for the first device\n", "if devices:\n", " device_id = devices[0]\n", " print(f\"\\nExploring device: {device_id}\")\n", " print(\"=\" * 60)\n", " \n", " messages = catalog.list_messages(device_id)\n", " print(f\"Found {len(messages)} message(s):\")\n", " for message in messages:\n", " print(f\" - {message}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "Schema for tbl/97a4aaf4_can1_obd2_s_m41_s01pid_m00:\n", "============================================================\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2026-01-25 17:31:48,556 - datalake.athena - INFO - Query started with execution ID: 6916c876-c526-4474-bbe0-ad626b4786e7\n", "2026-01-25 17:31:49,686 - datalake.athena - INFO - 
Query 6916c876-c526-4474-bbe0-ad626b4786e7 completed successfully\n", "2026-01-25 17:31:49,816 - datalake.athena - INFO - Retrieved 78 rows from query 6916c876-c526-4474-bbe0-ad626b4786e7\n", "2026-01-25 17:31:49,817 - datalake.catalog - INFO - Found 78 tables in database\n", "2026-01-25 17:31:49,993 - datalake.athena - INFO - Query started with execution ID: b71d0e14-e0e8-4e3f-8c8c-ceffcd9984f2\n", "2026-01-25 17:31:51,132 - datalake.athena - INFO - Query b71d0e14-e0e8-4e3f-8c8c-ceffcd9984f2 completed successfully\n", "2026-01-25 17:31:51,371 - datalake.athena - INFO - Retrieved 3 rows from query b71d0e14-e0e8-4e3f-8c8c-ceffcd9984f2\n", "2026-01-25 17:31:51,372 - datalake.catalog - INFO - Schema for tbl/97a4aaf4_can1_obd2_s_m41_s01pid_m00: 3 columns\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " Column Type\n", " t timestamp(3)\n", "s01pid00_pidssupported_01_20 double\n", " date_created varchar\n", "\n", "Total columns: 3\n" ] } ], "source": [ "# Get schema for first device/message combination\n", "if devices and messages:\n", " device_id = devices[0]\n", " message = messages[0]\n", " \n", " print(f\"\\nSchema for {device_id}/{message}:\")\n", " print(\"=\" * 60)\n", " \n", " schema = catalog.get_schema(device_id, message)\n", " if schema:\n", " schema_df = pd.DataFrame([\n", " {\"Column\": col, \"Type\": dtype}\n", " for col, dtype in schema.items()\n", " ])\n", " print(schema_df.to_string(index=False))\n", " print(f\"\\nTotal columns: {len(schema)}\")" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2026-01-25 17:31:58,489 - datalake.athena - INFO - Query started with execution ID: 844cf5ba-7756-46cf-a0e4-d6bfe8c98f74\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\n", "Partitions (dates) for tbl/97a4aaf4_can1_obd2_s_m41_s01pid_m00:\n", "============================================================\n" ] }, { "name": "stderr", "output_type": 
"stream", "text": [ "2026-01-25 17:31:59,938 - datalake.athena - INFO - Query 844cf5ba-7756-46cf-a0e4-d6bfe8c98f74 completed successfully\n", "2026-01-25 17:32:00,137 - datalake.athena - INFO - Retrieved 78 rows from query 844cf5ba-7756-46cf-a0e4-d6bfe8c98f74\n", "2026-01-25 17:32:00,137 - datalake.catalog - INFO - Found 78 tables in database\n", "2026-01-25 17:32:00,265 - datalake.athena - INFO - Query started with execution ID: c4a13ea7-d58e-4658-90aa-9413d04b9417\n", "2026-01-25 17:32:02,108 - datalake.athena - INFO - Query c4a13ea7-d58e-4658-90aa-9413d04b9417 completed successfully\n", "2026-01-25 17:32:02,219 - datalake.athena - WARNING - No results returned for execution c4a13ea7-d58e-4658-90aa-9413d04b9417\n", "2026-01-25 17:32:02,222 - datalake.catalog - WARNING - No partitions found for tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m00\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "No partitions found (table may not be partitioned)\n" ] } ], "source": [ "# Check available partitions (dates)\n", "if devices and messages:\n", " device_id = devices[0]\n", " message = messages[0]\n", " \n", " print(f\"\\nPartitions (dates) for {device_id}/{message}:\")\n", " print(\"=\" * 60)\n", " \n", " try:\n", " partitions = catalog.list_partitions(device_id, message)\n", " if partitions:\n", " print(f\"Found {len(partitions)} partition(s):\")\n", " print(f\" Date range: {partitions[0]} to {partitions[-1]}\")\n", " print(f\"\\n All dates:\")\n", " for date in partitions[:20]: # Show first 20\n", " print(f\" - {date}\")\n", " if len(partitions) > 20:\n", " print(f\" ... and {len(partitions) - 20} more\")\n", " else:\n", " print(\"No partitions found (table may not be partitioned)\")\n", " except Exception as e:\n", " print(f\"Could not list partitions: {e}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query Data\n", "\n", "Now let's query some actual data." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Reading sample data from tbl/97a4aaf4_can1_obd2_s_m41_s01pid_m00...\n", "============================================================\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2026-01-25 17:32:42,022 - datalake.athena - INFO - Query started with execution ID: 2a7e2ed0-8c44-46e7-a5b4-1a57fab4938b\n", "2026-01-25 17:32:43,601 - datalake.athena - INFO - Query 2a7e2ed0-8c44-46e7-a5b4-1a57fab4938b completed successfully\n", "2026-01-25 17:32:43,731 - datalake.athena - INFO - Retrieved 78 rows from query 2a7e2ed0-8c44-46e7-a5b4-1a57fab4938b\n", "2026-01-25 17:32:43,732 - datalake.catalog - INFO - Found 78 tables in database\n", "2026-01-25 17:32:43,732 - datalake.query - INFO - Executing query for tbl/97a4aaf4_can1_obd2_s_m41_s01pid_m00\n", "2026-01-25 17:32:43,859 - datalake.athena - INFO - Query started with execution ID: 02fe08ed-4f1c-4363-b167-c1a4a0196094\n", "2026-01-25 17:32:48,300 - datalake.athena - INFO - Query 02fe08ed-4f1c-4363-b167-c1a4a0196094 completed successfully\n", "2026-01-25 17:32:48,430 - datalake.athena - INFO - Retrieved 100 rows from query 02fe08ed-4f1c-4363-b167-c1a4a0196094\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "✓ Loaded 100 records\n", "\n", "Data shape: (100, 3)\n", "\n", "Columns: ['t', 's01pid00_pidssupported_01_20', 'date_created']\n", "\n", "First few rows:\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ts01pid00_pidssupported_01_20date_created
02025-10-29 05:45:53.0633.189744e+092025/10/29
12025-10-29 05:46:18.0623.189744e+092025/10/29
22025-10-29 05:46:48.0633.189744e+092025/10/29
32025-10-29 05:47:43.0623.189744e+092025/10/29
42025-10-29 05:48:08.0623.189744e+092025/10/29
52025-10-29 05:49:33.0633.189744e+092025/10/29
62025-10-29 05:49:48.0633.189744e+092025/10/29
72025-10-29 05:50:03.0633.189744e+092025/10/29
82025-10-29 05:50:33.0643.189744e+092025/10/29
92025-10-29 05:50:58.0643.189744e+092025/10/29
\n", "
" ], "text/plain": [ " t s01pid00_pidssupported_01_20 date_created\n", "0 2025-10-29 05:45:53.063 3.189744e+09 2025/10/29\n", "1 2025-10-29 05:46:18.062 3.189744e+09 2025/10/29\n", "2 2025-10-29 05:46:48.063 3.189744e+09 2025/10/29\n", "3 2025-10-29 05:47:43.062 3.189744e+09 2025/10/29\n", "4 2025-10-29 05:48:08.062 3.189744e+09 2025/10/29\n", "5 2025-10-29 05:49:33.063 3.189744e+09 2025/10/29\n", "6 2025-10-29 05:49:48.063 3.189744e+09 2025/10/29\n", "7 2025-10-29 05:50:03.063 3.189744e+09 2025/10/29\n", "8 2025-10-29 05:50:33.064 3.189744e+09 2025/10/29\n", "9 2025-10-29 05:50:58.064 3.189744e+09 2025/10/29" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n", "Data types:\n", "t object\n", "s01pid00_pidssupported_01_20 float64\n", "date_created object\n", "dtype: object\n", "\n", "Basic statistics:\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
s01pid00_pidssupported_01_20
count1.000000e+02
mean3.189744e+09
std0.000000e+00
min3.189744e+09
25%3.189744e+09
50%3.189744e+09
75%3.189744e+09
max3.189744e+09
\n", "
" ], "text/plain": [ " s01pid00_pidssupported_01_20\n", "count 1.000000e+02\n", "mean 3.189744e+09\n", "std 0.000000e+00\n", "min 3.189744e+09\n", "25% 3.189744e+09\n", "50% 3.189744e+09\n", "75% 3.189744e+09\n", "max 3.189744e+09" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Read a sample of data\n", "if devices and messages:\n", " device_id = devices[0]\n", " message = messages[0]\n", " \n", " print(f\"Reading sample data from {device_id}/{message}...\")\n", " print(\"=\" * 60)\n", " \n", " try:\n", " df = query.read_device_message(\n", " device_id=device_id,\n", " message=message,\n", " limit=100 # Limit for quick preview\n", " )\n", " \n", " print(f\"✓ Loaded {len(df)} records\")\n", " print(f\"\\nData shape: {df.shape}\")\n", " print(f\"\\nColumns: {list(df.columns)}\")\n", " print(f\"\\nFirst few rows:\")\n", " display(df.head(10))\n", " \n", " print(f\"\\nData types:\")\n", " print(df.dtypes)\n", " \n", " print(f\"\\nBasic statistics:\")\n", " display(df.describe())\n", " \n", " except Exception as e:\n", " print(f\"✗ Error reading data: {e}\")\n", " import traceback\n", " traceback.print_exc()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query ALL Data (No Limits)\n", "\n", "To see all your data, remove the `limit` parameter or set it to `None`. \n", "**Note:** This may take longer and use more memory for large datasets." 
] },
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Query ALL data (no limit) - use with caution for large datasets.\n",
  "# The unbounded query is opt-in: set RUN_FULL_QUERY to True to execute it.\n",
  "RUN_FULL_QUERY = False\n",
  "\n",
  "# Guard on `devices` / `messages`, the names the discovery cells actually\n",
  "# define. Guarding on names that no cell defines would make this cell a\n",
  "# silent no-op on a fresh kernel.\n",
  "if devices and messages:\n",
  "    device_id = devices[0]\n",
  "    message = messages[0]\n",
  "\n",
  "    if not RUN_FULL_QUERY:\n",
  "        print(\"(Set RUN_FULL_QUERY = True to query all data)\")\n",
  "    else:\n",
  "        print(f\"Querying ALL data from {device_id}/{message}...\")\n",
  "        print(\"=\" * 60)\n",
  "        print(\"⚠️ This may take a while for large datasets!\")\n",
  "        print()\n",
  "\n",
  "        try:\n",
  "            df_all = query.read_device_message(\n",
  "                device_id=device_id,\n",
  "                message=message,\n",
  "                limit=None  # No limit - gets ALL data\n",
  "            )\n",
  "\n",
  "            print(f\"✓ Loaded ALL {len(df_all)} records\")\n",
  "            print(f\"\\nData shape: {df_all.shape}\")\n",
  "            display(df_all.head(20))\n",
  "\n",
  "        except Exception as e:\n",
  "            print(f\"✗ Error reading all data: {e}\")\n",
  "            import traceback\n",
  "            traceback.print_exc()"
 ]
},
{
 "cell_type": "code",
 "execution_count": 11,
 "metadata": {},
 "outputs": [
  {
   "name": "stderr",
   "output_type": "stream",
   "text": [
    "2026-01-25 17:32:55,234 - datalake.athena - INFO - Query started with execution ID: ee01954e-ad1e-4044-aba1-d5b9695cbaef\n",
    "2026-01-25 17:32:56,362 - datalake.athena - INFO - Query ee01954e-ad1e-4044-aba1-d5b9695cbaef completed successfully\n",
    "2026-01-25 17:32:56,510 - datalake.athena - INFO - Retrieved 78 rows from query ee01954e-ad1e-4044-aba1-d5b9695cbaef\n",
    "2026-01-25 17:32:56,511 - datalake.catalog - INFO - Found 78 tables in database\n",
    "2026-01-25 17:32:56,630 - datalake.athena - INFO - Query started with execution ID: 6b5ca88b-bfaa-4cb2-9aa2-95e7e8d0facc\n",
    "2026-01-25 17:32:57,977 - datalake.athena - INFO - Query 6b5ca88b-bfaa-4cb2-9aa2-95e7e8d0facc completed successfully\n",
    "2026-01-25 17:32:58,133 - datalake.athena - WARNING - No results returned
for execution 6b5ca88b-bfaa-4cb2-9aa2-95e7e8d0facc\n",
    "2026-01-25 17:32:58,134 - datalake.catalog - WARNING - No partitions found for tbl_97a4aaf4_can1_obd2_s_m41_s01pid_m00\n"
   ]
  },
  {
   "name": "stdout",
   "output_type": "stream",
   "text": [
    "No partitions available for date filtering\n"
   ]
  }
 ],
 "source": [
  "# Date-bounded query; only attempted when the table reports partitions\n",
  "if devices and messages:\n",
  "    device_id, message = devices[0], messages[0]\n",
  "\n",
  "    try:\n",
  "        partitions = catalog.list_partitions(device_id, message)\n",
  "        if not partitions:\n",
  "            print(\"No partitions available for date filtering\")\n",
  "        else:\n",
  "            # First and last entries bound the range; with a single partition\n",
  "            # both indices resolve to the same element.\n",
  "            start_date, end_date = partitions[0], partitions[-1]\n",
  "\n",
  "            print(f\"Querying data from {start_date} to {end_date}...\")\n",
  "\n",
  "            df_date = query.read_date_range(\n",
  "                device_id=device_id,\n",
  "                message=message,\n",
  "                start_date=start_date,\n",
  "                end_date=end_date,\n",
  "                limit=1000\n",
  "            )\n",
  "\n",
  "            print(f\"✓ Loaded {len(df_date)} records\")\n",
  "            display(df_date.head())\n",
  "    except Exception as e:\n",
  "        print(f\"Error querying date range: {e}\")"
 ]
},
{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "## Time Series Analysis\n",
  "\n",
  "Analyze signals over time."
] },
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Get available signal columns\n",
  "#\n",
  "# Columns that carry time/partition metadata rather than a measured signal.\n",
  "# `date_created` is reported as varchar by the schema cell, so it must be\n",
  "# excluded or the numeric statistics and plots below could be run on it.\n",
  "NON_SIGNAL_COLUMNS = {'t', 'date', 'timestamp', 'date_created'}\n",
  "\n",
  "if devices and messages:\n",
  "    device_id = devices[0]\n",
  "    message = messages[0]\n",
  "\n",
  "    schema = catalog.get_schema(device_id, message)\n",
  "    if schema:\n",
  "        signal_cols = [\n",
  "            col for col in schema.keys()\n",
  "            if col not in NON_SIGNAL_COLUMNS\n",
  "        ]\n",
  "\n",
  "        print(f\"Available signal columns ({len(signal_cols)}):\")\n",
  "        for col in signal_cols[:10]:\n",
  "            print(f\" - {col}\")\n",
  "        if len(signal_cols) > 10:\n",
  "            print(f\" ... and {len(signal_cols) - 10} more\")"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Query time series for a specific signal\n",
  "if devices and messages and 'signal_cols' in locals() and signal_cols:\n",
  "    device_id = devices[0]\n",
  "    message = messages[0]\n",
  "    signal_name = signal_cols[0]  # Use first signal\n",
  "\n",
  "    print(f\"Querying time series for {signal_name}...\")\n",
  "    print(\"=\" * 60)\n",
  "\n",
  "    try:\n",
  "        df_ts = query.time_series_query(\n",
  "            device_id=device_id,\n",
  "            message=message,\n",
  "            signal_name=signal_name,\n",
  "            limit=10000  # Adjust based on your needs\n",
  "        )\n",
  "\n",
  "        if df_ts.empty:\n",
  "            print(\"No data returned\")\n",
  "        elif 't' not in df_ts.columns:\n",
  "            # Without `t` there is no time axis; say so explicitly instead of\n",
  "            # failing below with a KeyError on 'timestamp'.\n",
  "            print(\"No 't' column in result; cannot build a time axis\")\n",
  "        else:\n",
  "            # `t` may arrive as epoch numbers or as formatted timestamp strings\n",
  "            # (the sample-data cell shows dtype `object`). `unit=` is only valid\n",
  "            # for numeric input, so it is applied only in that case.\n",
  "            t_raw = df_ts['t']\n",
  "            if pd.api.types.is_numeric_dtype(t_raw):\n",
  "                timestamps = pd.to_datetime(t_raw, unit='us')\n",
  "            else:\n",
  "                timestamps = pd.to_datetime(t_raw)\n",
  "            # assign() returns a new frame, so re-running the cell is idempotent.\n",
  "            df_ts = df_ts.assign(timestamp=timestamps)\n",
  "\n",
  "            print(f\"✓ Loaded {len(df_ts)} records\")\n",
  "            print(f\"\\nTime range: {df_ts['timestamp'].min()} to {df_ts['timestamp'].max()}\")\n",
  "\n",
  "            # Display sample\n",
  "            display(df_ts[['timestamp', signal_name]].head(10))\n",
  "\n",
  "            # Statistics\n",
  "            print(f\"\\nStatistics for {signal_name}:\")\n",
  "            print(f\" Mean: {df_ts[signal_name].mean():.2f}\")\n",
  "            print(f\" Min: {df_ts[signal_name].min():.2f}\")\n",
  "            print(f\" Max: {df_ts[signal_name].max():.2f}\")\n",
  "            print(f\" Std: {df_ts[signal_name].std():.2f}\")\n",
  "\n",
  "    except Exception as e:\n",
  "        print(f\"✗ Error querying time series: {e}\")\n",
  "        import traceback\n",
  "        traceback.print_exc()"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Plot time series (if data available)\n",
  "# The 'timestamp' check also skips plotting when the query cell could not\n",
  "# build a time axis.\n",
  "if 'df_ts' in locals() and not df_ts.empty and 'timestamp' in df_ts.columns:\n",
  "    try:\n",
  "        plt.figure(figsize=(14, 6))\n",
  "        plt.plot(df_ts['timestamp'], df_ts[signal_name], linewidth=0.5, alpha=0.7)\n",
  "        plt.title(f\"Time Series: {signal_name}\", fontsize=14, fontweight='bold')\n",
  "        plt.xlabel('Time', fontsize=12)\n",
  "        plt.ylabel(signal_name, fontsize=12)\n",
  "        plt.grid(True, alpha=0.3)\n",
  "        plt.xticks(rotation=45)\n",
  "        plt.tight_layout()\n",
  "        plt.show()\n",
  "\n",
  "        # Histogram\n",
  "        plt.figure(figsize=(10, 6))\n",
  "        plt.hist(df_ts[signal_name], bins=50, edgecolor='black', alpha=0.7)\n",
  "        plt.title(f\"Distribution: {signal_name}\", fontsize=14, fontweight='bold')\n",
  "        plt.xlabel(signal_name, fontsize=12)\n",
  "        plt.ylabel('Frequency', fontsize=12)\n",
  "        plt.grid(True, alpha=0.3)\n",
  "        plt.tight_layout()\n",
  "        plt.show()\n",
  "\n",
  "    except Exception as e:\n",
  "        print(f\"Error plotting: {e}\")"
 ]
},
{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "## Custom SQL Queries\n",
  "\n",
  "Execute custom SQL queries for advanced analysis."
] },
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Example: row count and time span for the first device/message table\n",
  "if devices and messages:\n",
  "    device_id, message = devices[0], messages[0]\n",
  "    table_name = catalog.get_table_name(device_id, message)\n",
  "\n",
  "    # The table is addressed as <database>.<table>\n",
  "    custom_sql = f\"\"\"\n",
  "    SELECT \n",
  "        COUNT(*) as record_count,\n",
  "        MIN(t) as min_timestamp,\n",
  "        MAX(t) as max_timestamp\n",
  "    FROM {config.database_name}.{table_name}\n",
  "    \"\"\"\n",
  "\n",
  "    try:\n",
  "        df_stats = query.execute_sql(custom_sql)\n",
  "        print(f\"Statistics for {device_id}/{message}:\")\n",
  "        display(df_stats)\n",
  "    except Exception as e:\n",
  "        print(f\"Error executing custom SQL: {e}\")\n",
  "        import traceback\n",
  "        traceback.print_exc()"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Example: summary aggregates for the first signal column\n",
  "if devices and messages and 'signal_cols' in locals() and signal_cols:\n",
  "    device_id, message = devices[0], messages[0]\n",
  "    signal_name = signal_cols[0]\n",
  "\n",
  "    try:\n",
  "        # Select-list of aggregates, each aliased with the signal name\n",
  "        aggregation_sql = f\"\"\"\n",
  "        COUNT(*) as count,\n",
  "        AVG({signal_name}) as avg_{signal_name},\n",
  "        MIN({signal_name}) as min_{signal_name},\n",
  "        MAX({signal_name}) as max_{signal_name},\n",
  "        STDDEV({signal_name}) as std_{signal_name}\n",
  "        \"\"\"\n",
  "        df_agg = query.aggregate(\n",
  "            device_id=device_id,\n",
  "            message=message,\n",
  "            aggregation=aggregation_sql,\n",
  "        )\n",
  "\n",
  "        print(f\"Aggregation for {signal_name}:\")\n",
  "        display(df_agg)\n",
  "\n",
  "    except Exception as e:\n",
  "        print(f\"Error in aggregation: {e}\")\n",
  "        import traceback\n",
  "        traceback.print_exc()"
 ]
},
{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "## Summary\n",
  "\n",
  "You've successfully:\n",
  "1. ✓ Connected to Athena\n",
  "2. ✓ Explored the data lake structure\n",
  "3. ✓ Queried sample data\n",
  "4. ✓ Analyzed time series\n",
  "5.
✓ Executed custom SQL queries\n", "\n", "### Next Steps\n", "\n", "- Modify the queries to explore your specific data\n", "- Add more visualizations\n", "- Perform statistical analysis\n", "- Export data for further analysis\n", "\n", "### Useful Commands\n", "\n", "```python\n", "# List all devices\n", "devices = catalog.list_devices()\n", "\n", "# List messages for a device\n", "messages = catalog.list_messages('device_id')\n", "\n", "# Get schema\n", "schema = catalog.get_schema('device_id', 'message_name')\n", "\n", "# Query data\n", "df = query.read_device_message('device_id', 'message_name', limit=1000)\n", "\n", "# Custom SQL\n", "df = query.execute_sql('SELECT * FROM database.table LIMIT 100')\n", "```" ] } ], "metadata": { "kernelspec": { "display_name": "venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.18" } }, "nbformat": 4, "nbformat_minor": 2 }