import sqlite3 import json from datetime import datetime class StellarDB: def __init__(self, db_path="stellar.db"): self.db_path = db_path self._init_db() def _init_db(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 卫星状态表 cursor.execute(''' CREATE TABLE IF NOT EXISTS satellites ( id TEXT PRIMARY KEY, name TEXT, status TEXT, battery INTEGER, compute_load INTEGER, last_updated TIMESTAMP ) ''') # 任务记录表 cursor.execute(''' CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, description TEXT, status TEXT, result TEXT, agent_log TEXT, created_at TIMESTAMP ) ''') # 初始卫星数据 satellites = [ ('SAT-001', 'Stellar-Alpha (领航者)', 'Orbiting', 85, 10, datetime.now()), ('SAT-002', 'Stellar-Beta (守望者)', 'Orbiting', 92, 5, datetime.now()), ('SAT-003', 'Stellar-Gamma (开拓者)', 'Maintenance', 45, 0, datetime.now()), ('SAT-004', 'Stellar-Delta (中继站)', 'Orbiting', 78, 25, datetime.now()), ('SAT-005', 'Stellar-Epsilon (监测站)', 'Orbiting', 60, 40, datetime.now()), ] cursor.executemany('INSERT OR IGNORE INTO satellites VALUES (?,?,?,?,?,?)', satellites) conn.commit() def get_satellites(self): with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute('SELECT * FROM satellites') return [dict(row) for row in cursor.fetchall()] def update_satellite(self, sat_id, status=None, battery=None, compute_load=None): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() if status: cursor.execute('UPDATE satellites SET status = ? WHERE id = ?', (status, sat_id)) if battery is not None: cursor.execute('UPDATE satellites SET battery = ? WHERE id = ?', (battery, sat_id)) if compute_load is not None: cursor.execute('UPDATE satellites SET compute_load = ? WHERE id = ?', (compute_load, sat_id)) cursor.execute('UPDATE satellites SET last_updated = ? WHERE id = ?', (datetime.now(), sat_id)) conn.commit() def save_task(self, task_id, description, status, result="", agent_log=""): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' INSERT OR REPLACE INTO tasks (id, description, status, result, agent_log, created_at) VALUES (?, ?, ?, ?, ?, ?) ''', (task_id, description, status, result, agent_log, datetime.now())) conn.commit() def get_tasks(self): with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute('SELECT * FROM tasks ORDER BY created_at DESC') tasks = [] for row in cursor.fetchall(): task = dict(row) # 确保 created_at 是字符串格式,方便模板显示 if isinstance(task['created_at'], datetime): task['created_at'] = task['created_at'].strftime('%Y-%m-%d %H:%M:%S') elif isinstance(task['created_at'], str): # 如果已经是字符串(SQLite 默认返回),保留前 19 位 (YYYY-MM-DD HH:MM:SS) task['created_at'] = task['created_at'][:19] tasks.append(task) return tasks