File size: 13,804 Bytes
6ab6640 12b7787 e79a06d f20025d e79a06d 05104b9 6ab6640 8524219 6ab6640 e79a06d 6ab6640 e79a06d 6ab6640 7f46e56 e79a06d 7f46e56 e79a06d 7f46e56 6ab6640 3bba70c 6ab6640 503d4ac 6ab6640 503d4ac 6ab6640 1d300b0 ff14cb7 6ab6640 05104b9 37981e0 af92103 6ab6640 37981e0 6ab6640 37981e0 6ab6640 5b4dcf4 1d300b0 5b4dcf4 ff14cb7 5b4dcf4 6ab6640 750910c 6ab6640 e79a06d ff14cb7 503d4ac e79a06d 6ab6640 e79a06d 6ab6640 503d4ac 6ab6640 e79a06d 6ab6640 e79a06d 6ab6640 37981e0 6ab6640 503d4ac 6ab6640 693a2b5 6ab6640 8524219 503d4ac 8524219 6ab6640 37981e0 503d4ac 37981e0 5b4dcf4 503d4ac 37981e0 6ab6640 e79a06d 92b42d6 e79a06d 92b42d6 e79a06d 3bba70c e79a06d 503d4ac e79a06d 3a1dbcc 503d4ac 3a1dbcc 05104b9 503d4ac 05104b9 503d4ac 05104b9 503d4ac 3a1dbcc d524fdc fce9bf7 3bba70c d524fdc 3bba70c d524fdc 503d4ac d524fdc bd94cd9 d524fdc bd94cd9 503d4ac bd94cd9 d524fdc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 |
"""
Google Cloud Storage utilities for centralized client access and common operations.
"""
from typing import Optional
import asyncio
import os
import uuid
from src.logger_config import logger
from .gcloud_wrapper import get_default_wrapper
from src.file_downloader import get_file_downloader
from src.config import get_config_value
def _upload_to_drive(local_path: str, filename: str = None, account_name: str = "test_data") -> dict:
    """
    Upload a local file to Google Drive and tag the result as Drive storage.

    Args:
        local_path: Path to the local file to upload.
        filename: Optional name to use in Drive (kept for parity with GCS blob paths).
        account_name: Drive account identifier to upload with.

    Returns:
        dict: Drive upload result (url, file_id) with storage_type set to "drive".
    """
    # Imported lazily to avoid pulling Drive dependencies on module import.
    from .drive_utils import upload_file_to_drive

    result = upload_file_to_drive(local_path, filename=filename, account_name=account_name)
    result["storage_type"] = "drive"
    return result
def get_gcs_client(account_name: str = "final_data"):
    """
    Return a configured GCS storage client for the given account.

    'final_data' is the main production account. The underlying wrapper is a
    cached singleton, so authentication happens only once per process.
    """
    wrapper = get_default_wrapper()
    return wrapper.get_storage_client(account_name)
def get_gcs_credentials(account_name: str = "final_data"):
    """
    Return the credentials for the given account.

    Useful for initializing other clients (e.g. TTS) with the same identity.
    Backed by the cached singleton wrapper, so authentication happens only once.
    """
    wrapper = get_default_wrapper()
    # NOTE: reaches into the wrapper's private credential accessor.
    return wrapper._get_credentials(account_name)
def delete_gcs_file(filename: str, bucket_name: Optional[str] = None, account_name: str = "final_data") -> bool:
    """
    Delete a file from GCS.

    Args:
        filename: Name/path of the blob to delete (relative path inside the bucket).
        bucket_name: Name of the bucket. If None, falls back to the configured
            GCS_BUCKET_NAME (default "elvoro-assets").
        account_name: Account to use (default: final_data).

    Returns:
        bool: True if deleted or the blob didn't exist, False on error.
    """
    try:
        client = get_gcs_client(account_name)
        # Resolve the target bucket from config when the caller didn't supply one.
        target_bucket = bucket_name or get_config_value("GCS_BUCKET_NAME", "elvoro-assets")
        blob = client.bucket(target_bucket).blob(filename)
        if blob.exists():
            blob.delete()
            logger.debug(f"ποΈ Deleted GCS file: gs://{target_bucket}/{filename}")
        else:
            # A missing blob is treated as success: the desired end state (file gone) holds.
            logger.debug(f"β οΈ File not found (already deleted?): gs://{target_bucket}/{filename}")
        return True
    except Exception as e:
        # Fix: include the actual filename in the error instead of the "(unknown)" placeholder.
        logger.error(f"β Failed to delete GCS file {filename}: {e}")
        return False
