|
|
import { TelegramClient } from "telegram"; |
|
|
import { StringSession } from "telegram/sessions"; |
|
|
import { Telegraf } from "telegraf"; |
|
|
import { Api } from "telegram"; |
|
|
import { NewMessage, NewMessageEvent } from "telegram/events"; |
|
|
import { telegramClients, pendingAuthRequests, activeTasks, telegrafBots } from "./models"; |
|
|
import { getBlockKeywords, getKeywords, normalizeArabicText, sendMessage as sendTelegramMessage, sendReplayTelegramMessage } from "./utils"; |
|
|
import { COMMANDS, MESSAGES } from "./config"; |
|
|
import { fetchDataFromTable, updateDataInTable } from "./db/supabaseHelper"; |
|
|
import { supabase } from './db/supabase'; |
|
|
import { Request, Response } from 'express'; |
|
|
|
|
|
|
|
|
/**
 * Registry of running task processes, keyed by task id.
 *
 * Entries are written by `startTaskProcess` and removed by `stopTaskProcess`.
 * A handle may carry an `interval` timer and/or a `cleanup` callback; both are
 * optional and are only touched when present.
 */
const activeTaskHandlers = new Map<
  string,
  { interval?: ReturnType<typeof setInterval>; cleanup?: () => unknown }
>();
|
|
|
|
|
/**
 * Completes a phone-code login that was previously started for `phoneNumber`.
 *
 * Looks up the pending auth request (client + phoneCodeHash), signs in with the
 * submitted code and, on success, registers the client in `telegramClients`.
 *
 * @param phoneNumber Phone number the code was requested for.
 * @param code        Verification code entered by the user.
 * @returns `{ status, data }`:
 *   - 200 with the saved session string on success,
 *   - 400 for missing input, no pending request, or an invalid code,
 *   - 500 for any other sign-in failure.
 */
export const handleSubmitCode = async (phoneNumber: string, code: string) => {
  if (!phoneNumber || !code) {
    return { status: 400, data: { error: "Phone number and code required" } };
  }

  const authRequest = pendingAuthRequests.get(phoneNumber);
  if (!authRequest) {
    return { status: 400, data: { error: "No active verification session" } };
  }

  try {
    await authRequest.client.invoke(
      new Api.auth.SignIn({
        phoneNumber,
        phoneCodeHash: authRequest.phoneCodeHash,
        phoneCode: code,
      })
    );

    const me = await authRequest.client.getMe();
    const sessionString = authRequest.client.session.save() as unknown as string;

    telegramClients.set(phoneNumber, {
      client: authRequest.client,
      lastActivity: new Date(),
      username: me.username,
    });

    // The request is consumed only once sign-in fully succeeded.
    pendingAuthRequests.delete(phoneNumber);

    return {
      status: 200,
      data: {
        message: "Authentication successful",
        session: sessionString,
        phoneNumber: phoneNumber,
      },
    };
  } catch (error: any) {
    console.error("Sign-in error:", error);

    if (error?.errorMessage === "PHONE_CODE_INVALID") {
      // A mistyped code is recoverable: keep the pending request so the user
      // can retry with the same code hash instead of requesting a new code.
      return { status: 400, data: { error: "Invalid verification code" } };
    }

    // Any other failure ends this verification attempt.
    pendingAuthRequests.delete(phoneNumber);
    return { status: 500, data: { error: error?.message ?? String(error) } };
  }
};
|
|
|
|
|
/**
 * Restores a Telegram client from a saved string session and registers it
 * in `telegramClients` under `phoneNumber`.
 *
 * @param apiKey      Telegram API ID (numeric, passed as a string).
 * @param hash        Telegram API hash.
 * @param session     Saved string session.
 * @param phoneNumber Key under which the client is registered.
 * @returns `{ status, data }`: 200 on success, 400 for missing or malformed
 *   input, 401 when connecting or fetching the current user fails.
 */
