#!/usr/bin/env python3
"""Seed Almanac historic accuracy data from local index CSV files.

Run this script from the project root so relative paths resolve against
the repo:

    python scripts/seed_accuracy.py
"""

from __future__ import annotations

import argparse
import csv
import json
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path

# Preferred almanac source; the db dump is a fallback with a different schema.
PRIMARY_ALMANAC_PATH = Path("data") / "almanac_2026" / "almanac_2026.json"
FALLBACK_ALMANAC_PATH = Path("data") / "almanac_2026" / "almanac_2026_db_dump.json"
OUTPUT_PATH = Path("data") / "almanac_2026" / "accuracy_results.json"

# Per-index configuration, keyed by the almanac's one-letter signal key.
#   csv_key     - key used for the per-index history lookup tables
#   summary_key - key used in aggregated weekly/monthly summaries
#   arg         - CLI flag name for overriding the CSV path
INDEX_CONFIG = {
    "d": {
        "csv_key": "dji",
        "summary_key": "dow",
        "label": "Dow",
        "arg": "dji",
        "default": Path("data") / "historical" / "DJI_daily.csv",
    },
    "s": {
        "csv_key": "sp500",
        "summary_key": "sp500",
        "label": "S&P 500",
        "arg": "sp500",
        "default": Path("data") / "historical" / "GSPC_daily.csv",
    },
    "n": {
        "csv_key": "nasdaq",
        "summary_key": "nasdaq",
        "label": "NASDAQ",
        "arg": "nasdaq",
        "default": Path("data") / "historical" / "IXIC_daily.csv",
    },
}


def iso_utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string with a Z suffix."""
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def read_json(path: Path) -> dict:
    """Load *path* as JSON, requiring a top-level object.

    Raises:
        ValueError: if the file's top-level value is not a JSON object.
    """
    with path.open("r", encoding="utf-8") as handle:
        payload = json.load(handle)
    if not isinstance(payload, dict):
        raise ValueError(f"{path} must contain a top-level JSON object")
    return payload


def load_almanac_predictions(project_root: Path) -> dict[str, dict[str, object]]:
    """Load per-day almanac probabilities keyed by ISO date (YYYY-MM-DD).

    Tries the primary almanac JSON first ("daily" mapping), then falls back
    to the db-dump format ("daily_probabilities" rows). Each returned value
    has float probabilities under "d"/"s"/"n" and a "context" note string.

    Raises:
        FileNotFoundError: if neither source yields any usable rows.
    """
    primary_path = project_root / PRIMARY_ALMANAC_PATH
    fallback_path = project_root / FALLBACK_ALMANAC_PATH
    if primary_path.exists():
        payload = read_json(primary_path)
        daily = payload.get("daily", {})
        if isinstance(daily, dict):
            normalized = {}
            for date_key, day in daily.items():
                if not isinstance(day, dict):
                    continue
                normalized[str(date_key)] = {
                    "d": float(day.get("d", 0.0)),
                    "s": float(day.get("s", 0.0)),
                    "n": float(day.get("n", 0.0)),
                    "context": str(day.get("notes", "") or "").strip(),
                }
            if normalized:
                return normalized
    if fallback_path.exists():
        payload = read_json(fallback_path)
        table = payload.get("daily_probabilities", {})
        rows = table.get("rows", []) if isinstance(table, dict) else []
        if isinstance(rows, list):
            normalized = {}
            for row in rows:
                if not isinstance(row, dict):
                    continue
                date_key = str(row.get("date", "")).strip()
                if not date_key:
                    continue
                normalized[date_key] = {
                    "d": float(row.get("dow_prob", 0.0)),
                    "s": float(row.get("sp500_prob", 0.0)),
                    "n": float(row.get("nasdaq_prob", 0.0)),
                    "context": str(row.get("notes", "") or "").strip(),
                }
            if normalized:
                return normalized
    raise FileNotFoundError(
        "No supported almanac source found. Expected "
        f"{primary_path} or {fallback_path}."
    )


def parse_close(value: str) -> float:
    """Parse a close price that may contain thousands separators."""
    return float(str(value or "").replace(",", "").strip())


def load_history_csv(path: Path) -> dict[str, dict[str, float | None]]:
    """Load a daily-history CSV into {iso_date: {"close", "prev_close"}}.

    The CSV must contain "Date" (M/D/YYYY) and "Close" columns. Rows are
    sorted chronologically so each day's "prev_close" is the prior trading
    day's close; the first row's prev_close is None.

    Raises:
        FileNotFoundError: if *path* does not exist.
        ValueError: on missing columns, unparseable rows, or an empty file.
    """
    if not path.exists():
        raise FileNotFoundError(f"Missing historical CSV: {path}")
    rows: list[tuple[datetime, float]] = []
    # utf-8-sig tolerates a BOM from spreadsheet exports.
    with path.open("r", encoding="utf-8-sig", newline="") as handle:
        reader = csv.DictReader(handle)
        required = {"Date", "Close"}
        if not required.issubset(set(reader.fieldnames or [])):
            raise ValueError(f"{path} must contain Date and Close columns")
        for row in reader:
            date_text = str(row.get("Date", "")).strip()
            if not date_text:
                continue
            try:
                parsed_date = datetime.strptime(date_text, "%m/%d/%Y")
                close_value = parse_close(str(row.get("Close", "")))
            except ValueError as exc:
                raise ValueError(f"Unable to parse row in {path}: {row}") from exc
            rows.append((parsed_date, close_value))
    if not rows:
        raise ValueError(f"{path} did not contain any historical rows")
    rows.sort(key=lambda item: item[0])
    lookup: dict[str, dict[str, float | None]] = {}
    previous_close: float | None = None
    for trade_date, close_value in rows:
        iso_date = trade_date.strftime("%Y-%m-%d")
        lookup[iso_date] = {"close": close_value, "prev_close": previous_close}
        previous_close = close_value
    return lookup


def actual_direction(pct_change: float) -> str:
    """Classify a day's percentage change as UP, DOWN, or FLAT."""
    if pct_change > 0:
        return "UP"
    if pct_change < 0:
        return "DOWN"
    return "FLAT"


