|
|
|
|
|
"""
Utility script to monitor Hugging Face Space status + logs.

Features:
- Fetch latest build/runtime info using huggingface_hub
- Download logs via HF REST API and persist to logs/hf_space/
- Highlight errors (Traceback/SyntaxError/ModuleNotFoundError) directly in console
- Optional --watch mode to poll every N seconds
"""
|
|
|
|
|
from __future__ import annotations

import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

import requests
from huggingface_hub import HfApi
|
|
|
|
|
# Space that is monitored when no --space-id is given on the command line.
DEFAULT_SPACE_ID = "davidtran999/hue-portal-backend"

# Directory next to this script where fetched log snapshots are written.
LOG_ROOT = Path(__file__).resolve().parent / "logs" / "hf_space"
|
|
|
|
|
|
|
|
def get_hf_token() -> str:
    """Return the Hugging Face token from the environment or the cached file.

    Lookup order:
      1. the ``HF_TOKEN`` environment variable,
      2. the ``HUGGINGFACE_HUB_TOKEN`` environment variable,
      3. the file ``~/.cache/huggingface/token``.

    Every candidate is stripped of surrounding whitespace, and a candidate
    that is empty after stripping is treated as unset so the next source is
    still consulted.

    Returns:
        The token, or an empty string when no source provides one.
    """
    for env_name in ("HF_TOKEN", "HUGGINGFACE_HUB_TOKEN"):
        candidate = os.getenv(env_name, "").strip()
        if candidate:
            return candidate

    cache_file = Path.home() / ".cache" / "huggingface" / "token"
    try:
        return cache_file.read_text(encoding="utf-8").strip()
    except (OSError, UnicodeDecodeError):
        # A missing, unreadable or undecodable cache file means "no token";
        # the caller then falls back to anonymous access.
        return ""
|
|
|
|
|
|
|
|
def _logs_payload_to_text(payload: Any) -> str:
    """Convert a decoded logs response into plain text.

    A ``logs`` (or ``log``) entry holding a list is joined line by line, a
    string is returned unchanged, and any other payload shape is returned as
    its JSON serialisation so that nothing is silently discarded.
    """
    logs = None
    if isinstance(payload, dict):
        logs = payload.get("logs") or payload.get("log")
    if isinstance(logs, list):
        # Entries are not guaranteed to be strings; serialise the others
        # instead of letting str.join raise TypeError.
        return "\n".join(
            entry if isinstance(entry, str) else json.dumps(entry, ensure_ascii=False)
            for entry in logs
        )
    if isinstance(logs, str):
        return logs
    return json.dumps(payload, ensure_ascii=False)