export const handleLoginWithSession = async (
  apiKey: string,
  hash: string,
  session: string,
  phoneNumber: string
) => {
  if (!apiKey || !hash || !session || !phoneNumber) {
    return {
      status: 400,
      data: { error: "API ID, Hash, Session, and Phone Number are required" },
    };
  }

  // An API ID is a positive integer; this also rejects inputs such as " "
  // (which Number() turns into 0) and fractional values.
  const apiId = Number(apiKey);
  if (!Number.isInteger(apiId) || apiId <= 0) {
    return { status: 400, data: { error: "Invalid API ID format" } };
  }

  try {
    const client = new TelegramClient(new StringSession(session), apiId, hash, {
      connectionRetries: 3,
    });

    try {
      await client.connect();
      const me = await client.getMe();

      telegramClients.set(phoneNumber, {
        client,
        lastActivity: new Date(),
        username: me.username,
      });
    } catch (loginError) {
      // The client is never registered on this path, so release its
      // connection (best effort) instead of leaving it dangling.
      try {
        await client.disconnect();
      } catch (disconnectError) {
        console.error("Failed to disconnect client after login error:", disconnectError);
      }
      throw loginError;
    }

    return {
      status: 200,
      data: { message: "Session login successful", phoneNumber: phoneNumber },
    };
  } catch (error: any) {
    console.error("Session login error:", error);
    return {
      status: 401,
      data: { error: "Session login failed", details: error?.message },
    };
  }
};
|
|
|
|
|
/**
 * Starts a phone-code login: connects a fresh client, asks Telegram to send a
 * verification code to `phoneNumber`, and stores the client together with the
 * returned `phoneCodeHash` in `pendingAuthRequests`.
 *
 * @param apiKey      Telegram API ID (numeric, passed as a string).
 * @param hash        Telegram API hash.
 * @param phoneNumber Phone number to send the code to.
 * @returns `{ status, data }`: 200 when the code was sent, 400 for missing or
 *   malformed input, 500 when connecting or sending the code fails.
 */
export const handleRequestCode = async (
  apiKey: string,
  hash: string,
  phoneNumber: string
) => {
  if (!apiKey || !hash || !phoneNumber) {
    // Report only the NAMES of the missing fields. Echoing the received values
    // would put the API ID/hash into responses and logs.
    const missing = [
      ["apiKey", apiKey],
      ["hash", hash],
      ["phoneNumber", phoneNumber],
    ]
      .filter(([, value]) => !value)
      .map(([name]) => name);

    return {
      status: 400,
      data: {
        error:
          "API ID, Hash, and Phone Number are required. Missing: " +
          missing.join(", "),
      },
    };
  }

  const apiId = Number(apiKey);
  if (isNaN(apiId)) {
    return { status: 400, data: { error: "Invalid API ID format" } };
  }

  try {
    const client = new TelegramClient(new StringSession(), apiId, hash, {
      connectionRetries: 5,
    });

    try {
      await client.connect();

      const { phoneCodeHash } = await client.sendCode(
        { apiId, apiHash: hash },
        phoneNumber,
        false
      );

      pendingAuthRequests.set(phoneNumber, { client, phoneCodeHash });
    } catch (requestError) {
      // Nothing references the client on this path; release it (best effort).
      try {
        await client.disconnect();
      } catch (disconnectError) {
        console.error("Failed to disconnect client after code request error:", disconnectError);
      }
      throw requestError;
    }

    return {
      status: 200,
      data: { message: "Verification code sent successfully" },
    };
  } catch (error: any) {
    console.error("Request code error:", error);
    return { status: 500, data: { error: error?.message ?? String(error) } };
  }
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Builds the "quick message" task for one account.
 *
 * The handler reacts to the account's own outgoing messages that start with
 * `COMMANDS.GO` and are sent as a reply to a forwarded message. It then:
 *   1. resolves the user the replied-to message was forwarded from,
 *   2. sends that user the replied-to text followed by `MESSAGES.FORWARD_SUCCESS`,
 *   3. confirms (or reports failure) as a reply next to the replied-to message,
 *   4. files that user's chat into a dialog folder (an existing one with fewer
 *      than 100 included peers, otherwise a newly created one).
 *
 * @param clientInfo Registered client entry; only `clientInfo.client` is used.
 * @returns The handler plus the `NewMessage` filter (outgoing only) to register it with.
 */
