"""
Example: Query and analyze data from the Athena data lake.
This script demonstrates how to read data for specific devices/messages,
perform time series queries, and filter by date ranges using SQL.
"""
import sys
from pathlib import Path
# Put the directory three levels above this file at the front of sys.path
# (unless it is already listed) so the absolute `src.datalake` imports that
# follow can be resolved when this file is run directly.
_project_root = Path(__file__).resolve().parent.parent.parent
if str(_project_root) not in sys.path:
    sys.path.insert(0, str(_project_root))
from src.datalake.config import DataLakeConfig
from src.datalake.athena import AthenaQuery
from src.datalake.catalog import DataLakeCatalog
from src.datalake.query import DataLakeQuery
import pandas as pd
def _load_config():
    """Build the Athena configuration, taking AWS credentials from the environment.

    The access key pair is read from ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` so that no secret is stored in source control.

    NOTE: any key pair that was ever committed in this file must be treated
    as compromised and revoked.

    Returns:
        The object returned by ``DataLakeConfig.from_credentials``, or ``None``
        when either environment variable is missing or empty.
    """
    import os  # local import: keeps this block independent of the file header

    access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if not access_key_id or not secret_access_key:
        return None

    return DataLakeConfig.from_credentials(
        database_name="dbparquetdatalake05",
        workgroup="athenaworkgroup-datalake05",
        s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
        region="eu-north-1",
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )


def _first_device_and_message(catalog):
    """Return the first ``(device_id, message)`` pair listed by the catalog.

    Prints the reason and returns ``None`` when the catalog lists no devices,
    when the first device has no messages, or when a catalog call raises.
    """
    try:
        devices = catalog.list_devices()
        if not devices:
            print("No devices found in data lake")
            return None
        device_id = devices[0]
        messages = catalog.list_messages(device_id)
        if not messages:
            print(f"No messages found for device {device_id}")
            return None
        return device_id, messages[0]
    except Exception as e:
        print(f"Error discovering devices/messages: {e}")
        return None


def _path_date_filter(target_date):
    """Build a WHERE clause that selects one day via the data file path.

    Data structure: {device_id}/{message}/{year}/{month}/{day}/file.parquet,
    so year, month and day are the 4th-, 3rd- and 2nd-to-last segments of
    the ``"$path"`` column.

    Args:
        target_date: Partition value, expected to look like ``YYYY-MM-DD``.

    Returns:
        The SQL predicate string, or ``None`` when ``target_date`` does not
        split into exactly three ``-``-separated parts.

    Raises:
        ValueError: If one of the three parts is not an integer.
    """
    date_parts = target_date.split("-")
    if len(date_parts) != 3:
        return None
    # int() validates the parts and keeps arbitrary text out of the SQL string.
    year, month, day = (int(part) for part in date_parts)
    path_year = "try_cast(element_at(split(\"$path\", '/'), -4) AS INTEGER)"
    path_month = "try_cast(element_at(split(\"$path\", '/'), -3) AS INTEGER)"
    path_day = "try_cast(element_at(split(\"$path\", '/'), -2) AS INTEGER)"
    return f"{path_year} = {year} AND {path_month} = {month} AND {path_day} = {day}"


def _example_read_all(query, device_id, message):
    """Example 1: read up to 100 rows of the ``t`` column and summarize them."""
    print("Example 1: Read all data")
    print("-" * 60)
    try:
        df = query.read_device_message(
            device_id=device_id,
            message=message,
            columns=["t"],  # Only read timestamp initially to check structure
            limit=100,  # Limit for example
        )
        print(f"Loaded {len(df)} records")
        if not df.empty:
            print(f"Columns: {list(df.columns)}")
            if "t" in df.columns:
                print(f"Time range: {df['t'].min()} to {df['t'].max()} microseconds")
            print("Sample data:")
            print(df.head())
    except Exception as e:
        print(f"Error reading data: {e}")


