Spaces:
Paused
Paused
| import { TelegramClient } from "telegram"; | |
| import { StringSession } from "telegram/sessions"; | |
| import { Telegraf } from "telegraf"; | |
| import { Api } from "telegram"; | |
| import { NewMessage, NewMessageEvent } from "telegram/events"; | |
| import { telegramClients, pendingAuthRequests, activeTasks, telegrafBots } from "./models"; | |
| import { getBlockKeywords, getKeywords, normalizeArabicText, sendMessage as sendTelegramMessage, sendReplayTelegramMessage } from "./utils"; | |
| import { COMMANDS, MESSAGES } from "./config"; // Import the COMMANDS and MESSAGES objects | |
| import { fetchDataFromTable, updateDataInTable } from "./db/supabaseHelper"; | |
| import { supabase } from './db/supabase'; | |
| import { Request, Response } from 'express'; | |
| // Map to store active task handlers/intervals | |
| const activeTaskHandlers = new Map(); | |
| export const handleSubmitCode = async (phoneNumber: string, code: string) => { | |
| if (!phoneNumber || !code) { | |
| return { status: 400, data: { error: "Phone number and code required" } }; | |
| } | |
| const authRequest = pendingAuthRequests.get(phoneNumber); | |
| if (!authRequest) { | |
| return { status: 400, data: { error: "No active verification session" } }; | |
| } | |
| try { | |
| await authRequest.client.invoke( | |
| new Api.auth.SignIn({ | |
| phoneNumber, | |
| phoneCodeHash: authRequest.phoneCodeHash, | |
| phoneCode: code, | |
| }) | |
| ); | |
| const me = await authRequest.client.getMe(); | |
| const sessionString = authRequest.client.session.save() as unknown as string; | |
| telegramClients.set(phoneNumber, { | |
| client: authRequest.client, | |
| lastActivity: new Date(), | |
| username: me.username, | |
| }); | |
| pendingAuthRequests.delete(phoneNumber); | |
| return { | |
| status: 200, | |
| data: { | |
| message: "Authentication successful", | |
| session: sessionString, | |
| phoneNumber: phoneNumber, | |
| }, | |
| }; | |
| } catch (error: any) { | |
| console.error("Sign-in error:", error); | |
| pendingAuthRequests.delete(phoneNumber); | |
| if (error.errorMessage === "PHONE_CODE_INVALID") { | |
| return { status: 400, data: { error: "Invalid verification code" } }; | |
| } else { | |
| return { status: 500, data: { error: error.message } }; | |
| } | |
| } | |
| }; | |
| export const handleLoginWithSession = async ( | |
| apiKey: string, | |
| hash: string, | |
| session: string, | |
| phoneNumber: string | |
| ) => { | |
| if (!apiKey || !hash || !session || !phoneNumber) { | |
| return { | |
| status: 400, | |
| data: { error: "API ID, Hash, Session, and Phone Number are required" }, | |
| }; | |
| } | |
| const apiId = Number(apiKey); | |
| if (isNaN(apiId)) { | |
| return { status: 400, data: { error: "Invalid API ID format" } }; | |
| } | |
| try { | |
| const client = new TelegramClient(new StringSession(session), apiId, hash, { | |
| connectionRetries: 3, | |
| }); | |
| await client.connect(); | |
| const me = await client.getMe(); | |
| telegramClients.set(phoneNumber, { | |
| client, | |
| lastActivity: new Date(), | |
| username: me.username, | |
| }); | |
| return { | |
| status: 200, | |
| data: { message: "Session login successful", phoneNumber: phoneNumber }, | |
| }; | |
| } catch (error: any) { | |
| console.error("Session login error:", error); | |
| return { | |
| status: 401, | |
| data: { error: "Session login failed", details: error.message }, | |
| }; | |
| } | |
| }; | |
| export const handleRequestCode = async ( | |
| apiKey: string, | |
| hash: string, | |
| phoneNumber: string | |
| ) => { | |
| if (!apiKey || !hash || !phoneNumber) { | |
| return { | |
| status: 400, | |
| data: { | |
| error: | |
| "API ID, Hash, and Phone Number are required. Received: " + | |
| JSON.stringify({ apiKey, hash, phoneNumber }), | |
| }, | |
| }; | |
| } | |
| const apiId = Number(apiKey); | |
| if (isNaN(apiId)) { | |
| return { status: 400, data: { error: "Invalid API ID format" } }; | |
| } | |
| try { | |
| const client = new TelegramClient(new StringSession(), apiId, hash, { | |
| connectionRetries: 5, | |
| }); | |
| await client.connect(); | |
| const { phoneCodeHash } = await client.sendCode( | |
| { apiId, apiHash: hash }, | |
| phoneNumber, | |
| false | |
| ); | |
| pendingAuthRequests.set(phoneNumber, { client, phoneCodeHash }); | |
| return { | |
| status: 200, | |
| data: { message: "Verification code sent successfully" }, | |
| }; | |
| } catch (error: any) { | |
| console.error("Request code error:", error); | |
| return { status: 500, data: { error: error.message } }; | |
| } | |
| }; | |
| // export const handleAddTelegrafBot = async (botToken: string) => { | |
| // if (!botToken) { | |
| // return { status: 400, data: { error: "Bot Token is required" } }; | |
| // } | |
| // try { | |
| // const bot = new Telegraf(botToken); | |
| // telegrafBots.set(botToken, bot); | |
| // bot.launch(); | |
| // console.log("Bot launched successfully"); | |
| // return { | |
| // status: 200, | |
| // data: { message: "Telegraf bot started successfully" }, | |
| // }; | |
| // } catch (error: any) { | |
| // console.error("Error launching bot:", error); | |
| // return { status: 500, data: { error: "Failed to launch bot: " + error.message } }; | |
| // } | |
| // }; | |
| export const createQuickMessageTask = (clientInfo: any) => { | |
| const messageHandler = async (event: NewMessageEvent) => { | |
| const message = event.message; | |
| if (!message.out || !message.message.startsWith(COMMANDS.GO)) return; | |
| const repliedMessage = await message.getReplyMessage(); | |
| if (!repliedMessage) { | |
| try { | |
| await sendTelegramMessage(clientInfo.client, message.sender, "لم يتم العثور على الرسالة المُرَد عليها."); | |
| } catch (error) { | |
| console.error("Error sending message:", error); | |
| } | |
| return; | |
| } | |
| const sender = await repliedMessage.sender; | |
| if (!repliedMessage.fwdFrom || !repliedMessage.fwdFrom.fromId) { | |
| try { | |
| await sendTelegramMessage(clientInfo.client, sender, "المرسل مخفي"); | |
| } catch (error) { | |
| console.error("Error sending message:", error); | |
| } | |
| return; | |
| } | |
| const originalSenderId = repliedMessage.fwdFrom?.fromId?.userId?.value; | |
| if (!originalSenderId) { | |
| try { | |
| await sendTelegramMessage(clientInfo.client, sender, "تعذر تحديد المرسل الأصلي."); | |
| } catch (error) { | |
| console.error("Error sending message:", error); | |
| } | |
| return; | |
| } | |
| const originalSenderEntity = await clientInfo.client.getEntity(originalSenderId); | |
| try { | |
| await sendTelegramMessage(clientInfo.client, originalSenderEntity, repliedMessage.message); | |
| await sendTelegramMessage(clientInfo.client, originalSenderEntity, MESSAGES.FORWARD_SUCCESS); | |
| await sendReplayTelegramMessage(clientInfo.client, sender, "تم التوجيه بنجاح", repliedMessage.id); | |
| } catch (error) { | |
| await sendReplayTelegramMessage(clientInfo.client, sender, " حدث خطأ أثناء التوجيه, يبدو أن حسابك محظور مؤقتا", repliedMessage.id); | |
| console.error("Error sending message:", error); | |
| } | |
| // Fetch dialog filters (folders) | |
| let filters; | |
| try { | |
| const response = await clientInfo.client.invoke(new Api.messages.GetDialogFilters()); | |
| // Ensure filters is an array | |
| filters = Array.isArray(response.filters) ? response.filters : []; | |
| } catch (error) { | |
| console.error("Error fetching dialog filters:", error); | |
| filters = []; | |
| } | |
| console.log("Existing filters:", filters); // Debugging: Log the filters | |
| // Check if the chat is already in a folder | |
| const existingFilter = filters.find((filter: any) => | |
| filter.includePeers?.some((peer: any) => | |
| peer.userId?.value === originalSenderEntity.id | |
| ) | |
| ); | |
| if (existingFilter) { | |
| console.log("Chat is already in a folder. Skipping folder creation."); | |
| return; | |
| } | |
| // Find a folder with space (less than 100 chats) | |
| const folderWithSpace = filters.find((filter: any) => filter.includePeers?.length < 100); | |
| if (folderWithSpace) { | |
| // Add the chat to the existing folder | |
| folderWithSpace.includePeers.push(new Api.InputPeerUser({ | |
| userId: originalSenderEntity.id, | |
| accessHash: originalSenderEntity.accessHash | |
| })); | |
| try { | |
| await clientInfo.client.invoke(new Api.messages.UpdateDialogFilter({ | |
| id: folderWithSpace.id, | |
| filter: folderWithSpace, | |
| })); | |
| console.log("Chat added to existing folder with ID:", folderWithSpace.id); | |
| } catch (error) { | |
| console.error("Error updating existing folder:", error); | |
| } | |
| } else { | |
| // No folder with space found, create a new folder | |
| const existingIds = Array.isArray(filters) ? filters.map((filter: any) => filter.id) : []; | |
| let newFilterId = 4; // Start with a higher ID to avoid conflicts | |
| while (existingIds.includes(newFilterId)) { | |
| newFilterId++; | |
| } | |
| // Ensure the new ID is within the valid range (1-255) | |
| if (newFilterId > 255) { | |
| console.error("Cannot create new filter: Maximum number of filters reached (255)."); | |
| return; | |
| } | |
| // Create a new folder and add the chat | |
| const newFilter = new Api.DialogFilter({ | |
| id: newFilterId, | |
| title: "M" + Math.floor(Math.random() * 100), | |
| includePeers: [new Api.InputPeerUser({ | |
| userId: originalSenderEntity.id, | |
| accessHash: originalSenderEntity.accessHash | |
| })], | |
| excludePeers: [], | |
| contacts: false, | |
| nonContacts: false, | |
| groups: false, | |
| broadcasts: false, | |
| bots: false, | |
| excludeMuted: false, | |
| excludeRead: false, | |
| excludeArchived: false, | |
| pinnedPeers: [], | |
| }); | |
| console.log("New filter object:", newFilter); // Debugging: Log the new filter | |
| try { | |
| await clientInfo.client.invoke(new Api.messages.UpdateDialogFilter({ | |
| id: newFilterId, | |
| filter: newFilter, | |
| })); | |
| console.log("New filter created successfully with ID:", newFilterId); | |
| } catch (error) { | |
| console.error("Error creating new dialog filter:", error); | |
| } | |
| } | |
| }; | |
| const eventFilter = new NewMessage({ | |
| outgoing: true, | |
| incoming: false, | |
| }); | |
| return { messageHandler, eventFilter }; | |
| }; | |
| // Helper function to create a monitor task | |
| const createMonitorTask = async ( | |
| clientInfo: any, | |
| forwardToUsernames: string[] | |
| ) => { | |
| const normalizedKeywordList = await getKeywords(); | |
| const normalizedBlockKeywordList = await getBlockKeywords(); | |
| console.log("Normalized Keywords:", normalizedKeywordList); | |
| console.log("Normalized Block Keywords:", normalizedBlockKeywordList); | |
| const messageHandler = async (event: NewMessageEvent) => { | |
| const message = event.message; | |
| // Ensure the message is from a group and contains text | |
| if (!message.message || !message.isGroup) return; | |
| // Convert the message text to lowercase for case-insensitive comparison | |
| const messageText = normalizeArabicText(message.message.toLowerCase()); | |
| // Check if the message contains any keyword from the normalizedKeywordList | |
| const containsKeyword = normalizedKeywordList.some((keyword) => | |
| messageText.includes(keyword.toLowerCase()) | |
| ); | |
| // Check if the message does NOT contain any keyword from the normalizedBlockKeywordList | |
| const doesNotContainBlockKeyword = !normalizedBlockKeywordList.some((keyword) => | |
| messageText.includes(keyword.toLowerCase()) | |
| ); | |
| // If the message meets the conditions, forward it to the specified users | |
| if (containsKeyword && doesNotContainBlockKeyword) { | |
| try { | |
| // Forward the message to each user in the forwardToUsernames list | |
| for (const username of forwardToUsernames) { | |
| await clientInfo.client.forwardMessages(username, { | |
| messages: [message.id], | |
| fromPeer: message.peerId, | |
| }); | |
| // console.log(`Forwarded message to ${username}`); | |
| // } else { | |
| // console.error(`User ${username} not found`); | |
| // } | |
| } | |
| } catch (error: any) { | |
| console.error("Error forwarding message:", error.message); | |
| } | |
| } | |
| }; | |
| const eventFilter = new NewMessage({ | |
| outgoing: false, // Monitor incoming messages only | |
| incoming: true, | |
| }); | |
| return { messageHandler, eventFilter }; | |
| }; | |
| // Helper function to create a joinGroup task | |
| const createJoinGroupTask = (clientInfo: any) => { | |
| const messageHandler = async (event: NewMessageEvent) => { | |
| const message = event.message; | |
| if (message.message.startsWith("./join")) { | |
| const groupLink = message.message.split("./join")[1]?.trim(); | |
| if (groupLink) { | |
| try { | |
| await clientInfo.client.invoke(new Api.messages.ImportChatInvite({ hash: groupLink })); | |
| await sendTelegramMessage(clientInfo.client, message.sender, `Joined group: ${groupLink}`); | |
| } catch (error: any) { | |
| await sendTelegramMessage(clientInfo.client, message.sender, `Failed to join group: ${error.message}`); | |
| } | |
| } | |
| } | |
| }; | |
| const eventFilter = new NewMessage({ | |
| outgoing: true, | |
| incoming: false, | |
| }); | |
| return { messageHandler, eventFilter }; | |
| }; | |
| // Helper function to create a scrapeLinks task | |
| const createScrapeLinksTask = (clientInfo: any) => { | |
| const messageHandler = async (event: NewMessageEvent) => { | |
| const message = event.message; | |
| const links = message.message.match(/https?:\/\/[^\s]+/g); | |
| if (links) { | |
| await sendTelegramMessage(clientInfo.client, message.sender, `Found links: ${links.join(", ")}`); | |
| } | |
| }; | |
| const eventFilter = new NewMessage({ | |
| outgoing: false, | |
| incoming: true, | |
| }); | |
| return { messageHandler, eventFilter }; | |
| }; | |
| export const handleCreateTask = async ( | |
| phoneNumber: string, | |
| taskType: API.TaskType, | |
| taskData?: any | |
| ) => { | |
| if (!phoneNumber) { | |
| return { status: 400, data: { error: "Phone number is required" } }; | |
| } | |
| try { | |
| const clientInfo = telegramClients.get(phoneNumber); | |
| if (!clientInfo) { | |
| return { status: 404, data: { error: "Client not found for this phone number" } }; | |
| } | |
| // Use the task ID from taskData instead of generating a new one | |
| const taskId = taskData.id; | |
| if (!taskId) { | |
| return { status: 400, data: { error: "Task ID is required" } }; | |
| } | |
| let messageHandler: (event: NewMessageEvent) => void; | |
| let eventFilter: NewMessage; | |
| switch (taskType) { | |
| case "quickMessage": | |
| ({ messageHandler, eventFilter } = createQuickMessageTask(clientInfo)); | |
| break; | |
| case "monitor": | |
| if ( | |
| !taskData?.receiverUsernames | |
| ) { | |
| return { status: 400, data: { error: "Missing required task data for monitor task" } }; | |
| } | |
| ({ messageHandler, eventFilter } = await createMonitorTask( | |
| clientInfo, | |
| taskData.receiverUsernames | |
| )); | |
| break; | |
| case "joinGroup": | |
| ({ messageHandler, eventFilter } = createJoinGroupTask(clientInfo)); | |
| break; | |
| case "scrapeLinks": | |
| ({ messageHandler, eventFilter } = createScrapeLinksTask(clientInfo)); | |
| break; | |
| default: | |
| return { status: 400, data: { error: "Invalid task type" } }; | |
| } | |
| clientInfo.client.addEventHandler(messageHandler, eventFilter); | |
| activeTasks.set(taskId, { | |
| client: clientInfo.client, | |
| taskData:taskData, | |
| taskType: taskType, | |
| listeners: new Map([[phoneNumber, { handler: messageHandler, filter: eventFilter }]]), | |
| }); | |
| telegramClients.set(phoneNumber, { | |
| ...clientInfo, | |
| lastActivity: new Date(), | |
| }); | |
| return { | |
| status: 200, | |
| data: { | |
| message: `Task of type '${taskType}' created successfully`, | |
| taskId, | |
| monitoring: taskType === "monitor" ? "Monitoring incoming messages" : "ME (your own messages)", | |
| }, | |
| }; | |
| } catch (error: any) { | |
| console.error("Task creation error:", error); | |
| return { status: 500, data: { error: "Failed to create task", details: error.message } }; | |
| } | |
| }; | |
| export const handleCancelTask = async (taskId: string) => { | |
| const task = activeTasks.get(taskId); | |
| if (!task) { | |
| return { status: 404, data: { error: "Task not found" } }; | |
| } | |
| try { | |
| for (const { handler, filter } of task.listeners.values()) { | |
| task.client.removeEventHandler(handler, filter); | |
| } | |
| activeTasks.delete(taskId); | |
| return { status: 200, data: { message: "Task canceled successfully" } }; | |
| } catch (error: any) { | |
| console.error("Task cancellation error:", error); | |
| return { status: 500, data: { error: "Failed to cancel task" } }; | |
| } | |
| }; | |
| export const handleSendMessage = async (botToken: string, chatId: string, text: string) => { | |
| const bot = telegrafBots.get(botToken); | |
| if (!bot) { | |
| return { status: 404, data: { error: "Telegraf bot not found" } }; | |
| } | |
| try { | |
| await bot.telegram.sendMessage(chatId, text); | |
| return { status: 200, data: { message: "Message sent successfully" } }; | |
| } catch (error: any) { | |
| return { status: 500, data: { error: error.message } }; | |
| } | |
| }; | |
| export const startBots = async (): Promise<void> => { | |
| try { | |
| // Fetch personal accounts data | |
| const accountsResponse = await fetchDataFromTable("personal_accounts_telegram", 1000, 0); | |
| console.log("Accounts response:", accountsResponse.data); | |
| // Fetch tasks data | |
| const tasksResponse = await fetchDataFromTable("tasks", 1000, 0); | |
| console.log("Tasks response:", tasksResponse.data); | |
| for (const account of accountsResponse.data as API.PersonalAccount[]) { | |
| if (account.isActive && account.session) { | |
| try { | |
| // Login to Telegram | |
| const loginResponse = await handleLoginWithSession( | |
| account.apiKey, | |
| account.hash, | |
| account.session, | |
| account.phoneNumber | |
| ); | |
| if (!loginResponse || loginResponse.status !== 200) { | |
| console.error(`Failed to login account ${account.phoneNumber}:`, loginResponse?.data?.error); | |
| continue; // Skip this account but continue with others | |
| } | |
| // Create tasks from the tasks table | |
| for (const task of tasksResponse.data as API.Task[]) { | |
| if (task.isActive && task.personalAccountId === account.id) { | |
| try { | |
| // Update task state to running | |
| await updateDataInTable('tasks', { | |
| id: task.id, | |
| state: 'active', | |
| updated_at: new Date().toISOString() | |
| }).catch(err => { | |
| console.error(`Failed to update task state for ${task.id}:`, err); | |
| // Continue despite update error | |
| }); | |
| const taskDataWithId = { | |
| ...task.taskData, | |
| id: task.id | |
| }; | |
| const taskResponse = await handleCreateTask( | |
| account.phoneNumber, | |
| task.taskType, | |
| taskDataWithId | |
| ); | |
| console.log(`${task.taskType} task response for ${account.phoneNumber}:`, taskResponse); | |
| if (taskResponse.status !== 200) { | |
| console.error(`Task creation failed for ${task.id}:`, taskResponse.data.error); | |
| // Update task state to failed but don't throw error | |
| await updateDataInTable('tasks', { | |
| id: task.id, | |
| state: 'failed', | |
| error_message: taskResponse.data.error, | |
| updated_at: new Date().toISOString() | |
| }).catch(err => console.error(`Failed to update error state for ${task.id}:`, err)); | |
| continue; // Skip to next task | |
| } | |
| } catch (error) { | |
| console.error(`Failed to process task ${task.id} for ${account.phoneNumber}:`, error); | |
| // Try to update task state but continue regardless | |
| try { | |
| await updateDataInTable('tasks', { | |
| id: task.id, | |
| state: 'failed', | |
| error_message: error.message, | |
| updated_at: new Date().toISOString() | |
| }); | |
| } catch (updateError) { | |
| console.error(`Failed to update error state for ${task.id}:`, updateError); | |
| } | |
| continue; // Skip to next task | |
| } | |
| } | |
| } | |
| } catch (accountError) { | |
| console.error(`Error processing account ${account.phoneNumber}:`, accountError); | |
| continue; // Skip to next account | |
| } | |
| } | |
| } | |
| console.log("Completed processing all accounts and tasks"); | |
| return; | |
| } catch (error: any) { | |
| console.error("Error in startBots:", error.message); | |
| // Don't throw the error, just log it | |
| return; | |
| } | |
| } | |
| // Add new service functions for stopping and restarting bots | |
| export const stopBots = async (): Promise<void> => { | |
| try { | |
| // Stop all active tasks | |
| for (const [taskId, task] of activeTasks.entries()) { | |
| // Stop task handlers | |
| for (const [phoneNumber, listener] of task.listeners.entries()) { | |
| task.client.removeEventHandler(listener.handler, listener.filter); | |
| } | |
| // Update task state in database | |
| await supabase | |
| .from('tasks') | |
| .update({ | |
| state: 'stopped', | |
| updated_at: new Date().toISOString() | |
| }) | |
| .eq('id', taskId); | |
| activeTasks.delete(taskId); | |
| } | |
| // Disconnect all telegram clients | |
| for (const [phoneNumber, clientInfo] of telegramClients.entries()) { | |
| await clientInfo.client.disconnect(); | |
| telegramClients.delete(phoneNumber); | |
| } | |
| // Stop all telegraf bots | |
| for (const [botToken, bot] of telegrafBots.entries()) { | |
| await bot.stop(); | |
| telegrafBots.delete(botToken); | |
| } | |
| console.log("All bots stopped successfully"); | |
| } catch (error: any) { | |
| console.error("Error stopping bots:", error.message); | |
| throw error; | |
| } | |
| }; | |
| export const restartBots = async (): Promise<void> => { | |
| try { | |
| // Stop all bots first | |
| await stopBots(); | |
| // Start bots again | |
| await startBots(); | |
| console.log("All bots restarted successfully"); | |
| } catch (error: any) { | |
| console.error("Error restarting bots:", error.message); | |
| throw error; | |
| } | |
| }; | |
| export const getBotsState = async () => { | |
| try { | |
| // Check if there are any active telegram clients or telegraf bots | |
| const hasActiveClients = telegramClients.size > 0; | |
| const hasActiveBots = telegrafBots.size > 0; | |
| const activeTasksCount = activeTasks.size; | |
| return { | |
| state: hasActiveClients || hasActiveBots ? 'running' : 'stopped', | |
| details: { | |
| activeClients: telegramClients.size, | |
| activeBots: telegrafBots.size, | |
| activeTasks: activeTasksCount, | |
| } | |
| }; | |
| } catch (error: any) { | |
| console.error("Error getting bots state:", error.message); | |
| throw error; | |
| } | |
| }; | |
| export const setupTaskWatcher = () => { | |
| const subscription = supabase | |
| .channel('tasks-changes') | |
| .on( | |
| 'postgres_changes', | |
| { | |
| event: '*', | |
| schema: 'public', | |
| table: 'tasks' | |
| }, | |
| async (payload) => { | |
| const { eventType, new: newRecord, old: oldRecord } = payload; | |
| switch (eventType) { | |
| case 'UPDATE': | |
| await handleTaskUpdate(newRecord, oldRecord); | |
| break; | |
| case 'DELETE': | |
| await handleTaskDelete(oldRecord); | |
| break; | |
| } | |
| } | |
| ) | |
| .subscribe(); | |
| return subscription; | |
| }; | |
| async function handleTaskUpdate(newTask: any, oldTask: any) { | |
| try { | |
| // Check if task data or state has changed | |
| const taskDataChanged = JSON.stringify(newTask.taskData) !== JSON.stringify(oldTask.taskData); | |
| const stateChanged = newTask.state !== oldTask.state; | |
| if (taskDataChanged && newTask.state === 'active') { | |
| // Stop existing task | |
| await stopTaskProcess(newTask.id); | |
| // Restart with new configuration | |
| await startTaskProcess(newTask); | |
| } else if (stateChanged) { | |
| if (newTask.state === 'active') { | |
| await startTaskProcess(newTask); | |
| } else if (newTask.state === 'stopped' || newTask.state === 'failed') { | |
| await stopTaskProcess(newTask.id); | |
| } | |
| } | |
| } catch (error) { | |
| console.error('Error handling task update:', error); | |
| // Update task state to failed if there's an error | |
| await supabase | |
| .from('tasks') | |
| .update({ state: 'failed' }) | |
| .eq('id', newTask.id); | |
| } | |
| } | |
| async function handleTaskDelete(task: any) { | |
| try { | |
| // Stop and cleanup any running processes | |
| await stopTaskProcess(task.id); | |
| } catch (error) { | |
| console.error('Error handling task deletion:', error); | |
| } | |
| } | |
| async function startTaskProcess(task: any) { | |
| // Stop existing task if running | |
| await stopTaskProcess(task.id); | |
| try { | |
| let taskHandler: any; | |
| switch (task.taskType) { | |
| case 'monitor': | |
| taskHandler = await setupMonitorTask(task); | |
| break; | |
| case 'quickMessage': | |
| taskHandler = await setupMessageTask(task); | |
| break; | |
| // Add other task types as needed | |
| } | |
| if (taskHandler) { | |
| activeTaskHandlers.set(task.id, taskHandler); | |
| } | |
| } catch (error) { | |
| console.error(`Error starting task ${task.id}:`, error); | |
| await supabase | |
| .from('tasks') | |
| .update({ state: 'failed' }) | |
| .eq('id', task.id); | |
| } | |
| } | |
| async function stopTaskProcess(taskId: string) { | |
| const handler = activeTaskHandlers.get(taskId); | |
| if (handler) { | |
| if (handler.interval) { | |
| clearInterval(handler.interval); | |
| } | |
| if (handler.cleanup) { | |
| await handler.cleanup(); | |
| } | |
| activeTaskHandlers.delete(taskId); | |
| } | |
| } | |
| // | |
| // Example message task setup | |
| async function setupMessageTask(task: any) { | |
| // Implement message task logic | |
| // Return handler with cleanup function | |
| } | |
| // Initialize task watcher when server starts | |
| export function initializeTaskSystem() { | |
| // Setup realtime subscription | |
| const subscription = setupTaskWatcher(); | |
| // Load and start existing active tasks | |
| loadExistingTasks(); | |
| return { | |
| cleanup: () => { | |
| subscription.unsubscribe(); | |
| // Stop all active tasks | |
| for (const taskId of activeTaskHandlers.keys()) { | |
| stopTaskProcess(taskId); | |
| } | |
| } | |
| }; | |
| } | |
| async function loadExistingTasks() { | |
| try { | |
| const { data: activeTasks, error } = await supabase | |
| .from('tasks') | |
| .select('*') | |
| .eq('state', 'active'); | |
| if (error) throw error; | |
| // Start all active tasks | |
| for (const task of activeTasks) { | |
| await startTaskProcess(task); | |
| } | |
| } catch (error) { | |
| console.error('Error loading existing tasks:', error); | |
| } | |
| } | |
| // Add this to your server initialization | |
| export function setupTaskSystem(app) { | |
| const taskSystem = initializeTaskSystem(); | |
| // Cleanup on server shutdown | |
| process.on('SIGTERM', () => { | |
| taskSystem.cleanup(); | |
| }); | |
| process.on('SIGINT', () => { | |
| taskSystem.cleanup(); | |
| }); | |
| } |