Spaces:
Running
Running
File size: 1,633 Bytes
"""
arq Worker Runner.
This is the entrypoint for the worker process.
Run with: python -m worker.runner
The worker:
- Consumes jobs from Redis queue
- Executes pipeline tasks
- Has NO scheduler - scheduling is external (GitHub Actions, cron, etc.)
"""
# Standard-library imports.
import logging
import os
import sys
# Add backend to path for imports
# The parent of this file's directory is pushed to the front of sys.path.
# This statement must stay ABOVE the `adapters` / `worker` imports below:
# those packages are presumably resolved from that directory (confirm
# against the repository layout before reordering).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Third-party: arq's worker entry point (called from main(), blocking).
from arq import run_worker
# Project-local: Redis connection settings plus the task and lifecycle callables.
from adapters.queue.redis import get_redis_settings
from worker.tasks import run_pipeline, startup, shutdown
# Logging setup: INFO threshold on the root logger; every record carries a
# literal "[worker]" tag between the logger name and the message.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(name)s - [worker] - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class WorkerSettings:
    """
    Configuration for the arq worker.

    arq can load this class straight from the command line:
        arq worker.runner.WorkerSettings
    """

    # --- Wiring: what to run and where the queue lives ---------------------
    # Task functions this worker is allowed to execute.
    functions = [run_pipeline]
    # Redis connection; resolved once, when this class body is evaluated.
    redis_settings = get_redis_settings()
    # Lifecycle hooks.
    on_startup = startup
    on_shutdown = shutdown

    # --- Job execution limits ----------------------------------------------
    # Only one pipeline at a time per worker.
    max_jobs: int = 1
    # No automatic retries - cron will retry next cycle.
    max_tries: int = 1
    # 1 hour max per job.
    job_timeout: int = 60 * 60

    # --- Health check ------------------------------------------------------
    # Interval handed to arq (presumably seconds, like job_timeout; see the
    # arq worker documentation to confirm).
    health_check_interval: int = 30
def main():
    """Log startup details, then hand control to the arq worker."""
    logger.info("Starting Terra Rara worker...")

    # Report which Redis endpoint the worker settings point at.
    conn = WorkerSettings.redis_settings
    logger.info(f"Redis: {conn.host}:{conn.port}")

    # Blocking call: control stays inside arq while the worker is running.
    run_worker(WorkerSettings)


# Start the worker only when executed as a script
# (e.g. ``python -m worker.runner``), not when imported.
if __name__ == "__main__":
    main()
|