"""Cloudflare R2 asset uploader.

Reads R2 credentials from the ``SCONFIG`` Space secret (a JSON blob) and
uploads generated assets to a preset bucket. Object keys follow
``<timestamp>-<namespace>-<id>.<ext>``; the prompt and request params are
attached as JSON object metadata so the owner can trace any asset back to the
request that produced it.

The ``SCONFIG`` secret is JSON shaped like::

    {
      "account_id": "<account id>",
      "akey": "<access key id>",
      "skey": "<secret access key>",
      "token": "<token>",                # optional, not used for S3
      "bucket": "free-generated-assets"  # optional, this is the default
    }

``upload_asset`` never raises: callers always also return the original
HF-generated asset, so an R2 failure degrades to "asset shown, upload errored"
rather than breaking generation.
"""

from __future__ import annotations

import datetime
import json
import mimetypes
import os
import threading
import uuid
from urllib.parse import quote, unquote

CONFIG_ENV = "SCONFIG"
DEFAULT_BUCKET = "free-generated-assets"
_MAX_META_BYTES = 2000  # R2/S3 cap user metadata; stay well under header limits.
_TRUNCATED_MARK = "...[truncated]"

# Buckets already probed by _ensure_bucket in this process, guarded by the lock.
_ensured_buckets: set[str] = set()
_ensure_lock = threading.Lock()


def _load_cfg() -> dict:
    """Parse the ``SCONFIG`` environment secret into a dict.

    Raises:
        RuntimeError: if the secret is unset/blank or is not a JSON object.
        ValueError: (``json.JSONDecodeError``) if the secret is not valid JSON.
    """
    raw = (os.environ.get(CONFIG_ENV) or "").strip()
    if not raw:
        raise RuntimeError(f"{CONFIG_ENV} secret is not set")
    cfg = json.loads(raw)
    if not isinstance(cfg, dict):
        # Fail with a readable message instead of a later AttributeError on .get.
        raise RuntimeError(f"{CONFIG_ENV} secret must be a JSON object")
    return cfg


def uid_from_request(request, cookie_name: str = "uid") -> str:
    """Best-effort read of the caller's unique id from a gr.Request cookie.

    The web app forwards the per-browser anonymous id and the generator sets it
    as the ``uid`` cookie on the Space call; we record it in object metadata so
    every asset traces back to the requester.

    The parsed ``request.cookies`` dict is consulted first; otherwise the raw
    ``cookie`` header is split manually and the value is URL-decoded.

    Returns "" if the request is None, the cookie is absent, or anything fails.
    """
    if request is None:
        return ""
    try:
        cookies = getattr(request, "cookies", None)
        if isinstance(cookies, dict) and cookies.get(cookie_name):
            return str(cookies[cookie_name])

        headers = getattr(request, "headers", None)
        raw = ""
        if headers is not None:
            raw = (headers.get("cookie") if hasattr(headers, "get") else "") or ""
        for part in raw.split(";"):
            key, _, value = part.strip().partition("=")
            if key == cookie_name and value:
                return unquote(value)
    except Exception:  # noqa: BLE001 - identity is best-effort, never fatal
        return ""
    return ""


def _client(cfg: dict):
    """Build a boto3 S3 client pointed at the account's R2 endpoint.

    boto3/botocore are imported lazily so this module can be imported without
    them installed. Raises ``KeyError`` if ``account_id``, ``akey`` or ``skey``
    is missing from ``cfg``.
    """
    import boto3
    from botocore.config import Config

    return boto3.client(
        "s3",
        endpoint_url=f"https://{cfg['account_id']}.r2.cloudflarestorage.com",
        aws_access_key_id=cfg["akey"],
        aws_secret_access_key=cfg["skey"],
        region_name="auto",
        config=Config(signature_version="s3v4", retries={"max_attempts": 3}),
    )


def _now_stamp() -> str:
    """Return the current UTC time as a compact ``YYYYMMDDHHMMSS`` string."""
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d%H%M%S")


def _dump_ascii(obj) -> str:
    """Serialize ``obj`` to pure-ASCII JSON, stringifying unknown types."""
    return json.dumps(obj, ensure_ascii=True, default=str)


def _header_safe(value: str) -> str:
    """Return ``value`` unchanged if printable ASCII, else percent-encoded.

    Metadata values must be ASCII; control characters are not valid in a
    header value either.
    """
    if value.isascii() and value.isprintable():
        return value
    return quote(value, safe="")


