// ByteSource: a seekable, streaming view of a remote file built on HTTP Range
// requests.
//
// The streaming ONNX parser relies on it to:
//   1. Consume protobuf wire bytes in order.
//   2. Step over regions it does not care about (chiefly the `raw_data` of
//      initializers) without holding them in memory. When such a region is
//      large enough, its bytes are not even downloaded: the in-flight response
//      is cancelled and a fresh Range request resumes past the region.
//
// Goals:
//   - The absolute file offset is known at all times.
//   - Network reads happen only on demand.
//   - Skips at or below the configured skip threshold drain bytes from the
//     in-flight stream; longer ones cancel it and restart at the post-skip
//     offset to save bandwidth.
//   - Bytes actually transferred are tracked so the UI can report savings.
//   - The whole source can be cancelled externally (AbortController).

/** Options accepted by the `ByteSource` constructor and `ByteSource.create`. */
export interface ByteSourceOpts {
  /**
   * Skips longer than this many bytes cancel the live stream and issue a new
   * Range request; shorter ones are drained from the stream instead.
   * Defaults to `pickAdaptiveThreshold(totalSize)`.
   */
  skipRangeThreshold?: number;
  /** Signal passed to every fetch the source makes; aborting it cancels them. */
  signal?: AbortSignal;
  /**
   * Invoked whenever a chunk arrives from the network, with that chunk's size
   * and the running total of bytes received. Useful for progress UIs.
   */
  onTransfer?: (chunkBytes: number, totalBytes: number) => void;
}

// Stream-skip vs. Range-jump break-even. Below the threshold, draining bytes
// from the live stream is cheaper than paying a new Range request's round trip
// (~30–200 ms on the Hub CDN, at a typical 10–30 MB/s).
//
// The threshold is chosen per file from its total size (see
// `pickAdaptiveThreshold()` below). The best value differs widely between a
// 50 MB quantized GPT-2 and a 5 GB fp16 Llama. The GPT-2 has many ~350 KB int8
// tensors that we *want* to Range-jump, even though they are small. For the
// Llama, fewer and bigger jumps amortize the round trips better.
//
// 1 MB is the ceiling: streaming past more than that means downloading a
// megabyte of dead bytes, which always costs more than one extra round trip.
const SKIP_RANGE_CEILING = 1024 * 1024; // 1 MB
const SKIP_RANGE_FLOOR = 32 * 1024; // 32 KB

/**
 * Pick a per-file skip threshold (in bytes) from the total file size.
 *
 * - Invalid sizes (≤ 0 or NaN): the 1 MB ceiling.
 * - Small files (< 50 MB, typical quantized BERT-class): the 32 KB floor.
 *   Extra round trips are acceptable because each saved KB is a noticeable
 *   share of a small download, and a reused TLS connection keeps each one cheap.
 * - Medium files (50 MB – 500 MB): a linear ramp from 32 KB at 50 MB up to
 *   1 MB at 500 MB. Low in the band (≈186 KB at 120 MB), most 200–800 KB
 *   quantized tensors are still Range-jumped. High in the band, they are streamed.
 * - Large files (≥ 500 MB, fp16/fp32 LLaMA-class): the 1 MB ceiling. Tensors
 *   are big enough that round-trip cost is dwarfed by the bandwidth saved.
 *
 * @param totalSize - Total file size in bytes.
 * @returns A threshold in the range [32 KB, 1 MB].
 */
export function pickAdaptiveThreshold(totalSize: number): number {
  const rampStart = 50 * 1024 * 1024;
  const rampEnd = 500 * 1024 * 1024;
  // `!(x > 0)` also catches NaN. NaN would otherwise fall through every
  // comparison below and yield a NaN threshold, which disables Range jumps.
  if (!(totalSize > 0)) return SKIP_RANGE_CEILING;
  if (totalSize < rampStart) return SKIP_RANGE_FLOOR;
  if (totalSize >= rampEnd) return SKIP_RANGE_CEILING;
  const t = (totalSize - rampStart) / (rampEnd - rampStart);
  return Math.round(SKIP_RANGE_FLOOR + t * (SKIP_RANGE_CEILING - SKIP_RANGE_FLOOR));
}

