Spaces:
Running
Running
| """ | |
| Dhan API integration utilities | |
| """ | |
| import requests | |
| import pandas as pd | |
| from dhanhq import dhanhq | |
| from backend.utils.data_loader import safe_json_to_df | |
| def fetch_holdings(access_token: str) -> pd.DataFrame: | |
| """ | |
| Fetch holdings from Dhan API | |
| Args: | |
| access_token: Client access token | |
| Returns: | |
| DataFrame with holdings data | |
| """ | |
| url = "https://api.dhan.co/v2/holdings" | |
| headers = {"Content-Type": "application/json", "access-token": access_token} | |
| try: | |
| resp = requests.get(url, headers=headers, timeout=8) | |
| except Exception as e: | |
| print(f"Error fetching holdings: {e}") | |
| return pd.DataFrame() | |
| columns_holding = [ | |
| "exchange", | |
| "tradingSymbol", | |
| "securityId", | |
| "isin", | |
| "totalQty", | |
| "dpQty", | |
| "t1Qty", | |
| "mtf_t1_qty", | |
| "mtf_qty", | |
| "availableQty", | |
| "collateralQty", | |
| "avgCostPrice", | |
| "lastTradedPrice", | |
| ] | |
| df = safe_json_to_df(resp, columns_holding) | |
| # Ensure numeric columns exist | |
| for c in ["mtf_qty", "availableQty", "avgCostPrice", "lastTradedPrice"]: | |
| if c not in df.columns: | |
| df[c] = 0 | |
| df["mtf_qty"] = pd.to_numeric(df["mtf_qty"], errors="coerce").fillna(0) | |
| df["availableQty"] = pd.to_numeric(df["availableQty"], errors="coerce").fillna(0) | |
| df["avgCostPrice"] = pd.to_numeric(df["avgCostPrice"], errors="coerce").fillna(0) | |
| df["lastTradedPrice"] = pd.to_numeric(df["lastTradedPrice"], errors="coerce").fillna(0) | |
| return df | |
| def fetch_positions(access_token: str) -> pd.DataFrame: | |
| """ | |
| Fetch positions from Dhan API | |
| Args: | |
| access_token: Client access token | |
| Returns: | |
| DataFrame with positions data | |
| """ | |
| url = "https://api.dhan.co/v2/positions" | |
| headers = {"Content-Type": "application/json", "access-token": access_token} | |
| try: | |
| resp = requests.get(url, headers=headers, timeout=8) | |
| except Exception as e: | |
| print(f"Error fetching positions: {e}") | |
| return pd.DataFrame() | |
| columns_position = [ | |
| "dhanClientId", | |
| "tradingSymbol", | |
| "securityId", | |
| "positionType", | |
| "exchangeSegment", | |
| "productType", | |
| "buyAvg", | |
| "costPrice", | |
| "buyQty", | |
| "sellAvg", | |
| "sellQty", | |
| "netQty", | |
| "realizedProfit", | |
| "unrealizedProfit", | |
| "rbiReferenceRate", | |
| "multiplier", | |
| "carryForwardBuyQty", | |
| "carryForwardSellQty", | |
| "carryForwardBuyValue", | |
| "carryForwardSellValue", | |
| "dayBuyQty", | |
| "daySellQty", | |
| "dayBuyValue", | |
| "daySellValue", | |
| "drvExpiryDate", | |
| "drvOptionType", | |
| "drvStrikePrice", | |
| "crossCurrency", | |
| ] | |
| df = safe_json_to_df(resp, columns_position) | |
| # Ensure numeric columns exist | |
| for c in ["netQty", "costPrice", "unrealizedProfit"]: | |
| if c not in df.columns: | |
| df[c] = 0 | |
| df["netQty"] = pd.to_numeric(df["netQty"], errors="coerce").fillna(0) | |
| df["costPrice"] = pd.to_numeric(df["costPrice"], errors="coerce").fillna(0) | |
| df["unrealizedProfit"] = pd.to_numeric(df["unrealizedProfit"], errors="coerce").fillna(0) | |
| return df | |
| def process_client_data(client_row: pd.Series) -> pd.DataFrame: | |
| """ | |
| Process client data by combining holdings and positions | |
| Args: | |
| client_row: Series containing client details | |
| Returns: | |
| DataFrame with combined holdings and positions | |
| """ | |
| access_token = client_row.get("access_token") | |
| account_holder = client_row.get("Client Name") or client_row.get("client_id") | |
| holding_df = fetch_holdings(access_token) | |
| position_df = fetch_positions(access_token) | |
| # Normalize column names and compute P&L | |
| if not holding_df.empty: | |
| holding_df["productType"] = holding_df["mtf_qty"].apply(lambda x: "MTF" if x > 0 else "CASH") | |
| holding_df["positionType"] = holding_df["availableQty"].apply(lambda x: "BUY" if x > 0 else "SELL") | |
| holding_df["exchangeSegment"] = holding_df["availableQty"].apply(lambda x: "NSE_EQ" if x > 0 else "NONE") | |
| holding_df["netQty"] = holding_df["availableQty"] | |
| holding_df["costPrice"] = holding_df["avgCostPrice"] | |
| holding_df["P & L"] = (holding_df["lastTradedPrice"] - holding_df["avgCostPrice"]) * holding_df["netQty"] | |
| # Positions may already have netQty / costPrice / unrealizedProfit | |
| if not position_df.empty: | |
| position_df = position_df.rename(columns={"unrealizedProfit": "P & L"}) | |
| columns = ["tradingSymbol", "positionType", "exchangeSegment", "productType", "costPrice", "netQty", "P & L"] | |
| if position_df.empty and holding_df.empty: | |
| result = pd.DataFrame(columns=columns) | |
| else: | |
| parts = [] | |
| if not position_df.empty: | |
| parts.append(position_df.reindex(columns=columns, fill_value=0)) | |
| if not holding_df.empty: | |
| parts.append(holding_df.reindex(columns=columns, fill_value=0)) | |
| result = pd.concat(parts, ignore_index=True) | |
| result = result[result["netQty"] != 0].reset_index(drop=True) | |
| result['costPrice'] = round(result['costPrice'], 2) | |
| # Attach account holder for traceability | |
| if not result.empty: | |
| result["Account_Holder"] = account_holder | |
| return result | |
| def add_total_row(df: pd.DataFrame, label: str = "TOTAL") -> pd.DataFrame: | |
| """ | |
| Add a total row to DataFrame with summed numeric columns | |
| Args: | |
| df: Input DataFrame | |
| label: Label for the total row | |
| Returns: | |
| DataFrame with total row appended | |
| """ | |
| if df.empty: | |
| return df | |
| total_row = {col: "" for col in df.columns} | |
| first_col = list(df.columns)[0] | |
| total_row[first_col] = label | |
| if "P & L" in df.columns: | |
| total_row["P & L"] = df["P & L"].sum() | |
| if "My Investment" in df.columns: | |
| total_row["My Investment"] = df["My Investment"].sum() | |
| if "Total Investment" in df.columns: | |
| total_row["Total Investment"] = df["Total Investment"].sum() | |
| if "Profit %" in df.columns: | |
| if "My Investment" in df.columns and df["My Investment"].sum() != 0: | |
| weighted_profit = (df["P & L"].sum() / df["My Investment"].sum()) * 100 | |
| total_row["Profit %"] = round(weighted_profit, 2) | |
| else: | |
| total_row["Profit %"] = 0 | |
| return pd.concat([df, pd.DataFrame([total_row])], ignore_index=True) | |
| def get_fund_limits(client_id: str, access_token: str) -> dict: | |
| """ | |
| Get fund limits for a client using dhanhq SDK | |
| Args: | |
| client_id: Client ID | |
| access_token: Access token | |
| Returns: | |
| Dictionary with fund limit information | |
| """ | |
| try: | |
| dhan = dhanhq(client_id, access_token) | |
| fund_info = dhan.get_fund_limits().get("data", {}) | |
| return { | |
| "availabelBalance": fund_info.get("availabelBalance", 0), | |
| "utilizedAmount": fund_info.get("utilizedAmount", 0) | |
| } | |
| except Exception as e: | |
| print(f"Error fetching fund limits for {client_id}: {e}") | |
| return {"availabelBalance": 0, "utilizedAmount": 0} | |