Spaces:
Runtime error
Runtime error
| // Licensed to the Apache Software Foundation (ASF) under one | |
| // or more contributor license agreements. See the NOTICE file | |
| // distributed with this work for additional information | |
| // regarding copyright ownership. The ASF licenses this file | |
| // to you under the Apache License, Version 2.0 (the | |
| // "License"); you may not use this file except in compliance | |
| // with the License. You may obtain a copy of the License at | |
| // | |
| // http://www.apache.org/licenses/LICENSE-2.0 | |
| // | |
| // Unless required by applicable law or agreed to in writing, | |
| // software distributed under the License is distributed on an | |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
| // KIND, either express or implied. See the License for the | |
| // specific language governing permissions and limitations | |
| // under the License. | |
| import { MessageHeader } from '../enum.js'; | |
| import { ByteBuffer } from 'flatbuffers'; | |
| import { Message } from './metadata/message.js'; | |
| import { isFileHandle } from '../util/compat.js'; | |
| import { AsyncRandomAccessFile } from '../io/file.js'; | |
| import { toUint8Array, ArrayBufferViewInput } from '../util/buffer.js'; | |
| import { ByteStream, ReadableSource, AsyncByteStream } from '../io/stream.js'; | |
| import { ArrowJSON, ArrowJSONLike, ITERATOR_DONE, FileHandle } from '../io/interfaces.js'; | |
/**
 * Error text for a reader that expected the next Message to carry header `type`
 * but read one with a different header. The readers in this file raise it only on
 * a header-type mismatch (a missing message makes them return `null` instead), so
 * the text no longer claims the message "was null or length 0".
 * `actual`, when supplied, names the header type that was actually found.
 * @ignore
 */
const invalidMessageType = (type: MessageHeader, actual?: MessageHeader | null) => {
    const found = actual == null ? 'a Message of a different type' : MessageHeader[actual];
    return `Expected ${MessageHeader[type]} Message in stream, but found ${found}.`;
};
/** Error text for a decoded Message whose header pointer is missing. @ignore */
const nullMessage = (type: MessageHeader) => `Header pointer of flatbuffer-encoded ${MessageHeader[type]} Message is null or length 0.`;
/** Error text for a short read of a Message's metadata bytes. @ignore */
const invalidMessageMetadata = (expected: number, actual: number) => `Expected to read ${expected} metadata bytes, but only read ${actual}.`;
/** Error text for a short read of a Message's body bytes. @ignore */
const invalidMessageBodyLength = (expected: number, actual: number) => `Expected to read ${expected} bytes for message body, but only read ${actual}.`;
/**
 * Pull-based reader that yields `Message` instances decoded from a synchronous
 * `ByteStream`. Each `next()` consumes only a message's length prefix and its
 * metadata bytes; the message body is read separately via `readMessageBody()`.
 * @ignore
 */
export class MessageReader implements IterableIterator<Message> {
    protected source: ByteStream;

    /**
     * @param source An existing `ByteStream` (used as-is), or bytes / an iterable
     * of byte chunks, which are wrapped in a new `ByteStream`.
     */
    constructor(source: ByteStream | ArrayBufferViewInput | Iterable<ArrayBufferViewInput>) {
        if (source instanceof ByteStream) {
            this.source = source;
        } else {
            this.source = new ByteStream(source);
        }
    }

    public [Symbol.iterator](): IterableIterator<Message> { return this as IterableIterator<Message>; }

    /**
     * Reads the next message's length prefix and metadata.
     *
     * ARROW-6313: a leading 32-bit value of -1 is a continuation indicator, and the
     * real metadata length follows in the next 4 bytes. Any other value is taken as
     * the metadata length itself (pre-v0.15 streams). A length of 0, or an exhausted
     * source, ends iteration.
     */
    public next(): IteratorResult<Message> {
        let lengthResult = this.readMetadataLength();
        if (lengthResult.done) { return ITERATOR_DONE; }
        if (lengthResult.value === -1) {
            lengthResult = this.readMetadataLength();
            if (lengthResult.done) { return ITERATOR_DONE; }
        }
        const messageResult = this.readMetadata(lengthResult.value);
        return messageResult.done ? ITERATOR_DONE : messageResult;
    }

    /** Forwards `throw` to the underlying byte stream. */
    public throw(value?: any) { return this.source.throw(value); }

    /** Forwards `return` to the underlying byte stream. */
    public return(value?: any) { return this.source.return(value); }

