|
|
|
|
|
""" |
|
|
Google Cloud Storage utilities for centralized client access and common operations. |
|
|
""" |
|
|
from typing import Optional |
|
|
import asyncio |
|
|
import os |
|
|
import uuid |
|
|
from src.logger_config import logger |
|
|
from .gcloud_wrapper import get_default_wrapper |
|
|
from src.file_downloader import get_file_downloader |
|
|
from src.config import get_config_value |
|
|
|
|
|
def _upload_to_drive(local_path: str, filename: Optional[str] = None, account_name: str = "test_data") -> dict:
    """
    Helper to upload file to Google Drive.

    Returns dict with url, file_id, and storage_type ("drive"), so callers can
    distinguish Drive uploads from GCS ones.

    Args:
        local_path: Path to local file.
        filename: Optional filename to use in Drive (for consistency with GCS).
        account_name: Account to use.
    """
    # Deferred import — presumably to avoid a circular dependency with
    # drive_utils; confirm before hoisting to module level.
    from .drive_utils import upload_file_to_drive
    drive_result = upload_file_to_drive(local_path, filename=filename, account_name=account_name)
    # Tag the result so downstream code can tell which backend stored the file.
    drive_result["storage_type"] = "drive"
    return drive_result
|
|
|
|
|
|
|
|
def get_gcs_client(account_name: str = "final_data"):
    """
    Return a configured GCS storage client for the given account.

    Defaults to 'final_data', the main production account. Backed by the
    cached singleton wrapper, so authentication happens only once per process.
    """
    wrapper = get_default_wrapper()
    return wrapper.get_storage_client(account_name)
|
|
|
|
|
def get_gcs_credentials(account_name: str = "final_data"):
    """
    Return the credentials associated with the given account.

    Useful for initializing other Google clients (e.g. TTS) with the same
    identity. Backed by the cached singleton wrapper, so authentication
    happens only once per process.
    """
    # NOTE(review): reaches into the wrapper's private `_get_credentials`;
    # no public accessor appears to exist on the wrapper.
    wrapper = get_default_wrapper()
    return wrapper._get_credentials(account_name)
|
|
|
|
|
def delete_gcs_file(filename: str, bucket_name: Optional[str] = None, account_name: str = "final_data") -> bool:
    """
    Delete a file from GCS.

    Args:
        filename: Name/path of the blob to delete.
        bucket_name: Name of the bucket. If None, falls back to the
            GCS_BUCKET_NAME config value (default 'elvoro-assets').
        account_name: Account to use (default: final_data).

    Returns:
        bool: True if deleted or didn't exist, False if error.
    """
    try:
        client = get_gcs_client(account_name)

        # Resolve the target bucket from config when not given explicitly.
        target_bucket = bucket_name or get_config_value("GCS_BUCKET_NAME", "elvoro-assets")
        blob_name = filename

        bucket = client.bucket(target_bucket)
        blob = bucket.blob(blob_name)

        if blob.exists():
            blob.delete()
            logger.debug(f"ποΈ Deleted GCS file: gs://{target_bucket}/{blob_name}")
        else:
            # A missing blob still counts as success: the desired end state holds.
            logger.debug(f"β οΈ File not found (already deleted?): gs://{target_bucket}/{blob_name}")

        return True

    except Exception as e:
        # Fix: the old message logged a literal "(unknown)" placeholder
        # instead of identifying which blob failed to delete.
        logger.error(f"β Failed to delete GCS file ({filename}): {e}")
        return False
