Spaces:
Sleeping
Sleeping
| import json | |
| from google.cloud import storage | |
| from google.oauth2 import service_account | |
| from googleapiclient.http import MediaIoBaseDownload | |
| class GoogleCloudStorage: | |
| def __init__(self, service_account_key_string): | |
| credentials_dict = json.loads(service_account_key_string) | |
| credentials = service_account.Credentials.from_service_account_info(credentials_dict) | |
| self.client = storage.Client(credentials=credentials, project=credentials_dict['project_id']) | |
| def check_file_exists(self, bucket_name, file_name): | |
| blob = self.client.bucket(bucket_name).blob(file_name) | |
| return blob.exists() | |
| def upload_file(self, bucket_name, destination_blob_name, file_path): | |
| blob = self.client.bucket(bucket_name).blob(destination_blob_name) | |
| blob.upload_from_filename(file_path) | |
| print(f"File {file_path} uploaded to {destination_blob_name} in GCS.") | |
| def upload_file_as_string(self, bucket_name, destination_blob_name, content): | |
| blob = self.client.bucket(bucket_name).blob(destination_blob_name) | |
| blob.upload_from_string(content) | |
| print(f"String content uploaded to {destination_blob_name} in GCS.") | |
| return None | |
| def upload_json_string(self, bucket_name, destination_blob_name, json_data): | |
| """Uploads a JSON string to a specified GCS bucket.""" | |
| blob = self.client.bucket(bucket_name).blob(destination_blob_name) | |
| blob.upload_from_string(json_data, content_type='application/json') | |
| print(f"JSON string uploaded to {destination_blob_name} in GCS.") | |
| def download_as_string(self, bucket_name, source_blob_name): | |
| blob = self.client.bucket(bucket_name).blob(source_blob_name) | |
| return blob.download_as_text() | |
| def make_blob_public(self, bucket_name, blob_name): | |
| blob = self.client.bucket(bucket_name).blob(blob_name) | |
| blob.make_public() | |
| print(f"Blob {blob_name} is now publicly accessible at {blob.public_url}") | |
| def get_public_url(self, bucket_name, blob_name): | |
| blob = self.client.bucket(bucket_name).blob(blob_name) | |
| return blob.public_url | |
| def upload_image_and_get_public_url(self, bucket_name, file_name, file_path): | |
| self.upload_file(bucket_name, file_name, file_path) | |
| # self.make_blob_public(bucket_name, file_name) | |
| return self.get_public_url(bucket_name, file_name) | |
| def delete_blob(self, bucket_name, blob_name): | |
| blob = self.client.bucket(bucket_name).blob(blob_name) | |
| blob.delete() | |
| print(f"Blob {blob_name} deleted from {bucket_name}.") | |
| def list_blobs(self, bucket_name, prefix): | |
| blobs = self.client.list_blobs(bucket_name, prefix=prefix) | |
| return [blob.name for blob in blobs] | |
| def delete_blobs_by_folder_name(self, bucket_name, folder_name): | |
| bucket = self.client.bucket(bucket_name) | |
| blobs = list(bucket.list_blobs(prefix=folder_name)) | |
| if blobs: | |
| bucket.delete_blobs(blobs) | |
| print(f"所有以 {folder_name} 開頭的檔案已批量刪除") | |
| else: | |
| print(f"未找到以 {folder_name} 開頭的檔案") | |