#!/usr/bin/env python3 """HF Builder - Daemonless Docker image builder using Kaniko. A HuggingFace Space that builds and pushes Docker images without requiring Docker daemon access. Supports GitHub webhooks for automatic builds. Features: - Build tracking with unique IDs and history - GitHub webhook integration with signature verification - Notification webhooks for build completion - Health/readiness endpoints for orchestration - Configurable build timeouts - Metrics tracking (duration, success rate) - OpenTelemetry tracing support - Slack/Discord notifications - Status badges Environment Variables: # Registry REGISTRY_URL: Container registry URL (default: ghcr.io) REGISTRY_USER: Registry username REGISTRY_PASSWORD: Registry password/token # GitHub GITHUB_TOKEN: Token for cloning private repositories WEBHOOK_SECRET: Secret for validating GitHub webhook signatures # Build DEFAULT_IMAGE: Default image name for webhook builds BUILD_TIMEOUT: Build timeout in seconds (default: 1800 = 30 min) ENABLE_CACHE: Enable Kaniko cache (default: false) # Notifications NOTIFICATION_URL: URL to POST build results to SLACK_WEBHOOK_URL: Slack webhook for notifications DISCORD_WEBHOOK_URL: Discord webhook for notifications NOTIFY_ON: When to notify: "all", "failure", "success" (default: failure) # Observability (all open source) LOG_FORMAT: Log format - "text" or "json" (default: text) OTEL_EXPORTER_OTLP_ENDPOINT: OpenTelemetry collector endpoint (Jaeger, Tempo, etc.) 
OTEL_SERVICE_NAME: Service name for traces (default: hf-builder) """ from __future__ import annotations import base64 import contextvars import fnmatch import hashlib import hmac import json import os import re import shutil import subprocess import tempfile import threading import time import traceback import uuid from collections import deque from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum from pathlib import Path from typing import Any, Callable from urllib.parse import urlparse from flask import Flask, abort, jsonify, render_template_string, request, Response, g import requests as http_requests # Trace ID context variable for request tracing trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="") # ============================================================================= # Optional: OpenTelemetry # ============================================================================= _tracer = None _meter = None def init_telemetry(): """Initialize OpenTelemetry if configured.""" global _tracer, _meter endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT") if not endpoint: return try: from opentelemetry import trace, metrics from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter from opentelemetry.sdk.resources import Resource service_name = os.getenv("OTEL_SERVICE_NAME", "hf-builder") resource = Resource.create({"service.name": service_name}) # Tracing trace_provider = TracerProvider(resource=resource) trace_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint))) trace.set_tracer_provider(trace_provider) 
_tracer = trace.get_tracer(__name__) # Metrics metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=endpoint)) metrics.set_meter_provider(MeterProvider(resource=resource, metric_readers=[metric_reader])) _meter = metrics.get_meter(__name__) print(f"[OTEL] Initialized: {endpoint}") except ImportError: print("[OTEL] opentelemetry packages not installed") except Exception as e: print(f"[OTEL] Failed to initialize: {e}") def get_tracer(): return _tracer def get_meter(): return _meter # ============================================================================= # Configuration # ============================================================================= class LogFormat(Enum): TEXT = "text" JSON = "json" class NotifyOn(Enum): ALL = "all" FAILURE = "failure" SUCCESS = "success" @dataclass class Config: """Application configuration from environment variables.""" registry_url: str = field(default_factory=lambda: os.getenv("REGISTRY_URL", "ghcr.io")) registry_user: str = field(default_factory=lambda: os.getenv("REGISTRY_USER", "")) registry_password: str = field(default_factory=lambda: os.getenv("REGISTRY_PASSWORD", "")) github_token: str = field(default_factory=lambda: os.getenv("GITHUB_TOKEN", "")) webhook_secret: str = field(default_factory=lambda: os.getenv("WEBHOOK_SECRET", "")) default_image: str = field(default_factory=lambda: os.getenv("DEFAULT_IMAGE", "")) default_branch: str = field(default_factory=lambda: os.getenv("DEFAULT_BRANCH", "main")) default_tags: str = field(default_factory=lambda: os.getenv("DEFAULT_TAGS", "latest")) default_dockerfile: str = field(default_factory=lambda: os.getenv("DEFAULT_DOCKERFILE", "Dockerfile")) default_context: str = field(default_factory=lambda: os.getenv("DEFAULT_CONTEXT", ".")) default_platform: str = field(default_factory=lambda: os.getenv("DEFAULT_PLATFORM", "")) runner_id: str = field(default_factory=lambda: os.getenv("RUNNER_ID", str(uuid.uuid4())[:8])) build_timeout: int = 
field(default_factory=lambda: int(os.getenv("BUILD_TIMEOUT", "1800"))) enable_cache: bool = field(default_factory=lambda: os.getenv("ENABLE_CACHE", "").lower() == "true") notification_url: str = field(default_factory=lambda: os.getenv("NOTIFICATION_URL", "")) slack_webhook_url: str = field(default_factory=lambda: os.getenv("SLACK_WEBHOOK_URL", "")) discord_webhook_url: str = field(default_factory=lambda: os.getenv("DISCORD_WEBHOOK_URL", "")) notify_on: NotifyOn = field(default_factory=lambda: NotifyOn(os.getenv("NOTIFY_ON", "failure").lower()) if os.getenv("NOTIFY_ON", "failure").lower() in ("all", "failure", "success") else NotifyOn.FAILURE) log_format: LogFormat = field(default_factory=lambda: LogFormat(os.getenv("LOG_FORMAT", "text").lower()) if os.getenv("LOG_FORMAT", "text").lower() in ("text", "json") else LogFormat.TEXT) max_history: int = field(default_factory=lambda: int(os.getenv("MAX_HISTORY", "50"))) # Auto-deploy: Restart HF Space after successful build # Set to HF Space ID (e.g., "username/space-name") to auto-restart on success auto_restart_space: str = field(default_factory=lambda: os.getenv("AUTO_RESTART_SPACE", "")) # HF Token for restarting spaces (uses HF_TOKEN env var) hf_token: str = field(default_factory=lambda: os.getenv("HF_TOKEN", "")) build_patterns: list[str] = field(default_factory=lambda: [ "Dockerfile", "Dockerfile.*", "docker/*", "docker/**/*", "src/**/*.py", "pyproject.toml", "uv.lock", "requirements*.txt", ".dockerignore", ]) # ============================================================================= # Build Models # ============================================================================= class BuildStatus(Enum): PENDING = "pending" RUNNING = "running" SUCCESS = "success" FAILED = "failed" CANCELLED = "cancelled" TIMEOUT = "timeout" @dataclass class BuildConfig: """Configuration for a single build.""" repo_url: str image_name: str branch: str = "main" dockerfile: str = "Dockerfile" context_path: str = "." 
tags: list[str] = field(default_factory=lambda: ["latest"]) build_args: dict[str, str] = field(default_factory=dict) github_token: str | None = None platform: str | None = None trigger: str = "api" callback_url: str | None = None @dataclass class Build: """A build with tracking information.""" id: str config: BuildConfig status: BuildStatus = BuildStatus.PENDING started_at: str | None = None completed_at: str | None = None duration_seconds: float | None = None exit_code: int | None = None error: str | None = None trace_id: str | None = None def to_dict(self) -> dict[str, Any]: return { "id": self.id, "status": self.status.value, "image": f"{config.registry_url}/{self.config.image_name}", "tags": self.config.tags, "branch": self.config.branch, "trigger": self.config.trigger, "started_at": self.started_at, "completed_at": self.completed_at, "duration_seconds": self.duration_seconds, "exit_code": self.exit_code, "error": self.error, "trace_id": self.trace_id, } # ============================================================================= # State Management # ============================================================================= class BuildState: """Thread-safe build state manager with history.""" def __init__(self, max_history: int = 50) -> None: self._lock = threading.RLock() self._current: Build | None = None self._history: deque[Build] = deque(maxlen=max_history) self._logs: deque[str] = deque(maxlen=500) self._completed = 0 self._failed = 0 self._total_duration = 0.0 self._process: subprocess.Popen | None = None self._ready = False self._last_success_at: str | None = None self._last_failure_at: str | None = None def log(self, msg: str, level: str = "info", **extra: Any) -> None: """Add a log message with optional trace ID.""" ts = datetime.now(timezone.utc) ts_str = ts.strftime("%H:%M:%S") trace_id = trace_id_var.get() or extra.get("trace_id", "") if config.log_format == LogFormat.JSON: log_entry = { "ts": ts.isoformat(), "level": level, "msg": msg, 
"runner": config.runner_id, "trace_id": trace_id, **{k: v for k, v in extra.items() if k != "trace_id"}, } print(json.dumps(log_entry)) with self._lock: self._logs.append(json.dumps(log_entry)) else: prefix = f"[{ts_str}]" if trace_id: prefix += f" [{trace_id[:8]}]" formatted = f"{prefix} {msg}" print(formatted) with self._lock: self._logs.append(formatted) def start_build(self, build: Build) -> None: with self._lock: build.status = BuildStatus.RUNNING build.started_at = datetime.now(timezone.utc).isoformat() build.trace_id = trace_id_var.get() self._current = build def finish_build(self, build: Build, status: BuildStatus, exit_code: int | None = None, error: str | None = None) -> None: with self._lock: build.status = status build.completed_at = datetime.now(timezone.utc).isoformat() build.exit_code = exit_code build.error = error if build.started_at: start = datetime.fromisoformat(build.started_at) end = datetime.fromisoformat(build.completed_at) build.duration_seconds = (end - start).total_seconds() self._total_duration += build.duration_seconds if status == BuildStatus.SUCCESS: self._completed += 1 self._last_success_at = build.completed_at else: self._failed += 1 self._last_failure_at = build.completed_at self._history.appendleft(build) self._current = None self._process = None def set_process(self, process: subprocess.Popen) -> None: with self._lock: self._process = process def cancel_current(self) -> bool: with self._lock: if self._process and self._current: try: self._process.terminate() return True except Exception: return False return False def set_ready(self, ready: bool = True) -> None: with self._lock: self._ready = ready @property def is_ready(self) -> bool: with self._lock: return self._ready @property def is_building(self) -> bool: with self._lock: return self._current is not None @property def current_build(self) -> Build | None: with self._lock: return self._current @property def logs(self) -> list[str]: with self._lock: return 
list(self._logs)[-100:] def get_metrics(self) -> dict[str, Any]: with self._lock: total = self._completed + self._failed return { "builds_completed": self._completed, "builds_failed": self._failed, "builds_total": total, "success_rate": self._completed / total if total > 0 else 0, "avg_duration_seconds": self._total_duration / total if total > 0 else 0, "last_success_at": self._last_success_at, "last_failure_at": self._last_failure_at, } def get_history(self, limit: int = 10) -> list[dict]: with self._lock: return [b.to_dict() for b in list(self._history)[:limit]] def to_dict(self) -> dict[str, Any]: with self._lock: return { "status": "building" if self._current else "idle", "current_build": self._current.to_dict() if self._current else None, **self.get_metrics(), } # ============================================================================= # Notifications # ============================================================================= class Notifier: """Send notifications to various channels.""" def __init__(self, cfg: Config, state: BuildState) -> None: self.config = cfg self.state = state def should_notify(self, status: BuildStatus) -> bool: if self.config.notify_on == NotifyOn.ALL: return True if self.config.notify_on == NotifyOn.FAILURE and status not in (BuildStatus.SUCCESS,): return True if self.config.notify_on == NotifyOn.SUCCESS and status == BuildStatus.SUCCESS: return True return False def notify(self, build: Build) -> None: """Send notifications for a completed build.""" # Auto-restart HF Space on success if build.status == BuildStatus.SUCCESS and self.config.auto_restart_space: self._restart_hf_space(build) if not self.should_notify(build.status): return # Generic webhook if self.config.notification_url or build.config.callback_url: self._send_webhook(build) # Slack if self.config.slack_webhook_url: self._send_slack(build) # Discord if self.config.discord_webhook_url: self._send_discord(build) def _restart_hf_space(self, build: Build) -> None: 
"""Restart HuggingFace Space after successful build.""" space_id = self.config.auto_restart_space token = self.config.hf_token if not token: self.state.log(f"Cannot restart {space_id}: HF_TOKEN not set", level="warn") return try: # Use HF Hub API to restart space url = f"https://huggingface.co/api/spaces/{space_id}/restart" headers = {"Authorization": f"Bearer {token}"} resp = http_requests.post(url, headers=headers, json={"factory_reboot": True}, timeout=30) if resp.ok: self.state.log(f"✓ Restarted HF Space: {space_id}") else: self.state.log(f"Failed to restart {space_id}: {resp.status_code}", level="warn") except Exception as e: self.state.log(f"Failed to restart {space_id}: {e}", level="warn") def _send_webhook(self, build: Build) -> None: url = build.config.callback_url or self.config.notification_url if not url: return try: payload = { "build": build.to_dict(), "runner_id": self.config.runner_id, "registry": self.config.registry_url, } http_requests.post(url, json=payload, timeout=10) self.state.log(f"Notification sent to webhook", trace_id=build.trace_id) except Exception as e: self.state.log(f"Webhook notification failed: {e}", level="warn") def _send_slack(self, build: Build) -> None: try: color = "#22c55e" if build.status == BuildStatus.SUCCESS else "#ef4444" status_emoji = ":white_check_mark:" if build.status == BuildStatus.SUCCESS else ":x:" payload = { "attachments": [{ "color": color, "blocks": [ { "type": "section", "text": { "type": "mrkdwn", "text": f"{status_emoji} *Build {build.status.value.upper()}*\n`{build.config.image_name}:{build.config.tags[0]}`" } }, { "type": "context", "elements": [ {"type": "mrkdwn", "text": f"*ID:* {build.id}"}, {"type": "mrkdwn", "text": f"*Duration:* {build.duration_seconds:.1f}s" if build.duration_seconds else ""}, {"type": "mrkdwn", "text": f"*Branch:* {build.config.branch}"}, ] } ] }] } if build.error: payload["attachments"][0]["blocks"].append({ "type": "section", "text": {"type": "mrkdwn", "text": 
f"```{build.error[:500]}```"} }) http_requests.post(self.config.slack_webhook_url, json=payload, timeout=10) self.state.log("Slack notification sent", trace_id=build.trace_id) except Exception as e: self.state.log(f"Slack notification failed: {e}", level="warn") def _send_discord(self, build: Build) -> None: try: color = 0x22c55e if build.status == BuildStatus.SUCCESS else 0xef4444 embed = { "title": f"Build {build.status.value.upper()}", "color": color, "fields": [ {"name": "Image", "value": f"`{build.config.image_name}:{build.config.tags[0]}`", "inline": True}, {"name": "Branch", "value": build.config.branch, "inline": True}, {"name": "Build ID", "value": build.id, "inline": True}, ], "timestamp": build.completed_at, } if build.duration_seconds: embed["fields"].append({"name": "Duration", "value": f"{build.duration_seconds:.1f}s", "inline": True}) if build.error: embed["fields"].append({"name": "Error", "value": f"```{build.error[:500]}```", "inline": False}) http_requests.post(self.config.discord_webhook_url, json={"embeds": [embed]}, timeout=10) self.state.log("Discord notification sent", trace_id=build.trace_id) except Exception as e: self.state.log(f"Discord notification failed: {e}", level="warn") # ============================================================================= # Validation # ============================================================================= def validate_url(url: str) -> tuple[bool, str]: if not url: return False, "URL is required" try: parsed = urlparse(url) if parsed.scheme not in ("https", "http"): return False, "URL must use https or http" if not parsed.netloc: return False, "Invalid URL format" return True, "" except Exception as e: return False, f"Invalid URL: {e}" def validate_image_name(name: str) -> tuple[bool, str]: if not name: return False, "Image name is required" pattern = r"^[a-z0-9][a-z0-9._-]*/[a-z0-9][a-z0-9._-]*$" if not re.match(pattern, name.lower()): return False, "Image name must be in owner/repo format" 
return True, "" def validate_tags(tags: list[str]) -> tuple[bool, str]: if not tags: return False, "At least one tag is required" pattern = r"^[a-zA-Z0-9][a-zA-Z0-9._-]*$" for tag in tags: if not re.match(pattern, tag) or len(tag) > 128: return False, f"Invalid tag: {tag}" return True, "" def mask_token(text: str, token: str | None) -> str: if token and token in text: return text.replace(token, "***") return text # ============================================================================= # Builder # ============================================================================= class KanikoBuilder: def __init__(self, cfg: Config, state: BuildState, notifier: Notifier) -> None: self.config = cfg self.state = state self.notifier = notifier def setup_registry_auth(self) -> bool: if not self.config.registry_user or not self.config.registry_password: self.state.log("Registry credentials not configured", level="warn") return False docker_config_dir = Path("/kaniko/.docker") docker_config_dir.mkdir(parents=True, exist_ok=True) auth = base64.b64encode( f"{self.config.registry_user}:{self.config.registry_password}".encode() ).decode() auth_config = { "auths": { self.config.registry_url: {"auth": auth}, f"https://{self.config.registry_url}": {"auth": auth}, } } (docker_config_dir / "config.json").write_text(json.dumps(auth_config)) self.state.log(f"Registry auth configured for {self.config.registry_url}") return True def _is_valid_github_token(self, token: str) -> bool: """Check if token looks like a valid GitHub token.""" if not token or len(token) < 10: return False # GitHub PAT formats: ghp_, gho_, ghu_, ghs_, ghr_, github_pat_ valid_prefixes = ("ghp_", "gho_", "ghu_", "ghs_", "ghr_", "github_pat_") return token.startswith(valid_prefixes) def clone_repo(self, build_config: BuildConfig, max_retries: int = 3) -> Path: """Clone repository with retry logic and cleanup. Retries help handle transient network issues and stale git state. 
""" import git # Prefer explicit token from request, fall back to config token = build_config.github_token or self.config.github_token base_repo_url = build_config.repo_url for attempt in range(1, max_retries + 1): # Fresh temp directory each attempt target_dir = Path(tempfile.mkdtemp(prefix=f"build_{build_config.branch[:8]}_")) try: # Clean up any stale git locks in system temp self._cleanup_stale_git_locks() repo_url = base_repo_url use_auth = False # Only use token if it looks valid if token and self._is_valid_github_token(token) and "github.com" in repo_url: repo_url = repo_url.replace("https://github.com", f"https://{token}@github.com") use_auth = True self.state.log(f"Cloning {base_repo_url} ({build_config.branch}) [authenticated] (attempt {attempt}/{max_retries})") else: if token and not self._is_valid_github_token(token): self.state.log(f"Skipping invalid token format, trying public clone") self.state.log(f"Cloning {base_repo_url} ({build_config.branch}) (attempt {attempt}/{max_retries})") git.Repo.clone_from( repo_url, target_dir, branch=build_config.branch, depth=1, single_branch=True, env={"GIT_TERMINAL_PROMPT": "0"} # Prevent git from prompting ) self.state.log(f"Cloned to {target_dir}") return target_dir except Exception as e: error_msg = mask_token(str(e), token) if token else str(e) self.state.log(f"Clone attempt {attempt} failed: {error_msg}", level="warn") # Clean up failed attempt if target_dir.exists(): shutil.rmtree(target_dir, ignore_errors=True) if attempt < max_retries: wait_time = attempt * 2 # 2s, 4s backoff self.state.log(f"Retrying in {wait_time}s...") time.sleep(wait_time) else: self.state.log(f"Clone failed after {max_retries} attempts: {error_msg}", level="error") raise RuntimeError(f"Clone failed after {max_retries} attempts: {error_msg}") # Should never reach here raise RuntimeError("Clone failed unexpectedly") def _cleanup_stale_git_locks(self) -> None: """Remove stale git lock files that can cause clone failures.""" import glob # 
Clean up any stale .git lock files in temp directories temp_base = tempfile.gettempdir() for lock_file in glob.glob(f"{temp_base}/build_*/.git/index.lock"): try: os.remove(lock_file) self.state.log(f"Removed stale lock: {lock_file}") except Exception: pass # Also clean up old temp build directories (older than 1 hour) import time as time_module now = time_module.time() for dir_path in glob.glob(f"{temp_base}/build_*"): try: if now - os.path.getmtime(dir_path) > 3600: # 1 hour shutil.rmtree(dir_path, ignore_errors=True) except Exception: pass def build_and_push(self, build: Build) -> bool: # Set trace ID for this build trace_id_var.set(build.id) tracer = get_tracer() span = None if tracer: span = tracer.start_span("build_and_push") span.set_attribute("build.id", build.id) span.set_attribute("build.image", build.config.image_name) build_config = build.config full_image = f"{self.config.registry_url}/{build_config.image_name}" self.state.log(f"Building {full_image}", build_id=build.id) self.state.start_build(build) tmpdir = None tar_path = None try: tmpdir = self.clone_repo(build_config) context_dir = str(tmpdir / build_config.context_path) if build_config.context_path != "." 
else str(tmpdir) tar_path = f"{context_dir}.tar.gz" self.state.log("Creating tar context") subprocess.run(["tar", "-czf", tar_path, "-C", context_dir, "."], capture_output=True, check=True, timeout=120) cmd = [ "/kaniko/executor", f"--context=tar://{tar_path}", f"--dockerfile={build_config.dockerfile}", ] for tag in build_config.tags: cmd.append(f"--destination={full_image}:{tag}") for key, value in build_config.build_args.items(): cmd.append(f"--build-arg={key}={value}") if build_config.platform: cmd.append(f"--custom-platform={build_config.platform}") cmd.extend([ f"--cache={'true' if self.config.enable_cache else 'false'}", "--reproducible", "--ignore-path=/product_uuid", "--ignore-path=/sys", "--log-format=text", "--verbosity=info", ]) self.state.log("Running Kaniko") process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) self.state.set_process(process) def read_output(): for line in process.stdout: line = line.strip() if line: line = mask_token(line, self.config.github_token) line = mask_token(line, build_config.github_token) self.state.log(f" {line[:150]}") output_thread = threading.Thread(target=read_output, daemon=True) output_thread.start() try: exit_code = process.wait(timeout=self.config.build_timeout) except subprocess.TimeoutExpired: process.kill() self.state.log(f"Build timed out after {self.config.build_timeout}s", level="error") self.state.finish_build(build, BuildStatus.TIMEOUT, error="Build timeout") self.notifier.notify(build) if span: span.set_attribute("build.status", "timeout") span.end() return False output_thread.join(timeout=5) if exit_code == 0: self.state.log(f"Build successful: {full_image}") self.state.finish_build(build, BuildStatus.SUCCESS, exit_code=0) if span: span.set_attribute("build.status", "success") else: self.state.log(f"Build failed with exit code {exit_code}", level="error") self.state.finish_build(build, BuildStatus.FAILED, exit_code=exit_code) if span: 
span.set_attribute("build.status", "failed") self.notifier.notify(build) if span: span.end() return exit_code == 0 except Exception as e: error_msg = mask_token(str(e), self.config.github_token) error_msg = mask_token(error_msg, build_config.github_token) self.state.log(f"Build error: {error_msg}", level="error") self.state.finish_build(build, BuildStatus.FAILED, error=error_msg) self.notifier.notify(build) if span: span.set_attribute("build.status", "error") span.record_exception(e) span.end() return False finally: if tmpdir and tmpdir.exists(): shutil.rmtree(tmpdir, ignore_errors=True) if tar_path and os.path.exists(tar_path): os.remove(tar_path) # ============================================================================= # Webhook Handler # ============================================================================= class WebhookHandler: def __init__(self, cfg: Config, state: BuildState) -> None: self.config = cfg self.state = state def verify_signature(self, payload: bytes, signature: str) -> bool: if not self.config.webhook_secret: self.state.log("WEBHOOK_SECRET not set - skipping verification", level="warn") return True if not signature or not signature.startswith("sha256="): return False expected = hmac.new(self.config.webhook_secret.encode(), payload, hashlib.sha256).hexdigest() return hmac.compare_digest(f"sha256={expected}", signature) def extract_changed_files(self, payload: dict) -> list[str]: files = set() for commit in payload.get("commits", []): files.update(commit.get("added", [])) files.update(commit.get("modified", [])) files.update(commit.get("removed", [])) return list(files) def should_trigger_build(self, changed_files: list[str]) -> tuple[bool, list[str]]: matching = [] for filepath in changed_files: for pattern in self.config.build_patterns: if fnmatch.fnmatch(filepath, pattern): matching.append(filepath) break return len(matching) > 0, matching def parse_push_event(self, payload: dict, headers: dict) -> dict[str, Any]: repo = 
payload.get("repository", {}) repo_url = repo.get("clone_url", "") repo_name = repo.get("full_name", "") default_branch = repo.get("default_branch", "main") ref = payload.get("ref", "") pushed_branch = ref.replace("refs/heads/", "") if ref.startswith("refs/heads/") else "" self.state.log(f"Webhook: push to {repo_name}/{pushed_branch}") if pushed_branch != default_branch: return {"status": "ignored", "reason": f"not default branch ({default_branch})"} changed_files = self.extract_changed_files(payload) should_build, matching_files = self.should_trigger_build(changed_files) if not should_build: return {"status": "ignored", "reason": "no build-relevant files changed"} image_name = headers.get("X-Builder-Image") or self.config.default_image or repo_name tags_header = headers.get("X-Builder-Tags", "") tags = [t.strip() for t in tags_header.split(",") if t.strip()] if tags_header else ["latest"] build_args = {} args_header = headers.get("X-Builder-Args", "") if args_header: for pair in args_header.split(","): if "=" in pair: k, v = pair.split("=", 1) build_args[k.strip()] = v.strip() return { "status": "trigger", "repo_url": repo_url, "branch": pushed_branch, "image_name": image_name, "tags": tags, "github_token": headers.get("X-Builder-Token"), "platform": headers.get("X-Builder-Platform"), "build_args": build_args, "callback_url": headers.get("X-Builder-Callback"), "matching_files": matching_files, } # ============================================================================= # Flask App # ============================================================================= config = Config() state = BuildState(max_history=config.max_history) notifier = Notifier(config, state) builder = KanikoBuilder(config, state, notifier) webhook_handler = WebhookHandler(config, state) app = Flask(__name__) @app.before_request def before_request(): """Set trace ID for each request.""" trace_id = request.headers.get("X-Request-ID") or request.headers.get("X-Trace-ID") or 
str(uuid.uuid4())[:8] trace_id_var.set(trace_id) g.trace_id = trace_id @app.after_request def after_request(response): """Add trace ID to response headers.""" response.headers["X-Trace-ID"] = g.get("trace_id", "") return response def create_build(build_config: BuildConfig) -> Build: return Build(id=str(uuid.uuid4())[:8], config=build_config) # Health endpoints @app.route("/health") def health(): return jsonify({"status": "healthy", "runner_id": config.runner_id}) @app.route("/ready") def ready(): if state.is_ready: return jsonify({"status": "ready"}) return jsonify({"status": "not_ready"}), 503 # Status badge @app.route("/badge") def badge(): """SVG status badge for READMEs.""" metrics = state.get_metrics() if state.is_building: color, text = "#f59e0b", "building" elif metrics["builds_total"] == 0: color, text = "#737373", "no builds" elif metrics["last_failure_at"] and (not metrics["last_success_at"] or metrics["last_failure_at"] > metrics["last_success_at"]): color, text = "#ef4444", "failing" else: color, text = "#22c55e", "passing" svg = f''' build {text} ''' return Response(svg, mimetype="image/svg+xml", headers={"Cache-Control": "no-cache"}) # API endpoints @app.route("/api/status") def api_status(): return jsonify({"runner_id": config.runner_id, "registry": config.registry_url, "trace_id": g.get("trace_id"), **state.to_dict()}) @app.route("/api/metrics") def api_metrics(): metrics = state.get_metrics() lines = [ f'# HELP hf_builder_builds_total Total builds', f'# TYPE hf_builder_builds_total counter', f'hf_builder_builds_total{{status="success"}} {metrics["builds_completed"]}', f'hf_builder_builds_total{{status="failed"}} {metrics["builds_failed"]}', f'# HELP hf_builder_build_duration_seconds Avg build duration', f'# TYPE hf_builder_build_duration_seconds gauge', f'hf_builder_build_duration_seconds {metrics["avg_duration_seconds"]:.2f}', f'# HELP hf_builder_success_rate Success rate', f'# TYPE hf_builder_success_rate gauge', f'hf_builder_success_rate 
{metrics["success_rate"]:.4f}', ] return Response("\n".join(lines), mimetype="text/plain") @app.route("/api/history") def api_history(): limit = request.args.get("limit", 10, type=int) return jsonify({"builds": state.get_history(limit)}) @app.route("/api/logs") def api_logs(): return jsonify({"logs": state.logs}) @app.route("/api/build", methods=["POST"]) def api_build(): if state.is_building: return jsonify({"error": "Build in progress", "current": state.current_build.to_dict()}), 409 data = request.json or {} valid, err = validate_url(data.get("repo_url", "")) if not valid: return jsonify({"error": err}), 400 valid, err = validate_image_name(data.get("image_name", "")) if not valid: return jsonify({"error": err}), 400 tags = data.get("tags", ["latest"]) valid, err = validate_tags(tags) if not valid: return jsonify({"error": err}), 400 build_config = BuildConfig( repo_url=data["repo_url"], image_name=data["image_name"], branch=data.get("branch", "main"), dockerfile=data.get("dockerfile", "Dockerfile"), context_path=data.get("context_path", "."), tags=tags, build_args=data.get("build_args", {}), github_token=data.get("github_token"), platform=data.get("platform"), trigger="api", callback_url=data.get("callback_url"), ) build = create_build(build_config) threading.Thread(target=builder.build_and_push, args=(build,), daemon=True).start() return jsonify({"status": "started", "build_id": build.id, "trace_id": g.get("trace_id"), "image": f"{config.registry_url}/{build_config.image_name}"}), 202 @app.route("/api/build//cancel", methods=["POST"]) def api_cancel(build_id: str): current = state.current_build if not current or current.id != build_id: return jsonify({"error": "Build not found or not running"}), 404 if state.cancel_current(): return jsonify({"status": "cancelled", "build_id": build_id}) return jsonify({"error": "Failed to cancel"}), 500 # Webhook endpoints @app.route("/webhook/github", methods=["POST"]) def webhook_github(): signature = 
request.headers.get("X-Hub-Signature-256", "") if not webhook_handler.verify_signature(request.data, signature): abort(401, "Invalid signature") event = request.headers.get("X-GitHub-Event", "") if event == "ping": return jsonify({"status": "pong"}) if event != "push": return jsonify({"status": "ignored", "reason": f"event: {event}"}) payload = request.json if not payload: abort(400, "Missing payload") headers = {k: request.headers.get(k, "") for k in ["X-Builder-Image", "X-Builder-Tags", "X-Builder-Token", "X-Builder-Platform", "X-Builder-Args", "X-Builder-Callback"]} result = webhook_handler.parse_push_event(payload, headers) if result["status"] != "trigger": return jsonify(result) if state.is_building: return jsonify({"status": "busy"}), 409 build_config = BuildConfig( repo_url=result["repo_url"], image_name=result["image_name"], branch=result["branch"], tags=result["tags"], github_token=result.get("github_token"), platform=result.get("platform"), build_args=result.get("build_args", {}), trigger="webhook", callback_url=result.get("callback_url"), ) build = create_build(build_config) threading.Thread(target=builder.build_and_push, args=(build,), daemon=True).start() return jsonify({"status": "started", "build_id": build.id, "trace_id": g.get("trace_id")}), 202 @app.route("/webhook/test", methods=["POST"]) def webhook_test(): if state.is_building: return jsonify({"error": "Build in progress"}), 409 data = request.json or {} if not data.get("repo_url") or not data.get("image_name"): return jsonify({"error": "repo_url and image_name required"}), 400 build_config = BuildConfig(repo_url=data["repo_url"], image_name=data["image_name"], branch=data.get("branch", "main"), tags=data.get("tags", ["latest"]), trigger="test") build = create_build(build_config) threading.Thread(target=builder.build_and_push, args=(build,), daemon=True).start() return jsonify({"status": "started", "build_id": build.id, "trace_id": g.get("trace_id")}), 202 # 
# =============================================================================
# Web UI - HTMX Interface
# =============================================================================


def _esc(value: object) -> str:
    """Return ``str(value)`` HTML-escaped for interpolation into a fragment.

    Build metadata (branch, image name, IDs, log lines) can originate from API
    callers and webhook payloads, and these fragments are emitted as raw HTML
    (``| safe`` in the template, verbatim for the HTMX partials).
    """
    from html import escape

    return escape(str(value))


def render_stats_html() -> str:
    """Render compact status hero: state, success/failed counts, success rate."""
    metrics = state.get_metrics()
    status = "building" if state.is_building else "idle"
    return f""" {status.upper()} | {metrics['builds_completed']} / {metrics['builds_failed']} | {metrics['success_rate']*100:.0f}% """


def render_current_build_html() -> str:
    """Render the in-progress build (image:tag, id, branch, elapsed) or a placeholder."""
    current = state.current_build
    if not current:
        return '\nNo active build\n'
    elapsed = ""
    if current.started_at:
        start = datetime.fromisoformat(current.started_at)
        if start.tzinfo is None:
            # Aware - naive subtraction raises TypeError; treat naive as UTC.
            start = start.replace(tzinfo=timezone.utc)
        elapsed = f"{(datetime.now(timezone.utc) - start).total_seconds():.0f}s"
    cfg = current.config
    # Not every build source validates its tag list; avoid IndexError on [].
    tag = cfg.tags[0] if cfg.tags else "latest"
    return f"""