export const createQuickMessageTask = (clientInfo: any) => {
  const client = clientInfo.client;

  // Ids can arrive wrapped (the extraction below reads `.value` from one) or as
  // primitives. Comparing decimal strings works for both, whereas `===` between
  // a wrapper object and its unwrapped value is never true.
  const sameId = (left: unknown, right: unknown): boolean =>
    left != null && right != null && String(left) === String(right);

  // Sends a plain notice; a failure to notify is logged, never thrown.
  const notify = async (peer: any, text: string): Promise<void> => {
    try {
      await sendTelegramMessage(client, peer, text);
    } catch (error) {
      console.error("Error sending message:", error);
    }
  };

  // Adds the user to a dialog folder unless some folder already includes them.
  const addToFolder = async (entity: any): Promise<void> => {
    let filters: any[];
    try {
      const response = await client.invoke(new Api.messages.GetDialogFilters());
      // Accept both a bare array and an object carrying a `filters` array.
      const list = Array.isArray(response) ? response : response?.filters;
      filters = Array.isArray(list) ? list : [];
    } catch (error) {
      console.error("Error fetching dialog filters:", error);
      // Without the real list we cannot tell which ids are taken; creating a
      // filter now could replace an existing folder, so stop here.
      return;
    }

    console.log("Existing filters:", filters);

    const alreadyFiled = filters.some((filter: any) =>
      filter.includePeers?.some((peer: any) => sameId(peer.userId, entity.id))
    );
    if (alreadyFiled) {
      console.log("Chat is already in a folder. Skipping folder creation.");
      return;
    }

    const inputPeer = new Api.InputPeerUser({
      userId: entity.id,
      accessHash: entity.accessHash,
    });

    const folderWithSpace = filters.find(
      (filter: any) => filter.includePeers?.length < 100
    );

    if (folderWithSpace) {
      folderWithSpace.includePeers.push(inputPeer);
      try {
        await client.invoke(
          new Api.messages.UpdateDialogFilter({
            id: folderWithSpace.id,
            filter: folderWithSpace,
          })
        );
        console.log("Chat added to existing folder with ID:", folderWithSpace.id);
      } catch (error) {
        console.error("Error updating existing folder:", error);
      }
      return;
    }

    // No folder has room: pick the first unused id starting at 4.
    const existingIds = filters.map((filter: any) => filter.id);
    let newFilterId = 4;
    while (existingIds.includes(newFilterId)) {
      newFilterId++;
    }

    if (newFilterId > 255) {
      console.error("Cannot create new filter: Maximum number of filters reached (255).");
      return;
    }

    const newFilter = new Api.DialogFilter({
      id: newFilterId,
      title: "M" + Math.floor(Math.random() * 100),
      includePeers: [inputPeer],
      excludePeers: [],
      contacts: false,
      nonContacts: false,
      groups: false,
      broadcasts: false,
      bots: false,
      excludeMuted: false,
      excludeRead: false,
      excludeArchived: false,
      pinnedPeers: [],
    });

    console.log("New filter object:", newFilter);

    try {
      await client.invoke(
        new Api.messages.UpdateDialogFilter({
          id: newFilterId,
          filter: newFilter,
        })
      );
      console.log("New filter created successfully with ID:", newFilterId);
    } catch (error) {
      console.error("Error creating new dialog filter:", error);
    }
  };

  const messageHandler = async (event: NewMessageEvent) => {
    const message = event.message;

    if (!message.out || !message.message?.startsWith(COMMANDS.GO)) return;

    const repliedMessage = await message.getReplyMessage();
    if (!repliedMessage) {
      await notify(message.sender, "لم يتم العثور على الرسالة المُرَد عليها.");
      return;
    }

    const sender = await repliedMessage.sender;
    if (!repliedMessage.fwdFrom || !repliedMessage.fwdFrom.fromId) {
      await notify(sender, "المرسل مخفي");
      return;
    }

    const originalSenderId = repliedMessage.fwdFrom?.fromId?.userId?.value;
    if (!originalSenderId) {
      await notify(sender, "تعذر تحديد المرسل الأصلي.");
      return;
    }

    let originalSenderEntity: any;
    try {
      originalSenderEntity = await client.getEntity(originalSenderId);
      await sendTelegramMessage(client, originalSenderEntity, repliedMessage.message);
      await sendTelegramMessage(client, originalSenderEntity, MESSAGES.FORWARD_SUCCESS);
      await sendReplayTelegramMessage(client, sender, "تم التوجيه بنجاح", repliedMessage.id);
    } catch (error) {
      // Log first so the root cause survives even if the failure reply fails too.
      console.error("Error sending message:", error);
      try {
        await sendReplayTelegramMessage(client, sender, " حدث خطأ أثناء التوجيه, يبدو أن حسابك محظور مؤقتا", repliedMessage.id);
      } catch (replyError) {
        console.error("Error sending message:", replyError);
      }
      // Without a resolved entity there is nothing to file into a folder.
      if (!originalSenderEntity) return;
    }

    await addToFolder(originalSenderEntity);
  };

  const eventFilter = new NewMessage({
    outgoing: true,
    incoming: false,
  });

  return { messageHandler, eventFilter };
};
|
|
|
|
|
|
|
|
/**
 * Builds the "monitor" task: forwards incoming group messages that contain at
 * least one keyword and no block keyword to every configured recipient.
 *
 * Keyword lists are loaded once, when the task is created. The message text is
 * lowercased and passed through `normalizeArabicText` before matching; keywords
 * are lowercased only (presumably already normalized by their loaders — confirm).
 *
 * @param clientInfo         Registered client entry; only `clientInfo.client` is used.
 * @param forwardToUsernames Recipients each matching message is forwarded to.
 * @returns The handler plus the `NewMessage` filter (incoming only) to register it with.
 */
