|
|
import os |
|
|
import glob |
|
|
import json |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from dataclasses import dataclass |
|
|
from typing import List, Tuple |
|
|
|
|
|
from config import config |
|
|
|
|
|
|
|
|
@dataclass
class AssetSeries:
    """Price history for a single asset.

    Instances are built by ``load_asset_series`` from the CSV files located by
    ``discover_assets``.
    """

    # Symbol name, taken from the directory that holds the asset's CSV file.
    symbol: str
    # Category assigned by discover_assets: "stock" or "crypto".
    asset_type: str
    # OHLCV frame (date, open, high, low, close, volume), sorted by date.
    df: pd.DataFrame
|
|
|
|
|
|
|
|
def _parse_mixed_date(series: pd.Series) -> pd.Series: |
|
|
s = series.astype(str).str.strip() |
|
|
iso_mask = s.str.match(r"^\d{4}-\d{2}-\d{2}$", na=False) |
|
|
out = pd.Series(pd.NaT, index=series.index) |
|
|
out.loc[iso_mask] = pd.to_datetime(s.loc[iso_mask], errors="coerce", dayfirst=False) |
|
|
out.loc[~iso_mask] = pd.to_datetime(s.loc[~iso_mask], errors="coerce", dayfirst=True) |
|
|
return out |
|
|
|
|
|
|
|
|
def _finalize_ohlcv(out: pd.DataFrame) -> pd.DataFrame:
    """Coerce price/volume columns to numbers, drop incomplete rows, and sort by date."""
    for c in ["open", "high", "low", "close", "volume"]:
        out[c] = pd.to_numeric(out[c], errors="coerce")
    return out.dropna().sort_values("date").reset_index(drop=True)


def _read_any_ohlcv_csv(path: str) -> pd.DataFrame:
    """Load an OHLCV CSV file in any of the layouts this loader recognises.

    The delimiter is sniffed (``sep=None`` with the python engine). Header
    names are compared after stripping whitespace and lower-casing. Three
    layouts are handled:

    1. A ``timestamp_ms`` column (epoch milliseconds, converted to naive UTC
       datetimes) together with ``open``/``high``/``low``/``close``/``volume``.
    2. A header containing ``date``, ``open`` and ``close``; ``high``, ``low``
       and ``volume`` must be present as well.
    3. Anything else: the first six columns are taken positionally as
       date, open, high, low, close, volume. If the first header cell itself
       parses as a date, the file is treated as headerless and re-read, so
       that first data row is kept instead of being swallowed as column names.

    Dates in layouts 2 and 3 go through ``_parse_mixed_date``. Price/volume
    cells are coerced to numbers. Any row with a missing or unparseable value
    is dropped.

    Args:
        path: CSV file to read.

    Returns:
        Frame with columns ``date, open, high, low, close, volume``, sorted by
        date, with a fresh 0..n-1 index.

    Raises:
        KeyError: a recognised header (layout 1 or 2) lacks a required column.
    """
    ohlcv = ["date", "open", "high", "low", "close", "volume"]
    df = pd.read_csv(path, sep=None, engine="python")
    cols = [str(c).strip().lower() for c in df.columns]

    if "timestamp_ms" in cols:
        df.columns = cols
        df["date"] = pd.to_datetime(
            df["timestamp_ms"], unit="ms", errors="coerce", utc=True
        ).dt.tz_convert(None)
        return _finalize_ohlcv(df[ohlcv].copy())

    if "date" in cols and "open" in cols and "close" in cols:
        df.columns = cols
        out = df[ohlcv].copy()
    else:
        # read_csv consumed the first line as a header. If that "header" is
        # really a data row (its first cell parses as a date), re-read
        # without a header so the row is not lost.
        first_label = pd.Series([str(df.columns[0])])
        if _parse_mixed_date(first_label).notna().iloc[0]:
            df = pd.read_csv(path, sep=None, engine="python", header=None)
        out = df.iloc[:, :6].copy()
        out.columns = ohlcv

    out["date"] = _parse_mixed_date(out["date"])
    out = out.dropna(subset=["date"]).copy()
    return _finalize_ohlcv(out)
|
|
|
|
|
|
|
|
def discover_assets(data_dir: str) -> List[Tuple[str, str, str]]:
    """Locate per-symbol OHLCV CSV files for the configured interval.

    Two source trees below ``data_dir`` are scanned:

    * ``stocks/stooq/<SYMBOL>/<config.INTERVAL>.csv``   -> asset type ``"stock"``
    * ``crypto/binance/<SYMBOL>/<config.INTERVAL>.csv`` -> asset type ``"crypto"``

    The symbol is the name of the directory holding the CSV.

    Returns:
        ``(symbol, asset_type, path)`` tuples ordered by asset type and then
        symbol (so every ``"crypto"`` entry precedes every ``"stock"`` entry),
        then cut to ``[: config.MAX_ASSETS]``.
    """
    # (asset_type, path segments between data_dir and the per-symbol folder)
    layouts = (
        ("stock", ("stocks", "stooq")),
        ("crypto", ("crypto", "binance")),
    )
    found = []
    for asset_type, segments in layouts:
        pattern = os.path.join(data_dir, *segments, "*", f"{config.INTERVAL}.csv")
        found.extend(
            (os.path.basename(os.path.dirname(csv_path)), asset_type, csv_path)
            for csv_path in glob.glob(pattern)
        )
    found.sort(key=lambda entry: (entry[1], entry[0]))
    return found[: config.MAX_ASSETS]
