SiddharthVenba's picture
Initial commit for HF Space
75d9b3c
Raw
History Blame Contribute Delete
9.97 kB
"""
Storage Layer — SQLite for structured data, Parquet for OHLCV time series.
Provides a unified interface for persisting and querying all quant data.
"""
import json
import logging
import sqlite3
import time
from contextlib import contextmanager
from pathlib import Path
import pandas as pd
from config import DB_PATH, PARQUET_DIR
# Logger named after this module; used by the storage code below.
logger = logging.getLogger(__name__)
class QuantStore:
    """Unified storage: SQLite (signals, features, logs) + Parquet (OHLCV).

    Structured records (feature cache, signals, agent logs, trades, scan
    history) live in a single SQLite database at ``db_path``. OHLCV frames
    are written as one Parquet file per ticker under ``PARQUET_DIR``.

    Every SQLite operation opens its own short-lived connection through
    :meth:`_conn`, which commits on success, rolls back on error and always
    closes the connection.
    """

    # Columns written by save_signal(), in insert order. The SQL column list,
    # the placeholders and the bound values are all derived from this tuple
    # so they cannot drift apart.
    _SIGNAL_COLUMNS = (
        "ticker", "signal_type", "strength", "confidence", "horizon_days",
        "entry_price", "stop_loss", "target_1", "target_2", "target_3",
        "risk_reward", "alpha_score", "regime", "concern_level",
        "agent_name", "narrative",
    )

    # Maximum number of characters of each summary kept in agent_logs.
    _LOG_SUMMARY_MAX_CHARS = 500

    def __init__(self, db_path: Path = DB_PATH):
        """Remember the database location and make sure the schema exists.

        Args:
            db_path: Location of the SQLite database file.
        """
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create SQLite tables if they don't exist."""
        with self._conn() as conn:
            conn.executescript("""
                -- Feature cache
                CREATE TABLE IF NOT EXISTS feature_cache (
                    ticker TEXT NOT NULL,
                    feature_name TEXT NOT NULL,
                    computed_at REAL NOT NULL,
                    data_json TEXT NOT NULL,
                    PRIMARY KEY (ticker, feature_name)
                );
                -- Signals generated by agents
                CREATE TABLE IF NOT EXISTS signals (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ticker TEXT NOT NULL,
                    signal_type TEXT NOT NULL,
                    strength REAL,
                    confidence REAL,
                    horizon_days INTEGER,
                    entry_price REAL,
                    stop_loss REAL,
                    target_1 REAL,
                    target_2 REAL,
                    target_3 REAL,
                    risk_reward REAL,
                    alpha_score REAL,
                    regime TEXT,
                    concern_level TEXT,
                    agent_name TEXT,
                    narrative TEXT,
                    created_at REAL DEFAULT (strftime('%s','now')),
                    status TEXT DEFAULT 'active'
                );
                -- Agent decision log
                CREATE TABLE IF NOT EXISTS agent_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    agent_name TEXT NOT NULL,
                    action TEXT NOT NULL,
                    input_summary TEXT,
                    output_summary TEXT,
                    tokens_used INTEGER DEFAULT 0,
                    latency_ms REAL,
                    created_at REAL DEFAULT (strftime('%s','now'))
                );
                -- Trade tracking (paper + live)
                CREATE TABLE IF NOT EXISTS trades (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ticker TEXT NOT NULL,
                    side TEXT NOT NULL, -- 'long' or 'short'
                    entry_price REAL,
                    exit_price REAL,
                    stop_loss REAL,
                    quantity INTEGER,
                    entry_date TEXT,
                    exit_date TEXT,
                    pnl REAL,
                    pnl_pct REAL,
                    status TEXT DEFAULT 'open', -- open, closed, stopped
                    signal_id INTEGER,
                    notes TEXT,
                    created_at REAL DEFAULT (strftime('%s','now')),
                    FOREIGN KEY (signal_id) REFERENCES signals(id)
                );
                -- Scan history
                CREATE TABLE IF NOT EXISTS scan_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    scan_type TEXT NOT NULL,
                    universe_size INTEGER,
                    signals_found INTEGER,
                    duration_seconds REAL,
                    created_at REAL DEFAULT (strftime('%s','now'))
                );
            """)
        logger.info(f"SQLite initialized at {self.db_path}")

    @contextmanager
    def _conn(self):
        """Context manager for SQLite connections with WAL mode.

        Opens a fresh connection, yields it with ``sqlite3.Row`` as the row
        factory, commits when the ``with`` body finishes normally and rolls
        back (then re-raises) when it raises. The connection is always
        closed, including when the PRAGMA setup itself fails.
        """
        conn = sqlite3.connect(str(self.db_path), timeout=10)
        try:
            # Setup lives inside the try so a failing PRAGMA (for example a
            # locked database while switching journal mode) still reaches
            # the finally clause and the connection is not leaked.
            conn.execute("PRAGMA journal_mode=WAL")
            # NOTE(review): this pragma presumably supersedes the 10 s
            # `timeout` passed to connect() above — confirm which lock wait
            # is actually intended.
            conn.execute("PRAGMA busy_timeout=5000")
            conn.row_factory = sqlite3.Row
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    # ── Parquet (OHLCV) ──
    @staticmethod
    def _parquet_path(ticker: str) -> Path:
        """Return the Parquet file path used to cache *ticker*.

        Dots and dashes in the ticker are replaced with underscores.
        NOTE: tickers that differ only in those characters (``A.B``, ``A-B``,
        ``A_B``) therefore map to the same file.
        """
        safe_name = ticker.replace(".", "_").replace("-", "_")
        return PARQUET_DIR / f"{safe_name}.parquet"

    def save_ohlcv(self, ticker: str, df: pd.DataFrame):
        """Save OHLCV DataFrame to Parquet file (one per ticker).

        Any existing file for the ticker is overwritten. The target
        directory is created on demand so the first save does not fail.

        Args:
            ticker: Symbol the frame belongs to.
            df: Frame to persist; its index is written as well.
        """
        path = self._parquet_path(ticker)
        path.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(path, engine="pyarrow", index=True)
        logger.debug(f"Saved {len(df)} rows OHLCV for {ticker}")

    def load_ohlcv(self, ticker: str) -> pd.DataFrame | None:
        """Load cached OHLCV from Parquet. Returns None if not cached.

        A file that exists but cannot be read is logged as a warning and
        also reported as ``None``.
        """
        path = self._parquet_path(ticker)
        if not path.exists():
            return None
        try:
            return pd.read_parquet(path, engine="pyarrow")
        except Exception as e:
            # Deliberately best-effort: an unreadable cache file is treated
            # like a cache miss rather than an error.
            logger.warning(f"Error reading Parquet for {ticker}: {e}")
            return None

    def get_ohlcv_last_date(self, ticker: str) -> str | None:
        """Get the last date in cached OHLCV for incremental updates.

        Returns:
            ``str()`` of the ``.date()`` of the final index entry, or
            ``None`` when nothing is cached or the cached frame is empty.
        """
        df = self.load_ohlcv(ticker)
        if df is None or df.empty:
            return None
        # Assumes the index holds datetime-like values exposing .date()
        # — TODO confirm against the code that builds the frames.
        return str(df.index[-1].date())

    # ── Feature Cache ──
    def save_features(self, ticker: str, feature_name: str, data: dict):
        """Cache computed features in SQLite.

        The entry for ``(ticker, feature_name)`` is replaced if present and
        stamped with the current time. ``data`` must be JSON-serializable.
        """
        with self._conn() as conn:
            conn.execute(
                "INSERT OR REPLACE INTO feature_cache (ticker, feature_name, computed_at, data_json) VALUES (?, ?, ?, ?)",
                (ticker, feature_name, time.time(), json.dumps(data)),
            )

    def load_features(self, ticker: str, feature_name: str, max_age_hours: float = 12) -> dict | None:
        """Load cached features if not stale.

        Args:
            ticker: Symbol the features were computed for.
            feature_name: Name of the cached feature set.
            max_age_hours: Entries older than this many hours are ignored.

        Returns:
            The decoded feature dict, or ``None`` when there is no entry or
            the entry is older than ``max_age_hours``.
        """
        with self._conn() as conn:
            row = conn.execute(
                "SELECT data_json, computed_at FROM feature_cache WHERE ticker=? AND feature_name=?",
                (ticker, feature_name),
            ).fetchone()
        if row is None:
            return None
        age_hours = (time.time() - row["computed_at"]) / 3600
        if age_hours > max_age_hours:
            return None
        return json.loads(row["data_json"])

    # ── Signals ──
    def save_signal(self, signal: dict) -> int:
        """Store a generated signal. Returns signal ID.

        Only the keys listed in ``_SIGNAL_COLUMNS`` are read from ``signal``;
        missing keys are stored as NULL. ``ticker`` and ``signal_type`` are
        NOT NULL in the schema, so omitting them raises
        ``sqlite3.IntegrityError``.
        """
        cols = self._SIGNAL_COLUMNS
        # Column names come from the class constant, never from the caller,
        # so building the statement text here is safe; values stay bound.
        sql = (
            f"INSERT INTO signals ({', '.join(cols)}) "
            f"VALUES ({', '.join('?' for _ in cols)})"
        )
        params = tuple(signal.get(col) for col in cols)
        with self._conn() as conn:
            signal_id = conn.execute(sql, params).lastrowid
        return signal_id

    def get_active_signals(self, limit: int = 50) -> list[dict]:
        """Get currently active signals, ordered by alpha score.

        Returns at most ``limit`` rows with ``status='active'``, highest
        ``alpha_score`` first, each converted to a plain dict.
        """
        with self._conn() as conn:
            rows = conn.execute(
                "SELECT * FROM signals WHERE status='active' ORDER BY alpha_score DESC LIMIT ?",
                (limit,),
            ).fetchall()
        return [dict(r) for r in rows]

    def deactivate_old_signals(self, max_age_days: int = 14):
        """Mark old signals as expired.

        Active signals whose ``created_at`` is more than ``max_age_days``
        days before now get ``status='expired'``.
        """
        cutoff = time.time() - (max_age_days * 86400)
        with self._conn() as conn:
            conn.execute(
                "UPDATE signals SET status='expired' WHERE status='active' AND created_at < ?",
                (cutoff,),
            )

    # ── Agent Logs ──
    def log_agent(self, agent_name: str, action: str, input_summary: str = "",
                  output_summary: str = "", tokens_used: int = 0, latency_ms: float = 0):
        """Log an agent decision for auditing.

        Both summaries are truncated to 500 characters before being stored.
        A ``None`` summary is stored as an empty string instead of raising,
        so an audit-log call cannot crash the agent it is recording.
        """
        max_chars = self._LOG_SUMMARY_MAX_CHARS
        with self._conn() as conn:
            conn.execute(
                """INSERT INTO agent_logs (agent_name, action, input_summary, output_summary,
                tokens_used, latency_ms) VALUES (?, ?, ?, ?, ?, ?)""",
                (
                    agent_name,
                    action,
                    (input_summary or "")[:max_chars],
                    (output_summary or "")[:max_chars],
                    tokens_used,
                    latency_ms,
                ),
            )

    # ── Scan History ──
    def log_scan(self, scan_type: str, universe_size: int, signals_found: int, duration: float):
        """Record one scan run in ``scan_history``.

        ``duration`` is stored in the ``duration_seconds`` column.
        """
        with self._conn() as conn:
            conn.execute(
                "INSERT INTO scan_history (scan_type, universe_size, signals_found, duration_seconds) VALUES (?, ?, ?, ?)",
                (scan_type, universe_size, signals_found, duration),
            )

    def get_scan_history(self, limit: int = 20) -> list[dict]:
        """Return up to ``limit`` scan records as dicts, newest first."""
        with self._conn() as conn:
            rows = conn.execute(
                "SELECT * FROM scan_history ORDER BY created_at DESC LIMIT ?", (limit,)
            ).fetchall()
        return [dict(r) for r in rows]
# ── Singleton ──
# Shared instance handed out by get_store(); stays None until first requested.
_store: QuantStore | None = None


def get_store() -> QuantStore:
    """Return the module-level QuantStore, building it on the first call."""
    global _store
    if _store is not None:
        return _store
    _store = QuantStore()
    return _store