"""Study Buddy: answer questions with GPT using files found in Google Drive.

The module wires three pieces together:

* ``GPTDriveIntegration`` searches Google Drive (read-only service account),
  extracts text from the matching files and asks an OpenAI chat model to
  answer a question using that text as context.
* ``process_user_query`` / ``check_setup`` adapt the integration to the
  Gradio callbacks.
* A Gradio ``Blocks`` app (``app``) provides the web UI.
"""

import io
import json
import os
import tempfile
from typing import Dict, List, Optional, Tuple

import gradio as gr
import openai
import requests
from dotenv import load_dotenv, dotenv_values
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from openai import OpenAI

# NOTE(security): the service-account key is fetched from a publicly reachable
# URL, which means anyone can obtain it. Prefer supplying the key through a
# secret / environment variable; the URL is kept as the default only so that
# existing deployments keep working.
DEFAULT_CREDENTIALS_URL = (
    "https://huggingface.co/spaces/Ephraimmm/studybuddy/resolve/main/"
    "rag-system-463320-f292991d0516.json"
)
DRIVE_SCOPES = ['https://www.googleapis.com/auth/drive.readonly']

# Seconds to wait for the credentials download before giving up.
CREDENTIALS_TIMEOUT = 30

GOOGLE_DOC_MIME = 'application/vnd.google-apps.document'
GOOGLE_SHEET_MIME = 'application/vnd.google-apps.spreadsheet'
PDF_MIME = 'application/pdf'

CHAT_MODEL = "gpt-4o-mini"
MAX_ANSWER_TOKENS = 1000
# Characters of each file that are forwarded to the model as context.
MAX_CONTEXT_CHARS = 2000
# Number of distinct files whose content is sent to the model.
MAX_FILES = 3

SYSTEM_PROMPT = (
    "You are an AI assistant that can analyze documents from Google Drive. "
    "Use the provided file contents to answer user questions."
)

SETUP_INSTRUCTIONS_MD = """
**Important Notes:**
1.Only documents shared with studybuddy, it can answer

**File Types Supported:**
- Google Docs
- Google Sheets
- PDF files
- Text files

**Tips:**
- Use specific search terms for better results
- The system searches the top 3 most relevant files
- Ask clear, specific questions for better answers
"""

openai.api_key = os.getenv('OPENAI_API_KEY')
# A dedicated name for the client; the module name `openai` is no longer
# rebound to the client instance.
openai_client = OpenAI(api_key=openai.api_key)


