|
|
""" |
|
|
Google Drive utilities for centralized upload operations. |
|
|
Uses test_data credentials for Drive uploads as fallback when GCS fails. |
|
|
""" |
|
|
import os |
|
|
import uuid |
|
|
from pathlib import Path |
|
|
from typing import Optional |
|
|
from src.logger_config import logger |
|
|
from .gcloud_wrapper import get_default_wrapper |
|
|
from src.config import get_config_value |
|
|
|
|
|
|
|
|
|
|
|
# Lookup table from lower-cased file extension (including the leading dot)
# to the MIME type sent to Drive. Extensions that are not listed here are
# handled by the callers, which fall back to "application/octet-stream".
MIME_TYPES = {
    # Video containers
    ".mp4": "video/mp4",
    ".mov": "video/quicktime",
    ".avi": "video/x-msvideo",
    ".mkv": "video/x-matroska",
    # Audio
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
    ".aac": "audio/aac",
    ".m4a": "audio/mp4",
    # Still images
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".gif": "image/gif",
    ".webp": "image/webp",
    # Structured data and text (subtitles are uploaded as plain text)
    ".json": "application/json",
    ".txt": "text/plain",
    ".srt": "text/plain",
}
|
|
|
|
|
|
|
|
def get_drive_service(account_name: str = "test_data"):
    """
    Build a Google Drive v3 API client for the given account.

    Credentials are obtained from the default gcloud wrapper for
    ``account_name`` and handed to ``googleapiclient.discovery.build``.

    Args:
        account_name: Name of the account whose credentials are used
            (default: 'test_data').

    Returns:
        The Drive v3 service object returned by ``build``.
    """
    credentials = get_default_wrapper()._get_credentials(account_name)

    # The Google API client is imported at call time, not at module import.
    from googleapiclient.discovery import build

    drive_client = build("drive", "v3", credentials=credentials)
    return drive_client
|
|
|
|
|
|
|
|
def _escape_drive_query_value(value: str) -> str:
    """Escape a string for use inside a single-quoted Drive query literal."""
    # Backslashes go first; otherwise the backslash inserted in front of a
    # quote would itself be doubled by the second replace.
    return value.replace("\\", "\\\\").replace("'", "\\'")


def _list_drive_files(service, query: str) -> list:
    """Run one files.list query across all drives and return the file entries."""
    response = service.files().list(
        q=query,
        fields="files(id, name, webViewLink, parents)",
        pageSize=10,
        supportsAllDrives=True,
        includeItemsFromAllDrives=True,
    ).execute()
    return response.get("files", [])


def search_file_by_name(
    filename: str,
    folder_id: Optional[str] = None,
    account_name: str = "test_data",
) -> Optional[dict]:
    """
    Search for a file by name in Google Drive.

    An exact-name query is tried first. If it returns nothing, a
    ``name contains`` query is tried with a leading "Copy of " stripped from
    the name. Among the results, an entry whose name equals ``filename`` is
    preferred; otherwise the first entry is used.

    Note: folder_id is currently not used; the search runs across the entire
    accessible Drive (including shared drives).

    Args:
        filename: The filename to search for (exact match first, then partial).
        folder_id: Currently unused (kept for API compatibility).
        account_name: Account to use (default: test_data).

    Returns:
        dict with file info if found, None otherwise:
        {
            "file_id": str,
            "name": str,
            "url": str (viewable link),
            "download_url": str
        }
        None is also returned for an empty filename and when any error occurs
        while talking to Drive (the error is logged, not raised).
    """
    # An empty name cannot identify a file; querying with it would at best
    # match arbitrary files, so bail out before touching the API.
    if not filename:
        return None

    try:
        service = get_drive_service(account_name)

        logger.debug(f"🔍 Searching Drive for: {filename}")
        exact_query = (
            f"name = '{_escape_drive_query_value(filename)}' and trashed = false"
        )
        files = _list_drive_files(service, exact_query)

        if not files:
            # Drive prefixes duplicated files with "Copy of "; drop it so the
            # partial search can find the file under its base name.
            clean_name = filename.removeprefix("Copy of ")
            # Skip the fallback when nothing is left to match on.
            if clean_name:
                logger.debug(f"🔍 Trying partial match: {clean_name}")
                partial_query = (
                    f"name contains '{_escape_drive_query_value(clean_name)}'"
                    " and trashed = false"
                )
                files = _list_drive_files(service, partial_query)

        if not files:
            logger.warning(f"⚠️ File not found in Drive: {filename}")
            return None

        # Prefer an exact name match; fall back to the first hit.
        chosen = next((f for f in files if f.get("name") == filename), files[0])
        file_id = chosen.get("id")
        result = {
            "file_id": file_id,
            "name": chosen.get("name"),
            "url": chosen.get(
                "webViewLink", f"https://drive.google.com/file/d/{file_id}/view"
            ),
            "download_url": f"https://drive.google.com/uc?export=download&id={file_id}",
        }
        logger.debug(f"✅ Found file: {result['name']} -> {result['url']}")
        return result

    except Exception as e:
        # Deliberate best-effort: callers get None instead of an exception.
        logger.error(f"❌ Error searching Drive: {e}")
        return None
