File size: 4,489 Bytes
985f94e
 
 
 
 
 
 
 
 
 
 
a087ce0
 
985f94e
 
 
 
 
 
 
a087ce0
 
 
 
985f94e
 
9408459
985f94e
 
 
 
 
 
 
 
9408459
 
 
985f94e
a087ce0
 
9408459
 
a087ce0
 
 
 
 
 
9408459
a087ce0
 
 
 
 
 
9408459
 
a087ce0
 
 
 
985f94e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Single consolidated import from "telegram" — `Api` was previously imported
// a second time mid-file, which is a duplicate module import.
import { Api, TelegramClient } from "telegram";
import { StringSession } from "telegram/sessions";
import { config } from "../config";
import { client as mainClient } from "../bot/client";

// The pool of worker clients used for parallel MTProto downloads.
// Populated once by initParallelWorkers(); empty until then.
export const workerClients: TelegramClient[] = [];
const WORKER_COUNT = 5;

// Re-entrancy guard so concurrent callers don't spawn duplicate pools.
let isInitializing = false;

/**
 * Spins up WORKER_COUNT extra MTProto clients that share the main client's
 * authorization, enabling parallel high-speed downloads.
 *
 * Each worker starts from an EMPTY session (cloning the session string would
 * trigger AUTH_BYTES_INVALID), is pointed at the main client's DC, and then
 * receives an exported/imported authorization from the master client.
 *
 * Idempotent: returns immediately if a pool already exists or another call
 * is mid-initialization. Safe to retry after a failure — the in-progress
 * flag is always cleared in `finally` (previously a thrown error left it
 * stuck `true` forever, permanently blocking re-initialization).
 */
export const initParallelWorkers = async () => {
    if (isInitializing || workerClients.length > 0) return;
    isInitializing = true;

    try {
        console.log(`Initializing ${WORKER_COUNT} Parallel MTProto workers for high-speed streaming...`);

        // Current DC of the main client; safe to read because startBot() is
        // awaited before startServer(), so the main client is connected.
        const dcId = mainClient.session.dcId;

        for (let i = 0; i < WORKER_COUNT; i++) {
            const workerClient = new TelegramClient(
                new StringSession(""), // Empty session — do NOT clone the string directly!
                config.apiId,
                config.apiHash,
                {
                    connectionRetries: 5,
                    useWSS: false,
                }
            );

            // Tell the blank client which DC to connect to.
            workerClient.session.setDC(dcId, mainClient.session.serverAddress!, mainClient.session.port!);

            await workerClient.connect();

            try {
                // Prevent AUTH_BYTES_INVALID / AUTH_KEY_UNREGISTERED:
                // export the auth from the master, import into this blank worker.
                const exportedAuth = await mainClient.invoke(
                    new Api.auth.ExportAuthorization({
                        dcId: dcId
                    })
                );

                const result = await workerClient.invoke(
                    new Api.auth.ImportAuthorization({
                        id: exportedAuth.id,
                        bytes: exportedAuth.bytes
                    })
                );

                console.log(`Worker ${i + 1}/${WORKER_COUNT} Auth Imported successfully`, result.className);

            } catch (e: unknown) {
                // Best-effort: the worker is still pooled; downloads on it may
                // fail with auth errors, but the main client remains a fallback.
                const message = e instanceof Error ? e.message : String(e);
                console.warn(`Worker ${i + 1} Authorization warning:`, message);
            }

            workerClients.push(workerClient);
            console.log(`Worker ${i + 1}/${WORKER_COUNT} connected.`);
        }
    } finally {
        // Always clear the guard so a failed init can be retried.
        isInitializing = false;
    }
};

/**
 * Streams `limit` bytes of `file` starting at `offset`, yielding sequential
 * Buffers in order while downloading up to one block per client concurrently
 * (a sliding window of size = number of active clients).
 *
 * Falls back to the main client when the worker pool was never initialized.
 *
 * @param file      Media reference accepted by iterDownload (Api.TypeMessageMedia).
 * @param offset    Absolute byte offset to start from.
 * @param limit     Total number of bytes to stream.
 * @param chunkSize Bytes per block request (default 1MB; larger blocks mean
 *                  fewer round-trips than the 512KB default).
 */
export async function* parallelDownload(
    file: any, // Api.TypeMessageMedia
    offset: number,
    limit: number,
    chunkSize: number = 1024 * 1024 // 1MB chunks instead of 512kb for faster speeds
): AsyncGenerator<Buffer, void, unknown> {

    const activeClients = workerClients.length > 0 ? workerClients : [mainClient];
    const concurrency = activeClients.length;

    // Hoisted: previously require("big-integer") executed inside fetchBlock,
    // i.e. once per block download instead of once per call.
    const bigInt = require("big-integer");

    let currentOffset = offset;
    let remainingBytes = limit;
    let clientIndex = 0;

    // In-flight block downloads, ordered by byte position (oldest first).
    const downloadQueue: Promise<Buffer>[] = [];

    // Downloads one contiguous block on a round-robin-selected client.
    const fetchBlock = async (cIdx: number, start: number, size: number): Promise<Buffer> => {
        const tgClient = activeClients[cIdx % concurrency];
        const chunks: Buffer[] = [];

        try {
            for await (const chunk of tgClient.iterDownload({
                file: file,
                offset: bigInt(start),
                requestSize: size,
                limit: size
            })) {
                chunks.push(chunk as Buffer);
            }
            return Buffer.concat(chunks);
        } catch (err) {
            console.error(`Worker ${cIdx} failed to fetch block at ${start}:`, err);
            throw err;
        }
    };

    // Queues the next sequential block and advances the byte cursor.
    const enqueueNextBlock = () => {
        const size = Math.min(chunkSize, remainingBytes);
        downloadQueue.push(fetchBlock(clientIndex++, currentOffset, size));
        currentOffset += size;
        remainingBytes -= size;
    };

    try {
        // Pre-fill the sliding window: one in-flight block per client.
        for (let i = 0; i < concurrency && remainingBytes > 0; i++) {
            enqueueNextBlock();
        }

        // Await the oldest block, yield it, then immediately refill the freed
        // worker slot — keeps all clients busy while preserving byte order.
        while (downloadQueue.length > 0) {
            const buffer = await downloadQueue.shift()!;
            yield buffer;

            if (remainingBytes > 0) {
                enqueueNextBlock();
            }
        }
    } finally {
        // If the consumer aborts the generator early (e.g. HTTP client
        // disconnects) or one block rejects, the remaining in-flight promises
        // would otherwise reject with no handler attached and crash the
        // process as unhandled rejections. Swallow them here.
        for (const pending of downloadQueue) {
            pending.catch(() => {});
        }
    }
}