# File: TerraFin/tests/data/cache/test_columnar_serializer.py
# Last commit (sk851): "refactor: Phase 6 data layer + portfolio + UI fixes + breadth label"
# Commit hash: 085d910
"""Round-trip tests for the columnar time-series serializer."""
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
import numpy as np
import pandas as pd
import pytest
from TerraFin.data.cache.serializers import (
ColumnarTimeSeriesSerializer,
HistoryChunkSerializer,
)
from TerraFin.data.contracts import HistoryChunk, TimeSeriesDataFrame
def _make_ohlcv_frame(rows: int = 10) -> pd.DataFrame:
idx = pd.date_range("2023-01-02", periods=rows, freq="B")
rng = np.arange(rows, dtype=float) + 100.0
return pd.DataFrame(
{
"Open": rng,
"High": rng + 1,
"Low": rng - 1,
"Close": rng + 0.5,
"Volume": (rng * 1000).astype(float),
},
index=idx,
)
def test_columnar_round_trip(tmp_path: Path) -> None:
    """Write a 20-row OHLCV payload and read it back through the columnar serializer.

    The reloaded object must be a ``TimeSeriesDataFrame`` of the same length
    whose ``close`` and ``volume`` values equal those of the written payload.
    """
    codec = ColumnarTimeSeriesSerializer()
    original = TimeSeriesDataFrame(_make_ohlcv_frame(20))
    target = tmp_path / "frame"

    codec.write(target, original)
    restored = codec.read(target)

    assert isinstance(restored, TimeSeriesDataFrame)
    assert len(restored) == len(original)
    # Values are compared positionally: the index is reset on both sides and
    # series names are ignored.
    for column in ("close", "volume"):
        pd.testing.assert_series_equal(
            restored[column].reset_index(drop=True),
            original[column].reset_index(drop=True),
            check_names=False,
        )
def test_columnar_partial_read_recent_and_backfill(tmp_path: Path) -> None:
    """Exercise ``read_recent`` and ``read_backfill`` against a 400-row artifact.

    A plain ``DataFrame`` of 400 business days is written, then:

    * ``read_recent(..., "6m", mmap=True)`` must return a non-empty strict
      subset and report that older rows exist;
    * ``read_backfill`` called with the first recent date must return a
      non-empty frame together with non-``None`` start/end values.
    """
    codec = ColumnarTimeSeriesSerializer()
    business_days = pd.date_range("2020-01-01", periods=400, freq="B")
    base = np.arange(len(business_days), dtype=float) + 50.0
    columns = {
        "Open": base,
        "High": base + 1,
        "Low": base - 1,
        "Close": base + 0.25,
        "Volume": base * 100,
    }
    full_history = pd.DataFrame(columns, index=business_days)
    target = tmp_path / "AAPL" / "full"
    codec.write(target, full_history)

    recent, has_older = codec.read_recent(target, "6m", mmap=True)
    assert has_older is True
    assert not recent.empty
    assert len(recent) < len(full_history)

    # Backfill is requested relative to the oldest row of the recent window.
    oldest_recent_day = recent["time"].iloc[0].strftime("%Y-%m-%d")
    backfill, start, end = codec.read_backfill(target, oldest_recent_day)
    assert start is not None
    assert end is not None
    assert not backfill.empty
    # NOTE(review): the slack of 5 presumably tolerates a few rows shared by
    # both windows at the boundary — confirm against the serializer.
    assert len(backfill) + len(recent) <= len(full_history) + 5