const createMonitorTask = async (
  clientInfo: any,
  forwardToUsernames: string[]
) => {
  const normalizedKeywordList = await getKeywords();
  const normalizedBlockKeywordList = await getBlockKeywords();

  console.log("Normalized Keywords:", normalizedKeywordList);
  console.log("Normalized Block Keywords:", normalizedBlockKeywordList);

  // Lowercase once here instead of on every incoming message.
  const keywords: string[] = normalizedKeywordList.map((keyword: string) => keyword.toLowerCase());
  const blockKeywords: string[] = normalizedBlockKeywordList.map((keyword: string) => keyword.toLowerCase());

  const messageHandler = async (event: NewMessageEvent) => {
    const message = event.message;

    // Only text messages from groups are considered.
    if (!message.message || !message.isGroup) return;

    const messageText = normalizeArabicText(message.message.toLowerCase());

    const containsKeyword = keywords.some((keyword) => messageText.includes(keyword));
    if (!containsKeyword) return;

    const containsBlockKeyword = blockKeywords.some((keyword) => messageText.includes(keyword));
    if (containsBlockKeyword) return;

    for (const username of forwardToUsernames) {
      // Each recipient is isolated: a failure for one (blocked, unknown
      // username, flood wait, ...) must not skip the remaining recipients.
      try {
        await clientInfo.client.forwardMessages(username, {
          messages: [message.id],
          fromPeer: message.peerId,
        });
      } catch (error: any) {
        console.error("Error forwarding message:", error?.message ?? error);
      }
    }
  };

  const eventFilter = new NewMessage({
    outgoing: false,
    incoming: true,
  });

  return { messageHandler, eventFilter };
};
|
|
|
|
|
|
|
|
/**
 * Builds the "join group" task: when the account sends a message starting with
 * `./join <link>`, the account joins the chat behind that invite and reports
 * the outcome to `message.sender`.
 *
 * `messages.ImportChatInvite` expects the bare invite hash, so the hash is
 * extracted from the common link forms first.
 *
 * @param clientInfo Registered client entry; only `clientInfo.client` is used.
 * @returns The handler plus the `NewMessage` filter (outgoing only) to register it with.
 */
const createJoinGroupTask = (clientInfo: any) => {
  const JOIN_COMMAND = "./join";

  // Accepts ".../joinchat/<hash>", ".../+<hash>", "...?invite=<hash>",
  // "+<hash>" or an already bare hash, and returns the hash part.
  const toInviteHash = (link: string): string => {
    const fromUrl = link.match(/(?:joinchat\/|\/\+|[?&]invite=)([A-Za-z0-9_-]+)/);
    return fromUrl ? fromUrl[1] : link.replace(/^\+/, "");
  };

  const messageHandler = async (event: NewMessageEvent) => {
    const message = event.message;
    if (!message.message?.startsWith(JOIN_COMMAND)) return;

    const groupLink = message.message.split(JOIN_COMMAND)[1]?.trim();
    if (!groupLink) return;

    try {
      await clientInfo.client.invoke(
        new Api.messages.ImportChatInvite({ hash: toInviteHash(groupLink) })
      );
      await sendTelegramMessage(clientInfo.client, message.sender, `Joined group: ${groupLink}`);
    } catch (error: any) {
      try {
        await sendTelegramMessage(clientInfo.client, message.sender, `Failed to join group: ${error.message}`);
      } catch (notifyError) {
        // Keep both errors visible if even the failure notice cannot be sent.
        console.error("Failed to report join error:", error, notifyError);
      }
    }
  };

  const eventFilter = new NewMessage({
    outgoing: true,
    incoming: false,
  });

  return { messageHandler, eventFilter };
};
|
|
|
|
|
|
|
|
/**
 * Builds the "scrape links" task: every incoming message is scanned for
 * http/https URLs, and any that are found are reported to `message.sender`
 * as one comma-separated list.
 *
 * @param clientInfo Registered client entry; only `clientInfo.client` is used.
 * @returns The handler plus the `NewMessage` filter (incoming only) to register it with.
 */
const createScrapeLinksTask = (clientInfo: any) => {
  const eventFilter = new NewMessage({
    outgoing: false,
    incoming: true,
  });

  const messageHandler = async (event: NewMessageEvent) => {
    const { message } = event;

    // `match` with the g flag yields every URL, or null when there is none.
    const found = message.message.match(/https?:\/\/[^\s]+/g);
    if (!found) {
      return;
    }

    const report = `Found links: ${found.join(", ")}`;
    await sendTelegramMessage(clientInfo.client, message.sender, report);
  };

  return { messageHandler, eventFilter };
};
|
|
|
|
|
/**
 * Creates a task for an already logged-in account and registers its message
 * handler on that account's client.
 *
 * @param phoneNumber Key of the client in `telegramClients`.
 * @param taskType    One of "quickMessage", "monitor", "joinGroup", "scrapeLinks".
 * @param taskData    Task settings. `id` is required and becomes the key in
 *                    `activeTasks`; "monitor" additionally needs `receiverUsernames`.
 * @returns `{ status, data }`: 200 on success, 400 for invalid input, 404 when
 *   no client is registered for the phone number, 500 on unexpected errors.
 */
