"""
Google Cloud Storage utilities for centralized client access and common operations.

All helpers obtain their client through the shared wrapper returned by
``get_default_wrapper()`` and read bucket/folder names from project config via
``get_config_value``.
"""

from typing import Optional
import asyncio
import os
import uuid

from src.logger_config import logger
from .gcloud_wrapper import get_default_wrapper
from src.file_downloader import get_file_downloader
from src.config import get_config_value

# Extension groups used to pick a default destination folder for uploads.
_VIDEO_EXTENSIONS = (".mp4", ".mov", ".avi", ".mkv")
_AUDIO_EXTENSIONS = (".mp3", ".wav", ".aac", ".m4a")
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".gif", ".webp")

# Content types set on uploaded blobs; anything else is sent as octet-stream.
_CONTENT_TYPES = {
    ".mp4": "video/mp4",
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".json": "application/json",
    ".txt": "text/plain",
    ".srt": "text/plain",
}


def _default_folder_for_extension(file_ext: str) -> str:
    """Return the default bucket folder for a lower-cased file extension."""
    if file_ext in _VIDEO_EXTENSIONS:
        return get_config_value("gcs_bucket_folder_name")
    if file_ext in _AUDIO_EXTENSIONS:
        return "audio"
    if file_ext in _IMAGE_EXTENSIONS:
        return "image"
    return "others"


def _upload_to_drive(
    local_path: str,
    filename: Optional[str] = None,
    account_name: str = "test_data",
) -> dict:
    """
    Upload a local file to Google Drive and tag the result as Drive storage.

    Args:
        local_path: Path to local file.
        filename: Optional filename to use in Drive (for consistency with GCS).
        account_name: Account to use.

    Returns:
        The dict returned by ``upload_file_to_drive`` with ``"storage_type"``
        set to ``"drive"``. Callers in this module read its ``"url"`` and
        ``"file_id"`` keys (presumably always present - verify in drive_utils).
    """
    # Imported lazily so Drive dependencies are only loaded when actually needed.
    from .drive_utils import upload_file_to_drive

    drive_result = upload_file_to_drive(local_path, filename=filename, account_name=account_name)
    drive_result["storage_type"] = "drive"
    return drive_result


def get_gcs_client(account_name: str = "final_data"):
    """
    Get a configured GCS client for the specified account.

    Defaults to 'final_data' which is the main account for production.
    The client comes from the shared default wrapper.
    """
    return get_default_wrapper().get_storage_client(account_name)


def get_gcs_credentials(account_name: str = "final_data"):
    """
    Get credentials for the specified account.

    Useful for initializing other clients (like TTS) with the same credentials.
    The credentials come from the shared default wrapper.
    """
    return get_default_wrapper()._get_credentials(account_name)


def delete_gcs_file(
    filename: str,
    bucket_name: Optional[str] = None,
    account_name: str = "final_data",
) -> bool:
    """
    Delete a file from GCS.

    Args:
        filename: Name/path of the blob to delete, relative to the bucket root.
        bucket_name: Name of the bucket. If falsy, the ``GCS_BUCKET_NAME`` config
            value is used, falling back to ``"elvoro-assets"``.
        account_name: Account to use (default: final_data).

    Returns:
        bool: True if deleted or didn't exist, False if error.
    """
    try:
        client = get_gcs_client(account_name)

        target_bucket = bucket_name or get_config_value("GCS_BUCKET_NAME", "elvoro-assets")
        blob_name = filename

        blob = client.bucket(target_bucket).blob(blob_name)

        if blob.exists():
            blob.delete()
            logger.debug(f"🗑️ Deleted GCS file: gs://{target_bucket}/{blob_name}")
        else:
            # A missing blob is treated as success: the desired end state holds.
            logger.debug(f"⚠️ File not found (already deleted?): gs://{target_bucket}/{blob_name}")

        return True
    except Exception as e:
        logger.error(f"❌ Failed to delete GCS file {filename}: {e}")
        return False


