import json
import os
import pickle
from dataclasses import dataclass, field
from typing import Dict, List, Optional

import pandas as pd
import yfinance as yf
from dotenv import load_dotenv
from langchain_core.output_parsers import JsonOutputParser
from langchain_groq import ChatGroq
from pydantic import BaseModel, Field, ValidationError

load_dotenv()  # Load environment variables from .env


# Configuration: Move to configurations
class Config:
    """Static configuration: API keys read from the environment, paths, sectors."""

    ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY")
    GROQ_API_KEY = os.getenv("GROQ_API_KEY")
    STOCK_DATA_DIR = "stock_data_NSE"
    OUTPUT_FILE = "output_files/portfolio.json"
    SECTORS = [
        "Communication Services",
        "Consumer Discretionary",
        "Consumer Staples",
        "Energy",
        "Financials",
        "Health Care",
        "Industrials",
        "Information Technology",
        "Materials",
        "Real Estate",
        "Utilities",
    ]


# Create the stock-data directory if it doesn't exist. exist_ok avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(Config.STOCK_DATA_DIR, exist_ok=True)


def _ensure_parent_dir(file_path: str) -> None:
    """Create the directory that will contain *file_path*, if it has one."""
    parent = os.path.dirname(file_path)
    # A bare file name has no parent component; os.makedirs("") would raise.
    if parent:
        os.makedirs(parent, exist_ok=True)


def fetch_stock_data(symbols: List[str]) -> Dict[str, pd.DataFrame | None]:
    """Fetches stock data for multiple symbols from Yahoo Finance.

    Args:
        symbols (list): A list of stock symbols (e.g., ["RELIANCE.NS", "TCS.NS"]).

    Returns:
        dict: A dictionary where keys are stock symbols and values are pandas
            DataFrames (full available history), or None if no data was
            returned or an error occurred for that symbol.
    """
    stock_dataframes: Dict[str, pd.DataFrame | None] = {}
    for symbol in symbols:
        # Best effort per symbol: a failure for one symbol is reported and
        # recorded as None so the remaining symbols are still fetched.
        try:
            ticker = yf.Ticker(symbol)
            data = ticker.history(period="max")
            if data.empty:
                print(f"Warning: No data found for symbol '{symbol}'.")
                stock_dataframes[symbol] = None
                continue
            stock_dataframes[symbol] = data
        except Exception as e:
            print(f"Error fetching data for symbol '{symbol}': {e}")
            stock_dataframes[symbol] = None
    return stock_dataframes


def store_stock_data(
    stock_dataframes: Dict[str, pd.DataFrame | None],
    output_path: str = Config.STOCK_DATA_DIR,
) -> None:
    """Stores stock data to local CSV files named ``<symbol>_daily_data.csv``.

    Args:
        stock_dataframes (dict): A dictionary where keys are stock symbols and
            values are pandas DataFrames (or None, which is skipped).
        output_path (str, optional): Directory to store files in. It is
            created if missing. Defaults to STOCK_DATA_DIR.
    """
    # Only STOCK_DATA_DIR is created at import time; a caller-supplied
    # directory must be created here or every to_csv() below would fail.
    if output_path:
        os.makedirs(output_path, exist_ok=True)

    for symbol, data in stock_dataframes.items():
        if data is None:
            print(f"Warning: No data available for '{symbol}', skipping storage.")
            continue
        file_name = f"{symbol}_daily_data.csv"
        file_path = os.path.join(output_path, file_name)
        try:
            data.to_csv(file_path)
            print(f"Info: Data for '{symbol}' saved to {file_path}")
        except Exception as e:
            print(f"Error saving data for '{symbol}' to {file_path}: {e}")