/**
 * Seekable byte reader over a remote file, fed by HTTP Range requests.
 *
 * Reads pull data lazily from a single in-flight Range response. Skips are
 * served from the buffer first. Any remainder is handled in one of two ways:
 * - Short skips, while a stream is live: the bytes are drained from the stream.
 * - Long skips, or whenever no stream is open: the stream is cancelled and the
 *   next read opens a new Range at the post-skip offset.
 *
 * Calls are not serialized internally: await each read/skip before issuing
 * the next one.
 */
export class ByteSource {
  readonly url: string;
  readonly totalSize: number;

  private readonly opts: Required<Pick<ByteSourceOpts, "skipRangeThreshold">> & {
    signal?: AbortSignal;
    onTransfer?: ByteSourceOpts["onTransfer"];
  };

  /** Reader of the in-flight Range response; null when no stream is open. */
  private reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
  /** Absolute offset where the current Range request is supposed to end (exclusive). */
  private streamEnd = 0;
  /** Chunks pulled from the reader but not yet fully consumed, in file order. */
  private buffer: Uint8Array[] = [];
  /**
   * Absolute offset of buffer[0][0], or of the next byte when the buffer is empty.
   * Internal code keeps `bufferStart + bufferHead === pos`. A mismatch means a
   * caller reassigned `pos`, i.e. seeked.
   */
  private bufferStart = 0;
  /** Offset of the next unread byte within buffer[0]. */
  private bufferHead = 0;
  /** Total bytes held across all buffer chunks, including already-read bytes of buffer[0]. */
  private bufferedBytes = 0;

  /**
   * Absolute offset of the next byte to be returned to the caller.
   * Callers may reassign it to seek. The next read or skip then discards the
   * buffer and live stream and re-requests from the new offset.
   */
  pos = 0;
  /** Total bytes received over the network so far, including bytes drained by short skips. */
  bytesTransferred = 0;

  /**
   * @param url - URL of the remote file; must serve HTTP Range requests.
   * @param totalSize - File size in bytes.
   * @param opts - See `ByteSourceOpts`. `skipRangeThreshold` defaults to
   *   `pickAdaptiveThreshold(totalSize)`.
   */
  constructor(url: string, totalSize: number, opts: ByteSourceOpts = {}) {
    this.url = url;
    this.totalSize = totalSize;
    this.opts = {
      skipRangeThreshold: opts.skipRangeThreshold ?? pickAdaptiveThreshold(totalSize),
      signal: opts.signal,
      onTransfer: opts.onTransfer,
    };
  }

  /** Exposed for the UI: the threshold actually in use. */
  get skipRangeThreshold(): number {
    return this.opts.skipRangeThreshold;
  }

  /** Bytes available to read without pulling more from the network. */
  private get availableInBuffer(): number {
    return this.bufferedBytes - this.bufferHead;
  }

  /**
   * Open a Range stream covering [from, end), replacing any live stream.
   * `end` defaults to totalSize.
   *
   * Accepts 206. Also accepts 200 when `from === 0`, since a full-body
   * response then starts at the right offset.
   *
   * @throws Error on any other status. The response body is released first so
   *   the connection is not left half-read.
   */
  private async openRangeStream(from: number, end?: number): Promise<void> {
    await this.cancelStream();
    const stop = end ?? this.totalSize;
    if (from >= stop) {
      this.streamEnd = stop;
      return;
    }
    const res = await fetch(this.url, {
      headers: { Range: `bytes=${from}-${stop - 1}` },
      signal: this.opts.signal,
    });
    const usable = res.status === 206 || (res.status === 200 && from === 0);
    if (!usable) {
      // Best effort: free the connection before reporting the failure.
      await res.body?.cancel().catch(() => undefined);
      throw new Error(
        `Range request failed (status ${res.status}). The CDN may not support Range; falling back is not implemented.`,
      );
    }
    if (!res.body) throw new Error("Response has no body.");
    this.reader = res.body.getReader();
    this.streamEnd = stop;
  }

