BuckLakeAI / tests /test_raw_preprocessing.py
Parker's Fedora
Support anchored-path v2 serving.
c125dfd
Raw
History Blame Contribute Delete
19.6 kB
import tempfile
import unittest
import json
from pathlib import Path
from unittest.mock import patch
import numpy as np
from bucklake_ai.config import Settings
from bucklake_ai.service import InferenceService
class StubTextEncoder:
    """Text-encoder test double that records its inputs and returns constants.

    Each ``*_texts`` list captures, in call order, the texts passed to the
    corresponding method so tests can assert which paths were exercised.
    """

    _EMBEDDING_DIM = 768
    _TAG_VECTOR_DIM = 1024

    def __init__(self):
        self.preloaded = False
        self.encoded_texts = []
        self.sentiment_texts = []
        self.pos_entity_texts = []

    @property
    def is_loaded(self) -> bool:
        """Always report the encoder as loaded."""
        return True

    def preload(self) -> None:
        """Flag that ``preload`` was invoked; nothing is actually loaded."""
        self.preloaded = True

    def encode_text(self, text: str) -> np.ndarray:
        """Record ``text`` and return a float32 all-ones vector of length 768."""
        self.encoded_texts.append(text)
        return np.ones(self._EMBEDDING_DIM, dtype=np.float32)

    def build_sentiment(self, text: str) -> float:
        """Record ``text`` and return the fixed sentiment score 0.75."""
        self.sentiment_texts.append(text)
        return 0.75

    def build_pos_entity_vectors(self, text: str) -> tuple[np.ndarray, np.ndarray]:
        """Record ``text`` and return two independent uniform float32 vectors.

        Both vectors have length 1024 with every element equal to 1/1024.
        """
        self.pos_entity_texts.append(text)
        uniform = np.full(
            (self._TAG_VECTOR_DIM,),
            1.0 / self._TAG_VECTOR_DIM,
            dtype=np.float32,
        )
        # Hand back a copy for the second vector so the two arrays never alias.
        return uniform, uniform.copy()
class FakeModelLoader:
    """Model-loader test double that hands out an opaque placeholder model."""

    def __init__(self, settings):
        # A bare ``object()`` is enough for tests that never call the model.
        self._model = object()
        self._settings = settings

    @property
    def is_loaded(self) -> bool:
        """Always report the model as loaded."""
        return True

    @property
    def resolved_model_path(self) -> Path:
        """Return the settings' ``model_weights_path`` coerced to a ``Path``."""
        weights_path = self._settings.model_weights_path
        return Path(weights_path)

    def get_model(self):
        """Return the placeholder model created at construction time."""
        return self._model
class SentimentAwareModel:
    """Fake model whose outputs depend only on the sentiment input."""

    def predict(self, model_inputs, verbose=0):
        """Return eight float32 output arrays derived from the sentiment value.

        The sentiment is read from ``model_inputs[3][0][0]``. Output 0 is a
        (3, 1) array filled with ``sentiment * 0.01``, output 5 is a (3, 1)
        array filled with ``0.5 + sentiment * 0.1``, output 4 is a (3, 4) zero
        array, and every remaining output is the same (3, 1) zero array.
        ``verbose`` is accepted and ignored.
        """
        sentiment_value = float(model_inputs[3][0][0])
        column_shape = (3, 1)

        # One zero column is reused for every sentiment-independent slot.
        zero_column = np.zeros(column_shape, dtype=np.float32)

        outputs = [zero_column] * 8
        outputs[0] = np.full(column_shape, sentiment_value * 0.01, dtype=np.float32)
        outputs[4] = np.zeros((3, 4), dtype=np.float32)
        outputs[5] = np.full(column_shape, 0.5 + sentiment_value * 0.1, dtype=np.float32)
        return outputs
