""" This script provides functionalities to download financial market data and save it in a structured, partitioned Parquet format. Data is fetched from MetaTrader 5 (MT5) for specified symbols and date ranges, with options to handle existing files intelligently. It includes a user-friendly GUI prompt to manage overwriting existing data, robust logging with 'loguru', and secure credential management using environment variables. The script is designed to be run as a standalone application for data acquisition, ensuring that the account used for downloading matches the account used for loading data later. FUntionality is provided for loading downloaded files into a DataFrame for analysis, with options to specify columns and date ranges. The script is optimized for memory usage and provides a clear directory structure for easy data management. Features: - Downloads data for a given list of symbols and a date range. - Organizes saved data into a clear directory structure: path/symbol/year/month.parquet - Implements a user-friendly, timed GUI prompt to ask for overwriting existing files. - Uses 'loguru' for robust logging to both console and a file. - Secures credentials using environment variables. - Verifies account consistency for both downloading and loading data. - Designed to be run as a standalone script for data acquisition. """ import json import os import sys from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path import numpy as np import pandas as pd import MetaTrader5 as mt5 from dask import dataframe as dd from dotenv import load_dotenv from loguru import logger from tqdm import tqdm from ..util.misc import date_conversion, is_first_weekday, is_last_weekday, log_df_info from .clean_data import clean_tick_data # --- Credential and Login Management --- def get_credentials_from_env(account): """ Retrieves MT5 credentials from environment variables. Args: account (str): The account name (e.g., 'MyAccount'). Returns: tuple: (login, password, server) or (None, None, None) if not found. """ load_dotenv() # Load environment variables from .env file if present prefix = f"MT5_ACCOUNT_{account.upper()}" login = os.environ.get(f"{prefix}_LOGIN") password = os.environ.get(f"{prefix}_PASSWORD") server = os.environ.get(f"{prefix}_SERVER") if not all([login, password, server]): logger.error( f"Missing one or more environment variables for account '{account}'." ) logger.error( f"Please set {prefix}_LOGIN, {prefix}_PASSWORD, and {prefix}_SERVER." ) return None, None, None if login.isnumeric(): login = int(login) return login, password, server def login_mt5(account, timeout=60000, verbose=True): """ Logs in to a MetaTrader5 account using credentials from environment variables. Args: account (str): Account name to log in to. timeout (int): Connection timeout in milliseconds. verbose (bool): Whether to print detailed connection information. Returns: str: The account name if login is successful, otherwise None. """ import MetaTrader5 as mt5 logger.info(f"Attempting to log in to MT5 with account: {account}") login, password, server = get_credentials_from_env(account) if not login: return None if not mt5.initialize( login=login, password=password, server=server, timeout=timeout ): logger.error( f"MT5 initialize() failed for account {account}. Error: {mt5.last_error()}" ) mt5.shutdown() return logger.success(f"Successfully logged in to MT5 as {account}.") if verbose: logger.info(f"MT5 Version: {mt5.version()}") terminal_info = mt5.terminal_info() if terminal_info: logger.info(f"Connected to {terminal_info.name} at {terminal_info.path}") else: logger.warning("Could not retrieve terminal info.") return account # --- Data Validation and Verification --- def verify_or_create_account_info(data_path, current_account_name): """ Checks if the data directory is associated with the correct account. If no account info exists, it creates it. Args: data_path (Path): The root path of the data directory. current_account_name (str): The name of the account currently in use. Returns: bool: True if the account is verified, False otherwise. """ account_info_file = data_path / "account_info.json" current_account_name = current_account_name.upper() if account_info_file.exists(): try: with open(account_info_file, "r") as f: stored_info = json.load(f) stored_name = stored_info.get("account_name") if stored_name and stored_name != current_account_name: logger.error( f"Account Mismatch! This directory ('{data_path.name}') is for account '{stored_name}'." ) logger.error( f"Current operation is for account '{current_account_name}'. Aborting to prevent data errors." ) return False elif not stored_name: # File exists but is malformed, so we fix it. logger.warning( "Account info file is malformed. Overwriting with current account." ) with open(account_info_file, "w") as f: json.dump({"account_name": current_account_name}, f, indent=4) except json.JSONDecodeError: logger.warning( f"Could not read account info file. Overwriting with current account: '{current_account_name}'." ) with open(account_info_file, "w") as f: json.dump({"account_name": current_account_name}, f, indent=4) else: logger.info( f"First time use for this directory. Associating it with account '{current_account_name}'." ) with open(account_info_file, "w") as f: json.dump({"account_name": current_account_name}, f, indent=4) return True # --- Data Fetching and Saving --- def get_ticks(symbol, start_date, end_date, datetime_index=True, verbose=True): """ Downloads tick data from the MT5 terminal for a given period. Args: symbol (str): The financial instrument symbol (e.g., 'EURUSD'). start_date (pd.Timestamp): The timezone-aware start date for the data range. end_date (pd.Timestamp): The timezone-aware end date for the data range. datetime_index (bool): Set 'time' column to DatetimeIndex. Returns: pd.DataFrame: A DataFrame containing the tick data, or an empty DataFrame if no data is found or an error occurs. """ import MetaTrader5 as mt5 if not mt5.terminal_info(): # Check if connection is still active logger.error("MT5 connection lost. Cannot download data.") return pd.DataFrame() try: start_date, end_date = date_conversion(start_date, end_date) mt5.symbol_select(symbol, True) ticks = mt5.copy_ticks_range(symbol, start_date, end_date, mt5.COPY_TICKS_ALL) if ticks is None or len(ticks) == 0: logger.warning( f"No tick data returned for {symbol} from {start_date.date()} to {end_date.date()}." ) return pd.DataFrame() df = pd.DataFrame(ticks) df["time"] = pd.to_datetime(df["time_msc"], unit="ms", utc=True) df.drop(columns=["time_msc"], inplace=True) if datetime_index: df.set_index("time", inplace=True) # Keep only columns with meaningful data df = df.loc[:, df.any()] # Optimize memory usage for col in ["bid", "ask"]: if col in df.columns: df[col] = df[col].astype("float32") if verbose: log_df_info(df) return df except Exception as e: logger.error(f"An error occurred while getting ticks for {symbol}: {e}") return pd.DataFrame() def get_bars( symbol, timeframe, start_date, end_date, datetime_index=True, verbose=True ): """ Downloads bar (OHLCV) data from the MT5 terminal for a given period. Args: symbol (str): The financial instrument symbol (e.g., 'EURUSD'). timeframe (int): MT5 timeframe constant (e.g., mt5.TIMEFRAME_M1, mt5.TIMEFRAME_H1). start_date (pd.Timestamp): Timezone-aware start date. end_date (pd.Timestamp): Timezone-aware end date. datetime_index (bool): Set 'time' column to DatetimeIndex. verbose (bool): Print DataFrame info. Returns: pd.DataFrame: A DataFrame containing OHLCV data, or empty if no data/error. """ import MetaTrader5 as mt5 if not mt5.terminal_info(): logger.error("MT5 connection lost. Cannot download data.") return pd.DataFrame() try: start_date, end_date = date_conversion(start_date, end_date) timeframe = getattr(mt5, f"TIMEFRAME_{timeframe}") mt5.symbol_select(symbol, True) bars = mt5.copy_rates_range(symbol, timeframe, start_date, end_date) if bars is None or len(bars) == 0: logger.warning( f"No bar data returned for {symbol} from {start_date.date()} to {end_date.date()}." ) return pd.DataFrame() df = pd.DataFrame(bars) df["time"] = pd.to_datetime(df["time"], unit="s", utc=True) if datetime_index: df.set_index("time", inplace=True) # Optimize memory usage for col in [ "open", "high", "low", "close", "tick_volume", "spread", "real_volume", ]: if col in df.columns: df[col] = df[col].astype("float32") if verbose: log_df_info(df) return df except Exception as e: logger.error(f"An error occurred while getting bars for {symbol}: {e}") return pd.DataFrame() def process_symbol(symbol, start_dt, end_dt, data_path, account_name): """Worker function to download data for a single symbol.""" try: login_mt5(account_name) # Each worker needs its own login except Exception as e: return {symbol: f"login_failed: {e}"} symbol_path = data_path / symbol # Generate month starts for the range dates_from = pd.date_range( start=start_dt.replace(day=1), end=end_dt, freq="MS", tz="UTC" ) # Generate month ends dates_to = [] for d in dates_from: # End of the current month m_end = (d + pd.offsets.MonthEnd(0)).replace(hour=23, minute=59, second=59) # But don't go past the global end_dt dates_to.append(min(m_end, end_dt)) # Adjust the first start date if it's after the month start dates_from_list = list(dates_from) if dates_from_list: dates_from_list[0] = max(dates_from_list[0], start_dt) missing_data = [] for start, end in zip(dates_from_list, dates_to): year_path = symbol_path / str(start.year) year_path.mkdir(parents=True, exist_ok=True) file = year_path / f"month-{start.month:02d}.parquet" log_msg_prefix = f"{symbol} -> Month {start.strftime('%Y-%m')}..." if file.exists(): df0 = pd.read_parquet(file) if not df0.empty: first, start = [x.date() for x in df0.index[[0, -1]]] if is_first_weekday(first) and is_last_weekday(start): logger.info(f"{log_msg_prefix} Exists—Skipping download") continue else: logger.info( f"{log_msg_prefix} Exists—Appending from {start} to {end}" ) else: df0 = pd.DataFrame() df1 = get_ticks(symbol, start, end, verbose=False) df = pd.concat([df0, df1]) if not df.empty: df = clean_tick_data(df) df.to_parquet(file, engine="pyarrow", compression="zstd") logger.success(f"{log_msg_prefix} Saved {len(df):,} rows") else: logger.warning(f"{log_msg_prefix} No data found") missing_data.append(start.strftime("%Y-%m")) try: year_path.rmdir() except Exception as e: logger.error(f"Could not remove empty directory {year_path}: {e}") return {symbol: missing_data} def save_data_to_parquet(symbols, start_date, end_date, account_name, path=None): """ Downloads and saves tick data to a partitioned Parquet structure. Args: symbols (Union[str, list, tuple]): A single symbol or a collection of symbols to download. start_date (Union[str, dt, pd.Timestamp]): The start date for the data range. end_date (Union[str, dt, pd.Timestamp]): The end date for the data range. account_name (str): The name of the account used for the download. path (Union[str, Path]): The root folder where data will be saved. Returns: None """ data_path = Path(path) if path is not None else Path().home() / "tick_data_parquet" data_path.mkdir(parents=True, exist_ok=True) date_range = date_conversion(start_date, end_date) if not date_range: return start_dt, end_dt = date_range if isinstance(symbols, str): symbols = [symbols] missing_data = {} for symbol in tqdm(symbols, desc="Downloading symbols"): try: result = process_symbol(symbol, start_dt, end_dt, data_path, account_name) missing_data.update(result) except Exception as e: logger.critical(f"Worker for {symbol} failed: {e}") # Summary logging logger.info("Download process finished.") if missing_data and any(missing_data.values()): logger.warning("Missing data summary:") for symbol, months in missing_data.items(): logger.warning(f" - {symbol}: {', '.join(months)}") logger.success(f"All operations complete. Files saved to {data_path}") # --- Loading Data from Files --- def load_tick_data( symbol, start_date, end_date, account_name, path=None, columns=None, verbose=True, ): """ Loads tick data from a partitioned Parquet structure after verifying account. Args: path (Union[str, Path]): The root folder where the data is stored. symbol (str): The financial instrument symbol to load. start_date (Union[str, dt, pd.Timestamp]): The start date of the desired data range. end_date (Union[str, dt, pd.Timestamp]): The end date of the desired data range. account_name (str): The account name to verify against the data directory. columns (Optional[list]): A list of specific columns to load. Loads all if None. verbose (bool): If True, logs detailed DataFrame info upon successful load. Returns: pd.DataFrame: A DataFrame with the requested tick data, or an empty DataFrame if the account verification fails, dates are invalid, or an error occurs. """ try: root_path = Path(path) except TypeError: root_path = Path.home() / "tick_data_parquet" if not verify_or_create_account_info(root_path, account_name): return pd.DataFrame() date_range = date_conversion(start_date, end_date) if date_range: start_dt, end_dt = date_range fname = root_path / symbol.upper() else: return pd.DataFrame() try: filters = [("time", ">=", start_dt), ("time", "<=", end_dt)] if not fname.exists(): logger.error(f"Data directory {fname} not found for {symbol}") return pd.DataFrame() ddf = dd.read_parquet(fname, columns=columns, filters=filters, engine="pyarrow") df = ddf.compute() if df.empty: logger.warning( f"No tick data found for {symbol} between {start_dt} and {end_dt} " f"in account {account_name}" ) return pd.DataFrame() size = df.memory_usage(deep=True).sum() / 1024**2 logger.success( f"Loaded {len(df):,} rows of {symbol} ({size:,.2f} MB) tick data for account {account_name}" ) to_drop = [] for col in df.columns: # Drop columns if any(np.isnan(df[col].unique())): to_drop.append(col) # Optimise dtype of flags column for memory if col == "flags": mem = df.memory_usage(deep=True).sum() # memory before downcasting dtype_orig = df["flags"].dtype limit = df["flags"].max() for x in (8, 16, 32): dtype = f"uint{x}" if dtype_orig != dtype and np.iinfo(dtype).max >= limit: df = df.astype({"flags": dtype}) mem = (mem - df.memory_usage(deep=True).sum()) / 1024**2 logger.info( f"Converted flags from {dtype_orig} to {df['flags'].dtype} saving {mem:,.1f} MB" ) break if to_drop: df.drop(columns=to_drop, inplace=True) logger.info(f"Dropped empty columns {to_drop}") if not df.index.is_monotonic_increasing: df.sort_index(inplace=True) if verbose: log_df_info(df) return df except Exception as e: logger.error(f"Failed to load data for {symbol}. Error: {e}") return pd.DataFrame() # --- Main Execution Block --- if __name__ == "__main__": import MetaTrader5 as mt5 MAJORS = [ "EURUSD", "USDJPY", "GBPUSD", "USDCHF", "AUDUSD", "USDCAD", "NZDUSD", "XAUUSD", ] CRYPTO = [ "ADAUSD", "BTCUSD", "DOGUSD", "ETHUSD", "LNKUSD", "LTCUSD", "XLMUSD", "XMRUSD", "XRPUSD", ] # --- 1. User Configuration --- CONFIG = { "save_path": Path.home() / "tick_data_parquet", "symbols_to_download": MAJORS + CRYPTO, "account_to_use": "FundedNext_STLR2_6K", # This name MUST match the one used in your environment variables "start_date": "2016-01-01", "end_date": "2017-12-31", "verbose_login": True, } # --- 2. Setup Logging --- # Configure logger to output to console and a file for persistent records. logger.add( sys.stderr, format="{time:YYYY-MM-DD HH:mm:ss} | " "{level: <8} | " "{name}:{function}:{line} - " "{message}", colorize=True, backtrace=True, diagnose=True, enqueue=True, ) log_path = CONFIG["save_path"] / "data_download.log" log_path.parent.mkdir(parents=True, exist_ok=True) # The default logger is console-only. Add a file sink. logger.add( log_path, rotation="10 MB", retention="30 days", level="INFO", format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}", enqueue=True, ) logger.info("--- Starting New Data Download Session ---") # --- 3. Login to MT5 --- logged_in_account = login_mt5( account=CONFIG["account_to_use"], verbose=CONFIG["verbose_login"] ) # --- 4. Run Downloader --- if logged_in_account: save_data_to_parquet( symbols=CONFIG["symbols_to_download"], start_date=CONFIG["start_date"], end_date=CONFIG["end_date"], account_name=logged_in_account, path=CONFIG["save_path"], ) # --- 6. Shutdown MT5 Connection --- mt5.shutdown() logger.info("--- MT5 Connection Closed. Session End ---") else: logger.critical("Could not log in to MetaTrader 5. Aborting all operations.")