ALM-2 / backend /start_worker.py
ACA050's picture
Upload 520 files
2ed8996 verified
#!/usr/bin/env python3
"""
Production worker startup script for AegisLM SaaS Backend.
This script starts the Celery worker with proper configuration
and monitoring.
"""
import os
import sys
import logging
from pathlib import Path
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
# Add the backend directory to Python path
sys.path.insert(0, str(Path(__file__).parent))
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def check_environment_variables():
"""Check that all required environment variables are set."""
logger.info("🔍 Checking worker environment variables...")
required_vars = [
"DATABASE_URL",
"REDIS_URL"
]
optional_vars = [
"CELERY_WORKER_CONCURRENCY",
"CELERY_TASK_RATE_LIMIT"
]
missing_vars = []
for var in required_vars:
if not os.getenv(var):
missing_vars.append(var)
if missing_vars:
logger.error(f"❌ Missing required environment variables: {missing_vars}")
return False
# Check optional variables and set defaults
if not os.getenv("CELERY_WORKER_CONCURRENCY"):
os.environ["CELERY_WORKER_CONCURRENCY"] = "2"
logger.info("📝 Set CELERY_WORKER_CONCURRENCY=2 (default)")
if not os.getenv("CELERY_TASK_RATE_LIMIT"):
os.environ["CELERY_TASK_RATE_LIMIT"] = "10/m"
logger.info("📝 Set CELERY_TASK_RATE_LIMIT=10/m (default)")
logger.info("✅ Worker environment variables check passed")
return True
def start_production_worker():
"""Start the production Celery worker."""
logger.info("🚀 Starting production Celery worker...")
# Get configuration
concurrency = int(os.getenv("CELERY_WORKER_CONCURRENCY", 2))
log_level = os.getenv("LOG_LEVEL", "info").lower()
logger.info(f"🔧 Worker configuration:")
logger.info(f" Concurrency: {concurrency}")
logger.info(f" Log level: {log_level}")
logger.info(f" Broker: {os.getenv('REDIS_URL')}")
logger.info(f" Backend: {os.getenv('REDIS_URL')}")
# Start Celery worker using subprocess
try:
import subprocess
import sys
# Build celery command
cmd = [
sys.executable, "-m", "celery",
"-A", "workers.celery_worker",
"worker",
"--loglevel=" + log_level,
"--concurrency=" + str(concurrency),
"--prefetch-multiplier=1",
"--max-tasks-per-child=50",
"--time-limit=300", # 5 minutes per task
"--soft-time-limit=240", # 4 minutes soft limit
"--without-gossip",
"--without-mingle",
"--without-heartbeat",
"--queues=evaluation,default",
]
logger.info(f"🚀 Starting Celery worker with command: {' '.join(cmd)}")
# Start the worker
subprocess.run(cmd, check=True)
except KeyboardInterrupt:
logger.info("🛑 Worker stopped by user")
except Exception as e:
logger.error(f"❌ Worker error: {e}")
sys.exit(1)
def main():
"""Main worker startup function."""
logger.info("🚀 AegisLM SaaS Backend - Production Worker Startup")
logger.info("=" * 50)
# Check environment variables
if not check_environment_variables():
logger.error("💥 Environment check failed - aborting worker startup")
sys.exit(1)
# Start production worker
start_production_worker()
if __name__ == "__main__":
main()