Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import requests | |
| import json | |
| import tempfile | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaIoBaseDownload | |
| import openai | |
| from dotenv import load_dotenv, dotenv_values | |
| import io | |
| from openai import OpenAI | |
# Record the API key on the imported `openai` module, then rebind the name
# `openai` to a client instance. Code further down this file calls
# `openai.chat.completions.create(...)` on that client, so the name must stay.
openai.api_key = os.getenv('OPENAI_API_KEY')
openai = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
class GPTDriveIntegration:
    """Answer questions with an OpenAI chat model using text from Google Drive.

    The constructor builds read-only Drive credentials from ``GOOGLE_*``
    environment variables. ``process_query`` is the main entry point: it
    searches Drive by file name, extracts text from the first few hits and
    sends that text to the model together with the user's question.
    """

    # Prefix shared by Google-native file types; only these can be exported.
    _GOOGLE_APPS_PREFIX = 'application/vnd.google-apps.'

    # Drive query clause for each file-type keyword accepted by search_files.
    _MIME_CLAUSES = {
        'pdf': "mimeType='application/pdf'",
        'doc': "mimeType contains 'document'",
        'docx': "mimeType contains 'document'",
        'xls': "mimeType contains 'spreadsheet'",
        'xlsx': "mimeType contains 'spreadsheet'",
    }

    # How many files are read per question, and how much of each is sent.
    MAX_FILES = 3
    MAX_CONTEXT_CHARS = 2000

    MODEL = "gpt-4o-mini"
    MAX_TOKENS = 1000

    SYSTEM_PROMPT = (
        "\n"
        "You are an AI assistant that can analyze documents from Google Drive.\n"
        "Use the provided file contents to answer user questions.\n"
        "Answer directly and add additional suggestions on how to answer questions in the exam\n"
        "Always end with 'Is there anything I can help you with?'\n"
        "Your name is Study buddy, happy to help students study more effectively\n"
    )

    def __init__(self):
        """Create the Drive client from environment variables.

        Raises:
            ValueError: if GOOGLE_PROJECT_ID, GOOGLE_PRIVATE_KEY or
                GOOGLE_CLIENT_EMAIL is unset or empty.
        """
        # Guard against an unset key so the check below can report it,
        # instead of failing with AttributeError on None.replace().
        private_key = os.getenv('GOOGLE_PRIVATE_KEY')
        credentials_info = {
            "type": "service_account",
            "project_id": os.getenv('GOOGLE_PROJECT_ID'),
            "private_key_id": os.getenv('GOOGLE_PRIVATE_KEY_ID'),
            # Env vars store the PEM with literal "\n"; restore real newlines.
            "private_key": private_key.replace('\\n', '\n') if private_key else None,
            "client_email": os.getenv('GOOGLE_CLIENT_EMAIL'),
            "client_id": os.getenv('GOOGLE_CLIENT_ID'),
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
            "client_x509_cert_url": os.getenv('GOOGLE_CLIENT_CERT_URL'),
            "universe_domain": "googleapis.com",
        }

        required_fields = ('project_id', 'private_key', 'client_email')
        missing_fields = [field for field in required_fields if not credentials_info[field]]
        if missing_fields:
            raise ValueError(f"Missing required environment variables: {missing_fields}")

        self.credentials = service_account.Credentials.from_service_account_info(
            credentials_info,
            scopes=['https://www.googleapis.com/auth/drive.readonly'],
        )
        self.drive_service = build('drive', 'v3', credentials=self.credentials)

        # `openai` is the module-level client object created at import time.
        openai.api_key = os.getenv('OPENAI_API_KEY')

        # Status flags read by check_setup().
        self.drive_initialized = True
        self.openai_initialized = bool(openai.api_key)

    @staticmethod
    def _escape_query_value(value):
        """Escape backslashes and single quotes for a Drive query string literal."""
        return str(value).replace('\\', '\\\\').replace("'", "\\'")

    @staticmethod
    def _download(request):
        """Run a Drive media request to completion and return the raw bytes."""
        buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(buffer, request)
        done = False
        while not done:
            _, done = downloader.next_chunk()
        return buffer.getvalue()

    @staticmethod
    def _pdf_to_text(pdf_bytes):
        """Extract text from PDF bytes, one page per line group, via PyPDF2."""
        try:
            import PyPDF2
        except ImportError:
            return "PDF text extraction requires PyPDF2 library"
        reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
        # extract_text() can yield nothing for image-only pages; treat as empty.
        return "".join((page.extract_text() or "") + "\n" for page in reader.pages)

    def search_files(self, query, file_types=None):
        """Search Drive for files whose name contains ``query``.

        Args:
            query: text to match against file names.
            file_types: optional iterable of 'pdf', 'doc', 'docx', 'xls',
                'xlsx'; unknown entries are ignored.

        Returns:
            A list of file dicts with the fields id, name, mimeType and size
            as returned by the Drive API (empty list when nothing matches).
        """
        search_query = f"name contains '{self._escape_query_value(query)}'"

        type_queries = []
        for file_type in file_types or ():
            clause = self._MIME_CLAUSES.get(file_type.lower())
            # 'doc' and 'docx' map to the same clause; emit it only once.
            if clause and clause not in type_queries:
                type_queries.append(clause)
        if type_queries:
            search_query += f" and ({' or '.join(type_queries)})"

        results = self.drive_service.files().list(
            q=search_query,
            fields="files(id, name, mimeType, size)",
        ).execute()
        return results.get('files', [])

    def get_file_content(self, file_id, mime_type):
        """Download a Drive file and return its text.

        Google Docs are exported as plain text, Google Sheets as CSV, PDFs
        are parsed with PyPDF2 and other text types are downloaded directly.

        Returns:
            The extracted text. On failure this returns a human-readable
            message instead of raising: "File type not supported for text
            extraction", "PDF text extraction requires PyPDF2 library" or
            "Error reading file: <reason>".
        """
        try:
            # Export works only for Google-native files; uploaded .docx/.xlsx
            # also contain 'document'/'spreadsheet' in their MIME type, so
            # check the native prefix before choosing export.
            is_native = mime_type.startswith(self._GOOGLE_APPS_PREFIX)

            if is_native and 'document' in mime_type:
                request = self.drive_service.files().export_media(
                    fileId=file_id, mimeType='text/plain'
                )
                # utf-8-sig drops a leading byte-order mark if one is present.
                return self._download(request).decode('utf-8-sig')

            if is_native and 'spreadsheet' in mime_type:
                request = self.drive_service.files().export_media(
                    fileId=file_id, mimeType='text/csv'
                )
                return self._download(request).decode('utf-8-sig')

            if mime_type == 'application/pdf':
                request = self.drive_service.files().get_media(fileId=file_id)
                return self._pdf_to_text(self._download(request))

            if 'text' in mime_type:
                request = self.drive_service.files().get_media(fileId=file_id)
                return self._download(request).decode('utf-8-sig')

            return "File type not supported for text extraction"
        except Exception as e:
            # Best-effort by design: callers receive a message, not an exception.
            return f"Error reading file: {str(e)}"

    def query_gpt_with_context(self, user_query, file_contents):
        """Ask the chat model ``user_query`` with file excerpts as context.

        Args:
            user_query: the user's question.
            file_contents: list of dicts with 'name' and 'text' keys; only
                the first MAX_CONTEXT_CHARS characters of each text are sent.

        Returns:
            The text of the model's first choice.
        """
        context = "\n\n".join(
            f"File: {content['name']}\nContent: {content['text'][:self.MAX_CONTEXT_CHARS]}..."
            for content in file_contents
        )
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {
                "role": "user",
                "content": f"Context from Google Drive files:\n{context}\n\nUser Question: {user_query}",
            },
        ]
        response = openai.chat.completions.create(
            model=self.MODEL,
            messages=messages,
            max_tokens=self.MAX_TOKENS,
        )
        return response.choices[0].message.content

    def process_query(self, user_query, search_terms=None):
        """Search Drive, read the first matching files and answer the question.

        Args:
            user_query: the user's question.
            search_terms: optional list of file-name search terms; when
                empty, the first three whitespace-separated words of
                ``user_query`` are used.

        Returns:
            A dict with 'answer' (str) and 'sources' (list of file names).
        """
        if not search_terms:
            search_terms = user_query.split()[:3]  # Simple extraction

        # De-duplicate by file id while keeping first-seen order.
        unique_files = {}
        for term in search_terms:
            for found in self.search_files(term):
                unique_files.setdefault(found['id'], found)

        # The first MAX_FILES hits are used; no relevance ranking is applied.
        file_contents = [
            {
                'name': drive_file['name'],
                'text': self.get_file_content(drive_file['id'], drive_file['mimeType']),
            }
            for drive_file in list(unique_files.values())[:self.MAX_FILES]
        ]

        if not file_contents:
            return {
                'answer': "No relevant files found in your Google Drive.",
                'sources': [],
            }

        return {
            'answer': self.query_gpt_with_context(user_query, file_contents),
            'sources': [entry['name'] for entry in file_contents],
        }
