File size: 13,804 Bytes
6ab6640
 
 
 
 
12b7787
e79a06d
 
f20025d
e79a06d
05104b9
 
6ab6640
8524219
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6ab6640
 
 
 
e79a06d
6ab6640
e79a06d
6ab6640
7f46e56
 
 
 
e79a06d
7f46e56
e79a06d
7f46e56
6ab6640
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3bba70c
6ab6640
 
 
 
 
 
503d4ac
6ab6640
503d4ac
6ab6640
 
 
 
 
 
 
 
 
 
1d300b0
ff14cb7
6ab6640
05104b9
37981e0
 
af92103
6ab6640
 
 
 
 
 
 
 
 
 
37981e0
6ab6640
 
 
 
 
37981e0
 
6ab6640
 
 
 
5b4dcf4
 
 
1d300b0
 
 
 
 
 
 
 
5b4dcf4
 
 
ff14cb7
 
 
 
 
5b4dcf4
6ab6640
 
750910c
 
 
 
6ab6640
e79a06d
ff14cb7
 
 
 
 
 
 
 
 
 
 
 
 
503d4ac
e79a06d
6ab6640
 
 
 
 
 
 
 
 
 
 
 
e79a06d
6ab6640
 
503d4ac
6ab6640
 
e79a06d
6ab6640
e79a06d
6ab6640
37981e0
 
6ab6640
 
 
 
 
 
 
 
 
 
 
503d4ac
6ab6640
693a2b5
6ab6640
 
8524219
 
 
 
 
 
 
503d4ac
8524219
 
6ab6640
 
 
 
 
37981e0
 
 
503d4ac
37981e0
5b4dcf4
 
503d4ac
37981e0
 
 
 
 
6ab6640
 
e79a06d
92b42d6
e79a06d
 
 
 
 
92b42d6
 
 
e79a06d
3bba70c
e79a06d
 
 
 
 
 
 
 
 
 
 
 
503d4ac
e79a06d
 
 
 
 
3a1dbcc
 
 
 
 
 
 
 
 
503d4ac
3a1dbcc
 
 
05104b9
 
 
503d4ac
05104b9
 
 
 
 
 
503d4ac
05104b9
 
 
503d4ac
3a1dbcc
 
 
 
 
d524fdc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fce9bf7
3bba70c
d524fdc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3bba70c
d524fdc
503d4ac
d524fdc
bd94cd9
d524fdc
 
bd94cd9
 
 
 
 
 
 
503d4ac
bd94cd9
 
 
 
d524fdc
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356

"""
Google Cloud Storage utilities for centralized client access and common operations.
"""
from typing import Optional
import asyncio
import os
import uuid
from src.logger_config import logger
from .gcloud_wrapper import get_default_wrapper
from src.file_downloader import get_file_downloader
from src.config import get_config_value

def _upload_to_drive(local_path: str, filename: str = None, account_name: str = "test_data") -> dict:
    """
    Upload a local file to Google Drive and tag the result.

    Args:
        local_path: Path to the local file to upload.
        filename: Optional name to use in Drive (kept for parity with GCS blob naming).
        account_name: Drive account identifier to authenticate with.

    Returns:
        dict: Drive upload result containing url, file_id, and storage_type="drive".
    """
    # Imported lazily so Drive dependencies are only loaded when actually needed.
    from .drive_utils import upload_file_to_drive

    result = upload_file_to_drive(local_path, filename=filename, account_name=account_name)
    result["storage_type"] = "drive"
    return result


def get_gcs_client(account_name: str = "final_data"):
    """
    Return a GCS storage client authenticated for ``account_name``.

    The default account ('final_data') is the main production account.
    The underlying wrapper is a cached singleton, so authentication only
    happens on the first call.
    """
    wrapper = get_default_wrapper()
    return wrapper.get_storage_client(account_name)

def get_gcs_credentials(account_name: str = "final_data"):
    """
    Return the credentials object for ``account_name``.

    Useful when bootstrapping other Google clients (e.g. TTS) with the
    same identity. Uses the cached singleton wrapper, so auth runs once.
    """
    wrapper = get_default_wrapper()
    # NOTE(review): reaches into the wrapper's private API — confirm no public accessor exists.
    return wrapper._get_credentials(account_name)

