# tele_bot / database.py
# PRC142004's picture
# Upload 21 files
# 945d0f8 verified
"""
Database Handler for Persistent Storage
========================================
Manages farmer profiles, alerts history, and user subscriptions
"""
import json
import os
from datetime import datetime
from typing import List, Dict, Optional
from pathlib import Path
class DatabaseManager:
    """Local JSON-based database for farmer data and alerts.

    Each collection (farmer profiles, alert history, Telegram subscriptions)
    is stored as a JSON list of dicts in its own file under ``db_dir``.
    Every operation re-reads the file from disk and every save rewrites the
    whole file; nothing is cached between calls.

    Save methods return ``True``/``False`` instead of raising. Read methods
    return an empty result (``[]`` or ``None``) on failure. In both cases the
    error is printed to stdout.
    """

    # Maximum number of alerts kept per farmer in the history file.
    MAX_ALERTS_PER_FARMER = 1000

    def __init__(self, db_dir: str = "data"):
        """Create the storage directory and any missing data files.

        Args:
            db_dir: Directory that holds the JSON files. It may be nested;
                missing parent directories are created as well.
        """
        self.db_dir = Path(db_dir)
        # parents=True so a nested location such as "var/bot/data" also works.
        self.db_dir.mkdir(parents=True, exist_ok=True)
        self.farmers_file = self.db_dir / "farmers.json"
        self.alerts_file = self.db_dir / "alerts_history.json"
        self.subscriptions_file = self.db_dir / "subscriptions.json"
        self._init_files()

    def _init_files(self):
        """Initialize JSON files (as empty lists) if they don't exist."""
        for file in (self.farmers_file, self.alerts_file, self.subscriptions_file):
            if not file.exists():
                self._write_records(file, [])

    @staticmethod
    def _read_records(path: Path) -> List[Dict]:
        """Load the JSON list stored at ``path``.

        A missing or empty file is treated as an empty collection so that a
        deleted or truncated file does not block every later save.

        Raises:
            ValueError: If the file holds invalid JSON or its top-level value
                is not a list (``json.JSONDecodeError`` is a ``ValueError``).
            OSError: If the file exists but cannot be read.
        """
        try:
            raw = path.read_text(encoding="utf-8")
        except FileNotFoundError:
            return []
        if not raw.strip():
            return []
        records = json.loads(raw)
        if not isinstance(records, list):
            raise ValueError(f"{path.name} must contain a JSON list")
        return records

    @staticmethod
    def _write_records(path: Path, records: List[Dict]) -> None:
        """Serialize ``records`` to ``path`` as indented JSON.

        Values that are not JSON-serializable are stored as ``str(value)``.
        """
        # Serialize first so a serialization error never truncates the file.
        payload = json.dumps(records, indent=2, default=str)
        path.write_text(payload, encoding="utf-8")

    @staticmethod
    def _upsert(records: List[Dict], record: Dict) -> None:
        """Replace the entry sharing ``record``'s farmer_id, or append it."""
        farmer_id = record.get("farmer_id")
        for index, current in enumerate(records):
            if current.get("farmer_id") == farmer_id:
                records[index] = record
                return
        records.append(record)

    def save_farmer(self, farmer_data: Dict) -> bool:
        """Save or update a farmer profile.

        The profile replaces any stored profile with the same ``farmer_id``;
        otherwise it is appended.

        Returns:
            True on success, False if the file could not be read, parsed or
            written (existing content is left untouched in that case).
        """
        try:
            farmers = self._read_records(self.farmers_file)
            self._upsert(farmers, farmer_data)
            self._write_records(self.farmers_file, farmers)
            return True
        except Exception as e:
            print(f"Error saving farmer: {e}")
            return False

    def get_farmer(self, farmer_id: str) -> Optional[Dict]:
        """Retrieve the profile for ``farmer_id``, or None if absent/unreadable."""
        try:
            farmers = self._read_records(self.farmers_file)
            return next((f for f in farmers if f.get("farmer_id") == farmer_id), None)
        except Exception as e:
            print(f"Error reading farmer: {e}")
            return None

    def get_all_farmers(self) -> List[Dict]:
        """Get all registered farmers (empty list if the file is unreadable)."""
        try:
            return self._read_records(self.farmers_file)
        except Exception as e:
            print(f"Error reading farmers: {e}")
            return []

    def save_alert(self, alert_data: Dict) -> bool:
        """Append an alert to the history, stamped with the current time.

        Note: ``alert_data`` is modified in place: its ``"timestamp"`` key is
        set to ``datetime.now().isoformat()`` before it is stored.

        Only the most recent ``MAX_ALERTS_PER_FARMER`` alerts are kept for the
        alert's farmer; alerts of other farmers are never dropped.

        Returns:
            True on success, False on any read/parse/write error.
        """
        try:
            alerts = self._read_records(self.alerts_file)
            alert_data["timestamp"] = datetime.now().isoformat()
            alerts.append(alert_data)

            farmer_id = alert_data.get("farmer_id")
            farmer_alerts = [a for a in alerts if a.get("farmer_id") == farmer_id]
            excess = len(farmer_alerts) - self.MAX_ALERTS_PER_FARMER
            if excess > 0:
                # Drop this farmer's oldest entries; the retained ones are
                # re-appended after every other farmer's alerts.
                alerts = [a for a in alerts if a.get("farmer_id") != farmer_id]
                alerts.extend(farmer_alerts[excess:])

            self._write_records(self.alerts_file, alerts)
            return True
        except Exception as e:
            print(f"Error saving alert: {e}")
            return False

    def get_alerts_for_farmer(self, farmer_id: str, limit: int = 50) -> List[Dict]:
        """Get recent alerts for a farmer, newest first.

        Args:
            farmer_id: Farmer whose alerts are returned.
            limit: Maximum number of alerts to return. Zero or a negative
                value yields an empty list.

        Returns:
            Up to ``limit`` alerts ordered by descending ``"timestamp"``, or
            an empty list if the history cannot be read.
        """
        try:
            # A negative slice bound would silently drop the oldest entries
            # instead of limiting the result, so clamp it.
            if limit is not None and limit < 0:
                limit = 0
            alerts = self._read_records(self.alerts_file)
            farmer_alerts = [a for a in alerts if a.get("farmer_id") == farmer_id]
            farmer_alerts.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
            return farmer_alerts[:limit]
        except Exception as e:
            print(f"Error reading alerts: {e}")
            return []

    def save_subscription(self, farmer_id: str, chat_id: str) -> bool:
        """Save (or replace) the Telegram subscription of a farmer.

        The stored record is marked active and stamped with the current time;
        an existing subscription for the same farmer is overwritten.

        Returns:
            True on success, False on any read/parse/write error.
        """
        try:
            subs = self._read_records(self.subscriptions_file)
            sub = {
                "farmer_id": farmer_id,
                "telegram_chat_id": chat_id,
                "subscribed_at": datetime.now().isoformat(),
                "active": True,
            }
            self._upsert(subs, sub)
            self._write_records(self.subscriptions_file, subs)
            return True
        except Exception as e:
            print(f"Error saving subscription: {e}")
            return False

    def get_active_subscriptions(self) -> List[Dict]:
        """Get all subscriptions whose ``"active"`` flag is truthy."""
        try:
            subs = self._read_records(self.subscriptions_file)
            return [s for s in subs if s.get("active", False)]
        except Exception as e:
            print(f"Error reading subscriptions: {e}")
            return []

    def get_subscription(self, farmer_id: str) -> Optional[Dict]:
        """Get the subscription for ``farmer_id``, or None if absent/unreadable."""
        try:
            subs = self._read_records(self.subscriptions_file)
            return next((s for s in subs if s.get("farmer_id") == farmer_id), None)
        except Exception as e:
            print(f"Error reading subscription: {e}")
            return None
# Global instance, constructed at import time with the default db_dir ("data").
# Importing this module therefore creates that directory (relative to the
# current working directory) and any missing JSON data files.
db = DatabaseManager()