File size: 5,726 Bytes
54639e8
 
b936660
54639e8
5f6fd2a
9d54dfd
f20025d
54639e8
 
 
 
 
 
 
0b00536
 
 
54639e8
b2426a5
503d4ac
54639e8
 
 
b14d442
54639e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f00d5a
 
54639e8
5f00d5a
 
 
 
b936660
 
54639e8
 
 
 
 
 
 
 
 
 
 
 
503d4ac
 
54639e8
 
 
 
 
 
 
 
503d4ac
54639e8
 
 
 
2df2397
54639e8
2df2397
 
 
 
 
 
 
54639e8
503d4ac
 
2df2397
 
0b00536
 
 
 
 
 
 
2df2397
 
54639e8
 
 
5f00d5a
 
54639e8
 
 
0b00536
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import os
import sys

from execution_tracker import load_executed_from_gsheet, log_progress_to_gsheet
from pipeline_processor import process_single_row, configure_row_settings
from cleanup_manager import process_delete_entries
from src.asset_manager import get_content_strategy_lib
from src.config import get_config_value, set_config_value
from src.logger_config import logger

async def run_content_strategy_workflow(commit=False, job_index=None, total_jobs=None):
    """Process content strategies from a Google Sheet with progress tracking.

    Workflow:
      1. Run delete-entry cleanup.
      2. Load previously executed scripts across ALL parallel jobs so
         completed work is never repeated.
      3. Load strategy rows from the Google Sheet and, when sharded, keep
         only rows whose global index maps to this worker
         (``index % total_jobs == job_index``).
      4. Process rows until ``generation_count`` successes are reached,
         logging each success back to the sheet.

    Args:
        commit: Forwarded to ``log_progress_to_gsheet``; presumably controls
            whether progress is actually written — confirm against that helper.
        job_index: Zero-based index of this worker when sharding rows across
            parallel jobs, or None for a single-worker run.
        total_jobs: Total number of parallel workers, or None.

    Returns:
        A list of dicts describing generated assets (``local_path``,
        ``gcs_path``, ``drive_path``), or None when the sheet is empty.
    """
    # Run delete cleanup first so removed entries never get processed.
    await process_delete_entries()

    generated_results = []

    # Load executed lines from ALL progress files (main + all jobs).
    executed = load_executed_from_gsheet(job_index=job_index)
    logger.debug(f"Skipping {len(executed)} already executed entries (from all jobs).")

    # Load strategies from Google Sheet.
    content_lib = get_content_strategy_lib()
    df = content_lib.strategies

    if df.empty:
        logger.error("❌ No content strategies found in Google Sheet!")
        return

    # Convert to (worksheet_name, row_dict) pairs with a shared worksheet label.
    worksheet_name = get_config_value("content_strategy_worksheet", "Unknown_Worksheet")
    all_rows = [(worksheet_name, row.to_dict()) for _, row in df.iterrows()]

    # Split rows if running in parallel, keeping each row's GLOBAL index so
    # progress tracking stays consistent across workers.
    if job_index is not None and total_jobs is not None:
        rows_to_process = [
            (idx, csv_name, row) for idx, (csv_name, row) in enumerate(all_rows)
            if idx % total_jobs == job_index
        ]
        logger.info(f"πŸ”’ Job {job_index + 1}/{total_jobs}: Processing {len(rows_to_process)}/{len(all_rows)} rows")
    else:
        rows_to_process = [(idx, csv_name, row) for idx, (csv_name, row) in enumerate(all_rows)]
        logger.info(f"Processing all {len(rows_to_process)} rows")

    # The tracking key is either the TTS script or the on-screen text,
    # depending on configuration.
    script_key = (
        "On-Screen Text" if get_config_value("on_screen_text", False)
        else "TTS Script (AI Avatar)"
    )

    # Count duplicates in rows_to_process. Empty sheet cells arrive from
    # pandas as NaN floats, which have no .strip() — treat them as "".
    script_counts = {}
    for _idx, _csv_name, row in rows_to_process:
        cell = row.get(script_key, "")
        tts_script = cell.strip() if isinstance(cell, str) else ""
        script_counts[tts_script] = script_counts.get(tts_script, 0) + 1

    # Surface the most frequent duplicates so operators can spot sheet errors.
    duplicates = {script: count for script, count in script_counts.items() if count > 1}
    if duplicates:
        logger.warning(f"⚠️ Found {len(duplicates)} duplicate scripts in this job's rows:")
        for script, count in sorted(duplicates.items(), key=lambda x: x[1], reverse=True)[:5]:
            logger.warning(f"   '{script[:50]}...' appears {count} times")

    processed_count = 0
    success_count = 0
    processed_scripts = []  # Scripts successfully processed this session, in order.

    generation_count = get_config_value("generation_count")

    for position, (global_row_index, csv_name, row) in enumerate(rows_to_process, start=1):
        # Only enforce the limit when one is actually configured; comparing
        # an int against a missing/None config value would raise TypeError.
        if generation_count is not None and success_count >= generation_count:
            logger.info(f"πŸ›‘ Reached generation count limit ({generation_count}). Stopping.")
            break

        # Configure row settings and get tts_script for tracking.
        tts_script = configure_row_settings(row, global_row_index)

        # How many times this script should be executed in total. Default to 1
        # in case configure_row_settings normalizes the script differently
        # from the counting pass above (avoids a KeyError).
        total_needed = script_counts.get(tts_script, 1)

        # How many times it was already executed in previous runs.
        already_executed = executed.count(tts_script)

        # How many times it was processed in this session.
        session_executed = processed_scripts.count(tts_script)

        # Skip once enough occurrences have been completed.
        if already_executed + session_executed >= total_needed:
            logger.debug(
                f"Skipping {tts_script[:50]}... "
                f"(completed {already_executed} times previously, "
                f"{session_executed} times this session, "
                f"{total_needed} needed total)"
            )
            continue

        try:
            occurrence = already_executed + session_executed + 1
            # Log the job-local position (global_row_index is a GLOBAL index
            # and can exceed the size of this job's subset).
            logger.debug(
                f"Starting row {global_row_index} "
                f"({position}/{len(rows_to_process)} in this job) - "
                f"{tts_script[:50]}... (occurrence {occurrence}/{total_needed})"
            )

            results = await process_single_row(row)
            processed_count += 1

            for result in results:
                if result.get("success", False):
                    success_count += 1
                    processed_scripts.append(tts_script)  # Track in session

                    log_progress_to_gsheet(result, job_index, commit)

                    logger.debug(
                        f"{tts_script[:50]}... occurrence {occurrence}/{total_needed} "
                        f"completed successfully ({success_count}/{processed_count})"
                    )

                    # Collect for summary table
                    generated_results.append({
                        "local_path": result.get("local_path", "N/A"),
                        "gcs_path": result.get("gcs_filename", "N/A"),
                        "drive_path": result.get("final_url", "N/A")
                    })
                else:
                    logger.warning(f"⚠️ {tts_script[:50]}... pipeline failed, NOT marking as complete")

        except Exception as e:
            logger.error(f"❌ Error processing {tts_script[:50]}...: {e}", exc_info=True)
            if get_config_value("test_automation", False):
                # sys.exit raises SystemExit with a nonzero status; the bare
                # builtin exit() is meant for interactive sessions and may be
                # absent under embedded interpreters.
                sys.exit(1)
            continue  # Best-effort: keep going with the remaining rows.

    logger.info(f"🏁 Job {job_index} finished: {success_count}/{processed_count} successful")
    return generated_results