def predicted_direction(probability: float) -> str | None:
    """Map an almanac probability (0-100) to a call; 50 means no call."""
    if probability > 50:
        return "UP"
    if probability < 50:
        return "DOWN"
    return None


def score_prediction(probability: float, pct_change: float) -> dict[str, str | None]:
    """Score one prediction against the realized move.

    Returns a dict with "predicted" (UP/DOWN/None), "actual"
    (UP/DOWN/FLAT), and "verdict" (HIT/MISS, or None when no call was
    made). A FLAT day counts as a MISS for either call.
    """
    predicted = predicted_direction(probability)
    actual = actual_direction(pct_change)
    verdict = None
    if predicted == "UP":
        verdict = "HIT" if pct_change > 0 else "MISS"
    elif predicted == "DOWN":
        verdict = "HIT" if pct_change < 0 else "MISS"
    return {"verdict": verdict, "predicted": predicted, "actual": actual}


def pct(value: int, total: int) -> float:
    """Return value/total as a percentage rounded to one decimal; 0.0 if total <= 0."""
    if total <= 0:
        return 0.0
    return round((value / total) * 100, 1)


def week_start_key(date_key: str) -> str:
    """Return the ISO date of the Monday of the week containing *date_key*."""
    parsed = datetime.strptime(date_key, "%Y-%m-%d")
    week_start = parsed - timedelta(days=parsed.weekday())
    return week_start.strftime("%Y-%m-%d")


