from __future__ import annotations import sqlite3 import threading from pathlib import Path from typing import Any, Dict, List, Optional from ...logging_config import logger from .models import TriggerRecord from .utils import to_storage_timestamp, utc_now class TriggerStore: """Low-level persistence for triggers backed by SQLite.""" def __init__(self, db_path: Path): self._db_path = db_path self._lock = threading.Lock() self._ensure_directory() self._ensure_schema() def _ensure_directory(self) -> None: try: self._db_path.parent.mkdir(parents=True, exist_ok=True) except Exception as exc: # pragma: no cover - defensive logger.warning( "trigger directory creation failed", extra={"error": str(exc)}, ) def _connect(self) -> sqlite3.Connection: conn = sqlite3.connect(self._db_path, timeout=30, isolation_level=None) conn.row_factory = sqlite3.Row return conn def _ensure_schema(self) -> None: schema_sql = """ CREATE TABLE IF NOT EXISTS triggers ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_name TEXT NOT NULL, payload TEXT NOT NULL, start_time TEXT, next_trigger TEXT, recurrence_rule TEXT, timezone TEXT, status TEXT NOT NULL DEFAULT 'active', last_error TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); """ index_sql = """ CREATE INDEX IF NOT EXISTS idx_triggers_agent_next ON triggers (agent_name, next_trigger); """ with self._lock, self._connect() as conn: conn.execute("PRAGMA journal_mode=WAL;") conn.execute(schema_sql) conn.execute(index_sql) def insert(self, payload: Dict[str, Any]) -> int: with self._lock, self._connect() as conn: columns = ", ".join(payload.keys()) placeholders = ", ".join([":" + key for key in payload.keys()]) sql = f"INSERT INTO triggers ({columns}) VALUES ({placeholders})" conn.execute(sql, payload) trigger_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] return int(trigger_id) def fetch_one(self, trigger_id: int, agent_name: str) -> Optional[TriggerRecord]: with self._lock, self._connect() as conn: row = conn.execute( "SELECT * FROM triggers WHERE id = ? AND agent_name = ?", (trigger_id, agent_name), ).fetchone() return self._row_to_record(row) if row else None def update(self, trigger_id: int, agent_name: str, fields: Dict[str, Any]) -> bool: if not fields: return False assignments = ", ".join(f"{key} = :{key}" for key in fields.keys()) sql = ( f"UPDATE triggers SET {assignments}, updated_at = :updated_at" " WHERE id = :trigger_id AND agent_name = :agent_name" ) payload = { **fields, "updated_at": to_storage_timestamp(utc_now()), "trigger_id": trigger_id, "agent_name": agent_name, } with self._lock, self._connect() as conn: cursor = conn.execute(sql, payload) return cursor.rowcount > 0 def list_for_agent(self, agent_name: str) -> List[TriggerRecord]: with self._lock, self._connect() as conn: rows = conn.execute( "SELECT * FROM triggers WHERE agent_name = ? ORDER BY next_trigger IS NULL, next_trigger", (agent_name,), ).fetchall() return [self._row_to_record(row) for row in rows] def fetch_due( self, agent_name: Optional[str], before_iso: str ) -> List[TriggerRecord]: sql = ( "SELECT * FROM triggers WHERE status = 'active' AND next_trigger IS NOT NULL" " AND next_trigger <= ?" ) params: List[Any] = [before_iso] if agent_name: sql += " AND agent_name = ?" params.append(agent_name) sql += " ORDER BY next_trigger, id" with self._lock, self._connect() as conn: rows = conn.execute(sql, params).fetchall() return [self._row_to_record(row) for row in rows] def clear_all(self) -> None: with self._lock, self._connect() as conn: conn.execute("DELETE FROM triggers") def _row_to_record(self, row: sqlite3.Row) -> TriggerRecord: data = dict(row) return TriggerRecord.model_validate(data) __all__ = ["TriggerStore"]