export const handleCreateTask = async (
  phoneNumber: string,
  taskType: API.TaskType,
  taskData?: any
) => {
  if (!phoneNumber) {
    return { status: 400, data: { error: "Phone number is required" } };
  }

  try {
    const clientInfo = telegramClients.get(phoneNumber);
    if (!clientInfo) {
      return { status: 404, data: { error: "Client not found for this phone number" } };
    }

    // `taskData` is optional: an omitted argument must yield the 400 below
    // rather than a TypeError that surfaces as a 500.
    const taskId = taskData?.id;
    if (!taskId) {
      return { status: 400, data: { error: "Task ID is required" } };
    }

    let messageHandler: (event: NewMessageEvent) => void;
    let eventFilter: NewMessage;

    switch (taskType) {
      case "quickMessage":
        ({ messageHandler, eventFilter } = createQuickMessageTask(clientInfo));
        break;

      case "monitor":
        if (!taskData?.receiverUsernames) {
          return { status: 400, data: { error: "Missing required task data for monitor task" } };
        }
        ({ messageHandler, eventFilter } = await createMonitorTask(
          clientInfo,
          taskData.receiverUsernames
        ));
        break;

      case "joinGroup":
        ({ messageHandler, eventFilter } = createJoinGroupTask(clientInfo));
        break;

      case "scrapeLinks":
        ({ messageHandler, eventFilter } = createScrapeLinksTask(clientInfo));
        break;

      default:
        return { status: 400, data: { error: "Invalid task type" } };
    }

    // Re-creating a task under an id that is already active replaces it.
    // Detach the old handlers first; otherwise they keep firing and can no
    // longer be removed once their `activeTasks` entry is overwritten.
    const previous = activeTasks.get(taskId);
    if (previous) {
      for (const { handler, filter } of previous.listeners.values()) {
        previous.client.removeEventHandler(handler, filter);
      }
    }

    clientInfo.client.addEventHandler(messageHandler, eventFilter);

    activeTasks.set(taskId, {
      client: clientInfo.client,
      taskData: taskData,
      taskType: taskType,
      listeners: new Map([[phoneNumber, { handler: messageHandler, filter: eventFilter }]]),
    });

    telegramClients.set(phoneNumber, {
      ...clientInfo,
      lastActivity: new Date(),
    });

    return {
      status: 200,
      data: {
        message: `Task of type '${taskType}' created successfully`,
        taskId,
        monitoring: taskType === "monitor" ? "Monitoring incoming messages" : "ME (your own messages)",
      },
    };
  } catch (error: any) {
    console.error("Task creation error:", error);
    return { status: 500, data: { error: "Failed to create task", details: error?.message } };
  }
};
|
|
|
|
|
/**
 * Cancels an active task: detaches every listener the task registered on its
 * client and removes the task from `activeTasks`.
 *
 * @param taskId Key of the task in `activeTasks`.
 * @returns `{ status, data }`: 200 when cancelled, 404 when the id is unknown,
 *   500 when detaching a listener throws (the task then stays registered).
 */
export const handleCancelTask = async (taskId: string) => {
  const task = activeTasks.get(taskId);
  if (!task) {
    return { status: 404, data: { error: "Task not found" } };
  }

  try {
    task.listeners.forEach((listener: any) => {
      const { handler, filter } = listener;
      task.client.removeEventHandler(handler, filter);
    });
    activeTasks.delete(taskId);
  } catch (error: any) {
    console.error("Task cancellation error:", error);
    return { status: 500, data: { error: "Failed to cancel task" } };
  }

  return { status: 200, data: { message: "Task canceled successfully" } };
};
|
|
|
|
|
|
|
|
/**
 * Sends a text message through a registered Telegraf bot.
 *
 * @param botToken Key of the bot in `telegrafBots`.
 * @param chatId   Target chat.
 * @param text     Message text.
 * @returns `{ status, data }`: 200 when sent, 404 when no bot is registered for
 *   the token, 500 (with the error's message) when sending fails.
 */