def delete_gcs_file(filename: str, bucket_name: Optional[str] = None, account_name: str = "final_data") -> bool:
    """
    Delete a single blob from GCS.

    Args:
        filename: Name/path of the blob inside the bucket (assumed relative
            to the bucket root; no gs:// URL parsing is attempted).
        bucket_name: Bucket to delete from; falls back to the configured
            GCS_BUCKET_NAME (default 'elvoro-assets') when omitted.
        account_name: Account to authenticate with (default: final_data).

    Returns:
        bool: True when the blob was deleted or was already absent,
        False when an error occurred.
    """
    try:
        client = get_gcs_client(account_name)

        # Caller-supplied bucket wins; otherwise resolve from configuration.
        target_bucket = bucket_name or get_config_value("GCS_BUCKET_NAME", "elvoro-assets")

        blob = client.bucket(target_bucket).blob(filename)

        if blob.exists():
            blob.delete()
            logger.debug(f"πŸ—‘οΈ Deleted GCS file: gs://{target_bucket}/{filename}")
        else:
            # Treat an already-missing blob as success (idempotent delete).
            logger.debug(f"⚠️ File not found (already deleted?): gs://{target_bucket}/{filename}")

        return True
    except Exception as e:
        logger.error(f"❌ Failed to delete GCS file (unknown): {e}")
        return False

