Spaces:
Sleeping
Sleeping
| """TimescaleDB connection and operations""" | |
| import asyncpg | |
| from typing import Optional, Dict, List, Any | |
| import logging | |
| from datetime import datetime, timedelta | |
| logger = logging.getLogger(__name__) | |
| class TimescaleDB: | |
| """TimescaleDB database handler""" | |
| def __init__(self, config: Dict[str, Any]): | |
| self.config = config | |
| self.pool = None | |
| async def connect(self): | |
| """Create connection pool""" | |
| try: | |
| self.pool = await asyncpg.create_pool( | |
| host=self.config['host'], | |
| port=self.config['port'], | |
| database=self.config['name'], | |
| user=self.config['user'], | |
| password=self.config['password'], | |
| min_size=5, | |
| max_size=self.config.get('pool_size', 10) | |
| ) | |
| logger.info("Connected to TimescaleDB") | |
| # Initialize schema | |
| await self._init_schema() | |
| except Exception as e: | |
| logger.error(f"Database connection failed: {e}") | |
| raise | |
| async def disconnect(self): | |
| """Close connection pool""" | |
| if self.pool: | |
| await self.pool.close() | |
| logger.info("Disconnected from TimescaleDB") | |
| async def _init_schema(self): | |
| """Initialize database schema""" | |
| async with self.pool.acquire() as conn: | |
| # Create hypertable for parameters | |
| await conn.execute(''' | |
| CREATE TABLE IF NOT EXISTS parameters ( | |
| time TIMESTAMPTZ NOT NULL, | |
| event_id TEXT, | |
| zone TEXT, | |
| wcc FLOAT, | |
| kpr FLOAT, | |
| hfsi FLOAT, | |
| becf FLOAT, | |
| sdb FLOAT, | |
| sbsp FLOAT, | |
| smvi FLOAT, | |
| chi FLOAT | |
| ); | |
| ''') | |
| # Convert to hypertable | |
| await conn.execute(''' | |
| SELECT create_hypertable('parameters', 'time', | |
| if_not_exists => TRUE); | |
| ''') | |
| # Create events table | |
| await conn.execute(''' | |
| CREATE TABLE IF NOT EXISTS events ( | |
| id TEXT PRIMARY KEY, | |
| name TEXT, | |
| source_location TEXT, | |
| magnitude FLOAT, | |
| depth FLOAT, | |
| time TIMESTAMPTZ, | |
| created_at TIMESTAMPTZ DEFAULT NOW(), | |
| updated_at TIMESTAMPTZ DEFAULT NOW(), | |
| active BOOLEAN DEFAULT TRUE, | |
| metadata JSONB | |
| ); | |
| ''') | |
| # Create alerts table | |
| await conn.execute(''' | |
| CREATE TABLE IF NOT EXISTS alerts ( | |
| id TEXT PRIMARY KEY, | |
| event_id TEXT REFERENCES events(id), | |
| zone TEXT, | |
| level TEXT, | |
| chi FLOAT, | |
| message TEXT, | |
| issued_at TIMESTAMPTZ DEFAULT NOW(), | |
| expires_at TIMESTAMPTZ, | |
| acknowledged BOOLEAN DEFAULT FALSE, | |
| acknowledged_by TEXT, | |
| acknowledged_at TIMESTAMPTZ, | |
| metadata JSONB | |
| ); | |
| ''') | |
| # Create zones table | |
| await conn.execute(''' | |
| CREATE TABLE IF NOT EXISTS zones ( | |
| id TEXT PRIMARY KEY, | |
| name TEXT, | |
| region TEXT, | |
| becf FLOAT, | |
| latitude FLOAT, | |
| longitude FLOAT, | |
| population INTEGER, | |
| priority TEXT, | |
| metadata JSONB | |
| ); | |
| ''') | |
| # Create indexes | |
| await conn.execute(''' | |
| CREATE INDEX IF NOT EXISTS idx_parameters_time | |
| ON parameters (time DESC); | |
| CREATE INDEX IF NOT EXISTS idx_parameters_zone | |
| ON parameters (zone, time DESC); | |
| CREATE INDEX IF NOT EXISTS idx_alerts_active | |
| ON alerts (zone, level) WHERE acknowledged = FALSE; | |
| ''') | |
| logger.info("Database schema initialized") | |
| async def insert_parameters(self, data: Dict[str, Any]): | |
| """Insert parameter data""" | |
| async with self.pool.acquire() as conn: | |
| await conn.execute(''' | |
| INSERT INTO parameters | |
| (time, event_id, zone, wcc, kpr, hfsi, becf, sdb, sbsp, smvi, chi) | |
| VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) | |
| ''', | |
| data.get('time', datetime.utcnow()), | |
| data.get('event_id'), | |
| data.get('zone'), | |
| data.get('wcc'), | |
| data.get('kpr'), | |
| data.get('hfsi'), | |
| data.get('becf'), | |
| data.get('sdb'), | |
| data.get('sbsp'), | |
| data.get('smvi'), | |
| data.get('chi') | |
| ) | |
| async def get_parameters(self, zone: str, | |
| start_time: datetime, | |
| end_time: Optional[datetime] = None) -> List[Dict]: | |
| """Get parameter time series""" | |
| async with self.pool.acquire() as conn: | |
| if end_time is None: | |
| end_time = datetime.utcnow() | |
| rows = await conn.fetch(''' | |
| SELECT * FROM parameters | |
| WHERE zone = $1 | |
| AND time BETWEEN $2 AND $3 | |
| ORDER BY time DESC | |
| ''', zone, start_time, end_time) | |
| return [dict(row) for row in rows] | |
| async def create_event(self, event_data: Dict) -> str: | |
| """Create new event""" | |
| async with self.pool.acquire() as conn: | |
| event_id = event_data.get('id') | |
| await conn.execute(''' | |
| INSERT INTO events | |
| (id, name, source_location, magnitude, depth, time, metadata) | |
| VALUES ($1, $2, $3, $4, $5, $6, $7) | |
| ON CONFLICT (id) DO UPDATE SET | |
| name = EXCLUDED.name, | |
| updated_at = NOW() | |
| ''', | |
| event_id, | |
| event_data.get('name'), | |
| event_data.get('source_location'), | |
| event_data.get('magnitude'), | |
| event_data.get('depth'), | |
| event_data.get('time', datetime.utcnow()), | |
| event_data.get('metadata', {}) | |
| ) | |
| return event_id | |
| async def get_active_events(self) -> List[Dict]: | |
| """Get active events""" | |
| async with self.pool.acquire() as conn: | |
| rows = await conn.fetch(''' | |
| SELECT * FROM events | |
| WHERE active = TRUE | |
| ORDER BY time DESC | |
| ''') | |
| return [dict(row) for row in rows] | |
| async def create_alert(self, alert_data: Dict) -> str: | |
| """Create new alert""" | |
| async with self.pool.acquire() as conn: | |
| alert_id = alert_data.get('id') | |
| await conn.execute(''' | |
| INSERT INTO alerts | |
| (id, event_id, zone, level, chi, message, | |
| issued_at, expires_at, metadata) | |
| VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) | |
| ''', | |
| alert_id, | |
| alert_data.get('event_id'), | |
| alert_data.get('zone'), | |
| alert_data.get('level'), | |
| alert_data.get('chi'), | |
| alert_data.get('message'), | |
| alert_data.get('issued_at', datetime.utcnow()), | |
| alert_data.get('expires_at'), | |
| alert_data.get('metadata', {}) | |
| ) | |
| return alert_id | |
| async def get_active_alerts(self) -> List[Dict]: | |
| """Get active alerts""" | |
| async with self.pool.acquire() as conn: | |
| rows = await conn.fetch(''' | |
| SELECT * FROM alerts | |
| WHERE acknowledged = FALSE | |
| AND (expires_at IS NULL OR expires_at > NOW()) | |
| ORDER BY issued_at DESC | |
| ''') | |
| return [dict(row) for row in rows] | |
| async def acknowledge_alert(self, alert_id: str, user: str): | |
| """Acknowledge alert""" | |
| async with self.pool.acquire() as conn: | |
| await conn.execute(''' | |
| UPDATE alerts | |
| SET acknowledged = TRUE, | |
| acknowledged_by = $2, | |
| acknowledged_at = NOW() | |
| WHERE id = $1 | |
| ''', alert_id, user) | |
| async def get_zone_info(self, zone_id: str) -> Optional[Dict]: | |
| """Get zone information""" | |
| async with self.pool.acquire() as conn: | |
| row = await conn.fetchrow(''' | |
| SELECT * FROM zones WHERE id = $1 | |
| ''', zone_id) | |
| return dict(row) if row else None | |
| async def init_zones(self, zones_data: List[Dict]): | |
| """Initialize zone data""" | |
| async with self.pool.acquire() as conn: | |
| for zone in zones_data: | |
| await conn.execute(''' | |
| INSERT INTO zones | |
| (id, name, region, becf, latitude, longitude, | |
| population, priority, metadata) | |
| VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) | |
| ON CONFLICT (id) DO UPDATE SET | |
| name = EXCLUDED.name, | |
| becf = EXCLUDED.becf | |
| ''', | |
| zone['id'], | |
| zone['name'], | |
| zone.get('region'), | |
| zone.get('becf'), | |
| zone.get('latitude'), | |
| zone.get('longitude'), | |
| zone.get('population'), | |
| zone.get('priority'), | |
| zone.get('metadata', {}) | |
| ) | |