# --- Cell: imports (stdlib first, then third-party) ---
import io
import json
import os

from dotenv import load_dotenv, dotenv_values
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
import openai
from openai import OpenAI

# --- Cell: load environment variables from .env into os.environ ---
load_dotenv()

# --- Cell: sanity-check which .env keys are present ---
# Print key NAMES only -- never the values, which are secrets.
config = dotenv_values(".env")
for key in config.keys():
    print(key)

# --- Cell: build the OpenAI client ---
# FIX: the original first mutated the module (`openai.api_key = ...`) and then
# read that attribute back to build the client; construct the client directly
# from the environment variable instead.
# NOTE(review): the name `openai` is deliberately rebound from the module to a
# client instance because later cells call `openai.chat.completions.create`
# on it -- renaming it would break those cells.
openai = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
# --- Cell: smoke-test the OpenAI client ---
response = openai.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": "you are a helpful assistant"},
        {"role": "user", "content": "hi"}
    ])

reply = response.choices[0].message.content
print(reply)

# --- Cell: locate the Google service-account credentials file ---
service_account_file_path = os.getenv("GOOGLE_SERVICE_ACCOUNT_FILE")

# --- Cell: troubleshoot configuration ---
# FIX: the original tested `openai is None`, which is always False once the
# client object is constructed, so "openai activated" printed even when the
# API key was missing.  Test the underlying environment variable instead.
if os.getenv("OPENAI_API_KEY") is None:
    print("openai not activated")
else:
    print("openai activated")

if service_account_file_path is None:
    print("service_account_file_path not activated")
else:
    print("service_account_file_path activated")


