File size: 4,258 Bytes
60d27dc
 
 
 
 
 
2383fa0
60d27dc
 
 
 
 
 
 
 
 
 
 
4936080
 
 
 
 
 
 
 
 
60d27dc
2383fa0
 
 
4936080
2383fa0
 
 
 
 
 
 
 
 
 
4936080
 
2383fa0
60d27dc
2383fa0
 
60d27dc
4936080
2383fa0
60d27dc
 
2383fa0
60d27dc
2383fa0
 
60d27dc
 
 
 
 
 
 
2383fa0
60d27dc
 
 
 
2383fa0
60d27dc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2383fa0
60d27dc
 
 
2383fa0
60d27dc
2383fa0
4936080
60d27dc
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import os
import glob
import json
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import List, Tuple

from config import config


@dataclass
class AssetSeries:
    """One tradable asset together with its price history.

    Attributes:
        symbol: Identifier of the asset.
        asset_type: Either "stock" or "crypto".
        df: Price history with the columns date, open, high, low, close
            and volume.
    """

    symbol: str
    asset_type: str
    df: pd.DataFrame


def _parse_mixed_date(series: pd.Series) -> pd.Series:
    """Parse a column whose values mix ISO and day-first date strings.

    Every value is converted to a stripped string first. Values that match
    ``YYYY-MM-DD`` exactly are parsed with ``dayfirst=False``; all remaining
    values are parsed with ``dayfirst=True``. Anything that cannot be parsed
    becomes ``NaT``.

    Args:
        series: Raw date values.

    Returns:
        A series on the same index as ``series`` holding the parsed dates.
    """
    text = series.astype(str).str.strip()
    is_iso = text.str.match(r"^\d{4}-\d{2}-\d{2}$", na=False)

    parsed = pd.Series(pd.NaT, index=series.index)
    # The two groups are parsed separately so the day-first hint is only ever
    # applied to values that are not strict ISO dates.
    for selector, day_first in ((is_iso, False), (~is_iso, True)):
        parsed.loc[selector] = pd.to_datetime(
            text.loc[selector], errors="coerce", dayfirst=day_first
        )
    return parsed


def _read_any_ohlcv_csv(path: str) -> pd.DataFrame:
    """Load an OHLCV CSV in one of several layouts into a uniform frame.

    The delimiter is sniffed by pandas. Three layouts are recognised, checked
    in this order after lower-casing and stripping the header names:

    1. A ``timestamp_ms`` column is present (Binance export): the date is
       built from that millisecond epoch value and made timezone-naive.
    2. ``date``, ``open`` and ``close`` columns are present (Stooq / generic).
    3. Anything else: the first six columns are taken positionally as
       date, open, high, low, close, volume.

    For layouts 2 and 3 the date column goes through ``_parse_mixed_date``
    and rows without a parseable date are discarded.

    Args:
        path: Location of the CSV file.

    Returns:
        A frame with the columns date, open, high, low, close, volume, with
        numeric price/volume columns, no rows containing missing values,
        sorted by date and re-indexed from zero.
    """
    numeric_cols = ["open", "high", "low", "close", "volume"]
    wanted_cols = ["date"] + numeric_cols

    raw = pd.read_csv(path, sep=None, engine="python")
    names = [str(name).strip().lower() for name in raw.columns]

    if "timestamp_ms" in names:
        # Binance format (timestamp_ms)
        raw.columns = names
        stamps = pd.to_datetime(raw["timestamp_ms"], unit="ms", errors="coerce", utc=True)
        raw["date"] = stamps.dt.tz_convert(None)
        frame = raw[wanted_cols].copy()
    else:
        # Stooq / generic OHLCV, or a positional fallback for unknown headers
        if all(key in names for key in ("date", "open", "close")):
            raw.columns = names
            frame = raw[wanted_cols].copy()
        else:
            frame = raw.iloc[:, :6].copy()
            frame.columns = list(wanted_cols)

        frame["date"] = _parse_mixed_date(frame["date"])
        frame = frame.dropna(subset=["date"]).copy()

    # Non-numeric cells become NaN here and are removed by the dropna below.
    for name in numeric_cols:
        frame[name] = pd.to_numeric(frame[name], errors="coerce")

    frame = frame.dropna()
    frame = frame.sort_values("date")
    return frame.reset_index(drop=True)


def discover_assets(data_dir: str) -> List[Tuple[str, str, str]]:
    """Find the per-asset CSV files available under ``data_dir``.

    Two directory layouts are searched, where ``<interval>`` is
    ``config.INTERVAL``:

    * ``<data_dir>/stocks/stooq/<symbol>/<interval>.csv``   -> "stock"
    * ``<data_dir>/crypto/binance/<symbol>/<interval>.csv`` -> "crypto"

    The symbol is the name of the directory that contains the CSV file.

    Args:
        data_dir: Root directory of the data tree.

    Returns:
        ``(symbol, asset_type, path)`` tuples sorted by asset type and then
        symbol, truncated to the first ``config.MAX_ASSETS`` entries. Because
        the cut happens after sorting, "crypto" entries are kept ahead of
        "stock" entries.
    """
    csv_name = f"{config.INTERVAL}.csv"
    layouts = (
        ("stock", ("stocks", "stooq")),
        ("crypto", ("crypto", "binance")),
    )

    found = []
    for asset_type, subdirs in layouts:
        pattern = os.path.join(data_dir, *subdirs, "*", csv_name)
        found.extend(
            (os.path.basename(os.path.dirname(hit)), asset_type, hit)
            for hit in glob.glob(pattern)
        )

    found.sort(key=lambda entry: (entry[1], entry[0]))
    return found[: config.MAX_ASSETS]


