"""TimescaleDB connection and operations"""

import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import asyncpg

logger = logging.getLogger(__name__)


class TimescaleDB:
    """TimescaleDB database handler.

    Wraps an ``asyncpg`` connection pool and exposes coroutine helpers for the
    ``parameters``, ``events``, ``alerts`` and ``zones`` tables, which are
    created on demand by :meth:`connect`.

    Args:
        config: Connection settings. ``connect`` reads the keys ``host``,
            ``port``, ``name`` (database name), ``user`` and ``password``,
            plus the optional ``pool_size`` (maximum pool size, default 10).
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # Set by connect(); None while disconnected.
        self.pool = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current time as a timezone-aware UTC datetime.

        A naive ``datetime.utcnow()`` value must not be bound to a
        ``TIMESTAMPTZ`` parameter: asyncpg converts naive datetimes with
        ``astimezone()``, i.e. it interprets them as *local* time.
        """
        return datetime.now(timezone.utc)

    @staticmethod
    def _to_jsonb(value: Any) -> Optional[str]:
        """Convert *value* into something asyncpg accepts for a JSONB column.

        The pool is created without a custom JSON codec, so asyncpg expects
        JSON/JSONB arguments as text. ``None`` and strings (assumed to be
        already-serialized JSON) pass through unchanged; anything else is
        serialized with ``json.dumps``.
        """
        if value is None or isinstance(value, str):
            return value
        return json.dumps(value)

    def _acquire(self):
        """Return the pool's connection-acquire context manager.

        Raises:
            RuntimeError: If :meth:`connect` has not been called (or
                :meth:`disconnect` already ran).
        """
        if self.pool is None:
            raise RuntimeError(
                "TimescaleDB is not connected; call connect() first"
            )
        return self.pool.acquire()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def connect(self):
        """Create the connection pool and initialize the schema.

        Raises:
            Exception: Any error from pool creation or schema initialization
                is logged and re-raised. If the pool was already created it
                is terminated first so no connections are leaked.
        """
        pool = None
        try:
            pool = await asyncpg.create_pool(
                host=self.config['host'],
                port=self.config['port'],
                database=self.config['name'],
                user=self.config['user'],
                password=self.config['password'],
                min_size=5,
                max_size=self.config.get('pool_size', 10),
            )
            self.pool = pool
            logger.info("Connected to TimescaleDB")

            # Initialize schema
            await self._init_schema()
        except Exception as e:
            logger.error("Database connection failed: %s", e)
            if pool is not None:
                # Schema setup failed after the pool was opened: drop the
                # half-initialized pool instead of leaving it dangling.
                self.pool = None
                pool.terminate()
            raise

    async def disconnect(self):
        """Close the connection pool, if one is open."""
        if self.pool is not None:
            # Clear the attribute first so the handler never exposes a
            # closed pool, even if close() raises.
            pool, self.pool = self.pool, None
            await pool.close()
            logger.info("Disconnected from TimescaleDB")

    async def _init_schema(self):
        """Create tables, the ``parameters`` hypertable and indexes.

        Every statement uses ``IF NOT EXISTS`` / ``if_not_exists`` so the
        method can run on each startup.
        """
        async with self._acquire() as conn:
            # Create hypertable for parameters
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS parameters (
                    time TIMESTAMPTZ NOT NULL,
                    event_id TEXT,
                    zone TEXT,
                    wcc FLOAT,
                    kpr FLOAT,
                    hfsi FLOAT,
                    becf FLOAT,
                    sdb FLOAT,
                    sbsp FLOAT,
                    smvi FLOAT,
                    chi FLOAT
                );
            ''')

            # Convert to hypertable
            await conn.execute('''
                SELECT create_hypertable('parameters', 'time', if_not_exists => TRUE);
            ''')

            # Create events table
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS events (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    source_location TEXT,
                    magnitude FLOAT,
                    depth FLOAT,
                    time TIMESTAMPTZ,
                    created_at TIMESTAMPTZ DEFAULT NOW(),
                    updated_at TIMESTAMPTZ DEFAULT NOW(),
                    active BOOLEAN DEFAULT TRUE,
                    metadata JSONB
                );
            ''')

            # Create alerts table
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS alerts (
                    id TEXT PRIMARY KEY,
                    event_id TEXT REFERENCES events(id),
                    zone TEXT,
                    level TEXT,
                    chi FLOAT,
                    message TEXT,
                    issued_at TIMESTAMPTZ DEFAULT NOW(),
                    expires_at TIMESTAMPTZ,
                    acknowledged BOOLEAN DEFAULT FALSE,
                    acknowledged_by TEXT,
                    acknowledged_at TIMESTAMPTZ,
                    metadata JSONB
                );
            ''')

            # Create zones table
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS zones (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    region TEXT,
                    becf FLOAT,
                    latitude FLOAT,
                    longitude FLOAT,
                    population INTEGER,
                    priority TEXT,
                    metadata JSONB
                );
            ''')

            # Create indexes
            await conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_parameters_time ON parameters (time DESC);
                CREATE INDEX IF NOT EXISTS idx_parameters_zone ON parameters (zone, time DESC);
                CREATE INDEX IF NOT EXISTS idx_alerts_active ON alerts (zone, level) WHERE acknowledged = FALSE;
            ''')

            logger.info("Database schema initialized")

    # ------------------------------------------------------------------
    # Parameters
    # ------------------------------------------------------------------

    async def insert_parameters(self, data: Dict[str, Any]):
        """Insert one row into ``parameters``.

        Args:
            data: Mapping read with the keys ``time``, ``event_id``, ``zone``,
                ``wcc``, ``kpr``, ``hfsi``, ``becf``, ``sdb``, ``sbsp``,
                ``smvi`` and ``chi``. A missing ``time`` defaults to the
                current UTC time; other missing keys are stored as NULL.
        """
        async with self._acquire() as conn:
            await conn.execute(
                '''
                INSERT INTO parameters
                (time, event_id, zone, wcc, kpr, hfsi, becf, sdb, sbsp, smvi, chi)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                ''',
                data.get('time', self._utcnow()),
                data.get('event_id'),
                data.get('zone'),
                data.get('wcc'),
                data.get('kpr'),
                data.get('hfsi'),
                data.get('becf'),
                data.get('sdb'),
                data.get('sbsp'),
                data.get('smvi'),
                data.get('chi'),
            )

    async def get_parameters(self, zone: str, start_time: datetime,
                             end_time: Optional[datetime] = None) -> List[Dict]:
        """Get the parameter time series for a zone.

        Args:
            zone: Zone identifier matched against ``parameters.zone``.
            start_time: Inclusive lower bound of the time window.
            end_time: Inclusive upper bound; defaults to the current UTC time.

        Returns:
            Matching rows as dicts, newest first.
        """
        if end_time is None:
            end_time = self._utcnow()

        async with self._acquire() as conn:
            rows = await conn.fetch(
                '''
                SELECT * FROM parameters
                WHERE zone = $1 AND time BETWEEN $2 AND $3
                ORDER BY time DESC
                ''',
                zone, start_time, end_time,
            )
            return [dict(row) for row in rows]

    # ------------------------------------------------------------------
    # Events
    # ------------------------------------------------------------------

    async def create_event(self, event_data: Dict) -> str:
        """Insert an event, or refresh its name if the id already exists.

        Args:
            event_data: Mapping read with the keys ``id``, ``name``,
                ``source_location``, ``magnitude``, ``depth``, ``time``
                (defaults to the current UTC time) and ``metadata``
                (defaults to an empty JSON object).

        Returns:
            The value of ``event_data['id']`` (``None`` if absent).
        """
        event_id = event_data.get('id')
        async with self._acquire() as conn:
            # On conflict only name and updated_at are refreshed; the other
            # columns keep their previously stored values.
            await conn.execute(
                '''
                INSERT INTO events (id, name, source_location, magnitude, depth, time, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (id) DO UPDATE SET
                    name = EXCLUDED.name,
                    updated_at = NOW()
                ''',
                event_id,
                event_data.get('name'),
                event_data.get('source_location'),
                event_data.get('magnitude'),
                event_data.get('depth'),
                event_data.get('time', self._utcnow()),
                self._to_jsonb(event_data.get('metadata', {})),
            )
        return event_id

    async def get_active_events(self) -> List[Dict]:
        """Get events whose ``active`` flag is TRUE, newest first."""
        async with self._acquire() as conn:
            rows = await conn.fetch('''
                SELECT * FROM events
                WHERE active = TRUE
                ORDER BY time DESC
            ''')
            return [dict(row) for row in rows]

    # ------------------------------------------------------------------
    # Alerts
    # ------------------------------------------------------------------

    async def create_alert(self, alert_data: Dict) -> str:
        """Insert a new alert.

        Args:
            alert_data: Mapping read with the keys ``id``, ``event_id``,
                ``zone``, ``level``, ``chi``, ``message``, ``issued_at``
                (defaults to the current UTC time), ``expires_at`` and
                ``metadata`` (defaults to an empty JSON object).

        Returns:
            The value of ``alert_data['id']`` (``None`` if absent).
        """
        alert_id = alert_data.get('id')
        async with self._acquire() as conn:
            await conn.execute(
                '''
                INSERT INTO alerts (id, event_id, zone, level, chi, message, issued_at, expires_at, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ''',
                alert_id,
                alert_data.get('event_id'),
                alert_data.get('zone'),
                alert_data.get('level'),
                alert_data.get('chi'),
                alert_data.get('message'),
                alert_data.get('issued_at', self._utcnow()),
                alert_data.get('expires_at'),
                self._to_jsonb(alert_data.get('metadata', {})),
            )
        return alert_id

    async def get_active_alerts(self) -> List[Dict]:
        """Get unacknowledged, unexpired alerts, most recently issued first."""
        async with self._acquire() as conn:
            rows = await conn.fetch('''
                SELECT * FROM alerts
                WHERE acknowledged = FALSE
                AND (expires_at IS NULL OR expires_at > NOW())
                ORDER BY issued_at DESC
            ''')
            return [dict(row) for row in rows]

    async def acknowledge_alert(self, alert_id: str, user: str):
        """Mark an alert as acknowledged by *user* at the current DB time."""
        async with self._acquire() as conn:
            await conn.execute(
                '''
                UPDATE alerts
                SET acknowledged = TRUE,
                    acknowledged_by = $2,
                    acknowledged_at = NOW()
                WHERE id = $1
                ''',
                alert_id, user,
            )

    # ------------------------------------------------------------------
    # Zones
    # ------------------------------------------------------------------

    async def get_zone_info(self, zone_id: str) -> Optional[Dict]:
        """Get the ``zones`` row for *zone_id* as a dict, or ``None``."""
        async with self._acquire() as conn:
            row = await conn.fetchrow('''
                SELECT * FROM zones WHERE id = $1
            ''', zone_id)
            return dict(row) if row else None

    async def init_zones(self, zones_data: List[Dict]):
        """Insert zones, refreshing ``name`` and ``becf`` of existing ids.

        Args:
            zones_data: One mapping per zone. ``id`` and ``name`` are
                required (``KeyError`` otherwise); ``region``, ``becf``,
                ``latitude``, ``longitude``, ``population``, ``priority``
                and ``metadata`` are optional.
        """
        # Build every argument tuple up front so a malformed entry fails
        # before anything is written.
        zone_rows = [
            (
                zone['id'],
                zone['name'],
                zone.get('region'),
                zone.get('becf'),
                zone.get('latitude'),
                zone.get('longitude'),
                zone.get('population'),
                zone.get('priority'),
                self._to_jsonb(zone.get('metadata', {})),
            )
            for zone in zones_data
        ]
        if not zone_rows:
            return

        async with self._acquire() as conn:
            # One batched call instead of one round trip per zone.
            await conn.executemany(
                '''
                INSERT INTO zones (id, name, region, becf, latitude, longitude, population, priority, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (id) DO UPDATE SET
                    name = EXCLUDED.name,
                    becf = EXCLUDED.becf
                ''',
                zone_rows,
            )