import google.generativeai as genai
# Google Drive (optional)
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
import gspread
from oauth2client.service_account import ServiceAccountCredentials
import os, io, time, re, json
# ─── Authentication setup ───
# Fixed folder name; presumably the top-level Drive folder for this project
# (its use is not visible in this part of the file).
GDRIVE_PARENT_FOLDER_NAME = "mtDNA-Location-Classifier"

# Read from the environment; a missing variable raises KeyError at import time.
GDRIVE_DATA_FOLDER_NAME = os.environ["GDRIVE_DATA_FOLDER_NAME"]

# Service-account key supplied as a JSON string (from HF secrets).
GCP_CREDS_DICT = json.loads(os.environ["GCP_CREDS_JSON"])

# Credentials restricted to the Google Drive scope.
GDRIVE_CREDS = Credentials.from_service_account_info(
    GCP_CREDS_DICT,
    scopes=["https://www.googleapis.com/auth/drive"],
)

# Drive v3 client shared by the helper functions below.
drive_service = build("drive", "v3", credentials=GDRIVE_CREDS)
def get_or_create_drive_folder(name, parent_id=None):
    """
    Return the ID of the Drive folder called `name`, creating it if none exists.

    Args:
        name: Folder name to look up, and to create when there is no match.
        parent_id: Optional ID of a parent folder. When given, the lookup is
            restricted to that parent and a new folder is created inside it.

    Returns:
        The ID of the first matching (non-trashed) folder, or the ID of the
        newly created folder.
    """
    # Backslashes and single quotes must be escaped inside a quoted Drive
    # query value; otherwise a name such as "Bob's data" breaks the query.
    safe_name = name.replace("\\", "\\\\").replace("'", "\\'")
    query = (
        f"name='{safe_name}' "
        "and mimeType='application/vnd.google-apps.folder' "
        # Ignore folders sitting in the trash so they are never reused.
        "and trashed = false"
    )
    if parent_id:
        query += f" and '{parent_id}' in parents"

    results = drive_service.files().list(
        q=query,
        spaces='drive',
        fields="files(id, name)",
    ).execute()
    items = results.get("files", [])
    if items:
        return items[0]["id"]

    # No match: create the folder (under parent_id when one was given).
    file_metadata = {
        "name": name,
        "mimeType": "application/vnd.google-apps.folder",
    }
    if parent_id:
        file_metadata["parents"] = [parent_id]
    created = drive_service.files().create(body=file_metadata, fields="id").execute()
    return created["id"]
# def find_drive_file(filename, parent_id):
# """
# Checks if a file with the given name exists inside the specified Google Drive folder.
# Returns the file ID if found, else None.
# """
# query = f"'{parent_id}' in parents and name = '{filename}' and trashed = false"
# results = drive_service.files().list(q=query, spaces='drive', fields='files(id, name)', pageSize=1).execute()
# files = results.get('files', [])
# if files:
# return files[0]["id"]
# return None
def find_drive_file(filename, parent_id):
    """
    Look for a non-trashed file named `filename` inside a Google Drive folder.

    Args:
        filename: Exact file name to search for.
        parent_id: ID of the Drive folder to search in.

    Returns:
        The ID of the first matching file, or None when nothing matches or
        when the lookup raises (the error is printed, not propagated).
    """
    try:
        print(f"🔍 Searching for '{filename}' in folder: {parent_id}")
        # Backslashes and single quotes must be escaped inside a quoted Drive
        # query value; otherwise such file names break the query.
        safe_name = filename.replace("\\", "\\\\").replace("'", "\\'")
        query = f"'{parent_id}' in parents and name = '{safe_name}' and trashed = false"
        results = drive_service.files().list(
            q=query,
            spaces='drive',
            fields='files(id, name)',
            pageSize=1,
        ).execute()
        files = results.get('files', [])
        if not files:
            print("⚠️ File not found.")
            return None
        match = files[0]
        print(f"✅ Found file: {match['name']} with ID: {match['id']}")
        return match["id"]
    except Exception as e:
        # Best-effort lookup: report the failure and signal "not found".
        print(f"❌ Error during find_drive_file: {e}")
        return None
