"""FastAPI RAG service.

Uploads PDF documents and web articles into a Qdrant Cloud vector store and
answers user queries with a Gemini LLM over the retrieved chunks.

NOTE(review): all shared state (``embeddings``, ``qdrant_index``, ``chain``,
``QDRANT_CLOUD_KEY``) is created inside the ``if __name__ == "__main__"``
block, so the endpoints only work when this file is run as a script.
Importing the module (e.g. ``uvicorn module:app``) would leave those globals
undefined -- confirm the intended deployment mode.
"""

from fastapi import FastAPI, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
import time
import os
import requests
import bs4
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings, GoogleGenerativeAI
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import PromptTemplate


def chunk_document(document, chunk_size=600, chunk_overlap=80):
    """Divide a loaded document into smaller, overlapping chunks.

    Args:
        document (list): Documents loaded from a source (e.g. by PyPDFLoader).
        chunk_size (int, optional): Maximum number of characters in a chunk
            (RecursiveCharacterTextSplitter counts characters, not words).
            Default is 600.
        chunk_overlap (int, optional): Number of overlapping characters
            between consecutive chunks. Default is 80.

    Returns:
        list: A list of Document chunks with the specified size and overlap.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    chunks = text_splitter.split_documents(document)
    return chunks


def chunk_article(document, chunk_size=600, chunk_overlap=80):
    """Divide article documents into smaller, overlapping chunks.

    Functionally identical to ``chunk_document``; kept as a separate entry
    point because it is called from the article-upload path.

    Args:
        document (list): Documents loaded from the article URL.
        chunk_size (int, optional): Maximum number of characters in a chunk.
            Default is 600.
        chunk_overlap (int, optional): Number of overlapping characters
            between consecutive chunks. Default is 80.

    Returns:
        list: A list of Document chunks with the specified size and overlap.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    splitted_docs = splitter.split_documents(document)
    return splitted_docs


def creating_qdrant_index(embeddings):
    """Connect to the existing Qdrant Cloud collection.

    Args:
        embeddings (object): The embedding model used to generate vector
            embeddings for stored documents.

    Returns:
        QdrantVectorStore: A handle to the existing "rag-documents"
            collection on Qdrant Cloud.
    """
    # One-time collection setup, kept for reference:
    # client = QdrantClient(url="http://localhost:6333")
    # client.create_collection(
    #     collection_name="rag-documents",
    #     vectors_config=VectorParams(size=768, distance=Distance.COSINE),
    # )

    # NOTE(review): relies on the module-level QDRANT_CLOUD_KEY assigned in
    # the __main__ block below -- this function must not be called before
    # that assignment runs.
    vector_store = QdrantVectorStore.from_existing_collection(
        url="https://a2790d6c-f701-4a62-a57c-81d8fb4558f8.us-east-1-0.aws.cloud.qdrant.io",
        collection_name="rag-documents",
        embedding=embeddings,
        api_key=QDRANT_CLOUD_KEY,
        prefer_grpc=False,
    )
    return vector_store


def uploading_document_to_database(directory):
    """Load, chunk and upload a PDF into the Qdrant collection.

    Previously stored vectors are wiped first (best effort), so the store
    only ever holds one document at a time.

    Args:
        directory (str): The file path of the PDF document to upload.

    Returns:
        str: An LLM-generated title and short description of the document.
    """
    print("Loading PDF : ", directory)
    pdf_loader = PyPDFLoader(directory)
    document = pdf_loader.load()

    # Replace newline characters with spaces so chunks read as flat text.
    for chunk in document:
        chunk.page_content = chunk.page_content.replace('\n', ' ')

    # Divide document content into overlapping chunks.
    chunked_data = chunk_document(document)

    print("Deleting file")
    try:
        # Best-effort wipe of existing data before re-uploading.
        # NOTE(review): delete(delete_all=True) is a Pinecone-style call;
        # the Qdrant vector-store delete API differs -- confirm this clears
        # the collection instead of always raising into the except branch.
        qdrant_index.delete(delete_all=True)
        time.sleep(5)
    except Exception:  # narrowed from a bare except; keeps best-effort wipe
        print("Namespace is already empty")

    print("Uploading File to Database")
    # Upload the chunked data to the Qdrant collection.
    # NOTE(review): from_documents is a classmethod invoked via the instance;
    # it builds a fresh store and is called here without url/api_key -- verify
    # it actually reaches the cloud collection and not a local default.
    qdrant_index.from_documents(
        chunked_data,
        embeddings,
        prefer_grpc=False,
        collection_name="rag-documents",
    )
    print("Document Uploaded to Database")
    time.sleep(5)

    prompt = "What is the Title of the document and a small description of the content."
    description = response_generator(query=prompt, profession="Student")
    return description


def uploading_article_to_database(url):
    """Fetch, chunk and upload a web article into the Qdrant collection.

    Only <article> and <main> elements are parsed from the page; previously
    stored vectors are wiped first (best effort).

    Args:
        url (str): The URL of the web article to upload.

    Returns:
        str: An LLM-generated title and short description of the article.
    """
    # Restrict parsing to the elements that usually hold the article body.
    strainer = bs4.SoupStrainer(["article", "main"])
    loader = WebBaseLoader(
        web_path=url,
        bs_kwargs={"parse_only": strainer},
    )
    document = loader.load()

    # Collapse runs of blank lines left over from the HTML layout.
    document[0].page_content = document[0].page_content.replace("\n\n\n", " ").strip()
    document[0].page_content = document[0].page_content.replace("\n\n", " ").strip()

    chunked_data = chunk_article(document)

    print("Deleting previous data")
    try:
        # Best-effort wipe of existing data before re-uploading.
        # NOTE(review): same Pinecone-style delete call as in
        # uploading_document_to_database -- confirm against the Qdrant API.
        qdrant_index.delete(delete_all=True)
        time.sleep(5)
    except Exception:  # narrowed from a bare except; keeps best-effort wipe
        print("Namespace is already empty")

    print("Uploading Article to Database")
    # Upload the chunked data to the Qdrant collection.
    qdrant_index.from_documents(
        chunked_data,
        embeddings,
        url=url,
        prefer_grpc=False,
        collection_name="rag-documents",
    )
    print("Article Uploaded to Database")
    time.sleep(15)

    prompt = "What is the Title of the document and a small description of the content."
    description = response_generator(query=prompt, profession="Student")
    return description


