Spaces:
Sleeping
Sleeping
| import feedparser | |
| from urllib.parse import quote | |
| from newspaper import Article | |
| from urllib.parse import quote | |
| from llama_index import Document | |
| from typing import Any, List, Tuple | |
| from datetime import datetime, timedelta | |
| from llama_index import PromptTemplate | |
| from llama_index.query_engine import RetrieverQueryEngine | |
| from llama_index import get_response_synthesizer | |
| from llama_index.schema import NodeWithScore | |
| from llama_index.query_engine import RetrieverQueryEngine | |
| from llama_index import VectorStoreIndex, ServiceContext | |
| from llama_index.query_engine import RetrieverQueryEngine | |
| from llama_index.llms.base import llm_completion_callback | |
| from llama_index.postprocessor import SentenceTransformerRerank | |
| from llama_index.retrievers import VectorIndexRetriever, BaseRetriever, BM25Retriever | |
class NewsFeedParser:
    """
    Collects recent news articles from RSS search feeds and turns them into
    chunked Document objects.

    Attributes:
        articles_data (list): One dict per collected article with the keys
            'link', 'published' and 'article_text'. The list is never cleared
            by this class, so articles accumulate across successive calls to
            parse_feed / process_query on the same instance.
    """

    def __init__(self):
        """
        Initializes the NewsFeedParser class with an empty article list.
        """
        self.articles_data = []

    def create_search_urls(self, question, base_urls):
        """
        Converts a question into search URLs for multiple base URLs.

        Parameters:
            question (str): The query or question to be converted into search URLs.
            base_urls (list): Base URLs of the search services; each is expected
                to end where the encoded query should be appended (e.g. '...?q=').

        Returns:
            list: One URL per base URL, each with the percent-encoded question appended.
        """
        encoded_question = quote(question)
        return [base_url + encoded_question for base_url in base_urls]

    @staticmethod
    def _published_datetime(entry):
        """
        Returns the publication time of a feed entry as a timezone-aware UTC
        datetime, or None when the entry carries no usable date.
        """
        # Imported locally so the block does not depend on edits to the
        # file-level import list.
        from datetime import timezone
        from email.utils import parsedate_to_datetime

        # Preferred source: feedparser's already-parsed time tuple, which the
        # feedparser documentation describes as normalised to UTC.
        parsed = getattr(entry, "published_parsed", None)
        if parsed:
            try:
                return datetime(*parsed[:6], tzinfo=timezone.utc)
            except (TypeError, ValueError):
                pass  # e.g. out-of-range fields; fall back to the raw string

        # Fallback: parse the raw RFC 2822 style string ourselves.
        raw = getattr(entry, "published", None)
        if not raw:
            return None
        try:
            published = parsedate_to_datetime(raw)
        except (TypeError, ValueError):
            return None
        if published.tzinfo is None:
            # No usable zone information in the string: treat it as UTC.
            published = published.replace(tzinfo=timezone.utc)
        return published.astimezone(timezone.utc)

    def parse_feed(self, rss_url):
        """
        Parses the RSS feed at a given URL and stores every article from the
        last seven days whose text could be extracted.

        Entries without a link or without a parseable publication date are
        skipped instead of aborting the whole feed.

        Parameters:
            rss_url (str): URL of the RSS feed to be parsed.
        """
        from datetime import timezone

        news_feed = feedparser.parse(rss_url)
        # Compare in UTC on both sides so the age check does not depend on the
        # local timezone of the machine running the code.
        current_date = datetime.now(timezone.utc)
        max_age = timedelta(days=7)

        for entry in news_feed.entries:
            link = getattr(entry, "link", None)
            published_date = self._published_datetime(entry)
            if not link or published_date is None:
                continue
            # Only keep articles published within the last week
            if current_date - published_date > max_age:
                continue
            article_text = self.extract_article_text(link)
            # Only add to the list if article text is successfully extracted
            if article_text:
                self.articles_data.append({
                    'link': link,
                    'published': published_date.strftime("%Y-%m-%d %H:%M:%S"),
                    'article_text': article_text
                })

    def extract_article_text(self, url):
        """
        Extracts text from a given article URL.

        Extraction is best-effort: any failure is printed and reported as None
        so that one broken article does not stop feed processing.

        Parameters:
            url (str): The URL of the article from which to extract text.

        Returns:
            str: Extracted article text. Returns None if extraction fails.
        """
        try:
            article = Article(url)
            article.download()
            article.parse()
            return article.text
        except Exception as e:
            print(f"An error occurred: {e}")
            return None

    def process_query(self, input_query):
        """
        Fetches articles related to a query from multiple news sources.

        Parameters:
            input_query (str): The query from which to extract information.

        Returns:
            list: self.articles_data, i.e. every article collected by this
            instance so far (including those from earlier calls).
        """
        # Base URLs of the news sources that are searched
        base_urls = [
            'https://news.google.com/rss/search?q=',
            'http://www.ft.com/rss/markets?q='
        ]
        # Step 1: Create search URLs for each base URL
        search_urls = self.create_search_urls(input_query, base_urls)
        # Step 2: Parse the feed for each search URL
        for url in search_urls:
            self.parse_feed(url)
        # Return the accumulated articles from all feeds
        return self.articles_data

    def chunk_text_by_words_with_overlap(self, text, max_words, overlap, metadata):
        """
        Splits the text into chunks of at most max_words words, where
        consecutive chunks share `overlap` words, and attaches metadata to
        each chunk.

        Parameters:
            text (str): Text to split; words are separated on whitespace.
            max_words (int): Maximum number of words per chunk (must be > 0).
            overlap (int): Number of words shared by consecutive chunks
                (must satisfy 0 <= overlap < max_words).
            metadata (dict): Key/value pairs merged into every chunk dict.

        Returns:
            list: One dict per chunk, holding the chunk under 'text' plus all
            metadata entries. Empty text yields an empty list.

        Raises:
            ValueError: If max_words or overlap are outside the ranges above.
        """
        if max_words <= 0:
            raise ValueError("max_words must be a positive integer")
        if not 0 <= overlap < max_words:
            raise ValueError("overlap must satisfy 0 <= overlap < max_words")

        words = text.split()
        step = max_words - overlap
        chunks = []
        for start in range(0, len(words), step):
            chunks.append(' '.join(words[start:start + max_words]))
            # Once a chunk reaches the end of the text, any further chunk would
            # consist only of words this chunk already contains, so stop here
            # instead of emitting (or merging in) duplicated text.
            if start + max_words >= len(words):
                break
        return [{'text': chunk, **metadata} for chunk in chunks]

    def process_and_chunk_articles(self, input_query, max_words=250, overlap=20):
        """
        Processes an input query, fetches related articles, and chunks their text.

        Parameters:
            input_query (str): Query used to search the news feeds.
            max_words (int): Maximum number of words per chunk.
            overlap (int): Number of words shared by consecutive chunks.

        Returns:
            list: Document objects, one per chunk, whose metadata holds the
            article 'link' and its 'published_date'.
        """
        # Process the query and get articles
        articles = self.process_query(input_query)
        print(len(articles))
        # Chunk each article's text and create one Document per chunk
        documents = []
        for article in articles:
            article_chunks = self.chunk_text_by_words_with_overlap(
                article['article_text'],
                max_words,
                overlap,
                metadata={'link': article['link'], 'published': article['published']}
            )
            for chunk in article_chunks:
                doc = Document(text=chunk['text'])
                doc.metadata = {'link': chunk['link'], 'published_date': chunk['published']}
                documents.append(doc)
        return documents