def _example_date_range(catalog, query, device_id, message):
    """Example 2: read up to 100 rows between the first and last listed partition."""
    print("Example 2: Read with date range")
    print("-" * 60)
    try:
        partitions = catalog.list_partitions(device_id, message)
        if partitions:
            start_date = partitions[0]
            end_date = partitions[-1]  # same element as start_date for a single partition
            print(f"Date range: {start_date} to {end_date}")
            df_date = query.read_date_range(
                device_id=device_id,
                message=message,
                start_date=start_date,
                end_date=end_date,
                limit=100,
            )
            print(f"Loaded {len(df_date)} records for date range")
    except Exception as e:
        print(f"Error reading date range: {e}")


def _example_time_series(catalog, query, device_id, message):
    """Example 3: query the first signal column and print basic statistics."""
    print("Example 3: Time series query")
    print("-" * 60)
    try:
        schema = catalog.get_schema(device_id, message)
        if not schema:
            return
        # Find first signal column (not 't' and not the date column)
        signal_cols = [c for c in schema.keys() if c != "t" and c.lower() != "date"]
        if not signal_cols:
            return
        signal_name = signal_cols[0]
        print(f"Querying signal: {signal_name}")
        df_ts = query.time_series_query(
            device_id=device_id,
            message=message,
            signal_name=signal_name,
            limit=100,
        )
        if df_ts.empty:
            return
        print(f"Time series: {len(df_ts)} records")
        # Convert timestamp to datetime for display
        if "t" in df_ts.columns:
            df_ts["timestamp"] = pd.to_datetime(df_ts["t"], unit="us")
            print(df_ts[["timestamp", signal_name]].head())
        # Basic statistics
        print(f"\nStatistics for {signal_name}:")
        print(f" Mean: {df_ts[signal_name].mean():.2f}")
        print(f" Min: {df_ts[signal_name].min():.2f}")
        print(f" Max: {df_ts[signal_name].max():.2f}")
    except Exception as e:
        print(f"Error in time series query: {e}")


def _example_custom_sql(config, catalog, query, device_id, message):
    """Example 4: run a hand-written SQL summary over the device/message table."""
    print("Example 4: Custom SQL query")
    print("-" * 60)
    try:
        table_name = catalog.get_table_name(device_id, message)
        custom_sql = f"""
        SELECT COUNT(*) as record_count,
        MIN(t) as min_time,
        MAX(t) as max_time
        FROM {config.database_name}.{table_name}
        LIMIT 1
        """
        df_custom = query.execute_sql(custom_sql)
        print("Custom query results:")
        print(df_custom)
    except Exception as e:
        print(f"Error in custom SQL query: {e}")


def _example_aggregation(catalog, query, device_id, message):
    """Example 5: aggregate the rows of the first listed partition date."""
    print("Example 5: Aggregation query")
    print("-" * 60)
    try:
        partitions = catalog.list_partitions(device_id, message)
        if partitions:
            # Filter by date using path-based extraction; None means no filter.
            where_clause = _path_date_filter(partitions[0])
            df_agg = query.aggregate(
                device_id=device_id,
                message=message,
                aggregation="COUNT(*) as count, AVG(t) as avg_time",
                where_clause=where_clause,
            )
            print("Aggregation results:")
            print(df_agg)
    except Exception as e:
        print(f"Error in aggregation query: {e}")


def main():
    """Query and analyze data.

    Builds the Athena client objects, picks the first device and message the
    catalog lists, and runs five example queries against them. Each example
    prints its own results and reports its own errors, so a failure in one
    does not stop the ones after it.

    AWS credentials are taken from the ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` environment variables. If either is missing, a
    message is printed and the function returns without contacting AWS.
    """
    # Setup
    config = _load_config()
    if config is None:
        print("Missing AWS credentials: set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY")
        return

    athena = AthenaQuery(config)
    catalog = DataLakeCatalog(athena, config)
    query = DataLakeQuery(athena, catalog)

    # Get first available device and message
    target = _first_device_and_message(catalog)
    if target is None:
        return
    device_id, message = target

    print("=" * 60)
    print("Querying Data Lake (Athena)")
    print("=" * 60)
    print(f"Device: {device_id}")
    print(f"Message: {message}")
    print()

    _example_read_all(query, device_id, message)
    print()
    _example_date_range(catalog, query, device_id, message)
    print()
    _example_time_series(catalog, query, device_id, message)
    print()
    _example_custom_sql(config, catalog, query, device_id, message)
    print()
    _example_aggregation(catalog, query, device_id, message)
# Run the examples only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|