Spaces:
Sleeping
Sleeping
| import { HUB_URL } from "../consts"; | |
| import { HubApiError, createApiError, InvalidApiResponseFormatError } from "../error"; | |
| import type { | |
| ApiBucketBatchResponse, | |
| ApiCommitHeader, | |
| ApiCommitLfsFile, | |
| ApiCommitOperation, | |
| ApiLfsBatchRequest, | |
| ApiLfsBatchResponse, | |
| ApiLfsCompleteMultipartRequest, | |
| ApiPreuploadRequest, | |
| ApiPreuploadResponse, | |
| } from "../types/api/api-commit"; | |
| import type { CredentialsParams, RepoDesignation } from "../types/public"; | |
| import { checkCredentials } from "../utils/checkCredentials"; | |
| import { chunk } from "../utils/chunk"; | |
| import { promisesQueue } from "../utils/promisesQueue"; | |
| import { promisesQueueStreaming } from "../utils/promisesQueueStreaming"; | |
| import { sha256 } from "../utils/sha256"; | |
| import { toRepoId } from "../utils/toRepoId"; | |
| import { WebBlob } from "../utils/WebBlob"; | |
| import { eventToGenerator } from "../utils/eventToGenerator"; | |
| import { base64FromBytes } from "../utils/base64FromBytes"; | |
| import { isFrontend } from "../utils/isFrontend"; | |
| import { createBlobs } from "../utils/createBlobs"; | |
| import type { XetTokenParams } from "../utils/uploadShards"; | |
| import { uploadShards } from "../utils/uploadShards"; | |
| import { splitAsyncGenerator } from "../utils/splitAsyncGenerator"; | |
| import { SplicedBlob } from "../utils/SplicedBlob"; | |
/** Max number of files hashed (sha256) in parallel before the LFS batch call */
const CONCURRENT_SHAS = 5;
/** Max number of LFS files uploaded in parallel (basic, non-multipart path) */
const CONCURRENT_LFS_UPLOADS = 5;
/** Max number of parts of a single multipart LFS upload sent in parallel */
const MULTIPART_PARALLEL_UPLOAD = 5;
/** Commit operation that deletes the file at `path` from the repo */
export interface CommitDeletedEntry {
	operation: "delete";
	path: string;
}
/** File content: an in-memory `Blob`, or a `URL` that gets expanded into one or more blobs via `createBlobs` */
export type ContentSource = Blob | URL;
/** Commit operation that adds the file at `path`, or replaces it if it already exists */
export interface CommitFile {
	operation: "addOrUpdate";
	path: string;
	content: ContentSource;
	// forceLfs?: boolean
}
/**
 * Optimized when only the beginning or the end of the file is replaced
 *
 * todo: handle other cases
 */
export interface CommitEditFile {
	operation: "edit";
	path: string;
	/** Later, will be ContentSource. For now simpler to just handle blobs */
	originalContent: Blob;
	/** Edits applied to `originalContent`; each replaces the [start, end) byte range */
	edits: Array<{
		/**
		 * Later, will be ContentSource. For now simpler to just handle blobs
		 *
		 * originalContent from [start, end) will be replaced by this
		 */
		content: Blob;
		/**
		 * The start position of the edit in the original content
		 */
		start: number;
		/**
		 * The end position of the edit in the original content
		 *
		 * originalContent from [start, end) will be replaced by the edit
		 */
		end: number;
	}>;
}
/** Internal: a `CommitFile` whose content is guaranteed to be a `Blob` (URL contents already expanded) */
type CommitBlob = Omit<CommitFile, "content"> & { content: Blob };
// TODO: find a nice way to handle LFS & non-LFS files in an uniform manner, see https://github.com/huggingface/moon-landing/issues/4370
// export type CommitRenameFile = {
// 	operation: "rename";
// 	path: string;
// 	oldPath: string;
// 	content?: ContentSource;
// };

