"""
P170 Market Data Refresh — yfinance
Updates data/lake/clean/market_data/NSE/ with the latest daily candles.
Run daily before market open or after close.

Usage:
    python refresh_market_data.py                     # refresh all symbols
    python refresh_market_data.py --top-n 200         # refresh only the first 200 symbols (alphabetical order)
    python refresh_market_data.py --symbol RELIANCE   # single symbol
"""
|
|
| import argparse |
| import time |
| import sys |
| from pathlib import Path |
| from datetime import datetime |
|
|
| import numpy as np |
| import pandas as pd |
| import yfinance as yf |
|
|
# Directory of the clean data lake: one parquet file per NSE symbol, named
# "<SYMBOL>_real_market_data_v2.parquet".
CLEAN_DIR: Path = Path("data/lake/clean/market_data/NSE")

# During a bulk refresh, print one progress line every LOG_EVERY symbols.
LOG_EVERY: int = 50
|
|
| |
# NSE symbols whose Yahoo Finance ticker is looked up explicitly instead of
# being derived with the default "<symbol>.NS" rule.
# NOTE(review): "ARE&M" and "MOTHERSON" map to exactly what the default rule
# would produce, and "M&M" -> "MM.NS" drops the ampersand while "ARE&M" keeps
# it — confirm that each of these tickers actually resolves on Yahoo Finance.
SYMBOL_MAP = {
    "M&M": "MM.NS",
    "L&T": "LT.NS",
    "ARE&M": "ARE&M.NS",
    "MOTHERSON": "MOTHERSON.NS",
}


def nse_to_yf(symbol: str) -> str:
    """Return the Yahoo Finance ticker for an NSE symbol.

    Symbols listed in SYMBOL_MAP use their explicit override; every other
    symbol simply gets the ".NS" suffix appended.
    """
    return SYMBOL_MAP.get(symbol, f"{symbol}.NS")
|
|
|
|
def get_all_nse_symbols() -> list:
    """List the symbols that already have a parquet file in CLEAN_DIR.

    The symbol is the file stem with the "_real_market_data_v2" marker
    removed. The result is sorted alphabetically.
    """
    return sorted(
        parquet_file.stem.replace("_real_market_data_v2", "")
        for parquet_file in CLEAN_DIR.glob("*_real_market_data_v2.parquet")
    )
|
|
|
|
def refresh_symbol(symbol: str, period: str = "1y") -> dict:
    """
    Fetch the latest daily candles for one symbol and update its parquet file.

    Downloads `period` worth of daily, auto-adjusted candles from Yahoo
    Finance. If a usable file already exists, only rows newer than its last
    timestamp are appended; otherwise the download becomes the new file.

    Args:
        symbol: NSE symbol; also the file-name prefix inside CLEAN_DIR.
        period: yfinance look-back period, e.g. "1y", "2y", "6mo".

    Returns:
        A status dict that always has "symbol" and "status". "status" is:
          "updated"       - file written; also has "rows", "last", "yf_sym".
          "already_fresh" - nothing newer than the stored data; also has
                            "rows" and "last".
          "no_data"       - the download came back empty; "rows" is 0.
          "empty"         - no downloaded row had a close price; "rows" is 0.
          "error"         - an exception was raised; its message is under
                            "error". Exceptions are never propagated.
    """
    path = CLEAN_DIR / f"{symbol}_real_market_data_v2.parquet"
    yf_sym = nse_to_yf(symbol)
    keep = ["timestamp", "open", "high", "low", "close", "volume"]

    # Deliberately broad: one bad symbol must not abort a bulk refresh, so any
    # failure is reported through the returned status instead of raised.
    try:
        existing = None
        last_date = None
        if path.exists():
            existing = pd.read_parquet(path)
            # Older files may name the time column "date" instead of "timestamp".
            if "timestamp" not in existing.columns:
                existing = existing.rename(columns={"date": "timestamp"})
            existing["timestamp"] = pd.to_datetime(existing["timestamp"])
            last_date = existing["timestamp"].max()
            # An empty file (or one holding only null timestamps) yields NaT.
            # Every comparison against NaT is False, so such a file would be
            # reported "already_fresh" forever. Treat it as having no usable
            # data and let the fresh download replace it.
            if pd.isna(last_date):
                existing = None
                last_date = None

        # NOTE(review): auto_adjust=True returns adjusted prices, while the
        # merge below only appends rows — presumably already-stored history is
        # not re-adjusted after a later split/dividend; confirm this is intended.
        df = yf.download(
            yf_sym,
            period=period,
            interval="1d",
            progress=False,
            auto_adjust=True,
        )

        if df is None or len(df) == 0:
            return {"symbol": symbol, "status": "no_data", "rows": 0}

        # Flatten to lower-case column names; tuple (multi-level) column
        # labels are reduced to their first element.
        df = df.reset_index()
        df.columns = [
            c.lower() if isinstance(c, str) else c[0].lower()
            for c in df.columns
        ]
        df = df.rename(columns={"date": "timestamp", "index": "timestamp"})

        df = df[[c for c in keep if c in df.columns]].copy()
        df["timestamp"] = pd.to_datetime(df["timestamp"]).dt.tz_localize(None)
        df = (
            df.dropna(subset=["close"])
            .sort_values("timestamp")
            .reset_index(drop=True)
        )

        if len(df) == 0:
            return {"symbol": symbol, "status": "empty", "rows": 0}

        if existing is not None:
            new_rows = df[df["timestamp"] > last_date]
            if len(new_rows) == 0:
                return {"symbol": symbol, "status": "already_fresh",
                        "rows": len(existing), "last": str(last_date.date())}

            # Append only the unseen candles to the stored history.
            existing = existing[[c for c in keep if c in existing.columns]]
            df = (
                pd.concat([existing, new_rows], ignore_index=True)
                .drop_duplicates(subset=["timestamp"])
                .sort_values("timestamp")
            )

        df.to_parquet(path, index=False)
        last = df["timestamp"].max()
        return {
            "symbol": symbol, "status": "updated",
            "rows": len(df), "last": str(last.date()),
            "yf_sym": yf_sym,
        }

    except Exception as e:
        return {"symbol": symbol, "status": "error", "error": str(e)}
