// filestream-ts / src/server/parallel.ts
// (uploaded via huggingface_hub by vickydmt, commit 9408459)
import { TelegramClient } from "telegram";
import { StringSession } from "telegram/sessions";
import { config } from "../config";
import { client as mainClient } from "../bot/client";
// The pool of workers: extra MTProto clients shared by parallelDownload().
// Empty until initParallelWorkers() populates it; consumers fall back to the
// main client while it is empty.
export const workerClients: TelegramClient[] = [];
// Number of worker clients created by initParallelWorkers().
const WORKER_COUNT = 5;
// Re-entrancy guard so overlapping initParallelWorkers() calls don't build the pool twice.
let isInitializing = false;
import { Api } from "telegram";
/**
 * Builds the pool of WORKER_COUNT extra MTProto clients used for parallel
 * streaming. Each worker starts from an empty session pinned to the main
 * client's DC, then receives the master's authorization via
 * auth.exportAuthorization / auth.importAuthorization.
 *
 * Idempotent: returns immediately if the pool already exists or another
 * initialization is in flight.
 */
export const initParallelWorkers = async () => {
  if (isInitializing || workerClients.length > 0) return;
  isInitializing = true;
  console.log(`Initializing ${WORKER_COUNT} Parallel MTProto workers for high-speed streaming...`);
  try {
    // Get current DC ID from main client assuming it's already connected.
    // This is safe because startBot() is awaited before startServer().
    const dcId = mainClient.session.dcId;
    for (let i = 0; i < WORKER_COUNT; i++) {
      const workerClient = new TelegramClient(
        new StringSession(""), // Create an empty session, do NOT clone the string directly!
        config.apiId,
        config.apiHash,
        {
          connectionRetries: 5,
          useWSS: false,
        }
      );
      // Tell the empty client which DC to connect to
      workerClient.session.setDC(dcId, mainClient.session.serverAddress!, mainClient.session.port!);
      await workerClient.connect();
      try {
        // To prevent AUTH_BYTES_INVALID and AUTH_KEY_UNREGISTERED:
        // export the auth from the master and import it into this blank worker.
        const exportedAuth = await mainClient.invoke(
          new Api.auth.ExportAuthorization({
            dcId: dcId
          })
        );
        const result = await workerClient.invoke(
          new Api.auth.ImportAuthorization({
            id: exportedAuth.id,
            bytes: exportedAuth.bytes
          })
        );
        console.log(`Worker ${i + 1}/${WORKER_COUNT} Auth Imported successfully`, result.className);
      } catch (e: any) {
        // Non-fatal: the worker is still pushed below — presumably it can serve
        // some requests without its own imported auth. TODO confirm against usage.
        console.warn(`Worker ${i + 1} Authorization warning:`, e.message);
      }
      workerClients.push(workerClient);
      console.log(`Worker ${i + 1}/${WORKER_COUNT} connected.`);
    }
  } finally {
    // BUGFIX: the original never reset this flag. If connect() (or any other
    // await outside the inner try) threw, isInitializing stayed true forever
    // and the pool could never be initialized again. Resetting here is safe:
    // after a successful run, workerClients.length > 0 still blocks re-entry.
    isInitializing = false;
  }
};
/**
 * Streams `limit` bytes of `file` starting at byte `offset`.
 *
 * Consecutive `chunkSize` blocks are fetched concurrently, round-robin across
 * the worker pool, while the generator yields them strictly in byte order:
 * a sliding window of in-flight promises sized to the number of clients.
 *
 * @param file      Api.TypeMessageMedia (anything iterDownload accepts)
 * @param offset    absolute byte offset to start from
 * @param limit     total number of bytes to stream
 * @param chunkSize size of each parallel block (default 1 MiB, up from 512 KiB
 *                  for faster speeds)
 */
export async function* parallelDownload(
  file: any, // Api.TypeMessageMedia
  offset: number,
  limit: number,
  chunkSize: number = 1024 * 1024 // 1MB chunks instead of 512kb for faster speeds
): AsyncGenerator<Buffer, void, unknown> {
  // Fall back to the main client if the worker pool was never initialized.
  const activeClients = workerClients.length > 0 ? workerClients : [mainClient];
  const concurrency = activeClients.length;
  // BUGFIX (perf): hoisted out of fetchBlock — the original called
  // require("big-integer") on every single block fetch.
  const bigInt = require("big-integer");
  let currentOffset = offset;
  let remainingBytes = limit;
  // FIFO of in-flight block downloads; queue order == byte order.
  const downloadQueue: Promise<Buffer>[] = [];
  // Monotonic block counter; (blockIndex % concurrency) selects the client.
  let blockIndex = 0;
  // Downloads one contiguous block [start, start + size) on a round-robin client.
  const fetchBlock = async (bIdx: number, start: number, size: number): Promise<Buffer> => {
    const tgClient = activeClients[bIdx % concurrency];
    const chunks: Buffer[] = [];
    try {
      for await (const chunk of tgClient.iterDownload({
        file: file,
        offset: bigInt(start),
        requestSize: size,
        limit: size
      })) {
        chunks.push(chunk as Buffer);
      }
      return Buffer.concat(chunks);
    } catch (err) {
      // BUGFIX: report the actual worker slot (bIdx % concurrency); the
      // original printed the ever-growing block counter as the worker id.
      console.error(`Worker ${bIdx % concurrency} failed to fetch block at ${start}:`, err);
      throw err;
    }
  };
  // Pre-fill the sliding window up to the concurrency limit.
  for (let i = 0; i < concurrency && remainingBytes > 0; i++) {
    const size = Math.min(chunkSize, remainingBytes);
    downloadQueue.push(fetchBlock(blockIndex++, currentOffset, size));
    currentOffset += size;
    remainingBytes -= size;
  }
  // Await the oldest in-flight block, yield it, then immediately refill the
  // freed worker slot so downloads stay `concurrency`-deep.
  while (downloadQueue.length > 0) {
    const buffer = await downloadQueue.shift()!;
    yield buffer;
    if (remainingBytes > 0) {
      const size = Math.min(chunkSize, remainingBytes);
      downloadQueue.push(fetchBlock(blockIndex++, currentOffset, size));
      currentOffset += size;
      remainingBytes -= size;
    }
  }
}