Tools / src /pipeline_processor.py
jebin2's picture
refactor: Centralize logger import to src.logger_config across various modules.
f20025d
import os
import asyncio
from src.logger_config import logger
from src.utils import clean_tts_script
from src.config import get_config_value, set_config_value
from src.pipelines.factory import get_automation_pipeline
from src.asset_manager import get_asset_downloader
from google_src.gcs_utils import list_gcs_files
import hashlib
def configure_row_settings(row: dict, row_index: int = 0) -> str:
    """
    Publish the per-row values of ``row`` into the global config.

    Args:
        row: Mapping for one input row. The keys read here are
            "TTS Script (AI Avatar)", "On-Screen Text", the three prompt
            columns and "Captions"; a missing key falls back to "".
        row_index: Index of the row; stored under both "row_index" and
            "current_audio_index".

    Returns:
        The script text before cleaning, i.e. the same value stored under
        "original_tts_script".
    """
    # The "on_screen_text" flag switches the script source to the
    # on-screen text column (stripped of surrounding whitespace).
    script_text = row.get("TTS Script (AI Avatar)", "")
    use_on_screen_text = get_config_value("on_screen_text", False)
    if use_on_screen_text:
        script_text = row.get("On-Screen Text", "").strip()

    # Expose the videos recorded on the asset downloader under
    # visual_assets["all_videos"].
    downloader = get_asset_downloader()
    assets = get_config_value("visual_assets", {})
    assets["all_videos"] = downloader.downloaded_videos
    set_config_value("visual_assets", assets)

    # Row position and script (raw and cleaned).
    set_config_value("row_index", row_index)
    set_config_value("current_audio_index", row_index)
    set_config_value("original_tts_script", script_text)
    set_config_value("tts_script", clean_tts_script(script_text))

    # SHA-256 hex digest of the cleaned script as read back from config.
    stored_script = get_config_value("tts_script")
    digest = hashlib.sha256(stored_script.encode('utf-8')).hexdigest()
    set_config_value("hash_tts_script", digest)

    # Columns copied straight from the row into config, in this order.
    copied_columns = (
        ("gemini_prompt", "Gemini Imagen4 Ultra Prompt (specific)"),
        ("runway_prompt", "Runway Prompt Gen4 Turbo"),
        ("runway_veo_prompt", "Veo-3.1 Fast Prompt (Text-to-Video)"),
        ("captions", "Captions"),
    )
    for config_key, column_name in copied_columns:
        set_config_value(config_key, row.get(column_name, ""))

    return script_text
async def process_single_row(row: dict) -> list:
    """
    Run the automation pipeline for the row currently held in the config.

    The values used here come from the global config ("original_tts_script",
    "video_merge_type", "video_merge_process", "row_index"); the ``row``
    argument itself is not read by this function.

    "video_merge_type" may be a comma-separated list of types:
      * when "video_merge_process" is "sameeachtype", the pipeline runs once
        for every listed type;
      * otherwise, when several types are listed, exactly one is chosen by
        ``row_index % number_of_types`` and the pipeline runs once.
    Blank entries (for example from a trailing comma) are ignored so they
    cannot trigger an extra run or occupy a round-robin slot.

    Args:
        row: The row being processed (kept for interface compatibility).

    Returns:
        A list with one pipeline result per executed run.
    """
    tts_script = get_config_value("original_tts_script", "")
    logger.debug(f"▢️ Executing: {tts_script}")

    original_merge_type = get_config_value("video_merge_type")
    merge_process = get_config_value("video_merge_process")

    # Split the comma-separated setting and drop blank tokens; with nothing
    # usable left, run once without touching "video_merge_type".
    merge_types = []
    if original_merge_type:
        merge_types = [
            token
            for token in (part.strip() for part in str(original_merge_type).split(","))
            if token
        ]
    if not merge_types:
        merge_types = [None]

    # Outside "sameeachtype" mode, several types mean one is picked per row.
    if merge_process != "sameeachtype" and len(merge_types) > 1:
        current_index = get_config_value("row_index", 0)
        selected_type = merge_types[current_index % len(merge_types)]
        logger.debug(f"πŸ”„ Round Robin: Row {current_index} selected '{selected_type}' from {merge_types}")
        merge_types = [selected_type]

    results = []
    try:
        for merge_type in merge_types:
            if merge_type:
                logger.debug(f"πŸ”„ Switching VIDEO_MERGE_TYPE to: {merge_type}")
                set_config_value("video_merge_type", merge_type)

            pipeline = get_automation_pipeline()
            result = await pipeline.run_pipeline()
            results.append(result)

            succeeded = result.get('success', False)
            status_icon = "βœ…" if succeeded else "❌"
            logger.debug(f"{status_icon} Completed {tts_script[:20]}... [{merge_type or 'Default'}]: success={succeeded}")
    finally:
        # Restore the original (possibly comma-separated) setting even when
        # a pipeline run raises.
        if original_merge_type:
            set_config_value("video_merge_type", original_merge_type)

    # NOTE(review): the original indentation was lost, so it is unclear
    # whether this call belonged inside the ``finally`` block; it is placed
    # after it here - confirm against the repository. Return value is unused.
    list_gcs_files()
    return results
async def download_all_library_videos():
    """Download every library video through the asset downloader.

    Returns:
        Whatever ``download_all_videos()`` of the asset downloader yields;
        its length is logged as the number of downloaded videos.
    """
    downloader = get_asset_downloader()

    logger.debug("πŸ“₯ Pre-downloading all library videos...")
    library_videos = await downloader.download_all_videos()

    video_count = len(library_videos)
    logger.debug(f"βœ“ Downloaded {video_count} library videos")
    return library_videos