# Single shared integration object, built once at import time; the request
# handlers defined below in this file read it as a module global.
gpt_drive: GPTDriveIntegration = GPTDriveIntegration()
def process_user_query(query, search_terms_input):
    """Answer ``query`` and format the result for the two output textboxes.

    Args:
        query: the question typed by the user; may be None or blank.
        search_terms_input: text of the "Search Terms" box.

    Returns:
        A ``(answer, sources_text)`` tuple. ``sources_text`` is an empty
        string when no source files were used.
    """
    # Treat a missing value the same as a blank one instead of crashing.
    if not query or not query.strip():
        return "Please enter a question.", ""

    # NOTE(review): search_terms_input is accepted but not used; the parsing
    # step was disabled in the original, so terms are always derived from the
    # question itself. Confirm whether that is still intended.
    search_terms = None

    result = gpt_drive.process_query(query, search_terms)
    answer = result['answer']
    sources = result['sources']

    sources_text = ""
    if sources:
        # "\u2022" is a bullet; the original literal was mis-encoded.
        sources_text = "**Sources used:**\n" + "\n".join(
            f"\u2022 {source}" for source in sources
        )
    return answer, sources_text
def check_setup():
    """Report whether the Drive and OpenAI integrations are configured.

    Returns:
        A two-line string, one status line per API. A missing status flag on
        ``gpt_drive`` is reported as not configured rather than raising.
    """
    # (label, flag attribute, error attribute) for each backend.
    checks = (
        ("Google Drive API", 'drive_initialized', 'drive_error'),
        ("OpenAI API", 'openai_initialized', 'openai_error'),
    )

    status_messages = []
    for label, flag_attr, error_attr in checks:
        # getattr with a default: the flags may not exist on the instance.
        if getattr(gpt_drive, flag_attr, False):
            status_messages.append(f"\u2705 {label}: Connected")
        else:
            reason = getattr(gpt_drive, error_attr, 'Not configured')
            status_messages.append(f"\u274c {label}: {reason}")
    return "\n".join(status_messages)
