# =====================================================
# utils.py
# Backend Functions for Portfolio Allocation
# =====================================================

import os

import joblib
import numpy as np
import pandas as pd
from stable_baselines3 import PPO

from config import (
    PPO_PATH,
    RL_FEATURE_DATA,
    PRICE_DATA,
    CORRELATION_MATRIX,
    ALL_STOCKS,
    RISK_MAPPING,
)


# Rank-ordered stock allocation shapes used by adjust_weights_for_risk().
# Entry 0 goes to the stock the PPO policy ranks highest, entry 1 to the
# next one, and so on.  Only the *relative* sizes matter: the shape is
# normalised so the stock weights always sum to exactly (1 - cash).
_CONSERVATIVE_CASH = 0.15
_CONSERVATIVE_TEMPLATE = (
    0.18, 0.16, 0.14, 0.12, 0.10,
    0.07, 0.05, 0.02, 0.01, 0.00,
)

_AGGRESSIVE_CASH = 0.02
_AGGRESSIVE_TEMPLATE = (
    0.28, 0.20, 0.15, 0.11, 0.09,
    0.07, 0.05, 0.02, 0.01, 0.00,
)


# =====================================================
# Load PPO Model
# =====================================================

def load_ppo():
    """Load and return the trained PPO model stored at ``PPO_PATH``."""
    return PPO.load(PPO_PATH)


# =====================================================
# Load Deployment Data
# =====================================================

def load_data():
    """Load the deployment artifacts from disk.

    Returns:
        tuple: ``(rl_feature_data, price_data, correlation_matrix)`` in that
        order, each loaded with ``joblib.load`` from the path configured in
        ``config``.
    """
    # NOTE: joblib.load unpickles its input, which can execute arbitrary
    # code.  These paths must only ever point at trusted, locally produced
    # artifacts.
    rl_feature_data = joblib.load(RL_FEATURE_DATA)
    price_data = joblib.load(PRICE_DATA)
    correlation_matrix = joblib.load(CORRELATION_MATRIX)

    return rl_feature_data, price_data, correlation_matrix


# =====================================================
# Latest Stock Prices
# =====================================================

def get_latest_prices(price_data):
    """Return the most recent price of every stock in ``ALL_STOCKS``.

    Args:
        price_data: Mapping-like object indexed by stock name whose values
            support ``.iloc[-1]`` (last observation).

    Returns:
        dict: ``{stock: latest_price}`` with prices as Python floats.
    """
    return {
        stock: float(price_data[stock].iloc[-1])
        for stock in ALL_STOCKS
    }


# =====================================================
# Latest Feature Vector
# =====================================================

def get_latest_feature_matrix(rl_feature_data):
    """Concatenate the latest feature row of every stock into one vector.

    Args:
        rl_feature_data: Mapping from stock name to a table of features; the
            last row of each table is used.

    Returns:
        np.ndarray: Flat ``float32`` vector, stocks in ``ALL_STOCKS`` order.
    """
    latest_rows = [
        rl_feature_data[stock].iloc[-1].to_numpy(dtype=np.float32)
        for stock in ALL_STOCKS
    ]

    # np.concatenate rejects an empty list, so handle "no stocks" explicitly.
    if not latest_rows:
        return np.zeros(0, dtype=np.float32)

    return np.concatenate(latest_rows).astype(np.float32, copy=False)


def build_observation(
    rl_feature_data,
    budget,
    risk_profile,
    investment_horizon
):
    """Build the flat observation vector fed to the PPO policy.

    The layout is ``[latest features | shares held | portfolio state]``,
    describing a fresh, all-cash portfolio.

    Args:
        rl_feature_data: Per-stock feature tables (see
            ``get_latest_feature_matrix``).
        budget: Amount available to invest.
        risk_profile: Key into ``RISK_MAPPING``.
        investment_horizon: Investment horizon, passed through unchanged.

    Returns:
        np.ndarray: ``float32`` observation vector.

    Raises:
        KeyError: If ``risk_profile`` is not a key of ``RISK_MAPPING``.
    """
    feature_matrix = get_latest_feature_matrix(rl_feature_data)

    # Nothing has been bought yet.
    shares = np.zeros(len(ALL_STOCKS), dtype=np.float32)

    # Budget tier: 0 below 100k, 1 below 500k, 2 otherwise.
    if budget < 100000:
        tier = 0
    elif budget < 500000:
        tier = 1
    else:
        tier = 2

    # Before any purchase, cash and portfolio value both equal the budget.
    portfolio_state = np.array(
        [
            budget,                      # budget
            tier,                        # budget tier
            budget,                      # cash
            RISK_MAPPING[risk_profile],  # encoded risk profile
            investment_horizon,          # horizon
            budget,                      # portfolio value
        ],
        dtype=np.float32,
    )

    return np.concatenate([feature_matrix, shares, portfolio_state])


