import asyncio import json import time import logging from telegram import Bot from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Dict, List, Any, Optional from config import NOTIFIER ENABLE_TELEGRAM_NOTIFICATIONS = NOTIFIER["ENABLE_TELEGRAM_NOTIFICATIONS"] TELEGRAM_BOT_TOKEN = NOTIFIER["TELEGRAM_BOT_TOKEN"] TELEGRAM_CHAT_ID = NOTIFIER["TELEGRAM_CHAT_ID"] INACTIVE_PERCENTAGE_THRESHOLD = NOTIFIER["INACTIVE_PERCENTAGE_THRESHOLD"] UNHEALTHY_HISTORY_LENGTH = NOTIFIER["UNHEALTHY_HISTORY_LENGTH"] UNHEALTHY_ALERT_THRESHOLD = NOTIFIER["UNHEALTHY_ALERT_THRESHOLD"] DENSITY_COUNT_THRESHOLD = NOTIFIER["DENSITY_COUNT_THRESHOLD"] # --- Helper --- def _now_local() -> datetime: return datetime.now().astimezone() @dataclass class AlertState: """ Tracks each camera alert state and cooldown """ sent_inactive_danger: bool = False sent_unhealthy_danger: bool = False sent_density_danger: bool = False last_danger_alert_time: float = 0.0 # timestamp last_warning_alert_time: float = 0.0 cooldown_period_danger: int = 1800 # 30 minutes cooldown_period_warning: int = 3600 class TelegramNotifier: def __init__(self, bot_token: str, chat_id: str, *, logger: Optional[logging.Logger] = None): self.bot_token = bot_token self.chat_id = chat_id self._logger = logger or logging.getLogger(__name__) async def _send_async(self, text: str) -> bool: try: bot = Bot(token=self.bot_token) await bot.send_message(chat_id=self.chat_id, text=text, parse_mode="HTML", disable_web_page_preview=True) return True except Exception as e: self._logger.error("[Telegram] Gagal kirim pesan: %s", e) return False def send(self, text: str) -> bool: """ Send message """ try: return asyncio.run(self._send_async(text)) except Exception as e: self._logger.error("[Telegram] Gagal jalankan async: %s", e) return False SENSOR_JSON_PATH = Path(NOTIFIER["SENSOR_DATA_JSON_PATH"]) alert_states: Dict[str, AlertState] = {} def _load_sensor_data() -> List[Dict[str, Any]]: """ Load sensor_data.json """ if not SENSOR_JSON_PATH.exists(): return [] try: with SENSOR_JSON_PATH.open("r", encoding="utf-8") as f: data = json.load(f) return data if isinstance(data, list) else [] except Exception as e: logging.error("[Sensor JSON] Gagal baca: %s", e) return [] def _count_unhealthy(vocal_history: List[str]) -> int: """ Count 'Unhealthy' """ recent = vocal_history[-UNHEALTHY_HISTORY_LENGTH:] return sum(1 for s in recent if isinstance(s, str) and s.strip() == "Unhealthy") def _get_prediction_with_prob(entry: Dict) -> tuple[str, float]: pred = entry.get("prediction") probs = entry.get("probabilities", {}) if pred and pred in probs: return pred, float(probs[pred]) return pred or "Unknown", 0.0 def generate_alert_message( camera_id: str, inactive_ratio: float, audio_pred: str, audio_prob: float, density_count: int, unhealthy_count: int, timestamp: str ) -> str: details = [] is_danger = False is_warning = False is_mic = camera_id == "MIC" # Inactivity Logic if not is_mic: if inactive_ratio >= INACTIVE_PERCENTAGE_THRESHOLD: is_danger = True pcnt = round(inactive_ratio * 100, 1) details.append(f"Kamera {camera_id}: Inaktivitas ayam mencapai {pcnt}%.") elif inactive_ratio > 0: is_warning = True details.append(f"Kamera {camera_id}: Terdeteksi beberapa ayam tidak aktif.") # Density Logic if not is_mic: if density_count >= DENSITY_COUNT_THRESHOLD: is_danger = True details.append(f"Kamera {camera_id}: Terdeteksi {density_count} area dengan kepadatan tinggi.") elif density_count > 1: is_warning = True details.append(f"Kamera {camera_id}: Terdeteksi kepadatan ayam lokal.\n({density_count} cluster)") # Audio Logic if audio_pred == "Unhealthy": pcnt = round(audio_prob * 100, 1) if unhealthy_count >= UNHEALTHY_ALERT_THRESHOLD: is_danger = True details.append(f"Audio {camera_id}: Tanda Tidak Sehat terdeteksi berulang (Prob: {pcnt}%).") else: is_warning = True details.append(f"Audio {camera_id}: Terdengar tanda awal Tidak Sehat (Prob: {pcnt}%).") if not is_danger and not is_warning: return "" end_parts = [] if is_danger: end_parts.append("🔴 [BAHAYA]") elif is_warning: end_parts.append("🟠 [PERINGATAN]") # Build the message end_parts.append("") end_parts.extend(details) end_parts.append("") end_parts.append("Mohon segera periksa kondisi kandang!") end_parts.append(f"Waktu: {timestamp}") return "\n".join(end_parts) def monitor_once(notifier: Optional[TelegramNotifier]) -> None: data = _load_sensor_data() if not data: return timestamp_str = _now_local().strftime("%Y-%m-%d %H:%M:%S") for item in data: camera_id = str(item.get("camera_id", "unknown")) detected_count = item.get("detected_count", 0) inactive_count = item.get("inactive_count", 0) density_count = item.get("density_count", 0) vocal_history = item.get("vocalization_history", []) latest_audio_pred = vocal_history[-1] if vocal_history else None audio_pred, audio_prob = _get_prediction_with_prob({ "prediction": latest_audio_pred, "probabilities": item.get("latest_probabilities", {}) }) unhealthy_count = _count_unhealthy(vocal_history) inactive_ratio = inactive_count / detected_count if detected_count > 0 else 0. if camera_id not in alert_states: alert_states[camera_id] = AlertState() state = alert_states[camera_id] # Send alert State now = time.time() should_send = False message_type = "info" is_mic = camera_id == "MIC" # Type : Danger danger_cooldown_expired = (now - state.last_danger_alert_time) >= state.cooldown_period_danger if not is_mic and inactive_ratio >= INACTIVE_PERCENTAGE_THRESHOLD: if not state.sent_inactive_danger or danger_cooldown_expired: should_send = True message_type = "danger" state.sent_inactive_danger = True state.last_danger_alert_time = now if unhealthy_count >= UNHEALTHY_ALERT_THRESHOLD: if not state.sent_unhealthy_danger or danger_cooldown_expired: should_send = True message_type = "danger" state.sent_unhealthy_danger = True state.last_danger_alert_time = now if not is_mic and density_count >= DENSITY_COUNT_THRESHOLD: if not state.sent_density_danger or danger_cooldown_expired: should_send = True message_type = "danger" state.sent_density_danger = True state.last_danger_alert_time = now # Type : Warning warning_cooldown_expired = (now - state.last_warning_alert_time) >= state.cooldown_period_warning if not should_send and not is_mic and inactive_count > 0 and not state.sent_inactive_danger: if warning_cooldown_expired: should_send = True message_type = "warning" state.last_warning_alert_time = now if not should_send and audio_pred == "Unhealthy" and not state.sent_unhealthy_danger: if warning_cooldown_expired: should_send = True message_type = "warning" state.last_warning_alert_time = now if not should_send and not is_mic and density_count > 1 and not state.sent_density_danger: if warning_cooldown_expired: should_send = True message_type = "warning" state.last_warning_alert_time = now # Send alert if TRUE if should_send: msg = generate_alert_message( camera_id=camera_id, inactive_ratio=inactive_ratio, audio_pred=audio_pred, audio_prob=audio_prob, density_count=density_count, unhealthy_count=unhealthy_count, timestamp=timestamp_str ) if msg.strip() and notifier and ENABLE_TELEGRAM_NOTIFICATIONS: success = notifier.send(msg) logging.info("[Notif] %s alert sent for Camera %s: %s", message_type.upper(), camera_id, success) else: print(f"[Notif][Dry-Run] {message_type.upper()}:\n{msg}\n") # State Reset if inactive_ratio < INACTIVE_PERCENTAGE_THRESHOLD: state.sent_inactive_danger = False if unhealthy_count < UNHEALTHY_ALERT_THRESHOLD: state.sent_unhealthy_danger = False if density_count < DENSITY_COUNT_THRESHOLD: state.sent_density_danger = False def main() -> None: logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s') # Initialize notifier notifier: Optional[TelegramNotifier] = None if ENABLE_TELEGRAM_NOTIFICATIONS and TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID: notifier = TelegramNotifier(TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, logger=logging.getLogger("Telegram")) logging.info("[Notif] Telegram aktif.") else: logging.info("[Notif] Telegram dinonaktifkan. Mode dry-run.") logging.info("[Notif] Memantau sensor_data.json setiap 60 detik...") while True: try: monitor_once(notifier) except Exception as e: logging.error("[Notif] Error saat monitoring: %s", e) time.sleep(60) if __name__ == "__main__": main()