    /**
     * Returns the next message, or `null` once the stream is exhausted.
     * @param type When non-null, the message's `headerType` must equal this value.
     * @throws If `type` is given and the next message carries a different header type.
     */
    public readMessage<T extends MessageHeader>(type?: T | null) {
        const result: IteratorResult<Message<T>> = this.next();
        if (result.done) { return null; }
        const message = result.value;
        if (type != null && message.headerType !== type) {
            throw new Error(invalidMessageType(type));
        }
        return message;
    }

    /**
     * Reads `bodyLength` bytes of message body from the source.
     * Returns an empty array when `bodyLength <= 0`.
     * @throws If fewer than `bodyLength` bytes could be read.
     */
    public readMessageBody(bodyLength: number): Uint8Array {
        if (bodyLength <= 0) { return new Uint8Array(0); }
        const body = toUint8Array(this.source.read(bodyLength));
        if (body.byteLength < bodyLength) {
            throw new Error(invalidMessageBodyLength(bodyLength, body.byteLength));
        }
        // Return the view itself only when it is safe to use directly; otherwise copy it.
        // 1. Alignment works around fs.ReadStream's internal Buffer pooling: https://github.com/nodejs/node/issues/24817
        // 2. Bounds check works around https://github.com/whatwg/streams/blob/0ebe4b042e467d9876d80ae045de3843092ad797/reference-implementation/lib/helpers.js#L126
        const aligned = body.byteOffset % 8 === 0;
        const withinBuffer = body.byteOffset + body.byteLength <= body.buffer.byteLength;
        return aligned && withinBuffer ? body : body.slice();
    }

    /**
     * Reads the next message, which must be a Schema message, and returns its header.
     * A message of another type makes `readMessage` throw.
     * @param throwIfNull When true, throw instead of returning an empty value if no
     * schema header was obtained.
     */
    public readSchema(throwIfNull = false) {
        const type = MessageHeader.Schema;
        const schema = this.readMessage(type)?.header();
        if (!schema && throwIfNull) {
            throw new Error(nullMessage(type));
        }
        return schema;
    }

    /**
     * Reads a `PADDING`-byte prefix and interprets it as an int32 via flatbuffers'
     * `ByteBuffer.readInt32`. Reports `done` when the source returned nothing or the
     * value is 0.
     */
    protected readMetadataLength(): IteratorResult<number> {
        const prefix = this.source.read(PADDING);
        if (!prefix) { return { done: true, value: 0 }; }
        const length = new ByteBuffer(prefix).readInt32(0) || 0;
        return { done: length === 0, value: length };
    }

    /**
     * Reads `metadataLength` bytes and decodes them with `Message.decode`.
     * Returns `ITERATOR_DONE` if the source produced nothing.
     * @throws If fewer than `metadataLength` bytes were read.
     */
    protected readMetadata(metadataLength: number): IteratorResult<Message> {
        const metadata = this.source.read(metadataLength);
        if (!metadata) { return ITERATOR_DONE; }
        const bytesRead = metadata.byteLength;
        if (bytesRead < metadataLength) {
            throw new Error(invalidMessageMetadata(metadataLength, bytesRead));
        }
        return { done: false, value: Message.decode(metadata) };
    }
}
/**
 * Asynchronous counterpart of `MessageReader`: yields `Message` instances decoded
 * from an `AsyncByteStream`. Each `next()` consumes only a message's length prefix
 * and its metadata bytes; the body is read separately via `readMessageBody()`.
 * @ignore
 */
export class AsyncMessageReader implements AsyncIterableIterator<Message> {
    protected source: AsyncByteStream;

    constructor(source: ReadableSource<Uint8Array>);
    constructor(source: FileHandle, byteLength?: number);
    /**
     * @param source An existing `AsyncByteStream` (used as-is), a file handle (wrapped
     * in an `AsyncRandomAccessFile` of `byteLength` bytes), or any other readable
     * source (wrapped in a new `AsyncByteStream`).
     */
    constructor(source: any, byteLength?: number) {
        if (source instanceof AsyncByteStream) {
            this.source = source;
        } else if (isFileHandle(source)) {
            this.source = new AsyncRandomAccessFile(source, byteLength!);
        } else {
            this.source = new AsyncByteStream(source);
        }
    }

    public [Symbol.asyncIterator](): AsyncIterableIterator<Message> { return this as AsyncIterableIterator<Message>; }

