import { TelegramClient } from "telegram";
import { StringSession } from "telegram/sessions";
import { Telegraf } from "telegraf";
import { Api } from "telegram";
import { NewMessage, NewMessageEvent } from "telegram/events";
import { telegramClients, pendingAuthRequests, activeTasks, telegrafBots } from "./models";
import {
  getBlockKeywords,
  getKeywords,
  normalizeArabicText,
  sendMessage as sendTelegramMessage,
  sendReplayTelegramMessage,
} from "./utils";
import { COMMANDS, MESSAGES } from "./config"; // Import the COMMANDS and MESSAGES objects
import { fetchDataFromTable, updateDataInTable } from "./db/supabaseHelper";
import { supabase } from './db/supabase';
import { Request, Response } from 'express';

/** Shape of what a `setup*Task` function may return for the realtime task watcher. */
interface TaskProcessHandler {
  /** Timer to clear when the task is stopped. */
  interval?: ReturnType<typeof setInterval>;
  /** Extra teardown to run when the task is stopped. */
  cleanup?: () => void | Promise<void>;
}

// Map to store active task handlers/intervals
const activeTaskHandlers = new Map<string, TaskProcessHandler>();

/** Upper bound used when looking for a dialog folder that can still take another chat. */
const MAX_PEERS_PER_FOLDER = 100;
/** First filter id tried when a new dialog folder has to be created. */
const FIRST_NEW_FILTER_ID = 4;
/** Largest filter id this module will create. */
const MAX_FILTER_ID = 255;

