import hashlib
import hmac
import os
import time
from typing import Optional
from urllib.parse import quote, urlencode
from fastapi import FastAPI, Header, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from huggingface_hub import HfApi, HfFileSystem
# --- Configuration, read once at import time --------------------------------
# Required settings are indexed directly so that a missing variable aborts
# startup with a KeyError instead of failing later on the first request.
HF_TOKEN = os.environ["HF_TOKEN"]
SIGN_SECRET = os.environ["SIGN_SECRET"].encode()  # bytes key used by _sign()
ADMIN_TOKEN = os.environ["PROXY_ADMIN_TOKEN"]

# Comma-separated allowlist with blank entries dropped. An empty set means
# _check_bucket() lets every bucket through.
ALLOWED_BUCKETS = {
    name
    for name in (raw.strip() for raw in os.getenv("ALLOWED_BUCKETS", "").split(","))
    if name
}

# Link lifetimes in seconds; the MAX_TTL default is 7 * 86400 (seven days).
DEFAULT_TTL = int(os.getenv("DEFAULT_TTL", "86400"))
MAX_TTL = int(os.getenv("MAX_TTL", "604800"))

# Read size used when streaming a file to the client.
CHUNK = 1 << 20  # 1 MiB

# Shared Hugging Face clients and the ASGI application object.
fs = HfFileSystem(token=HF_TOKEN)
api = HfApi(token=HF_TOKEN)
app = FastAPI(title="bucket-proxy")
def _sign(bucket: str, path: str, exp: int) -> str:
    """Return the hex HMAC-SHA256 signature for a (bucket, path, expiry) triple.

    The signed message is the three values joined with ``|`` and UTF-8
    encoded; the key is the module-level ``SIGN_SECRET``.

    Args:
        bucket: Bucket identifier the link is bound to.
        path: Path inside the bucket the link is bound to.
        exp: Expiry as a Unix timestamp in seconds.

    Returns:
        The lowercase hexadecimal digest.
    """
    payload = "|".join((bucket, path, str(exp))).encode()
    mac = hmac.new(SIGN_SECRET, payload, digestmod=hashlib.sha256)
    return mac.hexdigest()
def _check_bucket(bucket: str) -> None:
    """Reject buckets that are not on the configured allowlist.

    When ``ALLOWED_BUCKETS`` is empty the check is disabled and every bucket
    is accepted.

    Args:
        bucket: Bucket identifier supplied by the caller.

    Raises:
        HTTPException: 403 if an allowlist is configured and ``bucket`` is
            not a member of it.
    """
    if not ALLOWED_BUCKETS or bucket in ALLOWED_BUCKETS:
        return
    raise HTTPException(403, f"bucket {bucket} not in allowlist")
def _check_admin(token_query: Optional[str], authorization: Optional[str]) -> None:
    """Authenticate an admin request against ``ADMIN_TOKEN``.

    The token is taken from ``token_query`` when that is non-empty; otherwise
    from an ``Authorization`` header of the form ``Bearer <token>``.

    Args:
        token_query: Value of the ``t`` query parameter, if any.
        authorization: Raw ``Authorization`` header value, if any.

    Raises:
        HTTPException: 401 when no token was supplied, 403 when the supplied
            token does not match ``ADMIN_TOKEN``.
    """
    token = token_query
    if not token and authorization and authorization.startswith("Bearer "):
        token = authorization[7:]
    if not token:
        raise HTTPException(401, "missing admin token (query 't' or Bearer header)")
    # hmac.compare_digest() only accepts str arguments that are pure ASCII and
    # raises TypeError otherwise. The token is caller-controlled, so compare
    # the UTF-8 bytes instead: a non-ASCII guess is then a clean 403 rather
    # than an unhandled exception.
    supplied = token.encode("utf-8")
    expected = ADMIN_TOKEN.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(403, "bad admin token")
@app.get("/health")
def health():
    """Liveness probe: report ``ok`` together with the current Unix time."""
    now = int(time.time())
    return {"ok": True, "ts": now}