|
|
|
|
|
|
|
|
def upload_file_to_drive(
    local_path: str,
    folder_id: Optional[str] = None,
    filename: Optional[str] = None,
    account_name: str = "test_data",
) -> dict:
    """
    Upload a local file to Google Drive and try to share it publicly.

    After the upload, an "anyone / reader" permission is requested for the
    file. A failure of that sharing step is logged as a warning and does not
    fail the upload.

    Args:
        local_path: Path to local file.
        folder_id: Optional Drive folder ID to upload to. If None, the
            DRIVE_UPLOAD_FOLDER_ID value from get_config_value is used; if
            that is empty too, no parent folder is set.
        filename: Optional filename to use in Drive. If None, a name of the
            form "<8 hex chars>_<local file name>" is generated.
        account_name: Account to use (default: test_data).

    Returns:
        dict: {
            "file_id": str,
            "filename": str (name used in Drive),
            "gcs_filename": str ("drive/<filename>"),
            "url": str (viewable link),
            "public_url": str (same as "url"),
            "download_url": str (direct download link)
        }

    Raises:
        Exception: Any error raised while building the service or uploading
            is logged and re-raised unchanged.
    """
    from googleapiclient.http import MediaFileUpload

    try:
        service = get_drive_service(account_name)

        target_folder_id = folder_id or get_config_value("DRIVE_UPLOAD_FOLDER_ID")

        file_path = Path(local_path)
        file_ext = file_path.suffix.lower()

        # A random prefix keeps generated names from colliding in Drive.
        unique_name = filename if filename else f"{uuid.uuid4().hex[:8]}_{file_path.name}"

        mime_type = MIME_TYPES.get(file_ext, "application/octet-stream")

        file_metadata = {"name": unique_name}
        if target_folder_id:
            file_metadata["parents"] = [target_folder_id]

        media = MediaFileUpload(
            local_path,
            mimetype=mime_type,
            resumable=True,
        )

        logger.debug(f"📤 Uploading {file_path.name} to Google Drive...")

        file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields="id, name, webViewLink, webContentLink",
            supportsAllDrives=True,
        ).execute()

        file_id = file.get("id")

        # Sharing is best-effort: the upload already succeeded, so a
        # permission error must not turn it into a failure.
        try:
            service.permissions().create(
                fileId=file_id,
                body={"type": "anyone", "role": "reader"},
                supportsAllDrives=True,
            ).execute()
            logger.debug("✅ File shared publicly")
        except Exception as e:
            logger.warning(f"⚠️ Could not make file public: {e}")

        view_url = file.get("webViewLink", f"https://drive.google.com/file/d/{file_id}/view")
        download_url = f"https://drive.google.com/uc?export=download&id={file_id}"

        result = {
            "file_id": file_id,
            "filename": unique_name,
            # NOTE(review): presumably mirrors the shape of the GCS upload
            # result so callers can treat both alike - confirm with callers.
            "gcs_filename": f"drive/{unique_name}",
            "url": view_url,
            "public_url": view_url,
            "download_url": download_url,
        }

        logger.debug(f"✅ Uploaded to Drive: {view_url}")
        return result

    except Exception as e:
        logger.error(f"❌ Failed to upload to Drive: {e}")
        # Bare raise re-raises the active exception unchanged.
        raise
