# Tools/src/google_src/gcs_utils.py
# refactor: Centralize logger import to src.logger_config across various modules. (commit f20025d)
"""
Google Cloud Storage utilities for centralized client access and common operations.
"""
from typing import Optional
import asyncio
import os
import uuid
from src.logger_config import logger
from .gcloud_wrapper import get_default_wrapper
from src.file_downloader import get_file_downloader
from src.config import get_config_value
def _upload_to_drive(local_path: str, filename: str = None, account_name: str = "test_data") -> dict:
    """
    Upload a local file to Google Drive and tag the result with its storage type.

    Args:
        local_path: Path to local file.
        filename: Optional filename to use in Drive (for consistency with GCS).
        account_name: Account to use.

    Returns:
        dict: Drive upload result (url, file_id, ...) with "storage_type" set to "drive".
    """
    # Imported lazily to avoid a circular/unneeded dependency at module load.
    from .drive_utils import upload_file_to_drive

    result = upload_file_to_drive(local_path, filename=filename, account_name=account_name)
    result["storage_type"] = "drive"
    return result
def get_gcs_client(account_name: str = "final_data"):
    """
    Return a configured GCS storage client for the given account.

    Defaults to 'final_data', the main production account. The underlying
    wrapper is a cached singleton, so authentication happens only once.
    """
    wrapper = get_default_wrapper()
    return wrapper.get_storage_client(account_name)
def get_gcs_credentials(account_name: str = "final_data"):
    """
    Return credentials for the given account.

    Useful for initializing other clients (like TTS) with the same credentials.
    The underlying wrapper is a cached singleton, so auth happens only once.
    """
    wrapper = get_default_wrapper()
    # NOTE: reaches into the wrapper's private credential accessor by design.
    return wrapper._get_credentials(account_name)
def delete_gcs_file(filename: str, bucket_name: Optional[str] = None, account_name: str = "final_data") -> bool:
    """
    Delete a file from GCS.

    Args:
        filename: Name/path of the blob to delete (relative path within the bucket).
        bucket_name: Name of the bucket. If None, falls back to the
            GCS_BUCKET_NAME config value (default 'elvoro-assets').
        account_name: Account to use (default: final_data).

    Returns:
        bool: True if deleted or the blob didn't exist, False on error.
    """
    target_bucket = bucket_name
    blob_name = filename
    try:
        # Resolve the bucket before building the client so the error log below
        # can name the actual target (previously it just said "(unknown)").
        if not target_bucket:
            target_bucket = get_config_value("GCS_BUCKET_NAME", "elvoro-assets")
        client = get_gcs_client(account_name)
        bucket = client.bucket(target_bucket)
        blob = bucket.blob(blob_name)
        if blob.exists():
            blob.delete()
            logger.debug(f"πŸ—‘οΈ Deleted GCS file: gs://{target_bucket}/{blob_name}")
        else:
            # Treat "already gone" as success: the desired state is reached.
            logger.debug(f"⚠️ File not found (already deleted?): gs://{target_bucket}/{blob_name}")
        return True
    except Exception as e:
        logger.error(f"❌ Failed to delete GCS file gs://{target_bucket}/{blob_name}: {e}")
        return False