class FakePredictModelLoader(FakeModelLoader):
    """Loader variant whose model is a ``SentimentAwareModel``.

    Used by tests that call ``predict`` and need outputs that react to the
    sentiment input.
    """

    def __init__(self, settings):
        # Let the base class set up shared state so this subclass cannot drift
        # if ``FakeModelLoader`` grows additional attributes.
        super().__init__(settings)
        self._model = SentimentAwareModel()
class StubToken:
    """Minimal token stand-in carrying part-of-speech attributes.

    NOTE(review): the ``pos_``/``tag_``/``is_punct``/``is_space`` names
    presumably mirror spaCy's token API -- confirm against the encoder code.
    """

    def __init__(self, pos: str, tag: str, *, is_punct: bool = False, is_space: bool = False):
        self.pos_, self.tag_ = pos, tag
        self.is_punct, self.is_space = is_punct, is_space
class StubEntity:
    """Minimal named-entity stand-in exposing ``label_`` and ``text``."""

    def __init__(self, label: str, text: str):
        self.text = text
        self.label_ = label
class StubDoc:
    """Minimal parsed-document stand-in: one ORG entity and three tokens."""

    def __init__(self):
        proper_noun = StubToken("PROPN", "NNP")
        verb = StubToken("VERB", "VBZ")
        full_stop = StubToken("PUNCT", ".", is_punct=True)
        self._tokens = [proper_noun, verb, full_stop]
        self.ents = [StubEntity("ORG", "Apple")]

    def __iter__(self):
        """Iterate over the document's tokens in order."""
        yield from self._tokens