def retrieve_response_from_database(query, k=5):
    """Retrieve the most similar stored chunks for a query.

    Args:
        query (str): The input query used to search the Qdrant index.
        k (int, optional): Number of top results to return. Default is 5.

    Returns:
        list: The most similar Document chunks from the Qdrant index.
    """
    results = qdrant_index.similarity_search(query, k=k)
    return results


def response_generator(query, profession):
    """Generate an answer to a query using retrieved context and the LLM.

    Args:
        query (str): The user's question, used both for retrieval and as
            input to the LLM chain.
        profession (str): The user's profession, injected into the prompt to
            tailor the answer.

    Returns:
        str: The generated answer, or an error message if retrieval or
            generation fails.
    """
    try:
        results = retrieve_response_from_database(query)
        print("results", results)
        # Invoke the chain with the retrieved context and the original query.
        answer = chain.invoke(
            input={"profession": profession, "context": results, "user_query": query}
        )
    except Exception as e:
        # Surface the failure to the caller as a message instead of raising.
        answer = f"Sorry, I am unable to find the answer to your query. Please try again later. The error is {e}"
    return answer


app = FastAPI()

# Wide-open CORS: any origin/method/header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/get_response")
def root(query: str, profession: str):
    """GET endpoint returning a generated answer for a user's query.

    Args:
        query (str): The user's query, passed as a query parameter.
        profession (str): The user's profession, passed as a query parameter.

    Returns:
        JSONResponse: ``{"answer": <generated answer>}``.
    """
    print("User_query : " + query)
    answer = response_generator(query, profession)
    return JSONResponse(content={"answer": answer})


@app.post("/upload_document")
def upload_document(file_bytes: bytes = File(...)):
    """POST endpoint uploading a PDF document into the Qdrant index.

    Args:
        file_bytes (bytes): Raw bytes of the uploaded PDF file.

    Returns:
        dict: ``{"status": <description or error message>}``.
    """
    try:
        # Persist the upload to a temporary path for PyPDFLoader.
        with open("/tmp/document.pdf", "wb") as f:
            f.write(file_bytes)

        description = uploading_document_to_database("/tmp/document.pdf")
        # Forward the description to the companion agent service;
        # the response is intentionally unchecked (fire-and-forget).
        response = requests.post(
            "http://0.0.0.0:8080/send_desc", json={"description": description}
        )
        return {"status": description}
    except Exception as e:
        return {"status": f"Error uploading file: {e}"}


@app.post("/upload_article")
def upload_article(url: str):
    """POST endpoint uploading a web article into the Qdrant index.

    Args:
        url (str): The URL of the web article.

    Returns:
        dict: ``{"status": <description or error message>}``.
    """
    try:
        print("URL to server : ", url)
        # Upload the article and obtain its generated description.
        description = uploading_article_to_database(url)
        # Forward the description to the companion agent service;
        # the response is intentionally unchecked (fire-and-forget).
        response = requests.post(
            "http://0.0.0.0:8080/send_desc", json={"description": description}
        )
        print("type(description) : ", type(description))
        return {"status": description}
    except Exception as e:
        return {"status": f"Error uploading file: {e}"}


if __name__ == "__main__":
    """Service bootstrap.

    - Loads environment variables (.env).
    - Initializes the embedding model used for chunk vectors and retrieval.
    - Connects to the Qdrant collection holding document embeddings.
    - Sets up the Gemini LLM and the retrieval->generation chain.
    - Starts the FastAPI server on 0.0.0.0:8000.
    """
    # Load environment variables from the .env file.
    load_dotenv()
    QDRANT_CLOUD_KEY = os.getenv(key="QDRANT_CLOUD_KEY")

    # Embedding model for creating document vectors.
    embeddings = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004")

    # NOTE(review): `url` and `index_name` are assigned but never used --
    # creating_qdrant_index() hard-codes its own URL and the collection name
    # is "rag-documents" everywhere else. Kept to preserve module state;
    # consider consolidating.
    url = "https://a2790d6c-f701-4a62-a57c-81d8fb4558f8.us-east-1-0.aws.cloud.qdrant.io"
    index_name = "rag-chatbot"

    # Connect to the Qdrant collection using the embedding model.
    qdrant_index = creating_qdrant_index(embeddings)

    # LLM for response generation; temperature 0.6 balances fidelity/variety.
    llm = GoogleGenerativeAI(model="gemini-1.5-flash", temperature=0.6)

    # Prompt template combining profession, retrieved context and the query.
    prompt_template = PromptTemplate(
        template="I am {profession}. You have to provide a good information regarding my query. This is the information from my document : {context}. Here is my query for you: {user_query}. Answer in a proper markdown format.",
        input_variables=["profession", "context", "user_query"],
    )

    # Chain that stuffs retrieved documents into the prompt for the LLM.
    chain = create_stuff_documents_chain(llm, prompt_template, document_variable_name="context")

    # Start the FastAPI server with Uvicorn on 0.0.0.0:8000.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)