| |
| """ |
| Production worker startup script for AegisLM SaaS Backend. |
| |
| This script starts the Celery worker with proper configuration |
| and monitoring. |
| """ |
|
|
| import os |
| import sys |
| import logging |
| from pathlib import Path |
|
|
| |
| from dotenv import load_dotenv |
| load_dotenv() |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent)) |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| ) |
| logger = logging.getLogger(__name__) |
|
|
|
|
| def check_environment_variables(): |
| """Check that all required environment variables are set.""" |
| logger.info("🔍 Checking worker environment variables...") |
| |
| required_vars = [ |
| "DATABASE_URL", |
| "REDIS_URL" |
| ] |
| |
| optional_vars = [ |
| "CELERY_WORKER_CONCURRENCY", |
| "CELERY_TASK_RATE_LIMIT" |
| ] |
| |
| missing_vars = [] |
| for var in required_vars: |
| if not os.getenv(var): |
| missing_vars.append(var) |
| |
| if missing_vars: |
| logger.error(f"❌ Missing required environment variables: {missing_vars}") |
| return False |
| |
| |
| if not os.getenv("CELERY_WORKER_CONCURRENCY"): |
| os.environ["CELERY_WORKER_CONCURRENCY"] = "2" |
| logger.info("📝 Set CELERY_WORKER_CONCURRENCY=2 (default)") |
| |
| if not os.getenv("CELERY_TASK_RATE_LIMIT"): |
| os.environ["CELERY_TASK_RATE_LIMIT"] = "10/m" |
| logger.info("📝 Set CELERY_TASK_RATE_LIMIT=10/m (default)") |
| |
| logger.info("✅ Worker environment variables check passed") |
| return True |
|
|
|
|
| def start_production_worker(): |
| """Start the production Celery worker.""" |
| logger.info("🚀 Starting production Celery worker...") |
| |
| |
| concurrency = int(os.getenv("CELERY_WORKER_CONCURRENCY", 2)) |
| log_level = os.getenv("LOG_LEVEL", "info").lower() |
| |
| logger.info(f"🔧 Worker configuration:") |
| logger.info(f" Concurrency: {concurrency}") |
| logger.info(f" Log level: {log_level}") |
| logger.info(f" Broker: {os.getenv('REDIS_URL')}") |
| logger.info(f" Backend: {os.getenv('REDIS_URL')}") |
| |
| |
| try: |
| import subprocess |
| import sys |
| |
| |
| cmd = [ |
| sys.executable, "-m", "celery", |
| "-A", "workers.celery_worker", |
| "worker", |
| "--loglevel=" + log_level, |
| "--concurrency=" + str(concurrency), |
| "--prefetch-multiplier=1", |
| "--max-tasks-per-child=50", |
| "--time-limit=300", |
| "--soft-time-limit=240", |
| "--without-gossip", |
| "--without-mingle", |
| "--without-heartbeat", |
| "--queues=evaluation,default", |
| ] |
| |
| logger.info(f"🚀 Starting Celery worker with command: {' '.join(cmd)}") |
| |
| |
| subprocess.run(cmd, check=True) |
| |
| except KeyboardInterrupt: |
| logger.info("🛑 Worker stopped by user") |
| except Exception as e: |
| logger.error(f"❌ Worker error: {e}") |
| sys.exit(1) |
|
|
|
|
| def main(): |
| """Main worker startup function.""" |
| logger.info("🚀 AegisLM SaaS Backend - Production Worker Startup") |
| logger.info("=" * 50) |
| |
| |
| if not check_environment_variables(): |
| logger.error("💥 Environment check failed - aborting worker startup") |
| sys.exit(1) |
| |
| |
| start_production_worker() |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|