Spaces:
Sleeping
Sleeping
| """FastAPI server: GitHub webhooks + multi-tenant admin API. | |
| Two auth paths: | |
| - ``POST /webhook``: HMAC signature only (no user auth — GitHub calls us) | |
| - All other endpoints: ``Authorization: Bearer <token>`` | |
| * GitHub user access token (validated against GitHub API, scopes to user's | |
| installations) | |
| * Admin token (``DEEPAGENT_ADMIN_TOKEN``) bypasses scoping | |
| Set ``DEEPAGENT_AUTH_DISABLED=1`` to turn off auth entirely (dev only). | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import hmac | |
| import json | |
| import logging | |
| import os | |
| from dataclasses import asdict | |
| from typing import Any, Optional | |
| from fastapi import Depends, FastAPI, HTTPException, Header, Request | |
| from fastapi.responses import JSONResponse, Response, StreamingResponse | |
| from prometheus_client import CONTENT_TYPE_LATEST, generate_latest | |
| from ..config import get_settings | |
| from ..observability import setup_observability | |
| from ..observability.audit import audit_log, get_audit_store | |
| from ..observability.cost_tenant import get_store as get_tenant_cost_store | |
| from ..observability.logging_setup import get_logger | |
| from ..observability.metrics import DLQ_SIZE, JOBS_TOTAL, QUEUE_DEPTH, QUOTA_REJECTIONS | |
| from ..observability.tracing import current_traceparent, span | |
| from .auth_tokens import UserContext, get_verifier | |
| from .roles import Role, effective_role, get_role_store | |
| setup_observability(service_name="gh-deepagent-webhook") | |
| log = get_logger("gh_deepagent.webhook") | |
| stdlib_log = logging.getLogger("gh_deepagent.webhook") | |
| stdlib_log.setLevel(logging.INFO) | |
| # ============================================================== DEPENDENCIES | |
| def _bearer_token(authorization: Optional[str]) -> Optional[str]: | |
| if not authorization: | |
| return None | |
| parts = authorization.split(None, 1) | |
| if len(parts) == 2 and parts[0].lower() == "bearer": | |
| return parts[1].strip() | |
| return None | |
| def require_user(authorization: Optional[str] = Header(default=None)) -> UserContext: | |
| """FastAPI dependency: returns the authenticated user or raises 401.""" | |
| tok = _bearer_token(authorization) | |
| ctx = get_verifier().verify(tok) | |
| if ctx is None: | |
| raise HTTPException(401, "Invalid or missing bearer token.") | |
| return ctx | |
| def require_admin(user: UserContext = Depends(require_user)) -> UserContext: | |
| if not user.is_admin: | |
| raise HTTPException(403, "Admin privileges required.") | |
| return user | |
| def require_role_on_installation(installation_id: int, user: UserContext, role: Role) -> Role: | |
| """Return the user's effective role, or raise 403/404 if insufficient. | |
| 404 (not 403) for non-members so we don't leak installation existence. | |
| """ | |
| eff = effective_role(user, installation_id) | |
| if eff is None: | |
| raise HTTPException(404, "installation not found") | |
| if not eff.can(role): | |
| raise HTTPException(403, f"requires role {role.value}, you are {eff.value}") | |
| return eff | |
| # ============================================================== APP | |
| def _make_app() -> FastAPI: | |
| app = FastAPI(title="gh-deepagent webhook", version="0.5.0") | |
| from ..queue import Job, JobQueue | |
| from ..queue.quota import QuotaManager | |
| queue = JobQueue() | |
| quotas = QuotaManager(client=queue._r) | |
| # ------------ helpers ---------- | |
| def _verify_signature(body: bytes, header_sig: str | None) -> None: | |
| secret = os.getenv("DEEPAGENT_WEBHOOK_SECRET", "") | |
| if not secret: | |
| return | |
| if not header_sig or not header_sig.startswith("sha256="): | |
| raise HTTPException(401, "Missing or malformed X-Hub-Signature-256.") | |
| expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() | |
| if not hmac.compare_digest(expected, header_sig): | |
| raise HTTPException(401, "Invalid webhook signature.") | |
| def _scope_or_404(job: Optional[Job], user: UserContext, | |
| min_role: Role = Role.VIEWER) -> Job: | |
| """Validate access to a job, checking role on its installation.""" | |
| if not job: | |
| raise HTTPException(404, "job not found") | |
| if user.is_admin: | |
| return job | |
| if job.installation_id is None: | |
| # Legacy PAT-mode jobs — only admins see them. | |
| raise HTTPException(404, "job not found") | |
| eff = effective_role(user, job.installation_id) | |
| if eff is None: | |
| raise HTTPException(404, "job not found") | |
| if not eff.can(min_role): | |
| raise HTTPException(403, f"requires role {min_role.value}, you are {eff.value}") | |
| return job | |
| # ============================================================ public | |
| async def healthz(): | |
| redis_ok = queue.ping() | |
| stats = queue.stats() if redis_ok else {"queue_depth": -1, "dead_letter": -1} | |
| return JSONResponse( | |
| {"status": "ok" if redis_ok else "degraded", "redis": redis_ok, **stats, "version": "0.5.0"}, | |
| status_code=200 if redis_ok else 503, | |
| ) | |
| # ============================================================ admin-only | |
| async def metrics(_admin: UserContext = Depends(require_admin)): | |
| try: | |
| s = queue.stats() | |
| QUEUE_DEPTH.set(s["queue_depth"]) | |
| DLQ_SIZE.set(s["dead_letter"]) | |
| except Exception: | |
| log.exception("metrics stats failed") | |
| return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) | |
| async def dlq(limit: int = 50, _admin: UserContext = Depends(require_admin)): | |
| return [ | |
| {"id": j.id, "event": j.event, "repo": j.repo_full_name, | |
| "error": j.error, "attempts": j.attempts, | |
| "installation_id": j.installation_id} | |
| for j in queue.list_dead(limit=limit) | |
| ] | |
| async def requeue_dead(job_id: str, user: UserContext = Depends(require_user)): | |
| """Requeue a DLQ job. Allowed to global admins OR install operators.""" | |
| job = queue.get(job_id) | |
| if not job: | |
| raise HTTPException(404, "job not in DLQ") | |
| # Permission check: global admin OR operator on the job's installation. | |
| if not user.is_admin: | |
| require_role_on_installation(job.installation_id, user, Role.OPERATOR) | |
| ok = queue.requeue_dead(job_id) | |
| if not ok: | |
| raise HTTPException(404, "job not in DLQ") | |
| audit_log( | |
| actor=user.login, via=user.via, action="dlq.requeue", | |
| target=job_id, installation_id=job.installation_id, | |
| ) | |
| return {"requeued": True, "job_id": job_id} | |
| # ============================================================ webhook (GitHub HMAC) | |
| async def webhook( | |
| request: Request, | |
| x_hub_signature_256: str | None = Header(default=None), | |
| x_github_event: str | None = Header(default=None), | |
| x_github_delivery: str | None = Header(default=None), | |
| ): | |
| body = await request.body() | |
| _verify_signature(body, x_hub_signature_256) | |
| if not x_github_event: | |
| raise HTTPException(400, "Missing X-GitHub-Event header.") | |
| try: | |
| payload = json.loads(body) | |
| except json.JSONDecodeError as e: | |
| raise HTTPException(400, f"Invalid JSON: {e}") from e | |
| if x_github_delivery and queue.already_seen(x_github_delivery): | |
| JOBS_TOTAL.labels(x_github_event, "deduped").inc() | |
| return {"deduped": True} | |
| settings = get_settings() | |
| if not _is_actionable(x_github_event, payload, settings): | |
| JOBS_TOTAL.labels(x_github_event, "ignored").inc() | |
| return {"ignored": True, "reason": "event does not match triggers"} | |
| repo_full = payload["repository"]["full_name"] | |
| installation_id = (payload.get("installation") or {}).get("id") | |
| decision = quotas.check_and_consume(installation_id) | |
| if not decision.allowed: | |
| QUOTA_REJECTIONS.labels(str(installation_id or "anon")).inc() | |
| JOBS_TOTAL.labels(x_github_event, "quota_rejected").inc() | |
| raise HTTPException( | |
| 429, | |
| detail={ | |
| "error": "quota_exceeded", "bucket": decision.bucket, | |
| "current": decision.current, "limit": decision.limit, | |
| "retry_after_seconds": decision.retry_after_seconds, | |
| }, | |
| headers={"Retry-After": str(decision.retry_after_seconds)}, | |
| ) | |
| with span("job.enqueue", repo=repo_full, gh_event=x_github_event, | |
| installation_id=installation_id) as _s: | |
| traceparent = current_traceparent() | |
| payload_with_trace = dict(payload) | |
| if traceparent: | |
| payload_with_trace.setdefault("_deepagent", {})["traceparent"] = traceparent | |
| job = Job.new( | |
| event=x_github_event, repo_full_name=repo_full, | |
| payload=payload_with_trace, installation_id=installation_id, | |
| delivery_id=x_github_delivery, | |
| ) | |
| queue.enqueue(job) | |
| if _s is not None: | |
| try: _s.set_attribute("job.id", job.id) | |
| except Exception: pass | |
| JOBS_TOTAL.labels(x_github_event, "queued").inc() | |
| QUEUE_DEPTH.set(queue.stats()["queue_depth"]) | |
| log.info("job enqueued", job_id=job.id, repo=repo_full, gh_event=x_github_event) | |
| audit_log( | |
| actor="github", via="webhook", action="job.create", | |
| target=job.id, installation_id=installation_id, | |
| event=x_github_event, repo=repo_full, | |
| ) | |
| return {"accepted": True, "job_id": job.id, "event": x_github_event} | |
| # ============================================================ user-scoped | |
| async def whoami(user: UserContext = Depends(require_user)): | |
| return { | |
| "login": user.login, | |
| "is_admin": user.is_admin, | |
| "via": user.via, | |
| "installation_ids": sorted(user.installation_ids), | |
| } | |
| async def get_job(job_id: str, user: UserContext = Depends(require_user)): | |
| job = _scope_or_404(queue.get(job_id), user) | |
| d = asdict(job) | |
| d["status"] = job.status.value | |
| return d | |
| async def get_job_logs(job_id: str, tail: int = 200, | |
| user: UserContext = Depends(require_user)): | |
| _scope_or_404(queue.get(job_id), user) | |
| return {"job_id": job_id, "lines": queue.get_logs(job_id, tail=tail)} | |
| async def stream_job_logs(job_id: str, replay: bool = True, | |
| user: UserContext = Depends(require_user)): | |
| _scope_or_404(queue.get(job_id), user) | |
| def _sse(event: str, data: str) -> bytes: | |
| payload = data.replace("\n", "\ndata: ") | |
| return f"event: {event}\ndata: {payload}\n\n".encode() | |
| def _gen(): | |
| if replay: | |
| for line in queue.get_logs(job_id, tail=queue.LOG_CAP): | |
| yield _sse("log", line) | |
| current = queue.get(job_id) | |
| if current and current.status.value in {"succeeded", "failed", "dead"}: | |
| yield _sse("status", json.dumps({ | |
| "status": current.status.value, | |
| "result": current.result, "error": current.error, | |
| })) | |
| return | |
| heartbeat = 0 | |
| for chunk in queue.subscribe_logs(job_id): | |
| if chunk is None: | |
| heartbeat += 1 | |
| if heartbeat % 15 == 0: | |
| yield b": keepalive\n\n" | |
| continue | |
| if chunk.startswith("{") and "_status" in chunk: | |
| try: | |
| data = json.loads(chunk) | |
| yield _sse("status", json.dumps({ | |
| "status": data.get("_status"), | |
| "result": data.get("_result"), | |
| "error": data.get("_error"), | |
| })) | |
| if data.get("_status") in {"succeeded", "failed", "dead"}: | |
| return | |
| except Exception: | |
| yield _sse("log", chunk) | |
| else: | |
| yield _sse("log", chunk) | |
| return StreamingResponse( | |
| _gen(), media_type="text/event-stream", | |
| headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no", | |
| "Connection": "keep-alive"}, | |
| ) | |
| async def installation_quota(installation_id: int, | |
| user: UserContext = Depends(require_user)): | |
| require_role_on_installation(installation_id, user, Role.VIEWER) | |
| return {"installation_id": installation_id, "usage": quotas.usage(installation_id)} | |
| async def installation_jobs(installation_id: int, limit: int = 100, | |
| user: UserContext = Depends(require_user)): | |
| require_role_on_installation(installation_id, user, Role.VIEWER) | |
| jobs = queue.list_for_installation(installation_id, limit=limit) | |
| return [ | |
| {"id": j.id, "event": j.event, "repo": j.repo_full_name, | |
| "status": j.status.value, "created_at": j.created_at, | |
| "finished_at": j.finished_at, "error": j.error, | |
| "attempts": j.attempts} | |
| for j in jobs | |
| ] | |
| # ---------- A. Per-tenant cost ---------- | |
| async def installation_cost(installation_id: int, | |
| user: UserContext = Depends(require_user)): | |
| require_role_on_installation(installation_id, user, Role.VIEWER) | |
| return get_tenant_cost_store().usage(installation_id) | |
| async def reset_installation_cost(installation_id: int, | |
| user: UserContext = Depends(require_user)): | |
| require_role_on_installation(installation_id, user, Role.ADMIN) | |
| get_tenant_cost_store().reset(installation_id) | |
| audit_log(actor=user.login, via=user.via, action="cost.reset", | |
| installation_id=installation_id) | |
| return {"reset": True, "installation_id": installation_id} | |
| async def list_installations(user: UserContext = Depends(require_user)): | |
| """List installations visible to this user with the cost store knowledge.""" | |
| if user.is_admin: | |
| iids = sorted(set(_all_installations(queue)) | |
| | set(get_tenant_cost_store().list_installations())) | |
| else: | |
| iids = sorted(user.installation_ids) | |
| return [ | |
| {"installation_id": iid, | |
| "role": (effective_role(user, iid).value if effective_role(user, iid) else None)} | |
| for iid in iids | |
| ] | |
| # ---------- B. Roles per installation ---------- | |
| async def list_roles(installation_id: int, | |
| user: UserContext = Depends(require_user)): | |
| require_role_on_installation(installation_id, user, Role.VIEWER) | |
| roles = get_role_store().list(installation_id) | |
| return { | |
| "installation_id": installation_id, | |
| "roles": {login: r.value for login, r in roles.items()}, | |
| } | |
| async def set_role(installation_id: int, login: str, | |
| role: str, | |
| user: UserContext = Depends(require_user)): | |
| require_role_on_installation(installation_id, user, Role.ADMIN) | |
| parsed = Role.parse(role) | |
| if parsed is None: | |
| raise HTTPException(400, f"invalid role {role!r}; must be viewer/operator/admin") | |
| assignment = get_role_store().set(installation_id, login, parsed, granted_by=user.login) | |
| audit_log(actor=user.login, via=user.via, action="role.grant", | |
| target=login, installation_id=installation_id, | |
| role=parsed.value) | |
| return {"installation_id": installation_id, "login": assignment.login, | |
| "role": assignment.role.value, "granted_by": assignment.granted_by} | |
| async def remove_role(installation_id: int, login: str, | |
| user: UserContext = Depends(require_user)): | |
| require_role_on_installation(installation_id, user, Role.ADMIN) | |
| if get_role_store().remove(installation_id, login, removed_by=user.login): | |
| audit_log(actor=user.login, via=user.via, action="role.revoke", | |
| target=login, installation_id=installation_id) | |
| return {"removed": True} | |
| raise HTTPException(404, "no such role assignment") | |
| # ---------- C. Audit log ---------- | |
| async def audit_global(limit: int = 200, _admin: UserContext = Depends(require_admin)): | |
| return get_audit_store().tail_global(limit=limit) | |
| async def audit_for_installation(installation_id: int, limit: int = 200, | |
| user: UserContext = Depends(require_user)): | |
| require_role_on_installation(installation_id, user, Role.VIEWER) | |
| return get_audit_store().tail_for_installation(installation_id, limit=limit) | |
| async def list_jobs(limit_per_install: int = 50, | |
| user: UserContext = Depends(require_user)): | |
| """List recent jobs across every installation the user can see.""" | |
| out: list[dict] = [] | |
| for iid in (sorted(user.installation_ids) if not user.is_admin else _all_installations(queue)): | |
| for j in queue.list_for_installation(iid, limit=limit_per_install): | |
| out.append({ | |
| "id": j.id, "event": j.event, "repo": j.repo_full_name, | |
| "status": j.status.value, "created_at": j.created_at, | |
| "installation_id": j.installation_id, | |
| }) | |
| out.sort(key=lambda d: d["created_at"], reverse=True) | |
| return out | |
| return app | |
| def _all_installations(queue) -> list[int]: | |
| """For admins: scan Redis for known installation indices.""" | |
| out: list[int] = [] | |
| try: | |
| for key in queue._r.scan_iter(match=f"{queue.KEY_PREFIX}:install_idx:*", count=100): | |
| k = key.decode() if isinstance(key, bytes) else key | |
| try: | |
| out.append(int(k.rsplit(":", 1)[-1])) | |
| except ValueError: | |
| continue | |
| except Exception: | |
| pass | |
| return sorted(set(out)) | |
| def _is_actionable(event: str, payload: dict, settings) -> bool: | |
| if event == "issues": | |
| action = payload.get("action") | |
| labels = [lbl["name"] for lbl in payload["issue"].get("labels", [])] | |
| if action == "labeled" and payload["label"]["name"] == settings.trigger_label: | |
| return True | |
| if action == "opened" and settings.trigger_label in labels: | |
| return True | |
| return False | |
| if event == "issue_comment": | |
| if payload.get("action") != "created": | |
| return False | |
| body = (payload["comment"]["body"] or "").strip() | |
| return body.startswith(settings.command_prefix) | |
| if event == "pull_request": | |
| action = payload.get("action") | |
| labels = [lbl["name"] for lbl in payload["pull_request"].get("labels", [])] | |
| return action in ("labeled", "opened") and settings.review_label in labels | |
| return False | |
| app = _make_app() | |