def test_columnar_reads_existing_yfinance_v2_layout(tmp_path: Path) -> None:
    """Manually lay down the legacy on-disk format and ensure the serializer reads it.

    The fixture consists of one ``.npy`` file per column (``time_i64`` holding
    epoch seconds, the OHLCV columns as ``*_f64``) plus a ``meta.json``
    describing a version-2 ``ohlcv`` artifact. Reading the directory must
    yield a ``TimeSeriesDataFrame`` with all rows and the original closes.
    """
    artifact_dir = tmp_path / "yfinance_v2" / "aapl" / "full"
    artifact_dir.mkdir(parents=True)

    rows = 5
    idx = pd.date_range("2024-01-02", periods=rows, freq="B")
    # Convert to epoch seconds with timedelta arithmetic instead of
    # reinterpreting the index's raw integers: the result does not depend on
    # the index's internal datetime resolution (ns, us, ...).
    epoch = pd.Timestamp("1970-01-01")
    times = ((idx - epoch) // pd.Timedelta("1s")).to_numpy(dtype=np.int64)
    np.save(artifact_dir / "time_i64.npy", times)

    closes = np.array([100.0, 101.0, 102.0, 103.0, 104.0])
    np.save(artifact_dir / "open_f64.npy", closes - 0.5)
    np.save(artifact_dir / "high_f64.npy", closes + 1)
    np.save(artifact_dir / "low_f64.npy", closes - 1)
    np.save(artifact_dir / "close_f64.npy", closes)
    np.save(artifact_dir / "volume_f64.npy", closes * 1000)

    meta = {
        "version": 2,
        "schema": "ohlcv",
        "columns": ["Open", "High", "Low", "Close", "Volume"],
        "row_count": rows,
        "start_time": idx[0].strftime("%Y-%m-%d"),
        "end_time": idx[-1].strftime("%Y-%m-%d"),
        "cached_at": datetime.now(UTC).isoformat(),
        "is_complete": True,
        "has_older": False,
        "source": "yfinance",
        "index_name": "Date",
    }
    (artifact_dir / "meta.json").write_text(json.dumps(meta), encoding="utf-8")

    serializer = ColumnarTimeSeriesSerializer()
    loaded = serializer.read(artifact_dir)

    assert isinstance(loaded, TimeSeriesDataFrame)
    assert len(loaded) == rows
    assert list(loaded["close"]) == pytest.approx(list(closes))
def test_columnar_preserves_name_and_chart_meta(tmp_path: Path) -> None:
    """Check that ``name`` and a JSON-friendly ``chart_meta`` survive a write/read cycle."""
    codec = ColumnarTimeSeriesSerializer()
    original = TimeSeriesDataFrame(
        _make_ohlcv_frame(10),
        name="AAPL",
        chart_meta={"unit": "USD", "kind": "ohlcv"},
    )
    target = tmp_path / "frame"

    codec.write(target, original)
    restored = codec.read(target)

    assert restored.name == "AAPL"
    assert restored.chart_meta == {"unit": "USD", "kind": "ohlcv"}
def test_columnar_drops_non_serializable_chart_meta(tmp_path: Path) -> None:
    """Write a payload whose ``chart_meta`` holds an arbitrary object and read it back.

    The round trip must not raise, the ``name`` must be preserved, and the
    reloaded ``chart_meta`` must still be a ``dict`` (its contents are not
    asserted).
    """

    class Weird:
        # Plain user-defined class: instances are not JSON-serializable.
        pass

    codec = ColumnarTimeSeriesSerializer()
    original = TimeSeriesDataFrame(
        _make_ohlcv_frame(5),
        name="TSLA",
        chart_meta={"obj": Weird()},
    )
    target = tmp_path / "frame"

    codec.write(target, original)
    restored = codec.read(target)

    # The name is unaffected by the unserializable metadata.
    assert restored.name == "TSLA"
    # Only the type is pinned: chart_meta may come back populated or empty.
    assert isinstance(restored.chart_meta, dict)
def test_history_chunk_serializer_round_trip(tmp_path: Path) -> None:
    """Round-trip a ``HistoryChunk`` and verify its metadata and frame survive.

    Every scalar field set on the written chunk must be read back unchanged,
    and the reloaded ``frame`` must be a ``TimeSeriesDataFrame`` with the same
    number of rows.
    """
    codec = HistoryChunkSerializer()
    source_frame = TimeSeriesDataFrame(_make_ohlcv_frame(8))
    original = HistoryChunk(
        frame=source_frame,
        loaded_start="2023-01-02",
        loaded_end="2023-01-11",
        requested_period="3y",
        is_complete=False,
        has_older=True,
        source_version="test",
    )
    target = tmp_path / "chunk"

    codec.write(target, original)
    restored = codec.read(target)

    expected_text_fields = {
        "loaded_start": "2023-01-02",
        "loaded_end": "2023-01-11",
        "requested_period": "3y",
        "source_version": "test",
    }
    for field, expected in expected_text_fields.items():
        assert getattr(restored, field) == expected

    # Flags are checked by identity so truthy/falsy stand-ins do not pass.
    assert restored.is_complete is False
    assert restored.has_older is True

    assert isinstance(restored.frame, TimeSeriesDataFrame)
    assert len(restored.frame) == len(source_frame)