Spaces:
Runtime error
Runtime error
File size: 7,541 Bytes
"""Cloudflare R2 asset uploader.
Reads R2 credentials from the ``SCONFIG`` Space secret (a JSON blob) and uploads
generated assets to a preset bucket. Object keys follow
``<yyyymmddhhmmss>-<namespace>-<rand>.<ext>``; the prompt and request params are
attached as JSON object metadata so the owner can trace any asset back to the
request that produced it.
The ``SCONFIG`` secret is JSON shaped like::
{
"account_id": "<cloudflare account id>",
"akey": "<R2 access key id>",
"skey": "<R2 secret access key>",
"token": "<cloudflare api token value>", # optional, not used for S3
"bucket": "free-generated-assets" # optional, this is the default
}
``upload_asset`` never raises: callers always also return the original
HF-generated asset, so an R2 failure degrades to "asset shown, upload errored"
rather than breaking generation.
"""
from __future__ import annotations
import datetime
import json
import mimetypes
import os
import threading
import uuid
# Name of the environment variable (the Space secret) that holds the JSON config blob.
CONFIG_ENV = "SCONFIG"
# Bucket used when the config blob does not name one.
DEFAULT_BUCKET = "free-generated-assets"
# Byte budget for the serialized "generation" metadata value built by _meta_blob.
_MAX_META_BYTES = 2000 # R2/S3 cap user metadata; stay well under header limits.
# Buckets _ensure_bucket has already probed in this process, so each is checked only once.
_ensured_buckets: set[str] = set()
# Guards the first-use bucket probe in _ensure_bucket against concurrent callers.
_ensure_lock = threading.Lock()
def _load_cfg() -> dict:
    """Load and validate the R2 configuration from the environment.

    The configuration is read from the environment variable named by
    ``CONFIG_ENV`` and must be a JSON object.

    Returns:
        The decoded JSON object as a ``dict``.

    Raises:
        RuntimeError: if the variable is unset or blank, is not valid JSON,
            or decodes to something other than a JSON object.
    """
    raw = (os.environ.get(CONFIG_ENV) or "").strip()
    if not raw:
        raise RuntimeError(f"{CONFIG_ENV} secret is not set")
    try:
        cfg = json.loads(raw)
    except json.JSONDecodeError as exc:
        # ``from None`` on purpose: the decode error keeps the raw secret text
        # in ``exc.doc``, so it is dropped from the exception chain and only
        # the parser message and position are reported.
        raise RuntimeError(
            f"{CONFIG_ENV} secret is not valid JSON ({exc.msg} at char {exc.pos})"
        ) from None
    if not isinstance(cfg, dict):
        # Callers use ``cfg.get(...)`` / ``cfg[...]``; fail here with a clear
        # message instead of an AttributeError/TypeError further down.
        raise RuntimeError(
            f"{CONFIG_ENV} secret must be a JSON object, got {type(cfg).__name__}"
        )
    return cfg
def uid_from_request(request, cookie_name: str = "uid") -> str:
    """Extract the caller's id cookie from a request object, best effort.

    Looks first at a parsed ``request.cookies`` dict, then falls back to
    scanning the raw ``cookie`` header for ``<cookie_name>=<value>`` and
    URL-decoding the value. The result is intended for object metadata so an
    asset can be traced back to its requester.

    Args:
        request: Object that may expose ``cookies`` and/or ``headers``; may be
            ``None``.
        cookie_name: Name of the cookie carrying the id.

    Returns:
        The cookie value as a string, or ``""`` when the request is ``None``,
        the cookie is absent or empty, or anything goes wrong while reading it.
    """
    if request is None:
        return ""
    try:
        from urllib.parse import unquote

        # Preferred source: an already-parsed cookie mapping.
        jar = getattr(request, "cookies", None)
        if isinstance(jar, dict):
            parsed_value = jar.get(cookie_name)
            if parsed_value:
                return str(parsed_value)

        # Fallback: parse the raw Cookie header by hand.
        header_map = getattr(request, "headers", None)
        cookie_header = ""
        if header_map is not None and hasattr(header_map, "get"):
            cookie_header = header_map.get("cookie") or ""
        for pair in cookie_header.split(";"):
            name, _, value = pair.strip().partition("=")
            if value and name == cookie_name:
                return unquote(value)
    except Exception:  # noqa: BLE001 - identity is best-effort, never fatal
        return ""
    return ""