def load_asset_series() -> List[AssetSeries]:
    """Load every discovered asset that has enough history.

    Assets are located with ``discover_assets(config.DATA_DIR)`` and read
    with ``_read_any_ohlcv_csv``. An asset is kept only when its cleaned
    frame has at least ``config.WINDOW + config.HORIZON_DAYS + 5`` rows.

    Returns:
        The retained assets, in discovery order.
    """
    loaded = []
    for symbol, kind, csv_path in discover_assets(config.DATA_DIR):
        frame = _read_any_ohlcv_csv(csv_path)
        # Too little history for this asset: leave it out.
        if len(frame) < config.WINDOW + config.HORIZON_DAYS + 5:
            continue
        loaded.append(AssetSeries(symbol=symbol, asset_type=kind, df=frame))
    return loaded


def make_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive the model input features from an OHLCV frame.

    Works on a copy, so ``df`` itself is not modified. Four columns are
    added:

    * ``log_return`` - natural log of close divided by the previous row's
      close.
    * ``hl_range``   - (high - low) / close.
    * ``oc_return``  - (close - open) / open.
    * ``vol_log``    - log10(volume + 1), with negative volumes clipped to 0.

    Args:
        df: Frame with at least the columns open, high, low, close, volume.

    Returns:
        The augmented frame with every row containing a missing value
        removed and the index reset. The first row is always removed (it has
        no previous close), as are rows whose log return is not finite.
    """
    out = df.copy()

    ratio = out["close"] / out["close"].shift(1)
    # The warnings for log(0) / log(negative) are expected; those rows are
    # dealt with explicitly right below.
    with np.errstate(divide="ignore", invalid="ignore"):
        log_return = np.log(ratio)
    # A zero close gives -inf on its own row and +inf on the next one.
    # dropna() does not treat infinities as missing, so turn them into NaN
    # here; otherwise they would survive into the returned features.
    out["log_return"] = log_return.where(np.isfinite(log_return))

    # The small epsilon keeps these ratios finite when the divisor is zero.
    out["hl_range"] = (out["high"] - out["low"]) / (out["close"] + 1e-12)
    out["oc_return"] = (out["close"] - out["open"]) / (out["open"] + 1e-12)
    out["vol_log"] = np.log10(out["volume"].clip(lower=0) + 1.0)

    out = out.dropna().reset_index(drop=True)
    return out


def build_windows(
    feats: pd.DataFrame, window: int, horizon: int
) -> Tuple[np.ndarray, np.ndarray, List[pd.Timestamp]]:
    """Cut a feature frame into fixed-length input windows with targets.

    One sample is produced for every index ``i`` in
    ``range(window, len(feats) - horizon)``:

    * input     - rows ``i - window`` .. ``i - 1`` of the columns
      log_return, hl_range, oc_return, vol_log;
    * target    - ``log_return`` at row ``i + horizon``;
    * timestamp - ``date`` at row ``i + horizon``.

    Args:
        feats: Frame with the columns date, log_return, hl_range, oc_return
            and vol_log.
        window: Number of consecutive rows in each input sample.
        horizon: Offset added to ``i`` to locate the target row.

    Returns:
        ``(X, y, ts)`` where ``X`` is float32 of shape
        ``(n_samples, window, 4)``, ``y`` is float32 of shape
        ``(n_samples,)`` and ``ts`` is a list of ``pd.Timestamp``. When the
        frame is too short to yield a single sample, empty arrays of those
        shapes and an empty list are returned.
    """
    feature_cols = ["log_return", "hl_range", "oc_return", "vol_log"]
    values = feats[feature_cols].values.astype(np.float32)
    dates = feats["date"].values

    anchors = range(window, len(values) - horizon)
    if not anchors:
        # np.stack raises ValueError on an empty list, so build the empty
        # result explicitly and keep the shapes callers expect.
        empty_x = np.empty((0, max(window, 0), len(feature_cols)), dtype=np.float32)
        return empty_x, np.empty((0,), dtype=np.float32), []

    # NOTE(review): the input window ends at row i - 1 while the target sits
    # at row i + horizon, so row i is never part of either - confirm this
    # offset is intended.
    X = np.stack([values[i - window:i] for i in anchors])
    y = np.array([values[i + horizon, 0] for i in anchors], dtype=np.float32)
    ts_list = [pd.Timestamp(dates[i + horizon]) for i in anchors]

    return X, y, ts_list


def save_manifest(series: List[AssetSeries]):
    """Write a JSON summary of the loaded assets to the artifact directory.

    The file is ``manifest.json`` inside ``config.ARTIFACT_DIR``; the
    directory is created if it does not exist. Each entry records the
    asset's symbol, its asset type and the number of rows in its frame.

    Args:
        series: Assets to describe.

    Returns:
        Path of the written manifest file.
    """
    os.makedirs(config.ARTIFACT_DIR, exist_ok=True)
    manifest_path = os.path.join(config.ARTIFACT_DIR, "manifest.json")

    entries = []
    for item in series:
        entries.append(
            {
                "symbol": item.symbol,
                "asset_type": item.asset_type,
                "rows": int(len(item.df)),
            }
        )

    with open(manifest_path, "w", encoding="utf-8") as handle:
        json.dump(entries, handle, indent=2)

    return manifest_path