|
|
|
|
|
import sqlite3
|
|
|
import threading
|
|
|
|
|
|
class RAMController:
    """Byte-addressable memory emulation persisted in an SQLite table.

    Every written byte is stored as one row of the ``ram_cells`` table
    (``address`` -> one-byte BLOB). Addresses that have no row read back
    as zero. All database access goes through a single shared connection
    and is serialized with ``db_lock``.

    The controller can be used as a context manager; leaving the ``with``
    block closes the connection.
    """

    def __init__(self, size_bytes, db_path='ram_storage.db'):
        """Open the backing database and make sure ``ram_cells`` exists.

        Args:
            size_bytes: Size of the address space; an access is in bounds
                when ``0 <= address`` and ``address + length <= size_bytes``.
            db_path: Database path handed to ``sqlite3.connect``.
        """
        self.size_bytes = size_bytes
        # check_same_thread=False disables sqlite3's own thread-affinity
        # check; db_lock is what serializes use of the connection instead.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.db_lock = threading.Lock()
        # "with self.conn" commits on success and rolls back on error.
        with self.db_lock, self.conn:
            self.conn.execute('''CREATE TABLE IF NOT EXISTS ram_cells (
                address INTEGER PRIMARY KEY,
                data BLOB
            )''')

    def __enter__(self):
        """Return the controller itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block propagate."""
        self.close()
        return False

    def _connection(self):
        """Return the open connection. The caller must hold ``db_lock``.

        Raises:
            sqlite3.ProgrammingError: If ``close()`` was already called.
        """
        if self.conn is None:
            raise sqlite3.ProgrammingError(
                "Cannot operate on a closed database."
            )
        return self.conn

    def read(self, address, length):
        """Return ``length`` bytes starting at ``address``.

        Args:
            address: First address to read.
            length: Number of bytes to read; must not be negative.

        Returns:
            A ``bytearray`` of ``length`` bytes. Cells without a stored
            row are zero.

        Raises:
            ValueError: If ``length`` is negative.
            IndexError: If the range falls outside the address space.
            sqlite3.ProgrammingError: If the controller is closed.
        """
        if length < 0:
            # A negative length could otherwise slip past the bounds check
            # together with an out-of-range address.
            raise ValueError("length must be non-negative")
        end = address + length
        if address < 0 or end > self.size_bytes:
            raise IndexError("Memory access out of bounds")

        with self.db_lock:
            # Materialize the rows while holding the lock so the shared
            # connection's cursor is never iterated outside of it.
            rows = self._connection().execute(
                "SELECT address, data FROM ram_cells WHERE address >= ? AND address < ? ORDER BY address ASC",
                (address, end)
            ).fetchall()

        result = bytearray(length)
        for cell_address, cell in rows:
            # The WHERE clause already restricts rows to [address, end).
            # Cells are normally one-byte BLOBs; a non-BLOB value is
            # stored into the buffer as-is.
            if isinstance(cell, (bytes, bytearray)):
                result[cell_address - address] = cell[0]
            else:
                result[cell_address - address] = cell
        return result

    def write(self, address, data):
        """Store the byte values in ``data`` starting at ``address``.

        The write is atomic: either every cell is stored and committed, or
        (on any error) none of them is.

        Args:
            address: Address of the first byte to write.
            data: Sized iterable of integers in ``range(256)``, such as
                ``bytes``, ``bytearray`` or a list of ints.

        Raises:
            IndexError: If the range falls outside the address space.
            ValueError: If a value is not in ``range(256)``.
            TypeError: If a value is not an integer.
            sqlite3.ProgrammingError: If the controller is closed.
        """
        if address < 0 or address + len(data) > self.size_bytes:
            raise IndexError("Memory access out of bounds")

        # Build (and thereby validate) every parameter before touching the
        # database, so a bad value cannot leave earlier cells inserted.
        cells = [
            (address + offset, sqlite3.Binary(bytes([value])))
            for offset, value in enumerate(data)
        ]

        with self.db_lock:
            conn = self._connection()
            # Commit on success, roll back if the statement fails.
            with conn:
                conn.executemany(
                    "INSERT OR REPLACE INTO ram_cells (address, data) VALUES (?, ?)",
                    cells
                )

    def close(self):
        """Close the database connection. Calling it again is a no-op."""
        with self.db_lock:
            if self.conn is not None:
                self.conn.close()
                self.conn = None
|
|
|
|
|
|
|
|
|
|