def _client(cfg: dict):
    """Create a boto3 S3 client aimed at the account's R2 endpoint.

    Args:
        cfg: Config mapping providing ``account_id``, ``akey`` and ``skey``.

    Returns:
        A boto3 ``s3`` client using SigV4 signing, region ``auto`` and up to
        three attempts per request.

    Raises:
        KeyError: if any of the three required config keys is missing.
    """
    import boto3
    from botocore.config import Config

    account = cfg["account_id"]
    access_key = cfg["akey"]
    secret_key = cfg["skey"]
    options = Config(signature_version="s3v4", retries={"max_attempts": 3})
    return boto3.client(
        "s3",
        endpoint_url=f"https://{account}.r2.cloudflarestorage.com",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="auto",
        config=options,
    )
def _now_stamp() -> str:
return datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d%H%M%S")
def _meta_blob(prompt, params, uid: str = "") -> str:
"""Compact JSON of uid + prompt + params, truncated to fit R2 metadata limits."""
blob = json.dumps({"uid": uid, "prompt": prompt, "params": params}, ensure_ascii=True, default=str)
if len(blob.encode("utf-8")) <= _MAX_META_BYTES:
return blob
keep = max(0, _MAX_META_BYTES - len(json.dumps({"uid": uid, "prompt": "", "params": params}, default=str)) - 16)
short = {"uid": uid, "prompt": str(prompt)[:keep] + "...[truncated]", "params": params}
return json.dumps(short, ensure_ascii=True, default=str)[:_MAX_META_BYTES]
def _ensure_bucket(s3, bucket: str) -> None:
    """Probe ``bucket`` once per process and try to create it if it is missing.

    Best effort: a failed creation and any non-"missing" ``ClientError`` from
    the probe are ignored, and the bucket is recorded as handled either way so
    later calls return immediately.

    Args:
        s3: Client exposing ``head_bucket`` and ``create_bucket``.
        bucket: Name of the bucket to probe.
    """
    # Unlocked fast path for buckets that were already handled.
    if bucket in _ensured_buckets:
        return
    with _ensure_lock:
        # Re-check under the lock: another caller may have finished meanwhile.
        if bucket not in _ensured_buckets:
            from botocore.exceptions import ClientError

            missing_codes = ("404", "NoSuchBucket", "NotFound")
            try:
                s3.head_bucket(Bucket=bucket)
            except ClientError as head_error:
                error_info = head_error.response.get("Error", {})
                if str(error_info.get("Code", "")) in missing_codes:
                    try:
                        s3.create_bucket(Bucket=bucket)
                    except ClientError:
                        # Object-scoped tokens can't create buckets. Let the
                        # following put_object surface the real error
                        # (NoSuchBucket) instead of masking it with an
                        # AccessDenied raised here.
                        pass
                # Any other code (e.g. 403 with object-only tokens) is
                # ignored; the following put_object is the real test.
            _ensured_buckets.add(bucket)
