# Tools/src/workflows/content_strategy_workflow.py
# refactor: Centralize logger import to src.logger_config across various modules.
# (author: jebin2, commit: f20025d)
import os
import sys

from execution_tracker import load_executed_from_gsheet, log_progress_to_gsheet
from pipeline_processor import process_single_row, configure_row_settings
from cleanup_manager import process_delete_entries
from src.asset_manager import get_content_strategy_lib
from src.config import get_config_value, set_config_value
from src.logger_config import logger
def _count_script_occurrences(rows_to_process):
    """Count how many times each tracking script appears in *rows_to_process*.

    The tracking key is the "On-Screen Text" column when the `on_screen_text`
    config flag is set, otherwise the "TTS Script (AI Avatar)" column.
    Returns a dict mapping script text -> occurrence count.
    """
    counts = {}
    for _idx, _csv_name, row in rows_to_process:
        script = row.get("TTS Script (AI Avatar)", "").strip()
        if get_config_value("on_screen_text", False):
            script = row.get("On-Screen Text", "").strip()
        counts[script] = counts.get(script, 0) + 1
    return counts


def _log_duplicate_scripts(script_counts):
    """Warn about scripts appearing more than once (top 5 by count)."""
    duplicates = {script: count for script, count in script_counts.items() if count > 1}
    if duplicates:
        logger.warning(f"⚠️ Found {len(duplicates)} duplicate scripts in this job's rows:")
        for script, count in sorted(duplicates.items(), key=lambda x: x[1], reverse=True)[:5]:
            logger.warning(f" '{script[:50]}...' appears {count} times")


async def run_content_strategy_workflow(commit=False, job_index=None, total_jobs=None):
    """Process content strategies from Google Sheet with progress tracking.

    Args:
        commit: Forwarded to ``log_progress_to_gsheet``; controls whether
            progress is committed to the sheet.
        job_index: Zero-based index of this job when running in parallel,
            or ``None`` for a single-job run.
        total_jobs: Total number of parallel jobs, or ``None`` for a
            single-job run.

    Returns:
        List of dicts (``local_path`` / ``gcs_path`` / ``drive_path``) for
        each successfully generated asset; empty when nothing was produced.
    """
    # Run delete cleanup first so flagged rows are gone before generation.
    await process_delete_entries()

    generated_results = []

    # Load executed lines from ALL progress files (main + all jobs) so that
    # per-script occurrence counts are honored across runs and workers.
    executed = load_executed_from_gsheet(job_index=job_index)
    logger.debug(f"Skipping {len(executed)} already executed entries (from all jobs).")

    # Load strategies from Google Sheet.
    content_lib = get_content_strategy_lib()
    df = content_lib.strategies
    if df.empty:
        logger.error("❌ No content strategies found in Google Sheet!")
        # Return the (empty) result list instead of bare None so callers
        # can iterate the return value unconditionally.
        return generated_results

    # Tag every row with the configured worksheet name so downstream logging
    # has a consistent (worksheet, row) shape.
    worksheet_name = get_config_value("content_strategy_worksheet", "Unknown_Worksheet")
    all_rows = [(worksheet_name, row.to_dict()) for _, row in df.iterrows()]

    # Round-robin split across parallel jobs, keeping each row's global index
    # stable so every worker derives the same indices.
    if job_index is not None and total_jobs is not None:
        rows_to_process = [
            (idx, csv_name, row)
            for idx, (csv_name, row) in enumerate(all_rows)
            if idx % total_jobs == job_index
        ]
        logger.info(f"πŸ”’ Job {job_index + 1}/{total_jobs}: Processing {len(rows_to_process)}/{len(all_rows)} rows")
    else:
        rows_to_process = [(idx, csv_name, row) for idx, (csv_name, row) in enumerate(all_rows)]
        logger.info(f"Processing all {len(rows_to_process)} rows")

    # Count duplicates so repeated scripts are executed the right number of times.
    script_counts = _count_script_occurrences(rows_to_process)
    _log_duplicate_scripts(script_counts)

    processed_count = 0
    success_count = 0
    processed_scripts = []  # Scripts completed in THIS session, in order.
    generation_count = get_config_value("generation_count")

    for global_row_index, csv_name, row in rows_to_process:
        if success_count >= generation_count:
            logger.info(f"πŸ›‘ Reached generation count limit ({generation_count}). Stopping.")
            break

        # Configure row settings and get tts_script for tracking.
        tts_script = configure_row_settings(row, global_row_index)

        # Total occurrences this script needs across all duplicates. Fall back
        # to 1 in case configure_row_settings normalizes the key differently
        # from the pre-count pass (previously this was a KeyError).
        total_needed = script_counts.get(tts_script, 1)
        # Completed in previous runs vs. completed earlier in this session.
        already_executed = executed.count(tts_script)
        session_executed = processed_scripts.count(tts_script)

        # Skip once enough occurrences of this script have been produced.
        if already_executed + session_executed >= total_needed:
            logger.debug(
                f"Skipping {tts_script[:50]}... "
                f"(completed {already_executed} times previously, "
                f"{session_executed} times this session, "
                f"{total_needed} needed total)"
            )
            continue

        try:
            occurrence = already_executed + session_executed + 1
            logger.debug(
                f"Starting {global_row_index}/{len(rows_to_process)} - "
                f"{tts_script[:50]}... (occurrence {occurrence}/{total_needed})"
            )
            results = await process_single_row(row)
            processed_count += 1

            for result in results:
                if result.get("success", False):
                    success_count += 1
                    processed_scripts.append(tts_script)  # Track in session
                    log_progress_to_gsheet(result, job_index, commit)
                    logger.debug(
                        f"{tts_script[:50]}... occurrence {occurrence}/{total_needed} "
                        f"completed successfully ({success_count}/{processed_count})"
                    )
                    # Collect for summary table.
                    generated_results.append({
                        "local_path": result.get("local_path", "N/A"),
                        "gcs_path": result.get("gcs_filename", "N/A"),
                        "drive_path": result.get("final_url", "N/A"),
                    })
                else:
                    logger.warning(f"⚠️ {tts_script[:50]}... pipeline failed, NOT marking as complete")
        except Exception as e:
            logger.error(f"❌ Error processing {tts_script[:50]}...: {e}", exc_info=True)
            if get_config_value("test_automation", False):
                # Abort on first failure in test-automation mode with a
                # nonzero exit code (bare exit() previously exited with 0
                # and depends on the optional `site` module).
                sys.exit(1)
            continue

    logger.info(f"🏁 Job {job_index} finished: {success_count}/{processed_count} successful")
    return generated_results