""" Online feature store for real-time scoring. Batch feature engineering (src/features.py) recomputes rolling windows over a whole DataFrame. That's impossible at serving time — when a single transaction arrives you have milliseconds and only the card's recent history. This module maintains a compact in-memory state per card (recent timestamps, amounts, last location, running mean) and derives the SAME engineered features incrementally. This is the piece that turns an offline notebook model into a deployable fraud service. The features it emits are column-compatible with the batch pipeline, so the exact same trained model scores them. """ from __future__ import annotations from collections import defaultdict, deque from dataclasses import dataclass, field import numpy as np from src import config from src.features import haversine_km _WINDOWS = {"1h": 3600, "24h": 86400, "7d": 604800} @dataclass class CardState: times: deque = field(default_factory=lambda: deque()) # unix seconds amts: deque = field(default_factory=lambda: deque()) # aligned amounts merch: deque = field(default_factory=lambda: deque()) # aligned merchant ids last_time: float = None last_merch_lat: float = None last_merch_long: float = None sum_amt: float = 0.0 count: int = 0 def prune(self, now: float, horizon: int = 604800): """Drop events older than the largest window (7d).""" while self.times and now - self.times[0] > horizon: self.times.popleft() self.amts.popleft() self.merch.popleft() class OnlineFeatureStore: """Incremental per-card feature computation for single transactions.""" def __init__(self): self._state: dict = defaultdict(CardState) def transform(self, txn: dict) -> dict: """ Given a raw transaction dict, return the engineered feature row (looking only at the card's PAST). Does NOT mutate state — call `update` after you've scored, to mirror production ordering. """ cc = txn[config.CARD_COL] st = self._state[cc] now = float(txn["unix_time"]) st.prune(now) amt = float(txn["amt"]) feats = {} # Transaction + temporal import datetime as _dt ts = _dt.datetime.fromtimestamp(now) feats["amt"] = amt feats["amt_log"] = float(np.log1p(max(amt, 0))) feats["hour"] = ts.hour feats["day_of_week"] = ts.weekday() feats["is_night"] = int(ts.hour < 6 or ts.hour >= 22) feats["is_weekend"] = int(ts.weekday() >= 5) # Demographic age = (now - _to_unix(txn["dob"])) / (365.25 * 86400) feats["age"] = float(np.clip(age, 0, 120)) feats["city_pop_log"] = float(np.log1p(max(float(txn.get("city_pop", 0)), 0))) # Geo feats["dist_home_merchant_km"] = float(haversine_km( txn["lat"], txn["long"], txn["merch_lat"], txn["merch_long"])) if st.last_merch_lat is not None: feats["dist_from_prev_txn_km"] = float(haversine_km( txn["merch_lat"], txn["merch_long"], st.last_merch_lat, st.last_merch_long)) else: feats["dist_from_prev_txn_km"] = 0.0 # Velocity (past only) t_arr = np.array(st.times) a_arr = np.array(st.amts) for suffix, secs in _WINDOWS.items(): if len(t_arr): mask = (now - t_arr) <= secs feats[f"txn_count_{suffix}"] = float(mask.sum()) feats[f"amt_sum_{suffix}"] = float(a_arr[mask].sum()) else: feats[f"txn_count_{suffix}"] = 0.0 feats[f"amt_sum_{suffix}"] = 0.0 feats["amt_mean_24h"] = ( feats["amt_sum_24h"] / feats["txn_count_24h"] if feats["txn_count_24h"] else 0.0) feats["secs_since_prev_txn"] = float(now - st.last_time) if st.last_time else -1.0 # Behavioral past_mean = st.sum_amt / st.count if st.count else amt feats["amt_dev_from_card_mean"] = float(amt - past_mean) feats["amt_ratio_to_card_mean"] = float(min(amt / past_mean, 1000) if past_mean else 1.0) if len(t_arr): mask24 = (now - t_arr) <= 86400 feats["distinct_merchants_24h"] = float(len(set(np.array(st.merch)[mask24]))) else: feats["distinct_merchants_24h"] = 0.0 # Categoricals (passed through) feats["category"] = txn.get("category", "") feats["gender"] = txn.get("gender", "") feats["state"] = txn.get("state", "") return feats def update(self, txn: dict): """Commit this transaction to the card's state (after scoring).""" cc = txn[config.CARD_COL] st = self._state[cc] now = float(txn["unix_time"]) amt = float(txn["amt"]) st.times.append(now) st.amts.append(amt) st.merch.append(txn.get(config.MERCHANT_COL, "")) st.last_time = now st.last_merch_lat = txn["merch_lat"] st.last_merch_long = txn["merch_long"] st.sum_amt += amt st.count += 1 import datetime as _dt _EPOCH = _dt.datetime(1970, 1, 1) def _to_unix(value) -> float: """ Accept a unix float, ISO string, or date string for dob. Uses an explicit epoch difference (not .timestamp()) so pre-1970 dates — common for cardholder dob — work on Windows, where .timestamp() raises OSError for negative values. """ if isinstance(value, (int, float)): return float(value) for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S"): try: return (_dt.datetime.strptime(str(value), fmt) - _EPOCH).total_seconds() except ValueError: continue import pandas as pd return (pd.Timestamp(value).to_pydatetime() - _EPOCH).total_seconds()