# arbintel/src/alerts.py
# AJAY KASU
# Add tests, alerts, and blockchain tracker
# 81146df
import os
import requests
import logging
# Module-level logger named after this module; AlertManager below reports
# skipped channels (debug) and delivery failures (error) through it.
logger = logging.getLogger(__name__)
class AlertManager:
    """Send best-effort notifications to Discord and Slack webhooks.

    Webhook URLs are read once, at construction time, from the
    ``DISCORD_WEBHOOK_URL`` and ``SLACK_WEBHOOK_URL`` environment variables.
    A channel whose variable is unset or empty is silently skipped (a debug
    line is logged). Delivery problems are logged and never raised, so a
    failing webhook cannot interrupt the caller.
    """

    # Seconds to wait for a webhook to connect/respond. ``requests`` applies
    # no timeout by default, so without this an unresponsive endpoint would
    # block the caller forever.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Read webhook URLs from the environment; ``None`` means "disabled".
        self.discord_webhook = os.environ.get("DISCORD_WEBHOOK_URL")
        self.slack_webhook = os.environ.get("SLACK_WEBHOOK_URL")

    def _post_webhook(self, service: str, url: str, payload: dict) -> None:
        """POST *payload* as JSON to *url*, logging (not raising) any failure.

        Args:
            service: Human-readable channel name used in log messages.
            url: Webhook URL to post to.
            payload: JSON-serialisable request body.
        """
        try:
            response = requests.post(
                url, json=payload, timeout=self.REQUEST_TIMEOUT
            )
        except Exception as exc:
            # Deliberately broad: alerting is best-effort and must not take
            # down the caller, whatever goes wrong while sending.
            logger.error("Error sending %s alert: %s", service, exc)
            return

        # Any 2xx counts as delivered. Discord replies 204 by default but 200
        # when the webhook URL carries ``?wait=true``; Slack replies 200.
        if not 200 <= response.status_code < 300:
            logger.error(
                "Failed to send %s alert: %s", service, response.status_code
            )

    def send_discord_alert(
        self, title: str, message: str, color: int = 5814783
    ) -> None:
        """Send a rich embed to Discord.

        Args:
            title: Embed title.
            message: Embed description (Discord markdown).
            color: Embed accent colour as an integer.
        """
        if not self.discord_webhook:
            logger.debug("Discord webhook not configured.")
            return

        payload = {
            "embeds": [
                {
                    "title": title,
                    "description": message,
                    "color": color,
                }
            ]
        }
        self._post_webhook("Discord", self.discord_webhook, payload)

    def send_slack_alert(self, title: str, message: str) -> None:
        """Send a message to Slack with *title* rendered in bold.

        Args:
            title: Heading placed on the first line, wrapped in ``*...*``.
            message: Message body (Slack mrkdwn).
        """
        if not self.slack_webhook:
            logger.debug("Slack webhook not configured.")
            return

        payload = {"text": f"*{title}*\n{message}"}
        self._post_webhook("Slack", self.slack_webhook, payload)

    @staticmethod
    def _format_arbitrage_message(opp: dict, bold: str) -> str:
        """Render the alert body for *opp*, wrapping emphasis in *bold*.

        The bold marker is a parameter because the two channels disagree:
        Discord markdown uses ``**`` while Slack mrkdwn uses ``*``.
        """
        lines = [
            f"{bold}{opp['event_name']}{bold}",
            f"Buy on {opp['buy_platform']} @ ${opp['buy_price']}",
            f"Sell on {opp['sell_platform']} @ ${opp['sell_price']}",
            f"{bold}Net Edge:{bold} {opp['expected_profit_margin'] * 100:.2f}%",
            f"{bold}Max Size:{bold} ${opp['buy_size']:.2f}",
        ]
        return "\n".join(lines)

    def alert_arbitrage(self, opp: dict) -> None:
        """Format and send an arbitrage opportunity alert to both channels.

        Args:
            opp: Opportunity record. The keys read here are ``event_name``,
                ``buy_platform``, ``buy_price``, ``sell_platform``,
                ``sell_price``, ``expected_profit_margin`` (multiplied by 100
                and shown as a percentage) and ``buy_size``; the last two
                must be numeric.

        Raises:
            KeyError: If *opp* lacks one of the keys above. Raised before
                anything is sent.
        """
        title = "🚨 New Arbitrage Opportunity Detected!"

        # Build both bodies up front so a malformed ``opp`` fails before any
        # channel has been notified.
        discord_body = self._format_arbitrage_message(opp, bold="**")
        slack_body = self._format_arbitrage_message(opp, bold="*")

        # Color: greenish for profit.
        self.send_discord_alert(title, discord_body, color=5763719)
        self.send_slack_alert(title, slack_body)