tsunami / src /tsuwave /database /timescale.py
Gitdeeper4's picture
رفع جميع ملفات TSU-WAVE مع YAML
12834b7
"""TimescaleDB connection and operations"""
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, List, Any

import asyncpg
# Logger for this module, named after the module's import path.
logger = logging.getLogger(__name__)
class TimescaleDB:
    """Async handler for the TimescaleDB store (parameters, events, alerts, zones).

    The handler owns an ``asyncpg`` connection pool created by :meth:`connect`
    and released by :meth:`disconnect`. ``config`` must provide the keys
    ``host``, ``port``, ``name`` (database name), ``user`` and ``password``;
    ``pool_size`` is optional and defaults to 10.

    Timestamps generated by this class are timezone-aware UTC values. Callers
    supplying their own datetimes should also pass timezone-aware values,
    since every time column is ``TIMESTAMPTZ``.
    """

    # Preferred number of connections kept open; clamped to the pool maximum.
    _DEFAULT_MIN_POOL_SIZE = 5

    def __init__(self, config: Dict[str, Any]):
        """Store the connection settings; no I/O happens until connect()."""
        self.config = config
        # asyncpg pool once connect() has succeeded, otherwise None.
        self.pool = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _utcnow() -> datetime:
        """Return the current time as a timezone-aware UTC datetime.

        A naive ``datetime.utcnow()`` value would be interpreted as local
        time when bound to a TIMESTAMPTZ parameter.
        """
        return datetime.now(timezone.utc)

    @staticmethod
    def _encode_json(value: Any) -> Optional[str]:
        """Serialize *value* for a JSONB parameter.

        asyncpg expects JSON text for json/jsonb arguments unless a type
        codec has been registered, so dicts/lists are dumped here. ``None``
        stays ``None`` (SQL NULL) and strings are assumed to already hold
        JSON text and are passed through unchanged.
        """
        if value is None or isinstance(value, str):
            return value
        return json.dumps(value)

    def _acquire(self):
        """Return the pool's acquire context manager.

        Raises:
            RuntimeError: if connect() has not been called (or the handler
                has been disconnected).
        """
        if self.pool is None:
            raise RuntimeError(
                "TimescaleDB is not connected; call connect() first"
            )
        return self.pool.acquire()

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------
    async def connect(self) -> None:
        """Create the connection pool and initialize the schema.

        If pool creation or schema initialization fails, the error is
        logged, any pool created by this call is closed again, and the
        original exception is re-raised.
        """
        pool = None
        try:
            max_size = self.config.get('pool_size', 10)
            pool = await asyncpg.create_pool(
                host=self.config['host'],
                port=self.config['port'],
                database=self.config['name'],
                user=self.config['user'],
                password=self.config['password'],
                # min_size must not exceed max_size, so small pools are
                # clamped instead of failing.
                min_size=min(self._DEFAULT_MIN_POOL_SIZE, max_size),
                max_size=max_size,
            )
            self.pool = pool
            logger.info("Connected to TimescaleDB")
            # Initialize schema
            await self._init_schema()
        except Exception as e:
            logger.error("Database connection failed: %s", e)
            if pool is not None:
                # Do not leak the pool when schema initialization fails.
                self.pool = None
                await pool.close()
            raise

    async def disconnect(self) -> None:
        """Close the connection pool, if one is open."""
        if self.pool:
            # Drop the reference first so later calls fail clearly rather
            # than touching a closed pool.
            pool, self.pool = self.pool, None
            await pool.close()
            logger.info("Disconnected from TimescaleDB")

    async def _init_schema(self) -> None:
        """Create tables, the hypertable and indexes if they do not exist."""
        async with self._acquire() as conn:
            # Create the table backing the parameters hypertable
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS parameters (
                    time TIMESTAMPTZ NOT NULL,
                    event_id TEXT,
                    zone TEXT,
                    wcc FLOAT,
                    kpr FLOAT,
                    hfsi FLOAT,
                    becf FLOAT,
                    sdb FLOAT,
                    sbsp FLOAT,
                    smvi FLOAT,
                    chi FLOAT
                );
            ''')
            # Convert to hypertable (no-op when it already is one)
            await conn.execute('''
                SELECT create_hypertable('parameters', 'time',
                    if_not_exists => TRUE);
            ''')
            # Create events table
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS events (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    source_location TEXT,
                    magnitude FLOAT,
                    depth FLOAT,
                    time TIMESTAMPTZ,
                    created_at TIMESTAMPTZ DEFAULT NOW(),
                    updated_at TIMESTAMPTZ DEFAULT NOW(),
                    active BOOLEAN DEFAULT TRUE,
                    metadata JSONB
                );
            ''')
            # Create alerts table
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS alerts (
                    id TEXT PRIMARY KEY,
                    event_id TEXT REFERENCES events(id),
                    zone TEXT,
                    level TEXT,
                    chi FLOAT,
                    message TEXT,
                    issued_at TIMESTAMPTZ DEFAULT NOW(),
                    expires_at TIMESTAMPTZ,
                    acknowledged BOOLEAN DEFAULT FALSE,
                    acknowledged_by TEXT,
                    acknowledged_at TIMESTAMPTZ,
                    metadata JSONB
                );
            ''')
            # Create zones table
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS zones (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    region TEXT,
                    becf FLOAT,
                    latitude FLOAT,
                    longitude FLOAT,
                    population INTEGER,
                    priority TEXT,
                    metadata JSONB
                );
            ''')
            # Create indexes (several statements sent in one argument-free call)
            await conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_parameters_time
                    ON parameters (time DESC);
                CREATE INDEX IF NOT EXISTS idx_parameters_zone
                    ON parameters (zone, time DESC);
                CREATE INDEX IF NOT EXISTS idx_alerts_active
                    ON alerts (zone, level) WHERE acknowledged = FALSE;
            ''')
            logger.info("Database schema initialized")

    # ------------------------------------------------------------------
    # Parameters
    # ------------------------------------------------------------------
    async def insert_parameters(self, data: Dict[str, Any]) -> None:
        """Insert one row into ``parameters``.

        Args:
            data: mapping with optional keys ``time``, ``event_id``, ``zone``,
                ``wcc``, ``kpr``, ``hfsi``, ``becf``, ``sdb``, ``sbsp``,
                ``smvi`` and ``chi``. Missing keys are stored as NULL, except
                ``time`` which defaults to the current UTC time.
        """
        async with self._acquire() as conn:
            await conn.execute('''
                INSERT INTO parameters
                (time, event_id, zone, wcc, kpr, hfsi, becf, sdb, sbsp, smvi, chi)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
            ''',
                data.get('time', self._utcnow()),
                data.get('event_id'),
                data.get('zone'),
                data.get('wcc'),
                data.get('kpr'),
                data.get('hfsi'),
                data.get('becf'),
                data.get('sdb'),
                data.get('sbsp'),
                data.get('smvi'),
                data.get('chi'),
            )

    async def get_parameters(self, zone: str,
                             start_time: datetime,
                             end_time: Optional[datetime] = None) -> List[Dict]:
        """Return the parameter rows of *zone* within a time window.

        Args:
            zone: zone identifier to filter on.
            start_time: inclusive lower bound of the window.
            end_time: inclusive upper bound; defaults to the current UTC time.

        Returns:
            Rows as dictionaries, newest first.
        """
        async with self._acquire() as conn:
            if end_time is None:
                end_time = self._utcnow()
            rows = await conn.fetch('''
                SELECT * FROM parameters
                WHERE zone = $1
                AND time BETWEEN $2 AND $3
                ORDER BY time DESC
            ''', zone, start_time, end_time)
            return [dict(row) for row in rows]

    # ------------------------------------------------------------------
    # Events
    # ------------------------------------------------------------------
    async def create_event(self, event_data: Dict) -> str:
        """Insert an event, or refresh an existing one with the same id.

        When the id already exists only ``name`` and ``updated_at`` are
        updated. ``time`` defaults to the current UTC time and ``metadata``
        to an empty JSON object.

        Returns:
            The value of ``event_data['id']`` (``None`` if the key is absent).
        """
        async with self._acquire() as conn:
            event_id = event_data.get('id')
            await conn.execute('''
                INSERT INTO events
                (id, name, source_location, magnitude, depth, time, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (id) DO UPDATE SET
                    name = EXCLUDED.name,
                    updated_at = NOW()
            ''',
                event_id,
                event_data.get('name'),
                event_data.get('source_location'),
                event_data.get('magnitude'),
                event_data.get('depth'),
                event_data.get('time', self._utcnow()),
                self._encode_json(event_data.get('metadata', {})),
            )
            return event_id

    async def get_active_events(self) -> List[Dict]:
        """Return all events flagged ``active``, newest first."""
        async with self._acquire() as conn:
            rows = await conn.fetch('''
                SELECT * FROM events
                WHERE active = TRUE
                ORDER BY time DESC
            ''')
            return [dict(row) for row in rows]

    # ------------------------------------------------------------------
    # Alerts
    # ------------------------------------------------------------------
    async def create_alert(self, alert_data: Dict) -> str:
        """Insert a new alert row.

        ``issued_at`` defaults to the current UTC time and ``metadata`` to an
        empty JSON object; other missing keys are stored as NULL.

        Returns:
            The value of ``alert_data['id']`` (``None`` if the key is absent).
        """
        async with self._acquire() as conn:
            alert_id = alert_data.get('id')
            await conn.execute('''
                INSERT INTO alerts
                (id, event_id, zone, level, chi, message,
                 issued_at, expires_at, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ''',
                alert_id,
                alert_data.get('event_id'),
                alert_data.get('zone'),
                alert_data.get('level'),
                alert_data.get('chi'),
                alert_data.get('message'),
                alert_data.get('issued_at', self._utcnow()),
                alert_data.get('expires_at'),
                self._encode_json(alert_data.get('metadata', {})),
            )
            return alert_id

    async def get_active_alerts(self) -> List[Dict]:
        """Return unacknowledged alerts that have not expired, newest first."""
        async with self._acquire() as conn:
            rows = await conn.fetch('''
                SELECT * FROM alerts
                WHERE acknowledged = FALSE
                AND (expires_at IS NULL OR expires_at > NOW())
                ORDER BY issued_at DESC
            ''')
            return [dict(row) for row in rows]

    async def acknowledge_alert(self, alert_id: str, user: str) -> None:
        """Mark the alert *alert_id* as acknowledged by *user* at the current time."""
        async with self._acquire() as conn:
            await conn.execute('''
                UPDATE alerts
                SET acknowledged = TRUE,
                    acknowledged_by = $2,
                    acknowledged_at = NOW()
                WHERE id = $1
            ''', alert_id, user)

    # ------------------------------------------------------------------
    # Zones
    # ------------------------------------------------------------------
    async def get_zone_info(self, zone_id: str) -> Optional[Dict]:
        """Return the zone row for *zone_id* as a dict, or ``None`` if absent."""
        async with self._acquire() as conn:
            row = await conn.fetchrow('''
                SELECT * FROM zones WHERE id = $1
            ''', zone_id)
            return dict(row) if row else None

    async def init_zones(self, zones_data: List[Dict]) -> None:
        """Insert the given zones, refreshing ``name`` and ``becf`` on id conflict.

        Args:
            zones_data: zone mappings; ``id`` and ``name`` are required keys,
                the remaining columns are optional.

        Raises:
            KeyError: if a zone lacks ``id`` or ``name``.
        """
        async with self._acquire() as conn:
            for zone in zones_data:
                await conn.execute('''
                    INSERT INTO zones
                    (id, name, region, becf, latitude, longitude,
                     population, priority, metadata)
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                    ON CONFLICT (id) DO UPDATE SET
                        name = EXCLUDED.name,
                        becf = EXCLUDED.becf
                ''',
                    zone['id'],
                    zone['name'],
                    zone.get('region'),
                    zone.get('becf'),
                    zone.get('latitude'),
                    zone.get('longitude'),
                    zone.get('population'),
                    zone.get('priority'),
                    self._encode_json(zone.get('metadata', {})),
                )