studybuddy / app.py
Ephraimmm's picture
Rename appyu.py to app.py
bda02e1 verified
raw
history blame
100 kB
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "e549bafd-78b1-4a83-80b4-2cb597efff79",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import json\n",
"from google.oauth2 import service_account\n",
"from googleapiclient.discovery import build\n",
"from googleapiclient.http import MediaIoBaseDownload\n",
"import openai\n",
"from dotenv import load_dotenv, dotenv_values\n",
"import io"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "04361235-7896-4439-9d04-1400e043528b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"load_dotenv()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "9ae411c5-c84b-4bfd-b089-69b5c5ba70ae",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"OPENAI_API_KEY\n",
"ANTHROPIC_API_KEY\n",
"GOOGLE_SERVICE_ACCOUNT_FILE\n"
]
}
],
"source": [
"config = dotenv_values(\".env\")\n",
"for key in config.keys():\n",
" print(key)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "7622a0e4-64a6-4848-b588-bd65d56c55e0",
"metadata": {},
"outputs": [],
"source": [
"from openai import OpenAI\n",
"openai.api_key = os.getenv('OPENAI_API_KEY')\n",
"openai = OpenAI(api_key = openai.api_key)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "2ead7c59-1f11-478a-bb69-2928ddc38901",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Hello! How can I assist you today?\n"
]
}
],
"source": [
"response = openai.chat.completions.create(\n",
" model = \"gpt-4o-mini\",\n",
" messages = [\n",
" {\"role\":\"system\", \"content\":\"you are a helpful assistant\"},\n",
" {\"role\":\"user\", \"content\":\"hi\"}\n",
" ])\n",
"\n",
"reply = response.choices[0].message.content\n",
"print(reply)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "b3bcbada-6a72-4cc8-a166-d2e596cd1fc4",
"metadata": {},
"outputs": [],
"source": [
"service_account_file_path = os.getenv(\"GOOGLE_SERVICE_ACCOUNT_FILE\")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "81c59a6d-0831-4bff-b3fc-fbe3d4cc1e31",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"openai activated\n",
"service_account_file_path activated\n"
]
}
],
"source": [
"#troubleshoot\n",
"if openai is None:\n",
" print(\"openai not activated\")\n",
"else: \n",
" print (\"openai activated\")\n",
"\n",
"if service_account_file_path is None:\n",
" print(\"service_account_file_path not activated\")\n",
"else: \n",
" print (\"service_account_file_path activated\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a70f32aa-9e43-4175-8cca-d6af723aef91",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 8,
"id": "0efe325d-badd-4533-affe-47d572ef128e",
"metadata": {},
"outputs": [],
"source": [
"class GPTDriveIntegration:\n",
" def __init__(self):\n",
" # Initialize Google Drive API\n",
" self.credentials = service_account.Credentials.from_service_account_file(\n",
" os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE'),\n",
" scopes=['https://www.googleapis.com/auth/drive.readonly']\n",
" )\n",
" self.drive_service = build('drive', 'v3', credentials=self.credentials)\n",
" \n",
" # Initialize OpenAI\n",
" openai.api_key = os.getenv('OPENAI_API_KEY')\n",
" \n",
" def search_files(self, query, file_types=None):\n",
" \"\"\"Search for files in Google Drive\"\"\"\n",
" search_query = f\"name contains '{query}'\"\n",
" \n",
" if file_types:\n",
" type_queries = []\n",
" for file_type in file_types:\n",
" if file_type.lower() == 'pdf':\n",
" type_queries.append(\"mimeType='application/pdf'\")\n",
" elif file_type.lower() in ['doc', 'docx']:\n",
" type_queries.append(\"mimeType contains 'document'\")\n",
" elif file_type.lower() in ['xls', 'xlsx']:\n",
" type_queries.append(\"mimeType contains 'spreadsheet'\")\n",
" \n",
" if type_queries:\n",
" search_query += f\" and ({' or '.join(type_queries)})\"\n",
" \n",
" results = self.drive_service.files().list(\n",
" q=search_query,\n",
" fields=\"files(id, name, mimeType, size)\"\n",
" ).execute()\n",
" \n",
" return results.get('files', [])\n",
" \n",
" def get_file_content(self, file_id, mime_type):\n",
" \"\"\"Download and extract text content from file\"\"\"\n",
" try:\n",
" if 'text' in mime_type or 'document' in mime_type:\n",
" # For Google Docs, export as plain text\n",
" if 'document' in mime_type:\n",
" request = self.drive_service.files().export_media(\n",
" fileId=file_id, mimeType='text/plain'\n",
" )\n",
" else:\n",
" request = self.drive_service.files().get_media(fileId=file_id)\n",
" \n",
" file_content = io.BytesIO()\n",
" downloader = MediaIoBaseDownload(file_content, request)\n",
" done = False\n",
" while done is False:\n",
" status, done = downloader.next_chunk()\n",
" \n",
" return file_content.getvalue().decode('utf-8')\n",
" \n",
" elif 'spreadsheet' in mime_type:\n",
" # For Google Sheets, export as CSV\n",
" request = self.drive_service.files().export_media(\n",
" fileId=file_id, mimeType='text/csv'\n",
" )\n",
" file_content = io.BytesIO()\n",
" downloader = MediaIoBaseDownload(file_content, request)\n",
" done = False\n",
" while done is False:\n",
" status, done = downloader.next_chunk()\n",
" \n",
" return file_content.getvalue().decode('utf-8')\n",
" \n",
" elif mime_type == 'application/pdf':\n",
" # For PDF files, download binary content and extract text\n",
" request = self.drive_service.files().get_media(fileId=file_id)\n",
" file_content = io.BytesIO()\n",
" downloader = MediaIoBaseDownload(file_content, request)\n",
" done = False\n",
" while done is False:\n",
" status, done = downloader.next_chunk()\n",
" \n",
" # Extract text from PDF using PyPDF2 or pdfplumber\n",
" file_content.seek(0) # Reset buffer position\n",
" \n",
" # Option 1: Using PyPDF2\n",
" try:\n",
" import PyPDF2\n",
" pdf_reader = PyPDF2.PdfReader(file_content)\n",
" text = \"\"\n",
" for page in pdf_reader.pages:\n",
" text += page.extract_text() + \"\\n\"\n",
" return text\n",
" except ImportError:\n",
" pass\n",
" \n",
" # Option 2: Using pdfplumber (better for complex PDFs)\n",
" try:\n",
" import pdfplumber\n",
" text = \"\"\n",
" with pdfplumber.open(file_content) as pdf:\n",
" for page in pdf.pages:\n",
" page_text = page.extract_text()\n",
" if page_text:\n",
" text += page_text + \"\\n\"\n",
" return text\n",
" except ImportError:\n",
" pass\n",
" \n",
" # Option 3: Using pymupdf (fitz) - fastest option\n",
" try:\n",
" import fitz # pymupdf\n",
" pdf_document = fitz.open(stream=file_content.read(), filetype=\"pdf\")\n",
" text = \"\"\n",
" for page_num in range(pdf_document.page_count):\n",
" page = pdf_document[page_num]\n",
" text += page.get_text() + \"\\n\"\n",
" pdf_document.close()\n",
" return text\n",
" except ImportError:\n",
" pass\n",
" \n",
" return \"PDF text extraction requires PyPDF2, pdfplumber, or pymupdf library\"\n",
" \n",
" else:\n",
" return \"File type not supported for text extraction\"\n",
" \n",
" except Exception as e:\n",
" return f\"Error reading file: {str(e)}\"\n",
" \n",
" def query_gpt_with_context(self, user_query, file_contents):\n",
" \"\"\"Send query to GPT with file context\"\"\"\n",
" context = \"\\n\\n\".join([\n",
" f\"File: {content['name']}\\nContent: {content['text'][:2000]}...\"\n",
" for content in file_contents\n",
" ])\n",
" \n",
" messages = [\n",
" {\n",
" \"role\": \"system\", \n",
" \"content\": \"\"\"\n",
" You are an AI assistant that can analyze documents from Google Drive. \n",
" Use the provided file contents to answer user questions.\"\"\"\n",
" },\n",
" {\n",
" \"role\": \"user\", \n",
" \"content\": f\"Context from Google Drive files:\\n{context}\\n\\nUser Question: {user_query}\"\n",
" }\n",
" ]\n",
" \n",
" response = openai.chat.completions.create(\n",
" model=\"gpt-4o-mini\",\n",
" messages=messages,\n",
" max_tokens=1000\n",
" )\n",
" \n",
" return response.choices[0].message.content\n",
" \n",
" def process_query(self, user_query, search_terms=None):\n",
" \"\"\"Main function to process user queries\"\"\"\n",
" # Extract search terms from query if not provided\n",
" if not search_terms:\n",
" search_terms = user_query.split()[:3] # Simple extraction\n",
" \n",
" # Search for relevant files\n",
" files = []\n",
" for term in search_terms:\n",
" files.extend(self.search_files(term))\n",
" \n",
" # Remove duplicates\n",
" unique_files = {f['id']: f for f in files}.values()\n",
" \n",
" # Get content from top 3 most relevant files\n",
" file_contents = []\n",
" for file in list(unique_files)[:3]:\n",
" content = self.get_file_content(file['id'], file['mimeType'])\n",
" file_contents.append({\n",
" 'name': file['name'],\n",
" 'text': content\n",
" })\n",
" \n",
" # Query GPT with context\n",
" if file_contents:\n",
" response = self.query_gpt_with_context(user_query, file_contents)\n",
" return {\n",
" 'answer': response,\n",
" 'sources': [f['name'] for f in file_contents]\n",
" }\n",
" else:\n",
" return {\n",
" 'answer': \"No relevant files found in your Google Drive.\",\n",
" 'sources': []\n",
" }"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "3c2c1ccf-9ade-482d-a170-978e97bc1c08",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Answer: The transmission of nerves is called \"nerve conduction.\" This process involves the propagation of electrical impulses along the nerve fibers, allowing for communication between different parts of the body.\n",
"Sources: ['Neuro Note Dr Clement.docx']\n"
]
}
],
"source": [
"if __name__ == \"__main__\":\n",
" integration = GPTDriveIntegration()\n",
" \n",
" # Test query\n",
" result = integration.process_query(\n",
" \"The transmission of nerves is called?\",\n",
" search_terms=[\"nerves\", \"Dr Clement\"]\n",
" )\n",
" \n",
" print(\"Answer:\", result['answer'])\n",
" print(\"Sources:\", result['sources'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "120e7c93-b38a-4c89-8e76-5b0170d22548",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "61e976a6-1fe4-45de-a63d-3cf849eedbe1",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 11,
"id": "c514b8af-1af4-4497-9044-139a71aedd36",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"* Running on local URL: http://127.0.0.1:7860\n",
"* Running on public URL: https://5320171f6af3e2eef9.gradio.live\n",
"\n",
"This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)\n"
]
},
{
"data": {
"text/html": [
"<div><iframe src=\"https://5320171f6af3e2eef9.gradio.live\" width=\"100%\" height=\"500\" allow=\"autoplay; camera; microphone; clipboard-read; clipboard-write;\" frameborder=\"0\" allowfullscreen></iframe></div>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Keyboard interruption in main thread... closing server.\n",
"Killing tunnel 127.0.0.1:7860 <> https://5320171f6af3e2eef9.gradio.live\n"
]
}
],
"source": [
"gpt_drive = GPTDriveIntegration()\n",
"\n",
"def process_user_query(query, search_terms_input):\n",
" \"\"\"Process user query and return formatted response\"\"\"\n",
" if not query.strip():\n",
" return \"Please enter a question.\", \"\"\n",
" \n",
" # Parse search terms if provided\n",
" search_terms = None\n",
" # if search_terms_input.strip():\n",
" # search_terms = [term.strip() for term in search_terms_input.split(',')]\n",
" \n",
" # Process the query\n",
" result = gpt_drive.process_query(query, search_terms)\n",
" \n",
" # Format the response\n",
" answer = result['answer']\n",
" sources = result['sources']\n",
" \n",
" sources_text = \"\"\n",
" if sources:\n",
" sources_text = \"**Sources used:**\\n\" + \"\\n\".join([f\"• {source}\" for source in sources])\n",
" \n",
" return answer, sources_text\n",
"\n",
"def check_setup():\n",
" \"\"\"Check if the APIs are properly configured\"\"\"\n",
" status_messages = []\n",
" \n",
" # Check Google Drive API\n",
" if gpt_drive.drive_initialized:\n",
" status_messages.append(\"✅ Google Drive API: Connected\")\n",
" else:\n",
" status_messages.append(f\"❌ Google Drive API: {getattr(gpt_drive, 'drive_error', 'Not configured')}\")\n",
" \n",
" # Check OpenAI API\n",
" if gpt_drive.openai_initialized:\n",
" status_messages.append(\"✅ OpenAI API: Connected\")\n",
" else:\n",
" status_messages.append(f\"❌ OpenAI API: {getattr(gpt_drive, 'openai_error', 'Not configured')}\")\n",
" \n",
" return \"\\n\".join(status_messages)\n",
"\n",
"# Create Gradio interface\n",
"with gr.Blocks(title=\"Augusta's Anatomy Reading Assistant\", theme=gr.themes.Soft()) as app:\n",
" gr.Markdown(\"# 🤖 Augusta's Anatomy bot\")\n",
" gr.Markdown(\"Ask questions about your anatomy books using AI!\")\n",
" \n",
" with gr.Row():\n",
" with gr.Column(scale=2):\n",
" # Main query interface\n",
" with gr.Group():\n",
" gr.Markdown(\"### Ask a Question\")\n",
" query_input = gr.Textbox(\n",
" label=\"Your Question\",\n",
" placeholder=\"Ask me any question about your anatomy books?\",\n",
" lines=3\n",
" )\n",
" \n",
" search_terms_input = gr.Textbox(\n",
" label=\"Search Terms (optional)\",\n",
" placeholder=\"Enter comma-separated terms to search for specific files\",\n",
" lines=1\n",
" )\n",
" \n",
" submit_btn = gr.Button(\"Search & Ask\", variant=\"primary\", size=\"lg\")\n",
" \n",
" # Results section\n",
" with gr.Group():\n",
" gr.Markdown(\"### Answer\")\n",
" answer_output = gr.Textbox(\n",
" label=\"AI Response\",\n",
" lines=10,\n",
" interactive=False\n",
" )\n",
" \n",
" sources_output = gr.Textbox(\n",
" label=\"Sources\",\n",
" lines=3,\n",
" interactive=False\n",
" )\n",
" \n",
" with gr.Column(scale=1):\n",
" # Status and setup info\n",
" with gr.Group():\n",
" gr.Markdown(\"### System Status\")\n",
" status_btn = gr.Button(\"Check Status\", size=\"sm\")\n",
" status_output = gr.Textbox(\n",
" label=\"API Status\",\n",
" lines=4,\n",
" interactive=False\n",
" )\n",
" \n",
" with gr.Group():\n",
" gr.Markdown(\"### Setup Instructions\")\n",
" gr.Markdown(\"\"\"\n",
" **Important Notes:**\n",
" 1.Only documents shared with it, it can answer\n",
" \n",
" **File Types Supported:**\n",
" - Google Docs\n",
" - Google Sheets \n",
" - PDF files\n",
" - Text files\n",
" \n",
" **Tips:**\n",
" - Use specific search terms for better results\n",
" - The system searches the top 3 most relevant files\n",
" - Ask clear, specific questions for better answers\n",
" \"\"\")\n",
" \n",
" # Event handlers\n",
" submit_btn.click(\n",
" fn=process_user_query,\n",
" inputs=[query_input, search_terms_input],\n",
" outputs=[answer_output, sources_output]\n",
" )\n",
" \n",
" status_btn.click(\n",
" fn=check_setup,\n",
" outputs=status_output\n",
" )\n",
" \n",
" # Example queries\n",
" with gr.Row():\n",
" gr.Examples(\n",
" examples=[\n",
" [\"What is morbid Anatomy?\", \"morbid, Anatomy\"],\n",
" [\"The transmission of nerves from one neuron to another is as a result of what?\", \"neuron, nerves, Dr Clement\"],\n",
" ],\n",
" inputs=[query_input, search_terms_input],\n",
" )\n",
"\n",
"# Launch the app\n",
"if __name__ == \"__main__\":\n",
" app.launch(\n",
" share=True,debug =True)"
]
},
{
"cell_type": "code",
"execution_count": 42,
"id": "593c4f63-cdda-46e8-aad6-9f9d7db66615",
"metadata": {},
"outputs": [],
"source": [
"#Gradio\n",
"# !pip install gradio"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "859060a9-56f9-49e9-bb41-04ccf7b10d3f",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "d893999c-4f3f-4306-8b0b-072e3fbb8236",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 10,
"id": "01b70311-ff22-4c88-bf22-64dc441afaab",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"C:\\Users\\Uche Buzz\\anaconda3\\envs\\RAG\\Lib\\site-packages\\tqdm\\auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
" from .autonotebook import tqdm as notebook_tqdm\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"* Running on local URL: http://127.0.0.1:7860\n",
"* Running on public URL: https://8fac475cac6193423b.gradio.live\n",
"\n",
"This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)\n"
]
},
{
"data": {
"text/html": [
"<div><iframe src=\"https://8fac475cac6193423b.gradio.live\" width=\"100%\" height=\"500\" allow=\"autoplay; camera; microphone; clipboard-read; clipboard-write;\" frameborder=\"0\" allowfullscreen></iframe></div>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Keyboard interruption in main thread... closing server.\n",
"Killing tunnel 127.0.0.1:7860 <> https://8fac475cac6193423b.gradio.live\n"
]
}
],
"source": [
"import gradio as gr\n",
"import os\n",
"import io\n",
"import openai\n",
"from google.oauth2 import service_account\n",
"from googleapiclient.discovery import build\n",
"from googleapiclient.http import MediaIoBaseDownload\n",
"\n",
"class GPTDriveIntegration:\n",
" def __init__(self):\n",
" # Initialize Google Drive API\n",
" try:\n",
" self.credentials = service_account.Credentials.from_service_account_file(\n",
" os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE'),\n",
" scopes=['https://www.googleapis.com/auth/drive.readonly']\n",
" )\n",
" self.drive_service = build('drive', 'v3', credentials=self.credentials)\n",
" self.drive_initialized = True\n",
" except Exception as e:\n",
" self.drive_initialized = False\n",
" self.drive_error = str(e)\n",
" \n",
" # Initialize OpenAI\n",
" try:\n",
" openai.api_key = os.getenv('OPENAI_API_KEY')\n",
" self.openai_initialized = True\n",
" except Exception as e:\n",
" self.openai_initialized = False\n",
" self.openai_error = str(e)\n",
" \n",
" def search_files(self, query, file_types=None):\n",
" \"\"\"Search for files in Google Drive\"\"\"\n",
" if not self.drive_initialized:\n",
" return []\n",
" \n",
" search_query = f\"name contains '{query}'\"\n",
" \n",
" if file_types:\n",
" type_queries = []\n",
" for file_type in file_types:\n",
" if file_type.lower() == 'pdf':\n",
" type_queries.append(\"mimeType='application/pdf'\")\n",
" elif file_type.lower() in ['doc', 'docx']:\n",
" type_queries.append(\"mimeType contains 'document'\")\n",
" elif file_type.lower() in ['xls', 'xlsx']:\n",
" type_queries.append(\"mimeType contains 'spreadsheet'\")\n",
" \n",
" if type_queries:\n",
" search_query += f\" and ({' or '.join(type_queries)})\"\n",
" \n",
" try:\n",
" results = self.drive_service.files().list(\n",
" q=search_query,\n",
" fields=\"files(id, name, mimeType, size)\"\n",
" ).execute()\n",
" \n",
" return results.get('files', [])\n",
" except Exception as e:\n",
" return []\n",
" \n",
" def get_file_content(self, file_id, mime_type):\n",
" \"\"\"Download and extract text content from file\"\"\"\n",
" try:\n",
" if 'text' in mime_type or 'document' in mime_type:\n",
" # For Google Docs, export as plain text\n",
" if 'document' in mime_type:\n",
" request = self.drive_service.files().export_media(\n",
" fileId=file_id, mimeType='text/plain'\n",
" )\n",
" else:\n",
" request = self.drive_service.files().get_media(fileId=file_id)\n",
" \n",
" file_content = io.BytesIO()\n",
" downloader = MediaIoBaseDownload(file_content, request)\n",
" done = False\n",
" while done is False:\n",
" status, done = downloader.next_chunk()\n",
" \n",
" return file_content.getvalue().decode('utf-8')\n",
" \n",
" elif 'spreadsheet' in mime_type:\n",
" # For Google Sheets, export as CSV\n",
" request = self.drive_service.files().export_media(\n",
" fileId=file_id, mimeType='text/csv'\n",
" )\n",
" file_content = io.BytesIO()\n",
" downloader = MediaIoBaseDownload(file_content, request)\n",
" done = False\n",
" while done is False:\n",
" status, done = downloader.next_chunk()\n",
" \n",
" return file_content.getvalue().decode('utf-8')\n",
" \n",
" else:\n",
" return \"File type not supported for text extraction\"\n",
" \n",
" except Exception as e:\n",
" return f\"Error reading file: {str(e)}\"\n",
" \n",
" def query_gpt_with_context(self, user_query, file_contents):\n",
" \"\"\"Send query to GPT with file context\"\"\"\n",
" context = \"\\n\\n\".join([\n",
" f\"File: {content['name']}\\nContent: {content['text'][:2000]}...\"\n",
" for content in file_contents\n",
" ])\n",
" \n",
" messages = [\n",
" {\n",
" \"role\": \"system\", \n",
" \"content\": \"\"\"\n",
" You are an AI assistant that can analyze anatomy documents from Google Drive. \n",
" Use the provided file contents to answer user questions. Always answer straightforwardly. Don't Hallucinate.\n",
" Always end with 'Do you have any other question?'\n",
" \n",
" \"\"\"\n",
" },\n",
" {\n",
" \"role\": \"user\", \n",
" \"content\": f\"Context from Google Drive files:\\n{context}\\n\\nUser Question: {user_query}\"\n",
" }\n",
" ]\n",
" \n",
" try:\n",
" response = openai.chat.completions.create(\n",
" model=\"gpt-4o-mini\",\n",
" messages=messages,\n",
" max_tokens=1000\n",
" )\n",
" \n",
" return response.choices[0].message.content\n",
" except Exception as e:\n",
" return f\"Error querying OpenAI: {str(e)}\"\n",
" \n",
" def process_query(self, user_query, search_terms=None):\n",
" \"\"\"Main function to process user queries\"\"\"\n",
" # Check if services are initialized\n",
" if not self.drive_initialized:\n",
" return {\n",
" 'answer': f\"Google Drive API not initialized: {getattr(self, 'drive_error', 'Unknown error')}\",\n",
" 'sources': []\n",
" }\n",
" \n",
" if not self.openai_initialized:\n",
" return {\n",
" 'answer': f\"OpenAI API not initialized: {getattr(self, 'openai_error', 'Unknown error')}\",\n",
" 'sources': []\n",
" }\n",
" \n",
" # Extract search terms from query if not provided\n",
" if not search_terms:\n",
" search_terms = user_query.split()[:3] # Simple extraction\n",
" \n",
" # Search for relevant files\n",
" files = []\n",
" for term in search_terms:\n",
" files.extend(self.search_files(term))\n",
" \n",
" # Remove duplicates\n",
" unique_files = {f['id']: f for f in files}.values()\n",
" \n",
" # Get content from top 3 most relevant files\n",
" file_contents = []\n",
" for file in list(unique_files)[:3]:\n",
" content = self.get_file_content(file['id'], file['mimeType'])\n",
" file_contents.append({\n",
" 'name': file['name'],\n",
" 'text': content\n",
" })\n",
" \n",
" # Query GPT with context\n",
" if file_contents:\n",
" response = self.query_gpt_with_context(user_query, file_contents)\n",
" return {\n",
" 'answer': response,\n",
" 'sources': [f['name'] for f in file_contents]\n",
" }\n",
" else:\n",
" return {\n",
" 'answer': \"No relevant files found in your Google Drive.\",\n",
" 'sources': []\n",
" }\n",
"\n",
"# Initialize the GPT Drive Integration\n",
"gpt_drive = GPTDriveIntegration()\n",
"\n",
"def process_user_query(query, search_terms_input):\n",
" \"\"\"Process user query and return formatted response\"\"\"\n",
" if not query.strip():\n",
" return \"Please enter a question.\", \"\"\n",
" \n",
" # Parse search terms if provided\n",
" search_terms = None\n",
" if search_terms_input.strip():\n",
" search_terms = [term.strip() for term in search_terms_input.split(',')]\n",
" \n",
" # Process the query\n",
" result = gpt_drive.process_query(query, search_terms)\n",
" \n",
" # Format the response\n",
" answer = result['answer']\n",
" sources = result['sources']\n",
" \n",
" sources_text = \"\"\n",
" if sources:\n",
" sources_text = \"**Sources used:**\\n\" + \"\\n\".join([f\"• {source}\" for source in sources])\n",
" \n",
" return answer, sources_text\n",
"\n",
"def check_setup():\n",
" \"\"\"Check if the APIs are properly configured\"\"\"\n",
" status_messages = []\n",
" \n",
" # Check Google Drive API\n",
" if gpt_drive.drive_initialized:\n",
" status_messages.append(\"✅ Google Drive API: Connected\")\n",
" else:\n",
" status_messages.append(f\"❌ Google Drive API: {getattr(gpt_drive, 'drive_error', 'Not configured')}\")\n",
" \n",
" # Check OpenAI API\n",
" if gpt_drive.openai_initialized:\n",
" status_messages.append(\"✅ OpenAI API: Connected\")\n",
" else:\n",
" status_messages.append(f\"❌ OpenAI API: {getattr(gpt_drive, 'openai_error', 'Not configured')}\")\n",
" \n",
" return \"\\n\".join(status_messages)\n",
"\n",
"# Create Gradio interface\n",
"with gr.Blocks(title=\"Augusta's Anatomy Reading Assistant\", theme=gr.themes.Soft()) as app:\n",
" gr.Markdown(\"# 🤖 Augusta's Anatomy bot\")\n",
" gr.Markdown(\"Ask questions about your anatomy books using AI!\")\n",
" \n",
" with gr.Row():\n",
" with gr.Column(scale=2):\n",
" # Main query interface\n",
" with gr.Group():\n",
" gr.Markdown(\"### Ask a Question\")\n",
" query_input = gr.Textbox(\n",
" label=\"Your Question\",\n",
" placeholder=\"Ask me any question about your anatomy books?\",\n",
" lines=3\n",
" )\n",
" \n",
" search_terms_input = gr.Textbox(\n",
" label=\"Search Terms (optional)\",\n",
" placeholder=\"Enter comma-separated terms to search for specific files\",\n",
" lines=1\n",
" )\n",
" \n",
" submit_btn = gr.Button(\"Search & Ask\", variant=\"primary\", size=\"lg\")\n",
" \n",
" # Results section\n",
" with gr.Group():\n",
" gr.Markdown(\"### Answer\")\n",
" answer_output = gr.Textbox(\n",
" label=\"AI Response\",\n",
" lines=10,\n",
" interactive=False\n",
" )\n",
" \n",
" sources_output = gr.Textbox(\n",
" label=\"Sources\",\n",
" lines=3,\n",
" interactive=False\n",
" )\n",
" \n",
" with gr.Column(scale=1):\n",
" # Status and setup info\n",
" with gr.Group():\n",
" gr.Markdown(\"### System Status\")\n",
" status_btn = gr.Button(\"Check Status\", size=\"sm\")\n",
" status_output = gr.Textbox(\n",
" label=\"API Status\",\n",
" lines=4,\n",
" interactive=False\n",
" )\n",
" \n",
" with gr.Group():\n",
" gr.Markdown(\"### Setup Instructions\")\n",
" gr.Markdown(\"\"\"\n",
" **Important Notes:**\n",
" 1.Only documents shared with it, it can answer\n",
" \n",
" **File Types Supported:**\n",
" - Google Docs\n",
" - Google Sheets \n",
" - PDF files\n",
" - Text files\n",
" \n",
" **Tips:**\n",
" - Use specific search terms for better results\n",
" - The system searches the top 3 most relevant files\n",
" - Ask clear, specific questions for better answers\n",
" \"\"\")\n",
" \n",
" # Event handlers\n",
" submit_btn.click(\n",
" fn=process_user_query,\n",
" inputs=[query_input, search_terms_input],\n",
" outputs=[answer_output, sources_output]\n",
" )\n",
" \n",
" status_btn.click(\n",
" fn=check_setup,\n",
" outputs=status_output\n",
" )\n",
" \n",
" # Example queries\n",
" with gr.Row():\n",
" gr.Examples(\n",
" examples=[\n",
" [\"What is morbid Anatomy?\", \"morbid, Anatomy\"],\n",
" [\"The transmission of nerves from one neuron to another is as a result of what?\", \"neuron, nerves, Dr Clement\"],\n",
" ],\n",
" inputs=[query_input, search_terms_input],\n",
" )\n",
"\n",
"# Launch the app\n",
"if __name__ == \"__main__\":\n",
" app.launch(\n",
" share=True,\n",
" debug=True\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a6fd9c8c-e8e9-4b1c-8d21-d7ea2c42303a",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "95c85a4f-470f-4858-8172-78d7f9d04d1a",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "f237c114-7924-4074-9c11-c7ba13833293",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "a2f9c7ae-35bb-4261-adc4-f24c6ab5c8a1",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "aaa446b0-a14e-4c98-b466-f503a521f052",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "5770d683-7bfc-49e9-a3c4-a67060e27620",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "2ef7744c-1346-4692-ac28-013ae0e8c4ac",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "08643ace-f94f-4a4a-bbc1-3289b7fc29a1",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "70849fd3-6f4e-4c57-b780-bda53db83e14",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "c18d5d20-439a-45a7-a847-25f659fd4a10",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 11,
"id": "634b9787-d493-46e5-8114-0851c6172ed6",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🔍 Starting Google Drive API Diagnostic...\n",
"==================================================\n",
"\n",
"1️⃣ Checking service account file...\n",
"✅ Service account file is valid\n",
" 📧 Service account email: rag-base@rag-system-463320.iam.gserviceaccount.com\n",
" 🏗️ Project ID: rag-system-463320\n",
"\n",
"2️⃣ Checking credentials...\n",
"✅ Credentials created successfully\n",
"\n",
"3️⃣ Testing API connection...\n",
"✅ Successfully connected to Google Drive API\n",
" 👤 Connected as: rag-base@rag-system-463320.iam.gserviceaccount.com\n",
"\n",
"4️⃣ Checking basic permissions...\n",
"✅ Can access Drive API - found 0 files in test query\n",
"⚠️ No files found - this might mean:\n",
" • Service account has no shared files\n",
" • No files are shared with the service account\n",
"\n",
"5️⃣ Testing access to 'Blue berry' folder...\n",
"❌ Folder 'Blue berry' not found or not accessible\n",
"💡 Possible solutions:\n",
" • Make sure the folder exists in Google Drive\n",
" • Share the folder with your service account email\n",
" • Check folder name spelling (case sensitive)\n",
"\n",
"✅ All basic checks passed!\n"
]
}
],
"source": [
"import json\n",
"from google.auth.exceptions import RefreshError\n",
"from googleapiclient.errors import HttpError\n",
"\n",
"class GPTDriveTroubleshooter:\n",
" def __init__(self, service_account_file_path):\n",
" self.service_account_file = service_account_file_path\n",
" self.credentials = None\n",
" self.drive_service = None\n",
" \n",
" def run_full_diagnostic(self):\n",
" \"\"\"Run complete diagnostic check\"\"\"\n",
" print(\"🔍 Starting Google Drive API Diagnostic...\")\n",
" print(\"=\" * 50)\n",
" \n",
" # Step 1: Check service account file\n",
" if not self.check_service_account_file():\n",
" return False\n",
" \n",
" # Step 2: Check credentials\n",
" if not self.check_credentials():\n",
" return False\n",
" \n",
" # Step 3: Test API connection\n",
" if not self.test_api_connection():\n",
" return False\n",
" \n",
" # Step 4: Check permissions\n",
" if not self.check_basic_permissions():\n",
" return False\n",
" \n",
" # Step 5: Test folder access\n",
" self.test_folder_access()\n",
" \n",
" print(\"\\n✅ All basic checks passed!\")\n",
" return True\n",
" \n",
" def check_service_account_file(self):\n",
" \"\"\"Check if service account file exists and is valid\"\"\"\n",
" print(\"\\n1️⃣ Checking service account file...\")\n",
" \n",
" if not os.path.exists(self.service_account_file):\n",
" print(f\"❌ Service account file not found: {self.service_account_file}\")\n",
" print(\"💡 Make sure you've downloaded the JSON key file from Google Cloud Console\")\n",
" return False\n",
" \n",
" try:\n",
" with open(self.service_account_file, 'r') as f:\n",
" service_account_info = json.load(f)\n",
" \n",
" required_fields = ['type', 'project_id', 'private_key_id', 'private_key', 'client_email']\n",
" missing_fields = [field for field in required_fields if field not in service_account_info]\n",
" \n",
" if missing_fields:\n",
" print(f\"❌ Service account file missing required fields: {missing_fields}\")\n",
" return False\n",
" \n",
" print(f\"✅ Service account file is valid\")\n",
" print(f\" 📧 Service account email: {service_account_info['client_email']}\")\n",
" print(f\" 🏗️ Project ID: {service_account_info['project_id']}\")\n",
" \n",
" return True\n",
" \n",
" except json.JSONDecodeError:\n",
" print(\"❌ Service account file is not valid JSON\")\n",
" return False\n",
" except Exception as e:\n",
" print(f\"❌ Error reading service account file: {e}\")\n",
" return False\n",
" \n",
" def check_credentials(self):\n",
" \"\"\"Check if credentials can be created\"\"\"\n",
" print(\"\\n2️⃣ Checking credentials...\")\n",
" \n",
" try:\n",
" self.credentials = service_account.Credentials.from_service_account_file(\n",
" self.service_account_file,\n",
" scopes=['https://www.googleapis.com/auth/drive.readonly']\n",
" )\n",
" print(\"✅ Credentials created successfully\")\n",
" return True\n",
" \n",
" except Exception as e:\n",
" print(f\"❌ Failed to create credentials: {e}\")\n",
" print(\"💡 Check if your service account key file is corrupted\")\n",
" return False\n",
" \n",
" def test_api_connection(self):\n",
" \"\"\"Test basic API connection\"\"\"\n",
" print(\"\\n3️⃣ Testing API connection...\")\n",
" \n",
" try:\n",
" self.drive_service = build('drive', 'v3', credentials=self.credentials)\n",
" \n",
" # Try a simple API call\n",
" about = self.drive_service.about().get(fields=\"user, storageQuota\").execute()\n",
" print(\"✅ Successfully connected to Google Drive API\")\n",
" print(f\" 👤 Connected as: {about.get('user', {}).get('emailAddress', 'Unknown')}\")\n",
" \n",
" return True\n",
" \n",
" except HttpError as e:\n",
" print(f\"❌ HTTP Error connecting to API: {e}\")\n",
" if e.resp.status == 403:\n",
" print(\"💡 This is likely a permissions issue - check if Drive API is enabled\")\n",
" return False\n",
" except Exception as e:\n",
" print(f\"❌ Failed to connect to API: {e}\")\n",
" return False\n",
" \n",
" def check_basic_permissions(self):\n",
" \"\"\"Check basic file listing permissions\"\"\"\n",
" print(\"\\n4️⃣ Checking basic permissions...\")\n",
" \n",
" try:\n",
" # Try to list files (this should work with readonly access)\n",
" results = self.drive_service.files().list(\n",
" pageSize=1,\n",
" fields=\"files(id, name)\"\n",
" ).execute()\n",
" \n",
" files = results.get('files', [])\n",
" print(f\"✅ Can access Drive API - found {len(files)} files in test query\")\n",
" \n",
" if len(files) == 0:\n",
" print(\"⚠️ No files found - this might mean:\")\n",
" print(\" • Service account has no shared files\")\n",
" print(\" • No files are shared with the service account\")\n",
" \n",
" return True\n",
" \n",
" except HttpError as e:\n",
" print(f\"❌ Permission error: {e}\")\n",
" if e.resp.status == 403:\n",
" print(\"💡 Common causes:\")\n",
" print(\" • Google Drive API not enabled in Google Cloud Console\")\n",
" print(\" • Service account doesn't have proper permissions\")\n",
" return False\n",
" except Exception as e:\n",
" print(f\"❌ Error checking permissions: {e}\")\n",
" return False\n",
" \n",
" def test_folder_access(self, folder_name=\"Blue berry\"):\n",
" \"\"\"Test access to specific folder\"\"\"\n",
" print(f\"\\n5️⃣ Testing access to '{folder_name}' folder...\")\n",
" \n",
" try:\n",
" # Search for the folder\n",
" query = f\"name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and trashed=false\"\n",
" results = self.drive_service.files().list(\n",
" q=query,\n",
" fields=\"files(id, name, owners, permissions)\"\n",
" ).execute()\n",
" \n",
" folders = results.get('files', [])\n",
" \n",
" if not folders:\n",
" print(f\"❌ Folder '{folder_name}' not found or not accessible\")\n",
" print(\"💡 Possible solutions:\")\n",
" print(\" • Make sure the folder exists in Google Drive\")\n",
" print(\" • Share the folder with your service account email\")\n",
" print(\" • Check folder name spelling (case sensitive)\")\n",
" return False\n",
" \n",
" folder = folders[0]\n",
" print(f\"✅ Found folder '{folder_name}'\")\n",
" print(f\" 📁 Folder ID: {folder['id']}\")\n",
" \n",
" # Test listing files in the folder\n",
" files_in_folder = self.drive_service.files().list(\n",
" q=f\"'{folder['id']}' in parents and trashed=false\",\n",
" fields=\"files(id, name, mimeType)\"\n",
" ).execute()\n",
" \n",
" files = files_in_folder.get('files', [])\n",
" print(f\" 📄 Contains {len(files)} files\")\n",
" \n",
" if files:\n",
" print(\" 📝 Sample files:\")\n",
" for file in files[:3]: # Show first 3 files\n",
" print(f\" • {file['name']} ({file['mimeType']})\")\n",
" \n",
" return True\n",
" \n",
" except HttpError as e:\n",
" print(f\"❌ HTTP Error accessing folder: {e}\")\n",
" return False\n",
" except Exception as e:\n",
" print(f\"❌ Error accessing folder: {e}\")\n",
" return False\n",
" \n",
" def check_file_permissions(self, file_id):\n",
" \"\"\"Check permissions for a specific file\"\"\"\n",
" print(f\"\\n🔍 Checking permissions for file ID: {file_id}\")\n",
" \n",
" try:\n",
" file_info = self.drive_service.files().get(\n",
" fileId=file_id,\n",
" fields=\"id, name, mimeType, owners, permissions, capabilities\"\n",
" ).execute()\n",
" \n",
" print(f\"✅ File: {file_info['name']}\")\n",
" print(f\" 🔗 Type: {file_info['mimeType']}\")\n",
" print(f\" 👤 Owner: {file_info.get('owners', [{}])[0].get('emailAddress', 'Unknown')}\")\n",
" \n",
" capabilities = file_info.get('capabilities', {})\n",
" print(f\" 📖 Can read: {capabilities.get('canDownload', False)}\")\n",
" print(f\" 📤 Can export: {capabilities.get('canExport', False)}\")\n",
" \n",
" except HttpError as e:\n",
" print(f\"❌ Cannot access file: {e}\")\n",
" except Exception as e:\n",
" print(f\"❌ Error: {e}\")\n",
" \n",
" def get_sharing_instructions(self):\n",
" \"\"\"Provide step-by-step sharing instructions\"\"\"\n",
" print(\"\\n📋 HOW TO SHARE FOLDER WITH SERVICE ACCOUNT:\")\n",
" print(\"=\" * 50)\n",
" \n",
" if self.credentials:\n",
" service_email = self.credentials.service_account_email\n",
" print(f\"1. Copy this service account email: {service_email}\")\n",
" else:\n",
" print(\"1. Find your service account email in the JSON key file (client_email field)\")\n",
" \n",
" print(\"2. Open Google Drive in your browser\")\n",
" print(\"3. Right-click on your 'Blue berry' folder\")\n",
" print(\"4. Select 'Share'\")\n",
" print(\"5. Paste the service account email\")\n",
" print(\"6. Set permission to 'Viewer' or 'Editor'\")\n",
" print(\"7. Click 'Send' (you can uncheck 'Notify people')\")\n",
" print(\"8. Wait a few minutes for permissions to propagate\")\n",
" \n",
" def run_connection_test(self):\n",
" \"\"\"Quick connection test\"\"\"\n",
" print(\"🚀 Quick Connection Test\")\n",
" print(\"-\" * 30)\n",
" \n",
" try:\n",
" self.credentials = service_account.Credentials.from_service_account_file(\n",
" self.service_account_file,\n",
" scopes=['https://www.googleapis.com/auth/drive.readonly']\n",
" )\n",
" \n",
" service = build('drive', 'v3', credentials=self.credentials)\n",
" \n",
" # Test basic query\n",
" results = service.files().list(pageSize=5).execute()\n",
" files = results.get('files', [])\n",
" \n",
" print(f\"✅ Connected! Found {len(files)} accessible files\")\n",
" print(f\"📧 Service account: {self.credentials.service_account_email}\")\n",
" \n",
" return True\n",
" \n",
" except Exception as e:\n",
" print(f\"❌ Connection failed: {e}\")\n",
" return False\n",
"\n",
"# Usage Examples\n",
"def troubleshoot_drive_access():\n",
" \"\"\"Main troubleshooting function\"\"\"\n",
" service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n",
" \n",
" if not service_account_file:\n",
" print(\"❌ GOOGLE_SERVICE_ACCOUNT_FILE environment variable not set\")\n",
" print(\"💡 Add this to your .env file:\")\n",
" print(\"GOOGLE_SERVICE_ACCOUNT_FILE=path/to/your/service-account-key.json\")\n",
" return\n",
" \n",
" troubleshooter = GPTDriveTroubleshooter(service_account_file)\n",
" \n",
" # Run full diagnostic\n",
" success = troubleshooter.run_full_diagnostic()\n",
" \n",
" if not success:\n",
" print(\"\\n\" + \"=\"*50)\n",
" troubleshooter.get_sharing_instructions()\n",
" \n",
" return success\n",
"\n",
"# Quick test function\n",
"def quick_test():\n",
" \"\"\"Quick test to verify everything works\"\"\"\n",
" service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n",
" troubleshooter = GPTDriveTroubleshooter(service_account_file)\n",
" return troubleshooter.run_connection_test()\n",
"\n",
"if __name__ == \"__main__\":\n",
" # Uncomment the test you want to run:\n",
" \n",
" # Full diagnostic (recommended for first-time setup)\n",
" troubleshoot_drive_access()\n",
" \n",
" # Quick test (for regular checks)\n",
" # quick_test()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6b336789-d402-455d-a631-26e987d79ed6",
"metadata": {},
"outputs": [],
"source": [
"troubleshooter = GPTDriveTroubleshooter(GOOGLE_SERVICE_ACCOUNT_FILE)\n",
"troubleshooter.test_folder_access(\"Blue berry\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e2786c67-215e-46b0-b3ca-2c3176bd8971",
"metadata": {},
"outputs": [],
"source": [
"# Add this test function to verify access\n",
"def test_folder_access():\n",
" integration = GPTDriveIntegration()\n",
" folder_id = integration.find_folder_by_name(\"Blue berry\")\n",
" if folder_id:\n",
" print(\"✅ Successfully found 'Blue berry' folder!\")\n",
" \n",
" # List files in the folder\n",
" results = integration.drive_service.files().list(\n",
" q=f\"'{folder_id}' in parents and trashed=false\",\n",
" fields=\"files(id, name, mimeType)\"\n",
" ).execute()\n",
" \n",
" files = results.get('files', [])\n",
" print(f\"Found {len(files)} files in the folder:\")\n",
" for file in files[:5]: # Show first 5 files\n",
" print(f\" - {file['name']} ({file['mimeType']})\")\n",
" else:\n",
" print(\"❌ Could not access 'Blue berry' folder\")\n",
" print(\"Make sure you've shared the folder with your service account\")\n",
"\n",
"# Run this test first\n",
"test_folder_access()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c3dede9f-5e01-436d-a7b7-905e1646baf9",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "b933c8b6-add6-40fc-827f-e5e07447ac00",
"metadata": {},
"outputs": [],
"source": [
"#Troubleshooting"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "b63a1f4b-1315-44fe-ba84-93fb1c2655cc",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🔍 Searching for 'Blue Berry' folder...\n",
"Total folders found: 2\n",
"\n",
"✅ Found 1 'Blue Berry' folder(s):\n",
"📁 Anatomy Books \n",
" Full Path: Anatomy Books \n",
" Folder ID: 125NdiCL7moQsNYWRuHojVM7If88Bxh0q\n",
"\n"
]
}
],
"source": [
"import os\n",
"from google.oauth2 import service_account\n",
"from googleapiclient.discovery import build\n",
"\n",
"class DrivefolderLister:\n",
" def __init__(self):\n",
" # Initialize Google Drive API\n",
" self.credentials = service_account.Credentials.from_service_account_file(\n",
" os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE'),\n",
" scopes=['https://www.googleapis.com/auth/drive.readonly']\n",
" )\n",
" self.drive_service = build('drive', 'v3', credentials=self.credentials)\n",
" \n",
" def get_all_folders(self):\n",
" \"\"\"Get ALL folders in the Google Drive\"\"\"\n",
" try:\n",
" # Query to get all folders (not trashed)\n",
" query = \"mimeType='application/vnd.google-apps.folder' and trashed=false\"\n",
" \n",
" all_folders = []\n",
" page_token = None\n",
" \n",
" while True:\n",
" results = self.drive_service.files().list(\n",
" q=query,\n",
" fields=\"nextPageToken, files(id, name, parents)\",\n",
" pageSize=1000, # Maximum allowed\n",
" pageToken=page_token\n",
" ).execute()\n",
" \n",
" folders = results.get('files', [])\n",
" all_folders.extend(folders)\n",
" \n",
" page_token = results.get('nextPageToken')\n",
" if not page_token:\n",
" break\n",
" \n",
" print(f\"Total folders found: {len(all_folders)}\")\n",
" return all_folders\n",
" \n",
" except Exception as e:\n",
" print(f\"Error retrieving folders: {e}\")\n",
" return []\n",
" \n",
" def build_folder_tree(self, folders):\n",
" \"\"\"Build a tree structure from the flat list of folders\"\"\"\n",
" # Create a dictionary for quick lookup\n",
" folder_dict = {folder['id']: folder for folder in folders}\n",
" \n",
" # Add root folder\n",
" folder_dict['root'] = {'id': 'root', 'name': 'My Drive', 'parents': []}\n",
" \n",
" # Build the tree structure\n",
" tree = {}\n",
" \n",
" def build_path(folder_id, visited=None):\n",
" if visited is None:\n",
" visited = set()\n",
" \n",
" if folder_id in visited: # Prevent infinite loops\n",
" return \"CIRCULAR_REFERENCE\"\n",
" \n",
" visited.add(folder_id)\n",
" \n",
" if folder_id not in folder_dict:\n",
" return \"UNKNOWN\"\n",
" \n",
" folder = folder_dict[folder_id]\n",
" \n",
" if folder_id == 'root':\n",
" return \"My Drive\"\n",
" \n",
" parents = folder.get('parents', [])\n",
" if not parents:\n",
" return folder['name']\n",
" \n",
" parent_path = build_path(parents[0], visited.copy())\n",
" return f\"{parent_path}/{folder['name']}\"\n",
" \n",
" # Build paths for all folders\n",
" folder_paths = []\n",
" for folder in folders:\n",
" path = build_path(folder['id'])\n",
" folder_paths.append({\n",
" 'name': folder['name'],\n",
" 'id': folder['id'],\n",
" 'path': path\n",
" })\n",
" \n",
" return folder_paths\n",
" \n",
" def search_folder_by_name(self, folder_name, folders_with_paths):\n",
" \"\"\"Search for folders by name (case-insensitive)\"\"\"\n",
" matches = []\n",
" search_name = folder_name.lower()\n",
" \n",
" for folder in folders_with_paths:\n",
" if search_name in folder['name'].lower():\n",
" matches.append(folder)\n",
" \n",
" return matches\n",
" \n",
" def display_all_folders(self, folders_with_paths, search_term=None):\n",
" \"\"\"Display all folders in a readable format\"\"\"\n",
" if search_term:\n",
" print(f\"\\n=== Searching for folders containing '{search_term}' ===\")\n",
" matches = self.search_folder_by_name(search_term, folders_with_paths)\n",
" if matches:\n",
" print(f\"Found {len(matches)} matching folders:\")\n",
" for folder in matches:\n",
" print(f\"📁 {folder['name']}\")\n",
" print(f\" Path: {folder['path']}\")\n",
" print(f\" ID: {folder['id']}\")\n",
" print()\n",
" else:\n",
" print(f\"No folders found containing '{search_term}'\")\n",
" else:\n",
" print(f\"\\n=== All {len(folders_with_paths)} folders in your Google Drive ===\")\n",
" \n",
" # Sort by path for better readability\n",
" sorted_folders = sorted(folders_with_paths, key=lambda x: x['path'])\n",
" \n",
" for folder in sorted_folders:\n",
" print(f\"📁 {folder['path']}\")\n",
" # print(f\" ID: {folder['id']}\") # Uncomment if you need IDs\n",
" \n",
" def find_blue_berry_folder(self):\n",
" \"\"\"Specifically look for the Anatomy Books folder\"\"\"\n",
" print(\"🔍 Searching for 'Blue Berry' folder...\")\n",
" \n",
" folders = self.get_all_folders()\n",
" if not folders:\n",
" print(\"No folders found or error occurred.\")\n",
" return\n",
" \n",
" folders_with_paths = self.build_folder_tree(folders)\n",
" \n",
" # Search for Anatomy Books specifically\n",
" blue_berry_matches = self.search_folder_by_name(\"Anatomy Books\", folders_with_paths)\n",
" \n",
" if blue_berry_matches:\n",
" print(f\"\\n✅ Found {len(blue_berry_matches)} 'Blue Berry' folder(s):\")\n",
" for folder in blue_berry_matches:\n",
" print(f\"📁 {folder['name']}\")\n",
" print(f\" Full Path: {folder['path']}\")\n",
" print(f\" Folder ID: {folder['id']}\")\n",
" print()\n",
" else:\n",
" print(\"\\n❌ No 'Blue Berry' folder found.\")\n",
" print(\"Let me show you all folders to help you locate it:\")\n",
" \n",
" # Show all folders if Blue Berry not found\n",
" self.display_all_folders(folders_with_paths)\n",
" \n",
" def interactive_folder_search(self):\n",
" \"\"\"Interactive search for any folder\"\"\"\n",
" print(\"📂 Google Drive Folder Explorer\")\n",
" print(\"=\" * 40)\n",
" \n",
" folders = self.get_all_folders()\n",
" if not folders:\n",
" print(\"No folders found or error occurred.\")\n",
" return\n",
" \n",
" folders_with_paths = self.build_folder_tree(folders)\n",
" \n",
" while True:\n",
" print(\"\\nOptions:\")\n",
" print(\"1. Search for a specific folder\")\n",
" print(\"2. Show all folders\")\n",
" print(\"3. Find 'Blue Berry' folder\")\n",
" print(\"4. Exit\")\n",
" \n",
" choice = input(\"\\nEnter your choice (1-4): \").strip()\n",
" \n",
" if choice == '1':\n",
" search_term = input(\"Enter folder name to search: \").strip()\n",
" if search_term:\n",
" self.display_all_folders(folders_with_paths, search_term)\n",
" \n",
" elif choice == '2':\n",
" self.display_all_folders(folders_with_paths)\n",
" \n",
" elif choice == '3':\n",
" blue_berry_matches = self.search_folder_by_name(\"blue berry\", folders_with_paths)\n",
" if blue_berry_matches:\n",
" print(f\"\\n✅ Found 'Blue Berry' folder(s):\")\n",
" for folder in blue_berry_matches:\n",
" print(f\"📁 {folder['name']} -> {folder['path']}\")\n",
" else:\n",
" print(\"\\n❌ 'Blue Berry' folder not found in your Drive.\")\n",
" \n",
" elif choice == '4':\n",
" print(\"Goodbye!\")\n",
" break\n",
" \n",
" else:\n",
" print(\"Invalid choice. Please try again.\")\n",
"\n",
"def main():\n",
" \"\"\"Main function to run the folder lister\"\"\"\n",
" try:\n",
" lister = DrivefolderLister()\n",
" \n",
" # Quick search for Blue Berry folder\n",
" lister.find_blue_berry_folder()\n",
" \n",
" # Uncomment the line below for interactive mode\n",
" # lister.interactive_folder_search()\n",
" \n",
" except Exception as e:\n",
" print(f\"Error initializing Drive connection: {e}\")\n",
" print(\"Make sure your GOOGLE_SERVICE_ACCOUNT_FILE environment variable is set correctly.\")\n",
"\n",
"if __name__ == \"__main__\":\n",
" main()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "29a61039-9043-44d5-94b1-4847350b2200",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 31,
"id": "7350d687-f76e-46f4-a30b-0518e5b8236e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🚀 Google Drive API Diagnostic Tool\n",
"==================================================\n",
"🔧 Testing Environment Variables...\n",
"----------------------------------------\n",
"✅ GOOGLE_SERVICE_ACCOUNT_FILE: C:/Users/Uche Buzz/myprojects/RAG/rag-system-463320-f292991d0516.json\n",
"✅ Service account file exists\n",
"✅ Service account file is valid JSON\n",
"✅ All required fields present in service account file\n",
"📧 Service account email: rag-base@rag-system-463320.iam.gserviceaccount.com\n",
"✅ OPENAI_API_KEY: ************************************************************************************************************************************************************K1OkRRkA\n",
"\n",
"🔑 Testing Credentials Loading...\n",
"----------------------------------------\n",
"\n",
"Testing scope set 1: ['https://www.googleapis.com/auth/drive.readonly']\n",
"✅ Credentials loaded successfully with scopes: ['https://www.googleapis.com/auth/drive.readonly']\n",
"\n",
"🔧 Testing Drive Service Creation...\n",
"----------------------------------------\n",
"✅ Google Drive service created successfully\n",
"\n",
"📡 Testing Basic API Call...\n",
"----------------------------------------\n",
"✅ API call successful!\n",
"📧 Connected as: rag-base@rag-system-463320.iam.gserviceaccount.com\n",
"👤 Display name: rag-base@rag-system-463320.iam.gserviceaccount.com\n",
"\n",
"🏠 Testing Root Folder Access...\n",
"----------------------------------------\n",
"✅ Root folder accessible\n",
"📁 Root folder name: My Drive\n",
"📄 Items in root folder: 0\n",
"\n",
"📁 Testing File Listing...\n",
"----------------------------------------\n",
"Test 1: Listing all files (including documents)...\n",
"✅ Found 7 files total\n",
"📄 First few files:\n",
" - Neuro Note Dr Clement.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)\n",
" - Morbid Anatomy - ANA 3107.pdf (application/pdf)\n",
" - ANT & POST TRIANGLES OF THE NECK.pdf (application/pdf)\n",
" - Anatomy Books (application/vnd.google-apps.folder)\n",
" - Blue Berry (application/vnd.google-apps.folder)\n",
"\n",
"Test 2: Listing folders only...\n",
"✅ Found 2 folders\n",
"📁 Folders found:\n",
" - Anatomy Books (ID: 125NdiCL7moQsNYWRuHojVM7If88Bxh0q)\n",
" - Blue Berry (ID: 1AYaS0yt_srFlgdE4mSNlqA6FLm10rdt4)\n",
"\n",
"✅ All diagnostic tests passed!\n",
"🎉 Your Google Drive API connection is working correctly!\n"
]
}
],
"source": [
"import os\n",
"from google.oauth2 import service_account\n",
"from googleapiclient.discovery import build\n",
"from googleapiclient.errors import HttpError\n",
"import json\n",
"\n",
"class DriveConnectionDiagnostic:\n",
" def __init__(self):\n",
" self.credentials = None\n",
" self.drive_service = None\n",
" self.setup_success = False\n",
" \n",
" def test_environment_variables(self):\n",
" \"\"\"Test if environment variables are set correctly\"\"\"\n",
" print(\"🔧 Testing Environment Variables...\")\n",
" print(\"-\" * 40)\n",
" \n",
" service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n",
" openai_key = os.getenv('OPENAI_API_KEY')\n",
" \n",
" if service_account_file:\n",
" print(f\"✅ GOOGLE_SERVICE_ACCOUNT_FILE: {service_account_file}\")\n",
" \n",
" # Check if file exists\n",
" if os.path.exists(service_account_file):\n",
" print(f\"✅ Service account file exists\")\n",
" \n",
" # Check if file is readable\n",
" try:\n",
" with open(service_account_file, 'r') as f:\n",
" service_data = json.load(f)\n",
" print(f\"✅ Service account file is valid JSON\")\n",
" \n",
" # Check required fields\n",
" required_fields = ['type', 'project_id', 'private_key_id', 'private_key', 'client_email']\n",
" missing_fields = [field for field in required_fields if field not in service_data]\n",
" \n",
" if missing_fields:\n",
" print(f\"❌ Missing required fields in service account file: {missing_fields}\")\n",
" return False\n",
" else:\n",
" print(f\"✅ All required fields present in service account file\")\n",
" print(f\"📧 Service account email: {service_data.get('client_email')}\")\n",
" \n",
" except json.JSONDecodeError:\n",
" print(f\"❌ Service account file is not valid JSON\")\n",
" return False\n",
" except Exception as e:\n",
" print(f\"❌ Error reading service account file: {e}\")\n",
" return False\n",
" \n",
" else:\n",
" print(f\"❌ Service account file does not exist at: {service_account_file}\")\n",
" return False\n",
" else:\n",
" print(f\"❌ GOOGLE_SERVICE_ACCOUNT_FILE environment variable not set\")\n",
" return False\n",
" \n",
" if openai_key:\n",
" print(f\"✅ OPENAI_API_KEY: {'*' * (len(openai_key) - 8) + openai_key[-8:]}\")\n",
" else:\n",
" print(f\"⚠️ OPENAI_API_KEY not set (not required for folder listing)\")\n",
" \n",
" return True\n",
" \n",
" def test_credentials_loading(self):\n",
" \"\"\"Test if credentials can be loaded\"\"\"\n",
" print(\"\\n🔑 Testing Credentials Loading...\")\n",
" print(\"-\" * 40)\n",
" \n",
" try:\n",
" service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n",
" \n",
" # Try different scope combinations\n",
" scopes_to_test = [\n",
" ['https://www.googleapis.com/auth/drive.readonly'],\n",
" ['https://www.googleapis.com/auth/drive'],\n",
" ['https://www.googleapis.com/auth/drive.metadata.readonly'],\n",
" ['https://www.googleapis.com/auth/drive.file']\n",
" ]\n",
" \n",
" for i, scopes in enumerate(scopes_to_test):\n",
" try:\n",
" print(f\"\\nTesting scope set {i+1}: {scopes}\")\n",
" self.credentials = service_account.Credentials.from_service_account_file(\n",
" service_account_file,\n",
" scopes=scopes\n",
" )\n",
" print(f\"✅ Credentials loaded successfully with scopes: {scopes}\")\n",
" return True\n",
" except Exception as e:\n",
" print(f\"❌ Failed to load credentials with scopes {scopes}: {e}\")\n",
" \n",
" return False\n",
" \n",
" except Exception as e:\n",
" print(f\"❌ Error loading credentials: {e}\")\n",
" return False\n",
" \n",
" def test_drive_service_creation(self):\n",
" \"\"\"Test if Drive service can be created\"\"\"\n",
" print(\"\\n🔧 Testing Drive Service Creation...\")\n",
" print(\"-\" * 40)\n",
" \n",
" try:\n",
" if not self.credentials:\n",
" print(\"❌ No credentials available\")\n",
" return False\n",
" \n",
" self.drive_service = build('drive', 'v3', credentials=self.credentials)\n",
" print(\"✅ Google Drive service created successfully\")\n",
" return True\n",
" \n",
" except Exception as e:\n",
" print(f\"❌ Error creating Drive service: {e}\")\n",
" return False\n",
" \n",
" def test_basic_api_call(self):\n",
" \"\"\"Test basic API functionality\"\"\"\n",
" print(\"\\n📡 Testing Basic API Call...\")\n",
" print(\"-\" * 40)\n",
" \n",
" try:\n",
" if not self.drive_service:\n",
" print(\"❌ No Drive service available\")\n",
" return False\n",
" \n",
" # Test getting user info\n",
" about = self.drive_service.about().get(fields=\"user\").execute()\n",
" user_info = about.get('user', {})\n",
" print(f\"✅ API call successful!\")\n",
" print(f\"📧 Connected as: {user_info.get('emailAddress', 'Unknown')}\")\n",
" print(f\"👤 Display name: {user_info.get('displayName', 'Unknown')}\")\n",
" \n",
" return True\n",
" \n",
" except HttpError as e:\n",
" print(f\"❌ HTTP Error: {e}\")\n",
" if e.resp.status == 403:\n",
" print(\" This might be a permissions issue. Check:\")\n",
" print(\" 1. Is the service account enabled?\")\n",
" print(\" 2. Does it have the right permissions?\")\n",
" print(\" 3. Is the Google Drive API enabled in your project?\")\n",
" return False\n",
" except Exception as e:\n",
" print(f\"❌ Error making API call: {e}\")\n",
" return False\n",
" \n",
" def test_file_listing(self):\n",
" \"\"\"Test different file listing approaches\"\"\"\n",
" print(\"\\n📁 Testing File Listing...\")\n",
" print(\"-\" * 40)\n",
" \n",
" if not self.drive_service:\n",
" print(\"❌ No Drive service available\")\n",
" return False\n",
" \n",
" # Test 1: List any files (not just folders)\n",
" try:\n",
" print(\"Test 1: Listing all files (including documents)...\")\n",
" results = self.drive_service.files().list(\n",
" q=\"trashed=false\",\n",
" fields=\"files(id, name, mimeType)\",\n",
" pageSize=10\n",
" ).execute()\n",
" \n",
" files = results.get('files', [])\n",
" print(f\"✅ Found {len(files)} files total\")\n",
" \n",
" if files:\n",
" print(\"📄 First few files:\")\n",
" for file in files[:5]:\n",
" print(f\" - {file['name']} ({file['mimeType']})\")\n",
" \n",
" except Exception as e:\n",
" print(f\"❌ Error listing files: {e}\")\n",
" return False\n",
" \n",
" # Test 2: List only folders\n",
" try:\n",
" print(f\"\\nTest 2: Listing folders only...\")\n",
" results = self.drive_service.files().list(\n",
" q=\"mimeType='application/vnd.google-apps.folder' and trashed=false\",\n",
" fields=\"files(id, name, parents)\",\n",
" pageSize=10\n",
" ).execute()\n",
" \n",
" folders = results.get('files', [])\n",
" print(f\"✅ Found {len(folders)} folders\")\n",
" \n",
" if folders:\n",
" print(\"📁 Folders found:\")\n",
" for folder in folders:\n",
" print(f\" - {folder['name']} (ID: {folder['id']})\")\n",
" else:\n",
" print(\"⚠️ No folders found - this might indicate:\")\n",
" print(\" 1. Your Google Drive is empty\")\n",
" print(\" 2. The service account doesn't have access to your personal Drive\")\n",
" print(\" 3. You need to share folders with the service account\")\n",
" \n",
" return len(folders) > 0\n",
" \n",
" except Exception as e:\n",
" print(f\"❌ Error listing folders: {e}\")\n",
" return False\n",
" \n",
" def test_root_access(self):\n",
" \"\"\"Test access to root folder\"\"\"\n",
" print(\"\\n🏠 Testing Root Folder Access...\")\n",
" print(\"-\" * 40)\n",
" \n",
" try:\n",
" if not self.drive_service:\n",
" print(\"❌ No Drive service available\")\n",
" return False\n",
" \n",
" # Try to get root folder info\n",
" root_info = self.drive_service.files().get(\n",
" fileId='root',\n",
" fields=\"id, name, mimeType\"\n",
" ).execute()\n",
" \n",
" print(f\"✅ Root folder accessible\")\n",
" print(f\"📁 Root folder name: {root_info.get('name', 'My Drive')}\")\n",
" \n",
" # Try to list contents of root\n",
" results = self.drive_service.files().list(\n",
" q=\"'root' in parents and trashed=false\",\n",
" fields=\"files(id, name, mimeType)\",\n",
" pageSize=10\n",
" ).execute()\n",
" \n",
" root_contents = results.get('files', [])\n",
" print(f\"📄 Items in root folder: {len(root_contents)}\")\n",
" \n",
" if root_contents:\n",
" print(\"🔍 Root folder contents:\")\n",
" for item in root_contents[:5]:\n",
" item_type = \"📁\" if \"folder\" in item['mimeType'] else \"📄\"\n",
" print(f\" {item_type} {item['name']}\")\n",
" \n",
" return True\n",
" \n",
" except Exception as e:\n",
" print(f\"❌ Error accessing root folder: {e}\")\n",
" return False\n",
" \n",
" def run_full_diagnostic(self):\n",
" \"\"\"Run complete diagnostic\"\"\"\n",
" print(\"🚀 Google Drive API Diagnostic Tool\")\n",
" print(\"=\" * 50)\n",
" \n",
" # Step 1: Environment variables\n",
" if not self.test_environment_variables():\n",
" print(\"\\n❌ Environment setup failed. Please check your service account file.\")\n",
" return False\n",
" \n",
" # Step 2: Credentials\n",
" if not self.test_credentials_loading():\n",
" print(\"\\n❌ Credentials loading failed.\")\n",
" return False\n",
" \n",
" # Step 3: Service creation\n",
" if not self.test_drive_service_creation():\n",
" print(\"\\n❌ Drive service creation failed.\")\n",
" return False\n",
" \n",
" # Step 4: Basic API call\n",
" if not self.test_basic_api_call():\n",
" print(\"\\n❌ Basic API call failed.\")\n",
" return False\n",
" \n",
" # Step 5: Root access\n",
" if not self.test_root_access():\n",
" print(\"\\n❌ Root folder access failed.\")\n",
" return False\n",
" \n",
" # Step 6: File listing\n",
" if not self.test_file_listing():\n",
" print(\"\\n❌ File listing failed or no folders found.\")\n",
" print(\"\\n🔧 SOLUTION SUGGESTIONS:\")\n",
" print(\"1. If you're using a service account, you need to SHARE folders with it\")\n",
" print(\"2. Share your 'Blue Berry' folder with the service account email\")\n",
" print(\"3. Or consider using OAuth2 instead of service account for personal Drive access\")\n",
" return False\n",
" \n",
" print(\"\\n✅ All diagnostic tests passed!\")\n",
" print(\"🎉 Your Google Drive API connection is working correctly!\")\n",
" return True\n",
" \n",
" def show_service_account_sharing_instructions(self):\n",
" \"\"\"Show instructions for sharing with service account\"\"\"\n",
" print(\"\\n📋 SERVICE ACCOUNT SHARING INSTRUCTIONS:\")\n",
" print(\"=\" * 50)\n",
" print(\"If you're using a service account, you need to share folders with it:\")\n",
" print()\n",
" print(\"1. Open Google Drive in your browser\")\n",
" print(\"2. Find the folder you want to access (e.g., 'Blue Berry')\")\n",
" print(\"3. Right-click the folder → 'Share'\")\n",
" print(\"4. Add the service account email address\")\n",
" print(\"5. Give it 'Viewer' or 'Editor' permissions\")\n",
" print(\"6. Click 'Send'\")\n",
" print()\n",
" \n",
" if self.credentials:\n",
" try:\n",
" service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE')\n",
" with open(service_account_file, 'r') as f:\n",
" service_data = json.load(f)\n",
" email = service_data.get('client_email')\n",
" print(f\"📧 Your service account email: {email}\")\n",
" print(\" ^ Share your folders with this email address\")\n",
" except:\n",
" print(\"❌ Could not read service account email\")\n",
"\n",
"def main():\n",
" diagnostic = DriveConnectionDiagnostic()\n",
" success = diagnostic.run_full_diagnostic()\n",
" \n",
" if not success:\n",
" diagnostic.show_service_account_sharing_instructions()\n",
"\n",
"if __name__ == \"__main__\":\n",
" main()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e3374b17-d2fa-4c80-a70b-76b8bf94c702",
"metadata": {},
"outputs": [],
"source": [
"#flask"
]
},
{
"cell_type": "code",
"execution_count": 30,
"id": "c8ac4c93-8b36-49d2-b8e1-efb7d1c4b104",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" * Serving Flask app '__main__'\n",
" * Debug mode: on\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.\n",
" * Running on http://127.0.0.1:5000\n",
"Press CTRL+C to quit\n",
" * Restarting with stat\n"
]
},
{
"ename": "SystemExit",
"evalue": "1",
"output_type": "error",
"traceback": [
"An exception has occurred, use %tb to see the full traceback.\n",
"\u001b[31mSystemExit\u001b[39m\u001b[31m:\u001b[39m 1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"C:\\Users\\Uche Buzz\\anaconda3\\envs\\RAG\\Lib\\site-packages\\IPython\\core\\interactiveshell.py:3680: UserWarning: To exit: use 'exit', 'quit', or Ctrl-D.\n",
" warn(\"To exit: use 'exit', 'quit', or Ctrl-D.\", stacklevel=1)\n"
]
}
],
"source": [
"from flask import Flask, request, jsonify, render_template_string\n",
"\n",
"app = Flask(__name__)\n",
"integration = GPTDriveIntegration()\n",
"\n",
"HTML_TEMPLATE = \"\"\"\n",
"<!DOCTYPE html>\n",
"<html>\n",
"<head>\n",
" <title>GPT-Drive Integration</title>\n",
" <style>\n",
" body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }\n",
" .query-box { width: 100%; padding: 10px; margin: 10px 0; }\n",
" .result { background: #f5f5f5; padding: 15px; margin: 10px 0; border-radius: 5px; }\n",
" .sources { color: #666; font-size: 0.9em; }\n",
" </style>\n",
"</head>\n",
"<body>\n",
" <h1>GPT-Google Drive Integration</h1>\n",
" <form id=\"queryForm\">\n",
" <input type=\"text\" id=\"query\" class=\"query-box\" placeholder=\"Ask something about your Google Drive files...\">\n",
" <button type=\"submit\">Search & Ask</button>\n",
" </form>\n",
" <div id=\"results\"></div>\n",
"\n",
" <script>\n",
" document.getElementById('queryForm').onsubmit = async function(e) {\n",
" e.preventDefault();\n",
" const query = document.getElementById('query').value;\n",
" const response = await fetch('/query', {\n",
" method: 'POST',\n",
" headers: {'Content-Type': 'application/json'},\n",
" body: JSON.stringify({query: query})\n",
" });\n",
" const result = await response.json();\n",
" document.getElementById('results').innerHTML = \n",
" `<div class=\"result\">\n",
" <strong>Answer:</strong> ${result.answer}\n",
" <div class=\"sources\"><strong>Sources:</strong> ${result.sources.join(', ')}</div>\n",
" </div>`;\n",
" };\n",
" </script>\n",
"</body>\n",
"</html>\n",
"\"\"\"\n",
"\n",
"@app.route('/')\n",
"def home():\n",
" return render_template_string(HTML_TEMPLATE)\n",
"\n",
"@app.route('/query', methods=['POST'])\n",
"def query():\n",
" data = request.json\n",
" result = integration.process_query(data['query'])\n",
" return jsonify(result)\n",
"\n",
"if __name__ == '__main__':\n",
" app.run(debug=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "caef8af5-73f9-4627-9203-0e9c8d098db0",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "ff7582bd-f80c-46c4-a51b-b20fd7493b0e",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "96ea63b3-3cf3-41c1-b160-72d677ab0949",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "a8a64f0d-c9b5-4ce7-a887-e1b60de4dac3",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 12,
"id": "abbc0f6c-5350-4fd9-8711-de68d4aa739d",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "02ae258f-bef0-469b-a696-1edf6a7b5921",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 13,
"id": "09a0b418-eb0e-486e-bf05-bc7b43645784",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"* Running on local URL: http://127.0.0.1:7860\n",
"* Running on public URL: https://a9ef60dd3438b2ef0e.gradio.live\n",
"\n",
"This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)\n"
]
},
{
"data": {
"text/html": [
"<div><iframe src=\"https://a9ef60dd3438b2ef0e.gradio.live\" width=\"100%\" height=\"500\" allow=\"autoplay; camera; microphone; clipboard-read; clipboard-write;\" frameborder=\"0\" allowfullscreen></iframe></div>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Keyboard interruption in main thread... closing server.\n",
"Killing tunnel 127.0.0.1:7860 <> https://a9ef60dd3438b2ef0e.gradio.live\n"
]
}
],
"source": [
"gpt_drive = GPTDriveIntegration()\n",
"\n",
"def process_user_query(query, search_terms_input):\n",
" \"\"\"Process user query and return formatted response\"\"\"\n",
" if not query.strip():\n",
" return \"Please enter a question.\", \"\"\n",
" \n",
" # Parse search terms if provided\n",
" search_terms = None\n",
" # if search_terms_input.strip():\n",
" # search_terms = [term.strip() for term in search_terms_input.split(',')]\n",
" \n",
" # Process the query\n",
" result = gpt_drive.process_query(query, search_terms)\n",
" \n",
" # Format the response\n",
" answer = result['answer']\n",
" sources = result['sources']\n",
" \n",
" sources_text = \"\"\n",
" if sources:\n",
" sources_text = \"**Sources used:**\\n\" + \"\\n\".join([f\"• {source}\" for source in sources])\n",
" \n",
" return answer, sources_text\n",
"\n",
"def check_setup():\n",
" \"\"\"Check if the APIs are properly configured\"\"\"\n",
" status_messages = []\n",
" \n",
" # Check Google Drive API\n",
" if gpt_drive.drive_initialized:\n",
" status_messages.append(\"✅ Google Drive API: Connected\")\n",
" else:\n",
" status_messages.append(f\"❌ Google Drive API: {getattr(gpt_drive, 'drive_error', 'Not configured')}\")\n",
" \n",
" # Check OpenAI API\n",
" if gpt_drive.openai_initialized:\n",
" status_messages.append(\"✅ OpenAI API: Connected\")\n",
" else:\n",
" status_messages.append(f\"❌ OpenAI API: {getattr(gpt_drive, 'openai_error', 'Not configured')}\")\n",
" \n",
" return \"\\n\".join(status_messages)\n",
"\n",
"# Create Gradio interface\n",
"with gr.Blocks(title=\"Augusta's Anatomy Reading Assistant\", theme=gr.themes.Soft()) as app:\n",
" gr.Markdown(\"# 🤖 Augusta's Anatomy bot\")\n",
" gr.Markdown(\"Ask questions about your anatomy books using AI!\")\n",
" \n",
" with gr.Row():\n",
" with gr.Column(scale=2):\n",
" # Main query interface\n",
" with gr.Group():\n",
" gr.Markdown(\"### Ask a Question\")\n",
" query_input = gr.Textbox(\n",
" label=\"Your Question\",\n",
" placeholder=\"Ask me any question about your anatomy books?\",\n",
" lines=3\n",
" )\n",
" \n",
" search_terms_input = gr.Textbox(\n",
" label=\"Search Terms (optional)\",\n",
" placeholder=\"Enter comma-separated terms to search for specific files\",\n",
" lines=1\n",
" )\n",
" \n",
" submit_btn = gr.Button(\"Search & Ask\", variant=\"primary\", size=\"lg\")\n",
" \n",
" # Results section\n",
" with gr.Group():\n",
" gr.Markdown(\"### Answer\")\n",
" answer_output = gr.Textbox(\n",
" label=\"AI Response\",\n",
" lines=10,\n",
" interactive=False\n",
" )\n",
" \n",
" sources_output = gr.Textbox(\n",
" label=\"Sources\",\n",
" lines=3,\n",
" interactive=False\n",
" )\n",
" \n",
" with gr.Column(scale=1):\n",
" # Status and setup info\n",
" with gr.Group():\n",
" gr.Markdown(\"### System Status\")\n",
" status_btn = gr.Button(\"Check Status\", size=\"sm\")\n",
" status_output = gr.Textbox(\n",
" label=\"API Status\",\n",
" lines=4,\n",
" interactive=False\n",
" )\n",
" \n",
" with gr.Group():\n",
" gr.Markdown(\"### Setup Instructions\")\n",
" gr.Markdown(\"\"\"\n",
" **Important Notes:**\n",
" 1.Only documents shared with it, it can answer\n",
" \n",
" **File Types Supported:**\n",
" - Google Docs\n",
" - Google Sheets \n",
" - PDF files\n",
" - Text files\n",
" \n",
" **Tips:**\n",
" - Use specific search terms for better results\n",
" - The system searches the top 3 most relevant files\n",
" - Ask clear, specific questions for better answers\n",
" \"\"\")\n",
" \n",
" # Event handlers\n",
" submit_btn.click(\n",
" fn=process_user_query,\n",
" inputs=[query_input, search_terms_input],\n",
" outputs=[answer_output, sources_output]\n",
" )\n",
" \n",
" status_btn.click(\n",
" fn=check_setup,\n",
" outputs=status_output\n",
" )\n",
" \n",
" # Example queries\n",
" with gr.Row():\n",
" gr.Examples(\n",
" examples=[\n",
" [\"What is morbid Anatomy?\", \"morbid, Anatomy\"],\n",
" [\"The transmission of nerves from one neuron to another is as a result of what?\", \"neuron, nerves, Dr Clement\"],\n",
" ],\n",
" inputs=[query_input, search_terms_input],\n",
" )\n",
"\n",
"# Launch the app\n",
"if __name__ == \"__main__\":\n",
" app.launch(\n",
" share=True,debug =True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "35d0a347-2cc6-4f4e-bb70-cc47067eaf53",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}