Ordo
Initial public release
63c75d5
import time
from datetime import datetime, timezone, timedelta
from typing import Optional
from spooler.store import get_conn
def claim_lock(job_name: str, owner: str, ttl_minutes: int = 60) -> bool:
"""Attempt to claim a distributed lock for a job name. Returns True if claimed."""
conn = get_conn()
now_iso = datetime.now(timezone.utc).isoformat()
until_iso = (datetime.now(timezone.utc) + timedelta(minutes=ttl_minutes)).isoformat()
# Check if a valid lock exists
row = conn.execute(
"SELECT locked_until FROM scheduler_jobs WHERE job_name = ?",
(job_name,)
).fetchone()
if row and row["locked_until"] and row["locked_until"] > now_iso:
# Lock is currently held and not expired
conn.close()
return False
# Claim the lock (insert if new, overwrite if expired)
conn.execute(
"""
INSERT OR REPLACE INTO scheduler_jobs
(job_name, last_run_at, locked_until, locked_by, status)
VALUES (?, COALESCE((SELECT last_run_at FROM scheduler_jobs WHERE job_name = ?), ?), ?, ?, 'locked')
""",
(job_name, job_name, now_iso, until_iso, owner)
)
conn.commit()
conn.close()
return True
def release_lock(job_name: str, owner: str, status_msg: str = "completed") -> bool:
"""Release a held lock, providing a status message."""
conn = get_conn()
now_iso = datetime.now(timezone.utc).isoformat()
# Ensure this owner actually holds the lock
row = conn.execute(
"SELECT locked_by FROM scheduler_jobs WHERE job_name = ?",
(job_name,)
).fetchone()
if not row or row["locked_by"] != owner:
conn.close()
return False
conn.execute(
"""
UPDATE scheduler_jobs
SET locked_until = NULL,
locked_by = NULL,
last_run_at = ?,
status = ?
WHERE job_name = ? AND locked_by = ?
""",
(now_iso, status_msg, job_name, owner)
)
conn.commit()
conn.close()
return True
def get_job_status(job_name: Optional[str] = None) -> list[dict]:
"""Get status of all jobs or a specific job."""
conn = get_conn()
if job_name:
rows = conn.execute(
"SELECT * FROM scheduler_jobs WHERE job_name = ?",
(job_name,)
).fetchall()
else:
rows = conn.execute("SELECT * FROM scheduler_jobs").fetchall()
conn.close()
return [dict(row) for row in rows]