/** Any operation accepted by `commit` / `commitIter` */
export type CommitOperation = CommitDeletedEntry | CommitFile | CommitEditFile /* | CommitRenameFile */;
/** Internal: operations after URL and edit contents have been resolved down to `Blob`s */
type CommitBlobOperation = Exclude<CommitOperation, CommitFile> | CommitBlob;
export type CommitParams = {
	/** Commit title (sent as the commit header's `summary`) */
	title: string;
	/** Optional commit description (sent as the commit header's `description`) */
	description?: string;
	/** Repo to commit to */
	repo: RepoDesignation;
	/** Operations applied atomically by this commit */
	operations: CommitOperation[];
	/** @default "main" */
	branch?: string;
	/**
	 * Parent commit. Optional
	 *
	 * - When opening a PR: will use parentCommit as the parent commit
	 * - When committing on a branch: Will make sure that there were no intermediate commits
	 */
	parentCommit?: string;
	/** When true, the commit endpoints are called with `?create_pr=1` to open a pull request instead of committing to the branch */
	isPullRequest?: boolean;
	/** Override the Hub endpoint (defaults to `HUB_URL`) */
	hubUrl?: string;
	/**
	 * Whether to use web workers to compute SHA256 hashes.
	 *
	 * @default false
	 */
	useWebWorkers?: boolean | { minSize?: number; poolSize?: number };
	/**
	 * Maximum depth of folders to upload. Files deeper than this will be ignored
	 *
	 * @default 5
	 */
	maxFolderDepth?: number;
	/**
	 * Custom fetch function to use instead of the default one, for example to use a proxy or edit headers.
	 */
	fetch?: typeof fetch;
	/** External signal to cancel the whole commit; forwarded to the internal AbortController */
	abortSignal?: AbortSignal;
	/**
	 * @default true
	 *
	 * Use xet protocol: https://huggingface.co/blog/xet-on-the-hub to upload, rather than a basic S3 PUT
	 */
	useXet?: boolean;
	// Credentials are optional due to custom fetch functions or cookie auth
} & Partial<CredentialsParams>;
/** Result of a successful commit (not produced for bucket commits) */
export interface CommitOutput {
	// NOTE(review): presumably only set when the commit opened a PR (`isPullRequest`) — confirm against server response
	pullRequestUrl?: string;
	commit: {
		/** Object id of the new commit (server's `commitOid`) */
		oid: string;
		/** URL of the new commit (server's `commitUrl`) */
		url: string;
	};
	/** Server-provided hook output (`hookOutput` field of the commit response) */
	hookOutput: string;
}
| function isFileOperation(op: CommitOperation): op is CommitBlob { | |
| const ret = op.operation === "addOrUpdate"; | |
| if (ret && !(op.content instanceof Blob)) { | |
| throw new TypeError("Precondition failed: op.content should be a Blob"); | |
| } | |
| return ret; | |
| } | |
/**
 * Progress events yielded by `commitIter`:
 * - "phase": the pipeline moved to a new stage
 * - "fileProgress": per-file progress in [0, 1] while hashing or uploading;
 *   state "error" is emitted for files the server rejected (bucket batch flow)
 */
export type CommitProgressEvent =
	| {
			event: "phase";
			phase: "preuploading" | "uploadingLargeFiles" | "committing";
	  }
	| {
			event: "fileProgress";
			path: string;
			progress: number;
			state: "hashing" | "uploading" | "error";
	  };