# Static landing page returned by the "/" route.
# The separator is U+00B7 MIDDLE DOT; it had been stored as the mojibake
# sequence produced by decoding its UTF-8 bytes with a Thai code page.
INDEX_HTML = """
bucket-proxy
bucket-proxy · sign URL
"""
@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the static landing page held in ``INDEX_HTML``."""
    page = INDEX_HTML
    return page
@app.get("/sign")
def sign(
    request: Request,
    bucket: str = Query(..., description="namespace/bucket-name"),
    path: str = Query(..., description="path inside the bucket"),
    ttl: int = Query(DEFAULT_TTL, ge=60, le=MAX_TTL),
    t: Optional[str] = Query(None, description="admin token (alternative to Bearer)"),
    authorization: Optional[str] = Header(None),
):
    """Create a time-limited, HMAC-signed download URL for one file.

    Args:
        request: Incoming request; used to derive the public base URL.
        bucket: Bucket identifier (``namespace/bucket-name``).
        path: Path of the file inside the bucket; leading slashes are ignored.
        ttl: Link lifetime in seconds (60 .. ``MAX_TTL``).
        t: Admin token passed as a query parameter.
        authorization: ``Authorization`` header carrying a Bearer admin token.

    Returns:
        A dict with the signed ``url``, its ``expires_at`` Unix timestamp, the
        ``ttl`` that was applied and the file ``size`` as reported by the
        filesystem.

    Raises:
        HTTPException: 401/403 from the admin check, 403 if the bucket is not
            allowlisted, 400 if ``path`` is empty after stripping slashes,
            404 if the file is not found under any candidate location.
    """
    _check_admin(t, authorization)
    _check_bucket(bucket)
    path = path.lstrip("/")
    if not path:
        # "" or "/" would otherwise be probed as "<bucket>/" rather than a file.
        raise HTTPException(400, "path must not be empty")

    # Try model repo first (no prefix), then datasets/, then buckets/
    candidates = (
        f"{bucket}/{path}",
        f"datasets/{bucket}/{path}",
        f"buckets/{bucket}/{path}",
    )
    # One info() call per candidate both proves existence and yields the size.
    # This avoids a second lookup that could fail if the file disappears
    # between the existence check and the size query.
    info = None
    for candidate in candidates:
        try:
            info = fs.info(candidate)
        except Exception:
            # Best-effort probe: any failure means "not at this location".
            continue
        break
    if info is None:
        raise HTTPException(404, f"file not found: {bucket}/{path}")

    exp = int(time.time()) + ttl
    sig = _sign(bucket, path, exp)

    # NOTE(review): the public origin is taken from X-Forwarded-* headers when
    # present; this presumably relies on a trusted reverse proxy setting them.
    host = request.headers.get("x-forwarded-host") or request.url.netloc
    scheme = request.headers.get("x-forwarded-proto", "https")
    base = f"{scheme}://{host}"
    qs = urlencode({"b": bucket, "f": path, "exp": exp, "sig": sig}, quote_via=quote)
    return {
        "url": f"{base}/d?{qs}",
        "expires_at": exp,
        "ttl": ttl,
        "size": info.get("size"),
    }
@app.get("/d")
def download(
    b: str = Query(..., description="bucket id namespace/name"),
    f: str = Query(..., description="path inside bucket"),
    exp: int = Query(...),
    sig: str = Query(...),
):
    """Stream a file to the client after validating its signed link.

    Args:
        b: Bucket identifier the link was signed for.
        f: Path inside the bucket the link was signed for.
        exp: Expiry of the link as a Unix timestamp in seconds.
        sig: Hex HMAC signature issued for ``(b, f, exp)``.

    Returns:
        A ``StreamingResponse`` that yields the file in ``CHUNK``-sized pieces
        as an ``application/octet-stream`` attachment.

    Raises:
        HTTPException: 403 if the bucket is not allowlisted or the signature
            does not match, 410 if the link has expired, 404 if the file is
            not found under any candidate location.
    """
    _check_bucket(b)
    if exp < int(time.time()):
        raise HTTPException(410, "link expired")
    expected = _sign(b, f, exp)
    # compare_digest() raises TypeError for str arguments with non-ASCII
    # characters; `sig` is caller-controlled, so compare bytes to make a
    # malformed signature a 403 instead of an unhandled exception.
    if not hmac.compare_digest(expected.encode("utf-8"), sig.encode("utf-8")):
        raise HTTPException(403, "bad signature")

    # Same lookup order as the signing endpoint: bare repo id, datasets/, buckets/.
    full = None
    info = None
    for candidate in (f"{b}/{f}", f"datasets/{b}/{f}", f"buckets/{b}/{f}"):
        try:
            info = fs.info(candidate)
        except Exception:
            # Best-effort probe: any failure means "not at this location".
            continue
        full = candidate
        break
    if full is None:
        raise HTTPException(404, "file not found")
    size = info.get("size")

    def stream():
        # Generator body runs lazily, so the file is opened only once the
        # response starts being sent and is closed when iteration ends.
        with fs.open(full, "rb") as src:
            while True:
                chunk = src.read(CHUNK)
                if not chunk:
                    return
                yield chunk

    filename = f.rsplit("/", 1)[-1]
    # Build a quoted-string-safe ASCII fallback: anything outside printable
    # ASCII, plus the quote and backslash characters, would corrupt the header.
    ascii_name = "".join(
        ch if (" " <= ch <= "~" and ch not in '"\\') else "_" for ch in filename
    )
    disposition = f'attachment; filename="{ascii_name}"'
    if ascii_name != filename:
        # Carry the exact name in the RFC 6266 extended parameter, which
        # percent-encodes the UTF-8 bytes and is therefore pure ASCII.
        encoded_name = quote(filename, safe="")
        disposition += f"; filename*=UTF-8''{encoded_name}"

    headers = {
        "Content-Disposition": disposition,
        "Cache-Control": "private, max-age=300",
    }
    # `is not None` rather than truthiness so zero-byte files still advertise
    # their length.
    if size is not None:
        headers["Content-Length"] = str(size)
    return StreamingResponse(stream(), media_type="application/octet-stream", headers=headers)
@app.exception_handler(HTTPException)
def http_exc(_: Request, exc: HTTPException):
    """Render an ``HTTPException`` as ``{"error": <detail>}`` with its status code."""
    body = {"error": exc.detail}
    return JSONResponse(body, status_code=exc.status_code)