|
|
import os |
|
|
from execution_tracker import load_executed_from_gsheet, log_progress_to_gsheet |
|
|
from pipeline_processor import process_single_row, configure_row_settings |
|
|
from cleanup_manager import process_delete_entries |
|
|
from src.asset_manager import get_content_strategy_lib |
|
|
from src.config import get_config_value, set_config_value |
|
|
from src.logger_config import logger |
|
|
|
|
|
async def run_content_strategy_workflow(commit=False, job_index=None, total_jobs=None):
    """Process content strategies from Google Sheet with progress tracking.

    Workflow: purge rows flagged for deletion, load the list of already
    executed scripts, optionally shard the sheet rows round-robin across
    parallel jobs, then run the generation pipeline row by row until
    ``generation_count`` successes have been recorded.

    Args:
        commit: Forwarded to ``log_progress_to_gsheet``; presumably gates
            whether progress is actually written back — confirm against
            that module.
        job_index: Zero-based index of this job for round-robin sharding,
            or None to process every row.
        total_jobs: Total number of parallel jobs; sharding only applies
            when both ``job_index`` and ``total_jobs`` are provided.

    Returns:
        list[dict]: One entry per successful generation, with keys
        ``local_path``, ``gcs_path`` and ``drive_path``. Empty when the
        sheet holds no strategies or nothing succeeds.
    """
    # Remove any entries flagged for deletion before reading the sheet.
    await process_delete_entries()

    generated_results = []

    # Scripts completed in previous runs (across all jobs) — used below to
    # skip rows whose required occurrence count is already satisfied.
    executed = load_executed_from_gsheet(job_index=job_index)
    logger.debug(f"Skipping {len(executed)} already executed entries (from all jobs).")

    content_lib = get_content_strategy_lib()
    df = content_lib.strategies

    if df.empty:
        logger.error("β No content strategies found in Google Sheet!")
        # BUGFIX: return the (empty) results list instead of a bare return,
        # so callers can always iterate the result without a None check.
        return generated_results

    # Flatten the sheet into (worksheet_name, row_dict) pairs.
    all_rows = []
    worksheet_name = get_config_value("content_strategy_worksheet", "Unknown_Worksheet")
    for _, row in df.iterrows():  # dataframe index unused; rows re-enumerated below
        all_rows.append((worksheet_name, row.to_dict()))

    # Round-robin sharding: job k takes every row whose position modulo
    # total_jobs equals k, so jobs never overlap.
    if job_index is not None and total_jobs is not None:
        rows_to_process = [
            (idx, csv_name, row) for idx, (csv_name, row) in enumerate(all_rows)
            if idx % total_jobs == job_index
        ]
        logger.info(f"π’ Job {job_index + 1}/{total_jobs}: Processing {len(rows_to_process)}/{len(all_rows)} rows")
    else:
        rows_to_process = [(idx, csv_name, row) for idx, (csv_name, row) in enumerate(all_rows)]
        logger.info(f"Processing all {len(rows_to_process)} rows")

    # Count how many times each script appears so duplicates each get
    # generated the required number of times. The source column is chosen
    # once here (it is loop-invariant) instead of re-reading the config and
    # the unused TTS column on every iteration.
    script_column = (
        "On-Screen Text"
        if get_config_value("on_screen_text", False)
        else "TTS Script (AI Avatar)"
    )
    script_counts = {}
    for idx, csv_name, row in rows_to_process:
        # NOTE(review): assumes the cell is a string (or absent); a pandas
        # NaN here would make .strip() raise — confirm sheet hygiene.
        tts_script = row.get(script_column, "").strip()
        script_counts[tts_script] = script_counts.get(tts_script, 0) + 1

    duplicates = {script: count for script, count in script_counts.items() if count > 1}
    if duplicates:
        logger.warning(f"β οΈ Found {len(duplicates)} duplicate scripts in this job's rows:")
        # Log only the five most repeated scripts, truncated for readability.
        for script, count in sorted(duplicates.items(), key=lambda x: x[1], reverse=True)[:5]:
            logger.warning(f" '{script[:50]}...' appears {count} times")

    processed_count = 0     # rows attempted through the pipeline
    success_count = 0       # results that reported success
    processed_scripts = []  # scripts completed during THIS session

    # NOTE(review): no default supplied — if "generation_count" is missing
    # from config, the comparison below raises TypeError; confirm the config
    # always defines it.
    generation_count = get_config_value("generation_count")

    for global_row_index, csv_name, row in rows_to_process:
        if success_count >= generation_count:
            logger.info(f"π Reached generation count limit ({generation_count}). Stopping.")
            break

        # Applies per-row settings and returns the script text used as the
        # dedup key (exact semantics live in pipeline_processor).
        tts_script = configure_row_settings(row, global_row_index)

        # A script may legitimately appear N times in the sheet; only skip
        # once prior-run plus this-session completions reach that N.
        # NOTE(review): assumes configure_row_settings returns the same key
        # counted above — a mismatch would raise KeyError here.
        total_needed = script_counts[tts_script]
        already_executed = executed.count(tts_script)
        session_executed = processed_scripts.count(tts_script)

        if already_executed + session_executed >= total_needed:
            logger.debug(
                f"Skipping {tts_script[:50]}... "
                f"(completed {already_executed} times previously, "
                f"{session_executed} times this session, "
                f"{total_needed} needed total)"
            )
            continue

        try:
            occurrence = already_executed + session_executed + 1
            logger.debug(
                f"Starting {global_row_index}/{len(rows_to_process)} - "
                f"{tts_script[:50]}... (occurrence {occurrence}/{total_needed})"
            )

            results = await process_single_row(row)
            processed_count += 1

            for result in results:
                if result.get("success", False):
                    success_count += 1
                    processed_scripts.append(tts_script)

                    log_progress_to_gsheet(result, job_index, commit)

                    logger.debug(
                        f"{tts_script[:50]}... occurrence {occurrence}/{total_needed} "
                        f"completed successfully ({success_count}/{processed_count})"
                    )

                    generated_results.append({
                        "local_path": result.get("local_path", "N/A"),
                        "gcs_path": result.get("gcs_filename", "N/A"),
                        "drive_path": result.get("final_url", "N/A")
                    })
                else:
                    logger.warning(f"β οΈ {tts_script[:50]}... pipeline failed, NOT marking as complete")

        except Exception as e:
            logger.error(f"β Error processing {tts_script[:50]}...: {e}", exc_info=True)
            if get_config_value("test_automation", False):
                # BUGFIX: the bare builtin exit() is a site-module helper not
                # guaranteed in non-interactive runs, and it exited with
                # status 0 on a failure; raise SystemExit(1) so automation
                # sees a nonzero status.
                raise SystemExit(1)
            continue

    logger.info(f"π Job {job_index} finished: {success_count}/{processed_count} successful")
    return generated_results
|
|
|