# nse-bot-backend / history_utils.py
# ash001's picture
# Deploy from GitHub Actions to nse-bot-backend
# cd7a838 verified
from pathlib import Path
from datetime import datetime
import pandas as pd
import requests
from config import DHAN_API_BASE_URL, get_broker_name, require_dhan_config
from dhan_instruments import get_dhan_equity_row, get_dhan_instrument_row
# Directory that contains this module; the instrument CSV files sit beside it.
BASE_DIR = Path(__file__).resolve().parent

# Instrument CSV files, one per supported exchange.
NSE_PATH = BASE_DIR.joinpath("instruments_nse.csv")
NFO_PATH = BASE_DIR.joinpath("instruments_nfo.csv")

# Loaded instrument tables, keyed by upper-cased exchange code.
_INSTRUMENT_CACHE: dict = {}
# ----------------------------
# Zerodha path
# ----------------------------
def _load_instruments(exchange: str) -> pd.DataFrame:
    """Return the instrument table for ``exchange`` as a fresh DataFrame.

    The CSV for an exchange is read on first use and kept in
    ``_INSTRUMENT_CACHE``. Every call returns a copy, so callers can modify
    the result without affecting the cached frame.

    Args:
        exchange: Exchange code, matched case-insensitively. Only ``"NSE"``
            and ``"NFO"`` are supported.

    Returns:
        A copy of the instrument table with the ``tradingsymbol`` column
        converted to upper-case strings.

    Raises:
        ValueError: If ``exchange`` is neither ``"NSE"`` nor ``"NFO"``.
    """
    key = exchange.upper()

    if key in _INSTRUMENT_CACHE:
        return _INSTRUMENT_CACHE[key].copy()

    csv_by_exchange = {"NSE": NSE_PATH, "NFO": NFO_PATH}
    if key not in csv_by_exchange:
        raise ValueError(f"Unsupported exchange: {key}")

    table = pd.read_csv(csv_by_exchange[key])
    # Normalise symbols once at load time so lookups can compare upper-case.
    table["tradingsymbol"] = table["tradingsymbol"].astype(str).str.upper()
    _INSTRUMENT_CACHE[key] = table
    return table.copy()
def get_instrument_row(exchange: str, tradingsymbol: str) -> pd.Series:
    """Look up a single instrument by trading symbol.

    Args:
        exchange: Exchange code passed through to ``_load_instruments``.
        tradingsymbol: Symbol to find; compared in upper case.

    Returns:
        The first row of the instrument table whose ``tradingsymbol`` matches.

    Raises:
        ValueError: If no row matches the symbol (or the exchange is not
            supported by ``_load_instruments``).
    """
    instruments = _load_instruments(exchange)
    wanted = tradingsymbol.upper()
    matches = instruments.loc[instruments["tradingsymbol"] == wanted]
    if not matches.empty:
        return matches.iloc[0]
    raise ValueError(f"{exchange}:{tradingsymbol} not found in instrument file")
def _zerodha_historical_candles(
    exchange: str,
    tradingsymbol: str,
    from_dt: datetime,
    to_dt: datetime,
    interval: str = "5minute",
    oi: bool = False,
) -> pd.DataFrame:
    """Fetch historical candles through the Kite client.

    Args:
        exchange: Exchange code used to resolve the instrument token.
        tradingsymbol: Symbol used to resolve the instrument token.
        from_dt: Start of the requested range, passed to Kite as ``from_date``.
        to_dt: End of the requested range, passed to Kite as ``to_date``.
        interval: Candle interval string forwarded unchanged to Kite.
        oi: Forwarded unchanged to Kite's ``oi`` argument.

    Returns:
        The candles as a DataFrame whose ``date`` column has been renamed to
        ``timestamp`` and expressed in Asia/Kolkata time. Rows whose date
        cannot be parsed are dropped. If Kite returns no candles, the empty
        frame is returned as-is (without a ``timestamp`` column).

    Raises:
        ValueError: If the symbol cannot be resolved by ``get_instrument_row``.
    """
    # Function-scope import, kept from the original; presumably so the Kite
    # client is only needed when this broker path is used (confirm).
    from kite_client import get_kite

    client = get_kite()
    token = int(get_instrument_row(exchange, tradingsymbol)["instrument_token"])

    frame = pd.DataFrame(
        client.historical_data(
            instrument_token=token,
            from_date=from_dt,
            to_date=to_dt,
            interval=interval,
            oi=oi,
        )
    )
    if frame.empty:
        return frame

    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    frame = frame.dropna(subset=["date"]).copy()

    # Naive timestamps are taken to already be IST; aware ones are converted.
    stamps = frame["date"]
    if stamps.dt.tz is None:
        frame["date"] = stamps.dt.tz_localize("Asia/Kolkata")
    else:
        frame["date"] = stamps.dt.tz_convert("Asia/Kolkata")

    return frame.rename(columns={"date": "timestamp"})