def load_stock_data_and_extract_price(output_path_dir: str) -> Dict[str, Dict[str, float]]:
    """Loads stock data from CSV files and extracts the most recent closing price.

    Only files ending in ``_daily_data.csv`` are considered; the part of the
    file name before that suffix is used as the symbol.

    Args:
        output_path_dir (str): Path where the CSV files are located.

    Returns:
        dict: A dictionary where keys are stock symbols and values are
            dictionaries of the form ``{"initial_price": <float>}``. The value
            is the 'Close' of the last row in the file, or 0.0 when the file
            is empty or cannot be read.
    """
    suffix = "_daily_data.csv"
    all_stock_data: Dict[str, Dict[str, float]] = {}
    for filename in os.listdir(output_path_dir):
        if not filename.endswith(suffix):
            continue
        # Strip only the trailing suffix; str.replace would also remove the
        # same text if it happened to appear earlier in the file name.
        symbol = filename[: -len(suffix)]
        file_path = os.path.join(output_path_dir, filename)
        try:
            df = pd.read_csv(file_path, index_col=0)
            if not df.empty:
                # float() yields a plain Python float rather than a numpy scalar.
                initial_price = float(df.iloc[-1]["Close"])
            else:
                print(f"Warning: Empty dataframe for symbol '{symbol}'. Setting initial price to 0")
                initial_price = 0.0
        except (IndexError, KeyError, FileNotFoundError, ValueError, TypeError) as e:
            # ValueError also covers pandas.errors.EmptyDataError (zero-byte
            # file) and a non-numeric 'Close' cell; previously those aborted
            # the whole scan instead of falling back to the default.
            print(f"Error occurred for reading {symbol}, due to: {e}")
            initial_price = 0.0  # default initial price is 0.0
        all_stock_data[symbol] = {"initial_price": initial_price}
    return all_stock_data


def merge_stock_data_with_price(stock_data: Dict, extracted_data: Dict) -> Dict:
    """Merges the extracted price data with the main stock data.

    The input dictionaries are not modified; a new dictionary with new inner
    dictionaries is returned.

    Args:
        stock_data (dict): Stock data dictionary
            (key -> {"name", "symbol", "quantity"}).
        extracted_data (dict): Extracted price data dictionary
            (symbol -> {"initial_price": float}).

    Returns:
        dict: Copy of ``stock_data`` where every entry additionally carries
            "initial_price" (0.0 when no price was extracted for its symbol).
    """
    merged_stock_data = {}
    for key, value in stock_data.items():
        price_info = extracted_data.get(value["symbol"])
        # default value if it cannot be extracted
        initial_price = price_info["initial_price"] if price_info is not None else 0.0
        # Build a fresh inner dict: dict.copy() on the outer dict is shallow,
        # so assigning into merged[key] would also mutate the caller's data.
        merged_stock_data[key] = {**value, "initial_price": initial_price}
    return merged_stock_data


def generate_prompt(stock_data: Dict) -> str:
    """Generates a prompt for the language model with all the stock data.

    Args:
        stock_data (dict): Merged stock data that includes stock name, symbol,
            quantity, and initial price. Must be JSON-serializable.

    Returns:
        str: Formatted prompt for LLM with the stock data embedded as JSON.
    """
    prompt_template_with_price = """
    You are a financial analysis expert. Please provide a summary of the following stock data,
    including the company name, stock symbol, and initial purchase price.

    Stock Data:
    {stock_data}

    Summary:
    """
    stock_json_str = json.dumps(stock_data)
    # Only the template is run through str.format, so the braces inside the
    # JSON payload do not need escaping.
    formatted_prompt_with_price = prompt_template_with_price.format(stock_data=stock_json_str)
    return formatted_prompt_with_price


class Asset(BaseModel):
    """Represents an asset within a portfolio."""

    quantity: int = Field(
        ...,
        description="The number of shares or units held for this specific asset.",
    )
    initial_price: float = Field(
        ...,
        description="The initial purchase price per share or unit of this asset.",
    )
    # The description is part of the schema handed to the LLM, so the examples
    # use values from Config.SECTORS to stay consistent with the allowed list.
    sector: str = Field(
        ...,
        description=(
            "The economic sector of the asset, based on the stock symbol or company name. "
            f"Use one of these sectors: {Config.SECTORS}. "
            "For example, 'Financials' for HDFC or JPM, 'Consumer Staples' for PG, "
            "'Information Technology' for GOOG. "
            "This categorization should be done based on the business nature of the company "
            "whose stock is traded. "
            "For instance, if the stock symbol is 'HDFCBANK', the sector is expected to be "
            "'Financials'."
        ),
    )


class Portfolio(BaseModel):
    """Represents an individual portfolio."""

    name: str = Field(
        ...,
        description=(
            "The name given to this portfolio, for example 'Diversified Portfolio' "
            "or 'Aggressive Tech Portfolio'."
        ),
    )
    assets: Dict[str, Asset] = Field(
        ...,
        description="""A dictionary containing the assets within this portfolio.
        The keys of the dictionary are the ticker symbols of the stocks (e.g., 'JPM', 'PG'),
        and the values are the corresponding 'Asset' objects, which define the quantity,
        initial price, and sector for each asset.
        Example: {'JPM': {'quantity': 150, 'initial_price': 140, 'sector': 'Financials'},
        'PG': {'quantity': 200, 'initial_price': 160, 'sector': 'Consumer Staples'}}""",
    )


