Spaces:
No application file
No application file
| """ | |
| This script provides functionalities to download financial market data | |
| and save it in a structured, partitioned Parquet format. Data is fetched | |
| from MetaTrader 5 (MT5) for specified symbols and date ranges, with options | |
| to handle existing files intelligently. It includes a user-friendly GUI prompt | |
| to manage overwriting existing data, robust logging with 'loguru', and secure | |
| credential management using environment variables. The script is designed to be | |
| run as a standalone application for data acquisition, ensuring that the account | |
| used for downloading matches the account used for loading data later. | |
| Functionality is provided for loading downloaded files into a DataFrame for analysis, | |
| with options to specify columns and date ranges. The script is optimized for memory | |
| usage and provides a clear directory structure for easy data management. | |
| Features: | |
| - Downloads data for a given list of symbols and a date range. | |
| - Organizes saved data into a clear directory structure: path/symbol/year/month.parquet | |
| - Implements a user-friendly, timed GUI prompt to ask for overwriting existing files. | |
| - Uses 'loguru' for robust logging to both console and a file. | |
| - Secures credentials using environment variables. | |
| - Verifies account consistency for both downloading and loading data. | |
| - Designed to be run as a standalone script for data acquisition. | |
| """ | |
| import json | |
| import os | |
| import sys | |
| from concurrent.futures import ProcessPoolExecutor, as_completed | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| import MetaTrader5 as mt5 | |
| from dask import dataframe as dd | |
| from dotenv import load_dotenv | |
| from loguru import logger | |
| from tqdm import tqdm | |
| from ..util.misc import date_conversion, is_first_weekday, is_last_weekday, log_df_info | |
| from .clean_data import clean_tick_data | |
| # --- Credential and Login Management --- | |
def get_credentials_from_env(account):
    """
    Retrieve MT5 credentials for an account from environment variables.

    The variables are looked up as ``MT5_ACCOUNT_<ACCOUNT>_LOGIN``,
    ``MT5_ACCOUNT_<ACCOUNT>_PASSWORD`` and ``MT5_ACCOUNT_<ACCOUNT>_SERVER``,
    where ``<ACCOUNT>`` is the upper-cased account name. A ``.env`` file is
    loaded first if one is present.

    Args:
        account (str): The account name (e.g., 'MyAccount').

    Returns:
        tuple: (login, password, server), or (None, None, None) if any of the
            three variables is missing or empty. ``login`` is converted to
            ``int`` when it consists solely of decimal digits; otherwise it is
            returned as the original string.
    """
    load_dotenv()  # Load environment variables from .env file if present
    prefix = f"MT5_ACCOUNT_{account.upper()}"
    login = os.environ.get(f"{prefix}_LOGIN")
    password = os.environ.get(f"{prefix}_PASSWORD")
    server = os.environ.get(f"{prefix}_SERVER")

    if not all([login, password, server]):
        logger.error(
            f"Missing one or more environment variables for account '{account}'."
        )
        logger.error(
            f"Please set {prefix}_LOGIN, {prefix}_PASSWORD, and {prefix}_SERVER."
        )
        return None, None, None

    # isdecimal() only accepts characters int() can parse. isnumeric() also
    # accepts characters such as "²" or "½", for which int() raises ValueError.
    if login.isdecimal():
        login = int(login)
    return login, password, server
def login_mt5(account, timeout=60000, verbose=True):
    """
    Open a MetaTrader5 session for ``account``.

    Credentials are read from the environment via
    ``get_credentials_from_env``; nothing is logged in when they are missing.

    Args:
        account (str): Account name to log in to.
        timeout (int): Connection timeout in milliseconds.
        verbose (bool): When True, also log the MT5 version and terminal details.

    Returns:
        str: The account name if login is successful, otherwise None.
    """
    import MetaTrader5 as mt5

    logger.info(f"Attempting to log in to MT5 with account: {account}")

    user, secret, host = get_credentials_from_env(account)
    if not user:
        return None

    connected = mt5.initialize(
        login=user, password=secret, server=host, timeout=timeout
    )
    if not connected:
        logger.error(
            f"MT5 initialize() failed for account {account}. Error: {mt5.last_error()}"
        )
        mt5.shutdown()
        return None

    logger.success(f"Successfully logged in to MT5 as {account}.")

    # Extra connection details are only reported on request.
    if not verbose:
        return account

    logger.info(f"MT5 Version: {mt5.version()}")
    info = mt5.terminal_info()
    if info:
        logger.info(f"Connected to {info.name} at {info.path}")
    else:
        logger.warning("Could not retrieve terminal info.")
    return account
