""" Storage Layer — SQLite for structured data, Parquet for OHLCV time series. Provides a unified interface for persisting and querying all quant data. """ import json import logging import sqlite3 import time from contextlib import contextmanager from pathlib import Path import pandas as pd from config import DB_PATH, PARQUET_DIR logger = logging.getLogger(__name__) class QuantStore: """Unified storage: SQLite (signals, features, logs) + Parquet (OHLCV).""" def __init__(self, db_path: Path = DB_PATH): self.db_path = db_path self._init_db() def _init_db(self): """Create SQLite tables if they don't exist.""" with self._conn() as conn: conn.executescript(""" -- Feature cache CREATE TABLE IF NOT EXISTS feature_cache ( ticker TEXT NOT NULL, feature_name TEXT NOT NULL, computed_at REAL NOT NULL, data_json TEXT NOT NULL, PRIMARY KEY (ticker, feature_name) ); -- Signals generated by agents CREATE TABLE IF NOT EXISTS signals ( id INTEGER PRIMARY KEY AUTOINCREMENT, ticker TEXT NOT NULL, signal_type TEXT NOT NULL, strength REAL, confidence REAL, horizon_days INTEGER, entry_price REAL, stop_loss REAL, target_1 REAL, target_2 REAL, target_3 REAL, risk_reward REAL, alpha_score REAL, regime TEXT, concern_level TEXT, agent_name TEXT, narrative TEXT, created_at REAL DEFAULT (strftime('%s','now')), status TEXT DEFAULT 'active' ); -- Agent decision log CREATE TABLE IF NOT EXISTS agent_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_name TEXT NOT NULL, action TEXT NOT NULL, input_summary TEXT, output_summary TEXT, tokens_used INTEGER DEFAULT 0, latency_ms REAL, created_at REAL DEFAULT (strftime('%s','now')) ); -- Trade tracking (paper + live) CREATE TABLE IF NOT EXISTS trades ( id INTEGER PRIMARY KEY AUTOINCREMENT, ticker TEXT NOT NULL, side TEXT NOT NULL, -- 'long' or 'short' entry_price REAL, exit_price REAL, stop_loss REAL, quantity INTEGER, entry_date TEXT, exit_date TEXT, pnl REAL, pnl_pct REAL, status TEXT DEFAULT 'open', -- open, closed, stopped signal_id INTEGER, notes TEXT, created_at REAL DEFAULT (strftime('%s','now')), FOREIGN KEY (signal_id) REFERENCES signals(id) ); -- Scan history CREATE TABLE IF NOT EXISTS scan_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, scan_type TEXT NOT NULL, universe_size INTEGER, signals_found INTEGER, duration_seconds REAL, created_at REAL DEFAULT (strftime('%s','now')) ); """) logger.info(f"SQLite initialized at {self.db_path}") @contextmanager def _conn(self): """Context manager for SQLite connections with WAL mode.""" conn = sqlite3.connect(str(self.db_path), timeout=10) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA busy_timeout=5000") conn.row_factory = sqlite3.Row try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() # ── Parquet (OHLCV) ── def save_ohlcv(self, ticker: str, df: pd.DataFrame): """Save OHLCV DataFrame to Parquet file (one per ticker).""" safe_name = ticker.replace(".", "_").replace("-", "_") path = PARQUET_DIR / f"{safe_name}.parquet" df.to_parquet(path, engine="pyarrow", index=True) logger.debug(f"Saved {len(df)} rows OHLCV for {ticker}") def load_ohlcv(self, ticker: str) -> pd.DataFrame | None: """Load cached OHLCV from Parquet. Returns None if not cached.""" safe_name = ticker.replace(".", "_").replace("-", "_") path = PARQUET_DIR / f"{safe_name}.parquet" if not path.exists(): return None try: df = pd.read_parquet(path, engine="pyarrow") return df except Exception as e: logger.warning(f"Error reading Parquet for {ticker}: {e}") return None def get_ohlcv_last_date(self, ticker: str) -> str | None: """Get the last date in cached OHLCV for incremental updates.""" df = self.load_ohlcv(ticker) if df is None or df.empty: return None return str(df.index[-1].date()) # ── Feature Cache ── def save_features(self, ticker: str, feature_name: str, data: dict): """Cache computed features in SQLite.""" with self._conn() as conn: conn.execute( "INSERT OR REPLACE INTO feature_cache (ticker, feature_name, computed_at, data_json) VALUES (?, ?, ?, ?)", (ticker, feature_name, time.time(), json.dumps(data)), ) def load_features(self, ticker: str, feature_name: str, max_age_hours: float = 12) -> dict | None: """Load cached features if not stale.""" with self._conn() as conn: row = conn.execute( "SELECT data_json, computed_at FROM feature_cache WHERE ticker=? AND feature_name=?", (ticker, feature_name), ).fetchone() if not row: return None age_hours = (time.time() - row["computed_at"]) / 3600 if age_hours > max_age_hours: return None return json.loads(row["data_json"]) # ── Signals ── def save_signal(self, signal: dict) -> int: """Store a generated signal. Returns signal ID.""" with self._conn() as conn: cursor = conn.execute( """INSERT INTO signals (ticker, signal_type, strength, confidence, horizon_days, entry_price, stop_loss, target_1, target_2, target_3, risk_reward, alpha_score, regime, concern_level, agent_name, narrative) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( signal.get("ticker"), signal.get("signal_type"), signal.get("strength"), signal.get("confidence"), signal.get("horizon_days"), signal.get("entry_price"), signal.get("stop_loss"), signal.get("target_1"), signal.get("target_2"), signal.get("target_3"), signal.get("risk_reward"), signal.get("alpha_score"), signal.get("regime"), signal.get("concern_level"), signal.get("agent_name"), signal.get("narrative"), ), ) return cursor.lastrowid def get_active_signals(self, limit: int = 50) -> list[dict]: """Get currently active signals, ordered by alpha score.""" with self._conn() as conn: rows = conn.execute( "SELECT * FROM signals WHERE status='active' ORDER BY alpha_score DESC LIMIT ?", (limit,), ).fetchall() return [dict(r) for r in rows] def deactivate_old_signals(self, max_age_days: int = 14): """Mark old signals as expired.""" cutoff = time.time() - (max_age_days * 86400) with self._conn() as conn: conn.execute( "UPDATE signals SET status='expired' WHERE status='active' AND created_at < ?", (cutoff,), ) # ── Agent Logs ── def log_agent(self, agent_name: str, action: str, input_summary: str = "", output_summary: str = "", tokens_used: int = 0, latency_ms: float = 0): """Log an agent decision for auditing.""" with self._conn() as conn: conn.execute( """INSERT INTO agent_logs (agent_name, action, input_summary, output_summary, tokens_used, latency_ms) VALUES (?, ?, ?, ?, ?, ?)""", (agent_name, action, input_summary[:500], output_summary[:500], tokens_used, latency_ms), ) # ── Scan History ── def log_scan(self, scan_type: str, universe_size: int, signals_found: int, duration: float): with self._conn() as conn: conn.execute( "INSERT INTO scan_history (scan_type, universe_size, signals_found, duration_seconds) VALUES (?, ?, ?, ?)", (scan_type, universe_size, signals_found, duration), ) def get_scan_history(self, limit: int = 20) -> list[dict]: with self._conn() as conn: rows = conn.execute( "SELECT * FROM scan_history ORDER BY created_at DESC LIMIT ?", (limit,) ).fetchall() return [dict(r) for r in rows] # ── Singleton ── _store: QuantStore | None = None def get_store() -> QuantStore: global _store if _store is None: _store = QuantStore() return _store