export const handleSendMessage = async (botToken: string, chatId: string, text: string) => {
  const bot = telegrafBots.get(botToken);

  if (bot) {
    try {
      await bot.telegram.sendMessage(chatId, text);
    } catch (error: any) {
      return { status: 500, data: { error: error.message } };
    }
    return { status: 200, data: { message: "Message sent successfully" } };
  }

  return { status: 404, data: { error: "Telegraf bot not found" } };
};
|
|
|
|
|
/**
 * Boots every stored account and its tasks.
 *
 * Loads up to 1000 rows each from "personal_accounts_telegram" and "tasks",
 * logs in each active account that has a session, and for every active task
 * belonging to that account marks it 'active' and creates it. A task that
 * cannot be created is marked 'failed' with the error message. Failures are
 * logged per account/task and never abort the remaining work; the function
 * itself does not throw.
 */
export const startBots = async (): Promise<void> => {
  try {
    const accountsResponse = await fetchDataFromTable("personal_accounts_telegram", 1000, 0);
    const accounts = accountsResponse.data as API.PersonalAccount[];
    // Account rows carry session strings and API hashes, so only the row count
    // is logged, never the rows themselves.
    console.log("Accounts loaded:", accounts.length);

    const tasksResponse = await fetchDataFromTable("tasks", 1000, 0);
    console.log("Tasks response:", tasksResponse.data);

    for (const account of accounts) {
      if (!account.isActive || !account.session) continue;

      try {
        const loginResponse = await handleLoginWithSession(
          account.apiKey,
          account.hash,
          account.session,
          account.phoneNumber
        );

        if (!loginResponse || loginResponse.status !== 200) {
          console.error(`Failed to login account ${account.phoneNumber}:`, loginResponse?.data?.error);
          continue;
        }

        for (const task of tasksResponse.data as API.Task[]) {
          if (!task.isActive || task.personalAccountId !== account.id) continue;

          try {
            // A failed state write is logged but does not stop the task from starting.
            await updateDataInTable('tasks', {
              id: task.id,
              state: 'active',
              updated_at: new Date().toISOString()
            }).catch((err: unknown) => {
              console.error(`Failed to update task state for ${task.id}:`, err);
            });

            // The task's own row id doubles as the id of the running task.
            const taskDataWithId = {
              ...task.taskData,
              id: task.id
            };

            const taskResponse = await handleCreateTask(
              account.phoneNumber,
              task.taskType,
              taskDataWithId
            );

            console.log(`${task.taskType} task response for ${account.phoneNumber}:`, taskResponse);

            if (taskResponse.status !== 200) {
              console.error(`Task creation failed for ${task.id}:`, taskResponse.data.error);

              await updateDataInTable('tasks', {
                id: task.id,
                state: 'failed',
                error_message: taskResponse.data.error,
                updated_at: new Date().toISOString()
              }).catch((err: unknown) => console.error(`Failed to update error state for ${task.id}:`, err));
            }
          } catch (error: any) {
            console.error(`Failed to process task ${task.id} for ${account.phoneNumber}:`, error);

            try {
              await updateDataInTable('tasks', {
                id: task.id,
                state: 'failed',
                // Anything can be thrown, so fall back to a string form.
                error_message: error?.message ?? String(error),
                updated_at: new Date().toISOString()
              });
            } catch (updateError) {
              console.error(`Failed to update error state for ${task.id}:`, updateError);
            }
          }
        }
      } catch (accountError) {
        console.error(`Error processing account ${account.phoneNumber}:`, accountError);
      }
    }

    console.log("Completed processing all accounts and tasks");
  } catch (error: any) {
    console.error("Error in startBots:", error?.message);
  }
}
|
|
|
|
|
|
|
|
/**
 * Shuts everything down: detaches all task listeners and marks the tasks
 * 'stopped' in the database, disconnects every Telegram client, and stops
 * every Telegraf bot.
 *
 * Each item is stopped independently, so one failing disconnect/stop does not
 * leave the remaining clients and bots running. An item is removed from its
 * registry only once it was stopped successfully.
 *
 * @throws The first error encountered, after all items have been attempted.
 */
