| """ |
| Storage Layer — SQLite for structured data, Parquet for OHLCV time series. |
| Provides a unified interface for persisting and querying all quant data. |
| """ |
| import json |
| import logging |
| import sqlite3 |
| import time |
| from contextlib import contextmanager |
| from pathlib import Path |
|
|
| import pandas as pd |
|
|
| from config import DB_PATH, PARQUET_DIR |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class QuantStore:
    """Unified storage: SQLite (signals, features, logs) + Parquet (OHLCV).

    Structured records (feature cache, signals, agent logs, trades, scan
    history) live in a single SQLite database at ``db_path``. OHLCV frames
    are written as one Parquet file per ticker under ``PARQUET_DIR``.

    Every SQLite operation opens its own short-lived connection through
    :meth:`_conn`, which commits on success and rolls back on error.
    """

    # Maximum number of characters of each summary kept by log_agent().
    _SUMMARY_MAX_CHARS = 500

    def __init__(self, db_path: Path = DB_PATH):
        """Remember the database location and create any missing tables."""
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        """Create SQLite tables if they don't exist."""
        # sqlite3 creates the database file but not its parent directory;
        # make sure the directory is there so a fresh checkout does not fail.
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        with self._conn() as conn:
            conn.executescript("""
                -- Feature cache
                CREATE TABLE IF NOT EXISTS feature_cache (
                    ticker TEXT NOT NULL,
                    feature_name TEXT NOT NULL,
                    computed_at REAL NOT NULL,
                    data_json TEXT NOT NULL,
                    PRIMARY KEY (ticker, feature_name)
                );

                -- Signals generated by agents
                CREATE TABLE IF NOT EXISTS signals (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ticker TEXT NOT NULL,
                    signal_type TEXT NOT NULL,
                    strength REAL,
                    confidence REAL,
                    horizon_days INTEGER,
                    entry_price REAL,
                    stop_loss REAL,
                    target_1 REAL,
                    target_2 REAL,
                    target_3 REAL,
                    risk_reward REAL,
                    alpha_score REAL,
                    regime TEXT,
                    concern_level TEXT,
                    agent_name TEXT,
                    narrative TEXT,
                    created_at REAL DEFAULT (strftime('%s','now')),
                    status TEXT DEFAULT 'active'
                );

                -- Agent decision log
                CREATE TABLE IF NOT EXISTS agent_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    agent_name TEXT NOT NULL,
                    action TEXT NOT NULL,
                    input_summary TEXT,
                    output_summary TEXT,
                    tokens_used INTEGER DEFAULT 0,
                    latency_ms REAL,
                    created_at REAL DEFAULT (strftime('%s','now'))
                );

                -- Trade tracking (paper + live)
                CREATE TABLE IF NOT EXISTS trades (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ticker TEXT NOT NULL,
                    side TEXT NOT NULL, -- 'long' or 'short'
                    entry_price REAL,
                    exit_price REAL,
                    stop_loss REAL,
                    quantity INTEGER,
                    entry_date TEXT,
                    exit_date TEXT,
                    pnl REAL,
                    pnl_pct REAL,
                    status TEXT DEFAULT 'open', -- open, closed, stopped
                    signal_id INTEGER,
                    notes TEXT,
                    created_at REAL DEFAULT (strftime('%s','now')),
                    FOREIGN KEY (signal_id) REFERENCES signals(id)
                );

                -- Scan history
                CREATE TABLE IF NOT EXISTS scan_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    scan_type TEXT NOT NULL,
                    universe_size INTEGER,
                    signals_found INTEGER,
                    duration_seconds REAL,
                    created_at REAL DEFAULT (strftime('%s','now'))
                );
            """)
        logger.info("SQLite initialized at %s", self.db_path)

    @contextmanager
    def _conn(self):
        """Context manager for SQLite connections with WAL mode.

        Yields a connection whose rows are ``sqlite3.Row`` objects. The
        transaction is committed when the ``with`` body finishes normally and
        rolled back if it raises; the connection is always closed.
        """
        conn = sqlite3.connect(str(self.db_path), timeout=10)
        # Connection setup happens inside the try block so that a failing
        # PRAGMA cannot leak the open connection.
        try:
            conn.execute("PRAGMA journal_mode=WAL")
            # NOTE(review): connect(timeout=10) and this PRAGMA both configure
            # the busy wait; the PRAGMA runs last, so the effective wait is
            # presumably 5 s -- confirm which value is intended.
            conn.execute("PRAGMA busy_timeout=5000")
            conn.row_factory = sqlite3.Row
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # OHLCV (Parquet)
    # ------------------------------------------------------------------

    @staticmethod
    def _parquet_path(ticker: str) -> Path:
        """Return the Parquet file path used to cache *ticker*.

        Dots and dashes in the ticker are replaced with underscores to build
        the file name.
        """
        safe_name = ticker.replace(".", "_").replace("-", "_")
        return PARQUET_DIR / f"{safe_name}.parquet"

    def save_ohlcv(self, ticker: str, df: pd.DataFrame) -> None:
        """Save OHLCV DataFrame to Parquet file (one per ticker).

        An existing file for the same ticker is overwritten. The target
        directory is created if it does not exist yet.
        """
        path = self._parquet_path(ticker)
        path.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(path, engine="pyarrow", index=True)
        logger.debug("Saved %d rows OHLCV for %s", len(df), ticker)

    def load_ohlcv(self, ticker: str) -> pd.DataFrame | None:
        """Load cached OHLCV from Parquet.

        Returns None if nothing is cached for *ticker* or if the file cannot
        be read; a read failure is logged as a warning, not raised.
        """
        path = self._parquet_path(ticker)
        if not path.exists():
            return None
        try:
            return pd.read_parquet(path, engine="pyarrow")
        except Exception as e:
            # Deliberate best-effort: an unreadable cache file is treated the
            # same as a cache miss.
            logger.warning("Error reading Parquet for %s: %s", ticker, e)
            return None

    def get_ohlcv_last_date(self, ticker: str) -> str | None:
        """Get the last date in cached OHLCV for incremental updates.

        Returns the date of the final index entry as a string, or None when
        there is no cached (or only an empty) frame. Calls ``.date()`` on the
        last index value, so the index is assumed to be datetime-like.
        """
        df = self.load_ohlcv(ticker)
        if df is None or df.empty:
            return None
        return str(df.index[-1].date())

    # ------------------------------------------------------------------
    # Feature cache
    # ------------------------------------------------------------------

    def save_features(self, ticker: str, feature_name: str, data: dict) -> None:
        """Cache computed features in SQLite.

        *data* is stored as JSON and stamped with the current time; an
        existing entry for the same (ticker, feature_name) is replaced.
        """
        with self._conn() as conn:
            conn.execute(
                "INSERT OR REPLACE INTO feature_cache (ticker, feature_name, computed_at, data_json) VALUES (?, ?, ?, ?)",
                (ticker, feature_name, time.time(), json.dumps(data)),
            )

    def load_features(self, ticker: str, feature_name: str, max_age_hours: float = 12) -> dict | None:
        """Load cached features if not stale.

        Returns the decoded JSON payload, or None when there is no entry or
        the entry is older than *max_age_hours*.
        """
        with self._conn() as conn:
            row = conn.execute(
                "SELECT data_json, computed_at FROM feature_cache WHERE ticker=? AND feature_name=?",
                (ticker, feature_name),
            ).fetchone()
        if row is None:
            return None
        age_hours = (time.time() - row["computed_at"]) / 3600
        if age_hours > max_age_hours:
            return None
        return json.loads(row["data_json"])

    # ------------------------------------------------------------------
    # Signals
    # ------------------------------------------------------------------

    def save_signal(self, signal: dict) -> int:
        """Store a generated signal. Returns signal ID.

        Keys missing from *signal* are stored as NULL. ``ticker`` and
        ``signal_type`` are declared NOT NULL in the schema.
        """
        with self._conn() as conn:
            cursor = conn.execute(
                """INSERT INTO signals (ticker, signal_type, strength, confidence, horizon_days,
                   entry_price, stop_loss, target_1, target_2, target_3, risk_reward,
                   alpha_score, regime, concern_level, agent_name, narrative)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    signal.get("ticker"), signal.get("signal_type"), signal.get("strength"),
                    signal.get("confidence"), signal.get("horizon_days"),
                    signal.get("entry_price"), signal.get("stop_loss"),
                    signal.get("target_1"), signal.get("target_2"), signal.get("target_3"),
                    signal.get("risk_reward"), signal.get("alpha_score"),
                    signal.get("regime"), signal.get("concern_level"),
                    signal.get("agent_name"), signal.get("narrative"),
                ),
            )
            return cursor.lastrowid

    def get_active_signals(self, limit: int = 50) -> list[dict]:
        """Get currently active signals, ordered by alpha score (highest first)."""
        with self._conn() as conn:
            rows = conn.execute(
                "SELECT * FROM signals WHERE status='active' ORDER BY alpha_score DESC LIMIT ?",
                (limit,),
            ).fetchall()
        return [dict(r) for r in rows]

    def deactivate_old_signals(self, max_age_days: int = 14) -> None:
        """Mark active signals created more than *max_age_days* ago as expired."""
        cutoff = time.time() - (max_age_days * 86400)
        with self._conn() as conn:
            conn.execute(
                "UPDATE signals SET status='expired' WHERE status='active' AND created_at < ?",
                (cutoff,),
            )

    # ------------------------------------------------------------------
    # Agent logs
    # ------------------------------------------------------------------

    def log_agent(self, agent_name: str, action: str, input_summary: str = "",
                  output_summary: str = "", tokens_used: int = 0, latency_ms: float = 0) -> None:
        """Log an agent decision for auditing.

        Both summaries are truncated to 500 characters before being stored.
        A summary passed as None is stored as an empty string instead of
        raising.
        """
        limit = self._SUMMARY_MAX_CHARS
        # `or ""` guards against callers passing None for a summary.
        input_text = (input_summary or "")[:limit]
        output_text = (output_summary or "")[:limit]
        with self._conn() as conn:
            conn.execute(
                """INSERT INTO agent_logs (agent_name, action, input_summary, output_summary,
                   tokens_used, latency_ms) VALUES (?, ?, ?, ?, ?, ?)""",
                (agent_name, action, input_text, output_text, tokens_used, latency_ms),
            )

    # ------------------------------------------------------------------
    # Scan history
    # ------------------------------------------------------------------

    def log_scan(self, scan_type: str, universe_size: int, signals_found: int, duration: float) -> None:
        """Record one scan run; *duration* is stored as ``duration_seconds``."""
        with self._conn() as conn:
            conn.execute(
                "INSERT INTO scan_history (scan_type, universe_size, signals_found, duration_seconds) VALUES (?, ?, ?, ?)",
                (scan_type, universe_size, signals_found, duration),
            )

    def get_scan_history(self, limit: int = 20) -> list[dict]:
        """Return up to *limit* scan records as dicts, most recent first."""
        with self._conn() as conn:
            rows = conn.execute(
                "SELECT * FROM scan_history ORDER BY created_at DESC LIMIT ?", (limit,)
            ).fetchall()
        return [dict(r) for r in rows]
|
|
|
|
| |
# Process-wide QuantStore instance, created on the first get_store() call.
_store: QuantStore | None = None


def get_store() -> QuantStore:
    """Return the shared QuantStore, building it with default settings on first use."""
    global _store
    if _store is not None:
        return _store
    _store = QuantStore()
    return _store
|
|