Spaces:
Runtime error
Runtime error
| """ | |
| Bus Module - Data Transfer Logic | |
| This module simulates memory movement and data transfer logic between | |
| different logical components (SSD, RAM, VRAM) with bandwidth simulation. | |
| """ | |
| import asyncio | |
| import time | |
| import numpy as np | |
| from typing import Dict, Any, Optional, Tuple, List | |
| from enum import Enum | |
| from dataclasses import dataclass | |
| class BusType(Enum): | |
| """Types of data buses in the system.""" | |
| SYSTEM_RAM = "system_ram" | |
| VRAM_BUS = "vram_bus" | |
| STORAGE_BUS = "storage_bus" | |
| PCIE = "pcie" | |
| MEMORY_CONTROLLER = "memory_controller" | |
| class BusSpecification: | |
| """Specifications for a data bus.""" | |
| name: str | |
| bandwidth_gbps: float # Gigabytes per second | |
| latency_ms: float # Milliseconds | |
| max_concurrent_transfers: int | |
| bus_width_bits: int | |
| class TransferRequest: | |
| """Represents a data transfer request.""" | |
| transfer_id: str | |
| transfer_type: TransferType | |
| source_address: int | |
| destination_address: int | |
| size_bytes: int | |
| priority: int = 0 | |
| created_time: float = 0.0 | |
| start_time: float = 0.0 | |
| end_time: float = 0.0 | |
| status: str = "pending" # pending, in_progress, completed, failed | |
| class DataBus: | |
| """Represents a single data bus with bandwidth and latency simulation.""" | |
| def __init__(self, spec: BusSpecification): | |
| self.spec = spec | |
| self.active_transfers: List[TransferRequest] = [] | |
| self.completed_transfers: List[TransferRequest] = [] | |
| self.transfer_queue = asyncio.Queue() | |
| # Statistics | |
| self.total_bytes_transferred = 0 | |
| self.total_transfer_time = 0.0 | |
| self.transfer_count = 0 | |
| async def submit_transfer(self, request: TransferRequest) -> str: | |
| """Submit a transfer request to the bus.""" | |
| request.created_time = time.time() | |
| await self.transfer_queue.put(request) | |
| return request.transfer_id | |
| async def process_transfers(self): | |
| """Process transfer requests with bandwidth and latency simulation.""" | |
| while True: | |
| try: | |
| # Wait for a transfer request | |
| request = await self.transfer_queue.get() | |
| # Check if we can start this transfer (concurrent limit) | |
| if len(self.active_transfers) >= self.spec.max_concurrent_transfers: | |
| # Put it back and wait | |
| await self.transfer_queue.put(request) | |
| await asyncio.sleep(0.001) # Small delay | |
| continue | |
| # Start the transfer | |
| await self._execute_transfer(request) | |
| except asyncio.CancelledError: | |
| break | |
| except Exception as e: | |
| print(f"Error processing transfer on bus {self.spec.name}: {e}") | |
| async def _execute_transfer(self, request: TransferRequest): | |
| """Execute a single transfer with realistic timing.""" | |
| request.status = "in_progress" | |
| request.start_time = time.time() | |
| self.active_transfers.append(request) | |
| try: | |
| # Calculate transfer time based on bandwidth | |
| transfer_time_seconds = request.size_bytes / (self.spec.bandwidth_gbps * 1e9) | |
| # Add latency | |
| total_time = transfer_time_seconds + (self.spec.latency_ms / 1000.0) | |
| # Simulate the transfer delay | |
| await asyncio.sleep(total_time) | |
| # Complete the transfer | |
| request.status = "completed" | |
| request.end_time = time.time() | |
| # Update statistics | |
| self.total_bytes_transferred += request.size_bytes | |
| self.total_transfer_time += total_time | |
| self.transfer_count += 1 | |
| print(f"Transfer {request.transfer_id} completed: " | |
| f"{request.size_bytes:,} bytes in {total_time:.4f}s " | |
| f"({request.size_bytes / (1024**2) / total_time:.2f} MB/s)") | |
| except Exception as e: | |
| request.status = "failed" | |
| print(f"Transfer {request.transfer_id} failed: {e}") | |
| finally: | |
| # Remove from active transfers | |
| if request in self.active_transfers: | |
| self.active_transfers.remove(request) | |
| self.completed_transfers.append(request) | |
| def get_utilization(self) -> float: | |
| """Get current bus utilization (0.0 to 1.0).""" | |
| return len(self.active_transfers) / max(1, self.spec.max_concurrent_transfers) | |
| def get_stats(self) -> Dict[str, Any]: | |
| """Get bus statistics.""" | |
| avg_transfer_time = self.total_transfer_time / max(1, self.transfer_count) | |
| effective_bandwidth = (self.total_bytes_transferred / (1024**3)) / max(0.001, self.total_transfer_time) | |
| return { | |
| "bus_name": self.spec.name, | |
| "bandwidth_gbps": self.spec.bandwidth_gbps, | |
| "latency_ms": self.spec.latency_ms, | |
| "total_transfers": self.transfer_count, | |
| "total_bytes_transferred": self.total_bytes_transferred, | |
| "total_transfer_time": self.total_transfer_time, | |
| "avg_transfer_time": avg_transfer_time, | |
| "effective_bandwidth_gbps": effective_bandwidth, | |
| "current_utilization": self.get_utilization(), | |
| "active_transfers": len(self.active_transfers), | |
| "queued_transfers": self.transfer_queue.qsize() | |
| } | |
| class BusManager: | |
| """Manages multiple data buses and coordinates transfers between components.""" | |
| def __init__(self): | |
| self.buses: Dict[str, DataBus] = {} | |
| self.transfer_counter = 0 | |
| self.running = False | |
| # Initialize standard buses | |
| self._initialize_standard_buses() | |
| def _initialize_standard_buses(self): | |
| """Initialize standard system buses with realistic specifications.""" | |
| # GDDR7 VRAM Bus (500GB capacity, high bandwidth) | |
| gddr7_spec = BusSpecification( | |
| name="GDDR7_VRAM", | |
| bandwidth_gbps=128.0, # 128 GB/s (realistic for GDDR7) | |
| latency_ms=0.1, # Very low latency | |
| max_concurrent_transfers=16, | |
| bus_width_bits=512 | |
| ) | |
| self.add_bus("vram", gddr7_spec) | |
| # PCIe 5.0 Bus (for GPU-CPU communication) | |
| pcie_spec = BusSpecification( | |
| name="PCIe_5.0_x16", | |
| bandwidth_gbps=64.0, # 64 GB/s for PCIe 5.0 x16 | |
| latency_ms=0.5, # Higher latency than VRAM | |
| max_concurrent_transfers=8, | |
| bus_width_bits=256 | |
| ) | |
| self.add_bus("pcie", pcie_spec) | |
| # System RAM Bus (DDR5) | |
| ddr5_spec = BusSpecification( | |
| name="DDR5_System_RAM", | |
| bandwidth_gbps=51.2, # 51.2 GB/s for DDR5-6400 | |
| latency_ms=0.2, | |
| max_concurrent_transfers=4, | |
| bus_width_bits=128 | |
| ) | |
| self.add_bus("system_ram", ddr5_spec) | |
| # NVMe SSD Bus | |
| nvme_spec = BusSpecification( | |
| name="NVMe_SSD", | |
| bandwidth_gbps=7.0, # 7 GB/s for high-end NVMe | |
| latency_ms=0.1, | |
| max_concurrent_transfers=32, | |
| bus_width_bits=64 | |
| ) | |
| self.add_bus("storage", nvme_spec) | |
| def add_bus(self, bus_id: str, spec: BusSpecification): | |
| """Add a new bus to the system.""" | |
| self.buses[bus_id] = DataBus(spec) | |
| async def start(self): | |
| """Start all bus processing tasks.""" | |
| if self.running: | |
| return | |
| self.running = True | |
| # Start processing tasks for all buses | |
| self.bus_tasks = [] | |
| for bus in self.buses.values(): | |
| task = asyncio.create_task(bus.process_transfers()) | |
| self.bus_tasks.append(task) | |
| print(f"Bus manager started with {len(self.buses)} buses") | |
| async def stop(self): | |
| """Stop all bus processing tasks.""" | |
| if not self.running: | |
| return | |
| self.running = False | |
| # Cancel all bus tasks | |
| for task in self.bus_tasks: | |
| task.cancel() | |
| await asyncio.gather(*self.bus_tasks, return_exceptions=True) | |
| print("Bus manager stopped") | |
| async def transfer_data(self, bus_id: str, transfer_type: TransferType, | |
| source_address: int, destination_address: int, | |
| size_bytes: int, priority: int = 0) -> str: | |
| """Initiate a data transfer on the specified bus.""" | |
| if bus_id not in self.buses: | |
| raise ValueError(f"Bus {bus_id} not found") | |
| transfer_id = f"transfer_{self.transfer_counter}" | |
| self.transfer_counter += 1 | |
| request = TransferRequest( | |
| transfer_id=transfer_id, | |
| transfer_type=transfer_type, | |
| source_address=source_address, | |
| destination_address=destination_address, | |
| size_bytes=size_bytes, | |
| priority=priority | |
| ) | |
| bus = self.buses[bus_id] | |
| await bus.submit_transfer(request) | |
| return transfer_id | |
| async def copy_to_vram(self, source_address: int, vram_address: int, | |
| size_bytes: int) -> str: | |
| """Copy data from system memory to VRAM.""" | |
| return await self.transfer_data( | |
| "vram", TransferType.WRITE, source_address, vram_address, size_bytes | |
| ) | |
| async def copy_from_vram(self, vram_address: int, destination_address: int, | |
| size_bytes: int) -> str: | |
| """Copy data from VRAM to system memory.""" | |
| return await self.transfer_data( | |
| "vram", TransferType.READ, vram_address, destination_address, size_bytes | |
| ) | |
| async def load_from_storage(self, storage_address: int, ram_address: int, | |
| size_bytes: int) -> str: | |
| """Load data from storage to system RAM.""" | |
| return await self.transfer_data( | |
| "storage", TransferType.READ, storage_address, ram_address, size_bytes | |
| ) | |
| async def save_to_storage(self, ram_address: int, storage_address: int, | |
| size_bytes: int) -> str: | |
| """Save data from system RAM to storage.""" | |
| return await self.transfer_data( | |
| "storage", TransferType.WRITE, ram_address, storage_address, size_bytes | |
| ) | |
| def get_bus_stats(self, bus_id: str) -> Optional[Dict[str, Any]]: | |
| """Get statistics for a specific bus.""" | |
| if bus_id in self.buses: | |
| return self.buses[bus_id].get_stats() | |
| return None | |
| def get_all_stats(self) -> Dict[str, Any]: | |
| """Get statistics for all buses.""" | |
| stats = { | |
| "total_buses": len(self.buses), | |
| "running": self.running, | |
| "buses": {} | |
| } | |
| total_bandwidth = 0 | |
| total_utilization = 0 | |
| for bus_id, bus in self.buses.items(): | |
| bus_stats = bus.get_stats() | |
| stats["buses"][bus_id] = bus_stats | |
| total_bandwidth += bus_stats["bandwidth_gbps"] | |
| total_utilization += bus_stats["current_utilization"] | |
| stats["total_bandwidth_gbps"] = total_bandwidth | |
| stats["avg_utilization"] = total_utilization / len(self.buses) if self.buses else 0 | |
| return stats | |
| async def benchmark_bus(self, bus_id: str, test_size_mb: int = 100) -> Dict[str, Any]: | |
| """Benchmark a specific bus with test transfers.""" | |
| if bus_id not in self.buses: | |
| raise ValueError(f"Bus {bus_id} not found") | |
| print(f"Benchmarking bus {bus_id} with {test_size_mb} MB transfers...") | |
| test_size_bytes = test_size_mb * 1024 * 1024 | |
| num_tests = 10 | |
| start_time = time.time() | |
| transfer_ids = [] | |
| # Submit multiple test transfers | |
| for i in range(num_tests): | |
| transfer_id = await self.transfer_data( | |
| bus_id, TransferType.COPY, | |
| i * test_size_bytes, (i + 1000) * test_size_bytes, | |
| test_size_bytes | |
| ) | |
| transfer_ids.append(transfer_id) | |
| # Wait for all transfers to complete | |
| bus = self.buses[bus_id] | |
| while len(bus.active_transfers) > 0 or bus.transfer_queue.qsize() > 0: | |
| await asyncio.sleep(0.1) | |
| end_time = time.time() | |
| total_time = end_time - start_time | |
| total_data_gb = (test_size_bytes * num_tests) / (1024**3) | |
| effective_bandwidth = total_data_gb / total_time | |
| return { | |
| "bus_id": bus_id, | |
| "test_size_mb": test_size_mb, | |
| "num_transfers": num_tests, | |
| "total_time_seconds": total_time, | |
| "total_data_gb": total_data_gb, | |
| "effective_bandwidth_gbps": effective_bandwidth, | |
| "theoretical_bandwidth_gbps": bus.spec.bandwidth_gbps, | |
| "efficiency_percent": (effective_bandwidth / bus.spec.bandwidth_gbps) * 100 | |
| } | |
| if __name__ == "__main__": | |
| # Test the bus system | |
| async def test_bus_system(): | |
| print("Testing Bus System...") | |
| # Create bus manager | |
| bus_manager = BusManager() | |
| await bus_manager.start() | |
| # Test individual transfers | |
| print("\nTesting individual transfers...") | |
| # Test VRAM transfer (large texture upload) | |
| texture_size = 64 * 1024 * 1024 # 64 MB texture | |
| vram_transfer = await bus_manager.copy_to_vram(0x1000, 0x10000000, texture_size) | |
| print(f"Submitted VRAM transfer: {vram_transfer}") | |
| # Test storage transfer (loading assets) | |
| asset_size = 128 * 1024 * 1024 # 128 MB asset | |
| storage_transfer = await bus_manager.load_from_storage(0x0, 0x2000, asset_size) | |
| print(f"Submitted storage transfer: {storage_transfer}") | |
| # Test PCIe transfer (CPU-GPU communication) | |
| command_size = 4 * 1024 # 4 KB command buffer | |
| pcie_transfer = await bus_manager.transfer_data( | |
| "pcie", TransferType.WRITE, 0x3000, 0x20000000, command_size | |
| ) | |
| print(f"Submitted PCIe transfer: {pcie_transfer}") | |
| # Wait for transfers to complete | |
| print("\nWaiting for transfers to complete...") | |
| await asyncio.sleep(2.0) | |
| # Print statistics | |
| print("\nBus Statistics:") | |
| all_stats = bus_manager.get_all_stats() | |
| for bus_id, bus_stats in all_stats["buses"].items(): | |
| print(f"\n{bus_id}:") | |
| print(f" Bandwidth: {bus_stats["bandwidth_gbps"]:.1f} GB/s") | |
| print(f" Transfers: {bus_stats["total_transfers"]}") | |
| print(f" Data transferred: {bus_stats["total_bytes_transferred"] / (1024**2):.1f} MB") | |
| print(f" Effective bandwidth: {bus_stats["effective_bandwidth_gbps"]:.2f} GB/s") | |
| print(f" Utilization: {bus_stats["current_utilization"]:.1%}") | |
| # Benchmark each bus | |
| print("\nBenchmarking buses...") | |
| for bus_id in ["vram", "pcie", "system_ram", "storage"]: | |
| try: | |
| benchmark_result = await bus_manager.benchmark_bus(bus_id, test_size_mb=50) | |
| print(f"\n{bus_id} benchmark:") | |
| print(f" Effective bandwidth: {benchmark_result["effective_bandwidth_gbps"]:.2f} GB/s") | |
| print(f" Theoretical bandwidth: {benchmark_result["theoretical_bandwidth_gbps"]:.2f} GB/s") | |
| print(f" Efficiency: {benchmark_result["efficiency_percent"]:.1f}%") | |
| except Exception as e: | |
| print(f"Benchmark failed for {bus_id}: {e}") | |
| # Stop bus manager | |
| await bus_manager.stop() | |
| print("\nBus system test completed!") | |
| # Run the test | |
| asyncio.run(test_bus_system()) | |