# ----------------------------
# Dhan path
# ----------------------------
def _dhan_headers() -> dict:
    """Build the HTTP headers for an authenticated Dhan API request.

    ``require_dhan_config(require_token=True)`` is called before the headers
    are assembled; presumably it raises when credentials are missing
    (confirm against ``config``).

    Returns:
        A dict with JSON ``Accept`` / ``Content-Type`` headers plus the
        ``access-token`` and ``client-id`` values read from ``config``.
    """
    from config import DHAN_ACCESS_TOKEN, DHAN_CLIENT_ID

    require_dhan_config(require_token=True)

    headers = dict.fromkeys(("Accept", "Content-Type"), "application/json")
    headers["access-token"] = DHAN_ACCESS_TOKEN
    headers["client-id"] = DHAN_CLIENT_ID
    return headers
def _dhan_row_for_symbol(exchange: str, tradingsymbol: str) -> pd.Series:
    """Resolve the Dhan instrument row for a symbol on the given exchange.

    Args:
        exchange: ``"NSE"`` or ``"NFO"`` (case-insensitive; coerced with
            ``str``).
        tradingsymbol: Symbol forwarded to the Dhan instrument lookup.

    Returns:
        For ``NSE`` the result of ``get_dhan_equity_row``; for ``NFO`` the
        result of ``get_dhan_instrument_row`` with segment ``"NSE_FNO"``.

    Raises:
        ValueError: If ``exchange`` is neither ``NSE`` nor ``NFO``.
    """
    code = str(exchange).upper()
    if code not in ("NSE", "NFO"):
        raise ValueError(f"Unsupported Dhan exchange: {code}")
    if code == "NFO":
        return get_dhan_instrument_row("NSE_FNO", tradingsymbol)
    return get_dhan_equity_row(tradingsymbol)
def _format_dhan_intraday_df(body: dict) -> pd.DataFrame:
    """Convert a decoded Dhan intraday chart response into a candle DataFrame.

    The payload is read as parallel arrays under the keys ``timestamp``,
    ``open``, ``high``, ``low``, ``close``, ``volume`` and ``open_interest``.
    Timestamps are parsed as epoch seconds in UTC and converted to
    Asia/Kolkata.

    Args:
        body: Decoded JSON response. A value that is not a dict, or whose
            ``timestamp`` entry is missing, empty or not list-like, yields an
            empty frame instead of raising.

    Returns:
        DataFrame with columns ``timestamp``, ``open``, ``high``, ``low``,
        ``close``, ``volume``, ``open_interest``. Rows whose timestamp cannot
        be parsed are dropped. A column whose array is missing or not
        list-like is filled with ``None``; a shorter array is padded with
        ``None`` and a longer one is truncated to the timestamp length.
    """
    columns = ["timestamp", "open", "high", "low", "close", "volume", "open_interest"]
    is_list_like = pd.api.types.is_list_like

    def _usable_array(values) -> bool:
        # Dicts count as list-like in pandas but are never a valid column array.
        return is_list_like(values) and not isinstance(values, dict)

    # A response can decode to something other than a JSON object (null, a
    # list, a string); treat that like "no candles" rather than crashing.
    if not isinstance(body, dict):
        return pd.DataFrame(columns=columns)

    timestamps = body.get("timestamp")
    if not _usable_array(timestamps):
        return pd.DataFrame(columns=columns)
    timestamps = list(timestamps)
    if not timestamps:
        return pd.DataFrame(columns=columns)

    n = len(timestamps)

    def _col(key):
        values = body.get(key)
        if not _usable_array(values):
            return [None] * n
        values = list(values)
        # Pad / truncate to the timestamp length so DataFrame construction
        # never fails on ragged arrays.
        if len(values) < n:
            return values + [None] * (n - len(values))
        return values[:n]

    df = pd.DataFrame({
        "timestamp": pd.to_datetime(timestamps, unit="s", errors="coerce", utc=True),
        "open": _col("open"),
        "high": _col("high"),
        "low": _col("low"),
        "close": _col("close"),
        "volume": _col("volume"),
        "open_interest": _col("open_interest"),
    })
    df = df.dropna(subset=["timestamp"]).copy()
    df["timestamp"] = df["timestamp"].dt.tz_convert("Asia/Kolkata")
    return df
