Spaces:
Runtime error
Runtime error
File size: 4,489 Bytes
985f94e a087ce0 985f94e a087ce0 985f94e 9408459 985f94e 9408459 985f94e a087ce0 9408459 a087ce0 9408459 a087ce0 9408459 a087ce0 985f94e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | import { TelegramClient } from "telegram";
import { StringSession } from "telegram/sessions";
import { config } from "../config";
import { client as mainClient } from "../bot/client";
// The pool of workers
export const workerClients: TelegramClient[] = [];
const WORKER_COUNT = 5;
let isInitializing = false;
import { Api } from "telegram";
export const initParallelWorkers = async () => {
if (isInitializing || workerClients.length > 0) return;
isInitializing = true;
console.log(`Initializing ${WORKER_COUNT} Parallel MTProto workers for high-speed streaming...`);
const sessionString = mainClient.session.save();
// Get current DC ID from main client assuming it's already connected
// This is safe because startBot() is awaited before startServer()
const dcId = mainClient.session.dcId;
for (let i = 0; i < WORKER_COUNT; i++) {
const workerClient = new TelegramClient(
new StringSession(""), // Create an empty session, do NOT clone the string directly!
config.apiId,
config.apiHash,
{
connectionRetries: 5,
useWSS: false,
}
);
// Tell the empty client which DC to connect to
workerClient.session.setDC(dcId, mainClient.session.serverAddress!, mainClient.session.port!);
await workerClient.connect();
try {
// To prevent AUTH_BYTES_INVALID and AUTH_KEY_UNREGISTERED
// We export the auth from the master and import it into this blank worker
const exportedAuth = await mainClient.invoke(
new Api.auth.ExportAuthorization({
dcId: dcId
})
);
const result = await workerClient.invoke(
new Api.auth.ImportAuthorization({
id: exportedAuth.id,
bytes: exportedAuth.bytes
})
);
console.log(`Worker ${i + 1}/${WORKER_COUNT} Auth Imported successfully`, result.className);
} catch (e: any) {
console.warn(`Worker ${i + 1} Authorization warning:`, e.message);
}
workerClients.push(workerClient);
console.log(`Worker ${i + 1}/${WORKER_COUNT} connected.`);
}
};
export async function* parallelDownload(
file: any, // Api.TypeMessageMedia
offset: number,
limit: number,
chunkSize: number = 1024 * 1024 // 1MB chunks instead of 512kb for faster speeds
): AsyncGenerator<Buffer, void, unknown> {
const activeClients = workerClients.length > 0 ? workerClients : [mainClient];
const concurrency = activeClients.length;
let currentOffset = offset;
let remainingBytes = limit;
// A queue of Promises representing sequential byte chunks
const downloadQueue: Promise<Buffer>[] = [];
// We queue blocks
let clientIndex = 0;
const fetchBlock = async (cIdx: number, start: number, size: number) => {
const tgClient = activeClients[cIdx % concurrency];
const chunks: Buffer[] = [];
try {
for await (const chunk of tgClient.iterDownload({
file: file,
offset: require("big-integer")(start),
requestSize: size,
limit: size
})) {
chunks.push(chunk as Buffer);
}
return Buffer.concat(chunks);
} catch (err) {
console.error(`Worker ${cIdx} failed to fetch block at ${start}:`, err);
throw err;
}
};
// Pre-fill the initial sliding window (Concurrency limit)
for (let i = 0; i < concurrency && remainingBytes > 0; i++) {
const size = Math.min(chunkSize, remainingBytes);
downloadQueue.push(fetchBlock(clientIndex++, currentOffset, size));
currentOffset += size;
remainingBytes -= size;
}
// Sequentially wait for the oldest block in the queue to resolve
while (downloadQueue.length > 0) {
const chunkPromise = downloadQueue.shift()!;
const buffer = await chunkPromise;
yield buffer;
// We yielded a block, so a worker slot is free! Queue the next segment immediately.
if (remainingBytes > 0) {
const size = Math.min(chunkSize, remainingBytes);
downloadQueue.push(fetchBlock(clientIndex++, currentOffset, size));
currentOffset += size;
remainingBytes -= size;
}
}
}
|