| # --- Data Validation and Verification --- | |
def _write_account_info(account_info_file, account_name):
    """Write (or overwrite) the JSON file that ties a data directory to an account."""
    with open(account_info_file, "w") as f:
        json.dump({"account_name": account_name}, f, indent=4)


def verify_or_create_account_info(data_path, current_account_name):
    """
    Check that the data directory is associated with the given account.

    The association is stored in ``<data_path>/account_info.json`` under the
    key ``"account_name"`` (upper-cased). If the file does not exist, cannot be
    parsed as JSON, or does not contain a usable account name, it is
    (re)written with the current account and the check passes.

    Args:
        data_path (Path): The root path of the data directory.
        current_account_name (str): The name of the account currently in use.

    Returns:
        bool: True if the account is verified (or was just recorded), False if
            the directory belongs to a different account.
    """
    account_info_file = data_path / "account_info.json"
    current_account_name = current_account_name.upper()

    if not account_info_file.exists():
        logger.info(
            f"First time use for this directory. Associating it with account '{current_account_name}'."
        )
        _write_account_info(account_info_file, current_account_name)
        return True

    try:
        with open(account_info_file, "r") as f:
            stored_info = json.load(f)
    except json.JSONDecodeError:
        logger.warning(
            f"Could not read account info file. Overwriting with current account: '{current_account_name}'."
        )
        _write_account_info(account_info_file, current_account_name)
        return True

    # Valid JSON is not necessarily an object: a list or string has no .get(),
    # so treat anything that is not a dict as a malformed file.
    stored_name = (
        stored_info.get("account_name") if isinstance(stored_info, dict) else None
    )

    if not stored_name:
        # File exists but is malformed, so we fix it.
        logger.warning(
            "Account info file is malformed. Overwriting with current account."
        )
        _write_account_info(account_info_file, current_account_name)
        return True

    if stored_name != current_account_name:
        logger.error(
            f"Account Mismatch! This directory ('{data_path.name}') is for account '{stored_name}'."
        )
        logger.error(
            f"Current operation is for account '{current_account_name}'. Aborting to prevent data errors."
        )
        return False

    return True
| # --- Data Fetching and Saving --- | |
def get_ticks(symbol, start_date, end_date, datetime_index=True, verbose=True):
    """
    Fetch tick data for one symbol from the MT5 terminal.

    Args:
        symbol (str): The financial instrument symbol (e.g., 'EURUSD').
        start_date (pd.Timestamp): The timezone-aware start date for the data range.
        end_date (pd.Timestamp): The timezone-aware end date for the data range.
        datetime_index (bool): Set 'time' column to DatetimeIndex.
        verbose (bool): Log DataFrame info for the result.

    Returns:
        pd.DataFrame: A DataFrame containing the tick data, or an empty DataFrame if
            no data is found or an error occurs.
    """
    import MetaTrader5 as mt5

    # A falsy terminal_info() is treated as a lost connection.
    if not mt5.terminal_info():
        logger.error("MT5 connection lost. Cannot download data.")
        return pd.DataFrame()

    try:
        start_date, end_date = date_conversion(start_date, end_date)
        mt5.symbol_select(symbol, True)
        raw_ticks = mt5.copy_ticks_range(
            symbol, start_date, end_date, mt5.COPY_TICKS_ALL
        )
        if raw_ticks is None or len(raw_ticks) == 0:
            logger.warning(
                f"No tick data returned for {symbol} from {start_date.date()} to {end_date.date()}."
            )
            return pd.DataFrame()

        frame = pd.DataFrame(raw_ticks)
        # Rebuild 'time' from the millisecond timestamp, then discard the source column.
        frame["time"] = pd.to_datetime(frame["time_msc"], unit="ms", utc=True)
        frame = frame.drop(columns=["time_msc"])
        if datetime_index:
            frame = frame.set_index("time")

        # Keep only columns that contain at least one truthy value.
        has_values = frame.any()
        frame = frame.loc[:, has_values]

        # Store prices as float32 to reduce memory usage.
        price_dtypes = {
            name: "float32" for name in ("bid", "ask") if name in frame.columns
        }
        frame = frame.astype(price_dtypes)

        if verbose:
            log_df_info(frame)
        return frame
    except Exception as e:
        logger.error(f"An error occurred while getting ticks for {symbol}: {e}")
        return pd.DataFrame()
def get_bars(
    symbol, timeframe, start_date, end_date, datetime_index=True, verbose=True
):
    """
    Downloads bar (OHLCV) data from the MT5 terminal for a given period.

    Args:
        symbol (str): The financial instrument symbol (e.g., 'EURUSD').
        timeframe (Union[str, int]): Either a timeframe name such as 'M1' or
            'H1' (case-insensitive, the 'TIMEFRAME_' prefix is optional), or an
            MT5 timeframe constant such as ``mt5.TIMEFRAME_M1``.
        start_date (pd.Timestamp): Timezone-aware start date.
        end_date (pd.Timestamp): Timezone-aware end date.
        datetime_index (bool): Set 'time' column to DatetimeIndex.
        verbose (bool): Log DataFrame info for the result.

    Returns:
        pd.DataFrame: A DataFrame containing OHLCV data, or empty if no data/error
            (including an unknown timeframe name).
    """
    import MetaTrader5 as mt5

    if not mt5.terminal_info():
        logger.error("MT5 connection lost. Cannot download data.")
        return pd.DataFrame()

    try:
        start_date, end_date = date_conversion(start_date, end_date)

        # Names are resolved to the matching mt5.TIMEFRAME_* constant; anything
        # that is not a string is assumed to already be such a constant.
        if isinstance(timeframe, str):
            tf_name = timeframe.upper()
            if not tf_name.startswith("TIMEFRAME_"):
                tf_name = f"TIMEFRAME_{tf_name}"
            timeframe = getattr(mt5, tf_name)

        mt5.symbol_select(symbol, True)
        bars = mt5.copy_rates_range(symbol, timeframe, start_date, end_date)
        if bars is None or len(bars) == 0:
            logger.warning(
                f"No bar data returned for {symbol} from {start_date.date()} to {end_date.date()}."
            )
            return pd.DataFrame()

        df = pd.DataFrame(bars)
        df["time"] = pd.to_datetime(df["time"], unit="s", utc=True)
        if datetime_index:
            df.set_index("time", inplace=True)

        # Optimize memory usage.
        # NOTE(review): float32 represents integers exactly only up to 2**24,
        # so very large volume values are rounded by this cast.
        for col in (
            "open",
            "high",
            "low",
            "close",
            "tick_volume",
            "spread",
            "real_volume",
        ):
            if col in df.columns:
                df[col] = df[col].astype("float32")

        if verbose:
            log_df_info(df)
        return df
    except Exception as e:
        logger.error(f"An error occurred while getting bars for {symbol}: {e}")
        return pd.DataFrame()