|
|
|
|
|
def upload_file_to_gcs(
    local_path: str,
    folder: Optional[str] = None,
    destination_blob_name: Optional[str] = None,
    account_name: str = "final_data",
    bucket_name: Optional[str] = None,
    generate_signed_url: bool = True,
    fallback_to_drive: bool = True,
    save_in_drive_also: bool = True,
) -> dict:
    """
    Upload a local file to GCS.

    Args:
        local_path: Path to local file.
        folder: Bucket folder; inferred from the file extension when None.
        destination_blob_name: Path in the bucket (e.g. 'folder/file.mp4').
        bucket_name: Target bucket name. Resolved from the GCS_BUCKET_NAME
            config value at call time when None. (Fix: previously this was a
            default argument evaluated once at import time, so later config
            changes were never picked up.)
        account_name: Account to use.
        generate_signed_url: Whether to generate a signed URL.
        fallback_to_drive: If True, fallback to Google Drive upload on GCS failure.
        save_in_drive_also: If True, also mirror the uploaded file to Google Drive.

    Returns:
        dict: {
            "gcs_filename": str,
            "url": str (signed or public),
            "public_url": str,
            "storage_type": str ("gcs" or "drive")
        }
    """
    from datetime import timedelta

    # Fix: resolve the bucket lazily, at call time, instead of via an
    # import-time-evaluated default argument.
    if bucket_name is None:
        bucket_name = get_config_value("GCS_BUCKET_NAME")

    file_ext = os.path.splitext(local_path)[1].lower()

    # Infer a destination folder from the file type when none was given.
    if folder is None:
        folder = "others"
        if file_ext in [".mp4", ".mov", ".avi", ".mkv"]:
            folder = get_config_value("gcs_bucket_folder_name")
        elif file_ext in [".mp3", ".wav", ".aac", ".m4a"]:
            folder = "audio"
        elif file_ext in [".png", ".jpg", ".jpeg", ".gif", ".webp"]:
            folder = "image"

    setup_type = get_config_value("setup_type", "")
    setup_prefix = f"{setup_type}_" if setup_type else ""

    if destination_blob_name:
        blob_name = f"{folder}/{destination_blob_name}{file_ext}"
    else:
        blob_name = f"{folder}/{setup_prefix}{uuid.uuid4().hex}{file_ext}"

    try:
        client = get_gcs_client(account_name)

        create_bucket_if_not_exists(client, bucket_name)

        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)

        # Never overwrite an existing object: pick a fresh unique name instead.
        if blob.exists():
            logger.warning(f"β οΈ File already exists in GCS: {blob_name}")

            unique_suffix = uuid.uuid4().hex[:6]
            if destination_blob_name:
                blob_name = f"{folder}/{destination_blob_name}_{unique_suffix}{file_ext}"
            else:
                # Random names collide only if uuid4 repeats; just regenerate.
                blob_name = f"{folder}/{setup_prefix}{uuid.uuid4().hex}{file_ext}"

            blob = bucket.blob(blob_name)
            logger.debug(f"π Renamed to unique path: {blob_name}")

        # Explicit content type so browsers/players handle the object correctly.
        content_types = {
            ".mp4": "video/mp4",
            ".mp3": "audio/mpeg",
            ".wav": "audio/wav",
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".json": "application/json",
            ".txt": "text/plain",
            ".srt": "text/plain"
        }

        blob.content_type = content_types.get(file_ext, "application/octet-stream")

        logger.debug(f"βοΈ Uploading {os.path.basename(local_path)} to gs://{bucket_name}/{blob_name}")
        blob.upload_from_filename(local_path)

        public_url = f"https://storage.googleapis.com/{bucket_name}/{blob_name}"
        result = {
            "gcs_filename": blob_name,
            "public_url": public_url,
            "url": public_url,
            "storage_type": "gcs"
        }

        if generate_signed_url:
            try:
                signed_url = blob.generate_signed_url(
                    version="v4",
                    expiration=timedelta(days=7),
                    method="GET",
                    service_account_email=client.get_service_account_email()
                )
                # Fix: the old log literal embedded a stray NEL control
                # character that split the log line in two.
                logger.debug("β Signed URL generated")
                result["url"] = signed_url
                result["signed_url"] = signed_url
            except Exception as e:
                # Best effort: the public URL stays in `result["url"]` on failure.
                logger.warning(f"β οΈ Failed to generate signed URL (using public URL): {e}")

        if save_in_drive_also:
            try:
                drive_result = _upload_to_drive(local_path, filename=blob_name)
                result["drive_url"] = drive_result.get("url")
                result["drive_file_id"] = drive_result.get("file_id")
                logger.debug(f"β Also uploaded to Drive: {drive_result.get('url')}")
            except Exception as drive_error:
                logger.warning(f"β οΈ Failed to upload to Drive (GCS upload still successful): {drive_error}")

        return result

    except Exception as e:
        logger.error(f"β Failed to upload to GCS: {e}")

        if fallback_to_drive:
            logger.debug("π Falling back to Google Drive upload...")
            try:
                drive_result = _upload_to_drive(local_path, filename=blob_name)
                drive_result["gcs_filename"] = blob_name
                logger.debug(f"β Fallback to Drive successful: {drive_result['url']}")
                return drive_result
            except Exception as drive_error:
                logger.error(f"β Drive fallback also failed: {drive_error}")
                # Re-raise the original GCS error, not the Drive one.
                raise e

        raise e
