diff --git "a/app.py" "b/app.py"
deleted file mode 100644--- "a/app.py"
+++ /dev/null
@@ -1,2454 +0,0 @@
-{
- "cells": [
- {
- "cell_type": "code",
- "execution_count": 1,
- "id": "e549bafd-78b1-4a83-80b4-2cb597efff79",
- "metadata": {},
- "outputs": [],
- "source": [
- "import os\n",
- "import json\n",
- "from google.oauth2 import service_account\n",
- "from googleapiclient.discovery import build\n",
- "from googleapiclient.http import MediaIoBaseDownload\n",
- "import openai\n",
- "from dotenv import load_dotenv, dotenv_values\n",
- "import io"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 2,
- "id": "04361235-7896-4439-9d04-1400e043528b",
- "metadata": {},
- "outputs": [
- {
- "data": {
- "text/plain": [
- "True"
- ]
- },
- "execution_count": 2,
- "metadata": {},
- "output_type": "execute_result"
- }
- ],
- "source": [
- "load_dotenv()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 3,
- "id": "9ae411c5-c84b-4bfd-b089-69b5c5ba70ae",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "OPENAI_API_KEY\n",
- "ANTHROPIC_API_KEY\n",
- "GOOGLE_SERVICE_ACCOUNT_FILE\n"
- ]
- }
- ],
- "source": [
- "config = dotenv_values(\".env\")\n",
- "for key in config.keys():\n",
- " print(key)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 4,
- "id": "7622a0e4-64a6-4848-b588-bd65d56c55e0",
- "metadata": {},
- "outputs": [],
- "source": [
- "from openai import OpenAI\n",
- "openai.api_key = os.getenv('OPENAI_API_KEY')\n",
- "openai = OpenAI(api_key = openai.api_key)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 5,
- "id": "2ead7c59-1f11-478a-bb69-2928ddc38901",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Hello! How can I assist you today?\n"
- ]
- }
- ],
- "source": [
- "response = openai.chat.completions.create(\n",
- " model = \"gpt-4o-mini\",\n",
- " messages = [\n",
- " {\"role\":\"system\", \"content\":\"you are a helpful assistant\"},\n",
- " {\"role\":\"user\", \"content\":\"hi\"}\n",
- " ])\n",
- "\n",
- "reply = response.choices[0].message.content\n",
- "print(reply)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 6,
- "id": "b3bcbada-6a72-4cc8-a166-d2e596cd1fc4",
- "metadata": {},
- "outputs": [],
- "source": [
- "service_account_file_path = os.getenv(\"GOOGLE_SERVICE_ACCOUNT_FILE\")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 7,
- "id": "81c59a6d-0831-4bff-b3fc-fbe3d4cc1e31",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "openai activated\n",
- "service_account_file_path activated\n"
- ]
- }
- ],
- "source": [
- "#troubleshoot\n",
- "if openai is None:\n",
- " print(\"openai not activated\")\n",
- "else: \n",
- " print (\"openai activated\")\n",
- "\n",
- "if service_account_file_path is None:\n",
- " print(\"service_account_file_path not activated\")\n",
- "else: \n",
- " print (\"service_account_file_path activated\")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a70f32aa-9e43-4175-8cca-d6af723aef91",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": 8,
- "id": "0efe325d-badd-4533-affe-47d572ef128e",
- "metadata": {},
- "outputs": [],
- "source": [
- "class GPTDriveIntegration:\n",
- " def __init__(self):\n",
- " # Initialize Google Drive API\n",
- " self.credentials = service_account.Credentials.from_service_account_file(\n",
- " os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE'),\n",
- " scopes=['https://www.googleapis.com/auth/drive.readonly']\n",
- " )\n",
- " self.drive_service = build('drive', 'v3', credentials=self.credentials)\n",
- " \n",
- " # Initialize OpenAI\n",
- " openai.api_key = os.getenv('OPENAI_API_KEY')\n",
- " \n",
- " def search_files(self, query, file_types=None):\n",
- " \"\"\"Search for files in Google Drive\"\"\"\n",
- " search_query = f\"name contains '{query}'\"\n",
- " \n",
- " if file_types:\n",
- " type_queries = []\n",
- " for file_type in file_types:\n",
- " if file_type.lower() == 'pdf':\n",
- " type_queries.append(\"mimeType='application/pdf'\")\n",
- " elif file_type.lower() in ['doc', 'docx']:\n",
- " type_queries.append(\"mimeType contains 'document'\")\n",
- " elif file_type.lower() in ['xls', 'xlsx']:\n",
- " type_queries.append(\"mimeType contains 'spreadsheet'\")\n",
- " \n",
- " if type_queries:\n",
- " search_query += f\" and ({' or '.join(type_queries)})\"\n",
- " \n",
- " results = self.drive_service.files().list(\n",
- " q=search_query,\n",
- " fields=\"files(id, name, mimeType, size)\"\n",
- " ).execute()\n",
- " \n",
- " return results.get('files', [])\n",
- " \n",
- " def get_file_content(self, file_id, mime_type):\n",
- " \"\"\"Download and extract text content from file\"\"\"\n",
- " try:\n",
- " if 'text' in mime_type or 'document' in mime_type:\n",
- " # For Google Docs, export as plain text\n",
- " if 'document' in mime_type:\n",
- " request = self.drive_service.files().export_media(\n",
- " fileId=file_id, mimeType='text/plain'\n",
- " )\n",
- " else:\n",
- " request = self.drive_service.files().get_media(fileId=file_id)\n",
- " \n",
- " file_content = io.BytesIO()\n",
- " downloader = MediaIoBaseDownload(file_content, request)\n",
- " done = False\n",
- " while done is False:\n",
- " status, done = downloader.next_chunk()\n",
- " \n",
- " return file_content.getvalue().decode('utf-8')\n",
- " \n",
- " elif 'spreadsheet' in mime_type:\n",
- " # For Google Sheets, export as CSV\n",
- " request = self.drive_service.files().export_media(\n",
- " fileId=file_id, mimeType='text/csv'\n",
- " )\n",
- " file_content = io.BytesIO()\n",
- " downloader = MediaIoBaseDownload(file_content, request)\n",
- " done = False\n",
- " while done is False:\n",
- " status, done = downloader.next_chunk()\n",
- " \n",
- " return file_content.getvalue().decode('utf-8')\n",
- " \n",
- " elif mime_type == 'application/pdf':\n",
- " # For PDF files, download binary content and extract text\n",
- " request = self.drive_service.files().get_media(fileId=file_id)\n",
- " file_content = io.BytesIO()\n",
- " downloader = MediaIoBaseDownload(file_content, request)\n",
- " done = False\n",
- " while done is False:\n",
- " status, done = downloader.next_chunk()\n",
- " \n",
- " # Extract text from PDF using PyPDF2 or pdfplumber\n",
- " file_content.seek(0) # Reset buffer position\n",
- " \n",
- " # Option 1: Using PyPDF2\n",
- " try:\n",
- " import PyPDF2\n",
- " pdf_reader = PyPDF2.PdfReader(file_content)\n",
- " text = \"\"\n",
- " for page in pdf_reader.pages:\n",
- " text += page.extract_text() + \"\\n\"\n",
- " return text\n",
- " except ImportError:\n",
- " pass\n",
- " \n",
- " # Option 2: Using pdfplumber (better for complex PDFs)\n",
- " try:\n",
- " import pdfplumber\n",
- " text = \"\"\n",
- " with pdfplumber.open(file_content) as pdf:\n",
- " for page in pdf.pages:\n",
- " page_text = page.extract_text()\n",
- " if page_text:\n",
- " text += page_text + \"\\n\"\n",
- " return text\n",
- " except ImportError:\n",
- " pass\n",
- " \n",
- " # Option 3: Using pymupdf (fitz) - fastest option\n",
- " try:\n",
- " import fitz # pymupdf\n",
- " pdf_document = fitz.open(stream=file_content.read(), filetype=\"pdf\")\n",
- " text = \"\"\n",
- " for page_num in range(pdf_document.page_count):\n",
- " page = pdf_document[page_num]\n",
- " text += page.get_text() + \"\\n\"\n",
- " pdf_document.close()\n",
- " return text\n",
- " except ImportError:\n",
- " pass\n",
- " \n",
- " return \"PDF text extraction requires PyPDF2, pdfplumber, or pymupdf library\"\n",
- " \n",
- " else:\n",
- " return \"File type not supported for text extraction\"\n",
- " \n",
- " except Exception as e:\n",
- " return f\"Error reading file: {str(e)}\"\n",
- " \n",
- " def query_gpt_with_context(self, user_query, file_contents):\n",
- " \"\"\"Send query to GPT with file context\"\"\"\n",
- " context = \"\\n\\n\".join([\n",
- " f\"File: {content['name']}\\nContent: {content['text'][:2000]}...\"\n",
- " for content in file_contents\n",
- " ])\n",
- " \n",
- " messages = [\n",
- " {\n",
- " \"role\": \"system\", \n",
- " \"content\": \"\"\"\n",
- " You are an AI assistant that can analyze documents from Google Drive. \n",
- " Use the provided file contents to answer user questions.\"\"\"\n",
- " },\n",
- " {\n",
- " \"role\": \"user\", \n",
- " \"content\": f\"Context from Google Drive files:\\n{context}\\n\\nUser Question: {user_query}\"\n",
- " }\n",
- " ]\n",
- " \n",
- " response = openai.chat.completions.create(\n",
- " model=\"gpt-4o-mini\",\n",
- " messages=messages,\n",
- " max_tokens=1000\n",
- " )\n",
- " \n",
- " return response.choices[0].message.content\n",
- " \n",
- " def process_query(self, user_query, search_terms=None):\n",
- " \"\"\"Main function to process user queries\"\"\"\n",
- " # Extract search terms from query if not provided\n",
- " if not search_terms:\n",
- " search_terms = user_query.split()[:3] # Simple extraction\n",
- " \n",
- " # Search for relevant files\n",
- " files = []\n",
- " for term in search_terms:\n",
- " files.extend(self.search_files(term))\n",
- " \n",
- " # Remove duplicates\n",
- " unique_files = {f['id']: f for f in files}.values()\n",
- " \n",
- " # Get content from top 3 most relevant files\n",
- " file_contents = []\n",
- " for file in list(unique_files)[:3]:\n",
- " content = self.get_file_content(file['id'], file['mimeType'])\n",
- " file_contents.append({\n",
- " 'name': file['name'],\n",
- " 'text': content\n",
- " })\n",
- " \n",
- " # Query GPT with context\n",
- " if file_contents:\n",
- " response = self.query_gpt_with_context(user_query, file_contents)\n",
- " return {\n",
- " 'answer': response,\n",
- " 'sources': [f['name'] for f in file_contents]\n",
- " }\n",
- " else:\n",
- " return {\n",
- " 'answer': \"No relevant files found in your Google Drive.\",\n",
- " 'sources': []\n",
- " }"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 9,
- "id": "3c2c1ccf-9ade-482d-a170-978e97bc1c08",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Answer: The transmission of nerves is called \"nerve conduction.\" This process involves the propagation of electrical impulses along the nerve fibers, allowing for communication between different parts of the body.\n",
- "Sources: ['Neuro Note Dr Clement.docx']\n"
- ]
- }
- ],
- "source": [
- "if __name__ == \"__main__\":\n",
- " integration = GPTDriveIntegration()\n",
- " \n",
- " # Test query\n",
- " result = integration.process_query(\n",
- " \"The transmission of nerves is called?\",\n",
- " search_terms=[\"nerves\", \"Dr Clement\"]\n",
- " )\n",
- " \n",
- " print(\"Answer:\", result['answer'])\n",
- " print(\"Sources:\", result['sources'])"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "120e7c93-b38a-4c89-8e76-5b0170d22548",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "61e976a6-1fe4-45de-a63d-3cf849eedbe1",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": 11,
- "id": "c514b8af-1af4-4497-9044-139a71aedd36",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "* Running on local URL: http://127.0.0.1:7860\n",
- "* Running on public URL: https://5320171f6af3e2eef9.gradio.live\n",
- "\n",
- "This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)\n"
- ]
- },
- {
- "data": {
- "text/html": [
- "
"
- ],
- "text/plain": [
- ""
- ]
- },
- "metadata": {},
- "output_type": "display_data"
- },
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Keyboard interruption in main thread... closing server.\n",
- "Killing tunnel 127.0.0.1:7860 <> https://5320171f6af3e2eef9.gradio.live\n"
- ]
- }
- ],
- "source": [
- "gpt_drive = GPTDriveIntegration()\n",
- "\n",
- "def process_user_query(query, search_terms_input):\n",
- " \"\"\"Process user query and return formatted response\"\"\"\n",
- " if not query.strip():\n",
- " return \"Please enter a question.\", \"\"\n",
- " \n",
- " # Parse search terms if provided\n",
- " search_terms = None\n",
- " # if search_terms_input.strip():\n",
- " # search_terms = [term.strip() for term in search_terms_input.split(',')]\n",
- " \n",
- " # Process the query\n",
- " result = gpt_drive.process_query(query, search_terms)\n",
- " \n",
- " # Format the response\n",
- " answer = result['answer']\n",
- " sources = result['sources']\n",
- " \n",
- " sources_text = \"\"\n",
- " if sources:\n",
- " sources_text = \"**Sources used:**\\n\" + \"\\n\".join([f\"• {source}\" for source in sources])\n",
- " \n",
- " return answer, sources_text\n",
- "\n",
- "def check_setup():\n",
- " \"\"\"Check if the APIs are properly configured\"\"\"\n",
- " status_messages = []\n",
- " \n",
- " # Check Google Drive API\n",
- " if gpt_drive.drive_initialized:\n",
- " status_messages.append(\"✅ Google Drive API: Connected\")\n",
- " else:\n",
- " status_messages.append(f\"❌ Google Drive API: {getattr(gpt_drive, 'drive_error', 'Not configured')}\")\n",
- " \n",
- " # Check OpenAI API\n",
- " if gpt_drive.openai_initialized:\n",
- " status_messages.append(\"✅ OpenAI API: Connected\")\n",
- " else:\n",
- " status_messages.append(f\"❌ OpenAI API: {getattr(gpt_drive, 'openai_error', 'Not configured')}\")\n",
- " \n",
- " return \"\\n\".join(status_messages)\n",
- "\n",
- "# Create Gradio interface\n",
- "with gr.Blocks(title=\"Augusta's Anatomy Reading Assistant\", theme=gr.themes.Soft()) as app:\n",
- " gr.Markdown(\"# 🤖 Augusta's Anatomy bot\")\n",
- " gr.Markdown(\"Ask questions about your anatomy books using AI!\")\n",
- " \n",
- " with gr.Row():\n",
- " with gr.Column(scale=2):\n",
- " # Main query interface\n",
- " with gr.Group():\n",
- " gr.Markdown(\"### Ask a Question\")\n",
- " query_input = gr.Textbox(\n",
- " label=\"Your Question\",\n",
- " placeholder=\"Ask me any question about your anatomy books?\",\n",
- " lines=3\n",
- " )\n",
- " \n",
- " search_terms_input = gr.Textbox(\n",
- " label=\"Search Terms (optional)\",\n",
- " placeholder=\"Enter comma-separated terms to search for specific files\",\n",
- " lines=1\n",
- " )\n",
- " \n",
- " submit_btn = gr.Button(\"Search & Ask\", variant=\"primary\", size=\"lg\")\n",
- " \n",
- " # Results section\n",
- " with gr.Group():\n",
- " gr.Markdown(\"### Answer\")\n",
- " answer_output = gr.Textbox(\n",
- " label=\"AI Response\",\n",
- " lines=10,\n",
- " interactive=False\n",
- " )\n",
- " \n",
- " sources_output = gr.Textbox(\n",
- " label=\"Sources\",\n",
- " lines=3,\n",
- " interactive=False\n",
- " )\n",
- " \n",
- " with gr.Column(scale=1):\n",
- " # Status and setup info\n",
- " with gr.Group():\n",
- " gr.Markdown(\"### System Status\")\n",
- " status_btn = gr.Button(\"Check Status\", size=\"sm\")\n",
- " status_output = gr.Textbox(\n",
- " label=\"API Status\",\n",
- " lines=4,\n",
- " interactive=False\n",
- " )\n",
- " \n",
- " with gr.Group():\n",
- " gr.Markdown(\"### Setup Instructions\")\n",
- " gr.Markdown(\"\"\"\n",
- " **Important Notes:**\n",
- " 1.Only documents shared with it, it can answer\n",
- " \n",
- " **File Types Supported:**\n",
- " - Google Docs\n",
- " - Google Sheets \n",
- " - PDF files\n",
- " - Text files\n",
- " \n",
- " **Tips:**\n",
- " - Use specific search terms for better results\n",
- " - The system searches the top 3 most relevant files\n",
- " - Ask clear, specific questions for better answers\n",
- " \"\"\")\n",
- " \n",
- " # Event handlers\n",
- " submit_btn.click(\n",
- " fn=process_user_query,\n",
- " inputs=[query_input, search_terms_input],\n",
- " outputs=[answer_output, sources_output]\n",
- " )\n",
- " \n",
- " status_btn.click(\n",
- " fn=check_setup,\n",
- " outputs=status_output\n",
- " )\n",
- " \n",
- " # Example queries\n",
- " with gr.Row():\n",
- " gr.Examples(\n",
- " examples=[\n",
- " [\"What is morbid Anatomy?\", \"morbid, Anatomy\"],\n",
- " [\"The transmission of nerves from one neuron to another is as a result of what?\", \"neuron, nerves, Dr Clement\"],\n",
- " ],\n",
- " inputs=[query_input, search_terms_input],\n",
- " )\n",
- "\n",
- "# Launch the app\n",
- "if __name__ == \"__main__\":\n",
- " app.launch(\n",
- " share=True,debug =True)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 42,
- "id": "593c4f63-cdda-46e8-aad6-9f9d7db66615",
- "metadata": {},
- "outputs": [],
- "source": [
- "#Gradio\n",
- "# !pip install gradio"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "859060a9-56f9-49e9-bb41-04ccf7b10d3f",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "d893999c-4f3f-4306-8b0b-072e3fbb8236",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": 10,
- "id": "01b70311-ff22-4c88-bf22-64dc441afaab",
- "metadata": {},
- "outputs": [
- {
- "name": "stderr",
- "output_type": "stream",
- "text": [
- "C:\\Users\\Uche Buzz\\anaconda3\\envs\\RAG\\Lib\\site-packages\\tqdm\\auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
- " from .autonotebook import tqdm as notebook_tqdm\n"
- ]
- },
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "* Running on local URL: http://127.0.0.1:7860\n",
- "* Running on public URL: https://8fac475cac6193423b.gradio.live\n",
- "\n",
- "This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)\n"
- ]
- },
- {
- "data": {
- "text/html": [
- ""
- ],
- "text/plain": [
- ""
- ]
- },
- "metadata": {},
- "output_type": "display_data"
- },
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Keyboard interruption in main thread... closing server.\n",
- "Killing tunnel 127.0.0.1:7860 <> https://8fac475cac6193423b.gradio.live\n"
- ]
- }
- ],
- "source": [
- "import gradio as gr\n",
- "import os\n",
- "import io\n",
- "import openai\n",
- "from google.oauth2 import service_account\n",
- "from googleapiclient.discovery import build\n",
- "from googleapiclient.http import MediaIoBaseDownload\n",
- "\n",
- "class GPTDriveIntegration:\n",
- " def __init__(self):\n",
- " # Initialize Google Drive API\n",
- " try:\n",
- " self.credentials = service_account.Credentials.from_service_account_file(\n",
- " os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE'),\n",
- " scopes=['https://www.googleapis.com/auth/drive.readonly']\n",
- " )\n",
- " self.drive_service = build('drive', 'v3', credentials=self.credentials)\n",
- " self.drive_initialized = True\n",
- " except Exception as e:\n",
- " self.drive_initialized = False\n",
- " self.drive_error = str(e)\n",
- " \n",
- " # Initialize OpenAI\n",
- " try:\n",
- " openai.api_key = os.getenv('OPENAI_API_KEY')\n",
- " self.openai_initialized = True\n",
- " except Exception as e:\n",
- " self.openai_initialized = False\n",
- " self.openai_error = str(e)\n",
- " \n",
- " def search_files(self, query, file_types=None):\n",
- " \"\"\"Search for files in Google Drive\"\"\"\n",
- " if not self.drive_initialized:\n",
- " return []\n",
- " \n",
- " search_query = f\"name contains '{query}'\"\n",
- " \n",
- " if file_types:\n",
- " type_queries = []\n",
- " for file_type in file_types:\n",
- " if file_type.lower() == 'pdf':\n",
- " type_queries.append(\"mimeType='application/pdf'\")\n",
- " elif file_type.lower() in ['doc', 'docx']:\n",
- " type_queries.append(\"mimeType contains 'document'\")\n",
- " elif file_type.lower() in ['xls', 'xlsx']:\n",
- " type_queries.append(\"mimeType contains 'spreadsheet'\")\n",
- " \n",
- " if type_queries:\n",
- " search_query += f\" and ({' or '.join(type_queries)})\"\n",
- " \n",
- " try:\n",
- " results = self.drive_service.files().list(\n",
- " q=search_query,\n",
- " fields=\"files(id, name, mimeType, size)\"\n",
- " ).execute()\n",
- " \n",
- " return results.get('files', [])\n",
- " except Exception as e:\n",
- " return []\n",
- " \n",
- " def get_file_content(self, file_id, mime_type):\n",
- " \"\"\"Download and extract text content from file\"\"\"\n",
- " try:\n",
- " if 'text' in mime_type or 'document' in mime_type:\n",
- " # For Google Docs, export as plain text\n",
- " if 'document' in mime_type:\n",
- " request = self.drive_service.files().export_media(\n",
- " fileId=file_id, mimeType='text/plain'\n",
- " )\n",
- " else:\n",
- " request = self.drive_service.files().get_media(fileId=file_id)\n",
- " \n",
- " file_content = io.BytesIO()\n",
- " downloader = MediaIoBaseDownload(file_content, request)\n",
- " done = False\n",
- " while done is False:\n",
- " status, done = downloader.next_chunk()\n",
- " \n",
- " return file_content.getvalue().decode('utf-8')\n",
- " \n",
- " elif 'spreadsheet' in mime_type:\n",
- " # For Google Sheets, export as CSV\n",
- " request = self.drive_service.files().export_media(\n",
- " fileId=file_id, mimeType='text/csv'\n",
- " )\n",
- " file_content = io.BytesIO()\n",
- " downloader = MediaIoBaseDownload(file_content, request)\n",
- " done = False\n",
- " while done is False:\n",
- " status, done = downloader.next_chunk()\n",
- " \n",
- " return file_content.getvalue().decode('utf-8')\n",
- " \n",
- " else:\n",
- " return \"File type not supported for text extraction\"\n",
- " \n",
- " except Exception as e:\n",
- " return f\"Error reading file: {str(e)}\"\n",
- " \n",
- " def query_gpt_with_context(self, user_query, file_contents):\n",
- " \"\"\"Send query to GPT with file context\"\"\"\n",
- " context = \"\\n\\n\".join([\n",
- " f\"File: {content['name']}\\nContent: {content['text'][:2000]}...\"\n",
- " for content in file_contents\n",
- " ])\n",
- " \n",
- " messages = [\n",
- " {\n",
- " \"role\": \"system\", \n",
- " \"content\": \"\"\"\n",
- " You are an AI assistant that can analyze anatomy documents from Google Drive. \n",
- " Use the provided file contents to answer user questions. Always answer straightforwardly. Don't Hallucinate.\n",
- " Always end with 'Do you have any other question?'\n",
- " \n",
- " \"\"\"\n",
- " },\n",
- " {\n",
- " \"role\": \"user\", \n",
- " \"content\": f\"Context from Google Drive files:\\n{context}\\n\\nUser Question: {user_query}\"\n",
- " }\n",
- " ]\n",
- " \n",
- " try:\n",
- " response = openai.chat.completions.create(\n",
- " model=\"gpt-4o-mini\",\n",
- " messages=messages,\n",
- " max_tokens=1000\n",
- " )\n",
- " \n",
- " return response.choices[0].message.content\n",
- " except Exception as e:\n",
- " return f\"Error querying OpenAI: {str(e)}\"\n",
- " \n",
- " def process_query(self, user_query, search_terms=None):\n",
- " \"\"\"Main function to process user queries\"\"\"\n",
- " # Check if services are initialized\n",
- " if not self.drive_initialized:\n",
- " return {\n",
- " 'answer': f\"Google Drive API not initialized: {getattr(self, 'drive_error', 'Unknown error')}\",\n",
- " 'sources': []\n",
- " }\n",
- " \n",
- " if not self.openai_initialized:\n",
- " return {\n",
- " 'answer': f\"OpenAI API not initialized: {getattr(self, 'openai_error', 'Unknown error')}\",\n",
- " 'sources': []\n",
- " }\n",
- " \n",
- " # Extract search terms from query if not provided\n",
- " if not search_terms:\n",
- " search_terms = user_query.split()[:3] # Simple extraction\n",
- " \n",
- " # Search for relevant files\n",
- " files = []\n",
- " for term in search_terms:\n",
- " files.extend(self.search_files(term))\n",
- " \n",
- " # Remove duplicates\n",
- " unique_files = {f['id']: f for f in files}.values()\n",
- " \n",
- " # Get content from top 3 most relevant files\n",
- " file_contents = []\n",
- " for file in list(unique_files)[:3]:\n",
- " content = self.get_file_content(file['id'], file['mimeType'])\n",
- " file_contents.append({\n",
- " 'name': file['name'],\n",
- " 'text': content\n",
- " })\n",
- " \n",
- " # Query GPT with context\n",
- " if file_contents:\n",
- " response = self.query_gpt_with_context(user_query, file_contents)\n",
- " return {\n",
- " 'answer': response,\n",
- " 'sources': [f['name'] for f in file_contents]\n",
- " }\n",
- " else:\n",
- " return {\n",
- " 'answer': \"No relevant files found in your Google Drive.\",\n",
- " 'sources': []\n",
- " }\n",
- "\n",
- "# Initialize the GPT Drive Integration\n",
- "gpt_drive = GPTDriveIntegration()\n",
- "\n",
- "def process_user_query(query, search_terms_input):\n",
- " \"\"\"Process user query and return formatted response\"\"\"\n",
- " if not query.strip():\n",
- " return \"Please enter a question.\", \"\"\n",
- " \n",
- " # Parse search terms if provided\n",
- " search_terms = None\n",
- " if search_terms_input.strip():\n",
- " search_terms = [term.strip() for term in search_terms_input.split(',')]\n",
- " \n",
- " # Process the query\n",
- " result = gpt_drive.process_query(query, search_terms)\n",
- " \n",
- " # Format the response\n",
- " answer = result['answer']\n",
- " sources = result['sources']\n",
- " \n",
- " sources_text = \"\"\n",
- " if sources:\n",
- " sources_text = \"**Sources used:**\\n\" + \"\\n\".join([f\"• {source}\" for source in sources])\n",
- " \n",
- " return answer, sources_text\n",
- "\n",
- "def check_setup():\n",
- " \"\"\"Check if the APIs are properly configured\"\"\"\n",
- " status_messages = []\n",
- " \n",
- " # Check Google Drive API\n",
- " if gpt_drive.drive_initialized:\n",
- " status_messages.append(\"✅ Google Drive API: Connected\")\n",
- " else:\n",
- " status_messages.append(f\"❌ Google Drive API: {getattr(gpt_drive, 'drive_error', 'Not configured')}\")\n",
- " \n",
- " # Check OpenAI API\n",
- " if gpt_drive.openai_initialized:\n",
- " status_messages.append(\"✅ OpenAI API: Connected\")\n",
- " else:\n",
- " status_messages.append(f\"❌ OpenAI API: {getattr(gpt_drive, 'openai_error', 'Not configured')}\")\n",
- " \n",
- " return \"\\n\".join(status_messages)\n",
- "\n",
- "# Create Gradio interface\n",
- "with gr.Blocks(title=\"Augusta's Anatomy Reading Assistant\", theme=gr.themes.Soft()) as app:\n",
- " gr.Markdown(\"# 🤖 Augusta's Anatomy bot\")\n",
- " gr.Markdown(\"Ask questions about your anatomy books using AI!\")\n",
- " \n",
- " with gr.Row():\n",
- " with gr.Column(scale=2):\n",
- " # Main query interface\n",
- " with gr.Group():\n",
- " gr.Markdown(\"### Ask a Question\")\n",
- " query_input = gr.Textbox(\n",
- " label=\"Your Question\",\n",
- " placeholder=\"Ask me any question about your anatomy books?\",\n",
- " lines=3\n",
- " )\n",
- " \n",
- " search_terms_input = gr.Textbox(\n",
- " label=\"Search Terms (optional)\",\n",
- " placeholder=\"Enter comma-separated terms to search for specific files\",\n",
- " lines=1\n",
- " )\n",
- " \n",
- " submit_btn = gr.Button(\"Search & Ask\", variant=\"primary\", size=\"lg\")\n",
- " \n",
- " # Results section\n",
- " with gr.Group():\n",
- " gr.Markdown(\"### Answer\")\n",
- " answer_output = gr.Textbox(\n",
- " label=\"AI Response\",\n",
- " lines=10,\n",
- " interactive=False\n",
- " )\n",
- " \n",
- " sources_output = gr.Textbox(\n",
- " label=\"Sources\",\n",
- " lines=3,\n",
- " interactive=False\n",
- " )\n",
- " \n",
- " with gr.Column(scale=1):\n",
- " # Status and setup info\n",
- " with gr.Group():\n",
- " gr.Markdown(\"### System Status\")\n",
- " status_btn = gr.Button(\"Check Status\", size=\"sm\")\n",
- " status_output = gr.Textbox(\n",
- " label=\"API Status\",\n",
- " lines=4,\n",
- " interactive=False\n",
- " )\n",
- " \n",
- " with gr.Group():\n",
- " gr.Markdown(\"### Setup Instructions\")\n",
- " gr.Markdown(\"\"\"\n",
- " **Important Notes:**\n",
- " 1.Only documents shared with it, it can answer\n",
- " \n",
- " **File Types Supported:**\n",
- " - Google Docs\n",
- " - Google Sheets \n",
- " - PDF files\n",
- " - Text files\n",
- " \n",
- " **Tips:**\n",
- " - Use specific search terms for better results\n",
- " - The system searches the top 3 most relevant files\n",
- " - Ask clear, specific questions for better answers\n",
- " \"\"\")\n",
- " \n",
- " # Event handlers\n",
- " submit_btn.click(\n",
- " fn=process_user_query,\n",
- " inputs=[query_input, search_terms_input],\n",
- " outputs=[answer_output, sources_output]\n",
- " )\n",
- " \n",
- " status_btn.click(\n",
- " fn=check_setup,\n",
- " outputs=status_output\n",
- " )\n",
- " \n",
- " # Example queries\n",
- " with gr.Row():\n",
- " gr.Examples(\n",
- " examples=[\n",
- " [\"What is morbid Anatomy?\", \"morbid, Anatomy\"],\n",
- " [\"The transmission of nerves from one neuron to another is as a result of what?\", \"neuron, nerves, Dr Clement\"],\n",
- " ],\n",
- " inputs=[query_input, search_terms_input],\n",
- " )\n",
- "\n",
- "# Launch the app\n",
- "if __name__ == \"__main__\":\n",
- " app.launch(\n",
- " share=True,\n",
- " debug=True\n",
- " )"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a6fd9c8c-e8e9-4b1c-8d21-d7ea2c42303a",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "95c85a4f-470f-4858-8172-78d7f9d04d1a",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "f237c114-7924-4074-9c11-c7ba13833293",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a2f9c7ae-35bb-4261-adc4-f24c6ab5c8a1",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "aaa446b0-a14e-4c98-b466-f503a521f052",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "5770d683-7bfc-49e9-a3c4-a67060e27620",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "2ef7744c-1346-4692-ac28-013ae0e8c4ac",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "08643ace-f94f-4a4a-bbc1-3289b7fc29a1",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "70849fd3-6f4e-4c57-b780-bda53db83e14",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "c18d5d20-439a-45a7-a847-25f659fd4a10",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": 11,
- "id": "634b9787-d493-46e5-8114-0851c6172ed6",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "🔍 Starting Google Drive API Diagnostic...\n",
- "==================================================\n",
- "\n",
- "1️⃣ Checking service account file...\n",
- "✅ Service account file is valid\n",
- " 📧 Service account email: rag-base@rag-system-463320.iam.gserviceaccount.com\n",
- " 🏗️ Project ID: rag-system-463320\n",
- "\n",
- "2️⃣ Checking credentials...\n",
- "✅ Credentials created successfully\n",
- "\n",
- "3️⃣ Testing API connection...\n",
- "✅ Successfully connected to Google Drive API\n",
- " 👤 Connected as: rag-base@rag-system-463320.iam.gserviceaccount.com\n",
- "\n",
- "4️⃣ Checking basic permissions...\n",
- "✅ Can access Drive API - found 0 files in test query\n",
- "⚠️ No files found - this might mean:\n",
- " • Service account has no shared files\n",
- " • No files are shared with the service account\n",
- "\n",
- "5️⃣ Testing access to 'Blue berry' folder...\n",
- "❌ Folder 'Blue berry' not found or not accessible\n",
- "💡 Possible solutions:\n",
- " • Make sure the folder exists in Google Drive\n",
- " • Share the folder with your service account email\n",
- " • Check folder name spelling (case sensitive)\n",
- "\n",
- "✅ All basic checks passed!\n"
- ]
- }
- ],
- "source": [
- "import json\n",
- "from google.auth.exceptions import RefreshError\n",
- "from googleapiclient.errors import HttpError\n",
- "\n",
- "class GPTDriveTroubleshooter:\n",
- " def __init__(self, service_account_file_path):\n",
- " self.service_account_file = service_account_file_path\n",
- " self.credentials = None\n",
- " self.drive_service = None\n",
- " \n",
- " def run_full_diagnostic(self):\n",
- " \"\"\"Run complete diagnostic check\"\"\"\n",
- " print(\"🔍 Starting Google Drive API Diagnostic...\")\n",
- " print(\"=\" * 50)\n",
- " \n",
- " # Step 1: Check service account file\n",
- " if not self.check_service_account_file():\n",
- " return False\n",
- " \n",
- " # Step 2: Check credentials\n",
- " if not self.check_credentials():\n",
- " return False\n",
- " \n",
- " # Step 3: Test API connection\n",
- " if not self.test_api_connection():\n",
- " return False\n",
- " \n",
- " # Step 4: Check permissions\n",
- " if not self.check_basic_permissions():\n",
- " return False\n",
- " \n",
- " # Step 5: Test folder access\n",
- " self.test_folder_access()\n",
- " \n",
- " print(\"\\n✅ All basic checks passed!\")\n",
- " return True\n",
- " \n",
- " def check_service_account_file(self):\n",
- " \"\"\"Check if service account file exists and is valid\"\"\"\n",
- " print(\"\\n1️⃣ Checking service account file...\")\n",
- " \n",
- " if not os.path.exists(self.service_account_file):\n",
- " print(f\"❌ Service account file not found: {self.service_account_file}\")\n",
- " print(\"💡 Make sure you've downloaded the JSON key file from Google Cloud Console\")\n",
- " return False\n",
- " \n",
- " try:\n",
- " with open(self.service_account_file, 'r') as f:\n",
- " service_account_info = json.load(f)\n",
- " \n",
- " required_fields = ['type', 'project_id', 'private_key_id', 'private_key', 'client_email']\n",
- " missing_fields = [field for field in required_fields if field not in service_account_info]\n",
- " \n",
- " if missing_fields:\n",
- " print(f\"❌ Service account file missing required fields: {missing_fields}\")\n",
- " return False\n",
- " \n",
- " print(f\"✅ Service account file is valid\")\n",
- " print(f\" 📧 Service account email: {service_account_info['client_email']}\")\n",
- " print(f\" 🏗️ Project ID: {service_account_info['project_id']}\")\n",
- " \n",
- " return True\n",
- " \n",
- " except json.JSONDecodeError:\n",
- " print(\"❌ Service account file is not valid JSON\")\n",
- " return False\n",
- " except Exception as e:\n",
- " print(f\"❌ Error reading service account file: {e}\")\n",
- " return False\n",
- " \n",
- " def check_credentials(self):\n",
- " \"\"\"Check if credentials can be created\"\"\"\n",
- " print(\"\\n2️⃣ Checking credentials...\")\n",
- " \n",
- " try:\n",
- " self.credentials = service_account.Credentials.from_service_account_file(\n",
- " self.service_account_file,\n",
- " scopes=['https://www.googleapis.com/auth/drive.readonly']\n",
- " )\n",
- " print(\"✅ Credentials created successfully\")\n",
- " return True\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"❌ Failed to create credentials: {e}\")\n",
- " print(\"💡 Check if your service account key file is corrupted\")\n",
- " return False\n",
- " \n",
- " def test_api_connection(self):\n",
- " \"\"\"Test basic API connection\"\"\"\n",
- " print(\"\\n3️⃣ Testing API connection...\")\n",
- " \n",
- " try:\n",
- " self.drive_service = build('drive', 'v3', credentials=self.credentials)\n",
- " \n",
- " # Try a simple API call\n",
- " about = self.drive_service.about().get(fields=\"user, storageQuota\").execute()\n",
- " print(\"✅ Successfully connected to Google Drive API\")\n",
- " print(f\" 👤 Connected as: {about.get('user', {}).get('emailAddress', 'Unknown')}\")\n",
- " \n",
- " return True\n",
- " \n",
- " except HttpError as e:\n",
- " print(f\"❌ HTTP Error connecting to API: {e}\")\n",
- " if e.resp.status == 403:\n",
- " print(\"💡 This is likely a permissions issue - check if Drive API is enabled\")\n",
- " return False\n",
- " except Exception as e:\n",
- " print(f\"❌ Failed to connect to API: {e}\")\n",
- " return False\n",
- " \n",
- " def check_basic_permissions(self):\n",
- " \"\"\"Check basic file listing permissions\"\"\"\n",
- " print(\"\\n4️⃣ Checking basic permissions...\")\n",
- " \n",
- " try:\n",
- " # Try to list files (this should work with readonly access)\n",
- " results = self.drive_service.files().list(\n",
- " pageSize=1,\n",
- " fields=\"files(id, name)\"\n",
- " ).execute()\n",
- " \n",
- " files = results.get('files', [])\n",
- " print(f\"✅ Can access Drive API - found {len(files)} files in test query\")\n",
- " \n",
- " if len(files) == 0:\n",
- " print(\"⚠️ No files found - this might mean:\")\n",
- " print(\" • Service account has no shared files\")\n",
- " print(\" • No files are shared with the service account\")\n",
- " \n",
- " return True\n",
- " \n",
- " except HttpError as e:\n",
- " print(f\"❌ Permission error: {e}\")\n",
- " if e.resp.status == 403:\n",
- " print(\"💡 Common causes:\")\n",
- " print(\" • Google Drive API not enabled in Google Cloud Console\")\n",
- " print(\" • Service account doesn't have proper permissions\")\n",
- " return False\n",
- " except Exception as e:\n",
- " print(f\"❌ Error checking permissions: {e}\")\n",
- " return False\n",
- " \n",
- " def test_folder_access(self, folder_name=\"Blue berry\"):\n",
- " \"\"\"Test access to specific folder\"\"\"\n",
- " print(f\"\\n5️⃣ Testing access to '{folder_name}' folder...\")\n",
- " \n",
- " try:\n",
- " # Search for the folder\n",
- " query = f\"name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and trashed=false\"\n",
- " results = self.drive_service.files().list(\n",
- " q=query,\n",
- " fields=\"files(id, name, owners, permissions)\"\n",
- " ).execute()\n",
- " \n",
- " folders = results.get('files', [])\n",
- " \n",
- " if not folders:\n",
- " print(f\"❌ Folder '{folder_name}' not found or not accessible\")\n",
- " print(\"💡 Possible solutions:\")\n",
- " print(\" • Make sure the folder exists in Google Drive\")\n",
- " print(\" • Share the folder with your service account email\")\n",
- " print(\" • Check folder name spelling (case sensitive)\")\n",
- " return False\n",
- " \n",
- " folder = folders[0]\n",
- " print(f\"✅ Found folder '{folder_name}'\")\n",
- " print(f\" 📁 Folder ID: {folder['id']}\")\n",
- " \n",
- " # Test listing files in the folder\n",
- " files_in_folder = self.drive_service.files().list(\n",
- " q=f\"'{folder['id']}' in parents and trashed=false\",\n",
- " fields=\"files(id, name, mimeType)\"\n",
- " ).execute()\n",
- " \n",
- " files = files_in_folder.get('files', [])\n",
- " print(f\" 📄 Contains {len(files)} files\")\n",
- " \n",
- " if files:\n",
- " print(\" 📝 Sample files:\")\n",
- " for file in files[:3]: # Show first 3 files\n",
- " print(f\" • {file['name']} ({file['mimeType']})\")\n",
- " \n",
- " return True\n",
- " \n",
- " except HttpError as e:\n",
- " print(f\"❌ HTTP Error accessing folder: {e}\")\n",
- " return False\n",
- " except Exception as e:\n",
- " print(f\"❌ Error accessing folder: {e}\")\n",
- " return False\n",
- " \n",
- " def check_file_permissions(self, file_id):\n",
- " \"\"\"Check permissions for a specific file\"\"\"\n",
- " print(f\"\\n🔍 Checking permissions for file ID: {file_id}\")\n",
- " \n",
- " try:\n",
- " file_info = self.drive_service.files().get(\n",
- " fileId=file_id,\n",
- " fields=\"id, name, mimeType, owners, permissions, capabilities\"\n",
- " ).execute()\n",
- " \n",
- " print(f\"✅ File: {file_info['name']}\")\n",
- " print(f\" 🔗 Type: {file_info['mimeType']}\")\n",
- " print(f\" 👤 Owner: {file_info.get('owners', [{}])[0].get('emailAddress', 'Unknown')}\")\n",
- " \n",
- " capabilities = file_info.get('capabilities', {})\n",
- " print(f\" 📖 Can read: {capabilities.get('canDownload', False)}\")\n",
- " print(f\" 📤 Can export: {capabilities.get('canExport', False)}\")\n",
- " \n",
- " except HttpError as e:\n",
- " print(f\"❌ Cannot access file: {e}\")\n",
- " except Exception as e:\n",
- " print(f\"❌ Error: {e}\")\n",
- " \n",
- " def get_sharing_instructions(self):\n",
- " \"\"\"Provide step-by-step sharing instructions\"\"\"\n",
- " print(\"\\n📋 HOW TO SHARE FOLDER WITH SERVICE ACCOUNT:\")\n",
- " print(\"=\" * 50)\n",
- " \n",
- " if self.credentials:\n",
- " service_email = self.credentials.service_account_email\n",
- " print(f\"1. Copy this service account email: {service_email}\")\n",
- " else:\n",
- " print(\"1. Find your service account email in the JSON key file (client_email field)\")\n",
- " \n",
- " print(\"2. Open Google Drive in your browser\")\n",
- " print(\"3. Right-click on your 'Blue berry' folder\")\n",
- " print(\"4. Select 'Share'\")\n",
- " print(\"5. Paste the service account email\")\n",
- " print(\"6. Set permission to 'Viewer' or 'Editor'\")\n",
- " print(\"7. Click 'Send' (you can uncheck 'Notify people')\")\n",
- " print(\"8. Wait a few minutes for permissions to propagate\")\n",
- " \n",
- " def run_connection_test(self):\n",
- " \"\"\"Quick connection test\"\"\"\n",
- " print(\"🚀 Quick Connection Test\")\n",
- " print(\"-\" * 30)\n",
- " \n",
- " try:\n",
- " self.credentials = service_account.Credentials.from_service_account_file(\n",
- " self.service_account_file,\n",
- " scopes=['https://www.googleapis.com/auth/drive.readonly']\n",
- " )\n",
- " \n",
- " service = build('drive', 'v3', credentials=self.credentials)\n",
- " \n",
- " # Test basic query\n",
- " results = service.files().list(pageSize=5).execute()\n",
- " files = results.get('files', [])\n",
- " \n",
- " print(f\"✅ Connected! Found {len(files)} accessible files\")\n",
- " print(f\"📧 Service account: {self.credentials.service_account_email}\")\n",
- " \n",
- " return True\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"❌ Connection failed: {e}\")\n",
- " return False\n",
- "\n",
- "# Usage Examples\n",
- "def troubleshoot_drive_access():\n",
- " \"\"\"Main troubleshooting function\"\"\"\n",
- " service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n",
- " \n",
- " if not service_account_file:\n",
- " print(\"❌ GOOGLE_SERVICE_ACCOUNT_FILE environment variable not set\")\n",
- " print(\"💡 Add this to your .env file:\")\n",
- " print(\"GOOGLE_SERVICE_ACCOUNT_FILE=path/to/your/service-account-key.json\")\n",
- " return\n",
- " \n",
- " troubleshooter = GPTDriveTroubleshooter(service_account_file)\n",
- " \n",
- " # Run full diagnostic\n",
- " success = troubleshooter.run_full_diagnostic()\n",
- " \n",
- " if not success:\n",
- " print(\"\\n\" + \"=\"*50)\n",
- " troubleshooter.get_sharing_instructions()\n",
- " \n",
- " return success\n",
- "\n",
- "# Quick test function\n",
- "def quick_test():\n",
- " \"\"\"Quick test to verify everything works\"\"\"\n",
- " service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n",
- " troubleshooter = GPTDriveTroubleshooter(service_account_file)\n",
- " return troubleshooter.run_connection_test()\n",
- "\n",
- "if __name__ == \"__main__\":\n",
- " # Uncomment the test you want to run:\n",
- " \n",
- " # Full diagnostic (recommended for first-time setup)\n",
- " troubleshoot_drive_access()\n",
- " \n",
- " # Quick test (for regular checks)\n",
- " # quick_test()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "6b336789-d402-455d-a631-26e987d79ed6",
- "metadata": {},
- "outputs": [],
- "source": [
- "troubleshooter = GPTDriveTroubleshooter(GOOGLE_SERVICE_ACCOUNT_FILE)\n",
- "troubleshooter.test_folder_access(\"Blue berry\")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "e2786c67-215e-46b0-b3ca-2c3176bd8971",
- "metadata": {},
- "outputs": [],
- "source": [
- "# Add this test function to verify access\n",
- "def test_folder_access():\n",
- " integration = GPTDriveIntegration()\n",
- " folder_id = integration.find_folder_by_name(\"Blue berry\")\n",
- " if folder_id:\n",
- " print(\"✅ Successfully found 'Blue berry' folder!\")\n",
- " \n",
- " # List files in the folder\n",
- " results = integration.drive_service.files().list(\n",
- " q=f\"'{folder_id}' in parents and trashed=false\",\n",
- " fields=\"files(id, name, mimeType)\"\n",
- " ).execute()\n",
- " \n",
- " files = results.get('files', [])\n",
- " print(f\"Found {len(files)} files in the folder:\")\n",
- " for file in files[:5]: # Show first 5 files\n",
- " print(f\" - {file['name']} ({file['mimeType']})\")\n",
- " else:\n",
- " print(\"❌ Could not access 'Blue berry' folder\")\n",
- " print(\"Make sure you've shared the folder with your service account\")\n",
- "\n",
- "# Run this test first\n",
- "test_folder_access()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "c3dede9f-5e01-436d-a7b7-905e1646baf9",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "b933c8b6-add6-40fc-827f-e5e07447ac00",
- "metadata": {},
- "outputs": [],
- "source": [
- "#Troubleshooting"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 32,
- "id": "b63a1f4b-1315-44fe-ba84-93fb1c2655cc",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "🔍 Searching for 'Blue Berry' folder...\n",
- "Total folders found: 2\n",
- "\n",
- "✅ Found 1 'Blue Berry' folder(s):\n",
- "📁 Anatomy Books \n",
- " Full Path: Anatomy Books \n",
- " Folder ID: 125NdiCL7moQsNYWRuHojVM7If88Bxh0q\n",
- "\n"
- ]
- }
- ],
- "source": [
- "import os\n",
- "from google.oauth2 import service_account\n",
- "from googleapiclient.discovery import build\n",
- "\n",
- "class DrivefolderLister:\n",
- " def __init__(self):\n",
- " # Initialize Google Drive API\n",
- " self.credentials = service_account.Credentials.from_service_account_file(\n",
- " os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE'),\n",
- " scopes=['https://www.googleapis.com/auth/drive.readonly']\n",
- " )\n",
- " self.drive_service = build('drive', 'v3', credentials=self.credentials)\n",
- " \n",
- " def get_all_folders(self):\n",
- " \"\"\"Get ALL folders in the Google Drive\"\"\"\n",
- " try:\n",
- " # Query to get all folders (not trashed)\n",
- " query = \"mimeType='application/vnd.google-apps.folder' and trashed=false\"\n",
- " \n",
- " all_folders = []\n",
- " page_token = None\n",
- " \n",
- " while True:\n",
- " results = self.drive_service.files().list(\n",
- " q=query,\n",
- " fields=\"nextPageToken, files(id, name, parents)\",\n",
- " pageSize=1000, # Maximum allowed\n",
- " pageToken=page_token\n",
- " ).execute()\n",
- " \n",
- " folders = results.get('files', [])\n",
- " all_folders.extend(folders)\n",
- " \n",
- " page_token = results.get('nextPageToken')\n",
- " if not page_token:\n",
- " break\n",
- " \n",
- " print(f\"Total folders found: {len(all_folders)}\")\n",
- " return all_folders\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"Error retrieving folders: {e}\")\n",
- " return []\n",
- " \n",
- " def build_folder_tree(self, folders):\n",
- " \"\"\"Build a tree structure from the flat list of folders\"\"\"\n",
- " # Create a dictionary for quick lookup\n",
- " folder_dict = {folder['id']: folder for folder in folders}\n",
- " \n",
- " # Add root folder\n",
- " folder_dict['root'] = {'id': 'root', 'name': 'My Drive', 'parents': []}\n",
- " \n",
- " # Build the tree structure\n",
- " tree = {}\n",
- " \n",
- " def build_path(folder_id, visited=None):\n",
- " if visited is None:\n",
- " visited = set()\n",
- " \n",
- " if folder_id in visited: # Prevent infinite loops\n",
- " return \"CIRCULAR_REFERENCE\"\n",
- " \n",
- " visited.add(folder_id)\n",
- " \n",
- " if folder_id not in folder_dict:\n",
- " return \"UNKNOWN\"\n",
- " \n",
- " folder = folder_dict[folder_id]\n",
- " \n",
- " if folder_id == 'root':\n",
- " return \"My Drive\"\n",
- " \n",
- " parents = folder.get('parents', [])\n",
- " if not parents:\n",
- " return folder['name']\n",
- " \n",
- " parent_path = build_path(parents[0], visited.copy())\n",
- " return f\"{parent_path}/{folder['name']}\"\n",
- " \n",
- " # Build paths for all folders\n",
- " folder_paths = []\n",
- " for folder in folders:\n",
- " path = build_path(folder['id'])\n",
- " folder_paths.append({\n",
- " 'name': folder['name'],\n",
- " 'id': folder['id'],\n",
- " 'path': path\n",
- " })\n",
- " \n",
- " return folder_paths\n",
- " \n",
- " def search_folder_by_name(self, folder_name, folders_with_paths):\n",
- " \"\"\"Search for folders by name (case-insensitive)\"\"\"\n",
- " matches = []\n",
- " search_name = folder_name.lower()\n",
- " \n",
- " for folder in folders_with_paths:\n",
- " if search_name in folder['name'].lower():\n",
- " matches.append(folder)\n",
- " \n",
- " return matches\n",
- " \n",
- " def display_all_folders(self, folders_with_paths, search_term=None):\n",
- " \"\"\"Display all folders in a readable format\"\"\"\n",
- " if search_term:\n",
- " print(f\"\\n=== Searching for folders containing '{search_term}' ===\")\n",
- " matches = self.search_folder_by_name(search_term, folders_with_paths)\n",
- " if matches:\n",
- " print(f\"Found {len(matches)} matching folders:\")\n",
- " for folder in matches:\n",
- " print(f\"📁 {folder['name']}\")\n",
- " print(f\" Path: {folder['path']}\")\n",
- " print(f\" ID: {folder['id']}\")\n",
- " print()\n",
- " else:\n",
- " print(f\"No folders found containing '{search_term}'\")\n",
- " else:\n",
- " print(f\"\\n=== All {len(folders_with_paths)} folders in your Google Drive ===\")\n",
- " \n",
- " # Sort by path for better readability\n",
- " sorted_folders = sorted(folders_with_paths, key=lambda x: x['path'])\n",
- " \n",
- " for folder in sorted_folders:\n",
- " print(f\"📁 {folder['path']}\")\n",
- " # print(f\" ID: {folder['id']}\") # Uncomment if you need IDs\n",
- " \n",
- " def find_blue_berry_folder(self):\n",
- " \"\"\"Specifically look for the Anatomy Books folder\"\"\"\n",
- " print(\"🔍 Searching for 'Blue Berry' folder...\")\n",
- " \n",
- " folders = self.get_all_folders()\n",
- " if not folders:\n",
- " print(\"No folders found or error occurred.\")\n",
- " return\n",
- " \n",
- " folders_with_paths = self.build_folder_tree(folders)\n",
- " \n",
- " # Search for Anatomy Books specifically\n",
- " blue_berry_matches = self.search_folder_by_name(\"Anatomy Books\", folders_with_paths)\n",
- " \n",
- " if blue_berry_matches:\n",
- " print(f\"\\n✅ Found {len(blue_berry_matches)} 'Blue Berry' folder(s):\")\n",
- " for folder in blue_berry_matches:\n",
- " print(f\"📁 {folder['name']}\")\n",
- " print(f\" Full Path: {folder['path']}\")\n",
- " print(f\" Folder ID: {folder['id']}\")\n",
- " print()\n",
- " else:\n",
- " print(\"\\n❌ No 'Blue Berry' folder found.\")\n",
- " print(\"Let me show you all folders to help you locate it:\")\n",
- " \n",
- " # Show all folders if Blue Berry not found\n",
- " self.display_all_folders(folders_with_paths)\n",
- " \n",
- " def interactive_folder_search(self):\n",
- " \"\"\"Interactive search for any folder\"\"\"\n",
- " print(\"📂 Google Drive Folder Explorer\")\n",
- " print(\"=\" * 40)\n",
- " \n",
- " folders = self.get_all_folders()\n",
- " if not folders:\n",
- " print(\"No folders found or error occurred.\")\n",
- " return\n",
- " \n",
- " folders_with_paths = self.build_folder_tree(folders)\n",
- " \n",
- " while True:\n",
- " print(\"\\nOptions:\")\n",
- " print(\"1. Search for a specific folder\")\n",
- " print(\"2. Show all folders\")\n",
- " print(\"3. Find 'Blue Berry' folder\")\n",
- " print(\"4. Exit\")\n",
- " \n",
- " choice = input(\"\\nEnter your choice (1-4): \").strip()\n",
- " \n",
- " if choice == '1':\n",
- " search_term = input(\"Enter folder name to search: \").strip()\n",
- " if search_term:\n",
- " self.display_all_folders(folders_with_paths, search_term)\n",
- " \n",
- " elif choice == '2':\n",
- " self.display_all_folders(folders_with_paths)\n",
- " \n",
- " elif choice == '3':\n",
- " blue_berry_matches = self.search_folder_by_name(\"blue berry\", folders_with_paths)\n",
- " if blue_berry_matches:\n",
- " print(f\"\\n✅ Found 'Blue Berry' folder(s):\")\n",
- " for folder in blue_berry_matches:\n",
- " print(f\"📁 {folder['name']} -> {folder['path']}\")\n",
- " else:\n",
- " print(\"\\n❌ 'Blue Berry' folder not found in your Drive.\")\n",
- " \n",
- " elif choice == '4':\n",
- " print(\"Goodbye!\")\n",
- " break\n",
- " \n",
- " else:\n",
- " print(\"Invalid choice. Please try again.\")\n",
- "\n",
- "def main():\n",
- " \"\"\"Main function to run the folder lister\"\"\"\n",
- " try:\n",
- " lister = DrivefolderLister()\n",
- " \n",
- " # Quick search for Blue Berry folder\n",
- " lister.find_blue_berry_folder()\n",
- " \n",
- " # Uncomment the line below for interactive mode\n",
- " # lister.interactive_folder_search()\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"Error initializing Drive connection: {e}\")\n",
- " print(\"Make sure your GOOGLE_SERVICE_ACCOUNT_FILE environment variable is set correctly.\")\n",
- "\n",
- "if __name__ == \"__main__\":\n",
- " main()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "29a61039-9043-44d5-94b1-4847350b2200",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": 31,
- "id": "7350d687-f76e-46f4-a30b-0518e5b8236e",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "🚀 Google Drive API Diagnostic Tool\n",
- "==================================================\n",
- "🔧 Testing Environment Variables...\n",
- "----------------------------------------\n",
- "✅ GOOGLE_SERVICE_ACCOUNT_FILE: C:/Users/Uche Buzz/myprojects/RAG/rag-system-463320-f292991d0516.json\n",
- "✅ Service account file exists\n",
- "✅ Service account file is valid JSON\n",
- "✅ All required fields present in service account file\n",
- "📧 Service account email: rag-base@rag-system-463320.iam.gserviceaccount.com\n",
- "✅ OPENAI_API_KEY: ************************************************************************************************************************************************************K1OkRRkA\n",
- "\n",
- "🔑 Testing Credentials Loading...\n",
- "----------------------------------------\n",
- "\n",
- "Testing scope set 1: ['https://www.googleapis.com/auth/drive.readonly']\n",
- "✅ Credentials loaded successfully with scopes: ['https://www.googleapis.com/auth/drive.readonly']\n",
- "\n",
- "🔧 Testing Drive Service Creation...\n",
- "----------------------------------------\n",
- "✅ Google Drive service created successfully\n",
- "\n",
- "📡 Testing Basic API Call...\n",
- "----------------------------------------\n",
- "✅ API call successful!\n",
- "📧 Connected as: rag-base@rag-system-463320.iam.gserviceaccount.com\n",
- "👤 Display name: rag-base@rag-system-463320.iam.gserviceaccount.com\n",
- "\n",
- "🏠 Testing Root Folder Access...\n",
- "----------------------------------------\n",
- "✅ Root folder accessible\n",
- "📁 Root folder name: My Drive\n",
- "📄 Items in root folder: 0\n",
- "\n",
- "📁 Testing File Listing...\n",
- "----------------------------------------\n",
- "Test 1: Listing all files (including documents)...\n",
- "✅ Found 7 files total\n",
- "📄 First few files:\n",
- " - Neuro Note Dr Clement.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)\n",
- " - Morbid Anatomy - ANA 3107.pdf (application/pdf)\n",
- " - ANT & POST TRIANGLES OF THE NECK.pdf (application/pdf)\n",
- " - Anatomy Books (application/vnd.google-apps.folder)\n",
- " - Blue Berry (application/vnd.google-apps.folder)\n",
- "\n",
- "Test 2: Listing folders only...\n",
- "✅ Found 2 folders\n",
- "📁 Folders found:\n",
- " - Anatomy Books (ID: 125NdiCL7moQsNYWRuHojVM7If88Bxh0q)\n",
- " - Blue Berry (ID: 1AYaS0yt_srFlgdE4mSNlqA6FLm10rdt4)\n",
- "\n",
- "✅ All diagnostic tests passed!\n",
- "🎉 Your Google Drive API connection is working correctly!\n"
- ]
- }
- ],
- "source": [
- "import os\n",
- "from google.oauth2 import service_account\n",
- "from googleapiclient.discovery import build\n",
- "from googleapiclient.errors import HttpError\n",
- "import json\n",
- "\n",
- "class DriveConnectionDiagnostic:\n",
- " def __init__(self):\n",
- " self.credentials = None\n",
- " self.drive_service = None\n",
- " self.setup_success = False\n",
- " \n",
- " def test_environment_variables(self):\n",
- " \"\"\"Test if environment variables are set correctly\"\"\"\n",
- " print(\"🔧 Testing Environment Variables...\")\n",
- " print(\"-\" * 40)\n",
- " \n",
- " service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n",
- " openai_key = os.getenv('OPENAI_API_KEY')\n",
- " \n",
- " if service_account_file:\n",
- " print(f\"✅ GOOGLE_SERVICE_ACCOUNT_FILE: {service_account_file}\")\n",
- " \n",
- " # Check if file exists\n",
- " if os.path.exists(service_account_file):\n",
- " print(f\"✅ Service account file exists\")\n",
- " \n",
- " # Check if file is readable\n",
- " try:\n",
- " with open(service_account_file, 'r') as f:\n",
- " service_data = json.load(f)\n",
- " print(f\"✅ Service account file is valid JSON\")\n",
- " \n",
- " # Check required fields\n",
- " required_fields = ['type', 'project_id', 'private_key_id', 'private_key', 'client_email']\n",
- " missing_fields = [field for field in required_fields if field not in service_data]\n",
- " \n",
- " if missing_fields:\n",
- " print(f\"❌ Missing required fields in service account file: {missing_fields}\")\n",
- " return False\n",
- " else:\n",
- " print(f\"✅ All required fields present in service account file\")\n",
- " print(f\"📧 Service account email: {service_data.get('client_email')}\")\n",
- " \n",
- " except json.JSONDecodeError:\n",
- " print(f\"❌ Service account file is not valid JSON\")\n",
- " return False\n",
- " except Exception as e:\n",
- " print(f\"❌ Error reading service account file: {e}\")\n",
- " return False\n",
- " \n",
- " else:\n",
- " print(f\"❌ Service account file does not exist at: {service_account_file}\")\n",
- " return False\n",
- " else:\n",
- " print(f\"❌ GOOGLE_SERVICE_ACCOUNT_FILE environment variable not set\")\n",
- " return False\n",
- " \n",
- " if openai_key:\n",
- " print(f\"✅ OPENAI_API_KEY: {'*' * (len(openai_key) - 8) + openai_key[-8:]}\")\n",
- " else:\n",
- " print(f\"⚠️ OPENAI_API_KEY not set (not required for folder listing)\")\n",
- " \n",
- " return True\n",
- " \n",
- " def test_credentials_loading(self):\n",
- " \"\"\"Test if credentials can be loaded\"\"\"\n",
- " print(\"\\n🔑 Testing Credentials Loading...\")\n",
- " print(\"-\" * 40)\n",
- " \n",
- " try:\n",
- " service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n",
- " \n",
- " # Try different scope combinations\n",
- " scopes_to_test = [\n",
- " ['https://www.googleapis.com/auth/drive.readonly'],\n",
- " ['https://www.googleapis.com/auth/drive'],\n",
- " ['https://www.googleapis.com/auth/drive.metadata.readonly'],\n",
- " ['https://www.googleapis.com/auth/drive.file']\n",
- " ]\n",
- " \n",
- " for i, scopes in enumerate(scopes_to_test):\n",
- " try:\n",
- " print(f\"\\nTesting scope set {i+1}: {scopes}\")\n",
- " self.credentials = service_account.Credentials.from_service_account_file(\n",
- " service_account_file,\n",
- " scopes=scopes\n",
- " )\n",
- " print(f\"✅ Credentials loaded successfully with scopes: {scopes}\")\n",
- " return True\n",
- " except Exception as e:\n",
- " print(f\"❌ Failed to load credentials with scopes {scopes}: {e}\")\n",
- " \n",
- " return False\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"❌ Error loading credentials: {e}\")\n",
- " return False\n",
- " \n",
- " def test_drive_service_creation(self):\n",
- " \"\"\"Test if Drive service can be created\"\"\"\n",
- " print(\"\\n🔧 Testing Drive Service Creation...\")\n",
- " print(\"-\" * 40)\n",
- " \n",
- " try:\n",
- " if not self.credentials:\n",
- " print(\"❌ No credentials available\")\n",
- " return False\n",
- " \n",
- " self.drive_service = build('drive', 'v3', credentials=self.credentials)\n",
- " print(\"✅ Google Drive service created successfully\")\n",
- " return True\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"❌ Error creating Drive service: {e}\")\n",
- " return False\n",
- " \n",
- " def test_basic_api_call(self):\n",
- " \"\"\"Test basic API functionality\"\"\"\n",
- " print(\"\\n📡 Testing Basic API Call...\")\n",
- " print(\"-\" * 40)\n",
- " \n",
- " try:\n",
- " if not self.drive_service:\n",
- " print(\"❌ No Drive service available\")\n",
- " return False\n",
- " \n",
- " # Test getting user info\n",
- " about = self.drive_service.about().get(fields=\"user\").execute()\n",
- " user_info = about.get('user', {})\n",
- " print(f\"✅ API call successful!\")\n",
- " print(f\"📧 Connected as: {user_info.get('emailAddress', 'Unknown')}\")\n",
- " print(f\"👤 Display name: {user_info.get('displayName', 'Unknown')}\")\n",
- " \n",
- " return True\n",
- " \n",
- " except HttpError as e:\n",
- " print(f\"❌ HTTP Error: {e}\")\n",
- " if e.resp.status == 403:\n",
- " print(\" This might be a permissions issue. Check:\")\n",
- " print(\" 1. Is the service account enabled?\")\n",
- " print(\" 2. Does it have the right permissions?\")\n",
- " print(\" 3. Is the Google Drive API enabled in your project?\")\n",
- " return False\n",
- " except Exception as e:\n",
- " print(f\"❌ Error making API call: {e}\")\n",
- " return False\n",
- " \n",
- " def test_file_listing(self):\n",
- " \"\"\"Test different file listing approaches\"\"\"\n",
- " print(\"\\n📁 Testing File Listing...\")\n",
- " print(\"-\" * 40)\n",
- " \n",
- " if not self.drive_service:\n",
- " print(\"❌ No Drive service available\")\n",
- " return False\n",
- " \n",
- " # Test 1: List any files (not just folders)\n",
- " try:\n",
- " print(\"Test 1: Listing all files (including documents)...\")\n",
- " results = self.drive_service.files().list(\n",
- " q=\"trashed=false\",\n",
- " fields=\"files(id, name, mimeType)\",\n",
- " pageSize=10\n",
- " ).execute()\n",
- " \n",
- " files = results.get('files', [])\n",
- " print(f\"✅ Found {len(files)} files total\")\n",
- " \n",
- " if files:\n",
- " print(\"📄 First few files:\")\n",
- " for file in files[:5]:\n",
- " print(f\" - {file['name']} ({file['mimeType']})\")\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"❌ Error listing files: {e}\")\n",
- " return False\n",
- " \n",
- " # Test 2: List only folders\n",
- " try:\n",
- " print(f\"\\nTest 2: Listing folders only...\")\n",
- " results = self.drive_service.files().list(\n",
- " q=\"mimeType='application/vnd.google-apps.folder' and trashed=false\",\n",
- " fields=\"files(id, name, parents)\",\n",
- " pageSize=10\n",
- " ).execute()\n",
- " \n",
- " folders = results.get('files', [])\n",
- " print(f\"✅ Found {len(folders)} folders\")\n",
- " \n",
- " if folders:\n",
- " print(\"📁 Folders found:\")\n",
- " for folder in folders:\n",
- " print(f\" - {folder['name']} (ID: {folder['id']})\")\n",
- " else:\n",
- " print(\"⚠️ No folders found - this might indicate:\")\n",
- " print(\" 1. Your Google Drive is empty\")\n",
- " print(\" 2. The service account doesn't have access to your personal Drive\")\n",
- " print(\" 3. You need to share folders with the service account\")\n",
- " \n",
- " return len(folders) > 0\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"❌ Error listing folders: {e}\")\n",
- " return False\n",
- " \n",
- " def test_root_access(self):\n",
- " \"\"\"Test access to root folder\"\"\"\n",
- " print(\"\\n🏠 Testing Root Folder Access...\")\n",
- " print(\"-\" * 40)\n",
- " \n",
- " try:\n",
- " if not self.drive_service:\n",
- " print(\"❌ No Drive service available\")\n",
- " return False\n",
- " \n",
- " # Try to get root folder info\n",
- " root_info = self.drive_service.files().get(\n",
- " fileId='root',\n",
- " fields=\"id, name, mimeType\"\n",
- " ).execute()\n",
- " \n",
- " print(f\"✅ Root folder accessible\")\n",
- " print(f\"📁 Root folder name: {root_info.get('name', 'My Drive')}\")\n",
- " \n",
- " # Try to list contents of root\n",
- " results = self.drive_service.files().list(\n",
- " q=\"'root' in parents and trashed=false\",\n",
- " fields=\"files(id, name, mimeType)\",\n",
- " pageSize=10\n",
- " ).execute()\n",
- " \n",
- " root_contents = results.get('files', [])\n",
- " print(f\"📄 Items in root folder: {len(root_contents)}\")\n",
- " \n",
- " if root_contents:\n",
- " print(\"🔍 Root folder contents:\")\n",
- " for item in root_contents[:5]:\n",
- " item_type = \"📁\" if \"folder\" in item['mimeType'] else \"📄\"\n",
- " print(f\" {item_type} {item['name']}\")\n",
- " \n",
- " return True\n",
- " \n",
- " except Exception as e:\n",
- " print(f\"❌ Error accessing root folder: {e}\")\n",
- " return False\n",
- " \n",
- " def run_full_diagnostic(self):\n",
- " \"\"\"Run complete diagnostic\"\"\"\n",
- " print(\"🚀 Google Drive API Diagnostic Tool\")\n",
- " print(\"=\" * 50)\n",
- " \n",
- " # Step 1: Environment variables\n",
- " if not self.test_environment_variables():\n",
- " print(\"\\n❌ Environment setup failed. Please check your service account file.\")\n",
- " return False\n",
- " \n",
- " # Step 2: Credentials\n",
- " if not self.test_credentials_loading():\n",
- " print(\"\\n❌ Credentials loading failed.\")\n",
- " return False\n",
- " \n",
- " # Step 3: Service creation\n",
- " if not self.test_drive_service_creation():\n",
- " print(\"\\n❌ Drive service creation failed.\")\n",
- " return False\n",
- " \n",
- " # Step 4: Basic API call\n",
- " if not self.test_basic_api_call():\n",
- " print(\"\\n❌ Basic API call failed.\")\n",
- " return False\n",
- " \n",
- " # Step 5: Root access\n",
- " if not self.test_root_access():\n",
- " print(\"\\n❌ Root folder access failed.\")\n",
- " return False\n",
- " \n",
- " # Step 6: File listing\n",
- " if not self.test_file_listing():\n",
- " print(\"\\n❌ File listing failed or no folders found.\")\n",
- " print(\"\\n🔧 SOLUTION SUGGESTIONS:\")\n",
- " print(\"1. If you're using a service account, you need to SHARE folders with it\")\n",
- " print(\"2. Share your 'Blue Berry' folder with the service account email\")\n",
- " print(\"3. Or consider using OAuth2 instead of service account for personal Drive access\")\n",
- " return False\n",
- " \n",
- " print(\"\\n✅ All diagnostic tests passed!\")\n",
- " print(\"🎉 Your Google Drive API connection is working correctly!\")\n",
- " return True\n",
- " \n",
- " def show_service_account_sharing_instructions(self):\n",
- " \"\"\"Show instructions for sharing with service account\"\"\"\n",
- " print(\"\\n📋 SERVICE ACCOUNT SHARING INSTRUCTIONS:\")\n",
- " print(\"=\" * 50)\n",
- " print(\"If you're using a service account, you need to share folders with it:\")\n",
- " print()\n",
- " print(\"1. Open Google Drive in your browser\")\n",
- " print(\"2. Find the folder you want to access (e.g., 'Blue Berry')\")\n",
- " print(\"3. Right-click the folder → 'Share'\")\n",
- " print(\"4. Add the service account email address\")\n",
- " print(\"5. Give it 'Viewer' or 'Editor' permissions\")\n",
- " print(\"6. Click 'Send'\")\n",
- " print()\n",
- " \n",
- " if self.credentials:\n",
- " try:\n",
- " service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n",
- " with open(service_account_file, 'r') as f:\n",
- " service_data = json.load(f)\n",
- " email = service_data.get('client_email')\n",
- " print(f\"📧 Your service account email: {email}\")\n",
- " print(\" ^ Share your folders with this email address\")\n",
- " except:\n",
- " print(\"❌ Could not read service account email\")\n",
- "\n",
- "def main():\n",
- " diagnostic = DriveConnectionDiagnostic()\n",
- " success = diagnostic.run_full_diagnostic()\n",
- " \n",
- " if not success:\n",
- " diagnostic.show_service_account_sharing_instructions()\n",
- "\n",
- "if __name__ == \"__main__\":\n",
- " main()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "e3374b17-d2fa-4c80-a70b-76b8bf94c702",
- "metadata": {},
- "outputs": [],
- "source": [
- "#flask"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 30,
- "id": "c8ac4c93-8b36-49d2-b8e1-efb7d1c4b104",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- " * Serving Flask app '__main__'\n",
- " * Debug mode: on\n"
- ]
- },
- {
- "name": "stderr",
- "output_type": "stream",
- "text": [
- "WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.\n",
- " * Running on http://127.0.0.1:5000\n",
- "Press CTRL+C to quit\n",
- " * Restarting with stat\n"
- ]
- },
- {
- "ename": "SystemExit",
- "evalue": "1",
- "output_type": "error",
- "traceback": [
- "An exception has occurred, use %tb to see the full traceback.\n",
- "\u001b[31mSystemExit\u001b[39m\u001b[31m:\u001b[39m 1\n"
- ]
- },
- {
- "name": "stderr",
- "output_type": "stream",
- "text": [
- "C:\\Users\\Uche Buzz\\anaconda3\\envs\\RAG\\Lib\\site-packages\\IPython\\core\\interactiveshell.py:3680: UserWarning: To exit: use 'exit', 'quit', or Ctrl-D.\n",
- " warn(\"To exit: use 'exit', 'quit', or Ctrl-D.\", stacklevel=1)\n"
- ]
- }
- ],
- "source": [
- "from flask import Flask, request, jsonify, render_template_string\n",
- "\n",
- "app = Flask(__name__)\n",
- "integration = GPTDriveIntegration()\n",
- "\n",
- "HTML_TEMPLATE = \"\"\"\n",
- "\n",
- "\n",
- "\n",
- " GPT-Drive Integration\n",
- " \n",
- "\n",
- "\n",
- " GPT-Google Drive Integration
\n",
- " \n",
- " \n",
- "\n",
- " \n",
- "\n",
- "\n",
- "\"\"\"\n",
- "\n",
- "@app.route('/')\n",
- "def home():\n",
- " return render_template_string(HTML_TEMPLATE)\n",
- "\n",
- "@app.route('/query', methods=['POST'])\n",
- "def query():\n",
- " data = request.json\n",
- " result = integration.process_query(data['query'])\n",
- " return jsonify(result)\n",
- "\n",
- "if __name__ == '__main__':\n",
- " app.run(debug=True)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "caef8af5-73f9-4627-9203-0e9c8d098db0",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "ff7582bd-f80c-46c4-a51b-b20fd7493b0e",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "96ea63b3-3cf3-41c1-b160-72d677ab0949",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a8a64f0d-c9b5-4ce7-a887-e1b60de4dac3",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": 12,
- "id": "abbc0f6c-5350-4fd9-8711-de68d4aa739d",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "02ae258f-bef0-469b-a696-1edf6a7b5921",
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": 13,
- "id": "09a0b418-eb0e-486e-bf05-bc7b43645784",
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "* Running on local URL: http://127.0.0.1:7860\n",
- "* Running on public URL: https://a9ef60dd3438b2ef0e.gradio.live\n",
- "\n",
- "This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)\n"
- ]
- },
- {
- "data": {
- "text/html": [
- ""
- ],
- "text/plain": [
- ""
- ]
- },
- "metadata": {},
- "output_type": "display_data"
- },
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Keyboard interruption in main thread... closing server.\n",
- "Killing tunnel 127.0.0.1:7860 <> https://a9ef60dd3438b2ef0e.gradio.live\n"
- ]
- }
- ],
- "source": [
- "gpt_drive = GPTDriveIntegration()\n",
- "\n",
- "def process_user_query(query, search_terms_input):\n",
- " \"\"\"Process user query and return formatted response\"\"\"\n",
- " if not query.strip():\n",
- " return \"Please enter a question.\", \"\"\n",
- " \n",
- " # Parse search terms if provided\n",
- " search_terms = None\n",
- " # if search_terms_input.strip():\n",
- " # search_terms = [term.strip() for term in search_terms_input.split(',')]\n",
- " \n",
- " # Process the query\n",
- " result = gpt_drive.process_query(query, search_terms)\n",
- " \n",
- " # Format the response\n",
- " answer = result['answer']\n",
- " sources = result['sources']\n",
- " \n",
- " sources_text = \"\"\n",
- " if sources:\n",
- " sources_text = \"**Sources used:**\\n\" + \"\\n\".join([f\"• {source}\" for source in sources])\n",
- " \n",
- " return answer, sources_text\n",
- "\n",
- "def check_setup():\n",
- " \"\"\"Check if the APIs are properly configured\"\"\"\n",
- " status_messages = []\n",
- " \n",
- " # Check Google Drive API\n",
- " if gpt_drive.drive_initialized:\n",
- " status_messages.append(\"✅ Google Drive API: Connected\")\n",
- " else:\n",
- " status_messages.append(f\"❌ Google Drive API: {getattr(gpt_drive, 'drive_error', 'Not configured')}\")\n",
- " \n",
- " # Check OpenAI API\n",
- " if gpt_drive.openai_initialized:\n",
- " status_messages.append(\"✅ OpenAI API: Connected\")\n",
- " else:\n",
- " status_messages.append(f\"❌ OpenAI API: {getattr(gpt_drive, 'openai_error', 'Not configured')}\")\n",
- " \n",
- " return \"\\n\".join(status_messages)\n",
- "\n",
- "# Create Gradio interface\n",
- "with gr.Blocks(title=\"Augusta's Anatomy Reading Assistant\", theme=gr.themes.Soft()) as app:\n",
- " gr.Markdown(\"# 🤖 Augusta's Anatomy bot\")\n",
- " gr.Markdown(\"Ask questions about your anatomy books using AI!\")\n",
- " \n",
- " with gr.Row():\n",
- " with gr.Column(scale=2):\n",
- " # Main query interface\n",
- " with gr.Group():\n",
- " gr.Markdown(\"### Ask a Question\")\n",
- " query_input = gr.Textbox(\n",
- " label=\"Your Question\",\n",
- " placeholder=\"Ask me any question about your anatomy books?\",\n",
- " lines=3\n",
- " )\n",
- " \n",
- " search_terms_input = gr.Textbox(\n",
- " label=\"Search Terms (optional)\",\n",
- " placeholder=\"Enter comma-separated terms to search for specific files\",\n",
- " lines=1\n",
- " )\n",
- " \n",
- " submit_btn = gr.Button(\"Search & Ask\", variant=\"primary\", size=\"lg\")\n",
- " \n",
- " # Results section\n",
- " with gr.Group():\n",
- " gr.Markdown(\"### Answer\")\n",
- " answer_output = gr.Textbox(\n",
- " label=\"AI Response\",\n",
- " lines=10,\n",
- " interactive=False\n",
- " )\n",
- " \n",
- " sources_output = gr.Textbox(\n",
- " label=\"Sources\",\n",
- " lines=3,\n",
- " interactive=False\n",
- " )\n",
- " \n",
- " with gr.Column(scale=1):\n",
- " # Status and setup info\n",
- " with gr.Group():\n",
- " gr.Markdown(\"### System Status\")\n",
- " status_btn = gr.Button(\"Check Status\", size=\"sm\")\n",
- " status_output = gr.Textbox(\n",
- " label=\"API Status\",\n",
- " lines=4,\n",
- " interactive=False\n",
- " )\n",
- " \n",
- " with gr.Group():\n",
- " gr.Markdown(\"### Setup Instructions\")\n",
- " gr.Markdown(\"\"\"\n",
- " **Important Notes:**\n",
- " 1.Only documents shared with it, it can answer\n",
- " \n",
- " **File Types Supported:**\n",
- " - Google Docs\n",
- " - Google Sheets \n",
- " - PDF files\n",
- " - Text files\n",
- " \n",
- " **Tips:**\n",
- " - Use specific search terms for better results\n",
- " - The system searches the top 3 most relevant files\n",
- " - Ask clear, specific questions for better answers\n",
- " \"\"\")\n",
- " \n",
- " # Event handlers\n",
- " submit_btn.click(\n",
- " fn=process_user_query,\n",
- " inputs=[query_input, search_terms_input],\n",
- " outputs=[answer_output, sources_output]\n",
- " )\n",
- " \n",
- " status_btn.click(\n",
- " fn=check_setup,\n",
- " outputs=status_output\n",
- " )\n",
- " \n",
- " # Example queries\n",
- " with gr.Row():\n",
- " gr.Examples(\n",
- " examples=[\n",
- " [\"What is morbid Anatomy?\", \"morbid, Anatomy\"],\n",
- " [\"The transmission of nerves from one neuron to another is as a result of what?\", \"neuron, nerves, Dr Clement\"],\n",
- " ],\n",
- " inputs=[query_input, search_terms_input],\n",
- " )\n",
- "\n",
- "# Launch the app\n",
- "if __name__ == \"__main__\":\n",
- " app.launch(\n",
- " share=True,debug =True)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "35d0a347-2cc6-4f4e-bb70-cc47067eaf53",
- "metadata": {},
- "outputs": [],
- "source": []
- }
- ],
- "metadata": {
- "kernelspec": {
- "display_name": "Python 3 (ipykernel)",
- "language": "python",
- "name": "python3"
- },
- "language_info": {
- "codemirror_mode": {
- "name": "ipython",
- "version": 3
- },
- "file_extension": ".py",
- "mimetype": "text/x-python",
- "name": "python",
- "nbconvert_exporter": "python",
- "pygments_lexer": "ipython3",
- "version": "3.13.5"
- }
- },
- "nbformat": 4,
- "nbformat_minor": 5
-}