def process_symbol(symbol, start_dt, end_dt, data_path, account_name):
    """
    Worker function to download data for a single symbol.

    The requested range is split into calendar months. Each month is stored in
    ``<data_path>/<symbol>/<year>/month-<MM>.parquet``. A month whose file
    already exists is skipped when ``is_first_weekday`` / ``is_last_weekday``
    accept the first and last dates in the file (presumably meaning the month
    is fully covered - confirm against those helpers); otherwise the download
    resumes from the last date already in the file and the result is merged
    with the existing rows.

    Args:
        symbol (str): The financial instrument symbol.
        start_dt (pd.Timestamp): Timezone-aware start of the requested range.
        end_dt (pd.Timestamp): Timezone-aware end of the requested range.
        data_path (Path): Root folder of the Parquet store.
        account_name (str): Account used to log in to MT5.

    Returns:
        dict: ``{symbol: [ 'YYYY-MM', ... ]}`` listing months for which no data
            was found, or ``{symbol: 'login_failed: ...'}`` if the MT5 login
            did not succeed.
    """
    try:
        logged_in = login_mt5(account_name)  # Each worker needs its own login
    except Exception as e:
        return {symbol: f"login_failed: {e}"}
    # login_mt5 reports failure by returning None rather than raising, so the
    # return value has to be checked as well.
    if not logged_in:
        return {symbol: f"login_failed: could not log in as {account_name}"}

    symbol_path = data_path / symbol

    # Generate month starts for the range
    dates_from = pd.date_range(
        start=start_dt.replace(day=1),
        end=end_dt,
        freq="MS",
        tz="UTC",
    )

    # Month ends, capped at the global end_dt
    dates_to = [
        min(
            (d + pd.offsets.MonthEnd(0)).replace(hour=23, minute=59, second=59),
            end_dt,
        )
        for d in dates_from
    ]

    # Adjust the first start date if it's after the month start
    dates_from_list = list(dates_from)
    if dates_from_list:
        dates_from_list[0] = max(dates_from_list[0], start_dt)

    missing_data = []
    for start, end in zip(dates_from_list, dates_to):
        year_path = symbol_path / str(start.year)
        year_path.mkdir(parents=True, exist_ok=True)
        file = year_path / f"month-{start.month:02d}.parquet"
        month_label = start.strftime("%Y-%m")
        log_msg_prefix = f"{symbol} -> Month {month_label}..."

        df0 = pd.DataFrame()
        # Where the download begins; kept separate from ``start`` so the
        # month bookkeeping is not overwritten when resuming.
        download_from = start

        if file.exists():
            df0 = pd.read_parquet(file)
            if not df0.empty:
                first_day, last_day = [x.date() for x in df0.index[[0, -1]]]
                if is_first_weekday(first_day) and is_last_weekday(last_day):
                    logger.info(f"{log_msg_prefix} Exists—Skipping download")
                    continue
                logger.info(
                    f"{log_msg_prefix} Exists—Appending from {last_day} to {end}"
                )
                # Resume from the last date already stored in the file.
                download_from = last_day

        df1 = get_ticks(symbol, download_from, end, verbose=False)
        df = pd.concat([df0, df1])

        if not df.empty:
            df = clean_tick_data(df)
            df.to_parquet(file, engine="pyarrow", compression="zstd")
            logger.success(f"{log_msg_prefix} Saved {len(df):,} rows")
        else:
            logger.warning(f"{log_msg_prefix} No data found")
            missing_data.append(month_label)
            # Remove the year folder only if nothing else lives in it; a
            # folder holding other months is expected and is not an error.
            try:
                if not any(year_path.iterdir()):
                    year_path.rmdir()
            except OSError as e:
                logger.error(f"Could not remove empty directory {year_path}: {e}")

    return {symbol: missing_data}