def upload_file_to_gcs(
    local_path: str,
    folder: Optional[str] = None,
    destination_blob_name: Optional[str] = None,
    account_name: str = "final_data",
    bucket_name: Optional[str] = None,
    generate_signed_url: bool = True,
    fallback_to_drive: bool = True,
    save_in_drive_also: bool = True,  # Uploads to Shared Drive (service account must be contributor)
) -> dict:
    """
    Upload a local file to GCS.

    Args:
        local_path: Path to local file.
        folder: Target folder in the bucket. If None, inferred from the file extension.
        destination_blob_name: Base name in the bucket (extension is appended).
        account_name: Account to use.
        bucket_name: Target bucket name. If None, resolved from the
            GCS_BUCKET_NAME config value at call time. (The previous default
            called get_config_value() in the signature, freezing the value at
            import time and ignoring later config changes.)
        generate_signed_url: Whether to generate a v4 signed URL (7-day expiry).
        fallback_to_drive: If True, fall back to Google Drive upload on GCS failure.
        save_in_drive_also: If True, also mirror the file to Google Drive (best effort).

    Returns:
        dict: {
            "gcs_filename": str,
            "url": str (signed or public),
            "public_url": str,
            "storage_type": str ("gcs" or "drive"),
            # plus "signed_url" / "drive_url" / "drive_file_id" when available
        }

    Raises:
        Exception: The original GCS error, if upload fails and the Drive
            fallback is disabled or also fails.
    """
    from datetime import timedelta

    # Resolve the bucket lazily so config changes after import are respected.
    if bucket_name is None:
        bucket_name = get_config_value("GCS_BUCKET_NAME")

    file_ext = os.path.splitext(local_path)[1].lower()

    # Determine folder based on file type when the caller didn't supply one.
    if folder is None:
        folder = "others"
        if file_ext in (".mp4", ".mov", ".avi", ".mkv"):
            folder = get_config_value("gcs_bucket_folder_name")
        elif file_ext in (".mp3", ".wav", ".aac", ".m4a"):
            folder = "audio"
        elif file_ext in (".png", ".jpg", ".jpeg", ".gif", ".webp"):
            folder = "image"

    setup_type = get_config_value("setup_type", "")
    setup_prefix = f"{setup_type}_" if setup_type else ""
    if destination_blob_name:
        blob_name = f"{folder}/{destination_blob_name}{file_ext}"
    else:
        blob_name = f"{folder}/{setup_prefix}{uuid.uuid4().hex}{file_ext}"

    try:
        client = get_gcs_client(account_name)
        # Lazy bucket creation
        create_bucket_if_not_exists(client, bucket_name)
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)

        # If the target already exists, retry with a unique name instead of overwriting.
        if blob.exists():
            logger.warning(f"⚠️ File already exists in GCS: {blob_name}")
            # Insert a short UUID before extension to make it unique
            unique_suffix = uuid.uuid4().hex[:6]
            if destination_blob_name:
                blob_name = f"{folder}/{destination_blob_name}_{unique_suffix}{file_ext}"
            else:
                # Should rarely happen for UUIDs but good practice
                blob_name = f"{folder}/{setup_prefix}{uuid.uuid4().hex}{file_ext}"
            blob = bucket.blob(blob_name)
            logger.debug(f"πŸ”„ Renamed to unique path: {blob_name}")

        # Content type by extension; unknown extensions get a generic binary type.
        content_types = {
            ".mp4": "video/mp4",
            ".mp3": "audio/mpeg",
            ".wav": "audio/wav",
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".json": "application/json",
            ".txt": "text/plain",
            ".srt": "text/plain",
        }
        blob.content_type = content_types.get(file_ext, "application/octet-stream")

        logger.debug(f"☁️ Uploading {os.path.basename(local_path)} to gs://{bucket_name}/{blob_name}")
        blob.upload_from_filename(local_path)

        public_url = f"https://storage.googleapis.com/{bucket_name}/{blob_name}"
        result = {
            "gcs_filename": blob_name,
            "public_url": public_url,
            "url": public_url,
            "storage_type": "gcs"
        }

        if generate_signed_url:
            try:
                # Note: signing requires service account credentials with signing capability
                signed_url = blob.generate_signed_url(
                    version="v4",
                    expiration=timedelta(days=7),
                    method="GET",
                    service_account_email=client.get_service_account_email()
                )
                logger.debug(f"βœ… Signed URL generated")
                result["url"] = signed_url
                result["signed_url"] = signed_url
            except Exception as e:
                # Keep the public URL; signing failure is not fatal.
                logger.warning(f"⚠️ Failed to generate signed URL (using public URL): {e}")

        # Best-effort mirror to Google Drive; the GCS result is returned regardless.
        if save_in_drive_also:
            try:
                drive_result = _upload_to_drive(local_path, filename=blob_name)
                result["drive_url"] = drive_result.get("url")
                result["drive_file_id"] = drive_result.get("file_id")
                logger.debug(f"βœ… Also uploaded to Drive: {drive_result.get('url')}")
            except Exception as drive_error:
                logger.warning(f"⚠️ Failed to upload to Drive (GCS upload still successful): {drive_error}")

        return result
    except Exception as e:
        logger.error(f"❌ Failed to upload to GCS: {e}")
        # Fallback to Google Drive if enabled
        if fallback_to_drive:
            logger.debug("πŸ”„ Falling back to Google Drive upload...")
            try:
                drive_result = _upload_to_drive(local_path, filename=blob_name)
                drive_result["gcs_filename"] = blob_name
                logger.debug(f"βœ… Fallback to Drive successful: {drive_result['url']}")
                return drive_result
            except Exception as drive_error:
                logger.error(f"❌ Drive fallback also failed: {drive_error}")
                raise e  # Re-raise original GCS error
        # Re-raise to let caller handle critical failure
        raise e
