hf-model-viewer / src /lib /byteSource.ts
tfrere's picture
tfrere HF Staff
Deploy hf-model-viewer 2026-05-22T16:59:58Z
fc01079 verified
// A seekable, streaming byte source backed by HTTP Range requests.
//
// The streaming ONNX parser uses this to:
// 1. Read protobuf wire bytes sequentially.
// 2. Skip over uninteresting regions (notably `raw_data` of initializers)
// without buffering them in RAM and, when the skip is large enough,
// without even pulling them from the network — we cancel the current
// stream and issue a fresh Range request that jumps over the region.
//
// Design goals:
// - Always know the current absolute offset in the file.
// - Pull more data from the network only when needed.
// - When a skip is small (< SKIP_RANGE_THRESHOLD), discard from the
// in-flight stream; when large, cancel and re-issue Range from the
// post-skip offset (saves bandwidth).
// - Report total bytes actually transferred so the UI can show savings.
// - Be cancellable from the outside (AbortController).
export interface ByteSourceOpts {
/** Range threshold (bytes) above which we cancel + re-issue instead of stream-skipping. */
skipRangeThreshold?: number;
/** Optional AbortSignal to cancel the entire source. */
signal?: AbortSignal;
/** Called every time we receive more data. Useful for UI progress. */
onTransfer?: (deltaBytes: number, totalTransferred: number) => void;
}
// Skip-vs-stream break-even. Below this, streaming-skip is cheaper than
// paying a fresh Range request RTT (~30–200 ms on Hub CDN, throughput
// typically 10–30 MB/s).
//
// We pick the threshold adaptively from the total file size — see
// `pickAdaptiveThreshold()` below — because the right answer is very
// different between a 50 MB quantized GPT-2 (lots of ~350 KB int8 tensors,
// we *want* to Range-jump even small blobs) and a 5 GB fp16 Llama (we want
// to amortize fewer, bigger jumps).
//
// 1 MB is the safe ceiling: above that, paying a fresh RTT is always
// worthwhile because we'd otherwise download a megabyte of dead bytes.
const SKIP_RANGE_CEILING = 1024 * 1024; // 1 MB
const SKIP_RANGE_FLOOR = 32 * 1024; // 32 KB
/**
* Pick a per-file skip threshold from the total file size.
*
* Rules of thumb the formula encodes:
* - Tiny files (< 50 MB, typical quantized BERT-class): aggressive 32 KB
* floor. We accept dozens of RTTs because the total file is small enough
* that each saved KB matters and TLS reuse keeps each RTT cheap.
* - Medium (50 MB – 500 MB, quantized GPT-2 / DistilBERT / ViT class):
* interpolate up to ~128 KB. We still want to skip most quantized
* tensors which sit in the 200–800 KB range.
* - Large (≥ 500 MB, fp16/fp32 LLaMA-class): full 1 MB ceiling. Tensors
* are big enough that RTT cost is dwarfed by the bandwidth save.
*/
export function pickAdaptiveThreshold(totalSize: number): number {
if (totalSize <= 0) return SKIP_RANGE_CEILING;
if (totalSize < 50 * 1024 * 1024) return SKIP_RANGE_FLOOR;
if (totalSize >= 500 * 1024 * 1024) return SKIP_RANGE_CEILING;
// Linear ramp 32 KB → 1 MB over the 50 MB → 500 MB band.
const t = (totalSize - 50 * 1024 * 1024) / (450 * 1024 * 1024);
const v = SKIP_RANGE_FLOOR + t * (SKIP_RANGE_CEILING - SKIP_RANGE_FLOOR);
return Math.round(v);
}
export class ByteSource {
readonly url: string;
readonly totalSize: number;
private opts: Required<Omit<ByteSourceOpts, "signal" | "onTransfer">> & {
signal?: AbortSignal;
onTransfer?: ByteSourceOpts["onTransfer"];
};
private reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
/** Absolute offset where the current Range request is supposed to end (exclusive). */
private streamEnd = 0;
/** A small ring of buffered chunks pulled from the reader but not yet consumed. */
private buffer: Uint8Array[] = [];
private bufferStart = 0; // absolute offset of buffer[0][0]
/** Offset within buffer[0]. */
private bufferHead = 0;
/** Total bytes currently held across all buffer chunks. */
private bufferedBytes = 0;
/** Absolute offset of the next byte to be returned to the caller. */
pos = 0;
/** Total bytes received over the network so far. */
bytesTransferred = 0;
constructor(url: string, totalSize: number, opts: ByteSourceOpts = {}) {
this.url = url;
this.totalSize = totalSize;
this.opts = {
skipRangeThreshold:
opts.skipRangeThreshold ?? pickAdaptiveThreshold(totalSize),
signal: opts.signal,
onTransfer: opts.onTransfer,
};
}
/** Exposed for the UI: what threshold are we actually using? */
get skipRangeThreshold(): number {
return this.opts.skipRangeThreshold;
}
/** Bytes available to read without pulling more from the network. */
private get availableInBuffer(): number {
return this.bufferedBytes - this.bufferHead;
}
/** Open a Range stream starting at `from` (absolute offset), ending at end (exclusive, defaults to totalSize). */
private async openRangeStream(from: number, end?: number): Promise<void> {
if (this.reader) await this.cancelStream();
const stop = end ?? this.totalSize;
if (from >= stop) {
this.streamEnd = stop;
return;
}
const rangeHeader = `bytes=${from}-${stop - 1}`;
const res = await fetch(this.url, {
headers: { Range: rangeHeader },
signal: this.opts.signal,
});
if (!(res.status === 206 || (res.status === 200 && from === 0))) {
throw new Error(
`Range request failed (status ${res.status}). The CDN may not support Range; falling back is not implemented.`,
);
}
if (!res.body) throw new Error("Response has no body.");
this.reader = res.body.getReader();
this.streamEnd = stop;
}
private async cancelStream(): Promise<void> {
if (!this.reader) return;
try {
await this.reader.cancel();
} catch {
// ignored: cancel failures are non-fatal.
}
this.reader = null;
}
/** Make sure at least `n` bytes are available in the buffer, pulling from the stream as needed. */
private async ensureAvailable(n: number): Promise<void> {
if (this.pos + n > this.totalSize) {
throw new Error(
`Read past end of file: pos=${this.pos}, want=${n}, total=${this.totalSize}`,
);
}
if (!this.reader || this.pos < this.bufferStart + this.bufferHead) {
// Stream is closed or seek happened — (re)open at current pos.
await this.openRangeStream(this.pos);
this.resetBuffer(this.pos);
}
while (this.availableInBuffer < n) {
if (!this.reader) {
// Stream ended but we expected more.
if (this.pos + this.availableInBuffer >= this.streamEnd) {
if (this.streamEnd < this.totalSize) {
await this.openRangeStream(this.pos + this.availableInBuffer);
continue;
}
}
throw new Error("Stream ended unexpectedly before requested bytes arrived.");
}
const { done, value } = await this.reader.read();
if (done) {
this.reader = null;
if (this.pos + this.availableInBuffer >= this.streamEnd && this.streamEnd < this.totalSize) {
await this.openRangeStream(this.pos + this.availableInBuffer);
continue;
}
if (this.availableInBuffer < n) {
throw new Error("Stream EOF before requested bytes arrived.");
}
break;
}
if (!value || value.byteLength === 0) continue;
this.buffer.push(value);
this.bufferedBytes += value.byteLength;
this.bytesTransferred += value.byteLength;
this.opts.onTransfer?.(value.byteLength, this.bytesTransferred);
}
}
private resetBuffer(startPos: number): void {
this.buffer = [];
this.bufferedBytes = 0;
this.bufferHead = 0;
this.bufferStart = startPos;
}
/** Read exactly `n` bytes into a fresh Uint8Array (concatenating internal chunks if needed). */
async readBytes(n: number): Promise<Uint8Array> {
if (n <= 0) return new Uint8Array(0);
await this.ensureAvailable(n);
const out = new Uint8Array(n);
let written = 0;
while (written < n) {
const head = this.buffer[0];
const headRemaining = head.byteLength - this.bufferHead;
const take = Math.min(headRemaining, n - written);
out.set(head.subarray(this.bufferHead, this.bufferHead + take), written);
written += take;
this.bufferHead += take;
if (this.bufferHead >= head.byteLength) {
this.buffer.shift();
this.bufferedBytes -= head.byteLength;
this.bufferStart += head.byteLength;
this.bufferHead = 0;
}
}
this.pos += n;
return out;
}
/** Read a single byte (faster path for varint decoding). */
async readByte(): Promise<number> {
if (this.availableInBuffer === 0) await this.ensureAvailable(1);
const head = this.buffer[0];
const b = head[this.bufferHead++];
if (this.bufferHead >= head.byteLength) {
this.buffer.shift();
this.bufferedBytes -= head.byteLength;
this.bufferStart += head.byteLength;
this.bufferHead = 0;
}
this.pos += 1;
return b;
}
/**
* Advance the read position by `n` bytes without exposing them to the caller.
* - If n <= skipRangeThreshold, we still pull/discard those bytes from the
* current stream (cheap, no RTT).
* - If n > skipRangeThreshold, we cancel the current stream and jump
* forward via a new Range request. This is how we save real bandwidth
* on big initializers.
*/
async skipBytes(n: number): Promise<void> {
if (n <= 0) return;
if (this.pos + n > this.totalSize) {
throw new Error(
`Skip past end of file: pos=${this.pos}, skip=${n}, total=${this.totalSize}`,
);
}
// First, eat what we already have buffered.
if (this.availableInBuffer > 0) {
const eat = Math.min(this.availableInBuffer, n);
let remaining = eat;
while (remaining > 0) {
const head = this.buffer[0];
const headRemaining = head.byteLength - this.bufferHead;
const take = Math.min(headRemaining, remaining);
this.bufferHead += take;
remaining -= take;
if (this.bufferHead >= head.byteLength) {
this.buffer.shift();
this.bufferedBytes -= head.byteLength;
this.bufferStart += head.byteLength;
this.bufferHead = 0;
}
}
this.pos += eat;
n -= eat;
if (n === 0) return;
}
if (n > this.opts.skipRangeThreshold) {
// Big skip: cancel current stream, jump forward via new Range.
await this.cancelStream();
this.resetBuffer(this.pos + n);
this.pos += n;
return;
}
// Small skip: stream-discard.
while (n > 0) {
if (!this.reader) {
await this.openRangeStream(this.pos);
this.resetBuffer(this.pos);
continue;
}
const { done, value } = await this.reader.read();
if (done) {
this.reader = null;
if (this.streamEnd < this.totalSize) {
await this.openRangeStream(this.pos);
continue;
}
throw new Error("Stream EOF during skip.");
}
if (!value || value.byteLength === 0) continue;
this.bytesTransferred += value.byteLength;
this.opts.onTransfer?.(value.byteLength, this.bytesTransferred);
if (value.byteLength <= n) {
// Entire chunk discarded.
this.pos += value.byteLength;
n -= value.byteLength;
} else {
// Partial chunk: keep the unused tail in the buffer.
const tail = value.subarray(n);
this.pos += n;
this.bufferStart = this.pos;
this.bufferHead = 0;
this.buffer = [tail];
this.bufferedBytes = tail.byteLength;
n = 0;
}
}
}
/** Cancel any pending network and free buffers. */
async close(): Promise<void> {
await this.cancelStream();
this.buffer = [];
this.bufferedBytes = 0;
this.bufferHead = 0;
}
/** Pre-fetch the total length via HEAD if not known at construction. */
static async create(url: string, opts: ByteSourceOpts = {}): Promise<ByteSource> {
const head = await fetch(url, { method: "HEAD", signal: opts.signal });
if (!head.ok) throw new Error(`HEAD ${url} failed: ${head.status}`);
const len = Number(head.headers.get("content-length") ?? 0);
if (!Number.isFinite(len) || len <= 0) {
throw new Error(`Server did not return a usable Content-Length for ${url}.`);
}
const acceptRanges = head.headers.get("accept-ranges");
if (acceptRanges && acceptRanges.toLowerCase() === "none") {
throw new Error(`Server says it does not support Range requests for ${url}.`);
}
return new ByteSource(url, len, opts);
}
}