ImageStudio / r2_uploader.py
nsfwalex
feat: add UI-less prompt_to_video_assets endpoint (LLM->image->LLM)
bae8329
"""Cloudflare R2 asset uploader.
Reads R2 credentials from the ``SCONFIG`` Space secret (a JSON blob) and uploads
generated assets to a preset bucket. Object keys follow
``<yyyymmddhhmmss>-<namespace>-<rand>.<ext>`` (UTC timestamp, 8 random hex
chars); the caller's uid, the prompt and the request params are
attached as JSON object metadata so the owner can trace any asset back to the
request that produced it.
The ``SCONFIG`` secret is a JSON object shaped like this (the ``#`` notes are
annotations only, not part of the JSON)::
{
"account_id": "<cloudflare account id>",
"akey": "<R2 access key id>",
"skey": "<R2 secret access key>",
"token": "<cloudflare api token value>", # optional, not used for S3
"bucket": "free-generated-assets" # optional, this is the default
}
``upload_asset`` never raises: callers always also return the original
HF-generated asset, so an R2 failure degrades to "asset shown, upload errored"
rather than breaking generation.
"""
from __future__ import annotations
import datetime
import json
import mimetypes
import os
import threading
import uuid
# Environment variable (Space secret) that holds the R2 credentials JSON.
CONFIG_ENV = "SCONFIG"
# Bucket used when the secret does not name one via its "bucket" key.
DEFAULT_BUCKET = "free-generated-assets"
_MAX_META_BYTES = 2000 # R2/S3 cap user metadata; stay well under header limits.
# Buckets that _ensure_bucket has already handled in this process, so later
# uploads skip the head/create round-trips.
_ensured_buckets: set[str] = set()
# Serializes the first-use bucket check across threads.
_ensure_lock = threading.Lock()
def _load_cfg() -> dict:
    """Read and decode the R2 credentials JSON from the ``CONFIG_ENV`` variable.

    Returns:
        The decoded config object (see the module docstring for its keys).

    Raises:
        RuntimeError: if the variable is unset or blank, or if its JSON value
            is not an object.
        json.JSONDecodeError: if the value is not valid JSON.
    """
    raw = (os.environ.get(CONFIG_ENV) or "").strip()
    if not raw:
        raise RuntimeError(f"{CONFIG_ENV} secret is not set")
    cfg = json.loads(raw)
    # Valid JSON that isn't an object (a list, a bare string, ...) would
    # otherwise surface later as a confusing AttributeError/TypeError on
    # ``cfg.get(...)`` / ``cfg["akey"]``; fail here with a clear message.
    if not isinstance(cfg, dict):
        raise RuntimeError(
            f"{CONFIG_ENV} secret must be a JSON object, got {type(cfg).__name__}"
        )
    return cfg
def uid_from_request(request, cookie_name: str = "uid") -> str:
    """Return the caller's unique id carried by a ``gr.Request``, or ``""``.

    The web app forwards its per-browser anonymous id, which the generator
    sets as the ``uid`` cookie on the Space call; recording it in object
    metadata lets every stored asset be traced back to its requester.

    Lookup order:
      1. ``request.cookies`` when it is a ``dict`` holding a truthy value for
         ``cookie_name`` (returned via ``str``);
      2. otherwise the raw ``cookie`` header, split on ``;`` and the matching
         non-empty value percent-decoded.

    Identity is best-effort: any exception while reading the request yields ``""``.
    """
    if request is None:
        return ""
    try:
        jar = getattr(request, "cookies", None)
        if isinstance(jar, dict) and jar.get(cookie_name):
            return str(jar[cookie_name])

        header_map = getattr(request, "headers", None)
        cookie_header = ""
        if header_map is not None and hasattr(header_map, "get"):
            cookie_header = header_map.get("cookie") or ""

        from urllib.parse import unquote

        for pair in cookie_header.split(";"):
            name, _eq, value = pair.strip().partition("=")
            if name == cookie_name and value:
                return unquote(value)
    except Exception:  # noqa: BLE001 - identity is best-effort, never fatal
        return ""
    return ""
def _client(cfg: dict):
    """Create a boto3 S3 client bound to the account's Cloudflare R2 endpoint.

    ``cfg`` is the decoded config dict; its ``account_id``, ``akey`` and
    ``skey`` entries are required (a missing one raises ``KeyError``).
    boto3/botocore are imported inside the function rather than at module top.
    """
    import boto3
    from botocore.config import Config

    account = cfg["account_id"]
    access_key = cfg["akey"]
    secret_key = cfg["skey"]
    client_config = Config(signature_version="s3v4", retries={"max_attempts": 3})
    return boto3.client(
        "s3",
        endpoint_url=f"https://{account}.r2.cloudflarestorage.com",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="auto",
        config=client_config,
    )
def _now_stamp() -> str:
    """Return the current UTC time as a 14-digit ``YYYYMMDDHHMMSS`` string."""
    utc_now = datetime.datetime.now(tz=datetime.timezone.utc)
    return utc_now.strftime("%Y%m%d%H%M%S")
