import os
import sys
from collections import Counter

from execution_tracker import load_executed_from_gsheet, log_progress_to_gsheet
from pipeline_processor import process_single_row, configure_row_settings
from cleanup_manager import process_delete_entries
from src.asset_manager import get_content_strategy_lib
from src.config import get_config_value, set_config_value
from src.logger_config import logger


async def run_content_strategy_workflow(commit=False, job_index=None, total_jobs=None):
    """Process content strategies from Google Sheet with progress tracking.

    Runs delete cleanup, loads already-executed entries (across all parallel
    jobs), optionally takes every ``total_jobs``-th row for this job, skips
    scripts whose needed occurrence count is already satisfied, and pipes the
    remaining rows through ``process_single_row``.

    Args:
        commit: Passed through to ``log_progress_to_gsheet``; when True,
            progress is committed to the sheet.
        job_index: Zero-based index of this parallel job, or None for a
            single-job run.
        total_jobs: Total number of parallel jobs, or None for a single-job
            run.

    Returns:
        A list of dicts with ``local_path`` / ``gcs_path`` / ``drive_path``
        for each successfully generated asset, or None when the sheet holds
        no strategies.
    """
    # Run delete cleanup first so removed entries are not reprocessed below.
    await process_delete_entries()

    generated_results = []

    # Load executed lines from ALL progress files (main + all jobs).
    executed = load_executed_from_gsheet(job_index=job_index)
    # Pre-count once: Counter gives O(1) per-script lookups instead of the
    # O(n) list.count() the main loop would otherwise do on every row.
    executed_counts = Counter(executed)
    logger.debug(f"Skipping {len(executed)} already executed entries (from all jobs).")

    # Load strategies from Google Sheet.
    content_lib = get_content_strategy_lib()
    df = content_lib.strategies
    if df.empty:
        logger.error("❌ No content strategies found in Google Sheet!")
        return

    # Convert to list of rows with a dummy "worksheet" name.
    worksheet_name = get_config_value("content_strategy_worksheet", "Unknown_Worksheet")
    all_rows = [(worksheet_name, row.to_dict()) for _, row in df.iterrows()]

    # Split rows if running in parallel, keeping the global index so every
    # job logs/configures rows by their position in the full sheet.
    if job_index is not None and total_jobs is not None:
        rows_to_process = [
            (idx, csv_name, row)
            for idx, (csv_name, row) in enumerate(all_rows)
            if idx % total_jobs == job_index
        ]
        logger.info(
            f"🔢 Job {job_index + 1}/{total_jobs}: "
            f"Processing {len(rows_to_process)}/{len(all_rows)} rows"
        )
    else:
        rows_to_process = [
            (idx, csv_name, row) for idx, (csv_name, row) in enumerate(all_rows)
        ]
        logger.info(f"Processing all {len(rows_to_process)} rows")

    # Count duplicate scripts in this job's rows. The key column is
    # loop-invariant, so resolve it once instead of per row.
    # NOTE(review): assumes the sheet cells are strings; a NaN cell would
    # make .strip() raise — confirm upstream cleaning.
    script_column = (
        "On-Screen Text"
        if get_config_value("on_screen_text", False)
        else "TTS Script (AI Avatar)"
    )
    script_counts = Counter(
        row.get(script_column, "").strip() for _, _, row in rows_to_process
    )

    # Log the most-repeated scripts so duplicate-heavy sheets are visible.
    duplicates = {script: count for script, count in script_counts.items() if count > 1}
    if duplicates:
        logger.warning(f"⚠️ Found {len(duplicates)} duplicate scripts in this job's rows:")
        for script, count in sorted(duplicates.items(), key=lambda x: x[1], reverse=True)[:5]:
            logger.warning(f" '{script[:50]}...' appears {count} times")

    processed_count = 0
    success_count = 0
    session_counts = Counter()  # scripts successfully completed this session
    generation_count = get_config_value("generation_count")

    for global_row_index, csv_name, row in rows_to_process:
        if success_count >= generation_count:
            logger.info(f"🛑 Reached generation count limit ({generation_count}). Stopping.")
            break

        # Configure row settings and get tts_script for tracking.
        tts_script = configure_row_settings(row, global_row_index)

        # Counter returns 0 for unseen keys, so a script key that differs
        # from the duplicate-count pass above is skipped (with a debug log)
        # instead of raising KeyError.
        total_needed = script_counts[tts_script]
        already_executed = executed_counts[tts_script]
        session_executed = session_counts[tts_script]

        # Skip once enough occurrences of this script have been produced
        # across previous runs plus this session.
        if already_executed + session_executed >= total_needed:
            logger.debug(
                f"Skipping {tts_script[:50]}... "
                f"(completed {already_executed} times previously, "
                f"{session_executed} times this session, "
                f"{total_needed} needed total)"
            )
            continue

        try:
            occurrence = already_executed + session_executed + 1
            logger.debug(
                f"Starting {global_row_index}/{len(rows_to_process)} - "
                f"{tts_script[:50]}... (occurrence {occurrence}/{total_needed})"
            )
            results = await process_single_row(row)
            processed_count += 1

            for result in results:
                if result.get("success", False):
                    success_count += 1
                    session_counts[tts_script] += 1  # track in session
                    log_progress_to_gsheet(result, job_index, commit)
                    logger.debug(
                        f"{tts_script[:50]}... occurrence {occurrence}/{total_needed} "
                        f"completed successfully ({success_count}/{processed_count})"
                    )
                    # Collect for summary table.
                    generated_results.append({
                        "local_path": result.get("local_path", "N/A"),
                        "gcs_path": result.get("gcs_filename", "N/A"),
                        "drive_path": result.get("final_url", "N/A"),
                    })
                else:
                    logger.warning(
                        f"⚠️ {tts_script[:50]}... pipeline failed, NOT marking as complete"
                    )
        except Exception as e:
            logger.error(f"❌ Error processing {tts_script[:50]}...: {e}", exc_info=True)
            if get_config_value("test_automation", False):
                # exit() is a site-module convenience and returned status 0;
                # a failed automation run should exit non-zero via sys.exit.
                sys.exit(1)
            continue

    logger.info(f"🏁 Job {job_index} finished: {success_count}/{processed_count} successful")
    return generated_results