class GPTDriveIntegration:
    """Search Google Drive, extract file text and query GPT with it.

    Attributes set by the constructor:
        credentials: service-account credentials used for the Drive API.
        drive_service: Drive v3 API client.
        drive_initialized: ``True`` once the Drive client has been built.
        openai_initialized: ``True`` when ``OPENAI_API_KEY`` is set.
        openai_error: present only when the OpenAI key is missing.
    """

    # Drive query clause for each user-facing file-type keyword.
    _FILE_TYPE_CLAUSES = {
        'pdf': "mimeType='application/pdf'",
        'doc': "mimeType contains 'document'",
        'docx': "mimeType contains 'document'",
        'xls': "mimeType contains 'spreadsheet'",
        'xlsx': "mimeType contains 'spreadsheet'",
    }

    def __init__(self, credentials_url: str = DEFAULT_CREDENTIALS_URL):
        """Download the service-account key and build the API clients.

        Args:
            credentials_url: URL of the service-account JSON key file.

        Raises:
            Exception: if the key file cannot be downloaded or is not valid
                JSON. Errors from credential parsing or from building the
                Drive client propagate unchanged.
        """
        self.drive_initialized = False
        self.openai_initialized = False

        try:
            response = requests.get(credentials_url, timeout=CREDENTIALS_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException as e:
            raise Exception(f"Failed to download credentials file: {e}") from e

        # Parsed separately from the download: recent `requests` versions
        # raise a JSON error that is *also* a RequestException, which would
        # otherwise be misreported as a download failure.
        try:
            credentials_info = response.json()
        except ValueError as e:
            raise Exception(f"Invalid JSON in credentials file: {e}") from e

        # Load the key from memory so the private key never touches the disk
        # (and no temporary file can be left behind on failure).
        self.credentials = service_account.Credentials.from_service_account_info(
            credentials_info,
            scopes=DRIVE_SCOPES,
        )
        self.drive_service = build('drive', 'v3', credentials=self.credentials)
        self.drive_initialized = True

        # Initialize OpenAI: pick up the key as it is at construction time.
        api_key = os.getenv('OPENAI_API_KEY')
        if api_key:
            openai_client.api_key = api_key
            self.openai_initialized = True
        else:
            self.openai_error = "OPENAI_API_KEY is not set"

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape backslashes and single quotes for a Drive query literal."""
        return value.replace("\\", "\\\\").replace("'", "\\'")

    def _download(self, request) -> bytes:
        """Run a Drive media/export request to completion and return its bytes."""
        buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(buffer, request)
        done = False
        while not done:
            _, done = downloader.next_chunk()
        return buffer.getvalue()

    @staticmethod
    def _decode(data: bytes) -> str:
        """Decode downloaded text, dropping a UTF-8 BOM if one is present."""
        return data.decode('utf-8-sig', errors='replace')

    def search_files(self, query, file_types=None):
        """Search Google Drive for files whose name contains ``query``.

        Args:
            query: text that must appear in the file name.
            file_types: optional iterable of type keywords (``pdf``, ``doc``,
                ``docx``, ``xls``, ``xlsx``); unknown keywords are ignored.

        Returns:
            A list of file dicts with ``id``, ``name``, ``mimeType`` and,
            when Drive reports it, ``size``.
        """
        search_query = f"name contains '{self._escape_query_value(query)}'"

        type_queries = []
        for file_type in file_types or []:
            clause = self._FILE_TYPE_CLAUSES.get(file_type.lower())
            if clause and clause not in type_queries:
                type_queries.append(clause)
        if type_queries:
            search_query += f" and ({' or '.join(type_queries)})"

        results = self.drive_service.files().list(
            q=search_query,
            fields="files(id, name, mimeType, size)",
        ).execute()
        return results.get('files', [])

    def get_file_content(self, file_id, mime_type):
        """Download a Drive file and return its text content.

        Google Docs are exported as plain text, Google Sheets as CSV, plain
        ``text/*`` files are downloaded as-is and PDFs are parsed with PyPDF2.

        Args:
            file_id: Drive file id.
            mime_type: the file's MIME type as reported by Drive.

        Returns:
            The extracted text. This method never raises: on an unsupported
            type or any failure it returns a human-readable message instead.
        """
        mime_type = mime_type or ''
        files = self.drive_service.files()
        try:
            # Only native Google editor files can be exported; matching on
            # the exact type keeps Office files (whose MIME type also
            # contains "document") out of the export path.
            if mime_type == GOOGLE_DOC_MIME:
                request = files.export_media(fileId=file_id, mimeType='text/plain')
                return self._decode(self._download(request))

            if mime_type == GOOGLE_SHEET_MIME:
                request = files.export_media(fileId=file_id, mimeType='text/csv')
                return self._decode(self._download(request))

            if mime_type.startswith('text/'):
                request = files.get_media(fileId=file_id)
                return self._decode(self._download(request))

            if mime_type == PDF_MIME:
                pdf_bytes = self._download(files.get_media(fileId=file_id))
                try:
                    import PyPDF2  # optional dependency
                except ImportError:
                    return "PDF text extraction requires PyPDF2 library"
                pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
                # extract_text() may yield None for pages without text.
                return "".join(
                    (page.extract_text() or "") + "\n"
                    for page in pdf_reader.pages
                )

            return "File type not supported for text extraction"
        except Exception as e:
            # Deliberate best-effort: one unreadable file must not abort the
            # whole query, so the failure is reported as text.
            return f"Error reading file: {str(e)}"

    def query_gpt_with_context(self, user_query, file_contents):
        """Ask the chat model ``user_query`` using ``file_contents`` as context.

        Args:
            user_query: the user's question.
            file_contents: list of dicts with ``name`` and ``text`` keys; only
                the first ``MAX_CONTEXT_CHARS`` characters of each text are
                sent.

        Returns:
            The content of the model's first reply choice.
        """
        context = "\n\n".join(
            f"File: {content['name']}\n"
            f"Content: {content['text'][:MAX_CONTEXT_CHARS]}..."
            for content in file_contents
        )

        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": (
                    f"Context from Google Drive files:\n{context}\n\n"
                    f"User Question: {user_query}"
                ),
            },
        ]

        response = openai_client.chat.completions.create(
            model=CHAT_MODEL,
            messages=messages,
            max_tokens=MAX_ANSWER_TOKENS,
        )
        return response.choices[0].message.content

    def process_query(self, user_query, search_terms=None):
        """Find relevant Drive files and answer ``user_query`` from them.

        Args:
            user_query: the user's question.
            search_terms: optional list of file-name search terms. When empty
                or ``None`` the first three words of the question are used.

        Returns:
            A dict with ``answer`` (str) and ``sources`` (list of file names).
        """
        if not search_terms:
            search_terms = user_query.split()[:3]  # Simple extraction

        files = []
        for term in search_terms:
            files.extend(self.search_files(term))

        # De-duplicate by id; dicts keep insertion order, so the files stay
        # in the order the searches returned them.
        unique_files = list({f['id']: f for f in files}.values())

        file_contents = [
            {
                'name': file['name'],
                'text': self.get_file_content(file['id'], file['mimeType']),
            }
            for file in unique_files[:MAX_FILES]
        ]

        if not file_contents:
            return {
                'answer': "No relevant files found in your Google Drive.",
                'sources': [],
            }

        response = self.query_gpt_with_context(user_query, file_contents)
        return {
            'answer': response,
            'sources': [f['name'] for f in file_contents],
        }


gpt_drive = GPTDriveIntegration()


def process_user_query(query, search_terms_input=""):
    """Gradio callback: answer ``query`` and list the files that were used.

    Args:
        query: the question typed by the user.
        search_terms_input: optional comma-separated file-name search terms;
            when blank, search terms are derived from the question.

    Returns:
        A ``(answer, sources_text)`` tuple of strings.
    """
    if not query or not query.strip():
        return "Please enter a question.", ""

    # Parse search terms if provided, dropping blank entries.
    search_terms = [
        term.strip()
        for term in (search_terms_input or "").split(',')
        if term.strip()
    ] or None

    result = gpt_drive.process_query(query, search_terms)

    answer = result['answer']
    sources = result['sources']

    sources_text = ""
    if sources:
        sources_text = "**Sources used:**\n" + "\n".join(
            f"• {source}" for source in sources
        )

    return answer, sources_text


def check_setup():
    """Gradio callback: report whether the Drive and OpenAI APIs are ready.

    Returns:
        One status line per API, joined with newlines.
    """
    status_messages = []

    # getattr keeps this working even if the integration object predates
    # the status attributes.
    if getattr(gpt_drive, 'drive_initialized', False):
        status_messages.append("✅ Google Drive API: Connected")
    else:
        status_messages.append(
            f"❌ Google Drive API: {getattr(gpt_drive, 'drive_error', 'Not configured')}"
        )

    if getattr(gpt_drive, 'openai_initialized', False):
        status_messages.append("✅ OpenAI API: Connected")
    else:
        status_messages.append(
            f"❌ OpenAI API: {getattr(gpt_drive, 'openai_error', 'Not configured')}"
        )

    return "\n".join(status_messages)


# Create Gradio interface
with gr.Blocks(title="Study Buddy", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 300 Level Anatamomy Study Buddy Anatomy bot")
    gr.Markdown("Study more effectively with study Buddy!")

    with gr.Row():
        with gr.Column(scale=2):
            # Main query interface
            with gr.Group():
                gr.Markdown("### Ask a Question")
                query_input = gr.Textbox(
                    label="Your Question",
                    placeholder="Ask me any question about your anatomy books?",
                    lines=3,
                )
                search_terms_input = gr.Textbox(
                    label="Search Terms",
                    placeholder="Enter comma-separated terms to search for specific files",
                    lines=1,
                )
                submit_btn = gr.Button("Search & Ask", variant="primary", size="lg")

            # Results section
            with gr.Group():
                gr.Markdown("### Answer")
                answer_output = gr.Textbox(
                    label="AI Response",
                    lines=10,
                    interactive=False,
                )
                sources_output = gr.Textbox(
                    label="Sources",
                    lines=3,
                    interactive=False,
                )

        with gr.Column(scale=1):
            # Status and setup info
            with gr.Group():
                gr.Markdown("### System Status")
                status_btn = gr.Button("Check Status", size="sm")
                status_output = gr.Textbox(
                    label="API Status",
                    lines=4,
                    interactive=False,
                )

            with gr.Group():
                gr.Markdown("### Setup Instructions")
                gr.Markdown(SETUP_INSTRUCTIONS_MD)

    # Event handlers
    submit_btn.click(
        fn=process_user_query,
        inputs=[query_input, search_terms_input],
        outputs=[answer_output, sources_output],
    )
    status_btn.click(
        fn=check_setup,
        outputs=status_output,
    )

    # Example queries
    with gr.Row():
        gr.Examples(
            examples=[
                ["What is morbid Anatomy?", "morbid, Anatomy"],
                [
                    "The transmission of nerves from one neuron to another is as a result of what?",
                    "neuron, nerves, Dr Clement",
                ],
            ],
            inputs=[query_input, search_terms_input],
        )

# Launch the app
if __name__ == "__main__":
    app.launch(share=True, debug=True)