def _meta_blob(prompt, params, uid: str = "") -> str:
    """Compact JSON of uid + prompt + params that fits the metadata cap.

    The result is always valid JSON of at most ``_MAX_META_BYTES`` bytes
    (the output is pure ASCII, so characters and bytes coincide). When the
    full blob is too large, the prompt is cut to the longest prefix that fits
    and suffixed with ``...[truncated]``. If ``params`` alone blow the budget
    they are replaced by that marker, and as a last resort the uid is clipped.
    """
    blob = _dump_ascii({"uid": uid, "prompt": prompt, "params": params})
    if len(blob) <= _MAX_META_BYTES:
        return blob

    text = str(prompt)

    def render(keep: int, shown_params) -> str:
        # The marker is only appended when the prompt was actually shortened.
        shown = text if keep >= len(text) else text[:keep] + _TRUNCATED_MARK
        return _dump_ascii({"uid": uid, "prompt": shown, "params": shown_params})

    for shown_params in (params, _TRUNCATED_MARK):
        full = render(len(text), shown_params)
        if len(full) <= _MAX_META_BYTES:
            return full
        if len(render(0, shown_params)) > _MAX_META_BYTES:
            continue  # Even an empty prompt does not fit with these params.
        # Measure the *escaped* size: JSON escaping can inflate one character
        # to 6-12 bytes, so a character-count budget is not sufficient.
        lo, hi = 0, len(text) - 1
        while lo < hi:
            mid = (lo + hi + 1) // 2
            if len(render(mid, shown_params)) <= _MAX_META_BYTES:
                lo = mid
            else:
                hi = mid - 1
        return render(lo, shown_params)

    # Only reachable when the uid itself is enormous.
    return _dump_ascii(
        {"uid": str(uid)[:64], "prompt": _TRUNCATED_MARK, "params": _TRUNCATED_MARK}
    )


def _ensure_bucket(s3, bucket: str) -> None:
    """Create the bucket on first use if it does not already exist (best effort).

    Each bucket is probed at most once per process; the bucket is recorded as
    handled even when the probe or the creation fails with a ``ClientError``.
    """
    if bucket in _ensured_buckets:
        return
    with _ensure_lock:
        if bucket in _ensured_buckets:
            return
        from botocore.exceptions import ClientError

        try:
            s3.head_bucket(Bucket=bucket)
        except ClientError as exc:
            code = str(exc.response.get("Error", {}).get("Code", ""))
            if code in ("404", "NoSuchBucket", "NotFound"):
                # Best effort: object-scoped tokens can't create buckets. If this
                # fails, let the subsequent put_object surface the real error
                # (NoSuchBucket) rather than masking it with an AccessDenied here.
                try:
                    s3.create_bucket(Bucket=bucket)
                except ClientError:
                    pass
            # Other codes (e.g. 403 with object-only tokens) are ignored; the
            # subsequent put_object is the real test.
        _ensured_buckets.add(bucket)


def presign_get_url(
    filekey: str, bucket: str | None = None, expires: int = 604800
) -> str | None:
    """Return a presigned GET URL for an uploaded object, or None on failure.

    ``expires`` defaults to 7 days (the SigV4 maximum). This lets callers hand
    back a directly-usable URL even when no public R2 domain is bound;
    downstream consumers that have a public base can still rebuild a clean URL
    from the ``filekey``/``bucket`` reported alongside it. Never raises.
    """
    try:
        cfg = _load_cfg()
        bucket = bucket or cfg.get("bucket") or DEFAULT_BUCKET
        s3 = _client(cfg)
        return s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": filekey},
            ExpiresIn=int(expires),
        )
    except Exception:  # noqa: BLE001 - URL is best-effort, never fatal
        return None


def upload_asset(
    *,
    namespace: str,
    prompt,
    params: dict,
    data: bytes | None = None,
    path: str | None = None,
    ext: str | None = None,
    content_type: str | None = None,
    uid: str = "",
) -> dict:
    """Upload one asset to R2.

    Provide either ``data`` (raw bytes) or ``path`` (a local file). When
    ``ext`` is omitted it is taken from ``path``; when ``content_type`` is
    omitted it is guessed from the extension, falling back to
    ``application/octet-stream``.

    ``uid`` is the caller's unique id (see :func:`uid_from_request`) and is
    recorded both as a dedicated object-metadata field and inside the
    generation JSON. The dedicated field is percent-encoded when the uid is
    not printable ASCII, so an unusual uid cannot fail the upload.

    Returns ``{"ok": True, "filekey": ..., "bucket": ...}`` on success or
    ``{"ok": False, "error": ...}`` on failure. Never raises.
    """
    try:
        if data is None and path is None:
            raise ValueError("upload_asset requires either data or path")
        cfg = _load_cfg()
        bucket = cfg.get("bucket") or DEFAULT_BUCKET

        if ext is None and path is not None:
            ext = os.path.splitext(path)[1]
        ext = ext or ""
        if ext and not ext.startswith("."):
            ext = "." + ext

        if content_type is None:
            guessed = mimetypes.guess_type("x" + ext)[0] if ext else None
            content_type = guessed or "application/octet-stream"

        if data is None:
            with open(path, "rb") as fh:
                data = fh.read()

        filekey = f"{_now_stamp()}-{namespace}-{uuid.uuid4().hex[:8]}{ext}"
        s3 = _client(cfg)
        _ensure_bucket(s3, bucket)

        metadata = {"generation": _meta_blob(prompt, params, uid)}
        if uid:
            metadata["uid"] = _header_safe(uid)
        s3.put_object(
            Bucket=bucket,
            Key=filekey,
            Body=data,
            ContentType=content_type,
            Metadata=metadata,
        )
        return {"ok": True, "filekey": filekey, "bucket": bucket}
    except Exception as exc:  # noqa: BLE001 - report any failure to the caller
        return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}