  /** Cancel and forget the live stream, if any. */
  private async cancelStream(): Promise<void> {
    const reader = this.reader;
    if (!reader) return;
    this.reader = null;
    try {
      await reader.cancel();
    } catch {
      // Deliberately ignored: the stream may already be errored or aborted,
      // and it is being dropped either way.
    }
  }

  /** If a caller reassigned `pos`, drop the misaligned buffer and live stream. */
  private async syncToPos(): Promise<void> {
    if (this.pos === this.bufferStart + this.bufferHead) return;
    await this.cancelStream();
    this.resetBuffer(this.pos);
  }

  /** Account for `byteCount` bytes received from the network and notify the listener. */
  private recordTransfer(byteCount: number): void {
    this.bytesTransferred += byteCount;
    this.opts.onTransfer?.(byteCount, this.bytesTransferred);
  }

  /**
   * Make sure at least `n` bytes starting at `pos` are buffered.
   *
   * Pulls from the live stream as needed. When no stream is open, or the
   * current Range is exhausted, a new Range is opened right after the last
   * buffered byte.
   *
   * @throws Error if `pos + n` exceeds the file, or if a stream ends before
   *   reaching its requested end.
   */
  private async ensureAvailable(n: number): Promise<void> {
    if (this.pos + n > this.totalSize) {
      throw new Error(
        `Read past end of file: pos=${this.pos}, want=${n}, total=${this.totalSize}`,
      );
    }
    await this.syncToPos();
    while (this.availableInBuffer < n) {
      // Absolute offset of the first byte not yet buffered. It is always
      // < totalSize here, because availableInBuffer < n and pos + n <= totalSize.
      const nextOffset = this.pos + this.availableInBuffer;
      if (!this.reader) {
        if (this.availableInBuffer === 0) this.resetBuffer(this.pos);
        await this.openRangeStream(nextOffset);
        continue;
      }
      const { done, value } = await this.reader.read();
      if (done) {
        this.reader = null;
        // The Range was cut short: the server sent fewer bytes than requested.
        if (nextOffset < this.streamEnd) {
          throw new Error("Stream EOF before requested bytes arrived.");
        }
        // The Range is exhausted but the file continues; the `!reader` branch re-opens.
        continue;
      }
      if (!value || value.byteLength === 0) continue;
      this.buffer.push(value);
      this.bufferedBytes += value.byteLength;
      this.recordTransfer(value.byteLength);
    }
  }

  /** Empty the buffer and mark `startPos` as the offset of the next byte to arrive. */
  private resetBuffer(startPos: number): void {
    this.buffer = [];
    this.bufferedBytes = 0;
    this.bufferHead = 0;
    this.bufferStart = startPos;
  }

  /** Remove buffer[0], which has been fully read, and advance the buffer bookkeeping. */
  private dropHeadChunk(): void {
    const head = this.buffer.shift();
    if (head === undefined) return;
    this.bufferedBytes -= head.byteLength;
    this.bufferStart += head.byteLength;
    this.bufferHead = 0;
  }

  /**
   * Consume `count` already-buffered bytes (count <= availableInBuffer) and
   * advance `pos`. When `out` is given, the bytes are also copied into it
   * starting at index 0.
   */
  private consumeBuffered(count: number, out?: Uint8Array): void {
    let consumed = 0;
    while (consumed < count) {
      const head = this.buffer[0];
      if (head === undefined) throw new Error("ByteSource internal error: buffer underflow.");
      const take = Math.min(head.byteLength - this.bufferHead, count - consumed);
      out?.set(head.subarray(this.bufferHead, this.bufferHead + take), consumed);
      this.bufferHead += take;
      consumed += take;
      if (this.bufferHead >= head.byteLength) this.dropHeadChunk();
    }
    this.pos += count;
  }

  /**
   * Read exactly `n` bytes into a fresh Uint8Array, concatenating internal
   * chunks if needed. Returns an empty array for `n <= 0`.
   *
   * @throws Error if fewer than `n` bytes remain in the file.
   */
  async readBytes(n: number): Promise<Uint8Array> {
    if (n <= 0) return new Uint8Array(0);
    await this.ensureAvailable(n);
    const out = new Uint8Array(n);
    this.consumeBuffered(n, out);
    return out;
  }

