"""
ML-3m-trader Data Fetcher
==========================
Connects to MetaTrader 5 and downloads one year of 3-minute OHLCV data
for XAUUSDc. Saves the raw CSV for reproducibility.

Uses copy_rates_from_pos (position-based fetch) because copy_rates_range
returns 'Invalid params' on certain MT5 builds.
"""
|
|
| import os |
| import sys |
| from datetime import datetime, timezone |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| import config as cfg |
|
|
|
|
def _ensure_dirs():
    """Create the configured data directory if it does not already exist."""
    data_dir = cfg.DATA_DIR
    # exist_ok=True makes repeated calls harmless.
    os.makedirs(data_dir, exist_ok=True)
|
|
|
|
def fetch_mt5(
    symbol: str = cfg.SYMBOL,
    timeframe_minutes: int = cfg.TIMEFRAME_MINUTES,
    date_from: str = "2025-01-01",
    date_to: str = "2026-01-01",
) -> pd.DataFrame:
    """
    Fetch historical bars from a running MetaTrader 5 terminal using
    copy_rates_from_pos (position-based), then keep only the bars whose
    timestamp falls in the half-open range [date_from, date_to).

    Up to 500,000 bars are requested starting at position 0. If the terminal
    returns nothing for the requested timeframe, M1 bars are fetched instead
    and resampled to ``timeframe_minutes``.

    Parameters
    ----------
    symbol : str
        Instrument name as known to the terminal.
    timeframe_minutes : int
        Bar size in minutes. Supported: 1, 3, 5, 15, 30, 60.
    date_from : str
        Inclusive lower bound of the date range, interpreted as UTC.
    date_to : str
        Exclusive upper bound of the date range, interpreted as UTC.

    Returns
    -------
    pd.DataFrame
        Columns: time, open, high, low, close, volume, spread

    Raises
    ------
    SystemExit
        If the MetaTrader5 package is missing, the terminal cannot be
        initialised, the symbol or timeframe is unknown, no data is
        returned, or no bars fall inside the requested date range.
    """
    try:
        import MetaTrader5 as mt5
    except ImportError:
        print("[ERROR] MetaTrader5 package not installed. Run: pip install MetaTrader5")
        sys.exit(1)

    if not mt5.initialize():
        print(f"[ERROR] MT5 initialize failed: {mt5.last_error()}")
        sys.exit(1)

    used_m1_fallback = False
    # A single finally guarantees the terminal connection is released on every
    # exit path (including sys.exit and unexpected exceptions), and lets every
    # last_error() call run while the connection is still open.
    try:
        si = mt5.symbol_info(symbol)
        if si is None:
            print(f"[ERROR] Symbol '{symbol}' not found: {mt5.last_error()}")
            sys.exit(1)

        # Best effort: a failed select is reported but does not abort, because
        # the rate request below may still succeed.
        if not mt5.symbol_select(symbol, True):
            print(f"[WARN] Could not select symbol '{symbol}': {mt5.last_error()}")
        print(f"[INFO] Symbol '{symbol}' visible={si.visible}, bid={si.bid}, spread={si.spread}")

        tf_map = {
            1: mt5.TIMEFRAME_M1,
            3: mt5.TIMEFRAME_M3,
            5: mt5.TIMEFRAME_M5,
            15: mt5.TIMEFRAME_M15,
            30: mt5.TIMEFRAME_M30,
            60: mt5.TIMEFRAME_H1,
        }
        tf = tf_map.get(timeframe_minutes)
        if tf is None:
            print(f"[ERROR] Unsupported timeframe: {timeframe_minutes}m")
            sys.exit(1)

        max_bars = 500_000
        print(f"[INFO] Fetching up to {max_bars:,} M{timeframe_minutes} bars via copy_rates_from_pos...")
        rates = mt5.copy_rates_from_pos(symbol, tf, 0, max_bars)

        if rates is None or len(rates) == 0:
            print(f"[WARN] No M{timeframe_minutes} data. Fetching M1 and resampling...")
            rates = mt5.copy_rates_from_pos(symbol, mt5.TIMEFRAME_M1, 0, max_bars)
            if rates is None or len(rates) == 0:
                # Read the error before shutdown so it describes the failed fetch.
                print(f"[ERROR] No M1 data either: {mt5.last_error()}")
                sys.exit(1)
            used_m1_fallback = True
    finally:
        mt5.shutdown()

    # Both paths share the same conversion: epoch seconds are read as UTC and
    # tick_volume is exposed under the generic name "volume".
    df = pd.DataFrame(rates)
    df["time"] = pd.to_datetime(df["time"], unit="s", utc=True)
    df.rename(columns={"tick_volume": "volume"}, inplace=True)

    if used_m1_fallback:
        agg = {"open": "first", "high": "max", "low": "min",
               "close": "last", "volume": "sum", "spread": "last"}
        # Only aggregate columns the terminal actually returned.
        agg = {k: v for k, v in agg.items() if k in df.columns}
        df = (
            df.set_index("time")
            .resample(f"{timeframe_minutes}min")
            .agg(agg)
            .dropna(subset=["open"])  # drop empty buckets (no M1 bars inside)
            .reset_index()
        )
        print(f"[INFO] Resampled M1 -> M{timeframe_minutes}: {len(df):,} bars")
    else:
        print(f"[INFO] Fetched {len(df):,} M{timeframe_minutes} bars directly")

    # Normalise to the documented column set; spread defaults to 0 if absent.
    cols = ["time", "open", "high", "low", "close", "volume", "spread"]
    df = df[[c for c in cols if c in df.columns]].copy()
    if "spread" not in df.columns:
        df["spread"] = 0

    range_start = pd.Timestamp(date_from, tz="UTC")
    range_end = pd.Timestamp(date_to, tz="UTC")
    before = len(df)
    df = df[(df["time"] >= range_start) & (df["time"] < range_end)].copy()
    df.reset_index(drop=True, inplace=True)
    print(f"[INFO] Filtered to {range_start.date()} — {range_end.date()}: "
          f"{len(df):,} bars (dropped {before - len(df):,} outside range)")

    if df.empty:
        print("[ERROR] No bars in the target date range.")
        sys.exit(1)

    print(f"[INFO] Final dataset: {len(df):,} bars "
          f"({df['time'].iloc[0]} to {df['time'].iloc[-1]})")

    return df
