"""Command-line entry point for the AI Content Automation Pipeline.

Validates the environment against ``video_generate.env``, configures the job
environment, pre-downloads library videos, runs either the content-strategy
workflow or the plain-video workflow, and logs a summary table of the results.
"""
import asyncio
import argparse
import importlib
import sys
import os

# Add project root to sys.path to allow 'from src.config import ...'
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

import src.logger_config  # Configure logging early
from src.logger_config import logger
from src.config import get_config_value, configure_job_environment
from pipeline_processor import download_all_library_videos
from google_src.gcs_utils import list_gcs_files
from src.workflows import run_content_strategy_workflow, run_plain_video_workflow


def _read_env_keys(env_path):
    """Return the variable names declared in the env file at *env_path*.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    The key is the text before the first ``=`` with surrounding whitespace
    removed; empty keys are dropped.
    """
    keys = []
    with open(env_path, 'r', encoding='utf-8') as env_file:
        for raw_line in env_file:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key = line.partition('=')[0].strip()
            if key:
                keys.append(key)
    return keys


def _ellipsize_left(text, max_len):
    """Keep the tail of *text*, prefixed with ``...``, if it exceeds *max_len*."""
    if len(text) > max_len:
        return "..." + text[-(max_len - 3):]
    return text


def _log_summary_table(generated_results):
    """Log one row per result with its local path, GCS path and drive path."""
    logger.info("\n" + "="*120)
    logger.info(f"{'LOCAL PATH':<50} | {'GCS FILENAME':<30} | {'DRIVE/PUBLIC URL'}")
    logger.info("-" * 120)
    for res in generated_results:
        # Long paths are cut from the left so the file name stays visible.
        local = _ellipsize_left(str(res['local_path']), 47)
        gcs = _ellipsize_left(str(res['gcs_path']), 27)
        logger.info(f"{local:<50} | {gcs:<30} | {res['drive_path']}")
    logger.info("="*120 + "\n")


def pre_check():
    """Validate required environment variables defined in video_generate.env.

    Every key declared in ``video_generate.env`` is looked up with
    ``get_config_value``; keys that resolve to a falsy value are reported in
    a single warning. Missing values are not fatal. If the env file does not
    exist the check is skipped with a warning.

    Raises:
        SystemExit: With exit code 1 if reading the file or looking up a
            key raises an exception.
    """
    # Path to video_generate.env (in root, one level up from src/)
    env_path = os.path.join(os.path.dirname(__file__), '..', 'video_generate.env')

    if not os.path.exists(env_path):
        logger.warning(f"video_generate.env not found at {env_path}. Skipping dynamic validation.")
        return

    try:
        missing = [key for key in _read_env_keys(env_path) if not get_config_value(key)]
    except Exception as e:
        logger.error(f"Failed to read video_generate.env: {e}")
        sys.exit(1)

    if missing:
        logger.warning(f"Missing required environment variables: {', '.join(missing)}")
        # Deliberately non-fatal, but do not claim success after a warning.
        return

    logger.info("✅ Environment pre-check passed")


async def main():
    """Parse command-line arguments and orchestrate the automation process.

    Steps, in order: optional verbose-logging setup, environment pre-check,
    job environment configuration, library video pre-download, then either
    the content-strategy workflow (when the ``ai_generation`` config value
    is truthy) or the plain-video workflow. A summary table is logged when
    the workflow returns a non-empty result.
    """
    parser = argparse.ArgumentParser(description="AI Content Automation Pipeline")
    parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging (DEBUG level)")

    # Unrecognised arguments are tolerated so they can be consumed downstream.
    args, _unknown = parser.parse_known_args()

    if args.verbose:
        os.environ["VERBOSE"] = "true"
        # Reload logger configuration to apply DEBUG level.
        # NOTE(review): `logger` was bound before this reload; this presumably
        # relies on the reload reconfiguring that same logger object - confirm.
        importlib.reload(src.logger_config)
        logger.info("Verbose mode enabled (DEBUG level)")

    # Run pre-check
    pre_check()

    # Results are committed unless the "test_automation" config value is truthy.
    commit = not get_config_value("test_automation")
    job_index = get_config_value("JOB_INDEX")
    total_jobs = get_config_value("TOTAL_JOBS")

    configure_job_environment(job_index)

    # Pre-download assets
    await download_all_library_videos()

    # Determine execution mode
    ai_generation = get_config_value("ai_generation")

    # NOTE(review): the return value is discarded; presumably called for its
    # side effects (e.g. logging the bucket contents) - confirm.
    list_gcs_files()

    workflow = run_content_strategy_workflow if ai_generation else run_plain_video_workflow
    generated_results = await workflow(
        commit=commit,
        job_index=job_index,
        total_jobs=total_jobs
    )

    # Print Summary Table
    if generated_results:
        _log_summary_table(generated_results)


if __name__ == "__main__":
    asyncio.run(main())