/** Turns anything thrown into a printable message without assuming it is an `Error`. */
const describeError = (error: unknown): string => {
  if (error instanceof Error) return error.message;
  if (typeof error === "object" && error !== null && "message" in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
};

/**
 * Disconnects a client that is being abandoned. Never throws: a failed
 * disconnect is logged so the caller's own error response is not masked.
 */
const disconnectQuietly = async (
  client: Pick<TelegramClient, "disconnect">,
  context: string
): Promise<void> => {
  try {
    await client.disconnect();
  } catch (error: unknown) {
    console.error(`Failed to disconnect client (${context}):`, describeError(error));
  }
};

/**
 * Completes a phone-code login that was started by `handleRequestCode`.
 *
 * @param phoneNumber - Phone number the code was requested for; key into `pendingAuthRequests`.
 * @param code - Verification code entered by the user.
 * @returns `{ status, data }`: 200 with the saved session string, 400 for missing
 *   input / no pending request / invalid code, 500 for any other sign-in failure.
 */
export const handleSubmitCode = async (phoneNumber: string, code: string) => {
  if (!phoneNumber || !code) {
    return { status: 400, data: { error: "Phone number and code required" } };
  }

  const authRequest = pendingAuthRequests.get(phoneNumber);
  if (!authRequest) {
    return { status: 400, data: { error: "No active verification session" } };
  }

  try {
    await authRequest.client.invoke(
      new Api.auth.SignIn({
        phoneNumber,
        phoneCodeHash: authRequest.phoneCodeHash,
        phoneCode: code,
      })
    );

    const me = await authRequest.client.getMe();
    const sessionString = authRequest.client.session.save() as unknown as string;

    telegramClients.set(phoneNumber, {
      client: authRequest.client,
      lastActivity: new Date(),
      username: me.username,
    });
    pendingAuthRequests.delete(phoneNumber);

    return {
      status: 200,
      data: {
        message: "Authentication successful",
        session: sessionString,
        phoneNumber: phoneNumber,
      },
    };
  } catch (error: any) {
    console.error("Sign-in error:", error);
    pendingAuthRequests.delete(phoneNumber);
    // The pending client is dropped here and nothing else references it,
    // so close its connection instead of leaking it.
    await disconnectQuietly(authRequest.client, `sign-in ${phoneNumber}`);

    if (error.errorMessage === "PHONE_CODE_INVALID") {
      return { status: 400, data: { error: "Invalid verification code" } };
    } else {
      return { status: 500, data: { error: error.message } };
    }
  }
};

/**
 * Logs in with a previously saved string session and registers the client
 * in `telegramClients` under `phoneNumber`.
 *
 * @param apiKey - Telegram API id as a string; must convert to a number.
 * @param hash - Telegram API hash.
 * @param session - Saved string session.
 * @param phoneNumber - Key under which the connected client is stored.
 * @returns `{ status, data }`: 200 on success, 400 for missing/invalid input,
 *   401 when connecting or `getMe` fails.
 */
export const handleLoginWithSession = async (
  apiKey: string,
  hash: string,
  session: string,
  phoneNumber: string
) => {
  if (!apiKey || !hash || !session || !phoneNumber) {
    return {
      status: 400,
      data: { error: "API ID, Hash, Session, and Phone Number are required" },
    };
  }

  const apiId = Number(apiKey);
  if (Number.isNaN(apiId)) {
    return { status: 400, data: { error: "Invalid API ID format" } };
  }

  let client: TelegramClient | undefined;
  try {
    client = new TelegramClient(new StringSession(session), apiId, hash, {
      connectionRetries: 3,
    });
    await client.connect();
    const me = await client.getMe();

    telegramClients.set(phoneNumber, {
      client,
      lastActivity: new Date(),
      username: me.username,
    });

    return {
      status: 200,
      data: { message: "Session login successful", phoneNumber: phoneNumber },
    };
  } catch (error: any) {
    console.error("Session login error:", error);
    // connect() may have succeeded before getMe() failed; do not leave that connection open.
    if (client) {
      await disconnectQuietly(client, `session login ${phoneNumber}`);
    }
    return {
      status: 401,
      data: { error: "Session login failed", details: error.message },
    };
  }
};

/**
 * Starts a phone-code login: connects a fresh client, asks Telegram to send a
 * code, and parks the client in `pendingAuthRequests` until `handleSubmitCode`.
 *
 * @param apiKey - Telegram API id as a string; must convert to a number.
 * @param hash - Telegram API hash.
 * @param phoneNumber - Phone number to send the code to.
 * @returns `{ status, data }`: 200 when the code was sent, 400 for missing/invalid
 *   input, 500 when connecting or sending the code fails.
 */
export const handleRequestCode = async (
  apiKey: string,
  hash: string,
  phoneNumber: string
) => {
  if (!apiKey || !hash || !phoneNumber) {
    return {
      status: 400,
      data: {
        error:
          "API ID, Hash, and Phone Number are required. Received: " +
          JSON.stringify({ apiKey, hash, phoneNumber }),
      },
    };
  }

  const apiId = Number(apiKey);
  if (Number.isNaN(apiId)) {
    return { status: 400, data: { error: "Invalid API ID format" } };
  }

  let client: TelegramClient | undefined;
  try {
    client = new TelegramClient(new StringSession(), apiId, hash, {
      connectionRetries: 5,
    });
    await client.connect();

    const { phoneCodeHash } = await client.sendCode(
      { apiId, apiHash: hash },
      phoneNumber,
      false
    );

    // A repeated request replaces the previous pending client; close the old one.
    const supersededRequest = pendingAuthRequests.get(phoneNumber);
    pendingAuthRequests.set(phoneNumber, { client, phoneCodeHash });
    if (supersededRequest) {
      await disconnectQuietly(supersededRequest.client, `superseded code request ${phoneNumber}`);
    }

    return {
      status: 200,
      data: { message: "Verification code sent successfully" },
    };
  } catch (error: any) {
    console.error("Request code error:", error);
    if (client) {
      await disconnectQuietly(client, `code request ${phoneNumber}`);
    }
    return { status: 500, data: { error: error.message } };
  }
};

// export const handleAddTelegrafBot = async (botToken: string) => {
//   if (!botToken) {
//     return { status: 400, data: { error: "Bot Token is required" } };
//   }
//   try {
//     const bot = new Telegraf(botToken);
//     telegrafBots.set(botToken, bot);
//     bot.launch();
//     console.log("Bot launched successfully");
//     return {
//       status: 200,
//       data: { message: "Telegraf bot started successfully" },
//     };
//   } catch (error: any) {
//     console.error("Error launching bot:", error);
//     return { status: 500, data: { error: "Failed to launch bot: " + error.message } };
//   }
// };

/**
 * Puts `user` into a dialog folder: skips if some folder already lists the
 * user, otherwise appends to the first folder with room, otherwise creates a
 * new folder. All failures are logged, never thrown.
 */
const addUserToDialogFolder = async (client: any, user: any): Promise<void> => {
  let filters: any[];
  try {
    const response = await client.invoke(new Api.messages.GetDialogFilters());
    // Accept both a bare array and an object carrying a `filters` array.
    if (Array.isArray(response)) {
      filters = response;
    } else {
      filters = Array.isArray(response?.filters) ? response.filters : [];
    }
  } catch (error) {
    console.error("Error fetching dialog filters:", error);
    // Without the real list we cannot pick an id that is known to be free,
    // so do not risk updating a folder the user already has.
    return;
  }

  console.log("Existing filters:", filters); // Debugging: Log the filters

  // Compare ids by their decimal text so the id representation (primitive or
  // big-integer object) cannot make equal ids look different.
  const userIdText = String(user.id);
  const alreadyInFolder = filters.some((filter: any) =>
    filter.includePeers?.some(
      (peer: any) => peer.userId != null && String(peer.userId) === userIdText
    )
  );
  if (alreadyInFolder) {
    console.log("Chat is already in a folder. Skipping folder creation.");
    return;
  }

  const newPeer = () =>
    new Api.InputPeerUser({ userId: user.id, accessHash: user.accessHash });

  // Find a folder with space (less than 100 chats)
  const folderWithSpace = filters.find(
    (filter: any) => filter.includePeers?.length < MAX_PEERS_PER_FOLDER
  );

  if (folderWithSpace) {
    folderWithSpace.includePeers.push(newPeer());
    try {
      await client.invoke(
        new Api.messages.UpdateDialogFilter({
          id: folderWithSpace.id,
          filter: folderWithSpace,
        })
      );
      console.log("Chat added to existing folder with ID:", folderWithSpace.id);
    } catch (error) {
      console.error("Error updating existing folder:", error);
    }
    return;
  }

  // No folder with space found: pick the lowest unused id, starting at 4.
  const usedIds = new Set(filters.map((filter: any) => filter.id));
  let newFilterId = FIRST_NEW_FILTER_ID;
  while (usedIds.has(newFilterId)) {
    newFilterId++;
  }

  // Ensure the new ID is within the valid range (1-255)
  if (newFilterId > MAX_FILTER_ID) {
    console.error("Cannot create new filter: Maximum number of filters reached (255).");
    return;
  }

  const newFilter = new Api.DialogFilter({
    id: newFilterId,
    title: "M" + Math.floor(Math.random() * 100),
    includePeers: [newPeer()],
    excludePeers: [],
    contacts: false,
    nonContacts: false,
    groups: false,
    broadcasts: false,
    bots: false,
    excludeMuted: false,
    excludeRead: false,
    excludeArchived: false,
    pinnedPeers: [],
  });

  console.log("New filter object:", newFilter); // Debugging: Log the new filter

  try {
    await client.invoke(
      new Api.messages.UpdateDialogFilter({
        id: newFilterId,
        filter: newFilter,
      })
    );
    console.log("New filter created successfully with ID:", newFilterId);
  } catch (error) {
    console.error("Error creating new dialog filter:", error);
  }
};

/**
 * Builds the "quick message" task: when the account owner replies to a
 * forwarded message with a text starting with `COMMANDS.GO`, the replied
 * message text plus `MESSAGES.FORWARD_SUCCESS` is sent to the original sender,
 * and that sender is then placed into a dialog folder.
 *
 * @param clientInfo - Entry from `telegramClients`; only `clientInfo.client` is used.
 * @returns The handler and the outgoing-only event filter to register it with.
 */
export const createQuickMessageTask = (clientInfo: any) => {
  const client = clientInfo.client;

  // Status messages are best-effort: a failed notification must not abort the handler.
  const notify = async (peer: any, text: string): Promise<void> => {
    try {
      await sendTelegramMessage(client, peer, text);
    } catch (error) {
      console.error("Error sending message:", error);
    }
  };

  const messageHandler = async (event: NewMessageEvent) => {
    const message = event.message;
    // `message.message` is guarded because not every message carries text.
    if (!message.out || !message.message?.startsWith(COMMANDS.GO)) return;

    const repliedMessage = await message.getReplyMessage();
    if (!repliedMessage) {
      await notify(message.sender, "لم يتم العثور على الرسالة المُرَد عليها.");
      return;
    }

    const sender = await repliedMessage.sender;

    if (!repliedMessage.fwdFrom || !repliedMessage.fwdFrom.fromId) {
      await notify(sender, "المرسل مخفي");
      return;
    }

    const originalSenderId = repliedMessage.fwdFrom?.fromId?.userId?.value;
    if (!originalSenderId) {
      await notify(sender, "تعذر تحديد المرسل الأصلي.");
      return;
    }

    let originalSenderEntity: any;
    try {
      originalSenderEntity = await client.getEntity(originalSenderId);
    } catch (error) {
      // Entity resolution can fail; report it instead of rejecting inside the event handler.
      console.error("Error resolving original sender:", error);
      await notify(sender, "تعذر تحديد المرسل الأصلي.");
      return;
    }

    try {
      await sendTelegramMessage(client, originalSenderEntity, repliedMessage.message);
      await sendTelegramMessage(client, originalSenderEntity, MESSAGES.FORWARD_SUCCESS);
      await sendReplayTelegramMessage(client, sender, "تم التوجيه بنجاح", repliedMessage.id);
    } catch (error) {
      console.error("Error sending message:", error);
      try {
        await sendReplayTelegramMessage(
          client,
          sender,
          " حدث خطأ أثناء التوجيه, يبدو أن حسابك محظور مؤقتا",
          repliedMessage.id
        );
      } catch (replyError) {
        console.error("Error sending message:", replyError);
      }
    }

    // As before, folder bookkeeping runs whether or not the sends above succeeded.
    await addUserToDialogFolder(client, originalSenderEntity);
  };

  const eventFilter = new NewMessage({
    outgoing: true,
    incoming: false,
  });

  return { messageHandler, eventFilter };
};

/**
 * Builds the "monitor" task: forwards incoming group messages that contain at
 * least one keyword and no block keyword to every configured recipient.
 * Keyword lists are loaded once, when the task is created.
 *
 * @param clientInfo - Entry from `telegramClients`; only `clientInfo.client` is used.
 * @param forwardToUsernames - Recipients of matching messages.
 */
const createMonitorTask = async (
  clientInfo: any,
  forwardToUsernames: string[]
) => {
  const normalizedKeywordList = await getKeywords();
  const normalizedBlockKeywordList = await getBlockKeywords();

  console.log("Normalized Keywords:", normalizedKeywordList);
  console.log("Normalized Block Keywords:", normalizedBlockKeywordList);

  // Lower-case once here instead of on every incoming message.
  const keywords = normalizedKeywordList.map((keyword) => keyword.toLowerCase());
  const blockKeywords = normalizedBlockKeywordList.map((keyword) => keyword.toLowerCase());

  // Task data is untyped at the call site; a lone string would otherwise be
  // iterated character by character.
  const recipients: string[] = Array.isArray(forwardToUsernames)
    ? forwardToUsernames
    : [forwardToUsernames as unknown as string];

  const messageHandler = async (event: NewMessageEvent) => {
    const message = event.message;

    // Ensure the message is from a group and contains text
    if (!message.message || !message.isGroup) return;

    const messageText = normalizeArabicText(message.message.toLowerCase());

    const containsKeyword = keywords.some((keyword) => messageText.includes(keyword));
    if (!containsKeyword) return;

    const containsBlockKeyword = blockKeywords.some((keyword) => messageText.includes(keyword));
    if (containsBlockKeyword) return;

    for (const username of recipients) {
      // One unreachable recipient must not stop delivery to the others.
      try {
        await clientInfo.client.forwardMessages(username, {
          messages: [message.id],
          fromPeer: message.peerId,
        });
      } catch (error: any) {
        console.error("Error forwarding message:", error.message);
      }
    }
  };

  const eventFilter = new NewMessage({
    outgoing: false, // Monitor incoming messages only
    incoming: true,
  });

  return { messageHandler, eventFilter };
};

/**
 * Pulls the invite hash out of a Telegram invite link (`t.me/+HASH`,
 * `t.me/joinchat/HASH`, `tg://join?invite=HASH`); anything else is returned
 * unchanged and treated as a bare hash.
 */
const extractInviteHash = (groupLink: string): string => {
  const match = groupLink.match(
    /(?:t(?:elegram)?\.me\/(?:joinchat\/|\+)|tg:\/\/join\?invite=)([A-Za-z0-9_-]+)/
  );
  return match?.[1] ?? groupLink;
};

/**
 * Builds the "joinGroup" task: an outgoing message `./join <link-or-hash>`
 * makes the account import that chat invite and report the outcome.
 *
 * @param clientInfo - Entry from `telegramClients`; only `clientInfo.client` is used.
 */
const createJoinGroupTask = (clientInfo: any) => {
  const report = async (peer: any, text: string): Promise<void> => {
    try {
      await sendTelegramMessage(clientInfo.client, peer, text);
    } catch (error) {
      console.error("Error sending message:", error);
    }
  };

  const messageHandler = async (event: NewMessageEvent) => {
    const message = event.message;
    if (!message.message?.startsWith("./join")) return;

    const groupLink = message.message.split("./join")[1]?.trim();
    if (!groupLink) return;

    let outcome: string;
    try {
      await clientInfo.client.invoke(
        new Api.messages.ImportChatInvite({ hash: extractInviteHash(groupLink) })
      );
      outcome = `Joined group: ${groupLink}`;
    } catch (error: any) {
      outcome = `Failed to join group: ${error.message}`;
    }
    await report(message.sender, outcome);
  };

  const eventFilter = new NewMessage({
    outgoing: true,
    incoming: false,
  });

  return { messageHandler, eventFilter };
};

/**
 * Builds the "scrapeLinks" task: for every incoming message that contains
 * http(s) links, sends the list of links to that message's sender.
 *
 * @param clientInfo - Entry from `telegramClients`; only `clientInfo.client` is used.
 */
const createScrapeLinksTask = (clientInfo: any) => {
  const messageHandler = async (event: NewMessageEvent) => {
    const message = event.message;
    // Messages without text have nothing to scan.
    const links = message.message?.match(/https?:\/\/[^\s]+/g);
    if (links) {
      await sendTelegramMessage(clientInfo.client, message.sender, `Found links: ${links.join(", ")}`);
    }
  };

  const eventFilter = new NewMessage({
    outgoing: false,
    incoming: true,
  });

  return { messageHandler, eventFilter };
};

/**
 * Registers a task's event handler on the client stored for `phoneNumber`
 * and records it in `activeTasks` under `taskData.id`.
 *
 * @param phoneNumber - Key of an already logged-in client in `telegramClients`.
 * @param taskType - "quickMessage" | "monitor" | "joinGroup" | "scrapeLinks".
 * @param taskData - Must carry `id`; "monitor" also requires `receiverUsernames`.
 * @returns `{ status, data }`: 200 on success, 400 for bad input, 404 when no
 *   client exists for the phone number, 500 on unexpected failure.
 */
export const handleCreateTask = async (
  phoneNumber: string,
  taskType: API.TaskType,
  taskData?: any
) => {
  if (!phoneNumber) {
    return { status: 400, data: { error: "Phone number is required" } };
  }

  try {
    const clientInfo = telegramClients.get(phoneNumber);
    if (!clientInfo) {
      return { status: 404, data: { error: "Client not found for this phone number" } };
    }

    // Use the task ID from taskData instead of generating a new one.
    // `taskData` is optional, so a missing payload must yield the 400 below, not a TypeError.
    const taskId = taskData?.id;
    if (!taskId) {
      return { status: 400, data: { error: "Task ID is required" } };
    }

    let messageHandler: (event: NewMessageEvent) => void;
    let eventFilter: NewMessage;

    switch (taskType) {
      case "quickMessage":
        ({ messageHandler, eventFilter } = createQuickMessageTask(clientInfo));
        break;
      case "monitor":
        if (!taskData?.receiverUsernames) {
          return { status: 400, data: { error: "Missing required task data for monitor task" } };
        }
        ({ messageHandler, eventFilter } = await createMonitorTask(
          clientInfo,
          taskData.receiverUsernames
        ));
        break;
      case "joinGroup":
        ({ messageHandler, eventFilter } = createJoinGroupTask(clientInfo));
        break;
      case "scrapeLinks":
        ({ messageHandler, eventFilter } = createScrapeLinksTask(clientInfo));
        break;
      default:
        return { status: 400, data: { error: "Invalid task type" } };
    }

    // Re-creating a task under an id that is already active would otherwise
    // stack a second handler, and cancel would only ever remove the newest one.
    const previousTask = activeTasks.get(taskId);
    if (previousTask) {
      for (const { handler, filter } of previousTask.listeners.values()) {
        previousTask.client.removeEventHandler(handler, filter);
      }
    }

    clientInfo.client.addEventHandler(messageHandler, eventFilter);

    activeTasks.set(taskId, {
      client: clientInfo.client,
      taskData: taskData,
      taskType: taskType,
      listeners: new Map([[phoneNumber, { handler: messageHandler, filter: eventFilter }]]),
    });

    telegramClients.set(phoneNumber, {
      ...clientInfo,
      lastActivity: new Date(),
    });

    return {
      status: 200,
      data: {
        message: `Task of type '${taskType}' created successfully`,
        taskId,
        monitoring: taskType === "monitor" ? "Monitoring incoming messages" : "ME (your own messages)",
      },
    };
  } catch (error: any) {
    console.error("Task creation error:", error);
    return { status: 500, data: { error: "Failed to create task", details: error.message } };
  }
};

/**
 * Detaches every listener of an active task and forgets the task.
 *
 * @param taskId - Key in `activeTasks`.
 * @returns `{ status, data }`: 200 on success, 404 when unknown, 500 on failure.
 */
export const handleCancelTask = async (taskId: string) => {
  const task = activeTasks.get(taskId);
  if (!task) {
    return { status: 404, data: { error: "Task not found" } };
  }

  try {
    for (const { handler, filter } of task.listeners.values()) {
      task.client.removeEventHandler(handler, filter);
    }
    activeTasks.delete(taskId);
    return { status: 200, data: { message: "Task canceled successfully" } };
  } catch (error: any) {
    console.error("Task cancellation error:", error);
    return { status: 500, data: { error: "Failed to cancel task" } };
  }
};

/**
 * Sends a text message through a registered Telegraf bot.
 *
 * @param botToken - Key in `telegrafBots`.
 * @param chatId - Destination chat id.
 * @param text - Message text.
 * @returns `{ status, data }`: 200 on success, 404 when the bot is unknown, 500 on send failure.
 */
export const handleSendMessage = async (botToken: string, chatId: string, text: string) => {
  const bot = telegrafBots.get(botToken);
  if (!bot) {
    return { status: 404, data: { error: "Telegraf bot not found" } };
  }

  try {
    await bot.telegram.sendMessage(chatId, text);
    return { status: 200, data: { message: "Message sent successfully" } };
  } catch (error: any) {
    return { status: 500, data: { error: error.message } };
  }
};

/** Writes a task's `failed` state; a failed write is logged and swallowed. */
const markTaskFailed = async (taskId: unknown, errorMessage: unknown): Promise<void> => {
  try {
    await updateDataInTable('tasks', {
      id: taskId,
      state: 'failed',
      error_message: errorMessage,
      updated_at: new Date().toISOString(),
    });
  } catch (updateError) {
    console.error(`Failed to update error state for ${taskId}:`, updateError);
  }
};

/** Marks one stored task active and registers it for `account`; never throws. */
const startStoredTask = async (account: API.PersonalAccount, task: API.Task): Promise<void> => {
  try {
    try {
      await updateDataInTable('tasks', {
        id: task.id,
        state: 'active',
        updated_at: new Date().toISOString(),
      });
    } catch (err) {
      // Continue despite update error
      console.error(`Failed to update task state for ${task.id}:`, err);
    }

    const taskDataWithId = { ...task.taskData, id: task.id };
    const taskResponse = await handleCreateTask(account.phoneNumber, task.taskType, taskDataWithId);

    console.log(`${task.taskType} task response for ${account.phoneNumber}:`, taskResponse);

    if (taskResponse.status !== 200) {
      console.error(`Task creation failed for ${task.id}:`, taskResponse.data.error);
      await markTaskFailed(task.id, taskResponse.data.error);
    }
  } catch (error: unknown) {
    console.error(`Failed to process task ${task.id} for ${account.phoneNumber}:`, error);
    await markTaskFailed(task.id, describeError(error));
  }
};

/**
 * Logs in every active stored account that has a session and starts each of
 * its active tasks. Failures are logged per account / per task and never
 * thrown, so one bad record cannot block the rest.
 */
export const startBots = async (): Promise<void> => {
  try {
    // Fetch personal accounts data
    const accountsResponse = await fetchDataFromTable("personal_accounts_telegram", 1000, 0);
    console.log("Accounts response:", accountsResponse.data);

    // Fetch tasks data
    const tasksResponse = await fetchDataFromTable("tasks", 1000, 0);
    console.log("Tasks response:", tasksResponse.data);

    // A failed fetch may leave `data` empty; treat that as "nothing to start".
    const accounts = (accountsResponse.data ?? []) as API.PersonalAccount[];
    const tasks = (tasksResponse.data ?? []) as API.Task[];

    for (const account of accounts) {
      if (!account.isActive || !account.session) continue;

      try {
        const loginResponse = await handleLoginWithSession(
          account.apiKey,
          account.hash,
          account.session,
          account.phoneNumber
        );

        if (!loginResponse || loginResponse.status !== 200) {
          console.error(`Failed to login account ${account.phoneNumber}:`, loginResponse?.data?.error);
          continue; // Skip this account but continue with others
        }

        for (const task of tasks) {
          if (task.isActive && task.personalAccountId === account.id) {
            await startStoredTask(account, task);
          }
        }
      } catch (accountError) {
        console.error(`Error processing account ${account.phoneNumber}:`, accountError);
      }
    }

    console.log("Completed processing all accounts and tasks");
  } catch (error: unknown) {
    // Don't throw the error, just log it
    console.error("Error in startBots:", describeError(error));
  }
};

// Add new service functions for stopping and restarting bots

/**
 * Stops every active task, disconnects every Telegram client and stops every
 * Telegraf bot. Every resource is attempted even if an earlier one fails.
 *
 * @throws The first error encountered, after all resources have been attempted.
 */
export const stopBots = async (): Promise<void> => {
  const failures: unknown[] = [];

  const attempt = async (label: string, action: () => unknown): Promise<void> => {
    try {
      await action();
    } catch (error: unknown) {
      console.error(`Error stopping ${label}:`, describeError(error));
      failures.push(error);
    }
  };

  // Stop all active tasks
  for (const [taskId, task] of Array.from(activeTasks.entries())) {
    await attempt(`task ${taskId}`, async () => {
      for (const listener of task.listeners.values()) {
        task.client.removeEventHandler(listener.handler, listener.filter);
      }
      // The handlers are detached, so the task is no longer active in this process.
      activeTasks.delete(taskId);

      // The query result may carry an `error` field instead of throwing; surface it.
      const result = await supabase
        .from('tasks')
        .update({ state: 'stopped', updated_at: new Date().toISOString() })
        .eq('id', taskId);
      if (result?.error) {
        console.error(`Failed to mark task ${taskId} as stopped:`, result.error);
      }
    });
  }

  // Disconnect all telegram clients
  for (const [phoneNumber, clientInfo] of Array.from(telegramClients.entries())) {
    await attempt(`client ${phoneNumber}`, async () => {
      await clientInfo.client.disconnect();
      telegramClients.delete(phoneNumber);
    });
  }

  // Stop all telegraf bots (the token is a secret, so it is kept out of the log label)
  for (const [botToken, bot] of Array.from(telegrafBots.entries())) {
    await attempt("telegraf bot", async () => {
      await bot.stop();
      telegrafBots.delete(botToken);
    });
  }

  if (failures.length > 0) {
    const firstFailure = failures[0];
    console.error("Error stopping bots:", describeError(firstFailure));
    throw firstFailure;
  }

  console.log("All bots stopped successfully");
};

/**
 * Stops everything, then starts it again.
 *
 * @throws Whatever `stopBots` throws; in that case nothing is started.
 */
export const restartBots = async (): Promise<void> => {
  try {
    await stopBots();
    await startBots();
    console.log("All bots restarted successfully");
  } catch (error: any) {
    console.error("Error restarting bots:", error.message);
    throw error;
  }
};

/**
 * Reports whether any client or bot is registered, with counts.
 *
 * @returns `state` is 'running' when at least one Telegram client or Telegraf
 *   bot is registered, otherwise 'stopped'.
 */
export const getBotsState = async () => {
  try {
    const activeClients = telegramClients.size;
    const activeBots = telegrafBots.size;

    return {
      state: activeClients > 0 || activeBots > 0 ? 'running' : 'stopped',
      details: {
        activeClients,
        activeBots,
        activeTasks: activeTasks.size,
      },
    };
  } catch (error: any) {
    console.error("Error getting bots state:", error.message);
    throw error;
  }
};

/**
 * Subscribes to realtime changes on `public.tasks` and routes UPDATE and
 * DELETE events to the task process handlers.
 *
 * NOTE(review): INSERT events are received (event: '*') but not acted on, so a
 * row inserted directly as 'active' is only picked up by `loadExistingTasks`
 * on the next start — confirm this is intended.
 *
 * @returns The channel subscription; call `unsubscribe()` to stop watching.
 */
export const setupTaskWatcher = () => {
  const subscription = supabase
    .channel('tasks-changes')
    .on(
      'postgres_changes',
      {
        event: '*',
        schema: 'public',
        table: 'tasks',
      },
      async (payload) => {
        const { eventType, new: newRecord, old: oldRecord } = payload;

        switch (eventType) {
          case 'UPDATE':
            await handleTaskUpdate(newRecord, oldRecord);
            break;
          case 'DELETE':
            await handleTaskDelete(oldRecord);
            break;
        }
      }
    )
    .subscribe();

  return subscription;
};

/**
 * Reacts to a changed task row: restarts an active task whose data changed,
 * otherwise starts or stops it according to the new state. On error the task
 * is marked 'failed'.
 */
async function handleTaskUpdate(newTask: any, oldTask: any) {
  try {
    // Check if task data or state has changed
    const taskDataChanged =
      JSON.stringify(newTask.taskData) !== JSON.stringify(oldTask.taskData);
    const stateChanged = newTask.state !== oldTask.state;

    if (taskDataChanged && newTask.state === 'active') {
      // startTaskProcess stops any running instance before starting the new configuration.
      await startTaskProcess(newTask);
    } else if (stateChanged) {
      if (newTask.state === 'active') {
        await startTaskProcess(newTask);
      } else if (newTask.state === 'stopped' || newTask.state === 'failed') {
        await stopTaskProcess(newTask.id);
      }
    }
  } catch (error) {
    console.error('Error handling task update:', error);
    // Update task state to failed if there's an error
    await supabase
      .from('tasks')
      .update({ state: 'failed' })
      .eq('id', newTask.id);
  }
}

/** Stops whatever process is running for a task row that was deleted. */
async function handleTaskDelete(task: any) {
  try {
    await stopTaskProcess(task.id);
  } catch (error) {
    console.error('Error handling task deletion:', error);
  }
}

/**
 * (Re)starts the process for a task: stops any running instance, builds a
 * handler for the task type and remembers it. On error the task is marked 'failed'.
 */
async function startTaskProcess(task: any) {
  // Stop existing task if running
  await stopTaskProcess(task.id);

  try {
    let taskHandler: TaskProcessHandler | undefined;

    switch (task.taskType) {
      case 'monitor':
        taskHandler = await setupMonitorTask(task);
        break;
      case 'quickMessage':
        taskHandler = await setupMessageTask(task);
        break;
      // Add other task types as needed
      default:
        break;
    }

    if (taskHandler) {
      activeTaskHandlers.set(task.id, taskHandler);
    }
  } catch (error) {
    console.error(`Error starting task ${task.id}:`, error);
    await supabase
      .from('tasks')
      .update({ state: 'failed' })
      .eq('id', task.id);
  }
}

/**
 * Stops the process registered for `taskId`, if any: clears its interval,
 * awaits its cleanup, and always forgets the handler afterwards.
 */
async function stopTaskProcess(taskId: string) {
  const handler = activeTaskHandlers.get(taskId);
  if (!handler) return;

  try {
    if (handler.interval) {
      clearInterval(handler.interval);
    }
    if (handler.cleanup) {
      await handler.cleanup();
    }
  } finally {
    // Drop the entry even when cleanup throws, so a broken handler is not retried forever.
    activeTaskHandlers.delete(taskId);
  }
}

// Example monitor task setup
// NOTE(review): placeholder added because `startTaskProcess` referenced a
// `setupMonitorTask` that this file neither defined nor imported, which made
// every watched monitor task throw and be marked 'failed'. It mirrors
// `setupMessageTask` until the real logic is implemented.
async function setupMonitorTask(task: any): Promise<TaskProcessHandler | undefined> {
  // Implement monitor task logic
  // Return handler with cleanup function
  return undefined;
}

// Example message task setup
async function setupMessageTask(task: any): Promise<TaskProcessHandler | undefined> {
  // Implement message task logic
  // Return handler with cleanup function
  return undefined;
}

// Initialize task watcher when server starts
/**
 * Starts the realtime watcher and kicks off loading of stored active tasks.
 *
 * @returns An object whose `cleanup()` unsubscribes the watcher and stops
 *   every running task process.
 */
export function initializeTaskSystem() {
  // Setup realtime subscription
  const subscription = setupTaskWatcher();

  // Load and start existing active tasks (logs its own errors, so it is not awaited)
  void loadExistingTasks();

  return {
    cleanup: () => {
      void subscription.unsubscribe();
      // Stop all active tasks
      for (const taskId of Array.from(activeTaskHandlers.keys())) {
        stopTaskProcess(taskId).catch((error: unknown) => {
          console.error(`Error stopping task ${taskId}:`, describeError(error));
        });
      }
    },
  };
}

/** Starts a process for every task row whose state is 'active'; errors are logged. */
async function loadExistingTasks() {
  try {
    // Named `storedTasks` so it does not shadow the imported `activeTasks` registry.
    const { data: storedTasks, error } = await supabase
      .from('tasks')
      .select('*')
      .eq('state', 'active');

    if (error) throw error;

    // Start all active tasks
    for (const task of storedTasks ?? []) {
      await startTaskProcess(task);
    }
  } catch (error) {
    console.error('Error loading existing tasks:', error);
  }
}

// Add this to your server initialization
/**
 * Initializes the task system and tears it down on SIGTERM / SIGINT.
 *
 * NOTE(review): installing a listener for these signals replaces Node's
 * default "exit" behavior, and this handler does not exit itself — confirm
 * that another shutdown path ends the process.
 *
 * @param app - Server instance; currently unused.
 */
export function setupTaskSystem(app?: unknown) {
  const taskSystem = initializeTaskSystem();

  // Cleanup on server shutdown
  process.on('SIGTERM', () => {
    taskSystem.cleanup();
  });

  process.on('SIGINT', () => {
    taskSystem.cleanup();
  });
}