def upload_file_to_gcs(
    local_path: str,
    folder: str = None,
    destination_blob_name: str = None,
    account_name: str = "final_data",
    bucket_name: str = None,
    generate_signed_url: bool = True,
    fallback_to_drive: bool = True,
    save_in_drive_also: bool = True,  # Uploads to Shared Drive (service account must be contributor)
) -> dict:
    """
    Upload a local file to GCS.

    Args:
        local_path: Path to local file.
        folder: Target folder inside the bucket; inferred from the file
            extension (video/audio/image/others) when None.
        destination_blob_name: Path in the bucket (e.g. 'folder/file.mp4').
        account_name: Account to use.
        bucket_name: Target bucket name. Resolved from the GCS_BUCKET_NAME
            config at call time when None. (Fix: the previous default called
            get_config_value() at import time, freezing the value and ignoring
            any config changes made after the module was imported.)
        generate_signed_url: Whether to generate a signed URL.
        fallback_to_drive: If True, fallback to Google Drive upload on GCS failure.
        save_in_drive_also: If True, also mirror the upload to Google Drive.

    Returns:
        dict: {
            "gcs_filename": str,
            "url": str (signed or public),
            "public_url": str,
            "storage_type": str ("gcs" or "drive")
        }

    Raises:
        Exception: The original GCS error when the upload fails and the Drive
            fallback is disabled or also fails.
    """
    from datetime import timedelta

    # Resolve the bucket lazily so config changes after import are honored.
    if bucket_name is None:
        bucket_name = get_config_value("GCS_BUCKET_NAME")

    file_ext = os.path.splitext(local_path)[1].lower()

    # Determine folder based on file type when not provided by the caller.
    if folder is None:
        folder = "others"
        if file_ext in [".mp4", ".mov", ".avi", ".mkv"]:
            folder = get_config_value("gcs_bucket_folder_name")
        elif file_ext in [".mp3", ".wav", ".aac", ".m4a"]:
            folder = "audio"
        elif file_ext in [".png", ".jpg", ".jpeg", ".gif", ".webp"]:
            folder = "image"

    # Prefix blob names with the setup type (if configured) to separate environments.
    setup_type = get_config_value("setup_type", "")
    setup_prefix = f"{setup_type}_" if setup_type else ""
    if destination_blob_name:
        blob_name = f"{folder}/{destination_blob_name}{file_ext}"
    else:
        blob_name = f"{folder}/{setup_prefix}{uuid.uuid4().hex}{file_ext}"

    try:
        client = get_gcs_client(account_name)
        # Lazy bucket creation.
        create_bucket_if_not_exists(client, bucket_name)
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)

        # Check if file exists and retry with a unique name if so.
        if blob.exists():
            logger.warning(f"β οΈ File already exists in GCS: {blob_name}")
            # Insert a short UUID before the extension to make it unique.
            unique_suffix = uuid.uuid4().hex[:6]
            if destination_blob_name:
                blob_name = f"{folder}/{destination_blob_name}_{unique_suffix}{file_ext}"
            else:
                # Should rarely happen for UUIDs but good practice.
                blob_name = f"{folder}/{setup_prefix}{uuid.uuid4().hex}{file_ext}"
            blob = bucket.blob(blob_name)
            logger.debug(f"π Renamed to unique path: {blob_name}")

        # Determine content type from the extension; default to a binary stream.
        content_types = {
            ".mp4": "video/mp4",
            ".mp3": "audio/mpeg",
            ".wav": "audio/wav",
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".json": "application/json",
            ".txt": "text/plain",
            ".srt": "text/plain"
        }
        blob.content_type = content_types.get(file_ext, "application/octet-stream")

        logger.debug(f"βοΈ Uploading {os.path.basename(local_path)} to gs://{bucket_name}/{blob_name}")
        blob.upload_from_filename(local_path)

        public_url = f"https://storage.googleapis.com/{bucket_name}/{blob_name}"
        result = {
            "gcs_filename": blob_name,
            "public_url": public_url,
            "url": public_url,
            "storage_type": "gcs"
        }

        if generate_signed_url:
            try:
                # Note: signing requires service account credentials with signing capability.
                signed_url = blob.generate_signed_url(
                    version="v4",
                    expiration=timedelta(days=7),
                    method="GET",
                    service_account_email=client.get_service_account_email()
                )
                # Fix: original message embedded a stray U+0085 control character
                # (mojibake emoji) that broke log lines.
                logger.debug("Signed URL generated")
                result["url"] = signed_url
                result["signed_url"] = signed_url
            except Exception as e:
                logger.warning(f"β οΈ Failed to generate signed URL (using public URL): {e}")

        # Also upload to Google Drive if requested (best-effort; the GCS result
        # stands even when the Drive mirror fails).
        if save_in_drive_also:
            try:
                drive_result = _upload_to_drive(local_path, filename=blob_name)
                result["drive_url"] = drive_result.get("url")
                result["drive_file_id"] = drive_result.get("file_id")
                logger.debug(f"Also uploaded to Drive: {drive_result.get('url')}")
            except Exception as drive_error:
                logger.warning(f"β οΈ Failed to upload to Drive (GCS upload still successful): {drive_error}")

        return result
    except Exception as e:
        logger.error(f"β Failed to upload to GCS: {e}")
        # Fallback to Google Drive if enabled.
        if fallback_to_drive:
            logger.debug("π Falling back to Google Drive upload...")
            try:
                drive_result = _upload_to_drive(local_path, filename=blob_name)
                drive_result["gcs_filename"] = blob_name
                logger.debug(f"Fallback to Drive successful: {drive_result['url']}")
                return drive_result
            except Exception as drive_error:
                logger.error(f"β Drive fallback also failed: {drive_error}")
                raise e  # Re-raise original GCS error
        # Re-raise to let the caller handle critical failure.
        raise e
