Spaces:
Runtime error
Runtime error
| import asyncio | |
| import os | |
| from slack_sdk.errors import SlackApiError | |
| from slack_sdk.web.async_client import AsyncWebClient | |
| from src.utils import logger | |
| class SlackClient: | |
| """ | |
| A client for interacting with the Slack API asynchronously. | |
| Attributes: | |
| client (AsyncWebClient): The asynchronous Slack WebClient instance. | |
| Methods: | |
| __aenter__(): | |
| Asynchronous context manager entry method. | |
| __aexit__(exc_type, exc_val, exc_tb): | |
| Asynchronous context manager exit method. | |
| send_message(user_id, message: str, to_channel: bool = False): | |
| Sends a message to a specified user or channel. | |
| send_message_to_users(message): | |
| Sends a message to all users in the Slack workspace. | |
| get_file_info(file_id: str): | |
| Retrieves information about a specified file. | |
| get_users_list(): | |
| Retrieves a list of users in the Slack workspace, excluding bots. | |
| handle_oauth_callback(code): | |
| Handles the OAuth callback and exchanges the code for an access token. | |
| get_user_email(user_id: str) -> str | None: | |
| Retrieves the email address of a specified user. | |
| """ | |
| def __init__(self, token: str = None): | |
| self.client = AsyncWebClient(token=token) | |
| async def __aenter__(self): | |
| return self | |
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| pass | |
| async def send_message(self, user_id, message: str, to_channel: bool = False): | |
| response = await self.client.chat_postMessage( | |
| channel=user_id, text=message, mrkdwn=True | |
| ) | |
| return response | |
| async def send_message_to_users(self, message): | |
| users = await self.get_users_list() | |
| for user in users["members"]: | |
| try: | |
| await self.send_message(user["id"], message) | |
| await asyncio.sleep(5) | |
| except SlackApiError as e: | |
| logger.error(f"Error sending message: {e}") | |
| return "Messages sent successfully" | |
| async def get_file_info(self, file_id: str): | |
| file_info = await self.client.files_info(file=file_id) | |
| return file_info | |
| async def get_users_list(self): | |
| logger.info("Getting users list...") | |
| users_list = await self.client.users_list() | |
| filtered_users = [ | |
| user | |
| for user in users_list["members"] | |
| if user["is_bot"] == False and "email" in user["profile"] | |
| ] | |
| return filtered_users | |
| async def handle_oauth_callback(self, code): | |
| return await self.client.oauth_v2_access( | |
| client_id=os.environ.get("SLACK_CLIENT_ID"), | |
| client_secret=os.environ.get("SLACK_CLIENT_SECRET"), | |
| code=code, | |
| redirect_uri=os.environ.get("SLACK_REDIRECT_URI"), | |
| ) | |
| async def get_user_email(self, user_id: str) -> str | None: | |
| try: | |
| user_info = await self.client.users_info(user=user_id) | |
| if user_info["ok"] and "email" in user_info["user"]["profile"]: | |
| return user_info["user"]["profile"]["email"] | |
| return None | |
| except SlackApiError as e: | |
| logger.error(f"Error getting user email: {e}") | |
| return None | |