#!/usr/bin/env python3 """ Production worker startup script for AegisLM SaaS Backend. This script starts the Celery worker with proper configuration and monitoring. """ import os import sys import logging from pathlib import Path # Load environment variables from dotenv import load_dotenv load_dotenv() # Add the backend directory to Python path sys.path.insert(0, str(Path(__file__).parent)) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def check_environment_variables(): """Check that all required environment variables are set.""" logger.info("🔍 Checking worker environment variables...") required_vars = [ "DATABASE_URL", "REDIS_URL" ] optional_vars = [ "CELERY_WORKER_CONCURRENCY", "CELERY_TASK_RATE_LIMIT" ] missing_vars = [] for var in required_vars: if not os.getenv(var): missing_vars.append(var) if missing_vars: logger.error(f"❌ Missing required environment variables: {missing_vars}") return False # Check optional variables and set defaults if not os.getenv("CELERY_WORKER_CONCURRENCY"): os.environ["CELERY_WORKER_CONCURRENCY"] = "2" logger.info("📝 Set CELERY_WORKER_CONCURRENCY=2 (default)") if not os.getenv("CELERY_TASK_RATE_LIMIT"): os.environ["CELERY_TASK_RATE_LIMIT"] = "10/m" logger.info("📝 Set CELERY_TASK_RATE_LIMIT=10/m (default)") logger.info("✅ Worker environment variables check passed") return True def start_production_worker(): """Start the production Celery worker.""" logger.info("🚀 Starting production Celery worker...") # Get configuration concurrency = int(os.getenv("CELERY_WORKER_CONCURRENCY", 2)) log_level = os.getenv("LOG_LEVEL", "info").lower() logger.info(f"🔧 Worker configuration:") logger.info(f" Concurrency: {concurrency}") logger.info(f" Log level: {log_level}") logger.info(f" Broker: {os.getenv('REDIS_URL')}") logger.info(f" Backend: {os.getenv('REDIS_URL')}") # Start Celery worker using subprocess try: import subprocess import sys # Build celery command cmd = [ sys.executable, "-m", "celery", "-A", "workers.celery_worker", "worker", "--loglevel=" + log_level, "--concurrency=" + str(concurrency), "--prefetch-multiplier=1", "--max-tasks-per-child=50", "--time-limit=300", # 5 minutes per task "--soft-time-limit=240", # 4 minutes soft limit "--without-gossip", "--without-mingle", "--without-heartbeat", "--queues=evaluation,default", ] logger.info(f"🚀 Starting Celery worker with command: {' '.join(cmd)}") # Start the worker subprocess.run(cmd, check=True) except KeyboardInterrupt: logger.info("🛑 Worker stopped by user") except Exception as e: logger.error(f"❌ Worker error: {e}") sys.exit(1) def main(): """Main worker startup function.""" logger.info("🚀 AegisLM SaaS Backend - Production Worker Startup") logger.info("=" * 50) # Check environment variables if not check_environment_variables(): logger.error("💥 Environment check failed - aborting worker startup") sys.exit(1) # Start production worker start_production_worker() if __name__ == "__main__": main()