|
|
import google.generativeai as genai |
|
|
|
|
|
from google.oauth2.service_account import Credentials |
|
|
from googleapiclient.discovery import build |
|
|
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload |
|
|
import gspread |
|
|
from oauth2client.service_account import ServiceAccountCredentials |
|
|
|
|
|
import os, io, time, re, json |
|
|
|
|
|
|
|
|
# Drive folder names: the parent name is fixed, the data folder name is read
# from the environment (a missing variable raises KeyError at import time).
GDRIVE_PARENT_FOLDER_NAME = "mtDNA-Location-Classifier"
GDRIVE_DATA_FOLDER_NAME = os.environ["GDRIVE_DATA_FOLDER_NAME"]

# Service-account key material, supplied as a JSON string in GCP_CREDS_JSON.
GCP_CREDS_DICT = json.loads(os.environ["GCP_CREDS_JSON"])

# Credentials carrying the Drive scope, built from the parsed key.
GDRIVE_CREDS = Credentials.from_service_account_info(
    GCP_CREDS_DICT,
    scopes=["https://www.googleapis.com/auth/drive"],
)

# Shared Drive v3 client used by every helper in this module.
drive_service = build("drive", "v3", credentials=GDRIVE_CREDS)
|
|
|
|
|
def get_or_create_drive_folder(name, parent_id=None):
    """Return the ID of the Drive folder called *name*, creating it if absent.

    Args:
        name: Folder name to look up or create.
        parent_id: Optional ID of the folder to search in and create under.
            When omitted, the search is not restricted to a parent and the
            new folder is created without an explicit ``parents`` entry.

    Returns:
        The ID of the first matching, non-trashed folder, or the ID of the
        folder that was just created.
    """
    # Drive query literals are single-quoted: escape backslashes and quotes so
    # a name containing an apostrophe cannot break (or alter) the query.
    safe_name = name.replace("\\", "\\\\").replace("'", "\\'")
    query = (
        f"name='{safe_name}' and mimeType='application/vnd.google-apps.folder'"
        # Ignore folders sitting in the trash; otherwise a deleted folder's ID
        # could be returned instead of creating a fresh one.
        " and trashed = false"
    )
    if parent_id:
        query += f" and '{parent_id}' in parents"

    results = drive_service.files().list(
        q=query, spaces="drive", fields="files(id, name)"
    ).execute()
    items = results.get("files", [])
    if items:
        return items[0]["id"]

    # No match: create the folder (under parent_id when one was given).
    file_metadata = {
        "name": name,
        "mimeType": "application/vnd.google-apps.folder",
    }
    if parent_id:
        file_metadata["parents"] = [parent_id]
    created = drive_service.files().create(body=file_metadata, fields="id").execute()
    return created["id"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_drive_file(filename, parent_id):
    """Look up a non-trashed file by name inside a Google Drive folder.

    Args:
        filename: Exact name of the file to search for.
        parent_id: ID of the Drive folder to search in.

    Returns:
        The ID of the first matching file, or ``None`` when no file matches
        or when the lookup raises (the error is printed, not propagated, so
        callers cannot tell "missing" from "lookup failed" by the result).
    """
    # NOTE(review): the leading glyphs in the log messages below look like
    # mis-encoded emoji; they are kept as found — confirm against upstream.
    try:
        print(f"π Searching for '{filename}' in folder: {parent_id}")
        # Drive query literals are single-quoted: escape backslashes and
        # quotes so names containing an apostrophe still form a valid query.
        safe_name = filename.replace("\\", "\\\\").replace("'", "\\'")
        query = (
            f"'{parent_id}' in parents and name = '{safe_name}' and trashed = false"
        )
        results = drive_service.files().list(
            q=query,
            spaces="drive",
            fields="files(id, name)",
            pageSize=1,  # only the first hit is used
        ).execute()
        files = results.get("files", [])
        if files:
            print(f"β Found file: {files[0]['name']} with ID: {files[0]['id']}")
            return files[0]["id"]
        print("β οΈ File not found.")
        return None
    except Exception as e:
        print(f"β Error during find_drive_file: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_file_to_drive(local_path, remote_name, folder_id):
    """Upload a local file to a Drive folder, replacing same-named files.

    The new file is uploaded first and the previously existing copies are
    removed only afterwards, so a failed upload never destroys the remote
    copy that was already there.

    Args:
        local_path: Path of the local file to upload.
        remote_name: Name the file should have in Drive.
        folder_id: ID of the destination Drive folder.

    Returns:
        The ID of the newly created Drive file, or ``None`` if the local file
        is missing or the listing/upload raised (the error is printed).
    """
    # NOTE(review): the leading glyphs in the log messages below look like
    # mis-encoded emoji; they are kept as found — confirm against upstream.
    try:
        if not os.path.exists(local_path):
            raise FileNotFoundError(f"β Local file does not exist: {local_path}")

        # Drive query literals are single-quoted: escape backslashes and
        # quotes so names containing an apostrophe still form a valid query.
        safe_name = remote_name.replace("\\", "\\\\").replace("'", "\\'")
        # Remember the copies that exist *before* the upload; they are the
        # ones to remove once the replacement is safely in place.
        stale = drive_service.files().list(
            q=f"name='{safe_name}' and '{folder_id}' in parents and trashed = false",
            fields="files(id)",
        ).execute().get("files", [])

        file_metadata = {"name": remote_name, "parents": [folder_id]}
        media = MediaFileUpload(local_path, resumable=True)
        uploaded = drive_service.files().create(
            body=file_metadata,
            media_body=media,
            fields="id",
        ).execute()
        print(f"β Uploaded '{remote_name}' to Google Drive folder ID: {folder_id}")
    except Exception as e:
        print(f"β Error during upload: {e}")
        return None

    # Best-effort cleanup: the upload already succeeded, so a failed delete
    # is reported but does not turn the call into a failure.
    for old in stale:
        try:
            drive_service.files().delete(fileId=old["id"]).execute()
            print(f"ποΈ Deleted existing '{remote_name}' in Drive folder {folder_id}")
        except Exception as e:
            print(f"Warning: could not delete previous '{remote_name}' ({old['id']}): {e}")

    return uploaded["id"]
|
|
|
|
|
|
|
|
def download_file_from_drive(remote_name, folder_id, local_path):
    """Download a named file from a Drive folder to a local path.

    Args:
        remote_name: Name of the file in Drive.
        folder_id: ID of the Drive folder that contains the file.
        local_path: Destination path; opened in binary write mode, so an
            existing file there is overwritten.

    Returns:
        ``True`` once the file has been fully downloaded, ``False`` when no
        non-trashed file with that name exists in the folder. API errors
        are not caught here and propagate to the caller.
    """
    # Drive query literals are single-quoted: escape backslashes and quotes
    # so names containing an apostrophe still form a valid query.
    safe_name = remote_name.replace("\\", "\\\\").replace("'", "\\'")
    results = drive_service.files().list(
        # Skip trashed copies so a deleted file is never downloaded.
        q=f"name='{safe_name}' and '{folder_id}' in parents and trashed = false",
        fields="files(id)",
    ).execute()
    files = results.get("files", [])
    if not files:
        return False

    request = drive_service.files().get_media(fileId=files[0]["id"])
    # The context manager closes the handle even if a chunk download raises.
    with io.FileIO(local_path, "wb") as fh:
        downloader = MediaIoBaseDownload(fh, request)
        done = False
        while not done:
            _, done = downloader.next_chunk()
    return True
|
|
def download_drive_file_content(file_id):
    """Fetch a Drive file into memory and return its bytes decoded as UTF-8.

    Args:
        file_id: ID of the Drive file to download.

    Returns:
        The complete file content as a ``str``.
    """
    buffer = io.BytesIO()
    media_request = drive_service.files().get_media(fileId=file_id)
    fetcher = MediaIoBaseDownload(buffer, media_request)

    # Pull chunks until the downloader reports completion.
    while True:
        _progress, finished = fetcher.next_chunk()
        if finished:
            break

    # getvalue() yields the whole buffer regardless of the stream position.
    return buffer.getvalue().decode("utf-8")
|
|
|