| import time |
| from datetime import datetime, timezone, timedelta |
| from typing import Optional |
|
|
| from spooler.store import get_conn |
|
|
def claim_lock(job_name: str, owner: str, ttl_minutes: int = 60) -> bool:
    """Attempt to claim a distributed lock for a job name.

    The lock is granted when the job has no row in ``scheduler_jobs`` yet,
    or when its ``locked_until`` is unset or no longer in the future.

    Args:
        job_name: Name of the job to lock.
        owner: Identifier stored in ``locked_by`` for the new holder.
        ttl_minutes: Minutes from now after which the lock counts as expired.

    Returns:
        True if the lock was claimed, False if an unexpired lock is held.
    """
    # One clock reading, so the expiry is exactly ``ttl_minutes`` after "now".
    now = datetime.now(timezone.utc)
    now_iso = now.isoformat()
    until_iso = (now + timedelta(minutes=ttl_minutes)).isoformat()

    conn = get_conn()
    try:
        # The expiry check and the write happen in a single statement, so two
        # claimers cannot both observe "free" and both take the lock.
        # Updating in place (rather than REPLACE) also keeps any columns not
        # listed here untouched.
        cur = conn.execute(
            """
            UPDATE scheduler_jobs
            SET last_run_at = COALESCE(last_run_at, ?),
                locked_until = ?,
                locked_by = ?,
                status = 'locked'
            WHERE job_name = ?
              AND (locked_until IS NULL OR locked_until <= ?)
            """,
            (now_iso, until_iso, owner, job_name, now_iso),
        )
        claimed = cur.rowcount > 0

        if not claimed:
            # Nothing was updated: the job is either currently locked or has
            # never been registered. Insert only in the latter case.
            cur = conn.execute(
                """
                INSERT INTO scheduler_jobs
                    (job_name, last_run_at, locked_until, locked_by, status)
                SELECT ?, ?, ?, ?, 'locked'
                WHERE NOT EXISTS (
                    SELECT 1 FROM scheduler_jobs WHERE job_name = ?
                )
                """,
                (job_name, now_iso, until_iso, owner, job_name),
            )
            claimed = cur.rowcount > 0

        conn.commit()
        return claimed
    finally:
        # Always release the connection, including when a statement raises.
        conn.close()
|
|
def release_lock(job_name: str, owner: str, status_msg: str = "completed") -> bool:
    """Release a held lock, providing a status message.

    Clears ``locked_until`` and ``locked_by``, stamps ``last_run_at`` with the
    current UTC time, and stores ``status_msg`` in ``status``.

    Args:
        job_name: Name of the job whose lock is being released.
        owner: Identifier that must match the row's ``locked_by``.
        status_msg: Value written to the ``status`` column.

    Returns:
        True if a row owned by ``owner`` was updated, False otherwise
        (no such job, or the lock is held by someone else).
    """
    now_iso = datetime.now(timezone.utc).isoformat()

    conn = get_conn()
    try:
        # The ownership test lives in the WHERE clause, so the check and the
        # write are one statement; the affected-row count tells us whether
        # this owner actually held the lock.
        cur = conn.execute(
            """
            UPDATE scheduler_jobs
            SET locked_until = NULL,
                locked_by = NULL,
                last_run_at = ?,
                status = ?
            WHERE job_name = ? AND locked_by = ?
            """,
            (now_iso, status_msg, job_name, owner),
        )
        conn.commit()
        return cur.rowcount > 0
    finally:
        # Always release the connection, including when a statement raises.
        conn.close()
|
|
def get_job_status(job_name: Optional[str] = None) -> list[dict]:
    """Get status of all jobs or a specific job.

    Args:
        job_name: Job to look up. When omitted (or empty), every row of
            ``scheduler_jobs`` is returned.

    Returns:
        A list of rows converted to plain dicts; empty if nothing matches.
    """
    conn = get_conn()
    try:
        if job_name:
            cursor = conn.execute(
                "SELECT * FROM scheduler_jobs WHERE job_name = ?",
                (job_name,),
            )
        else:
            cursor = conn.execute("SELECT * FROM scheduler_jobs")
        # Materialise as dicts before the connection is closed.
        return [dict(row) for row in cursor.fetchall()]
    finally:
        # Always release the connection, including when the query raises.
        conn.close()
|
|