import os from dataclasses import dataclass from pathlib import Path from typing import Optional, Tuple from urllib.parse import urlparse, urlunparse def _split_endpoint_bucket(endpoint: str) -> Tuple[str, str, str]: parsed = urlparse(endpoint) if not parsed.scheme or not parsed.netloc: return endpoint, "", "" path = (parsed.path or "").strip("/") if not path: return urlunparse((parsed.scheme, parsed.netloc, "", "", "", "")), "", "" parts = [part for part in path.split("/") if part] bucket = parts[0] if parts else "" base_prefix = "/".join(parts[1:]) if len(parts) > 1 else "" base_endpoint = urlunparse((parsed.scheme, parsed.netloc, "", "", "", "")) return base_endpoint, bucket, base_prefix def _join_key(prefix: str, key: str) -> str: prefix = (prefix or "").strip("/") key = (key or "").lstrip("/") if not prefix: return key if not key: return prefix return f"{prefix}/{key}" @dataclass(frozen=True) class R2Config: endpoint_url: str bucket: str access_key_id: str secret_access_key: str base_prefix: str = "" class R2Storage: def __init__(self, config: Optional[R2Config] = None): self.config = config or self._load_config_from_env() self._client = None @staticmethod def _load_config_from_env() -> R2Config: endpoint_raw = (os.getenv("R2_ENDPOINT") or os.getenv("R2_Endpoint") or "").strip() access_key_id = (os.getenv("R2_ACCESS_KEY_ID") or os.getenv("R2_ID") or "").strip() secret_access_key = (os.getenv("R2_SECRET_ACCESS_KEY") or os.getenv("R2_API") or "").strip() bucket = (os.getenv("R2_BUCKET") or "").strip() base_prefix = (os.getenv("R2_PREFIX") or "").strip() if endpoint_raw and not bucket: endpoint_url, bucket_from_path, prefix_from_path = _split_endpoint_bucket(endpoint_raw) bucket = bucket or bucket_from_path endpoint_raw = endpoint_url base_prefix = base_prefix or prefix_from_path endpoint_url = endpoint_raw if not endpoint_url or not bucket or not access_key_id or not secret_access_key: missing = [] if not endpoint_url: missing.append("R2_ENDPOINT / R2_Endpoint") if not bucket: missing.append("R2_BUCKET (或把 bucket 放到 R2_Endpoint 的路径里)") if not access_key_id: missing.append("R2_ACCESS_KEY_ID / R2_ID") if not secret_access_key: missing.append("R2_SECRET_ACCESS_KEY / R2_API") raise ValueError(f"R2 配置缺失: {', '.join(missing)}") if secret_access_key.startswith("cfat_"): raise ValueError("R2_SECRET_ACCESS_KEY 看起来是 Cloudflare API Token(cfat_),不是 R2 的 S3 Secret Access Key。请在 Cloudflare 控制台生成 R2 的 S3 API 访问密钥并替换。") return R2Config( endpoint_url=endpoint_url, bucket=bucket, access_key_id=access_key_id, secret_access_key=secret_access_key, base_prefix=base_prefix, ) def _get_client(self): if self._client is not None: return self._client try: import boto3 from botocore.client import Config except Exception as exc: raise RuntimeError("缺少依赖 boto3,请先安装 requirements.txt") from exc self._client = boto3.client( "s3", endpoint_url=self.config.endpoint_url, aws_access_key_id=self.config.access_key_id, aws_secret_access_key=self.config.secret_access_key, region_name="auto", config=Config(signature_version="s3v4", s3={"addressing_style": "path"}), ) return self._client def resolve_key(self, key: str) -> str: return _join_key(self.config.base_prefix, key) def exists(self, key: str) -> bool: client = self._get_client() resolved_key = self.resolve_key(key) try: client.head_object(Bucket=self.config.bucket, Key=resolved_key) return True except Exception as exc: try: from botocore.exceptions import ClientError except Exception: raise if isinstance(exc, ClientError): error = (exc.response or {}).get("Error") or {} code = str(error.get("Code") or "") if code in {"404", "NoSuchKey", "NotFound"}: return False raise def upload_file(self, local_path: str | Path, key: str, content_type: Optional[str] = None) -> str: client = self._get_client() resolved_key = self.resolve_key(key) path = Path(local_path) if not path.exists(): raise FileNotFoundError(str(path)) kwargs = {} if content_type: kwargs["ContentType"] = content_type with path.open("rb") as handle: client.put_object(Bucket=self.config.bucket, Key=resolved_key, Body=handle, **kwargs) return resolved_key def download_file(self, key: str, local_path: str | Path) -> Path: client = self._get_client() resolved_key = self.resolve_key(key) path = Path(local_path) path.parent.mkdir(parents=True, exist_ok=True) response = client.get_object(Bucket=self.config.bucket, Key=resolved_key) body = response.get("Body") try: with path.open("wb") as handle: handle.write(body.read()) finally: if body is not None: try: body.close() except Exception: pass return path def list_keys(self, prefix: str = "", max_keys: int = 20) -> list[str]: client = self._get_client() resolved_prefix = self.resolve_key(prefix) if prefix else "" params = {"Bucket": self.config.bucket, "MaxKeys": int(max_keys)} if resolved_prefix: params["Prefix"] = resolved_prefix response = client.list_objects_v2(**params) contents = response.get("Contents") or [] return [item.get("Key", "") for item in contents if isinstance(item, dict) and item.get("Key")] def presign_get_url(self, key: str, expires_in: int = 3600) -> str: client = self._get_client() resolved_key = self.resolve_key(key) return client.generate_presigned_url( ClientMethod="get_object", Params={"Bucket": self.config.bucket, "Key": resolved_key}, ExpiresIn=int(expires_in), )