def presign_get_url(filekey: str, bucket: str | None = None, expires: int = 604800) -> str | None:
    """Return a presigned GET URL for an uploaded object, or None on failure.

    This lets callers hand back a directly usable URL even when no public R2
    domain is bound; consumers that do have a public base can still rebuild a
    clean URL from the ``filekey``/``bucket`` reported alongside it.

    Args:
        filekey: Object key to sign.
        bucket: Bucket holding the object. Falls back to the configured
            bucket, then to ``DEFAULT_BUCKET``.
        expires: Requested lifetime in seconds. Defaults to 7 days (the SigV4
            maximum). Values outside 1..604800 are clamped into that range,
            because a URL signed with a lifetime outside it would be handed
            back as usable yet be refused when fetched.

    Returns:
        The presigned URL, or ``None`` if configuration, client creation or
        signing fails for any reason. Never raises.
    """
    try:
        cfg = _load_cfg()
        target_bucket = bucket or cfg.get("bucket") or DEFAULT_BUCKET
        # Keep the lifetime inside the range a SigV4 presigned URL may carry.
        lifetime = min(max(int(expires), 1), 604800)
        s3 = _client(cfg)
        return s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": target_bucket, "Key": filekey},
            ExpiresIn=lifetime,
        )
    except Exception:  # noqa: BLE001 - URL is best-effort, never fatal
        return None
def _ascii_metadata_value(value: str) -> str:
    """Return ``value`` in a form that is safe as an S3 metadata value.

    Printable-ASCII input is returned unchanged; anything else (non-ASCII or
    control characters) is percent-encoded, because object metadata travels as
    HTTP headers and the S3 client refuses non-ASCII metadata values.
    """
    if value.isascii() and value.isprintable():
        return value
    from urllib.parse import quote

    return quote(value, safe="")


def upload_asset(
    *,
    namespace: str,
    prompt,
    params: dict,
    data: bytes | None = None,
    path: str | None = None,
    ext: str | None = None,
    content_type: str | None = None,
    uid: str = "",
) -> dict:
    """Upload one asset to R2.

    Provide either ``data`` (raw bytes) or ``path`` (a local file); ``data``
    wins when both are given. The object key has the form
    ``<utc stamp>-<namespace>-<8 hex chars><ext>``.

    Args:
        namespace: Label embedded in the object key.
        prompt: Prompt recorded in the ``generation`` metadata JSON.
        params: Request parameters recorded in the ``generation`` metadata JSON.
        data: Asset bytes.
        path: Local file to read when ``data`` is not given.
        ext: File extension, with or without the leading dot. Defaults to the
            extension of ``path``.
        content_type: MIME type. Guessed from ``ext`` when omitted, falling
            back to ``application/octet-stream``.
        uid: The caller's unique id (see :func:`uid_from_request`). Recorded
            inside the generation JSON and, when non-empty, as a dedicated
            ``uid`` metadata field. That field is percent-encoded if the id is
            not printable ASCII, so an unusual id cannot fail the upload.

    Returns:
        ``{"ok": True, "filekey": ..., "bucket": ...}`` on success or
        ``{"ok": False, "error": ...}`` on failure. Never raises.
    """
    try:
        if data is None and path is None:
            raise ValueError("upload_asset requires either data or path")
        cfg = _load_cfg()
        bucket = cfg.get("bucket") or DEFAULT_BUCKET
        if ext is None and path is not None:
            ext = os.path.splitext(path)[1]
        ext = ext or ""
        if ext and not ext.startswith("."):
            ext = "." + ext
        if content_type is None:
            # guess_type needs a file name, so a dummy stem is prepended.
            guessed = mimetypes.guess_type("x" + ext)[0] if ext else None
            content_type = guessed or "application/octet-stream"
        if data is None:
            with open(path, "rb") as fh:
                data = fh.read()
        filekey = f"{_now_stamp()}-{namespace}-{uuid.uuid4().hex[:8]}{ext}"
        s3 = _client(cfg)
        _ensure_bucket(s3, bucket)
        # Tolerate None / non-str ids from callers; metadata values must be str.
        uid = "" if uid is None else str(uid)
        metadata = {"generation": _meta_blob(prompt, params, uid)}
        if uid:
            metadata["uid"] = _ascii_metadata_value(uid)
        s3.put_object(
            Bucket=bucket,
            Key=filekey,
            Body=data,
            ContentType=content_type,
            Metadata=metadata,
        )
        return {"ok": True, "filekey": filekey, "bucket": bucket}
    except Exception as exc:  # noqa: BLE001 - report any failure to the caller
        return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}
|