| /** | |
| * Internal function for now, used by commit. | |
| * | |
| * Can be exposed later to offer fine-tuned progress info | |
| * | |
| * CommitOutput is not present for bucket commits | |
| */ | |
export async function* commitIter(params: CommitParams): AsyncGenerator<CommitProgressEvent, CommitOutput | undefined> {
	const accessToken = checkCredentials(params);
	const repoId = toRepoId(params.repo);
	// Bucket repos use a separate, xet-only flow with no CommitOutput
	if (repoId.type === "bucket") {
		return yield* commitIterBucket(params);
	}
	yield { event: "phase", phase: "preuploading" };

	let useXet = params.useXet ?? true;
	// path -> sha256 of LFS files; `null` means "server wants LFS upload, not hashed yet"
	const lfsShas = new Map<string, string | null>();

	const abortController = new AbortController();
	const abortSignal = abortController.signal;

	// Polyfill see https://discuss.huggingface.co/t/why-cant-i-upload-a-parquet-file-to-my-dataset-error-o-throwifaborted-is-not-a-function/62245
	if (!abortSignal.throwIfAborted) {
		abortSignal.throwIfAborted = () => {
			if (abortSignal.aborted) {
				throw new DOMException("Aborted", "AbortError");
			}
		};
	}

	// Propagate external cancellation into our internal controller
	if (params.abortSignal) {
		params.abortSignal.addEventListener("abort", () => abortController.abort());
	}

	try {
		// Normalize all operations: edits become addOrUpdate with a SplicedBlob,
		// URL contents are expanded into one addOrUpdate per discovered file.
		const allOperations = (
			await Promise.all(
				params.operations.map(async (operation) => {
					if (operation.operation === "edit") {
						// Convert EditFile operation to a file operation with SplicedBlob
						const splicedBlob = SplicedBlob.create(
							operation.originalContent,
							operation.edits.map((splice) => ({ insert: splice.content, start: splice.start, end: splice.end })),
						);
						return {
							operation: "addOrUpdate" as const,
							path: operation.path,
							content: splicedBlob,
						};
					}
					if (operation.operation !== "addOrUpdate") {
						return operation;
					}
					if (!(operation.content instanceof URL)) {
						/** TS trick to enforce `content` to be a `Blob` */
						return { ...operation, content: operation.content };
					}

					const lazyBlobs = await createBlobs(operation.content, operation.path, {
						fetch: params.fetch,
						maxFolderDepth: params.maxFolderDepth,
					});

					abortSignal?.throwIfAborted();

					return lazyBlobs.map((blob) => ({
						...operation,
						content: blob.blob,
						path: blob.path,
					}));
				}),
			)
		).flat(1);

		const gitAttributes = allOperations.filter(isFileOperation).find((op) => op.path === ".gitattributes")?.content;

		// Preupload: ask the server (100 files at a time) which files must go through LFS
		for (const operations of chunk(allOperations.filter(isFileOperation), 100)) {
			const payload: ApiPreuploadRequest = {
				gitAttributes: gitAttributes && (await gitAttributes.text()),
				files: await Promise.all(
					operations.map(async (operation) => ({
						path: operation.path,
						size: operation.content.size,
						// first 512 bytes let the server sniff the content type
						sample: base64FromBytes(new Uint8Array(await operation.content.slice(0, 512).arrayBuffer())),
					})),
				),
			};

			abortSignal?.throwIfAborted();

			const res = await (params.fetch ?? fetch)(
				`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/preupload/${encodeURIComponent(
					params.branch ?? "main",
				)}` + (params.isPullRequest ? "?create_pr=1" : ""),
				{
					method: "POST",
					headers: {
						...(accessToken && { Authorization: `Bearer ${accessToken}` }),
						"Content-Type": "application/json",
					},
					body: JSON.stringify(payload),
					signal: abortSignal,
				},
			);

			if (!res.ok) {
				throw await createApiError(res);
			}

			const json: ApiPreuploadResponse = await res.json();

			for (const file of json.files) {
				if (file.uploadMode === "lfs") {
					lfsShas.set(file.path, null);
				}
			}
		}

		yield { event: "phase", phase: "uploadingLargeFiles" };

		// LFS flow, 100 files at a time: hash, batch call, then upload via xet or basic/multipart PUT
		for (const operations of chunk(
			allOperations.filter(isFileOperation).filter((op) => lfsShas.has(op.path)),
			100,
		)) {
			// Hash the chunk's files (CONCURRENT_SHAS at a time), surfacing "hashing" progress events
			// NOTE(review): "rejectCallack" is a typo for "rejectCallback" — harmless, it is used consistently
			const shas = yield* eventToGenerator<
				{ event: "fileProgress"; state: "hashing"; path: string; progress: number },
				string[]
			>((yieldCallback, returnCallback, rejectCallack) => {
				return promisesQueue(
					operations.map((op) => async () => {
						const iterator = sha256(op.content, { useWebWorker: params.useWebWorkers, abortSignal: abortSignal });
						let res: IteratorResult<number, string>;
						do {
							res = await iterator.next();
							if (!res.done) {
								yieldCallback({ event: "fileProgress", path: op.path, progress: res.value, state: "hashing" });
							}
						} while (!res.done);
						const sha = res.value;
						lfsShas.set(op.path, res.value);
						return sha;
					}),
					CONCURRENT_SHAS,
				).then(returnCallback, rejectCallack);
			});

			abortSignal?.throwIfAborted();

			const payload: ApiLfsBatchRequest = {
				operation: "upload",
				// multipart is a custom protocol for HF
				transfers: ["basic", "multipart", ...(useXet ? ["xet" as const] : [])],
				hash_algo: "sha_256",
				...(!params.isPullRequest && {
					ref: {
						name: params.branch ?? "main",
					},
				}),
				objects: operations.map((op, i) => ({
					oid: shas[i],
					size: op.content.size,
				})),
			};

			const res = await (params.fetch ?? fetch)(
				`${params.hubUrl ?? HUB_URL}/${repoId.type === "model" ? "" : repoId.type + "s/"}${
					repoId.name
				}.git/info/lfs/objects/batch`,
				{
					method: "POST",
					headers: {
						...(accessToken && { Authorization: `Bearer ${accessToken}` }),
						Accept: "application/vnd.git-lfs+json",
						"Content-Type": "application/vnd.git-lfs+json",
					},
					body: JSON.stringify(payload),
					signal: abortSignal,
				},
			);

			if (!res.ok) {
				throw await createApiError(res);
			}

			const json: ApiLfsBatchResponse = await res.json();
			const batchRequestId = res.headers.get("X-Request-Id") || undefined;
			const shaToOperation = new Map(operations.map((op, i) => [shas[i], op]));

			// Server did not negotiate xet: fall back to basic/multipart for this and later chunks
			if (useXet && json.transfer !== "xet") {
				useXet = false;
			}

			let xetParams: XetTokenParams | null = null;
			if (useXet) {
				// First get all the files that are already uploaded out of the way
				for (const obj of json.objects) {
					const op = shaToOperation.get(obj.oid);

					if (!op) {
						throw new InvalidApiResponseFormatError("Unrequested object ID in response");
					}

					if (obj.error) {
						const errorMessage = `Error while doing LFS batch call for ${operations[shas.indexOf(obj.oid)].path}: ${
							obj.error.message
						}${batchRequestId ? ` - Request ID: ${batchRequestId}` : ""}`;
						throw new HubApiError(res.url, obj.error.code, batchRequestId, errorMessage);
					}
					if (!obj.actions?.upload) {
						// Already uploaded
						yield {
							event: "fileProgress",
							path: op.path,
							progress: 1,
							state: "uploading",
						};
					} else {
						// Extract the xet session/token details from the upload action headers
						const headers = new Headers(obj.actions.upload.header);
						xetParams = {
							sessionId: headers.get("X-Xet-Session-Id") ?? undefined,
							casUrl: headers.get("X-Xet-Cas-Url") ?? undefined,
							accessToken: headers.get("X-Xet-Access-Token") ?? undefined,
							expiresAt: headers.get("X-Xet-Token-Expiration")
								? new Date(parseInt(headers.get("X-Xet-Token-Expiration") ?? "0") * 1000)
								: undefined,
							refreshWriteTokenUrl: obj.actions.upload.href,
						};
					}
				}

				// Lazy stream of files that still need uploading
				const source = (async function* () {
					for (const obj of json.objects) {
						const op = shaToOperation.get(obj.oid);
						if (!op || !obj.actions?.upload) {
							continue;
						}
						abortSignal?.throwIfAborted();
						yield { content: op.content, path: op.path, sha256: obj.oid };
					}
				})();

				if (xetParams) {
					const fixedXetParams = xetParams;
					// Fan the stream out to 5 parallel uploadShards consumers
					const sources = splitAsyncGenerator(source, 5);
					yield* eventToGenerator((yieldCallback, returnCallback, rejectCallback) =>
						Promise.all(
							sources.map(async function (source) {
								for await (const event of uploadShards(source, {
									fetch: params.fetch,
									accessToken,
									hubUrl: params.hubUrl ?? HUB_URL,
									repo: repoId,
									xetParams: fixedXetParams,
									// todo: maybe leave empty if PR?
									rev: params.branch ?? "main",
									isPullRequest: params.isPullRequest,
									yieldCallback: (event) => yieldCallback({ ...event, state: "uploading" }),
								})) {
									if (event.event === "file") {
										yieldCallback({
											event: "fileProgress" as const,
											path: event.path,
											progress: 1,
											state: "uploading" as const,
										});
									} else if (event.event === "fileProgress") {
										yieldCallback({
											event: "fileProgress" as const,
											path: event.path,
											progress: event.progress,
											state: "uploading" as const,
										});
									}
								}
							}),
						).then(() => returnCallback(undefined), rejectCallback),
					);
				} else {
					// No LFS file to upload
				}
			} else {
				// Basic / multipart LFS upload path (CONCURRENT_LFS_UPLOADS files at a time)
				yield* eventToGenerator<CommitProgressEvent, void>((yieldCallback, returnCallback, rejectCallback) => {
					return promisesQueueStreaming(
						json.objects.map((obj) => async () => {
							const op = shaToOperation.get(obj.oid);

							if (!op) {
								throw new InvalidApiResponseFormatError("Unrequested object ID in response");
							}

							abortSignal?.throwIfAborted();

							if (obj.error) {
								const errorMessage = `Error while doing LFS batch call for ${operations[shas.indexOf(obj.oid)].path}: ${
									obj.error.message
								}${batchRequestId ? ` - Request ID: ${batchRequestId}` : ""}`;
								throw new HubApiError(res.url, obj.error.code, batchRequestId, errorMessage);
							}
							if (!obj.actions?.upload) {
								// Already uploaded
								yieldCallback({
									event: "fileProgress",
									path: op.path,
									progress: 1,
									state: "uploading",
								});
								return;
							}
							yieldCallback({
								event: "fileProgress",
								path: op.path,
								progress: 0,
								state: "uploading",
							});
							const content = op.content;
							const header = obj.actions.upload.header;
							if (header?.chunk_size) {
								const chunkSize = parseInt(header.chunk_size);

								// multipart upload
								// parts are in upload.header['00001'] to upload.header['99999']

								const completionUrl = obj.actions.upload.href;
								const parts = Object.keys(header).filter((key) => /^[0-9]+$/.test(key));

								if (parts.length !== Math.ceil(content.size / chunkSize)) {
									throw new Error("Invalid server response to upload large LFS file, wrong number of parts");
								}

								const completeReq: ApiLfsCompleteMultipartRequest = {
									oid: obj.oid,
									parts: parts.map((part) => ({
										partNumber: +part,
										etag: "",
									})),
								};

								// Defined here so that it's not redefined at each iteration (and the caller can tell it's for the same file)
								const progressCallback = (progress: number) =>
									yieldCallback({ event: "fileProgress", path: op.path, progress, state: "uploading" });

								await promisesQueueStreaming(
									parts.map((part) => async () => {
										abortSignal?.throwIfAborted();
										const index = parseInt(part) - 1;
										const slice = content.slice(index * chunkSize, (index + 1) * chunkSize);

										const res = await (params.fetch ?? fetch)(header[part], {
											method: "PUT",
											/** Unfortunately, browsers don't support our inherited version of Blob in fetch calls */
											body: slice instanceof WebBlob && isFrontend ? await slice.arrayBuffer() : slice,
											signal: abortSignal,
											...({
												progressHint: {
													path: op.path,
													part: index,
													numParts: parts.length,
													progressCallback,
												},
												// eslint-disable-next-line @typescript-eslint/no-explicit-any
											} as any),
										});

										if (!res.ok) {
											throw await createApiError(res, {
												requestId: batchRequestId,
												message: `Error while uploading part ${part} of ${
													operations[shas.indexOf(obj.oid)].path
												} to LFS storage`,
											});
										}

										const eTag = res.headers.get("ETag");

										if (!eTag) {
											throw new Error("Cannot get ETag of part during multipart upload");
										}

										// Record the part's ETag at its 1-based partNumber slot
										completeReq.parts[Number(part) - 1].etag = eTag;
									}),
									MULTIPART_PARALLEL_UPLOAD,
								);

								abortSignal?.throwIfAborted();

								const res = await (params.fetch ?? fetch)(completionUrl, {
									method: "POST",
									body: JSON.stringify(completeReq),
									headers: {
										Accept: "application/vnd.git-lfs+json",
										"Content-Type": "application/vnd.git-lfs+json",
									},
									signal: abortSignal,
								});

								if (!res.ok) {
									throw await createApiError(res, {
										requestId: batchRequestId,
										message: `Error completing multipart upload of ${
											operations[shas.indexOf(obj.oid)].path
										} to LFS storage`,
									});
								}

								yieldCallback({
									event: "fileProgress",
									path: op.path,
									progress: 1,
									state: "uploading",
								});
							} else {
								// Single PUT upload
								const res = await (params.fetch ?? fetch)(obj.actions.upload.href, {
									method: "PUT",
									headers: {
										...(batchRequestId ? { "X-Request-Id": batchRequestId } : undefined),
									},
									/** Unfortunately, browsers don't support our inherited version of Blob in fetch calls */
									body: content instanceof WebBlob && isFrontend ? await content.arrayBuffer() : content,
									signal: abortSignal,
									...({
										progressHint: {
											path: op.path,
											progressCallback: (progress: number) =>
												yieldCallback({
													event: "fileProgress",
													path: op.path,
													progress,
													state: "uploading",
												}),
										},
										// eslint-disable-next-line @typescript-eslint/no-explicit-any
									} as any),
								});

								if (!res.ok) {
									throw await createApiError(res, {
										requestId: batchRequestId,
										message: `Error while uploading ${operations[shas.indexOf(obj.oid)].path} to LFS storage`,
									});
								}

								yieldCallback({
									event: "fileProgress",
									path: op.path,
									progress: 1,
									state: "uploading",
								});
							}
						}),
						CONCURRENT_LFS_UPLOADS,
					).then(returnCallback, rejectCallback);
				});
			}
		}

		abortSignal?.throwIfAborted();

		yield { event: "phase", phase: "committing" };

		// Final commit call: ndjson body with header line, lfsFile pointers, and inline files
		return yield* eventToGenerator<CommitProgressEvent, CommitOutput>(
			async (yieldCallback, returnCallback, rejectCallback) =>
				(params.fetch ?? fetch)(
					`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/commit/${encodeURIComponent(
						params.branch ?? "main",
					)}` + (params.isPullRequest ? "?create_pr=1" : ""),
					{
						method: "POST",
						headers: {
							...(accessToken && { Authorization: `Bearer ${accessToken}` }),
							"Content-Type": "application/x-ndjson",
						},
						body: [
							{
								key: "header",
								value: {
									summary: params.title,
									description: params.description,
									parentCommit: params.parentCommit,
								} satisfies ApiCommitHeader,
							},
							...((await Promise.all(
								allOperations.map((operation) => {
									if (isFileOperation(operation)) {
										const sha = lfsShas.get(operation.path);
										if (sha) {
											// LFS file: send only the pointer data, content was uploaded above
											return {
												key: "lfsFile",
												value: {
													path: operation.path,
													algo: "sha256",
													size: operation.content.size,
													oid: sha,
												} satisfies ApiCommitLfsFile,
											};
										}
									}

									return convertOperationToNdJson(operation);
								}),
							)) satisfies ApiCommitOperation[]),
						]
							.map((x) => JSON.stringify(x))
							.join("\n"),
						signal: abortSignal,
						...({
							progressHint: {
								progressCallback: (progress: number) => {
									// For now, we display equal progress for all files
									// We could compute the progress based on the size of `convertOperationToNdJson` for each of the files instead
									for (const op of allOperations) {
										if (isFileOperation(op) && !lfsShas.has(op.path)) {
											yieldCallback({
												event: "fileProgress",
												path: op.path,
												progress,
												state: "uploading",
											});
										}
									}
								},
							},
							// eslint-disable-next-line @typescript-eslint/no-explicit-any
						} as any),
					},
				)
					.then(async (res) => {
						if (!res.ok) {
							throw await createApiError(res);
						}
						const json = await res.json();

						returnCallback({
							pullRequestUrl: json.pullRequestUrl,
							commit: {
								oid: json.commitOid,
								url: json.commitUrl,
							},
							hookOutput: json.hookOutput,
						});
					})
					.catch(rejectCallback),
		);
	} catch (err) {
		// For parallel requests, cancel them all if one fails
		abortController.abort();
		throw err;
	}
}
/**
 * Bucket variant of `commitIter`: uploads all file contents via the xet protocol
 * (`uploadShards`), then registers adds and deletes through the bucket `/batch`
 * ndjson API. Yields progress events only — no `CommitOutput` is produced.
 */
