""" Bus Module - Data Transfer Logic This module simulates memory movement and data transfer logic between different logical components (SSD, RAM, VRAM) with bandwidth simulation. """ import asyncio import time import numpy as np from typing import Dict, Any, Optional, Tuple, List from enum import Enum from dataclasses import dataclass class BusType(Enum): """Types of data buses in the system.""" SYSTEM_RAM = "system_ram" VRAM_BUS = "vram_bus" STORAGE_BUS = "storage_bus" PCIE = "pcie" MEMORY_CONTROLLER = "memory_controller" @dataclass class BusSpecification: """Specifications for a data bus.""" name: str bandwidth_gbps: float # Gigabytes per second latency_ms: float # Milliseconds max_concurrent_transfers: int bus_width_bits: int @dataclass class TransferRequest: """Represents a data transfer request.""" transfer_id: str transfer_type: TransferType source_address: int destination_address: int size_bytes: int priority: int = 0 created_time: float = 0.0 start_time: float = 0.0 end_time: float = 0.0 status: str = "pending" # pending, in_progress, completed, failed class DataBus: """Represents a single data bus with bandwidth and latency simulation.""" def __init__(self, spec: BusSpecification): self.spec = spec self.active_transfers: List[TransferRequest] = [] self.completed_transfers: List[TransferRequest] = [] self.transfer_queue = asyncio.Queue() # Statistics self.total_bytes_transferred = 0 self.total_transfer_time = 0.0 self.transfer_count = 0 async def submit_transfer(self, request: TransferRequest) -> str: """Submit a transfer request to the bus.""" request.created_time = time.time() await self.transfer_queue.put(request) return request.transfer_id async def process_transfers(self): """Process transfer requests with bandwidth and latency simulation.""" while True: try: # Wait for a transfer request request = await self.transfer_queue.get() # Check if we can start this transfer (concurrent limit) if len(self.active_transfers) >= self.spec.max_concurrent_transfers: # Put it back and wait await self.transfer_queue.put(request) await asyncio.sleep(0.001) # Small delay continue # Start the transfer await self._execute_transfer(request) except asyncio.CancelledError: break except Exception as e: print(f"Error processing transfer on bus {self.spec.name}: {e}") async def _execute_transfer(self, request: TransferRequest): """Execute a single transfer with realistic timing.""" request.status = "in_progress" request.start_time = time.time() self.active_transfers.append(request) try: # Calculate transfer time based on bandwidth transfer_time_seconds = request.size_bytes / (self.spec.bandwidth_gbps * 1e9) # Add latency total_time = transfer_time_seconds + (self.spec.latency_ms / 1000.0) # Simulate the transfer delay await asyncio.sleep(total_time) # Complete the transfer request.status = "completed" request.end_time = time.time() # Update statistics self.total_bytes_transferred += request.size_bytes self.total_transfer_time += total_time self.transfer_count += 1 print(f"Transfer {request.transfer_id} completed: " f"{request.size_bytes:,} bytes in {total_time:.4f}s " f"({request.size_bytes / (1024**2) / total_time:.2f} MB/s)") except Exception as e: request.status = "failed" print(f"Transfer {request.transfer_id} failed: {e}") finally: # Remove from active transfers if request in self.active_transfers: self.active_transfers.remove(request) self.completed_transfers.append(request) def get_utilization(self) -> float: """Get current bus utilization (0.0 to 1.0).""" return len(self.active_transfers) / max(1, self.spec.max_concurrent_transfers) def get_stats(self) -> Dict[str, Any]: """Get bus statistics.""" avg_transfer_time = self.total_transfer_time / max(1, self.transfer_count) effective_bandwidth = (self.total_bytes_transferred / (1024**3)) / max(0.001, self.total_transfer_time) return { "bus_name": self.spec.name, "bandwidth_gbps": self.spec.bandwidth_gbps, "latency_ms": self.spec.latency_ms, "total_transfers": self.transfer_count, "total_bytes_transferred": self.total_bytes_transferred, "total_transfer_time": self.total_transfer_time, "avg_transfer_time": avg_transfer_time, "effective_bandwidth_gbps": effective_bandwidth, "current_utilization": self.get_utilization(), "active_transfers": len(self.active_transfers), "queued_transfers": self.transfer_queue.qsize() } class BusManager: """Manages multiple data buses and coordinates transfers between components.""" def __init__(self): self.buses: Dict[str, DataBus] = {} self.transfer_counter = 0 self.running = False # Initialize standard buses self._initialize_standard_buses() def _initialize_standard_buses(self): """Initialize standard system buses with realistic specifications.""" # GDDR7 VRAM Bus (500GB capacity, high bandwidth) gddr7_spec = BusSpecification( name="GDDR7_VRAM", bandwidth_gbps=128.0, # 128 GB/s (realistic for GDDR7) latency_ms=0.1, # Very low latency max_concurrent_transfers=16, bus_width_bits=512 ) self.add_bus("vram", gddr7_spec) # PCIe 5.0 Bus (for GPU-CPU communication) pcie_spec = BusSpecification( name="PCIe_5.0_x16", bandwidth_gbps=64.0, # 64 GB/s for PCIe 5.0 x16 latency_ms=0.5, # Higher latency than VRAM max_concurrent_transfers=8, bus_width_bits=256 ) self.add_bus("pcie", pcie_spec) # System RAM Bus (DDR5) ddr5_spec = BusSpecification( name="DDR5_System_RAM", bandwidth_gbps=51.2, # 51.2 GB/s for DDR5-6400 latency_ms=0.2, max_concurrent_transfers=4, bus_width_bits=128 ) self.add_bus("system_ram", ddr5_spec) # NVMe SSD Bus nvme_spec = BusSpecification( name="NVMe_SSD", bandwidth_gbps=7.0, # 7 GB/s for high-end NVMe latency_ms=0.1, max_concurrent_transfers=32, bus_width_bits=64 ) self.add_bus("storage", nvme_spec) def add_bus(self, bus_id: str, spec: BusSpecification): """Add a new bus to the system.""" self.buses[bus_id] = DataBus(spec) async def start(self): """Start all bus processing tasks.""" if self.running: return self.running = True # Start processing tasks for all buses self.bus_tasks = [] for bus in self.buses.values(): task = asyncio.create_task(bus.process_transfers()) self.bus_tasks.append(task) print(f"Bus manager started with {len(self.buses)} buses") async def stop(self): """Stop all bus processing tasks.""" if not self.running: return self.running = False # Cancel all bus tasks for task in self.bus_tasks: task.cancel() await asyncio.gather(*self.bus_tasks, return_exceptions=True) print("Bus manager stopped") async def transfer_data(self, bus_id: str, transfer_type: TransferType, source_address: int, destination_address: int, size_bytes: int, priority: int = 0) -> str: """Initiate a data transfer on the specified bus.""" if bus_id not in self.buses: raise ValueError(f"Bus {bus_id} not found") transfer_id = f"transfer_{self.transfer_counter}" self.transfer_counter += 1 request = TransferRequest( transfer_id=transfer_id, transfer_type=transfer_type, source_address=source_address, destination_address=destination_address, size_bytes=size_bytes, priority=priority ) bus = self.buses[bus_id] await bus.submit_transfer(request) return transfer_id async def copy_to_vram(self, source_address: int, vram_address: int, size_bytes: int) -> str: """Copy data from system memory to VRAM.""" return await self.transfer_data( "vram", TransferType.WRITE, source_address, vram_address, size_bytes ) async def copy_from_vram(self, vram_address: int, destination_address: int, size_bytes: int) -> str: """Copy data from VRAM to system memory.""" return await self.transfer_data( "vram", TransferType.READ, vram_address, destination_address, size_bytes ) async def load_from_storage(self, storage_address: int, ram_address: int, size_bytes: int) -> str: """Load data from storage to system RAM.""" return await self.transfer_data( "storage", TransferType.READ, storage_address, ram_address, size_bytes ) async def save_to_storage(self, ram_address: int, storage_address: int, size_bytes: int) -> str: """Save data from system RAM to storage.""" return await self.transfer_data( "storage", TransferType.WRITE, ram_address, storage_address, size_bytes ) def get_bus_stats(self, bus_id: str) -> Optional[Dict[str, Any]]: """Get statistics for a specific bus.""" if bus_id in self.buses: return self.buses[bus_id].get_stats() return None def get_all_stats(self) -> Dict[str, Any]: """Get statistics for all buses.""" stats = { "total_buses": len(self.buses), "running": self.running, "buses": {} } total_bandwidth = 0 total_utilization = 0 for bus_id, bus in self.buses.items(): bus_stats = bus.get_stats() stats["buses"][bus_id] = bus_stats total_bandwidth += bus_stats["bandwidth_gbps"] total_utilization += bus_stats["current_utilization"] stats["total_bandwidth_gbps"] = total_bandwidth stats["avg_utilization"] = total_utilization / len(self.buses) if self.buses else 0 return stats async def benchmark_bus(self, bus_id: str, test_size_mb: int = 100) -> Dict[str, Any]: """Benchmark a specific bus with test transfers.""" if bus_id not in self.buses: raise ValueError(f"Bus {bus_id} not found") print(f"Benchmarking bus {bus_id} with {test_size_mb} MB transfers...") test_size_bytes = test_size_mb * 1024 * 1024 num_tests = 10 start_time = time.time() transfer_ids = [] # Submit multiple test transfers for i in range(num_tests): transfer_id = await self.transfer_data( bus_id, TransferType.COPY, i * test_size_bytes, (i + 1000) * test_size_bytes, test_size_bytes ) transfer_ids.append(transfer_id) # Wait for all transfers to complete bus = self.buses[bus_id] while len(bus.active_transfers) > 0 or bus.transfer_queue.qsize() > 0: await asyncio.sleep(0.1) end_time = time.time() total_time = end_time - start_time total_data_gb = (test_size_bytes * num_tests) / (1024**3) effective_bandwidth = total_data_gb / total_time return { "bus_id": bus_id, "test_size_mb": test_size_mb, "num_transfers": num_tests, "total_time_seconds": total_time, "total_data_gb": total_data_gb, "effective_bandwidth_gbps": effective_bandwidth, "theoretical_bandwidth_gbps": bus.spec.bandwidth_gbps, "efficiency_percent": (effective_bandwidth / bus.spec.bandwidth_gbps) * 100 } if __name__ == "__main__": # Test the bus system async def test_bus_system(): print("Testing Bus System...") # Create bus manager bus_manager = BusManager() await bus_manager.start() # Test individual transfers print("\nTesting individual transfers...") # Test VRAM transfer (large texture upload) texture_size = 64 * 1024 * 1024 # 64 MB texture vram_transfer = await bus_manager.copy_to_vram(0x1000, 0x10000000, texture_size) print(f"Submitted VRAM transfer: {vram_transfer}") # Test storage transfer (loading assets) asset_size = 128 * 1024 * 1024 # 128 MB asset storage_transfer = await bus_manager.load_from_storage(0x0, 0x2000, asset_size) print(f"Submitted storage transfer: {storage_transfer}") # Test PCIe transfer (CPU-GPU communication) command_size = 4 * 1024 # 4 KB command buffer pcie_transfer = await bus_manager.transfer_data( "pcie", TransferType.WRITE, 0x3000, 0x20000000, command_size ) print(f"Submitted PCIe transfer: {pcie_transfer}") # Wait for transfers to complete print("\nWaiting for transfers to complete...") await asyncio.sleep(2.0) # Print statistics print("\nBus Statistics:") all_stats = bus_manager.get_all_stats() for bus_id, bus_stats in all_stats["buses"].items(): print(f"\n{bus_id}:") print(f" Bandwidth: {bus_stats["bandwidth_gbps"]:.1f} GB/s") print(f" Transfers: {bus_stats["total_transfers"]}") print(f" Data transferred: {bus_stats["total_bytes_transferred"] / (1024**2):.1f} MB") print(f" Effective bandwidth: {bus_stats["effective_bandwidth_gbps"]:.2f} GB/s") print(f" Utilization: {bus_stats["current_utilization"]:.1%}") # Benchmark each bus print("\nBenchmarking buses...") for bus_id in ["vram", "pcie", "system_ram", "storage"]: try: benchmark_result = await bus_manager.benchmark_bus(bus_id, test_size_mb=50) print(f"\n{bus_id} benchmark:") print(f" Effective bandwidth: {benchmark_result["effective_bandwidth_gbps"]:.2f} GB/s") print(f" Theoretical bandwidth: {benchmark_result["theoretical_bandwidth_gbps"]:.2f} GB/s") print(f" Efficiency: {benchmark_result["efficiency_percent"]:.1f}%") except Exception as e: print(f"Benchmark failed for {bus_id}: {e}") # Stop bus manager await bus_manager.stop() print("\nBus system test completed!") # Run the test asyncio.run(test_bus_system())