# sbburaflaskapi / chatbot_service.py
# (provenance: Hugging Face Space upload by shakeel143, commit 5b34c8d)
# chatbot_service.py
import os
import pickle
import logging
from typing import List
from fastapi import HTTPException, UploadFile, File
from PyPDF2 import PdfReader
from langchain_community.vectorstores import FAISS
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from drive_service import DriveService
from config import BASE_MODEL_PATH, GOOGLE_DRIVE_FOLDER_ID
logger = logging.getLogger(__name__)
class ChatbotService:
    """Builds and updates PDF-backed FAISS chatbots and syncs their artifacts to Google Drive."""

    def __init__(self):
        # Single Drive client reused for every folder/upload/download operation.
        self.drive_service = DriveService()
def create_chatbot(self, folder_name: str, pdf_files: List[UploadFile]):
"""Create a new chatbot using only PDF files."""
try:
# Validate input
if not pdf_files:
raise HTTPException(status_code=400, detail="At least one PDF file is required.")
# Create folder for the chatbot
folder_path = os.path.join(CUSTOM_CHATBOTS_DIR, folder_name)
os.makedirs(folder_path, exist_ok=True)
# Process PDFs
raw_text = ""
for pdf_file in pdf_files:
pdf_reader = PdfReader(pdf_file.file)
for page in pdf_reader.pages:
raw_text += page.extract_text() or ""
# Split text into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
text_chunks = text_splitter.split_text(raw_text)
# Create vector store
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
vector_store = FAISS.from_texts(text_chunks, embedding=embeddings)
# Save the vector store and documents
vector_store.save_local(os.path.join(folder_path, "faiss_index"))
with open(os.path.join(folder_path, "documents.pkl"), 'wb') as f:
pickle.dump(text_chunks, f)
# Upload files to Google Drive
# Step 1: Create or find the folder with the provided `folder_name`
chatbot_folder_id = self.drive_service.find_folder_by_name(folder_name, GOOGLE_DRIVE_FOLDER_ID)
if not chatbot_folder_id:
chatbot_folder_id = self.drive_service.create_folder_in_google_drive(folder_name, GOOGLE_DRIVE_FOLDER_ID)
# Step 2: Create or find the `faiss_index` subfolder inside the `folder_name` folder
faiss_index_folder_id = self.drive_service.find_folder_by_name("faiss_index", chatbot_folder_id)
if not faiss_index_folder_id:
faiss_index_folder_id = self.drive_service.create_folder_in_google_drive("faiss_index", chatbot_folder_id)
# Step 3: Upload index.faiss and index.pkl to the `faiss_index` subfolder
index_pkl_path = os.path.join(folder_path, "faiss_index", "index.pkl")
os.rename(os.path.join(folder_path, "documents.pkl"), index_pkl_path)
faiss_index_path = os.path.join(folder_path, "faiss_index", "index.faiss")
self.drive_service.upload_file_to_google_drive(faiss_index_path, faiss_index_folder_id)
self.drive_service.upload_file_to_google_drive(index_pkl_path, faiss_index_folder_id)
return {
"status": "success",
"message": f"Chatbot '{folder_name}' created and files uploaded to Google Drive successfully!"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating chatbot: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to create chatbot: {str(e)}")
# def update_chatbot(self, folder_name: str, pdf_files: List[UploadFile]):
# """Update an existing chatbot by appending new data (PDF files)."""
# try:
# # Validate input
# if not pdf_files:
# raise HTTPException(status_code=400, detail="At least one PDF file is required.")
# # First verify the Drive structure
# logger.info("Verifying Google Drive structure...")
# is_valid, message = self.drive_service.verify_drive_structure(folder_name)
# if not is_valid:
# raise HTTPException(status_code=404, detail=f"Invalid Drive structure: {message}")
# # First, download the existing files from Google Drive
# logger.info(f"Downloading existing model files for: {folder_name}")
# self.drive_service.download_model_files_from_subfolder(
# parent_folder_id=GOOGLE_DRIVE_FOLDER_ID,
# subfolder_name=folder_name
# )
# # Verify files exist after download
# if not self.verify_files_exist(folder_name):
# raise HTTPException(
# status_code=404,
# detail="Required files not found after download. Please check if the chatbot exists in Google Drive."
# )
# # Now check if the chatbot folder exists
# folder_path = os.path.join(BASE_MODEL_PATH, folder_name)
# logger.info(f"Checking for chatbot folder at: {folder_path}")
# if not os.path.exists(folder_path):
# logger.error(f"Chatbot folder '{folder_name}' not found at: {folder_path}")
# raise HTTPException(status_code=404, detail=f"Chatbot '{folder_name}' not found.")
# # Process new PDFs
# raw_text = ""
# for pdf_file in pdf_files:
# pdf_reader = PdfReader(pdf_file.file)
# for page in pdf_reader.pages:
# raw_text += page.extract_text() or ""
# # Split new text into chunks
# text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
# new_text_chunks = text_splitter.split_text(raw_text)
# # Load the existing vector store
# embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
# vector_store = FAISS.load_local(
# os.path.join(folder_path, "faiss_index"),
# embeddings,
# allow_dangerous_deserialization=True
# )
# # Add new text chunks to the existing vector store
# vector_store.add_texts(new_text_chunks)
# # Save the updated vector store
# vector_store.save_local(os.path.join(folder_path, "faiss_index"))
# # Update the documents.pkl file with the new text chunks
# documents_path = os.path.join(folder_path, "documents.pkl")
# if os.path.exists(documents_path):
# with open(documents_path, 'rb') as f:
# existing_text_chunks = pickle.load(f)
# updated_text_chunks = existing_text_chunks + new_text_chunks
# else:
# updated_text_chunks = new_text_chunks
# with open(documents_path, 'wb') as f:
# pickle.dump(updated_text_chunks, f)
# # Upload updated files to Google Drive
# chatbot_folder_id = self.drive_service.find_folder_by_name(folder_name, GOOGLE_DRIVE_FOLDER_ID)
# if not chatbot_folder_id:
# raise HTTPException(status_code=404, detail=f"Chatbot '{folder_name}' not found in Google Drive.")
# faiss_index_folder_id = self.drive_service.find_folder_by_name("faiss_index", chatbot_folder_id)
# if not faiss_index_folder_id:
# raise HTTPException(status_code=404, detail=f"'faiss_index' folder not found for chatbot '{folder_name}'.")
# # Upload updated index.faiss and index.pkl to Google Drive
# index_pkl_path = os.path.join(folder_path, "faiss_index", "index.pkl")
# faiss_index_path = os.path.join(folder_path, "faiss_index", "index.faiss")
# self.drive_service.upload_file_to_google_drive(faiss_index_path, faiss_index_folder_id)
# self.drive_service.upload_file_to_google_drive(index_pkl_path, faiss_index_folder_id)
# return {
# "status": "success",
# "message": f"Chatbot '{folder_name}' updated successfully!"
# }
# except HTTPException:
# raise
# except Exception as e:
# logger.error(f"Error updating chatbot: {str(e)}")
# raise HTTPException(status_code=500, detail=f"Failed to update chatbot: {str(e)}")
def update_chatbot(self, folder_name: str, pdf_files: List[UploadFile]):
"""Update an existing chatbot by appending new data (PDF files)."""
try:
# Validate input
if not pdf_files:
raise HTTPException(status_code=400, detail="At least one PDF file is required.")
# Verify the Google Drive structure
logger.info("Verifying Google Drive structure...")
is_valid, message = self.drive_service.verify_drive_structure(folder_name)
if not is_valid:
raise HTTPException(status_code=404, detail=f"Invalid Drive structure: {message}")
# Download existing model files from Google Drive
logger.info(f"Downloading existing model files for: {folder_name}")
self.drive_service.download_model_files_from_subfolder(
parent_folder_id=GOOGLE_DRIVE_FOLDER_ID,
subfolder_name=folder_name
)
# Verify files exist after download
folder_path = os.path.join(BASE_MODEL_PATH, folder_name)
if not self.verify_files_exist(folder_name):
raise HTTPException(
status_code=404,
detail="Required files not found after download. Please check if the chatbot exists in Google Drive."
)
# Process new PDFs
raw_text = ""
for pdf_file in pdf_files:
pdf_reader = PdfReader(pdf_file.file)
for page in pdf_reader.pages:
raw_text += page.extract_text() or ""
# Split new text into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
new_text_chunks = text_splitter.split_text(raw_text)
# Load the existing vector store
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
vector_store_path = os.path.join(folder_path, "faiss_index")
if os.path.exists(vector_store_path):
# Load the existing FAISS index
logger.info("Loading existing FAISS index...")
vector_store = FAISS.load_local(
vector_store_path,
embeddings,
allow_dangerous_deserialization=True
)
else:
# If no existing FAISS index, create a new one
logger.info("No existing FAISS index found. Creating a new one...")
vector_store = FAISS.from_texts(new_text_chunks, embedding=embeddings)
# Append new text chunks to the existing vector store
logger.info("Appending new text chunks to the vector store...")
vector_store.add_texts(new_text_chunks)
# Save the updated vector store
logger.info("Saving the updated vector store...")
vector_store.save_local(vector_store_path)
# Update the documents.pkl file with the new text chunks
documents_path = os.path.join(folder_path, "documents.pkl")
if os.path.exists(documents_path):
logger.info("Loading existing documents.pkl...")
with open(documents_path, 'rb') as f:
existing_text_chunks = pickle.load(f)
updated_text_chunks = existing_text_chunks + new_text_chunks
else:
logger.info("No existing documents.pkl found. Creating a new one...")
updated_text_chunks = new_text_chunks
logger.info("Saving updated documents.pkl...")
with open(documents_path, 'wb') as f:
pickle.dump(updated_text_chunks, f)
# Upload updated files to Google Drive
logger.info("Uploading updated files to Google Drive...")
chatbot_folder_id = self.drive_service.find_folder_by_name(folder_name, GOOGLE_DRIVE_FOLDER_ID)
if not chatbot_folder_id:
raise HTTPException(status_code=404, detail=f"Chatbot '{folder_name}' not found in Google Drive.")
faiss_index_folder_id = self.drive_service.find_folder_by_name("faiss_index", chatbot_folder_id)
if not faiss_index_folder_id:
raise HTTPException(status_code=404, detail=f"'faiss_index' folder not found for chatbot '{folder_name}'.")
# Upload updated index.faiss and index.pkl to Google Drive
index_pkl_path = os.path.join(folder_path, "faiss_index", "index.pkl")
faiss_index_path = os.path.join(folder_path, "faiss_index", "index.faiss")
self.drive_service.upload_file_to_google_drive(faiss_index_path, faiss_index_folder_id)
self.drive_service.upload_file_to_google_drive(index_pkl_path, faiss_index_folder_id)
return {
"status": "success",
"message": f"Chatbot '{folder_name}' updated successfully!"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating chatbot: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to update chatbot: {str(e)}")
def verify_files_exist(self, folder_name: str):
"""Verify that necessary files exist before operations."""
base_path = os.path.join(BASE_MODEL_PATH, folder_name, "faiss_index")
files_to_check = ["index.faiss", "index.pkl"]
logger.info(f"Verifying files in: {base_path}")
# First check if the directory exists
if not os.path.exists(base_path):
logger.error(f"Directory not found: {base_path}")
return False
for file in files_to_check:
file_path = os.path.join(base_path, file)
if not os.path.exists(file_path):
logger.error(f"Required file not found: {file_path}")
return False
logger.info("All required files found")
return True