Spaces:
Sleeping
Sleeping
| """ | |
| Deployment helpers for HuggingFace Spaces, Vercel, and GitHub. | |
| All real - no mocks. Each helper returns a structured result dict. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import json | |
| import logging | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import time | |
| from typing import Any, Dict, List, Optional | |
| import httpx | |
| logger = logging.getLogger("deployers") | |
| # --------------------------------------------------------------------------- | |
| # GitHub | |
| # --------------------------------------------------------------------------- | |
| def github_push( | |
| repo_dir: str, | |
| branch: str = "genspark_ai_developer", | |
| commit_message: str = "AI Developer Agent commit", | |
| token: Optional[str] = None, | |
| remote_url: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| token = token or os.getenv("GITHUB_TOKEN") or os.getenv("GITHUB_PAT") | |
| if not token: | |
| return {"ok": False, "error": "GITHUB_TOKEN missing"} | |
| try: | |
| env = os.environ.copy() | |
| env["GIT_TERMINAL_PROMPT"] = "0" | |
| def run(cmd: List[str], check: bool = True) -> subprocess.CompletedProcess: | |
| r = subprocess.run(cmd, cwd=repo_dir, capture_output=True, text=True, env=env, timeout=120) | |
| if check and r.returncode != 0: | |
| raise RuntimeError(f"{' '.join(cmd)}: {r.stderr[:500]}") | |
| return r | |
| # ensure identity | |
| run(["git", "config", "user.email", "ai-developer@genspark.ai"], check=False) | |
| run(["git", "config", "user.name", "AI Developer Agent"], check=False) | |
| # set token URL | |
| if remote_url: | |
| authed = remote_url.replace("https://", f"https://x-access-token:{token}@") | |
| run(["git", "remote", "set-url", "origin", authed], check=False) | |
| run(["git", "add", "-A"], check=False) | |
| # commit may fail if nothing to commit; that's OK | |
| commit = subprocess.run(["git", "commit", "-m", commit_message], cwd=repo_dir, capture_output=True, text=True, env=env) | |
| run(["git", "checkout", "-B", branch], check=False) | |
| push = subprocess.run(["git", "push", "-u", "origin", branch, "--force"], cwd=repo_dir, capture_output=True, text=True, env=env, timeout=180) | |
| ok = push.returncode == 0 | |
| return { | |
| "ok": ok, | |
| "branch": branch, | |
| "commit_out": commit.stdout + commit.stderr, | |
| "push_out": push.stdout + push.stderr, | |
| } | |
| except Exception as e: | |
| logger.exception("github_push failed") | |
| return {"ok": False, "error": str(e)} | |
| # --------------------------------------------------------------------------- | |
| # Hugging Face Space | |
| # --------------------------------------------------------------------------- | |
| def hf_ensure_space( | |
| repo_id: str, | |
| token: Optional[str] = None, | |
| sdk: str = "docker", | |
| private: bool = False, | |
| ) -> Dict[str, Any]: | |
| """Create the Space if it doesn't exist (idempotent).""" | |
| token = token or os.getenv("HF_TOKEN") | |
| if not token: | |
| return {"ok": False, "error": "HF_TOKEN missing"} | |
| try: | |
| headers = {"Authorization": f"Bearer {token}"} | |
| info = httpx.get(f"https://huggingface.co/api/spaces/{repo_id}", headers=headers, timeout=30.0) | |
| if info.status_code == 200: | |
| return {"ok": True, "created": False, "url": f"https://huggingface.co/spaces/{repo_id}"} | |
| # create | |
| owner, name = repo_id.split("/", 1) | |
| payload = { | |
| "name": name, | |
| "organization": None if owner == _hf_whoami(token) else owner, | |
| "type": "space", | |
| "sdk": sdk, | |
| "private": private, | |
| } | |
| r = httpx.post( | |
| "https://huggingface.co/api/repos/create", | |
| headers={**headers, "Content-Type": "application/json"}, | |
| json=payload, | |
| timeout=30.0, | |
| ) | |
| if r.status_code >= 400: | |
| return {"ok": False, "error": f"create failed: {r.status_code} {r.text[:300]}"} | |
| return {"ok": True, "created": True, "url": f"https://huggingface.co/spaces/{repo_id}"} | |
| except Exception as e: | |
| return {"ok": False, "error": str(e)} | |
| def _hf_whoami(token: str) -> str: | |
| try: | |
| r = httpx.get("https://huggingface.co/api/whoami-v2", headers={"Authorization": f"Bearer {token}"}, timeout=15) | |
| if r.status_code == 200: | |
| return r.json().get("name", "") | |
| except Exception: | |
| pass | |
| return "" | |
| def hf_push_space( | |
| source_dir: str, | |
| repo_id: str, | |
| token: Optional[str] = None, | |
| commit_message: str = "Update from AI Developer Agent", | |
| ) -> Dict[str, Any]: | |
| """Push contents of source_dir to a HuggingFace Space using git.""" | |
| token = token or os.getenv("HF_TOKEN") | |
| if not token: | |
| return {"ok": False, "error": "HF_TOKEN missing"} | |
| try: | |
| # First ensure space exists | |
| ensure = hf_ensure_space(repo_id, token=token, sdk="docker") | |
| if not ensure.get("ok"): | |
| return {"ok": False, "error": f"ensure_space: {ensure.get('error')}"} | |
| tmp = tempfile.mkdtemp(prefix="hfpush_") | |
| try: | |
| remote = f"https://user:{token}@huggingface.co/spaces/{repo_id}" | |
| # Clone (may be empty) | |
| clone = subprocess.run(["git", "clone", remote, tmp], capture_output=True, text=True, timeout=120) | |
| if clone.returncode != 0: | |
| # try init | |
| subprocess.run(["git", "init"], cwd=tmp, capture_output=True, text=True) | |
| subprocess.run(["git", "remote", "add", "origin", remote], cwd=tmp, capture_output=True, text=True) | |
| # Copy source files into tmp (preserve .git) | |
| for entry in os.listdir(source_dir): | |
| if entry == ".git": | |
| continue | |
| src = os.path.join(source_dir, entry) | |
| dst = os.path.join(tmp, entry) | |
| if os.path.isdir(src): | |
| if os.path.exists(dst): | |
| shutil.rmtree(dst, ignore_errors=True) | |
| shutil.copytree(src, dst) | |
| else: | |
| shutil.copy2(src, dst) | |
| subprocess.run(["git", "config", "user.email", "ai-developer@genspark.ai"], cwd=tmp, capture_output=True, text=True) | |
| subprocess.run(["git", "config", "user.name", "AI Developer Agent"], cwd=tmp, capture_output=True, text=True) | |
| subprocess.run(["git", "lfs", "install"], cwd=tmp, capture_output=True, text=True) | |
| subprocess.run(["git", "add", "-A"], cwd=tmp, capture_output=True, text=True) | |
| commit = subprocess.run(["git", "commit", "-m", commit_message], cwd=tmp, capture_output=True, text=True) | |
| push = subprocess.run(["git", "push", "origin", "main", "--force"], cwd=tmp, capture_output=True, text=True, timeout=300) | |
| ok = push.returncode == 0 | |
| return { | |
| "ok": ok, | |
| "url": f"https://huggingface.co/spaces/{repo_id}", | |
| "commit_out": (commit.stdout + commit.stderr)[-800:], | |
| "push_out": (push.stdout + push.stderr)[-800:], | |
| } | |
| finally: | |
| shutil.rmtree(tmp, ignore_errors=True) | |
| except Exception as e: | |
| logger.exception("hf_push_space failed") | |
| return {"ok": False, "error": str(e)} | |
| def hf_space_health(repo_id: str, path: str = "/health", timeout: float = 15.0) -> Dict[str, Any]: | |
| """Check the live space URL for health.""" | |
| owner, name = repo_id.split("/", 1) | |
| url = f"https://{owner}-{name}.hf.space{path}" | |
| try: | |
| r = httpx.get(url, timeout=timeout) | |
| return {"ok": r.status_code < 500, "status": r.status_code, "url": url, "body": r.text[:500]} | |
| except Exception as e: | |
| return {"ok": False, "url": url, "error": str(e)} | |
| # --------------------------------------------------------------------------- | |
| # Vercel | |
| # --------------------------------------------------------------------------- | |
| def vercel_deploy_via_api( | |
| project_name: str, | |
| files: List[Dict[str, Any]], | |
| token: Optional[str] = None, | |
| target: str = "production", | |
| env: Optional[Dict[str, str]] = None, | |
| framework: Optional[str] = "nextjs", | |
| install_command: Optional[str] = None, | |
| build_command: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Deploy via Vercel HTTP API. | |
| files: list of {"file": "path/in/repo", "data": "file contents"} | |
| """ | |
| token = token or os.getenv("VERCEL_TOKEN") | |
| if not token: | |
| return {"ok": False, "error": "VERCEL_TOKEN missing"} | |
| headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} | |
| body: Dict[str, Any] = { | |
| "name": project_name, | |
| "files": files, | |
| "target": target, | |
| "projectSettings": { | |
| "framework": framework, | |
| "installCommand": install_command, | |
| "buildCommand": build_command, | |
| }, | |
| } | |
| if env: | |
| body["env"] = env | |
| body["build"] = {"env": env} | |
| try: | |
| r = httpx.post("https://api.vercel.com/v13/deployments", headers=headers, json=body, timeout=180.0) | |
| if r.status_code >= 400: | |
| return {"ok": False, "status": r.status_code, "error": r.text[:1000]} | |
| data = r.json() | |
| url = data.get("url") or "" | |
| full = f"https://{url}" if url and not url.startswith("http") else url | |
| return {"ok": True, "url": full, "id": data.get("id"), "data": data} | |
| except Exception as e: | |
| logger.exception("vercel_deploy_via_api failed") | |
| return {"ok": False, "error": str(e)} | |
| def vercel_deployment_status(deployment_id: str, token: Optional[str] = None) -> Dict[str, Any]: | |
| token = token or os.getenv("VERCEL_TOKEN") | |
| if not token: | |
| return {"ok": False, "error": "VERCEL_TOKEN missing"} | |
| try: | |
| r = httpx.get( | |
| f"https://api.vercel.com/v13/deployments/{deployment_id}", | |
| headers={"Authorization": f"Bearer {token}"}, | |
| timeout=30.0, | |
| ) | |
| if r.status_code >= 400: | |
| return {"ok": False, "status": r.status_code, "error": r.text[:500]} | |
| d = r.json() | |
| return {"ok": True, "state": d.get("readyState"), "url": d.get("url"), "data": d} | |
| except Exception as e: | |
| return {"ok": False, "error": str(e)} | |
| def collect_files_for_vercel(root_dir: str) -> List[Dict[str, Any]]: | |
| """Walk root_dir and produce list of {file, data} for Vercel API. | |
| Skips node_modules, .git, .next, .vercel and other build artifacts. | |
| """ | |
| SKIP_DIRS = {"node_modules", ".git", ".next", ".vercel", "dist", "build", "__pycache__"} | |
| SKIP_EXTS = {".log"} | |
| files: List[Dict[str, Any]] = [] | |
| for cur, dirs, fns in os.walk(root_dir): | |
| dirs[:] = [d for d in dirs if d not in SKIP_DIRS] | |
| for fn in fns: | |
| if any(fn.endswith(e) for e in SKIP_EXTS): | |
| continue | |
| full = os.path.join(cur, fn) | |
| rel = os.path.relpath(full, root_dir).replace("\\", "/") | |
| try: | |
| with open(full, "r", encoding="utf-8") as f: | |
| data = f.read() | |
| except UnicodeDecodeError: | |
| with open(full, "rb") as f: | |
| import base64 as _b | |
| data = _b.b64encode(f.read()).decode("ascii") | |
| files.append({"file": rel, "data": data, "encoding": "base64"}) | |
| continue | |
| files.append({"file": rel, "data": data}) | |
| return files | |