# CSS_EDA_Dashboard/src/examples/query_example.py
"""
Example: Query and analyze data from the Athena data lake.
This script demonstrates how to read data for specific devices/messages,
perform time series queries, and filter by date ranges using SQL.
"""
import sys
from pathlib import Path

# Make the project root importable when this file is run directly
# (three levels up: src/examples/query_example.py -> project root).
_project_root = Path(__file__).resolve().parent.parent.parent
if str(_project_root) not in sys.path:
    sys.path.insert(0, str(_project_root))

import pandas as pd

from src.datalake.athena import AthenaQuery
from src.datalake.catalog import DataLakeCatalog
from src.datalake.config import DataLakeConfig
from src.datalake.query import DataLakeQuery
def main():
    """Query and analyze data from the Athena data lake.

    Runs five examples against the first discovered device/message pair:
    a basic read, a date-range read, a time-series query with simple
    statistics, a custom SQL query, and an aggregation query.

    Returns:
        None. Results and any errors are printed to stdout.
    """
    import os

    # --- Setup ------------------------------------------------------
    # SECURITY: credentials are read from the environment, never
    # hard-coded. (The keys previously embedded here were exposed in
    # version control and must be considered compromised — rotate them.)
    access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if not access_key_id or not secret_access_key:
        print("Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the environment")
        return

    config = DataLakeConfig.from_credentials(
        database_name="dbparquetdatalake05",
        workgroup="athenaworkgroup-datalake05",
        s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
        region="eu-north-1",
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )
    athena = AthenaQuery(config)
    catalog = DataLakeCatalog(athena, config)
    query = DataLakeQuery(athena, catalog)

    # --- Discover first available device and message ----------------
    try:
        devices = catalog.list_devices()
        if not devices:
            print("No devices found in data lake")
            return
        device_id = devices[0]
        messages = catalog.list_messages(device_id)
        if not messages:
            print(f"No messages found for device {device_id}")
            return
        message = messages[0]
    except Exception as e:
        print(f"Error discovering devices/messages: {e}")
        return

    print("=" * 60)
    print("Querying Data Lake (Athena)")
    print("=" * 60)
    print(f"Device: {device_id}")
    print(f"Message: {message}")
    print()

    # --- Example 1: Read all data -----------------------------------
    print("Example 1: Read all data")
    print("-" * 60)
    try:
        df = query.read_device_message(
            device_id=device_id,
            message=message,
            columns=["t"],  # Only read timestamp initially to check structure
            limit=100,  # Limit for example
        )
        print(f"Loaded {len(df)} records")
        if not df.empty:
            print(f"Columns: {list(df.columns)}")
            if 't' in df.columns:
                # 't' appears to hold epoch microseconds (see Example 3's
                # pd.to_datetime(..., unit='us') conversion).
                print(f"Time range: {df['t'].min()} to {df['t'].max()} microseconds")
            print(f"Sample data:")
            print(df.head())
    except Exception as e:
        print(f"Error reading data: {e}")
    print()

    # --- Example 2: Read with date range ----------------------------
    print("Example 2: Read with date range")
    print("-" * 60)
    try:
        partitions = catalog.list_partitions(device_id, message)
        if partitions:
            # Partitions are assumed to be sorted date strings; use the
            # first and last as the range bounds.
            start_date = partitions[0]
            end_date = partitions[-1] if len(partitions) > 1 else partitions[0]
            print(f"Date range: {start_date} to {end_date}")
            df_date = query.read_date_range(
                device_id=device_id,
                message=message,
                start_date=start_date,
                end_date=end_date,
                limit=100,
            )
            print(f"Loaded {len(df_date)} records for date range")
    except Exception as e:
        print(f"Error reading date range: {e}")
    print()

    # --- Example 3: Time series query (if signal columns exist) -----
    print("Example 3: Time series query")
    print("-" * 60)
    try:
        schema = catalog.get_schema(device_id, message)
        if schema:
            # Find the first signal column (anything but 't' / 'date').
            signal_cols = [c for c in schema.keys() if c != 't' and c.lower() != 'date']
            if signal_cols:
                signal_name = signal_cols[0]
                print(f"Querying signal: {signal_name}")
                df_ts = query.time_series_query(
                    device_id=device_id,
                    message=message,
                    signal_name=signal_name,
                    limit=100,
                )
                if not df_ts.empty:
                    print(f"Time series: {len(df_ts)} records")
                    # Convert the microsecond timestamp to datetime for display.
                    if 't' in df_ts.columns:
                        df_ts['timestamp'] = pd.to_datetime(df_ts['t'], unit='us')
                        print(df_ts[['timestamp', signal_name]].head())
                    # Basic statistics
                    print(f"\nStatistics for {signal_name}:")
                    print(f"  Mean: {df_ts[signal_name].mean():.2f}")
                    print(f"  Min: {df_ts[signal_name].min():.2f}")
                    print(f"  Max: {df_ts[signal_name].max():.2f}")
    except Exception as e:
        print(f"Error in time series query: {e}")
    print()

    # --- Example 4: Custom SQL query --------------------------------
    print("Example 4: Custom SQL query")
    print("-" * 60)
    try:
        table_name = catalog.get_table_name(device_id, message)
        custom_sql = f"""
        SELECT COUNT(*) as record_count,
               MIN(t) as min_time,
               MAX(t) as max_time
        FROM {config.database_name}.{table_name}
        LIMIT 1
        """
        df_custom = query.execute_sql(custom_sql)
        print("Custom query results:")
        print(df_custom)
    except Exception as e:
        print(f"Error in custom SQL query: {e}")
    print()

    # --- Example 5: Aggregation query -------------------------------
    print("Example 5: Aggregation query")
    print("-" * 60)
    try:
        partitions = catalog.list_partitions(device_id, message)
        if partitions:
            # Filter by date using path-based extraction.
            # Data structure: {device_id}/{message}/{year}/{month}/{day}/file.parquet
            target_date = partitions[0]
            date_parts = target_date.split('-')
            if len(date_parts) == 3:
                # int() normalizes zero-padded parts ("05" -> 5) so the
                # literals match the INTEGER-cast path components.
                year, month, day = (int(p) for p in date_parts)
                # Use path-based filtering consistent with data architecture.
                path_year = "try_cast(element_at(split(\"$path\", '/'), -4) AS INTEGER)"
                path_month = "try_cast(element_at(split(\"$path\", '/'), -3) AS INTEGER)"
                path_day = "try_cast(element_at(split(\"$path\", '/'), -2) AS INTEGER)"
                where_clause = f"{path_year} = {year} AND {path_month} = {month} AND {path_day} = {day}"
            else:
                where_clause = None
            df_agg = query.aggregate(
                device_id=device_id,
                message=message,
                aggregation="COUNT(*) as count, AVG(t) as avg_time",
                where_clause=where_clause,
            )
            print("Aggregation results:")
            print(df_agg)
    except Exception as e:
        print(f"Error in aggregation query: {e}")
# Run the examples only when executed as a script, not on import.
if __name__ == "__main__":
    main()