# def upload_file_to_drive(local_path, remote_name, folder_id):
# file_metadata = {"name": remote_name, "parents": [folder_id]}
# media = MediaFileUpload(local_path, resumable=True)
# existing = drive_service.files().list(q=f"name='{remote_name}' and '{folder_id}' in parents", fields="files(id)").execute().get("files", [])
# if existing:
# drive_service.files().delete(fileId=existing[0]["id"]).execute()
# file = drive_service.files().create(body=file_metadata, media_body=media, fields="id").execute()
# result = drive_service.files().list(q=f"name='{remote_name}' and '{folder_id}' in parents", fields="files(id)").execute()
# if not result.get("files"):
# print(f"❌ Upload failed: File '{remote_name}' not found in folder after upload.")
# else:
# print(f"✅ Verified upload: {remote_name}")
# return file["id"]
def upload_file_to_drive(local_path, remote_name, folder_id):
    """
    Upload a local file to a Drive folder, replacing same-named files there.

    Args:
        local_path: Path of the local file to upload.
        remote_name: Name the file gets on Drive.
        folder_id: ID of the destination Drive folder.

    Returns:
        The ID of the newly created Drive file, or None when the local file is
        missing or any Drive call raises (the error is printed, not propagated).
    """
    try:
        if not os.path.exists(local_path):
            raise FileNotFoundError(f"❌ Local file does not exist: {local_path}")

        # Backslashes and single quotes must be escaped inside a quoted Drive
        # query value; otherwise such file names break the query.
        safe_name = remote_name.replace("\\", "\\\\").replace("'", "\\'")
        existing = drive_service.files().list(
            q=f"name='{safe_name}' and '{folder_id}' in parents and trashed = false",
            fields="files(id)",
        ).execute().get("files", [])

        # Remove every same-named file, not just the first match, so no stale
        # duplicate can be picked up by a later lookup.
        for stale in existing:
            drive_service.files().delete(fileId=stale["id"]).execute()
            print(f"🗑️ Deleted existing '{remote_name}' in Drive folder {folder_id}")

        file_metadata = {"name": remote_name, "parents": [folder_id]}
        media = MediaFileUpload(local_path, resumable=True)
        created = drive_service.files().create(
            body=file_metadata,
            media_body=media,
            fields="id",
        ).execute()
        print(f"✅ Uploaded '{remote_name}' to Google Drive folder ID: {folder_id}")
        return created["id"]
    except Exception as e:
        # Best-effort upload: report the failure and signal it with None.
        print(f"❌ Error during upload: {e}")
        return None
def download_file_from_drive(remote_name, folder_id, local_path):
    """
    Download the file named `remote_name` from a Drive folder to `local_path`.

    Args:
        remote_name: Name of the file on Drive.
        folder_id: ID of the Drive folder that contains the file.
        local_path: Local destination path; it is opened for binary writing,
            so an existing file at that path is overwritten.

    Returns:
        True once the download has completed, False when no non-trashed file
        with that name exists in the folder (nothing is written in that case).
    """
    # Backslashes and single quotes must be escaped inside a quoted Drive
    # query value; otherwise such file names break the query.
    safe_name = remote_name.replace("\\", "\\\\").replace("'", "\\'")
    results = drive_service.files().list(
        q=f"name='{safe_name}' and '{folder_id}' in parents and trashed = false",
        fields="files(id)",
    ).execute()
    files = results.get("files", [])
    if not files:
        return False

    request = drive_service.files().get_media(fileId=files[0]["id"])
    # The context manager closes the handle even if a chunk download raises,
    # so the descriptor is not leaked.
    with io.FileIO(local_path, 'wb') as fh:
        downloader = MediaIoBaseDownload(fh, request)
        done = False
        while not done:
            _, done = downloader.next_chunk()
    return True
def download_drive_file_content(file_id):
    """
    Download a Drive file into memory and return its content as text.

    Args:
        file_id: ID of the Drive file to fetch.

    Returns:
        The file's bytes decoded as UTF-8.
    """
    media_request = drive_service.files().get_media(fileId=file_id)
    buffer = io.BytesIO()
    chunked_download = MediaIoBaseDownload(buffer, media_request)

    # Keep pulling chunks until the downloader reports completion.
    finished = False
    while not finished:
        _progress, finished = chunked_download.next_chunk()

    return buffer.getvalue().decode("utf-8")
|