Spaces:
Runtime error
Runtime error
| """Cloudflare R2 asset uploader. | |
| Reads R2 credentials from the ``SCONFIG`` Space secret (a JSON blob) and uploads | |
| generated assets to a preset bucket. Object keys follow | |
| ``<yyyymmddhhmmss>-<namespace>-<rand>.<ext>``; the prompt and request params are | |
| attached as JSON object metadata so the owner can trace any asset back to the | |
| request that produced it. | |
| The ``SCONFIG`` secret is JSON shaped like:: | |
| { | |
| "account_id": "<cloudflare account id>", | |
| "akey": "<R2 access key id>", | |
| "skey": "<R2 secret access key>", | |
| "token": "<cloudflare api token value>", # optional, not used for S3 | |
| "bucket": "free-generated-assets" # optional, this is the default | |
| } | |
| ``upload_asset`` never raises: callers always also return the original | |
| HF-generated asset, so an R2 failure degrades to "asset shown, upload errored" | |
| rather than breaking generation. | |
| """ | |
| from __future__ import annotations | |
| import datetime | |
| import json | |
| import mimetypes | |
| import os | |
| import threading | |
| import uuid | |
| CONFIG_ENV = "SCONFIG" | |
| DEFAULT_BUCKET = "free-generated-assets" | |
| _MAX_META_BYTES = 2000 # R2/S3 cap user metadata; stay well under header limits. | |
| _ensured_buckets: set[str] = set() | |
| _ensure_lock = threading.Lock() | |
| def _load_cfg() -> dict: | |
| raw = (os.environ.get(CONFIG_ENV) or "").strip() | |
| if not raw: | |
| raise RuntimeError(f"{CONFIG_ENV} secret is not set") | |
| return json.loads(raw) | |
| def uid_from_request(request, cookie_name: str = "uid") -> str: | |
| """Best-effort read of the caller's unique id from a gr.Request cookie. | |
| The web app forwards the per-browser anonymous id and the generator sets it | |
| as the ``uid`` cookie on the Space call; we record it in object metadata so | |
| every asset traces back to the requester. Returns "" if absent. | |
| """ | |
| if request is None: | |
| return "" | |
| try: | |
| cookies = getattr(request, "cookies", None) | |
| if isinstance(cookies, dict) and cookies.get(cookie_name): | |
| return str(cookies[cookie_name]) | |
| headers = getattr(request, "headers", None) | |
| raw = "" | |
| if headers is not None: | |
| raw = (headers.get("cookie") if hasattr(headers, "get") else "") or "" | |
| for part in raw.split(";"): | |
| k, _, v = part.strip().partition("=") | |
| if k == cookie_name and v: | |
| from urllib.parse import unquote | |
| return unquote(v) | |
| except Exception: # noqa: BLE001 - identity is best-effort, never fatal | |
| return "" | |
| return "" | |
| def _client(cfg: dict): | |
| import boto3 | |
| from botocore.config import Config | |
| return boto3.client( | |
| "s3", | |
| endpoint_url=f"https://{cfg['account_id']}.r2.cloudflarestorage.com", | |
| aws_access_key_id=cfg["akey"], | |
| aws_secret_access_key=cfg["skey"], | |
| region_name="auto", | |
| config=Config(signature_version="s3v4", retries={"max_attempts": 3}), | |
| ) | |
| def _now_stamp() -> str: | |
| return datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d%H%M%S") | |
| def _meta_blob(prompt, params, uid: str = "") -> str: | |
| """Compact JSON of uid + prompt + params, truncated to fit R2 metadata limits.""" | |
| blob = json.dumps({"uid": uid, "prompt": prompt, "params": params}, ensure_ascii=True, default=str) | |
| if len(blob.encode("utf-8")) <= _MAX_META_BYTES: | |
| return blob | |
| keep = max(0, _MAX_META_BYTES - len(json.dumps({"uid": uid, "prompt": "", "params": params}, default=str)) - 16) | |
| short = {"uid": uid, "prompt": str(prompt)[:keep] + "...[truncated]", "params": params} | |
| return json.dumps(short, ensure_ascii=True, default=str)[:_MAX_META_BYTES] | |
| def _ensure_bucket(s3, bucket: str) -> None: | |
| """Create the bucket on first use if it does not already exist (best effort).""" | |
| if bucket in _ensured_buckets: | |
| return | |
| with _ensure_lock: | |
| if bucket in _ensured_buckets: | |
| return | |
| from botocore.exceptions import ClientError | |
| try: | |
| s3.head_bucket(Bucket=bucket) | |
| except ClientError as exc: | |
| code = str(exc.response.get("Error", {}).get("Code", "")) | |
| if code in ("404", "NoSuchBucket", "NotFound"): | |
| # Best effort: object-scoped tokens can't create buckets. If this | |
| # fails, let the subsequent put_object surface the real error | |
| # (NoSuchBucket) rather than masking it with an AccessDenied here. | |
| try: | |
| s3.create_bucket(Bucket=bucket) | |
| except ClientError: | |
| pass | |
| # Other codes (e.g. 403 with object-only tokens) are ignored; the | |
| # subsequent put_object is the real test. | |
| _ensured_buckets.add(bucket) | |
| def presign_get_url(filekey: str, bucket: str | None = None, expires: int = 604800) -> str | None: | |
| """Return a presigned GET URL for an uploaded object, or None on failure. | |
| ``expires`` defaults to 7 days (the SigV4 maximum). This lets callers hand | |
| back a directly-usable URL even when no public R2 domain is bound; downstream | |
| consumers that have a public base can still rebuild a clean URL from the | |
| ``filekey``/``bucket`` reported alongside it. Never raises. | |
| """ | |
| try: | |
| cfg = _load_cfg() | |
| bucket = bucket or cfg.get("bucket") or DEFAULT_BUCKET | |
| s3 = _client(cfg) | |
| return s3.generate_presigned_url( | |
| "get_object", | |
| Params={"Bucket": bucket, "Key": filekey}, | |
| ExpiresIn=int(expires), | |
| ) | |
| except Exception: # noqa: BLE001 - URL is best-effort, never fatal | |
| return None | |
| def upload_asset( | |
| *, | |
| namespace: str, | |
| prompt, | |
| params: dict, | |
| data: bytes | None = None, | |
| path: str | None = None, | |
| ext: str | None = None, | |
| content_type: str | None = None, | |
| uid: str = "", | |
| ) -> dict: | |
| """Upload one asset to R2. | |
| Provide either ``data`` (raw bytes) or ``path`` (a local file). ``uid`` is | |
| the caller's unique id (see :func:`uid_from_request`) and is recorded both as | |
| a dedicated object-metadata field and inside the generation JSON. Returns | |
| ``{"ok": True, "filekey": ..., "bucket": ...}`` on success or | |
| ``{"ok": False, "error": ...}`` on failure. Never raises. | |
| """ | |
| try: | |
| if data is None and path is None: | |
| raise ValueError("upload_asset requires either data or path") | |
| cfg = _load_cfg() | |
| bucket = cfg.get("bucket") or DEFAULT_BUCKET | |
| if ext is None and path is not None: | |
| ext = os.path.splitext(path)[1] | |
| ext = ext or "" | |
| if ext and not ext.startswith("."): | |
| ext = "." + ext | |
| if content_type is None: | |
| guessed = mimetypes.guess_type("x" + ext)[0] if ext else None | |
| content_type = guessed or "application/octet-stream" | |
| if data is None: | |
| with open(path, "rb") as fh: | |
| data = fh.read() | |
| filekey = f"{_now_stamp()}-{namespace}-{uuid.uuid4().hex[:8]}{ext}" | |
| s3 = _client(cfg) | |
| _ensure_bucket(s3, bucket) | |
| metadata = {"generation": _meta_blob(prompt, params, uid)} | |
| if uid: | |
| metadata["uid"] = uid | |
| s3.put_object( | |
| Bucket=bucket, | |
| Key=filekey, | |
| Body=data, | |
| ContentType=content_type, | |
| Metadata=metadata, | |
| ) | |
| return {"ok": True, "filekey": filekey, "bucket": bucket} | |
| except Exception as exc: # noqa: BLE001 - report any failure to the caller | |
| return {"ok": False, "error": f"{type(exc).__name__}: {exc}"} | |