from fastapi import FastAPI, UploadFile, File, Form from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from langchain.chains import create_history_aware_retriever, create_retrieval_chain from langchain.chains.combine_documents import create_stuff_documents_chain from langchain_community.chat_message_histories import ChatMessageHistory from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_groq import ChatGroq from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_huggingface import HuggingFaceEmbeddings from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.document_loaders import PyPDFLoader from chromadb.config import Settings from langchain_chroma import Chroma import os from dotenv import load_dotenv import shutil # Load environment variables load_dotenv() os.environ['HF_TOKEN'] = os.getenv("HF_TOKEN" ) os.environ['GROQ_API_KEY'] = os.getenv("GROQ_API_KEY") # Initialize FastAPI app = FastAPI() origins = [ "https://codebug.lk", "http://localhost:3000" ] # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=origins, # Allows all origins, you can specify a list of allowed origins allow_credentials=True, allow_methods=["*"], # Allows all methods (GET, POST, PUT, DELETE, etc.) allow_headers=["*"], # Allows all headers ) # Initialize HuggingFace embeddings embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") # Initialize ChatGroq llm = ChatGroq(model_name="Deepseek-R1-Distill-Llama-70b") # Initialize session store session_store = {} class QuestionRequest(BaseModel): session_id: str question: str def process_pdf(file_path): loader = PyPDFLoader(file_path) documents = loader.load() text_splitter = RecursiveCharacterTextSplitter(chunk_size=5000, chunk_overlap=500) splits = text_splitter.split_documents(documents) vectorstore = Chroma.from_documents( documents=splits, embedding=embeddings, persist_directory=f"./falcon_db", # Directory for persistent storage client_settings=Settings( persist_directory=f"./falcon_db" # Only this field is required for persistence ) ) return vectorstore # Initialize Chroma with the pre-loaded PDF vectorstore = process_pdf('codebug.pdf') retriever = vectorstore.as_retriever() # @app.post("/upload_pdf/") # async def upload_pdf(file: UploadFile = File(...), session_id: str = Form(...)): # # Save uploaded PDF to a temporary file # temppdf = f"./temp_{session_id}.pdf" # with open(temppdf, "wb") as buffer: # shutil.copyfileobj(file.file, buffer) # # Load and parse the PDF # loader = PyPDFLoader(temppdf) # documents = loader.load() # # Split and create embeddings for the documents # text_splitter = RecursiveCharacterTextSplitter(chunk_size=5000, chunk_overlap=500) # splits = text_splitter.split_documents(documents) # # Initialize Chroma with persistence # vectorstore = Chroma.from_documents( # documents=splits, # embedding=embeddings, # persist_directory=f"./chroma_db_{session_id}", # Directory for persistent storage # client_settings=Settings( # persist_directory=f"./chroma_db_{session_id}" # Only this field is required for persistence # ) # ) # # Save the vector store for reuse # retriever = vectorstore.as_retriever() # # Define system prompts # contextualize_q_system_prompt = ( # "Given a chat history and the latest user question " # "which might reference context in the chat history, " # "formulate a standalone question which can be understood " # "without the chat history. Do NOT answer the question, " # "just reformulate it if needed and otherwise return it as is." # "Say that the question is out of your scope if you are asked questions out of the context of chat history" # ) # contextualize_q_prompt = ChatPromptTemplate.from_messages( # [ # ("system", contextualize_q_system_prompt), # MessagesPlaceholder("chat_history"), # ("human", "{input}"), # ] # ) # history_aware_retriever = create_history_aware_retriever(llm, retriever, contextualize_q_prompt) # system_prompt = ( # "You are an assistant for question-answering tasks. " # "Use the following pieces of retrieved context to answer " # "the question. If you don't know the answer, say that you " # "don't know. Use three sentences maximum and keep the " # "answer concise.\n\n{context}" # ) # qa_prompt = ChatPromptTemplate.from_messages( # [ # ("system", system_prompt), # MessagesPlaceholder("chat_history"), # ("human", "{input}"), # ] # ) # question_answer_chain = create_stuff_documents_chain(llm, qa_prompt) # rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain) # # Store the RAG chain in the session store # session_store[session_id] = { # "rag_chain": rag_chain, # "history": ChatMessageHistory() # } # return {"message": "PDF uploaded and processed successfully"} # @app.post("/ask_question/") # async def ask_question(request: QuestionRequest): # session_id = request.session_id # question = request.question # if session_id not in session_store: # return JSONResponse(status_code=400, content={"message": "Session ID not found"}) # rag_chain = session_store[session_id]["rag_chain"] # session_history = session_store[session_id]["history"] # # Retrieve only the last 6 messages from the session history # last_6_messages = session_history.messages[-6:] # # Pass the last 6 messages to the RAG chain # response = rag_chain.invoke( # {"input": question, "chat_history": last_6_messages}, # config={ # "configurable": {"session_id": session_id} # } # ) # # Update the session history # session_history.add_user_message(question) # session_history.add_user_message(response["answer"]) # return {"answer": response["answer"]} @app.post("/ask_question/") async def ask_question(request: QuestionRequest): session_id = request.session_id question = request.question if session_id not in session_store: retriever2 = retriever else: retriever2 = session_store[session_id]["retriever"] # Define system prompts contextualize_q_system_prompt = ( "Given a chat history and the latest user question " "which might reference context in the chat history, " "formulate a standalone question which can be understood " "without the chat history. Do NOT answer the question, " "just reformulate it if needed and otherwise return it as is. " "Say that the question is out of your scope ONLY if you are asked questions out of the context of chat history" ) contextualize_q_prompt = ChatPromptTemplate.from_messages( [ ("system", contextualize_q_system_prompt), MessagesPlaceholder("chat_history"), ("human", "{input}"), ] ) history_aware_retriever = create_history_aware_retriever(llm, retriever2, contextualize_q_prompt) system_prompt = ( "Your name is Falcon. You are a friendly personal AI assistant developed by Codebug Technologies" "for provide information related Codebug. " "the question. Use three sentences maximum and keep the " "answer concise and professional. " "Say that the question is out of your scope ONLY if you are asked questions out of the context of chat history." "Use the following pieces of retrieved context to answer :\n\n{context}" ) qa_prompt = ChatPromptTemplate.from_messages( [ ("system", system_prompt), MessagesPlaceholder("chat_history"), ("human", "{input}"), ] ) question_answer_chain = create_stuff_documents_chain(llm, qa_prompt) rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain) # Store the RAG chain in the session store if not already stored if session_id not in session_store: session_store[session_id] = { "rag_chain": rag_chain, "history": ChatMessageHistory(), "retriever": retriever2, } rag_chain = session_store[session_id]["rag_chain"] session_history = session_store[session_id]["history"] # Retrieve only the last 6 messages from the session history last_6_messages = session_history.messages[-6:] # Pass the last 6 messages to the RAG chain response = rag_chain.invoke( {"input": question, "chat_history": last_6_messages}, config={ "configurable": {"session_id": session_id} } ) # Update the session history session_history.add_user_message(question) session_history.add_message({"role": "assistant", "content": response["answer"]}) answer = response["answer"] end_tag = '' end_tag_pos = answer.find(end_tag) if end_tag_pos != -1: # Extract the string after the closing tag result = answer[end_tag_pos + len(end_tag):].strip() print(result) else: end_tag = '' end_tag_pos = answer.find(end_tag) if end_tag_pos != -1: # Extract the string after the closing tag result = answer[end_tag_pos + len(end_tag):].strip() print(result) else: result = answer print("Closing tag not found") return {"answer": result, "chat_history": last_6_messages} @app.get("/") def home(): return {"message": "Welcome to the Text Generation API"}