def invoke_llm_for_portfolio(formatted_prompt: str) -> Portfolio:
    """Invokes the LLM for structured output of the portfolio.

    Args:
        formatted_prompt (str): Formatted prompt for the LLM.

    Returns:
        Portfolio: Structured output of the portfolio.

    Raises:
        ValidationError: Re-raised after logging if it occurs during invocation.
        Exception: Any other error from the LLM call is logged and re-raised.
    """
    llm = ChatGroq(groq_api_key=Config.GROQ_API_KEY, model_name="llama-3.1-8b-instant")
    structured_llm = llm.with_structured_output(Portfolio)
    try:
        output = structured_llm.invoke(formatted_prompt)
        return output
    except ValidationError as e:
        print(f"Error during LLM invocation: {e}")
        raise
    except Exception as e:
        print(f"Unexpected error during LLM invocation {e}")
        raise


def portfolio_to_json(portfolio: Portfolio, output_file: str = Config.OUTPUT_FILE) -> None:
    """Converts a Portfolio object to a JSON string and saves it to a file.

    Failures are reported on stdout rather than raised (best effort).

    Args:
        portfolio (Portfolio): The portfolio to serialize.
        output_file (str, optional): Destination path; its parent directory is
            created if missing. Defaults to Config.OUTPUT_FILE.
    """
    try:
        json_str = portfolio.model_dump_json(indent=4)
        # Nothing else creates the output directory, so without this the write
        # fails on a fresh checkout.
        _ensure_parent_dir(output_file)
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(json_str)
        print(f"Info: Portfolio saved to '{output_file}'")
    except Exception as e:
        print(f"Error saving JSON file {e}")


def save_dataframes(dataframes_dict, filename="output_files/saved_dataframes.pkl"):
    """Pickle a dictionary of DataFrames to *filename*.

    Args:
        dataframes_dict (dict): Mapping of stock symbol to DataFrame.
        filename (str, optional): Destination path; its parent directory is
            created if missing.
    """
    _ensure_parent_dir(filename)
    with open(filename, "wb") as file:
        pickle.dump(dataframes_dict, file)
    print(f"DataFrames successfully saved to {filename}.")


def main() -> None:
    """Run the pipeline: fetch, store, reload prices, prompt the LLM, save JSON."""
    # Sample stock data
    stock_data = {
        "stock1": {"name": "Reliance Industries Ltd.", "symbol": "RELIANCE.NS", "quantity": 10},
        "stock2": {"name": "Tata Consultancy Services Ltd.", "symbol": "TCS.NS", "quantity": 15},
        "stock3": {"name": "HDFC Bank Ltd.", "symbol": "HDFCBANK.NS", "quantity": 20},
        "stock4": {"name": "Infosys Ltd.", "symbol": "INFY.NS", "quantity": 12},
        "stock5": {"name": "Hindustan Unilever Ltd.", "symbol": "HINDUNILVR.NS", "quantity": 8},
    }

    # 1. Fetch stock data
    stock_symbols = [value["symbol"] for value in stock_data.values()]
    stock_dfs = fetch_stock_data(stock_symbols)

    # Save DataFrames in a dictionary for future use
    saved_dataframes = {}
    if stock_dfs:
        for symbol, df in stock_dfs.items():
            if df is not None:
                # Save DataFrame in the variable
                saved_dataframes[symbol] = df
                print(f"Data for '{symbol}' loaded into variable.")
            else:
                print(f"No data found for '{symbol}'")
    else:
        print("Error occurred during fetching data. DataFrames are not returned.")

    # Save the dictionary to a local file
    save_dataframes(saved_dataframes)

    # 2. Store data
    store_stock_data(stock_dfs)

    # 3. Load the last price
    extracted_data = load_stock_data_and_extract_price(Config.STOCK_DATA_DIR)

    # 4. Merge extracted price with the main dictionary
    merged_stock_data = merge_stock_data_with_price(stock_data, extracted_data)

    # 5. Generate prompt for LLM
    formatted_prompt = generate_prompt(merged_stock_data)
    print(formatted_prompt)

    # 6. Invoke LLM
    try:
        portfolio_output = invoke_llm_for_portfolio(formatted_prompt)
        print(portfolio_output)
    except Exception as e:
        print(f"An unexpected error occurred during the LLM invocation: {e}")
    else:
        # 7. Save portfolio output to JSON
        portfolio_to_json(portfolio_output)


if __name__ == '__main__':
    main()