Spaces:
Runtime error
Runtime error
| // Licensed to the Apache Software Foundation (ASF) under one | |
| // or more contributor license agreements. See the NOTICE file | |
| // distributed with this work for additional information | |
| // regarding copyright ownership. The ASF licenses this file | |
| // to you under the Apache License, Version 2.0 (the | |
| // "License"); you may not use this file except in compliance | |
| // with the License. You may obtain a copy of the License at | |
| // | |
| // http://www.apache.org/licenses/LICENSE-2.0 | |
| // | |
| // Unless required by applicable law or agreed to in writing, | |
| // software distributed under the License is distributed on an | |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
| // KIND, either express or implied. See the License for the | |
| // specific language governing permissions and limitations | |
| // under the License. | |
| import streamAdapters from './adapters.js'; | |
| /** @ignore */ | |
| export const ITERATOR_DONE: any = Object.freeze({ done: true, value: void (0) }); | |
| /** @ignore */ | |
| export type FileHandle = import('fs').promises.FileHandle; | |
| /** @ignore */ | |
| export type ArrowJSONLike = { schema: any; batches?: any[]; dictionaries?: any[] }; | |
| /** @ignore */ | |
| export type ReadableDOMStreamOptions = { type: 'bytes' | undefined; autoAllocateChunkSize?: number; highWaterMark?: number }; | |
| /** @ignore */ | |
| export class ArrowJSON { | |
| constructor(private _json: ArrowJSONLike) { } | |
| public get schema(): any { return this._json['schema']; } | |
| public get batches(): any[] { return (this._json['batches'] || []) as any[]; } | |
| public get dictionaries(): any[] { return (this._json['dictionaries'] || []) as any[]; } | |
| } | |
| /** @ignore */ | |
| export interface Readable<T> { | |
| readonly closed: Promise<void>; | |
| cancel(reason?: any): Promise<void>; | |
| read(size?: number | null): Promise<T | null>; | |
| peek(size?: number | null): Promise<T | null>; | |
| throw(value?: any): Promise<IteratorResult<any>>; | |
| return(value?: any): Promise<IteratorResult<any>>; | |
| next(size?: number | null): Promise<IteratorResult<T>>; | |
| } | |
| /** @ignore */ | |
| export interface Writable<T> { | |
| readonly closed: Promise<void>; | |
| close(): void; | |
| write(chunk: T): void; | |
| abort(reason?: any): void; | |
| } | |
| /** @ignore */ | |
| export interface ReadableWritable<TReadable, TWritable> extends Readable<TReadable>, Writable<TWritable> { | |
| [Symbol.asyncIterator](): AsyncIterableIterator<TReadable>; | |
| toDOMStream(options?: ReadableDOMStreamOptions): ReadableStream<TReadable>; | |
| toNodeStream(options?: import('stream').ReadableOptions): import('stream').Readable; | |
| } | |
| /** @ignore */ | |
| export abstract class ReadableInterop<T> { | |
| public abstract toDOMStream(options?: ReadableDOMStreamOptions): ReadableStream<T>; | |
| public abstract toNodeStream(options?: import('stream').ReadableOptions): import('stream').Readable; | |
| public tee(): [ReadableStream<T>, ReadableStream<T>] { | |
| return this._getDOMStream().tee(); | |
| } | |
| public pipe<R extends NodeJS.WritableStream>(writable: R, options?: { end?: boolean }) { | |
| return this._getNodeStream().pipe(writable, options); | |
| } | |
| public pipeTo(writable: WritableStream<T>, options?: StreamPipeOptions) { return this._getDOMStream().pipeTo(writable, options); } | |
| public pipeThrough<R extends ReadableStream<any>>(duplex: { writable: WritableStream<T>; readable: R }, options?: StreamPipeOptions) { | |
| return this._getDOMStream().pipeThrough(duplex, options); | |
| } | |
| protected _DOMStream?: ReadableStream<T>; | |
| private _getDOMStream() { | |
| return this._DOMStream || (this._DOMStream = this.toDOMStream()); | |
| } | |
| protected _nodeStream?: import('stream').Readable; | |
| private _getNodeStream() { | |
| return this._nodeStream || (this._nodeStream = this.toNodeStream()); | |
| } | |
| } | |
| /** @ignore */ | |
| type Resolution<T> = { resolve: (value: T | PromiseLike<T>) => void; reject: (reason?: any) => void }; | |
| /** @ignore */ | |
| export class AsyncQueue<TReadable = Uint8Array, TWritable = TReadable> extends ReadableInterop<TReadable> | |
| implements AsyncIterableIterator<TReadable>, ReadableWritable<TReadable, TWritable> { | |
| protected _values: TWritable[] = []; | |
| protected _error?: { error: any }; | |
| protected _closedPromise: Promise<void>; | |
| protected _closedPromiseResolve?: (value?: any) => void; | |
| protected resolvers: Resolution<IteratorResult<TReadable>>[] = []; | |
| constructor() { | |
| super(); | |
| this._closedPromise = new Promise((r) => this._closedPromiseResolve = r); | |
| } | |
| public get closed(): Promise<void> { return this._closedPromise; } | |
| public async cancel(reason?: any) { await this.return(reason); } | |
| public write(value: TWritable) { | |
| if (this._ensureOpen()) { | |
| this.resolvers.length <= 0 | |
| ? (this._values.push(value)) | |
| : (this.resolvers.shift()!.resolve({ done: false, value } as any)); | |
| } | |
| } | |
| public abort(value?: any) { | |
| if (this._closedPromiseResolve) { | |
| this.resolvers.length <= 0 | |
| ? (this._error = { error: value }) | |
| : (this.resolvers.shift()!.reject({ done: true, value })); | |
| } | |
| } | |
| public close() { | |
| if (this._closedPromiseResolve) { | |
| const { resolvers } = this; | |
| while (resolvers.length > 0) { | |
| resolvers.shift()!.resolve(ITERATOR_DONE); | |
| } | |
| this._closedPromiseResolve(); | |
| this._closedPromiseResolve = undefined; | |
| } | |
| } | |
| public [Symbol.asyncIterator]() { return this; } | |
| public toDOMStream(options?: ReadableDOMStreamOptions) { | |
| return streamAdapters.toDOMStream( | |
| (this._closedPromiseResolve || this._error) | |
| ? (this as AsyncIterable<TReadable>) | |
| : (this._values as any) as Iterable<TReadable>, | |
| options); | |
| } | |
| public toNodeStream(options?: import('stream').ReadableOptions) { | |
| return streamAdapters.toNodeStream( | |
| (this._closedPromiseResolve || this._error) | |
| ? (this as AsyncIterable<TReadable>) | |
| : (this._values as any) as Iterable<TReadable>, | |
| options); | |
| } | |
| public async throw(_?: any) { await this.abort(_); return ITERATOR_DONE; } | |
| public async return(_?: any) { await this.close(); return ITERATOR_DONE; } | |
| public async read(size?: number | null): Promise<TReadable | null> { return (await this.next(size, 'read')).value; } | |
| public async peek(size?: number | null): Promise<TReadable | null> { return (await this.next(size, 'peek')).value; } | |
| public next(..._args: any[]): Promise<IteratorResult<TReadable>> { | |
| if (this._values.length > 0) { | |
| return Promise.resolve({ done: false, value: this._values.shift()! } as any); | |
| } else if (this._error) { | |
| return Promise.reject({ done: true, value: this._error.error }); | |
| } else if (!this._closedPromiseResolve) { | |
| return Promise.resolve(ITERATOR_DONE); | |
| } else { | |
| return new Promise<IteratorResult<TReadable>>((resolve, reject) => { | |
| this.resolvers.push({ resolve, reject }); | |
| }); | |
| } | |
| } | |
| protected _ensureOpen() { | |
| if (this._closedPromiseResolve) { | |
| return true; | |
| } | |
| throw new Error(`AsyncQueue is closed`); | |
| } | |
| } | |