export async function* commitIterBucket(params: CommitParams): AsyncGenerator<CommitProgressEvent> {
	const accessToken = checkCredentials(params);
	const repoId = toRepoId(params.repo);
	// Buckets only support the xet upload protocol
	if (params.useXet === false) {
		throw new Error("useXet must be true or undefined for buckets");
	}

	const abortController = new AbortController();
	const abortSignal = abortController.signal;

	// Polyfill see https://discuss.huggingface.co/t/why-cant-i-upload-a-parquet-file-to-my-dataset-error-o-throwifaborted-is-not-a-function/62245
	if (!abortSignal.throwIfAborted) {
		abortSignal.throwIfAborted = () => {
			if (abortSignal.aborted) {
				throw new DOMException("Aborted", "AbortError");
			}
		};
	}

	// Propagate external cancellation into our internal controller
	if (params.abortSignal) {
		params.abortSignal.addEventListener("abort", () => abortController.abort());
	}

	try {
		// Normalize operations — same as in commitIter: edits -> SplicedBlob, URLs -> blobs
		const allOperations = (
			await Promise.all(
				params.operations.map(async (operation) => {
					if (operation.operation === "edit") {
						// Convert EditFile operation to a file operation with SplicedBlob
						const splicedBlob = SplicedBlob.create(
							operation.originalContent,
							operation.edits.map((splice) => ({ insert: splice.content, start: splice.start, end: splice.end })),
						);
						return {
							operation: "addOrUpdate" as const,
							path: operation.path,
							content: splicedBlob,
						};
					}
					if (operation.operation !== "addOrUpdate") {
						return operation;
					}
					if (!(operation.content instanceof URL)) {
						/** TS trick to enforce `content` to be a `Blob` */
						return { ...operation, content: operation.content };
					}

					const lazyBlobs = await createBlobs(operation.content, operation.path, {
						fetch: params.fetch,
						maxFolderDepth: params.maxFolderDepth,
					});

					abortSignal?.throwIfAborted();

					return lazyBlobs.map((blob) => ({
						...operation,
						content: blob.blob,
						path: blob.path,
					}));
				}),
			)
		).flat(1);

		yield { event: "phase", phase: "uploadingLargeFiles" };

		// Upload contents via xet and register files, 100 at a time
		for (const operations of chunk(allOperations.filter(isFileOperation), 100)) {
			// path -> xet hash, filled in as uploadShards completes each file
			const xetHashes = new Map<string, string>();
			abortSignal?.throwIfAborted();

			// First get all the files that are already uploaded out of the way
			const source = (async function* () {
				for (const operation of operations) {
					abortSignal?.throwIfAborted();
					yield { content: operation.content, path: operation.path };
				}
			})();

			const xetParams: XetTokenParams = {
				sessionId: crypto.randomUUID(),
				refreshWriteTokenUrl: `${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/xet-write-token`,
			};

			// Fan the stream out to 5 parallel uploadShards consumers
			const sources = splitAsyncGenerator(source, 5);
			yield* eventToGenerator((yieldCallback, returnCallback, rejectCallback) =>
				Promise.all(
					sources.map(async function (source) {
						for await (const event of uploadShards(source, {
							fetch: params.fetch,
							accessToken,
							hubUrl: params.hubUrl ?? HUB_URL,
							repo: repoId,
							xetParams,
							rev: params.branch ?? "main",
							yieldCallback: (event) => yieldCallback({ ...event, state: "uploading" }),
						})) {
							if (event.event === "file") {
								yieldCallback({
									event: "fileProgress" as const,
									path: event.path,
									progress: 1,
									state: "uploading" as const,
								});
								xetHashes.set(event.path, event.xetHash);
							} else if (event.event === "fileProgress") {
								yieldCallback({
									event: "fileProgress" as const,
									path: event.path,
									progress: event.progress,
									state: "uploading" as const,
								});
							}
						}
					}),
				).then(() => returnCallback(undefined), rejectCallback),
			);

			// Register the uploaded files (by xet hash) with the bucket batch API
			const resp = await (params.fetch ?? fetch)(
				`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/batch`,
				{
					method: "POST",
					headers: {
						...(accessToken && { Authorization: `Bearer ${accessToken}` }),
						"Content-Type": "application/x-ndjson",
					},
					body: [...xetHashes.entries()]
						.map(([path, xetHash]) =>
							JSON.stringify({
								type: "addFile",
								path,
								xetHash,
							}),
						)
						.join("\n"),
					signal: abortSignal,
				},
			);
			// 422 means partial failure: per-file errors are reported below instead of throwing
			if (!resp.ok && resp.status !== 422) {
				throw await createApiError(resp);
			}
			const json = (await resp.json()) as ApiBucketBatchResponse;
			for (const failed of json.failed) {
				yield {
					event: "fileProgress",
					path: failed.path,
					progress: 0,
					state: "error",
				};
			}
		}

		abortSignal?.throwIfAborted();

		// Deletions go through the same batch endpoint with "deleteFile" lines
		const deletedOperations = allOperations.filter((operation) => operation.operation === "delete");
		if (deletedOperations.length > 0) {
			const resp = await (params.fetch ?? fetch)(
				`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/batch`,
				{
					method: "POST",
					headers: {
						...(accessToken && { Authorization: `Bearer ${accessToken}` }),
						"Content-Type": "application/x-ndjson",
					},
					body: deletedOperations
						.map((operation) =>
							JSON.stringify({
								type: "deleteFile",
								path: operation.path,
							}),
						)
						.join("\n"),
					signal: abortSignal,
				},
			);
			if (!resp.ok) {
				throw await createApiError(resp);
			}
			const json = await resp.json();
			if (json.failed.length > 0) {
				// List at most 5 failed paths in the error message
				const failedPaths = json.failed.slice(0, 5).map((f: { path: string }) => f.path);
				throw new Error(
					`Failed to delete ${json.failed.length} file(s): ${failedPaths.join(", ")}${json.failed.length > 5 ? "..." : ""}, request ID: ${resp.headers.get("X-Request-Id")}`,
				);
			}
		}
		abortSignal?.throwIfAborted();
	} catch (err) {
		// For parallel requests, cancel them all if one fails
		abortController.abort();
		throw err;
	}
}
| /** | |
| * @returns undefined for bucket uploads, CommitOutput otherwise | |
| */ | |
| export async function commit(params: CommitParams): Promise<CommitOutput | undefined> { | |
| const iterator = commitIter(params); | |
| const failedPaths: string[] = []; | |
| let failedCount = 0; | |
| let res = await iterator.next(); | |
| while (!res.done) { | |
| if (res.value.event === "fileProgress" && res.value.state === "error") { | |
| failedCount++; | |
| if (failedPaths.length < 5) { | |
| failedPaths.push(res.value.path); | |
| } | |
| } | |
| res = await iterator.next(); | |
| } | |
| if (failedCount > 0) { | |
| throw new Error( | |
| `Failed to upload ${failedCount} file(s): ${failedPaths.join(", ")}${failedCount > 5 ? "..." : ""}`, | |
| ); | |
| } | |
| return res.value; | |
| } | |
| async function convertOperationToNdJson(operation: CommitBlobOperation): Promise<ApiCommitOperation> { | |
| switch (operation.operation) { | |
| case "addOrUpdate": { | |
| // todo: handle LFS | |
| return { | |
| key: "file", | |
| value: { | |
| content: base64FromBytes(new Uint8Array(await operation.content.arrayBuffer())), | |
| path: operation.path, | |
| encoding: "base64", | |
| }, | |
| }; | |
| } | |
| // case "rename": { | |
| // // todo: detect when remote file is already LFS, and in that case rename as LFS | |
| // return { | |
| // key: "file", | |
| // value: { | |
| // content: operation.content, | |
| // path: operation.path, | |
| // oldPath: operation.oldPath | |
| // } | |
| // }; | |
| // } | |
| case "delete": { | |
| return { | |
| key: "deletedFile", | |
| value: { | |
| path: operation.path, | |
| }, | |
| }; | |
| } | |
| case "edit": { | |
| // Note: By the time we get here, splice operations should have been converted to addOrUpdate operations with SplicedBlob | |
| // But we handle this case for completeness | |
| throw new Error( | |
| "Edit operations should be converted to addOrUpdate operations before reaching convertOperationToNdJson", | |
| ); | |
| } | |
| default: | |
| throw new TypeError("Unknown operation: " + (operation as { operation: string }).operation); | |
| } | |
| } | |