Spaces:
Sleeping
Sleeping
added dspy, to allow .mkv files, upload multiple slides and notebooks, remove base name matching in mentor materials
a4af32a
| # src/gdrive_manager.py | |
| import os | |
| import io | |
| import json | |
| import logging | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload | |
| from googleapiclient.errors import HttpError | |
| from google.oauth2 import service_account | |
| logger = logging.getLogger(__name__) | |
| class GoogleDriveManager: | |
| SCOPES = ['https://www.googleapis.com/auth/drive'] | |
| def __init__(self): | |
| # Try to load full credentials from environment variable | |
| gcp_credentials = os.getenv("GCP_CREDENTIALS") | |
| if gcp_credentials: | |
| cred_data = json.loads(gcp_credentials) | |
| creds = service_account.Credentials.from_service_account_info( | |
| cred_data, scopes=self.SCOPES | |
| ) | |
| else: | |
| # Fallback to file-based credentials | |
| creds = service_account.Credentials.from_service_account_file( | |
| "credentials.json", scopes=self.SCOPES | |
| ) | |
| self.service = build('drive', 'v3', credentials=creds) | |
| def get_folder_id(self, url): | |
| """Extract folder ID from Google Drive URL""" | |
| if 'folders/' in url: | |
| return url.split('folders/')[-1].split('?')[0] | |
| elif 'id=' in url: | |
| return url.split('id=')[-1].split('&')[0] | |
| return url | |
| def list_files(self, folder_id, file_types=('video/mp4', 'video/x-matroska')): | |
| """List files in a Google Drive folder""" | |
| if isinstance(file_types, str): | |
| file_types = [file_types] | |
| type_query = " or ".join([f"mimeType='{ftype}'" for ftype in file_types]) | |
| query = f"('{folder_id}' in parents) and ({type_query}) and trashed=false" | |
| results = self.service.files().list( | |
| q=query, | |
| fields="files(id, name, mimeType)" | |
| ).execute() | |
| return results.get('files', []) | |
| def download_file(self, file_id, destination): | |
| """Download a file from Google Drive""" | |
| logger.info("Downloading file %s to %s", file_id, destination) | |
| request = self.service.files().get_media(fileId=file_id) | |
| fh = io.FileIO(destination, 'wb') | |
| downloader = MediaIoBaseDownload(fh, request) | |
| done = False | |
| while not done: | |
| status, done = downloader.next_chunk() | |
| logger.info(f"Download {int(status.progress() * 100)}%") | |
| logger.info("Download complete: %s", destination) | |
| return destination | |
| def upload_file(self, local_path, drive_folder_id, mime_type): | |
| """Upload a file to Google Drive""" | |
| file_metadata = { | |
| 'name': os.path.basename(local_path), | |
| 'parents': [drive_folder_id] | |
| } | |
| media = MediaFileUpload(local_path, mimetype=mime_type) | |
| file = self.service.files().create( | |
| body=file_metadata, | |
| media_body=media, | |
| fields='id' | |
| ).execute() | |
| logger.info(f"Uploaded {local_path} to Drive folder {drive_folder_id}") | |
| return file.get('id') | |
| def delete_file(self, file_id): | |
| """Delete a file from Google Drive""" | |
| try: | |
| self.service.files().delete(fileId=file_id).execute() | |
| logger.info(f"Deleted file {file_id} from Drive") | |
| return True | |
| except HttpError as error: | |
| logger.error(f"An error occurred: {error}") | |
| return False | |
| def find_file_by_name(self, folder_id, filename): | |
| """Find a file by name in a folder""" | |
| query = f"'{folder_id}' in parents and name='{filename}' and trashed=false" | |
| results = self.service.files().list( | |
| q=query, | |
| fields="files(id)" | |
| ).execute() | |
| files = results.get('files', []) | |
| return files[0]['id'] if files else None | |
| def list_txt_files(self, folder_id): | |
| """List all .txt files in a Google Drive folder""" | |
| query = f"'{folder_id}' in parents and mimeType='text/plain' and trashed=false" | |
| results = self.service.files().list( | |
| q=query, | |
| fields="files(id, name)" | |
| ).execute() | |
| return results.get('files', []) | |
| def remove_duplicates_by_name(self, folder_id): | |
| """Remove duplicate files (by name) in a Drive folder, keeping only the latest.""" | |
| files = self.list_txt_files(folder_id) | |
| name_map = {} | |
| for file in files: | |
| if file['name'] not in name_map: | |
| name_map[file['name']] = file | |
| else: | |
| # If duplicate, delete the older one (or you can keep the latest by timestamp if available) | |
| self.delete_file(file['id']) | |
| logger.info(f"Deleted duplicate file: {file['name']} ({file['id']})") |