def save_data_to_parquet(symbols, start_date, end_date, account_name, path=None):
    """
    Downloads and saves tick data to a partitioned Parquet structure.

    The target directory is tied to ``account_name`` through
    ``verify_or_create_account_info`` before anything is downloaded, so data
    from different accounts cannot be mixed in one directory.

    Args:
        symbols (Union[str, list, tuple]): A single symbol or a collection of symbols to download.
        start_date (Union[str, dt, pd.Timestamp]): The start date for the data range.
        end_date (Union[str, dt, pd.Timestamp]): The end date for the data range.
        account_name (str): The name of the account used for the download.
        path (Union[str, Path]): The root folder where data will be saved.
            Defaults to ``~/tick_data_parquet``.

    Returns:
        None
    """
    data_path = Path(path) if path is not None else Path.home() / "tick_data_parquet"
    data_path.mkdir(parents=True, exist_ok=True)

    # Record (or check) which account this directory belongs to at download
    # time; a mismatch is logged by the helper and nothing is downloaded.
    if not verify_or_create_account_info(data_path, account_name):
        return

    date_range = date_conversion(start_date, end_date)
    if not date_range:
        return
    start_dt, end_dt = date_range

    if isinstance(symbols, str):
        symbols = [symbols]

    missing_data = {}
    for symbol in tqdm(symbols, desc="Downloading symbols"):
        try:
            result = process_symbol(symbol, start_dt, end_dt, data_path, account_name)
            missing_data.update(result)
        except Exception as e:
            logger.critical(f"Worker for {symbol} failed: {e}")

    # Summary logging
    logger.info("Download process finished.")
    # Only report symbols that actually have something missing.
    problems = {sym: detail for sym, detail in missing_data.items() if detail}
    if problems:
        logger.warning("Missing data summary:")
        for symbol, months in problems.items():
            # A worker reports a failed login as a plain string; joining a
            # string would split it into single characters.
            detail = months if isinstance(months, str) else ", ".join(months)
            logger.warning(f" - {symbol}: {detail}")
    logger.success(f"All operations complete. Files saved to {data_path}")
| # --- Loading Data from Files --- | |
def _downcast_flags(df):
    """Shrink the 'flags' column to the smallest unsigned integer dtype that holds its values."""
    flags = df["flags"]
    dtype_orig = flags.dtype
    orig_size = getattr(dtype_orig, "itemsize", None)
    if orig_size is None or not pd.api.types.is_numeric_dtype(dtype_orig):
        return df
    if flags.min() < 0:
        # Negative values cannot be represented by an unsigned dtype.
        return df

    limit = flags.max()
    for bits in (8, 16, 32):
        candidate = np.dtype(f"uint{bits}")
        if np.iinfo(candidate).max >= limit:
            # Convert only when it actually saves memory; never upcast.
            if candidate.itemsize < orig_size:
                before = flags.memory_usage(index=False, deep=True)
                df["flags"] = flags.astype(candidate)
                after = df["flags"].memory_usage(index=False, deep=True)
                mem = (before - after) / 1024**2
                logger.info(
                    f"Converted flags from {dtype_orig} to {df['flags'].dtype} saving {mem:,.1f} MB"
                )
            break
    return df


def load_tick_data(
    symbol,
    start_date,
    end_date,
    account_name,
    path=None,
    columns=None,
    verbose=True,
):
    """
    Loads tick data from a partitioned Parquet structure after verifying account.

    Args:
        symbol (str): The financial instrument symbol to load.
        start_date (Union[str, dt, pd.Timestamp]): The start date of the desired data range.
        end_date (Union[str, dt, pd.Timestamp]): The end date of the desired data range.
        account_name (str): The account name to verify against the data directory.
        path (Union[str, Path]): The root folder where the data is stored.
            Defaults to ``~/tick_data_parquet``.
        columns (Optional[list]): A list of specific columns to load. Loads all if None.
        verbose (bool): If True, logs detailed DataFrame info upon successful load.

    Returns:
        pd.DataFrame: A DataFrame with the requested tick data, or an empty DataFrame
            if the data directory is missing, the account verification fails,
            dates are invalid, or an error occurs.
    """
    try:
        root_path = Path(path)
    except TypeError:
        root_path = Path.home() / "tick_data_parquet"

    # Account verification writes into this directory, so it has to exist.
    if not root_path.is_dir():
        logger.error(f"Data directory {root_path} not found")
        return pd.DataFrame()

    if not verify_or_create_account_info(root_path, account_name):
        return pd.DataFrame()

    date_range = date_conversion(start_date, end_date)
    if not date_range:
        return pd.DataFrame()
    start_dt, end_dt = date_range
    fname = root_path / symbol.upper()

    try:
        if not fname.exists():
            logger.error(f"Data directory {fname} not found for {symbol}")
            return pd.DataFrame()

        # Restrict the read to the requested time window.
        filters = [("time", ">=", start_dt), ("time", "<=", end_dt)]
        ddf = dd.read_parquet(fname, columns=columns, filters=filters, engine="pyarrow")
        df = ddf.compute()

        if df.empty:
            logger.warning(
                f"No tick data found for {symbol} between {start_dt} and {end_dt} "
                f"in account {account_name}"
            )
            return pd.DataFrame()

        size = df.memory_usage(deep=True).sum() / 1024**2
        logger.success(
            f"Loaded {len(df):,} rows of {symbol} ({size:,.2f} MB) tick data for account {account_name}"
        )

        # Drop every column that contains a missing value. isna() works for
        # any dtype, whereas np.isnan raises TypeError on non-numeric columns.
        to_drop = [col for col in df.columns if df[col].isna().any()]
        if to_drop:
            df.drop(columns=to_drop, inplace=True)
            logger.info(f"Dropped empty columns {to_drop}")

        # Optimise dtype of flags column for memory. Done after the drop so a
        # flags column containing NaN is never cast to an integer dtype.
        if "flags" in df.columns:
            df = _downcast_flags(df)

        if not df.index.is_monotonic_increasing:
            df.sort_index(inplace=True)

        if verbose:
            log_df_info(df)
        return df
    except Exception as e:
        logger.error(f"Failed to load data for {symbol}. Error: {e}")
        return pd.DataFrame()