|
|
|
|
|
def list_gcs_files(prefix: str = None, account_name: str = "final_data") -> list[str]:
    """
    List every blob in the configured bucket, optionally filtered by prefix.

    When no prefix is given, the configured 'gcs_bucket_folder_name' is used.
    Results are ordered newest-first by creation time. On any failure an
    empty list is returned and the error is logged.
    """
    try:
        effective_prefix = prefix if prefix is not None else get_config_value("gcs_bucket_folder_name")

        client = get_gcs_client(account_name)
        bucket = client.bucket(get_config_value("GCS_BUCKET_NAME"))

        # Newest first, keyed on each blob's creation timestamp.
        ordered = sorted(
            client.list_blobs(bucket, prefix=effective_prefix),
            key=lambda blob: blob.time_created,
            reverse=True,
        )

        names = [blob.name for blob in ordered]
        logger.debug(f"π Found {len(names)} files under '{effective_prefix}'.")
        return names

    except Exception as e:
        logger.error(f"β Failed to list files: {e}")
        return []
|
|
|
|
|
def create_bucket_if_not_exists(client, bucket_name: str, location: str = "us-central1") -> bool:
    """
    Create a GCS bucket if it doesn't already exist.

    Args:
        client: An authenticated google-cloud-storage client.
        bucket_name: Bucket to check/create.
        location: Region used when a new bucket must be created.

    Returns:
        bool: True if the bucket exists or was created, False on failure.
    """
    try:
        bucket = client.bucket(bucket_name)

        if bucket.exists():
            logger.debug(f"β Bucket already exists: gs://{bucket_name}")
            return True

        # Bucket creation requires an explicit project; fall back to the
        # usual environment variables when the client doesn't carry one.
        project_id = client.project
        if not project_id:
            logger.debug("No project ID found in client, trying to get from env.")
            project_id = os.environ.get("GOOGLE_CLOUD_PROJECT") or os.environ.get("GCP_PROJECT") or os.environ.get("GCP_PROJECT_ID")

        if not project_id:
            logger.warning(f"β οΈ Cannot create bucket '{bucket_name}': No project ID found in client or env.")
            return False

        logger.debug(f"π¦ Creating new bucket: gs://{bucket_name} in {location} (Project: {project_id})")

        new_bucket = client.create_bucket(bucket_name, location=location, project=project_id)
        # Fix: the old success message embedded a stray NEL control character
        # that broke the log line in two.
        logger.debug(f"β Bucket created successfully: gs://{new_bucket.name}")
        return True

    except Exception as e:
        logger.error(f"β Failed to create bucket {bucket_name}: {e}")
        return False
|
|
|
|
|
def find_and_download_gcs_file(tts_script: str, local_dir: str = "/tmp", account_name: str = "final_data") -> Optional[str]:
    """
    Find and download a file from GCS whose name contains part of the TTS script.

    Useful when filenames are generated dynamically based on the script text.
    Replaces APIClients.download_from_gcs.

    Args:
        tts_script: Text content used to search for the file name in GCS
        local_dir: Local directory to save the downloaded file
        account_name: GCS account to use

    Returns:
        Local path to downloaded file, or None on failure.
    """
    try:
        # Test mode: skip GCS entirely and hand back a synthetic local path.
        if get_config_value("test_automation"):
            return f"{get_config_value('TEST_DATA_DIRECTORY')}/{uuid.uuid4().hex}.mp4"

        # Sanitized prefixes of the script text, for fuzzy filename matching.
        safe_name_50 = "".join(c for c in tts_script[:50] if c.isalnum())
        safe_name_10 = "".join(c for c in tts_script[:10] if c.isalnum())

        all_files = list_gcs_files(prefix="", account_name=account_name)

        # Match from most to least specific: raw script, 50-char, 10-char prefix.
        blob_name = next((f for f in all_files if tts_script in f), None)
        if not blob_name:
            blob_name = next((f for f in all_files if safe_name_50 in f), None)
        if not blob_name:
            blob_name = next((f for f in all_files if safe_name_10 in f), None)

        if not blob_name:
            logger.error(f"β No matching file found in GCS for script: {tts_script[:50]}...")
            return None

        client = get_gcs_client(account_name)
        bucket_name = get_config_value("GCS_BUCKET_NAME")

        logger.debug(f"βοΈ Found matching file: gs://{bucket_name}/{blob_name}")

        # Robustness fix: ensure the destination directory exists before
        # download_to_filename tries to write into it.
        os.makedirs(local_dir, exist_ok=True)
        local_path = os.path.join(local_dir, os.path.basename(blob_name))

        try:
            bucket = client.bucket(bucket_name)
            blob = bucket.blob(blob_name)
            blob.download_to_filename(local_path)

            file_size = os.path.getsize(local_path)
            # Fix: the old log literal embedded a stray NEL control character
            # that broke the log line in two.
            logger.debug(f"β Downloaded {blob_name} β {local_path} ({file_size/1024:.1f} KB)")
            return str(local_path)
        except Exception as download_error:
            logger.error(f"β GCS authenticated download failed: {download_error}")
            return None

    except Exception as e:
        logger.error(f"β GCS download failed: {e}")
        return None
|
|
|