File size: 3,758 Bytes
5d09ff5
 
 
 
 
 
 
88b8c75
5d09ff5
 
bc79b16
88b8c75
c93cbfc
5d09ff5
88b8c75
 
 
 
5d09ff5
88b8c75
 
 
 
 
 
 
 
f09a6d0
88b8c75
 
 
 
 
 
 
 
037672b
88b8c75
 
 
 
 
5d09ff5
 
 
 
 
 
 
 
 
 
 
 
 
 
c93cbfc
 
 
 
5d09ff5
 
 
 
 
c93cbfc
5d09ff5
88b8c75
5d09ff5
 
 
 
 
c93cbfc
f8384fc
5d09ff5
88b8c75
5d09ff5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c93cbfc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import os
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload


class GoogleDrive_API:
    def __init__(self, SERVICE_ACCOUNT_FILE_path: str = ""):
        self.SCOPES = ["https://www.googleapis.com/auth/drive"]
        self.PARENT_FOLDER_ID = "1r-MlnEpWHx3b1fxHDnHcZ2-Wh_Y89676"

        self.service = self.authenticate(SERVICE_ACCOUNT_FILE_path)
        self.delete_all_files()

    def authenticate(self, SERVICE_ACCOUNT_FILE):
        if len(SERVICE_ACCOUNT_FILE) > 0:
            if not os.path.exists(SERVICE_ACCOUNT_FILE):
                raise rf"Google SERVICE_ACCOUNT_FILE Not Found, Path = {SERVICE_ACCOUNT_FILE}"

            credentials = service_account.Credentials.from_service_account_file(
                SERVICE_ACCOUNT_FILE, scopes=self.SCOPES
            )
        else:
            service_account_info = {
                "type": "service_account",
                "project_id": os.environ["project_id"],
                "private_key_id": os.environ["private_key_id"],
                "private_key": os.environ["private_key"],
                "client_email": os.environ["client_email"],
                "client_id": os.environ["client_id"],
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
                "client_x509_cert_url": os.environ["client_x509_cert_url"],
                "universe_domain": "googleapis.com",
            }

            credentials = service_account.Credentials.from_service_account_info(
                service_account_info, scopes=self.SCOPES
            )

        service = build("drive", "v3", credentials=credentials)
        return service

    def get_files(self):
        # List all files in the folder
        results = (
            self.service.files()
            .list(
                q=f"'{self.PARENT_FOLDER_ID}' in parents and trashed=false",
                fields="files(id, name)",
            )
            .execute()
        )
        return results.get("files", [])

    def delete_file(self, file_id):
        self.service.files().delete(fileId=file_id).execute()

    def delete_all_files(self):
        items = self.get_files()

        # Delete each file
        for item in items:
            file_id = item["id"]
            self.delete_file(file_id)

    def upload_file(self, file_name: str, file_path: str):
        file_metadata = {
            "name": file_name,
            "parents": [self.PARENT_FOLDER_ID],
        }
        media = MediaFileUpload(file_path)
        self.service.files().create(body=file_metadata, media_body=media, fields="id").execute()
        print(rf"{file_path} uploaded.")

    def download_file(self, file_name: str, file_path: str):
        results = (
            self.service.files()
            .list(
                q=f"'{self.PARENT_FOLDER_ID}' in parents and name='{file_name}' and trashed=false",
                fields="files(id)",
            )
            .execute()
        )
        items = results.get("files", [])

        if items:
            # Get the file ID
            file_id = items[0]["id"]

            request = self.service.files().get_media(fileId=file_id)

            with open(file_path, "wb") as file:
                downloader = MediaIoBaseDownload(file, request)
                done = False
                while not done:
                    status, done = downloader.next_chunk()
                    print(f"Download {int(status.progress() * 100)}%.")
        else:
            print(rf"{file_name} is Not Found")