"""bucket-proxy: hand out time-limited, HMAC-signed download links.

An admin calls ``/sign`` to obtain a URL for a file; anyone holding that URL
can fetch the file through ``/d`` until the link expires. Files are read
through ``HfFileSystem`` using the server-side ``HF_TOKEN``.
"""

import hashlib
import hmac
import os
import time
from typing import Optional, Tuple
from urllib.parse import quote, urlencode

from fastapi import FastAPI, Header, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from huggingface_hub import HfApi, HfFileSystem

# Required configuration: a missing variable fails fast at import time.
HF_TOKEN = os.environ["HF_TOKEN"]
SIGN_SECRET = os.environ["SIGN_SECRET"].encode()
ADMIN_TOKEN = os.environ["PROXY_ADMIN_TOKEN"]

# Comma-separated allowlist; an empty set disables the allowlist check.
ALLOWED_BUCKETS = {
    b.strip() for b in os.environ.get("ALLOWED_BUCKETS", "").split(",") if b.strip()
}
DEFAULT_TTL = int(os.environ.get("DEFAULT_TTL", "86400"))
MAX_TTL = int(os.environ.get("MAX_TTL", str(7 * 86400)))
CHUNK = 1024 * 1024  # 1 MiB

fs = HfFileSystem(token=HF_TOKEN)
api = HfApi(token=HF_TOKEN)
app = FastAPI(title="bucket-proxy")


def _sign(bucket: str, path: str, exp: int) -> str:
    """Return the hex HMAC-SHA256 of ``bucket|path|exp`` keyed by SIGN_SECRET."""
    msg = f"{bucket}|{path}|{exp}".encode()
    return hmac.new(SIGN_SECRET, msg, hashlib.sha256).hexdigest()


def _secure_equals(left: str, right: str) -> bool:
    """Compare two strings in constant time.

    ``hmac.compare_digest`` raises ``TypeError`` when given a ``str`` that
    contains non-ASCII characters, which would turn a bad credential into a
    500. Comparing the UTF-8 bytes accepts any input.
    """
    return hmac.compare_digest(left.encode("utf-8"), right.encode("utf-8"))


def _check_bucket(bucket: str) -> None:
    """Raise 403 if an allowlist is configured and *bucket* is not in it."""
    if ALLOWED_BUCKETS and bucket not in ALLOWED_BUCKETS:
        raise HTTPException(403, f"bucket {bucket} not in allowlist")


def _check_admin(token_query: Optional[str], authorization: Optional[str]) -> None:
    """Validate the admin token from the query string or a Bearer header.

    The query value wins when both are supplied.

    Raises:
        HTTPException: 401 when no token is supplied, 403 when it is wrong.
    """
    token = token_query
    if not token and authorization and authorization.startswith("Bearer "):
        token = authorization[7:]
    if not token:
        raise HTTPException(401, "missing admin token (query 't' or Bearer header)")
    if not _secure_equals(token, ADMIN_TOKEN):
        raise HTTPException(403, "bad admin token")


def _locate(bucket: str, path: str) -> Optional[Tuple[str, dict]]:
    """Find the first existing candidate location for *path* in *bucket*.

    Returns:
        ``(full_path, info)`` for the first candidate that ``fs.info`` can
        describe, or ``None`` when none of them can be.
    """
    # Try model repo first (no prefix), then datasets/, then buckets/
    candidates = (
        f"{bucket}/{path}",
        f"datasets/{bucket}/{path}",
        f"buckets/{bucket}/{path}",
    )
    for candidate in candidates:
        try:
            # One call both proves existence and yields the size.
            return candidate, fs.info(candidate)
        except Exception:
            # Deliberate best-effort: any failure on one candidate just means
            # "try the next"; the caller reports 404 if all of them fail.
            continue
    return None


def _content_disposition(filename: str) -> str:
    """Build an ``attachment`` Content-Disposition value for *filename*.

    Plain printable-ASCII names are emitted as ``filename="..."``. Names with
    quotes, backslashes, control characters or non-ASCII characters get a
    sanitised ASCII fallback plus an RFC 5987 ``filename*`` parameter, so the
    header stays well-formed and the real name is still conveyed.
    """
    name = filename or "download"
    fallback = "".join(
        ch if (" " <= ch <= "~" and ch not in '"\\') else "_" for ch in name
    )
    if fallback == name:
        return f'attachment; filename="{name}"'
    encoded = quote(name, safe="")
    return f"attachment; filename=\"{fallback}\"; filename*=UTF-8''{encoded}"


