File size: 5,726 Bytes
54639e8 b936660 54639e8 5f6fd2a 9d54dfd f20025d 54639e8 0b00536 54639e8 b2426a5 503d4ac 54639e8 b14d442 54639e8 5f00d5a 54639e8 5f00d5a b936660 54639e8 503d4ac 54639e8 503d4ac 54639e8 2df2397 54639e8 2df2397 54639e8 503d4ac 2df2397 0b00536 2df2397 54639e8 5f00d5a 54639e8 0b00536 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
import os
import sys
from collections import Counter

from execution_tracker import load_executed_from_gsheet, log_progress_to_gsheet
from pipeline_processor import process_single_row, configure_row_settings
from cleanup_manager import process_delete_entries
from src.asset_manager import get_content_strategy_lib
from src.config import get_config_value, set_config_value
from src.logger_config import logger
async def run_content_strategy_workflow(commit=False, job_index=None, total_jobs=None):
    """Process content strategies from Google Sheet with progress tracking.

    Rows already completed (recorded in the progress sheet across all jobs)
    are skipped, counting duplicate scripts so each duplicate occurrence is
    still processed once.

    Args:
        commit: Forwarded to ``log_progress_to_gsheet`` — presumably controls
            whether progress is actually written back; confirm against that
            function.
        job_index: 0-based index of this job when running in parallel, or
            ``None`` for a single (non-parallel) run.
        total_jobs: Total number of parallel jobs, or ``None``.

    Returns:
        A list of dicts with ``local_path`` / ``gcs_path`` / ``drive_path``
        for each successfully generated result, or ``None`` when the sheet
        contains no strategies.
    """
    # Run delete cleanup first so removed entries never get processed.
    await process_delete_entries()
    generated_results = []

    # Load executed lines from ALL progress files (main + all jobs).
    executed = load_executed_from_gsheet(job_index=job_index)
    logger.debug(f"Skipping {len(executed)} already executed entries (from all jobs).")

    # Load strategies from Google Sheet.
    content_lib = get_content_strategy_lib()
    df = content_lib.strategies
    if df.empty:
        logger.error("β No content strategies found in Google Sheet!")
        return

    # Convert to a list of (worksheet_name, row_dict) pairs with a dummy
    # "worksheet" name so downstream code has a consistent shape.
    worksheet_name = get_config_value("content_strategy_worksheet", "Unknown_Worksheet")
    all_rows = [(worksheet_name, row.to_dict()) for _, row in df.iterrows()]

    rows_to_process = _select_rows_for_job(all_rows, job_index, total_jobs)

    # Count duplicate scripts in this job's rows. Kept as a plain dict (not a
    # Counter) so a lookup with an unknown key still raises KeyError loudly,
    # matching the original strictness.
    script_counts = dict(Counter(_script_key(row) for _, _, row in rows_to_process))
    _log_duplicates(script_counts)

    # Pre-count prior executions once instead of calling list.count() inside
    # the loop (was O(rows * executed)); Counter returns 0 for missing keys,
    # same as list.count() on an absent value.
    executed_counts = Counter(executed)
    session_counts = Counter()  # successes recorded during this run

    processed_count = 0
    success_count = 0
    # NOTE(review): assumes "generation_count" is always configured as an
    # int — a None here would make the >= comparison raise. Confirm.
    generation_count = get_config_value("generation_count")

    for global_row_index, _ws_name, row in rows_to_process:
        if success_count >= generation_count:
            logger.info(f"π Reached generation count limit ({generation_count}). Stopping.")
            break

        # Configure row settings and get tts_script for tracking.
        # NOTE(review): this key must match _script_key's normalization,
        # otherwise the script_counts lookup below raises KeyError — confirm
        # configure_row_settings applies the same strip/on_screen_text logic.
        tts_script = configure_row_settings(row, global_row_index)

        total_needed = script_counts[tts_script]          # occurrences required in total
        already_executed = executed_counts[tts_script]    # completed in previous runs
        session_executed = session_counts[tts_script]     # completed in this session

        # Skip once enough occurrences of this script have been produced.
        if already_executed + session_executed >= total_needed:
            logger.debug(
                f"Skipping {tts_script[:50]}... "
                f"(completed {already_executed} times previously, "
                f"{session_executed} times this session, "
                f"{total_needed} needed total)"
            )
            continue

        try:
            occurrence = already_executed + session_executed + 1
            logger.debug(
                f"Starting {global_row_index}/{len(rows_to_process)} - "
                f"{tts_script[:50]}... (occurrence {occurrence}/{total_needed})"
            )
            results = await process_single_row(row)
            processed_count += 1
            for result in results:
                if result.get("success", False):
                    success_count += 1
                    session_counts[tts_script] += 1  # track in session
                    log_progress_to_gsheet(result, job_index, commit)
                    logger.debug(
                        f"{tts_script[:50]}... occurrence {occurrence}/{total_needed} "
                        f"completed successfully ({success_count}/{processed_count})"
                    )
                    # Collect for summary table.
                    generated_results.append({
                        "local_path": result.get("local_path", "N/A"),
                        "gcs_path": result.get("gcs_filename", "N/A"),
                        "drive_path": result.get("final_url", "N/A")
                    })
                else:
                    logger.warning(f"β οΈ {tts_script[:50]}... pipeline failed, NOT marking as complete")
        except Exception as e:
            logger.error(f"β Error processing {tts_script[:50]}...: {e}", exc_info=True)
            if get_config_value("test_automation", False):
                # Fail fast in automated test runs. sys.exit() replaces the
                # interactive-helper exit() builtin (absent under python -S).
                sys.exit(1)

    logger.info(f"π Job {job_index} finished: {success_count}/{processed_count} successful")
    return generated_results


def _script_key(row):
    """Return the dedup key for a row: the on-screen text when the
    'on_screen_text' config flag is set, otherwise the TTS script.

    NOTE(review): assumes both columns hold strings (``.strip()`` would fail
    on NaN from pandas) — confirm the sheet never has blank cells here.
    """
    if get_config_value("on_screen_text", False):
        return row.get("On-Screen Text", "").strip()
    return row.get("TTS Script (AI Avatar)", "").strip()


def _select_rows_for_job(all_rows, job_index, total_jobs):
    """Split rows round-robin across parallel jobs, preserving each row's
    global index so progress tracking stays consistent across jobs."""
    if job_index is not None and total_jobs is not None:
        selected = [
            (idx, ws_name, row) for idx, (ws_name, row) in enumerate(all_rows)
            if idx % total_jobs == job_index
        ]
        logger.info(f"π’ Job {job_index + 1}/{total_jobs}: Processing {len(selected)}/{len(all_rows)} rows")
    else:
        selected = [(idx, ws_name, row) for idx, (ws_name, row) in enumerate(all_rows)]
        logger.info(f"Processing all {len(selected)} rows")
    return selected


def _log_duplicates(script_counts):
    """Warn about scripts that appear more than once in this job's rows,
    showing the five most-duplicated."""
    duplicates = {script: count for script, count in script_counts.items() if count > 1}
    if duplicates:
        logger.warning(f"β οΈ Found {len(duplicates)} duplicate scripts in this job's rows:")
        for script, count in sorted(duplicates.items(), key=lambda x: x[1], reverse=True)[:5]:
            logger.warning(f" '{script[:50]}...' appears {count} times")
|