Spaces:
Sleeping
Sleeping
| import os | |
| import nltk | |
| import requests | |
# Use a directory within the user's home directory
nltk_data_dir = os.path.expanduser("~/.nltk_data")
# exist_ok=True makes this a no-op when the directory is already present.
os.makedirs(nltk_data_dir, exist_ok=True)
# Add the directory to NLTK's data search path so resources stored there are found.
nltk.data.path.append(nltk_data_dir)
# Download NLTK data
# NOTE(review): this download runs before the chainlit / llama_index imports
# that follow it in the file; presumably that ordering is intentional --
# confirm before moving the imports above this block.
nltk.download('punkt', download_dir=nltk_data_dir, quiet=True)
| import chainlit as cl | |
| from llama_index.core import VectorStoreIndex, Document | |
| from llama_index.embeddings.huggingface import HuggingFaceEmbedding | |
| from llama_index.llms.groq import Groq | |
| from llama_index.core import ServiceContext | |
| from llama_index.core.node_parser import SentenceSplitter | |
| from dotenv import load_dotenv | |
| import yfinance as yf | |
| import pandas as pd | |
# Load environment variables via python-dotenv before the keys are read below.
load_dotenv()
# Fetch the API keys from environment variables
# (os.getenv returns None when a variable is not set; nothing here validates that).
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
FMP_API_KEY = os.getenv("FMP_API_KEY")
# Initialize models
embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")
llm = Groq(model="llama3-70b-8192", api_key=GROQ_API_KEY)
# Create service context
# Bundles the LLM, the embedding model and a sentence splitter configured with
# chunk_size=1000 and chunk_overlap=200; on_chat_start passes this object to
# VectorStoreIndex.from_documents.
# NOTE(review): ServiceContext appears to be deprecated in newer llama-index
# releases in favour of llama_index.core.Settings -- confirm against the
# pinned llama-index version before upgrading.
service_context = ServiceContext.from_defaults(
    llm=llm,
    embed_model=embed_model,
    node_parser=SentenceSplitter(chunk_size=1000, chunk_overlap=200)
)
def fetch_earnings_transcript(symbol: str) -> str:
    """
    Fetch the latest transcript for a company's earnings call.

    Errors are reported in-band: on failure the returned string starts with
    one of "No transcript", "HTTP error occurred", "Request error occurred"
    or "An unexpected error occurred". Callers rely on those prefixes, so
    they must stay stable.

    Args:
    - symbol (str): The stock ticker symbol (e.g., 'AAPL').
    Returns:
    - str: The earnings call transcript or an error message.
    """
    # Function-scope imports keep this block self-contained.
    import re
    from urllib.parse import quote

    def _redact(error: Exception) -> str:
        # requests embeds the full request URL (query string included) in its
        # exception text; mask the apikey value so it is never shown to users.
        return re.sub(r"(apikey=)[^&\s]+", r"\1***", str(error))

    # Percent-encode the user-supplied ticker so it cannot alter the URL's
    # path or query string.
    transcript_url = (
        "https://financialmodelingprep.com/api/v3/earning_call_transcript/"
        f"{quote(symbol.strip(), safe='')}"
    )
    try:
        response = requests.get(
            transcript_url,
            params={"apikey": FMP_API_KEY},
            timeout=10,
        )
        response.raise_for_status()
        transcript_data = response.json()
        if not transcript_data:
            return f"No transcript available for {symbol}."
        # Only a list of transcript objects is usable; anything else
        # (presumably an API-level error object) gets an explicit message
        # instead of an opaque indexing failure.
        if not isinstance(transcript_data, list) or not isinstance(transcript_data[0], dict):
            return f"An unexpected error occurred: unexpected response format for {symbol}."
        # Extract the first available transcript
        latest_transcript = transcript_data[0].get("content", "")
        if not latest_transcript:
            return f"No transcript content found for {symbol}."
        return latest_transcript
    except requests.exceptions.HTTPError as http_err:
        return f"HTTP error occurred: {_redact(http_err)}"
    except requests.exceptions.RequestException as req_err:
        return f"Request error occurred: {_redact(req_err)}"
    except Exception as err:
        return f"An unexpected error occurred: {_redact(err)}"
# Prompts
# Each prompt is assembled from one fragment per sentence; fragments carry
# their own trailing space so a plain "".join reproduces the full text.

# Prompt used to produce the bullet-point summary of the transcript.
summary_prompt = "".join([
    "You are a world-class financial analyst with extensive experience analyzing quarterly reports. ",
    "Give me a comprehensive summary of the earnings report. ",
    "Focus on the Strategic Insights and Key Financial Figures. ",
    "Answer in extensive bullet points please.",
])