building
{_esc(cfg.image_name)}:{_esc(tag)}
{_esc(current.id)} {_esc(cfg.branch)} {elapsed}
"""


def render_history_html() -> str:
    """Render the six most recent builds: status, short image name, id, branch, duration."""
    history = state.get_history(6)
    if not history:
        return '\nNo builds yet\n'
    parts = []
    for build in history:
        seconds = build.get("duration_seconds")
        # `is not None` so a 0.0s build shows "0.0s" rather than "-".
        duration = f"{seconds:.1f}s" if seconds is not None else "-"
        image_short = build["image"].split("/")[-1] if build.get("image") else "-"
        status = _esc(build["status"])
        build_id = _esc(build["id"])
        branch = _esc(build.get("branch", "-"))
        parts.append(f"""
{status}
{_esc(image_short)}
{build_id} {branch} {duration}
""")
    return "".join(parts)


def render_logs_html() -> str:
    """Render the last 60 log lines, HTML-escaped."""
    return "".join(f'\n{_esc(line)}\n' for line in state.logs[-60:])


def render_badge_html() -> str:
    """Render inline status text: building / no builds / passing / failing."""
    # Checked first so an in-progress build wins, matching the /badge endpoint.
    if state.is_building:
        return "building"
    history = state.get_history(1)
    if not history:
        return "no builds"
    return "passing" if history[0]["status"] == "success" else "failing"


def render_metrics_html() -> str:
    """Render metrics panel: success/failed/total counts, rate and average time."""
    metrics = state.get_metrics()
    return f"""