def upload_file_to_gcs(
    local_path: str,
    folder: str = None,
    destination_blob_name: str = None,
    account_name: str = "final_data",
    bucket_name: str = get_config_value("GCS_BUCKET_NAME"),
    generate_signed_url: bool = True,
    fallback_to_drive: bool = True,
    save_in_drive_also: bool = True,  # Uploads to Shared Drive (service account must be contributor)
) -> dict:
    """
    Upload a local file to GCS.

    Args:
        local_path: Path to local file.
        folder: Top-level folder in the bucket. If None, it is chosen from the
            file extension (video -> ``gcs_bucket_folder_name`` config value,
            audio -> "audio", image -> "image", otherwise "others").
        destination_blob_name: Base name of the blob WITHOUT folder or extension;
            the final path is ``{folder}/{destination_blob_name}{ext}``. If
            omitted, a random UUID name (prefixed with ``setup_type`` when
            configured) is used.
        account_name: Account to use.
        bucket_name: Target bucket name. The default is read from config when
            this module is imported; a falsy value is re-resolved from
            ``GCS_BUCKET_NAME`` at call time.
        generate_signed_url: Whether to generate a signed URL (valid 7 days).
        fallback_to_drive: If True, fallback to Google Drive upload on GCS failure.
        save_in_drive_also: If True, additionally upload to Drive after a
            successful GCS upload (best effort; failures are only logged).

    Returns:
        dict: On GCS success::

            {
                "gcs_filename": str,
                "url": str (signed if available, else public),
                "public_url": str,
                "storage_type": "gcs",
                "signed_url": str (only if signing succeeded),
                "drive_url" / "drive_file_id": only if the extra Drive upload succeeded,
            }

        On Drive fallback: the Drive upload result with ``"gcs_filename"`` added
        and ``"storage_type"`` set to ``"drive"``.

    Raises:
        Exception: The original GCS error if the upload fails and the Drive
            fallback is disabled or also fails.
    """
    from datetime import timedelta

    # The signature default is evaluated once at import time; if config was not
    # available then, resolve the bucket again now instead of using None.
    if not bucket_name:
        bucket_name = get_config_value("GCS_BUCKET_NAME")

    file_ext = os.path.splitext(local_path)[1].lower()

    if folder is None:
        folder = _default_folder_for_extension(file_ext)

    setup_type = get_config_value("setup_type", "")
    setup_prefix = f"{setup_type}_" if setup_type else ""

    # blob_name is computed before the try so the Drive fallback can reuse it.
    if destination_blob_name:
        blob_name = f"{folder}/{destination_blob_name}{file_ext}"
    else:
        blob_name = f"{folder}/{setup_prefix}{uuid.uuid4().hex}{file_ext}"

    try:
        client = get_gcs_client(account_name)

        # Lazy bucket creation (best effort: its boolean result is not checked,
        # a missing bucket surfaces as an upload error below).
        create_bucket_if_not_exists(client, bucket_name)

        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)

        # Never overwrite an existing object: pick a fresh name instead.
        if blob.exists():
            logger.warning(f"⚠️ File already exists in GCS: {blob_name}")
            if destination_blob_name:
                # Insert a short UUID before the extension to make it unique.
                unique_suffix = uuid.uuid4().hex[:6]
                blob_name = f"{folder}/{destination_blob_name}_{unique_suffix}{file_ext}"
            else:
                # Should rarely happen for UUID names, but regenerate anyway.
                blob_name = f"{folder}/{setup_prefix}{uuid.uuid4().hex}{file_ext}"
            blob = bucket.blob(blob_name)
            logger.debug(f"🔄 Renamed to unique path: {blob_name}")

        blob.content_type = _CONTENT_TYPES.get(file_ext, "application/octet-stream")

        logger.debug(f"☁️ Uploading {os.path.basename(local_path)} to gs://{bucket_name}/{blob_name}")
        blob.upload_from_filename(local_path)

        public_url = f"https://storage.googleapis.com/{bucket_name}/{blob_name}"
        result = {
            "gcs_filename": blob_name,
            "public_url": public_url,
            "url": public_url,
            "storage_type": "gcs",
        }

        if generate_signed_url:
            try:
                # Note: signing requires service account credentials with signing capability.
                # NOTE(review): Client.get_service_account_email() looks like it returns the
                # project's GCS service agent, not the credential's own email - confirm this
                # is the intended value for service_account_email.
                signed_url = blob.generate_signed_url(
                    version="v4",
                    expiration=timedelta(days=7),
                    method="GET",
                    service_account_email=client.get_service_account_email(),
                )
                logger.debug("✅ Signed URL generated")
                result["url"] = signed_url
                result["signed_url"] = signed_url
            except Exception as sign_error:
                # Signing is optional; the public URL stays in result["url"].
                logger.warning(f"⚠️ Failed to generate signed URL (using public URL): {sign_error}")

        # Also upload to Google Drive if requested (never fails the GCS upload).
        if save_in_drive_also:
            try:
                drive_result = _upload_to_drive(local_path, filename=blob_name)
                result["drive_url"] = drive_result.get("url")
                result["drive_file_id"] = drive_result.get("file_id")
                logger.debug(f"✅ Also uploaded to Drive: {drive_result.get('url')}")
            except Exception as drive_error:
                logger.warning(f"⚠️ Failed to upload to Drive (GCS upload still successful): {drive_error}")

        return result

    except Exception as e:
        logger.error(f"❌ Failed to upload to GCS: {e}")

        if fallback_to_drive:
            logger.debug("🔄 Falling back to Google Drive upload...")
            try:
                drive_result = _upload_to_drive(local_path, filename=blob_name)
                drive_result["gcs_filename"] = blob_name
                logger.debug(f"✅ Fallback to Drive successful: {drive_result['url']}")
                return drive_result
            except Exception as drive_error:
                logger.error(f"❌ Drive fallback also failed: {drive_error}")
                # Surface the original GCS error, keeping the Drive failure as its cause.
                raise e from drive_error

        # No fallback requested: let the caller handle the GCS failure.
        raise


