Spaces:
Running
Running
| """GitHub integration for the loom critique service. | |
| Handles GitHub OAuth token exchange, encrypted storage, repo/branch/content | |
| proxying, workspace CRUD, git commit/push/PR operations, and the public | |
| workspace registry. All GitHub API calls go through stdlib urllib (no httpx | |
| at module level) to stay compatible with the stdlib-only server. | |
| Env vars required: | |
| GITHUB_CLIENT_ID — GitHub OAuth App client ID | |
| GITHUB_CLIENT_SECRET — GitHub OAuth App client secret | |
| ENCRYPTION_KEY — Fernet key for token encryption (auto-generated if absent) | |
| The module is stateless — all durable state lives in ``db.py``. Token | |
| encryption/decryption is handled by ``crypto.py``. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import secrets | |
| import time | |
| import urllib.error | |
| import urllib.request | |
| from pathlib import Path | |
| import crypto | |
| import db | |
| import jwt_auth | |
| # --------------------------------------------------------------------------- | |
| # Config | |
| # --------------------------------------------------------------------------- | |
| GITHUB_AUTHORIZE_URL = "https://github.com/login/oauth/authorize" | |
| GITHUB_TOKEN_URL = "https://github.com/login/oauth/access_token" | |
| GITHUB_API = "https://api.github.com" | |
| # Scopes needed for repo access + push + PR creation | |
| GITHUB_SCopes = "read:user user:email repo" | |
| def _github_configured() -> bool: | |
| return bool( | |
| os.environ.get("GITHUB_CLIENT_ID", "").strip() | |
| and os.environ.get("GITHUB_CLIENT_SECRET", "").strip() | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # GitHub OAuth flow | |
| # --------------------------------------------------------------------------- | |
| def make_github_authorize_url(state: str) -> str: | |
| """Build the GitHub OAuth authorize URL with state token.""" | |
| from urllib.parse import urlencode | |
| params = urlencode({ | |
| "client_id": os.environ.get("GITHUB_CLIENT_ID", ""), | |
| "scope": GITHUB_SCopes, | |
| "state": state, | |
| }) | |
| return f"{GITHUB_AUTHORIZE_URL}?{params}" | |
| def exchange_github_code(code: str) -> str: | |
| """Exchange OAuth code for access token. Returns the raw token string. | |
| Raises RuntimeError on failure. | |
| """ | |
| from urllib.parse import urlencode | |
| body = urlencode({ | |
| "client_id": os.environ.get("GITHUB_CLIENT_ID", ""), | |
| "client_secret": os.environ.get("GITHUB_CLIENT_SECRET", ""), | |
| "code": code, | |
| }).encode() | |
| req = urllib.request.Request(GITHUB_TOKEN_URL, data=body, method="POST") | |
| req.add_header("Accept", "application/json") | |
| req.add_header("Content-Type", "application/x-www-form-urlencoded") | |
| try: | |
| with urllib.request.urlopen(req, timeout=30) as resp: | |
| data = json.loads(resp.read().decode()) | |
| except urllib.error.HTTPError as exc: | |
| detail = exc.read().decode(errors="replace")[:400] | |
| raise RuntimeError(f"GitHub token exchange failed (HTTP {exc.code}): {detail}") from exc | |
| token = data.get("access_token", "").strip() | |
| if not token: | |
| raise RuntimeError(f"GitHub token exchange returned no access_token: {data}") | |
| return token | |
| def get_github_user(token: str) -> dict: | |
| """Fetch authenticated user info from GitHub API. Returns dict with | |
| 'id', 'login', 'avatar_url', etc.""" | |
| return _github_api("/user", token=token) | |
| def _valid_user_id(user_id: str | None) -> bool: | |
| """Return True if user_id looks like a valid 16-char hex ID. | |
| Rejects JWT strings (contain dots) and other garbage.""" | |
| if not user_id or not isinstance(user_id, str): | |
| return False | |
| if len(user_id) != 16: | |
| return False | |
| if "." in user_id: | |
| return False | |
| try: | |
| int(user_id, 16) | |
| return True | |
| except ValueError: | |
| return False | |
| def upsert_user_from_github(github_token: str, user_id: str | None = None) -> dict: | |
| """Fetch GitHub user info, upsert into DB, return the user row. | |
| If ``user_id`` is None, a deterministic ID is derived from the GitHub user id | |
| (via ``jwt_auth.derive_user_id``) so the row survives DB rebuilds. | |
| """ | |
| info = get_github_user(github_token) | |
| encrypted = crypto.encrypt_token(github_token) | |
| if _valid_user_id(user_id): | |
| uid = user_id | |
| else: | |
| uid = jwt_auth.derive_user_id(info["id"]) | |
| return db.upsert_user( | |
| user_id=uid, | |
| github_id=info["id"], | |
| github_username=info["login"], | |
| github_token_encrypted=encrypted, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # GitHub API proxy (uses stored token from DB) | |
| # --------------------------------------------------------------------------- | |
| def _github_api(path: str, *, token: str = "", method: str = "GET", | |
| body: dict | None = None) -> dict | list: | |
| """Raw GitHub API call via stdlib urllib.""" | |
| url = f"{GITHUB_API}{path}" | |
| data = json.dumps(body).encode() if body is not None else None | |
| req = urllib.request.Request(url, data=data, method=method) | |
| req.add_header("Accept", "application/vnd.github+json") | |
| req.add_header("X-GitHub-Api-Version", "2022-11-28") | |
| if token: | |
| req.add_header("Authorization", f"Bearer {token}") | |
| try: | |
| with urllib.request.urlopen(req, timeout=30) as resp: | |
| raw = resp.read().decode() | |
| return json.loads(raw) if raw.strip() else {} | |
| except urllib.error.HTTPError as exc: | |
| detail = exc.read().decode(errors="replace")[:400] | |
| raise RuntimeError(f"GitHub API {path} failed (HTTP {exc.code}): {detail}") from exc | |
| def _token_for_user(user_id: str) -> str: | |
| """Retrieve and decrypt a user's stored GitHub token. Raises if missing.""" | |
| user = db.get_user(user_id) | |
| if not user or not user.get("github_token_encrypted"): | |
| raise RuntimeError("GitHub not connected — please link your GitHub account") | |
| return crypto.decrypt_token(user["github_token_encrypted"]) | |
| def list_user_repos(user_id: str, *, page: int = 1, per_page: int = 30, | |
| sort: str = "updated") -> list[dict]: | |
| """List the authenticated user's repositories.""" | |
| token = _token_for_user(user_id) | |
| path = f"/user/repos?page={page}&per_page={per_page}&sort={sort}&type=all" | |
| result = _github_api(path, token=token) | |
| # return slim fields the frontend needs | |
| return [ | |
| { | |
| "full_name": r["full_name"], | |
| "name": r["name"], | |
| "owner": r["owner"]["login"], | |
| "private": r["private"], | |
| "default_branch": r["default_branch"], | |
| "description": r.get("description") or "", | |
| "updated_at": r["updated_at"], | |
| "clone_url": r["clone_url"], | |
| } | |
| for r in (result if isinstance(result, list) else []) | |
| ] | |
| def list_repo_branches(user_id: str, owner: str, repo: str) -> list[dict]: | |
| """List branches for a repo.""" | |
| token = _token_for_user(user_id) | |
| result = _github_api(f"/repos/{owner}/{repo}/branches?per_page=100", token=token) | |
| return [ | |
| {"name": b["name"], "sha": b["commit"]["sha"]} | |
| for b in (result if isinstance(result, list) else []) | |
| ] | |
| def get_file_content(user_id: str, owner: str, repo: str, | |
| path: str, ref: str = "main") -> dict: | |
| """Fetch a single file's content from a repo.""" | |
| token = _token_for_user(user_id) | |
| from urllib.parse import quote | |
| result = _github_api( | |
| f"/repos/{owner}/{repo}/contents/{quote(path, safe='')}?ref={ref}", | |
| token=token) | |
| return result # {name, path, content (base64), encoding, sha, ...} | |
| def get_repo_tree(user_id: str, owner: str, repo: str, | |
| ref: str = "main", recursive: bool = True) -> list[dict]: | |
| """Fetch the full file tree of a repo (for context packing).""" | |
| token = _token_for_user(user_id) | |
| suffix = "?recursive=1" if recursive else "" | |
| result = _github_api(f"/repos/{owner}/{repo}/git/trees/{ref}{suffix}", token=token) | |
| return result.get("tree", []) if isinstance(result, dict) else [] | |
| # --------------------------------------------------------------------------- | |
| # Workspace CRUD | |
| # --------------------------------------------------------------------------- | |
| def create_workspace(user_id: str, *, title: str, description: str = "", | |
| source_repo: str | None = None, | |
| source_branch: str | None = None, | |
| source_branches: list[str] | None = None, | |
| visibility: str = "private", | |
| auto_sync: bool = False) -> dict: | |
| """Create a new workspace. If source_repo is given, clone it into the | |
| sandbox directory. source_branches can specify multiple branches to clone; | |
| when source_branch is None and source_branches is None, all branches are cloned.""" | |
| sandbox_base = Path(os.environ.get("WORKSPACE_BASE", "/tmp/workspace")) | |
| ws_id = db._gen_id() | |
| sandbox_path = str(sandbox_base / ws_id) | |
| Path(sandbox_path).mkdir(parents=True, exist_ok=True) | |
| ws = db.create_workspace( | |
| user_id, title=title, description=description, | |
| source_repo=source_repo, source_branch=source_branch, | |
| source_branches=source_branches, | |
| visibility=visibility, auto_sync=auto_sync, | |
| sandbox_path=sandbox_path) | |
| if source_repo: | |
| _clone_repo(user_id, source_repo, source_branch, | |
| dest=sandbox_path, branches=source_branches) | |
| return ws | |
| def list_workspaces(user_id: str) -> list[dict]: | |
| return db.list_user_workspaces(user_id) | |
| def get_workspace(workspace_id: str) -> dict | None: | |
| return db.get_workspace(workspace_id) | |
| def update_workspace(workspace_id: str, **fields) -> dict | None: | |
| return db.update_workspace(workspace_id, **fields) | |
| def delete_workspace(workspace_id: str) -> bool: | |
| ws = db.get_workspace(workspace_id) | |
| if ws and ws.get("sandbox_path"): | |
| import shutil | |
| p = Path(ws["sandbox_path"]) | |
| if p.is_dir(): | |
| shutil.rmtree(p, ignore_errors=True) | |
| return db.delete_workspace(workspace_id) | |
| # --------------------------------------------------------------------------- | |
| # Git operations (commit, push, PR) | |
| # --------------------------------------------------------------------------- | |
| def _run_git(sandbox: str, *args: str) -> str: | |
| """Run a git command in the workspace sandbox. Returns stdout.""" | |
| import subprocess | |
| cmd = ["git", "-C", sandbox] + list(args) | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"git {' '.join(args)} failed: {_sanitize_git_error(result.stderr.strip())}") | |
| return result.stdout.strip() | |
| def _sanitize_git_error(err: str) -> str: | |
| """Strip URLs and auth tokens from git error messages.""" | |
| import re | |
| # redact anything that looks like https://...@... or x-access-token:... | |
| err = re.sub(r'https://[^@]*@', 'https://***@', err) | |
| err = re.sub(r'x-access-token:[^\s]+', 'x-access-token:***', err) | |
| return err | |
| def _clone_repo(user_id: str, repo_url: str, branch: str | None = None, | |
| dest: str | None = None, branches: list[str] | None = None) -> None: | |
| """Clone a GitHub repo into ``dest``. | |
| Three modes: | |
| * ``branch=None`` and ``branches=None`` — full repo clone (all branches). | |
| * ``branch='main'`` and ``branches=None`` — single branch (current behaviour). | |
| * ``branches=['main','dev']`` — clone listed branches (first is default). | |
| """ | |
| import subprocess | |
| from urllib.parse import urlparse | |
| if dest is None: | |
| raise RuntimeError("dest is required for _clone_repo") | |
| token = _token_for_user(user_id) | |
| parsed = urlparse(repo_url) | |
| auth_repo_url = f"{parsed.scheme}://{token}@{parsed.netloc}{parsed.path}" | |
| # Mode A: clone all branches | |
| if branch is None and branches is None: | |
| cmd = [ | |
| "git", "clone", "--no-single-branch", "--depth", "1", | |
| auth_repo_url, dest, | |
| ] | |
| try: | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) | |
| except subprocess.TimeoutExpired: | |
| raise RuntimeError("git clone timed out") | |
| if result.returncode != 0: | |
| raise RuntimeError(f"git clone failed: {_sanitize_git_error(result.stderr.strip())}") | |
| _strip_remote_token(dest, repo_url) | |
| return | |
| # Mode C: clone specific branches | |
| if branches: | |
| first = branches[0] | |
| cmd = [ | |
| "git", "clone", "--depth", "1", "-b", first, "--single-branch", | |
| auth_repo_url, dest, | |
| ] | |
| try: | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) | |
| except subprocess.TimeoutExpired: | |
| raise RuntimeError("git clone timed out") | |
| if result.returncode != 0: | |
| raise RuntimeError(f"git clone failed: {_sanitize_git_error(result.stderr.strip())}") | |
| for extra in branches[1:]: | |
| fetch = [ | |
| "git", "-C", dest, "fetch", "origin", | |
| f"{extra}:refs/remotes/origin/{extra}", "--depth", "1", | |
| ] | |
| r = subprocess.run(fetch, capture_output=True, text=True, timeout=60) | |
| if r.returncode != 0: | |
| raise RuntimeError( | |
| f"git fetch {extra} failed: {_sanitize_git_error(r.stderr.strip())}") | |
| co = subprocess.run( | |
| ["git", "-C", dest, "checkout", "-b", extra, f"origin/{extra}"], | |
| capture_output=True, text=True, timeout=30) | |
| if co.returncode != 0: | |
| raise RuntimeError( | |
| f"git checkout {extra} failed: {_sanitize_git_error(co.stderr.strip())}") | |
| subprocess.run( | |
| ["git", "-C", dest, "checkout", first], | |
| capture_output=True, timeout=30) | |
| _strip_remote_token(dest, repo_url) | |
| return | |
| # Mode B: single branch (default to "main") | |
| effective = branch or "main" | |
| cmd = [ | |
| "git", "clone", "--depth", "1", "-b", effective, | |
| auth_repo_url, dest, | |
| ] | |
| try: | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) | |
| except subprocess.TimeoutExpired: | |
| raise RuntimeError("git clone timed out") | |
| if result.returncode != 0: | |
| raise RuntimeError(f"git clone failed: {_sanitize_git_error(result.stderr.strip())}") | |
| _strip_remote_token(dest, repo_url) | |
| def _strip_remote_token(dest: str, repo_url: str) -> None: | |
| """Overwrite origin's URL with the token-free form so ``.git/config`` | |
| never persists the auth token after a clone. | |
| ``git clone https://<token>@...`` writes the token into ``.git/config``; | |
| this rewrites it to the clean URL immediately after a successful clone, | |
| closing the window in which an LLM could ``cat .git/config`` and read the | |
| user's GitHub token. | |
| """ | |
| from urllib.parse import urlparse | |
| parsed = urlparse(repo_url) | |
| clean = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" | |
| try: | |
| _run_git(dest, "remote", "set-url", "origin", clean) | |
| except RuntimeError: | |
| # no origin remote (shouldn't happen post-clone) — ignore | |
| pass | |
| def init_repo(sandbox: str) -> None: | |
| """Init a fresh git repo in the sandbox (no remote).""" | |
| import subprocess | |
| subprocess.run(["git", "init"], cwd=sandbox, capture_output=True, timeout=10) | |
| subprocess.run(["git", "config", "user.email", "agent@loom.local"], | |
| cwd=sandbox, capture_output=True, timeout=10) | |
| subprocess.run(["git", "config", "user.name", "Loom Agent"], | |
| cwd=sandbox, capture_output=True, timeout=10) | |
| def checkout_branch(sandbox: str, branch: str) -> None: | |
| """Checkout an existing branch or create it from the current HEAD.""" | |
| import re | |
| if not branch or '..' in branch or branch.startswith('-') or ' ' in branch: | |
| raise RuntimeError(f"invalid branch name: {branch!r}") | |
| if not re.match(r'^[a-zA-Z0-9_./\-]+$', branch): | |
| raise RuntimeError(f"invalid branch name: {branch!r}") | |
| try: | |
| _run_git(sandbox, "checkout", branch) | |
| except RuntimeError: | |
| # branch doesn't exist locally — create it from HEAD | |
| _run_git(sandbox, "checkout", "-b", branch) | |
| def commit_changes(sandbox: str, message: str) -> str: | |
| """Stage all changes and commit. Returns the new commit SHA.""" | |
| _run_git(sandbox, "add", "-A") | |
| # check if there's anything to commit | |
| status = _run_git(sandbox, "status", "--porcelain") | |
| if not status.strip(): | |
| raise RuntimeError("Nothing to commit — no changes detected") | |
| _run_git(sandbox, "commit", "-m", message) | |
| return _run_git(sandbox, "rev-parse", "HEAD") | |
| def push_to_remote(user_id: str, workspace_id: str, branch: str, | |
| *, force: bool = False) -> str: | |
| """Push the workspace branch to the source repo's remote. Returns commit SHA. | |
| Pushes directly to an authenticated remote URL passed on the command line, | |
| so the token is NEVER written to ``.git/config`` (where the LLM could read | |
| it via ``cat .git/config``). The token lives only in the subprocess argv | |
| for the duration of the push. The on-disk ``origin`` remote is always | |
| kept pointing at the token-free URL. | |
| """ | |
| ws = db.get_workspace(workspace_id) | |
| if not ws: | |
| raise RuntimeError("Workspace not found") | |
| if not ws.get("source_repo"): | |
| raise RuntimeError("Workspace has no source repo configured") | |
| token = _token_for_user(user_id) | |
| sandbox = ws["sandbox_path"] | |
| from urllib.parse import urlparse | |
| parsed = urlparse(ws["source_repo"]) | |
| clean_repo_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" | |
| auth_repo_url = f"{parsed.scheme}://{token}@{parsed.netloc}{parsed.path}" | |
| # ensure 'origin' in .git/config points at the CLEAN url (no token) | |
| try: | |
| _run_git(sandbox, "remote", "set-url", "origin", clean_repo_url) | |
| except RuntimeError: | |
| _run_git(sandbox, "remote", "add", "origin", clean_repo_url) | |
| # push directly to the auth URL — token stays in argv, never on disk | |
| args = ["push", auth_repo_url] | |
| if force: | |
| args.append("--force") | |
| args.append(branch) | |
| try: | |
| _run_git(sandbox, *args) | |
| except RuntimeError as exc: | |
| raise RuntimeError(f"git push failed: {_sanitize_git_error(str(exc))}") | |
| sha = _run_git(sandbox, "rev-parse", "HEAD") | |
| # log the push | |
| db.log_push( | |
| user_id, workspace_id, | |
| target_repo=ws["source_repo"], | |
| target_branch=branch, | |
| commit_sha=sha, | |
| push_type="direct", | |
| approved_by_user=True, | |
| ) | |
| return sha | |
| def fetch_from_remote(user_id: str, workspace_id: str, branch: str = "") -> str: | |
| """Fetch from the source repo's remote into the workspace sandbox. | |
| Like ``push_to_remote``, fetches against an authenticated URL passed on | |
| the command line so the token is never persisted to ``.git/config``. | |
| If ``branch`` is given, also checks it out (creating a local tracking | |
| branch from FETCH_HEAD if it doesn't exist yet). | |
| """ | |
| ws = db.get_workspace(workspace_id) | |
| if not ws: | |
| raise RuntimeError("Workspace not found") | |
| if not ws.get("source_repo"): | |
| raise RuntimeError("Workspace has no source repo configured") | |
| token = _token_for_user(user_id) | |
| sandbox = ws["sandbox_path"] | |
| from urllib.parse import urlparse | |
| parsed = urlparse(ws["source_repo"]) | |
| clean_repo_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" | |
| auth_repo_url = f"{parsed.scheme}://{token}@{parsed.netloc}{parsed.path}" | |
| # keep on-disk remote token-free | |
| try: | |
| _run_git(sandbox, "remote", "set-url", "origin", clean_repo_url) | |
| except RuntimeError: | |
| _run_git(sandbox, "remote", "add", "origin", clean_repo_url) | |
| args = ["fetch", auth_repo_url] | |
| if branch: | |
| args.append(branch) | |
| try: | |
| _run_git(sandbox, *args) | |
| except RuntimeError as exc: | |
| raise RuntimeError(f"git fetch failed: {_sanitize_git_error(str(exc))}") | |
| if branch: | |
| # fetch into a bare URL stores the tip in FETCH_HEAD; create/update | |
| # the local branch from it so a subsequent checkout lands on the | |
| # freshly-fetched commit. | |
| try: | |
| _run_git(sandbox, "checkout", branch) | |
| except RuntimeError: | |
| _run_git(sandbox, "checkout", "-b", branch, "FETCH_HEAD") | |
| return _run_git(sandbox, "rev-parse", "HEAD") | |
| def create_pull_request(user_id: str, workspace_id: str, *, | |
| title: str, body: str = "", | |
| head_branch: str = "main", | |
| base_branch: str = "main") -> dict: | |
| """Push branch + create a PR on the source repo via GitHub API. | |
| Returns PR dict with 'number', 'html_url', etc.""" | |
| ws = db.get_workspace(workspace_id) | |
| if not ws: | |
| raise RuntimeError("Workspace not found") | |
| if not ws.get("source_repo"): | |
| raise RuntimeError("Workspace has no source repo configured") | |
| token = _token_for_user(user_id) | |
| sandbox = ws["sandbox_path"] | |
| # push first | |
| push_to_remote(user_id, workspace_id, head_branch) | |
| # extract owner/repo from source_repo URL | |
| source = ws["source_repo"] | |
| # https://github.com/owner/repo.git or /owner/repo | |
| parts = source.rstrip("/").replace(".git", "").split("/") | |
| owner_repo = "/".join(parts[-2:]) | |
| # create PR via API | |
| pr_body = { | |
| "title": title, | |
| "body": body, | |
| "head": head_branch, | |
| "base": base_branch, | |
| } | |
| result = _github_api(f"/repos/{owner_repo}/pulls", token=token, body=pr_body) | |
| # log | |
| db.log_push( | |
| user_id, workspace_id, | |
| target_repo=source, | |
| target_branch=base_branch, | |
| commit_sha=_run_git(sandbox, "rev-parse", "HEAD"), | |
| commit_message=title, | |
| push_type="pr", | |
| pr_number=result.get("number"), | |
| pr_url=result.get("html_url"), | |
| approved_by_user=True, | |
| ) | |
| return { | |
| "number": result.get("number"), | |
| "html_url": result.get("html_url"), | |
| "state": result.get("state"), | |
| "title": result.get("title"), | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Public workspace registry | |
| # --------------------------------------------------------------------------- | |
| def publish_workspace(user_id: str, workspace_id: str) -> dict: | |
| """Register a workspace as public in the registry.""" | |
| ws = db.get_workspace(workspace_id) | |
| if not ws: | |
| raise RuntimeError("Workspace not found") | |
| user = db.get_user(user_id) | |
| gh_user = user.get("github_username", "") if user else "" | |
| return db.register_public_workspace( | |
| workspace_id, | |
| owner_username=gh_user, | |
| title=ws.get("title", ""), | |
| description=ws.get("description", ""), | |
| source_repo=ws.get("source_repo"), | |
| ) | |
| def unpublish_workspace(workspace_id: str) -> None: | |
| db.unregister_public_workspace(workspace_id) | |
| def list_registry(*, page: int = 1, per_page: int = 20, | |
| sort: str = "recent", search: str = "") -> dict: | |
| return db.list_public_workspaces(page=page, per_page=per_page, | |
| sort=sort, search=search) | |
| # --------------------------------------------------------------------------- | |
| # Push approval queue (for the frontend approval flow) | |
| # --------------------------------------------------------------------------- | |
| # In-memory approval queue: request_id -> {user_id, workspace_id, action, ...} | |
| _approval_queue: dict[str, dict] = {} | |
| _approval_lock = __import__("threading").Lock() | |
| def enqueue_push_request(user_id: str, workspace_id: str, action: str, | |
| **extra) -> str: | |
| """Create a push approval request. Returns request_id.""" | |
| rid = secrets.token_urlsafe(16) | |
| with _approval_lock: | |
| _approval_queue[rid] = { | |
| "user_id": user_id, | |
| "workspace_id": workspace_id, | |
| "action": action, | |
| "created_at": time.time(), | |
| **extra, | |
| } | |
| return rid | |
| # Push requests expire after 1 hour. | |
| _PUSH_REQUEST_TTL_S = 3600 | |
| def resolve_push_request(request_id: str, approved: bool, user_id: str = "") -> dict | None: | |
| """Approve or reject a queued push request. Executes the action if approved. | |
| If user_id is provided, verifies ownership before resolving.""" | |
| with _approval_lock: | |
| req = _approval_queue.get(request_id) | |
| if req is None: | |
| return None | |
| if user_id and req["user_id"] != user_id: | |
| return None # ownership mismatch — don't reveal existence | |
| # check TTL | |
| if time.time() - req["created_at"] > _PUSH_REQUEST_TTL_S: | |
| del _approval_queue[request_id] | |
| return None # expired | |
| # only pop if we're going to act on it | |
| del _approval_queue[request_id] | |
| if not approved: | |
| return {"status": "rejected", "action": req["action"]} | |
| # execute the approved action | |
| action = req["action"] | |
| user_id = req["user_id"] | |
| workspace_id = req["workspace_id"] | |
| try: | |
| if action == "git_push": | |
| branch = req.get("branch", "main") | |
| sha = push_to_remote(user_id, workspace_id, branch) | |
| return {"status": "pushed", "commit_sha": sha, "branch": branch} | |
| elif action == "git_force_push": | |
| branch = req.get("branch", "main") | |
| sha = push_to_remote(user_id, workspace_id, branch, force=True) | |
| return {"status": "force_pushed", "commit_sha": sha, "branch": branch} | |
| elif action == "gh_pr_create": | |
| result = create_pull_request( | |
| user_id, workspace_id, | |
| title=req.get("title", "Agent changes"), | |
| body=req.get("body", ""), | |
| head_branch=req.get("head_branch", "main"), | |
| base_branch=req.get("base_branch", "main")) | |
| return {"status": "pr_created", **result} | |
| else: | |
| return {"status": "error", "message": f"Unknown action: {action}"} | |
| except Exception as exc: | |
| return {"status": "error", "message": str(exc)} | |