File size: 6,380 Bytes
b22f9ac | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | """
P170 Market Data Refresh — yfinance
Appends the latest daily candles from Yahoo Finance to the per-symbol parquet files in data/lake/clean/market_data/NSE/.
Run once a day, either before market open or after market close.
Usage:
python refresh_market_data.py # refresh all symbols
python refresh_market_data.py --top-n 200 # refresh only the first 200 symbols (alphabetical order)
python refresh_market_data.py --symbol RELIANCE # single symbol
"""
import argparse
import time
import sys
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
import yfinance as yf
# Directory holding one clean parquet file per symbol (relative to the CWD).
CLEAN_DIR = Path("data", "lake", "clean", "market_data", "NSE")

# How many symbols refresh_all() processes between progress lines.
LOG_EVERY = 50

# NSE symbol -> Yahoo Finance ticker overrides. Anything not listed here falls
# back to "<SYMBOL>.NS" in nse_to_yf(), so the ARE&M / MOTHERSON entries below
# are equivalent to that default and only kept for explicitness.
# NOTE(review): confirm "MM.NS" resolves on Yahoo (it may list Mahindra &
# Mahindra as "M&M.NS"), and that a file named "L&T_..." actually exists
# (NSE trades Larsen & Toubro as "LT").
SYMBOL_MAP = {
    "M&M":       "MM.NS",
    "L&T":       "LT.NS",
    "ARE&M":     "ARE&M.NS",
    "MOTHERSON": "MOTHERSON.NS",
}
def nse_to_yf(symbol: str) -> str:
    """Return the Yahoo Finance ticker for an NSE *symbol*.

    Symbols with an entry in ``SYMBOL_MAP`` use that override; every other
    symbol simply gets the ``.NS`` suffix appended.
    """
    default_ticker = f"{symbol}.NS"
    return SYMBOL_MAP.get(symbol, default_ticker)
def get_all_nse_symbols() -> list:
    """List, in sorted order, every symbol that has a clean parquet file.

    Symbols are recovered from file names of the form
    ``<SYMBOL>_real_market_data_v2.parquet`` inside ``CLEAN_DIR``.
    """
    suffix = "_real_market_data_v2"
    return sorted(
        parquet.stem.replace(suffix, "")
        for parquet in CLEAN_DIR.glob(f"*{suffix}.parquet")
    )
def _naive_timestamps(values: pd.Series) -> pd.Series:
    """Parse *values* as datetimes and drop any timezone (keeps wall-clock time)."""
    ts = pd.to_datetime(values)
    if ts.dt.tz is not None:
        ts = ts.dt.tz_localize(None)
    return ts


def refresh_symbol(symbol: str, period: str = "1y") -> dict:
    """
    Fetch the latest daily candles for one symbol and update its parquet file.

    If a parquet file with dated rows already exists, only downloaded rows that
    are strictly newer than its last timestamp are appended. Otherwise the file
    is (re)written from the downloaded data alone.

    Args:
        symbol: NSE symbol as used in the data-lake file name.
        period: yfinance ``period`` string passed to ``yf.download``.

    Returns:
        Status dict with at least ``"symbol"`` and ``"status"``. ``status`` is
        ``"updated"``, ``"already_fresh"``, ``"no_data"``, ``"empty"`` or
        ``"error"``; errors carry the exception text under ``"error"``.
    """
    path = CLEAN_DIR / f"{symbol}_real_market_data_v2.parquet"
    yf_sym = nse_to_yf(symbol)
    keep = ["timestamp", "open", "high", "low", "close", "volume"]
    # Per-symbol best-effort: any failure is reported in the status dict so a
    # batch run keeps going.
    try:
        # Load existing data (normalised to a naive "timestamp" column) to find
        # the last stored date.
        existing = None
        last_date = None
        if path.exists():
            existing = pd.read_parquet(path)
            date_col = "timestamp" if "timestamp" in existing.columns else "date"
            existing = existing.rename(columns={date_col: "timestamp"})
            existing["timestamp"] = _naive_timestamps(existing["timestamp"])
            last_date = existing["timestamp"].max()
            # An empty file (or all-null dates) yields NaT. Comparing against NaT
            # filters out every new row, so such a file would be reported
            # "already_fresh" forever; treat it as having no history instead.
            if pd.isna(last_date):
                existing, last_date = None, None

        # Fetch from yfinance.
        # NOTE(review): with auto_adjust=True, history downloaded on different
        # days can be adjusted differently (splits/dividends), so appended rows
        # may not line up with previously stored ones after a corporate action.
        df = yf.download(
            yf_sym,
            period=period,
            interval="1d",
            progress=False,
            auto_adjust=True,
        )
        if df is None or len(df) == 0:
            return {"symbol": symbol, "status": "no_data", "rows": 0}

        # Normalise columns; MultiIndex columns arrive as tuples whose first
        # element is the field name ("Open", "Close", ...).
        df = df.reset_index()
        df.columns = [c.lower() if isinstance(c, str) else c[0].lower()
                      for c in df.columns]
        df = df.rename(columns={"date": "timestamp", "index": "timestamp"})

        # Keep only OHLCV.
        df = df[[c for c in keep if c in df.columns]].copy()
        df["timestamp"] = _naive_timestamps(df["timestamp"])
        df = df.dropna(subset=["close"]).sort_values("timestamp").reset_index(drop=True)
        if len(df) == 0:
            return {"symbol": symbol, "status": "empty", "rows": 0}

        # Merge with existing history (append only newer rows, clean schema).
        if existing is not None:
            new_rows = df[df["timestamp"] > last_date]
            if len(new_rows) == 0:
                return {"symbol": symbol, "status": "already_fresh",
                        "rows": len(existing), "last": str(last_date.date())}
            existing = existing[[c for c in keep if c in existing.columns]]
            df = (
                pd.concat([existing, new_rows], ignore_index=True)
                .drop_duplicates(subset=["timestamp"])
                .sort_values("timestamp")
                .reset_index(drop=True)
            )

        path.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(path, index=False)
        last = df["timestamp"].max()
        return {
            "symbol": symbol, "status": "updated",
            "rows": len(df), "last": str(last.date()),
            "yf_sym": yf_sym,
        }
    except Exception as e:
        return {"symbol": symbol, "status": "error", "error": str(e)}