|
|
|
|
|
|
|
|
def extract_drive_file_id(url: str) -> str | None: |
|
|
""" |
|
|
Extract file ID from various Google Drive URL formats: |
|
|
- https://drive.google.com/file/d/FILE_ID/view |
|
|
- https://drive.google.com/open?id=FILE_ID |
|
|
- https://drive.google.com/uc?id=FILE_ID |
|
|
- https://drive.google.com/uc?export=download&id=FILE_ID |
|
|
""" |
|
|
import re |
|
|
|
|
|
if not url or "drive.google.com" not in url: |
|
|
return None |
|
|
|
|
|
|
|
|
match = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url) |
|
|
if match: |
|
|
return match.group(1) |
|
|
|
|
|
|
|
|
match = re.search(r'[?&]id=([a-zA-Z0-9_-]+)', url) |
|
|
if match: |
|
|
return match.group(1) |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def delete_file_from_drive(
    url_or_file_id: str,
    account_name: str = "test_data",
) -> bool:
    """
    Delete a file from Google Drive.

    Args:
        url_or_file_id: Either a Drive URL or direct file ID. Values starting
            with "http" are treated as URLs and the file ID is extracted
            from them.
        account_name: Account to use (default: test_data).

    Returns:
        bool: True if the file was deleted, or if Drive answered with HTTP
        404 (the file is treated as already deleted). False for an empty
        argument, a URL without a recognizable file ID, or any other error.
    """
    if not url_or_file_id:
        logger.error("❌ No Drive URL or file ID provided for deletion")
        return False

    try:
        service = get_drive_service(account_name)

        if url_or_file_id.startswith("http"):
            file_id = extract_drive_file_id(url_or_file_id)
            if not file_id:
                logger.error(f"❌ Could not extract file ID from URL: {url_or_file_id}")
                return False
        else:
            file_id = url_or_file_id

        logger.debug(f"🗑️ Deleting file from Drive: {file_id}")
        # supportsAllDrives matches the other Drive calls in this module;
        # without it a file on a shared drive is reported as 404.
        service.files().delete(fileId=file_id, supportsAllDrives=True).execute()
        logger.debug(f"✅ Deleted from Drive: {file_id}")
        return True

    except Exception as e:
        # Use the HTTP status carried by the API error instead of searching
        # the message text: the message contains the request URL, so a file
        # ID that happens to include "404" would otherwise turn every
        # failure into a reported success.
        status = getattr(getattr(e, "resp", None), "status", None)
        if status is None:
            status = getattr(e, "status_code", None)
        if str(status) == "404":
            logger.debug(f"⚠️ File not found (already deleted?): {url_or_file_id}")
            return True
        logger.error(f"❌ Failed to delete from Drive: {e}")
        return False
|
|
|
|
|
|
|
|
def update_file_content(
    file_id: str,
    local_path: str,
    account_name: str = "test_data",
) -> dict:
    """
    Update the content (media) of an existing Google Drive file.

    Does NOT change the file ID or metadata (name), just the content.

    Args:
        file_id: ID of the Drive file whose content is replaced.
        local_path: Path to the local file providing the new content. Its
            extension selects the MIME type via MIME_TYPES, falling back to
            "application/octet-stream".
        account_name: Account to use (default: test_data).

    Returns:
        dict: {
            "file_id": str (the ID passed in),
            "name": str (name reported by Drive),
            "url": str (viewable link),
            "download_url": str (direct download link)
        }

    Raises:
        Exception: Any error raised while building the service or updating
            the file is logged and re-raised unchanged.
    """
    from googleapiclient.http import MediaFileUpload

    try:
        service = get_drive_service(account_name)

        file_path = Path(local_path)
        file_ext = file_path.suffix.lower()

        mime_type = MIME_TYPES.get(file_ext, "application/octet-stream")

        media = MediaFileUpload(
            local_path,
            mimetype=mime_type,
            resumable=True,
        )

        logger.debug(f"🔄 Updating file content on Drive : {file_id} with {file_path.name}")

        # No metadata body is sent, so only the media is replaced.
        file = service.files().update(
            fileId=file_id,
            media_body=media,
            fields="id, name, webViewLink, webContentLink",
            supportsAllDrives=True,
        ).execute()

        view_url = file.get("webViewLink", f"https://drive.google.com/file/d/{file_id}/view")
        download_url = f"https://drive.google.com/uc?export=download&id={file_id}"

        result = {
            "file_id": file_id,
            "name": file.get("name"),
            "url": view_url,
            "download_url": download_url,
        }

        logger.debug(f"✅ Updated Drive file content: {view_url}")
        return result

    except Exception as e:
        logger.error(f"❌ Failed to update Drive file: {e}")
        # Bare raise re-raises the active exception unchanged.
        raise
|
|
|