Spaces:
Running
Running
| # chatbot_service.py | |
| import os | |
| import pickle | |
| import logging | |
| from typing import List | |
| from fastapi import HTTPException, UploadFile, File | |
| from PyPDF2 import PdfReader | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from drive_service import DriveService | |
| from config import BASE_MODEL_PATH, GOOGLE_DRIVE_FOLDER_ID | |
| logger = logging.getLogger(__name__) | |
class ChatbotService:
    """Create and update PDF-backed chatbots.

    Each chatbot lives under ``BASE_MODEL_PATH/<folder_name>`` as a FAISS
    index (``faiss_index/index.faiss`` + ``faiss_index/index.pkl``) plus a
    ``documents.pkl`` containing the raw text chunks, and is mirrored to a
    Google Drive folder of the same name via :class:`DriveService`.
    """

    def __init__(self):
        # Single Drive client reused for all remote folder/file operations.
        self.drive_service = DriveService()

    @staticmethod
    def _extract_pdf_text(pdf_files: List[UploadFile]) -> str:
        """Concatenate the extractable text of every page of every uploaded PDF.

        Pages with no extractable text (e.g. scanned images) contribute "".
        """
        raw_text = ""
        for pdf_file in pdf_files:
            pdf_reader = PdfReader(pdf_file.file)
            for page in pdf_reader.pages:
                raw_text += page.extract_text() or ""
        return raw_text

    @staticmethod
    def _split_text(raw_text: str) -> List[str]:
        """Split raw text into overlapping chunks sized for embedding."""
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
        return text_splitter.split_text(raw_text)

    def _ensure_drive_folder(self, name: str, parent_id: str) -> str:
        """Return the Drive folder id for ``name`` under ``parent_id``, creating it if absent."""
        folder_id = self.drive_service.find_folder_by_name(name, parent_id)
        if not folder_id:
            folder_id = self.drive_service.create_folder_in_google_drive(name, parent_id)
        return folder_id

    def create_chatbot(self, folder_name: str, pdf_files: List[UploadFile]):
        """Create a new chatbot from PDF files and upload its index to Google Drive.

        Args:
            folder_name: Name of the chatbot; used as the local and Drive folder name.
            pdf_files: Uploaded PDF files to index.

        Returns:
            A dict with ``status`` and a human-readable ``message``.

        Raises:
            HTTPException: 400 if no PDFs were supplied; 500 on any processing failure.
        """
        try:
            if not pdf_files:
                raise HTTPException(status_code=400, detail="At least one PDF file is required.")

            # Local working directory for this chatbot.
            # BUGFIX: was CUSTOM_CHATBOTS_DIR, a name that is never imported in
            # this module (NameError at runtime). BASE_MODEL_PATH is the base
            # directory used by update_chatbot() and verify_files_exist().
            folder_path = os.path.join(BASE_MODEL_PATH, folder_name)
            os.makedirs(folder_path, exist_ok=True)

            # Extract and chunk the PDF text.
            raw_text = self._extract_pdf_text(pdf_files)
            text_chunks = self._split_text(raw_text)

            # Build the vector store from the chunks.
            embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
            vector_store = FAISS.from_texts(text_chunks, embedding=embeddings)

            # save_local() writes index.faiss + index.pkl (the FAISS docstore).
            vector_store.save_local(os.path.join(folder_path, "faiss_index"))

            # Keep the raw chunks in a separate documents.pkl.
            # BUGFIX: the previous version os.rename()d documents.pkl over
            # faiss_index/index.pkl, clobbering the FAISS docstore that
            # save_local() had just written — FAISS.load_local() in
            # update_chatbot() would then fail on the corrupted pickle.
            with open(os.path.join(folder_path, "documents.pkl"), 'wb') as f:
                pickle.dump(text_chunks, f)

            # Mirror the index to Drive under <folder_name>/faiss_index.
            chatbot_folder_id = self._ensure_drive_folder(folder_name, GOOGLE_DRIVE_FOLDER_ID)
            faiss_index_folder_id = self._ensure_drive_folder("faiss_index", chatbot_folder_id)

            index_dir = os.path.join(folder_path, "faiss_index")
            self.drive_service.upload_file_to_google_drive(
                os.path.join(index_dir, "index.faiss"), faiss_index_folder_id
            )
            self.drive_service.upload_file_to_google_drive(
                os.path.join(index_dir, "index.pkl"), faiss_index_folder_id
            )

            return {
                "status": "success",
                "message": f"Chatbot '{folder_name}' created and files uploaded to Google Drive successfully!"
            }
        except HTTPException:
            # Re-raise client/404-style errors untouched.
            raise
        except Exception as e:
            logger.error(f"Error creating chatbot: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to create chatbot: {str(e)}")

    def update_chatbot(self, folder_name: str, pdf_files: List[UploadFile]):
        """Update an existing chatbot by appending new data (PDF files).

        Downloads the chatbot's current index from Google Drive, appends the
        new chunks to the FAISS index and to documents.pkl, then uploads the
        refreshed index files back to Drive.

        Args:
            folder_name: Name of the existing chatbot.
            pdf_files: Uploaded PDF files whose text is appended to the index.

        Returns:
            A dict with ``status`` and a human-readable ``message``.

        Raises:
            HTTPException: 400 if no PDFs were supplied; 404 if the chatbot or
                its Drive structure is missing; 500 on any processing failure.
        """
        try:
            if not pdf_files:
                raise HTTPException(status_code=400, detail="At least one PDF file is required.")

            # Verify the Google Drive structure before doing any work.
            logger.info("Verifying Google Drive structure...")
            is_valid, message = self.drive_service.verify_drive_structure(folder_name)
            if not is_valid:
                raise HTTPException(status_code=404, detail=f"Invalid Drive structure: {message}")

            # Pull the current model files down from Drive.
            logger.info(f"Downloading existing model files for: {folder_name}")
            self.drive_service.download_model_files_from_subfolder(
                parent_folder_id=GOOGLE_DRIVE_FOLDER_ID,
                subfolder_name=folder_name
            )

            folder_path = os.path.join(BASE_MODEL_PATH, folder_name)
            if not self.verify_files_exist(folder_name):
                raise HTTPException(
                    status_code=404,
                    detail="Required files not found after download. Please check if the chatbot exists in Google Drive."
                )

            # Extract and chunk the new PDF text.
            new_text_chunks = self._split_text(self._extract_pdf_text(pdf_files))

            embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
            vector_store_path = os.path.join(folder_path, "faiss_index")
            if os.path.exists(vector_store_path):
                logger.info("Loading existing FAISS index...")
                # allow_dangerous_deserialization: index.pkl is a pickle this
                # service wrote itself, not untrusted input.
                vector_store = FAISS.load_local(
                    vector_store_path,
                    embeddings,
                    allow_dangerous_deserialization=True
                )
                logger.info("Appending new text chunks to the vector store...")
                vector_store.add_texts(new_text_chunks)
            else:
                logger.info("No existing FAISS index found. Creating a new one...")
                # BUGFIX: the previous version also called add_texts() on this
                # freshly built store, embedding and storing every new chunk twice.
                vector_store = FAISS.from_texts(new_text_chunks, embedding=embeddings)

            logger.info("Saving the updated vector store...")
            vector_store.save_local(vector_store_path)

            # documents.pkl accumulates every chunk ever ingested for this chatbot.
            documents_path = os.path.join(folder_path, "documents.pkl")
            if os.path.exists(documents_path):
                logger.info("Loading existing documents.pkl...")
                with open(documents_path, 'rb') as f:
                    existing_text_chunks = pickle.load(f)
                updated_text_chunks = existing_text_chunks + new_text_chunks
            else:
                logger.info("No existing documents.pkl found. Creating a new one...")
                updated_text_chunks = new_text_chunks
            logger.info("Saving updated documents.pkl...")
            with open(documents_path, 'wb') as f:
                pickle.dump(updated_text_chunks, f)

            # Push the refreshed index files back to Drive. Updating requires
            # the folders to already exist, so a missing folder is a 404 here.
            logger.info("Uploading updated files to Google Drive...")
            chatbot_folder_id = self.drive_service.find_folder_by_name(folder_name, GOOGLE_DRIVE_FOLDER_ID)
            if not chatbot_folder_id:
                raise HTTPException(status_code=404, detail=f"Chatbot '{folder_name}' not found in Google Drive.")
            faiss_index_folder_id = self.drive_service.find_folder_by_name("faiss_index", chatbot_folder_id)
            if not faiss_index_folder_id:
                raise HTTPException(status_code=404, detail=f"'faiss_index' folder not found for chatbot '{folder_name}'.")

            self.drive_service.upload_file_to_google_drive(
                os.path.join(vector_store_path, "index.faiss"), faiss_index_folder_id
            )
            self.drive_service.upload_file_to_google_drive(
                os.path.join(vector_store_path, "index.pkl"), faiss_index_folder_id
            )

            return {
                "status": "success",
                "message": f"Chatbot '{folder_name}' updated successfully!"
            }
        except HTTPException:
            # Re-raise client/404-style errors untouched.
            raise
        except Exception as e:
            logger.error(f"Error updating chatbot: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to update chatbot: {str(e)}")

    def verify_files_exist(self, folder_name: str) -> bool:
        """Return True iff the chatbot's local index files are present.

        Checks BASE_MODEL_PATH/<folder_name>/faiss_index for index.faiss and
        index.pkl; logs the first missing path it finds.
        """
        base_path = os.path.join(BASE_MODEL_PATH, folder_name, "faiss_index")
        files_to_check = ["index.faiss", "index.pkl"]
        logger.info(f"Verifying files in: {base_path}")

        # The directory itself may be missing (e.g. download never happened).
        if not os.path.exists(base_path):
            logger.error(f"Directory not found: {base_path}")
            return False

        for file in files_to_check:
            file_path = os.path.join(base_path, file)
            if not os.path.exists(file_path):
                logger.error(f"Required file not found: {file_path}")
                return False

        logger.info("All required files found")
        return True