|
|
|
|
def save_csv(df: pd.DataFrame, filename: str = None) -> str:
    """
    Write ``df`` to a CSV file inside the configured data directory.

    The directory is created first if needed. When ``filename`` is None the
    name is derived from the configured symbol and timeframe. The index is
    not written. Returns the path of the written file.
    """
    _ensure_dirs()
    target_name = (
        filename
        if filename is not None
        else f"raw_{cfg.SYMBOL.lower()}_m{cfg.TIMEFRAME_MINUTES}.csv"
    )
    out_path = os.path.join(cfg.DATA_DIR, target_name)
    df.to_csv(out_path, index=False)
    row_count = len(df)
    print(f"[INFO] Saved {row_count:,} rows to {out_path}")
    return out_path
|
|
|
|
def load_csv(filename: str = None) -> pd.DataFrame:
    """
    Read a previously saved CSV from the configured data directory.

    When ``filename`` is None the name is derived from the configured symbol
    and timeframe. The ``time`` column is parsed as dates. If the file does
    not exist, an error is printed and the process exits with status 1.
    """
    source_name = filename
    if source_name is None:
        source_name = f"raw_{cfg.SYMBOL.lower()}_m{cfg.TIMEFRAME_MINUTES}.csv"
    csv_path = os.path.join(cfg.DATA_DIR, source_name)

    if os.path.exists(csv_path):
        frame = pd.read_csv(csv_path, parse_dates=["time"])
        print(f"[INFO] Loaded {len(frame):,} rows from {csv_path}")
        return frame

    # Missing file: tell the user how to produce it, then abort.
    print(f"[ERROR] Data file not found: {csv_path}")
    print(" Run 'python main.py fetch' first.")
    sys.exit(1)
|
|