import sqlite3
import threading


class RAMController:
    """Byte-addressable memory emulation persisted in a SQLite table.

    Each written byte is stored as its own row of the ``ram_cells`` table
    (``address INTEGER PRIMARY KEY, data BLOB``).  Addresses that have never
    been written read back as zero.  All access to the shared connection is
    serialized through ``db_lock``; the connection is opened with
    ``check_same_thread=False`` so it may be used from several threads.
    """

    def __init__(self, size_bytes: int, db_path: str = 'ram_storage.db') -> None:
        """Open (or create) the backing database.

        Args:
            size_bytes: Number of addressable bytes; valid addresses are
                ``0 .. size_bytes - 1``.
            db_path: Path handed to ``sqlite3.connect``.
        """
        self.size_bytes = size_bytes
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.db_lock = threading.Lock()
        with self.db_lock:
            self.conn.execute('''CREATE TABLE IF NOT EXISTS ram_cells (
                address INTEGER PRIMARY KEY,
                data BLOB
            )''')
            self.conn.commit()

    def __enter__(self):
        """Return the controller itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block."""
        self.close()
        return False

    def _connection(self):
        """Return the open connection; the caller must hold ``db_lock``.

        Raises:
            sqlite3.ProgrammingError: If ``close()`` has already been called.
        """
        if self.conn is None:
            raise sqlite3.ProgrammingError("Cannot operate on a closed database.")
        return self.conn

    def read(self, address: int, length: int) -> bytearray:
        """Return ``length`` bytes starting at ``address``.

        Cells that were never written are returned as zero.

        Raises:
            IndexError: If the requested range is not fully inside
                ``[0, size_bytes)`` or ``length`` is negative.
            sqlite3.ProgrammingError: If the controller has been closed.
        """
        # A negative length would otherwise let an address beyond the end of
        # memory slip past the upper-bound comparison.
        if address < 0 or length < 0 or address + length > self.size_bytes:
            raise IndexError("Memory access out of bounds")

        with self.db_lock:
            rows = self._connection().execute(
                "SELECT address, data FROM ram_cells WHERE address >= ? AND address < ? ORDER BY address ASC",
                (address, address + length)
            ).fetchall()

        # Start from an all-zero buffer and overlay the stored cells; the
        # WHERE clause guarantees every row falls inside the buffer.
        result = bytearray(length)
        for addr, cell in rows:
            # Cells written by this class are 1-byte blobs; a bare integer
            # value is accepted as-is.
            result[addr - address] = cell[0] if isinstance(cell, (bytes, bytearray)) else cell
        return result

    def write(self, address: int, data) -> None:
        """Store the byte values in ``data`` starting at ``address``.

        The write is all-or-nothing: every value is validated before the
        database is touched, and the inserts run in a single transaction
        that is rolled back if any of them fails.

        Args:
            address: First address to write.
            data: Sized iterable of integers in ``range(256)`` (for example
                ``bytes``, ``bytearray`` or a list of ints).

        Raises:
            IndexError: If the target range is not fully inside
                ``[0, size_bytes)``.
            ValueError: If an element of ``data`` is outside ``range(256)``.
            TypeError: If an element of ``data`` is not an integer.
            sqlite3.ProgrammingError: If the controller has been closed.
        """
        if address < 0 or address + len(data) > self.size_bytes:
            raise IndexError("Memory access out of bounds")

        # Convert everything up front so that an invalid element raises
        # before any row has been inserted.
        rows = [
            (address + offset, bytes([value]))
            for offset, value in enumerate(data)
        ]

        with self.db_lock:
            conn = self._connection()
            # The connection context manager commits on success and rolls
            # back if an exception escapes the block.
            with conn:
                conn.executemany(
                    "INSERT OR REPLACE INTO ram_cells (address, data) VALUES (?, ?)",
                    rows
                )

    def close(self) -> None:
        """Close the database connection; calling it again is a no-op."""
        with self.db_lock:
            if self.conn:
                self.conn.close()
                self.conn = None