def upload_file_to_gcs(
    local_path: str,
    folder: str = None,
    destination_blob_name: str = None,
    account_name: str = "final_data",
    bucket_name: str = None,
    generate_signed_url: bool = True,
    fallback_to_drive: bool = True,
    save_in_drive_also: bool = True,  # Uploads to Shared Drive (service account must be contributor)
) -> dict:
    """
    Upload a local file to GCS.
    
    Args:
        local_path: Path to local file.
        folder: Bucket folder to upload into. When None, inferred from the
            file extension (video folder from config, "audio", "image", or "others").
        destination_blob_name: Path in the bucket (e.g. 'folder/file.mp4');
            the file's extension is appended automatically.
        account_name: Account to use.
        bucket_name: Target bucket name. When None, resolved from the
            GCS_BUCKET_NAME config value at call time. (Previously this was a
            default evaluated at import time, which froze the config value and
            ran config lookup as a module-import side effect.)
        generate_signed_url: Whether to generate a signed URL.
        fallback_to_drive: If True, fallback to Google Drive upload on GCS failure.
        save_in_drive_also: If True, also mirror the upload to Google Drive.
        
    Returns:
        dict: {
            "gcs_filename": str,
            "url": str (signed or public),
            "public_url": str,
            "storage_type": str ("gcs" or "drive")
        }
    """
    from datetime import timedelta

    # Resolve the bucket at call time so config changes after import are honored.
    if bucket_name is None:
        bucket_name = get_config_value("GCS_BUCKET_NAME")

    file_ext = os.path.splitext(local_path)[1].lower()
    
    # Determine folder based on file type
    if folder is None:
        folder = "others"
        if file_ext in [".mp4", ".mov", ".avi", ".mkv"]:
            folder = get_config_value("gcs_bucket_folder_name")
        elif file_ext in [".mp3", ".wav", ".aac", ".m4a"]:
            folder = "audio"
        elif file_ext in [".png", ".jpg", ".jpeg", ".gif", ".webp"]:
            folder = "image"
        
    setup_type = get_config_value("setup_type", "")
    setup_prefix = f"{setup_type}_" if setup_type else ""
    
    # Build the blob path; random hex name when no explicit destination was given.
    if destination_blob_name:
        blob_name = f"{folder}/{destination_blob_name}{file_ext}"
    else:
        blob_name = f"{folder}/{setup_prefix}{uuid.uuid4().hex}{file_ext}"
    
    try:
        client = get_gcs_client(account_name)

        # Lazy bucket creation
        create_bucket_if_not_exists(client, bucket_name)

        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        
        # Check if file exists and retry with unique name if so
        if blob.exists():
            logger.warning(f"⚠️ File already exists in GCS: {blob_name}")
            # Insert a short UUID before extension to make it unique
            unique_suffix = uuid.uuid4().hex[:6]
            if destination_blob_name:
                blob_name = f"{folder}/{destination_blob_name}_{unique_suffix}{file_ext}"
            else:
                # Should rarely happen for UUIDs but good practice
                blob_name = f"{folder}/{setup_prefix}{uuid.uuid4().hex}{file_ext}"
            
            blob = bucket.blob(blob_name)
            logger.debug(f"πŸ”„ Renamed to unique path: {blob_name}")

        # Determine content type; unknown extensions fall back to octet-stream.
        content_types = {
            ".mp4": "video/mp4",
            ".mp3": "audio/mpeg",
            ".wav": "audio/wav",
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".json": "application/json",
            ".txt": "text/plain",
            ".srt": "text/plain" 
        }
        
        blob.content_type = content_types.get(file_ext, "application/octet-stream")
        
        logger.debug(f"☁️ Uploading {os.path.basename(local_path)} to gs://{bucket_name}/{blob_name}")
        blob.upload_from_filename(local_path)
        
        public_url = f"https://storage.googleapis.com/{bucket_name}/{blob_name}"
        result = {
            "gcs_filename": blob_name,
            "public_url": public_url,
            "url": public_url,
            "storage_type": "gcs"
        }
        
        if generate_signed_url:
            try:
                # Note: signing requires service account credentials with signing capability
                signed_url = blob.generate_signed_url(
                    version="v4", 
                    expiration=timedelta(days=7), 
                    method="GET",
                    service_account_email=client.get_service_account_email()
                )
                logger.debug(f"βœ… Signed URL generated")
                result["url"] = signed_url
                result["signed_url"] = signed_url
            except Exception as e:
                # Signing is best-effort; the public URL remains in result["url"].
                logger.warning(f"⚠️ Failed to generate signed URL (using public URL): {e}")
        
        # Also upload to Google Drive if requested
        if save_in_drive_also:
            try:
                drive_result = _upload_to_drive(local_path, filename=blob_name)
                result["drive_url"] = drive_result.get("url")
                result["drive_file_id"] = drive_result.get("file_id")
                logger.debug(f"βœ… Also uploaded to Drive: {drive_result.get('url')}")
            except Exception as drive_error:
                # Drive mirroring is best-effort; GCS result is still returned.
                logger.warning(f"⚠️ Failed to upload to Drive (GCS upload still successful): {drive_error}")
                
        return result

    except Exception as e:
        logger.error(f"❌ Failed to upload to GCS: {e}")
        
        # Fallback to Google Drive if enabled
        if fallback_to_drive:
            logger.debug("πŸ”„ Falling back to Google Drive upload...")
            try:
                drive_result = _upload_to_drive(local_path, filename=blob_name)
                drive_result["gcs_filename"] = blob_name
                logger.debug(f"βœ… Fallback to Drive successful: {drive_result['url']}")
                return drive_result
            except Exception as drive_error:
                logger.error(f"❌ Drive fallback also failed: {drive_error}")
                raise e  # Re-raise original GCS error
        
        # Re-raise to let caller handle critical failure
        raise e

def list_gcs_files(prefix: str = None, account_name: str = "final_data") -> list[str]:
    """
    Return blob names in the configured bucket, newest first.

    Args:
        prefix: Blob-name prefix filter; defaults to the configured
            'gcs_bucket_folder_name' when None.
        account_name: Account to authenticate with.

    Returns:
        list[str]: Blob names sorted by creation time descending;
        empty list on any error.
    """
    try:
        effective_prefix = prefix if prefix is not None else get_config_value("gcs_bucket_folder_name")

        client = get_gcs_client(account_name)
        bucket = client.bucket(get_config_value("GCS_BUCKET_NAME"))

        # Newest first, by server-side creation timestamp.
        ordered_blobs = sorted(
            client.list_blobs(bucket, prefix=effective_prefix),
            key=lambda blob: blob.time_created,
            reverse=True,
        )

        names = [blob.name for blob in ordered_blobs]
        logger.debug(f"πŸ“‚ Found {len(names)} files under '{effective_prefix}'.")
        return names

    except Exception as e:
        logger.error(f"❌ Failed to list files: {e}")
        return []