| # Create Gradio interface | |
| import gradio as gr | |
# Page layout: a question form and an answer panel in one column, followed
# by a row of clickable example questions.
with gr.Blocks(theme=gr.themes.Soft(), title="Study Buddy") as app:
    gr.Markdown("# Anatomy Study Buddy ")
    gr.Markdown("Study more effectively with study Buddy!")

    with gr.Row():
        with gr.Column(scale=2):
            # Question form
            with gr.Group():
                gr.Markdown("### Ask a Question")
                query_input = gr.Textbox(
                    lines=3,
                    label="Your Question",
                    placeholder="Ask me any question about your anatomy books?",
                )
                search_terms_input = gr.Textbox(
                    lines=1,
                    label="Search Terms",
                    placeholder="Enter comma-separated terms to search for specific files",
                )
                submit_btn = gr.Button("Search & Ask", size="lg", variant="primary")

            # Answer panel (read-only)
            with gr.Group():
                gr.Markdown("### Answer")
                answer_output = gr.Textbox(
                    interactive=False,
                    label="AI Response",
                    lines=10,
                )
                sources_output = gr.Textbox(
                    interactive=False,
                    label="Sources",
                    lines=3,
                )

        # Disabled status column, kept for reference:
        # with gr.Column(scale=1):
        #     with gr.Group():
        #         gr.Markdown("### System Status")
        #         status_btn = gr.Button("Check Status", size="sm")
        #         status_output = gr.Textbox(
        #             label="API Status",
        #             lines=4,
        #             interactive=False
        #         )

    # Wire the button to the query handler.
    submit_btn.click(
        process_user_query,
        [query_input, search_terms_input],
        [answer_output, sources_output],
    )
    # Disabled together with the status column:
    # status_btn.click(
    #     fn=check_setup,
    #     outputs=status_output
    # )

    # Sample questions; each row fills both input boxes.
    with gr.Row():
        gr.Examples(
            inputs=[query_input, search_terms_input],
            examples=[
                ["What is morbid Anatomy?", "morbid, Anatomy"],
                [
                    "The transmission of nerves from one neuron to another is as a result of what?",
                    "neuron, nerves, Dr Clement",
                ],
                ["Explain what the external ear contains of?", "Ear Anatomy, Ear"],
                ["What are the types of massage?", "massage Lecture, nerves"],
                ["What is trauma?", "Trauma, pysical trauma and sex Offenders"],
                ["what is Upper limb prosthetics?", "Upper limb prosthetics"],
            ],
        )
# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    app.launch(debug=True, share=True)