File size: 4,973 Bytes
d5b7ee9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""Batch market scanner — caches OHLCV and screens for signals efficiently.

Instead of fetching 30 days of OHLCV per stock per cycle (slow, API-heavy),
this maintains a rolling cache and screens thousands of stocks in batches.
"""

from __future__ import annotations

import json
import logging
import time
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd

logger = logging.getLogger(__name__)


class MarketScanner:
    """Maintain a rolling on-disk OHLCV cache and screen it for breakouts.

    Architecture:
      - Each symbol's bars live in one JSON file inside the cache directory,
        trimmed to the most recent ``_MAX_BARS`` rows on every save
      - Each cycle: the caller hands the latest daily bar to ``append_bar``
      - ``screen_breakouts`` compares a current price with the rolling high
        read from the cache, so no fresh OHLCV fetch is needed
      - Full strategy analysis is left to the caller, to be run only on the
        symbols returned by the screen
    """

    # Maximum number of most-recent bars kept per symbol.
    _MAX_BARS = 90
    # A price counts as a breakout when it reaches this fraction of the
    # rolling high (~0.2% tolerance for intraday noise).
    _BREAKOUT_TOLERANCE = 0.998

    def __init__(self, cache_dir: Path | None = None):
        """Create the scanner and make sure its cache directory exists.

        Args:
            cache_dir: Directory for the per-symbol JSON files. Defaults to
                ``~/.cache/trading-cli/ohlcv`` when omitted.
        """
        default_dir = Path.home() / ".cache" / "trading-cli" / "ohlcv"
        self._cache_dir = Path(cache_dir) if cache_dir else default_dir
        self._cache_dir.mkdir(parents=True, exist_ok=True)
        self._last_fetch: dict[str, float] = {}  # symbol -> last fetch timestamp

    def _cache_path(self, symbol: str) -> Path:
        """Return the cache file for *symbol*, always inside the cache dir."""
        # Symbols such as "BTC/USD" (or a stray "../x") must not be treated as
        # sub-paths, otherwise the file lands outside the cache directory or
        # in a directory that does not exist.
        safe_name = symbol.replace("/", "_").replace("\\", "_")
        return self._cache_dir / f"{safe_name}.json"

    @staticmethod
    def _parse_bar_date(raw) -> pd.Timestamp | None:
        """Convert a bar's date (str/date/datetime/Timestamp) to a Timestamp.

        Returns None when the value is missing, empty, or unparseable.
        """
        try:
            stamp = pd.Timestamp(raw)
        except (ValueError, TypeError, OverflowError):
            return None
        # pd.Timestamp("") and pd.Timestamp(None) yield NaT rather than raising.
        return None if pd.isna(stamp) else stamp

    def get_cached(self, symbol: str) -> pd.DataFrame | None:
        """Load cached OHLCV for a symbol.

        Returns:
            A DataFrame indexed by ``date`` (when the cache has that field),
            or None if the file is missing, has no bars, or cannot be read.
            No age check is performed here; see ``cleanup_old_cache``.
        """
        path = self._cache_path(symbol)
        if not path.exists():
            return None
        try:
            data = json.loads(path.read_text())
            if not data.get("bars"):
                return None
            df = pd.DataFrame(data["bars"])
            if "date" in df.columns:
                df["date"] = pd.to_datetime(df["date"])
                df = df.set_index("date")
            return df
        except Exception as exc:
            # Best effort: a corrupt cache file is treated as a cache miss.
            logger.debug("Cache load failed for %s: %s", symbol, exc)
            return None

    def save(self, symbol: str, df: pd.DataFrame) -> None:
        """Save OHLCV to cache, keeping only the last ``_MAX_BARS`` rows.

        Failures are logged at debug level and otherwise ignored, so a broken
        cache never interrupts a scan.
        """
        try:
            df_cached = df.tail(self._MAX_BARS).copy()
            # reset_index() names the new column after the index. Force the
            # name "date" for datetime indexes (unnamed, "Date", ...) so the
            # timestamps are serialized below and found again by get_cached().
            if isinstance(df_cached.index, pd.DatetimeIndex) and "date" not in df_cached.columns:
                df_cached = df_cached.rename_axis("date")
            bars = df_cached.reset_index().to_dict(orient="records")
            # Timestamps are not JSON serializable; store them as ISO strings.
            for bar in bars:
                stamp = bar.get("date")
                if hasattr(stamp, "isoformat"):
                    bar["date"] = stamp.isoformat()
            self._cache_dir.mkdir(parents=True, exist_ok=True)
            payload = {"bars": bars, "updated": datetime.now().isoformat()}
            self._cache_path(symbol).write_text(json.dumps(payload))
        except Exception as exc:
            logger.debug("Cache save failed for %s: %s", symbol, exc)

    def append_bar(self, symbol: str, bar: dict) -> pd.DataFrame | None:
        """Append (or refresh) a daily bar in the cache.

        If the bar falls on the same calendar day as the newest cached bar,
        that row is overwritten; otherwise a new row is added. The cache is
        kept in chronological order and trimmed to ``_MAX_BARS`` rows.

        Args:
            symbol: Symbol whose cache is updated.
            bar: Mapping with a ``date`` entry plus one value per cached
                column; columns absent from ``bar`` are stored as NaN.

        Returns:
            The updated DataFrame, the unchanged cached DataFrame when the
            bar has no usable date, or None when the symbol has no cache yet.
        """
        cached = self.get_cached(symbol)
        if cached is None:
            return None

        bar_date = self._parse_bar_date(bar.get("date"))
        if bar_date is None:
            # Writing the bar anyway would add a NaT-indexed row to the cache.
            logger.debug("Ignoring bar without a usable date for %s", symbol)
            return cached

        # Build the row from the cached columns only, so the "date" entry (or
        # any other extra key) cannot turn the numeric columns into objects.
        row = pd.Series(
            [bar.get(column, float("nan")) for column in cached.columns],
            index=cached.columns,
        )

        last_date = cached.index[-1] if len(cached) > 0 else None
        same_day = isinstance(last_date, pd.Timestamp) and last_date.date() == bar_date.date()
        cached.loc[last_date if same_day else bar_date] = row

        # A late-arriving older bar must not end up as the "latest" row.
        if isinstance(cached.index, pd.DatetimeIndex) and not cached.index.is_monotonic_increasing:
            cached = cached.sort_index()
        cached = cached.tail(self._MAX_BARS)
        self.save(symbol, cached)
        return cached

    def screen_breakouts(
        self,
        symbols: list[str],
        current_prices: dict[str, float],
        entry_period: int = 20,
    ) -> list[str]:
        """Quick screen: find symbols trading at or near their rolling high.

        A symbol qualifies when its current price is at least
        ``_BREAKOUT_TOLERANCE`` times the highest ``high`` of the last
        ``entry_period`` cached bars. Uses cached data + current prices only,
        so no fresh OHLCV fetch is made.

        Args:
            symbols: Symbols to screen, in the order results are returned.
            current_prices: Latest price per symbol; symbols with a missing
                or zero price are skipped.
            entry_period: Number of most recent cached bars in the window.

        Returns:
            The qualifying symbols. Symbols with fewer than ``entry_period``
            cached bars or without a ``high``/``High`` column are skipped.
        """
        candidates = []
        for symbol in symbols:
            price = current_prices.get(symbol)
            if not price:
                continue

            cached = self.get_cached(symbol)
            if cached is None or len(cached) < entry_period:
                continue

            # Accept both lower-case and capitalized column naming.
            high_col = "high" if "high" in cached.columns else "High"
            if high_col not in cached.columns:
                continue

            donchian_high = cached[high_col].iloc[-entry_period:].max()
            if price >= donchian_high * self._BREAKOUT_TOLERANCE:
                candidates.append(symbol)

        return candidates

    def cleanup_old_cache(self, max_age_days: int = 7) -> int:
        """Remove cache files not modified within ``max_age_days`` days.

        Returns:
            The number of files removed. Files that cannot be inspected or
            deleted are logged at debug level and left in place.
        """
        removed = 0
        cutoff = time.time() - max_age_days * 86400
        for path in self._cache_dir.glob("*.json"):
            try:
                if path.stat().st_mtime < cutoff:
                    path.unlink()
                    removed += 1
            except OSError as exc:
                logger.debug("Cache cleanup failed for %s: %s", path, exc)
        return removed