|
|
|
|
def refresh_all(symbols: list, delay: float = 0.1, period: str = "1y") -> dict:
    """Refresh every symbol in `symbols`, pausing between requests.

    Args:
        symbols: NSE symbols to refresh, processed in the given order.
        delay: Seconds to sleep between consecutive requests (rate limiting).
        period: yfinance look-back period forwarded to refresh_symbol().

    Returns:
        Counts per outcome under the keys "updated", "already_fresh",
        "no_data" (which also counts the "empty" status) and "error".
    """
    t0 = time.time()
    results = {"updated": 0, "already_fresh": 0, "no_data": 0, "error": 0}
    errors = []
    total = len(symbols)

    print(f"Refreshing {total} symbols...")
    for i, sym in enumerate(symbols):
        r = refresh_symbol(sym, period=period)
        status = r.get("status", "error")

        if status in ("updated", "already_fresh"):
            results[status] += 1
        elif status in ("no_data", "empty"):
            results["no_data"] += 1
        else:
            results["error"] += 1
            errors.append(f"{sym}: {r.get('error','')}")

        done = i + 1
        if done % LOG_EVERY == 0:
            elapsed = time.time() - t0
            # remaining / (done / elapsed), rearranged so that a zero
            # `elapsed` on a coarse clock cannot cause a division by zero.
            eta = (total - done) * elapsed / done
            print(f" [{done}/{total}] "
                  f"updated={results['updated']} "
                  f"fresh={results['already_fresh']} "
                  f"errors={results['error']} "
                  f"eta={eta:.0f}s")

        # The pause only separates requests; nothing follows the last one.
        if done < total:
            time.sleep(delay)

    elapsed = time.time() - t0
    print(f"\nDone in {elapsed:.0f}s")
    print(f" Updated: {results['updated']}")
    print(f" Already fresh:{results['already_fresh']}")
    print(f" No data: {results['no_data']}")
    print(f" Errors: {results['error']}")
    if errors:
        print(f"\nFirst 10 errors:")
        for e in errors[:10]:
            print(f" {e}")
    return results


if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("--symbol", type=str, default=None,
                    help="refresh only this NSE symbol")
    ap.add_argument("--top-n", type=int, default=None,
                    help="refresh only the first N symbols (alphabetical order)")
    ap.add_argument("--period", type=str, default="1y",
                    help="yfinance period: 1y, 2y, 6mo etc")
    ap.add_argument("--delay", type=float, default=0.15,
                    help="seconds between requests (avoid rate limiting)")
    args = ap.parse_args()

    if args.symbol:
        r = refresh_symbol(args.symbol.upper(), period=args.period)
        print(r)
    else:
        syms = get_all_nse_symbols()
        if args.top_n:
            syms = syms[:args.top_n]
        # Forward --period too; it used to apply to single-symbol runs only.
        refresh_all(syms, delay=args.delay, period=args.period)
|
|