# Source: DHAN_AI_DATA / refresh_market_data.py
# Author: addyAIMLprojects
# Commit: b22f9ac (verified) — "P170 10yr model + code — 2026-05-15"
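"""
P170 Market Data Refresh — yfinance
Updates data/lake/clean/market_data/NSE/ with the latest daily candles.
Run daily before market open or after close: only candles dated strictly
after the last stored date are appended, so a bar saved mid-session would be
incomplete and is never revised by later runs.
Usage:
    python refresh_market_data.py                    # refresh all symbols
    python refresh_market_data.py --top-n 200        # refresh the first 200 symbols (alphabetical order)
    python refresh_market_data.py --symbol RELIANCE  # single symbol
"""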
import argparse
import time
import sys
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
import yfinance as yf
# Root of the clean NSE candle lake, relative to the current working directory.
CLEAN_DIR = Path("data") / "lake" / "clean" / "market_data" / "NSE"

# refresh_all() prints a progress line after every LOG_EVERY symbols.
LOG_EVERY = 50

# NSE symbol -> Yahoo Finance ticker, for names where the plain "<SYMBOL>.NS"
# form is not wanted. Any symbol not listed here gets ".NS" appended.
# "ARE&M" and "MOTHERSON" map to exactly what that default would produce anyway.
# NOTE(review): Yahoo Finance appears to list Mahindra & Mahindra as "M&M.NS";
# confirm that "MM.NS" actually resolves before relying on this override.
SYMBOL_MAP = {
    "M&M": "MM.NS",
    "L&T": "LT.NS",
    "ARE&M": "ARE&M.NS",
    "MOTHERSON": "MOTHERSON.NS",
}
def nse_to_yf(symbol: str) -> str:
    """Translate an NSE trading symbol into its Yahoo Finance ticker.

    An explicit override from ``SYMBOL_MAP`` wins; every other symbol simply
    gets the ``.NS`` suffix appended.
    """
    return SYMBOL_MAP.get(symbol, f"{symbol}.NS")
def get_all_nse_symbols(clean_dir: "Path | str | None" = None) -> list:
    """Return the sorted symbols that have a clean parquet file.

    Args:
        clean_dir: Directory to scan. When None (the default), ``CLEAN_DIR``
            is used, so existing callers are unaffected.

    Returns:
        Alphabetically sorted list of symbols recovered from regular files
        named ``<SYMBOL>_real_market_data_v2.parquet``.
    """
    suffix = "_real_market_data_v2"
    directory = CLEAN_DIR if clean_dir is None else Path(clean_dir)
    symbols = []
    for f in directory.glob(f"*{suffix}.parquet"):
        # The glob guarantees the stem ends with the marker, so strip it as an
        # exact suffix rather than replacing it anywhere in the name.
        sym = f.stem[: -len(suffix)]
        # Skip a bare "_real_market_data_v2.parquet" and stray directories.
        if sym and f.is_file():
            symbols.append(sym)
    return sorted(symbols)
# Column set and order written to every clean parquet file.
_OHLCV_COLS = ["timestamp", "open", "high", "low", "close", "volume"]


def _to_naive_datetime(values) -> pd.Series:
    """Parse *values* as datetimes and drop any timezone information.

    ``tz_localize(None)`` keeps the wall-clock time of tz-aware values, which
    makes stored and freshly downloaded timestamps directly comparable.
    """
    ts = pd.to_datetime(values)
    if ts.dt.tz is not None:
        ts = ts.dt.tz_localize(None)
    return ts


def _write_parquet_atomic(df: pd.DataFrame, path: Path) -> None:
    """Write *df* to *path* through a temporary sibling file, then rename it.

    If writing fails, the previous file at *path* is left untouched instead of
    being truncated. The temp name ends in ``.tmp``, so it never matches the
    ``*.parquet`` glob used to enumerate the lake.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    try:
        df.to_parquet(tmp, index=False)
        tmp.replace(path)
    finally:
        # Only present if to_parquet/replace failed part-way.
        if tmp.exists():
            tmp.unlink()


def refresh_symbol(symbol: str, period: str = "1y") -> dict:
    """Fetch recent daily candles for one symbol and merge them into its parquet file.

    Args:
        symbol: NSE trading symbol (e.g. ``"RELIANCE"``); converted to a Yahoo
            ticker with ``nse_to_yf``.
        period: yfinance look-back window passed to ``yf.download``. When a
            file already exists, only candles newer than its last timestamp
            are appended, so this just bounds how large a gap can be filled.

    Returns:
        Status dict containing ``symbol`` and ``status``. ``status`` is one of
        ``"updated"``, ``"already_fresh"``, ``"no_data"``, ``"empty"`` or
        ``"error"``. ``rows``, ``last``, ``yf_sym`` and ``error`` are included
        where applicable. Any exception is reported as ``status="error"``
        rather than raised, so a bulk run keeps going.
    """
    path = CLEAN_DIR / f"{symbol}_real_market_data_v2.parquet"
    yf_sym = nse_to_yf(symbol)
    try:
        # Load existing data (if any) and normalise it to the clean schema up
        # front, so the freshness check and the merge see identical dtypes.
        existing = None
        last_date = None
        if path.exists():
            existing = pd.read_parquet(path)
            date_col = "timestamp" if "timestamp" in existing.columns else "date"
            existing = existing.rename(columns={date_col: "timestamp"})
            # Stored timestamps may be tz-aware; new data is made tz-naive
            # below, and comparing the two would raise TypeError.
            existing["timestamp"] = _to_naive_datetime(existing["timestamp"])
            existing = existing[[c for c in _OHLCV_COLS if c in existing.columns]]
            last_date = existing["timestamp"].max()
            if pd.isna(last_date):
                # Empty file / no valid timestamps: treat it as missing and
                # rewrite it from the download instead of calling it "fresh".
                existing, last_date = None, None

        # auto_adjust=True asks yfinance for adjusted OHLC.
        # NOTE(review): appending adjusted prices to previously stored rows may
        # leave a discontinuity after a split/dividend — confirm acceptable.
        df = yf.download(
            yf_sym,
            period=period,
            interval="1d",
            progress=False,
            auto_adjust=True,
        )
        if df is None or len(df) == 0:
            return {"symbol": symbol, "status": "no_data", "rows": 0}

        # Normalise columns. Tuple column labels (MultiIndex output) are
        # reduced to their first level.
        df = df.reset_index()
        df.columns = [c.lower() if isinstance(c, str) else c[0].lower()
                      for c in df.columns]
        df = df.rename(columns={"date": "timestamp", "index": "timestamp"})
        df = df[[c for c in _OHLCV_COLS if c in df.columns]].copy()
        df["timestamp"] = _to_naive_datetime(df["timestamp"])
        df = df.dropna(subset=["close"]).sort_values("timestamp").reset_index(drop=True)
        if len(df) == 0:
            return {"symbol": symbol, "status": "empty", "rows": 0}

        if last_date is not None:
            # Append only candles strictly newer than what is stored.
            # NOTE(review): a bar stored during market hours is presumably
            # partial and is never revised, since its date is not "newer".
            new_rows = df[df["timestamp"] > last_date]
            if len(new_rows) == 0:
                return {"symbol": symbol, "status": "already_fresh",
                        "rows": len(existing), "last": str(last_date.date())}
            df = (
                pd.concat([existing, new_rows], ignore_index=True)
                .drop_duplicates(subset=["timestamp"])
                .sort_values("timestamp")
                .reset_index(drop=True)
            )

        _write_parquet_atomic(df, path)
        last = df["timestamp"].max()
        return {
            "symbol": symbol, "status": "updated",
            "rows": len(df), "last": str(last.date()),
            "yf_sym": yf_sym,
        }
    except Exception as e:  # best-effort: report per-symbol failure, don't abort the batch
        return {"symbol": symbol, "status": "error", "error": str(e)}
def refresh_all(symbols: list, delay: float = 0.1, period: str = "1y") -> dict:
    """Refresh every symbol in *symbols* sequentially, with rate limiting.

    Args:
        symbols: NSE symbols to refresh, processed in the given order.
        delay: Seconds to sleep between consecutive requests.
        period: yfinance look-back window forwarded to ``refresh_symbol``
            (previously the CLI ``--period`` flag was ignored for bulk runs).

    Returns:
        Counts keyed by ``"updated"``, ``"already_fresh"``, ``"no_data"``
        (which also counts ``"empty"`` results) and ``"error"``.
    """
    t0 = time.time()
    results = {"updated": 0, "already_fresh": 0, "no_data": 0, "error": 0}
    errors = []
    total = len(symbols)
    print(f"Refreshing {total} symbols...")
    for i, sym in enumerate(symbols, start=1):
        r = refresh_symbol(sym, period=period)
        status = r.get("status", "error")
        if status in ("updated", "already_fresh"):
            results[status] += 1
        elif status in ("no_data", "empty"):
            results["no_data"] += 1
        else:
            results["error"] += 1
            errors.append(f"{sym}: {r.get('error','')}")
        if i % LOG_EVERY == 0:
            elapsed = time.time() - t0
            # Guard against a zero elapsed time on very coarse clocks.
            rate = i / elapsed if elapsed > 0 else float("inf")
            eta = (total - i) / rate
            print(f"  [{i}/{total}] "
                  f"updated={results['updated']} "
                  f"fresh={results['already_fresh']} "
                  f"errors={results['error']} "
                  f"eta={eta:.0f}s")
        # No need to wait after the final request.
        if i < total:
            time.sleep(delay)
    elapsed = time.time() - t0
    print(f"\nDone in {elapsed:.0f}s")
    print(f"  Updated:      {results['updated']}")
    print(f"  Already fresh:{results['already_fresh']}")
    print(f"  No data:      {results['no_data']}")
    print(f"  Errors:       {results['error']}")
    if errors:
        print("\nFirst 10 errors:")
        for e in errors[:10]:
            print(f"  {e}")
    return results


if __name__ == "__main__":
    ap = argparse.ArgumentParser(
        description="Refresh NSE daily candles in the clean data lake via yfinance.")
    ap.add_argument("--symbol", type=str, default=None,
                    help="refresh only this NSE symbol")
    ap.add_argument("--top-n", type=int, default=None,
                    help="refresh only the first N symbols (alphabetical order)")
    ap.add_argument("--period", type=str, default="1y",
                    help="yfinance period: 1y, 2y, 6mo etc")
    ap.add_argument("--delay", type=float, default=0.15,
                    help="seconds between requests (avoid rate limiting)")
    args = ap.parse_args()
    if args.symbol:
        r = refresh_symbol(args.symbol.upper(), period=args.period)
        print(r)
    else:
        syms = get_all_nse_symbols()
        # `is not None` so that an explicit --top-n 0 means "none", not "all".
        if args.top_n is not None:
            syms = syms[: args.top_n]
        refresh_all(syms, delay=args.delay, period=args.period)