Spaces:
Sleeping
Sleeping
File size: 11,407 Bytes
763ef0d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 | """
Deployment helpers for HuggingFace Spaces, Vercel, and GitHub.
All real - no mocks. Each helper returns a structured result dict.
"""
from __future__ import annotations
import os
import json
import logging
import shutil
import subprocess
import tempfile
import time
from typing import Any, Dict, List, Optional
import httpx
logger = logging.getLogger("deployers")
# ---------------------------------------------------------------------------
# GitHub
# ---------------------------------------------------------------------------
def github_push(
repo_dir: str,
branch: str = "genspark_ai_developer",
commit_message: str = "AI Developer Agent commit",
token: Optional[str] = None,
remote_url: Optional[str] = None,
) -> Dict[str, Any]:
token = token or os.getenv("GITHUB_TOKEN") or os.getenv("GITHUB_PAT")
if not token:
return {"ok": False, "error": "GITHUB_TOKEN missing"}
try:
env = os.environ.copy()
env["GIT_TERMINAL_PROMPT"] = "0"
def run(cmd: List[str], check: bool = True) -> subprocess.CompletedProcess:
r = subprocess.run(cmd, cwd=repo_dir, capture_output=True, text=True, env=env, timeout=120)
if check and r.returncode != 0:
raise RuntimeError(f"{' '.join(cmd)}: {r.stderr[:500]}")
return r
# ensure identity
run(["git", "config", "user.email", "ai-developer@genspark.ai"], check=False)
run(["git", "config", "user.name", "AI Developer Agent"], check=False)
# set token URL
if remote_url:
authed = remote_url.replace("https://", f"https://x-access-token:{token}@")
run(["git", "remote", "set-url", "origin", authed], check=False)
run(["git", "add", "-A"], check=False)
# commit may fail if nothing to commit; that's OK
commit = subprocess.run(["git", "commit", "-m", commit_message], cwd=repo_dir, capture_output=True, text=True, env=env)
run(["git", "checkout", "-B", branch], check=False)
push = subprocess.run(["git", "push", "-u", "origin", branch, "--force"], cwd=repo_dir, capture_output=True, text=True, env=env, timeout=180)
ok = push.returncode == 0
return {
"ok": ok,
"branch": branch,
"commit_out": commit.stdout + commit.stderr,
"push_out": push.stdout + push.stderr,
}
except Exception as e:
logger.exception("github_push failed")
return {"ok": False, "error": str(e)}
# ---------------------------------------------------------------------------
# Hugging Face Space
# ---------------------------------------------------------------------------
def hf_ensure_space(
repo_id: str,
token: Optional[str] = None,
sdk: str = "docker",
private: bool = False,
) -> Dict[str, Any]:
"""Create the Space if it doesn't exist (idempotent)."""
token = token or os.getenv("HF_TOKEN")
if not token:
return {"ok": False, "error": "HF_TOKEN missing"}
try:
headers = {"Authorization": f"Bearer {token}"}
info = httpx.get(f"https://huggingface.co/api/spaces/{repo_id}", headers=headers, timeout=30.0)
if info.status_code == 200:
return {"ok": True, "created": False, "url": f"https://huggingface.co/spaces/{repo_id}"}
# create
owner, name = repo_id.split("/", 1)
payload = {
"name": name,
"organization": None if owner == _hf_whoami(token) else owner,
"type": "space",
"sdk": sdk,
"private": private,
}
r = httpx.post(
"https://huggingface.co/api/repos/create",
headers={**headers, "Content-Type": "application/json"},
json=payload,
timeout=30.0,
)
if r.status_code >= 400:
return {"ok": False, "error": f"create failed: {r.status_code} {r.text[:300]}"}
return {"ok": True, "created": True, "url": f"https://huggingface.co/spaces/{repo_id}"}
except Exception as e:
return {"ok": False, "error": str(e)}
def _hf_whoami(token: str) -> str:
try:
r = httpx.get("https://huggingface.co/api/whoami-v2", headers={"Authorization": f"Bearer {token}"}, timeout=15)
if r.status_code == 200:
return r.json().get("name", "")
except Exception:
pass
return ""
def hf_push_space(
source_dir: str,
repo_id: str,
token: Optional[str] = None,
commit_message: str = "Update from AI Developer Agent",
) -> Dict[str, Any]:
"""Push contents of source_dir to a HuggingFace Space using git."""
token = token or os.getenv("HF_TOKEN")
if not token:
return {"ok": False, "error": "HF_TOKEN missing"}
try:
# First ensure space exists
ensure = hf_ensure_space(repo_id, token=token, sdk="docker")
if not ensure.get("ok"):
return {"ok": False, "error": f"ensure_space: {ensure.get('error')}"}
tmp = tempfile.mkdtemp(prefix="hfpush_")
try:
remote = f"https://user:{token}@huggingface.co/spaces/{repo_id}"
# Clone (may be empty)
clone = subprocess.run(["git", "clone", remote, tmp], capture_output=True, text=True, timeout=120)
if clone.returncode != 0:
# try init
subprocess.run(["git", "init"], cwd=tmp, capture_output=True, text=True)
subprocess.run(["git", "remote", "add", "origin", remote], cwd=tmp, capture_output=True, text=True)
# Copy source files into tmp (preserve .git)
for entry in os.listdir(source_dir):
if entry == ".git":
continue
src = os.path.join(source_dir, entry)
dst = os.path.join(tmp, entry)
if os.path.isdir(src):
if os.path.exists(dst):
shutil.rmtree(dst, ignore_errors=True)
shutil.copytree(src, dst)
else:
shutil.copy2(src, dst)
subprocess.run(["git", "config", "user.email", "ai-developer@genspark.ai"], cwd=tmp, capture_output=True, text=True)
subprocess.run(["git", "config", "user.name", "AI Developer Agent"], cwd=tmp, capture_output=True, text=True)
subprocess.run(["git", "lfs", "install"], cwd=tmp, capture_output=True, text=True)
subprocess.run(["git", "add", "-A"], cwd=tmp, capture_output=True, text=True)
commit = subprocess.run(["git", "commit", "-m", commit_message], cwd=tmp, capture_output=True, text=True)
push = subprocess.run(["git", "push", "origin", "main", "--force"], cwd=tmp, capture_output=True, text=True, timeout=300)
ok = push.returncode == 0
return {
"ok": ok,
"url": f"https://huggingface.co/spaces/{repo_id}",
"commit_out": (commit.stdout + commit.stderr)[-800:],
"push_out": (push.stdout + push.stderr)[-800:],
}
finally:
shutil.rmtree(tmp, ignore_errors=True)
except Exception as e:
logger.exception("hf_push_space failed")
return {"ok": False, "error": str(e)}
def hf_space_health(repo_id: str, path: str = "/health", timeout: float = 15.0) -> Dict[str, Any]:
"""Check the live space URL for health."""
owner, name = repo_id.split("/", 1)
url = f"https://{owner}-{name}.hf.space{path}"
try:
r = httpx.get(url, timeout=timeout)
return {"ok": r.status_code < 500, "status": r.status_code, "url": url, "body": r.text[:500]}
except Exception as e:
return {"ok": False, "url": url, "error": str(e)}
# ---------------------------------------------------------------------------
# Vercel
# ---------------------------------------------------------------------------
def vercel_deploy_via_api(
project_name: str,
files: List[Dict[str, Any]],
token: Optional[str] = None,
target: str = "production",
env: Optional[Dict[str, str]] = None,
framework: Optional[str] = "nextjs",
install_command: Optional[str] = None,
build_command: Optional[str] = None,
) -> Dict[str, Any]:
"""
Deploy via Vercel HTTP API.
files: list of {"file": "path/in/repo", "data": "file contents"}
"""
token = token or os.getenv("VERCEL_TOKEN")
if not token:
return {"ok": False, "error": "VERCEL_TOKEN missing"}
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
body: Dict[str, Any] = {
"name": project_name,
"files": files,
"target": target,
"projectSettings": {
"framework": framework,
"installCommand": install_command,
"buildCommand": build_command,
},
}
if env:
body["env"] = env
body["build"] = {"env": env}
try:
r = httpx.post("https://api.vercel.com/v13/deployments", headers=headers, json=body, timeout=180.0)
if r.status_code >= 400:
return {"ok": False, "status": r.status_code, "error": r.text[:1000]}
data = r.json()
url = data.get("url") or ""
full = f"https://{url}" if url and not url.startswith("http") else url
return {"ok": True, "url": full, "id": data.get("id"), "data": data}
except Exception as e:
logger.exception("vercel_deploy_via_api failed")
return {"ok": False, "error": str(e)}
def vercel_deployment_status(deployment_id: str, token: Optional[str] = None) -> Dict[str, Any]:
token = token or os.getenv("VERCEL_TOKEN")
if not token:
return {"ok": False, "error": "VERCEL_TOKEN missing"}
try:
r = httpx.get(
f"https://api.vercel.com/v13/deployments/{deployment_id}",
headers={"Authorization": f"Bearer {token}"},
timeout=30.0,
)
if r.status_code >= 400:
return {"ok": False, "status": r.status_code, "error": r.text[:500]}
d = r.json()
return {"ok": True, "state": d.get("readyState"), "url": d.get("url"), "data": d}
except Exception as e:
return {"ok": False, "error": str(e)}
def collect_files_for_vercel(root_dir: str) -> List[Dict[str, Any]]:
"""Walk root_dir and produce list of {file, data} for Vercel API.
Skips node_modules, .git, .next, .vercel and other build artifacts.
"""
SKIP_DIRS = {"node_modules", ".git", ".next", ".vercel", "dist", "build", "__pycache__"}
SKIP_EXTS = {".log"}
files: List[Dict[str, Any]] = []
for cur, dirs, fns in os.walk(root_dir):
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
for fn in fns:
if any(fn.endswith(e) for e in SKIP_EXTS):
continue
full = os.path.join(cur, fn)
rel = os.path.relpath(full, root_dir).replace("\\", "/")
try:
with open(full, "r", encoding="utf-8") as f:
data = f.read()
except UnicodeDecodeError:
with open(full, "rb") as f:
import base64 as _b
data = _b.b64encode(f.read()).decode("ascii")
files.append({"file": rel, "data": data, "encoding": "base64"})
continue
files.append({"file": rel, "data": data})
return files
|