Spaces:
Runtime error
Runtime error
| import sqlite3 | |
| import time | |
| from typing import Callable, TypeVar | |
# Return type of the caller-supplied transaction body; run_tx passes it through.
T = TypeVar("T")


def run_tx(
    conn: sqlite3.Connection,
    fn: Callable[[sqlite3.Connection], T],
    retries: int = 5,
    delay: float = 0.2,
) -> T:
    """Run ``fn(conn)`` as one transaction, retrying while the database is locked.

    ``fn`` is called with ``conn``; if it returns normally the connection is
    committed and the return value of ``fn`` is handed back. If anything goes
    wrong the connection is rolled back before the exception propagates, so
    no partially applied work is left pending on ``conn``.

    Args:
        conn: Open SQLite connection the transaction runs on.
        fn: Callable performing the transactional work. It may be invoked
            several times, so it must be safe to re-run after a rollback.
        retries: Maximum number of attempts (not *additional* attempts).
            Must be at least 1.
        delay: Base back-off in seconds. The wait before attempt ``k + 1``
            is ``delay * k`` (linear back-off).

    Returns:
        Whatever ``fn`` returned on the successful attempt.

    Raises:
        ValueError: If ``retries`` is less than 1.
        sqlite3.OperationalError: Immediately for errors whose message does
            not mention "locked", or once every attempt has failed with a
            lock error (the last such error is re-raised).
        Exception: Anything else raised by ``fn`` is re-raised after the
            rollback.
    """
    if retries < 1:
        # Without this guard the loop body never runs and there is no
        # exception to re-raise at the end.
        raise ValueError(f"retries must be at least 1, got {retries!r}")

    for attempt in range(1, retries + 1):
        try:
            result = fn(conn)
            conn.commit()
        except sqlite3.OperationalError as exc:
            conn.rollback()
            # Lock contention is reported through the message text, e.g.
            # "database is locked" / "database table is locked".
            is_lock_error = "locked" in str(exc).lower()
            if not is_lock_error or attempt == retries:
                # Not retryable, or out of attempts: fail now instead of
                # sleeping through a back-off nobody will benefit from.
                raise
            time.sleep(delay * attempt)
        except BaseException:
            # Any other failure must not leave a half-applied transaction
            # open on the connection; a later commit() would persist it.
            conn.rollback()
            raise
        else:
            return result

    # Every iteration either returns or raises on the final attempt.
    raise AssertionError("unreachable")