eroha-agentapi / core /alerters.py
Yermek68's picture
Create core/alerters.py
52a5c61 verified
raw
history blame
1.27 kB
# core/alerters.py
import datetime
import requests
import aiohttp
class BaseAlerter:
    """Common base for alert channels; subclasses supply ``send``."""

    def send(self, message: str):
        """Deliver ``message`` through this channel.

        The base implementation delivers nothing and always raises.

        Args:
            message: Alert text to deliver.

        Raises:
            NotImplementedError: Always, until a subclass overrides this method.
        """
        reason = "send() must be implemented."
        raise NotImplementedError(reason)
class ConsoleAlerter(BaseAlerter):
    """Alert channel that prints each message to standard output."""

    def send(self, message: str):
        """Print ``message`` tagged with the current timestamp.

        Args:
            message: Alert text to print.
        """
        # Naive timestamp (no timezone info) rendered in ISO 8601 form.
        now_iso = datetime.datetime.now().isoformat()
        line = f"[Console Alert][{now_iso}] {message}"
        print(line)
class FileAlerter(BaseAlerter):
    """Alert channel that appends each message as one line to a log file."""

    def __init__(self, filepath="failsafe_errors.log"):
        """Remember where alerts are written.

        Args:
            filepath: Path of the log file; it is opened in append mode on
                every ``send`` call.
        """
        self.filepath = filepath

    def send(self, message: str):
        """Append ``message`` to the log file, tagged with the current timestamp.

        Args:
            message: Alert text to record.

        Raises:
            OSError: If the file cannot be opened or written.
        """
        timestamp = datetime.datetime.now().isoformat()
        # Pin the encoding: with the platform default, non-ASCII alert text
        # can raise UnicodeEncodeError on systems whose locale is not UTF-8.
        with open(self.filepath, "a", encoding="utf-8") as f:
            f.write(f"[File Alert][{timestamp}] {message}\n")
class WebhookAlerter(BaseAlerter):
    """Alert channel that delivers messages to a webhook with a blocking POST.

    Delivery is best-effort: failures are printed and never raised.
    """

    def __init__(self, url: str, timeout: float = 10.0):
        """Store the webhook target.

        Args:
            url: Endpoint that receives the alert payload.
            timeout: Seconds to wait for the server before giving up.
        """
        self.url = url
        self.timeout = timeout

    def send(self, message: str):
        """POST ``{"alert": message}`` as JSON to the configured URL.

        Args:
            message: Alert text to deliver.
        """
        # Synchronous webhook
        try:
            # requests never times out unless told to, so a stalled server
            # would otherwise block the caller indefinitely.
            response = requests.post(
                self.url, json={"alert": message}, timeout=self.timeout
            )
            # Report 4xx/5xx replies as failed deliveries instead of treating
            # them as silent success.
            response.raise_for_status()
        except Exception as e:
            # Kept broad so a failed delivery is reported rather than raised.
            print("[WebhookAlert Failed]", e)
class AsyncWebhookAlerter(BaseAlerter):
    """Alert channel that delivers messages to a webhook using aiohttp.

    ``send`` is a coroutine here and must be awaited. Errors are not caught;
    they propagate to the awaiting caller.
    """

    def __init__(self, url: str, timeout: float = 10.0):
        """Store the webhook target.

        Args:
            url: Endpoint that receives the alert payload.
            timeout: Total seconds allowed for the whole request.
        """
        self.url = url
        self.timeout = timeout

    async def send(self, message: str):
        """POST ``{"alert": message}`` as JSON to the configured URL.

        A new client session is opened and closed for every call.

        Args:
            message: Alert text to deliver.

        Raises:
            aiohttp.ClientError: On connection problems or a 4xx/5xx reply.
            asyncio.TimeoutError: If the request exceeds ``timeout`` seconds.
        """
        # aiohttp's default total timeout is five minutes, which is far too
        # long to wait on an alert delivery.
        limits = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=limits) as session:
            # Entering the response as a context manager releases it promptly.
            async with session.post(self.url, json={"alert": message}) as response:
                # Surface 4xx/5xx replies instead of treating them as success.
                response.raise_for_status()