|
|
|
|
|
|
|
|
def load_asset_series() -> List[AssetSeries]:
    """Load every discovered asset that has at least the minimum history length.

    Assets come from ``discover_assets(config.DATA_DIR)``, in that function's
    order. Each CSV is read with ``_read_any_ohlcv_csv``. An asset is kept
    only when its frame has at least
    ``config.WINDOW + config.HORIZON_DAYS + 5`` rows.

    Returns:
        The retained assets as ``AssetSeries`` objects, in discovery order.
    """
    kept: List[AssetSeries] = []
    for symbol, asset_type, csv_path in discover_assets(config.DATA_DIR):
        frame = _read_any_ohlcv_csv(csv_path)
        # Skip assets shorter than WINDOW + HORIZON_DAYS plus a 5-row margin.
        if len(frame) < config.WINDOW + config.HORIZON_DAYS + 5:
            continue
        kept.append(AssetSeries(symbol=symbol, asset_type=asset_type, df=frame))
    return kept
|
|
|
|
|
|
|
|
def make_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive per-bar features from an OHLCV frame.

    Adds four columns to a copy of ``df``:

    * ``log_return`` - natural log of close / previous close;
    * ``hl_range``   - (high - low) / close;
    * ``oc_return``  - (close - open) / open;
    * ``vol_log``    - log10(max(volume, 0) + 1).

    A tiny epsilon (1e-12) is added to the ``hl_range`` and ``oc_return``
    denominators. Rows with a NaN in any column are dropped; this includes
    the first row, whose ``log_return`` has no predecessor. Rows whose
    features are infinite are also dropped, e.g. ``log_return`` of a zero
    close or of a bar following a zero close.

    Args:
        df: frame with ``open``, ``high``, ``low``, ``close`` and ``volume``
            columns; any other columns (such as ``date``) are carried through.

    Returns:
        A new frame with a fresh 0..n-1 index; ``df`` itself is not modified.
    """
    feature_cols = ["log_return", "hl_range", "oc_return", "vol_log"]
    out = df.copy()
    # Zero or negative prices make these logs/divisions yield inf or NaN.
    # Such rows are removed below, so numpy's runtime warnings are noise.
    with np.errstate(divide="ignore", invalid="ignore"):
        out["log_return"] = np.log(out["close"] / out["close"].shift(1))
        out["hl_range"] = (out["high"] - out["low"]) / (out["close"] + 1e-12)
        out["oc_return"] = (out["close"] - out["open"]) / (out["open"] + 1e-12)
        out["vol_log"] = np.log10(out["volume"].clip(lower=0) + 1.0)
    # dropna() does not treat +/-inf as missing. Convert first, so a zero close
    # cannot leak infinities into downstream windows.
    out[feature_cols] = out[feature_cols].replace([np.inf, -np.inf], np.nan)
    return out.dropna().reset_index(drop=True)
|
|
|
|
|
|
|
|
def build_windows(feats: pd.DataFrame, window: int, horizon: int):
    """Slice a feature frame into supervised ``(X, y, ts)`` samples.

    For every anchor row ``i`` in ``[window, len(feats) - horizon)``:

    * ``X[k]`` holds the four feature columns of rows ``i - window`` .. ``i - 1``;
    * ``y[k]`` is ``log_return`` at row ``i + horizon``;
    * ``ts[k]`` is the ``date`` of row ``i + horizon`` as a ``pd.Timestamp``.

    Args:
        feats: frame with ``log_return``, ``hl_range``, ``oc_return``,
            ``vol_log`` and ``date`` columns (e.g. the output of ``make_features``).
        window: number of past rows in each input sample.
        horizon: offset from the anchor row ``i`` to the target row.

    Returns:
        ``X``: float32 array of shape ``(n_samples, window, 4)``.
        ``y``: float32 array of shape ``(n_samples,)``.
        ``ts``: list of ``n_samples`` timestamps.
        When the frame is too short to yield any sample, empty arrays of those
        shapes and an empty list are returned (previously ``np.stack`` raised
        ``ValueError``).
    """
    values = feats[["log_return", "hl_range", "oc_return", "vol_log"]].values.astype(np.float32)
    dates = feats["date"].values

    # Anchor rows i; empty when len(values) <= window + horizon.
    anchors = np.arange(window, len(values) - horizon)
    # (n_samples, window) matrix of row indices i-window .. i-1. Integer-array
    # indexing copies them into a (n_samples, window, 4) block in one step.
    rows = anchors[:, None] + np.arange(-window, 0)
    X = values[rows]
    targets = anchors + horizon
    y = values[targets, 0]
    ts = [pd.Timestamp(d) for d in dates[targets]]
    return X, y, ts
|
|
|
|
|
|
|
|
def save_manifest(series: List[AssetSeries]):
    """Write ``manifest.json`` into ``config.ARTIFACT_DIR`` describing ``series``.

    The directory is created if it does not exist. The file holds a JSON list
    with one object per series: ``symbol``, ``asset_type`` and ``rows`` (the
    row count of its frame). It is written with 2-space indentation and
    overwrites any existing manifest.

    Returns:
        The path of the manifest file.
    """
    out_dir = config.ARTIFACT_DIR
    os.makedirs(out_dir, exist_ok=True)
    manifest_path = os.path.join(out_dir, "manifest.json")
    entries = [
        {"symbol": item.symbol, "asset_type": item.asset_type, "rows": int(len(item.df))}
        for item in series
    ]
    with open(manifest_path, "w", encoding="utf-8") as fh:
        json.dump(entries, fh, indent=2)
    return manifest_path
|
|
|