"""
Example: Batch processing patterns for large-scale analysis using Athena.
This script demonstrates memory-efficient batch processing across
the entire data lake using SQL queries.
"""
import os
import sys
from pathlib import Path

_project_root = Path(__file__).resolve().parent.parent.parent
if str(_project_root) not in sys.path:
    sys.path.insert(0, str(_project_root))

import pandas as pd

from src.datalake.athena import AthenaQuery
from src.datalake.batch import BatchProcessor
from src.datalake.catalog import DataLakeCatalog
from src.datalake.config import DataLakeConfig
from src.datalake.query import DataLakeQuery
def _first_device_message(catalog):
    """Return the first (device_id, message) pair in the catalog, or None.

    Centralizes the discovery logic that Examples 2-4 all share: pick the
    first listed device, then its first listed message. Returns None when
    the catalog has no devices or the device has no messages.
    """
    devices = catalog.list_devices()
    if not devices:
        return None
    device_id = devices[0]
    messages = catalog.list_messages(device_id)
    if not messages:
        return None
    return device_id, messages[0]


def _example_statistics(processor, config):
    """Example 1: aggregate record counts and signal statistics per device/message."""
    print("Example 1: Compute statistics across all devices/messages")
    print("-" * 60)
    try:
        stats = processor.aggregate_by_device_message(
            aggregation_func=processor.compute_statistics,
            message_filter=config.message_filter,  # Optional filter
        )
        print(f"Processed {len(stats)} device(s):")
        for device, messages in stats.items():
            print(f"\n Device: {device}")
            for message, metrics in messages.items():
                print(f" Message: {message}")
                print(f" Record count: {metrics.get('count', 0):,}")
                # Show statistics for first numeric column found
                for key, value in metrics.items():
                    if key != 'count' and '_mean' in key:
                        signal = key.replace('_mean', '')
                        print(f" {signal}:")
                        print(f" Mean: {value:.2f}")
                        print(f" Min: {metrics.get(f'{signal}_min', 'N/A')}")
                        print(f" Max: {metrics.get(f'{signal}_max', 'N/A')}")
                        break
    except Exception as e:
        # Best-effort example: report and continue with the next example.
        print(f"Error in batch aggregation: {e}")


def _example_sql_aggregation(catalog, query, config):
    """Example 2: run a custom SQL aggregation over one device/message table."""
    print("Example 2: Custom SQL aggregation")
    print("-" * 60)
    try:
        pair = _first_device_message(catalog)
        if pair is not None:
            device_id, message = pair
            table_name = catalog.get_table_name(device_id, message)
            # Use SQL for aggregation
            sql = f"""
            SELECT
                COUNT(*) as record_count,
                MIN(t) as min_timestamp,
                MAX(t) as max_timestamp
            FROM {config.database_name}.{table_name}
            """
            df_agg = query.execute_sql(sql)
            print(f"Aggregation for {device_id}/{message}:")
            print(df_agg)
    except Exception as e:
        print(f"Error in SQL aggregation: {e}")


def _example_export(catalog, processor):
    """Example 3: export one device/message dataset to a local CSV file."""
    print("Example 3: Export data to CSV")
    print("-" * 60)
    try:
        pair = _first_device_message(catalog)
        if pair is not None:
            device_id, message = pair
            output_path = f"{device_id}_{message}_export.csv"
            processor.export_to_csv(
                device_id=device_id,
                message=message,
                output_path=output_path,
                limit=10000,  # Limit for example
            )
            print(f"Exported to: {output_path}")
    except Exception as e:
        print(f"Error exporting data: {e}")


def _example_anomalies(catalog, query, config):
    """Example 4: flag 3-sigma outliers in the first signal column via SQL."""
    print("Example 4: Find anomalies using SQL")
    print("-" * 60)
    try:
        pair = _first_device_message(catalog)
        if pair is not None:
            device_id, message = pair
            schema = catalog.get_schema(device_id, message)
            if schema:
                # Skip the timestamp ('t') and partition ('date') columns;
                # everything else is treated as a candidate signal.
                signal_cols = [c for c in schema.keys() if c != 't' and c.lower() != 'date']
                if signal_cols:
                    signal_name = signal_cols[0]
                    table_name = catalog.get_table_name(device_id, message)
                    # Use SQL to find outliers (3 standard deviations)
                    sql = f"""
                    WITH stats AS (
                        SELECT
                            AVG({signal_name}) as mean_val,
                            STDDEV({signal_name}) as std_val
                        FROM {config.database_name}.{table_name}
                        WHERE {signal_name} IS NOT NULL
                    )
                    SELECT t, {signal_name}
                    FROM {config.database_name}.{table_name}, stats
                    WHERE {signal_name} IS NOT NULL
                        AND ABS({signal_name} - mean_val) > 3 * std_val
                    ORDER BY ABS({signal_name} - mean_val) DESC
                    LIMIT 10
                    """
                    anomalies = query.execute_sql(sql)
                    if not anomalies.empty:
                        print(f"Found {len(anomalies)} anomalies in {signal_name}")
                        print(anomalies.head())
                    else:
                        print("No anomalies found")
    except Exception as e:
        print(f"Error finding anomalies: {e}")


def main():
    """Batch process the data lake: run four Athena-backed example workflows.

    Each example is isolated in its own try/except so a failure in one
    does not abort the others.

    Raises:
        KeyError: if AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY is unset.
    """
    # SECURITY: credentials were previously hard-coded here and committed
    # to source control — those keys must be rotated. They are now read
    # from the environment instead.
    config = DataLakeConfig.from_credentials(
        database_name="dbparquetdatalake05",
        workgroup="athenaworkgroup-datalake05",
        s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
        region="eu-north-1",
        access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    )
    athena = AthenaQuery(config)
    catalog = DataLakeCatalog(athena, config)
    query = DataLakeQuery(athena, catalog)
    processor = BatchProcessor(query)

    print("=" * 60)
    print("Batch Processing Examples (Athena)")
    print("=" * 60)
    print()

    _example_statistics(processor, config)
    print()
    _example_sql_aggregation(catalog, query, config)
    print()
    _example_export(catalog, processor)
    print()
    _example_anomalies(catalog, query, config)
# Entry point: run the examples only when executed as a script, not on import.
if __name__ == "__main__":
    main()