jebin2 commited on
Commit
37981e0
Β·
1 Parent(s): 2008dd3

feat: add Google Drive upload fallback and cleanup support

Browse files

- Add drive_utils.py with upload_file_to_drive() and delete_file_from_drive()
- Modify upload_file_to_gcs() to fallback to Drive on GCS failure (default: enabled)
- Update cleanup_manager.py to handle Drive URL deletions
- Rename 'Hook' column to 'Source Data' in Google Sheets logs
- Remove 'Success' column from logs (all logged entries are successful)
- Export drive utilities in google_src/__init__.py

src/cleanup_manager.py CHANGED
@@ -3,6 +3,7 @@ import os
3
  from utils import logger
4
  from google_sheet_reader import GoogleSheetReader
5
  from google_src.gcs_utils import delete_gcs_file
 
6
 
7
  def extract_blob_name_from_url(url: str) -> str | None:
8
  """
@@ -19,10 +20,14 @@ def extract_blob_name_from_url(url: str) -> str | None:
19
  return "/".join(parts[2:]) # Skip domain and bucket
20
  return None
21
 
 
 
 
 
22
  async def process_delete_entries():
23
  """
24
  Check Google Sheet logs for entries marked 'Delete Entry' = TRUE.
25
- Delete corresponding video files from GCS and then remove the row from the sheet.
26
  """
27
  gsheet_name = os.getenv("GSHEET_WORKSHEET_LOGS")
28
  if not gsheet_name:
@@ -49,6 +54,7 @@ async def process_delete_entries():
49
  delete_flag = str(row["Delete Entry"]).strip().lower()
50
  if delete_flag == "true":
51
  gcs_filename = ""
 
52
 
53
  # Check GCS Filename column first
54
  if has_filename:
@@ -57,16 +63,19 @@ async def process_delete_entries():
57
  if "/" in val or "." in val:
58
  gcs_filename = val
59
 
60
- # Fallback to URL if filename is empty or looks like just a hash (no slash/dot)
61
- if (not gcs_filename) and has_url:
62
  final_url = str(row["Final URL"]).strip()
 
 
 
63
  extracted = extract_blob_name_from_url(final_url)
64
  if extracted:
65
  logger.info(f"Derived GCS filename from URL: {extracted}")
66
  gcs_filename = extracted
67
 
68
- if gcs_filename:
69
- files_to_delete.append((idx + 2, gcs_filename))
70
  else:
71
  # If no filename found, just mark row for deletion
72
  to_delete_indices.append(idx + 2)
@@ -78,14 +87,23 @@ async def process_delete_entries():
78
  logger.info(f"Found {len(files_to_delete) + len(to_delete_indices)} entries to delete.")
79
 
80
  # Process file deletions
81
- for sheet_row_idx, filename in files_to_delete:
82
- # Check if filename is suspiciouly short (like just a hash) to avoid deleting buckets or wrong things
83
- if len(filename) < 5 or "." not in filename:
 
 
 
 
 
 
 
 
 
 
84
  logger.warning(f"⚠️ Filename '{filename}' looks invalid. Skipping file deletion, but will delete row.")
85
  to_delete_indices.append(sheet_row_idx)
86
  continue
87
 
88
-
89
  success = delete_gcs_file(filename)
90
  if success:
91
  to_delete_indices.append(sheet_row_idx)
 
3
  from utils import logger
4
  from google_sheet_reader import GoogleSheetReader
5
  from google_src.gcs_utils import delete_gcs_file
6
+ from google_src.drive_utils import delete_file_from_drive
7
 
8
  def extract_blob_name_from_url(url: str) -> str | None:
9
  """
 
20
  return "/".join(parts[2:]) # Skip domain and bucket
21
  return None
22
 
23
+ def is_drive_url(url: str) -> bool:
24
+ """Check if URL is a Google Drive URL."""
25
+ return url and "drive.google.com" in url
26
+
27
  async def process_delete_entries():
28
  """
29
  Check Google Sheet logs for entries marked 'Delete Entry' = TRUE.
30
+ Delete corresponding video files from GCS or Drive and then remove the row from the sheet.
31
  """
32
  gsheet_name = os.getenv("GSHEET_WORKSHEET_LOGS")
33
  if not gsheet_name:
 
54
  delete_flag = str(row["Delete Entry"]).strip().lower()
55
  if delete_flag == "true":
56
  gcs_filename = ""
57
+ final_url = ""
58
 
59
  # Check GCS Filename column first
60
  if has_filename:
 
63
  if "/" in val or "." in val:
64
  gcs_filename = val
65
 
66
+ # Get URL for Drive detection
67
+ if has_url:
68
  final_url = str(row["Final URL"]).strip()
69
+
70
+ # Fallback to URL if filename is empty or looks like just a hash (no slash/dot)
71
+ if (not gcs_filename) and final_url:
72
  extracted = extract_blob_name_from_url(final_url)
73
  if extracted:
74
  logger.info(f"Derived GCS filename from URL: {extracted}")
75
  gcs_filename = extracted
76
 
77
+ if gcs_filename or final_url:
78
+ files_to_delete.append((idx + 2, gcs_filename, final_url))
79
  else:
80
  # If no filename found, just mark row for deletion
81
  to_delete_indices.append(idx + 2)
 
87
  logger.info(f"Found {len(files_to_delete) + len(to_delete_indices)} entries to delete.")
88
 
89
  # Process file deletions
90
+ for sheet_row_idx, filename, url in files_to_delete:
91
+ # Check if it's a Drive URL - delete from Drive
92
+ if is_drive_url(url):
93
+ success = delete_file_from_drive(url)
94
+ if success:
95
+ to_delete_indices.append(sheet_row_idx)
96
+ else:
97
+ logger.error(f"Skipping row deletion for row {sheet_row_idx} due to Drive file deletion failure.")
98
+ continue
99
+
100
+ # Otherwise, delete from GCS
101
+ # Check if filename is suspiciously short (like just a hash) to avoid deleting buckets or wrong things
102
+ if not filename or len(filename) < 5 or "." not in filename:
103
  logger.warning(f"⚠️ Filename '{filename}' looks invalid. Skipping file deletion, but will delete row.")
104
  to_delete_indices.append(sheet_row_idx)
105
  continue
106
 
 
107
  success = delete_gcs_file(filename)
108
  if success:
109
  to_delete_indices.append(sheet_row_idx)
src/google_src/__init__.py CHANGED
@@ -12,6 +12,8 @@ from .gcloud_wrapper import (
12
  get_default_wrapper,
13
  )
14
 
 
 
15
  __all__ = [
16
  "GCloudWrapper",
17
  "GCloudAccount",
@@ -19,4 +21,7 @@ __all__ = [
19
  "create_wrapper_from_env",
20
  "create_default_wrapper",
21
  "get_default_wrapper",
 
 
22
  ]
 
 
12
  get_default_wrapper,
13
  )
14
 
15
+ from .drive_utils import upload_file_to_drive, delete_file_from_drive
16
+
17
  __all__ = [
18
  "GCloudWrapper",
19
  "GCloudAccount",
 
21
  "create_wrapper_from_env",
22
  "create_default_wrapper",
23
  "get_default_wrapper",
24
+ "upload_file_to_drive",
25
+ "delete_file_from_drive",
26
  ]
27
+
src/google_src/drive_utils.py ADDED
@@ -0,0 +1,206 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Google Drive utilities for centralized upload operations.
3
+ Uses test_data credentials for Drive uploads as fallback when GCS fails.
4
+ """
5
+ import os
6
+ import uuid
7
+ from pathlib import Path
8
+ from typing import Optional
9
+ from utils import logger
10
+ from .gcloud_wrapper import get_default_wrapper
11
+
12
+ # MIME type mappings for common file extensions
13
+ MIME_TYPES = {
14
+ ".mp4": "video/mp4",
15
+ ".mov": "video/quicktime",
16
+ ".avi": "video/x-msvideo",
17
+ ".mkv": "video/x-matroska",
18
+ ".mp3": "audio/mpeg",
19
+ ".wav": "audio/wav",
20
+ ".aac": "audio/aac",
21
+ ".m4a": "audio/mp4",
22
+ ".png": "image/png",
23
+ ".jpg": "image/jpeg",
24
+ ".jpeg": "image/jpeg",
25
+ ".gif": "image/gif",
26
+ ".webp": "image/webp",
27
+ ".json": "application/json",
28
+ ".txt": "text/plain",
29
+ ".srt": "text/plain",
30
+ }
31
+
32
+
33
+ def get_drive_service(account_name: str = "test_data"):
34
+ """
35
+ Get a configured Google Drive service for the specified account.
36
+ Defaults to 'test_data' for uploads.
37
+ """
38
+ wrapper = get_default_wrapper()
39
+ creds = wrapper._get_credentials(account_name)
40
+
41
+ from googleapiclient.discovery import build
42
+ return build("drive", "v3", credentials=creds)
43
+
44
+
45
+ def upload_file_to_drive(
46
+ local_path: str,
47
+ folder_id: Optional[str] = None,
48
+ account_name: str = "test_data",
49
+ ) -> dict:
50
+ """
51
+ Upload a local file to Google Drive.
52
+
53
+ Args:
54
+ local_path: Path to local file.
55
+ folder_id: Optional Drive folder ID to upload to. If None, uses DRIVE_UPLOAD_FOLDER_ID env var.
56
+ account_name: Account to use (default: test_data).
57
+
58
+ Returns:
59
+ dict: {
60
+ "file_id": str,
61
+ "url": str (viewable link),
62
+ "download_url": str (direct download link)
63
+ }
64
+ """
65
+ from googleapiclient.http import MediaFileUpload
66
+
67
+ try:
68
+ service = get_drive_service(account_name)
69
+
70
+ # Determine folder ID
71
+ target_folder_id = folder_id or os.getenv("DRIVE_UPLOAD_FOLDER_ID")
72
+
73
+ # Get file info
74
+ file_path = Path(local_path)
75
+ file_ext = file_path.suffix.lower()
76
+
77
+ # Generate unique filename to avoid conflicts
78
+ unique_name = f"{uuid.uuid4().hex[:8]}_{file_path.name}"
79
+
80
+ # Get MIME type
81
+ mime_type = MIME_TYPES.get(file_ext, "application/octet-stream")
82
+
83
+ # Prepare file metadata
84
+ file_metadata = {
85
+ "name": unique_name,
86
+ }
87
+
88
+ if target_folder_id:
89
+ file_metadata["parents"] = [target_folder_id]
90
+
91
+ # Create media upload object
92
+ media = MediaFileUpload(
93
+ local_path,
94
+ mimetype=mime_type,
95
+ resumable=True
96
+ )
97
+
98
+ logger.info(f"πŸ“€ Uploading {file_path.name} to Google Drive...")
99
+
100
+ # Execute upload
101
+ file = service.files().create(
102
+ body=file_metadata,
103
+ media_body=media,
104
+ fields="id, name, webViewLink, webContentLink"
105
+ ).execute()
106
+
107
+ file_id = file.get("id")
108
+
109
+ # Make file publicly accessible (anyone with link can view)
110
+ try:
111
+ service.permissions().create(
112
+ fileId=file_id,
113
+ body={
114
+ "type": "anyone",
115
+ "role": "reader"
116
+ }
117
+ ).execute()
118
+ logger.info(f"βœ… File shared publicly")
119
+ except Exception as e:
120
+ logger.warning(f"⚠️ Could not make file public: {e}")
121
+
122
+ # Construct URLs
123
+ view_url = file.get("webViewLink", f"https://drive.google.com/file/d/{file_id}/view")
124
+ download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
125
+
126
+ result = {
127
+ "file_id": file_id,
128
+ "filename": unique_name,
129
+ "gcs_filename": f"drive/{unique_name}", # For compatibility with GCS result format
130
+ "url": view_url,
131
+ "public_url": view_url,
132
+ "download_url": download_url,
133
+ }
134
+
135
+ logger.info(f"βœ… Uploaded to Drive: {view_url}")
136
+ return result
137
+
138
+ except Exception as e:
139
+ logger.error(f"❌ Failed to upload to Drive: {e}")
140
+ raise e
141
+
142
+
143
+ def extract_drive_file_id(url: str) -> str | None:
144
+ """
145
+ Extract file ID from various Google Drive URL formats:
146
+ - https://drive.google.com/file/d/FILE_ID/view
147
+ - https://drive.google.com/open?id=FILE_ID
148
+ - https://drive.google.com/uc?id=FILE_ID
149
+ - https://drive.google.com/uc?export=download&id=FILE_ID
150
+ """
151
+ import re
152
+
153
+ if not url or "drive.google.com" not in url:
154
+ return None
155
+
156
+ # Pattern 1: /file/d/FILE_ID/
157
+ match = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
158
+ if match:
159
+ return match.group(1)
160
+
161
+ # Pattern 2: ?id=FILE_ID or &id=FILE_ID
162
+ match = re.search(r'[?&]id=([a-zA-Z0-9_-]+)', url)
163
+ if match:
164
+ return match.group(1)
165
+
166
+ return None
167
+
168
+
169
+ def delete_file_from_drive(
170
+ url_or_file_id: str,
171
+ account_name: str = "test_data",
172
+ ) -> bool:
173
+ """
174
+ Delete a file from Google Drive.
175
+
176
+ Args:
177
+ url_or_file_id: Either a Drive URL or direct file ID.
178
+ account_name: Account to use (default: test_data).
179
+
180
+ Returns:
181
+ bool: True if deleted successfully, False otherwise.
182
+ """
183
+ try:
184
+ service = get_drive_service(account_name)
185
+
186
+ # Extract file ID if URL was provided
187
+ if url_or_file_id.startswith("http"):
188
+ file_id = extract_drive_file_id(url_or_file_id)
189
+ if not file_id:
190
+ logger.error(f"❌ Could not extract file ID from URL: {url_or_file_id}")
191
+ return False
192
+ else:
193
+ file_id = url_or_file_id
194
+
195
+ logger.info(f"πŸ—‘οΈ Deleting file from Drive: {file_id}")
196
+ service.files().delete(fileId=file_id).execute()
197
+ logger.info(f"βœ… Deleted from Drive: {file_id}")
198
+ return True
199
+
200
+ except Exception as e:
201
+ error_str = str(e)
202
+ if "404" in error_str or "not found" in error_str.lower():
203
+ logger.info(f"⚠️ File not found (already deleted?): {url_or_file_id}")
204
+ return True # Consider it success if file doesn't exist
205
+ logger.error(f"❌ Failed to delete from Drive: {e}")
206
+ return False
src/google_src/gcs_utils.py CHANGED
@@ -71,7 +71,8 @@ def delete_gcs_file(filename: str, bucket_name: Optional[str] = None, account_na
71
  def upload_file_to_gcs(
72
  local_path: str,
73
  account_name: str = "final_data",
74
- generate_signed_url: bool = True
 
75
  ) -> dict:
76
  """
