# RAG-ChatBOT / api.py
# aadil732's picture
# Update endpoint
# 72a7cb8 verified
from fastapi import FastAPI, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
import time
import os
import requests
import bs4
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings, GoogleGenerativeAI
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import PromptTemplate
def chunk_document(document, chunk_size=600, chunk_overlap=80):
    """
    Divides the document into smaller, overlapping chunks for better processing efficiency.

    Args:
        document (list): A list of Documents fetched from the source file.
        chunk_size (int, optional): The maximum number of characters in a chunk
            (RecursiveCharacterTextSplitter measures length in characters, not
            words). Default is 600.
        chunk_overlap (int, optional): The number of overlapping characters
            between consecutive chunks. Default is 80.

    Returns:
        list: A list of Document chunks with the specified size and overlap.
    """
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    return text_splitter.split_documents(document)
def chunk_article(document, chunk_size=600, chunk_overlap=80):
    """
    Divides the article text into smaller, overlapping chunks for better processing efficiency.

    Args:
        document (list): A list of Documents loaded from the article URL.
        chunk_size (int, optional): The maximum number of characters in a chunk.
            Default is 600.
        chunk_overlap (int, optional): The number of overlapping characters
            between consecutive chunks. Default is 80.

    Returns:
        list: A list of Document chunks with the specified size and overlap.
    """
    # Behaviour is identical to chunk_document (same splitter, same defaults);
    # delegate instead of duplicating the splitting code.
    return chunk_document(document, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
def creating_qdrant_index(embeddings, api_key=None):
    """
    Connects to the existing "rag-documents" Qdrant Cloud collection.

    Args:
        embeddings (object): The embedding model used to generate vector embeddings.
        api_key (str, optional): Qdrant Cloud API key. Falls back to the
            module-level QDRANT_CLOUD_KEY when omitted.

    Returns:
        QdrantVectorStore: A vector-store handle bound to the collection.

    Raises:
        NameError: If called without api_key before QDRANT_CLOUD_KEY is defined
            (it is only set inside the `if __name__ == "__main__"` block).
    """
    vector_store = QdrantVectorStore.from_existing_collection(
        url="https://a2790d6c-f701-4a62-a57c-81d8fb4558f8.us-east-1-0.aws.cloud.qdrant.io",
        collection_name="rag-documents",
        embedding=embeddings,
        api_key=api_key if api_key is not None else QDRANT_CLOUD_KEY,
        prefer_grpc=False,
    )
    return vector_store
def uploading_document_to_database(directory):
    """
    Uploads a PDF document to the Qdrant index after loading and chunking it.

    All previously stored vectors are deleted first, so the index only ever
    holds one document at a time.

    Args:
        directory (str): The file path of the PDF document that will be uploaded.

    Returns:
        str: An LLM-generated title and short description of the document.
    """
    print("Loading PDF : ", directory)
    pdf_loader = PyPDFLoader(directory)
    document = pdf_loader.load()
    # PDF extraction breaks sentences across lines; flatten newlines to spaces.
    for page in document:
        page.page_content = page.page_content.replace('\n', ' ')
    # Dividing document content into chunks
    chunked_data = chunk_document(document)
    print("Deleting file")
    try:
        # Deleting all existing data in the Qdrant collection.
        qdrant_index.delete(delete_all=True)
        time.sleep(5)
    except Exception:
        # Narrowed from a bare `except:`; a failed delete just means the
        # collection was already empty.
        print("Namespace is already empty")
    print("Uploading File to Database")
    # Uploading the chunked data to the Qdrant index
    qdrant_index.from_documents(
        chunked_data,
        embeddings,
        prefer_grpc=False,
        collection_name="rag-documents",
    )
    print("Document Uploaded to Database")
    time.sleep(5)
    prompt = "What is the Title of the document and a small description of the content."
    description = response_generator(query=prompt, profession="Student")
    return description
def uploading_article_to_database(url):
    """
    Fetches a web article, chunks it, and uploads it to the Qdrant index.

    All previously stored vectors are deleted first, so the index only ever
    holds one article at a time.

    Args:
        url (str): The URL of the web article to ingest.

    Returns:
        str: An LLM-generated title and short description of the article.
    """
    # Only parse <article>/<main> content, skipping navigation and boilerplate.
    strainer = bs4.SoupStrainer(["article", "main"])
    loader = WebBaseLoader(
        web_path=url,
        bs_kwargs={"parse_only": strainer},
    )
    document = loader.load()
    # Collapse the blank-line runs left over from HTML extraction.
    document[0].page_content = document[0].page_content.replace("\n\n\n", " ").strip()
    document[0].page_content = document[0].page_content.replace("\n\n", " ").strip()
    chunked_data = chunk_article(document)
    print("Deleting previous data")
    try:
        # Deleting all existing data in the Qdrant collection.
        qdrant_index.delete(delete_all=True)
        time.sleep(5)
    except Exception:
        # Narrowed from a bare `except:`; a failed delete just means the
        # collection was already empty.
        print("Namespace is already empty")
    print("Uploading Article to Database")
    # Uploading the chunked data to Qdrant index
    qdrant_index.from_documents(
        chunked_data,
        embeddings,
        url=url,
        prefer_grpc=False,
        collection_name="rag-documents",
    )
    print("Article Uploaded to Database")
    time.sleep(15)
    prompt = "What is the Title of the document and a small description of the content."
    description = response_generator(query=prompt, profession="Student")
    return description
def retrieve_response_from_database(query, k=5):
    """
    Fetches the most similar stored chunks from the Qdrant index for a query.

    Args:
        query (str): The search text matched against the stored vectors.
        k (int, optional): The number of top matches to return. Default is 5.

    Returns:
        list: The most similar Document chunks from the Qdrant index.
    """
    return qdrant_index.similarity_search(query, k=k)
def response_generator(query, profession):
    """
    Answers a query using context retrieved from the Qdrant index and the LLM chain.

    Args:
        query (str): The user's question, used both for retrieval and generation.
        profession (str): The user's profession, injected into the prompt.

    Returns:
        str: The generated answer, or an apology message if retrieval or
        generation fails.
    """
    try:
        context_docs = retrieve_response_from_database(query)
        print("results", context_docs)
        # Feed the retrieved documents and the original query through the chain.
        answer = chain.invoke(
            input={"profession": profession, "context": context_docs, "user_query": query}
        )
    except Exception as error:
        # Best-effort boundary: report the failure as the answer text.
        answer = f"Sorry, I am unable to find the answer to your query. Please try again later. The error is {error}"
    return answer
# FastAPI application instance serving the RAG endpoints defined below.
app = FastAPI()
# Allow cross-origin requests from any frontend.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers for credentialed requests — confirm whether credentials
# are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/get_response")
def root(query: str, profession: str):
    """
    GET endpoint that returns a generated answer for the user's query.

    Args:
        query (str): The user's question, passed as a query parameter.
        profession (str): The user's profession, passed as a query parameter.

    Returns:
        JSONResponse: A JSON body of the form {"answer": <generated response>}.
    """
    print("User_query : " + query)
    return JSONResponse(content={"answer": response_generator(query, profession)})
@app.post("/upload_document")
def upload_document(file_bytes: bytes = File(...)):
    """
    POST endpoint that uploads a PDF document to the Qdrant index.

    Args:
        file_bytes (bytes): The raw bytes of the PDF file to index.

    Returns:
        dict: {"status": <description of the uploaded document>} on success,
        or {"status": "Error uploading file: ..."} on failure.
    """
    try:
        # Persist the upload so PyPDFLoader can read it from disk.
        with open("/tmp/document.pdf", "wb") as f:
            f.write(file_bytes)
        description = uploading_document_to_database("/tmp/document.pdf")
        # Forward the generated description to the companion agent service;
        # the response body is intentionally unused (was an unused local).
        requests.post("http://0.0.0.0:8080/send_desc", json={"description": description})
        return {"status": description}
    except Exception as e:
        # Endpoint boundary: surface the failure to the client instead of a 500.
        return {"status": f"Error uploading file: {e}"}
@app.post("/upload_article")
def upload_article(url: str):
    """
    POST endpoint that ingests a web article into the Qdrant index.

    Args:
        url (str): The URL of the web article to index.

    Returns:
        dict: {"status": <description of the uploaded article>} on success,
        or {"status": "Error uploading file: ..."} on failure.
    """
    try:
        print("URL to server : ", url)
        # Uploading process of article and getting description
        description = uploading_article_to_database(url)
        # Forward the description to the AI agents service; the response body
        # is intentionally unused (was an unused local).
        requests.post("http://0.0.0.0:8080/send_desc", json={"description": description})
        print("type(description) : ", type(description))
        # Returning the description of the article
        return {"status": description}
    except Exception as e:
        # Endpoint boundary: surface the failure to the client instead of a 500.
        return {"status": f"Error uploading file: {e}"}
if __name__ == "__main__":
    """
    Initializes the FastAPI server, loads environment variables, creates an embedding model and Qdrant index,
    uploads a document for processing, and sets up a language model for generating responses.
    This block of code performs the following tasks:
    - Loads environment variables.
    - Initializes the embedding model for Qdrant chunking and retrieval.
    - Creates a Qdrant index to store document embeddings.
    - Uploads a specific PDF document to the Qdrant index for later query-based retrieval.
    - Sets up a language model (LLM) for generating human-like responses.
    - Defines the system prompt and response behavior for the assistant.
    - Sets up a chain that combines document retrieval with response generation.
    - Starts the FastAPI server on host `0.0.0.0` at port 8000.
    """
    # NOTE(review): QDRANT_CLOUD_KEY, embeddings, qdrant_index, and chain are
    # module-level names the endpoint handlers depend on, yet they are only
    # created when the file runs as a script. Launching with an external
    # server (e.g. `uvicorn api:app`) would leave them undefined — confirm
    # the intended deployment mode.
    # Loading environment variables from .env file
    load_dotenv()
    QDRANT_CLOUD_KEY = os.getenv(key="QDRANT_CLOUD_KEY")
    # Initializing embedding model for creating document vectors
    embeddings = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004")
    # NOTE(review): `url` and `index_name` below are never used — and
    # "rag-chatbot" differs from the "rag-documents" collection actually used
    # by creating_qdrant_index; verify which name is correct.
    url = "https://a2790d6c-f701-4a62-a57c-81d8fb4558f8.us-east-1-0.aws.cloud.qdrant.io"
    # Qdrant index name for storing document embeddings
    index_name = "rag-chatbot"
    # Creating Qdrant index using the embedding model
    qdrant_index = creating_qdrant_index(embeddings)
    # Initializing the LLM with the 'gemini-1.5-flash' model and a specified temperature for response generation
    llm = GoogleGenerativeAI(model="gemini-1.5-flash", temperature=0.6)
    # Creating a prompt template for generating responses based on retrieved content and human input
    prompt_template = PromptTemplate(
        template="I am {profession}. You have to provide a good information regarding my query. This is the information from my document : {context}. Here is my query for you: {user_query}. Answer in a proper markdown format.",
        input_variables=["profession", "context", "user_query"]
    )
    # Setting up the document processing chain for response generation based on retrieved documents
    chain = create_stuff_documents_chain(llm, prompt_template, document_variable_name="context")
    # Starting the FastAPI server with Uvicorn, accessible at 0.0.0.0 on port 8000
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)