def list_gcs_files(prefix: str = None, account_name: str = "final_data") -> list[str]:
    """
    List all files in the bucket, optionally filtered by prefix.

    When prefix is None, the configured 'gcs_bucket_folder_name' is used.
    Returns blob names sorted by creation time (newest first); an empty list
    on any error.
    """
    try:
        effective_prefix = get_config_value("gcs_bucket_folder_name") if prefix is None else prefix
        client = get_gcs_client(account_name)
        bucket = client.bucket(get_config_value("GCS_BUCKET_NAME"))
        blobs = client.list_blobs(bucket, prefix=effective_prefix)
        # Newest first, keyed on server-side creation timestamp.
        ordered = sorted(blobs, key=lambda b: b.time_created, reverse=True)
        file_names = [b.name for b in ordered]
        logger.debug(f"π Found {len(file_names)} files under '{effective_prefix}'.")
        return file_names
    except Exception as e:
        logger.error(f"β Failed to list files: {e}")
        return []
def create_bucket_if_not_exists(client, bucket_name: str, location: str = "us-central1") -> bool:
    """
    Create a GCS bucket if it doesn't already exist.

    Args:
        client: Authenticated google.cloud.storage client.
        bucket_name: Bucket to check/create.
        location: Region for a newly created bucket.

    Returns:
        bool: True if the bucket exists or was created; False on error or when
        no project ID could be determined.
    """
    try:
        bucket = client.bucket(bucket_name)
        if bucket.exists():
            logger.debug(f"β Bucket already exists: gs://{bucket_name}")
            return True

        # Bucket doesn't exist; creation requires an explicit project ID.
        project_id = client.project
        if not project_id:
            logger.debug("No project ID found in client, trying to get from env.")
            project_id = os.environ.get("GOOGLE_CLOUD_PROJECT") or os.environ.get("GCP_PROJECT") or os.environ.get("GCP_PROJECT_ID")
        if not project_id:
            logger.warning(f"β οΈ Cannot create bucket '{bucket_name}': No project ID found in client or env.")
            return False

        logger.debug(f"π¦ Creating new bucket: gs://{bucket_name} in {location} (Project: {project_id})")
        # Pass project explicitly when creating.
        new_bucket = client.create_bucket(bucket_name, location=location, project=project_id)
        # Fix: original success message embedded a stray U+0085 control character
        # (mojibake emoji) that broke log lines.
        logger.debug(f"Bucket created successfully: gs://{new_bucket.name}")
        return True
    except Exception as e:
        logger.error(f"β Failed to create bucket {bucket_name}: {e}")
        return False
def find_and_download_gcs_file(tts_script: str, local_dir: str = "/tmp", account_name: str = "final_data") -> Optional[str]:
    """
    Find and download a file from GCS whose name contains part of the TTS script.

    Useful when filenames are generated dynamically based on the script text.
    Replaces APIClients.download_from_gcs.

    Args:
        tts_script: Text content used to search for the file name in GCS.
        local_dir: Local directory to save the downloaded file.
        account_name: GCS account to use.

    Returns:
        Local path to the downloaded file, or None on failure.
    """
    try:
        if get_config_value("test_automation"):
            # Test mode: return a synthetic path inside the test-data directory.
            return f"{get_config_value('TEST_DATA_DIRECTORY')}/{uuid.uuid4().hex}.mp4"

        # Build progressively shorter alphanumeric search patterns from the script.
        safe_name_50 = "".join(c for c in tts_script[:50] if c.isalnum())
        safe_name_10 = "".join(c for c in tts_script[:10] if c.isalnum())
        all_files = list_gcs_files(prefix="", account_name=account_name)

        # Try the exact text first, then the longer pattern, then the shorter one.
        blob_name = None
        for pattern in (tts_script, safe_name_50, safe_name_10):
            blob_name = next((f for f in all_files if pattern in f), None)
            if blob_name:
                break
        if not blob_name:
            logger.error(f"β No matching file found in GCS for script: {tts_script[:50]}...")
            return None

        client = get_gcs_client(account_name)
        bucket_name = get_config_value("GCS_BUCKET_NAME")
        logger.debug(f"βοΈ Found matching file: gs://{bucket_name}/{blob_name}")

        # Download via the authenticated GCS client.
        local_path = os.path.join(local_dir, os.path.basename(blob_name))
        try:
            bucket = client.bucket(bucket_name)
            bucket.blob(blob_name).download_to_filename(local_path)
            file_size = os.path.getsize(local_path)
            # Fix: original message embedded a stray U+0085 control character
            # (mojibake emoji) that broke log lines.
            logger.debug(f"Downloaded {blob_name} β {local_path} ({file_size/1024:.1f} KB)")
            return str(local_path)
        except Exception as download_error:
            logger.error(f"β GCS authenticated download failed: {download_error}")
            return None
    except Exception as e:
        logger.error(f"β GCS download failed: {e}")
        return None
|