    /**
     * Reads the next message's length prefix and metadata.
     *
     * ARROW-6313: a leading 32-bit value of -1 is a continuation indicator, and the
     * real metadata length follows in the next 4 bytes. Any other value is taken as
     * the metadata length itself (pre-v0.15 streams). A length of 0, or an exhausted
     * source, ends iteration.
     */
    public async next(): Promise<IteratorResult<Message>> {
        let lengthResult = await this.readMetadataLength();
        if (lengthResult.done) { return ITERATOR_DONE; }
        if (lengthResult.value === -1) {
            lengthResult = await this.readMetadataLength();
            if (lengthResult.done) { return ITERATOR_DONE; }
        }
        const messageResult = await this.readMetadata(lengthResult.value);
        return messageResult.done ? ITERATOR_DONE : messageResult;
    }

    /** Forwards `throw` to the underlying byte stream. */
    public async throw(value?: any) { return await this.source.throw(value); }

    /** Forwards `return` to the underlying byte stream. */
    public async return(value?: any) { return await this.source.return(value); }

    /**
     * Resolves to the next message, or `null` once the stream is exhausted.
     * @param type When non-null, the message's `headerType` must equal this value.
     * @throws If `type` is given and the next message carries a different header type.
     */
    public async readMessage<T extends MessageHeader>(type?: T | null) {
        const result: IteratorResult<Message<T>> = await this.next();
        if (result.done) { return null; }
        const message = result.value;
        if (type != null && message.headerType !== type) {
            throw new Error(invalidMessageType(type));
        }
        return message;
    }

    /**
     * Reads `bodyLength` bytes of message body from the source.
     * Resolves to an empty array when `bodyLength <= 0`.
     * @throws If fewer than `bodyLength` bytes could be read.
     */
    public async readMessageBody(bodyLength: number): Promise<Uint8Array> {
        if (bodyLength <= 0) { return new Uint8Array(0); }
        const body = toUint8Array(await this.source.read(bodyLength));
        if (body.byteLength < bodyLength) {
            throw new Error(invalidMessageBodyLength(bodyLength, body.byteLength));
        }
        // Return the view itself only when it is safe to use directly; otherwise copy it.
        // 1. Alignment works around fs.ReadStream's internal Buffer pooling: https://github.com/nodejs/node/issues/24817
        // 2. Bounds check works around https://github.com/whatwg/streams/blob/0ebe4b042e467d9876d80ae045de3843092ad797/reference-implementation/lib/helpers.js#L126
        const aligned = body.byteOffset % 8 === 0;
        const withinBuffer = body.byteOffset + body.byteLength <= body.buffer.byteLength;
        return aligned && withinBuffer ? body : body.slice();
    }

    /**
     * Reads the next message, which must be a Schema message, and resolves to its
     * header. A message of another type makes `readMessage` throw.
     * @param throwIfNull When true, throw instead of resolving to an empty value if
     * no schema header was obtained.
     */
    public async readSchema(throwIfNull = false) {
        const type = MessageHeader.Schema;
        const schema = (await this.readMessage(type))?.header();
        if (!schema && throwIfNull) {
            throw new Error(nullMessage(type));
        }
        return schema;
    }

    /**
     * Reads a `PADDING`-byte prefix and interprets it as an int32 via flatbuffers'
     * `ByteBuffer.readInt32`. Reports `done` when the source returned nothing or the
     * value is 0.
     */
    protected async readMetadataLength(): Promise<IteratorResult<number>> {
        const prefix = await this.source.read(PADDING);
        if (!prefix) { return { done: true, value: 0 }; }
        const length = new ByteBuffer(prefix).readInt32(0) || 0;
        return { done: length === 0, value: length };
    }

    /**
     * Reads `metadataLength` bytes and decodes them with `Message.decode`.
     * Resolves to `ITERATOR_DONE` if the source produced nothing.
     * @throws If fewer than `metadataLength` bytes were read.
     */
    protected async readMetadata(metadataLength: number): Promise<IteratorResult<Message>> {
        const metadata = await this.source.read(metadataLength);
        if (!metadata) { return ITERATOR_DONE; }
        const bytesRead = metadata.byteLength;
        if (bytesRead < metadataLength) {
            throw new Error(invalidMessageMetadata(metadataLength, bytesRead));
        }
        return { done: false, value: Message.decode(metadata) };
    }
}
/**
 * `MessageReader` over Arrow JSON (`ArrowJSON` / `ArrowJSONLike`) instead of IPC bytes.
 * The base class is handed an empty byte stream; this class overrides `next()`,
 * `readMessage()`, `readMessageBody()` and `readSchema()` so that messages and bodies
 * come from the JSON object rather than from bytes.
 * @ignore
 */
