# NOTE: "Spaces: Sleeping" banner (Hugging Face Spaces UI residue) removed —
# it was not part of the program source.
import os
import nltk
import requests
import datetime

# Use a directory within the user's home directory so NLTK downloads work
# without elevated permissions (e.g. on hosted platforms).
nltk_data_dir = os.path.expanduser("~/.nltk_data")
os.makedirs(nltk_data_dir, exist_ok=True)
nltk.data.path.append(nltk_data_dir)

# Download NLTK tokenizer data ('punkt') used for sentence splitting.
nltk.download('punkt', download_dir=nltk_data_dir, quiet=True)

import chainlit as cl
from llama_index.core import VectorStoreIndex, Document
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.groq import Groq
from llama_index.core import ServiceContext
from llama_index.core.node_parser import SentenceSplitter
from dotenv import load_dotenv
import yfinance as yf
import pandas as pd

# Load API keys from a local .env file into the process environment.
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")  # Groq LLM API key
FMP_API_KEY = os.getenv("FMP_API_KEY")    # Financial Modeling Prep API key

# Local embedding model and Groq-hosted Llama 3 70B as the LLM.
embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")
llm = Groq(model="llama3-70b-8192", api_key=GROQ_API_KEY)

# Shared indexing/query configuration: split documents into ~1000-char
# chunks with 200-char overlap.
# NOTE(review): ServiceContext is deprecated in recent llama_index releases
# (superseded by Settings) — confirm the pinned version still exports it.
service_context = ServiceContext.from_defaults(
    llm=llm,
    embed_model=embed_model,
    node_parser=SentenceSplitter(chunk_size=1000, chunk_overlap=200)
)
def fetch_annual_report_10k(symbol: str) -> str:
    """Fetch the most recent available 10-K text for *symbol* from FMP.

    Walks backward over the last five fiscal years (current year first)
    and returns the body of the first request that yields usable data.

    Args:
        symbol: Stock ticker symbol, e.g. "MSFT".

    Returns:
        Raw response text for the first year with meaningful data, or a
        human-readable failure message when no year in the window works.
    """
    this_year = datetime.datetime.now().year
    base_url = "https://financialmodelingprep.com/api/v4/financial-reports-json"
    # Try the current year first, then step back one year at a time.
    for year in range(this_year, this_year - 5, -1):
        url = base_url + f"?symbol={symbol}&year={year}&period=FY&apikey={FMP_API_KEY}"
        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
        except requests.exceptions.RequestException:
            # Network/HTTP failure for this year — fall back to an older one.
            continue
        body = resp.text
        # FMP signals "no filing" via an error payload or a near-empty body;
        # treat either as a miss and keep walking backward.
        if "Error Message" not in body and len(body.strip()) >= 10:
            return body
    # Exhausted the whole window without usable data.
    return (
        f"No 10-K data found for {symbol} in the last 5 years "
        "(or API returned an error)."
    )
# Prompt sent once after indexing to produce the top-level 10-K summary.
summary_prompt = (
    "You are a world-class financial analyst with extensive experience analyzing annual reports. "
    "Provide a comprehensive summary of the 10-K report. Focus on Strategic Insights, Key Financial Figures, and Risk Factors. "
    "Answer in extensive bullet points, summarizing the company's performance, strengths, and weaknesses."
)
# Prompt used to generate the numbered due-diligence questions; the caller
# parses the response line-by-line, keeping lines that start with a digit.
question_prompt = (
    "You are a financial analyst with extensive experience analyzing annual reports. "
    "Read the 10-K report and generate 10 strategic questions focusing on the company's performance, risks, and financial figures. "
    "Ask questions that provide strategic insights into the company's long-term goals, revenue trends, competitive position, and more. "
    "Format the questions as a numbered list (e.g., '1. Question')."
)
@cl.on_chat_start  # FIX: without this decorator Chainlit never invokes the handler
async def on_chat_start():
    """Session entry point: ask for a ticker, fetch and index its 10-K,
    then post a summary plus auto-generated strategic Q&A.

    Side effects: stores the built VectorStoreIndex in the user session
    under the key "index" so follow-up messages (see main) can query it.
    """
    ticker_response = await cl.AskUserMessage(
        content=(
            "This tool is designed to analyze 10-K annual reports for publicly traded companies. "
            "Provide the company's ticker symbol, and the tool will fetch the latest available 10-K report "
            "from the last few years. It generates summaries and strategic due diligence. "
            "Ask your own questions afterwards.\n\n"
            "Please enter the ticker symbol for the company you want to analyze (e.g. MSFT):"
        )
    ).send()
    if not ticker_response or 'content' not in ticker_response:
        await cl.Message(content="No ticker symbol provided. Please enter a valid ticker symbol to proceed.").send()
        return
    ticker_symbol = ticker_response['content'].upper()
    msg = cl.Message(content=f"Retrieving the latest 10-K report for {ticker_symbol}...")
    await msg.send()
    try:
        annual_report_text = fetch_annual_report_10k(ticker_symbol)
        # The fetcher signals failure via message strings, not exceptions.
        # NOTE(review): only the "No 10-K data found" prefix is currently
        # produced by fetch_annual_report_10k — the other prefixes look like
        # leftovers from an earlier error-reporting scheme; kept defensively.
        if annual_report_text.startswith((
            "No 10-K data found",
            "HTTP error",
            "Request error",
            "An unexpected error occurred",
        )):
            await cl.Message(content=annual_report_text).send()
            return
        # Build an in-memory vector index over the raw filing text.
        document = Document(text=annual_report_text, metadata={"company": ticker_symbol})
        index = VectorStoreIndex.from_documents([document], service_context=service_context)
        cl.user_session.set("index", index)
        query_engine = index.as_query_engine()
        # Blocking llama_index calls are wrapped with make_async so they
        # don't stall the event loop.
        summary_response = await cl.make_async(query_engine.query)(summary_prompt)
        await cl.Message(content=f"**Summary:**\n{summary_response}").send()
        questions_response = await cl.make_async(query_engine.query)(question_prompt)
        # Keep only lines that look like numbered questions ("1. ...").
        relevant_questions = [
            line.strip()
            for line in str(questions_response).split('\n')
            if line.strip() and line.strip()[0].isdigit()
        ]
        await cl.Message(content="Generated strategic questions and answers:").send()
        for question in relevant_questions:
            await cl.Message(content=f"**{question}**").send()
            answer = await cl.make_async(query_engine.query)(question)
            await cl.Message(content=f"**Answer:**\n{answer}").send()
        msg.content = "Processing done. You can now ask more questions about the 10-K report!"
        await msg.update()
    except Exception as e:
        # Top-level UI boundary: report the error rather than crash the session.
        await cl.Message(content=f"An error occurred during processing: {str(e)}").send()
@cl.on_message  # FIX: without this decorator Chainlit never routes user messages here
async def main(message: cl.Message):
    """Handle a follow-up user question against the indexed 10-K.

    Requires on_chat_start to have stored an index in the session under
    "index"; otherwise asks the user to supply a ticker first.
    """
    index = cl.user_session.get("index")
    if index is None:
        await cl.Message(content="Please provide a ticker symbol first before asking questions.").send()
        return
    query_engine = index.as_query_engine()
    # Run the blocking query off the event loop.
    response = await cl.make_async(query_engine.query)(message.content)
    # Stream the answer to the UI one character at a time.
    response_message = cl.Message(content="")
    for token in str(response):
        await response_message.stream_token(token=token)
    await response_message.send()