import os
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
class GoogleDrive_API:
    """Small helper around the Google Drive v3 API, bound to one parent folder.

    All operations (list, delete, upload, download) act on the files that are
    direct children of ``PARENT_FOLDER_ID``.

    NOTE: constructing an instance is destructive -- ``__init__`` authenticates
    and then immediately deletes every file currently in the parent folder.
    """

    def __init__(self, SERVICE_ACCOUNT_FILE_path: str = ""):
        """Authenticate and empty the parent folder.

        Args:
            SERVICE_ACCOUNT_FILE_path: Path to a service-account JSON key file.
                When empty, the credentials are assembled from environment
                variables instead (see ``authenticate``).
        """
        self.SCOPES = ["https://www.googleapis.com/auth/drive"]
        self.PARENT_FOLDER_ID = "1r-MlnEpWHx3b1fxHDnHcZ2-Wh_Y89676"
        self.service = self.authenticate(SERVICE_ACCOUNT_FILE_path)
        # Start from an empty folder: everything already in it is removed.
        self.delete_all_files()

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape *value* for use inside a single-quoted Drive query literal.

        Backslashes are doubled first so the backslash added in front of each
        single quote is not itself escaped again.
        """
        return value.replace("\\", "\\\\").replace("'", "\\'")

    def authenticate(self, SERVICE_ACCOUNT_FILE):
        """Build and return an authenticated Drive v3 service object.

        Args:
            SERVICE_ACCOUNT_FILE: Path to a service-account JSON key file, or an
                empty value to read the credential fields from the environment
                variables ``project_id``, ``private_key_id``, ``private_key``,
                ``client_email``, ``client_id`` and ``client_x509_cert_url``.

        Returns:
            The object returned by ``build("drive", "v3", ...)``.

        Raises:
            FileNotFoundError: A path was given but does not exist.
            KeyError: The environment fallback is used and a variable is unset.
        """
        if SERVICE_ACCOUNT_FILE:
            if not os.path.exists(SERVICE_ACCOUNT_FILE):
                # The original raised a bare string, which itself fails with
                # "TypeError: exceptions must derive from BaseException".
                raise FileNotFoundError(
                    f"Google SERVICE_ACCOUNT_FILE Not Found, Path = {SERVICE_ACCOUNT_FILE}"
                )
            credentials = service_account.Credentials.from_service_account_file(
                SERVICE_ACCOUNT_FILE, scopes=self.SCOPES
            )
        else:
            service_account_info = {
                "type": "service_account",
                "project_id": os.environ["project_id"],
                "private_key_id": os.environ["private_key_id"],
                "private_key": os.environ["private_key"],
                "client_email": os.environ["client_email"],
                "client_id": os.environ["client_id"],
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
                "client_x509_cert_url": os.environ["client_x509_cert_url"],
                "universe_domain": "googleapis.com",
            }
            credentials = service_account.Credentials.from_service_account_info(
                service_account_info, scopes=self.SCOPES
            )
        return build("drive", "v3", credentials=credentials)

    def get_files(self):
        """Return every non-trashed file in the parent folder.

        Returns:
            A list of ``{"id": ..., "name": ...}`` dicts; empty when the folder
            has no files.
        """
        files = []
        page_token = None
        # files.list is paginated: keep requesting pages until the response no
        # longer carries a nextPageToken, otherwise only the first page is seen.
        while True:
            response = (
                self.service.files()
                .list(
                    q=f"'{self.PARENT_FOLDER_ID}' in parents and trashed=false",
                    fields="nextPageToken, files(id, name)",
                    pageToken=page_token,
                )
                .execute()
            )
            files.extend(response.get("files", []))
            page_token = response.get("nextPageToken")
            if not page_token:
                return files

    def delete_file(self, file_id):
        """Delete the Drive file identified by *file_id*."""
        self.service.files().delete(fileId=file_id).execute()

    def delete_all_files(self):
        """Delete every file returned by ``get_files``."""
        for item in self.get_files():
            self.delete_file(item["id"])

    def upload_file(self, file_name: str, file_path: str):
        """Upload the local file at *file_path* into the parent folder.

        Args:
            file_name: Name the file gets on Drive.
            file_path: Path of the local file to upload.

        Returns:
            The ``id`` field of the created file resource (``None`` if the
            response does not contain one).
        """
        file_metadata = {
            "name": file_name,
            "parents": [self.PARENT_FOLDER_ID],
        }
        media = MediaFileUpload(file_path)
        created = (
            self.service.files()
            .create(body=file_metadata, media_body=media, fields="id")
            .execute()
        )
        print(rf"{file_path} uploaded.")
        return created.get("id")

    def download_file(self, file_name: str, file_path: str):
        """Download the first file named *file_name* from the parent folder.

        Args:
            file_name: Exact Drive file name to look for.
            file_path: Local destination; opened in binary write mode, so any
                existing content is overwritten.

        Returns:
            ``True`` when a matching file was found and downloaded, ``False``
            when no file with that name exists in the folder.
        """
        # Escape the name so quotes/backslashes cannot break or alter the query.
        safe_name = self._escape_query_value(file_name)
        results = (
            self.service.files()
            .list(
                q=(
                    f"'{self.PARENT_FOLDER_ID}' in parents "
                    f"and name='{safe_name}' and trashed=false"
                ),
                fields="files(id)",
            )
            .execute()
        )
        items = results.get("files", [])
        if not items:
            print(rf"{file_name} is Not Found")
            return False

        # Several files may share a name; the first match is used.
        file_id = items[0]["id"]
        request = self.service.files().get_media(fileId=file_id)
        with open(file_path, "wb") as file:
            downloader = MediaIoBaseDownload(file, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()
                print(f"Download {int(status.progress() * 100)}%.")
        return True
|