diff --git "a/RAG/RAG-1.ipynb" "b/RAG/RAG-1.ipynb" deleted file mode 100644--- "a/RAG/RAG-1.ipynb" +++ /dev/null @@ -1,2454 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "e549bafd-78b1-4a83-80b4-2cb597efff79", - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "import json\n", - "from google.oauth2 import service_account\n", - "from googleapiclient.discovery import build\n", - "from googleapiclient.http import MediaIoBaseDownload\n", - "import openai\n", - "from dotenv import load_dotenv, dotenv_values\n", - "import io" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "04361235-7896-4439-9d04-1400e043528b", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "True" - ] - }, - "execution_count": 2, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "load_dotenv()" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "9ae411c5-c84b-4bfd-b089-69b5c5ba70ae", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "OPENAI_API_KEY\n", - "ANTHROPIC_API_KEY\n", - "GOOGLE_SERVICE_ACCOUNT_FILE\n" - ] - } - ], - "source": [ - "config = dotenv_values(\".env\")\n", - "for key in config.keys():\n", - " print(key)" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "7622a0e4-64a6-4848-b588-bd65d56c55e0", - "metadata": {}, - "outputs": [], - "source": [ - "from openai import OpenAI\n", - "openai.api_key = os.getenv('OPENAI_API_KEY')\n", - "openai = OpenAI(api_key = openai.api_key)" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "2ead7c59-1f11-478a-bb69-2928ddc38901", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Hello! How can I assist you today?\n" - ] - } - ], - "source": [ - "response = openai.chat.completions.create(\n", - " model = \"gpt-4o-mini\",\n", - " messages = [\n", - " {\"role\":\"system\", \"content\":\"you are a helpful assistant\"},\n", - " {\"role\":\"user\", \"content\":\"hi\"}\n", - " ])\n", - "\n", - "reply = response.choices[0].message.content\n", - "print(reply)" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "b3bcbada-6a72-4cc8-a166-d2e596cd1fc4", - "metadata": {}, - "outputs": [], - "source": [ - "service_account_file_path = os.getenv(\"GOOGLE_SERVICE_ACCOUNT_FILE\")" - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "id": "81c59a6d-0831-4bff-b3fc-fbe3d4cc1e31", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "openai activated\n", - "service_account_file_path activated\n" - ] - } - ], - "source": [ - "#troubleshoot\n", - "if openai is None:\n", - " print(\"openai not activated\")\n", - "else: \n", - " print (\"openai activated\")\n", - "\n", - "if service_account_file_path is None:\n", - " print(\"service_account_file_path not activated\")\n", - "else: \n", - " print (\"service_account_file_path activated\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a70f32aa-9e43-4175-8cca-d6af723aef91", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": 8, - "id": "0efe325d-badd-4533-affe-47d572ef128e", - "metadata": {}, - "outputs": [], - "source": [ - "class GPTDriveIntegration:\n", - " def __init__(self):\n", - " # Initialize Google Drive API\n", - " self.credentials = service_account.Credentials.from_service_account_file(\n", - " os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE'),\n", - " scopes=['https://www.googleapis.com/auth/drive.readonly']\n", - " )\n", - " self.drive_service = build('drive', 'v3', credentials=self.credentials)\n", - " \n", - " # Initialize OpenAI\n", - " openai.api_key = os.getenv('OPENAI_API_KEY')\n", - " \n", - " def search_files(self, query, file_types=None):\n", - " \"\"\"Search for files in Google Drive\"\"\"\n", - " search_query = f\"name contains '{query}'\"\n", - " \n", - " if file_types:\n", - " type_queries = []\n", - " for file_type in file_types:\n", - " if file_type.lower() == 'pdf':\n", - " type_queries.append(\"mimeType='application/pdf'\")\n", - " elif file_type.lower() in ['doc', 'docx']:\n", - " type_queries.append(\"mimeType contains 'document'\")\n", - " elif file_type.lower() in ['xls', 'xlsx']:\n", - " type_queries.append(\"mimeType contains 'spreadsheet'\")\n", - " \n", - " if type_queries:\n", - " search_query += f\" and ({' or '.join(type_queries)})\"\n", - " \n", - " results = self.drive_service.files().list(\n", - " q=search_query,\n", - " fields=\"files(id, name, mimeType, size)\"\n", - " ).execute()\n", - " \n", - " return results.get('files', [])\n", - " \n", - " def get_file_content(self, file_id, mime_type):\n", - " \"\"\"Download and extract text content from file\"\"\"\n", - " try:\n", - " if 'text' in mime_type or 'document' in mime_type:\n", - " # For Google Docs, export as plain text\n", - " if 'document' in mime_type:\n", - " request = self.drive_service.files().export_media(\n", - " fileId=file_id, mimeType='text/plain'\n", - " )\n", - " else:\n", - " request = self.drive_service.files().get_media(fileId=file_id)\n", - " \n", - " file_content = io.BytesIO()\n", - " downloader = MediaIoBaseDownload(file_content, request)\n", - " done = False\n", - " while done is False:\n", - " status, done = downloader.next_chunk()\n", - " \n", - " return file_content.getvalue().decode('utf-8')\n", - " \n", - " elif 'spreadsheet' in mime_type:\n", - " # For Google Sheets, export as CSV\n", - " request = self.drive_service.files().export_media(\n", - " fileId=file_id, mimeType='text/csv'\n", - " )\n", - " file_content = io.BytesIO()\n", - " downloader = MediaIoBaseDownload(file_content, request)\n", - " done = False\n", - " while done is False:\n", - " status, done = downloader.next_chunk()\n", - " \n", - " return file_content.getvalue().decode('utf-8')\n", - " \n", - " elif mime_type == 'application/pdf':\n", - " # For PDF files, download binary content and extract text\n", - " request = self.drive_service.files().get_media(fileId=file_id)\n", - " file_content = io.BytesIO()\n", - " downloader = MediaIoBaseDownload(file_content, request)\n", - " done = False\n", - " while done is False:\n", - " status, done = downloader.next_chunk()\n", - " \n", - " # Extract text from PDF using PyPDF2 or pdfplumber\n", - " file_content.seek(0) # Reset buffer position\n", - " \n", - " # Option 1: Using PyPDF2\n", - " try:\n", - " import PyPDF2\n", - " pdf_reader = PyPDF2.PdfReader(file_content)\n", - " text = \"\"\n", - " for page in pdf_reader.pages:\n", - " text += page.extract_text() + \"\\n\"\n", - " return text\n", - " except ImportError:\n", - " pass\n", - " \n", - " # Option 2: Using pdfplumber (better for complex PDFs)\n", - " try:\n", - " import pdfplumber\n", - " text = \"\"\n", - " with pdfplumber.open(file_content) as pdf:\n", - " for page in pdf.pages:\n", - " page_text = page.extract_text()\n", - " if page_text:\n", - " text += page_text + \"\\n\"\n", - " return text\n", - " except ImportError:\n", - " pass\n", - " \n", - " # Option 3: Using pymupdf (fitz) - fastest option\n", - " try:\n", - " import fitz # pymupdf\n", - " pdf_document = fitz.open(stream=file_content.read(), filetype=\"pdf\")\n", - " text = \"\"\n", - " for page_num in range(pdf_document.page_count):\n", - " page = pdf_document[page_num]\n", - " text += page.get_text() + \"\\n\"\n", - " pdf_document.close()\n", - " return text\n", - " except ImportError:\n", - " pass\n", - " \n", - " return \"PDF text extraction requires PyPDF2, pdfplumber, or pymupdf library\"\n", - " \n", - " else:\n", - " return \"File type not supported for text extraction\"\n", - " \n", - " except Exception as e:\n", - " return f\"Error reading file: {str(e)}\"\n", - " \n", - " def query_gpt_with_context(self, user_query, file_contents):\n", - " \"\"\"Send query to GPT with file context\"\"\"\n", - " context = \"\\n\\n\".join([\n", - " f\"File: {content['name']}\\nContent: {content['text'][:2000]}...\"\n", - " for content in file_contents\n", - " ])\n", - " \n", - " messages = [\n", - " {\n", - " \"role\": \"system\", \n", - " \"content\": \"\"\"\n", - " You are an AI assistant that can analyze documents from Google Drive. \n", - " Use the provided file contents to answer user questions.\"\"\"\n", - " },\n", - " {\n", - " \"role\": \"user\", \n", - " \"content\": f\"Context from Google Drive files:\\n{context}\\n\\nUser Question: {user_query}\"\n", - " }\n", - " ]\n", - " \n", - " response = openai.chat.completions.create(\n", - " model=\"gpt-4o-mini\",\n", - " messages=messages,\n", - " max_tokens=1000\n", - " )\n", - " \n", - " return response.choices[0].message.content\n", - " \n", - " def process_query(self, user_query, search_terms=None):\n", - " \"\"\"Main function to process user queries\"\"\"\n", - " # Extract search terms from query if not provided\n", - " if not search_terms:\n", - " search_terms = user_query.split()[:3] # Simple extraction\n", - " \n", - " # Search for relevant files\n", - " files = []\n", - " for term in search_terms:\n", - " files.extend(self.search_files(term))\n", - " \n", - " # Remove duplicates\n", - " unique_files = {f['id']: f for f in files}.values()\n", - " \n", - " # Get content from top 3 most relevant files\n", - " file_contents = []\n", - " for file in list(unique_files)[:3]:\n", - " content = self.get_file_content(file['id'], file['mimeType'])\n", - " file_contents.append({\n", - " 'name': file['name'],\n", - " 'text': content\n", - " })\n", - " \n", - " # Query GPT with context\n", - " if file_contents:\n", - " response = self.query_gpt_with_context(user_query, file_contents)\n", - " return {\n", - " 'answer': response,\n", - " 'sources': [f['name'] for f in file_contents]\n", - " }\n", - " else:\n", - " return {\n", - " 'answer': \"No relevant files found in your Google Drive.\",\n", - " 'sources': []\n", - " }" - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "id": "3c2c1ccf-9ade-482d-a170-978e97bc1c08", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Answer: The transmission of nerves is called \"nerve conduction.\" This process involves the propagation of electrical impulses along the nerve fibers, allowing for communication between different parts of the body.\n", - "Sources: ['Neuro Note Dr Clement.docx']\n" - ] - } - ], - "source": [ - "if __name__ == \"__main__\":\n", - " integration = GPTDriveIntegration()\n", - " \n", - " # Test query\n", - " result = integration.process_query(\n", - " \"The transmission of nerves is called?\",\n", - " search_terms=[\"nerves\", \"Dr Clement\"]\n", - " )\n", - " \n", - " print(\"Answer:\", result['answer'])\n", - " print(\"Sources:\", result['sources'])" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "120e7c93-b38a-4c89-8e76-5b0170d22548", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "61e976a6-1fe4-45de-a63d-3cf849eedbe1", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": 11, - "id": "c514b8af-1af4-4497-9044-139a71aedd36", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "* Running on local URL: http://127.0.0.1:7860\n", - "* Running on public URL: https://5320171f6af3e2eef9.gradio.live\n", - "\n", - "This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)\n" - ] - }, - { - "data": { - "text/html": [ - "
" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Keyboard interruption in main thread... closing server.\n", - "Killing tunnel 127.0.0.1:7860 <> https://5320171f6af3e2eef9.gradio.live\n" - ] - } - ], - "source": [ - "gpt_drive = GPTDriveIntegration()\n", - "\n", - "def process_user_query(query, search_terms_input):\n", - " \"\"\"Process user query and return formatted response\"\"\"\n", - " if not query.strip():\n", - " return \"Please enter a question.\", \"\"\n", - " \n", - " # Parse search terms if provided\n", - " search_terms = None\n", - " # if search_terms_input.strip():\n", - " # search_terms = [term.strip() for term in search_terms_input.split(',')]\n", - " \n", - " # Process the query\n", - " result = gpt_drive.process_query(query, search_terms)\n", - " \n", - " # Format the response\n", - " answer = result['answer']\n", - " sources = result['sources']\n", - " \n", - " sources_text = \"\"\n", - " if sources:\n", - " sources_text = \"**Sources used:**\\n\" + \"\\n\".join([f\"• {source}\" for source in sources])\n", - " \n", - " return answer, sources_text\n", - "\n", - "def check_setup():\n", - " \"\"\"Check if the APIs are properly configured\"\"\"\n", - " status_messages = []\n", - " \n", - " # Check Google Drive API\n", - " if gpt_drive.drive_initialized:\n", - " status_messages.append(\"✅ Google Drive API: Connected\")\n", - " else:\n", - " status_messages.append(f\"❌ Google Drive API: {getattr(gpt_drive, 'drive_error', 'Not configured')}\")\n", - " \n", - " # Check OpenAI API\n", - " if gpt_drive.openai_initialized:\n", - " status_messages.append(\"✅ OpenAI API: Connected\")\n", - " else:\n", - " status_messages.append(f\"❌ OpenAI API: {getattr(gpt_drive, 'openai_error', 'Not configured')}\")\n", - " \n", - " return \"\\n\".join(status_messages)\n", - "\n", - "# Create Gradio interface\n", - "with gr.Blocks(title=\"Augusta's Anatomy Reading Assistant\", theme=gr.themes.Soft()) as app:\n", - " gr.Markdown(\"# 🤖 Augusta's Anatomy bot\")\n", - " gr.Markdown(\"Ask questions about your anatomy books using AI!\")\n", - " \n", - " with gr.Row():\n", - " with gr.Column(scale=2):\n", - " # Main query interface\n", - " with gr.Group():\n", - " gr.Markdown(\"### Ask a Question\")\n", - " query_input = gr.Textbox(\n", - " label=\"Your Question\",\n", - " placeholder=\"Ask me any question about your anatomy books?\",\n", - " lines=3\n", - " )\n", - " \n", - " search_terms_input = gr.Textbox(\n", - " label=\"Search Terms (optional)\",\n", - " placeholder=\"Enter comma-separated terms to search for specific files\",\n", - " lines=1\n", - " )\n", - " \n", - " submit_btn = gr.Button(\"Search & Ask\", variant=\"primary\", size=\"lg\")\n", - " \n", - " # Results section\n", - " with gr.Group():\n", - " gr.Markdown(\"### Answer\")\n", - " answer_output = gr.Textbox(\n", - " label=\"AI Response\",\n", - " lines=10,\n", - " interactive=False\n", - " )\n", - " \n", - " sources_output = gr.Textbox(\n", - " label=\"Sources\",\n", - " lines=3,\n", - " interactive=False\n", - " )\n", - " \n", - " with gr.Column(scale=1):\n", - " # Status and setup info\n", - " with gr.Group():\n", - " gr.Markdown(\"### System Status\")\n", - " status_btn = gr.Button(\"Check Status\", size=\"sm\")\n", - " status_output = gr.Textbox(\n", - " label=\"API Status\",\n", - " lines=4,\n", - " interactive=False\n", - " )\n", - " \n", - " with gr.Group():\n", - " gr.Markdown(\"### Setup Instructions\")\n", - " gr.Markdown(\"\"\"\n", - " **Important Notes:**\n", - " 1.Only documents shared with it, it can answer\n", - " \n", - " **File Types Supported:**\n", - " - Google Docs\n", - " - Google Sheets \n", - " - PDF files\n", - " - Text files\n", - " \n", - " **Tips:**\n", - " - Use specific search terms for better results\n", - " - The system searches the top 3 most relevant files\n", - " - Ask clear, specific questions for better answers\n", - " \"\"\")\n", - " \n", - " # Event handlers\n", - " submit_btn.click(\n", - " fn=process_user_query,\n", - " inputs=[query_input, search_terms_input],\n", - " outputs=[answer_output, sources_output]\n", - " )\n", - " \n", - " status_btn.click(\n", - " fn=check_setup,\n", - " outputs=status_output\n", - " )\n", - " \n", - " # Example queries\n", - " with gr.Row():\n", - " gr.Examples(\n", - " examples=[\n", - " [\"What is morbid Anatomy?\", \"morbid, Anatomy\"],\n", - " [\"The transmission of nerves from one neuron to another is as a result of what?\", \"neuron, nerves, Dr Clement\"],\n", - " ],\n", - " inputs=[query_input, search_terms_input],\n", - " )\n", - "\n", - "# Launch the app\n", - "if __name__ == \"__main__\":\n", - " app.launch(\n", - " share=True,debug =True)" - ] - }, - { - "cell_type": "code", - "execution_count": 42, - "id": "593c4f63-cdda-46e8-aad6-9f9d7db66615", - "metadata": {}, - "outputs": [], - "source": [ - "#Gradio\n", - "# !pip install gradio" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "859060a9-56f9-49e9-bb41-04ccf7b10d3f", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "d893999c-4f3f-4306-8b0b-072e3fbb8236", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": 10, - "id": "01b70311-ff22-4c88-bf22-64dc441afaab", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "C:\\Users\\Uche Buzz\\anaconda3\\envs\\RAG\\Lib\\site-packages\\tqdm\\auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", - " from .autonotebook import tqdm as notebook_tqdm\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "* Running on local URL: http://127.0.0.1:7860\n", - "* Running on public URL: https://8fac475cac6193423b.gradio.live\n", - "\n", - "This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)\n" - ] - }, - { - "data": { - "text/html": [ - "
" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Keyboard interruption in main thread... closing server.\n", - "Killing tunnel 127.0.0.1:7860 <> https://8fac475cac6193423b.gradio.live\n" - ] - } - ], - "source": [ - "import gradio as gr\n", - "import os\n", - "import io\n", - "import openai\n", - "from google.oauth2 import service_account\n", - "from googleapiclient.discovery import build\n", - "from googleapiclient.http import MediaIoBaseDownload\n", - "\n", - "class GPTDriveIntegration:\n", - " def __init__(self):\n", - " # Initialize Google Drive API\n", - " try:\n", - " self.credentials = service_account.Credentials.from_service_account_file(\n", - " os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE'),\n", - " scopes=['https://www.googleapis.com/auth/drive.readonly']\n", - " )\n", - " self.drive_service = build('drive', 'v3', credentials=self.credentials)\n", - " self.drive_initialized = True\n", - " except Exception as e:\n", - " self.drive_initialized = False\n", - " self.drive_error = str(e)\n", - " \n", - " # Initialize OpenAI\n", - " try:\n", - " openai.api_key = os.getenv('OPENAI_API_KEY')\n", - " self.openai_initialized = True\n", - " except Exception as e:\n", - " self.openai_initialized = False\n", - " self.openai_error = str(e)\n", - " \n", - " def search_files(self, query, file_types=None):\n", - " \"\"\"Search for files in Google Drive\"\"\"\n", - " if not self.drive_initialized:\n", - " return []\n", - " \n", - " search_query = f\"name contains '{query}'\"\n", - " \n", - " if file_types:\n", - " type_queries = []\n", - " for file_type in file_types:\n", - " if file_type.lower() == 'pdf':\n", - " type_queries.append(\"mimeType='application/pdf'\")\n", - " elif file_type.lower() in ['doc', 'docx']:\n", - " type_queries.append(\"mimeType contains 'document'\")\n", - " elif file_type.lower() in ['xls', 'xlsx']:\n", - " type_queries.append(\"mimeType contains 'spreadsheet'\")\n", - " \n", - " if type_queries:\n", - " search_query += f\" and ({' or '.join(type_queries)})\"\n", - " \n", - " try:\n", - " results = self.drive_service.files().list(\n", - " q=search_query,\n", - " fields=\"files(id, name, mimeType, size)\"\n", - " ).execute()\n", - " \n", - " return results.get('files', [])\n", - " except Exception as e:\n", - " return []\n", - " \n", - " def get_file_content(self, file_id, mime_type):\n", - " \"\"\"Download and extract text content from file\"\"\"\n", - " try:\n", - " if 'text' in mime_type or 'document' in mime_type:\n", - " # For Google Docs, export as plain text\n", - " if 'document' in mime_type:\n", - " request = self.drive_service.files().export_media(\n", - " fileId=file_id, mimeType='text/plain'\n", - " )\n", - " else:\n", - " request = self.drive_service.files().get_media(fileId=file_id)\n", - " \n", - " file_content = io.BytesIO()\n", - " downloader = MediaIoBaseDownload(file_content, request)\n", - " done = False\n", - " while done is False:\n", - " status, done = downloader.next_chunk()\n", - " \n", - " return file_content.getvalue().decode('utf-8')\n", - " \n", - " elif 'spreadsheet' in mime_type:\n", - " # For Google Sheets, export as CSV\n", - " request = self.drive_service.files().export_media(\n", - " fileId=file_id, mimeType='text/csv'\n", - " )\n", - " file_content = io.BytesIO()\n", - " downloader = MediaIoBaseDownload(file_content, request)\n", - " done = False\n", - " while done is False:\n", - " status, done = downloader.next_chunk()\n", - " \n", - " return file_content.getvalue().decode('utf-8')\n", - " \n", - " else:\n", - " return \"File type not supported for text extraction\"\n", - " \n", - " except Exception as e:\n", - " return f\"Error reading file: {str(e)}\"\n", - " \n", - " def query_gpt_with_context(self, user_query, file_contents):\n", - " \"\"\"Send query to GPT with file context\"\"\"\n", - " context = \"\\n\\n\".join([\n", - " f\"File: {content['name']}\\nContent: {content['text'][:2000]}...\"\n", - " for content in file_contents\n", - " ])\n", - " \n", - " messages = [\n", - " {\n", - " \"role\": \"system\", \n", - " \"content\": \"\"\"\n", - " You are an AI assistant that can analyze anatomy documents from Google Drive. \n", - " Use the provided file contents to answer user questions. Always answer straightforwardly. Don't Hallucinate.\n", - " Always end with 'Do you have any other question?'\n", - " \n", - " \"\"\"\n", - " },\n", - " {\n", - " \"role\": \"user\", \n", - " \"content\": f\"Context from Google Drive files:\\n{context}\\n\\nUser Question: {user_query}\"\n", - " }\n", - " ]\n", - " \n", - " try:\n", - " response = openai.chat.completions.create(\n", - " model=\"gpt-4o-mini\",\n", - " messages=messages,\n", - " max_tokens=1000\n", - " )\n", - " \n", - " return response.choices[0].message.content\n", - " except Exception as e:\n", - " return f\"Error querying OpenAI: {str(e)}\"\n", - " \n", - " def process_query(self, user_query, search_terms=None):\n", - " \"\"\"Main function to process user queries\"\"\"\n", - " # Check if services are initialized\n", - " if not self.drive_initialized:\n", - " return {\n", - " 'answer': f\"Google Drive API not initialized: {getattr(self, 'drive_error', 'Unknown error')}\",\n", - " 'sources': []\n", - " }\n", - " \n", - " if not self.openai_initialized:\n", - " return {\n", - " 'answer': f\"OpenAI API not initialized: {getattr(self, 'openai_error', 'Unknown error')}\",\n", - " 'sources': []\n", - " }\n", - " \n", - " # Extract search terms from query if not provided\n", - " if not search_terms:\n", - " search_terms = user_query.split()[:3] # Simple extraction\n", - " \n", - " # Search for relevant files\n", - " files = []\n", - " for term in search_terms:\n", - " files.extend(self.search_files(term))\n", - " \n", - " # Remove duplicates\n", - " unique_files = {f['id']: f for f in files}.values()\n", - " \n", - " # Get content from top 3 most relevant files\n", - " file_contents = []\n", - " for file in list(unique_files)[:3]:\n", - " content = self.get_file_content(file['id'], file['mimeType'])\n", - " file_contents.append({\n", - " 'name': file['name'],\n", - " 'text': content\n", - " })\n", - " \n", - " # Query GPT with context\n", - " if file_contents:\n", - " response = self.query_gpt_with_context(user_query, file_contents)\n", - " return {\n", - " 'answer': response,\n", - " 'sources': [f['name'] for f in file_contents]\n", - " }\n", - " else:\n", - " return {\n", - " 'answer': \"No relevant files found in your Google Drive.\",\n", - " 'sources': []\n", - " }\n", - "\n", - "# Initialize the GPT Drive Integration\n", - "gpt_drive = GPTDriveIntegration()\n", - "\n", - "def process_user_query(query, search_terms_input):\n", - " \"\"\"Process user query and return formatted response\"\"\"\n", - " if not query.strip():\n", - " return \"Please enter a question.\", \"\"\n", - " \n", - " # Parse search terms if provided\n", - " search_terms = None\n", - " if search_terms_input.strip():\n", - " search_terms = [term.strip() for term in search_terms_input.split(',')]\n", - " \n", - " # Process the query\n", - " result = gpt_drive.process_query(query, search_terms)\n", - " \n", - " # Format the response\n", - " answer = result['answer']\n", - " sources = result['sources']\n", - " \n", - " sources_text = \"\"\n", - " if sources:\n", - " sources_text = \"**Sources used:**\\n\" + \"\\n\".join([f\"• {source}\" for source in sources])\n", - " \n", - " return answer, sources_text\n", - "\n", - "def check_setup():\n", - " \"\"\"Check if the APIs are properly configured\"\"\"\n", - " status_messages = []\n", - " \n", - " # Check Google Drive API\n", - " if gpt_drive.drive_initialized:\n", - " status_messages.append(\"✅ Google Drive API: Connected\")\n", - " else:\n", - " status_messages.append(f\"❌ Google Drive API: {getattr(gpt_drive, 'drive_error', 'Not configured')}\")\n", - " \n", - " # Check OpenAI API\n", - " if gpt_drive.openai_initialized:\n", - " status_messages.append(\"✅ OpenAI API: Connected\")\n", - " else:\n", - " status_messages.append(f\"❌ OpenAI API: {getattr(gpt_drive, 'openai_error', 'Not configured')}\")\n", - " \n", - " return \"\\n\".join(status_messages)\n", - "\n", - "# Create Gradio interface\n", - "with gr.Blocks(title=\"Augusta's Anatomy Reading Assistant\", theme=gr.themes.Soft()) as app:\n", - " gr.Markdown(\"# 🤖 Augusta's Anatomy bot\")\n", - " gr.Markdown(\"Ask questions about your anatomy books using AI!\")\n", - " \n", - " with gr.Row():\n", - " with gr.Column(scale=2):\n", - " # Main query interface\n", - " with gr.Group():\n", - " gr.Markdown(\"### Ask a Question\")\n", - " query_input = gr.Textbox(\n", - " label=\"Your Question\",\n", - " placeholder=\"Ask me any question about your anatomy books?\",\n", - " lines=3\n", - " )\n", - " \n", - " search_terms_input = gr.Textbox(\n", - " label=\"Search Terms (optional)\",\n", - " placeholder=\"Enter comma-separated terms to search for specific files\",\n", - " lines=1\n", - " )\n", - " \n", - " submit_btn = gr.Button(\"Search & Ask\", variant=\"primary\", size=\"lg\")\n", - " \n", - " # Results section\n", - " with gr.Group():\n", - " gr.Markdown(\"### Answer\")\n", - " answer_output = gr.Textbox(\n", - " label=\"AI Response\",\n", - " lines=10,\n", - " interactive=False\n", - " )\n", - " \n", - " sources_output = gr.Textbox(\n", - " label=\"Sources\",\n", - " lines=3,\n", - " interactive=False\n", - " )\n", - " \n", - " with gr.Column(scale=1):\n", - " # Status and setup info\n", - " with gr.Group():\n", - " gr.Markdown(\"### System Status\")\n", - " status_btn = gr.Button(\"Check Status\", size=\"sm\")\n", - " status_output = gr.Textbox(\n", - " label=\"API Status\",\n", - " lines=4,\n", - " interactive=False\n", - " )\n", - " \n", - " with gr.Group():\n", - " gr.Markdown(\"### Setup Instructions\")\n", - " gr.Markdown(\"\"\"\n", - " **Important Notes:**\n", - " 1.Only documents shared with it, it can answer\n", - " \n", - " **File Types Supported:**\n", - " - Google Docs\n", - " - Google Sheets \n", - " - PDF files\n", - " - Text files\n", - " \n", - " **Tips:**\n", - " - Use specific search terms for better results\n", - " - The system searches the top 3 most relevant files\n", - " - Ask clear, specific questions for better answers\n", - " \"\"\")\n", - " \n", - " # Event handlers\n", - " submit_btn.click(\n", - " fn=process_user_query,\n", - " inputs=[query_input, search_terms_input],\n", - " outputs=[answer_output, sources_output]\n", - " )\n", - " \n", - " status_btn.click(\n", - " fn=check_setup,\n", - " outputs=status_output\n", - " )\n", - " \n", - " # Example queries\n", - " with gr.Row():\n", - " gr.Examples(\n", - " examples=[\n", - " [\"What is morbid Anatomy?\", \"morbid, Anatomy\"],\n", - " [\"The transmission of nerves from one neuron to another is as a result of what?\", \"neuron, nerves, Dr Clement\"],\n", - " ],\n", - " inputs=[query_input, search_terms_input],\n", - " )\n", - "\n", - "# Launch the app\n", - "if __name__ == \"__main__\":\n", - " app.launch(\n", - " share=True,\n", - " debug=True\n", - " )" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a6fd9c8c-e8e9-4b1c-8d21-d7ea2c42303a", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "95c85a4f-470f-4858-8172-78d7f9d04d1a", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f237c114-7924-4074-9c11-c7ba13833293", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a2f9c7ae-35bb-4261-adc4-f24c6ab5c8a1", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "aaa446b0-a14e-4c98-b466-f503a521f052", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "5770d683-7bfc-49e9-a3c4-a67060e27620", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "2ef7744c-1346-4692-ac28-013ae0e8c4ac", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "08643ace-f94f-4a4a-bbc1-3289b7fc29a1", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "70849fd3-6f4e-4c57-b780-bda53db83e14", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c18d5d20-439a-45a7-a847-25f659fd4a10", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": 11, - "id": "634b9787-d493-46e5-8114-0851c6172ed6", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "🔍 Starting Google Drive API Diagnostic...\n", - "==================================================\n", - "\n", - "1️⃣ Checking service account file...\n", - "✅ Service account file is valid\n", - " 📧 Service account email: rag-base@rag-system-463320.iam.gserviceaccount.com\n", - " 🏗️ Project ID: rag-system-463320\n", - "\n", - "2️⃣ Checking credentials...\n", - "✅ Credentials created successfully\n", - "\n", - "3️⃣ Testing API connection...\n", - "✅ Successfully connected to Google Drive API\n", - " 👤 Connected as: rag-base@rag-system-463320.iam.gserviceaccount.com\n", - "\n", - "4️⃣ Checking basic permissions...\n", - "✅ Can access Drive API - found 0 files in test query\n", - "⚠️ No files found - this might mean:\n", - " • Service account has no shared files\n", - " • No files are shared with the service account\n", - "\n", - "5️⃣ Testing access to 'Blue berry' folder...\n", - "❌ Folder 'Blue berry' not found or not accessible\n", - "💡 Possible solutions:\n", - " • Make sure the folder exists in Google Drive\n", - " • Share the folder with your service account email\n", - " • Check folder name spelling (case sensitive)\n", - "\n", - "✅ All basic checks passed!\n" - ] - } - ], - "source": [ - "import json\n", - "from google.auth.exceptions import RefreshError\n", - "from googleapiclient.errors import HttpError\n", - "\n", - "class GPTDriveTroubleshooter:\n", - " def __init__(self, service_account_file_path):\n", - " self.service_account_file = service_account_file_path\n", - " self.credentials = None\n", - " self.drive_service = None\n", - " \n", - " def run_full_diagnostic(self):\n", - " \"\"\"Run complete diagnostic check\"\"\"\n", - " print(\"🔍 Starting Google Drive API Diagnostic...\")\n", - " print(\"=\" * 50)\n", - " \n", - " # Step 1: Check service account file\n", - " if not self.check_service_account_file():\n", - " return False\n", - " \n", - " # Step 2: Check credentials\n", - " if not self.check_credentials():\n", - " return False\n", - " \n", - " # Step 3: Test API connection\n", - " if not self.test_api_connection():\n", - " return False\n", - " \n", - " # Step 4: Check permissions\n", - " if not self.check_basic_permissions():\n", - " return False\n", - " \n", - " # Step 5: Test folder access\n", - " self.test_folder_access()\n", - " \n", - " print(\"\\n✅ All basic checks passed!\")\n", - " return True\n", - " \n", - " def check_service_account_file(self):\n", - " \"\"\"Check if service account file exists and is valid\"\"\"\n", - " print(\"\\n1️⃣ Checking service account file...\")\n", - " \n", - " if not os.path.exists(self.service_account_file):\n", - " print(f\"❌ Service account file not found: {self.service_account_file}\")\n", - " print(\"💡 Make sure you've downloaded the JSON key file from Google Cloud Console\")\n", - " return False\n", - " \n", - " try:\n", - " with open(self.service_account_file, 'r') as f:\n", - " service_account_info = json.load(f)\n", - " \n", - " required_fields = ['type', 'project_id', 'private_key_id', 'private_key', 'client_email']\n", - " missing_fields = [field for field in required_fields if field not in service_account_info]\n", - " \n", - " if missing_fields:\n", - " print(f\"❌ Service account file missing required fields: {missing_fields}\")\n", - " return False\n", - " \n", - " print(f\"✅ Service account file is valid\")\n", - " print(f\" 📧 Service account email: {service_account_info['client_email']}\")\n", - " print(f\" 🏗️ Project ID: {service_account_info['project_id']}\")\n", - " \n", - " return True\n", - " \n", - " except json.JSONDecodeError:\n", - " print(\"❌ Service account file is not valid JSON\")\n", - " return False\n", - " except Exception as e:\n", - " print(f\"❌ Error reading service account file: {e}\")\n", - " return False\n", - " \n", - " def check_credentials(self):\n", - " \"\"\"Check if credentials can be created\"\"\"\n", - " print(\"\\n2️⃣ Checking credentials...\")\n", - " \n", - " try:\n", - " self.credentials = service_account.Credentials.from_service_account_file(\n", - " self.service_account_file,\n", - " scopes=['https://www.googleapis.com/auth/drive.readonly']\n", - " )\n", - " print(\"✅ Credentials created successfully\")\n", - " return True\n", - " \n", - " except Exception as e:\n", - " print(f\"❌ Failed to create credentials: {e}\")\n", - " print(\"💡 Check if your service account key file is corrupted\")\n", - " return False\n", - " \n", - " def test_api_connection(self):\n", - " \"\"\"Test basic API connection\"\"\"\n", - " print(\"\\n3️⃣ Testing API connection...\")\n", - " \n", - " try:\n", - " self.drive_service = build('drive', 'v3', credentials=self.credentials)\n", - " \n", - " # Try a simple API call\n", - " about = self.drive_service.about().get(fields=\"user, storageQuota\").execute()\n", - " print(\"✅ Successfully connected to Google Drive API\")\n", - " print(f\" 👤 Connected as: {about.get('user', {}).get('emailAddress', 'Unknown')}\")\n", - " \n", - " return True\n", - " \n", - " except HttpError as e:\n", - " print(f\"❌ HTTP Error connecting to API: {e}\")\n", - " if e.resp.status == 403:\n", - " print(\"💡 This is likely a permissions issue - check if Drive API is enabled\")\n", - " return False\n", - " except Exception as e:\n", - " print(f\"❌ Failed to connect to API: {e}\")\n", - " return False\n", - " \n", - " def check_basic_permissions(self):\n", - " \"\"\"Check basic file listing permissions\"\"\"\n", - " print(\"\\n4️⃣ Checking basic permissions...\")\n", - " \n", - " try:\n", - " # Try to list files (this should work with readonly access)\n", - " results = self.drive_service.files().list(\n", - " pageSize=1,\n", - " fields=\"files(id, name)\"\n", - " ).execute()\n", - " \n", - " files = results.get('files', [])\n", - " print(f\"✅ Can access Drive API - found {len(files)} files in test query\")\n", - " \n", - " if len(files) == 0:\n", - " print(\"⚠️ No files found - this might mean:\")\n", - " print(\" • Service account has no shared files\")\n", - " print(\" • No files are shared with the service account\")\n", - " \n", - " return True\n", - " \n", - " except HttpError as e:\n", - " print(f\"❌ Permission error: {e}\")\n", - " if e.resp.status == 403:\n", - " print(\"💡 Common causes:\")\n", - " print(\" • Google Drive API not enabled in Google Cloud Console\")\n", - " print(\" • Service account doesn't have proper permissions\")\n", - " return False\n", - " except Exception as e:\n", - " print(f\"❌ Error checking permissions: {e}\")\n", - " return False\n", - " \n", - " def test_folder_access(self, folder_name=\"Blue berry\"):\n", - " \"\"\"Test access to specific folder\"\"\"\n", - " print(f\"\\n5️⃣ Testing access to '{folder_name}' folder...\")\n", - " \n", - " try:\n", - " # Search for the folder\n", - " query = f\"name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and trashed=false\"\n", - " results = self.drive_service.files().list(\n", - " q=query,\n", - " fields=\"files(id, name, owners, permissions)\"\n", - " ).execute()\n", - " \n", - " folders = results.get('files', [])\n", - " \n", - " if not folders:\n", - " print(f\"❌ Folder '{folder_name}' not found or not accessible\")\n", - " print(\"💡 Possible solutions:\")\n", - " print(\" • Make sure the folder exists in Google Drive\")\n", - " print(\" • Share the folder with your service account email\")\n", - " print(\" • Check folder name spelling (case sensitive)\")\n", - " return False\n", - " \n", - " folder = folders[0]\n", - " print(f\"✅ Found folder '{folder_name}'\")\n", - " print(f\" 📁 Folder ID: {folder['id']}\")\n", - " \n", - " # Test listing files in the folder\n", - " files_in_folder = self.drive_service.files().list(\n", - " q=f\"'{folder['id']}' in parents and trashed=false\",\n", - " fields=\"files(id, name, mimeType)\"\n", - " ).execute()\n", - " \n", - " files = files_in_folder.get('files', [])\n", - " print(f\" 📄 Contains {len(files)} files\")\n", - " \n", - " if files:\n", - " print(\" 📝 Sample files:\")\n", - " for file in files[:3]: # Show first 3 files\n", - " print(f\" • {file['name']} ({file['mimeType']})\")\n", - " \n", - " return True\n", - " \n", - " except HttpError as e:\n", - " print(f\"❌ HTTP Error accessing folder: {e}\")\n", - " return False\n", - " except Exception as e:\n", - " print(f\"❌ Error accessing folder: {e}\")\n", - " return False\n", - " \n", - " def check_file_permissions(self, file_id):\n", - " \"\"\"Check permissions for a specific file\"\"\"\n", - " print(f\"\\n🔍 Checking permissions for file ID: {file_id}\")\n", - " \n", - " try:\n", - " file_info = self.drive_service.files().get(\n", - " fileId=file_id,\n", - " fields=\"id, name, mimeType, owners, permissions, capabilities\"\n", - " ).execute()\n", - " \n", - " print(f\"✅ File: {file_info['name']}\")\n", - " print(f\" 🔗 Type: {file_info['mimeType']}\")\n", - " print(f\" 👤 Owner: {file_info.get('owners', [{}])[0].get('emailAddress', 'Unknown')}\")\n", - " \n", - " capabilities = file_info.get('capabilities', {})\n", - " print(f\" 📖 Can read: {capabilities.get('canDownload', False)}\")\n", - " print(f\" 📤 Can export: {capabilities.get('canExport', False)}\")\n", - " \n", - " except HttpError as e:\n", - " print(f\"❌ Cannot access file: {e}\")\n", - " except Exception as e:\n", - " print(f\"❌ Error: {e}\")\n", - " \n", - " def get_sharing_instructions(self):\n", - " \"\"\"Provide step-by-step sharing instructions\"\"\"\n", - " print(\"\\n📋 HOW TO SHARE FOLDER WITH SERVICE ACCOUNT:\")\n", - " print(\"=\" * 50)\n", - " \n", - " if self.credentials:\n", - " service_email = self.credentials.service_account_email\n", - " print(f\"1. Copy this service account email: {service_email}\")\n", - " else:\n", - " print(\"1. Find your service account email in the JSON key file (client_email field)\")\n", - " \n", - " print(\"2. Open Google Drive in your browser\")\n", - " print(\"3. Right-click on your 'Blue berry' folder\")\n", - " print(\"4. Select 'Share'\")\n", - " print(\"5. Paste the service account email\")\n", - " print(\"6. Set permission to 'Viewer' or 'Editor'\")\n", - " print(\"7. Click 'Send' (you can uncheck 'Notify people')\")\n", - " print(\"8. Wait a few minutes for permissions to propagate\")\n", - " \n", - " def run_connection_test(self):\n", - " \"\"\"Quick connection test\"\"\"\n", - " print(\"🚀 Quick Connection Test\")\n", - " print(\"-\" * 30)\n", - " \n", - " try:\n", - " self.credentials = service_account.Credentials.from_service_account_file(\n", - " self.service_account_file,\n", - " scopes=['https://www.googleapis.com/auth/drive.readonly']\n", - " )\n", - " \n", - " service = build('drive', 'v3', credentials=self.credentials)\n", - " \n", - " # Test basic query\n", - " results = service.files().list(pageSize=5).execute()\n", - " files = results.get('files', [])\n", - " \n", - " print(f\"✅ Connected! Found {len(files)} accessible files\")\n", - " print(f\"📧 Service account: {self.credentials.service_account_email}\")\n", - " \n", - " return True\n", - " \n", - " except Exception as e:\n", - " print(f\"❌ Connection failed: {e}\")\n", - " return False\n", - "\n", - "# Usage Examples\n", - "def troubleshoot_drive_access():\n", - " \"\"\"Main troubleshooting function\"\"\"\n", - " service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n", - " \n", - " if not service_account_file:\n", - " print(\"❌ GOOGLE_SERVICE_ACCOUNT_FILE environment variable not set\")\n", - " print(\"💡 Add this to your .env file:\")\n", - " print(\"GOOGLE_SERVICE_ACCOUNT_FILE=path/to/your/service-account-key.json\")\n", - " return\n", - " \n", - " troubleshooter = GPTDriveTroubleshooter(service_account_file)\n", - " \n", - " # Run full diagnostic\n", - " success = troubleshooter.run_full_diagnostic()\n", - " \n", - " if not success:\n", - " print(\"\\n\" + \"=\"*50)\n", - " troubleshooter.get_sharing_instructions()\n", - " \n", - " return success\n", - "\n", - "# Quick test function\n", - "def quick_test():\n", - " \"\"\"Quick test to verify everything works\"\"\"\n", - " service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n", - " troubleshooter = GPTDriveTroubleshooter(service_account_file)\n", - " return troubleshooter.run_connection_test()\n", - "\n", - "if __name__ == \"__main__\":\n", - " # Uncomment the test you want to run:\n", - " \n", - " # Full diagnostic (recommended for first-time setup)\n", - " troubleshoot_drive_access()\n", - " \n", - " # Quick test (for regular checks)\n", - " # quick_test()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6b336789-d402-455d-a631-26e987d79ed6", - "metadata": {}, - "outputs": [], - "source": [ - "troubleshooter = GPTDriveTroubleshooter(GOOGLE_SERVICE_ACCOUNT_FILE)\n", - "troubleshooter.test_folder_access(\"Blue berry\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e2786c67-215e-46b0-b3ca-2c3176bd8971", - "metadata": {}, - "outputs": [], - "source": [ - "# Add this test function to verify access\n", - "def test_folder_access():\n", - " integration = GPTDriveIntegration()\n", - " folder_id = integration.find_folder_by_name(\"Blue berry\")\n", - " if folder_id:\n", - " print(\"✅ Successfully found 'Blue berry' folder!\")\n", - " \n", - " # List files in the folder\n", - " results = integration.drive_service.files().list(\n", - " q=f\"'{folder_id}' in parents and trashed=false\",\n", - " fields=\"files(id, name, mimeType)\"\n", - " ).execute()\n", - " \n", - " files = results.get('files', [])\n", - " print(f\"Found {len(files)} files in the folder:\")\n", - " for file in files[:5]: # Show first 5 files\n", - " print(f\" - {file['name']} ({file['mimeType']})\")\n", - " else:\n", - " print(\"❌ Could not access 'Blue berry' folder\")\n", - " print(\"Make sure you've shared the folder with your service account\")\n", - "\n", - "# Run this test first\n", - "test_folder_access()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c3dede9f-5e01-436d-a7b7-905e1646baf9", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "b933c8b6-add6-40fc-827f-e5e07447ac00", - "metadata": {}, - "outputs": [], - "source": [ - "#Troubleshooting" - ] - }, - { - "cell_type": "code", - "execution_count": 32, - "id": "b63a1f4b-1315-44fe-ba84-93fb1c2655cc", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "🔍 Searching for 'Blue Berry' folder...\n", - "Total folders found: 2\n", - "\n", - "✅ Found 1 'Blue Berry' folder(s):\n", - "📁 Anatomy Books \n", - " Full Path: Anatomy Books \n", - " Folder ID: 125NdiCL7moQsNYWRuHojVM7If88Bxh0q\n", - "\n" - ] - } - ], - "source": [ - "import os\n", - "from google.oauth2 import service_account\n", - "from googleapiclient.discovery import build\n", - "\n", - "class DrivefolderLister:\n", - " def __init__(self):\n", - " # Initialize Google Drive API\n", - " self.credentials = service_account.Credentials.from_service_account_file(\n", - " os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE'),\n", - " scopes=['https://www.googleapis.com/auth/drive.readonly']\n", - " )\n", - " self.drive_service = build('drive', 'v3', credentials=self.credentials)\n", - " \n", - " def get_all_folders(self):\n", - " \"\"\"Get ALL folders in the Google Drive\"\"\"\n", - " try:\n", - " # Query to get all folders (not trashed)\n", - " query = \"mimeType='application/vnd.google-apps.folder' and trashed=false\"\n", - " \n", - " all_folders = []\n", - " page_token = None\n", - " \n", - " while True:\n", - " results = self.drive_service.files().list(\n", - " q=query,\n", - " fields=\"nextPageToken, files(id, name, parents)\",\n", - " pageSize=1000, # Maximum allowed\n", - " pageToken=page_token\n", - " ).execute()\n", - " \n", - " folders = results.get('files', [])\n", - " all_folders.extend(folders)\n", - " \n", - " page_token = results.get('nextPageToken')\n", - " if not page_token:\n", - " break\n", - " \n", - " print(f\"Total folders found: {len(all_folders)}\")\n", - " return all_folders\n", - " \n", - " except Exception as e:\n", - " print(f\"Error retrieving folders: {e}\")\n", - " return []\n", - " \n", - " def build_folder_tree(self, folders):\n", - " \"\"\"Build a tree structure from the flat list of folders\"\"\"\n", - " # Create a dictionary for quick lookup\n", - " folder_dict = {folder['id']: folder for folder in folders}\n", - " \n", - " # Add root folder\n", - " folder_dict['root'] = {'id': 'root', 'name': 'My Drive', 'parents': []}\n", - " \n", - " # Build the tree structure\n", - " tree = {}\n", - " \n", - " def build_path(folder_id, visited=None):\n", - " if visited is None:\n", - " visited = set()\n", - " \n", - " if folder_id in visited: # Prevent infinite loops\n", - " return \"CIRCULAR_REFERENCE\"\n", - " \n", - " visited.add(folder_id)\n", - " \n", - " if folder_id not in folder_dict:\n", - " return \"UNKNOWN\"\n", - " \n", - " folder = folder_dict[folder_id]\n", - " \n", - " if folder_id == 'root':\n", - " return \"My Drive\"\n", - " \n", - " parents = folder.get('parents', [])\n", - " if not parents:\n", - " return folder['name']\n", - " \n", - " parent_path = build_path(parents[0], visited.copy())\n", - " return f\"{parent_path}/{folder['name']}\"\n", - " \n", - " # Build paths for all folders\n", - " folder_paths = []\n", - " for folder in folders:\n", - " path = build_path(folder['id'])\n", - " folder_paths.append({\n", - " 'name': folder['name'],\n", - " 'id': folder['id'],\n", - " 'path': path\n", - " })\n", - " \n", - " return folder_paths\n", - " \n", - " def search_folder_by_name(self, folder_name, folders_with_paths):\n", - " \"\"\"Search for folders by name (case-insensitive)\"\"\"\n", - " matches = []\n", - " search_name = folder_name.lower()\n", - " \n", - " for folder in folders_with_paths:\n", - " if search_name in folder['name'].lower():\n", - " matches.append(folder)\n", - " \n", - " return matches\n", - " \n", - " def display_all_folders(self, folders_with_paths, search_term=None):\n", - " \"\"\"Display all folders in a readable format\"\"\"\n", - " if search_term:\n", - " print(f\"\\n=== Searching for folders containing '{search_term}' ===\")\n", - " matches = self.search_folder_by_name(search_term, folders_with_paths)\n", - " if matches:\n", - " print(f\"Found {len(matches)} matching folders:\")\n", - " for folder in matches:\n", - " print(f\"📁 {folder['name']}\")\n", - " print(f\" Path: {folder['path']}\")\n", - " print(f\" ID: {folder['id']}\")\n", - " print()\n", - " else:\n", - " print(f\"No folders found containing '{search_term}'\")\n", - " else:\n", - " print(f\"\\n=== All {len(folders_with_paths)} folders in your Google Drive ===\")\n", - " \n", - " # Sort by path for better readability\n", - " sorted_folders = sorted(folders_with_paths, key=lambda x: x['path'])\n", - " \n", - " for folder in sorted_folders:\n", - " print(f\"📁 {folder['path']}\")\n", - " # print(f\" ID: {folder['id']}\") # Uncomment if you need IDs\n", - " \n", - " def find_blue_berry_folder(self):\n", - " \"\"\"Specifically look for the Anatomy Books folder\"\"\"\n", - " print(\"🔍 Searching for 'Blue Berry' folder...\")\n", - " \n", - " folders = self.get_all_folders()\n", - " if not folders:\n", - " print(\"No folders found or error occurred.\")\n", - " return\n", - " \n", - " folders_with_paths = self.build_folder_tree(folders)\n", - " \n", - " # Search for Anatomy Books specifically\n", - " blue_berry_matches = self.search_folder_by_name(\"Anatomy Books\", folders_with_paths)\n", - " \n", - " if blue_berry_matches:\n", - " print(f\"\\n✅ Found {len(blue_berry_matches)} 'Blue Berry' folder(s):\")\n", - " for folder in blue_berry_matches:\n", - " print(f\"📁 {folder['name']}\")\n", - " print(f\" Full Path: {folder['path']}\")\n", - " print(f\" Folder ID: {folder['id']}\")\n", - " print()\n", - " else:\n", - " print(\"\\n❌ No 'Blue Berry' folder found.\")\n", - " print(\"Let me show you all folders to help you locate it:\")\n", - " \n", - " # Show all folders if Blue Berry not found\n", - " self.display_all_folders(folders_with_paths)\n", - " \n", - " def interactive_folder_search(self):\n", - " \"\"\"Interactive search for any folder\"\"\"\n", - " print(\"📂 Google Drive Folder Explorer\")\n", - " print(\"=\" * 40)\n", - " \n", - " folders = self.get_all_folders()\n", - " if not folders:\n", - " print(\"No folders found or error occurred.\")\n", - " return\n", - " \n", - " folders_with_paths = self.build_folder_tree(folders)\n", - " \n", - " while True:\n", - " print(\"\\nOptions:\")\n", - " print(\"1. Search for a specific folder\")\n", - " print(\"2. Show all folders\")\n", - " print(\"3. Find 'Blue Berry' folder\")\n", - " print(\"4. Exit\")\n", - " \n", - " choice = input(\"\\nEnter your choice (1-4): \").strip()\n", - " \n", - " if choice == '1':\n", - " search_term = input(\"Enter folder name to search: \").strip()\n", - " if search_term:\n", - " self.display_all_folders(folders_with_paths, search_term)\n", - " \n", - " elif choice == '2':\n", - " self.display_all_folders(folders_with_paths)\n", - " \n", - " elif choice == '3':\n", - " blue_berry_matches = self.search_folder_by_name(\"blue berry\", folders_with_paths)\n", - " if blue_berry_matches:\n", - " print(f\"\\n✅ Found 'Blue Berry' folder(s):\")\n", - " for folder in blue_berry_matches:\n", - " print(f\"📁 {folder['name']} -> {folder['path']}\")\n", - " else:\n", - " print(\"\\n❌ 'Blue Berry' folder not found in your Drive.\")\n", - " \n", - " elif choice == '4':\n", - " print(\"Goodbye!\")\n", - " break\n", - " \n", - " else:\n", - " print(\"Invalid choice. Please try again.\")\n", - "\n", - "def main():\n", - " \"\"\"Main function to run the folder lister\"\"\"\n", - " try:\n", - " lister = DrivefolderLister()\n", - " \n", - " # Quick search for Blue Berry folder\n", - " lister.find_blue_berry_folder()\n", - " \n", - " # Uncomment the line below for interactive mode\n", - " # lister.interactive_folder_search()\n", - " \n", - " except Exception as e:\n", - " print(f\"Error initializing Drive connection: {e}\")\n", - " print(\"Make sure your GOOGLE_SERVICE_ACCOUNT_FILE environment variable is set correctly.\")\n", - "\n", - "if __name__ == \"__main__\":\n", - " main()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "29a61039-9043-44d5-94b1-4847350b2200", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": 31, - "id": "7350d687-f76e-46f4-a30b-0518e5b8236e", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "🚀 Google Drive API Diagnostic Tool\n", - "==================================================\n", - "🔧 Testing Environment Variables...\n", - "----------------------------------------\n", - "✅ GOOGLE_SERVICE_ACCOUNT_FILE: C:/Users/Uche Buzz/myprojects/RAG/rag-system-463320-f292991d0516.json\n", - "✅ Service account file exists\n", - "✅ Service account file is valid JSON\n", - "✅ All required fields present in service account file\n", - "📧 Service account email: rag-base@rag-system-463320.iam.gserviceaccount.com\n", - "✅ OPENAI_API_KEY: ************************************************************************************************************************************************************K1OkRRkA\n", - "\n", - "🔑 Testing Credentials Loading...\n", - "----------------------------------------\n", - "\n", - "Testing scope set 1: ['https://www.googleapis.com/auth/drive.readonly']\n", - "✅ Credentials loaded successfully with scopes: ['https://www.googleapis.com/auth/drive.readonly']\n", - "\n", - "🔧 Testing Drive Service Creation...\n", - "----------------------------------------\n", - "✅ Google Drive service created successfully\n", - "\n", - "📡 Testing Basic API Call...\n", - "----------------------------------------\n", - "✅ API call successful!\n", - "📧 Connected as: rag-base@rag-system-463320.iam.gserviceaccount.com\n", - "👤 Display name: rag-base@rag-system-463320.iam.gserviceaccount.com\n", - "\n", - "🏠 Testing Root Folder Access...\n", - "----------------------------------------\n", - "✅ Root folder accessible\n", - "📁 Root folder name: My Drive\n", - "📄 Items in root folder: 0\n", - "\n", - "📁 Testing File Listing...\n", - "----------------------------------------\n", - "Test 1: Listing all files (including documents)...\n", - "✅ Found 7 files total\n", - "📄 First few files:\n", - " - Neuro Note Dr Clement.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)\n", - " - Morbid Anatomy - ANA 3107.pdf (application/pdf)\n", - " - ANT & POST TRIANGLES OF THE NECK.pdf (application/pdf)\n", - " - Anatomy Books (application/vnd.google-apps.folder)\n", - " - Blue Berry (application/vnd.google-apps.folder)\n", - "\n", - "Test 2: Listing folders only...\n", - "✅ Found 2 folders\n", - "📁 Folders found:\n", - " - Anatomy Books (ID: 125NdiCL7moQsNYWRuHojVM7If88Bxh0q)\n", - " - Blue Berry (ID: 1AYaS0yt_srFlgdE4mSNlqA6FLm10rdt4)\n", - "\n", - "✅ All diagnostic tests passed!\n", - "🎉 Your Google Drive API connection is working correctly!\n" - ] - } - ], - "source": [ - "import os\n", - "from google.oauth2 import service_account\n", - "from googleapiclient.discovery import build\n", - "from googleapiclient.errors import HttpError\n", - "import json\n", - "\n", - "class DriveConnectionDiagnostic:\n", - " def __init__(self):\n", - " self.credentials = None\n", - " self.drive_service = None\n", - " self.setup_success = False\n", - " \n", - " def test_environment_variables(self):\n", - " \"\"\"Test if environment variables are set correctly\"\"\"\n", - " print(\"🔧 Testing Environment Variables...\")\n", - " print(\"-\" * 40)\n", - " \n", - " service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n", - " openai_key = os.getenv('OPENAI_API_KEY')\n", - " \n", - " if service_account_file:\n", - " print(f\"✅ GOOGLE_SERVICE_ACCOUNT_FILE: {service_account_file}\")\n", - " \n", - " # Check if file exists\n", - " if os.path.exists(service_account_file):\n", - " print(f\"✅ Service account file exists\")\n", - " \n", - " # Check if file is readable\n", - " try:\n", - " with open(service_account_file, 'r') as f:\n", - " service_data = json.load(f)\n", - " print(f\"✅ Service account file is valid JSON\")\n", - " \n", - " # Check required fields\n", - " required_fields = ['type', 'project_id', 'private_key_id', 'private_key', 'client_email']\n", - " missing_fields = [field for field in required_fields if field not in service_data]\n", - " \n", - " if missing_fields:\n", - " print(f\"❌ Missing required fields in service account file: {missing_fields}\")\n", - " return False\n", - " else:\n", - " print(f\"✅ All required fields present in service account file\")\n", - " print(f\"📧 Service account email: {service_data.get('client_email')}\")\n", - " \n", - " except json.JSONDecodeError:\n", - " print(f\"❌ Service account file is not valid JSON\")\n", - " return False\n", - " except Exception as e:\n", - " print(f\"❌ Error reading service account file: {e}\")\n", - " return False\n", - " \n", - " else:\n", - " print(f\"❌ Service account file does not exist at: {service_account_file}\")\n", - " return False\n", - " else:\n", - " print(f\"❌ GOOGLE_SERVICE_ACCOUNT_FILE environment variable not set\")\n", - " return False\n", - " \n", - " if openai_key:\n", - " print(f\"✅ OPENAI_API_KEY: {'*' * (len(openai_key) - 8) + openai_key[-8:]}\")\n", - " else:\n", - " print(f\"⚠️ OPENAI_API_KEY not set (not required for folder listing)\")\n", - " \n", - " return True\n", - " \n", - " def test_credentials_loading(self):\n", - " \"\"\"Test if credentials can be loaded\"\"\"\n", - " print(\"\\n🔑 Testing Credentials Loading...\")\n", - " print(\"-\" * 40)\n", - " \n", - " try:\n", - " service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n", - " \n", - " # Try different scope combinations\n", - " scopes_to_test = [\n", - " ['https://www.googleapis.com/auth/drive.readonly'],\n", - " ['https://www.googleapis.com/auth/drive'],\n", - " ['https://www.googleapis.com/auth/drive.metadata.readonly'],\n", - " ['https://www.googleapis.com/auth/drive.file']\n", - " ]\n", - " \n", - " for i, scopes in enumerate(scopes_to_test):\n", - " try:\n", - " print(f\"\\nTesting scope set {i+1}: {scopes}\")\n", - " self.credentials = service_account.Credentials.from_service_account_file(\n", - " service_account_file,\n", - " scopes=scopes\n", - " )\n", - " print(f\"✅ Credentials loaded successfully with scopes: {scopes}\")\n", - " return True\n", - " except Exception as e:\n", - " print(f\"❌ Failed to load credentials with scopes {scopes}: {e}\")\n", - " \n", - " return False\n", - " \n", - " except Exception as e:\n", - " print(f\"❌ Error loading credentials: {e}\")\n", - " return False\n", - " \n", - " def test_drive_service_creation(self):\n", - " \"\"\"Test if Drive service can be created\"\"\"\n", - " print(\"\\n🔧 Testing Drive Service Creation...\")\n", - " print(\"-\" * 40)\n", - " \n", - " try:\n", - " if not self.credentials:\n", - " print(\"❌ No credentials available\")\n", - " return False\n", - " \n", - " self.drive_service = build('drive', 'v3', credentials=self.credentials)\n", - " print(\"✅ Google Drive service created successfully\")\n", - " return True\n", - " \n", - " except Exception as e:\n", - " print(f\"❌ Error creating Drive service: {e}\")\n", - " return False\n", - " \n", - " def test_basic_api_call(self):\n", - " \"\"\"Test basic API functionality\"\"\"\n", - " print(\"\\n📡 Testing Basic API Call...\")\n", - " print(\"-\" * 40)\n", - " \n", - " try:\n", - " if not self.drive_service:\n", - " print(\"❌ No Drive service available\")\n", - " return False\n", - " \n", - " # Test getting user info\n", - " about = self.drive_service.about().get(fields=\"user\").execute()\n", - " user_info = about.get('user', {})\n", - " print(f\"✅ API call successful!\")\n", - " print(f\"📧 Connected as: {user_info.get('emailAddress', 'Unknown')}\")\n", - " print(f\"👤 Display name: {user_info.get('displayName', 'Unknown')}\")\n", - " \n", - " return True\n", - " \n", - " except HttpError as e:\n", - " print(f\"❌ HTTP Error: {e}\")\n", - " if e.resp.status == 403:\n", - " print(\" This might be a permissions issue. Check:\")\n", - " print(\" 1. Is the service account enabled?\")\n", - " print(\" 2. Does it have the right permissions?\")\n", - " print(\" 3. Is the Google Drive API enabled in your project?\")\n", - " return False\n", - " except Exception as e:\n", - " print(f\"❌ Error making API call: {e}\")\n", - " return False\n", - " \n", - " def test_file_listing(self):\n", - " \"\"\"Test different file listing approaches\"\"\"\n", - " print(\"\\n📁 Testing File Listing...\")\n", - " print(\"-\" * 40)\n", - " \n", - " if not self.drive_service:\n", - " print(\"❌ No Drive service available\")\n", - " return False\n", - " \n", - " # Test 1: List any files (not just folders)\n", - " try:\n", - " print(\"Test 1: Listing all files (including documents)...\")\n", - " results = self.drive_service.files().list(\n", - " q=\"trashed=false\",\n", - " fields=\"files(id, name, mimeType)\",\n", - " pageSize=10\n", - " ).execute()\n", - " \n", - " files = results.get('files', [])\n", - " print(f\"✅ Found {len(files)} files total\")\n", - " \n", - " if files:\n", - " print(\"📄 First few files:\")\n", - " for file in files[:5]:\n", - " print(f\" - {file['name']} ({file['mimeType']})\")\n", - " \n", - " except Exception as e:\n", - " print(f\"❌ Error listing files: {e}\")\n", - " return False\n", - " \n", - " # Test 2: List only folders\n", - " try:\n", - " print(f\"\\nTest 2: Listing folders only...\")\n", - " results = self.drive_service.files().list(\n", - " q=\"mimeType='application/vnd.google-apps.folder' and trashed=false\",\n", - " fields=\"files(id, name, parents)\",\n", - " pageSize=10\n", - " ).execute()\n", - " \n", - " folders = results.get('files', [])\n", - " print(f\"✅ Found {len(folders)} folders\")\n", - " \n", - " if folders:\n", - " print(\"📁 Folders found:\")\n", - " for folder in folders:\n", - " print(f\" - {folder['name']} (ID: {folder['id']})\")\n", - " else:\n", - " print(\"⚠️ No folders found - this might indicate:\")\n", - " print(\" 1. Your Google Drive is empty\")\n", - " print(\" 2. The service account doesn't have access to your personal Drive\")\n", - " print(\" 3. You need to share folders with the service account\")\n", - " \n", - " return len(folders) > 0\n", - " \n", - " except Exception as e:\n", - " print(f\"❌ Error listing folders: {e}\")\n", - " return False\n", - " \n", - " def test_root_access(self):\n", - " \"\"\"Test access to root folder\"\"\"\n", - " print(\"\\n🏠 Testing Root Folder Access...\")\n", - " print(\"-\" * 40)\n", - " \n", - " try:\n", - " if not self.drive_service:\n", - " print(\"❌ No Drive service available\")\n", - " return False\n", - " \n", - " # Try to get root folder info\n", - " root_info = self.drive_service.files().get(\n", - " fileId='root',\n", - " fields=\"id, name, mimeType\"\n", - " ).execute()\n", - " \n", - " print(f\"✅ Root folder accessible\")\n", - " print(f\"📁 Root folder name: {root_info.get('name', 'My Drive')}\")\n", - " \n", - " # Try to list contents of root\n", - " results = self.drive_service.files().list(\n", - " q=\"'root' in parents and trashed=false\",\n", - " fields=\"files(id, name, mimeType)\",\n", - " pageSize=10\n", - " ).execute()\n", - " \n", - " root_contents = results.get('files', [])\n", - " print(f\"📄 Items in root folder: {len(root_contents)}\")\n", - " \n", - " if root_contents:\n", - " print(\"🔍 Root folder contents:\")\n", - " for item in root_contents[:5]:\n", - " item_type = \"📁\" if \"folder\" in item['mimeType'] else \"📄\"\n", - " print(f\" {item_type} {item['name']}\")\n", - " \n", - " return True\n", - " \n", - " except Exception as e:\n", - " print(f\"❌ Error accessing root folder: {e}\")\n", - " return False\n", - " \n", - " def run_full_diagnostic(self):\n", - " \"\"\"Run complete diagnostic\"\"\"\n", - " print(\"🚀 Google Drive API Diagnostic Tool\")\n", - " print(\"=\" * 50)\n", - " \n", - " # Step 1: Environment variables\n", - " if not self.test_environment_variables():\n", - " print(\"\\n❌ Environment setup failed. Please check your service account file.\")\n", - " return False\n", - " \n", - " # Step 2: Credentials\n", - " if not self.test_credentials_loading():\n", - " print(\"\\n❌ Credentials loading failed.\")\n", - " return False\n", - " \n", - " # Step 3: Service creation\n", - " if not self.test_drive_service_creation():\n", - " print(\"\\n❌ Drive service creation failed.\")\n", - " return False\n", - " \n", - " # Step 4: Basic API call\n", - " if not self.test_basic_api_call():\n", - " print(\"\\n❌ Basic API call failed.\")\n", - " return False\n", - " \n", - " # Step 5: Root access\n", - " if not self.test_root_access():\n", - " print(\"\\n❌ Root folder access failed.\")\n", - " return False\n", - " \n", - " # Step 6: File listing\n", - " if not self.test_file_listing():\n", - " print(\"\\n❌ File listing failed or no folders found.\")\n", - " print(\"\\n🔧 SOLUTION SUGGESTIONS:\")\n", - " print(\"1. If you're using a service account, you need to SHARE folders with it\")\n", - " print(\"2. Share your 'Blue Berry' folder with the service account email\")\n", - " print(\"3. Or consider using OAuth2 instead of service account for personal Drive access\")\n", - " return False\n", - " \n", - " print(\"\\n✅ All diagnostic tests passed!\")\n", - " print(\"🎉 Your Google Drive API connection is working correctly!\")\n", - " return True\n", - " \n", - " def show_service_account_sharing_instructions(self):\n", - " \"\"\"Show instructions for sharing with service account\"\"\"\n", - " print(\"\\n📋 SERVICE ACCOUNT SHARING INSTRUCTIONS:\")\n", - " print(\"=\" * 50)\n", - " print(\"If you're using a service account, you need to share folders with it:\")\n", - " print()\n", - " print(\"1. Open Google Drive in your browser\")\n", - " print(\"2. Find the folder you want to access (e.g., 'Blue Berry')\")\n", - " print(\"3. Right-click the folder → 'Share'\")\n", - " print(\"4. Add the service account email address\")\n", - " print(\"5. Give it 'Viewer' or 'Editor' permissions\")\n", - " print(\"6. Click 'Send'\")\n", - " print()\n", - " \n", - " if self.credentials:\n", - " try:\n", - " service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n", - " with open(service_account_file, 'r') as f:\n", - " service_data = json.load(f)\n", - " email = service_data.get('client_email')\n", - " print(f\"📧 Your service account email: {email}\")\n", - " print(\" ^ Share your folders with this email address\")\n", - " except:\n", - " print(\"❌ Could not read service account email\")\n", - "\n", - "def main():\n", - " diagnostic = DriveConnectionDiagnostic()\n", - " success = diagnostic.run_full_diagnostic()\n", - " \n", - " if not success:\n", - " diagnostic.show_service_account_sharing_instructions()\n", - "\n", - "if __name__ == \"__main__\":\n", - " main()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e3374b17-d2fa-4c80-a70b-76b8bf94c702", - "metadata": {}, - "outputs": [], - "source": [ - "#flask" - ] - }, - { - "cell_type": "code", - "execution_count": 30, - "id": "c8ac4c93-8b36-49d2-b8e1-efb7d1c4b104", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - " * Serving Flask app '__main__'\n", - " * Debug mode: on\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.\n", - " * Running on http://127.0.0.1:5000\n", - "Press CTRL+C to quit\n", - " * Restarting with stat\n" - ] - }, - { - "ename": "SystemExit", - "evalue": "1", - "output_type": "error", - "traceback": [ - "An exception has occurred, use %tb to see the full traceback.\n", - "\u001b[31mSystemExit\u001b[39m\u001b[31m:\u001b[39m 1\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "C:\\Users\\Uche Buzz\\anaconda3\\envs\\RAG\\Lib\\site-packages\\IPython\\core\\interactiveshell.py:3680: UserWarning: To exit: use 'exit', 'quit', or Ctrl-D.\n", - " warn(\"To exit: use 'exit', 'quit', or Ctrl-D.\", stacklevel=1)\n" - ] - } - ], - "source": [ - "from flask import Flask, request, jsonify, render_template_string\n", - "\n", - "app = Flask(__name__)\n", - "integration = GPTDriveIntegration()\n", - "\n", - "HTML_TEMPLATE = \"\"\"\n", - "\n", - "\n", - "\n", - " GPT-Drive Integration\n", - " \n", - "\n", - "\n", - "

GPT-Google Drive Integration

\n", - "
\n", - " \n", - " \n", - "
\n", - "
\n", - "\n", - " \n", - "\n", - "\n", - "\"\"\"\n", - "\n", - "@app.route('/')\n", - "def home():\n", - " return render_template_string(HTML_TEMPLATE)\n", - "\n", - "@app.route('/query', methods=['POST'])\n", - "def query():\n", - " data = request.json\n", - " result = integration.process_query(data['query'])\n", - " return jsonify(result)\n", - "\n", - "if __name__ == '__main__':\n", - " app.run(debug=True)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "caef8af5-73f9-4627-9203-0e9c8d098db0", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ff7582bd-f80c-46c4-a51b-b20fd7493b0e", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "96ea63b3-3cf3-41c1-b160-72d677ab0949", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a8a64f0d-c9b5-4ce7-a887-e1b60de4dac3", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": 12, - "id": "abbc0f6c-5350-4fd9-8711-de68d4aa739d", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "02ae258f-bef0-469b-a696-1edf6a7b5921", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": 13, - "id": "09a0b418-eb0e-486e-bf05-bc7b43645784", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "* Running on local URL: http://127.0.0.1:7860\n", - "* Running on public URL: https://a9ef60dd3438b2ef0e.gradio.live\n", - "\n", - "This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)\n" - ] - }, - { - "data": { - "text/html": [ - "
" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Keyboard interruption in main thread... closing server.\n", - "Killing tunnel 127.0.0.1:7860 <> https://a9ef60dd3438b2ef0e.gradio.live\n" - ] - } - ], - "source": [ - "gpt_drive = GPTDriveIntegration()\n", - "\n", - "def process_user_query(query, search_terms_input):\n", - " \"\"\"Process user query and return formatted response\"\"\"\n", - " if not query.strip():\n", - " return \"Please enter a question.\", \"\"\n", - " \n", - " # Parse search terms if provided\n", - " search_terms = None\n", - " # if search_terms_input.strip():\n", - " # search_terms = [term.strip() for term in search_terms_input.split(',')]\n", - " \n", - " # Process the query\n", - " result = gpt_drive.process_query(query, search_terms)\n", - " \n", - " # Format the response\n", - " answer = result['answer']\n", - " sources = result['sources']\n", - " \n", - " sources_text = \"\"\n", - " if sources:\n", - " sources_text = \"**Sources used:**\\n\" + \"\\n\".join([f\"• {source}\" for source in sources])\n", - " \n", - " return answer, sources_text\n", - "\n", - "def check_setup():\n", - " \"\"\"Check if the APIs are properly configured\"\"\"\n", - " status_messages = []\n", - " \n", - " # Check Google Drive API\n", - " if gpt_drive.drive_initialized:\n", - " status_messages.append(\"✅ Google Drive API: Connected\")\n", - " else:\n", - " status_messages.append(f\"❌ Google Drive API: {getattr(gpt_drive, 'drive_error', 'Not configured')}\")\n", - " \n", - " # Check OpenAI API\n", - " if gpt_drive.openai_initialized:\n", - " status_messages.append(\"✅ OpenAI API: Connected\")\n", - " else:\n", - " status_messages.append(f\"❌ OpenAI API: {getattr(gpt_drive, 'openai_error', 'Not configured')}\")\n", - " \n", - " return \"\\n\".join(status_messages)\n", - "\n", - "# Create Gradio interface\n", - "with gr.Blocks(title=\"Augusta's Anatomy Reading Assistant\", theme=gr.themes.Soft()) as app:\n", - " gr.Markdown(\"# 🤖 Augusta's Anatomy bot\")\n", - " gr.Markdown(\"Ask questions about your anatomy books using AI!\")\n", - " \n", - " with gr.Row():\n", - " with gr.Column(scale=2):\n", - " # Main query interface\n", - " with gr.Group():\n", - " gr.Markdown(\"### Ask a Question\")\n", - " query_input = gr.Textbox(\n", - " label=\"Your Question\",\n", - " placeholder=\"Ask me any question about your anatomy books?\",\n", - " lines=3\n", - " )\n", - " \n", - " search_terms_input = gr.Textbox(\n", - " label=\"Search Terms (optional)\",\n", - " placeholder=\"Enter comma-separated terms to search for specific files\",\n", - " lines=1\n", - " )\n", - " \n", - " submit_btn = gr.Button(\"Search & Ask\", variant=\"primary\", size=\"lg\")\n", - " \n", - " # Results section\n", - " with gr.Group():\n", - " gr.Markdown(\"### Answer\")\n", - " answer_output = gr.Textbox(\n", - " label=\"AI Response\",\n", - " lines=10,\n", - " interactive=False\n", - " )\n", - " \n", - " sources_output = gr.Textbox(\n", - " label=\"Sources\",\n", - " lines=3,\n", - " interactive=False\n", - " )\n", - " \n", - " with gr.Column(scale=1):\n", - " # Status and setup info\n", - " with gr.Group():\n", - " gr.Markdown(\"### System Status\")\n", - " status_btn = gr.Button(\"Check Status\", size=\"sm\")\n", - " status_output = gr.Textbox(\n", - " label=\"API Status\",\n", - " lines=4,\n", - " interactive=False\n", - " )\n", - " \n", - " with gr.Group():\n", - " gr.Markdown(\"### Setup Instructions\")\n", - " gr.Markdown(\"\"\"\n", - " **Important Notes:**\n", - " 1.Only documents shared with it, it can answer\n", - " \n", - " **File Types Supported:**\n", - " - Google Docs\n", - " - Google Sheets \n", - " - PDF files\n", - " - Text files\n", - " \n", - " **Tips:**\n", - " - Use specific search terms for better results\n", - " - The system searches the top 3 most relevant files\n", - " - Ask clear, specific questions for better answers\n", - " \"\"\")\n", - " \n", - " # Event handlers\n", - " submit_btn.click(\n", - " fn=process_user_query,\n", - " inputs=[query_input, search_terms_input],\n", - " outputs=[answer_output, sources_output]\n", - " )\n", - " \n", - " status_btn.click(\n", - " fn=check_setup,\n", - " outputs=status_output\n", - " )\n", - " \n", - " # Example queries\n", - " with gr.Row():\n", - " gr.Examples(\n", - " examples=[\n", - " [\"What is morbid Anatomy?\", \"morbid, Anatomy\"],\n", - " [\"The transmission of nerves from one neuron to another is as a result of what?\", \"neuron, nerves, Dr Clement\"],\n", - " ],\n", - " inputs=[query_input, search_terms_input],\n", - " )\n", - "\n", - "# Launch the app\n", - "if __name__ == \"__main__\":\n", - " app.launch(\n", - " share=True,debug =True)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "35d0a347-2cc6-4f4e-bb70-cc47067eaf53", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.13.5" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -}