jebin2 commited on
Commit
f67c2aa
·
1 Parent(s): e7fbeb6

feat: Replace file-based progress with Google Sheets logging

Browse files

- Add load_executed_from_gsheet() to fetch completed entries from GSheet
- Add log_progress_to_gsheet() to log progress by appending rows
- Remove old file-based progress tracking (get_progress_file, git_commit_progress)
- Update google_sheet_reader.py to create worksheet if not exists
- Add optional target_row param to create_or_update_sheet
- Support filtering by job_index and setup_type for parallel job tracking

Files changed (2) hide show
  1. src/google_sheet_reader.py +43 -14
  2. src/process_csv.py +68 -124
src/google_sheet_reader.py CHANGED
@@ -88,7 +88,15 @@ class GoogleSheetReader:
88
 
89
  if self.worksheet_name:
90
  logger.info("Opening worksheet: %s", self.worksheet_name)
91
- return spreadsheet.worksheet(self.worksheet_name)
 
 
 
 
 
 
 
 
92
 
93
  logger.info("Opening default worksheet (sheet1)")
94
  return spreadsheet.sheet1
@@ -250,14 +258,17 @@ class GoogleSheetReader:
250
  worksheet_name: str | None = None,
251
  header: list[str] = None,
252
  values: list[dict] = None,
 
253
  ):
254
  """
255
  Create or update a sheet + worksheet.
256
- Ensures headers exist and appends values.
257
 
258
  If sheet_name or worksheet_name not provided, uses instance's sheet.
259
 
260
  values: List of dicts where keys match header names
 
 
261
  """
262
 
263
  if not values:
@@ -297,22 +308,40 @@ class GoogleSheetReader:
297
  final_header = self._ensure_header(worksheet, header)
298
 
299
  # Convert dict rows -> ordered list rows
300
- rows_to_append = []
301
  for item in values:
302
  row = [item.get(col, "") for col in final_header]
303
- rows_to_append.append(row)
304
 
305
- logger.info(
306
- "Appending %d rows to %s / %s",
307
- len(rows_to_append),
308
- sheet_name or f"ID:{self.sheet_id}" if self.sheet_id else self.sheet_name,
309
- worksheet_name,
310
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
311
 
312
- worksheet.append_rows(
313
- rows_to_append,
314
- value_input_option="USER_ENTERED",
315
- )
316
 
317
 
318
  # ------------------ CSV Export ------------------
 
88
 
89
  if self.worksheet_name:
90
  logger.info("Opening worksheet: %s", self.worksheet_name)
91
+ try:
92
+ return spreadsheet.worksheet(self.worksheet_name)
93
+ except gspread.WorksheetNotFound:
94
+ logger.warning("Worksheet not found. Creating: %s", self.worksheet_name)
95
+ return spreadsheet.add_worksheet(
96
+ title=self.worksheet_name,
97
+ rows=1000,
98
+ cols=26,
99
+ )
100
 
101
  logger.info("Opening default worksheet (sheet1)")
102
  return spreadsheet.sheet1
 
258
  worksheet_name: str | None = None,
259
  header: list[str] = None,
260
  values: list[dict] = None,
261
+ target_row: int | None = None,
262
  ):
263
  """
264
  Create or update a sheet + worksheet.
265
+ Ensures headers exist and appends/inserts values.
266
 
267
  If sheet_name or worksheet_name not provided, uses instance's sheet.
268
 
269
  values: List of dicts where keys match header names
270
+ target_row: Optional 1-indexed row to write to. If row has data, inserts below it.
271
+ If None, appends to end of sheet.
272
  """
273
 
274
  if not values:
 
308
  final_header = self._ensure_header(worksheet, header)
309
 
310
  # Convert dict rows -> ordered list rows
311
+ rows_to_write = []
312
  for item in values:
313
  row = [item.get(col, "") for col in final_header]
314
+ rows_to_write.append(row)
315
 
316
+ if target_row is not None:
317
+ # Check if target row has data
318
+ try:
319
+ existing_row = worksheet.row_values(target_row)
320
+ except Exception:
321
+ existing_row = []
322
+
323
+ if existing_row and any(cell.strip() for cell in existing_row):
324
+ # Row has data, insert new row below it
325
+ worksheet.insert_rows(rows_to_write, row=target_row + 1, value_input_option="USER_ENTERED")
326
+ logger.info(f"Inserted {len(rows_to_write)} rows below row {target_row}")
327
+ else:
328
+ # Row is empty, write directly to it
329
+ cell_range = f"A{target_row}"
330
+ worksheet.update(values=rows_to_write, range_name=cell_range, value_input_option="USER_ENTERED")
331
+ logger.info(f"Updated row {target_row}")
332
+ else:
333
+ # No target row, append to end
334
+ logger.info(
335
+ "Appending %d rows to %s / %s",
336
+ len(rows_to_write),
337
+ sheet_name or f"ID:{self.sheet_id}" if self.sheet_id else self.sheet_name,
338
+ worksheet_name,
339
+ )
340
+ worksheet.append_rows(
341
+ rows_to_write,
342
+ value_input_option="USER_ENTERED",
343
+ )
344
 
 
 
 
 
345
 
346
 
347
  # ------------------ CSV Export ------------------
src/process_csv.py CHANGED
@@ -1,8 +1,8 @@
1
  import asyncio
2
  import csv
3
- import subprocess
4
  import os, time
5
  from pathlib import Path
 
6
  from load_config import load_configuration
7
  from main import (
8
  load_content_strategies,
@@ -12,120 +12,87 @@ from automation import ContentAutomation
12
  from utils import logger
13
  from data_holder import DataHolder
14
  from asset_selector import AssetSelector
 
15
  import argparse
16
- import random
17
  import uuid
18
 
19
  DATA_DIR = Path("data")
20
  ALL_VIDEO_FILE_INFO = None
21
 
22
- def get_progress_file(job_index=None):
23
- """Get the appropriate progress file for this job."""
24
- if job_index is not None:
25
- return DATA_DIR / f"executed_lines_job{job_index}.txt"
26
- return DATA_DIR / "executed_lines.txt"
27
-
28
 
29
- def load_all_executed_lines():
30
- """Load executed lines from all job-specific progress files.
31
- Returns a list (not a set) to preserve duplicate counts.
32
- """
33
- executed = [] # Changed from set() to list
34
 
35
- # Load from main progress file
36
- main_progress = DATA_DIR / "executed_lines.txt"
37
- if main_progress.exists():
38
- with open(main_progress, "r") as f:
39
- executed.extend(x.strip() for x in f if x.strip())
40
 
41
- # Load from all job-specific files
42
- for job_file in DATA_DIR.glob("executed_lines_job*.txt"):
43
- with open(job_file, "r") as f:
44
- executed.extend(x.strip() for x in f if x.strip())
45
 
46
- return executed # Returns list instead of set
47
-
48
-
49
- def git_commit_progress(job_index: int, commit=False):
50
- """
51
- Commit progress for a specific job. This flow is robust for parallel and CI/CD environments.
52
- """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
  if os.getenv("DO_NOT_PUBLISH", "false").lower() == "true":
54
- print("Testing do not publish.")
55
  return
56
 
57
  if not commit:
58
- print("ℹ️ Skipping git commit (use --commit to enable).")
59
  return
60
 
61
- progress_file = get_progress_file(job_index)
62
- if not progress_file.exists():
63
- logger.info(f"ℹ️ No progress file found for job {job_index}. Nothing to commit.")
64
  return
65
 
66
  try:
67
- branch = "feature/video-revamp"
68
- max_retries = 3
69
-
70
- # 1. Ensure we are on the correct branch and not in a detached HEAD state.
71
- # This is critical for CI/CD environments like GitHub Actions.
72
- logger.info(f"Git: Ensuring we are on branch '{branch}' for job {job_index}...")
73
- subprocess.run(["git", "fetch", "origin", branch], check=True, capture_output=True)
74
- subprocess.run(["git", "checkout", "-B", branch, f"origin/{branch}"], check=True, capture_output=True)
75
-
76
- # 2. Stage and commit the local changes.
77
- subprocess.run(["git", "add", str(progress_file)], check=True)
78
- commit_result = subprocess.run(
79
- ["git", "commit", "-m", f"βœ… Job {job_index} progress update"],
80
- capture_output=True, text=True # Capture output to check it
81
  )
82
 
83
- # If the commit was successful, proceed. If not, check if it's just "nothing to commit".
84
- if commit_result.returncode != 0:
85
- if "nothing to commit" in commit_result.stdout or "nothing to commit" in commit_result.stderr:
86
- logger.info(f"ℹ️ No new progress to commit for job {job_index}.")
87
- return # This is a successful state, no need to push.
88
- else:
89
- # The commit failed for another reason. Raise an error.
90
- raise subprocess.CalledProcessError(
91
- commit_result.returncode, cmd=commit_result.args,
92
- output=commit_result.stdout, stderr=commit_result.stderr
93
- )
94
 
95
- # 3. Push with a retry loop to handle race conditions from other parallel jobs.
96
- for attempt in range(max_retries):
97
- try:
98
- # Pull with rebase immediately before pushing to resolve any new remote commits.
99
- logger.info(f"Git: Pulling with rebase (Attempt {attempt + 1}/{max_retries})...")
100
- subprocess.run(["git", "pull", "--rebase", "origin", branch], check=True, capture_output=True)
101
-
102
- # Attempt the push.
103
- logger.info(f"Git: Pushing to remote (Attempt {attempt + 1}/{max_retries})...")
104
- subprocess.run(
105
- ["git", "push", "origin", branch],
106
- check=True,
107
- timeout=45
108
- )
109
-
110
- logger.info(f"βœ“ Committed progress successfully for job {job_index}")
111
- return # Exit function on success
112
-
113
- except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
114
- if attempt < max_retries - 1:
115
- # Use randomized exponential backoff to de-synchronize retries
116
- sleep_duration = random.uniform(2, 5) * (attempt + 1)
117
- logger.warning(
118
- f"Push failed for job {job_index}. Retrying in {sleep_duration:.2f} seconds..."
119
- )
120
- time.sleep(sleep_duration)
121
- else:
122
- logger.error(f"❌ Failed to push progress for job {job_index} after {max_retries} attempts.")
123
- raise # Re-raise the final exception to signal a persistent failure
124
-
125
- except subprocess.CalledProcessError as e:
126
- error_message = e.stderr if e.stderr else e.stdout
127
- logger.error(f"❌ A Git command failed for job {job_index}: {e.cmd}\nError: {error_message}")
128
- subprocess.run(["git", "reset", "--hard"])
129
 
130
  async def process_row(row, config: dict):
131
  """Process one CSV row using the main pipeline."""
@@ -197,11 +164,8 @@ async def process_all_csvs(config, commit=False, job_index=None, total_jobs=None
197
  return
198
 
199
  # Load executed lines from ALL progress files (main + all jobs)
200
- executed = load_all_executed_lines()
201
  logger.info(f"Skipping {len(executed)} already executed entries (from all jobs).")
202
-
203
- # Get this job's progress file
204
- progress_file = get_progress_file(job_index)
205
 
206
  # Load all rows from all CSVs with their global index
207
  all_rows = []
@@ -217,7 +181,6 @@ async def process_all_csvs(config, commit=False, job_index=None, total_jobs=None
217
  if idx % total_jobs == job_index
218
  ]
219
  logger.info(f"πŸ”’ Job {job_index + 1}/{total_jobs}: Processing {len(rows_to_process)}/{len(all_rows)} rows")
220
- logger.info(f"πŸ“ Progress file: {progress_file}")
221
  else:
222
  rows_to_process = [(idx, csv_name, row) for idx, (csv_name, row) in enumerate(all_rows)]
223
  logger.info(f"Processing all {len(rows_to_process)} rows")
@@ -283,12 +246,7 @@ async def process_all_csvs(config, commit=False, job_index=None, total_jobs=None
283
  success_count += 1
284
  processed_scripts.append(tts_script) # Track in session
285
 
286
- if os.getenv("DO_NOT_PUBLISH", "false").lower() != "true":
287
- # Write progress to THIS JOB's file only
288
- with progress_file.open("a") as pf:
289
- pf.write(f"{tts_script}\n")
290
-
291
- git_commit_progress(job_index, commit)
292
 
293
  logger.info(
294
  f"βœ… {tts_script[:50]}... occurrence {occurrence}/{total_needed} "
@@ -296,8 +254,6 @@ async def process_all_csvs(config, commit=False, job_index=None, total_jobs=None
296
  )
297
  else:
298
  logger.warning(f"⚠️ {tts_script[:50]}... pipeline failed, NOT marking as complete")
299
- if success_count % 2 == 1:
300
- git_commit_progress(job_index, commit)
301
 
302
  if os.getenv("DO_NOT_PUBLISH", "false").lower() == "true":
303
  break
@@ -305,10 +261,6 @@ async def process_all_csvs(config, commit=False, job_index=None, total_jobs=None
305
  logger.error(f"❌ Error processing {tts_script[:50]}...: {e}", exc_info=True)
306
  continue
307
 
308
- # Final commit for any remaining progress
309
- if commit and success_count > 0:
310
- git_commit_progress(job_index, commit)
311
-
312
  logger.info(f"🏁 Job {job_index} finished: {success_count}/{processed_count} successful")
313
 
314
  async def create_plain_videos(config, commit=False, job_index=None, total_jobs=None):
@@ -320,16 +272,10 @@ async def create_plain_videos(config, commit=False, job_index=None, total_jobs=N
320
  n = int(os.getenv("PlAIN_VIDEO_COUNT", 100))
321
  logger.info(f"Creating {n} random videos for testing...")
322
 
323
- progress_file = get_progress_file(job_index)
324
-
325
- # How many videos already created across ALL jobs
326
- completed = 0
327
- if progress_file.exists():
328
- with progress_file.open("r") as pf:
329
- completed = sum(1 for line in pf if os.getenv("SETUP_TYPE") in line.strip())
330
- else:
331
- progress_file.touch()
332
- completed = 0
333
 
334
  # --- ASSIGN SLICE TO THIS JOB ---
335
  per_job = n / total_jobs
@@ -350,12 +296,10 @@ async def create_plain_videos(config, commit=False, job_index=None, total_jobs=N
350
 
351
  config["current_audio_index"] = i
352
  result = await process_row(row, config)
 
353
 
354
  if commit and result.get("success", False):
355
- with progress_file.open("a") as pf:
356
- print(result)
357
- pf.write(f"{result.get('final_url').split('/')[-1]}\n")
358
- git_commit_progress(job_index, commit)
359
 
360
  if os.getenv("DO_NOT_PUBLISH", "false").lower() == "true":
361
  break
 
1
  import asyncio
2
  import csv
 
3
  import os, time
4
  from pathlib import Path
5
+ from datetime import datetime
6
  from load_config import load_configuration
7
  from main import (
8
  load_content_strategies,
 
12
  from utils import logger
13
  from data_holder import DataHolder
14
  from asset_selector import AssetSelector
15
+ from google_sheet_reader import GoogleSheetReader
16
  import argparse
 
17
  import uuid
18
 
19
  DATA_DIR = Path("data")
20
  ALL_VIDEO_FILE_INFO = None
21
 
 
 
 
 
 
 
22
 
23
def load_executed_from_gsheet(setup_type=None, job_index=None):
    """Load executed scripts from the Google Sheets progress log.

    Reads the worksheet named by the GSHEET_WORKSHEET_LOGS environment
    variable and returns the "Hook" values of rows whose "Success"
    column equals "true" (case-insensitive).

    Args:
        setup_type: Optional. If provided, only count rows whose
            "Final URL" column contains this value.
        job_index: Optional. If provided, only count rows whose "Job"
            column matches this job index.

    Returns:
        list: Executed hook/TTS-script values. Empty list when the env
        var is unset, required columns are missing, or any error occurs
        (best-effort: this function never raises).
    """
    gsheet_name = os.getenv("GSHEET_WORKSHEET_LOGS")
    if not gsheet_name:
        logger.warning("GSHEET_WORKSHEET_LOGS not set")
        return []

    try:
        reader = GoogleSheetReader(worksheet_name=gsheet_name)
        df = reader.get_dataframe()

        if "Success" not in df.columns or "Hook" not in df.columns:
            logger.warning("Google Sheets missing required columns (Success, Hook)")
            return []

        # Filter for successful entries only. Cast to str first so the
        # comparison also works when pandas inferred a bool dtype for
        # "Success" (a bool Series has no .str accessor).
        successful = df[df["Success"].astype(str).str.lower() == "true"]

        # Optional: filter by job_index. Cast to str so an int-inferred
        # "Job" column still matches str(job_index).
        if job_index is not None and "Job" in df.columns:
            successful = successful[successful["Job"].astype(str) == str(job_index)]

        # Optional: filter by setup_type appearing in the Final URL.
        if setup_type and "Final URL" in df.columns:
            successful = successful[
                successful["Final URL"].astype(str).str.contains(setup_type, na=False)
            ]

        executed = successful["Hook"].tolist()
        logger.info(f"Loaded {len(executed)} executed entries from Google Sheets")
        return executed
    except Exception as e:
        logger.error(f"Failed to load from Google Sheets: {e}")
        return []
62
+
63
+
64
def log_progress_to_gsheet(tts_script: str, result: dict, job_index: int, commit=False):
    """Record one pipeline result as a row in the Google Sheets log.

    No-op when DO_NOT_PUBLISH is enabled, when *commit* is False, or
    when the GSHEET_WORKSHEET_LOGS environment variable is not set.
    Errors are logged and swallowed so logging never breaks the pipeline.
    """
    # Test / no-publish mode: skip logging entirely.
    if os.getenv("DO_NOT_PUBLISH", "false").lower() == "true":
        return

    # Caller must explicitly opt in (--commit) before anything is written.
    if not commit:
        return

    gsheet_name = os.getenv("GSHEET_WORKSHEET_LOGS")
    if not gsheet_name:
        logger.warning("GSHEET_WORKSHEET_LOGS not set, skipping gsheet logging")
        return

    # Build the row up front; only the sheet calls below can fail.
    row = {
        "Timestamp": datetime.now().isoformat(),
        "Job": str(job_index if job_index is not None else 0),
        "Hook": tts_script,
        "Success": str(result.get("success", False)),
        "Final URL": result.get("final_url", ""),
    }

    try:
        sheet_reader = GoogleSheetReader(worksheet_name=gsheet_name)
        sheet_reader.create_or_update_sheet(
            worksheet_name=gsheet_name,
            header=["Timestamp", "Job", "Hook", "Success", "Final URL"],
            values=[row],
        )
        logger.info(f"βœ“ Logged progress to Google Sheet for job {job_index}")
    except Exception as e:
        logger.error(f"❌ Failed to log to Google Sheet: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
 
97
  async def process_row(row, config: dict):
98
  """Process one CSV row using the main pipeline."""
 
164
  return
165
 
166
  # Load executed lines from ALL progress files (main + all jobs)
167
+ executed = load_executed_from_gsheet()
168
  logger.info(f"Skipping {len(executed)} already executed entries (from all jobs).")
 
 
 
169
 
170
  # Load all rows from all CSVs with their global index
171
  all_rows = []
 
181
  if idx % total_jobs == job_index
182
  ]
183
  logger.info(f"πŸ”’ Job {job_index + 1}/{total_jobs}: Processing {len(rows_to_process)}/{len(all_rows)} rows")
 
184
  else:
185
  rows_to_process = [(idx, csv_name, row) for idx, (csv_name, row) in enumerate(all_rows)]
186
  logger.info(f"Processing all {len(rows_to_process)} rows")
 
246
  success_count += 1
247
  processed_scripts.append(tts_script) # Track in session
248
 
249
+ log_progress_to_gsheet(tts_script, result, job_index, commit)
 
 
 
 
 
250
 
251
  logger.info(
252
  f"βœ… {tts_script[:50]}... occurrence {occurrence}/{total_needed} "
 
254
  )
255
  else:
256
  logger.warning(f"⚠️ {tts_script[:50]}... pipeline failed, NOT marking as complete")
 
 
257
 
258
  if os.getenv("DO_NOT_PUBLISH", "false").lower() == "true":
259
  break
 
261
  logger.error(f"❌ Error processing {tts_script[:50]}...: {e}", exc_info=True)
262
  continue
263
 
 
 
 
 
264
  logger.info(f"🏁 Job {job_index} finished: {success_count}/{processed_count} successful")
265
 
266
  async def create_plain_videos(config, commit=False, job_index=None, total_jobs=None):
 
272
  n = int(os.getenv("PlAIN_VIDEO_COUNT", 100))
273
  logger.info(f"Creating {n} random videos for testing...")
274
 
275
+ # Load how many already completed from Google Sheets (filtered by SETUP_TYPE and job_index)
276
+ setup_type = os.getenv("SETUP_TYPE")
277
+ executed = load_executed_from_gsheet(setup_type=setup_type, job_index=job_index)
278
+ completed = len(executed)
 
 
 
 
 
 
279
 
280
  # --- ASSIGN SLICE TO THIS JOB ---
281
  per_job = n / total_jobs
 
296
 
297
  config["current_audio_index"] = i
298
  result = await process_row(row, config)
299
+ print(result)
300
 
301
  if commit and result.get("success", False):
302
+ log_progress_to_gsheet(row.get("TTS Script (AI Avatar)", ""), result, job_index, commit)
 
 
 
303
 
304
  if os.getenv("DO_NOT_PUBLISH", "false").lower() == "true":
305
  break