export class JSONMessageReader extends MessageReader {
    /** Whether the Schema message has already been yielded. */
    private _schema = false;
    private _json: ArrowJSON;
    /** Columns of the batch most recently yielded by `next()`; consumed by `readMessageBody()`. */
    private _body: any[] = [];
    private _batchIndex = 0;
    private _dictionaryIndex = 0;

    constructor(source: ArrowJSON | ArrowJSONLike) {
        super(new Uint8Array(0));
        this._json = source instanceof ArrowJSON ? source : new ArrowJSON(source);
    }

    /**
     * Yields, in order: one Schema message built from `schema`, one DictionaryBatch
     * message per entry of `dictionaries`, then one RecordBatch message per entry of
     * `batches`, and finally `ITERATOR_DONE`. Each batch step also stores that batch's
     * columns in `_body` for the following `readMessageBody()` call.
     */
    public next() {
        const { _json } = this;
        if (!this._schema) {
            this._schema = true;
            const message = Message.fromJSON(_json.schema, MessageHeader.Schema);
            return { done: false, value: message };
        }
        if (this._dictionaryIndex < _json.dictionaries.length) {
            const batch = _json.dictionaries[this._dictionaryIndex++];
            // Dictionary batches nest their columns under `data`.
            this._body = batch['data']['columns'];
            const message = Message.fromJSON(batch, MessageHeader.DictionaryBatch);
            return { done: false, value: message };
        }
        if (this._batchIndex < _json.batches.length) {
            const batch = _json.batches[this._batchIndex++];
            this._body = batch['columns'];
            const message = Message.fromJSON(batch, MessageHeader.RecordBatch);
            return { done: false, value: message };
        }
        this._body = [];
        return ITERATOR_DONE;
    }

    /**
     * Returns the data sources of the batch most recently yielded by `next()`,
     * flattened depth-first. For each column this emits its VALIDITY, TYPE, OFFSET
     * and DATA entries (only those that are truthy, in that order), followed by the
     * same for its `children`. `_bodyLength` is ignored.
     */
    public readMessageBody(_bodyLength?: number) {
        const buffers: any[][] = [];
        appendDataSources(this._body, buffers);
        return buffers as any;
        // Appends into one shared output array. Spreading a fresh accumulator per
        // column would re-copy the whole result each time (quadratic in buffer count).
        function appendDataSources(columns: any[], out: any[][]): void {
            (columns || []).forEach((column: any) => {
                if (column['VALIDITY']) { out.push(column['VALIDITY']); }
                if (column['TYPE']) { out.push(column['TYPE']); }
                if (column['OFFSET']) { out.push(column['OFFSET']); }
                if (column['DATA']) { out.push(column['DATA']); }
                appendDataSources(column['children'], out);
            });
        }
    }

    /**
     * Returns the next message, or `null` once exhausted.
     * @throws If `type` is given and the next message carries a different header type.
     */
    public readMessage<T extends MessageHeader>(type?: T | null) {
        let r: IteratorResult<Message<T>>;
        if ((r = this.next()).done) { return null; }
        if ((type != null) && r.value.headerType !== type) {
            throw new Error(invalidMessageType(type));
        }
        return r.value;
    }

    /**
     * Reads the Schema message and returns its header. Unlike the base class, this
     * always throws when no schema header is available.
     */
    public readSchema() {
        const type = MessageHeader.Schema;
        const message = this.readMessage(type);
        const schema = message?.header();
        if (!message || !schema) {
            throw new Error(nullMessage(type));
        }
        return schema;
    }
}
/** Width in bytes of the IPC length prefix read by the message readers. @ignore */
export const PADDING = 4;
/** ASCII text of the Arrow file magic. @ignore */
export const MAGIC_STR = 'ARROW1';
/** `MAGIC_STR` encoded as one byte per (ASCII) character. @ignore */
export const MAGIC = Uint8Array.from(MAGIC_STR, (ch) => ch.charCodeAt(0));

/**
 * Reports whether `buffer` contains the `MAGIC` bytes starting at `index`.
 * Returns false if `buffer` ends before the full magic could be compared.
 * @ignore
 */
export function checkForMagicArrowString(buffer: Uint8Array, index = 0) {
    return MAGIC.every((magicByte, offset) => buffer[index + offset] === magicByte);
}

/** Number of bytes in `MAGIC`. @ignore */
export const magicLength = MAGIC.length;
/** `MAGIC` followed by one `PADDING`-sized field. @ignore */
export const magicAndPadding = PADDING + magicLength;
/** Two copies of `MAGIC` plus one `PADDING`-sized field. @ignore */
export const magicX2AndPadding = 2 * magicLength + PADDING;