bot-me / src /services.ts
Mohammed Foud
all
4857c79
import { TelegramClient } from "telegram";
import { StringSession } from "telegram/sessions";
import { Telegraf } from "telegraf";
import { Api } from "telegram";
import { NewMessage, NewMessageEvent } from "telegram/events";
import { telegramClients, pendingAuthRequests, activeTasks, telegrafBots } from "./models";
import { getBlockKeywords, getKeywords, normalizeArabicText, sendMessage as sendTelegramMessage, sendReplayTelegramMessage } from "./utils";
import { COMMANDS, MESSAGES } from "./config"; // Import the COMMANDS and MESSAGES objects
import { fetchDataFromTable, updateDataInTable } from "./db/supabaseHelper";
import { supabase } from './db/supabase';
import { Request, Response } from 'express';
// Map to store active task handlers/intervals
const activeTaskHandlers = new Map();
export const handleSubmitCode = async (phoneNumber: string, code: string) => {
if (!phoneNumber || !code) {
return { status: 400, data: { error: "Phone number and code required" } };
}
const authRequest = pendingAuthRequests.get(phoneNumber);
if (!authRequest) {
return { status: 400, data: { error: "No active verification session" } };
}
try {
await authRequest.client.invoke(
new Api.auth.SignIn({
phoneNumber,
phoneCodeHash: authRequest.phoneCodeHash,
phoneCode: code,
})
);
const me = await authRequest.client.getMe();
const sessionString = authRequest.client.session.save() as unknown as string;
telegramClients.set(phoneNumber, {
client: authRequest.client,
lastActivity: new Date(),
username: me.username,
});
pendingAuthRequests.delete(phoneNumber);
return {
status: 200,
data: {
message: "Authentication successful",
session: sessionString,
phoneNumber: phoneNumber,
},
};
} catch (error: any) {
console.error("Sign-in error:", error);
pendingAuthRequests.delete(phoneNumber);
if (error.errorMessage === "PHONE_CODE_INVALID") {
return { status: 400, data: { error: "Invalid verification code" } };
} else {
return { status: 500, data: { error: error.message } };
}
}
};
export const handleLoginWithSession = async (
apiKey: string,
hash: string,
session: string,
phoneNumber: string
) => {
if (!apiKey || !hash || !session || !phoneNumber) {
return {
status: 400,
data: { error: "API ID, Hash, Session, and Phone Number are required" },
};
}
const apiId = Number(apiKey);
if (isNaN(apiId)) {
return { status: 400, data: { error: "Invalid API ID format" } };
}
try {
const client = new TelegramClient(new StringSession(session), apiId, hash, {
connectionRetries: 3,
});
await client.connect();
const me = await client.getMe();
telegramClients.set(phoneNumber, {
client,
lastActivity: new Date(),
username: me.username,
});
return {
status: 200,
data: { message: "Session login successful", phoneNumber: phoneNumber },
};
} catch (error: any) {
console.error("Session login error:", error);
return {
status: 401,
data: { error: "Session login failed", details: error.message },
};
}
};
export const handleRequestCode = async (
apiKey: string,
hash: string,
phoneNumber: string
) => {
if (!apiKey || !hash || !phoneNumber) {
return {
status: 400,
data: {
error:
"API ID, Hash, and Phone Number are required. Received: " +
JSON.stringify({ apiKey, hash, phoneNumber }),
},
};
}
const apiId = Number(apiKey);
if (isNaN(apiId)) {
return { status: 400, data: { error: "Invalid API ID format" } };
}
try {
const client = new TelegramClient(new StringSession(), apiId, hash, {
connectionRetries: 5,
});
await client.connect();
const { phoneCodeHash } = await client.sendCode(
{ apiId, apiHash: hash },
phoneNumber,
false
);
pendingAuthRequests.set(phoneNumber, { client, phoneCodeHash });
return {
status: 200,
data: { message: "Verification code sent successfully" },
};
} catch (error: any) {
console.error("Request code error:", error);
return { status: 500, data: { error: error.message } };
}
};
// export const handleAddTelegrafBot = async (botToken: string) => {
// if (!botToken) {
// return { status: 400, data: { error: "Bot Token is required" } };
// }
// try {
// const bot = new Telegraf(botToken);
// telegrafBots.set(botToken, bot);
// bot.launch();
// console.log("Bot launched successfully");
// return {
// status: 200,
// data: { message: "Telegraf bot started successfully" },
// };
// } catch (error: any) {
// console.error("Error launching bot:", error);
// return { status: 500, data: { error: "Failed to launch bot: " + error.message } };
// }
// };
export const createQuickMessageTask = (clientInfo: any) => {
const messageHandler = async (event: NewMessageEvent) => {
const message = event.message;
if (!message.out || !message.message.startsWith(COMMANDS.GO)) return;
const repliedMessage = await message.getReplyMessage();
if (!repliedMessage) {
try {
await sendTelegramMessage(clientInfo.client, message.sender, "لم يتم العثور على الرسالة المُرَد عليها.");
} catch (error) {
console.error("Error sending message:", error);
}
return;
}
const sender = await repliedMessage.sender;
if (!repliedMessage.fwdFrom || !repliedMessage.fwdFrom.fromId) {
try {
await sendTelegramMessage(clientInfo.client, sender, "المرسل مخفي");
} catch (error) {
console.error("Error sending message:", error);
}
return;
}
const originalSenderId = repliedMessage.fwdFrom?.fromId?.userId?.value;
if (!originalSenderId) {
try {
await sendTelegramMessage(clientInfo.client, sender, "تعذر تحديد المرسل الأصلي.");
} catch (error) {
console.error("Error sending message:", error);
}
return;
}
const originalSenderEntity = await clientInfo.client.getEntity(originalSenderId);
try {
await sendTelegramMessage(clientInfo.client, originalSenderEntity, repliedMessage.message);
await sendTelegramMessage(clientInfo.client, originalSenderEntity, MESSAGES.FORWARD_SUCCESS);
await sendReplayTelegramMessage(clientInfo.client, sender, "تم التوجيه بنجاح", repliedMessage.id);
} catch (error) {
await sendReplayTelegramMessage(clientInfo.client, sender, " حدث خطأ أثناء التوجيه, يبدو أن حسابك محظور مؤقتا", repliedMessage.id);
console.error("Error sending message:", error);
}
// Fetch dialog filters (folders)
let filters;
try {
const response = await clientInfo.client.invoke(new Api.messages.GetDialogFilters());
// Ensure filters is an array
filters = Array.isArray(response.filters) ? response.filters : [];
} catch (error) {
console.error("Error fetching dialog filters:", error);
filters = [];
}
console.log("Existing filters:", filters); // Debugging: Log the filters
// Check if the chat is already in a folder
const existingFilter = filters.find((filter: any) =>
filter.includePeers?.some((peer: any) =>
peer.userId?.value === originalSenderEntity.id
)
);
if (existingFilter) {
console.log("Chat is already in a folder. Skipping folder creation.");
return;
}
// Find a folder with space (less than 100 chats)
const folderWithSpace = filters.find((filter: any) => filter.includePeers?.length < 100);
if (folderWithSpace) {
// Add the chat to the existing folder
folderWithSpace.includePeers.push(new Api.InputPeerUser({
userId: originalSenderEntity.id,
accessHash: originalSenderEntity.accessHash
}));
try {
await clientInfo.client.invoke(new Api.messages.UpdateDialogFilter({
id: folderWithSpace.id,
filter: folderWithSpace,
}));
console.log("Chat added to existing folder with ID:", folderWithSpace.id);
} catch (error) {
console.error("Error updating existing folder:", error);
}
} else {
// No folder with space found, create a new folder
const existingIds = Array.isArray(filters) ? filters.map((filter: any) => filter.id) : [];
let newFilterId = 4; // Start with a higher ID to avoid conflicts
while (existingIds.includes(newFilterId)) {
newFilterId++;
}
// Ensure the new ID is within the valid range (1-255)
if (newFilterId > 255) {
console.error("Cannot create new filter: Maximum number of filters reached (255).");
return;
}
// Create a new folder and add the chat
const newFilter = new Api.DialogFilter({
id: newFilterId,
title: "M" + Math.floor(Math.random() * 100),
includePeers: [new Api.InputPeerUser({
userId: originalSenderEntity.id,
accessHash: originalSenderEntity.accessHash
})],
excludePeers: [],
contacts: false,
nonContacts: false,
groups: false,
broadcasts: false,
bots: false,
excludeMuted: false,
excludeRead: false,
excludeArchived: false,
pinnedPeers: [],
});
console.log("New filter object:", newFilter); // Debugging: Log the new filter
try {
await clientInfo.client.invoke(new Api.messages.UpdateDialogFilter({
id: newFilterId,
filter: newFilter,
}));
console.log("New filter created successfully with ID:", newFilterId);
} catch (error) {
console.error("Error creating new dialog filter:", error);
}
}
};
const eventFilter = new NewMessage({
outgoing: true,
incoming: false,
});
return { messageHandler, eventFilter };
};
// Helper function to create a monitor task
const createMonitorTask = async (
clientInfo: any,
forwardToUsernames: string[]
) => {
const normalizedKeywordList = await getKeywords();
const normalizedBlockKeywordList = await getBlockKeywords();
console.log("Normalized Keywords:", normalizedKeywordList);
console.log("Normalized Block Keywords:", normalizedBlockKeywordList);
const messageHandler = async (event: NewMessageEvent) => {
const message = event.message;
// Ensure the message is from a group and contains text
if (!message.message || !message.isGroup) return;
// Convert the message text to lowercase for case-insensitive comparison
const messageText = normalizeArabicText(message.message.toLowerCase());
// Check if the message contains any keyword from the normalizedKeywordList
const containsKeyword = normalizedKeywordList.some((keyword) =>
messageText.includes(keyword.toLowerCase())
);
// Check if the message does NOT contain any keyword from the normalizedBlockKeywordList
const doesNotContainBlockKeyword = !normalizedBlockKeywordList.some((keyword) =>
messageText.includes(keyword.toLowerCase())
);
// If the message meets the conditions, forward it to the specified users
if (containsKeyword && doesNotContainBlockKeyword) {
try {
// Forward the message to each user in the forwardToUsernames list
for (const username of forwardToUsernames) {
await clientInfo.client.forwardMessages(username, {
messages: [message.id],
fromPeer: message.peerId,
});
// console.log(`Forwarded message to ${username}`);
// } else {
// console.error(`User ${username} not found`);
// }
}
} catch (error: any) {
console.error("Error forwarding message:", error.message);
}
}
};
const eventFilter = new NewMessage({
outgoing: false, // Monitor incoming messages only
incoming: true,
});
return { messageHandler, eventFilter };
};
// Helper function to create a joinGroup task
const createJoinGroupTask = (clientInfo: any) => {
const messageHandler = async (event: NewMessageEvent) => {
const message = event.message;
if (message.message.startsWith("./join")) {
const groupLink = message.message.split("./join")[1]?.trim();
if (groupLink) {
try {
await clientInfo.client.invoke(new Api.messages.ImportChatInvite({ hash: groupLink }));
await sendTelegramMessage(clientInfo.client, message.sender, `Joined group: ${groupLink}`);
} catch (error: any) {
await sendTelegramMessage(clientInfo.client, message.sender, `Failed to join group: ${error.message}`);
}
}
}
};
const eventFilter = new NewMessage({
outgoing: true,
incoming: false,
});
return { messageHandler, eventFilter };
};
// Helper function to create a scrapeLinks task
const createScrapeLinksTask = (clientInfo: any) => {
const messageHandler = async (event: NewMessageEvent) => {
const message = event.message;
const links = message.message.match(/https?:\/\/[^\s]+/g);
if (links) {
await sendTelegramMessage(clientInfo.client, message.sender, `Found links: ${links.join(", ")}`);
}
};
const eventFilter = new NewMessage({
outgoing: false,
incoming: true,
});
return { messageHandler, eventFilter };
};
export const handleCreateTask = async (
phoneNumber: string,
taskType: API.TaskType,
taskData?: any
) => {
if (!phoneNumber) {
return { status: 400, data: { error: "Phone number is required" } };
}
try {
const clientInfo = telegramClients.get(phoneNumber);
if (!clientInfo) {
return { status: 404, data: { error: "Client not found for this phone number" } };
}
// Use the task ID from taskData instead of generating a new one
const taskId = taskData.id;
if (!taskId) {
return { status: 400, data: { error: "Task ID is required" } };
}
let messageHandler: (event: NewMessageEvent) => void;
let eventFilter: NewMessage;
switch (taskType) {
case "quickMessage":
({ messageHandler, eventFilter } = createQuickMessageTask(clientInfo));
break;
case "monitor":
if (
!taskData?.receiverUsernames
) {
return { status: 400, data: { error: "Missing required task data for monitor task" } };
}
({ messageHandler, eventFilter } = await createMonitorTask(
clientInfo,
taskData.receiverUsernames
));
break;
case "joinGroup":
({ messageHandler, eventFilter } = createJoinGroupTask(clientInfo));
break;
case "scrapeLinks":
({ messageHandler, eventFilter } = createScrapeLinksTask(clientInfo));
break;
default:
return { status: 400, data: { error: "Invalid task type" } };
}
clientInfo.client.addEventHandler(messageHandler, eventFilter);
activeTasks.set(taskId, {
client: clientInfo.client,
taskData:taskData,
taskType: taskType,
listeners: new Map([[phoneNumber, { handler: messageHandler, filter: eventFilter }]]),
});
telegramClients.set(phoneNumber, {
...clientInfo,
lastActivity: new Date(),
});
return {
status: 200,
data: {
message: `Task of type '${taskType}' created successfully`,
taskId,
monitoring: taskType === "monitor" ? "Monitoring incoming messages" : "ME (your own messages)",
},
};
} catch (error: any) {
console.error("Task creation error:", error);
return { status: 500, data: { error: "Failed to create task", details: error.message } };
}
};
export const handleCancelTask = async (taskId: string) => {
const task = activeTasks.get(taskId);
if (!task) {
return { status: 404, data: { error: "Task not found" } };
}
try {
for (const { handler, filter } of task.listeners.values()) {
task.client.removeEventHandler(handler, filter);
}
activeTasks.delete(taskId);
return { status: 200, data: { message: "Task canceled successfully" } };
} catch (error: any) {
console.error("Task cancellation error:", error);
return { status: 500, data: { error: "Failed to cancel task" } };
}
};
export const handleSendMessage = async (botToken: string, chatId: string, text: string) => {
const bot = telegrafBots.get(botToken);
if (!bot) {
return { status: 404, data: { error: "Telegraf bot not found" } };
}
try {
await bot.telegram.sendMessage(chatId, text);
return { status: 200, data: { message: "Message sent successfully" } };
} catch (error: any) {
return { status: 500, data: { error: error.message } };
}
};
export const startBots = async (): Promise<void> => {
try {
// Fetch personal accounts data
const accountsResponse = await fetchDataFromTable("personal_accounts_telegram", 1000, 0);
console.log("Accounts response:", accountsResponse.data);
// Fetch tasks data
const tasksResponse = await fetchDataFromTable("tasks", 1000, 0);
console.log("Tasks response:", tasksResponse.data);
for (const account of accountsResponse.data as API.PersonalAccount[]) {
if (account.isActive && account.session) {
try {
// Login to Telegram
const loginResponse = await handleLoginWithSession(
account.apiKey,
account.hash,
account.session,
account.phoneNumber
);
if (!loginResponse || loginResponse.status !== 200) {
console.error(`Failed to login account ${account.phoneNumber}:`, loginResponse?.data?.error);
continue; // Skip this account but continue with others
}
// Create tasks from the tasks table
for (const task of tasksResponse.data as API.Task[]) {
if (task.isActive && task.personalAccountId === account.id) {
try {
// Update task state to running
await updateDataInTable('tasks', {
id: task.id,
state: 'active',
updated_at: new Date().toISOString()
}).catch(err => {
console.error(`Failed to update task state for ${task.id}:`, err);
// Continue despite update error
});
const taskDataWithId = {
...task.taskData,
id: task.id
};
const taskResponse = await handleCreateTask(
account.phoneNumber,
task.taskType,
taskDataWithId
);
console.log(`${task.taskType} task response for ${account.phoneNumber}:`, taskResponse);
if (taskResponse.status !== 200) {
console.error(`Task creation failed for ${task.id}:`, taskResponse.data.error);
// Update task state to failed but don't throw error
await updateDataInTable('tasks', {
id: task.id,
state: 'failed',
error_message: taskResponse.data.error,
updated_at: new Date().toISOString()
}).catch(err => console.error(`Failed to update error state for ${task.id}:`, err));
continue; // Skip to next task
}
} catch (error) {
console.error(`Failed to process task ${task.id} for ${account.phoneNumber}:`, error);
// Try to update task state but continue regardless
try {
await updateDataInTable('tasks', {
id: task.id,
state: 'failed',
error_message: error.message,
updated_at: new Date().toISOString()
});
} catch (updateError) {
console.error(`Failed to update error state for ${task.id}:`, updateError);
}
continue; // Skip to next task
}
}
}
} catch (accountError) {
console.error(`Error processing account ${account.phoneNumber}:`, accountError);
continue; // Skip to next account
}
}
}
console.log("Completed processing all accounts and tasks");
return;
} catch (error: any) {
console.error("Error in startBots:", error.message);
// Don't throw the error, just log it
return;
}
}
// Add new service functions for stopping and restarting bots
export const stopBots = async (): Promise<void> => {
try {
// Stop all active tasks
for (const [taskId, task] of activeTasks.entries()) {
// Stop task handlers
for (const [phoneNumber, listener] of task.listeners.entries()) {
task.client.removeEventHandler(listener.handler, listener.filter);
}
// Update task state in database
await supabase
.from('tasks')
.update({
state: 'stopped',
updated_at: new Date().toISOString()
})
.eq('id', taskId);
activeTasks.delete(taskId);
}
// Disconnect all telegram clients
for (const [phoneNumber, clientInfo] of telegramClients.entries()) {
await clientInfo.client.disconnect();
telegramClients.delete(phoneNumber);
}
// Stop all telegraf bots
for (const [botToken, bot] of telegrafBots.entries()) {
await bot.stop();
telegrafBots.delete(botToken);
}
console.log("All bots stopped successfully");
} catch (error: any) {
console.error("Error stopping bots:", error.message);
throw error;
}
};
export const restartBots = async (): Promise<void> => {
try {
// Stop all bots first
await stopBots();
// Start bots again
await startBots();
console.log("All bots restarted successfully");
} catch (error: any) {
console.error("Error restarting bots:", error.message);
throw error;
}
};
export const getBotsState = async () => {
try {
// Check if there are any active telegram clients or telegraf bots
const hasActiveClients = telegramClients.size > 0;
const hasActiveBots = telegrafBots.size > 0;
const activeTasksCount = activeTasks.size;
return {
state: hasActiveClients || hasActiveBots ? 'running' : 'stopped',
details: {
activeClients: telegramClients.size,
activeBots: telegrafBots.size,
activeTasks: activeTasksCount,
}
};
} catch (error: any) {
console.error("Error getting bots state:", error.message);
throw error;
}
};
export const setupTaskWatcher = () => {
const subscription = supabase
.channel('tasks-changes')
.on(
'postgres_changes',
{
event: '*',
schema: 'public',
table: 'tasks'
},
async (payload) => {
const { eventType, new: newRecord, old: oldRecord } = payload;
switch (eventType) {
case 'UPDATE':
await handleTaskUpdate(newRecord, oldRecord);
break;
case 'DELETE':
await handleTaskDelete(oldRecord);
break;
}
}
)
.subscribe();
return subscription;
};
async function handleTaskUpdate(newTask: any, oldTask: any) {
try {
// Check if task data or state has changed
const taskDataChanged = JSON.stringify(newTask.taskData) !== JSON.stringify(oldTask.taskData);
const stateChanged = newTask.state !== oldTask.state;
if (taskDataChanged && newTask.state === 'active') {
// Stop existing task
await stopTaskProcess(newTask.id);
// Restart with new configuration
await startTaskProcess(newTask);
} else if (stateChanged) {
if (newTask.state === 'active') {
await startTaskProcess(newTask);
} else if (newTask.state === 'stopped' || newTask.state === 'failed') {
await stopTaskProcess(newTask.id);
}
}
} catch (error) {
console.error('Error handling task update:', error);
// Update task state to failed if there's an error
await supabase
.from('tasks')
.update({ state: 'failed' })
.eq('id', newTask.id);
}
}
async function handleTaskDelete(task: any) {
try {
// Stop and cleanup any running processes
await stopTaskProcess(task.id);
} catch (error) {
console.error('Error handling task deletion:', error);
}
}
async function startTaskProcess(task: any) {
// Stop existing task if running
await stopTaskProcess(task.id);
try {
let taskHandler: any;
switch (task.taskType) {
case 'monitor':
taskHandler = await setupMonitorTask(task);
break;
case 'quickMessage':
taskHandler = await setupMessageTask(task);
break;
// Add other task types as needed
}
if (taskHandler) {
activeTaskHandlers.set(task.id, taskHandler);
}
} catch (error) {
console.error(`Error starting task ${task.id}:`, error);
await supabase
.from('tasks')
.update({ state: 'failed' })
.eq('id', task.id);
}
}
async function stopTaskProcess(taskId: string) {
const handler = activeTaskHandlers.get(taskId);
if (handler) {
if (handler.interval) {
clearInterval(handler.interval);
}
if (handler.cleanup) {
await handler.cleanup();
}
activeTaskHandlers.delete(taskId);
}
}
//
// Example message task setup
async function setupMessageTask(task: any) {
// Implement message task logic
// Return handler with cleanup function
}
// Initialize task watcher when server starts
export function initializeTaskSystem() {
// Setup realtime subscription
const subscription = setupTaskWatcher();
// Load and start existing active tasks
loadExistingTasks();
return {
cleanup: () => {
subscription.unsubscribe();
// Stop all active tasks
for (const taskId of activeTaskHandlers.keys()) {
stopTaskProcess(taskId);
}
}
};
}
async function loadExistingTasks() {
try {
const { data: activeTasks, error } = await supabase
.from('tasks')
.select('*')
.eq('state', 'active');
if (error) throw error;
// Start all active tasks
for (const task of activeTasks) {
await startTaskProcess(task);
}
} catch (error) {
console.error('Error loading existing tasks:', error);
}
}
// Add this to your server initialization
export function setupTaskSystem(app) {
const taskSystem = initializeTaskSystem();
// Cleanup on server shutdown
process.on('SIGTERM', () => {
taskSystem.cleanup();
});
process.on('SIGINT', () => {
taskSystem.cleanup();
});
}