def fetch_space_logs(space_id: str, token: str, limit: int) -> str:
    """Fetch the latest logs, trying both repo path and subdomain API.

    For an ``owner/name`` id two URLs are tried in order: the
    ``owner/name`` path and the ``owner-name`` form. An id without a slash
    yields a single URL.

    Args:
        space_id: Space identifier, normally ``owner/name``.
        token: Access token sent as a Bearer header; empty for anonymous.
        limit: Value sent as the ``limit`` query parameter.

    Returns:
        The log text from the first candidate URL that answers successfully.

    Raises:
        RuntimeError: With the "no permission" message when a candidate
            answered 401 and no candidate succeeded; otherwise with the most
            recent failure reason when every candidate failed.
    """
    if "/" in space_id:
        owner, name = space_id.split("/", 1)
        candidates = [
            f"https://huggingface.co/api/spaces/{owner}/{name}/logs?limit={limit}",
            f"https://huggingface.co/api/spaces/{owner}-{name}/logs?limit={limit}",
        ]
    else:
        candidates = [f"https://huggingface.co/api/spaces/{space_id}/logs?limit={limit}"]

    headers = {"Accept": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"

    unauthorized = False
    last_error = None
    for url in candidates:
        try:
            response = requests.get(url, headers=headers, timeout=30)
            if response.status_code == 401:
                # Remember the auth failure so a later 404 on another
                # candidate cannot hide it from the user.
                unauthorized = True
                last_error = f"401 for {url}"
                continue
            if response.status_code == 404:
                last_error = f"404 for {url}"
                continue
            response.raise_for_status()
            return _logs_payload_to_text(response.json())
        except (requests.RequestException, ValueError) as exc:
            # Network/HTTP errors and undecodable JSON: try the next URL.
            last_error = str(exc)

    if unauthorized:
        raise RuntimeError("Không có quyền đọc logs. Hãy đặt HF_TOKEN với quyền write.")
    raise RuntimeError(f"Không thể lấy logs. Nguyên nhân gần nhất: {last_error}")
|
|
|
|
|
|
|
|
def _info_field(container: Any, key: str, default: Any = None) -> Any:
    """Read *key* from a dict or from an attribute-style object.

    Returns *default* when *container* is ``None`` or lacks the key.
    """
    if container is None:
        return default
    if isinstance(container, dict):
        return container.get(key, default)
    return getattr(container, key, default)


def write_log(space_id: str, space_info: Dict[str, Any], logs: str) -> Path:
    """Persist logs + status info to disk.

    Args:
        space_id: Space identifier; ``/`` is replaced by ``__`` in the file
            name.
        space_info: Status information holding a ``runtime`` entry with
            ``stage`` and ``hardware``. Both dicts and objects exposing the
            same names as attributes are accepted, at either level.
        logs: Log text written after the header lines.

    Returns:
        Path of the file created under ``LOG_ROOT``; its name is
        ``<space>-<UTC timestamp>.log``.
    """
    runtime = _info_field(space_info, "runtime")
    stage = _info_field(runtime, "stage")
    hardware = _info_field(runtime, "hardware", "unknown")

    LOG_ROOT.mkdir(parents=True, exist_ok=True)
    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the formatted value is unchanged.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    safe_space = space_id.replace("/", "__")
    log_path = LOG_ROOT / f"{safe_space}-{timestamp}.log"

    header_lines = [
        f"# Space: {space_id}\n",
        f"# Timestamp: {timestamp} UTC\n",
        f"# Runtime: {stage}\n",
        f"# Hardware: {hardware}\n",
        "# --- Logs ---\n",
    ]
    with log_path.open("w", encoding="utf-8") as fp:
        fp.writelines(header_lines)
        fp.write(logs)

    return log_path
|
|
|
|
|
|
|
|
def monitor(space_id: str, watch: bool, interval: int, limit: int) -> None:
    """Report the Space status and save its latest logs, optionally in a loop.

    Each cycle queries ``HfApi.space_info``, downloads the logs with
    ``fetch_space_logs``, stores them with ``write_log`` and prints a short
    summary, including whether an alert keyword occurs in the logs.

    Args:
        space_id: Space identifier to inspect.
        watch: When true, repeat the cycle until interrupted.
        interval: Seconds to sleep between cycles in watch mode; values
            below 1 are raised to 1 so the loop never spins or passes a
            negative value to ``time.sleep``.
        limit: Forwarded to ``fetch_space_logs``.
    """
    token = get_hf_token()
    if not token:
        print("⚠️ Không tìm thấy HF token – chỉ có thể đọc log public.")

    api = HfApi(token=token or None)
    alert_keywords = ("Traceback", "SyntaxError", "ModuleNotFoundError")
    pause = max(1, interval)

    def _runtime_value(runtime: Any, key: str, default: Any = None) -> Any:
        """Read *key* from a runtime given as a dict or as an object."""
        if runtime is None:
            return default
        if isinstance(runtime, dict):
            return runtime.get(key, default)
        return getattr(runtime, key, default)

    def _single_cycle() -> None:
        """Run one fetch / persist / report pass."""
        info = api.space_info(space_id)
        logs = fetch_space_logs(space_id, token, limit)

        # ``info.runtime`` may be a plain dict or an attribute-style object
        # depending on the huggingface_hub version; read it either way.
        runtime = getattr(info, "runtime", None)
        stage = _runtime_value(runtime, "stage")
        hardware = _runtime_value(runtime, "hardware", "unknown")

        # write_log documents a dict, so hand it one rather than the info
        # object itself.
        status = {"runtime": {"stage": stage, "hardware": hardware}}
        log_path = write_log(space_id, status, logs)

        # Same naive-UTC ISO text as the deprecated datetime.utcnow().
        updated = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        print(f"\n📡 Space: {space_id}")
        print(f" Stage: {stage}, Hardware: {hardware}")
        print(f" Updated: {updated}Z")
        print(f" Logs saved to: {log_path}")

        if any(keyword in logs for keyword in alert_keywords):
            print(" 🚨 Detected errors in log (Traceback/SyntaxError). Check file above.")
        else:
            print(" ✅ No critical errors detected in latest log.")

    _single_cycle()
    while watch:
        time.sleep(pause)
        _single_cycle()
|
|
|
|
|
|
|
|
def _positive_int(raw: str) -> int:
    """argparse ``type`` callable accepting only integers >= 1."""
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {raw!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the monitor script.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        Namespace with ``space_id``, ``interval``, ``limit`` and ``watch``.
        ``--interval`` and ``--limit`` must be positive integers; other
        values make argparse exit with a usage error.
    """
    parser = argparse.ArgumentParser(description="Monitor Hugging Face Space build/logs.")
    parser.add_argument("--space-id", default=DEFAULT_SPACE_ID, help="Ví dụ: owner/space-name")
    parser.add_argument(
        "--interval",
        type=_positive_int,
        default=30,
        help="Số giây giữa các lần kiểm tra (watch mode)",
    )
    parser.add_argument(
        "--limit",
        type=_positive_int,
        default=200,
        help="Số dòng log lấy về (max 400)",
    )
    parser.add_argument("--watch", action="store_true", help="Bật chế độ theo dõi liên tục")
    return parser.parse_args(argv)
|
|
|
|
|
|
|
|
def main() -> None:
    """Script entry point: parse the CLI options and run the monitor.

    ``KeyboardInterrupt`` is caught so that stopping the script with Ctrl+C
    prints a short notice instead of a traceback.
    """
    args = parse_args()
    try:
        monitor(
            space_id=args.space_id,
            watch=args.watch,
            interval=args.interval,
            limit=args.limit,
        )
    except KeyboardInterrupt:
        print("\n⏹️ Dừng theo dõi theo yêu cầu người dùng.")
|
|
|
|
|
|
|
|
# Run the monitor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|
|
|
|