| """Round-trip tests for the columnar time-series serializer.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| from datetime import UTC, datetime |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| import pytest |
|
|
| from TerraFin.data.cache.serializers import ( |
| ColumnarTimeSeriesSerializer, |
| HistoryChunkSerializer, |
| ) |
| from TerraFin.data.contracts import HistoryChunk, TimeSeriesDataFrame |
|
|
|
|
| def _make_ohlcv_frame(rows: int = 10) -> pd.DataFrame: |
| idx = pd.date_range("2023-01-02", periods=rows, freq="B") |
| rng = np.arange(rows, dtype=float) + 100.0 |
| return pd.DataFrame( |
| { |
| "Open": rng, |
| "High": rng + 1, |
| "Low": rng - 1, |
| "Close": rng + 0.5, |
| "Volume": (rng * 1000).astype(float), |
| }, |
| index=idx, |
| ) |
|
|
|
|
def test_columnar_round_trip(tmp_path: Path) -> None:
    """Write a 20-row OHLCV payload, read it back, and compare close/volume."""
    codec = ColumnarTimeSeriesSerializer()
    original = TimeSeriesDataFrame(_make_ohlcv_frame(20))
    target = tmp_path / "frame"

    codec.write(target, original)
    restored = codec.read(target)

    assert isinstance(restored, TimeSeriesDataFrame)
    assert len(restored) == len(original)

    # Compare values positionally; the index and series names are not part
    # of this check.
    for column in ("close", "volume"):
        pd.testing.assert_series_equal(
            restored[column].reset_index(drop=True),
            original[column].reset_index(drop=True),
            check_names=False,
        )
|
|
|
|
def test_columnar_partial_read_recent_and_backfill(tmp_path: Path) -> None:
    """Exercise ``read_recent`` and ``read_backfill`` on a 400-business-day frame.

    A plain ``pd.DataFrame`` is written, a "6m" recent window is read with
    ``mmap=True``, and the backfill is then requested using the first
    timestamp of that recent window as the cutoff.
    """
    codec = ColumnarTimeSeriesSerializer()
    business_days = pd.date_range("2020-01-01", periods=400, freq="B")
    base = np.arange(len(business_days), dtype=float) + 50.0
    raw = pd.DataFrame(
        {
            "Open": base,
            "High": base + 1,
            "Low": base - 1,
            "Close": base + 0.25,
            "Volume": base * 100,
        },
        index=business_days,
    )
    artifact_dir = tmp_path / "AAPL" / "full"
    codec.write(artifact_dir, raw)

    recent, has_older = codec.read_recent(artifact_dir, "6m", mmap=True)
    assert has_older is True
    assert not recent.empty
    assert len(recent) < len(raw)

    cutoff = recent["time"].iloc[0].strftime("%Y-%m-%d")
    backfill, start, end = codec.read_backfill(artifact_dir, cutoff)
    assert start is not None
    assert end is not None
    assert not backfill.empty
    # NOTE(review): the +5 slack presumably tolerates a few rows shared by
    # the recent and backfill windows at the boundary -- confirm against the
    # serializer.
    assert len(backfill) + len(recent) <= len(raw) + 5
|
|
|
|
def test_columnar_reads_existing_yfinance_v2_layout(tmp_path: Path) -> None:
    """Manually lay down the legacy on-disk format and ensure the serializer reads it.

    The layout written here is one ``.npy`` file per column (``time_i64``
    holding epoch seconds, plus ``*_f64`` files for each OHLCV field) and a
    ``meta.json`` sidecar describing the artifact.
    """
    artifact_dir = tmp_path / "yfinance_v2" / "aapl" / "full"
    artifact_dir.mkdir(parents=True)

    closes = np.array([100.0, 101.0, 102.0, 103.0, 104.0])
    rows = len(closes)
    idx = pd.date_range("2024-01-02", periods=rows, freq="B")

    # Derive epoch seconds with timedelta arithmetic rather than viewing the
    # index as int64 and dividing by 1e9: the raw-integer approach is only
    # correct when the index has nanosecond resolution, which newer pandas
    # versions no longer guarantee for ``date_range``.
    epoch = pd.Timestamp("1970-01-01")
    times = np.asarray((idx - epoch) // pd.Timedelta(seconds=1), dtype=np.int64)
    np.save(artifact_dir / "time_i64.npy", times)

    column_files = {
        "open_f64.npy": closes - 0.5,
        "high_f64.npy": closes + 1,
        "low_f64.npy": closes - 1,
        "close_f64.npy": closes,
        "volume_f64.npy": closes * 1000,
    }
    for filename, values in column_files.items():
        np.save(artifact_dir / filename, values)

    meta = {
        "version": 2,
        "schema": "ohlcv",
        "columns": ["Open", "High", "Low", "Close", "Volume"],
        "row_count": rows,
        "start_time": idx[0].strftime("%Y-%m-%d"),
        "end_time": idx[-1].strftime("%Y-%m-%d"),
        "cached_at": datetime.now(UTC).isoformat(),
        "is_complete": True,
        "has_older": False,
        "source": "yfinance",
        "index_name": "Date",
    }
    # Explicit encoding so the sidecar does not depend on the platform locale.
    (artifact_dir / "meta.json").write_text(json.dumps(meta), encoding="utf-8")

    serializer = ColumnarTimeSeriesSerializer()
    loaded = serializer.read(artifact_dir)
    assert isinstance(loaded, TimeSeriesDataFrame)
    assert len(loaded) == rows
    assert list(loaded["close"]) == pytest.approx(list(closes))
|
|
|
|
def test_columnar_preserves_name_and_chart_meta(tmp_path: Path) -> None:
    """Check that ``name`` and ``chart_meta`` on the payload survive write/read."""
    expected_name = "AAPL"
    expected_meta = {"unit": "USD", "kind": "ohlcv"}

    codec = ColumnarTimeSeriesSerializer()
    # Hand the payload its own copy so the expectation cannot be affected by
    # anything done to the payload's dict.
    payload = TimeSeriesDataFrame(
        _make_ohlcv_frame(10),
        name=expected_name,
        chart_meta=dict(expected_meta),
    )

    target = tmp_path / "frame"
    codec.write(target, payload)
    restored = codec.read(target)

    assert restored.name == expected_name
    assert restored.chart_meta == expected_meta
|
|
|
|
def test_columnar_drops_non_serializable_chart_meta(tmp_path: Path) -> None:
    """Round-trip a payload whose ``chart_meta`` holds a plain object instance.

    The write must not fail, the name must come back unchanged, and
    ``chart_meta`` must still be a dict after reading.
    """

    class Weird:
        pass

    codec = ColumnarTimeSeriesSerializer()
    target = tmp_path / "frame"
    payload = TimeSeriesDataFrame(
        _make_ohlcv_frame(5),
        name="TSLA",
        chart_meta={"obj": Weird()},
    )

    codec.write(target, payload)
    restored = codec.read(target)

    assert restored.name == "TSLA"
    # NOTE(review): only the container type is checked here; whether the
    # "obj" entry is dropped or converted is not asserted -- confirm the
    # intended contract.
    assert isinstance(restored.chart_meta, dict)
|
|
|
|
def test_history_chunk_serializer_round_trip(tmp_path: Path) -> None:
    """Write a ``HistoryChunk`` and verify its metadata and frame after reading."""
    frame = TimeSeriesDataFrame(_make_ohlcv_frame(8))
    # One mapping drives both construction and verification of the metadata.
    expected = {
        "loaded_start": "2023-01-02",
        "loaded_end": "2023-01-11",
        "requested_period": "3y",
        "is_complete": False,
        "has_older": True,
        "source_version": "test",
    }
    chunk = HistoryChunk(frame=frame, **expected)

    codec = HistoryChunkSerializer()
    target = tmp_path / "chunk"
    codec.write(target, chunk)
    restored = codec.read(target)

    for field, want in expected.items():
        got = getattr(restored, field)
        if isinstance(want, bool):
            # Flags must come back as real booleans, not merely truthy/falsy.
            assert got is want
        else:
            assert got == want

    assert isinstance(restored.frame, TimeSeriesDataFrame)
    assert len(restored.frame) == len(frame)
|
|