| import sqlite3 | |
| import os | |
| import json | |
| from typing import Optional | |
| try: | |
| import psycopg2 | |
| from psycopg2 import pool | |
| except ImportError: | |
| psycopg2 = None | |
| DB_FILE = os.getenv("MEP_SQLITE_PATH", "ledger.db") | |
| DB_URL = os.getenv("MEP_DATABASE_URL") | |
| PG_POOL_MIN = int(os.getenv("MEP_PG_POOL_MIN", "1")) | |
| PG_POOL_MAX = int(os.getenv("MEP_PG_POOL_MAX", "5")) | |
| _pg_pool: Optional["pool.SimpleConnectionPool"] = None | |
| def _is_postgres() -> bool: | |
| return bool(DB_URL) | |
| def _get_pg_pool(): | |
| global _pg_pool | |
| if _pg_pool is None: | |
| if psycopg2 is None: | |
| raise RuntimeError("psycopg2 is required for Postgres") | |
| _pg_pool = pool.SimpleConnectionPool(PG_POOL_MIN, PG_POOL_MAX, DB_URL) | |
| return _pg_pool | |
| def _get_conn(): | |
| if _is_postgres(): | |
| return _get_pg_pool().getconn() | |
| return sqlite3.connect(DB_FILE, check_same_thread=False) | |
| def _release_conn(conn): | |
| if _is_postgres(): | |
| _get_pg_pool().putconn(conn) | |
| else: | |
| conn.close() | |
| def _row_to_dict(cursor, row): | |
| if row is None: | |
| return None | |
| columns = [desc[0] for desc in cursor.description] | |
| return dict(zip(columns, row)) | |
| def init_db(): | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS ledger ( | |
| node_id TEXT PRIMARY KEY, | |
| pub_pem TEXT NOT NULL, | |
| balance REAL NOT NULL | |
| ) | |
| ''') | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS tasks ( | |
| task_id TEXT PRIMARY KEY, | |
| consumer_id TEXT NOT NULL, | |
| provider_id TEXT, | |
| payload TEXT NOT NULL, | |
| bounty REAL NOT NULL, | |
| status TEXT NOT NULL, | |
| target_node TEXT, | |
| model_requirement TEXT, | |
| result_payload TEXT, | |
| created_at REAL NOT NULL, | |
| updated_at REAL NOT NULL | |
| ) | |
| ''') | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS idempotency ( | |
| node_id TEXT NOT NULL, | |
| endpoint TEXT NOT NULL, | |
| idem_key TEXT NOT NULL, | |
| response TEXT NOT NULL, | |
| status_code INTEGER NOT NULL, | |
| created_at REAL NOT NULL, | |
| PRIMARY KEY (node_id, endpoint, idem_key) | |
| ) | |
| ''') | |
| conn.commit() | |
| _release_conn(conn) | |
| def register_node(node_id: str, pub_pem: str) -> float: | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute("SELECT balance FROM ledger WHERE node_id = %s", (node_id,)) | |
| else: | |
| cursor.execute("SELECT balance FROM ledger WHERE node_id = ?", (node_id,)) | |
| row = cursor.fetchone() | |
| if not row: | |
| if _is_postgres(): | |
| cursor.execute( | |
| "INSERT INTO ledger (node_id, pub_pem, balance) VALUES (%s, %s, %s) ON CONFLICT (node_id) DO NOTHING", | |
| (node_id, pub_pem, 10.0) | |
| ) | |
| else: | |
| cursor.execute( | |
| "INSERT OR IGNORE INTO ledger (node_id, pub_pem, balance) VALUES (?, ?, ?)", | |
| (node_id, pub_pem, 10.0) | |
| ) | |
| conn.commit() | |
| if _is_postgres(): | |
| cursor.execute("SELECT balance FROM ledger WHERE node_id = %s", (node_id,)) | |
| else: | |
| cursor.execute("SELECT balance FROM ledger WHERE node_id = ?", (node_id,)) | |
| row = cursor.fetchone() | |
| _release_conn(conn) | |
| return row[0] if row else 10.0 | |
| def get_pub_pem(node_id: str) -> Optional[str]: | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute("SELECT pub_pem FROM ledger WHERE node_id = %s", (node_id,)) | |
| else: | |
| cursor.execute("SELECT pub_pem FROM ledger WHERE node_id = ?", (node_id,)) | |
| row = cursor.fetchone() | |
| _release_conn(conn) | |
| return row[0] if row else None | |
| def get_balance(node_id: str) -> Optional[float]: | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute("SELECT balance FROM ledger WHERE node_id = %s", (node_id,)) | |
| else: | |
| cursor.execute("SELECT balance FROM ledger WHERE node_id = ?", (node_id,)) | |
| row = cursor.fetchone() | |
| _release_conn(conn) | |
| return row[0] if row else None | |
| def set_balance(node_id: str, balance: float): | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute("UPDATE ledger SET balance = %s WHERE node_id = %s", (balance, node_id)) | |
| else: | |
| cursor.execute("UPDATE ledger SET balance = ? WHERE node_id = ?", (balance, node_id)) | |
| conn.commit() | |
| _release_conn(conn) | |
| def add_balance(node_id: str, amount: float): | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute("UPDATE ledger SET balance = balance + %s WHERE node_id = %s", (amount, node_id)) | |
| else: | |
| cursor.execute("UPDATE ledger SET balance = balance + ? WHERE node_id = ?", (amount, node_id)) | |
| conn.commit() | |
| _release_conn(conn) | |
| def deduct_balance(node_id: str, amount: float) -> bool: | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute( | |
| "UPDATE ledger SET balance = balance - %s WHERE node_id = %s AND balance >= %s", | |
| (amount, node_id, amount) | |
| ) | |
| else: | |
| cursor.execute( | |
| "UPDATE ledger SET balance = balance - ? WHERE node_id = ? AND balance >= ?", | |
| (amount, node_id, amount) | |
| ) | |
| updated = cursor.rowcount | |
| conn.commit() | |
| _release_conn(conn) | |
| return updated > 0 | |
| def create_task(task_id: str, consumer_id: str, payload: str, bounty: float, status: str, target_node: Optional[str], model_requirement: Optional[str], created_at: float): | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute( | |
| "INSERT INTO tasks (task_id, consumer_id, provider_id, payload, bounty, status, target_node, model_requirement, result_payload, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", | |
| (task_id, consumer_id, None, payload, bounty, status, target_node, model_requirement, None, created_at, created_at) | |
| ) | |
| else: | |
| cursor.execute( | |
| "INSERT INTO tasks (task_id, consumer_id, provider_id, payload, bounty, status, target_node, model_requirement, result_payload, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", | |
| (task_id, consumer_id, None, payload, bounty, status, target_node, model_requirement, None, created_at, created_at) | |
| ) | |
| conn.commit() | |
| _release_conn(conn) | |
| def update_task_assignment(task_id: str, provider_id: str, status: str, updated_at: float): | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute( | |
| "UPDATE tasks SET provider_id = %s, status = %s, updated_at = %s WHERE task_id = %s", | |
| (provider_id, status, updated_at, task_id) | |
| ) | |
| else: | |
| cursor.execute( | |
| "UPDATE tasks SET provider_id = ?, status = ?, updated_at = ? WHERE task_id = ?", | |
| (provider_id, status, updated_at, task_id) | |
| ) | |
| conn.commit() | |
| _release_conn(conn) | |
| def update_task_result(task_id: str, provider_id: str, result_payload: str, status: str, updated_at: float): | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute( | |
| "UPDATE tasks SET provider_id = %s, result_payload = %s, status = %s, updated_at = %s WHERE task_id = %s", | |
| (provider_id, result_payload, status, updated_at, task_id) | |
| ) | |
| else: | |
| cursor.execute( | |
| "UPDATE tasks SET provider_id = ?, result_payload = ?, status = ?, updated_at = ? WHERE task_id = ?", | |
| (provider_id, result_payload, status, updated_at, task_id) | |
| ) | |
| conn.commit() | |
| _release_conn(conn) | |
| def get_task(task_id: str) -> Optional[dict]: | |
| conn = _get_conn() | |
| if not _is_postgres(): | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute("SELECT * FROM tasks WHERE task_id = %s", (task_id,)) | |
| else: | |
| cursor.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)) | |
| row = cursor.fetchone() | |
| if not row: | |
| _release_conn(conn) | |
| return None | |
| if _is_postgres(): | |
| result = _row_to_dict(cursor, row) | |
| _release_conn(conn) | |
| return result | |
| result = dict(row) | |
| _release_conn(conn) | |
| return result | |
| def get_active_tasks() -> list: | |
| conn = _get_conn() | |
| if not _is_postgres(): | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute("SELECT * FROM tasks WHERE status IN ('bidding', 'assigned')") | |
| else: | |
| cursor.execute("SELECT * FROM tasks WHERE status IN ('bidding', 'assigned')") | |
| rows = cursor.fetchall() | |
| if _is_postgres(): | |
| result = [_row_to_dict(cursor, row) for row in rows] | |
| _release_conn(conn) | |
| return result | |
| result = [dict(row) for row in rows] | |
| _release_conn(conn) | |
| return result | |
| def get_idempotency(node_id: str, endpoint: str, idem_key: str) -> Optional[dict]: | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| if _is_postgres(): | |
| cursor.execute( | |
| "SELECT response, status_code FROM idempotency WHERE node_id = %s AND endpoint = %s AND idem_key = %s", | |
| (node_id, endpoint, idem_key) | |
| ) | |
| else: | |
| cursor.execute( | |
| "SELECT response, status_code FROM idempotency WHERE node_id = ? AND endpoint = ? AND idem_key = ?", | |
| (node_id, endpoint, idem_key) | |
| ) | |
| row = cursor.fetchone() | |
| if not row: | |
| _release_conn(conn) | |
| return None | |
| response = json.loads(row[0]) | |
| result = {"response": response, "status_code": row[1]} | |
| _release_conn(conn) | |
| return result | |
| def set_idempotency(node_id: str, endpoint: str, idem_key: str, response: dict, status_code: int, created_at: float): | |
| conn = _get_conn() | |
| cursor = conn.cursor() | |
| payload = json.dumps(response) | |
| if _is_postgres(): | |
| cursor.execute( | |
| "INSERT INTO idempotency (node_id, endpoint, idem_key, response, status_code, created_at) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (node_id, endpoint, idem_key) DO NOTHING", | |
| (node_id, endpoint, idem_key, payload, status_code, created_at) | |
| ) | |
| else: | |
| cursor.execute( | |
| "INSERT OR IGNORE INTO idempotency (node_id, endpoint, idem_key, response, status_code, created_at) VALUES (?, ?, ?, ?, ?, ?)", | |
| (node_id, endpoint, idem_key, payload, status_code, created_at) | |
| ) | |
| conn.commit() | |
| _release_conn(conn) | |
| init_db() | |