""" Database Handler for Persistent Storage ======================================== Manages farmer profiles, alerts history, and user subscriptions """ import json import os from datetime import datetime from typing import List, Dict, Optional from pathlib import Path class DatabaseManager: """Local JSON-based database for farmer data and alerts""" def __init__(self, db_dir: str = "data"): self.db_dir = Path(db_dir) self.db_dir.mkdir(exist_ok=True) self.farmers_file = self.db_dir / "farmers.json" self.alerts_file = self.db_dir / "alerts_history.json" self.subscriptions_file = self.db_dir / "subscriptions.json" self._init_files() def _init_files(self): """Initialize JSON files if they don't exist""" for file in [self.farmers_file, self.alerts_file, self.subscriptions_file]: if not file.exists(): file.write_text(json.dumps([], indent=2)) def save_farmer(self, farmer_data: Dict) -> bool: """Save or update farmer profile""" try: farmers = json.loads(self.farmers_file.read_text()) farmer_id = farmer_data.get("farmer_id") # Update existing or add new existing = next((i for i, f in enumerate(farmers) if f.get("farmer_id") == farmer_id), None) if existing is not None: farmers[existing] = farmer_data else: farmers.append(farmer_data) self.farmers_file.write_text(json.dumps(farmers, indent=2, default=str)) return True except Exception as e: print(f"Error saving farmer: {e}") return False def get_farmer(self, farmer_id: str) -> Optional[Dict]: """Retrieve farmer profile""" try: farmers = json.loads(self.farmers_file.read_text()) return next((f for f in farmers if f.get("farmer_id") == farmer_id), None) except: return None def get_all_farmers(self) -> List[Dict]: """Get all registered farmers""" try: return json.loads(self.farmers_file.read_text()) except: return [] def save_alert(self, alert_data: Dict) -> bool: """Save alert to history""" try: alerts = json.loads(self.alerts_file.read_text()) alert_data["timestamp"] = datetime.now().isoformat() alerts.append(alert_data) # Keep only last 1000 alerts per farmer farmer_id = alert_data.get("farmer_id") farmer_alerts = [a for a in alerts if a.get("farmer_id") == farmer_id] if len(farmer_alerts) > 1000: alerts = [a for a in alerts if a.get("farmer_id") != farmer_id] alerts.extend(farmer_alerts[-1000:]) self.alerts_file.write_text(json.dumps(alerts, indent=2, default=str)) return True except Exception as e: print(f"Error saving alert: {e}") return False def get_alerts_for_farmer(self, farmer_id: str, limit: int = 50) -> List[Dict]: """Get recent alerts for a farmer""" try: alerts = json.loads(self.alerts_file.read_text()) farmer_alerts = [a for a in alerts if a.get("farmer_id") == farmer_id] return sorted(farmer_alerts, key=lambda x: x.get("timestamp", ""), reverse=True)[:limit] except: return [] def save_subscription(self, farmer_id: str, chat_id: str) -> bool: """Save Telegram subscription""" try: subs = json.loads(self.subscriptions_file.read_text()) existing = next((i for i, s in enumerate(subs) if s.get("farmer_id") == farmer_id), None) sub = { "farmer_id": farmer_id, "telegram_chat_id": chat_id, "subscribed_at": datetime.now().isoformat(), "active": True } if existing is not None: subs[existing] = sub else: subs.append(sub) self.subscriptions_file.write_text(json.dumps(subs, indent=2, default=str)) return True except Exception as e: print(f"Error saving subscription: {e}") return False def get_active_subscriptions(self) -> List[Dict]: """Get all active subscriptions""" try: subs = json.loads(self.subscriptions_file.read_text()) return [s for s in subs if s.get("active", False)] except: return [] def get_subscription(self, farmer_id: str) -> Optional[Dict]: """Get subscription for farmer""" try: subs = json.loads(self.subscriptions_file.read_text()) return next((s for s in subs if s.get("farmer_id") == farmer_id), None) except: return None # Global instance db = DatabaseManager()