# Prompt used to generate the numbered list of due-diligence questions.
question_prompt = "".join([
    "You are a financial analyst with extensive experience analyzing quarterly reports. ",
    "Read the earnings call transcript and earnings presentation report and generate 10 questions focusing on the strategic insights and financial figures. ",
    "Ask questions that require precise answers and provide strategic insight into the company's financial and strategic performance, such as revenue growth, market trends, profit margins, and more. ",
    "Only ask questions that can be answered using the provided document, without making any assumptions or inferences beyond the text. ",
    "Please format the questions as a list with a simple '1. Question 1', '2. Question 2', etc. structure. ",
    "Unless retrievable from the documents, don't ask questions which cannot be compared to previous periods.",
])
@cl.on_chat_start
async def on_chat_start():
    """Ask for a ticker, index its latest earnings-call transcript and post an analysis.

    Steps:
    1. Prompt the user for a ticker symbol.
    2. Fetch the transcript with fetch_earnings_transcript.
    3. Build a VectorStoreIndex over it and store it in the user session under
       the key "index" (read back by the message handler).
    4. Post a summary, then generate questions and post an answer to each.

    Any failure is reported to the user as a chat message rather than raised.
    """
    ticker_response = await cl.AskUserMessage(
        content=(
            "This tool is designed to analyze earnings call transcripts for publicly traded companies. "
            "Provide the company's ticker symbol, and the tool will fetch the latest earnings call transcript. "
            "It generates summaries and strategic due diligence. Ask your own questions afterwards. \n\n"
            "Please enter the ticker symbol for the company you want to analyze (e.g. MSFT):"
        )
    ).send()

    # AskUserMessage resolves to None when the user does not answer in time.
    if not ticker_response:
        await cl.Message(
            content="No ticker symbol was received. Please restart the chat to try again."
        ).send()
        return

    # Newer Chainlit releases return the reply under "output"; older ones
    # used "content". Accept either so the handler works on both.
    raw_ticker = ticker_response.get("output") or ticker_response.get("content") or ""
    ticker_symbol = raw_ticker.strip().upper()
    if not ticker_symbol:
        await cl.Message(
            content="The ticker symbol was empty. Please restart the chat to try again."
        ).send()
        return

    msg = cl.Message(content=f"Retrieving earnings call transcript for {ticker_symbol}...")
    await msg.send()

    # fetch_earnings_transcript signals failure in-band through these prefixes.
    fetch_error_prefixes = (
        "No transcript",
        "HTTP error",
        "Request error",
        "An unexpected error occurred",
    )

    try:
        # Fetch the transcript using FMP API. The call is a blocking HTTP
        # request, so run it off the event loop like the queries below.
        transcript_text = await cl.make_async(fetch_earnings_transcript)(ticker_symbol)

        # Check if an error message was returned
        if transcript_text.startswith(fetch_error_prefixes):
            await cl.Message(content=transcript_text).send()
            return

        # Create a Document object with the transcript text
        document = Document(text=transcript_text, metadata={"company": ticker_symbol})

        # Create index (embedding the transcript is blocking work as well)
        index = await cl.make_async(VectorStoreIndex.from_documents)(
            [document], service_context=service_context
        )

        # Store the index in the user session
        cl.user_session.set("index", index)

        # Generate summary
        query_engine = index.as_query_engine()
        summary_response = await cl.make_async(query_engine.query)(summary_prompt)
        await cl.Message(content=f"**Summary:**\n{summary_response}").send()

        # Generate questions; keep only the lines that start with a digit,
        # i.e. the entries of the numbered list the prompt asks for.
        questions_response = await cl.make_async(query_engine.query)(question_prompt)
        stripped_lines = (line.strip() for line in str(questions_response).split('\n'))
        relevant_questions = [line for line in stripped_lines if line and line[0].isdigit()]

        # Answer generated questions
        await cl.Message(content="Generated questions and answers:").send()
        for question in relevant_questions:
            response = await cl.make_async(query_engine.query)(question)
            await cl.Message(content=f"**{question}**\n{response}").send()

        msg.content = "Processing done. You can now ask more questions about the earnings call transcript!"
        await msg.update()
    except Exception as e:
        # Top-level boundary of the handler: surface the failure in the chat.
        await cl.Message(content=f"An error occurred during processing: {str(e)}").send()
@cl.on_message
async def main(message: cl.Message):
    """Answer a follow-up question using the transcript index stored in the session.

    Args:
    - message (cl.Message): The incoming user message; its content is used
      as the query text.

    If no index has been stored under the session key "index" yet, the user
    is asked to provide a ticker symbol first. Query failures are reported
    to the user as a chat message rather than raised.
    """
    index = cl.user_session.get("index")
    if index is None:
        await cl.Message(content="Please provide a ticker symbol first before asking questions.").send()
        return

    query_engine = index.as_query_engine()
    try:
        # The query is synchronous; run it off the event loop.
        response = await cl.make_async(query_engine.query)(message.content)
    except Exception as e:
        # Same reporting style as on_chat_start: tell the user, do not crash.
        await cl.Message(content=f"An error occurred during processing: {str(e)}").send()
        return

    response_message = cl.Message(content="")
    # Iterating a str yields single characters, so the already complete
    # answer is pushed to the UI one character at a time.
    for token in str(response):
        await response_message.stream_token(token=token)
    await response_message.send()