| | """ |
| | Implementation of a custom transfer agent for the transfer type "multipart" for |
| | git-lfs. |
| | |
| | Inspired by: |
| | github.com/cbartz/git-lfs-swift-transfer-agent/blob/master/git_lfs_swift_transfer.py |
| | |
| | Spec is: github.com/git-lfs/git-lfs/blob/master/docs/custom-transfers.md |
| | |
| | |
| | To launch debugger while developing: |
| | |
| | ``` [lfs "customtransfer.multipart"] |
| | path = /path/to/huggingface_hub/.env/bin/python args = -m debugpy --listen 5678 |
| | --wait-for-client |
| | /path/to/huggingface_hub/src/huggingface_hub/commands/huggingface_cli.py |
| | lfs-multipart-upload ```""" |
| |
|
| | import json |
| | import os |
| | import subprocess |
| | import sys |
| | from typing import Annotated, Optional |
| |
|
| | import typer |
| |
|
| | from huggingface_hub.lfs import LFS_MULTIPART_UPLOAD_COMMAND |
| |
|
| | from ..utils import get_session, hf_raise_for_status, logging |
| | from ..utils._lfs import SliceFileObj |
| |
|
| |
|
# Module-level logger, namespaced to this module via the huggingface_hub logging helper.
logger = logging.get_logger(__name__)
| |
|
| |
|
def lfs_enable_largefiles(
    path: Annotated[
        str,
        typer.Argument(
            help="Local path to repository you want to configure.",
        ),
    ],
) -> None:
    """
    Configure a local git repository to use the multipart transfer agent for large files.

    This command sets up git-lfs to use the custom multipart transfer agent
    which enables efficient uploading of large files in chunks.
    """
    repo_dir = os.path.abspath(path)
    if not os.path.isdir(repo_dir):
        print("This does not look like a valid git repo.")
        raise typer.Exit(code=1)
    # Register "hf" as the executable for the "multipart" custom transfer
    # type, and the subcommand git-lfs must invoke it with.
    for command in (
        "git config lfs.customtransfer.multipart.path hf",
        f"git config lfs.customtransfer.multipart.args {LFS_MULTIPART_UPLOAD_COMMAND}",
    ):
        subprocess.run(command.split(), check=True, cwd=repo_dir)
    print("Local repo set up for largefiles")
| |
|
| |
|
def write_msg(msg: dict):
    """Serialize *msg* as a single line of JSON and emit it on stdout."""
    sys.stdout.write(json.dumps(msg) + "\n")
    # Flush immediately: git-lfs waits on each protocol message.
    sys.stdout.flush()
| |
|
| |
|
def read_msg() -> Optional[dict]:
    """Read one Line-delimited JSON message from stdin.

    Returns the parsed message, or None when git-lfs sends "terminate".
    Exits the process on any event other than "download"/"upload".
    """
    msg = json.loads(sys.stdin.readline().strip())

    if msg.get("type") == "terminate" or msg.get("event") == "terminate":
        # git-lfs asked us to shut down; signal the caller to wind up.
        return None

    event = msg.get("event")
    if event != "download" and event != "upload":
        logger.critical("Received unexpected message")
        sys.exit(1)

    return msg
| |
|
| |
|
def lfs_multipart_upload() -> None:
    """Internal git-lfs custom transfer agent for multipart uploads.

    This function implements the custom transfer protocol for git-lfs multipart uploads.
    Handles chunked uploads of large files to Hugging Face Hub.

    Messages are exchanged with git-lfs as Line-delimited JSON over
    stdin/stdout (see the custom-transfers spec linked in the module
    docstring). Exits the process on "terminate" or on protocol errors.
    """
    # Handshake: the very first message must be an "init" for an "upload"
    # operation; anything else is a protocol violation.
    init_msg = json.loads(sys.stdin.readline().strip())
    if not (init_msg.get("event") == "init" and init_msg.get("operation") == "upload"):
        write_msg({"error": {"code": 32, "message": "Wrong lfs init operation"}})
        sys.exit(1)

    # An empty JSON object acknowledges the init and signals readiness.
    write_msg({})

    # Main loop: one iteration per object git-lfs asks us to upload.
    while True:
        msg = read_msg()
        if msg is None:
            # read_msg() returns None on "terminate": exit cleanly.
            sys.exit(0)

        oid = msg["oid"]
        filepath = msg["path"]
        completion_url = msg["action"]["href"]
        header = msg["action"]["header"]
        # The server passes the part size inside the header dict; after
        # popping it, the remaining header values are used as the presigned
        # part-upload URLs (one PUT per part below).
        chunk_size = int(header.pop("chunk_size"))
        presigned_urls: list[str] = list(header.values())

        # Token progress message (bytesSoFar=1) — presumably emitted so
        # git-lfs registers that the transfer has started; it is not an
        # accurate byte count. TODO confirm against git-lfs behavior.
        write_msg(
            {
                "event": "progress",
                "oid": oid,
                "bytesSoFar": 1,
                "bytesSinceLast": 0,
            }
        )

        parts = []
        with open(filepath, "rb") as file:
            for i, presigned_url in enumerate(presigned_urls):
                # SliceFileObj exposes a bounded window over the open file,
                # so each part is streamed without loading it into memory.
                with SliceFileObj(
                    file,
                    seek_from=i * chunk_size,
                    read_limit=chunk_size,
                ) as data:
                    r = get_session().put(presigned_url, data=data)
                    hf_raise_for_status(r)
                    # Record the part's ETag: the completion request below
                    # sends etag + partNumber for every uploaded part.
                    parts.append(
                        {
                            "etag": r.headers.get("etag"),
                            "partNumber": i + 1,
                        }
                    )
                    # NOTE(review): for the final part this may over-report
                    # bytesSoFar, since the last chunk can be smaller than
                    # chunk_size.
                    write_msg(
                        {
                            "event": "progress",
                            "oid": oid,
                            "bytesSoFar": (i + 1) * chunk_size,
                            "bytesSinceLast": chunk_size,
                        }
                    )

        # All parts uploaded: POST the part list to the completion URL so the
        # server can assemble the final object.
        r = get_session().post(
            completion_url,
            json={
                "oid": oid,
                "parts": parts,
            },
        )
        hf_raise_for_status(r)

        # Report success for this object back to git-lfs.
        write_msg({"event": "complete", "oid": oid})
| |
|