Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from dotenv import load_dotenv | |
| import time | |
| import os | |
| import requests | |
| import bs4 | |
| from langchain_qdrant import QdrantVectorStore | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.http.models import Distance, VectorParams | |
| from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings, GoogleGenerativeAI | |
| from langchain.chains.combine_documents import create_stuff_documents_chain | |
| from langchain_core.prompts import PromptTemplate | |
def chunk_document(document, chunk_size=600, chunk_overlap=80):
    """
    Split a loaded document into smaller, overlapping chunks for embedding and retrieval.

    Args:
        document (list): A list of Document objects loaded from the source file.
        chunk_size (int, optional): Maximum size of a chunk. Default is 600.
            NOTE: RecursiveCharacterTextSplitter counts characters, not words
            (the previous docstring said "words", which was incorrect).
        chunk_overlap (int, optional): Amount of overlap between consecutive
            chunks, in characters. Default is 80.

    Returns:
        list: A list of Document chunks with the specified size and overlap.
    """
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    return text_splitter.split_documents(document)
def chunk_article(document, chunk_size=600, chunk_overlap=80):
    """
    Split article documents into smaller, overlapping chunks.

    This performed exactly the same splitting as chunk_document, so it now
    delegates to it to avoid duplicated splitter logic.

    Args:
        document (list): A list of Document objects loaded from the article URL.
        chunk_size (int, optional): Maximum size of a chunk, in characters. Default is 600.
        chunk_overlap (int, optional): Overlap between consecutive chunks, in characters. Default is 80.

    Returns:
        list: A list of Document chunks with the specified size and overlap.
    """
    return chunk_document(document, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
def creating_qdrant_index(embeddings, url="https://a2790d6c-f701-4a62-a57c-81d8fb4558f8.us-east-1-0.aws.cloud.qdrant.io"):
    """
    Connect to the existing Qdrant collection using the provided embedding model.

    Args:
        embeddings (object): The embedding model used to generate vector embeddings.
        url (str, optional): The Qdrant Cloud endpoint. Defaults to the
            project's cluster URL (previously hard-coded inside the call).

    Returns:
        QdrantVectorStore: A vector store bound to the "rag-documents" collection.
    """
    # NOTE(review): this reads the module-level QDRANT_CLOUD_KEY, which is only
    # assigned inside the __main__ block — confirm it is never called earlier.
    vector_store = QdrantVectorStore.from_existing_collection(
        url=url,
        collection_name="rag-documents",
        embedding=embeddings,
        api_key=QDRANT_CLOUD_KEY,
        prefer_grpc=False,
    )
    return vector_store
def uploading_document_to_database(directory):
    """
    Upload a PDF document to the Qdrant index after cleaning and chunking it,
    then generate a short description of the uploaded content.

    Args:
        directory (str): The file path of the PDF document to upload.

    Returns:
        str: An LLM-generated title/description of the uploaded document
             (the previous docstring incorrectly said this returns None).
    """
    print("Loading PDF : ", directory)
    pdf_loader = PyPDFLoader(directory)
    document = pdf_loader.load()
    # Replacing newline characters with spaces so chunks are continuous text
    for chunk in document:
        chunk.page_content = chunk.page_content.replace('\n', ' ')
    # Dividing document content into chunks
    chunked_data = chunk_document(document)
    print("Deleting file")
    try:
        # Clearing all existing data on the Qdrant index so only the new
        # document is searchable
        qdrant_index.delete(delete_all=True)
        time.sleep(5)
    except Exception:
        # Narrowed from a bare `except:` (which also swallowed SystemExit and
        # KeyboardInterrupt); a deletion failure is treated as "nothing to delete"
        print("Namespace is already empty")
    print("Uploading File to Database")
    # Uploading the chunked data to the Qdrant index
    qdrant_index.from_documents(
        chunked_data,
        embeddings,
        prefer_grpc=False,
        collection_name="rag-documents",
    )
    print("Document Uploaded to Database")
    # Give the index a moment to become consistent before querying it
    time.sleep(5)
    prompt = "What is the Title of the document and a small description of the content."
    description = response_generator(query=prompt, profession="Student")
    return description
def uploading_article_to_database(url):
    """
    Upload a web article to the Qdrant index after scraping, cleaning and
    chunking it, then generate a short description of the uploaded content.

    Args:
        url (str): The URL of the web article to scrape and upload.

    Returns:
        str: An LLM-generated title/description of the uploaded article.
    """
    # Only parse <article>/<main> tags to skip navigation and boilerplate
    strainer = bs4.SoupStrainer(["article", "main"])
    loader = WebBaseLoader(
        web_path=url,
        bs_kwargs={"parse_only": strainer},
    )
    document = loader.load()
    # Collapse runs of blank lines left over from HTML extraction
    document[0].page_content = document[0].page_content.replace("\n\n\n", " ").strip()
    document[0].page_content = document[0].page_content.replace("\n\n", " ").strip()
    chunked_data = chunk_article(document)
    print("Deleting previous data")
    try:
        # Clearing all existing data on the Qdrant index
        qdrant_index.delete(delete_all=True)
        time.sleep(5)
    except Exception:
        # Narrowed from a bare `except:`; a deletion failure means nothing to delete
        print("Namespace is already empty")
    print("Uploading Article to Database")
    # Uploading the chunked data to the Qdrant index
    qdrant_index.from_documents(
        chunked_data,
        embeddings,
        url=url,
        prefer_grpc=False,
        collection_name="rag-documents",
    )
    print("Article Uploaded to Database")
    # Give the index a moment to become consistent before querying it
    time.sleep(15)
    prompt = "What is the Title of the document and a small description of the content."
    description = response_generator(query=prompt, profession="Student")
    return description
def retrieve_response_from_database(query, k=5):
    """
    Fetch the chunks most similar to the query from the Qdrant index.

    Args:
        query (str): The input query used to search the Qdrant index.
        k (int, optional): Number of top matches to return. Default is 5.

    Returns:
        list: The k most similar Document chunks from the Qdrant index.
    """
    matches = qdrant_index.similarity_search(query, k=k)
    return matches
def response_generator(query, profession):
    """
    Answer a user query by retrieving relevant chunks from the Qdrant index
    and feeding them, together with the query and profession, through the LLM chain.

    Args:
        query (str): The user's question, used for retrieval and generation.
        profession (str): The user's profession, used to tailor the answer.

    Returns:
        str: The generated answer, or an apologetic error message if retrieval
             or generation fails.
    """
    try:
        context_docs = retrieve_response_from_database(query)
        print("results", context_docs)
        # Invoke the stuff-documents chain with the retrieved context
        return chain.invoke(input={"profession": profession, "context": context_docs, "user_query": query})
    except Exception as e:
        # Broad catch is deliberate: this is the API boundary, so any failure
        # becomes a user-facing message instead of a crash
        return f"Sorry, I am unable to find the answer to your query. Please try again later. The error is {e}"
# FastAPI application instance for the RAG service.
app = FastAPI()
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is a
# known CORS misconfiguration — browsers refuse credentialed requests to a
# wildcard origin, and Starlette cannot echo the origin in this mode. Confirm
# whether credentials are actually needed; if so, list explicit origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# NOTE(review): the route decorator was missing even though the docstring
# describes a GET endpoint — it appears to have been lost in extraction.
# Restored with "/" as the path; confirm against the client code.
@app.get("/")
def root(query: str, profession: str):
    """
    FastAPI endpoint handling GET requests: returns a generated response for a user's query.

    Args:
        query (str): The user's query string, passed as a query parameter.
        profession (str): The user's profession, used to tailor the answer.

    Returns:
        JSONResponse: A JSON body of the form {"answer": <generated answer>}.
    """
    print("User_query : " + query)
    answer = response_generator(query, profession)
    return JSONResponse(content={"answer": answer})
# NOTE(review): the route decorator was missing even though the docstring
# describes a POST endpoint — restored; confirm the exact path with the client.
@app.post("/upload_document")
def upload_document(file_bytes: bytes = File(...)):
    """
    FastAPI endpoint handling POST requests that upload a document to the Qdrant index.

    Args:
        file_bytes (bytes): The raw bytes of the PDF document to upload.

    Returns:
        dict: {"status": <description of the uploaded document>} on success,
              or {"status": "Error uploading file: ..."} on failure.
    """
    try:
        # Persist the uploaded bytes so PyPDFLoader can read them from disk
        with open("/tmp/document.pdf", "wb") as f:
            f.write(file_bytes)
        description = uploading_document_to_database("/tmp/document.pdf")
        # Forward the description to the AI-agent service; response is
        # intentionally unchecked (best-effort notification)
        response = requests.post("http://0.0.0.0:8080/send_desc", json={"description": description})
        return {"status": description}
    except Exception as e:
        return {"status": f"Error uploading file: {e}"}
# NOTE(review): the route decorator was missing even though the docstring
# describes a POST endpoint — restored; confirm the exact path with the client.
@app.post("/upload_article")
def upload_article(url: str):
    """
    FastAPI endpoint handling POST requests that upload a web article to the Qdrant index.

    Args:
        url (str): The URL of the web article to scrape and upload.

    Returns:
        dict: {"status": <description of the uploaded article>} on success,
              or {"status": "Error uploading file: ..."} on failure.
    """
    try:
        print("URL to server : ", url)
        # Uploading process of article and getting description
        description = uploading_article_to_database(url)
        # Forward the description to the AI-agent service; response is
        # intentionally unchecked (best-effort notification)
        response = requests.post("http://0.0.0.0:8080/send_desc", json={"description": description})
        print("type(description) : ", type(description))
        # Returning the description of the article
        return {"status": description}
    except Exception as e:
        return {"status": f"Error uploading file: {e}"}
if __name__ == "__main__":
    """
    Entry point: wires up the embedding model, Qdrant index, LLM and prompt
    chain — all read as module-level globals by the handlers above — then
    starts the FastAPI server on 0.0.0.0:8000.

    NOTE(review): because these globals are only assigned here, running the
    app via an external `uvicorn file:app` command would leave them undefined;
    confirm the service is always launched as a script.
    """
    # Loading environment variables from .env file
    load_dotenv()
    QDRANT_CLOUD_KEY = os.getenv(key="QDRANT_CLOUD_KEY")
    # Initializing embedding model for creating document vectors
    embeddings = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004")
    # Creating Qdrant index using the embedding model
    # (the unused `url` and `index_name` locals were removed)
    qdrant_index = creating_qdrant_index(embeddings)
    # LLM for response generation; temperature 0.6 trades determinism for fluency
    llm = GoogleGenerativeAI(model="gemini-1.5-flash", temperature=0.6)
    # Prompt combining the user's profession, retrieved context and the query
    prompt_template = PromptTemplate(
        template="I am {profession}. You have to provide a good information regarding my query. This is the information from my document : {context}. Here is my query for you: {user_query}. Answer in a proper markdown format.",
        input_variables=["profession", "context", "user_query"]
    )
    # Chain that stuffs the retrieved documents into {context} for the LLM
    chain = create_stuff_documents_chain(llm, prompt_template, document_variable_name="context")
    # Starting the FastAPI server with Uvicorn, accessible at 0.0.0.0 on port 8000
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)