File size: 1,633 Bytes
b3b36f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""
arq Worker Runner.

This is the entrypoint for the worker process.
Run with: python -m worker.runner

The worker:
- Consumes jobs from Redis queue
- Executes pipeline tasks
- Has NO scheduler - scheduling is external (GitHub Actions, cron, etc.)
"""

import logging
import os
import sys

# Add backend to path for imports.
# Inserts the parent of this package's directory at the front of sys.path so
# that sibling packages (adapters, worker) resolve when this file is run as a
# script rather than as an installed package.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from arq import run_worker

from adapters.queue.redis import get_redis_settings
from worker.tasks import run_pipeline, startup, shutdown

# Configure logging for the whole worker process. The "[worker]" tag in the
# format string distinguishes these records from other processes sharing the
# same log stream.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - [worker] - %(message)s"
)
logger = logging.getLogger(__name__)


class WorkerSettings:
    """
    arq worker settings.

    Purely declarative: arq reads these class attributes by name, so every
    attribute here is part of the external contract. This class is discovered
    by arq when running:
        arq worker.runner.WorkerSettings
    """

    # Redis connection parameters, resolved by the shared adapter helper
    # (presumably from environment configuration — see adapters.queue.redis).
    redis_settings = get_redis_settings()

    # Task functions the worker is allowed to execute.
    functions = [run_pipeline]

    # Lifecycle hooks run once per worker process, before the first job and
    # after the last.
    on_startup = startup
    on_shutdown = shutdown

    # Job settings
    max_jobs = 1  # Only one pipeline at a time per worker
    job_timeout = 3600  # 1 hour max (seconds)
    max_tries = 1  # No automatic retries - cron will retry next cycle

    # Health check
    health_check_interval = 30  # seconds between health-check writes


def main():
    """Configure and launch the arq worker (blocks until shutdown)."""
    settings = WorkerSettings
    logger.info("Starting Terra Rara worker...")
    logger.info(f"Redis: {settings.redis_settings.host}:{settings.redis_settings.port}")

    # run_worker does not return until the worker process is stopped.
    run_worker(settings)


if __name__ == "__main__":
    main()