"""Batch market scanner — caches OHLCV and screens for signals efficiently.
Instead of fetching 30 days of OHLCV per stock per cycle (slow, API-heavy),
this maintains a rolling cache and screens thousands of stocks in batches.
"""
from __future__ import annotations
import json
import logging
import time
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
logger = logging.getLogger(__name__)
class MarketScanner:
    """Maintain a rolling on-disk OHLCV cache and screen it for breakouts.

    Architecture:
    - Each symbol has its own JSON file in the cache directory holding the
      most recent bars (at most ``_MAX_BARS`` rows).
    - Each cycle the caller fetches today's bar and merges it into the cache
      with ``append_bar``.
    - ``screen_breakouts`` compares a current price with the highest cached
      high, so finding candidates needs no fresh OHLCV download.
    - Full strategy analysis is meant to run only on the returned candidates.
    """

    # Maximum number of rows (bars) kept per symbol in the cache.
    _MAX_BARS = 90

    def __init__(self, cache_dir: Path | None = None):
        """Create the scanner and make sure the cache directory exists.

        Args:
            cache_dir: Directory for the per-symbol JSON files. A falsy value
                selects ``~/.cache/trading-cli/ohlcv``. Plain strings are
                accepted as well and converted to ``Path``.
        """
        default_dir = Path.home() / ".cache" / "trading-cli" / "ohlcv"
        self._cache_dir = Path(cache_dir) if cache_dir else default_dir
        self._cache_dir.mkdir(parents=True, exist_ok=True)
        # symbol -> last fetch timestamp; not read or written by this class.
        self._last_fetch: dict[str, float] = {}

    def _cache_path(self, symbol: str) -> Path:
        """Return the cache file path for ``symbol``.

        Path separators are replaced so symbols such as ``BTC/USD`` map to a
        file inside the cache directory instead of a missing sub-directory.
        """
        safe_name = symbol.replace("/", "_").replace("\\", "_")
        return self._cache_dir / f"{safe_name}.json"

    @staticmethod
    def _parse_bar_date(value) -> pd.Timestamp | None:
        """Convert a bar's ``date`` value to a Timestamp, or None if unusable."""
        if value is None or (isinstance(value, str) and not value.strip()):
            return None
        try:
            stamp = pd.Timestamp(value)
        except (ValueError, TypeError):
            return None
        # Values such as "NaT" or float NaN parse successfully but carry no date.
        return None if pd.isna(stamp) else stamp

    @staticmethod
    def _match_timezone(stamp: pd.Timestamp, index: pd.DatetimeIndex) -> pd.Timestamp:
        """Give ``stamp`` the same timezone awareness as ``index``.

        Mixing naive and aware timestamps in one index breaks comparison and
        sorting, so the new label is adapted to whatever the cache uses.
        """
        if index.tz is None:
            return stamp.tz_localize(None) if stamp.tzinfo is not None else stamp
        if stamp.tzinfo is None:
            return stamp.tz_localize(index.tz)
        return stamp.tz_convert(index.tz)

    def get_cached(self, symbol: str) -> pd.DataFrame | None:
        """Load the cached OHLCV frame for a symbol.

        No freshness check is performed here; use ``cleanup_old_cache`` to
        drop files that have not been written for a while.

        Args:
            symbol: Ticker whose cache file should be read.

        Returns:
            The cached bars, indexed by a ``DatetimeIndex`` named ``date``
            when the file has a ``date`` field. ``None`` if the file is
            missing, holds no bars, or cannot be read or parsed.
        """
        path = self._cache_path(symbol)
        if not path.exists():
            return None
        try:
            payload = json.loads(path.read_text(encoding="utf-8"))
            bars = payload.get("bars") if isinstance(payload, dict) else None
            if not bars:
                return None
            df = pd.DataFrame(bars)
            if "date" in df.columns:
                # Parse each value on its own: the file can mix ISO strings
                # with and without fractional seconds, which whole-column
                # format inference rejects.
                stamps = [pd.Timestamp(value) for value in df.pop("date")]
                df.index = pd.DatetimeIndex(stamps, name="date")
            return df
        except Exception as exc:
            # Best-effort cache: any unreadable file is treated as a miss.
            logger.debug("Cache load failed for %s: %s", symbol, exc)
            return None

    def save(self, symbol: str, df: pd.DataFrame) -> None:
        """Write the last ``_MAX_BARS`` rows of ``df`` to the symbol's cache file.

        A ``DatetimeIndex`` is always stored under the ``date`` key, whatever
        its name, so ``get_cached`` can restore it. If the frame already has
        a ``date`` column, that column is kept and the index is discarded.
        Failures are logged at debug level and otherwise ignored.

        Args:
            symbol: Ticker the frame belongs to.
            df: OHLCV frame to persist.
        """
        try:
            window = df.tail(self._MAX_BARS)
            if "date" in window.columns:
                flat = window.reset_index(drop=True)
            else:
                if isinstance(window.index, pd.DatetimeIndex):
                    window = window.rename_axis("date")
                flat = window.reset_index()
            bars = flat.to_dict(orient="records")
            # Timestamps are not JSON serializable; store them as ISO strings.
            for bar in bars:
                stamp = bar.get("date")
                if hasattr(stamp, "isoformat"):
                    bar["date"] = stamp.isoformat()
            self._cache_dir.mkdir(parents=True, exist_ok=True)
            payload = {"bars": bars, "updated": datetime.now().isoformat()}
            self._cache_path(symbol).write_text(json.dumps(payload), encoding="utf-8")
        except Exception as exc:
            # Best-effort cache: a failed write must not stop the scan cycle.
            logger.debug("Cache save failed for %s: %s", symbol, exc)

    def append_bar(self, symbol: str, bar: dict) -> pd.DataFrame | None:
        """Merge one daily bar into the cache and persist the result.

        A bar whose calendar day is already cached replaces that day's row
        (fields missing from ``bar`` keep their cached value). Any other bar
        is inserted as a new row. The frame is then sorted by date and
        trimmed to the newest ``_MAX_BARS`` rows.

        Args:
            symbol: Ticker the bar belongs to.
            bar: Mapping with a ``date`` entry (string, ``date``, ``datetime``
                or ``Timestamp``) plus values keyed by the cached column names.

        Returns:
            The updated frame. The unchanged cached frame if the bar has no
            usable date or shares no column with the cache. ``None`` if there
            is no cache for the symbol or the cache has no datetime index.
        """
        cached = self.get_cached(symbol)
        if cached is None:
            return None
        if not isinstance(cached.index, pd.DatetimeIndex):
            logger.debug("Cache for %s has no date index; cannot append", symbol)
            return None

        bar_ts = self._parse_bar_date(bar.get("date"))
        if bar_ts is None:
            logger.warning("Ignoring bar without a valid date for %s", symbol)
            return cached
        bar_ts = self._match_timezone(bar_ts, cached.index)

        # Only keys that are cached columns are stored ("date" is the index).
        fields = {key: value for key, value in bar.items() if key in cached.columns}
        if not fields:
            logger.warning("Ignoring bar with no known columns for %s", symbol)
            return cached

        same_day = cached.index.normalize() == bar_ts.normalize()
        if same_day.any():
            # Update: start from the cached row, overwrite the supplied fields
            # and keep the label the cache already uses for that day.
            label = cached.index[same_day][-1]
            values = cached.loc[same_day].iloc[-1].to_dict()
            values.update(fields)
            remaining = cached.loc[~same_day]
        else:
            label = bar_ts
            values = {col: fields.get(col, float("nan")) for col in cached.columns}
            remaining = cached

        row = pd.DataFrame(
            [values],
            index=pd.DatetimeIndex([label], name=cached.index.name),
            columns=cached.columns,
        )
        merged = pd.concat([remaining, row]) if len(remaining) else row
        # Sort so a late-arriving older bar cannot push newer bars out of the window.
        merged = merged.sort_index().tail(self._MAX_BARS)
        self.save(symbol, merged)
        return merged

    def screen_breakouts(
        self,
        symbols: list[str],
        current_prices: dict[str, float],
        entry_period: int = 20,
        *,
        breakout_ratio: float = 0.998,
    ) -> list[str]:
        """Return the symbols whose current price is at or near the N-bar high.

        Uses cached bars plus the supplied prices only; nothing is fetched.
        Symbols are skipped when they have no (non-zero) price, no cache,
        fewer than ``entry_period`` cached bars, or no ``high``/``High`` column.

        Args:
            symbols: Tickers to screen, in the order results should keep.
            current_prices: Latest price per ticker.
            entry_period: Number of most recent cached bars forming the high.
            breakout_ratio: Fraction of the high the price must reach. The
                default 0.998 allows about 0.2% slack for intraday prices.

        Returns:
            The qualifying symbols, in input order.

        Raises:
            ValueError: If ``entry_period`` is smaller than 1.
        """
        if entry_period < 1:
            raise ValueError("entry_period must be >= 1")

        candidates = []
        for symbol in symbols:
            price = current_prices.get(symbol)
            if not price:
                continue
            cached = self.get_cached(symbol)
            if cached is None or len(cached) < entry_period:
                continue
            high_col = next((c for c in ("high", "High") if c in cached.columns), None)
            if high_col is None:
                continue
            # Coerce so stray non-numeric values become NaN instead of raising.
            highs = pd.to_numeric(cached[high_col].iloc[-entry_period:], errors="coerce")
            donchian_high = highs.max()
            if pd.notna(donchian_high) and price >= donchian_high * breakout_ratio:
                candidates.append(symbol)
        return candidates

    def cleanup_old_cache(self, max_age_days: int = 7) -> int:
        """Delete cache files not modified within ``max_age_days``.

        Args:
            max_age_days: Age limit in days, measured from the file's
                modification time.

        Returns:
            Number of files removed. Files that cannot be inspected or
            deleted are logged at debug level and left in place.
        """
        removed = 0
        cutoff = time.time() - max_age_days * 86400
        for path in self._cache_dir.glob("*.json"):
            try:
                if path.stat().st_mtime < cutoff:
                    path.unlink()
                    removed += 1
            except OSError as exc:
                logger.debug("Cache cleanup failed for %s: %s", path, exc)
        return removed
|