Spaces:
Runtime error
Runtime error
| import { TelegramClient } from "telegram"; | |
| import { StringSession } from "telegram/sessions"; | |
| import { config } from "../config"; | |
| import { client as mainClient } from "../bot/client"; | |
| // The pool of workers | |
| export const workerClients: TelegramClient[] = []; | |
| const WORKER_COUNT = 5; | |
| let isInitializing = false; | |
| import { Api } from "telegram"; | |
| export const initParallelWorkers = async () => { | |
| if (isInitializing || workerClients.length > 0) return; | |
| isInitializing = true; | |
| console.log(`Initializing ${WORKER_COUNT} Parallel MTProto workers for high-speed streaming...`); | |
| const sessionString = mainClient.session.save(); | |
| // Get current DC ID from main client assuming it's already connected | |
| // This is safe because startBot() is awaited before startServer() | |
| const dcId = mainClient.session.dcId; | |
| for (let i = 0; i < WORKER_COUNT; i++) { | |
| const workerClient = new TelegramClient( | |
| new StringSession(""), // Create an empty session, do NOT clone the string directly! | |
| config.apiId, | |
| config.apiHash, | |
| { | |
| connectionRetries: 5, | |
| useWSS: false, | |
| } | |
| ); | |
| // Tell the empty client which DC to connect to | |
| workerClient.session.setDC(dcId, mainClient.session.serverAddress!, mainClient.session.port!); | |
| await workerClient.connect(); | |
| try { | |
| // To prevent AUTH_BYTES_INVALID and AUTH_KEY_UNREGISTERED | |
| // We export the auth from the master and import it into this blank worker | |
| const exportedAuth = await mainClient.invoke( | |
| new Api.auth.ExportAuthorization({ | |
| dcId: dcId | |
| }) | |
| ); | |
| const result = await workerClient.invoke( | |
| new Api.auth.ImportAuthorization({ | |
| id: exportedAuth.id, | |
| bytes: exportedAuth.bytes | |
| }) | |
| ); | |
| console.log(`Worker ${i + 1}/${WORKER_COUNT} Auth Imported successfully`, result.className); | |
| } catch (e: any) { | |
| console.warn(`Worker ${i + 1} Authorization warning:`, e.message); | |
| } | |
| workerClients.push(workerClient); | |
| console.log(`Worker ${i + 1}/${WORKER_COUNT} connected.`); | |
| } | |
| }; | |
| export async function* parallelDownload( | |
| file: any, // Api.TypeMessageMedia | |
| offset: number, | |
| limit: number, | |
| chunkSize: number = 1024 * 1024 // 1MB chunks instead of 512kb for faster speeds | |
| ): AsyncGenerator<Buffer, void, unknown> { | |
| const activeClients = workerClients.length > 0 ? workerClients : [mainClient]; | |
| const concurrency = activeClients.length; | |
| let currentOffset = offset; | |
| let remainingBytes = limit; | |
| // A queue of Promises representing sequential byte chunks | |
| const downloadQueue: Promise<Buffer>[] = []; | |
| // We queue blocks | |
| let clientIndex = 0; | |
| const fetchBlock = async (cIdx: number, start: number, size: number) => { | |
| const tgClient = activeClients[cIdx % concurrency]; | |
| const chunks: Buffer[] = []; | |
| try { | |
| for await (const chunk of tgClient.iterDownload({ | |
| file: file, | |
| offset: require("big-integer")(start), | |
| requestSize: size, | |
| limit: size | |
| })) { | |
| chunks.push(chunk as Buffer); | |
| } | |
| return Buffer.concat(chunks); | |
| } catch (err) { | |
| console.error(`Worker ${cIdx} failed to fetch block at ${start}:`, err); | |
| throw err; | |
| } | |
| }; | |
| // Pre-fill the initial sliding window (Concurrency limit) | |
| for (let i = 0; i < concurrency && remainingBytes > 0; i++) { | |
| const size = Math.min(chunkSize, remainingBytes); | |
| downloadQueue.push(fetchBlock(clientIndex++, currentOffset, size)); | |
| currentOffset += size; | |
| remainingBytes -= size; | |
| } | |
| // Sequentially wait for the oldest block in the queue to resolve | |
| while (downloadQueue.length > 0) { | |
| const chunkPromise = downloadQueue.shift()!; | |
| const buffer = await chunkPromise; | |
| yield buffer; | |
| // We yielded a block, so a worker slot is free! Queue the next segment immediately. | |
| if (remainingBytes > 0) { | |
| const size = Math.min(chunkSize, remainingBytes); | |
| downloadQueue.push(fetchBlock(clientIndex++, currentOffset, size)); | |
| currentOffset += size; | |
| remainingBytes -= size; | |
| } | |
| } | |
| } | |