import os import requests import logging logger = logging.getLogger(__name__) class AlertManager: def __init__(self): # Read Webhook URLs from environment self.discord_webhook = os.environ.get("DISCORD_WEBHOOK_URL") self.slack_webhook = os.environ.get("SLACK_WEBHOOK_URL") def send_discord_alert(self, title: str, message: str, color: int = 5814783): """Send a rich embed to Discord.""" if not self.discord_webhook: logger.debug("Discord webhook not configured.") return payload = { "embeds": [{ "title": title, "description": message, "color": color }] } try: response = requests.post(self.discord_webhook, json=payload) if response.status_code != 204: logger.error(f"Failed to send Discord alert: {response.status_code}") except Exception as e: logger.error(f"Error sending Discord alert: {e}") def send_slack_alert(self, title: str, message: str): """Send a message to Slack.""" if not self.slack_webhook: logger.debug("Slack webhook not configured.") return payload = { "text": f"*{title}*\n{message}" } try: response = requests.post(self.slack_webhook, json=payload) if response.status_code != 200: logger.error(f"Failed to send Slack alert: {response.status_code}") except Exception as e: logger.error(f"Error sending Slack alert: {e}") def alert_arbitrage(self, opp: dict): """Format and send an arbitrage opportunity alert.""" title = "🚨 New Arbitrage Opportunity Detected!" msg = f"**{opp['event_name']}**\n" msg += f"Buy on {opp['buy_platform']} @ ${opp['buy_price']}\n" msg += f"Sell on {opp['sell_platform']} @ ${opp['sell_price']}\n" msg += f"**Net Edge:** {opp['expected_profit_margin']*100:.2f}%\n" msg += f"**Max Size:** ${opp['buy_size']:.2f}" # Color: Greenish for profit self.send_discord_alert(title, msg, color=5763719) self.send_slack_alert(title, msg)