def build_daily_results(
    almanac_daily: dict[str, dict[str, object]],
    history_by_index: dict[str, dict[str, dict[str, float | None]]],
) -> dict[str, dict[str, object]]:
    """Score every almanac day that has complete history for all three indexes.

    Days missing a close or a previous close for any index are skipped
    (holidays, weekends, the first row of a history file).
    """
    daily_results: dict[str, dict[str, object]] = {}
    for date_key in sorted(almanac_daily.keys()):
        current_records = {}
        for config in INDEX_CONFIG.values():
            history = history_by_index[config["csv_key"]]
            current_records[config["csv_key"]] = history.get(date_key)
        # Require a full set of records so all three indexes score the same days.
        if any(record is None or record.get("prev_close") is None for record in current_records.values()):
            continue
        day_predictions = almanac_daily[date_key]
        actuals = {}
        prev_closes = {}
        pct_changes = {}
        results = {}
        hits = 0
        total_calls = 0
        for signal_key, config in INDEX_CONFIG.items():
            csv_key = config["csv_key"]
            record = current_records[csv_key] or {}
            close_value = float(record["close"])
            prev_close = float(record["prev_close"])
            pct_change = (close_value - prev_close) / prev_close
            probability = float(day_predictions.get(signal_key, 0.0))
            actuals[csv_key] = round(close_value, 6)
            prev_closes[csv_key] = round(prev_close, 6)
            pct_changes[csv_key] = round(pct_change, 6)
            results[signal_key] = score_prediction(probability, pct_change)
            # Only count days where the almanac actually made a call.
            if results[signal_key]["verdict"] is not None:
                total_calls += 1
                if results[signal_key]["verdict"] == "HIT":
                    hits += 1
        daily_results[date_key] = {
            "actual": actuals,
            "prev_close": prev_closes,
            "pct_change": pct_changes,
            "almanac_scores": {
                "d": float(day_predictions.get("d", 0.0)),
                "s": float(day_predictions.get("s", 0.0)),
                "n": float(day_predictions.get("n", 0.0)),
            },
            "results": results,
            "hits": hits,
            "total_calls": total_calls,
            "context": str(day_predictions.get("context", "") or "").strip(),
        }
    return daily_results


def aggregate_periods(
    daily_results: dict[str, dict[str, object]],
    key_builder,
    include_dates: bool = False,
    include_trading_days: bool = False,
) -> dict[str, dict[str, object]]:
    """Roll daily results up into periods chosen by *key_builder*.

    Args:
        daily_results: output of build_daily_results.
        key_builder: maps an ISO date string to a period key (e.g. week start).
        include_dates: include the member date list in each period record.
        include_trading_days: include the member count in each period record.
    """
    grouped: dict[str, dict[str, object]] = defaultdict(
        lambda: {
            "dates": [],
            "hits": 0,
            "total_calls": 0,
            "dow": {"hits": 0, "total": 0},
            "sp500": {"hits": 0, "total": 0},
            "nasdaq": {"hits": 0, "total": 0},
        }
    )
    for date_key, day in sorted(daily_results.items()):
        group_key = key_builder(date_key)
        bucket = grouped[group_key]
        bucket["dates"].append(date_key)
        bucket["hits"] += int(day.get("hits", 0))
        bucket["total_calls"] += int(day.get("total_calls", 0))
        for signal_key, config in INDEX_CONFIG.items():
            result = (day.get("results", {}) or {}).get(signal_key, {})
            verdict = result.get("verdict")
            if verdict is None:
                continue
            summary_bucket = bucket[config["summary_key"]]
            summary_bucket["total"] += 1
            if verdict == "HIT":
                summary_bucket["hits"] += 1
    summarized: dict[str, dict[str, object]] = {}
    for group_key, bucket in sorted(grouped.items()):
        record: dict[str, object] = {
            "hits": bucket["hits"],
            "total_calls": bucket["total_calls"],
            "accuracy": pct(bucket["hits"], bucket["total_calls"]),
        }
        if include_dates:
            record["dates"] = bucket["dates"]
        for index_key in ("dow", "sp500", "nasdaq"):
            index_bucket = bucket[index_key]
            record[index_key] = {
                "hits": index_bucket["hits"],
                "total": index_bucket["total"],
                "pct": pct(index_bucket["hits"], index_bucket["total"]),
            }
        if include_trading_days:
            record["trading_days"] = len(bucket["dates"])
        summarized[group_key] = record
    return summarized


def build_output(daily_results: dict[str, dict[str, object]]) -> dict[str, object]:
    """Assemble the final JSON payload: meta, daily, weekly, and monthly views."""
    weekly = aggregate_periods(
        daily_results,
        key_builder=week_start_key,
        include_dates=True,
    )
    monthly = aggregate_periods(
        daily_results,
        key_builder=lambda date_key: date_key[:7],  # YYYY-MM
        include_trading_days=True,
    )
    sorted_dates = sorted(daily_results.keys())
    return {
        "meta": {
            "last_updated": iso_utc_now(),
            "total_days_scored": len(sorted_dates),
            "data_range": {
                "from": sorted_dates[0] if sorted_dates else None,
                "to": sorted_dates[-1] if sorted_dates else None,
            },
            "source": "Historic CSV backtest via scripts/seed_accuracy.py",
        },
        "daily": daily_results,
        "weekly": weekly,
        "monthly": monthly,
    }


