Spaces:
Running
Running
| # app.py | |
| from fastapi import FastAPI, HTTPException, UploadFile, File, Form | |
| from typing import List # Import List from typing | |
| from schemas import LoadModelRequest, ChatRequest | |
| from drive_service import DriveService | |
| from chatbot_service import ChatbotService | |
| from model_service import ModelService | |
| from config import BASE_MODEL_PATH, CUSTOM_CHATBOTS_DIR | |
| from config import GOOGLE_DRIVE_FOLDER_ID # Import GOOGLE_DRIVE_FOLDER_ID | |
| import os | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| app = FastAPI() | |
| # Initialize services | |
| drive_service = DriveService() | |
| chatbot_service = ChatbotService() | |
| model_service = ModelService() | |
@app.on_event("startup")
async def startup_event():
    """Initialize necessary components on application startup.

    Currently only logs a startup message; the try/except is kept so any
    future initialization added here fails loudly (logged, then re-raised).
    """
    try:
        logger.info("Application started successfully")
    except Exception as e:
        logger.error(f"Startup failed: {str(e)}")
        raise
@app.post("/load-model/")
async def load_model(request: LoadModelRequest):
    """Load a model by name via the model service.

    Delegates entirely to ModelService; `request.temperature` is forwarded
    so the service can configure generation.
    """
    return model_service.load_model(request.model_name, request.temperature)
@app.post("/chat/{model_name}")
async def chat_with_model(model_name: str, request: ChatRequest):
    """Generate a chat response from a previously loaded model.

    `model_name` comes from the URL path; the question comes from the body.
    Error handling (e.g. model not loaded) is the service's responsibility.
    """
    return model_service.chat_with_model(model_name, request.question)
@app.post("/create-chatbot/")
async def create_chatbot(
    folder_name: str = Form(...),
    pdf_files: List[UploadFile] = File(...),
):
    """Create a new chatbot from uploaded PDF files.

    Multipart form endpoint: `folder_name` names the chatbot, `pdf_files`
    supplies its source documents. All processing happens in ChatbotService.
    """
    return chatbot_service.create_chatbot(folder_name, pdf_files)
@app.get("/health")
async def health_check():
    """Health check endpoint for load balancers / uptime probes."""
    return {"status": "healthy"}
@app.get("/")
async def root():
    """Root endpoint to verify the API is running."""
    return {"message": "API is running"}
@app.get("/available-models")
async def list_available_models():
    """List models already downloaded locally.

    A "model" here is any subdirectory of BASE_MODEL_PATH. If the base
    directory does not exist yet (nothing has been downloaded on this
    instance), return an empty list rather than failing with a 500.

    Raises:
        HTTPException(500): on unexpected filesystem errors.
    """
    try:
        if not os.path.isdir(BASE_MODEL_PATH):
            # First run: nothing downloaded yet — not an error.
            return {"status": "success", "models": []}
        models = [
            name
            for name in os.listdir(BASE_MODEL_PATH)
            if os.path.isdir(os.path.join(BASE_MODEL_PATH, name))
        ]
        return {"status": "success", "models": models}
    except Exception as e:
        logger.error(f"Error listing models: {str(e)}")
        raise HTTPException(status_code=500, detail="Failed to list available models")
@app.get("/models")  # NOTE(review): path not present in legacy code — confirm desired route
async def list_models():
    """List all models (chatbots) available in Google Drive.

    A model is any subfolder of the configured Drive folder
    (GOOGLE_DRIVE_FOLDER_ID).

    Raises:
        HTTPException: propagated from the drive service, or 500 on any
        other failure.
    """
    try:
        folders = drive_service.list_files_in_folder(GOOGLE_DRIVE_FOLDER_ID)
        models = [
            folder['name']
            for folder in folders
            if folder['mimeType'] == 'application/vnd.google-apps.folder'
        ]
        return {"status": "success", "models": models}
    except HTTPException:
        # Preserve status codes raised by the drive service instead of
        # collapsing them into a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error listing models: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to list models: {str(e)}")