  /** Read a single byte. Fast path for varint decoding: no copy when the byte is buffered. */
  async readByte(): Promise<number> {
    if (this.availableInBuffer === 0 || this.pos !== this.bufferStart + this.bufferHead) {
      await this.ensureAvailable(1);
    }
    const head = this.buffer[0];
    if (head === undefined) throw new Error("ByteSource internal error: buffer underflow.");
    const b = head[this.bufferHead++];
    if (this.bufferHead >= head.byteLength) this.dropHeadChunk();
    this.pos += 1;
    return b;
  }

  /**
   * Advance the read position by `n` bytes without returning them.
   *
   * 1. Bytes already buffered are consumed first.
   * 2. If the remainder exceeds skipRangeThreshold, or no stream is open, the
   *    stream is cancelled and `pos` jumps ahead. The next read opens a new
   *    Range there, so the skipped bytes are never downloaded.
   * 3. Otherwise the remainder is drained from the live stream, which avoids
   *    an extra round trip. Any unused tail of the last chunk is kept.
   *
   * @throws Error if the skip would pass the end of the file, or if the final
   *   Range ends before the skip completes.
   */
  async skipBytes(n: number): Promise<void> {
    if (n <= 0) return;
    if (this.pos + n > this.totalSize) {
      throw new Error(
        `Skip past end of file: pos=${this.pos}, skip=${n}, total=${this.totalSize}`,
      );
    }
    await this.syncToPos();

    const fromBuffer = Math.min(this.availableInBuffer, n);
    if (fromBuffer > 0) {
      this.consumeBuffered(fromBuffer);
      n -= fromBuffer;
      if (n === 0) return;
    }

    if (!this.reader || n > this.opts.skipRangeThreshold) {
      // Jump: with no live stream, draining would mean downloading the skipped bytes.
      await this.cancelStream();
      this.pos += n;
      this.resetBuffer(this.pos);
      return;
    }

    // Short skip on a live stream: discard bytes as they arrive.
    while (n > 0) {
      if (!this.reader) {
        // The Range ended mid-skip: jump the rest instead of re-downloading it.
        this.pos += n;
        this.resetBuffer(this.pos);
        return;
      }
      const { done, value } = await this.reader.read();
      if (done) {
        this.reader = null;
        if (this.streamEnd >= this.totalSize) throw new Error("Stream EOF during skip.");
        continue;
      }
      if (!value || value.byteLength === 0) continue;
      this.recordTransfer(value.byteLength);
      if (value.byteLength <= n) {
        // The whole chunk is discarded; keep bufferStart in step with pos.
        this.pos += value.byteLength;
        this.bufferStart = this.pos;
        n -= value.byteLength;
      } else {
        // Partial chunk: keep the unused tail as the new buffer.
        const tail = value.subarray(n);
        this.pos += n;
        this.resetBuffer(this.pos);
        this.buffer.push(tail);
        this.bufferedBytes = tail.byteLength;
        n = 0;
      }
    }
  }

  /** Cancel any pending network activity and free buffers. `pos` is kept. */
  async close(): Promise<void> {
    await this.cancelStream();
    this.resetBuffer(this.pos);
  }

  /**
   * Create a ByteSource, learning the file length from a HEAD request.
   *
   * @throws Error if HEAD fails, if Content-Length is missing or non-positive,
   *   or if the server sends `Accept-Ranges: none`.
   */
  static async create(url: string, opts: ByteSourceOpts = {}): Promise<ByteSource> {
    const head = await fetch(url, { method: "HEAD", signal: opts.signal });
    if (!head.ok) throw new Error(`HEAD ${url} failed: ${head.status}`);
    const len = Number(head.headers.get("content-length") ?? 0);
    if (!Number.isFinite(len) || len <= 0) {
      throw new Error(`Server did not return a usable Content-Length for ${url}.`);
    }
    const acceptRanges = head.headers.get("accept-ranges");
    if (acceptRanges && acceptRanges.toLowerCase() === "none") {
      throw new Error(`Server says it does not support Range requests for ${url}.`);
    }
    return new ByteSource(url, len, opts);
  }
}