class HybridRetriever(BaseRetriever):
    """
    Retriever that merges the hits of a vector-embedding retriever with the
    hits of a BM25 retriever.

    Attributes:
        vector_retriever: Retriever instance that performs embedding-based retrieval.
        bm25_retriever: Retriever instance that performs BM25 retrieval.
    """

    def __init__(self, vector_retriever, bm25_retriever):
        """
        Stores the two underlying retrievers and initializes the base class.

        Args:
            vector_retriever: The retriever instance which uses vector-based retrieval.
            bm25_retriever: The retriever instance which uses the BM25 algorithm.
        """
        self.vector_retriever = vector_retriever
        self.bm25_retriever = bm25_retriever
        super().__init__()

    def _retrieve(self, query, **kwargs):
        """
        Queries both retrievers and returns the union of their results.

        Args:
            query: The query for which nodes are to be retrieved.
            **kwargs: Extra keyword arguments, forwarded unchanged to both retrievers.

        Returns:
            A list of retrieved nodes: BM25 hits first, then vector hits, with
            each node_id appearing only once (the first occurrence wins).
        """
        # BM25 is queried first, so its hits take precedence on duplicates
        bm25_hits = self.bm25_retriever.retrieve(query, **kwargs)
        vector_hits = self.vector_retriever.retrieve(query, **kwargs)

        # A dict keyed by node_id keeps insertion order and drops repeats
        unique_hits = {}
        for hit in bm25_hits + vector_hits:
            unique_hits.setdefault(hit.node.node_id, hit)
        return list(unique_hits.values())