def format_score(hits: int, total: int) -> str:
    """Format "hits/total (pct%)" for the console table; "--%" when total is 0."""
    if total <= 0:
        return "0/0 (--%)"
    # Single rounding; the original round(...) followed by :.0f was redundant.
    return f"{hits}/{total} ({round((hits / total) * 100)}%)"


def print_summary(output: dict[str, object]) -> None:
    """Print a per-month accuracy table plus a Q1/YTD total row to stdout."""
    monthly = output.get("monthly", {})
    if not isinstance(monthly, dict):
        return
    print("=== 2026 Almanac Accuracy Backtest ===")
    print(f"{'Month':<10} {'Dow':<15} {'S&P 500':<15} {'NASDAQ':<15} {'All':<15}")
    total_hits = 0
    total_calls = 0
    per_index_totals = {
        "dow": {"hits": 0, "total": 0},
        "sp500": {"hits": 0, "total": 0},
        "nasdaq": {"hits": 0, "total": 0},
    }
    for month_key in sorted(monthly.keys()):
        month_data = monthly[month_key]
        month_name = datetime.strptime(month_key + "-01", "%Y-%m-%d").strftime("%B")
        total_hits += int(month_data.get("hits", 0))
        total_calls += int(month_data.get("total_calls", 0))
        for index_key in per_index_totals:
            per_index_totals[index_key]["hits"] += int(month_data.get(index_key, {}).get("hits", 0))
            per_index_totals[index_key]["total"] += int(month_data.get(index_key, {}).get("total", 0))
        print(
            f"{month_name:<10} "
            f"{format_score(month_data['dow']['hits'], month_data['dow']['total']):<15} "
            f"{format_score(month_data['sp500']['hits'], month_data['sp500']['total']):<15} "
            f"{format_score(month_data['nasdaq']['hits'], month_data['nasdaq']['total']):<15} "
            f"{format_score(month_data['hits'], month_data['total_calls']):<15}"
        )
    # Label the total row Q1 only when no months outside Jan-Mar 2026 appear.
    total_label = "Q1 Total" if set(monthly.keys()).issubset({"2026-01", "2026-02", "2026-03"}) else "YTD Total"
    print(
        f"{total_label:<10} "
        f"{format_score(per_index_totals['dow']['hits'], per_index_totals['dow']['total']):<15} "
        f"{format_score(per_index_totals['sp500']['hits'], per_index_totals['sp500']['total']):<15} "
        f"{format_score(per_index_totals['nasdaq']['hits'], per_index_totals['nasdaq']['total']):<15} "
        f"{format_score(total_hits, total_calls):<15}"
    )


def parse_args() -> argparse.Namespace:
    """Parse CLI flags overriding the default per-index CSV paths."""
    parser = argparse.ArgumentParser(description="Seed Almanac historic accuracy results from local CSV data.")
    parser.add_argument("--dji", type=Path, default=INDEX_CONFIG["d"]["default"])
    parser.add_argument("--sp500", type=Path, default=INDEX_CONFIG["s"]["default"])
    parser.add_argument("--nasdaq", type=Path, default=INDEX_CONFIG["n"]["default"])
    return parser.parse_args()


def main() -> int:
    """Run the backtest and write accuracy_results.json; return process exit code."""
    args = parse_args()
    # Paths resolve relative to the current working directory (the repo root).
    project_root = Path.cwd()
    try:
        almanac_daily = load_almanac_predictions(project_root)
        history_by_index = {
            "dji": load_history_csv(project_root / Path(args.dji)),
            "sp500": load_history_csv(project_root / Path(args.sp500)),
            "nasdaq": load_history_csv(project_root / Path(args.nasdaq)),
        }
        daily_results = build_daily_results(almanac_daily, history_by_index)
        output = build_output(daily_results)
        output_path = project_root / OUTPUT_PATH
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with output_path.open("w", encoding="utf-8") as handle:
            json.dump(output, handle, indent=2)
            handle.write("\n")
        print_summary(output)
        print(f"Wrote {output_path}")
        return 0
    except Exception as exc:
        # Top-level boundary: report any failure on stderr and exit non-zero.
        print(f"[seed_accuracy] {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())