export const stopBots = async (): Promise<void> => {
  const failures: unknown[] = [];
  const recordFailure = (context: string, error: unknown) => {
    console.error(`Error stopping ${context}:`, error);
    failures.push(error);
  };

  for (const [taskId, task] of activeTasks.entries()) {
    try {
      for (const listener of task.listeners.values()) {
        task.client.removeEventHandler(listener.handler, listener.filter);
      }

      const result = await supabase
        .from('tasks')
        .update({
          state: 'stopped',
          updated_at: new Date().toISOString()
        })
        .eq('id', taskId);

      // The query reports failures through `error` instead of throwing; the
      // listeners are already detached, so log it and still drop the task.
      if (result?.error) {
        console.error(`Failed to mark task ${taskId} as stopped:`, result.error);
      }

      activeTasks.delete(taskId);
    } catch (error) {
      recordFailure(`task ${taskId}`, error);
    }
  }

  for (const [phoneNumber, clientInfo] of telegramClients.entries()) {
    try {
      await clientInfo.client.disconnect();
      telegramClients.delete(phoneNumber);
    } catch (error) {
      recordFailure(`client ${phoneNumber}`, error);
    }
  }

  for (const [botToken, bot] of telegrafBots.entries()) {
    try {
      await bot.stop();
      telegrafBots.delete(botToken);
    } catch (error) {
      // The token is a credential, so it is deliberately left out of the log.
      recordFailure("a Telegraf bot", error);
    }
  }

  if (failures.length > 0) {
    const first: any = failures[0];
    console.error("Error stopping bots:", first?.message);
    throw first;
  }

  console.log("All bots stopped successfully");
};
|
|
|
|
|
/**
 * Restarts the whole bot system: a full `stopBots()` followed by `startBots()`.
 *
 * @throws Re-throws whatever either phase throws, after logging its message.
 */
export const restartBots = async (): Promise<void> => {
  const reportFailure = (failure: any): void => {
    console.error("Error restarting bots:", failure.message);
  };

  try {
    // Strictly sequential: nothing is started until everything has stopped.
    await stopBots();
    await startBots();
    console.log("All bots restarted successfully");
  } catch (failure: any) {
    reportFailure(failure);
    throw failure;
  }
};
|
|
|
|
|
/**
 * Reports whether the bot system is running, based on the in-memory registries.
 *
 * @returns `state` is 'running' when at least one Telegram client or Telegraf
 *   bot is registered, otherwise 'stopped'; `details` holds the sizes of the
 *   client, bot and task registries.
 * @throws Re-throws any unexpected error after logging its message.
 */
export const getBotsState = async () => {
  try {
    const activeClients = telegramClients.size;
    const activeBots = telegrafBots.size;
    const isRunning = activeClients > 0 || activeBots > 0;

    return {
      state: isRunning ? 'running' : 'stopped',
      details: {
        activeClients,
        activeBots,
        activeTasks: activeTasks.size,
      }
    };
  } catch (failure: any) {
    console.error("Error getting bots state:", failure.message);
    throw failure;
  }
};
|
|
|
|
|
/**
 * Subscribes to change events on the public "tasks" table via the
 * 'tasks-changes' channel.
 *
 * UPDATE events go to `handleTaskUpdate` and DELETE events to
 * `handleTaskDelete`; every other event type (including INSERT) is ignored.
 *
 * @returns The subscription returned by `.subscribe()`.
 */
export const setupTaskWatcher = () => {
  const changeFilter = {
    event: '*',
    schema: 'public',
    table: 'tasks'
  };

  const onChange = async (payload: any) => {
    const { eventType, new: newRecord, old: oldRecord } = payload;

    if (eventType === 'UPDATE') {
      await handleTaskUpdate(newRecord, oldRecord);
    } else if (eventType === 'DELETE') {
      await handleTaskDelete(oldRecord);
    }
  };

  return supabase
    .channel('tasks-changes')
    .on('postgres_changes', changeFilter, onChange)
    .subscribe();
};
|
|
|
|
|
/**
 * Reacts to an UPDATE of a task row.
 *
 * - taskData changed while the new state is 'active': the task process is
 *   stopped and started again so it picks up the new data.
 * - otherwise, if the state changed: 'active' starts the process, 'stopped' or
 *   'failed' stops it; any other state is ignored.
 *
 * If handling throws, the error is logged and the row is marked 'failed'.
 *
 * @param newTask Row after the update.
 * @param oldTask Row before the update.
 */
async function handleTaskUpdate(newTask: any, oldTask: any) {
  try {
    // Structural comparison via JSON: key order matters.
    const dataBefore = JSON.stringify(oldTask.taskData);
    const dataAfter = JSON.stringify(newTask.taskData);
    const taskDataChanged = dataAfter !== dataBefore;
    const stateChanged = newTask.state !== oldTask.state;
    const nextState = newTask.state;

    if (taskDataChanged && nextState === 'active') {
      await stopTaskProcess(newTask.id);
      await startTaskProcess(newTask);
      return;
    }

    if (!stateChanged) {
      return;
    }

    if (nextState === 'active') {
      await startTaskProcess(newTask);
      return;
    }

    if (['stopped', 'failed'].includes(nextState)) {
      await stopTaskProcess(newTask.id);
    }
  } catch (failure) {
    console.error('Error handling task update:', failure);

    await supabase
      .from('tasks')
      .update({ state: 'failed' })
      .eq('id', newTask.id);
  }
}
|
|
|
|
|
/**
 * Reacts to a DELETE of a task row by stopping the task's running process.
 * Errors are logged and swallowed.
 *
 * @param task The deleted row; only `id` is read.
 */
