Spaces:
Configuration error
Configuration error
File size: 4,704 Bytes
aa15bce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
from __future__ import annotations
import sqlite3
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional
from ...logging_config import logger
from .models import TriggerRecord
from .utils import to_storage_timestamp, utc_now
class TriggerStore:
"""Low-level persistence for triggers backed by SQLite."""
def __init__(self, db_path: Path):
self._db_path = db_path
self._lock = threading.Lock()
self._ensure_directory()
self._ensure_schema()
def _ensure_directory(self) -> None:
try:
self._db_path.parent.mkdir(parents=True, exist_ok=True)
except Exception as exc: # pragma: no cover - defensive
logger.warning(
"trigger directory creation failed",
extra={"error": str(exc)},
)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self._db_path, timeout=30, isolation_level=None)
conn.row_factory = sqlite3.Row
return conn
def _ensure_schema(self) -> None:
schema_sql = """
CREATE TABLE IF NOT EXISTS triggers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_name TEXT NOT NULL,
payload TEXT NOT NULL,
start_time TEXT,
next_trigger TEXT,
recurrence_rule TEXT,
timezone TEXT,
status TEXT NOT NULL DEFAULT 'active',
last_error TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
"""
index_sql = """
CREATE INDEX IF NOT EXISTS idx_triggers_agent_next
ON triggers (agent_name, next_trigger);
"""
with self._lock, self._connect() as conn:
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute(schema_sql)
conn.execute(index_sql)
def insert(self, payload: Dict[str, Any]) -> int:
with self._lock, self._connect() as conn:
columns = ", ".join(payload.keys())
placeholders = ", ".join([":" + key for key in payload.keys()])
sql = f"INSERT INTO triggers ({columns}) VALUES ({placeholders})"
conn.execute(sql, payload)
trigger_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
return int(trigger_id)
def fetch_one(self, trigger_id: int, agent_name: str) -> Optional[TriggerRecord]:
with self._lock, self._connect() as conn:
row = conn.execute(
"SELECT * FROM triggers WHERE id = ? AND agent_name = ?",
(trigger_id, agent_name),
).fetchone()
return self._row_to_record(row) if row else None
def update(self, trigger_id: int, agent_name: str, fields: Dict[str, Any]) -> bool:
if not fields:
return False
assignments = ", ".join(f"{key} = :{key}" for key in fields.keys())
sql = (
f"UPDATE triggers SET {assignments}, updated_at = :updated_at"
" WHERE id = :trigger_id AND agent_name = :agent_name"
)
payload = {
**fields,
"updated_at": to_storage_timestamp(utc_now()),
"trigger_id": trigger_id,
"agent_name": agent_name,
}
with self._lock, self._connect() as conn:
cursor = conn.execute(sql, payload)
return cursor.rowcount > 0
def list_for_agent(self, agent_name: str) -> List[TriggerRecord]:
with self._lock, self._connect() as conn:
rows = conn.execute(
"SELECT * FROM triggers WHERE agent_name = ? ORDER BY next_trigger IS NULL, next_trigger",
(agent_name,),
).fetchall()
return [self._row_to_record(row) for row in rows]
def fetch_due(
self, agent_name: Optional[str], before_iso: str
) -> List[TriggerRecord]:
sql = (
"SELECT * FROM triggers WHERE status = 'active' AND next_trigger IS NOT NULL"
" AND next_trigger <= ?"
)
params: List[Any] = [before_iso]
if agent_name:
sql += " AND agent_name = ?"
params.append(agent_name)
sql += " ORDER BY next_trigger, id"
with self._lock, self._connect() as conn:
rows = conn.execute(sql, params).fetchall()
return [self._row_to_record(row) for row in rows]
def clear_all(self) -> None:
with self._lock, self._connect() as conn:
conn.execute("DELETE FROM triggers")
def _row_to_record(self, row: sqlite3.Row) -> TriggerRecord:
data = dict(row)
return TriggerRecord.model_validate(data)
__all__ = ["TriggerStore"]
|