|
|
""" |
|
|
Redis-based job store for distributed job tracking across multiple Gunicorn workers. |
|
|
""" |
|
|
import json |
|
|
import uuid |
|
|
import redis |
|
|
from flask import current_app |
|
|
from datetime import datetime, timedelta |
|
|
import logging |
|
|
import re |
|
|
|
|
|
|
|
|
class RedisJobStore:
    """
    Redis-based job store for tracking background tasks across multiple Gunicorn workers.

    Each job is stored as a JSON document under the key ``job:<job_id>`` with a
    TTL, so Redis evicts stale jobs automatically.
    """

    # Closed set of allowed job statuses, enforced on create and on update.
    VALID_STATUSES = {'pending', 'processing', 'completed', 'failed', 'cancelled'}

    def __init__(self, redis_url=None, default_ttl_hours=24):
        """
        Initialize Redis job store.

        Args:
            redis_url (str): Redis connection URL. If None, uses
                current_app.config['REDIS_URL'] when a Flask app context is
                available, otherwise a localhost default.
            default_ttl_hours (int): Default time-to-live for jobs in hours.
        """
        self.default_ttl_hours = default_ttl_hours
        if redis_url:
            self.redis_client = redis.from_url(redis_url)
        elif current_app and hasattr(current_app, 'config') and 'REDIS_URL' in current_app.config:
            self.redis_client = redis.from_url(current_app.config['REDIS_URL'])
        else:
            # Development fallback: local Redis on the default port/db.
            self.redis_client = redis.from_url('redis://localhost:6379/0')

    def _validate_job_id(self, job_id):
        """
        Validate job ID format to prevent injection into Redis key names.

        Args:
            job_id (str): Job ID to validate.

        Returns:
            bool: True if valid, False otherwise.
        """
        if not job_id or not isinstance(job_id, str):
            return False
        # Only URL/key-safe characters, capped at 64 chars.
        return bool(re.match(r'^[a-zA-Z0-9_-]{1,64}$', job_id))

    def _validate_status(self, status):
        """
        Validate a job status value against VALID_STATUSES.

        Args:
            status (str): Status to validate.

        Returns:
            bool: True if valid, False otherwise.
        """
        return status in self.VALID_STATUSES

    def create_job(self, job_id=None, initial_status='pending', initial_data=None):
        """
        Create a new job in Redis with the store's default TTL.

        Args:
            job_id (str): Job ID. If None, generates a new UUID4.
            initial_status (str): Initial job status (must be in VALID_STATUSES).
            initial_data (dict): Extra fields merged into the job document.
                NOTE(review): merged after the base fields, so a caller can
                overwrite 'status'/'created_at' without validation — confirm
                this is intended.

        Returns:
            str: Job ID.

        Raises:
            ValueError: If initial_status or job_id fails validation.
            redis.ConnectionError: If Redis is unreachable (re-raised after logging).
        """
        if initial_status and not self._validate_status(initial_status):
            raise ValueError(f"Invalid status: {initial_status}. Valid statuses are: {self.VALID_STATUSES}")

        if job_id and not self._validate_job_id(job_id):
            raise ValueError(f"Invalid job ID format: {job_id}")

        if job_id is None:
            job_id = str(uuid.uuid4())

        # Single timestamp so created_at and updated_at match exactly on creation.
        now = datetime.utcnow().isoformat()
        job_data = {
            'status': initial_status,
            'result': None,
            'error': None,
            'created_at': now,
            'updated_at': now,
        }

        if initial_data:
            job_data.update(initial_data)

        try:
            # setex stores the value and its expiry atomically.
            self.redis_client.setex(
                f"job:{job_id}",
                timedelta(hours=self.default_ttl_hours),
                json.dumps(job_data)
            )
        except redis.ConnectionError:
            logging.error("Failed to connect to Redis when creating job %s", job_id)
            raise
        except Exception as e:
            logging.error("Unexpected error when creating job %s: %s", job_id, e)
            raise

        return job_id

    def get_job(self, job_id):
        """
        Fetch a job document from Redis.

        Args:
            job_id (str): Job ID.

        Returns:
            dict: Job data, or None if the job is missing/expired or a
            connection/decode error occurred (errors are logged, not raised).

        Raises:
            ValueError: If job_id fails validation.
        """
        if not self._validate_job_id(job_id):
            raise ValueError(f"Invalid job ID format: {job_id}")

        try:
            job_data_json = self.redis_client.get(f"job:{job_id}")
            if job_data_json:
                return json.loads(job_data_json)
            return None
        except redis.ConnectionError:
            logging.error("Failed to connect to Redis when getting job %s", job_id)
            return None
        except json.JSONDecodeError:
            logging.error("Failed to decode JSON for job %s", job_id)
            return None
        except Exception as e:
            logging.error("Unexpected error when getting job %s: %s", job_id, e)
            return None

    def update_job(self, job_id, status=None, result=None, error=None):
        """
        Atomically update job fields in Redis using a server-side Lua script.

        The read-modify-write runs inside Redis, so concurrent workers cannot
        interleave partial updates. The key's remaining TTL is preserved.

        Args:
            job_id (str): Job ID.
            status (str): New status (must be in VALID_STATUSES), or None to skip.
            result (any): JSON-serializable result data, or None to skip.
            error (str): Error message, or None to skip.

        Returns:
            bool: True if the job existed and was updated, False otherwise.

        Raises:
            ValueError: If job_id or status fails validation.
        """
        if not self._validate_job_id(job_id):
            raise ValueError(f"Invalid job ID format: {job_id}")

        if status is not None and not self._validate_status(status):
            raise ValueError(f"Invalid status: {status}. Valid statuses are: {self.VALID_STATUSES}")

        # Each optional field is passed as a presence flag ('1'/'0') plus its
        # value, so a legitimate value that happens to equal a sentinel string
        # (e.g. error == 'nil') is never silently dropped.
        lua_script = """
        local job_key = KEYS[1]
        local job_data = redis.call('GET', job_key)

        if not job_data then
            return 0
        end

        local updated_data = cjson.decode(job_data)

        if ARGV[1] == '1' then
            updated_data.status = ARGV[2]
        end
        if ARGV[3] == '1' then
            updated_data.result = cjson.decode(ARGV[4])
        end
        if ARGV[5] == '1' then
            updated_data.error = ARGV[6]
        end

        updated_data.updated_at = ARGV[7]

        -- TTL can be -1 (no expiry) or -2 (key expired after GET); 'EX'
        -- rejects non-positive values, so only re-apply a positive TTL.
        local ttl = redis.call('TTL', job_key)
        if ttl > 0 then
            redis.call('SET', job_key, cjson.encode(updated_data), 'EX', ttl)
        else
            redis.call('SET', job_key, cjson.encode(updated_data))
        end

        return 1
        """

        try:
            args = [
                '1' if status is not None else '0',
                status if status is not None else '',
                '1' if result is not None else '0',
                json.dumps(result) if result is not None else '',
                '1' if error is not None else '0',
                error if error is not None else '',
                datetime.utcnow().isoformat(),
            ]

            script = self.redis_client.register_script(lua_script)
            # Named 'updated' so the 'result' parameter is not shadowed.
            updated = script(keys=[f"job:{job_id}"], args=args)

            return updated == 1
        except redis.ConnectionError:
            logging.error("Failed to connect to Redis when updating job %s", job_id)
            return False
        except Exception as e:
            logging.error("Unexpected error when updating job %s: %s", job_id, e)
            return False

    def delete_job(self, job_id):
        """
        Delete a job from Redis.

        Args:
            job_id (str): Job ID.

        Returns:
            bool: True if the job was deleted, False if not found or on error
            (errors are logged, not raised).

        Raises:
            ValueError: If job_id fails validation.
        """
        if not self._validate_job_id(job_id):
            raise ValueError(f"Invalid job ID format: {job_id}")

        try:
            result = self.redis_client.delete(f"job:{job_id}")
            return result > 0
        except redis.ConnectionError:
            logging.error("Failed to connect to Redis when deleting job %s", job_id)
            return False
        except Exception as e:
            logging.error("Unexpected error when deleting job %s: %s", job_id, e)
            return False

    def cleanup_expired_jobs(self):
        """
        Clean up jobs that have expired based on their creation time.

        Intentionally a no-op: every job key is written with a TTL (see
        create_job), so Redis evicts expired jobs automatically.
        """
        pass
|
|
|
|
|
|
|
|
def get_redis_job_store():
    """
    Return the Redis job store cached on the current Flask application.

    On first use, builds a RedisJobStore from config['REDIS_URL'] (falling
    back to a localhost default) and attaches it to the app object so later
    calls in the same process reuse it.

    Returns:
        RedisJobStore: Redis job store instance
    """
    # Fast path: the store was already attached to this app.
    if hasattr(current_app, 'redis_job_store'):
        return current_app.redis_job_store

    url = current_app.config.get('REDIS_URL', 'redis://localhost:6379/0')
    store = RedisJobStore(url)
    current_app.redis_job_store = store
    return store