| import streamSaver from 'streamsaver' |
| import { Api } from 'telegram' |
| import { telegramClient } from './Telegram' |
| import { concat } from 'concat-stream' |
| import FastPriorityQueue from 'fastpriorityqueue' |
| class ConnectionPool { |
| private connections: Promise<any>[] |
| public maxSize: number |
| constructor(maxSize: number) { |
| this.connections = [] |
| this.maxSize = maxSize |
| } |
| async getConnection() { |
| if (this.connections.length > 0) { |
| return this.connections.shift() |
| } |
| if (this.connections.length < this.maxSize) { |
| const connection = telegramClient.connect() |
| this.connections.push(connection) |
| return connection |
| } |
| return new Promise((resolve) => { |
| const interval = setInterval(() => { |
| if (this.connections.length < this.maxSize) { |
| const connection = telegramClient.connect() |
| this.connections.push(connection) |
| clearInterval(interval) |
| resolve(connection) |
| } |
| }, 1000) |
| }) |
| } |
| releaseConnection(connection: Promise<any>) { |
| this.connections.push(connection) |
| } |
| } |
| |
| type FileIterator = { |
| [Symbol.asyncIterator]: () => AsyncGenerator<Uint8Array, void, unknown> |
| } |
| const connectionPool = new ConnectionPool(5) |
| const cache = new Map<string, Uint8Array>() |
| async function* generateChunks( |
| clients: any[], |
| media: any, |
| i: number, |
| numParallel: number, |
| bufferSize: number = 1024 * 1024 |
| ): AsyncGenerator<Uint8Array, void, unknown> { |
| const numConnections = clients.length |
| let connIndex = i % numConnections |
| let offset = Math.floor(i / numConnections) * media.size / numParallel |
| const limit = media.size / numParallel |
| const promises: Promise<Uint8Array[]>[] = [] |
| let buffer: Uint8Array[] = [] |
| let bufferLength = 0 |
| for (let j = 0; j < numParallel; j++) { |
| const client = clients[connIndex] |
| promises.push( |
| client.downloadMedia(media, { offset, limit }) |
| ) |
| offset += limit |
| connIndex = (connIndex + 1) % numConnections |
| } |
| const chunksArray = await Promise.allSettled(promises) |
| for (const result of chunksArray) { |
| if (result.status === 'fulfilled') { |
| const chunks = result.value |
| for (const chunk of chunks) { |
| buffer.push(chunk) |
| bufferLength += chunk.byteLength |
| if (bufferLength >= bufferSize) { |
| const concatenated = concat(buffer) |
| yield concatenated |
| buffer = [] |
| bufferLength = 0 |
| } |
| } |
| } else { |
| console.error(result.reason) |
| } |
| } |
| if (bufferLength > 0) { |
| const concatenated = concat(buffer) |
| yield concatenated |
| } |
| } |
| export async function download( |
| id: string, |
| numParallel: number = 1 |
| ): Promise<ReadableStream<Uint8Array>> { |
| const fileIterators: FileIterator[] = [] |
| const cachedData = cache.get(id) |
| if (cachedData) { |
| return new ReadableStream({ |
| start(controller) { |
| controller.enqueue(cachedData) |
| controller.close() |
| } |
| }) |
| } |
| const clients: any[] = [] |
| for (let i = 0; i < numParallel; i++) { |
| clients.push(await connectionPool.getConnection()) |
| } |
| try { |
| const { data: response } = await clients[0].invoke( |
| new Api.messages.GetMessages({ |
| id: [new Api.InputMessageID({ id: Number(id) })] |
| }) |
| ) |
| const media = response.messages[0].media |
| for (let i = 0; i < numParallel; i++) { |
| fileIterators.push({ |
| [Symbol.asyncIterator]: generateChunks.bind( |
| null, |
| clients, |
| media, |
| i, |
| numParallel |
| ) |
| }) |
| } |
| const streams: ReadableStream<Uint8Array>[] = [] |
| for (const fileIterator of fileIterators) { |
| const stream = new ReadableStream({ |
| async start(controller) { |
| for await (const chunk of fileIterator) { |
| controller.enqueue(chunk) |
| } |
| controller.close() |
| } |
| }) |
| streams.push(stream) |
| } |
| return mergeStreams(...streams) |
| } finally { |
| for (const client of clients) { |
| connectionPool.releaseConnection(client) |
| } |
| } |
| } |
| export const directDownload = async ( |
| id: string, |
| name: string, |
| numParallel: number = 1 |
| ): Promise<void> => { |
| const fileStream = streamSaver.createWriteStream(name) |
| const writer = fileStream.getWriter() |
| try { |
| const streams = await download(id, numParallel) |
| |
| const mergedStream = mergeStreams(streams) |
| const reader = mergedStream.getReader() |
| const pump = async () => { |
| const { done, value } = await reader.read() |
| if (done) { |
| if (value) { |
| cache.set(id, value) |
| } |
| writer.close() |
| return |
| } |
| writer.write(value) |
| pump() |
| } |
| pump() |
| } catch (error) { |
| console.error(error) |
| } |
| } |
|
|
| function mergeStreams(...streams: ReadableStream<Uint8Array>[]): ReadableStream<Uint8Array> { |
| if (streams.length === 1) { |
| return streams[0] |
| } |
| const mid = Math.floor(streams.length / 2) |
| const left = mergeStreams(...streams.slice(0, mid)) |
| const right = mergeStreams(...streams.slice(mid)) |
| |
| const heap = new FastPriorityQueue((a: any, b: any) => a[0] > b[0]) |
| |
| const leftReader = left.getReader() |
| const rightReader = right.getReader() |
| const read = async (reader: ReadableStreamDefaultReader<Uint8Array>) => { |
| const { done, value } = await reader.read() |
| if (!done && value !== undefined) { |
| heap.add([value.byteLength, reader]) |
| } |
| } |
| read(leftReader) |
| read(rightReader) |
| const combinedStream = new ReadableStream({ |
| async start(controller) { |
| while (!heap.isEmpty()) { |
| const [_, reader] = heap.poll() |
| const { done, value } = await reader.read() |
| if (done) { |
| if (!heap.isEmpty()) { |
| const next = heap.poll() |
| heap.add(next) |
| } |
| } else { |
| if (value !== undefined) { |
| controller.enqueue(value) |
| heap.add([value.byteLength, reader]) |
| } |
| } |
| } |
| controller.close() |
| } |
| }) |
| return combinedStream |
| } |