def _dhan_historical_candles(
    exchange: str,
    tradingsymbol: str,
    from_dt: datetime,
    to_dt: datetime,
    interval: str = "5minute",
    oi: bool = False,
) -> pd.DataFrame:
    """Fetch intraday candles from the Dhan ``/charts/intraday`` endpoint.

    Args:
        exchange: Exchange code resolved by ``_dhan_row_for_symbol``.
        tradingsymbol: Symbol resolved by ``_dhan_row_for_symbol``.
        from_dt: Start of the range. Naive values are treated as
            Asia/Kolkata; aware values are converted to Asia/Kolkata.
        to_dt: End of the range, handled like ``from_dt``.
        interval: One of ``minute``, ``1minute``, ``3minute``, ``5minute``,
            ``15minute``, ``25minute``, ``60minute`` (case-insensitive).
        oi: Sent to the API as the boolean ``oi`` field.

    Returns:
        The response formatted by ``_format_dhan_intraday_df``.

    Raises:
        ValueError: If ``interval`` is not one of the supported names.
        RuntimeError: If the API answers with an HTTP status of 400 or above.
    """
    # Interval name -> minute count string expected in the request payload.
    minutes_by_name = {
        "minute": "1",
        **{f"{m}minute": str(m) for m in (1, 3, 5, 15, 25, 60)},
    }
    dhan_interval = minutes_by_name.get(str(interval).lower())
    if dhan_interval is None:
        raise ValueError(f"Unsupported Dhan interval: {interval}")

    def _as_ist_text(moment) -> str:
        """Render ``moment`` as an Asia/Kolkata wall-clock string."""
        stamp = pd.Timestamp(moment)
        if not stamp.tzinfo:
            stamp = stamp.tz_localize("Asia/Kolkata")
        else:
            stamp = stamp.tz_convert("Asia/Kolkata")
        return stamp.strftime("%Y-%m-%d %H:%M:%S")

    instrument = _dhan_row_for_symbol(exchange, tradingsymbol)
    payload = {
        "securityId": str(instrument["security_id"]),
        "exchangeSegment": str(instrument["exchange_segment"]),
        "instrument": str(instrument["instrument"]),
        "interval": dhan_interval,
        "oi": bool(oi),
        "fromDate": _as_ist_text(from_dt),
        "toDate": _as_ist_text(to_dt),
    }

    endpoint = f"{DHAN_API_BASE_URL.rstrip('/')}/charts/intraday"
    response = requests.post(
        endpoint,
        json=payload,
        headers=_dhan_headers(),
        timeout=30,
    )

    # A body that is not valid JSON is kept as raw text so it can still be
    # shown in the error message below.
    try:
        body = response.json()
    except Exception:
        body = {"detail": response.text}

    if response.status_code >= 400:
        raise RuntimeError(f"Dhan historical API error {response.status_code}: {body}")

    return _format_dhan_intraday_df(body)
# ----------------------------
# Public API
# ----------------------------
def get_historical_candles(
    exchange: str,
    tradingsymbol: str,
    from_dt: datetime,
    to_dt: datetime,
    interval: str = "5minute",
    oi: bool = False,
) -> pd.DataFrame:
    """Fetch historical candles from whichever broker is configured.

    When ``get_broker_name()`` returns ``"dhan"`` the Dhan path is used; any
    other value falls through to the Zerodha path.

    Args:
        exchange: Exchange code of the instrument.
        tradingsymbol: Symbol of the instrument.
        from_dt: Start of the requested range.
        to_dt: End of the requested range.
        interval: Candle interval name, forwarded to the broker path.
        oi: Forwarded to the broker path's ``oi`` argument.

    Returns:
        The DataFrame produced by the selected broker path.
    """
    if get_broker_name() == "dhan":
        fetch = _dhan_historical_candles
    else:
        fetch = _zerodha_historical_candles
    return fetch(exchange, tradingsymbol, from_dt, to_dt, interval=interval, oi=oi)
def get_stock_candles(symbol: str, from_dt: datetime, to_dt: datetime, interval: str = "5minute") -> pd.DataFrame:
    """Fetch candles for ``symbol`` on the ``NSE`` exchange.

    Thin wrapper around ``get_historical_candles``; ``oi`` is left at that
    function's default.
    """
    return get_historical_candles(
        exchange="NSE",
        tradingsymbol=symbol,
        from_dt=from_dt,
        to_dt=to_dt,
        interval=interval,
    )
def get_option_candles(option_symbol: str, from_dt: datetime, to_dt: datetime, interval: str = "5minute") -> pd.DataFrame:
    """Fetch candles for ``option_symbol`` on the ``NFO`` exchange.

    Thin wrapper around ``get_historical_candles``; ``oi`` is left at that
    function's default.
    """
    return get_historical_candles(
        exchange="NFO",
        tradingsymbol=option_symbol,
        from_dt=from_dt,
        to_dt=to_dt,
        interval=interval,
    )
def get_stock_5m(symbol: str, from_dt: datetime, to_dt: datetime) -> pd.DataFrame:
    """Fetch 5-minute candles for ``symbol`` via ``get_stock_candles``."""
    five_minute = "5minute"
    return get_stock_candles(symbol, from_dt, to_dt, interval=five_minute)
def get_option_5m(option_symbol: str, from_dt: datetime, to_dt: datetime) -> pd.DataFrame:
    """Fetch 5-minute candles for ``option_symbol`` via ``get_option_candles``."""
    five_minute = "5minute"
    return get_option_candles(option_symbol, from_dt, to_dt, interval=five_minute)