async function handleTaskDelete(task: any) {
  try {
    const { id: deletedTaskId } = task;
    await stopTaskProcess(deletedTaskId);
  } catch (failure) {
    console.error('Error handling task deletion:', failure);
  }
}
|
|
|
|
|
/**
 * Starts the process for a task row, replacing any process already running
 * under the same id.
 *
 * Only 'monitor' and 'quickMessage' task types are set up; other types are
 * ignored. A truthy setup result is stored in `activeTaskHandlers` under the
 * task id. If setup throws, the error is logged and the row is marked 'failed'.
 *
 * NOTE(review): `setupMonitorTask` is not defined in the part of the file
 * visible here — confirm it exists, otherwise 'monitor' tasks always end up 'failed'.
 *
 * @param task Task row; `id` and `taskType` are read here.
 */
async function startTaskProcess(task: any) {
  // Always begin from a clean slate for this id.
  await stopTaskProcess(task.id);

  try {
    const kind = task.taskType;
    let taskHandler: any;

    if (kind === 'monitor') {
      taskHandler = await setupMonitorTask(task);
    } else if (kind === 'quickMessage') {
      taskHandler = await setupMessageTask(task);
    }

    if (!taskHandler) {
      return;
    }
    activeTaskHandlers.set(task.id, taskHandler);
  } catch (failure) {
    console.error(`Error starting task ${task.id}:`, failure);
    await supabase
      .from('tasks')
      .update({ state: 'failed' })
      .eq('id', task.id);
  }
}
|
|
|
|
|
/**
 * Stops the running process registered for `taskId`, if there is one: clears
 * its interval timer (when set), awaits its `cleanup` callback (when set), and
 * removes it from `activeTaskHandlers`. Unknown ids are a no-op.
 *
 * @param taskId Key of the process in `activeTaskHandlers`.
 */
async function stopTaskProcess(taskId: string) {
  const running = activeTaskHandlers.get(taskId);
  if (!running) {
    return;
  }

  if (running.interval) {
    clearInterval(running.interval);
  }

  // Called as a method so the callback keeps its handle as `this`.
  if (running.cleanup) {
    await running.cleanup();
  }

  activeTaskHandlers.delete(taskId);
}
|
|
|
|
|
|
|
|
|
|
|
/**
 * Placeholder for setting up a 'quickMessage' task process.
 *
 * The body is empty, so it always resolves to `undefined`; `startTaskProcess`
 * therefore never registers a handle for this task type.
 *
 * @param task Task row (currently unused).
 */
async function setupMessageTask(task: any) {
  // Intentionally empty: no setup is implemented yet.
  return undefined;
}
|
|
|
|
|
|
|
|
/**
 * Starts the task system: subscribes to task changes and kicks off loading of
 * the tasks that already exist.
 *
 * Loading is started without being awaited (`loadExistingTasks` logs its own
 * errors), so this function returns immediately.
 *
 * @returns An object whose `cleanup()` unsubscribes from task changes and
 *   requests a stop of every registered task process (the stops are not awaited).
 */
export function initializeTaskSystem() {
  const subscription = setupTaskWatcher();

  loadExistingTasks();

  const cleanup = () => {
    subscription.unsubscribe();

    activeTaskHandlers.forEach((_handle: unknown, taskId: string) => {
      stopTaskProcess(taskId);
    });
  };

  return { cleanup };
}
|
|
|
|
|
/**
 * Loads every task row whose state is 'active' and starts its process, one
 * after another in the order returned by the query.
 *
 * A query error, or any error raised while starting a task, is logged and
 * swallowed; tasks after the failing one are then not started.
 */
async function loadExistingTasks() {
  try {
    const query = supabase
      .from('tasks')
      .select('*')
      .eq('state', 'active');

    const { data: rows, error: queryError } = await query;
    if (queryError) {
      throw queryError;
    }

    for (const row of rows) {
      await startTaskProcess(row);
    }
  } catch (failure) {
    console.error('Error loading existing tasks:', failure);
  }
}
|
|
|
|
|
|
|
|
/**
 * Initializes the task system and runs its cleanup when the process receives
 * SIGTERM or SIGINT.
 *
 * NOTE(review): the signal handlers only run the cleanup; nothing here exits
 * the process afterwards — confirm that shutdown is completed elsewhere.
 *
 * @param app Not used by this function.
 */
export function setupTaskSystem(app) {
  const taskSystem = initializeTaskSystem();

  const runCleanup = () => {
    taskSystem.cleanup();
  };

  for (const signal of ['SIGTERM', 'SIGINT'] as const) {
    process.on(signal, runCleanup);
  }
}