|
|
""" |
|
|
Telegram API Tool for EvoAgentX |
|
|
|
|
|
This module provides comprehensive Telegram integration including: |
|
|
- Message retrieval and search |
|
|
- Sending messages and scheduling |
|
|
- Chat management and file operations |
|
|
|
|
|
Compatible with EvoAgentX tool architecture and follows the latest Telegram API patterns. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import asyncio |
|
|
import time |
|
|
from typing import Dict, Any, List |
|
|
from telethon import TelegramClient |
|
|
from telethon.tl.types import Message, User, Chat, Channel |
|
|
from telethon.errors import ( |
|
|
FloodWaitError, |
|
|
ChatAdminRequiredError, |
|
|
UserBannedInChannelError, |
|
|
ChannelPrivateError, |
|
|
UserNotParticipantError, |
|
|
ChatWriteForbiddenError, |
|
|
MessageEmptyError, |
|
|
MessageTooLongError |
|
|
) |
|
|
from dotenv import load_dotenv |
|
|
import PyPDF2 |
|
|
|
|
|
from .tool import Tool, Toolkit |
|
|
from ..core.module import BaseModule |
|
|
from ..core.logging import logger |
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
SESSION_NAME = 'ai_agent_session' |
|
|
|
|
|
|
|
|
class TelegramBase(BaseModule): |
|
|
""" |
|
|
Base class for Telegram API interactions. |
|
|
Handles client management, authentication, and common utilities. |
|
|
""" |
|
|
|
|
|
def __init__(self, api_id: str = None, api_hash: str = None, phone: str = None, **kwargs): |
|
|
""" |
|
|
Initialize the Telegram base. |
|
|
|
|
|
Args: |
|
|
api_id (str, optional): Telegram API ID. If not provided, will try to get from TELEGRAM_API_ID environment variable. |
|
|
api_hash (str, optional): Telegram API Hash. If not provided, will try to get from TELEGRAM_API_HASH environment variable. |
|
|
phone (str, optional): Phone number for authentication. If not provided, will try to get from TELEGRAM_PHONE environment variable. |
|
|
**kwargs: Additional keyword arguments for parent class |
|
|
""" |
|
|
super().__init__(**kwargs) |
|
|
|
|
|
|
|
|
self.api_id = api_id or os.getenv("TELEGRAM_API_ID") |
|
|
self.api_hash = api_hash or os.getenv("TELEGRAM_API_HASH") |
|
|
self.phone = phone or os.getenv("TELEGRAM_PHONE") |
|
|
|
|
|
if not self.api_id or not self.api_hash: |
|
|
logger.warning( |
|
|
"No Telegram API credentials provided. Please set TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables " |
|
|
"or pass api_id and api_hash parameters. Get your credentials from: https://my.telegram.org/apps" |
|
|
) |
|
|
|
|
|
def _get_client(self) -> TelegramClient: |
|
|
""" |
|
|
Create and return a Telegram client instance. |
|
|
|
|
|
Returns: |
|
|
TelegramClient: Configured Telegram client |
|
|
""" |
|
|
if not self.api_id or not self.api_hash: |
|
|
raise ValueError("Telegram API credentials not found. Please set TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables.") |
|
|
|
|
|
client = TelegramClient(SESSION_NAME, self.api_id, self.api_hash) |
|
|
return client |
|
|
|
|
|
def _format_message(self, message: Message) -> Dict[str, Any]: |
|
|
""" |
|
|
Format a Telegram message for consistent output. |
|
|
|
|
|
Args: |
|
|
message: Telegram message object |
|
|
|
|
|
Returns: |
|
|
dict: Formatted message data |
|
|
""" |
|
|
return { |
|
|
"id": message.id, |
|
|
"text": message.text or "", |
|
|
"date": message.date.isoformat() if message.date else None, |
|
|
"sender_id": message.sender_id, |
|
|
"chat_id": message.chat_id, |
|
|
"is_reply": message.reply_to_msg_id is not None, |
|
|
"reply_to_msg_id": message.reply_to_msg_id, |
|
|
"has_media": message.media is not None, |
|
|
"media_type": type(message.media).__name__ if message.media else None |
|
|
} |
|
|
|
|
|
def _format_chat(self, chat) -> Dict[str, Any]: |
|
|
""" |
|
|
Format a Telegram chat for consistent output. |
|
|
|
|
|
Args: |
|
|
chat: Telegram chat object |
|
|
|
|
|
Returns: |
|
|
dict: Formatted chat data |
|
|
""" |
|
|
chat_type = "unknown" |
|
|
title = "Unknown" |
|
|
|
|
|
if isinstance(chat, User): |
|
|
chat_type = "user" |
|
|
title = f"{chat.first_name or ''} {chat.last_name or ''}".strip() or chat.username or "Unknown User" |
|
|
elif isinstance(chat, Chat): |
|
|
chat_type = "group" |
|
|
title = chat.title or "Unknown Group" |
|
|
elif isinstance(chat, Channel): |
|
|
chat_type = "channel" if chat.broadcast else "supergroup" |
|
|
title = chat.title or "Unknown Channel" |
|
|
|
|
|
return { |
|
|
"id": chat.id, |
|
|
"title": title, |
|
|
"type": chat_type, |
|
|
"username": getattr(chat, 'username', None) |
|
|
} |
|
|
|
|
|
def _run_async(self, coro): |
|
|
""" |
|
|
Run an async coroutine, handling both sync and async contexts. |
|
|
|
|
|
Args: |
|
|
coro: Async coroutine to run |
|
|
|
|
|
Returns: |
|
|
Result of the coroutine |
|
|
""" |
|
|
try: |
|
|
try: |
|
|
asyncio.get_running_loop() |
|
|
|
|
|
import concurrent.futures |
|
|
with concurrent.futures.ThreadPoolExecutor() as executor: |
|
|
future = executor.submit(asyncio.run, coro) |
|
|
return future.result() |
|
|
except RuntimeError: |
|
|
|
|
|
return asyncio.run(coro) |
|
|
except Exception as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to execute async operation: {str(e)}" |
|
|
} |
|
|
|
|
|
|
|
|
class FetchLatestMessagesTool(Tool): |
|
|
"""Retrieve the most recent messages from a specific Telegram contact by their name.""" |
|
|
|
|
|
name: str = "fetch_latest_messages" |
|
|
description: str = "Retrieve the most recent messages from a specific Telegram contact by their name. If multiple contacts match the name, it will ask for clarification." |
|
|
inputs: Dict[str, Dict[str, str]] = { |
|
|
"contact_name": { |
|
|
"type": "string", |
|
|
"description": "The name of the contact to fetch messages from (e.g., 'Shivam Kumar')" |
|
|
}, |
|
|
"limit": { |
|
|
"type": "integer", |
|
|
"description": "Maximum number of messages to retrieve (default: 10)" |
|
|
} |
|
|
} |
|
|
required: List[str] = ["contact_name"] |
|
|
|
|
|
def __init__(self, telegram_base: TelegramBase): |
|
|
super().__init__() |
|
|
self.telegram_base = telegram_base |
|
|
|
|
|
def __call__(self, contact_name: str, limit: int = 10) -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch the latest messages from a Telegram contact by their name. |
|
|
|
|
|
Args: |
|
|
contact_name: The name of the Telegram contact |
|
|
limit: Maximum number of messages to retrieve |
|
|
|
|
|
Returns: |
|
|
Dictionary with message results |
|
|
""" |
|
|
async def _fetch_messages(): |
|
|
client = None |
|
|
try: |
|
|
client = self.telegram_base._get_client() |
|
|
await client.start(phone=self.telegram_base.phone) |
|
|
|
|
|
|
|
|
matches = [] |
|
|
async for dialog in client.iter_dialogs(): |
|
|
if contact_name.lower() in dialog.name.lower(): |
|
|
matches.append({"name": dialog.name, "id": dialog.id, "chat": dialog.entity}) |
|
|
|
|
|
if len(matches) == 0: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Contact '{contact_name}' not found. Please check the name." |
|
|
} |
|
|
|
|
|
if len(matches) > 1: |
|
|
|
|
|
clarification_list = [f"{m['name']} (ID: {m['id']})" for m in matches] |
|
|
return { |
|
|
"success": False, |
|
|
"error": "Ambiguous contact name. Please clarify which user you mean.", |
|
|
"clarification_needed": clarification_list |
|
|
} |
|
|
|
|
|
|
|
|
chat = matches[0]['chat'] |
|
|
|
|
|
|
|
|
messages = [] |
|
|
async for message in client.iter_messages(chat, limit=limit): |
|
|
messages.append(self.telegram_base._format_message(message)) |
|
|
|
|
|
|
|
|
chat_info = self.telegram_base._format_chat(chat) |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"chat": chat_info, |
|
|
"messages_count": len(messages), |
|
|
"messages": messages |
|
|
} |
|
|
|
|
|
except FloodWaitError as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Rate limited. Please wait {e.seconds} seconds before trying again." |
|
|
} |
|
|
except (ChatAdminRequiredError, UserBannedInChannelError, ChannelPrivateError, |
|
|
UserNotParticipantError, ChatWriteForbiddenError) as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Access denied: {str(e)}" |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching messages: {str(e)}") |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to fetch messages: {str(e)}" |
|
|
} |
|
|
finally: |
|
|
if client: |
|
|
await client.disconnect() |
|
|
|
|
|
|
|
|
return self.telegram_base._run_async(_fetch_messages()) |
|
|
|
|
|
|
|
|
class SearchMessagesByKeywordTool(Tool): |
|
|
"""Find specific information by searching for a keyword within a contact's chat history.""" |
|
|
|
|
|
name: str = "search_messages_by_keyword" |
|
|
description: str = "Find specific information by searching for a keyword within a contact's chat history. If multiple contacts match the name, it will ask for clarification." |
|
|
inputs: Dict[str, Dict[str, str]] = { |
|
|
"contact_name": { |
|
|
"type": "string", |
|
|
"description": "The name of the contact to search messages from (e.g., 'Shivam Kumar')" |
|
|
}, |
|
|
"keyword": { |
|
|
"type": "string", |
|
|
"description": "Keyword or phrase to search for in messages" |
|
|
}, |
|
|
"limit": { |
|
|
"type": "integer", |
|
|
"description": "Maximum number of matching messages to retrieve (default: 10)" |
|
|
} |
|
|
} |
|
|
required: List[str] = ["contact_name", "keyword"] |
|
|
|
|
|
def __init__(self, telegram_base: TelegramBase): |
|
|
super().__init__() |
|
|
self.telegram_base = telegram_base |
|
|
|
|
|
def __call__(self, contact_name: str, keyword: str, limit: int = 10) -> Dict[str, Any]: |
|
|
""" |
|
|
Search for messages containing a specific keyword in a contact's chat. |
|
|
|
|
|
Args: |
|
|
contact_name: The name of the Telegram contact |
|
|
keyword: Keyword to search for |
|
|
limit: Maximum number of matching messages to retrieve |
|
|
|
|
|
Returns: |
|
|
Dictionary with search results |
|
|
""" |
|
|
async def _search_messages(): |
|
|
client = None |
|
|
try: |
|
|
client = self.telegram_base._get_client() |
|
|
await client.start(phone=self.telegram_base.phone) |
|
|
|
|
|
|
|
|
matches = [] |
|
|
async for dialog in client.iter_dialogs(): |
|
|
if contact_name.lower() in dialog.name.lower(): |
|
|
matches.append({"name": dialog.name, "id": dialog.id, "chat": dialog.entity}) |
|
|
|
|
|
if len(matches) == 0: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Contact '{contact_name}' not found. Please check the name." |
|
|
} |
|
|
|
|
|
if len(matches) > 1: |
|
|
|
|
|
clarification_list = [f"{m['name']} (ID: {m['id']})" for m in matches] |
|
|
return { |
|
|
"success": False, |
|
|
"error": "Ambiguous contact name. Please clarify which user you mean.", |
|
|
"clarification_needed": clarification_list |
|
|
} |
|
|
|
|
|
|
|
|
chat = matches[0]['chat'] |
|
|
|
|
|
|
|
|
messages = [] |
|
|
async for message in client.iter_messages(chat, search=keyword, limit=limit): |
|
|
if message.text and keyword.lower() in message.text.lower(): |
|
|
messages.append(self.telegram_base._format_message(message)) |
|
|
|
|
|
|
|
|
chat_info = self.telegram_base._format_chat(chat) |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"chat": chat_info, |
|
|
"keyword": keyword, |
|
|
"matches_count": len(messages), |
|
|
"messages": messages |
|
|
} |
|
|
|
|
|
except FloodWaitError as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Rate limited. Please wait {e.seconds} seconds before trying again." |
|
|
} |
|
|
except (ChatAdminRequiredError, UserBannedInChannelError, ChannelPrivateError, |
|
|
UserNotParticipantError, ChatWriteForbiddenError) as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Access denied: {str(e)}" |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error searching messages: {str(e)}") |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to search messages: {str(e)}" |
|
|
} |
|
|
finally: |
|
|
if client: |
|
|
await client.disconnect() |
|
|
|
|
|
|
|
|
return self.telegram_base._run_async(_search_messages()) |
|
|
|
|
|
|
|
|
class SendMessageTool(Tool): |
|
|
"""Send a text message to a Telegram contact by their name.""" |
|
|
|
|
|
|
|
|
name: str = "send_message_by_name" |
|
|
description: str = ( |
|
|
"Finds a contact by their name and sends them a text message. " |
|
|
"If multiple contacts match the name, it will ask for clarification." |
|
|
) |
|
|
inputs: Dict[str, Dict[str, str]] = { |
|
|
"contact_name": { |
|
|
"type": "string", |
|
|
"description": "The name of the contact to search for (e.g., 'Shivam Kumar')" |
|
|
}, |
|
|
"message_text": { |
|
|
"type": "string", |
|
|
"description": "The text message to send" |
|
|
} |
|
|
} |
|
|
required: List[str] = ["contact_name", "message_text"] |
|
|
|
|
|
def __init__(self, telegram_base: TelegramBase): |
|
|
super().__init__() |
|
|
self.telegram_base = telegram_base |
|
|
|
|
|
|
|
|
def __call__(self, contact_name: str, message_text: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Finds a contact by name and sends them a message. |
|
|
|
|
|
Args: |
|
|
contact_name: The name of the Telegram contact. |
|
|
message_text: Text message to send. |
|
|
|
|
|
Returns: |
|
|
Dictionary with the send result or a request for clarification. |
|
|
""" |
|
|
async def _send_message_by_name(): |
|
|
client = None |
|
|
try: |
|
|
client = self.telegram_base._get_client() |
|
|
await client.start(phone=self.telegram_base.phone) |
|
|
|
|
|
|
|
|
matches = [] |
|
|
async for dialog in client.iter_dialogs(): |
|
|
if dialog.is_user and not dialog.entity.bot: |
|
|
if contact_name.lower() in dialog.name.lower(): |
|
|
matches.append({"name": dialog.name, "id": dialog.id}) |
|
|
|
|
|
if len(matches) == 0: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Contact '{contact_name}' not found. Please check the name." |
|
|
} |
|
|
|
|
|
if len(matches) > 1: |
|
|
|
|
|
clarification_list = [f"{m['name']} (ID: {m['id']})" for m in matches] |
|
|
return { |
|
|
"success": False, |
|
|
"error": "Ambiguous contact name. Please clarify which user you mean.", |
|
|
"clarification_needed": clarification_list |
|
|
} |
|
|
|
|
|
|
|
|
chat_id = matches[0]['id'] |
|
|
|
|
|
|
|
|
|
|
|
sent_message = await client.send_message(chat_id, message_text) |
|
|
|
|
|
|
|
|
chat = await client.get_entity(chat_id) |
|
|
chat_info = self.telegram_base._format_chat(chat) |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"message_id": sent_message.id, |
|
|
"chat": chat_info, |
|
|
"message_text": message_text, |
|
|
"sent_at": sent_message.date.isoformat() if sent_message.date else None |
|
|
} |
|
|
|
|
|
|
|
|
except FloodWaitError as e: |
|
|
return {"success": False, "error": f"Rate limited. Please wait {e.seconds} seconds."} |
|
|
except (ChatAdminRequiredError, UserBannedInChannelError, ChannelPrivateError, |
|
|
UserNotParticipantError, ChatWriteForbiddenError) as e: |
|
|
return {"success": False, "error": f"Access denied: {str(e)}"} |
|
|
except MessageEmptyError: |
|
|
return {"success": False, "error": "Message is empty."} |
|
|
except MessageTooLongError: |
|
|
return {"success": False, "error": "Message is too long."} |
|
|
except Exception as e: |
|
|
logger.error(f"Error sending message: {str(e)}") |
|
|
return {"success": False, "error": f"Failed to send message: {str(e)}"} |
|
|
finally: |
|
|
if client: |
|
|
await client.disconnect() |
|
|
|
|
|
return self.telegram_base._run_async(_send_message_by_name()) |
|
|
|
|
|
|
|
|
class ListRecentChatsTool(Tool): |
|
|
"""Get a list of recent conversations, allowing the agent to ask for clarification if a user's request is ambiguous.""" |
|
|
|
|
|
name: str = "list_recent_chats" |
|
|
description: str = "Get a list of recent conversations, allowing the agent to ask for clarification if a user's request is ambiguous (e.g., 'Summarize my last chat')." |
|
|
inputs: Dict[str, Dict[str, str]] = { |
|
|
"limit": { |
|
|
"type": "integer", |
|
|
"description": "Maximum number of recent chats to retrieve (default: 10)" |
|
|
} |
|
|
} |
|
|
required: List[str] = [] |
|
|
|
|
|
def __init__(self, telegram_base: TelegramBase): |
|
|
super().__init__() |
|
|
self.telegram_base = telegram_base |
|
|
|
|
|
def __call__(self, limit: int = 10) -> Dict[str, Any]: |
|
|
""" |
|
|
List recent Telegram chats. |
|
|
|
|
|
Args: |
|
|
limit: Maximum number of recent chats to retrieve |
|
|
|
|
|
Returns: |
|
|
Dictionary with chat list |
|
|
""" |
|
|
async def _list_chats(): |
|
|
client = None |
|
|
try: |
|
|
client = self.telegram_base._get_client() |
|
|
await client.start(phone=self.telegram_base.phone) |
|
|
|
|
|
|
|
|
dialogs = [] |
|
|
async for dialog in client.iter_dialogs(limit=limit): |
|
|
chat_info = self.telegram_base._format_chat(dialog.entity) |
|
|
dialogs.append({ |
|
|
**chat_info, |
|
|
"last_message_date": dialog.date.isoformat() if dialog.date else None, |
|
|
"unread_count": dialog.unread_count |
|
|
}) |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"chats_count": len(dialogs), |
|
|
"chats": dialogs |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error listing chats: {str(e)}") |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to list chats: {str(e)}" |
|
|
} |
|
|
finally: |
|
|
if client: |
|
|
await client.disconnect() |
|
|
|
|
|
|
|
|
return self.telegram_base._run_async(_list_chats()) |
|
|
|
|
|
|
|
|
class FindAndRetrieveFileTool(Tool): |
|
|
"""Locate a specific file within a contact's chat based on a search query. This tool should return metadata about the file (name, size, type), not download its contents.""" |
|
|
|
|
|
name: str = "find_and_retrieve_file" |
|
|
description: str = "Locate a specific file within a contact's chat based on a search query. This tool should return metadata about the file (name, size, type), not download its contents. If multiple contacts match the name, it will ask for clarification." |
|
|
inputs: Dict[str, Dict[str, str]] = { |
|
|
"contact_name": { |
|
|
"type": "string", |
|
|
"description": "The name of the contact to search files from (e.g., 'Shivam Kumar')" |
|
|
}, |
|
|
"filename_query": { |
|
|
"type": "string", |
|
|
"description": "Filename or search query to find files" |
|
|
} |
|
|
} |
|
|
required: List[str] = ["contact_name", "filename_query"] |
|
|
|
|
|
def __init__(self, telegram_base: TelegramBase): |
|
|
super().__init__() |
|
|
self.telegram_base = telegram_base |
|
|
|
|
|
def __call__(self, contact_name: str, filename_query: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Find files in a Telegram contact's chat based on filename query. |
|
|
|
|
|
Args: |
|
|
contact_name: The name of the Telegram contact |
|
|
filename_query: Filename or search query to find files |
|
|
|
|
|
Returns: |
|
|
Dictionary with file search results |
|
|
""" |
|
|
async def _find_files(): |
|
|
client = None |
|
|
try: |
|
|
client = self.telegram_base._get_client() |
|
|
await client.start(phone=self.telegram_base.phone) |
|
|
|
|
|
|
|
|
matches = [] |
|
|
async for dialog in client.iter_dialogs(): |
|
|
if contact_name.lower() in dialog.name.lower(): |
|
|
matches.append({"name": dialog.name, "id": dialog.id, "chat": dialog.entity}) |
|
|
|
|
|
if len(matches) == 0: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Contact '{contact_name}' not found. Please check the name." |
|
|
} |
|
|
|
|
|
if len(matches) > 1: |
|
|
|
|
|
clarification_list = [f"{m['name']} (ID: {m['id']})" for m in matches] |
|
|
return { |
|
|
"success": False, |
|
|
"error": "Ambiguous contact name. Please clarify which user you mean.", |
|
|
"clarification_needed": clarification_list |
|
|
} |
|
|
|
|
|
|
|
|
chat = matches[0]['chat'] |
|
|
|
|
|
|
|
|
files = [] |
|
|
message_count = 0 |
|
|
|
|
|
async for message in client.iter_messages(chat): |
|
|
message_count += 1 |
|
|
|
|
|
if message.document: |
|
|
|
|
|
doc = message.document |
|
|
filename = "Unknown" |
|
|
|
|
|
|
|
|
for attribute in doc.attributes: |
|
|
if hasattr(attribute, 'file_name'): |
|
|
filename = attribute.file_name |
|
|
break |
|
|
|
|
|
|
|
|
if not filename_query or filename_query.lower() in filename.lower(): |
|
|
files.append({ |
|
|
"message_id": message.id, |
|
|
"filename": filename, |
|
|
"file_size": doc.size, |
|
|
"mime_type": doc.mime_type, |
|
|
"date": message.date.isoformat() if message.date else None, |
|
|
"sender_id": message.sender_id, |
|
|
"caption": message.text or "" |
|
|
}) |
|
|
|
|
|
|
|
|
if message_count > 1000: |
|
|
break |
|
|
|
|
|
|
|
|
chat_info = self.telegram_base._format_chat(chat) |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"chat": chat_info, |
|
|
"query": filename_query, |
|
|
"files_found": len(files), |
|
|
"files": files |
|
|
} |
|
|
|
|
|
except FloodWaitError as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Rate limited. Please wait {e.seconds} seconds before trying again." |
|
|
} |
|
|
except (ChatAdminRequiredError, UserBannedInChannelError, ChannelPrivateError, |
|
|
UserNotParticipantError, ChatWriteForbiddenError) as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Access denied: {str(e)}" |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error finding files: {str(e)}") |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to find files: {str(e)}" |
|
|
} |
|
|
finally: |
|
|
if client: |
|
|
await client.disconnect() |
|
|
|
|
|
|
|
|
return self.telegram_base._run_async(_find_files()) |
|
|
|
|
|
|
|
|
class SummarizeContactMessagesTool(Tool): |
|
|
"""Summarize recent messages from a specific Telegram contact by their name.""" |
|
|
|
|
|
name: str = "summarize_contact_messages" |
|
|
description: str = "Summarize recent messages from a specific Telegram contact by their name. Provides a summary of the conversation history." |
|
|
inputs: Dict[str, Dict[str, str]] = { |
|
|
"contact_name": { |
|
|
"type": "string", |
|
|
"description": "The name of the contact to summarize messages for (e.g., 'Shivam Kumar')" |
|
|
}, |
|
|
"limit": { |
|
|
"type": "integer", |
|
|
"description": "Maximum number of recent messages to analyze for summarization (default: 20)" |
|
|
} |
|
|
} |
|
|
required: List[str] = ["contact_name"] |
|
|
|
|
|
def __init__(self, telegram_base: TelegramBase): |
|
|
super().__init__() |
|
|
self.telegram_base = telegram_base |
|
|
|
|
|
def __call__(self, contact_name: str, limit: int = 20) -> Dict[str, Any]: |
|
|
""" |
|
|
Summarize recent messages from a contact by name. |
|
|
|
|
|
Args: |
|
|
contact_name: The name of the Telegram contact |
|
|
limit: Maximum number of recent messages to analyze |
|
|
|
|
|
Returns: |
|
|
Dictionary with summarization results |
|
|
""" |
|
|
async def _summarize_messages(): |
|
|
client = None |
|
|
try: |
|
|
client = self.telegram_base._get_client() |
|
|
await client.start(phone=self.telegram_base.phone) |
|
|
|
|
|
|
|
|
matches = [] |
|
|
async for dialog in client.iter_dialogs(): |
|
|
if dialog.is_user and not dialog.entity.bot: |
|
|
if contact_name.lower() in dialog.name.lower(): |
|
|
matches.append({"name": dialog.name, "id": dialog.id}) |
|
|
|
|
|
if len(matches) == 0: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Contact '{contact_name}' not found. Please check the name." |
|
|
} |
|
|
|
|
|
if len(matches) > 1: |
|
|
|
|
|
clarification_list = [f"{m['name']} (ID: {m['id']})" for m in matches] |
|
|
return { |
|
|
"success": False, |
|
|
"error": "Ambiguous contact name. Please clarify which user you mean.", |
|
|
"clarification_needed": clarification_list |
|
|
} |
|
|
|
|
|
|
|
|
chat_id = matches[0]['id'] |
|
|
|
|
|
|
|
|
chat = await client.get_entity(chat_id) |
|
|
chat_info = self.telegram_base._format_chat(chat) |
|
|
|
|
|
|
|
|
messages = [] |
|
|
async for message in client.iter_messages(chat, limit=limit): |
|
|
if message.text: |
|
|
messages.append({ |
|
|
"id": message.id, |
|
|
"text": message.text, |
|
|
"date": message.date.isoformat() if message.date else None, |
|
|
"sender_id": message.sender_id, |
|
|
"is_outgoing": message.out |
|
|
}) |
|
|
|
|
|
|
|
|
if not messages: |
|
|
summary = f"No recent text messages found with {contact_name}." |
|
|
else: |
|
|
|
|
|
total_messages = len(messages) |
|
|
outgoing_count = sum(1 for msg in messages if msg['is_outgoing']) |
|
|
incoming_count = total_messages - outgoing_count |
|
|
|
|
|
|
|
|
dates = [msg['date'] for msg in messages if msg['date']] |
|
|
if dates: |
|
|
latest_date = max(dates) |
|
|
earliest_date = min(dates) |
|
|
else: |
|
|
latest_date = earliest_date = "Unknown" |
|
|
|
|
|
|
|
|
all_text = " ".join([msg['text'] for msg in messages]) |
|
|
words = all_text.lower().split() |
|
|
word_freq = {} |
|
|
for word in words: |
|
|
if len(word) > 3: |
|
|
word_freq[word] = word_freq.get(word, 0) + 1 |
|
|
|
|
|
|
|
|
top_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:5] |
|
|
|
|
|
summary = f"""Conversation Summary with {contact_name}: |
|
|
• Total messages analyzed: {total_messages} |
|
|
• Messages from you: {outgoing_count} |
|
|
• Messages from {contact_name}: {incoming_count} |
|
|
• Date range: {earliest_date} to {latest_date} |
|
|
• Key topics: {', '.join([word for word, freq in top_words])} |
|
|
• Recent activity: {'Active' if total_messages > 0 else 'No recent messages'}""" |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"contact": chat_info, |
|
|
"messages_analyzed": len(messages), |
|
|
"summary": summary, |
|
|
"recent_messages": messages[:5] |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error summarizing messages: {str(e)}") |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to summarize messages: {str(e)}" |
|
|
} |
|
|
finally: |
|
|
if client: |
|
|
await client.disconnect() |
|
|
|
|
|
|
|
|
return self.telegram_base._run_async(_summarize_messages()) |
|
|
|
|
|
|
|
|
class DownloadFileTool(Tool): |
|
|
"""Download a file from a Telegram contact by their name.""" |
|
|
|
|
|
name: str = "download_file" |
|
|
description: str = "Download a file from a Telegram contact by their name. Downloads the file to a local directory." |
|
|
inputs: Dict[str, Dict[str, str]] = { |
|
|
"contact_name": { |
|
|
"type": "string", |
|
|
"description": "The name of the contact to download file from (e.g., 'Vinay Kumar')" |
|
|
}, |
|
|
"filename_query": { |
|
|
"type": "string", |
|
|
"description": "Filename or search query to find the file (e.g., 'Kafka.pdf')" |
|
|
}, |
|
|
"download_dir": { |
|
|
"type": "string", |
|
|
"description": "Directory to download the file to (default: 'downloads')" |
|
|
} |
|
|
} |
|
|
required: List[str] = ["contact_name", "filename_query"] |
|
|
|
|
|
def __init__(self, telegram_base: TelegramBase): |
|
|
super().__init__() |
|
|
self.telegram_base = telegram_base |
|
|
|
|
|
def __call__(self, contact_name: str, filename_query: str, download_dir: str = "downloads") -> Dict[str, Any]: |
|
|
""" |
|
|
Download a file from a Telegram contact. |
|
|
|
|
|
Args: |
|
|
contact_name: The name of the Telegram contact |
|
|
filename_query: Filename or search query to find the file |
|
|
download_dir: Directory to download the file to |
|
|
|
|
|
Returns: |
|
|
Dictionary with download result |
|
|
""" |
|
|
async def _download_file(): |
|
|
client = None |
|
|
try: |
|
|
client = self.telegram_base._get_client() |
|
|
await client.start(phone=self.telegram_base.phone) |
|
|
|
|
|
|
|
|
matches = [] |
|
|
async for dialog in client.iter_dialogs(): |
|
|
if contact_name.lower() in dialog.name.lower(): |
|
|
matches.append({"name": dialog.name, "id": dialog.id, "chat": dialog.entity}) |
|
|
|
|
|
if len(matches) == 0: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Contact '{contact_name}' not found. Please check the name." |
|
|
} |
|
|
|
|
|
if len(matches) > 1: |
|
|
clarification_list = [f"{m['name']} (ID: {m['id']})" for m in matches] |
|
|
return { |
|
|
"success": False, |
|
|
"error": "Ambiguous contact name. Please clarify which user you mean.", |
|
|
"clarification_needed": clarification_list |
|
|
} |
|
|
|
|
|
|
|
|
chat = matches[0]['chat'] |
|
|
|
|
|
|
|
|
found_message = None |
|
|
message_count = 0 |
|
|
|
|
|
async for message in client.iter_messages(chat): |
|
|
message_count += 1 |
|
|
|
|
|
if message.document: |
|
|
doc = message.document |
|
|
filename = "Unknown" |
|
|
|
|
|
|
|
|
for attribute in doc.attributes: |
|
|
if hasattr(attribute, 'file_name'): |
|
|
filename = attribute.file_name |
|
|
break |
|
|
|
|
|
|
|
|
if filename_query.lower() in filename.lower(): |
|
|
found_message = message |
|
|
break |
|
|
|
|
|
if message_count > 1000: |
|
|
break |
|
|
|
|
|
if not found_message: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"File '{filename_query}' not found in contact '{contact_name}'" |
|
|
} |
|
|
|
|
|
|
|
|
if not os.path.exists(download_dir): |
|
|
os.makedirs(download_dir) |
|
|
|
|
|
downloaded_path = await client.download_media( |
|
|
found_message, |
|
|
file=os.path.join(download_dir, filename) |
|
|
) |
|
|
|
|
|
if downloaded_path: |
|
|
file_size = os.path.getsize(downloaded_path) |
|
|
return { |
|
|
"success": True, |
|
|
"message": "File downloaded successfully", |
|
|
"filename": filename, |
|
|
"file_path": downloaded_path, |
|
|
"file_size": file_size, |
|
|
"download_dir": download_dir, |
|
|
"contact_name": contact_name |
|
|
} |
|
|
else: |
|
|
return { |
|
|
"success": False, |
|
|
"error": "File download failed" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to download file: {str(e)}" |
|
|
} |
|
|
finally: |
|
|
if client: |
|
|
await client.disconnect() |
|
|
|
|
|
return self.telegram_base._run_async(_download_file()) |
|
|
|
|
|
|
|
|
class ReadFileContentTool(Tool): |
|
|
"""Read the content of a file from a Telegram contact by their name.""" |
|
|
|
|
|
name: str = "read_file_content" |
|
|
description: str = "Read the content of a file from a Telegram contact by their name. Downloads the file and extracts its text content." |
|
|
inputs: Dict[str, Dict[str, str]] = { |
|
|
"contact_name": { |
|
|
"type": "string", |
|
|
"description": "The name of the contact to read file from (e.g., 'Vinay Kumar')" |
|
|
}, |
|
|
"filename_query": { |
|
|
"type": "string", |
|
|
"description": "Filename or search query to find the file (e.g., 'Kafka.pdf')" |
|
|
}, |
|
|
"content_type": { |
|
|
"type": "string", |
|
|
"description": "Type of content to extract: 'full', 'first_lines', 'last_lines', 'summary' (default: 'full')" |
|
|
}, |
|
|
"lines_count": { |
|
|
"type": "integer", |
|
|
"description": "Number of lines to extract for first_lines/last_lines (default: 3)" |
|
|
} |
|
|
} |
|
|
required: List[str] = ["contact_name", "filename_query"] |
|
|
|
|
|
def __init__(self, telegram_base: TelegramBase): |
|
|
super().__init__() |
|
|
self.telegram_base = telegram_base |
|
|
|
|
|
def __call__(self, contact_name: str, filename_query: str, content_type: str = "full", lines_count: int = 3) -> Dict[str, Any]: |
|
|
""" |
|
|
Read the content of a file from a Telegram contact. |
|
|
|
|
|
Args: |
|
|
contact_name: The name of the Telegram contact |
|
|
filename_query: Filename or search query to find the file |
|
|
content_type: Type of content to extract |
|
|
lines_count: Number of lines for first_lines/last_lines |
|
|
|
|
|
Returns: |
|
|
Dictionary with file content |
|
|
""" |
|
|
async def _read_file_content(): |
|
|
client = None |
|
|
try: |
|
|
client = self.telegram_base._get_client() |
|
|
await client.start(phone=self.telegram_base.phone) |
|
|
|
|
|
|
|
|
matches = [] |
|
|
async for dialog in client.iter_dialogs(): |
|
|
if contact_name.lower() in dialog.name.lower(): |
|
|
matches.append({"name": dialog.name, "id": dialog.id, "chat": dialog.entity}) |
|
|
|
|
|
if len(matches) == 0: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Contact '{contact_name}' not found. Please check the name." |
|
|
} |
|
|
|
|
|
if len(matches) > 1: |
|
|
clarification_list = [f"{m['name']} (ID: {m['id']})" for m in matches] |
|
|
return { |
|
|
"success": False, |
|
|
"error": "Ambiguous contact name. Please clarify which user you mean.", |
|
|
"clarification_needed": clarification_list |
|
|
} |
|
|
|
|
|
|
|
|
chat = matches[0]['chat'] |
|
|
|
|
|
|
|
|
found_message = None |
|
|
message_count = 0 |
|
|
|
|
|
async for message in client.iter_messages(chat): |
|
|
message_count += 1 |
|
|
|
|
|
if message.document: |
|
|
doc = message.document |
|
|
filename = "Unknown" |
|
|
|
|
|
|
|
|
for attribute in doc.attributes: |
|
|
if hasattr(attribute, 'file_name'): |
|
|
filename = attribute.file_name |
|
|
break |
|
|
|
|
|
|
|
|
if filename_query.lower() in filename.lower(): |
|
|
found_message = message |
|
|
break |
|
|
|
|
|
if message_count > 1000: |
|
|
break |
|
|
|
|
|
if not found_message: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"File '{filename_query}' not found in contact '{contact_name}'" |
|
|
} |
|
|
|
|
|
|
|
|
temp_dir = "temp_downloads" |
|
|
if not os.path.exists(temp_dir): |
|
|
os.makedirs(temp_dir) |
|
|
|
|
|
|
|
|
unique_filename = f"{int(time.time())}_{filename}" |
|
|
downloaded_path = await client.download_media( |
|
|
found_message, |
|
|
file=os.path.join(temp_dir, unique_filename) |
|
|
) |
|
|
|
|
|
if not downloaded_path: |
|
|
return { |
|
|
"success": False, |
|
|
"error": "Failed to download file for reading" |
|
|
} |
|
|
|
|
|
|
|
|
try: |
|
|
if filename.lower().endswith('.pdf'): |
|
|
|
|
|
with open(downloaded_path, 'rb') as file: |
|
|
pdf_reader = PyPDF2.PdfReader(file) |
|
|
|
|
|
|
|
|
full_text = "" |
|
|
for page in pdf_reader.pages: |
|
|
full_text += page.extract_text() + "\n" |
|
|
|
|
|
|
|
|
lines = [line.strip() for line in full_text.split('\n') if line.strip()] |
|
|
|
|
|
if content_type == "full": |
|
|
content = full_text |
|
|
elif content_type == "first_lines": |
|
|
content = "\n".join(lines[:lines_count]) |
|
|
elif content_type == "last_lines": |
|
|
content = "\n".join(lines[-lines_count:]) |
|
|
elif content_type == "summary": |
|
|
content = f"Document has {len(pdf_reader.pages)} pages, {len(lines)} lines, {len(full_text)} characters" |
|
|
else: |
|
|
content = full_text |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"message": "File content read successfully", |
|
|
"filename": filename, |
|
|
"content_type": content_type, |
|
|
"content": content, |
|
|
"file_info": { |
|
|
"pages": len(pdf_reader.pages), |
|
|
"lines": len(lines), |
|
|
"characters": len(full_text) |
|
|
}, |
|
|
"contact_name": contact_name |
|
|
} |
|
|
else: |
|
|
|
|
|
with open(downloaded_path, 'r', encoding='utf-8') as file: |
|
|
content = file.read() |
|
|
|
|
|
lines = content.split('\n') |
|
|
|
|
|
if content_type == "full": |
|
|
processed_content = content |
|
|
elif content_type == "first_lines": |
|
|
processed_content = "\n".join(lines[:lines_count]) |
|
|
elif content_type == "last_lines": |
|
|
processed_content = "\n".join(lines[-lines_count:]) |
|
|
elif content_type == "summary": |
|
|
processed_content = f"File has {len(lines)} lines, {len(content)} characters" |
|
|
else: |
|
|
processed_content = content |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"message": "File content read successfully", |
|
|
"filename": filename, |
|
|
"content_type": content_type, |
|
|
"content": processed_content, |
|
|
"file_info": { |
|
|
"lines": len(lines), |
|
|
"characters": len(content) |
|
|
}, |
|
|
"contact_name": contact_name |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to read file content: {str(e)}" |
|
|
} |
|
|
finally: |
|
|
|
|
|
try: |
|
|
if os.path.exists(downloaded_path): |
|
|
os.remove(downloaded_path) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
except Exception as e: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to read file: {str(e)}" |
|
|
} |
|
|
finally: |
|
|
if client: |
|
|
await client.disconnect() |
|
|
|
|
|
return self.telegram_base._run_async(_read_file_content()) |
|
|
|
|
|
|
|
|
class TelegramToolkit(Toolkit): |
|
|
""" |
|
|
Complete Telegram toolkit containing all available tools. |
|
|
""" |
|
|
|
|
|
def __init__(self, api_id: str = None, api_hash: str = None, phone: str = None, name: str = "TelegramToolkit"): |
|
|
""" |
|
|
Initialize the Telegram toolkit. |
|
|
|
|
|
Args: |
|
|
api_id (str, optional): Telegram API ID. If not provided, will try to get from TELEGRAM_API_ID environment variable. |
|
|
api_hash (str, optional): Telegram API Hash. If not provided, will try to get from TELEGRAM_API_HASH environment variable. |
|
|
phone (str, optional): Phone number for authentication. If not provided, will try to get from TELEGRAM_PHONE environment variable. |
|
|
name (str): Toolkit name |
|
|
""" |
|
|
|
|
|
telegram_base = TelegramBase(api_id=api_id, api_hash=api_hash, phone=phone) |
|
|
|
|
|
|
|
|
tools = [ |
|
|
FetchLatestMessagesTool(telegram_base=telegram_base), |
|
|
SearchMessagesByKeywordTool(telegram_base=telegram_base), |
|
|
SendMessageTool(telegram_base=telegram_base), |
|
|
ListRecentChatsTool(telegram_base=telegram_base), |
|
|
FindAndRetrieveFileTool(telegram_base=telegram_base), |
|
|
SummarizeContactMessagesTool(telegram_base=telegram_base), |
|
|
DownloadFileTool(telegram_base=telegram_base), |
|
|
ReadFileContentTool(telegram_base=telegram_base) |
|
|
] |
|
|
|
|
|
|
|
|
super().__init__(name=name, tools=tools) |
|
|
|
|
|
|
|
|
self.telegram_base = telegram_base |