def list_gcs_files(prefix: Optional[str] = None, account_name: str = "final_data") -> list[str]:
    """
    List files in the configured bucket, optionally filtered by prefix.

    Args:
        prefix: Blob name prefix. If None, the ``gcs_bucket_folder_name`` config
            value is used; pass ``""`` to list the whole bucket.
        account_name: Account to use.

    Returns:
        Blob names sorted by creation time (newest first), or an empty list if
        listing fails.
    """
    try:
        if prefix is None:
            prefix = get_config_value("gcs_bucket_folder_name")

        client = get_gcs_client(account_name)
        bucket_name = get_config_value("GCS_BUCKET_NAME")
        bucket = client.bucket(bucket_name)

        blobs = list(client.list_blobs(bucket, prefix=prefix))

        # Sort by creation time (newest first)
        blobs_sorted = sorted(blobs, key=lambda b: b.time_created, reverse=True)

        file_names = [blob.name for blob in blobs_sorted]
        logger.debug(f"📂 Found {len(file_names)} files under '{prefix}'.")
        return file_names

    except Exception as e:
        logger.error(f"❌ Failed to list files: {e}")
        return []


def create_bucket_if_not_exists(client, bucket_name: str, location: str = "us-central1") -> bool:
    """
    Create a GCS bucket if it doesn't already exist.

    Args:
        client: Storage client used for the existence check and creation.
        bucket_name: Name of the bucket to ensure.
        location: Location for a newly created bucket.

    Returns:
        True if the bucket already exists or was created; False if no project
        ID could be determined or any error occurred.
    """
    try:
        if client.bucket(bucket_name).exists():
            logger.debug(f"✓ Bucket already exists: gs://{bucket_name}")
            return True

        # Creation needs a project ID: prefer the client's, then the environment.
        project_id = client.project
        if not project_id:
            logger.debug("No project ID found in client, trying to get from env.")
            project_id = (
                os.environ.get("GOOGLE_CLOUD_PROJECT")
                or os.environ.get("GCP_PROJECT")
                or os.environ.get("GCP_PROJECT_ID")
            )

        if not project_id:
            logger.warning(f"⚠️ Cannot create bucket '{bucket_name}': No project ID found in client or env.")
            return False

        logger.debug(f"📦 Creating new bucket: gs://{bucket_name} in {location} (Project: {project_id})")
        new_bucket = client.create_bucket(bucket_name, location=location, project=project_id)
        logger.debug(f"✅ Bucket created successfully: gs://{new_bucket.name}")
        return True
    except Exception as e:
        logger.error(f"❌ Failed to create bucket {bucket_name}: {e}")
        return False


def find_and_download_gcs_file(
    tts_script: str,
    local_dir: str = "/tmp",
    account_name: str = "final_data",
) -> Optional[str]:
    """
    Find and download a file from GCS whose name contains part of the TTS script.

    Useful when filenames are generated dynamically based on the script text.
    Replaces APIClients.download_from_gcs.

    Matching is tried in order: the full script text, then the alphanumeric
    characters of its first 50 characters, then of its first 10. The newest
    matching file wins (the listing is sorted newest first).

    Args:
        tts_script: Text content used to search for the file name in GCS
        local_dir: Local directory to save the downloaded file
        account_name: GCS account to use

    Returns:
        Local path to downloaded file, or None on failure. When the
        ``test_automation`` config flag is set, a generated path under
        ``TEST_DATA_DIRECTORY`` is returned without contacting GCS.
    """
    try:
        if get_config_value("test_automation"):
            return f"{get_config_value('TEST_DATA_DIRECTORY')}/{uuid.uuid4().hex}.mp4"

        # Prepare safe patterns to search by
        safe_name_50 = "".join(c for c in tts_script[:50] if c.isalnum())
        safe_name_10 = "".join(c for c in tts_script[:10] if c.isalnum())

        all_files = list_gcs_files(prefix="", account_name=account_name)

        blob_name = None
        for pattern in (tts_script, safe_name_50, safe_name_10):
            if not pattern:
                # "" is a substring of every name and would select an arbitrary file.
                continue
            blob_name = next((f for f in all_files if pattern in f), None)
            if blob_name:
                break

        if not blob_name:
            logger.error(f"❌ No matching file found in GCS for script: {tts_script[:50]}...")
            return None

        client = get_gcs_client(account_name)
        bucket_name = get_config_value("GCS_BUCKET_NAME")
        logger.debug(f"☁️ Found matching file: gs://{bucket_name}/{blob_name}")

        # Folder components of the blob name are dropped for the local file.
        local_path = os.path.join(local_dir, os.path.basename(blob_name))

        try:
            # Download using authenticated GCS client
            blob = client.bucket(bucket_name).blob(blob_name)
            blob.download_to_filename(local_path)

            file_size = os.path.getsize(local_path)
            logger.debug(f"✅ Downloaded {blob_name} → {local_path} ({file_size/1024:.1f} KB)")
            return str(local_path)

        except Exception as download_error:
            logger.error(f"❌ GCS authenticated download failed: {download_error}")
            return None

    except Exception as e:
        logger.error(f"❌ GCS download failed: {e}")
        return None