""" P170 Market Data Refresh — yfinance Updates data/lake/clean/market_data/NSE/ with latest candles. Run daily before market open or after close. Usage: python refresh_market_data.py # refresh all symbols python refresh_market_data.py --top-n 200 # refresh top 200 only python refresh_market_data.py --symbol RELIANCE # single symbol """ import argparse import time import sys from pathlib import Path from datetime import datetime import numpy as np import pandas as pd import yfinance as yf CLEAN_DIR = Path("data/lake/clean/market_data/NSE") LOG_EVERY = 50 # Known NSE→Yahoo symbol overrides (symbols that differ on Yahoo Finance) SYMBOL_MAP = { "M&M": "MM.NS", "L&T": "LT.NS", "ARE&M": "ARE&M.NS", "MOTHERSON": "MOTHERSON.NS", } def nse_to_yf(symbol: str) -> str: """Convert NSE symbol to Yahoo Finance ticker.""" if symbol in SYMBOL_MAP: return SYMBOL_MAP[symbol] return f"{symbol}.NS" def get_all_nse_symbols() -> list: """Return all symbols currently in the clean data lake.""" syms = [] for f in CLEAN_DIR.glob("*_real_market_data_v2.parquet"): sym = f.stem.replace("_real_market_data_v2", "") syms.append(sym) return sorted(syms) def refresh_symbol(symbol: str, period: str = "1y") -> dict: """ Fetch latest data for one symbol and update the parquet file. Returns status dict. """ path = CLEAN_DIR / f"{symbol}_real_market_data_v2.parquet" yf_sym = nse_to_yf(symbol) try: # Load existing data to find last date last_date = None if path.exists(): existing = pd.read_parquet(path) date_col = "timestamp" if "timestamp" in existing.columns else "date" last_date = pd.to_datetime(existing[date_col]).max() # Fetch from yfinance df = yf.download( yf_sym, period=period, interval="1d", progress=False, auto_adjust=True, ) if df is None or len(df) == 0: return {"symbol": symbol, "status": "no_data", "rows": 0} # Normalise columns df = df.reset_index() df.columns = [c.lower() if isinstance(c, str) else c[0].lower() for c in df.columns] df = df.rename(columns={"date": "timestamp", "index": "timestamp"}) # Keep only OHLCV keep = ["timestamp", "open", "high", "low", "close", "volume"] df = df[[c for c in keep if c in df.columns]].copy() df["timestamp"] = pd.to_datetime(df["timestamp"]).dt.tz_localize(None) df = df.dropna(subset=["close"]).sort_values("timestamp").reset_index(drop=True) if len(df) == 0: return {"symbol": symbol, "status": "empty", "rows": 0} # Merge with existing (avoid duplicates) — always enforce clean schema if path.exists() and last_date is not None: new_rows = df[df["timestamp"] > last_date] if len(new_rows) == 0: return {"symbol": symbol, "status": "already_fresh", "rows": len(existing), "last": str(last_date.date())} # Normalise existing to clean schema before concat if date_col == "date": existing = existing.rename(columns={"date": "timestamp"}) existing["timestamp"] = pd.to_datetime(existing["timestamp"]) keep_cols = ["timestamp","open","high","low","close","volume"] existing = existing[[c for c in keep_cols if c in existing.columns]] combined = pd.concat([existing, new_rows], ignore_index=True) combined = combined.drop_duplicates(subset=["timestamp"]).sort_values("timestamp") df = combined df.to_parquet(path, index=False) last = df["timestamp"].max() return { "symbol": symbol, "status": "updated", "rows": len(df), "last": str(last.date()), "yf_sym": yf_sym, } except Exception as e: return {"symbol": symbol, "status": "error", "error": str(e)} def refresh_all(symbols: list, delay: float = 0.1) -> dict: """Refresh all symbols with rate limiting.""" t0 = time.time() results = {"updated": 0, "already_fresh": 0, "no_data": 0, "error": 0} errors = [] print(f"Refreshing {len(symbols)} symbols...") for i, sym in enumerate(symbols): r = refresh_symbol(sym) status = r.get("status", "error") if status == "updated": results["updated"] += 1 elif status == "already_fresh": results["already_fresh"] += 1 elif status in ("no_data", "empty"): results["no_data"] += 1 else: results["error"] += 1 errors.append(f"{sym}: {r.get('error','')}") if (i + 1) % LOG_EVERY == 0: elapsed = time.time() - t0 rate = (i + 1) / elapsed eta = (len(symbols) - i - 1) / rate print(f" [{i+1}/{len(symbols)}] " f"updated={results['updated']} " f"fresh={results['already_fresh']} " f"errors={results['error']} " f"eta={eta:.0f}s") time.sleep(delay) elapsed = time.time() - t0 print(f"\nDone in {elapsed:.0f}s") print(f" Updated: {results['updated']}") print(f" Already fresh:{results['already_fresh']}") print(f" No data: {results['no_data']}") print(f" Errors: {results['error']}") if errors: print(f"\nFirst 10 errors:") for e in errors[:10]: print(f" {e}") return results if __name__ == "__main__": ap = argparse.ArgumentParser() ap.add_argument("--symbol", type=str, default=None) ap.add_argument("--top-n", type=int, default=None) ap.add_argument("--period", type=str, default="1y", help="yfinance period: 1y, 2y, 6mo etc") ap.add_argument("--delay", type=float, default=0.15, help="seconds between requests (avoid rate limiting)") args = ap.parse_args() if args.symbol: r = refresh_symbol(args.symbol.upper(), period=args.period) print(r) else: syms = get_all_nse_symbols() if args.top_n: syms = syms[:args.top_n] refresh_all(syms, delay=args.delay)