Spaces:
Sleeping
Sleeping
| """ | |
| Data lake catalog for discovering structure and metadata using AWS Athena/Glue. | |
| Provides methods to explore the data lake organization using Athena metadata: | |
| - List devices, messages, and dates from table structure | |
| - Get schemas for message/rule tables | |
| - Understand data availability | |
| """ | |
| from typing import List, Dict, Optional | |
| import re | |
| from .athena import AthenaQuery | |
| from .config import DataLakeConfig | |
| from .logger import setup_logger | |
| logger = setup_logger(__name__) | |
class DataLakeCatalog:
    """
    Catalog for exploring data lake structure using Athena/Glue.

    Assumes the Athena database contains one table per device/message pair.
    Table names are parsed by splitting on underscores; the layouts the
    methods below recognise are:
    - {prefix}_{device_id}_{message_rule}
    - {device_id}_{message_rule}
    """

    # Annotations are quoted so that defining the class never has to evaluate
    # the project types; the names are still the ones imported by this module.
    def __init__(self, athena_query: "AthenaQuery", config: "DataLakeConfig"):
        """
        Initialize catalog.

        Args:
            athena_query: AthenaQuery instance used for all metadata lookups
            config: DataLakeConfig instance (its ``database_name`` is used to
                qualify table names in queries)
        """
        self.athena = athena_query
        self.config = config
        # Schema cache keyed by "{device_id}/{message}" (see get_schema).
        self._cache: Dict[str, Dict] = {}
        logger.info(f"Initialized catalog for database: {config.database_name}")

    def list_tables(self) -> List[str]:
        """
        List all tables in the database.

        Returns:
            Sorted list of table names
        """
        tables = self.athena.list_tables()
        logger.info(f"Found {len(tables)} tables in database")
        return sorted(tables)

    def list_devices(self, device_filter: Optional[str] = None) -> List[str]:
        """
        List all device IDs by extracting them from table names.

        Args:
            device_filter: Optional regex pattern; only device IDs for which
                ``re.search`` finds a match are returned

        Returns:
            Sorted list of unique device IDs

        Note:
            The device ID is taken to be the SECOND underscore-separated
            component of the table name, i.e. the layout
            ``{prefix}_{device_id}_{message_rule}``. Table names without any
            underscore carry no device component and are skipped. Tables whose
            second component is ``aggregations`` are skipped as well.
            NOTE(review): for two-part names (``a_b``) this returns ``b``,
            whereas list_messages/get_table_name treat ``a`` as the device in
            that case -- confirm which convention the data lake really uses.
        """
        tables = self.list_tables()
        device_re = re.compile(device_filter) if device_filter is not None else None

        devices = set()
        for table in tables:
            parts = table.split('_', 2)
            if len(parts) < 2:
                # No underscore at all: indexing parts[1] would raise
                # IndexError, so there is nothing to extract from this table.
                continue
            device = parts[1]
            if device == 'aggregations':  # skip aggregations table
                continue
            if device_re is None or device_re.search(device):
                devices.add(device)

        result = sorted(devices)
        logger.info(f"Found {len(result)} device(s)")
        return result

    def list_messages(self, device_id: str, message_filter: Optional[str] = None) -> List[str]:
        """
        List all message/rule names for a device.

        Args:
            device_id: Device identifier
            message_filter: Optional regex pattern; only message names for
                which ``re.search`` finds a match are returned

        Returns:
            Sorted list of unique message/rule names

        Note:
            Table names are split on the first two underscores:
            - three components -> ``prefix_{device_id}_{message_rule}``
              (the message keeps any further underscores)
            - exactly two components -> ``{device_id}_{message_rule}``
            Names without an underscore are ignored.
        """
        tables = self.list_tables()
        message_re = re.compile(message_filter) if message_filter is not None else None

        messages = set()
        for table in tables:
            parts = table.split('_', 2)
            if len(parts) >= 3:
                # prefix_device_message
                table_device, message = parts[1], parts[2]
            elif len(parts) == 2:
                # device_message (no prefix)
                table_device, message = parts
            else:
                continue
            if table_device != device_id:
                continue
            if message_re is None or message_re.search(message):
                messages.add(message)

        result = sorted(messages)
        logger.info(f"Found {len(result)} messages for device {device_id}")
        return result

    def get_table_name(self, device_id: str, message: str) -> str:
        """
        Get table name for a device/message combination.

        The lookup tries, in this order of priority over the sorted table list:
        1. ``prefix_{device_id}_{message}``
        2. ``{device_id}_{message}`` (split on the first underscore only)
        3. exact names ``{device_id}_{message}`` / ``{device_id}__{message}``,
           as given and lower-cased

        Args:
            device_id: Device identifier
            message: Message/rule name

        Returns:
            Name of the first matching table

        Raises:
            ValueError: If no table matches
        """
        tables = self.list_tables()

        # Pattern 1: prefix_device_message. A full pass is made before
        # pattern 2 so that a prefixed match anywhere in the list wins.
        for table in tables:
            parts = table.split('_', 2)
            if len(parts) >= 3 and parts[1] == device_id and parts[2] == message:
                return table

        # Pattern 2: device_message (no prefix)
        for table in tables:
            parts = table.split('_', 1)
            if len(parts) >= 2 and parts[0] == device_id and parts[1] == message:
                return table

        # Fallback: exact matches. This also covers device IDs that contain
        # underscores themselves and the double-underscore separator.
        candidates = [
            f"{device_id}_{message}",
            f"{device_id}__{message}",
            f"{device_id}_{message}".lower(),
            f"{device_id}__{message}".lower(),
        ]
        for candidate in candidates:
            if candidate in tables:
                return candidate

        raise ValueError(
            f"Table not found for {device_id}/{message}. "
            f"Available tables: {tables[:10]}..."
        )

    def get_schema(self, device_id: str, message: str) -> Optional[Dict[str, str]]:
        """
        Get schema for a message table.

        Successful lookups are cached per device/message pair; failures and
        empty results are not cached, so they are retried on the next call.

        Args:
            device_id: Device identifier
            message: Message/rule name

        Returns:
            Dict mapping column names to data types, or None if the table is
            not found, has no columns, or the lookup fails for any reason
        """
        cache_key = f"{device_id}/{message}"
        if cache_key in self._cache:
            logger.debug(f"Using cached schema for {cache_key}")
            return self._cache[cache_key]

        # Best-effort by contract: any failure is logged and reported as None.
        try:
            table_name = self.get_table_name(device_id, message)
            schema_df = self.athena.describe_table(table_name)
            if schema_df.empty:
                logger.warning(f"No schema found for {device_id}/{message}")
                return None
            schema_dict = dict(zip(schema_df['column_name'], schema_df['data_type']))
            self._cache[cache_key] = schema_dict
            logger.info(f"Schema for {cache_key}: {len(schema_dict)} columns")
            return schema_dict
        except Exception as e:
            logger.error(f"Failed to get schema for {device_id}/{message}: {e}")
            return None

    def list_partitions(self, device_id: str, message: str) -> List[str]:
        """
        List partition values (dates) for a table.

        Args:
            device_id: Device identifier
            message: Message/rule name

        Returns:
            Ascending list of unique dates, each zero-padded as YYYY-MM-DD.
            An empty list is returned when no dates are found or when the
            lookup fails for any reason.

        Note:
            Dates are derived from the object paths of the table's parquet
            files: the last three directory levels before the file name are
            read as year, month and day, i.e.
            {device_id}/{message}/{year}/{month}/{day}/file.parquet
        """
        try:
            table_name = self.get_table_name(device_id, message)
            query = f"""
            WITH files AS (
                SELECT DISTINCT "$path" AS p
                FROM {self.config.database_name}.{table_name}
                WHERE "$path" LIKE '%.parquet'
            ),
            parts AS (
                SELECT
                    try_cast(element_at(split(p, '/'), -4) AS INTEGER) AS year,
                    try_cast(element_at(split(p, '/'), -3) AS INTEGER) AS month,
                    try_cast(element_at(split(p, '/'), -2) AS INTEGER) AS day
                FROM files
            )
            SELECT DISTINCT year, month, day
            FROM parts
            WHERE year IS NOT NULL AND month IS NOT NULL AND day IS NOT NULL
            ORDER BY year DESC, month DESC, day DESC
            """
            df = self.athena.query_to_dataframe(query)
            if df.empty:
                logger.warning(f"No partitions found for {table_name}")
                return []

            # Columns are read positionally (year, month, day). int() makes the
            # formatting independent of how the result set was parsed (ints,
            # floats or digit strings); zero-padding every field keeps the
            # promised YYYY-MM-DD shape and makes the lexicographic sort below
            # chronological.
            dates = [
                f"{int(year):04d}-{int(month):02d}-{int(day):02d}"
                for year, month, day in zip(df.iloc[:, 0], df.iloc[:, 1], df.iloc[:, 2])
            ]
            logger.info(f"Found {len(dates)} partitions for {table_name}")
            return sorted(set(dates))
        except Exception as e:
            logger.warning(f"Could not list partitions for {device_id}/{message}: {e}")
            # Table might not be partitioned or query might have failed
            return []

    def clear_cache(self) -> None:
        """Clear schema cache."""
        self._cache.clear()
        logger.debug("Schema cache cleared")