def _meta_blob(prompt, params, uid: str = "") -> str:
"""Compact JSON of uid + prompt + params, truncated to fit R2 metadata limits."""
blob = json.dumps({"uid": uid, "prompt": prompt, "params": params}, ensure_ascii=True, default=str)
if len(blob.encode("utf-8")) <= _MAX_META_BYTES:
return blob
keep = max(0, _MAX_META_BYTES - len(json.dumps({"uid": uid, "prompt": "", "params": params}, default=str)) - 16)
short = {"uid": uid, "prompt": str(prompt)[:keep] + "...[truncated]", "params": params}
return json.dumps(short, ensure_ascii=True, default=str)[:_MAX_META_BYTES]
def _ensure_bucket(s3, bucket: str) -> None:
    """Make a one-time, best-effort attempt to create ``bucket`` if it is missing.

    Once a bucket has been handled it is remembered in ``_ensured_buckets`` and
    later calls return immediately. Client errors from the probe are ignored;
    the caller's subsequent ``put_object`` is the real test of access.
    """
    # Fast path without taking the lock.
    if bucket in _ensured_buckets:
        return
    with _ensure_lock:
        # Re-check: another thread may have finished the probe while we waited.
        if bucket not in _ensured_buckets:
            _probe_or_create_bucket(s3, bucket)
            _ensured_buckets.add(bucket)


def _probe_or_create_bucket(s3, bucket: str) -> None:
    """HEAD ``bucket``; if R2 reports it missing, try to create it.

    ``ClientError``s from either call are swallowed. Object-scoped tokens
    cannot create buckets, and a 403 on HEAD is expected with such tokens, so
    letting ``put_object`` surface the real error is more informative. Any
    other exception type propagates.
    """
    from botocore.exceptions import ClientError

    missing_codes = ("404", "NoSuchBucket", "NotFound")
    try:
        s3.head_bucket(Bucket=bucket)
        return  # bucket is reachable; nothing to do
    except ClientError as exc:
        error_code = str(exc.response.get("Error", {}).get("Code", ""))
    if error_code not in missing_codes:
        return
    try:
        s3.create_bucket(Bucket=bucket)
    except ClientError:
        pass
def presign_get_url(filekey: str, bucket: str | None = None, expires: int = 604800) -> str | None:
    """Return a presigned GET URL for an uploaded object, or None on failure.

    ``bucket`` defaults to the secret's ``bucket`` key, then ``DEFAULT_BUCKET``.
    ``expires`` is the URL lifetime in seconds. It defaults to 7 days and is
    clamped to ``[1, 604800]``: SigV4 presigned URLs cannot be valid longer
    than 7 days, so a larger value would yield a URL that is rejected on use.

    This lets callers hand back a directly-usable URL even when no public R2
    domain is bound; downstream consumers that have a public base can still
    rebuild a clean URL from the ``filekey``/``bucket`` reported alongside it.
    Never raises: any error (bad config, non-numeric ``expires``, ...) yields None.
    """
    max_expires = 604800  # 7 days: the SigV4 presigned-URL ceiling
    try:
        cfg = _load_cfg()
        bucket = bucket or cfg.get("bucket") or DEFAULT_BUCKET
        lifetime = min(max(int(expires), 1), max_expires)
        s3 = _client(cfg)
        return s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": filekey},
            ExpiresIn=lifetime,
        )
    except Exception:  # noqa: BLE001 - URL is best-effort, never fatal
        return None
def upload_asset(
*,
namespace: str,
prompt,
params: dict,
data: bytes | None = None,
path: str | None = None,
ext: str | None = None,
content_type: str | None = None,
uid: str = "",
) -> dict:
"""Upload one asset to R2.
Provide either ``data`` (raw bytes) or ``path`` (a local file). ``uid`` is
the caller's unique id (see :func:`uid_from_request`) and is recorded both as
a dedicated object-metadata field and inside the generation JSON. Returns
``{"ok": True, "filekey": ..., "bucket": ...}`` on success or
``{"ok": False, "error": ...}`` on failure. Never raises.
"""
try:
if data is None and path is None:
raise ValueError("upload_asset requires either data or path")
cfg = _load_cfg()
bucket = cfg.get("bucket") or DEFAULT_BUCKET
if ext is None and path is not None:
ext = os.path.splitext(path)[1]
ext = ext or ""
if ext and not ext.startswith("."):
ext = "." + ext
if content_type is None:
guessed = mimetypes.guess_type("x" + ext)[0] if ext else None
content_type = guessed or "application/octet-stream"
if data is None:
with open(path, "rb") as fh:
data = fh.read()
filekey = f"{_now_stamp()}-{namespace}-{uuid.uuid4().hex[:8]}{ext}"
s3 = _client(cfg)
_ensure_bucket(s3, bucket)
metadata = {"generation": _meta_blob(prompt, params, uid)}
if uid:
metadata["uid"] = uid
s3.put_object(
Bucket=bucket,
Key=filekey,
Body=data,
ContentType=content_type,
Metadata=metadata,
)
return {"ok": True, "filekey": filekey, "bucket": bucket}
except Exception as exc: # noqa: BLE001 - report any failure to the caller
return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}