# src/examples/batch_example.py
"""
Example: Batch processing patterns for large-scale analysis using Athena.
This script demonstrates memory-efficient batch processing across
the entire data lake using SQL queries.
"""
import sys
from pathlib import Path
_project_root = Path(__file__).resolve().parent.parent.parent
if str(_project_root) not in sys.path:
    sys.path.insert(0, str(_project_root))
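# The sys.path tweak above lets the example import the src package when the
# script is run directly, without installing the project as a package.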
from src.datalake.config import DataLakeConfig
from src.datalake.athena import AthenaQuery
from src.datalake.catalog import DataLakeCatalog
from src.datalake.query import DataLakeQuery
from src.datalake.batch import BatchProcessor
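# Roles of the data lake helpers, as they are used in this example:
#   DataLakeConfig  - Athena/S3 connection settings
#   AthenaQuery     - executes queries against Athena
#   DataLakeCatalog - lists devices, messages, schemas and table names
#   DataLakeQuery   - runs SQL and returns result DataFrames
#   BatchProcessor  - batch aggregation and CSV export helpers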


def main():
    """Batch process the data lake."""
    # Setup
    # Load config with explicit credentials
    config = DataLakeConfig.from_credentials(
        database_name="dbparquetdatalake05",
        workgroup="athenaworkgroup-datalake05",
        s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
        region="eu-north-1",
        # NOTE: placeholder values; supply your own AWS credentials here
        # rather than committing real keys to source control.
        access_key_id="YOUR_AWS_ACCESS_KEY_ID",
        secret_access_key="YOUR_AWS_SECRET_ACCESS_KEY",
    )
    athena = AthenaQuery(config)
    catalog = DataLakeCatalog(athena, config)
    query = DataLakeQuery(athena, catalog)
    processor = BatchProcessor(query)
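    # Each example below is wrapped in its own try/except so that a failure in
    # one example does not stop the remaining examples from running.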
print("=" * 60)
print("Batch Processing Examples (Athena)")
print("=" * 60)
print()
# Example 1: Compute statistics across all data
print("Example 1: Compute statistics across all devices/messages")
print("-" * 60)
    try:
        stats = processor.aggregate_by_device_message(
            aggregation_func=processor.compute_statistics,
            message_filter=config.message_filter,  # Optional filter
        )
        print(f"Processed {len(stats)} device(s):")
        for device, messages in stats.items():
            print(f"\n Device: {device}")
            for message, metrics in messages.items():
                print(f" Message: {message}")
                print(f" Record count: {metrics.get('count', 0):,}")
                # Show statistics for first numeric column found
                for key, value in metrics.items():
                    if key != 'count' and '_mean' in key:
                        signal = key.replace('_mean', '')
                        print(f" {signal}:")
                        print(f" Mean: {value:.2f}")
                        print(f" Min: {metrics.get(f'{signal}_min', 'N/A')}")
                        print(f" Max: {metrics.get(f'{signal}_max', 'N/A')}")
                        break
    except Exception as e:
        print(f"Error in batch aggregation: {e}")
    print()
    # Example 2: Custom aggregation using SQL
    print("Example 2: Custom SQL aggregation")
    print("-" * 60)
    try:
        devices = catalog.list_devices()
        if devices:
            device_id = devices[0]
            messages = catalog.list_messages(device_id)
            if messages:
                message = messages[0]
                table_name = catalog.get_table_name(device_id, message)
                # Use SQL for aggregation
                sql = f"""
                    SELECT
                        COUNT(*) as record_count,
                        MIN(t) as min_timestamp,
                        MAX(t) as max_timestamp
                    FROM {config.database_name}.{table_name}
                """
                df_agg = query.execute_sql(sql)
                print(f"Aggregation for {device_id}/{message}:")
                print(df_agg)
    except Exception as e:
        print(f"Error in SQL aggregation: {e}")
    print()
    # Example 3: Export specific data
    print("Example 3: Export data to CSV")
    print("-" * 60)
    try:
        devices = catalog.list_devices()
        if devices:
            device_id = devices[0]
            messages = catalog.list_messages(device_id)
            if messages:
                message = messages[0]
                output_path = f"{device_id}_{message}_export.csv"
                processor.export_to_csv(
                    device_id=device_id,
                    message=message,
                    output_path=output_path,
                    limit=10000,  # Limit for example
                )
                print(f"Exported to: {output_path}")
    except Exception as e:
        print(f"Error exporting data: {e}")
    print()
    # Example 4: Find anomalies using SQL
    print("Example 4: Find anomalies using SQL")
    print("-" * 60)
    try:
        devices = catalog.list_devices()
        if devices:
            device_id = devices[0]
            messages = catalog.list_messages(device_id)
            if messages:
                message = messages[0]
                schema = catalog.get_schema(device_id, message)
                if schema:
                    signal_cols = [c for c in schema.keys() if c != 't' and c.lower() != 'date']
                    if signal_cols:
                        signal_name = signal_cols[0]
                        table_name = catalog.get_table_name(device_id, message)
                        # Use SQL to find outliers (3 standard deviations)
                        sql = f"""
                            WITH stats AS (
                                SELECT
                                    AVG({signal_name}) as mean_val,
                                    STDDEV({signal_name}) as std_val
                                FROM {config.database_name}.{table_name}
                                WHERE {signal_name} IS NOT NULL
                            )
                            SELECT t, {signal_name}
                            FROM {config.database_name}.{table_name}, stats
                            WHERE {signal_name} IS NOT NULL
                                AND ABS({signal_name} - mean_val) > 3 * std_val
                            ORDER BY ABS({signal_name} - mean_val) DESC
                            LIMIT 10
                        """
                        anomalies = query.execute_sql(sql)
                        if not anomalies.empty:
                            print(f"Found {len(anomalies)} anomalies in {signal_name}")
                            print(anomalies.head())
                        else:
                            print("No anomalies found")
    except Exception as e:
        print(f"Error finding anomalies: {e}")


if __name__ == "__main__":
    main()