Tools / src /workflows /plain_video_workflow.py
jebin2's picture
refactor: Centralize logger import to src.logger_config across various modules.
f20025d
import uuid
from execution_tracker import load_executed_from_gsheet, log_progress_to_gsheet
from pipeline_processor import process_single_row, configure_row_settings
from cleanup_manager import process_delete_entries
from src.config import get_config_value
from src.logger_config import logger
async def run_plain_video_workflow(commit=False, job_index=None, total_jobs=None):
"""Generate plain videos (music-synced, no AI content) with progress tracking."""
# Run delete cleanup first
await process_delete_entries()
if total_jobs is None or job_index is None:
total_jobs = 1
job_index = 0
try:
n = int(get_config_value("generation_count", 100))
except ValueError:
n = 100
logger.info(f"Creating {n} plain videos...")
# Load how many already completed from Google Sheets (filtered by SETUP_TYPE and job_index)
# setup_type = get_config_value("setup_type")
executed = load_executed_from_gsheet(job_index=job_index)
completed = len(executed)
if get_config_value("test_automation"):
completed = 0
# --- ASSIGN SLICE TO THIS JOB ---
per_job = n / total_jobs
start_index = int(per_job * job_index)
end_index = int(per_job * (job_index + 1))
# But skip those already completed
start_index = max(start_index, completed)
logger.debug(
f"[Job {job_index}/{total_jobs}] Assigned range: {start_index}{end_index}, "
f"completed={completed}"
)
# Run only the assigned range
generated_results = []
for i in range(start_index, min(n, end_index)):
logger.debug("=" * 200)
row = { "TTS Script (AI Avatar)": uuid.uuid4().hex[:8] }
# Configure row settings (includes setting current_audio_index)
configure_row_settings(row, i)
results = await process_single_row(row)
for result in results:
if result.get("success", False):
log_progress_to_gsheet(result, job_index, commit)
# Collect for summary table
generated_results.append({
"local_path": result.get("local_path", "N/A"),
"gcs_path": result.get("gcs_filename", "N/A"),
"drive_path": result.get("final_url", "N/A") # final_url serves as drive/public URL
})
logger.info(f"[Job {job_index}] Finished slice.")
return generated_results