# Tools/src/pipelines/base.py
# Commit f20025d (jebin2): refactor: Centralize logger import to src.logger_config across various modules.
import time
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from src.logger_config import logger
import src.utils as utils
from src.config import get_config_value, set_config_value
from google_src.gcs_utils import upload_file_to_gcs
from src.video_renderer import VideoRenderer
from google_src.tts import GoogleTTS
from google_src.stt import GoogleSTT
from file_downloader import get_file_downloader
from src.asset_manager import get_asset_downloader, get_audio_lib, AssetProcessor, get_text_overlay_lib
from video_editor import loudness_normalize, remove_green_bg
from video_editor.onscreen_cta import add_cta
from google_src.ai_studio_sdk import generate
class ContentAutomationBase(ABC):
def __init__(self):
# Initialize Google Services directly
self.tts = GoogleTTS()
self.stt = GoogleSTT()
self.video_renderer = VideoRenderer()
# Use asset_manager singletons directly
self._audio_lib = get_audio_lib(get_config_value("current_audio_index", 0))
# self._audio_lib.use_temp_audio_library()
if get_config_value("on_screen_text", False):
self._text_overlay_lib = get_text_overlay_lib(get_config_value("current_audio_index", 0))
else:
self._text_overlay_lib = None
self._asset_processor = AssetProcessor()
self.asset_downloader = get_asset_downloader()
self.file_downloader = get_file_downloader() # Use singleton
self.pipeline_start_time = None
async def workflow_pre_config(self):
logger.info("\n🎭 STEP 1: Pre Config")
self.pipeline_start_time = time.time()
logger.info("\n STEP 2: Load video assets")
videos = await self.asset_downloader.download_all_videos()
visual_assets = get_config_value("visual_assets", {})
visual_assets["all_videos"] = videos
set_config_value("visual_assets", visual_assets)
logger.info("\n🎵 STEP 3: Background Music")
await self._download_bg_music(try_next=False)
async def _download_bg_music(self, try_next: bool = False):
if try_next:
self._audio_lib.inc_audio_index()
visual_assets = get_config_value("visual_assets", {})
visual_assets["background_music_url"] = self._audio_lib.select_background_music()
local_path = await self.file_downloader.safe_download(visual_assets["background_music_url"])
if local_path:
visual_assets["background_music_local"] = str(local_path)
set_config_value("visual_assets", visual_assets)
async def video_features(self, video_path: str):
if get_config_value("use_1x1_ratio", False):
video_path = utils.ratio_1x1_to9x16(video_path)
if get_config_value("is_a2e_lip_sync", False):
video_path = remove_green_bg.process_video_with_ffmpeg(get_config_value("visual_assets")["a2e_video_local_path"], video_path)
if get_config_value("on_screen_text", False):
visual_assets = get_config_value("visual_assets", {})
text_overlay = self._text_overlay_lib.select_text_overlay()
visual_assets["timed_transcript"] = [
{
"start_time": 0,
"end_time": 15,
"word": text_overlay
}
]
set_config_value("visual_assets", visual_assets)
video_path = await self.video_renderer._add_timed_subtitles(video_path, group_all=True, position=250)
set_config_value("video_no_audio_path_with_caption", video_path)
elif get_config_value("is_onscreen_cta", False):
on_screen_cta = get_config_value("on_screen_cta")[get_config_value("current_audio_index", 0) % len(get_config_value("on_screen_cta"))]
cap_parma = self.video_renderer._get_caption_style()[0]()
padding = cap_parma.get("safe_zone_padding", 20)
# bottom_safe_y = 250
video_path = add_cta(
video_path,
on_screen_cta,
padding=padding
# bottom_safe_y=bottom_safe_y
)
return video_path
@abstractmethod
async def generate_content(self) -> str:
"""
Abstract method to generate content.
Must return the path to the video file before audio merging.
"""
pass
def generate_broll_filename(self) -> str | None:
"""
Generate a safe filename based on the provided prompt using Gemini.
Returns a string suitable for use as a filename (lowercase, underscores, no special chars).
"""
try:
import random
seed = random.randint(1000, 9999)
user_prompt = f"""Generate ONE unique luxury b-roll video title (2-3 words max).
TONE: Modern, aspirational, Instagram-worthy. Think luxury influencer content.
EXAMPLES of the vibe (DO NOT COPY THESE):
"Golden Hour Aesthetic", "Rich Life Moments", "Boss Life Vibes", "Money Moves", "Champagne Dreams", "VIP Treatment", "Designer Everything", "Empire Building"
WORD COMBINATIONS - Mix and match:
Part 1 (Descriptors):
Golden, Luxury, Rich, Elite, Opulent, Wealthy, Premium, High End, Lavish, Classy, Million Dollar, Success, Boss, First Class, Diamond, Gold, Platinum, VIP, Exclusive, Private, Penthouse, Yacht, Designer, Five Star, Empire, Sophisticated, Expensive, Refined, Upper Class, Affluent
Part 2 (Vibes/Things):
Hour, Lifestyle, Life, Vibes, Moments, Routine, Living, Aesthetic, Mindset, Dreams, Energy, Moves, Standard, Treatment, Access, Views, Club, Everything, Quality, Taste, Style, Circle, Society, Story
Patterns:
- [Luxury word] + [Thing]: "Diamond Lifestyle", "Platinum Energy"
- [Adjective] + [Vibe word]: "Exclusive Vibes", "Premium Moments"
- [Noun] + [Action/State]: "Empire Mindset", "Yacht Dreams"
CRITICAL: Be creative. Mix words in fresh ways. Avoid repeating common combos.
Variation seed: {seed}
Output ONLY the title (2-3 words). No punctuation, no explanation."""
return generate(user_prompt)
except Exception as e:
logger.error("Failed to generate b-roll filename: %s", e)
return None
async def run_pipeline(self):
await self.workflow_pre_config()
# Generate the video components
final_video_path = await self.generate_content()
# Apply common video features
final_video_path = await self.video_features(final_video_path)
# STEP 7: Add audio to video
logger.info("\n🔊 STEP 8: Add Audio to Video")
final_video_path = await self.video_renderer.add_audio_to_video(final_video_path)
final_video_path = loudness_normalize.normalize_loudness(final_video_path)
# STEP 8: Upload to cloud storage
final_url = None
gcs_filename = None
# Generate UUID-based filename managed by utility
logger.info("\n☁️ STEP 9: Cloud Storage Upload")
file_name = self.generate_broll_filename()
# Final Result
upload_result = upload_file_to_gcs(
final_video_path,
destination_blob_name=file_name
)
final_url = upload_result["url"]
gcs_filename = upload_result["gcs_filename"]
# Pipeline completion
if self.pipeline_start_time:
elapsed_time = time.time() - self.pipeline_start_time
else:
elapsed_time = 0
logger.info(f"\n✅ Enhanced pipeline completed in {elapsed_time:.2f}s")
return {
"success": True,
"final_url": final_url,
"gcs_filename": gcs_filename,
"local_path": final_video_path,
}