Spaces:
Running
Running
| """Alert service for checking threshold conditions.""" | |
| from datetime import datetime, timezone | |
| from typing import Optional | |
| from app.services.funding_service import get_funding_service | |
| from app.models import AlertConfig, AlertItem, AlertCheckResponse | |
| class AlertService: | |
| """Service for checking alert conditions.""" | |
| def __init__(self): | |
| self.funding_service = get_funding_service() | |
| self._config = AlertConfig() # Default config | |
| def get_config(self) -> AlertConfig: | |
| """Get current alert configuration.""" | |
| return self._config | |
| def update_config(self, config: AlertConfig) -> AlertConfig: | |
| """Update alert configuration.""" | |
| self._config = config | |
| return self._config | |
| async def check_alerts(self, config: Optional[AlertConfig] = None) -> AlertCheckResponse: | |
| """ | |
| Check all alert conditions. | |
| Args: | |
| config: Optional override config (uses stored config if not provided) | |
| Returns: | |
| AlertCheckResponse with all triggered alerts | |
| """ | |
| cfg = config or self._config | |
| if not cfg.enabled: | |
| return AlertCheckResponse( | |
| alerts=[], | |
| total_alerts=0, | |
| checked_at=datetime.now(timezone.utc), | |
| ) | |
| all_rates = await self.funding_service.get_all_funding_rates() | |
| alerts = [] | |
| for coin in all_rates: | |
| fr_percent = coin.funding_rate * 100 | |
| # Check high positive FR | |
| if fr_percent > cfg.positive_threshold: | |
| alerts.append(AlertItem( | |
| symbol=coin.symbol, | |
| alert_type="high_positive_fr", | |
| value=fr_percent, | |
| threshold=cfg.positive_threshold, | |
| message=f"{coin.symbol} has high positive FR: {coin.funding_rate_percent}", | |
| timestamp=datetime.now(timezone.utc), | |
| )) | |
| # Check high negative FR | |
| elif fr_percent < cfg.negative_threshold: | |
| alerts.append(AlertItem( | |
| symbol=coin.symbol, | |
| alert_type="high_negative_fr", | |
| value=fr_percent, | |
| threshold=cfg.negative_threshold, | |
| message=f"{coin.symbol} has high negative FR: {coin.funding_rate_percent}", | |
| timestamp=datetime.now(timezone.utc), | |
| )) | |
| # Check volume spike | |
| if coin.volume_spike: | |
| alerts.append(AlertItem( | |
| symbol=coin.symbol, | |
| alert_type="volume_spike", | |
| value=coin.volume_24h, | |
| threshold=cfg.volume_spike_multiplier, | |
| message=f"{coin.symbol} has unusual volume spike", | |
| timestamp=datetime.now(timezone.utc), | |
| )) | |
| # Sort alerts by absolute FR value (most extreme first) | |
| alerts.sort(key=lambda x: abs(x.value), reverse=True) | |
| return AlertCheckResponse( | |
| alerts=alerts, | |
| total_alerts=len(alerts), | |
| checked_at=datetime.now(timezone.utc), | |
| ) | |
| # Singleton instance | |
| _service: AlertService = None | |
| def get_alert_service() -> AlertService: | |
| """Get or create AlertService singleton.""" | |
| global _service | |
| if _service is None: | |
| _service = AlertService() | |
| return _service | |