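"""Gradio app for company performance summarisation and sentiment analysis.

Given a company name / ticker, the app summarises recent Google News
coverage and Yahoo Finance data with LangChain "refine" summarize chains,
extracts key-value pairs with the OpenAI Completion API, and plots
zero-shot sentiment scores with plotly.
"""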
import logging
import re
from typing import List

import gradio as gr
import nltk
import openai
import pandas as pd
import plotly.express as px
import requests
import validators
import yfinance as yf
from GoogleNews import GoogleNews
from langchain.chains.summarize import load_summarize_chain
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import UnstructuredFileLoader, WebBaseLoader
from langchain.prompts import PromptTemplate
from langchain.text_splitter import CharacterTextSplitter
from nltk.tokenize import sent_tokenize
from transformers import pipeline
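# NOTE: ChatOpenAI and openai.Completion both read the API key from the
# OPENAI_API_KEY environment variable, so it must be set before launching.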
class KeyValueExtractor:

    def __init__(self):
        """
        Initialize the KeyValueExtractor object.

        Sets the zero-shot classification model used for sentiment analysis.
        """
        self.model = "facebook/bart-large-mnli"

    def get_news(self, keyword):
        """
        Search Google News for the keyword and return the article links.
        """
        googlenews = GoogleNews(lang='en', region='US', period='1d', encode='utf-8')
        googlenews.clear()
        googlenews.search(keyword)
        googlenews.get_page(2)
        news_result = googlenews.result(sort=True)
        news_data_df = pd.DataFrame.from_dict(news_result)
        # Collect the link column of every result row.
        tot_news_link = []
        for index, headers in news_data_df.iterrows():
            news_link = str(headers['link'])
            tot_news_link.append(news_link)
        return tot_news_link
    def url_format(self, urls):
        tot_url_links = []
        # Regex pattern matching URLs that start with 'http' or 'https'.
        pattern = r'(https?://[^\s]+)'
        for url_text in urls:
            # Search for a URL in the text using the regex pattern.
            match = re.search(pattern, url_text)
            if match:
                extracted_url = match.group(1)
                tot_url_links.append(extracted_url)
            else:
                print("No URL found in the given text.")
        return tot_url_links
    def clear_error_url(self, urls):
        """
        Drop URLs that are invalid or do not respond with HTTP 200.
        """
        error_url = []
        headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}
        for url in urls:
            if validators.url(url):
                try:
                    r = requests.get(url, headers=headers)
                except requests.RequestException as e:
                    print(f"Error fetching {url}: {e}")
                    error_url.append(url)
                    continue
                if r.status_code != 200:
                    print(f"Error fetching {url}: status code {r.status_code}")
                    error_url.append(url)
            else:
                # Malformed URLs are excluded as well.
                error_url.append(url)
        cleaned_list_url = [item for item in urls if item not in error_url]
        return cleaned_list_url
    def get_each_link_summary(self, urls):
        """
        Summarize the page behind each URL and concatenate the summaries.
        """
        each_link_summary = ""
        for url in urls:
            loader = WebBaseLoader(url)
            docs = loader.load()
            text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=3000, chunk_overlap=200
            )
            # Split the documents into chunks.
            split_docs = text_splitter.split_documents(docs)
            # Prepare the prompt template for summarization.
            prompt_template = """Write a concise summary of the following:
            {text}
            CONCISE SUMMARY:"""
            prompt = PromptTemplate.from_template(prompt_template)
            # Prepare the template for refining the summary with additional context.
            refine_template = (
                "Your job is to produce a final summary.\n"
                "We have provided an existing summary up to a certain point: {existing_answer}\n"
                "We have the opportunity to refine the existing summary "
                "(only if needed) with some more context below.\n"
                "------------\n"
                "{text}\n"
                "------------\n"
                "Given the new context, refine the original summary. "
                "If the context isn't useful, return the original summary."
            )
            refine_prompt = PromptTemplate.from_template(refine_template)
            # Load the summarization chain using the ChatOpenAI language model.
            chain = load_summarize_chain(
                llm=ChatOpenAI(temperature=0),
                chain_type="refine",
                question_prompt=prompt,
                refine_prompt=refine_prompt,
                return_intermediate_steps=True,
                input_key="input_documents",
                output_key="output_text",
            )
            # Generate the refined summary using the loaded summarization chain.
            result = chain({"input_documents": split_docs}, return_only_outputs=True)
            print(result["output_text"])
            # Append this link's summary to the running text.
            each_link_summary = each_link_summary + result["output_text"]
        return each_link_summary
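    # How the "refine" chain above works: the first chunk is summarized with
    # question_prompt, then each subsequent chunk is fed to refine_prompt
    # together with the running summary ({existing_answer}), so long pages
    # are summarized incrementally instead of in one oversized LLM call.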
    def save_text_to_file(self, each_link_summary) -> str:
        """
        Save the concatenated link summaries to a text file.

        Returns:
            str: Path of the saved text file.
        """
        # Path of the text file where the extracted text will be saved.
        file_path = "extracted_text.txt"
        try:
            with open(file_path, 'w', encoding='utf-8') as file:
                # Write the extracted text into the text file.
                file.write(each_link_summary)
            # Return the file path of the saved text file.
            return file_path
        except IOError as e:
            # If an IOError occurs during the file saving process, log the error.
            logging.error(f"Error while saving text to file: {e}")
    def document_loader(self, file_path) -> List[str]:
        """
        Load the text from the saved file and split it into documents.

        Returns:
            List[str]: List of document texts.
        """
        # Initialize the UnstructuredFileLoader.
        loader = UnstructuredFileLoader(file_path, strategy="fast")
        # Load the documents from the file.
        docs = loader.load()
        # Return the list of loaded document texts.
        return docs
    def document_text_splitter(self, docs) -> List[str]:
        """
        Split documents into chunks for efficient processing.

        Returns:
            List[str]: List of split document chunks.
        """
        # Initialize the text splitter with the specified chunk size and overlap.
        text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=3000, chunk_overlap=200
        )
        # Split the documents into chunks.
        split_docs = text_splitter.split_documents(docs)
        # Return the list of split document chunks.
        return split_docs
    def extract_key_value_pair_for_news(self, content) -> str:
        """
        Extract key-value pairs from the refined summary.

        Returns:
            str: The extracted key-value pairs.
        """
        try:
            # Use OpenAI's Completion API to analyze the text and extract key-value pairs.
            response = openai.Completion.create(
                engine="text-davinci-003",  # You can choose a different engine as well.
                temperature=0,
                prompt=f"Extract as many meaningful key-value pairs as possible. The content is in backticks. ```{content}```.",
                max_tokens=1000  # You can adjust the length of the response.
            )
            # Extract and return the model's reply.
            result = response['choices'][0]['text'].strip()
            return result
        except Exception as e:
            # If an error occurs during the key-value extraction process, log the error.
            logging.error(f"Error while extracting key-value pairs: {e}")
            print("Error:", e)
    def refine_summary(self, split_docs):
        """
        Refine the summary using the provided context.

        Returns:
            tuple: The refined summary and the extracted key-value pairs.
        """
        # Prepare the prompt template for summarization.
        prompt_template = """Write a detailed, broad abstractive summary of the following:
        {text}
        SUMMARY:"""
        prompt = PromptTemplate.from_template(prompt_template)
        # Prepare the template for refining the summary with additional context.
        refine_template = (
            "Your job is to produce a final summary.\n"
            "We have provided an existing summary up to a certain point: {existing_answer}\n"
            "We have the opportunity to refine the existing summary "
            "(only if needed) with some more context below.\n"
            "------------\n"
            "{text}\n"
            "------------\n"
            "Given the new context, refine the original summary. "
            "If the context isn't useful, return the original summary."
        )
        refine_prompt = PromptTemplate.from_template(refine_template)
        # Load the summarization chain using the ChatOpenAI language model.
        chain = load_summarize_chain(
            llm=ChatOpenAI(temperature=0),
            chain_type="refine",
            question_prompt=prompt,
            refine_prompt=refine_prompt,
            return_intermediate_steps=True,
            input_key="input_documents",
            output_key="output_text",
        )
        # Generate the refined summary using the loaded summarization chain.
        result = chain({"input_documents": split_docs}, return_only_outputs=True)
        key_value_pair = self.extract_key_value_pair_for_news(result["output_text"])
        # Return the refined summary and the key-value pairs.
        return result["output_text"], key_value_pair
    def analyze_sentiment_for_graph(self, text):
        # Note: the zero-shot pipeline reloads the model on every call.
        pipe = pipeline("zero-shot-classification", model=self.model)
        labels = ["Positive", "Negative", "Neutral"]
        result = pipe(text, labels)
        # Map each predicted label to its score.
        sentiment_scores = dict(zip(result['labels'], result['scores']))
        return sentiment_scores
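    # Illustrative (hypothetical) return value of analyze_sentiment_for_graph:
    #   {"Positive": 0.72, "Neutral": 0.19, "Negative": 0.09}
    # The transformers zero-shot pipeline returns labels sorted by score, so
    # the dict's first key is the dominant sentiment.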
    def display_graph_for_news(self, text):
        sentiment_scores = self.analyze_sentiment_for_graph(text)
        labels = list(sentiment_scores.keys())
        scores = list(sentiment_scores.values())
        fig = px.bar(x=scores, y=labels, orientation='h', color=labels,
                     color_discrete_map={"Negative": "red", "Positive": "green", "Neutral": "gray"})
        fig.update_traces(texttemplate='%{x:.1%}', textposition='outside', textfont=dict(size=6))
        fig.update_layout(title="Sentiment Analysis", width=600)
        return fig
    def main_for_news(self, keyword):
        try:
            urls = self.get_news(keyword)
            tot_urls = self.url_format(urls)
            clean_url = self.clear_error_url(tot_urls)
            each_link_summary = self.get_each_link_summary(clean_url)
            file_path = self.save_text_to_file(each_link_summary)
            docs = self.document_loader(file_path)
            split_docs = self.document_text_splitter(docs)
            result_summary_for_news, key_value_pair_for_news = self.refine_summary(split_docs)
            fig = self.display_graph_for_news(result_summary_for_news)
            return result_summary_for_news, key_value_pair_for_news, fig
        except Exception as e:
            logging.error(f"Error in the news pipeline: {e}")
            return "Sorry, no URL found! Please try again.", "", None

    def get_url(self, keyword):
        return f"https://finance.yahoo.com/quote/{keyword}?p={keyword}"
    def get_link_summary_for_finance(self, url):
        loader = WebBaseLoader(url)
        docs = loader.load()
        text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=3000, chunk_overlap=200
        )
        # Split the documents into chunks.
        split_docs = text_splitter.split_documents(docs)
        # Prepare the prompt template for summarization.
        prompt_template = """The given text contains finance stock details for one company. Extract the values for
        Previous Close : [value]
        Open : [value]
        Bid : [value]
        Ask : [value]
        Day's Range : [value]
        52 Week Range : [value]
        Volume : [value]
        Avg. Volume : [value]
        Market Cap : [value]
        Beta (5Y Monthly) : [value]
        PE Ratio (TTM) : [value]
        EPS (TTM) : [value]
        Earnings Date : [value]
        Forward Dividend & Yield : [value]
        Ex-Dividend Date : [value]
        1y Target Est : [value]
        from the text, then write an abstractive summary about those details:
        Given Text: {text}
        CONCISE SUMMARY:"""
        prompt = PromptTemplate.from_template(prompt_template)
        # Prepare the template for refining the summary with additional context.
        refine_template = (
            "Your job is to produce a final summary.\n"
            "We have provided an existing summary up to a certain point: {existing_answer}\n"
            "We have the opportunity to refine the existing summary "
            "(only if needed) with some more context below.\n"
            "------------\n"
            "{text}\n"
            "------------\n"
            "Given the new context, refine the original summary. "
            "If the context isn't useful, return the original summary."
        )
        refine_prompt = PromptTemplate.from_template(refine_template)
        # Load the summarization chain using the ChatOpenAI language model.
        chain = load_summarize_chain(
            llm=ChatOpenAI(temperature=0),
            chain_type="refine",
            question_prompt=prompt,
            refine_prompt=refine_prompt,
            return_intermediate_steps=True,
            input_key="input_documents",
            output_key="output_text",
        )
        # Generate the refined summary using the loaded summarization chain.
        result = chain({"input_documents": split_docs}, return_only_outputs=True)
        print(result["output_text"])
        return result["output_text"]
    def one_day_summary_finance(self, content) -> str:
        # Use OpenAI's Completion API to summarize the finance details.
        response = openai.Completion.create(
            engine="text-davinci-003",  # You can choose a different engine as well.
            temperature=0,
            prompt=f"Write a detailed summary of the given finance details. Compare today with the previous day, and state whether the result is good or bad and whether the outlook is bullish or bearish. The content is in backticks. ```{content}```.",
            max_tokens=1000  # You can adjust the length of the response.
        )
        # Extract and return the model's reply.
        result = response['choices'][0]['text'].strip()
        print(result)
        return result
    def extract_key_value_pair_for_finance(self, content) -> str:
        """
        Extract key-value pairs from the refined summary.

        Returns:
            str: The extracted key-value pairs.
        """
        try:
            # Use OpenAI's Completion API to analyze the text and extract key-value pairs.
            response = openai.Completion.create(
                engine="text-davinci-003",  # You can choose a different engine as well.
                temperature=0,
                prompt=f"Extract as many meaningful key-value pairs as possible. The content is in backticks. ```{content}```.",
                max_tokens=1000  # You can adjust the length of the response.
            )
            # Extract and return the model's reply.
            result = response['choices'][0]['text'].strip()
            return result
        except Exception as e:
            # If an error occurs during the key-value extraction process, log the error.
            logging.error(f"Error while extracting key-value pairs: {e}")
            print("Error:", e)
    def analyze_sentiment_for_graph_finance(self, text):
        pipe = pipeline("zero-shot-classification", model=self.model)
        labels = ["Positive", "Negative", "Neutral"]
        result = pipe(text, labels)
        # Map each predicted label to its score.
        sentiment_scores = dict(zip(result['labels'], result['scores']))
        return sentiment_scores
    def display_graph_for_finance(self, text):
        sentiment_scores = self.analyze_sentiment_for_graph_finance(text)
        labels = list(sentiment_scores.keys())
        scores = list(sentiment_scores.values())
        fig = px.bar(x=scores, y=labels, orientation='h', color=labels,
                     color_discrete_map={"Negative": "red", "Positive": "green", "Neutral": "gray"})
        fig.update_traces(texttemplate='%{x:.1%}', textposition='outside', textfont=dict(size=6))
        fig.update_layout(title="Sentiment Analysis", width=600)
        return fig
    def get_finance_data(self, symbol):
        # Define the date range; note that this one-year window is hard-coded.
        start_date = '2022-08-19'
        end_date = '2023-08-19'
        # Fetch historical OHLC data using yfinance.
        data = yf.download(symbol, start=start_date, end=end_date)
        # Select only the OHLC columns.
        ohlc_data = data[['Open', 'High', 'Low', 'Close']]
        csv_path = "ohlc_data.csv"
        # Save the OHLC data to a CSV file.
        ohlc_data.to_csv(csv_path)
        return csv_path
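    # yf.download returns a DataFrame indexed by Date with Open/High/Low/
    # Close/Adj Close/Volume columns; only the OHLC columns are kept above,
    # and the Date index is written out as the first CSV column.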
    def csv_to_dataframe(self, csv_path):
        # Read the CSV file into a DataFrame.
        df = pd.read_csv(csv_path)
        return df

    def save_dataframe_in_text_file(self, df):
        output_file_path = 'output.txt'
        # Write the DataFrame to a tab-separated text file.
        df.to_csv(output_file_path, sep='\t', index=False)
        return output_file_path

    def csv_loader(self, output_file_path):
        loader = UnstructuredFileLoader(output_file_path, strategy="fast")
        docs = loader.load()
        return docs
    def document_text_splitter_finance(self, docs):
        """
        Split documents into smaller chunks suited to the tabular finance data.

        Returns:
            List[str]: List of split document chunks.
        """
        # Initialize the text splitter with a smaller chunk size and overlap.
        text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=1000, chunk_overlap=200
        )
        # Split the documents into chunks.
        split_docs = text_splitter.split_documents(docs)
        # Return the list of split document chunks.
        return split_docs
    def change_bullet_points(self, text):
        # Download the sentence tokenizer data (a no-op after the first run).
        nltk.download('punkt', quiet=True)
        # Tokenize the passage into sentences.
        sentences = sent_tokenize(text)
        bullet_string = ""
        # Render each sentence as a bullet point.
        for sentence in sentences:
            bullet_string += "* " + sentence + "\n"
        return bullet_string
    def one_year_summary_for_finance(self, keyword):
        csv_path = self.get_finance_data(keyword)
        df = self.csv_to_dataframe(csv_path)
        output_file_path = self.save_dataframe_in_text_file(df)
        docs = self.csv_loader(output_file_path)
        # Use the finance-specific splitter (smaller chunks for tabular data).
        split_docs = self.document_text_splitter_finance(docs)
        prompt_template = """Analyze the financial details and write a short abstractive summary of how the company performed (up or down, bullish or bearish) in the following:
        {text}
        CONCISE SUMMARY:"""
        prompt = PromptTemplate.from_template(prompt_template)
        # Prepare the template for refining the summary with additional context.
        refine_template = (
            "Your job is to produce a final summary.\n"
            "We have provided an existing summary up to a certain point: {existing_answer}\n"
            "We have the opportunity to refine the existing summary "
            "(only if needed) with some more context below.\n"
            "------------\n"
            "{text}\n"
            "------------\n"
            "Given the new context, refine the original summary. "
            "If the context isn't useful, return the original summary. "
            "A 10-line summary is enough."
        )
        refine_prompt = PromptTemplate.from_template(refine_template)
        # Load the summarization chain using the ChatOpenAI language model.
        chain = load_summarize_chain(
            llm=ChatOpenAI(temperature=0),
            chain_type="refine",
            question_prompt=prompt,
            refine_prompt=refine_prompt,
            return_intermediate_steps=True,
            input_key="input_documents",
            output_key="output_text",
        )
        # Generate the refined summary using the loaded summarization chain.
        result = chain({"input_documents": split_docs}, return_only_outputs=True)
        one_year_performance_summary = self.change_bullet_points(result["output_text"])
        plot_for_year = self.display_graph_for_finance(one_year_performance_summary)
        # Return the refined summary and the sentiment plot.
        return one_year_performance_summary, plot_for_year
    def main_for_finance_tool(self, keyword):
        clean_url = self.get_url(keyword)
        link_summary = self.get_link_summary_for_finance(clean_url)
        clean_summary = self.one_day_summary_finance(link_summary)
        key_value = self.extract_key_value_pair_for_finance(clean_summary)
        sentiment_plot_for_one_day = self.display_graph_for_finance(clean_summary)
        return clean_summary, key_value, sentiment_plot_for_one_day

    def company_names(self, input_text):
        # "Apple Inc. - AAPL" -> "AAPL"; strip() drops the space after the dash.
        words = input_text.split("-")
        return words[1].strip()

    def clear(self, input_news, result_summary_for_news, key_value_pair_result_for_news, sentiment_plot):
        # Reset all UI fields.
        return None, None, None, None
    def gradio_interface(self):
        with gr.Blocks(css="style.css", theme='karthikeyan-adople/hudsonhayes-gray') as app:
            gr.HTML("""<center class="darkblue" style='background-color:rgb(0,1,36); text-align:center;padding:25px;'><center><h1 class="center">
                    <img src="file=logo.png" height="110px" width="280px"></h1></center>
                    <br><h1 style="color:#fff">Company performance summarisation and sentiment analysis</h1></center>""")
            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150):
                    input_news = gr.Textbox(label="Company Name")
            with gr.Accordion("Sample Inputs", open=True):
                with gr.Row(elem_id="col-container"):
                    with gr.Column(scale=1.0, min_width=150):
                        gr.Examples(
                            [["Apple Inc. - AAPL"], ["Microsoft Corporation - MSFT"], ["Amazon.com Inc. - AMZN"], ["Tesla Inc. - TSLA"], ["Alphabet Inc. - GOOG"], ["NVIDIA Corporation - NVDA"]],
                            [input_news],
                            input_news,
                            fn=self.company_names,
                            cache_examples=True,
                        )
            with gr.Tabs():
                with gr.TabItem("Last Day Analysis"):
                    with gr.Row(elem_id="col-container"):
                        with gr.Column(scale=1.0, min_width=150):
                            analyse_summary_for_finance = gr.Button("Analyse")
                    with gr.Row(elem_id="col-container"):
                        with gr.Column(scale=1, min_width=150):
                            result_summary = gr.Textbox(label="Summary", lines=10)
                    with gr.Row(elem_id="col-container"):
                        with gr.Column(scale=0.50, min_width=0):
                            key_value_pair_result = gr.Textbox(label="Topics Reflected", lines=10)
                        with gr.Column(scale=0.50, min_width=0):
                            plot_for_one_day = gr.Plot(label="Sentiment")
                with gr.TabItem("One Year Analysis"):
                    with gr.Row(elem_id="col-container"):
                        with gr.Column(scale=1.0, min_width=150):
                            one_year = gr.Button("Analyse")
                    with gr.Row(elem_id="col-container"):
                        with gr.Column(scale=1.0, min_width=150):
                            one_year_summary = gr.Textbox(label="Summary Of One Year Performance", lines=20)
                        with gr.Column(scale=1.0, min_width=0):
                            plot_for_year = gr.Plot(label="Sentiment")
            analyse_summary_for_finance.click(self.main_for_finance_tool, input_news, [result_summary, key_value_pair_result, plot_for_one_day])
            one_year.click(self.one_year_summary_for_finance, input_news, [one_year_summary, plot_for_year])
        app.launch(debug=True)
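# Hypothetical programmatic use, bypassing the Gradio UI:
#   extractor = KeyValueExtractor()
#   summary, pairs, fig = extractor.main_for_finance_tool("AAPL")
#   news_summary, news_pairs, news_fig = extractor.main_for_news("Apple")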
if __name__ == "__main__":
    text_process = KeyValueExtractor()
    text_process.gradio_interface()