Spaces:
Runtime error
Runtime error
| import sqlite3 | |
| import threading | |
class RAMController:
    """Byte-addressable memory emulation persisted in a SQLite table.

    Every written byte is stored as one row of the ``ram_cells`` table
    (``address`` -> one-byte BLOB). Addresses that have no row read back
    as zero. All database access goes through a single connection that is
    serialized with ``db_lock``.

    The controller can be used as a context manager; the connection is
    closed when the ``with`` block exits.
    """

    def __init__(self, size_bytes, db_path='ram_storage.db'):
        """Open the backing database and create the cell table if needed.

        Args:
            size_bytes: Size of the address space; accesses must stay
                within ``0 <= address`` and ``address + n <= size_bytes``.
            db_path: Database path handed to ``sqlite3.connect``.
        """
        self.size_bytes = size_bytes
        # check_same_thread=False allows the connection to be used from
        # several threads; db_lock is what serializes that use.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.db_lock = threading.Lock()
        with self.db_lock:
            self.conn.execute('''CREATE TABLE IF NOT EXISTS ram_cells (
                address INTEGER PRIMARY KEY,
                data BLOB
            )''')
            self.conn.commit()

    def __enter__(self):
        """Return the controller itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block."""
        self.close()
        return False

    def read(self, address, length):
        """Return ``length`` bytes starting at ``address``.

        Args:
            address: First address to read.
            length: Number of bytes to read.

        Returns:
            A ``bytearray`` of exactly ``length`` bytes; cells that were
            never written are zero.

        Raises:
            IndexError: If ``length`` is negative or the range falls
                outside ``[0, size_bytes)``.
        """
        if length < 0 or address < 0 or address + length > self.size_bytes:
            raise IndexError("Memory access out of bounds")
        end = address + length
        with self.db_lock:
            rows = self.conn.execute(
                "SELECT address, data FROM ram_cells WHERE address >= ? AND address < ? ORDER BY address ASC",
                (address, end)
            ).fetchall()
        # Start from an all-zero buffer and overlay the stored cells; the
        # WHERE clause already restricts rows to the requested range.
        result = bytearray(length)
        for cell_address, cell in rows:
            # Cells written by write() are one-byte BLOBs; any other stored
            # value is assigned as-is, as before.
            result[cell_address - address] = (
                cell[0] if isinstance(cell, (bytes, bytearray)) else cell
            )
        return result

    def write(self, address, data):
        """Store ``data`` starting at ``address``, one row per byte.

        The write is all-or-nothing: the rows are prepared before the
        database is touched and inserted in a single transaction that is
        rolled back if anything fails.

        Args:
            address: First address to write.
            data: Sized iterable of integers in ``range(256)`` (for
                example ``bytes``, ``bytearray`` or a list of ints).

        Raises:
            IndexError: If the range falls outside ``[0, size_bytes)``.
            ValueError: If an element of ``data`` is not in ``range(256)``.
            TypeError: If an element of ``data`` is not an integer.
        """
        if address < 0 or address + len(data) > self.size_bytes:
            raise IndexError("Memory access out of bounds")
        # Convert every element first so that an invalid value raises
        # before any row has been inserted.
        rows = [
            (address + offset, sqlite3.Binary(bytes([value])))
            for offset, value in enumerate(data)
        ]
        # The connection context manager commits on success and rolls
        # back on error, so a failed insert cannot leave a partial write.
        with self.db_lock, self.conn:
            self.conn.executemany(
                "INSERT OR REPLACE INTO ram_cells (address, data) VALUES (?, ?)",
                rows
            )

    def close(self):
        """Close the database connection; calling it again is a no-op.

        ``conn`` is set to ``None`` afterwards, so ``read`` and ``write``
        cannot be used on a closed controller.
        """
        with self.db_lock:
            if self.conn:
                self.conn.close()
                self.conn = None