import time
from datetime import datetime, timezone, timedelta
from typing import Optional

from spooler.store import get_conn


def claim_lock(job_name: str, owner: str, ttl_minutes: int = 60) -> bool:
    """Attempt to claim the distributed lock for a job name.

    The lock is free when the job has no row yet, or when its
    ``locked_until`` value is NULL/empty or not later than the current UTC
    time. Timestamps are stored as ISO-8601 strings and compared as text.

    Args:
        job_name: Name of the job whose lock is being claimed.
        owner: Identifier recorded in ``locked_by`` for the new holder.
        ttl_minutes: Minutes from now until the lock is considered expired.

    Returns:
        True if this call claimed the lock, False if it is currently held.
    """
    # Sample the clock once so the expiry is exactly now + ttl.
    now = datetime.now(timezone.utc)
    now_iso = now.isoformat()
    until_iso = (now + timedelta(minutes=ttl_minutes)).isoformat()

    conn = get_conn()
    try:
        # Take over an existing row only if it is free or expired. The
        # condition lives in the UPDATE itself so that checking and claiming
        # happen in one statement; a separate SELECT followed by a write
        # would let two callers both observe "free" and both claim.
        cursor = conn.execute(
            """
            UPDATE scheduler_jobs
            SET last_run_at = COALESCE(last_run_at, ?),
                locked_until = ?,
                locked_by = ?,
                status = 'locked'
            WHERE job_name = ?
              AND (locked_until IS NULL OR locked_until <= ?)
            """,
            (now_iso, until_iso, owner, job_name, now_iso),
        )
        claimed = cursor.rowcount > 0

        if not claimed:
            # Either the row does not exist yet or the lock is held. Try to
            # create the row; OR IGNORE makes this a no-op when it exists.
            # NOTE(review): relies on a UNIQUE/PRIMARY KEY constraint on
            # job_name, as the previous INSERT OR REPLACE did - confirm.
            cursor = conn.execute(
                """
                INSERT OR IGNORE INTO scheduler_jobs
                    (job_name, last_run_at, locked_until, locked_by, status)
                VALUES (?, ?, ?, ?, 'locked')
                """,
                (job_name, now_iso, until_iso, owner),
            )
            claimed = cursor.rowcount > 0

        if claimed:
            conn.commit()
        return claimed
    finally:
        # Always release the connection, even if a statement raised.
        conn.close()


def release_lock(job_name: str, owner: str, status_msg: str = "completed") -> bool:
    """Release a held lock, recording a status message.

    Clears ``locked_until`` and ``locked_by``, sets ``last_run_at`` to the
    current UTC time and ``status`` to ``status_msg``, but only when the row
    for ``job_name`` is currently locked by ``owner``.

    Args:
        job_name: Name of the job whose lock is being released.
        owner: Identifier that must match the row's ``locked_by`` value.
        status_msg: Value written to the ``status`` column.

    Returns:
        True if the lock was released, False if ``owner`` does not hold it
        (including when no row exists for ``job_name``).
    """
    now_iso = datetime.now(timezone.utc).isoformat()

    conn = get_conn()
    try:
        # The ownership check is part of the WHERE clause, so the number of
        # rows changed tells us whether this owner really held the lock.
        cursor = conn.execute(
            """
            UPDATE scheduler_jobs
            SET locked_until = NULL,
                locked_by = NULL,
                last_run_at = ?,
                status = ?
            WHERE job_name = ? AND locked_by = ?
            """,
            (now_iso, status_msg, job_name, owner),
        )
        released = cursor.rowcount > 0
        if released:
            conn.commit()
        return released
    finally:
        conn.close()


def get_job_status(job_name: Optional[str] = None) -> list[dict]:
    """Get the status rows of all jobs or of one specific job.

    Args:
        job_name: Job to look up. When None or empty, every row in
            ``scheduler_jobs`` is returned.

    Returns:
        A list with one dict per matching row; empty if nothing matches.
    """
    conn = get_conn()
    try:
        if job_name:
            cursor = conn.execute(
                "SELECT * FROM scheduler_jobs WHERE job_name = ?",
                (job_name,),
            )
        else:
            cursor = conn.execute("SELECT * FROM scheduler_jobs")
        return [dict(row) for row in cursor.fetchall()]
    finally:
        conn.close()