def list_gcs_files(prefix: str = None, account_name: str = "final_data") -> list[str]:
    """
    List blob names in the configured bucket, newest first.

    Args:
        prefix: Blob name prefix to filter by. Defaults to the
            'gcs_bucket_folder_name' config value when None.
        account_name: Account to use.

    Returns:
        list[str]: Blob names sorted by creation time (newest first),
        or an empty list on any error.
    """
    try:
        effective_prefix = get_config_value("gcs_bucket_folder_name") if prefix is None else prefix
        client = get_gcs_client(account_name)
        bucket = client.bucket(get_config_value("GCS_BUCKET_NAME"))
        blobs = client.list_blobs(bucket, prefix=effective_prefix)
        # Newest first, by server-reported creation time.
        newest_first = sorted(blobs, key=lambda blob: blob.time_created, reverse=True)
        names = [blob.name for blob in newest_first]
        logger.debug(f"πŸ“‚ Found {len(names)} files under '{effective_prefix}'.")
        return names
    except Exception as e:
        logger.error(f"❌ Failed to list files: {e}")
        return []
def create_bucket_if_not_exists(client, bucket_name: str, location: str = "us-central1") -> bool:
    """
    Ensure a GCS bucket exists, creating it when missing.

    Args:
        client: Authenticated storage client.
        bucket_name: Bucket to check/create.
        location: Region used when a new bucket is created.

    Returns:
        bool: True if the bucket exists or was created, False otherwise.
    """
    try:
        bucket = client.bucket(bucket_name)
        if bucket.exists():
            logger.debug(f"βœ“ Bucket already exists: gs://{bucket_name}")
            return True

        # Creation needs a project ID: prefer the client's, then env vars.
        project_id = client.project
        if not project_id:
            logger.debug("No project ID found in client, trying to get from env.")
            project_id = (
                os.environ.get("GOOGLE_CLOUD_PROJECT")
                or os.environ.get("GCP_PROJECT")
                or os.environ.get("GCP_PROJECT_ID")
            )
        if not project_id:
            logger.warning(f"⚠️ Cannot create bucket '{bucket_name}': No project ID found in client or env.")
            return False

        logger.debug(f"πŸ“¦ Creating new bucket: gs://{bucket_name} in {location} (Project: {project_id})")
        # Pass project explicitly if creating
        created = client.create_bucket(bucket_name, location=location, project=project_id)
        logger.debug(f"βœ… Bucket created successfully: gs://{created.name}")
        return True
    except Exception as e:
        logger.error(f"❌ Failed to create bucket {bucket_name}: {e}")
        return False
def find_and_download_gcs_file(tts_script: str, local_dir: str = "/tmp", account_name: str = "final_data") -> Optional[str]:
    """
    Find and download a file from GCS whose name contains part of the TTS script.

    Useful when filenames are generated dynamically based on the script text.
    Replaces APIClients.download_from_gcs.

    Args:
        tts_script: Text content used to search for the file name in GCS.
        local_dir: Local directory to save the downloaded file.
        account_name: GCS account to use.

    Returns:
        Local path to downloaded file, or None on failure.
    """
    try:
        if get_config_value("test_automation"):
            # Test mode: return a synthetic path without touching GCS.
            return f"{get_config_value('TEST_DATA_DIRECTORY')}/{uuid.uuid4().hex}.mp4"

        # Prepare alphanumeric-only patterns of decreasing specificity.
        safe_name_50 = "".join(c for c in tts_script[:50] if c.isalnum())
        safe_name_10 = "".join(c for c in tts_script[:10] if c.isalnum())

        all_files = list_gcs_files(prefix="", account_name=account_name)

        # Try progressively looser matches. Skip empty patterns: "" is a
        # substring of every name, so an empty script/pattern would silently
        # match an arbitrary file instead of failing.
        blob_name = None
        for pattern in (tts_script, safe_name_50, safe_name_10):
            if pattern:
                blob_name = next((f for f in all_files if pattern in f), None)
                if blob_name:
                    break
        if not blob_name:
            logger.error(f"❌ No matching file found in GCS for script: {tts_script[:50]}...")
            return None

        client = get_gcs_client(account_name)
        bucket_name = get_config_value("GCS_BUCKET_NAME")
        logger.debug(f"☁️ Found matching file: gs://{bucket_name}/{blob_name}")

        # Use authenticated download via GCloudWrapper
        local_path = os.path.join(local_dir, os.path.basename(blob_name))
        try:
            # Download using authenticated GCS client
            bucket = client.bucket(bucket_name)
            blob = bucket.blob(blob_name)
            blob.download_to_filename(local_path)
            file_size = os.path.getsize(local_path)
            logger.debug(f"βœ… Downloaded {blob_name} β†’ {local_path} ({file_size/1024:.1f} KB)")
            return local_path
        except Exception as download_error:
            logger.error(f"❌ GCS authenticated download failed: {download_error}")
            return None
    except Exception as e:
        logger.error(f"❌ GCS download failed: {e}")
        return None