# --- Cell: Drive -> GPT integration ---
class GPTDriveIntegration:
    """Search Google Drive for relevant files and answer questions about
    their contents with the OpenAI chat API.

    Requires two environment variables (loaded from .env earlier):
      * GOOGLE_SERVICE_ACCOUNT_FILE -- path to a service-account JSON key
      * OPENAI_API_KEY              -- OpenAI credentials
    """

    def __init__(self):
        # Read-only Drive client built from the service-account key file.
        self.credentials = service_account.Credentials.from_service_account_file(
            os.getenv('GOOGLE_SERVICE_ACCOUNT_FILE'),
            scopes=['https://www.googleapis.com/auth/drive.readonly']
        )
        self.drive_service = build('drive', 'v3', credentials=self.credentials)

        # NOTE(review): `openai` is a client instance by the time this runs
        # (rebound in an earlier cell); assigning .api_key on it is kept only
        # for parity with the original notebook.
        openai.api_key = os.getenv('OPENAI_API_KEY')

    def search_files(self, query, file_types=None):
        """Search for files in Google Drive by name.

        Args:
            query: substring matched against file names.
            file_types: optional list such as ['pdf', 'docx'] to narrow the
                search by MIME type; unrecognized entries are ignored.

        Returns:
            List of file dicts with id, name, mimeType and size.
        """
        # FIX: escape backslashes and single quotes so a query containing `'`
        # cannot terminate (or inject clauses into) the Drive search
        # expression, per the Drive API query-syntax rules.
        safe_query = query.replace("\\", "\\\\").replace("'", "\\'")
        search_query = f"name contains '{safe_query}'"

        if file_types:
            type_queries = []
            for file_type in file_types:
                if file_type.lower() == 'pdf':
                    type_queries.append("mimeType='application/pdf'")
                elif file_type.lower() in ['doc', 'docx']:
                    type_queries.append("mimeType contains 'document'")
                elif file_type.lower() in ['xls', 'xlsx']:
                    type_queries.append("mimeType contains 'spreadsheet'")

            if type_queries:
                search_query += f" and ({' or '.join(type_queries)})"

        results = self.drive_service.files().list(
            q=search_query,
            fields="files(id, name, mimeType, size)"
        ).execute()

        return results.get('files', [])

    def _download(self, request):
        """Run a chunked media download to completion.

        Returns the filled io.BytesIO buffer (position at end of stream).
        """
        buf = io.BytesIO()
        downloader = MediaIoBaseDownload(buf, request)
        done = False
        while not done:
            _status, done = downloader.next_chunk()
        return buf

    def _extract_pdf_text(self, file_content):
        """Extract text from a PDF held in a BytesIO buffer.

        Tries PyPDF2, then pdfplumber, then pymupdf -- whichever is installed
        first wins; returns an explanatory string if none are available.
        """
        file_content.seek(0)  # reset buffer position before reading

        # Option 1: Using PyPDF2
        try:
            import PyPDF2
            pdf_reader = PyPDF2.PdfReader(file_content)
            text = ""
            for page in pdf_reader.pages:
                text += page.extract_text() + "\n"
            return text
        except ImportError:
            pass

        # Option 2: Using pdfplumber (better for complex PDFs)
        try:
            import pdfplumber
            text = ""
            with pdfplumber.open(file_content) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
            return text
        except ImportError:
            pass

        # Option 3: Using pymupdf (fitz) - fastest option
        try:
            import fitz  # pymupdf
            pdf_document = fitz.open(stream=file_content.read(), filetype="pdf")
            text = ""
            for page_num in range(pdf_document.page_count):
                page = pdf_document[page_num]
                text += page.get_text() + "\n"
            pdf_document.close()
            return text
        except ImportError:
            pass

        return "PDF text extraction requires PyPDF2, pdfplumber, or pymupdf library"

    def get_file_content(self, file_id, mime_type):
        """Download a Drive file and return its text content.

        Google Docs are exported as plain text, Sheets as CSV, PDFs are
        downloaded and text-extracted; on any error the error message itself
        is returned as the content string.
        """
        try:
            if 'text' in mime_type or 'document' in mime_type:
                # For Google Docs, export as plain text; other text files
                # are downloaded as-is.
                if 'document' in mime_type:
                    request = self.drive_service.files().export_media(
                        fileId=file_id, mimeType='text/plain'
                    )
                else:
                    request = self.drive_service.files().get_media(fileId=file_id)

                return self._download(request).getvalue().decode('utf-8')

            elif 'spreadsheet' in mime_type:
                # For Google Sheets, export as CSV
                request = self.drive_service.files().export_media(
                    fileId=file_id, mimeType='text/csv'
                )
                return self._download(request).getvalue().decode('utf-8')

            elif mime_type == 'application/pdf':
                # For PDF files, download binary content and extract text
                request = self.drive_service.files().get_media(fileId=file_id)
                return self._extract_pdf_text(self._download(request))

            else:
                return "File type not supported for text extraction"

        except Exception as e:
            # NOTE(review): the error string becomes the "file content" and
            # is fed to GPT downstream -- kept for parity with the original.
            return f"Error reading file: {str(e)}"

    def query_gpt_with_context(self, user_query, file_contents):
        """Send the user's question to GPT along with file context.

        Only the first 2000 characters of each file are included to keep the
        prompt bounded.
        """
        context = "\n\n".join([
            f"File: {content['name']}\nContent: {content['text'][:2000]}..."
            for content in file_contents
        ])

        messages = [
            {
                "role": "system",
                "content": """
        You are an AI assistant that can analyze documents from Google Drive. 
        Use the provided file contents to answer user questions."""
            },
            {
                "role": "user",
                "content": f"Context from Google Drive files:\n{context}\n\nUser Question: {user_query}"
            }
        ]

        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            max_tokens=1000
        )

        return response.choices[0].message.content

    def process_query(self, user_query, search_terms=None):
        """Main entry point: search Drive, read content, ask GPT.

        Returns a dict with 'answer' (GPT response or a not-found message)
        and 'sources' (names of the files used as context).
        """
        # Extract search terms from query if not provided.
        if not search_terms:
            search_terms = user_query.split()[:3]  # simple extraction

        # Search for relevant files across all terms.
        files = []
        for term in search_terms:
            files.extend(self.search_files(term))

        # Remove duplicates by file id (later hits win, as in the original).
        unique_files = {f['id']: f for f in files}.values()

        # Get content from the first 3 unique files.
        file_contents = []
        for file in list(unique_files)[:3]:
            content = self.get_file_content(file['id'], file['mimeType'])
            file_contents.append({
                'name': file['name'],
                'text': content
            })

        # Query GPT with context.
        if file_contents:
            response = self.query_gpt_with_context(user_query, file_contents)
            return {
                'answer': response,
                'sources': [f['name'] for f in file_contents]
            }
        else:
            return {
                'answer': "No relevant files found in your Google Drive.",
                'sources': []
            }
# --- Cell: end-to-end test of the Drive -> GPT pipeline ---
if __name__ == "__main__":
    integration = GPTDriveIntegration()

    # Ask a question and steer the Drive search toward the note we expect
    # to match ("Neuro Note Dr Clement.docx").
    question = "The transmission of nerves is called?"
    result = integration.process_query(
        question,
        search_terms=["nerves", "Dr Clement"],
    )

    print("Answer:", result['answer'])
    print("Sources:", result['sources'])
" ], "text/plain": [ "