Spaces:
Sleeping
Sleeping
| """Redis client wrapper providing the queue's data model. | |
| Data model (all keys prefixed with ``deepagent:``): | |
| - ``queue:default`` LIST pending job IDs (FIFO via LPUSH/BRPOPLPUSH) | |
| - ``queue:processing:<worker>`` LIST jobs claimed by a worker (crash recovery) | |
| - ``queue:dead`` LIST poison messages (job IDs) | |
| - ``job:<id>`` HASH job metadata | |
| - ``job:<id>:logs`` LIST log lines, capped at 500 | |
| - ``dedup:<delivery_id>`` STRING TTL 10 min — webhook idempotency | |
| - ``repo_lock:<repo>`` STRING TTL 30 min — per-repo mutex | |
| - ``retries:<id>`` STRING counter | |
| """ | |
| from __future__ import annotations | |
| import enum | |
| import json | |
| import logging | |
| import os | |
| import time | |
| import uuid | |
| from dataclasses import asdict, dataclass, field | |
| from typing import Any, Optional | |
| import redis | |
| log = logging.getLogger(__name__) | |
| class JobStatus(str, enum.Enum): | |
| PENDING = "pending" | |
| RUNNING = "running" | |
| SUCCEEDED = "succeeded" | |
| FAILED = "failed" | |
| DEAD = "dead" # exceeded retries | |
| SKIPPED = "skipped" # dedup hit / repo locked | |
| class Job: | |
| """Serializable unit of work pushed onto the queue.""" | |
| id: str | |
| event: str | |
| repo_full_name: str | |
| payload: dict | |
| installation_id: Optional[int] = None | |
| delivery_id: Optional[str] = None | |
| status: JobStatus = JobStatus.PENDING | |
| attempts: int = 0 | |
| created_at: float = field(default_factory=time.time) | |
| started_at: Optional[float] = None | |
| finished_at: Optional[float] = None | |
| error: Optional[str] = None | |
| result: Optional[dict] = None | |
| def new( | |
| cls, | |
| *, | |
| event: str, | |
| repo_full_name: str, | |
| payload: dict, | |
| installation_id: Optional[int] = None, | |
| delivery_id: Optional[str] = None, | |
| ) -> "Job": | |
| return cls( | |
| id=str(uuid.uuid4()), | |
| event=event, | |
| repo_full_name=repo_full_name, | |
| payload=payload, | |
| installation_id=installation_id, | |
| delivery_id=delivery_id, | |
| ) | |
| def to_redis(self) -> dict[str, str]: | |
| d = asdict(self) | |
| d["payload"] = json.dumps(self.payload) | |
| d["result"] = json.dumps(self.result) if self.result is not None else "" | |
| d["status"] = self.status.value | |
| # Redis HSET only accepts str/bytes/int/float; coerce None/bool to str. | |
| return {k: ("" if v is None else str(v) if not isinstance(v, str) else v) for k, v in d.items()} | |
| def from_redis(cls, d: dict[bytes | str, bytes | str]) -> "Job": | |
| def _s(key: str, default: str = "") -> str: | |
| v = d.get(key) or d.get(key.encode()) | |
| if v is None: | |
| return default | |
| return v.decode() if isinstance(v, bytes) else str(v) | |
| return cls( | |
| id=_s("id"), | |
| event=_s("event"), | |
| repo_full_name=_s("repo_full_name"), | |
| payload=json.loads(_s("payload") or "{}"), | |
| installation_id=int(_s("installation_id")) if _s("installation_id") else None, | |
| delivery_id=_s("delivery_id") or None, | |
| status=JobStatus(_s("status", "pending")), | |
| attempts=int(_s("attempts", "0")), | |
| created_at=float(_s("created_at", "0") or 0), | |
| started_at=float(_s("started_at")) if _s("started_at") else None, | |
| finished_at=float(_s("finished_at")) if _s("finished_at") else None, | |
| error=_s("error") or None, | |
| result=json.loads(_s("result")) if _s("result") else None, | |
| ) | |
| class JobQueue: | |
| """Thin Redis-backed FIFO with idempotency, per-repo locking, retries and DLQ.""" | |
| KEY_PREFIX = "deepagent" | |
| QUEUE_KEY = f"{KEY_PREFIX}:queue:default" | |
| DLQ_KEY = f"{KEY_PREFIX}:queue:dead" | |
| DEDUP_TTL = 600 # 10 min | |
| REPO_LOCK_TTL = 1800 # 30 min | |
| JOB_TTL = 24 * 3600 # 1 day | |
| LOG_CAP = 500 | |
| MAX_ATTEMPTS = 3 | |
| def __init__(self, url: Optional[str] = None, *, max_attempts: Optional[int] = None): | |
| self._url = url or os.getenv("DEEPAGENT_REDIS_URL", "redis://localhost:6379/0") | |
| self._r = redis.Redis.from_url(self._url) | |
| self.max_attempts = max_attempts or int(os.getenv("DEEPAGENT_MAX_ATTEMPTS", str(self.MAX_ATTEMPTS))) | |
| # ---- low-level keys | |
| def _job_key(self, jid: str) -> str: return f"{self.KEY_PREFIX}:job:{jid}" | |
| def _logs_key(self, jid: str) -> str: return f"{self.KEY_PREFIX}:job:{jid}:logs" | |
| def _dedup_key(self, did: str) -> str: return f"{self.KEY_PREFIX}:dedup:{did}" | |
| def _lock_key(self, repo: str) -> str: return f"{self.KEY_PREFIX}:repo_lock:{repo}" | |
| def _processing_key(self, worker: str) -> str: return f"{self.KEY_PREFIX}:queue:processing:{worker}" | |
| # ---- health | |
| def ping(self) -> bool: | |
| try: | |
| return bool(self._r.ping()) | |
| except redis.RedisError: | |
| return False | |
| # ---- dedup | |
| def already_seen(self, delivery_id: str) -> bool: | |
| """Returns True iff this delivery was already processed (within TTL).""" | |
| if not delivery_id: | |
| return False | |
| # SET NX EX: atomic check-and-mark. | |
| return not bool(self._r.set(self._dedup_key(delivery_id), "1", nx=True, ex=self.DEDUP_TTL)) | |
| # ---- enqueue | |
| def enqueue(self, job: Job) -> str: | |
| """Persist the job and push it on the queue. Returns the job id.""" | |
| pipe = self._r.pipeline() | |
| pipe.hset(self._job_key(job.id), mapping=job.to_redis()) | |
| pipe.expire(self._job_key(job.id), self.JOB_TTL) | |
| pipe.lpush(self.QUEUE_KEY, job.id) | |
| # Per-installation index (newest-first, capped) — used for tenant-scoped listings. | |
| if job.installation_id is not None: | |
| idx_key = self._install_index_key(job.installation_id) | |
| pipe.lpush(idx_key, job.id) | |
| pipe.ltrim(idx_key, 0, 999) | |
| pipe.expire(idx_key, self.JOB_TTL) | |
| pipe.execute() | |
| return job.id | |
| def _install_index_key(self, installation_id: int | str) -> str: | |
| return f"{self.KEY_PREFIX}:install_idx:{installation_id}" | |
| def list_for_installation(self, installation_id: int | str, limit: int = 100) -> list["Job"]: | |
| """Return the N most-recent jobs for an installation (newest first).""" | |
| raw = self._r.lrange(self._install_index_key(installation_id), 0, limit - 1) | |
| out: list[Job] = [] | |
| for b in raw: | |
| jid = b.decode() if isinstance(b, bytes) else b | |
| j = self.get(jid) | |
| if j: | |
| out.append(j) | |
| return out | |
| # ---- dequeue (worker) | |
| def claim(self, worker_id: str, timeout: int = 5) -> Optional[Job]: | |
| """Atomically move a job from the pending queue to this worker's | |
| processing list (for crash recovery), then return it as a Job.""" | |
| # BRPOPLPUSH atomically pops from QUEUE_KEY and pushes to processing list. | |
| raw = self._r.brpoplpush(self.QUEUE_KEY, self._processing_key(worker_id), timeout=timeout) | |
| if raw is None: | |
| return None | |
| jid = raw.decode() if isinstance(raw, bytes) else raw | |
| d = self._r.hgetall(self._job_key(jid)) | |
| if not d: | |
| # Stale id in queue (TTL expired). Drop it. | |
| self._r.lrem(self._processing_key(worker_id), 1, jid) | |
| return None | |
| return Job.from_redis(d) | |
| def ack(self, worker_id: str, job: Job) -> None: | |
| """Remove from the processing list (job done, success or final failure).""" | |
| self._r.lrem(self._processing_key(worker_id), 1, job.id) | |
| def recover_orphans(self, worker_id: str) -> int: | |
| """Move any orphaned jobs back to the pending queue (crash recovery). | |
| Called on worker startup to recover from a previous crash of this worker. | |
| """ | |
| n = 0 | |
| while True: | |
| raw = self._r.rpoplpush(self._processing_key(worker_id), self.QUEUE_KEY) | |
| if raw is None: | |
| return n | |
| n += 1 | |
| log.warning("recovered orphan job from %s", worker_id) | |
| # ---- per-repo locking | |
| def acquire_repo_lock(self, repo: str, owner: str) -> bool: | |
| """Try to grab an exclusive lock on this repo. TTL prevents deadlocks.""" | |
| return bool(self._r.set(self._lock_key(repo), owner, nx=True, ex=self.REPO_LOCK_TTL)) | |
| def release_repo_lock(self, repo: str, owner: str) -> None: | |
| """Release the lock iff we still own it (avoid releasing after TTL expiry). | |
| Uses Lua EVAL for atomicity on real Redis; falls back to a WATCH/MULTI/EXEC | |
| transaction when EVAL is unavailable (e.g. fakeredis in tests). | |
| """ | |
| key = self._lock_key(repo) | |
| # Lua for compare-and-delete. | |
| script = ( | |
| "if redis.call('get', KEYS[1]) == ARGV[1] then " | |
| "return redis.call('del', KEYS[1]) else return 0 end" | |
| ) | |
| try: | |
| self._r.eval(script, 1, key, owner) | |
| return | |
| except redis.RedisError as e: # pragma: no cover | |
| log.debug("release_repo_lock EVAL fallback: %s", e) | |
| # Fallback: WATCH-based transaction (slightly less atomic but acceptable). | |
| try: | |
| with self._r.pipeline() as pipe: | |
| while True: | |
| try: | |
| pipe.watch(key) | |
| current = pipe.get(key) | |
| if (current.decode() if isinstance(current, bytes) else current) != owner: | |
| pipe.unwatch() | |
| return | |
| pipe.multi() | |
| pipe.delete(key) | |
| pipe.execute() | |
| return | |
| except redis.WatchError: | |
| continue | |
| except Exception as e: # pragma: no cover | |
| log.warning("release_repo_lock fallback failed: %s", e) | |
| # ---- status updates | |
| def update(self, job: Job, **fields: Any) -> None: | |
| for k, v in fields.items(): | |
| setattr(job, k, v) | |
| self._r.hset(self._job_key(job.id), mapping=job.to_redis()) | |
| self._r.expire(self._job_key(job.id), self.JOB_TTL) | |
| # Broadcast terminal/transition events to SSE subscribers. | |
| if "status" in fields: | |
| self.publish_status(job) | |
| def append_log(self, job_id: str, line: str) -> None: | |
| """Persist a log line and publish it on the per-job channel for SSE streaming.""" | |
| truncated = line[:2000] | |
| key = self._logs_key(job_id) | |
| chan = self._stream_chan(job_id) | |
| pipe = self._r.pipeline() | |
| pipe.rpush(key, truncated) | |
| pipe.ltrim(key, -self.LOG_CAP, -1) | |
| pipe.expire(key, self.JOB_TTL) | |
| pipe.publish(chan, truncated) | |
| pipe.execute() | |
| def _stream_chan(self, job_id: str) -> str: | |
| return f"{self.KEY_PREFIX}:stream:{job_id}" | |
| def publish_status(self, job: "Job") -> None: | |
| """Broadcast a JSON status update to subscribers.""" | |
| try: | |
| import json as _json | |
| self._r.publish( | |
| self._stream_chan(job.id), | |
| _json.dumps({"_status": job.status.value, "_error": job.error, | |
| "_result": job.result, "_finished_at": job.finished_at}), | |
| ) | |
| except Exception: # pragma: no cover | |
| pass | |
| def subscribe_logs(self, job_id: str): | |
| """Yield log lines as they're published. Caller is responsible for breaking | |
| out of the loop once it sees a ``_status`` terminal message. | |
| """ | |
| pubsub = self._r.pubsub(ignore_subscribe_messages=True) | |
| pubsub.subscribe(self._stream_chan(job_id)) | |
| try: | |
| for msg in pubsub.listen(): | |
| data = msg.get("data") | |
| if isinstance(data, bytes): | |
| data = data.decode("utf-8", errors="replace") | |
| yield data | |
| finally: | |
| try: | |
| pubsub.unsubscribe() | |
| pubsub.close() | |
| except Exception: | |
| pass | |
| # ---- retry / DLQ | |
| def retry_or_dead(self, job: Job, error: str) -> bool: | |
| """Re-queue the job (with backoff) or move it to the DLQ. | |
| Returns True if re-queued, False if sent to DLQ. | |
| """ | |
| job.attempts += 1 | |
| if job.attempts >= self.max_attempts: | |
| self.update(job, status=JobStatus.DEAD, error=error, finished_at=time.time()) | |
| self._r.lpush(self.DLQ_KEY, job.id) | |
| return False | |
| # exponential backoff: 2s, 8s, 32s ... | |
| delay = 2 ** (2 * job.attempts - 1) | |
| log.info("retrying job %s in %ds (attempt %d)", job.id, delay, job.attempts + 1) | |
| self.update(job, status=JobStatus.PENDING, error=error) | |
| # Cheap: rely on the worker pool sleep. Real backoff would use a delayed queue. | |
| time.sleep(min(delay, 10)) # cap to keep things responsive | |
| self._r.lpush(self.QUEUE_KEY, job.id) | |
| return True | |
| # ---- introspection | |
| def get(self, jid: str) -> Optional[Job]: | |
| d = self._r.hgetall(self._job_key(jid)) | |
| return Job.from_redis(d) if d else None | |
| def get_logs(self, jid: str, tail: int = 200) -> list[str]: | |
| raw = self._r.lrange(self._logs_key(jid), -tail, -1) | |
| return [b.decode() if isinstance(b, bytes) else b for b in raw] | |
| def stats(self) -> dict[str, int]: | |
| return { | |
| "queue_depth": self._r.llen(self.QUEUE_KEY), | |
| "dead_letter": self._r.llen(self.DLQ_KEY), | |
| } | |
| def list_dead(self, limit: int = 50) -> list[Job]: | |
| raw = self._r.lrange(self.DLQ_KEY, 0, limit - 1) | |
| out = [] | |
| for b in raw: | |
| jid = b.decode() if isinstance(b, bytes) else b | |
| j = self.get(jid) | |
| if j: | |
| out.append(j) | |
| return out | |
| def requeue_dead(self, jid: str) -> bool: | |
| """Move a DLQ job back to the pending queue and reset its attempts.""" | |
| if self._r.lrem(self.DLQ_KEY, 1, jid) == 0: | |
| return False | |
| job = self.get(jid) | |
| if not job: | |
| return False | |
| self.update(job, status=JobStatus.PENDING, attempts=0, error=None) | |
| self._r.lpush(self.QUEUE_KEY, jid) | |
| return True | |