@app.get("/health")
def health():
    """Liveness probe: report OK together with the current Unix time."""
    return {"ok": True, "ts": int(time.time())}


# Landing page body. The separator is a middle dot (U+00B7); it had been
# stored as the mis-decoded sequence "ยท".
INDEX_HTML = """ bucket-proxy

bucket-proxy · sign URL

"""


@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the static landing page."""
    return INDEX_HTML


@app.get("/sign")
def sign(
    request: Request,
    bucket: str = Query(..., description="namespace/bucket-name"),
    path: str = Query(..., description="path inside the bucket"),
    ttl: int = Query(DEFAULT_TTL, ge=60, le=MAX_TTL),
    t: Optional[str] = Query(None, description="admin token (alternative to Bearer)"),
    authorization: Optional[str] = Header(None),
):
    """Issue a signed, expiring download URL for one file (admin only).

    Returns:
        A dict with the signed ``url``, its ``expires_at`` Unix timestamp, the
        ``ttl`` in seconds and the file ``size`` as reported by the filesystem.

    Raises:
        HTTPException: 401/403 for a missing/bad admin token, 403 for a bucket
            outside the allowlist, 404 when the file cannot be found.
    """
    _check_admin(t, authorization)
    _check_bucket(bucket)

    path = path.lstrip("/")
    located = _locate(bucket, path)
    if located is None:
        raise HTTPException(404, f"file not found: {bucket}/{path}")
    _, info = located

    exp = int(time.time()) + ttl
    sig = _sign(bucket, path, exp)

    # Prefer the forwarded host/scheme headers when present so the link
    # points at the public address rather than the one this process sees.
    host = request.headers.get("x-forwarded-host") or request.url.netloc
    scheme = request.headers.get("x-forwarded-proto", "https")
    base = f"{scheme}://{host}"
    qs = urlencode({"b": bucket, "f": path, "exp": exp, "sig": sig}, quote_via=quote)

    return {
        "url": f"{base}/d?{qs}",
        "expires_at": exp,
        "ttl": ttl,
        "size": info.get("size"),
    }


@app.get("/d")
def download(
    b: str = Query(..., description="bucket id namespace/name"),
    f: str = Query(..., description="path inside bucket"),
    exp: int = Query(...),
    sig: str = Query(...),
):
    """Stream a file to the caller if the signed link is valid and unexpired.

    Raises:
        HTTPException: 403 for a bucket outside the allowlist or a bad
            signature, 410 for an expired link, 404 when the file is missing.
    """
    _check_bucket(b)
    if exp < int(time.time()):
        raise HTTPException(410, "link expired")
    # `sig` is attacker-controlled text, so compare as bytes (see _secure_equals).
    if not _secure_equals(_sign(b, f, exp), sig):
        raise HTTPException(403, "bad signature")

    located = _locate(b, f)
    if located is None:
        raise HTTPException(404, "file not found")
    full, info = located
    size = info.get("size")

    def stream():
        """Yield the file in CHUNK-sized pieces until it is exhausted."""
        with fs.open(full, "rb") as src:
            while True:
                chunk = src.read(CHUNK)
                if not chunk:
                    return
                yield chunk

    filename = f.rsplit("/", 1)[-1]
    headers = {
        "Content-Disposition": _content_disposition(filename),
        "Cache-Control": "private, max-age=300",
    }
    # `is not None` rather than truthiness: an empty file has size 0 and
    # should still advertise Content-Length: 0.
    if size is not None:
        headers["Content-Length"] = str(size)
    return StreamingResponse(
        stream(), media_type="application/octet-stream", headers=headers
    )


@app.exception_handler(HTTPException)
def http_exc(_: Request, exc: HTTPException):
    """Render HTTPException as ``{"error": detail}`` with its status code."""
    return JSONResponse(
        {"error": exc.detail},
        status_code=exc.status_code,
        # Forward any headers attached to the exception instead of dropping them.
        headers=getattr(exc, "headers", None),
    )