# chatbot_service.py
"""Service for creating and updating FAISS-backed chatbots from uploaded PDFs.

Index artifacts are written locally under ``BASE_MODEL_PATH/<folder_name>`` and
mirrored to a Google Drive folder (via DriveService) with the layout:

    <folder_name>/faiss_index/index.faiss
    <folder_name>/faiss_index/index.pkl
"""
import os
import pickle
import logging
from typing import List
from fastapi import HTTPException, UploadFile, File
from PyPDF2 import PdfReader
from langchain_community.vectorstores import FAISS
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from drive_service import DriveService
from config import BASE_MODEL_PATH, GOOGLE_DRIVE_FOLDER_ID

logger = logging.getLogger(__name__)


class ChatbotService:
    """Create/update chatbots: PDF text extraction, FAISS indexing, Drive sync."""

    def __init__(self):
        self.drive_service = DriveService()

    @staticmethod
    def _extract_text_chunks(pdf_files: List[UploadFile]) -> List[str]:
        """Extract all text from the uploaded PDFs and split it into chunks.

        Pages with no extractable text (e.g. scanned images) contribute "".
        Chunk sizes mirror the original hard-coded settings used by both
        create and update flows.
        """
        raw_text = ""
        for pdf_file in pdf_files:
            pdf_reader = PdfReader(pdf_file.file)
            for page in pdf_reader.pages:
                raw_text += page.extract_text() or ""
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
        return text_splitter.split_text(raw_text)

    def create_chatbot(self, folder_name: str, pdf_files: List[UploadFile]):
        """Create a new chatbot using only PDF files.

        Builds a FAISS index from the PDFs, saves it (plus the raw text chunks
        in documents.pkl) locally, then uploads the index files to Google Drive.

        Raises:
            HTTPException 400: no PDF files supplied.
            HTTPException 500: any other failure.
        """
        try:
            # Validate input
            if not pdf_files:
                raise HTTPException(status_code=400, detail="At least one PDF file is required.")

            # BUG FIX: the original referenced an undefined CUSTOM_CHATBOTS_DIR
            # (NameError). Use BASE_MODEL_PATH, consistent with update_chatbot
            # and verify_files_exist.
            folder_path = os.path.join(BASE_MODEL_PATH, folder_name)
            os.makedirs(folder_path, exist_ok=True)

            # Process PDFs into text chunks
            text_chunks = self._extract_text_chunks(pdf_files)

            # Create and persist the vector store
            embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
            vector_store = FAISS.from_texts(text_chunks, embedding=embeddings)
            # save_local writes faiss_index/index.faiss and faiss_index/index.pkl
            # (index.pkl holds the FAISS docstore needed by load_local).
            vector_store.save_local(os.path.join(folder_path, "faiss_index"))

            # Keep the raw chunks alongside, so update_chatbot can append to them.
            with open(os.path.join(folder_path, "documents.pkl"), 'wb') as f:
                pickle.dump(text_chunks, f)

            # Step 1: Create or find the Drive folder for this chatbot
            chatbot_folder_id = self.drive_service.find_folder_by_name(folder_name, GOOGLE_DRIVE_FOLDER_ID)
            if not chatbot_folder_id:
                chatbot_folder_id = self.drive_service.create_folder_in_google_drive(folder_name, GOOGLE_DRIVE_FOLDER_ID)

            # Step 2: Create or find the `faiss_index` subfolder inside it
            faiss_index_folder_id = self.drive_service.find_folder_by_name("faiss_index", chatbot_folder_id)
            if not faiss_index_folder_id:
                faiss_index_folder_id = self.drive_service.create_folder_in_google_drive("faiss_index", chatbot_folder_id)

            # Step 3: Upload index.faiss and index.pkl to the `faiss_index` subfolder.
            # BUG FIX: the original os.rename()d documents.pkl over
            # faiss_index/index.pkl, destroying the docstore save_local wrote and
            # corrupting the index for any later FAISS.load_local (as done in
            # update_chatbot). Upload the files save_local produced instead.
            faiss_index_path = os.path.join(folder_path, "faiss_index", "index.faiss")
            index_pkl_path = os.path.join(folder_path, "faiss_index", "index.pkl")
            self.drive_service.upload_file_to_google_drive(faiss_index_path, faiss_index_folder_id)
            self.drive_service.upload_file_to_google_drive(index_pkl_path, faiss_index_folder_id)

            return {
                "status": "success",
                "message": f"Chatbot '{folder_name}' created and files uploaded to Google Drive successfully!"
            }
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error creating chatbot: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to create chatbot: {str(e)}")

    def update_chatbot(self, folder_name: str, pdf_files: List[UploadFile]):
        """Update an existing chatbot by appending new data (PDF files).

        Downloads the current index from Google Drive, appends the new chunks,
        then re-uploads index.faiss / index.pkl.

        Raises:
            HTTPException 400: no PDF files supplied.
            HTTPException 404: chatbot/index missing locally or in Drive.
            HTTPException 500: any other failure.
        """
        try:
            # Validate input
            if not pdf_files:
                raise HTTPException(status_code=400, detail="At least one PDF file is required.")

            # Verify the Google Drive structure
            logger.info("Verifying Google Drive structure...")
            is_valid, message = self.drive_service.verify_drive_structure(folder_name)
            if not is_valid:
                raise HTTPException(status_code=404, detail=f"Invalid Drive structure: {message}")

            # Download existing model files from Google Drive
            logger.info(f"Downloading existing model files for: {folder_name}")
            self.drive_service.download_model_files_from_subfolder(
                parent_folder_id=GOOGLE_DRIVE_FOLDER_ID,
                subfolder_name=folder_name
            )

            # Verify files exist after download
            folder_path = os.path.join(BASE_MODEL_PATH, folder_name)
            if not self.verify_files_exist(folder_name):
                raise HTTPException(
                    status_code=404,
                    detail="Required files not found after download. Please check if the chatbot exists in Google Drive."
                )

            # Process new PDFs into text chunks
            new_text_chunks = self._extract_text_chunks(pdf_files)

            embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
            vector_store_path = os.path.join(folder_path, "faiss_index")
            if os.path.exists(vector_store_path):
                # Load the existing FAISS index and append the new chunks.
                logger.info("Loading existing FAISS index...")
                vector_store = FAISS.load_local(
                    vector_store_path,
                    embeddings,
                    allow_dangerous_deserialization=True
                )
                logger.info("Appending new text chunks to the vector store...")
                vector_store.add_texts(new_text_chunks)
            else:
                # BUG FIX: the original built the store with from_texts(new_text_chunks)
                # and then ALSO called add_texts(new_text_chunks), indexing every
                # chunk twice. A freshly created store already contains the chunks.
                logger.info("No existing FAISS index found. Creating a new one...")
                vector_store = FAISS.from_texts(new_text_chunks, embedding=embeddings)

            # Save the updated vector store
            logger.info("Saving the updated vector store...")
            vector_store.save_local(vector_store_path)

            # Update the documents.pkl file with the new text chunks
            documents_path = os.path.join(folder_path, "documents.pkl")
            if os.path.exists(documents_path):
                logger.info("Loading existing documents.pkl...")
                with open(documents_path, 'rb') as f:
                    existing_text_chunks = pickle.load(f)
                updated_text_chunks = existing_text_chunks + new_text_chunks
            else:
                logger.info("No existing documents.pkl found. Creating a new one...")
                updated_text_chunks = new_text_chunks
            logger.info("Saving updated documents.pkl...")
            with open(documents_path, 'wb') as f:
                pickle.dump(updated_text_chunks, f)

            # Upload updated files to Google Drive
            logger.info("Uploading updated files to Google Drive...")
            chatbot_folder_id = self.drive_service.find_folder_by_name(folder_name, GOOGLE_DRIVE_FOLDER_ID)
            if not chatbot_folder_id:
                raise HTTPException(status_code=404, detail=f"Chatbot '{folder_name}' not found in Google Drive.")
            faiss_index_folder_id = self.drive_service.find_folder_by_name("faiss_index", chatbot_folder_id)
            if not faiss_index_folder_id:
                raise HTTPException(status_code=404, detail=f"'faiss_index' folder not found for chatbot '{folder_name}'.")

            # Upload updated index.faiss and index.pkl to Google Drive
            index_pkl_path = os.path.join(folder_path, "faiss_index", "index.pkl")
            faiss_index_path = os.path.join(folder_path, "faiss_index", "index.faiss")
            self.drive_service.upload_file_to_google_drive(faiss_index_path, faiss_index_folder_id)
            self.drive_service.upload_file_to_google_drive(index_pkl_path, faiss_index_folder_id)

            return {
                "status": "success",
                "message": f"Chatbot '{folder_name}' updated successfully!"
            }
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error updating chatbot: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to update chatbot: {str(e)}")

    def verify_files_exist(self, folder_name: str) -> bool:
        """Verify that necessary index files exist before operations.

        Returns True only when BASE_MODEL_PATH/<folder_name>/faiss_index
        contains both index.faiss and index.pkl.
        """
        base_path = os.path.join(BASE_MODEL_PATH, folder_name, "faiss_index")
        files_to_check = ["index.faiss", "index.pkl"]
        logger.info(f"Verifying files in: {base_path}")

        # First check if the directory exists
        if not os.path.exists(base_path):
            logger.error(f"Directory not found: {base_path}")
            return False

        for file in files_to_check:
            file_path = os.path.join(base_path, file)
            if not os.path.exists(file_path):
                logger.error(f"Required file not found: {file_path}")
                return False

        logger.info("All required files found")
        return True