Spaces:
Sleeping
Sleeping
| # core/alerters.py | |
| import datetime | |
| import requests | |
| import aiohttp | |
class BaseAlerter:
    """Common interface for alert sinks.

    Subclasses override :meth:`send` to deliver a message somewhere.
    """

    def send(self, message: str):
        """Deliver ``message``; the base class has no delivery channel.

        Raises:
            NotImplementedError: Always, because subclasses must override it.
        """
        raise NotImplementedError("send() must be implemented.")
class ConsoleAlerter(BaseAlerter):
    """Alerter that prints each message to standard output."""

    def send(self, message: str):
        """Print ``message`` prefixed with a tag and the current local time."""
        # ISO-8601 timestamp taken at the moment the alert is emitted.
        stamp = datetime.datetime.now().isoformat()
        line = f"[Console Alert][{stamp}] {message}"
        print(line)
class FileAlerter(BaseAlerter):
    """Alerter that appends each message as one line to a log file."""

    def __init__(self, filepath="failsafe_errors.log"):
        """Remember where alerts are written.

        Args:
            filepath: Path of the log file; it is opened in append mode on
                every call to :meth:`send`.
        """
        self.filepath = filepath

    def send(self, message: str):
        """Append ``message`` to the log file with a tag and a timestamp.

        Raises:
            OSError: If the file cannot be opened or written.
        """
        timestamp = datetime.datetime.now().isoformat()
        # An explicit encoding keeps non-ASCII alert text writable regardless
        # of the platform's locale default.
        with open(self.filepath, "a", encoding="utf-8") as f:
            f.write(f"[File Alert][{timestamp}] {message}\n")
class WebhookAlerter(BaseAlerter):
    """Alerter that POSTs each message as JSON to a webhook URL (blocking).

    Delivery is best-effort: any failure is printed to stdout and is never
    propagated to the caller.
    """

    def __init__(self, url: str, timeout: float = 10.0):
        """Store the webhook endpoint and the request timeout.

        Args:
            url: Endpoint that receives the POST request.
            timeout: Seconds to wait for the server before giving up.
        """
        self.url = url
        self.timeout = timeout

    def send(self, message: str):
        """POST ``{"alert": message}`` to the webhook, reporting failures on stdout."""
        # Synchronous webhook.
        try:
            # Without a timeout an unresponsive server would block the caller
            # indefinitely.
            response = requests.post(
                self.url,
                json={"alert": message},
                timeout=self.timeout,
            )
            # requests does not raise on 4xx/5xx by itself; surface rejected
            # alerts through the same failure path as connection errors.
            response.raise_for_status()
        except Exception as e:
            # Deliberately broad: an alerter must not crash its caller.
            print("[WebhookAlert Failed]", e)
class AsyncWebhookAlerter(BaseAlerter):
    """Alerter that POSTs each message as JSON to a webhook URL using aiohttp.

    Unlike the base interface, :meth:`send` is a coroutine and must be awaited.
    """

    def __init__(self, url: str, timeout: float = 10.0):
        """Store the webhook endpoint and the request timeout.

        Args:
            url: Endpoint that receives the POST request.
            timeout: Total seconds allowed for the whole request.
        """
        self.url = url
        self.timeout = timeout

    async def send(self, message: str):
        """POST ``{"alert": message}`` to the webhook.

        A new client session is opened for every call. Errors raised by
        aiohttp (connection failures, timeouts) propagate to the caller.
        """
        limit = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=limit) as session:
            # Entering the response as a context manager releases its
            # connection as soon as the request completes.
            async with session.post(self.url, json={"alert": message}):
                pass