|
|
import os |
|
|
import yaml |
|
|
import logging |
|
|
from typing import Dict, List |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
|
|
|
def load_extract_config(config_name: str) -> Dict: |
|
|
"""Load a YAML configuration file from the configs directory. |
|
|
|
|
|
Args: |
|
|
config_name (str): Name of the config file (e.g., 'model_config.yml'). |
|
|
|
|
|
Returns: |
|
|
Dict: Parsed configuration dictionary. |
|
|
|
|
|
Raises: |
|
|
FileNotFoundError: If the configuration file does not exist. |
|
|
ValueError: If config_name is empty or not a string. |
|
|
""" |
|
|
if not isinstance(config_name, str) or not config_name.strip(): |
|
|
raise ValueError("config_name must be a non-empty string") |
|
|
|
|
|
config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'configs', config_name)) |
|
|
logging.debug(f"Attempting to load config from: {config_path}") |
|
|
|
|
|
if not os.path.exists(config_path): |
|
|
logging.error(f"Configuration file not found: {config_path}") |
|
|
raise FileNotFoundError(f"Configuration file not found: {config_path}") |
|
|
|
|
|
try: |
|
|
with open(config_path, 'r') as f: |
|
|
config = yaml.safe_load(f) |
|
|
if config is None: |
|
|
logging.warning(f"Configuration file {config_name} is empty") |
|
|
return {} |
|
|
logging.info(f"Successfully loaded config: {config_name}") |
|
|
return config |
|
|
except yaml.YAMLError as e: |
|
|
logging.error(f"Failed to parse YAML in {config_name}: {e}") |
|
|
raise |
|
|
|
|
|
def get_parquet_file_names() -> List[str]: |
|
|
"""Retrieve Parquet file names from the extract_data.yml configuration. |
|
|
|
|
|
Returns: |
|
|
List[str]: List of Parquet file names derived from CSV file names. |
|
|
|
|
|
Raises: |
|
|
FileNotFoundError: If extract_data.yml is missing. |
|
|
ValueError: If no files are specified in the configuration. |
|
|
""" |
|
|
config = load_extract_config('extract_data.yml') |
|
|
files = config.get('files', []) |
|
|
if not files: |
|
|
logging.error("No files specified in extract_data.yml") |
|
|
raise ValueError("No files specified in extract_data.yml") |
|
|
|
|
|
parquet_files = [f.replace(".csv", ".parquet") for f in files] |
|
|
logging.debug(f"Derived Parquet file names: {parquet_files}") |
|
|
return parquet_files |
|
|
|
|
|
def load_pipeline_config() -> Dict: |
|
|
"""Load pipeline configuration from pipeline_config.yml. |
|
|
|
|
|
Returns: |
|
|
Dict: Pipeline configuration dictionary. |
|
|
|
|
|
Raises: |
|
|
FileNotFoundError: If pipeline_config.yml is missing. |
|
|
""" |
|
|
config = load_extract_config('pipeline_config.yml') |
|
|
logging.debug(f"Pipeline config loaded: {config}") |
|
|
return config |
|
|
|
|
|
def define_server_filenames(**kwargs) -> List[str]: |
|
|
"""Extract base filenames from client file paths using Airflow XCom. |
|
|
|
|
|
Args: |
|
|
kwargs: Airflow task instance arguments containing 'ti' for XCom. |
|
|
|
|
|
Returns: |
|
|
List[str]: List of base filenames. |
|
|
|
|
|
Raises: |
|
|
KeyError: If 'ti' is not provided in kwargs. |
|
|
ValueError: If no files are retrieved from XCom. |
|
|
""" |
|
|
if 'ti' not in kwargs: |
|
|
logging.error("Task instance 'ti' not provided in kwargs") |
|
|
raise KeyError("Task instance 'ti' not provided in kwargs") |
|
|
|
|
|
ti = kwargs['ti'] |
|
|
client_files = ti.xcom_pull(task_ids='download_binance_csv') |
|
|
if client_files is None: |
|
|
logging.error("No files retrieved from XCom for task 'download_binance_csv'") |
|
|
raise ValueError("No files retrieved from XCom for task 'download_binance_csv'") |
|
|
|
|
|
if not isinstance(client_files, list): |
|
|
client_files = [client_files] |
|
|
|
|
|
server_files = [os.path.basename(p) for p in client_files] |
|
|
logging.debug(f"Extracted server filenames: {server_files}") |
|
|
return server_files |