def refresh_all(symbols: list, delay: float = 0.1, period: str = "1y") -> dict:
    """Refresh every symbol in *symbols* in order, sleeping between requests.

    Args:
        symbols: NSE symbols to refresh.
        delay: Seconds to sleep between consecutive downloads (rate limiting).
        period: yfinance ``period`` string forwarded to ``refresh_symbol``.

    Returns:
        Counts keyed by ``"updated"``, ``"already_fresh"``, ``"no_data"``
        (which also counts ``"empty"`` results) and ``"error"`` (any other
        status).
    """
    t0 = time.time()
    results = {"updated": 0, "already_fresh": 0, "no_data": 0, "error": 0}
    errors = []
    total = len(symbols)
    print(f"Refreshing {total} symbols...")
    for i, sym in enumerate(symbols, start=1):
        r = refresh_symbol(sym, period=period)
        status = r.get("status", "error")
        if status in ("updated", "already_fresh"):
            results[status] += 1
        elif status in ("no_data", "empty"):
            results["no_data"] += 1
        else:
            results["error"] += 1
            errors.append(f"{sym}: {r.get('error','')}")
        if i % LOG_EVERY == 0:
            elapsed = time.time() - t0
            # Guard against a zero elapsed time (coarse clocks + instant failures).
            rate = i / elapsed if elapsed > 0 else float("inf")
            eta = (total - i) / rate
            print(f"  [{i}/{total}] "
                  f"updated={results['updated']} "
                  f"fresh={results['already_fresh']} "
                  f"errors={results['error']} "
                  f"eta={eta:.0f}s")
        if i < total:  # no point waiting after the final request
            time.sleep(delay)
    elapsed = time.time() - t0
    print(f"\nDone in {elapsed:.0f}s")
    print(f"  Updated:      {results['updated']}")
    print(f"  Already fresh:{results['already_fresh']}")
    print(f"  No data:      {results['no_data']}")
    print(f"  Errors:       {results['error']}")
    if errors:
        print("\nFirst 10 errors:")
        for e in errors[:10]:
            print(f"  {e}")
    return results


if __name__ == "__main__":
    ap = argparse.ArgumentParser(
        description="Refresh NSE market data parquet files from yfinance.")
    ap.add_argument("--symbol", type=str, default=None,
                    help="refresh a single NSE symbol")
    ap.add_argument("--top-n", type=int, default=None,
                    help="refresh only the first N symbols (alphabetical order)")
    ap.add_argument("--period", type=str, default="1y",
                    help="yfinance period: 1y, 2y, 6mo etc")
    ap.add_argument("--delay", type=float, default=0.15,
                    help="seconds between requests (avoid rate limiting)")
    args = ap.parse_args()
    if args.top_n is not None and args.top_n < 0:
        ap.error("--top-n must be >= 0")
    if args.symbol:
        r = refresh_symbol(args.symbol.upper(), period=args.period)
        print(r)
    else:
        syms = get_all_nse_symbols()
        # `is not None` so that an explicit --top-n 0 refreshes nothing
        # instead of falling through to "all symbols".
        if args.top_n is not None:
            syms = syms[:args.top_n]
        # Forward --period; previously it only applied to --symbol runs.
        refresh_all(syms, delay=args.delay, period=args.period)
|