| # --- Main Execution Block --- | |
if __name__ == "__main__":
    # Standalone entry point: configure logging, log in to MT5, download the
    # configured symbols and always close the MT5 connection afterwards.
    import MetaTrader5 as mt5

    MAJORS = [
        "EURUSD",
        "USDJPY",
        "GBPUSD",
        "USDCHF",
        "AUDUSD",
        "USDCAD",
        "NZDUSD",
        "XAUUSD",
    ]
    CRYPTO = [
        "ADAUSD",
        "BTCUSD",
        "DOGUSD",
        "ETHUSD",
        "LNKUSD",
        "LTCUSD",
        "XLMUSD",
        "XMRUSD",
        "XRPUSD",
    ]

    # --- 1. User Configuration ---
    CONFIG = {
        "save_path": Path.home() / "tick_data_parquet",
        "symbols_to_download": MAJORS + CRYPTO,
        "account_to_use": "FundedNext_STLR2_6K",  # This name MUST match the one used in your environment variables
        "start_date": "2016-01-01",
        "end_date": "2017-12-31",
        "verbose_login": True,
    }

    # --- 2. Setup Logging ---
    # Configure logger to output to console and a file for persistent records.
    # loguru ships with a default stderr handler; remove it first, otherwise
    # every message would be printed to the console twice.
    logger.remove()
    logger.add(
        sys.stderr,
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
        "<level>{message}</level>",
        colorize=True,
        backtrace=True,
        diagnose=True,
        enqueue=True,
    )
    log_path = CONFIG["save_path"] / "data_download.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    # Add a file sink next to the console sink.
    logger.add(
        log_path,
        rotation="10 MB",
        retention="30 days",
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}",
        enqueue=True,
    )
    logger.info("--- Starting New Data Download Session ---")

    # --- 3. Login to MT5 ---
    logged_in_account = login_mt5(
        account=CONFIG["account_to_use"], verbose=CONFIG["verbose_login"]
    )

    # --- 4. Run Downloader ---
    if logged_in_account:
        try:
            save_data_to_parquet(
                symbols=CONFIG["symbols_to_download"],
                start_date=CONFIG["start_date"],
                end_date=CONFIG["end_date"],
                account_name=logged_in_account,
                path=CONFIG["save_path"],
            )
        finally:
            # --- 5. Shutdown MT5 Connection ---
            # Runs even if the download raises, so the session is always closed.
            mt5.shutdown()
            logger.info("--- MT5 Connection Closed. Session End ---")
    else:
        logger.critical("Could not log in to MetaTrader 5. Aborting all operations.")