def create_bucket_if_not_exists(client, bucket_name: str, location: str = "us-central1") -> bool:
    """
    Ensure a GCS bucket exists, creating it when missing.

    Args:
        client: Authenticated google-cloud-storage client.
        bucket_name: Bucket to check for / create.
        location: Region used when a new bucket has to be created.

    Returns:
        bool: True when the bucket exists or was created, False on failure.
    """
    try:
        if client.bucket(bucket_name).exists():
            logger.debug(f"βœ“ Bucket already exists: gs://{bucket_name}")
            return True

        # Creating a bucket requires a project ID: prefer the client's own,
        # then fall back to the common environment variables.
        project_id = client.project
        if not project_id:
            logger.debug("No project ID found in client, trying to get from env.")
            for env_var in ("GOOGLE_CLOUD_PROJECT", "GCP_PROJECT", "GCP_PROJECT_ID"):
                project_id = os.environ.get(env_var)
                if project_id:
                    break

        if not project_id:
            logger.warning(f"⚠️ Cannot create bucket '{bucket_name}': No project ID found in client or env.")
            return False

        logger.debug(f"πŸ“¦ Creating new bucket: gs://{bucket_name} in {location} (Project: {project_id})")

        # Pass the project explicitly since the client may lack one.
        created = client.create_bucket(bucket_name, location=location, project=project_id)
        logger.debug(f"βœ… Bucket created successfully: gs://{created.name}")
        return True

    except Exception as e:
        logger.error(f"❌ Failed to create bucket {bucket_name}: {e}")
        return False

def find_and_download_gcs_file(tts_script: str, local_dir: str = "/tmp", account_name: str = "final_data") -> Optional[str]:
    """
    Locate a GCS blob whose name contains (part of) the TTS script and download it.

    Useful when filenames are generated dynamically from the script text.
    Replaces APIClients.download_from_gcs. Matching is attempted with the
    full script first, then with alphanumeric-only prefixes of 50 and 10
    characters.

    Args:
        tts_script: Text content used to search for the file name in GCS.
        local_dir: Local directory to save the downloaded file.
        account_name: GCS account to use.

    Returns:
        Local path to downloaded file, or None on failure.
    """
    try:
        if get_config_value("test_automation"):
            # Test mode: skip the real download and hand back a synthetic path.
            return f"{get_config_value('TEST_DATA_DIRECTORY')}/{uuid.uuid4().hex}.mp4"

        # Search patterns, from most to least specific.
        patterns = [
            tts_script,
            "".join(ch for ch in tts_script[:50] if ch.isalnum()),
            "".join(ch for ch in tts_script[:10] if ch.isalnum()),
        ]

        all_files = list_gcs_files(prefix="", account_name=account_name)

        blob_name = None
        for pattern in patterns:
            blob_name = next((name for name in all_files if pattern in name), None)
            if blob_name:
                break

        if not blob_name:
            logger.error(f"❌ No matching file found in GCS for script: {tts_script[:50]}...")
            return None

        client = get_gcs_client(account_name)
        bucket_name = get_config_value("GCS_BUCKET_NAME")

        logger.debug(f"☁️ Found matching file: gs://{bucket_name}/{blob_name}")

        local_path = os.path.join(local_dir, os.path.basename(blob_name))

        try:
            # Authenticated download through the GCS client.
            client.bucket(bucket_name).blob(blob_name).download_to_filename(local_path)

            file_size = os.path.getsize(local_path)
            logger.debug(f"βœ… Downloaded {blob_name} β†’ {local_path} ({file_size/1024:.1f} KB)")
            return str(local_path)
        except Exception as download_error:
            logger.error(f"❌ GCS authenticated download failed: {download_error}")
            return None

    except Exception as e:
        logger.error(f"❌ GCS download failed: {e}")
        return None