77
  Upload a local file to GCS.
@@ -82,12 +83,14 @@ def upload_file_to_gcs(
82
  bucket_name: Target bucket name.
83
  account_name: Account to use.
84
  generate_signed_url: Whether to generate a signed URL.
 
85
 
86
  Returns:
87
  dict: {
88
  "gcs_filename": str,
89
  "url": str (signed or public),
90
- "public_url": str
 
91
  }
92
  """
93
  import os
@@ -135,7 +138,8 @@ def upload_file_to_gcs(
135
  result = {
136
  "gcs_filename": blob_name,
137
  "public_url": public_url,
138
- "url": public_url
 
139
  }
140
 
141
  if generate_signed_url:
@@ -156,6 +160,20 @@ def upload_file_to_gcs(
156
 
157
  except Exception as e:
158
  logger.error(f"❌ Failed to upload to GCS: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  # Re-raise to let caller handle critical failure
160
  raise e
161
 
 
71
  def upload_file_to_gcs(
72
  local_path: str,
73
  account_name: str = "final_data",
74
+ generate_signed_url: bool = True,
75
+ fallback_to_drive: bool = True,
76
  ) -> dict:
77
  """
78
  Upload a local file to GCS.
 
83
  bucket_name: Target bucket name.
84
  account_name: Account to use.
85
  generate_signed_url: Whether to generate a signed URL.
86
+ fallback_to_drive: If True, fallback to Google Drive upload on GCS failure.
87
 
88
  Returns:
89
  dict: {
90
  "gcs_filename": str,
91
  "url": str (signed or public),
92
+ "public_url": str,
93
+ "storage_type": str ("gcs" or "drive")
94
  }
95
  """
96
  import os
 
138
  result = {
139
  "gcs_filename": blob_name,
140
  "public_url": public_url,
141
+ "url": public_url,
142
+ "storage_type": "gcs"
143
  }
144
 
145
  if generate_signed_url:
 
160
 
161
  except Exception as e:
162
  logger.error(f"❌ Failed to upload to GCS: {e}")
163
+
164
+ # Fallback to Google Drive if enabled
165
+ if fallback_to_drive:
166
+ logger.info("πŸ”„ Falling back to Google Drive upload...")
167
+ try:
168
+ from .drive_utils import upload_file_to_drive
169
+ drive_result = upload_file_to_drive(local_path, account_name="test_data")
170
+ drive_result["storage_type"] = "drive"
171
+ logger.info(f"βœ… Fallback to Drive successful: {drive_result['url']}")
172
+ return drive_result
173
+ except Exception as drive_error:
174
+ logger.error(f"❌ Drive fallback also failed: {drive_error}")
175
+ raise e # Re-raise original GCS error
176
+
177
  # Re-raise to let caller handle critical failure
178
  raise e
179
 
src/process_csv.py CHANGED
@@ -27,10 +27,10 @@ SHARED_API_CLIENTS = None # Shared instance to avoid redundant GCS/TTS client i
27
 
28
  def load_executed_from_gsheet(setup_type=None, job_index=None):
29
  """Load executed scripts from Google Sheets logs.
30
- Returns a list of TTS Scripts that have Success=True.
31
 
32
  Args:
33
- setup_type: Optional. If provided, only count rows where Safe Name contains this value.
34
  job_index: Optional. If provided, only count rows for this specific job.
35
  """
36
  gsheet_name = os.getenv("GSHEET_WORKSHEET_LOGS")
@@ -43,23 +43,22 @@ def load_executed_from_gsheet(setup_type=None, job_index=None):
43
  reader = GoogleSheetReader(worksheet_name=gsheet_name)
44
  df = reader.get_dataframe()
45
 
46
- # Filter for successful entries only
47
- if "Success" in df.columns and "Hook" in df.columns:
48
- successful = df[df["Success"].str.lower() == "true"]
49
 
50
  # Optional: filter by job_index
51
  if job_index is not None and "Job" in df.columns:
52
- successful = successful[successful["Job"] == str(job_index)]
53
 
54
  # Optional: filter by setup_type in Final URL
55
  if setup_type and "Final URL" in df.columns:
56
- successful = successful[successful["Final URL"].str.contains(setup_type, na=False)]
57
 
58
- executed = successful["Hook"].tolist()
59
  logger.info(f"Loaded {len(executed)} executed entries from Google Sheets")
60
  return executed
61
  else:
62
- logger.warning("Google Sheets missing required columns (Success, Hook)")
63
  return []
64
  except Exception as e:
65
  logger.error(f"Failed to load from Google Sheets: {e}")
@@ -81,12 +80,11 @@ def log_progress_to_gsheet(tts_script: str, result: dict, job_index: int, commit
81
 
82
  reader.create_or_update_sheet(
83
  worksheet_name=gsheet_name,
84
- header=["Timestamp", "Job", "Hook", "Success", "Final URL", "GCS Filename", "Delete Entry"],
85
  values=[{
86
  "Timestamp": datetime.now().isoformat(),
87
  "Job": str(job_index if job_index is not None else 0),
88
- "Hook": tts_script,
89
- "Success": str(result.get("success", False)),
90
  "Final URL": result.get("final_url", ""),
91
  "GCS Filename": result.get("gcs_filename", ""),
92
  "Delete Entry": "False",
 
27
 
28
  def load_executed_from_gsheet(setup_type=None, job_index=None):
29
  """Load executed scripts from Google Sheets logs.
30
+ Returns a list of Source Data values (all entries are considered executed).
31
 
32
  Args:
33
+ setup_type: Optional. If provided, only count rows where Final URL contains this value.
34
  job_index: Optional. If provided, only count rows for this specific job.
35
  """
36
  gsheet_name = os.getenv("GSHEET_WORKSHEET_LOGS")
 
43
  reader = GoogleSheetReader(worksheet_name=gsheet_name)
44
  df = reader.get_dataframe()
45
 
46
+ if "Source Data" in df.columns:
47
+ result_df = df
 
48
 
49
  # Optional: filter by job_index
50
  if job_index is not None and "Job" in df.columns:
51
+ result_df = result_df[result_df["Job"] == str(job_index)]
52
 
53
  # Optional: filter by setup_type in Final URL
54
  if setup_type and "Final URL" in df.columns:
55
+ result_df = result_df[result_df["Final URL"].str.contains(setup_type, na=False)]
56
 
57
+ executed = result_df["Source Data"].tolist()
58
  logger.info(f"Loaded {len(executed)} executed entries from Google Sheets")
59
  return executed
60
  else:
61
+ logger.warning("Google Sheets missing required column (Source Data)")
62
  return []
63
  except Exception as e:
64
  logger.error(f"Failed to load from Google Sheets: {e}")
 
80
 
81
  reader.create_or_update_sheet(
82
  worksheet_name=gsheet_name,
83
+ header=["Timestamp", "Job", "Source Data", "Final URL", "GCS Filename", "Delete Entry"],
84
  values=[{
85
  "Timestamp": datetime.now().isoformat(),
86
  "Job": str(job_index if job_index is not None else 0),
87
+ "Source Data": tts_script,
 
88
  "Final URL": result.get("final_url", ""),
89
  "GCS Filename": result.get("gcs_filename", ""),
90
  "Delete Entry": "False",