Spaces:
Runtime error
Runtime error
File size: 629 Bytes
fe90c54 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
import sqlite3
import time
from typing import Callable, TypeVar
T = TypeVar("T")
def run_tx(
conn: sqlite3.Connection,
fn: Callable[[sqlite3.Connection], T],
retries: int = 5,
delay: float = 0.2,
) -> T:
last_exc = None
for attempt in range(retries):
try:
result = fn(conn)
conn.commit()
return result
except sqlite3.OperationalError as e:
conn.rollback()
last_exc = e
if "locked" in str(e).lower():
time.sleep(delay * (attempt + 1))
continue
raise
raise last_exc
|