import { TelegramClient } from "telegram"; import { StringSession } from "telegram/sessions"; import { config } from "../config"; import { client as mainClient } from "../bot/client"; // The pool of workers export const workerClients: TelegramClient[] = []; const WORKER_COUNT = 5; let isInitializing = false; import { Api } from "telegram"; export const initParallelWorkers = async () => { if (isInitializing || workerClients.length > 0) return; isInitializing = true; console.log(`Initializing ${WORKER_COUNT} Parallel MTProto workers for high-speed streaming...`); const sessionString = mainClient.session.save(); // Get current DC ID from main client assuming it's already connected // This is safe because startBot() is awaited before startServer() const dcId = mainClient.session.dcId; for (let i = 0; i < WORKER_COUNT; i++) { const workerClient = new TelegramClient( new StringSession(""), // Create an empty session, do NOT clone the string directly! config.apiId, config.apiHash, { connectionRetries: 5, useWSS: false, } ); // Tell the empty client which DC to connect to workerClient.session.setDC(dcId, mainClient.session.serverAddress!, mainClient.session.port!); await workerClient.connect(); try { // To prevent AUTH_BYTES_INVALID and AUTH_KEY_UNREGISTERED // We export the auth from the master and import it into this blank worker const exportedAuth = await mainClient.invoke( new Api.auth.ExportAuthorization({ dcId: dcId }) ); const result = await workerClient.invoke( new Api.auth.ImportAuthorization({ id: exportedAuth.id, bytes: exportedAuth.bytes }) ); console.log(`Worker ${i + 1}/${WORKER_COUNT} Auth Imported successfully`, result.className); } catch (e: any) { console.warn(`Worker ${i + 1} Authorization warning:`, e.message); } workerClients.push(workerClient); console.log(`Worker ${i + 1}/${WORKER_COUNT} connected.`); } }; export async function* parallelDownload( file: any, // Api.TypeMessageMedia offset: number, limit: number, chunkSize: number = 1024 * 1024 // 1MB chunks instead of 512kb for faster speeds ): AsyncGenerator { const activeClients = workerClients.length > 0 ? workerClients : [mainClient]; const concurrency = activeClients.length; let currentOffset = offset; let remainingBytes = limit; // A queue of Promises representing sequential byte chunks const downloadQueue: Promise[] = []; // We queue blocks let clientIndex = 0; const fetchBlock = async (cIdx: number, start: number, size: number) => { const tgClient = activeClients[cIdx % concurrency]; const chunks: Buffer[] = []; try { for await (const chunk of tgClient.iterDownload({ file: file, offset: require("big-integer")(start), requestSize: size, limit: size })) { chunks.push(chunk as Buffer); } return Buffer.concat(chunks); } catch (err) { console.error(`Worker ${cIdx} failed to fetch block at ${start}:`, err); throw err; } }; // Pre-fill the initial sliding window (Concurrency limit) for (let i = 0; i < concurrency && remainingBytes > 0; i++) { const size = Math.min(chunkSize, remainingBytes); downloadQueue.push(fetchBlock(clientIndex++, currentOffset, size)); currentOffset += size; remainingBytes -= size; } // Sequentially wait for the oldest block in the queue to resolve while (downloadQueue.length > 0) { const chunkPromise = downloadQueue.shift()!; const buffer = await chunkPromise; yield buffer; // We yielded a block, so a worker slot is free! Queue the next segment immediately. if (remainingBytes > 0) { const size = Math.min(chunkSize, remainingBytes); downloadQueue.push(fetchBlock(clientIndex++, currentOffset, size)); currentOffset += size; remainingBytes -= size; } } }