File size: 3,746 Bytes
2eee82e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import os
import yaml
import logging
from typing import Dict, List

# Configure the root logger when this module is imported: INFO threshold,
# each record rendered as "<timestamp> - <level name> - <message>".
# NOTE(review): with an INFO threshold the logging.debug(...) calls in this
# module are not emitted unless logging is reconfigured elsewhere -- confirm
# that is intended.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def load_extract_config(config_name: str) -> Dict:
    """Load a YAML configuration file from the configs directory.

    The file is resolved as ``<module dir>/../../configs/<config_name>`` and
    parsed with ``yaml.safe_load``.

    Args:
        config_name (str): Name of the config file (e.g., 'model_config.yml').

    Returns:
        Dict: Parsed configuration. An empty file yields ``{}``. The parsed
        top-level value is returned as-is, so a YAML document whose top level
        is not a mapping is returned unchanged.

    Raises:
        ValueError: If config_name is empty, whitespace-only, or not a string.
        FileNotFoundError: If the resolved path does not exist or is not a
            regular file (e.g. it is a directory).
        yaml.YAMLError: If the file content is not valid YAML (logged, then
            re-raised).
    """
    if not isinstance(config_name, str) or not config_name.strip():
        raise ValueError("config_name must be a non-empty string")

    config_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), '..', '..', 'configs', config_name)
    )
    logging.debug(f"Attempting to load config from: {config_path}")

    # isfile() rather than exists(): a directory at this path is not a usable
    # config and should surface as the documented FileNotFoundError instead of
    # an IsADirectoryError/PermissionError from open().
    if not os.path.isfile(config_path):
        logging.error(f"Configuration file not found: {config_path}")
        raise FileNotFoundError(f"Configuration file not found: {config_path}")

    # Only the read/parse step can raise YAMLError, so keep the try minimal.
    try:
        # YAML is UTF-8 by specification; do not depend on the locale default.
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
    except yaml.YAMLError as e:
        logging.error(f"Failed to parse YAML in {config_name}: {e}")
        raise

    # safe_load returns None for an empty document.
    if config is None:
        logging.warning(f"Configuration file {config_name} is empty")
        return {}

    logging.info(f"Successfully loaded config: {config_name}")
    return config

def get_parquet_file_names() -> List[str]:
    """Retrieve Parquet file names from the extract_data.yml configuration.

    Reads the ``files`` entry of ``extract_data.yml`` and swaps a trailing
    ``.csv`` extension for ``.parquet`` on each name. Only the extension at
    the end of the name is rewritten; a ``.csv`` occurring elsewhere in the
    name (e.g. in a directory component) is left alone, and names that do not
    end in ``.csv`` are returned unchanged.

    Returns:
        List[str]: List of Parquet file names derived from CSV file names.

    Raises:
        FileNotFoundError: If extract_data.yml is missing.
        ValueError: If no files are specified in the configuration.
    """
    config = load_extract_config('extract_data.yml')
    files = config.get('files', [])
    if not files:
        logging.error("No files specified in extract_data.yml")
        raise ValueError("No files specified in extract_data.yml")

    csv_suffix = ".csv"
    # Slice off the suffix instead of str.replace(), which would rewrite every
    # ".csv" occurrence in the name rather than just the extension.
    parquet_files = [
        f[:-len(csv_suffix)] + ".parquet" if f.endswith(csv_suffix) else f
        for f in files
    ]
    logging.debug(f"Derived Parquet file names: {parquet_files}")
    return parquet_files

def load_pipeline_config() -> Dict:
    """Read ``pipeline_config.yml`` via ``load_extract_config``.

    The parsed result is logged at DEBUG level before being handed back.

    Returns:
        Dict: Whatever ``load_extract_config`` produced for
        ``pipeline_config.yml``.

    Raises:
        FileNotFoundError: If pipeline_config.yml is missing.
    """
    pipeline_settings = load_extract_config('pipeline_config.yml')
    logging.debug(f"Pipeline config loaded: {pipeline_settings}")
    return pipeline_settings

def define_server_filenames(**kwargs) -> List[str]:
    """Extract base filenames from client file paths using Airflow XCom.

    Pulls the value pushed by the ``download_binance_csv`` task and reduces
    every path to its final component with ``os.path.basename``.

    Args:
        kwargs: Airflow task instance arguments containing 'ti' for XCom.

    Returns:
        List[str]: List of base filenames. A single (non-list, non-tuple)
        XCom value is treated as one path; an empty list yields ``[]``.

    Raises:
        KeyError: If 'ti' is not provided in kwargs.
        ValueError: If the XCom pull returns None.
    """
    if 'ti' not in kwargs:
        logging.error("Task instance 'ti' not provided in kwargs")
        raise KeyError("Task instance 'ti' not provided in kwargs")

    ti = kwargs['ti']
    client_files = ti.xcom_pull(task_ids='download_binance_csv')
    if client_files is None:
        logging.error("No files retrieved from XCom for task 'download_binance_csv'")
        raise ValueError("No files retrieved from XCom for task 'download_binance_csv'")

    # Wrap only scalar values. A tuple of paths is already a collection;
    # wrapping it would pass the whole tuple to basename() and raise TypeError.
    if not isinstance(client_files, (list, tuple)):
        client_files = [client_files]

    server_files = [os.path.basename(p) for p in client_files]
    logging.debug(f"Extracted server filenames: {server_files}")
    return server_files