def predict_portfolio(model, observation):
    """Turn the policy's action into non-negative weights that sum to 1.

    Args:
        model: Object exposing ``predict(observation, deterministic=True)``
            returning ``(action, state)``.
        observation: Observation vector for the policy.

    Returns:
        np.ndarray: Normalised weights.  If every clipped action component
        is zero, equal weights are returned.
    """
    action, _ = model.predict(observation, deterministic=True)

    # Negative outputs mean "no allocation".
    action = np.clip(action, 0, None)
    total = action.sum()

    if total == 0:
        # Degenerate all-zero action: fall back to equal weights without
        # mutating the array returned by the model.
        action = action + 1
        total = action.sum()

    return action / total


def adjust_weights_for_risk(weights, risk_profile):
    """Post-process the PPO allocation according to investor risk.

    The PPO policy decides WHICH stocks are best (their ranking); the risk
    profile decides HOW MUCH goes to each rank and how much stays in cash.

    Args:
        weights: Allocation vector whose last element is the cash weight and
            whose other elements are the per-stock weights.
        risk_profile: ``"Conservative"``, ``"Moderate"``, or anything else
            (treated as aggressive).

    Returns:
        np.ndarray: New allocation vector with the same layout.  For
        ``"Moderate"`` this is an unchanged copy of ``weights``; otherwise
        stock weights sum to ``1 - cash`` so the whole vector sums to 1.
    """
    weights = np.array(weights, copy=True)

    # Moderate investors take the PPO allocation as-is.
    if risk_profile == "Moderate":
        return weights

    if risk_profile == "Conservative":
        cash, template = _CONSERVATIVE_CASH, _CONSERVATIVE_TEMPLATE
    else:
        cash, template = _AGGRESSIVE_CASH, _AGGRESSIVE_TEMPLATE

    stock_weights = weights[:-1]
    n_stocks = len(stock_weights)

    # Stock indices ordered from the policy's most to least preferred.
    ranked = np.argsort(stock_weights)[::-1]

    # Fit the template to the actual number of stocks: extra ranks get
    # zero, and a shorter universe uses only the leading entries.
    shape = np.zeros(n_stocks, dtype=np.float64)
    used = min(n_stocks, len(template))
    shape[:used] = template[:used]

    shape_total = shape.sum()
    if shape_total <= 0:
        # No stock can receive weight (e.g. empty universe): all cash.
        return np.concatenate([shape, [1.0]])

    # Normalise so stocks take exactly the non-cash share.  Scaling the raw
    # template by (1 - cash) instead would shrink it twice, because the
    # template already sums to (1 - cash), and leave part of the budget
    # unallocated.
    shape *= (1 - cash) / shape_total

    new_stock_weights = np.zeros(n_stocks, dtype=np.float64)
    new_stock_weights[ranked] = shape

    return np.concatenate([new_stock_weights, [cash]])