@app.post("/update-chatbot/")  # NOTE(review): path inferred from /create-chatbot/ convention — confirm
async def update_chatbot(
    folder_name: str = Form(...),
    pdf_files: List[UploadFile] = File(...),
):
    """Update an existing chatbot by appending new data (PDF files).

    Same multipart form contract as create_chatbot; the merge logic lives
    in ChatbotService.
    """
    return chatbot_service.update_chatbot(folder_name, pdf_files)
| # import os | |
| # import io | |
| # import json | |
| # from typing import Dict, List | |
| # from fastapi import FastAPI, HTTPException, UploadFile, File,Form | |
| # from pydantic import BaseModel | |
| # from langchain_community.vectorstores import FAISS # Updated import | |
| # from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI | |
| # from langchain.chains.question_answering import load_qa_chain | |
| # from langchain.prompts import PromptTemplate | |
| # from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| # from langchain_community.document_loaders import UnstructuredURLLoader | |
| # from PyPDF2 import PdfReader | |
| # import pickle | |
| # from typing import List, Optional | |
| # from pydantic import BaseModel,HttpUrl | |
| # import uvicorn | |
| # import logging | |
| # from googleapiclient.http import MediaFileUpload | |
| # from googleapiclient.discovery import build | |
| # from googleapiclient.http import MediaIoBaseDownload | |
| # from google.oauth2.service_account import Credentials | |
| # # Configure logging | |
| # logging.basicConfig(level=logging.INFO) | |
| # logger = logging.getLogger(__name__) | |
| # app = FastAPI() | |
| # # Constants | |
| # BASE_MODEL_PATH = "/tmp/downloaded_models" # Use /tmp for writable storage in Hugging Face Spaces | |
| # GOOGLE_DRIVE_FOLDER_ID = "1nzACZH5I9_0KC6fAYAxOaRdMVxDMUDL-" # Google Drive folder ID | |
| # CUSTOM_CHATBOTS_DIR = "/tmp/Custom-Chatbots" # Use a writable directory | |
| # # Ensure the custom chatbots directory exists | |
| # os.makedirs(CUSTOM_CHATBOTS_DIR, exist_ok=True) | |
| # # Store loaded models in memory | |
| # loaded_models: Dict[str, dict] = {} | |
| # class LoadModelRequest(BaseModel): | |
| # model_name: str | |
| # temperature: float = 0.7 | |
| # class ChatRequest(BaseModel): | |
| # question: str | |
| # # Function to initialize the Google Drive API | |
| # def initialize_drive_api(): | |
| # try: | |
| # logger.info("Initializing Google Drive API...") | |
| # creds = Credentials.from_service_account_file( | |
| # './gemini-sheet-project-aeec326d036f.json', # Update with your service account file path | |
| # scopes=["https://www.googleapis.com/auth/drive"] | |
| # ) | |
| # service = build('drive', 'v3', credentials=creds) | |
| # logger.info("Google Drive API initialized successfully.") | |
| # return service | |
| # except Exception as e: | |
| # logger.error(f"Error initializing Google Drive API: {e}") | |
| # raise HTTPException(status_code=500, detail=f"Failed to initialize Google Drive API: {e}") | |
| # # Function to list files in a Google Drive folder by folder ID | |
| # def list_files_in_folder(service, folder_id): | |
| # try: | |
| # logger.info(f"Listing files in folder with ID: {folder_id}...") | |
| # results = service.files().list( | |
| # q=f"'{folder_id}' in parents", # List files in the specified folder | |
| # fields="files(id, name, mimeType)" | |
| # ).execute() | |
| # files = results.get('files', []) | |
| # logger.info(f"Found {len(files)} files in the folder.") | |
| # return files | |
| # except Exception as e: | |
| # logger.error(f"Error listing files: {e}") | |
| # raise HTTPException(status_code=500, detail=f"Failed to list files in folder: {e}") | |
| # # Function to download a file directly using the Google Drive API | |
| # def download_file(service, file_id, file_name, folder_name): | |
| # try: | |
| # # Ensure the parent subfolder exists | |
| # subfolder_path = os.path.join(BASE_MODEL_PATH, folder_name) | |
| # os.makedirs(subfolder_path, exist_ok=True) | |
| # logger.info(f"Starting download for file: {file_name} with ID: {file_id}...") | |
| # request = service.files().get_media(fileId=file_id) | |
| # fh = io.FileIO(os.path.join(subfolder_path, file_name), 'wb') | |
| # downloader = MediaIoBaseDownload(fh, request) | |
| # done = False | |
| # while done is False: | |
| # status, done = downloader.next_chunk() | |
| # logger.info(f"Download {file_name} {int(status.progress() * 100)}%.") | |
| # logger.info(f"Downloaded {file_name} successfully.") | |
| # except Exception as e: | |
| # logger.error(f"Error downloading file {file_name}: {e}") | |
| # raise HTTPException(status_code=500, detail=f"Failed to download file: {e}") | |
| # # Function to download model files from a subfolder in Google Drive | |
| # def download_model_files_from_subfolder(service, parent_folder_id, subfolder_name, target_files=['index.faiss', 'index.pkl']): | |
| # logger.info(f"Starting the process to download model files from the subfolder '{subfolder_name}'...") | |
| # # List all files and folders inside the parent folder | |
| # files = list_files_in_folder(service, parent_folder_id) | |
| # # Search for the specific subfolder by name | |
| # subfolder_id = None | |
| # for file in files: | |
| # if file['name'] == subfolder_name and file['mimeType'] == 'application/vnd.google-apps.folder': | |
| # subfolder_id = file['id'] | |
| # logger.info(f"Found subfolder: {subfolder_name} with ID: {subfolder_id}") | |
| # break | |
| # if subfolder_id: | |
| # # Now look for 'faiss_index' inside the subfolder | |
| # subfolder_files = list_files_in_folder(service, subfolder_id) | |
| # for subfolder_file in subfolder_files: | |
| # if subfolder_file['name'] == 'faiss_index' and subfolder_file['mimeType'] == 'application/vnd.google-apps.folder': | |
| # logger.info(f"Found 'faiss_index' folder in {subfolder_name}. Listing files...") | |
| # # List files in 'faiss_index' folder | |
| # faiss_index_files = list_files_in_folder(service, subfolder_file['id']) | |
| # for faiss_file in faiss_index_files: | |
| # if any(target in faiss_file['name'] for target in target_files): | |
| # logger.info(f"Found target file: {faiss_file['name']}") | |
| # download_file(service, faiss_file['id'], faiss_file['name'], subfolder_name) | |
| # else: | |
| # logger.info(f"Skipping file {faiss_file['name']} as it doesn't match target files.") | |
| # else: | |
| # logger.error(f"Subfolder '{subfolder_name}' not found in parent folder.") | |
| # raise HTTPException(status_code=404, detail=f"Subfolder '{subfolder_name}' not found in Google Drive.") | |
| # from googleapiclient.http import MediaFileUpload # Add this import | |
| # def upload_file_to_google_drive(service, file_path, folder_id): | |
| # """Upload a file to a specific folder in Google Drive.""" | |
| # try: | |
| # file_name = os.path.basename(file_path) | |
| # file_metadata = { | |
| # 'name': file_name, | |
| # 'parents': [folder_id] # Folder ID where the file will be uploaded | |
| # } | |
| # # Upload the file | |
| # media = MediaFileUpload(file_path, resumable=True) | |
| # file = service.files().create( | |
| # body=file_metadata, | |
| # media_body=media, | |
| # fields='id' | |
| # ).execute() | |
| # logger.info(f"File '{file_name}' uploaded successfully to Google Drive with ID: {file.get('id')}") | |
| # return file.get('id') | |
| # except Exception as e: | |
| # logger.error(f"Error uploading file to Google Drive: {e}") | |
| # raise HTTPException(status_code=500, detail=f"Failed to upload file to Google Drive: {e}") | |
| # def create_folder_in_google_drive(service, folder_name, parent_folder_id): | |
| # """Create a folder in Google Drive.""" | |
| # try: | |
| # folder_metadata = { | |
| # 'name': folder_name, | |
| # 'mimeType': 'application/vnd.google-apps.folder', | |
| # 'parents': [parent_folder_id] # Parent folder ID | |
| # } | |
| # folder = service.files().create( | |
| # body=folder_metadata, | |
| # fields='id' | |
| # ).execute() | |
| # logger.info(f"Folder '{folder_name}' created successfully with ID: {folder.get('id')}") | |
| # return folder.get('id') | |
| # except Exception as e: | |
| # logger.error(f"Error creating folder in Google Drive: {e}") | |
| # raise HTTPException(status_code=500, detail=f"Failed to create folder in Google Drive: {e}") | |
| # def find_folder_by_name(service, folder_name, parent_folder_id): | |
| # """Find a folder by name in a parent folder.""" | |
| # try: | |
| # results = service.files().list( | |
| # q=f"name='{folder_name}' and '{parent_folder_id}' in parents and mimeType='application/vnd.google-apps.folder'", | |
| # fields="files(id, name)" | |
| # ).execute() | |
| # folders = results.get('files', []) | |
| # if folders: | |
| # return folders[0]['id'] # Return the first matching folder ID | |
| # return None | |
| # except Exception as e: | |
| # logger.error(f"Error finding folder by name: {e}") | |
| # raise HTTPException(status_code=500, detail=f"Failed to find folder by name: {e}") | |
| # import requests | |
| # def validate_url(url): | |
| # try: | |
| # response = requests.get(url, timeout=10) | |
| # response.raise_for_status() | |
| # return True | |
| # except Exception: | |
| # return False | |
| # def process_urls(urls): | |
| # try: | |
| # loader = UnstructuredURLLoader(urls=urls) | |
| # data = loader.load() | |
| # text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000) | |
| # return text_splitter.split_documents(data) | |
| # except Exception as e: | |
| # st.error(f"Error processing URLs: {e}") | |
| # return [] | |
| # @app.on_event("startup") | |
| # async def startup_event(): | |
| # """Initialize necessary components on startup.""" | |
| # try: | |
| # logger.info("Application started successfully") | |
| # except Exception as e: | |
| # logger.error(f"Startup failed: {str(e)}") | |
| # raise | |
| # @app.post("/load-model/") | |
| # async def load_model(request: LoadModelRequest): | |
| # """Load a model from Google Drive.""" | |
| # try: | |
| # # Initialize Google Drive API | |
| # service = initialize_drive_api() | |
| # # Download model files from Google Drive | |
| # download_model_files_from_subfolder( | |
| # service=service, | |
| # parent_folder_id=GOOGLE_DRIVE_FOLDER_ID, | |
| # subfolder_name=request.model_name | |
| # ) | |
| # # Load the downloaded model | |
| # model_path = os.path.join(BASE_MODEL_PATH, request.model_name) | |
| # # Initialize embeddings and load vector store | |
| # embeddings = GoogleGenerativeAIEmbeddings( | |
| # model="models/embedding-001", | |
| # google_api_key=os.getenv("GOOGLE_API_KEY") | |
| # ) | |
| # # Load the local FAISS index and vector store | |
| # vector_store = FAISS.load_local( | |
| # model_path, | |
| # embeddings, | |
| # allow_dangerous_deserialization=True | |
| # ) | |
| # # Configure the QA chain | |
| # chain = configure_chain(request.temperature) | |
| # # Store the loaded model in memory | |
| # loaded_models[request.model_name] = { | |
| # "vector_store": vector_store, | |
| # "chain": chain | |
| # } | |
| # return { | |
| # "status": "success", | |
| # "message": f"Model '{request.model_name}' loaded successfully" | |
| # } | |
| # except HTTPException: | |
| # raise | |
| # except Exception as e: | |
| # logger.error(f"Error loading model: {str(e)}") | |
| # raise HTTPException(status_code=500, detail=f"Failed to load model: {str(e)}") | |
| # def configure_chain(temperature: float) -> load_qa_chain: | |
| # """Configure the QA chain with the updated prompt template.""" | |
| # prompt_template = """ | |
| # You are an AI assistant for SBBU SBA university. Your task is to provide clear, accurate, and helpful responses based on the context provided, as well as to respond to basic greetings and conversational queries. However, if the user makes inappropriate or offensive remarks, you should respond politely and professionally, redirecting the conversation back to helpful topics. | |
| # Instructions: | |
| # 1. **Greeting Responses**: If the user greets you (e.g., "Hello," "Hi," "Hey," "Salam," etc.), respond warmly and politely. Example responses could be: | |
| # - "Hello! How can I assist you today?" | |
| # - "Hi there! How can I help you?" | |
| # - "Salam! What can I do for you today?" | |
| # 2. **Casual and Playful Inquiries**: If the user says something playful or informal like "I kiss you" or similar, acknowledge it politely but redirect the conversation back to the main topic. Example: | |
| # - "Thank you for the kind words! How can I assist you further?" | |
| # - "I appreciate your enthusiasm! How can I help you today?" | |
| # 3. **Inappropriate or Offensive Remarks**: If the user makes inappropriate, disrespectful, or offensive comments, such as offensive language or sexually explicit remarks, respond politely but firmly, maintaining professionalism: | |
| # - "I strive to maintain a respectful conversation. How can I assist you with your queries?" | |
| # - "Let's keep the conversation respectful. How can I help you today?" | |
| # - "I apologize, but I cannot engage in that kind of discussion. Please ask a relevant question related to the university." | |
| # 4. **Contextual Responses**: | |
| # - If the context contains relevant information to the question, provide a clear and direct answer. | |
| # - If the context only provides partial information, provide a helpful response based on available data and related details. | |
| # - If the context has no relevant information, respond with: "I apologize, but I don't have specific information about that. Could you please ask something else about the university?" | |
| # 5. **Accuracy and Clarity**: Ensure your responses are clear, concise, and accurate. Avoid unnecessary details or over-explanation. | |
| # 6. **Clarification**: If the user's question is unclear or lacks sufficient context, ask for clarification. For example: | |
| # - "Could you please clarify your question?" | |
| # - "I'm not sure I understand. Can you rephrase your question?" | |
| # Context Information: | |
| # --------------------- | |
| # {context} | |
| # Question: | |
| # {question} | |
| # Response: | |
| # Provide a friendly, clear, and direct response based on the context. Always aim to be helpful, especially for greetings or casual inquiries, and suggest follow-up questions or clarifications if needed. | |
| # no preamble | |
| # """ | |
| # try: | |
| # model = ChatGoogleGenerativeAI( | |
| # model="gemini-pro", | |
| # temperature=temperature, | |
| # google_api_key=os.getenv("GOOGLE_API_KEY") | |
| # ) | |
| # prompt = PromptTemplate( | |
| # template=prompt_template, | |
| # input_variables=["context", "question"] | |
| # ) | |
| # return load_qa_chain(model, chain_type="stuff", prompt=prompt) | |
| # except Exception as e: | |
| # logger.error(f"Error configuring chain: {str(e)}") | |
| # raise HTTPException(status_code=500, detail="Failed to configure model chain") | |
| # @app.post("/chat/{model_name}") | |
| # async def chat_with_model(model_name: str, request: ChatRequest): | |
| # """Generate a response using the loaded model.""" | |
| # if model_name not in loaded_models: | |
| # raise HTTPException( | |
| # status_code=404, | |
| # detail=f"Model '{model_name}' not loaded. Please load it first." | |
| # ) | |
| # try: | |
| # model_data = loaded_models[model_name] | |
| # docs = model_data["vector_store"].similarity_search(request.question) | |
| # response = model_data["chain"]( | |
| # { | |
| # "input_documents": docs, | |
| # "question": request.question | |
| # }, | |
| # return_only_outputs=True | |
| # ) | |
| # return { | |
| # "status": "success", | |
| # "response": response["output_text"] | |
| # } | |
| # except Exception as e: | |
| # logger.error(f"Error generating response: {str(e)}") | |
| # raise HTTPException( | |
| # status_code=500, | |
| # detail=f"Failed to generate response: {str(e)}") | |
| # @app.get("/health") | |
| # async def health_check(): | |
| # """Health check endpoint.""" | |
| # return {"status": "healthy"} | |
| # @app.get("/") | |
| # async def root(): | |
| # """Root endpoint to verify API is running.""" | |
| # return {"message": "API is running"} | |
| # @app.get("/available-models") | |
| # async def list_available_models(): | |
| # """List available models in the local directory.""" | |
| # try: | |
| # models = [name for name in os.listdir(BASE_MODEL_PATH) if os.path.isdir(os.path.join(BASE_MODEL_PATH, name))] | |
| # return { | |
| # "status": "success", | |
| # "models": models | |
| # } | |
| # except Exception as e: | |
| # logger.error(f"Error listing models: {str(e)}") | |
| # raise HTTPException(status_code=500, detail="Failed to list available models") | |
| # @app.post("/create-chatbot/") | |
| # async def create_chatbot( | |
| # folder_name: str, | |
| # pdf_files: List[UploadFile] = File(...) # Only PDF files are accepted | |
| # ): | |
| # """Create a new chatbot using only PDF files.""" | |
| # try: | |
| # # Validate input | |
| # if not pdf_files: | |
| # raise HTTPException(status_code=400, detail="At least one PDF file is required.") | |
| # # Create folder for the chatbot | |
| # folder_path = os.path.join(CUSTOM_CHATBOTS_DIR, folder_name) | |
| # os.makedirs(folder_path, exist_ok=True) | |
| # # Process PDFs | |
| # raw_text = "" | |
| # for pdf_file in pdf_files: | |
| # pdf_reader = PdfReader(pdf_file.file) | |
| # for page in pdf_reader.pages: | |
| # raw_text += page.extract_text() or "" | |
| # # Split text into chunks | |
| # text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000) | |
| # text_chunks = text_splitter.split_text(raw_text) | |
| # # Create vector store | |
| # embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001") | |
| # vector_store = FAISS.from_texts(text_chunks, embedding=embeddings) | |
| # # Save the vector store and documents | |
| # vector_store.save_local(os.path.join(folder_path, "faiss_index")) | |
| # with open(os.path.join(folder_path, "documents.pkl"), 'wb') as f: | |
| # pickle.dump(text_chunks, f) | |
| # # Upload files to Google Drive | |
| # service = initialize_drive_api() | |
| # # Step 1: Create or find the folder with the provided `folder_name` | |
| # chatbot_folder_id = find_folder_by_name(service, folder_name, GOOGLE_DRIVE_FOLDER_ID) | |
| # if not chatbot_folder_id: | |
| # chatbot_folder_id = create_folder_in_google_drive(service, folder_name, GOOGLE_DRIVE_FOLDER_ID) | |
| # # Step 2: Create or find the `faiss_index` subfolder inside the `folder_name` folder | |
| # faiss_index_folder_id = find_folder_by_name(service, "faiss_index", chatbot_folder_id) | |
| # if not faiss_index_folder_id: | |
| # faiss_index_folder_id = create_folder_in_google_drive(service, "faiss_index", chatbot_folder_id) | |
| # # Step 3: Upload index.faiss and index.pkl to the `faiss_index` subfolder | |
| # index_pkl_path = os.path.join(folder_path, "faiss_index", "index.pkl") | |
| # os.rename(os.path.join(folder_path, "documents.pkl"), index_pkl_path) | |
| # faiss_index_path = os.path.join(folder_path, "faiss_index", "index.faiss") | |
| # upload_file_to_google_drive(service, faiss_index_path, faiss_index_folder_id) | |
| # upload_file_to_google_drive(service, index_pkl_path, faiss_index_folder_id) | |
| # return { | |
| # "status": "success", | |
| # "message": f"Chatbot '{folder_name}' created and files uploaded to Google Drive successfully!" | |
| # } | |
| # except HTTPException: | |
| # raise | |
| # except Exception as e: | |
| # logger.error(f"Error creating chatbot: {str(e)}") | |
| # raise HTTPException(status_code=500, detail=f"Failed to create chatbot: {str(e)}") |