Success{metrics['builds_completed']}
Failed{metrics['builds_failed']}
Total{metrics['builds_total']}
Rate{metrics['success_rate']*100:.0f}%
Avg Time{metrics['avg_duration_seconds']:.1f}s
"""


@app.route("/")
def index():
    """Serve the dashboard with every fragment pre-rendered."""
    return render_template_string(
        HTML_TEMPLATE,
        config=config,
        stats_html=render_stats_html(),
        current_html=render_current_build_html(),
        history_html=render_history_html(),
        logs_html=render_logs_html(),
        badge_html=render_badge_html(),
        metrics_html=render_metrics_html(),
    )


# HTMX polling endpoints: each returns one dashboard fragment.
@app.route("/stats-partial")
def stats_partial():
    return render_stats_html()


@app.route("/current-partial")
def current_partial():
    return render_current_build_html()


@app.route("/history-partial")
def history_partial():
    return render_history_html()


@app.route("/logs-partial")
def logs_partial():
    return render_logs_html()


@app.route("/badge-partial")
def badge_partial():
    return render_badge_html()


@app.route("/metrics-partial")
def metrics_partial():
    return render_metrics_html()


HTML_TEMPLATE = """ Builder

Builder

{{ config.runner_id }} · {{ config.registry_url }}
{{ stats_html | safe }}
Active
{{ current_html | safe }}
Recent
{{ history_html | safe }}
New Build
"""


# =============================================================================
# Hivemind Integration
# =============================================================================


class HivemindClient:
    """Client for connecting to the Hivemind controller.

    Enabled only when HIVEMIND_CONTROLLER_URL is set. Polls ``/api/work``
    every HIVEMIND_POLL_INTERVAL seconds (default 30) while idle and reports
    results via ``/api/complete`` or ``/api/fail``.
    """

    def __init__(self, cfg: Config, build_state: BuildState, kaniko_builder: KanikoBuilder) -> None:
        self.config = cfg
        self.state = build_state
        self.builder = kaniko_builder
        # Strip a trailing slash so endpoint URLs don't become "...//api/...".
        self.controller_url = os.getenv("HIVEMIND_CONTROLLER_URL", "").rstrip("/")
        self.worker_id = cfg.runner_id
        self.poll_interval = int(os.getenv("HIVEMIND_POLL_INTERVAL", "30"))
        self.enabled = bool(self.controller_url)
        self._running = False

    def register(self) -> bool:
        """Register this builder with the hivemind controller; return success."""
        if not self.enabled:
            return False
        try:
            resp = http_requests.post(
                f"{self.controller_url}/api/register",
                json={
                    "worker_id": self.worker_id,
                    "name": f"Builder {self.worker_id}",
                    "capabilities": ["build"],
                },
                timeout=10,
            )
            if resp.ok:
                self.state.log(f"Registered with hivemind: {self.controller_url}")
                return True
            self.state.log(f"Hivemind registration failed: {resp.status_code}", level="warn")
            return False
        except Exception as e:
            self.state.log(f"Hivemind registration error: {e}", level="error")
            return False

    def heartbeat(self, status: str = "idle", work_id: str | None = None, progress: float | None = None) -> None:
        """Send a heartbeat to the controller (best effort)."""
        if not self.enabled:
            return
        try:
            http_requests.post(
                f"{self.controller_url}/api/heartbeat",
                json={
                    "worker_id": self.worker_id,
                    "status": status,
                    "work_id": work_id,
                    "progress": progress,
                },
                timeout=5,
            )
        except Exception:
            pass  # Heartbeat failures are not critical; the next loop retries.

    def get_work(self) -> dict | None:
        """Poll the controller for one work item; return it or ``None``."""
        if not self.enabled:
            return None
        try:
            resp = http_requests.get(
                f"{self.controller_url}/api/work",
                params={"worker_id": self.worker_id, "capabilities": "build"},
                timeout=10,
            )
            if resp.ok:
                return resp.json().get("work")
            return None
        except Exception as e:
            self.state.log(f"Hivemind work poll error: {e}", level="warn")
            return None

    def complete_work(self, work_id: str, result: dict | None = None) -> None:
        """Report work completion (with optional *result*) to the controller."""
        if not self.enabled:
            return
        try:
            http_requests.post(
                f"{self.controller_url}/api/complete",
                json={"worker_id": self.worker_id, "work_id": work_id, "result": result or {}},
                timeout=10,
            )
            self.state.log(f"Reported completion: {work_id}")
        except Exception as e:
            self.state.log(f"Hivemind complete error: {e}", level="warn")

    def fail_work(self, work_id: str, error: str) -> None:
        """Report work failure with an *error* message to the controller."""
        if not self.enabled:
            return
        try:
            http_requests.post(
                f"{self.controller_url}/api/fail",
                json={"worker_id": self.worker_id, "work_id": work_id, "error": error},
                timeout=10,
            )
            self.state.log(f"Reported failure: {work_id}")
        except Exception as e:
            self.state.log(f"Hivemind fail error: {e}", level="warn")

    def process_work(self, work: dict) -> bool:
        """Run one ``type == "build"`` work item synchronously and report the outcome.

        Returns True on a successful build. Every failure path (unknown type,
        missing fields, build failure, builder exception) calls ``fail_work``.
        """
        work_id = work.get("work_id", "unknown")
        work_type = work.get("type", "")
        if work_type != "build":
            self.state.log(f"Unknown work type: {work_type}", level="warn")
            self.fail_work(work_id, f"Unknown work type: {work_type}")
            return False

        self.state.log(f"Processing hivemind work: {work_id}")
        self.heartbeat(status="working", work_id=work_id)

        repo_url = work.get("repo_url", "")
        image_name = work.get("image_name", "") or work.get("image", "")
        branch = work.get("branch", "main")
        # Normalize once: a single tag may arrive as a string, and a null/empty
        # list falls back to "latest". The same list is reported on completion.
        raw_tags = work.get("tags") or ["latest"]
        tags = raw_tags if isinstance(raw_tags, list) else [raw_tags]

        if not repo_url or not image_name:
            self.fail_work(work_id, "Missing repo_url or image_name")
            return False

        build_config = BuildConfig(
            repo_url=repo_url,
            image_name=image_name,
            branch=branch,
            tags=tags,
            dockerfile=work.get("dockerfile", "Dockerfile"),
            context_path=work.get("context_path", "."),
            build_args=work.get("build_args", {}),
            github_token=work.get("github_token"),
            platform=work.get("platform"),
            trigger="hivemind",
        )
        build = Build(id=work_id, config=build_config)
        try:
            success = self.builder.build_and_push(build)
        except Exception as e:
            # Without this the exception reaches work_loop and the controller
            # is never told that this work item failed.
            self.state.log(f"Hivemind build error: {e}", level="error")
            self.fail_work(work_id, f"Build error: {e}")
            return False

        if success:
            self.complete_work(work_id, {
                "image": f"{self.config.registry_url}/{image_name}",
                "tags": tags,
                "duration": build.duration_seconds,
            })
        else:
            self.fail_work(work_id, build.error or "Build failed")
        return success

    def work_loop(self) -> None:
        """Poll for and process work until ``stop()`` is called."""
        self._running = True
        self.state.log("Hivemind work loop started")
        while self._running:
            try:
                # Don't poll while a build (from any source) is running.
                if self.state.is_building:
                    self.heartbeat(status="working")
                    time.sleep(self.poll_interval)
                    continue
                self.heartbeat(status="idle")
                work = self.get_work()
                if work:
                    # Poll again immediately after finishing an item.
                    self.process_work(work)
                else:
                    time.sleep(self.poll_interval)
            except Exception as e:
                self.state.log(f"Hivemind loop error: {e}", level="error")
                time.sleep(self.poll_interval)

    def stop(self) -> None:
        """Ask the work loop to exit after its current iteration."""
        self._running = False


# Initialize hivemind client
hivemind = HivemindClient(config, state, builder)


# =============================================================================
# Startup
# =============================================================================


def check_git_available() -> bool:
    """Return True if GitPython can be imported and can run ``git --version``."""
    try:
        import git

        git.Git().version()
        return True
    except Exception as e:
        state.log(f"Git check failed: {e}", level="error")
        return False


def startup():
    """Initialise telemetry, log configuration, join hivemind, then mark ready.

    Runs on a daemon thread started at import time. ``state.set_ready(True)``
    is the last step, so readiness is only reported once the rest has run.
    """
    init_telemetry()
    state.log(f"HF Builder starting ({config.runner_id})")
    state.log(f"Registry: {config.registry_url}")

    if not check_git_available():
        state.log("WARNING: git not available - clones may fail", level="error")
    else:
        state.log("Git: OK")

    if config.registry_user:
        builder.setup_registry_auth()
    else:
        state.log("WARNING: No registry credentials - pushes will fail", level="warn")

    # Log configuration
    if config.default_image:
        state.log(f"Default image: {config.default_image}")
    if config.github_token:
        state.log(f"GitHub token: configured ({len(config.github_token)} chars)")
    if config.auto_restart_space:
        state.log(f"Auto-restart: {config.auto_restart_space}")
    if config.slack_webhook_url:
        state.log("Slack notifications: enabled")
    if config.discord_webhook_url:
        state.log("Discord notifications: enabled")

    # Hivemind integration
    if hivemind.enabled:
        state.log(f"Hivemind controller: {hivemind.controller_url}")
        if hivemind.register():
            threading.Thread(target=hivemind.work_loop, daemon=True).start()

    # Best-effort cleanup of stale git locks from previous runs (per the method
    # name); a failure here must not leave the service permanently not-ready.
    try:
        builder._cleanup_stale_git_locks()
    except Exception as e:
        state.log(f"Stale git lock cleanup failed: {e}", level="warn")

    state.set_ready(True)
    state.log("Ready")


threading.Thread(target=startup, daemon=True).start()

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)