class NewsQueryEngine:
    """
    Builds a retrieval-augmented query engine over a set of news documents.

    The engine combines vector and BM25 retrieval (via HybridRetriever),
    reranks the retrieved nodes with a sentence-transformer reranker and
    synthesizes the answer with a custom question-answering prompt.

    Attributes:
        documents (List): The documents to be indexed.
        llm: The language model used to generate answers.
        embed_model: The embedding model used to build the vector index.
        qa_prompt_tmpl (str): Prompt template with the placeholders
            {context_str} and {query_str}.

    Methods:
        setup_query_engine(): Builds and returns the configured query engine.
    """

    def __init__(self, documents: List[Any], llm: Any, embed_model: Any):
        """
        Stores the documents and models and defines the answer prompt.

        Args:
            documents: Documents to index.
            llm: Language model passed to the service context.
            embed_model: Embedding model passed to the service context.
        """
        self.documents = documents
        self.llm = llm
        self.embed_model = embed_model
        # NOTE(review): the sentence about "the paper's results and analysis"
        # looks like a leftover from a paper-review prompt and does not fit
        # news snippets. It is kept verbatim here because changing it alters
        # model behavior; confirm the intended wording before editing.
        self.qa_prompt_tmpl = (
            "Context information is below.\n"
            "---------------------\n"
            "{context_str}\n"
            "---------------------\n"
            "As an experienced financial analyst and researcher, you are tasked with helping fellow analysts in research using the latest financial news.\n "
            "Your answer will be based on the snippets of latest news provided as context information for each query.\n "
            "For each query, provide a concise answer derived from the information provided in the form of news.\n"
            "Try to not assume any critical information that might impact the answer. \n "
            "Note any major issues in the paper's results and analysis.\n"
            "If a query cannot be answered due to lack of information in the context, state this explicitly.\n"
            "Query: {query_str}\n"
            "Answer:"
        )

    def setup_query_engine(
        self,
        similarity_top_k: int = 5,
        rerank_top_n: int = 4,
        reranker_model: str = "BAAI/bge-reranker-base",
    ) -> Any:
        """
        Sets up the service context, index, hybrid retriever (vector + BM25),
        reranker and response synthesizer, and returns the query engine.

        Args:
            similarity_top_k: Number of nodes each of the two retrievers returns.
            rerank_top_n: Number of nodes kept after reranking.
            reranker_model: Model name passed to SentenceTransformerRerank.

        Returns:
            The configured RetrieverQueryEngine.
        """
        # Initialize the service context with the language model and embedding model
        service_context = ServiceContext.from_defaults(llm=self.llm, embed_model=self.embed_model)

        # Parse the documents into nodes exactly once and give the SAME nodes
        # to both retrievers. Parsing separately for the index and for BM25
        # produces two node sets with different ids, so the id-based
        # de-duplication in HybridRetriever could never match anything.
        nodes = service_context.node_parser.get_nodes_from_documents(self.documents)
        index = VectorStoreIndex(nodes, service_context=service_context)

        # Set up vector and BM25 retrievers
        vector_retriever = index.as_retriever(similarity_top_k=similarity_top_k)
        bm25_retriever = BM25Retriever.from_defaults(nodes=nodes, similarity_top_k=similarity_top_k)
        hybrid_retriever = HybridRetriever(vector_retriever, bm25_retriever)

        # Configure the response synthesizer with the prompt template.
        # The "tree_summarize" mode takes its prompt from summary_template, so
        # the custom prompt is supplied under both names; text_qa_template is
        # kept for backward compatibility with the other response modes.
        qa_prompt = PromptTemplate(self.qa_prompt_tmpl)
        response_synthesizer = get_response_synthesizer(
            service_context=service_context,
            text_qa_template=qa_prompt,
            summary_template=qa_prompt,
            response_mode="tree_summarize",
        )

        # Assemble the query engine with a reranker and the synthesizer
        reranker = SentenceTransformerRerank(top_n=rerank_top_n, model=reranker_model)
        query_engine = RetrieverQueryEngine.from_args(
            retriever=hybrid_retriever,
            node_postprocessors=[reranker],
            response_synthesizer=response_synthesizer,
        )
        return query_engine