| import os |
| from google.oauth2 import service_account |
| from googleapiclient.discovery import build |
| from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload |
|
|
|
|
| class GoogleDrive_API: |
| def __init__(self, SERVICE_ACCOUNT_FILE_path: str = ""): |
| self.SCOPES = ["https://www.googleapis.com/auth/drive"] |
| self.PARENT_FOLDER_ID = "1r-MlnEpWHx3b1fxHDnHcZ2-Wh_Y89676" |
|
|
| self.service = self.authenticate(SERVICE_ACCOUNT_FILE_path) |
| self.delete_all_files() |
|
|
| def authenticate(self, SERVICE_ACCOUNT_FILE): |
| if len(SERVICE_ACCOUNT_FILE) > 0: |
| if not os.path.exists(SERVICE_ACCOUNT_FILE): |
| raise rf"Google SERVICE_ACCOUNT_FILE Not Found, Path = {SERVICE_ACCOUNT_FILE}" |
|
|
| credentials = service_account.Credentials.from_service_account_file( |
| SERVICE_ACCOUNT_FILE, scopes=self.SCOPES |
| ) |
| else: |
| service_account_info = { |
| "type": "service_account", |
| "project_id": os.environ["project_id"], |
| "private_key_id": os.environ["private_key_id"], |
| "private_key": os.environ["private_key"], |
| "client_email": os.environ["client_email"], |
| "client_id": os.environ["client_id"], |
| "auth_uri": "https://accounts.google.com/o/oauth2/auth", |
| "token_uri": "https://oauth2.googleapis.com/token", |
| "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", |
| "client_x509_cert_url": os.environ["client_x509_cert_url"], |
| "universe_domain": "googleapis.com", |
| } |
|
|
| credentials = service_account.Credentials.from_service_account_info( |
| service_account_info, scopes=self.SCOPES |
| ) |
|
|
| service = build("drive", "v3", credentials=credentials) |
| return service |
|
|
| def get_files(self): |
| |
| results = ( |
| self.service.files() |
| .list( |
| q=f"'{self.PARENT_FOLDER_ID}' in parents and trashed=false", |
| fields="files(id, name)", |
| ) |
| .execute() |
| ) |
| return results.get("files", []) |
|
|
| def delete_file(self, file_id): |
| self.service.files().delete(fileId=file_id).execute() |
|
|
| def delete_all_files(self): |
| items = self.get_files() |
|
|
| |
| for item in items: |
| file_id = item["id"] |
| self.delete_file(file_id) |
|
|
| def upload_file(self, file_name: str, file_path: str): |
| file_metadata = { |
| "name": file_name, |
| "parents": [self.PARENT_FOLDER_ID], |
| } |
| media = MediaFileUpload(file_path) |
| self.service.files().create(body=file_metadata, media_body=media, fields="id").execute() |
| print(rf"{file_path} uploaded.") |
|
|
| def download_file(self, file_name: str, file_path: str): |
| results = ( |
| self.service.files() |
| .list( |
| q=f"'{self.PARENT_FOLDER_ID}' in parents and name='{file_name}' and trashed=false", |
| fields="files(id)", |
| ) |
| .execute() |
| ) |
| items = results.get("files", []) |
|
|
| if items: |
| |
| file_id = items[0]["id"] |
|
|
| request = self.service.files().get_media(fileId=file_id) |
|
|
| with open(file_path, "wb") as file: |
| downloader = MediaIoBaseDownload(file, request) |
| done = False |
| while not done: |
| status, done = downloader.next_chunk() |
| print(f"Download {int(status.progress() * 100)}%.") |
| else: |
| print(rf"{file_name} is Not Found") |
|
|