class RawPreprocessingTests(unittest.TestCase):
    """Tests for how ``InferenceService`` turns a raw request into model inputs.

    Every test runs against a throwaway root directory containing a symbol CSV
    and a scaler JSON, with the text encoder and model loader swapped for the
    stubs defined in this module.
    """

    _MARKET_SERIES = ("stock", "inx", "dj", "ixic", "ndx")
    _V2_SCALER_METADATA = {
        "numeric_input_version": "index-anchor-relative-v1",
        "index_price_representation": "session-aware-log-relative",
    }

    def setUp(self):
        """Create the temp root, write fixture files and build base settings."""
        self.temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup immediately: unittest skips tearDown when setUp
        # raises, but still runs cleanups that were registered beforehand.
        self.addCleanup(self.temp_dir.cleanup)
        root_dir = Path(self.temp_dir.name)
        data_dir = root_dir / "data"
        data_dir.mkdir(parents=True, exist_ok=True)
        (data_dir / "bl_symbol.csv").write_text(
            "id,symbol,market,company_name,sector,industry,country,currency,ipo_date,market_cap\n"
            "1,AAPL,NASDAQ,Apple,Technology,Computer Manufacturing,US,USD,,\n"
            "2,MSFT,NASDAQ,Microsoft,Technology,Computer Software,US,USD,,\n",
            encoding="utf-8",
        )
        self.scaler_path = root_dir / "scalers_v21_rolling_7d.json"
        self.scaler_path.write_text(
            json.dumps(
                {
                    "index": {
                        "mean": [11630.00801763615, 11632.439510618715, 11699.49961104118, 11556.167112385792, 1855572862.157339, 5344228574915.616],
                        "scale": [10451.767239272729, 10453.620729932496, 10507.837853482002, 10393.161849307528, 1700927598.7351134, 8624911182529.906],
                    },
                    "stock": {
                        "mean": [69.22109597071513, 69.27929937205207, 70.12196645745344, 68.34393667735944, 13.586773233899697, 0.0],
                        "scale": [611.5967864850386, 613.200961031293, 621.5358163904571, 602.4584422610227, 2.2932616337759755, 1.0],
                        "var": [374050.62923882593, 376015.41860970133, 386306.7710561519, 362956.174651578, 5.259048920948857, 0.0],
                    },
                }
            ),
            encoding="utf-8",
        )
        self.settings = Settings(
            app_name="BuckLakeAI",
            app_version="2.0.0",
            model_version="anchored-path-v1",
            input_contract_version="v2.0.0",
            root_dir=root_dir,
            hf_repo_id="parkerjj/BuckLake-Stock-Model",
            hf_model_filename="stock_prediction_model_anchored-path-v1.keras",
            hf_scaler_filename="scalers_v21_rolling_7d.json",
            hf_cache_dir=root_dir / ".cache" / "huggingface",
            hf_token=None,
            model_weights_path=root_dir / ".cache" / "huggingface" / "stock_prediction_model_anchored-path-v1.keras",
            text_encoder_model="test-encoder",
            text_encoder_device="cpu",
            text_encoder_max_seq_length=512,
            enable_finbert_sentiment=False,
            finbert_model_name="ProsusAI/finbert",
            preload_model=False,
            preload_text_encoder=False,
            scaler_artifact_path=self.scaler_path,
        )

    # ------------------------------------------------------------------ helpers

    def _settings_with(self, **overrides):
        """Return a copy of the base settings with ``overrides`` applied."""
        return type(self.settings)(**{**self.settings.__dict__, **overrides})

    def _build_service(self, settings, loader_cls=None):
        """Return an ``InferenceService`` wired to the stub encoder and loader.

        ``loader_cls`` defaults to ``FakeModelLoader``; it is resolved lazily
        so this class does not need the stub at definition time.
        """
        if loader_cls is None:
            loader_cls = FakeModelLoader
        service = InferenceService(settings)
        service._text_encoder = StubTextEncoder()
        service._model_loader = loader_cls(settings)
        return service

    def _validated_request(self, payload):
        """Validate ``payload`` into a ``PredictRequest``."""
        from bucklake_ai.schemas import PredictRequest

        return PredictRequest.model_validate(payload)

    def _rewrite_scaler_for_v2(self, index_stats=None):
        """Add the v2 metadata to the scaler file, optionally replacing ``index``."""
        scaler = json.loads(self.scaler_path.read_text(encoding="utf-8"))
        scaler["metadata"] = dict(self._V2_SCALER_METADATA)
        if index_stats is not None:
            scaler["index"] = index_stats
        self.scaler_path.write_text(json.dumps(scaler), encoding="utf-8")

    def _request_payload(self):
        """Return a fresh predict-request payload for AAPL with 30 bars per series.

        Every market series gets its own list of freshly built bar dicts, so a
        test that mutates one series (for example the stock bars) cannot leak
        the change into the index series.
        """

        def make_bars():
            return [
                {
                    "open": 100.0 + i,
                    "close": 101.0 + i,
                    "high": 102.0 + i,
                    "low": 99.0 + i,
                    "volume": 1000.0 + i,
                    "amount": 5000.0 + i,
                }
                for i in range(30)
            ]

        return {
            "symbol": "AAPL",
            "published_at": "2026-04-15T12:30:00Z",
            "text": "Apple launches something new.",
            "market_bars": {name: make_bars() for name in self._MARKET_SERIES},
            "history_news": [
                {
                    "text": "Apple history one",
                    "published_at": "2026-04-15T09:00:00Z",
                    "symbols": ["AAPL"],
                },
                {
                    "text": "Microsoft history one",
                    "published_at": "2026-04-14T09:00:00Z",
                    "symbols": ["MSFT"],
                },
            ],
        }

    # -------------------------------------------------------------------- tests

    def test_preload_loads_text_encoder_when_configured(self):
        service = InferenceService(self._settings_with(preload_text_encoder=True))
        service._text_encoder = StubTextEncoder()

        service.preload()

        self.assertTrue(service._text_encoder.preloaded)

    def test_service_health_does_not_require_scaler_until_predict(self):
        missing_scaler_settings = self._settings_with(
            scaler_artifact_path=self.settings.root_dir / "missing-scaler.json",
        )

        service = InferenceService(missing_scaler_settings)

        self.assertEqual(service.get_health().status, "ok")

    def test_prepare_model_inputs_builds_pool_vectors_from_history_news(self):
        service = self._build_service(self.settings)
        request = self._validated_request(self._request_payload())

        model_inputs = service._prepare_model_inputs(request)

        self.assertEqual(len(model_inputs), 14)
        self.assertTrue(np.allclose(model_inputs[0], 1.0))
        self.assertTrue(np.allclose(model_inputs[3], 0.0))
        self.assertEqual(service._text_encoder.encoded_texts[0], request.text)
        self.assertEqual(service._text_encoder.sentiment_texts, [])
        self.assertEqual(model_inputs[4].shape, (1, 30, 6))
        self.assertEqual(model_inputs[9].shape, (1, 768))
        self.assertEqual(model_inputs[13].shape, (1, 8))
        self.assertGreater(float(np.linalg.norm(model_inputs[9])), 0.0)
        self.assertGreater(float(model_inputs[13][0][0]), 0.0)
        self.assertGreater(float(model_inputs[13][0][6]), 0.0)

    def test_prepare_model_inputs_uses_pos_and_entity_vectors_from_text_encoder(self):
        service = self._build_service(self.settings)
        request = self._validated_request(self._request_payload())

        model_inputs = service._prepare_model_inputs(request)

        self.assertEqual(service._text_encoder.pos_entity_texts, [request.text])
        for slot in (1, 2):
            self.assertEqual(model_inputs[slot].shape, (1, 1024))
        for slot in (1, 2):
            self.assertEqual(model_inputs[slot].dtype, np.float32)
        for slot in (1, 2):
            self.assertGreater(float(np.linalg.norm(model_inputs[slot])), 0.0)

    def test_prepare_model_inputs_scales_stock_volume_and_ignores_constant_amount(self):
        service = self._build_service(self.settings)
        request = self._validated_request(self._request_payload())

        model_inputs = service._prepare_model_inputs(request)

        first_row = model_inputs[8][0][0]
        for column in (0, 1):
            self.assertLess(first_row[column], 1.0)
            self.assertGreater(first_row[column], -1.0)
        self.assertLess(first_row[4], 0.0)
        self.assertEqual(first_row[5], 0.0)

    def test_prepare_model_inputs_ignores_stock_amount_when_training_scaler_is_constant(self):
        payload = self._request_payload()
        for bar in payload["market_bars"]["stock"]:
            bar["amount"] = bar["close"] * bar["volume"]
        service = self._build_service(self.settings)
        request = self._validated_request(payload)

        model_inputs = service._prepare_model_inputs(request)

        scaled_stock = model_inputs[8][0]
        self.assertTrue(np.allclose(scaled_stock[:, 5], 0.0))

    def test_prepare_model_inputs_uses_session_aware_index_anchor_for_v2(self):
        payload = self._request_payload()
        payload["text_session"] = "post_market"
        earlier_bar = {
            "open": 90.0,
            "close": 100.0,
            "high": 110.0,
            "low": 80.0,
            "volume": 1000.0,
            "amount": 2000.0,
        }
        latest_bar = {
            "open": 108.0,
            "close": 120.0,
            "high": 132.0,
            "low": 96.0,
            "volume": 1100.0,
            "amount": 2100.0,
        }
        index_bars = [earlier_bar] * 29 + [latest_bar]
        for name in ("inx", "dj", "ixic", "ndx"):
            payload["market_bars"][name] = index_bars
        # Identity scaling for the index so the assertions see raw transformed values.
        self._rewrite_scaler_for_v2(
            index_stats={
                "mean": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
                "scale": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
            }
        )
        service = self._build_service(self._settings_with(model_version="anchored-path-v2"))
        request = self._validated_request(payload)

        model_inputs = service._prepare_model_inputs(request)

        index_inx = model_inputs[4][0]
        self.assertAlmostEqual(float(index_inx[-1, 1]), 0.0, places=6)
        self.assertAlmostEqual(float(index_inx[0, 1]), float(np.log(100.0 / 120.0)), places=6)
        self.assertEqual(float(index_inx[-1, 4]), 1100.0)

    def test_prepare_model_inputs_rejects_missing_text_session_for_v2(self):
        from bucklake_ai.errors import InvalidInferenceInput

        service = self._build_service(self._settings_with(model_version="anchored-path-v2"))
        request = self._validated_request(self._request_payload())

        with self.assertRaisesRegex(InvalidInferenceInput, "text_session"):
            service._prepare_model_inputs(request)

    def test_prepare_model_inputs_rejects_nonpositive_index_ohlc_for_v2(self):
        from bucklake_ai.errors import InvalidInferenceInput

        payload = self._request_payload()
        payload["text_session"] = "post_market"
        payload["market_bars"]["inx"][-1]["close"] = 0.0
        self._rewrite_scaler_for_v2()
        service = self._build_service(self._settings_with(model_version="anchored-path-v2"))
        request = self._validated_request(payload)

        with self.assertRaisesRegex(InvalidInferenceInput, "index"):
            service._prepare_model_inputs(request)

    def test_prepare_model_inputs_uses_finbert_sentiment_when_enabled(self):
        service = self._build_service(self._settings_with(enable_finbert_sentiment=True))
        request = self._validated_request(self._request_payload())

        model_inputs = service._prepare_model_inputs(request)

        self.assertEqual(service._text_encoder.sentiment_texts, [request.text])
        self.assertEqual(float(model_inputs[3][0][0]), 0.75)

    def test_predict_logs_error_when_stock_input_collapses(self):
        payload = self._request_payload()
        # Pin every stock bar to the scaler's stock means (volume via expm1 of
        # the volume mean) so the stock input carries no variation.
        for bar in payload["market_bars"]["stock"]:
            bar.update(
                {
                    "open": 69.22109597071513,
                    "close": 69.27929937205207,
                    "high": 70.12196645745344,
                    "low": 68.34393667735944,
                    "volume": float(np.expm1(13.586773233899697)),
                    "amount": 0.0,
                }
            )
        request = self._validated_request(payload)
        service = self._build_service(
            self._settings_with(model_version="anchored_path_v1"),
            FakePredictModelLoader,
        )

        with patch("bucklake_ai.service.logger.error") as error:
            service.predict(request)

        collapsed_calls = [
            call for call in error.call_args_list
            if call.args[0].startswith("Collapsed model input for symbol %s: %s")
            and call.args[1] == "AAPL"
            and call.args[2] == "stock_input"
        ]
        self.assertEqual(len(collapsed_calls), 1)
        self.assertEqual(collapsed_calls[0].args[3], (1, 30, 6))
        self.assertLess(collapsed_calls[0].args[4], 1e-6)

    def test_predict_output_changes_when_finbert_sentiment_is_enabled(self):
        request = self._validated_request(self._request_payload())
        disabled_service = self._build_service(
            self._settings_with(model_version="anchored_path_v1"),
            FakePredictModelLoader,
        )
        enabled_service = self._build_service(
            self._settings_with(
                model_version="anchored_path_v1",
                enable_finbert_sentiment=True,
            ),
            FakePredictModelLoader,
        )

        disabled_response = disabled_service.predict(request)
        enabled_response = enabled_service.predict(request)

        self.assertEqual(disabled_service._text_encoder.sentiment_texts, [])
        self.assertEqual(enabled_service._text_encoder.sentiment_texts, [request.text])
        self.assertNotEqual(
            disabled_response.derived.predicted_stock_close,
            enabled_response.derived.predicted_stock_close,
        )
        self.assertNotEqual(
            disabled_response.derived.predicted_direction_probability,
            enabled_response.derived.predicted_direction_probability,
        )
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()