def generate_portfolio(weights, budget, price_data):
    """Convert target weights into whole-share purchases.

    Each stock first receives as many whole shares as its target amount
    (``budget * weight``) allows.  Remaining cash is then spent one share
    at a time on the affordable stock that is furthest below its target,
    until no share is affordable.

    Args:
        weights: Allocation vector; all but the last element are per-stock
            weights in ``ALL_STOCKS`` order, the last is the cash weight.
        budget: Total amount available.
        price_data: Price source accepted by ``get_latest_prices``.

    Returns:
        tuple: ``(allocation, cash)`` where ``allocation`` is a DataFrame
        with columns ``Stock``, ``Price (₹)``, ``Shares``,
        ``Investment (₹)`` and ``Weight (%)`` for stocks with at least one
        share, sorted by investment (descending), and ``cash`` is the
        unspent amount rounded to 2 decimals.

    Raises:
        ValueError: If a target amount is not finite.
    """
    latest_prices = get_latest_prices(price_data)

    # zip() pairs each stock with its weight (and stops at the shorter one).
    pairs = list(zip(ALL_STOCKS, weights[:-1]))
    stocks = [stock for stock, _ in pairs]
    prices = np.array(
        [latest_prices[stock] for stock in stocks], dtype=np.float64
    )
    targets = budget * np.array(
        [weight for _, weight in pairs], dtype=np.float64
    )

    if not np.all(np.isfinite(targets)):
        raise ValueError("weights and budget must produce finite targets")

    # A missing or non-positive price cannot be traded; skipping it also
    # prevents a zero price from making the loop below run forever.
    tradable = np.isfinite(prices) & (prices > 0)
    safe_prices = np.where(tradable, prices, 0.0)

    # Initial purchase: whole shares up to each target.
    shares = np.zeros(len(stocks), dtype=np.int64)
    shares[tradable] = np.floor_divide(
        targets[tradable], prices[tradable]
    ).astype(np.int64)
    invested = shares * safe_prices

    # Cash is whatever has not been spent, so no money is lost even when
    # the weights do not sum exactly to 1.
    cash = float(budget) - float(invested.sum())

    # ---------------------------------------------------
    # Redistribute remaining cash
    # ---------------------------------------------------
    # NOTE(review): this spends the cash reserve implied by weights[-1] as
    # well as rounding leftovers, exactly as before — confirm that a fully
    # invested portfolio is the intended outcome for every risk profile.
    while True:
        affordable = tradable & (prices <= cash)
        if not affordable.any():
            break

        # Buy one share of the affordable stock furthest below its target.
        gaps = np.where(affordable, targets - invested, -np.inf)
        pick = int(np.argmax(gaps))

        shares[pick] += 1
        invested[pick] += prices[pick]
        cash -= prices[pick]

    allocation = pd.DataFrame({
        "Stock": stocks,
        # Rounded for display only; all arithmetic uses the raw price.
        "Price (₹)": [round(float(price), 2) for price in prices],
        "Shares": shares,
        "Investment (₹)": invested,
    })

    # .copy() so the column added below does not write into a slice.
    allocation = allocation[allocation["Shares"] > 0].copy()

    allocation["Weight (%)"] = (
        allocation["Investment (₹)"] / budget
    ) * 100

    allocation = allocation.sort_values(
        "Investment (₹)",
        ascending=False
    ).reset_index(drop=True)

    return allocation, round(cash, 2)


# =====================================================
# Main Recommendation Pipeline
# =====================================================

def generate_recommendation(
    budget,
    risk_profile,
    investment_horizon
):
    """Run the full pipeline: load, observe, predict, adjust, allocate.

    Args:
        budget: Total amount available to invest.
        risk_profile: Investor risk profile (key of ``RISK_MAPPING``).
        investment_horizon: Investment horizon passed to the observation.

    Returns:
        tuple: ``(allocation, cash, summary, weights, snapshot_date)`` where
        ``allocation`` and ``cash`` come from ``generate_portfolio``,
        ``summary`` is a dict with total investment, cash and number of
        stocks, ``weights`` are the risk-adjusted target weights, and
        ``snapshot_date`` is the last index entry of the first feature
        table formatted as ``DD-Mon-YYYY``.
    """
    # Load everything (the correlation matrix is loaded but not used here).
    model = load_ppo()
    rl_feature_data, price_data, _correlation_matrix = load_data()

    # Date of the most recent feature row, taken from the first table.
    first_table = next(iter(rl_feature_data.values()))
    snapshot_date = pd.to_datetime(
        first_table.index[-1]
    ).strftime("%d-%b-%Y")

    # Build PPO observation
    observation = build_observation(
        rl_feature_data=rl_feature_data,
        budget=budget,
        risk_profile=risk_profile,
        investment_horizon=investment_horizon
    )

    # PPO prediction, then risk-based post-processing
    weights = predict_portfolio(model, observation)
    weights = adjust_weights_for_risk(weights, risk_profile)

    allocation, cash = generate_portfolio(weights, budget, price_data)

    summary = {
        "Total Investment": allocation["Investment (₹)"].sum(),
        "Cash": cash,
        "Number of Stocks": len(allocation)
    }

    return allocation, cash, summary, weights, snapshot_date