File size: 10,383 Bytes
684cc60
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256

"""
Virtual Flash Module - Simulates NAND Flash behavior
"""
import os
import json
import threading
import time
import random
import base64
from typing import Dict, Optional, Tuple
from builtins import open

class VirtualFlash:
    """
    Simulates NAND Flash behavior with pages and blocks, entirely in memory.

    The device is addressed by page number; pages are grouped into blocks of
    ``pages_per_block`` pages. Per-block erase counts and bad-block flags are
    tracked, and the whole state can be exported to / restored from a
    snapshot string. A ``wear_leveling_threshold`` value is stored in the
    global metadata, but this class itself does not act on it.

    All public operations take ``self.lock`` (a re-entrant lock), and the
    simulated device latency is spent while holding it.
    """

    # Simulated device latencies in seconds, and the per-write chance that a
    # block wears out. Kept as class attributes so a subclass or an instance
    # can override them (e.g. to disable random failures).
    WRITE_DELAY_SECONDS = 0.0001  # 0.1ms write delay
    READ_DELAY_SECONDS = 0.00005  # 0.05ms read delay
    ERASE_DELAY_SECONDS = 0.002  # 2ms erase delay (much longer than write)
    WEAR_FAILURE_PROBABILITY = 0.000001  # 1 in 1,000,000 chance per write

    def __init__(self, capacity_gb: int = 2048, page_size: int = 4096, pages_per_block: int = 256):
        """
        Create an empty virtual flash device.

        Args:
            capacity_gb: Device capacity in GiB (must not be negative).
            page_size: Size of one page in bytes (must be positive).
            pages_per_block: Number of pages per erase block (must be positive).

        Raises:
            ValueError: If any geometry argument is out of range.
        """
        self._validate_geometry(capacity_gb, page_size, pages_per_block)

        # Thread lock for thread safety (re-entrant so helpers may nest).
        self.lock = threading.RLock()

        # Sets capacity_gb, page_size, pages_per_block, block_size,
        # total_bytes, total_pages and total_blocks.
        self._apply_geometry(capacity_gb, page_size, pages_per_block)

        # In-memory storage for flash pages: {page_number: data_bytes}
        self.pages: Dict[int, bytes] = {}

        # In-memory storage for block metadata:
        # {block_number: {'erase_count': int, 'bad_block': bool, 'last_erase_time': float}}
        self.block_metadata: Dict[int, Dict] = {}

        # Global metadata
        self.metadata = {
            "total_writes": 0,
            "total_erases": 0,
            "bad_blocks": [],
            "wear_leveling_threshold": 1000,
            "power_cycles": 0
        }

        print(f"VirtualFlash initialized: {capacity_gb}GB, {self.total_pages:,} pages, {self.total_blocks:,} blocks")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _validate_geometry(capacity_gb, page_size, pages_per_block) -> None:
        """Raise ValueError unless the geometry describes a usable device."""
        if capacity_gb < 0:
            raise ValueError(f"capacity_gb must be non-negative, got {capacity_gb}")
        if page_size <= 0:
            raise ValueError(f"page_size must be positive, got {page_size}")
        if pages_per_block <= 0:
            raise ValueError(f"pages_per_block must be positive, got {pages_per_block}")

    def _apply_geometry(self, capacity_gb, page_size, pages_per_block) -> None:
        """Store the geometry and recompute every size derived from it."""
        self.capacity_gb = capacity_gb
        self.page_size = page_size
        self.pages_per_block = pages_per_block
        self.block_size = page_size * pages_per_block
        self.total_bytes = capacity_gb * 1024 * 1024 * 1024
        self.total_pages = self.total_bytes // page_size
        self.total_blocks = self.total_pages // pages_per_block

    def _check_page_number(self, page_number: int) -> None:
        """Raise ValueError if page_number is outside [0, total_pages)."""
        if page_number < 0:
            raise ValueError(f"Page number {page_number} must be non-negative")
        if page_number >= self.total_pages:
            raise ValueError(f"Page number {page_number} exceeds capacity")

    def _check_block_number(self, block_number: int) -> None:
        """Raise ValueError if block_number is outside [0, total_blocks)."""
        if block_number < 0:
            raise ValueError(f"Block number {block_number} must be non-negative")
        if block_number >= self.total_blocks:
            raise ValueError(f"Block number {block_number} exceeds capacity")

    def _is_bad_block(self, block_number: int) -> bool:
        """Return True if either bad-block record flags this block (lock must be held)."""
        if self.block_metadata.get(block_number, {}).get('bad_block', False):
            return True
        return block_number in self.metadata.get("bad_blocks", [])

    def _block_entry(self, block_number: int) -> Dict:
        """Return the metadata dict for a block, creating it on first use (lock must be held)."""
        if block_number not in self.block_metadata:
            self.block_metadata[block_number] = {'erase_count': 0, 'bad_block': False, 'last_erase_time': time.time()}
        return self.block_metadata[block_number]

    def _mark_block_bad(self, block_number: int) -> None:
        """Record a block as bad in both the global list and its own metadata (lock must be held)."""
        bad_blocks = self.metadata.setdefault("bad_blocks", [])
        if block_number not in bad_blocks:
            bad_blocks.append(block_number)
        self._block_entry(block_number)['bad_block'] = True

    # ------------------------------------------------------------------
    # Addressing
    # ------------------------------------------------------------------

    def get_page_address(self, page_number: int) -> Tuple[int, int]:
        """Convert page number to block and page within block."""
        block_number, page_in_block = divmod(page_number, self.pages_per_block)
        return block_number, page_in_block

    def get_page_number(self, block_number: int, page_in_block: int) -> int:
        """Convert block and page within block to page number."""
        return block_number * self.pages_per_block + page_in_block

    # ------------------------------------------------------------------
    # Device operations
    # ------------------------------------------------------------------

    def write_page(self, page_number: int, data: bytes) -> bool:
        """
        Write data to a specific page.

        Data shorter than a page is zero-padded to ``page_size``. An existing
        page is overwritten in place; no prior erase is required by this
        simulation.

        Args:
            page_number: Target page, in ``[0, total_pages)``.
            data: Payload of at most ``page_size`` bytes.

        Returns:
            True if the page was stored, False if its block is marked bad.

        Raises:
            ValueError: If the page number is out of range or the data is
                larger than a page.
        """
        self._check_page_number(page_number)

        if len(data) > self.page_size:
            raise ValueError(f"Data size {len(data)} exceeds page size {self.page_size}")

        # Pad data to page size
        if len(data) < self.page_size:
            data = data + b'\x00' * (self.page_size - len(data))

        block_number, _ = self.get_page_address(page_number)

        with self.lock:
            if self._is_bad_block(block_number):
                return False

            # Simulate write delay
            time.sleep(self.WRITE_DELAY_SECONDS)

            self.pages[page_number] = data

            # Make sure the block has a metadata record from its first write.
            self._block_entry(block_number)

            self.metadata["total_writes"] = self.metadata.get("total_writes", 0) + 1

            # Simulate wear (very small chance of block going bad). The write
            # that triggers the failure still reports success; later reads of
            # the block return zero-filled pages.
            if random.random() < self.WEAR_FAILURE_PROBABILITY:
                self._mark_block_bad(block_number)
                print(f"Block {block_number} marked as bad due to wear")

            return True

    def read_page(self, page_number: int) -> Optional[bytes]:
        """
        Read data from a specific page.

        Args:
            page_number: Page to read, in ``[0, total_pages)``.

        Returns:
            The stored page. A page that was never written, or any page in a
            bad block, reads back as ``page_size`` zero bytes (this method
            does not return None).

        Raises:
            ValueError: If the page number is out of range.
        """
        self._check_page_number(page_number)

        with self.lock:
            # Simulate read delay
            time.sleep(self.READ_DELAY_SECONDS)

            blank_page = b'\x00' * self.page_size

            block_number, _ = self.get_page_address(page_number)
            if self._is_bad_block(block_number):
                return blank_page  # Return empty for bad blocks

            return self.pages.get(page_number, blank_page)

    def erase_block(self, block_number: int) -> bool:
        """
        Erase an entire block, discarding every page stored in it.

        Args:
            block_number: Block to erase, in ``[0, total_blocks)``.

        Returns:
            True if the block was erased, False if it is marked bad.

        Raises:
            ValueError: If the block number is out of range.
        """
        self._check_block_number(block_number)

        with self.lock:
            if self._is_bad_block(block_number):
                return False

            # Simulate erase delay (much longer than write)
            time.sleep(self.ERASE_DELAY_SECONDS)

            # Only this block's own page slots need checking; there is no
            # reason to walk every page written to the device.
            first_page = self.get_page_number(block_number, 0)
            for page_number in range(first_page, first_page + self.pages_per_block):
                self.pages.pop(page_number, None)

            entry = self._block_entry(block_number)
            entry['erase_count'] = entry.get('erase_count', 0) + 1
            entry['last_erase_time'] = time.time()

            self.metadata["total_erases"] = self.metadata.get("total_erases", 0) + 1

            return True

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def get_block_info(self, block_number: int) -> Dict:
        """
        Get information about a specific block.

        Returns:
            A dict with ``block_number``, ``erase_count``, ``is_bad``,
            ``last_erase_time`` (0 for a block with no metadata yet),
            ``valid_pages`` (pages currently stored in the block) and
            ``total_pages`` (pages per block).
        """
        with self.lock:
            block_meta = self.block_metadata.get(block_number, {})

            first_page = self.get_page_number(block_number, 0)
            valid_pages = sum(
                1
                for page_number in range(first_page, first_page + self.pages_per_block)
                if page_number in self.pages
            )

            return {
                "block_number": block_number,
                "erase_count": block_meta.get('erase_count', 0),
                "is_bad": self._is_bad_block(block_number),
                "last_erase_time": block_meta.get('last_erase_time', 0),
                "valid_pages": valid_pages,
                "total_pages": self.pages_per_block
            }

    def get_flash_stats(self) -> Dict:
        """
        Get overall flash statistics.

        Returns:
            A dict of capacity, page and block usage figures plus the
            bad-block count and the running write/erase counters.
        """
        with self.lock:
            written_pages = len(self.pages)
            used_blocks = len({page_number // self.pages_per_block for page_number in self.pages})

            used_bytes = written_pages * self.page_size
            usage_percent = (used_bytes / self.total_bytes) * 100 if self.total_bytes > 0 else 0

            return {
                "capacity_gb": self.capacity_gb,
                "total_bytes": self.total_bytes,
                "used_bytes": used_bytes,
                "free_bytes": self.total_bytes - used_bytes,
                "usage_percent": usage_percent,
                "total_pages": self.total_pages,
                "written_pages": written_pages,
                "free_pages": self.total_pages - written_pages,
                "total_blocks": self.total_blocks,
                "used_blocks": used_blocks,
                "free_blocks": self.total_blocks - used_blocks,
                "bad_blocks": len(self.metadata.get("bad_blocks", [])),
                "total_writes": self.metadata.get("total_writes", 0),
                "total_erases": self.metadata.get("total_erases", 0)
            }

    # ------------------------------------------------------------------
    # Snapshots
    # ------------------------------------------------------------------

    def export_snapshot(self) -> str:
        """
        Exports the current state of the virtual flash as a base64 encoded JSON string.

        Page contents are base64 encoded individually inside the JSON
        document. JSON object keys are always strings, so page and block
        numbers are written as strings and converted back by mount_snapshot.
        """
        with self.lock:
            snapshot_data = {
                "pages": {str(k): base64.b64encode(v).decode("utf-8") for k, v in self.pages.items()},
                "block_metadata": self.block_metadata,
                "metadata": self.metadata,
                "capacity_gb": self.capacity_gb,
                "page_size": self.page_size,
                "pages_per_block": self.pages_per_block
            }
            return base64.b64encode(json.dumps(snapshot_data).encode("utf-8")).decode("utf-8")

    def mount_snapshot(self, snapshot_json_or_url: str):
        """
        Loads the state of the virtual flash from a base64 encoded JSON string.

        Despite the parameter name, only the base64 string produced by
        export_snapshot is understood; URLs are not fetched.

        The snapshot is parsed completely before any attribute is replaced,
        so a malformed snapshot leaves the current state untouched.

        Raises:
            Exception: Whatever decoding/parsing error occurred (for example
                KeyError for a missing section or ValueError for bad
                geometry); it is printed and then re-raised.
        """
        with self.lock:
            try:
                decoded_data = base64.b64decode(snapshot_json_or_url).decode("utf-8")
                snapshot_data = json.loads(decoded_data)

                capacity_gb = snapshot_data.get("capacity_gb", self.capacity_gb)
                page_size = snapshot_data.get("page_size", self.page_size)
                pages_per_block = snapshot_data.get("pages_per_block", self.pages_per_block)
                self._validate_geometry(capacity_gb, page_size, pages_per_block)

                pages = {int(k): base64.b64decode(v) for k, v in snapshot_data["pages"].items()}
                # JSON turned the integer block numbers into strings; restore
                # them so lookups by int block number find the entries again.
                block_metadata = {int(k): dict(v) for k, v in snapshot_data["block_metadata"].items()}
                metadata = snapshot_data["metadata"]
            except Exception as e:
                print(f"Error loading snapshot: {e}")
                raise

            # Everything parsed cleanly: commit the new state in one go.
            self._apply_geometry(capacity_gb, page_size, pages_per_block)
            self.pages = pages
            self.block_metadata = block_metadata
            self.metadata = metadata
            print("VirtualFlash state loaded from snapshot.")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def shutdown(self):
        """
        No specific shutdown needed for in-memory flash, as state is not persisted to disk automatically.
        """
        print("VirtualFlash shutdown complete")

    def __del__(self):
        """
        Destructor to ensure proper cleanup.
        """
        try:
            self.shutdown()
        except Exception:
            # Best effort only: during interpreter teardown even print() can
            # fail, and a destructor has nowhere useful to report that.
            pass