Fred808 committed on
Commit
daabb36
·
verified ·
1 Parent(s): 2782410

Update qemu/web_interface.py

Browse files
Files changed (1) hide show
  1. qemu/web_interface.py +722 -106
qemu/web_interface.py CHANGED
@@ -1,130 +1,746 @@
1
  """
2
- Web Interface for QEMU VM
3
- Provides browser-based access to VM display
4
  """
5
 
6
- from fastapi import FastAPI, WebSocket, WebSocketDisconnect
7
- from fastapi.staticfiles import StaticFiles
8
- from fastapi.responses import HTMLResponse
9
  import asyncio
 
 
 
 
10
  import logging
11
  from pathlib import Path
12
- from typing import Optional, Dict
13
- import json
14
- import os
15
- import sys
16
-
17
- # Add the parent directory to the Python path
18
- sys.path.insert(0, str(Path(__file__).parent.parent))
19
-
20
- from qemu.qemu_manager import QEMUManager
21
 
22
  # Configure logging
23
  logging.basicConfig(level=logging.DEBUG)
24
  logger = logging.getLogger(__name__)
25
 
26
# FastAPI application object serving the browser-based VM console.
app = FastAPI()

# Mount static assets (index.html, JS, CSS) from ./static next to this module.
static_path = Path(__file__).parent / "static"
app.mount("/static", StaticFiles(directory=str(static_path)), name="static")

# Global QEMU manager instance; populated by the startup event handler,
# so it is None until the application has started.
qemu: Optional[QEMUManager] = None
-
35
class VMConnection:
    """Registry of active browser WebSocket connections to the VM console."""

    def __init__(self):
        # Maps id(websocket) -> websocket for every connected client.
        # (Annotation kept as a comment: Dict[int, WebSocket])
        self.active_connections = {}

    async def connect(self, websocket: "WebSocket") -> int:
        """Accept an incoming WebSocket, register it, and return its client id.

        The id is `id(websocket)`, which is unique for the socket's lifetime.
        """
        await websocket.accept()
        client_id = id(websocket)
        self.active_connections[client_id] = websocket
        return client_id

    def disconnect(self, client_id: int):
        """Deregister a client; safe to call for unknown or already-removed ids."""
        self.active_connections.pop(client_id, None)

    async def broadcast(self, message: str):
        """Send *message* to every connected client, skipping failed sends.

        BUGFIX: was a bare `except:`, which also swallowed KeyboardInterrupt
        and SystemExit; narrowed to `except Exception` while keeping the
        best-effort skip-on-failure behavior.
        """
        for ws in self.active_connections.values():
            try:
                await ws.send_text(message)
            except Exception:
                continue

# Module-wide connection registry shared by the WebSocket endpoint.
vm_connection = VMConnection()
 
 
 
 
 
 
57
 
58
- @app.get("/")
59
- async def get_index():
60
- """Serve main HTML interface"""
61
- html_path = static_path / "index.html"
62
- return HTMLResponse(content=html_path.read_text())
63
 
64
- @app.websocket("/ws/vm")
65
- async def vm_websocket(websocket: WebSocket):
66
- """Handle VM display and control WebSocket"""
67
- client_id = await vm_connection.connect(websocket)
68
 
69
- try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
70
  while True:
71
- message = await websocket.receive_json()
72
-
73
- if message["type"] == "install":
74
- # Start fresh OS installation
75
- if qemu:
76
- vnc_port = await qemu.install_os(message["iso_url"])
77
- await websocket.send_json({
78
- "type": "vnc_info",
79
- "port": vnc_port
80
- })
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
- elif message["type"] == "boot":
83
- # Boot existing OS
84
- if qemu:
85
- try:
86
- vnc_port = await qemu.boot_os()
87
- await websocket.send_json({
88
- "type": "vnc_info",
89
- "port": vnc_port
90
- })
91
- except FileNotFoundError:
92
- await websocket.send_json({
93
- "type": "error",
94
- "message": "No OS installation found"
95
- })
96
-
97
- elif message["type"] == "shutdown":
98
- # Shutdown VM
99
- if qemu:
100
- await qemu.shutdown()
101
- await websocket.send_json({
102
- "type": "status",
103
- "message": "VM shut down"
104
- })
105
 
106
- except WebSocketDisconnect:
107
- vm_connection.disconnect(client_id)
108
-
109
- @app.on_event("startup")
110
- async def startup_event():
111
- """Initialize QEMU manager on startup"""
112
- global qemu
113
- qemu = QEMUManager()
114
- logger.info("QEMU manager initialized")
115
-
116
- @app.on_event("shutdown")
117
- async def shutdown_event():
118
- """Cleanup on shutdown"""
119
- if qemu:
120
- await qemu.shutdown()
121
- logger.info("QEMU manager shut down")
122
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
 
124
def main():
    """Launch the web interface under uvicorn on 0.0.0.0:8080."""
    # Local import: uvicorn is only needed when running as a server process.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8080)


if __name__ == "__main__":
    main()
 
1
  """
2
+ QEMU Virtual Machine Manager with Direct DuckDB Storage and Virtual Hardware Integration
3
+ Manages VM state, virtual GPU, VRAM, and CPU states through remote DuckDB backend
4
  """
5
 
6
+ import os
7
+ import sys
 
8
  import asyncio
9
+ import subprocess
10
+ import time
11
+ import socket
12
+ import json
13
  import logging
14
  from pathlib import Path
15
+ from typing import Dict, Optional, List, Union, Any
16
+ from datetime import datetime
 
 
 
 
 
 
 
17
 
18
  # Configure logging
19
  logging.basicConfig(level=logging.DEBUG)
20
  logger = logging.getLogger(__name__)
21
 
22
class QMPClient:
    """Simple QMP (QEMU Machine Protocol) client implementation.

    NOTE(review): the socket operations are blocking even though the
    methods are declared async; acceptable for a local QMP socket, but a
    slow peer would stall the event loop.
    """

    def __init__(self, address: str = 'localhost', port: int = 4444):
        self.address = address  # QMP server host
        self.port = port        # QMP server TCP port
        self.sock: Optional[socket.socket] = None  # None until connect()

    async def connect(self):
        """Connect to QEMU's QMP interface and negotiate capabilities.

        Returns the server greeting banner.  On failure the socket is
        closed and the original exception re-raised.
        """
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.address, self.port))
            greeting = await self._receive()
            # QMP requires qmp_capabilities before any other command.
            await self._execute("qmp_capabilities")
            return greeting
        except Exception as e:
            logger.error(f"QMP connection failed: {e}")
            if self.sock:
                self.sock.close()
            raise

    async def _execute(self, cmd: str, args: Dict = None) -> Dict[str, Any]:
        """Execute a QMP command and return the parsed response.

        Raises RuntimeError if connect() has not been called.
        """
        if not self.sock:
            raise RuntimeError("Not connected")

        request = {"execute": cmd}
        if args:
            request["arguments"] = args

        # BUGFIX: sendall instead of send — send() may transmit only part
        # of the payload, truncating the QMP request.
        self.sock.sendall(json.dumps(request).encode() + b'\n')
        return await self._receive()

    async def _receive(self) -> Dict[str, Any]:
        """Receive and parse one QMP response.

        NOTE(review): assumes the whole JSON document arrives within a
        single recv(4096); larger responses would need buffered reads.
        """
        if not self.sock:
            raise RuntimeError("Not connected")

        data = self.sock.recv(4096).decode()
        return json.loads(data)

    def close(self):
        """Close the QMP connection (idempotent)."""
        if self.sock:
            self.sock.close()
            self.sock = None
 
69
 
70
+ # Import local modules using absolute imports
71
+ from FServe.virtual_gpu_driver.src.driver_api import GPUError, VirtualGPUDriver
72
+ from FServe.virtual_gpu_driver.src.memory.duckdb_memory_manager import DuckDBMemoryManager
73
+ from FServe.virtual_gpu_driver.src.hal.hal import HardwareAbstractionLayer
74
+ from FServe.vram.remote_storage import RemoteStorageManager
75
+ from FServe.config import get_db_url, get_hf_token_cached
76
+ from FServe.cpu.enhanced_cpu import EnhancedCPU, CPUGroupType, VirtualCPU, CPUInstruction, InstructionType
77
 
78
+ # Configure logging
79
+ logging.basicConfig(level=logging.DEBUG)
80
+ logger = logging.getLogger(__name__)
 
 
81
 
82
class QEMUManager:
    """Manage a QEMU VM backed by DuckDB storage and virtual hardware.

    Owns the QEMU subprocess, a DuckDB connection holding persisted
    CPU/GPU/VRAM state, the virtual GPU driver, and one EnhancedCPU per
    virtual CPU id.  Also an async context manager: leaving the context
    shuts the VM down.
    """

    def __init__(self, config_path: str = "config.json"):
        self.config = self.load_config(config_path)
        self.vm_process: Optional[subprocess.Popen] = None
        self.monitor_socket = None
        self.vnc_port = 5900
        self.memory_service = None

        # Storage and hardware backends (remote DuckDB on HuggingFace).
        self.db_url = "hf://datasets/Fred808/helium/storage.json"
        # BUGFIX: the connection setup referenced an undefined
        # `self.HF_TOKEN`; fetch the token once via the imported helper.
        self.hf_token = get_hf_token_cached()
        self.con = self._init_db_connection()
        self.memory_manager = DuckDBMemoryManager(self.db_url)
        self.storage_manager = RemoteStorageManager(self.db_url)

        # Unix socket used by the external memory-management daemon.
        self.memory_socket = "/tmp/memory-backend.sock"

        # BUGFIX: everything below sat after the `return` inside
        # _init_db_connection and was unreachable; it belongs in __init__.
        # Virtual GPU driver: 8 GPUs x 1500 SMs x 128 cores per SM.
        self.gpu_driver = VirtualGPUDriver(
            num_gpus=8,
            num_sms_per_gpu=1500,
            cores_per_sm=128
        )

        self.init_virtual_hardware()

        # MMIO regions for device communication (guest physical addresses).
        self.mmio_regions = {
            'gpu_cmd': 0xF0000000,
            'gpu_fb': 0xF1000000,
            'gpu_status': 0xF3000000
        }

    def _init_db_connection(self) -> "duckdb.DuckDBPyConnection":
        """Open the DuckDB connection and configure HuggingFace httpfs access.

        Returns the configured connection.  The HF token (fetched in
        __init__) is used as both s3 key and secret.
        """
        # BUGFIX: duckdb was used throughout but never imported anywhere
        # in the file; local import keeps the dependency explicit here.
        import duckdb

        con = duckdb.connect(self.db_url)

        # Configure HuggingFace access through the httpfs extension.
        con.execute("INSTALL httpfs;")
        con.execute("LOAD httpfs;")
        con.execute("SET s3_endpoint='hf.co';")
        con.execute("SET s3_use_ssl=true;")
        con.execute("SET s3_url_style='path';")
        # NOTE(review): token interpolated into SQL — fine for a trusted
        # local token, unsafe for any untrusted value.
        con.execute(f"SET s3_access_key_id='{self.hf_token}';")
        con.execute(f"SET s3_secret_access_key='{self.hf_token}';")

        return con

    def init_virtual_hardware(self):
        """Initialize all virtual hardware components (CPU, GPU, VRAM)."""
        self._init_cpu_state()
        self._init_virtual_gpu()
        self._init_virtual_vram()

    def _init_cpu_state(self):
        """Create the EnhancedCPU pool and the cpu_states tracking table.

        CPU ids are partitioned into four functional groups of 500 each.
        """
        self.cpu_groups = {
            CPUGroupType.UI_DISPLAY: (0, 499),
            CPUGroupType.COMPUTATION: (500, 999),
            CPUGroupType.IO_STORAGE: (1000, 1499),
            CPUGroupType.SYSTEM_TASKS: (1500, 1999)
        }

        # HAL mediates between virtual CPUs and the GPU driver.
        self.hal = HardwareAbstractionLayer()

        # One EnhancedCPU per id, all sharing the same GPU driver instance.
        self.virtual_cpus = {}
        for group_type, (start, end) in self.cpu_groups.items():
            for cpu_id in range(start, end + 1):
                cpu = EnhancedCPU(
                    cpu_id=cpu_id,
                    group_type=group_type,
                    gpu_driver=self.gpu_driver
                )
                self.virtual_cpus[cpu_id] = cpu

        # Persistent per-thread CPU state in DuckDB.
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS cpu_states (
                cpu_id INTEGER,
                core_id INTEGER,
                thread_id INTEGER,
                state JSON,
                registers JSON,
                last_instruction INTEGER,
                group_type VARCHAR,
                timestamp TIMESTAMP,
                PRIMARY KEY (cpu_id, core_id, thread_id)
            )
        """)

    def _init_virtual_gpu(self):
        """Create the gpu_state persistence table."""
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS gpu_state (
                device_id INTEGER PRIMARY KEY,
                command_buffer JSON,
                framebuffer BLOB,
                status JSON,
                timestamp TIMESTAMP
            )
        """)

    def _init_virtual_vram(self):
        """Create the vram_mapping persistence table."""
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS vram_mapping (
                address BIGINT PRIMARY KEY,
                size INTEGER,
                content BLOB,
                flags INTEGER,
                last_access TIMESTAMP
            )
        """)

    def load_config(self, config_path: str) -> Dict:
        """Load VM configuration from *config_path*, clamped to hardware limits.

        Returns the config dict with validated 'cpus', 'cores_per_cpu',
        'threads_per_core', 'memory', 'disk_size', 'vgpu_memory' and 'fps'.
        Raises OSError if the file is missing and json.JSONDecodeError if
        it is malformed.
        """
        with open(config_path) as f:
            config = json.load(f)

        # Hardware-based configuration limits for 2025 server hardware.
        max_cpus = 8          # Maximum physical CPU sockets (high-end server)
        max_cores = 128       # Maximum cores per CPU
        max_threads = 2       # Standard hyperthreading (2 threads per core)
        max_memory = '8192G'  # 8TB RAM maximum
        max_disk = '16384G'   # 16TB maximum storage
        max_vram = '192G'     # Maximum VRAM
        max_fps = 144         # Maximum refresh rate supported

        # Validate and set defaults with hardware limits.
        cpu_count = min(int(config.get('cpus', max_cpus)), max_cpus)
        core_count = min(int(config.get('cores_per_cpu', max_cores)), max_cores)
        thread_count = min(int(config.get('threads_per_core', max_threads)), max_threads)
        fps = min(int(config.get('fps', 60)), max_fps)  # default 60, cap at max_fps

        def parse_size(size_str):
            # Convert '8192G' / '16T' / '512M' style sizes to GB.
            if isinstance(size_str, (int, float)):
                return size_str
            unit = size_str[-1].upper()
            value = float(size_str[:-1])
            if unit == 'T': value *= 1024
            elif unit == 'G': value = value
            elif unit == 'M': value /= 1024
            return value

        mem_size = min(parse_size(config.get('memory', max_memory)), parse_size(max_memory))
        disk_size = min(parse_size(config.get('disk_size', max_disk)), parse_size(max_disk))
        vram_size = min(parse_size(config.get('vgpu_memory', max_vram)), parse_size(max_vram))

        # Set validated configuration back onto the dict.
        config.update({
            'cpus': cpu_count,               # Number of CPU sockets
            'cores_per_cpu': core_count,     # Cores per CPU
            'threads_per_core': thread_count,  # Threads per core
            'memory': f'{int(mem_size)}G',   # System memory
            'disk_size': f'{int(disk_size)}G',  # Virtual disk size
            'vgpu_memory': f'{int(vram_size)}G',  # VRAM size
            'fps': fps,                      # Display refresh rate
        })

        total_vcpus = cpu_count * core_count * thread_count
        logger.info(f"VM Configuration:")
        logger.info(f"  - Total vCPUs: {total_vcpus} ({cpu_count} sockets × {core_count} cores × {thread_count} threads)")
        logger.info(f"  - Memory: {config['memory']}")
        logger.info(f"  - Disk Size: {config['disk_size']}")
        logger.info(f"  - VGPU Memory: {config['vgpu_memory']}")

        return config

    def create_virtual_disk(self) -> str:
        """Create the DuckDB-backed virtual disk tables and return its URL."""
        logger.info("Initializing DuckDB-backed virtual disk")

        # Sector-level disk content.
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS virtual_disk (
                sector_id BIGINT PRIMARY KEY,
                data BLOB,
                flags INTEGER,
                last_access TIMESTAMP
            )
        """)

        # Guest-address -> sector mapping, held by the memory manager.
        self.memory_manager.conn.execute("""
            CREATE TABLE IF NOT EXISTS disk_mapping (
                virtual_address BIGINT PRIMARY KEY,
                sector_id BIGINT,
                permissions INTEGER,
                mapped_time TIMESTAMP
            )
        """)

        # Special URL understood by the patched QEMU block backend.
        return f"duckdb://{self.db_url}?table=virtual_disk"

    def _map_virtual_hardware(self):
        """Record the MMIO layout prior to VM start.

        BUGFIX: this method was called from install_os/boot_os but never
        defined, so both paths crashed with AttributeError.
        NOTE(review): kept as a logging placeholder — the intended mapping
        behavior is not recoverable from this revision; flesh out as needed.
        """
        logger.debug("MMIO regions: %s", self.mmio_regions)

    def get_qemu_command(self, disk_path: str, iso_path: Optional[str] = None) -> str:
        """Build the QEMU command line for *disk_path* (plus optional ISO).

        Returns a single space-joined string; callers split it again before
        Popen, so option/value pairs inside one string are fine as long as
        no path contains spaces.
        NOTE(review): several flags (-cpu-cores, -cpu-grid, virtual-gpu=…)
        are not stock QEMU options and assume a patched build.
        NOTE(review): 'max_cpus'/'max_cores'/'max_threads' are read from
        config but load_config never sets them, so the defaults apply.
        """
        total_cpus = (
            int(self.config.get('max_cpus', 8)) *
            int(self.config.get('max_cores', 128)) *
            int(self.config.get('max_threads', 2))
        )

        # The memory daemon must be listening before QEMU connects.
        self._start_memory_service()

        cmd = [
            "qemu-system-x86_64",

            # Machine configuration - lightweight microvm, no default devices
            "-machine microvm",
            "-nodefaults",
            "-no-acpi",

            # Enhanced Virtual CPU configuration
            "-cpu", "custom,vendor=Virtual,family=2,model=1,stepping=1",
            "-cpu-cores", f"{self.config['cores_per_cpu']}",
            "-cpu-threads", f"{self.config['threads_per_core']}",
            "-cpu-features", "virtual-insns=on,virtual-mmu=on,virtual-timer=on",
            "-cpu-grid", "enhanced-grid=on,cpu-groups=4",
            "-cpu-group-sizes", f"{','.join(str(end - start + 1) for start, end in self.cpu_groups.values())}",
            # BUGFIX: list commas were missing after "-no-tsc" and after the
            # -smp value; adjacent string literals silently concatenated
            # into broken arguments ("-no-tsc-smp", "maxcpus=N-object ...").
            "-no-hpet", "-no-tsc",

            # CPU topology using our virtual cores
            "-smp", f"cpus={self.config['cpus']},"
                    f"cores={self.config['cores_per_cpu']},"
                    f"threads={self.config['threads_per_core']},"
                    f"sockets=1,maxcpus={total_cpus}",

            # Memory configuration using our virtual memory manager
            "-object memory-backend-ram,"
            f"size={self.config['memory']},id=ram0",
            "-object rng-random,filename=/dev/urandom,id=rng0",
            "-device virtio-rng-pci,rng=rng0,bus=pcie.0",
            "-numa node,memdev=ram0,nodeid=0",

            # Memory management daemon channel
            "-chardev socket,id=memory-backend,"
            "path=/tmp/memory-backend.sock,server=on,wait=off",
            "-object memory-backend-proxy,id=mem1,"
            "chardev=memory-backend,size=8G",

            # Pure virtual GPU configuration
            "-device virtio-gpu-pci,"
            "id=gpu0,max_outputs=1,"
            "virtual-gpu=on,"
            f"bus=pcie.0,addr=0x2,"
            "virtual-render-api=custom,"
            # BUGFIX: dropped the trailing comma that previously terminated
            # this device argument string.
            f"virtual-gpu-path={self.hal.get_gpu_path()}",

            # VRAM configuration using IVSHMEM
            "-object memory-backend-file,"
            f"size={self.config['vgpu_memory']},"
            "id=vram0,share=on",
            "-device ivshmem-plain,"
            "memdev=vram0,"
            f"bus=pcie.0,addr=0x3",

            # MMIO region mapping
            "-device ivshmem-doorbell,"
            f"vectors={len(self.mmio_regions)},"
            "id=shmem0",

            # Storage configuration
            f"-drive file={disk_path},"
            "format=raw,"
            "if=none,id=drive0,"
            "aio=native,cache.direct=on",
            "-device virtio-blk-pci,"
            "drive=drive0,bootindex=1",

            # Network configuration
            "-netdev user,id=net0",
            "-device virtio-net-pci,netdev=net0",

            # Display and monitoring
            "-display none",
            f"-vnc :{self.vnc_port - 5900}",
            "-monitor unix:qemu-monitor-socket,server,nowait",

            # Debug and performance options
            "-no-user-config",
            "-nodefaults",
            "-global kvm-pit.lost_tick_policy=delay",
            "-rtc base=localtime,clock=host",
            "-boot strict=off"
        ]

        # Boot from the ISO when installing.
        if iso_path:
            cmd.extend([
                "-cdrom", iso_path,
                "-boot", "d"
            ])

        return " ".join(cmd)

    async def install_os(self, iso_url: str):
        """Download *iso_url* if needed and start QEMU installing from it.

        Returns the VNC port for watching the installation.
        Raises RuntimeError on download failure or hardware-init timeout.
        """
        import aiohttp
        import aiofiles
        import hashlib

        # BUGFIX: previous revision awaited a non-existent
        # `_init_virtual_hardware`; the real initializer is synchronous.
        self.init_virtual_hardware()

        # Download ISO if not cached next to this module.
        iso_path = os.path.join(os.path.dirname(__file__), 'iso', 'os.iso')
        os.makedirs(os.path.dirname(iso_path), exist_ok=True)

        if not os.path.exists(iso_path):
            logger.info(f"Downloading ISO from {iso_url}")
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(iso_url) as response:
                        if response.status != 200:
                            raise RuntimeError(f"Failed to download ISO: {response.status}")

                        # Stream download with progress.
                        total_size = int(response.headers.get('content-length', 0))
                        chunk_size = 8192
                        downloaded = 0

                        async with aiofiles.open(iso_path, 'wb') as f:
                            async for chunk in response.content.iter_chunked(chunk_size):
                                await f.write(chunk)
                                downloaded += len(chunk)
                                # BUGFIX: guard against a missing
                                # content-length header (division by zero).
                                if total_size:
                                    progress = (downloaded / total_size) * 100
                                    logger.info(f"Download progress: {progress:.1f}%")

                        # Log a checksum of what was actually written.
                        async with aiofiles.open(iso_path, 'rb') as f:
                            content = await f.read()
                            checksum = hashlib.sha256(content).hexdigest()
                            logger.info(f"ISO downloaded. SHA256: {checksum}")

            except Exception as e:
                # Remove the partial file so the next attempt re-downloads.
                if os.path.exists(iso_path):
                    os.remove(iso_path)
                raise RuntimeError(f"Failed to download ISO: {e}")

        # Create virtual disk and map hardware.
        disk_url = self.create_virtual_disk()
        self._map_virtual_hardware()

        logger.info("Preparing virtual hardware for OS installation...")

        # Configure GPU for installation.
        await self.hal.set_gpu_mode_async('install')

        # Keep a reference so the monitoring task is not garbage collected.
        self._hw_monitor_task = asyncio.create_task(self._monitor_hardware())

        cmd = self.get_qemu_command(disk_url, iso_path)
        logger.info("Starting OS installation with virtual hardware...")

        # Environment consumed by the virtual device stubs inside the guest.
        env = os.environ.copy()
        env.update({
            'VGPU_DB_URL': self.db_url,
            'VGPU_MMIO_CMD': hex(self.mmio_regions['gpu_cmd']),
            'VGPU_MMIO_FB': hex(self.mmio_regions['gpu_fb']),
            'VGPU_MMIO_STATUS': hex(self.mmio_regions['gpu_status']),
            'VRAM_SIZE': self.config['vgpu_memory'],
            'CPU_TOPOLOGY': json.dumps({
                'cpus': self.config['cpus'],
                'cores': self.config['cores_per_cpu'],
                'threads': self.config['threads_per_core']
            }),
            'INSTALL_MODE': '1'  # Signal we're in installation mode
        })

        self.vm_process = subprocess.Popen(
            cmd.split(),
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        # Wait for hardware initialization.
        try:
            await asyncio.wait_for(self._wait_for_hardware_init(), timeout=30.0)
        except asyncio.TimeoutError:
            logger.error("Hardware initialization timeout during installation")
            await self.shutdown()
            raise RuntimeError("Failed to initialize virtual hardware for installation")

        logger.info("Virtual hardware initialized for installation")

        # Keep a reference to the installation monitor as well.
        self._install_monitor_task = asyncio.create_task(self._monitor_installation())

        return self.vnc_port

    async def _monitor_installation(self):
        """Poll installation progress every 5s until the VM exits or finishes."""
        while self.vm_process and self.vm_process.poll() is None:
            try:
                # Check installation progress through QEMU QMP.
                # NOTE(review): assumes query-block returns an iterable of
                # device dicts — confirm against the HAL's QMP wrapper.
                async with self.hal.get_qmp_connection() as qmp:
                    info = await qmp.execute("query-block")
                    for device in info:
                        if device.get('device') == 'drive0':
                            progress = device.get('inserted', {}).get('progress', 0)
                            if progress > 0:
                                logger.info(f"Installation progress: {progress}%")

                # Check virtual hardware status for completion flag.
                gpu_state = await self.hal.get_gpu_state_async()
                if gpu_state['status'].get('installation_complete'):
                    logger.info("OS installation completed successfully")
                    return

            except Exception as e:
                logger.warning(f"Installation monitoring error: {e}")

            await asyncio.sleep(5)  # Check every 5 seconds

    async def boot_os(self):
        """Boot an existing OS installation; returns the VNC port.

        Raises RuntimeError if the virtual hardware fails to initialize.
        """
        # BUGFIX: same non-existent-coroutine call as install_os.
        self.init_virtual_hardware()

        # Get virtual disk URL from DuckDB and map MMIO regions.
        disk_url = self.create_virtual_disk()
        self._map_virtual_hardware()

        # Keep a reference so the monitoring task is not garbage collected.
        self._hw_monitor_task = asyncio.create_task(self._monitor_hardware())

        cmd = self.get_qemu_command(disk_url)
        logger.info("Booting OS with virtual hardware...")

        # Environment consumed by the virtual device stubs inside the guest.
        env = os.environ.copy()
        env.update({
            'VGPU_DB_URL': self.db_url,
            'VGPU_MMIO_CMD': hex(self.mmio_regions['gpu_cmd']),
            'VGPU_MMIO_FB': hex(self.mmio_regions['gpu_fb']),
            'VGPU_MMIO_STATUS': hex(self.mmio_regions['gpu_status']),
            'VRAM_SIZE': self.config['vgpu_memory'],
            'CPU_TOPOLOGY': json.dumps({
                'cpus': self.config['cpus'],
                'cores': self.config['cores_per_cpu'],
                'threads': self.config['threads_per_core']
            })
        })

        self.vm_process = subprocess.Popen(
            cmd.split(),
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        # Wait for hardware initialization.
        try:
            await asyncio.wait_for(self._wait_for_hardware_init(), timeout=30.0)
        except asyncio.TimeoutError:
            logger.error("Hardware initialization timeout")
            await self.shutdown()
            raise RuntimeError("Failed to initialize virtual hardware")

        logger.info("Virtual hardware initialized successfully")
        return self.vnc_port

    async def _wait_for_hardware_init(self):
        """Spin (100ms interval) until GPU, VRAM and CPU topology report ready.

        Intended to be bounded by the caller's asyncio.wait_for.
        """
        while True:
            try:
                gpu_state = await self.hal.get_gpu_state_async()
                if gpu_state['status'].get('initialized'):
                    vram_status = await self.hal.get_vram_status_async()
                    if vram_status['mapped']:
                        cpu_status = await self.hal.get_cpu_status_async()
                        if cpu_status['topology_set']:
                            return
            except Exception as e:
                logger.warning(f"Hardware check failed: {e}")
            await asyncio.sleep(0.1)

    async def _monitor_hardware(self):
        """Persist CPU/GPU/VRAM state to DuckDB every 10ms while the VM runs.

        NOTE(review): `conn.execute_async` is assumed to exist on the
        memory manager's connection wrapper — confirm against
        DuckDBMemoryManager; stock DuckDB connections have no such method.
        """
        while self.vm_process and self.vm_process.poll() is None:
            try:
                # Persist per-thread EnhancedCPU states.
                for cpu_id, cpu in self.virtual_cpus.items():
                    cpu_state = cpu.get_state()
                    for core_id in range(cpu.virtual_cpu.core_count):
                        for thread_id in range(cpu.virtual_cpu.thread_count):
                            thread_state = cpu_state['cores'][core_id]['threads'][thread_id]
                            await self.memory_manager.conn.execute_async("""
                                INSERT OR REPLACE INTO cpu_states
                                VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                            """, (cpu_id, core_id, thread_id,
                                  json.dumps(thread_state['state']),
                                  json.dumps(thread_state['registers']),
                                  thread_state['last_instruction'],
                                  cpu.virtual_cpu.group_type.name))

                # Persist GPU state.
                gpu_state = await self.hal.get_gpu_state_async()
                await self.memory_manager.conn.execute_async("""
                    INSERT OR REPLACE INTO gpu_state
                    VALUES (0, ?, ?, ?, CURRENT_TIMESTAMP)
                """, (json.dumps(gpu_state['cmd_buffer']),
                      gpu_state['framebuffer'],
                      json.dumps(gpu_state['status'])))

                # Persist dirty VRAM pages only.
                dirty_pages = await self.hal.get_dirty_vram_pages_async()
                for addr, data in dirty_pages.items():
                    await self.memory_manager.conn.execute_async("""
                        INSERT OR REPLACE INTO vram_mapping
                        VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                    """, (addr, len(data), data,
                          await self.hal.get_vram_flags_async(addr)))

            except Exception as e:
                logger.error(f"Hardware monitoring error: {e}")

            await asyncio.sleep(0.01)  # 10ms polling interval

    def _start_memory_service(self):
        """Start the memory-management daemon and wait for its Unix socket.

        Raises RuntimeError if the socket does not appear within ~30s.
        """
        # Remove a stale socket from a previous run.
        if os.path.exists(self.memory_socket):
            os.remove(self.memory_socket)

        # NOTE(review): module path lacks the FServe. prefix used by the
        # other imports in this file — confirm which is correct.
        self.memory_service = subprocess.Popen([
            sys.executable,
            "-m", "virtual_gpu_driver.src.memory.memory_daemon",
            "--socket", self.memory_socket,
            "--db-url", self.db_url,
            "--size", self.config['memory']
        ])

        # Poll for socket creation.
        timeout = 30
        while timeout > 0 and not os.path.exists(self.memory_socket):
            time.sleep(0.1)
            timeout -= 0.1

        if not os.path.exists(self.memory_socket):
            raise RuntimeError("Memory management service failed to start")

        logger.info("Virtual memory management service started")

    def _stop_memory_service(self):
        """Terminate the memory daemon and remove its socket (idempotent)."""
        if self.memory_service:
            self.memory_service.terminate()
            self.memory_service.wait()
            self.memory_service = None

        if os.path.exists(self.memory_socket):
            os.remove(self.memory_socket)

    async def shutdown(self):
        """Gracefully shut down the VM, saving hardware state first.

        Falls back to SIGKILL after a 30s grace period or on any error;
        always stops the memory service and clears vm_process.
        """
        if self.vm_process:
            logger.info("Initiating graceful shutdown...")

            try:
                # Save final hardware state before the guest goes away.
                await self._save_hardware_state()

                # Send ACPI shutdown through QEMU's QMP interface.
                async with self.hal.get_qmp_connection() as qmp:
                    await qmp.execute("system_powerdown")

                try:
                    # BUGFIX: Popen.wait() blocks and returns an int, so it
                    # cannot be passed to asyncio.wait_for directly; run it
                    # in a worker thread to keep the timeout working.
                    await asyncio.wait_for(
                        asyncio.to_thread(self.vm_process.wait),
                        timeout=30.0
                    )
                except asyncio.TimeoutError:
                    logger.warning("Graceful shutdown timed out, forcing...")
                    self.vm_process.kill()

            except Exception as e:
                logger.error(f"Error during shutdown: {e}")
                self.vm_process.kill()

            finally:
                self.vm_process = None
                # Stop memory management service.
                self._stop_memory_service()
                logger.info("VM and services shutdown complete")

    async def _save_hardware_state(self):
        """Persist final CPU and GPU state before shutdown; re-raises errors."""
        try:
            # Final CPU states as reported by the HAL.
            cpu_states = await self.hal.get_cpu_states_async()
            for cpu_id, cores in cpu_states.items():
                for core_id, threads in cores.items():
                    for thread_id, state in threads.items():
                        # BUGFIX: cpu_states has 8 columns; the previous
                        # INSERT supplied only 7 values (group_type missing).
                        # The HAL snapshot carries no group info, so NULL.
                        await self.memory_manager.conn.execute_async("""
                            INSERT OR REPLACE INTO cpu_states
                            VALUES (?, ?, ?, ?, ?, ?, NULL, CURRENT_TIMESTAMP)
                        """, (cpu_id, core_id, thread_id,
                              json.dumps(state['state']),
                              json.dumps(state['registers']),
                              state['last_instruction']))

            # Final GPU state.
            gpu_state = await self.hal.get_gpu_state_async()
            await self.memory_manager.conn.execute_async("""
                INSERT OR REPLACE INTO gpu_state
                VALUES (0, ?, ?, ?, CURRENT_TIMESTAMP)
            """, (json.dumps(gpu_state['cmd_buffer']),
                  gpu_state['framebuffer'],
                  json.dumps(gpu_state['status'])))

            # Commit any pending changes.
            await self.memory_manager.conn.commit_async()

        except Exception as e:
            logger.error(f"Error saving hardware state: {e}")
            raise

    async def __aenter__(self):
        """Enter the async context; the manager itself is the resource."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Shut the VM down when the context exits."""
        await self.shutdown()
725
 
726
async def main():
    """Smoke-test entry point: boot (or install) an OS, then idle until killed."""
    async with QEMUManager() as qemu:
        try:
            port = await qemu.boot_os()
            logger.info(f"OS booted. VNC available on port {port}")
        except FileNotFoundError:
            # Nothing installed yet — fall back to a fresh install.
            iso_url = "http://releases.ubuntu.com/22.04/ubuntu-22.04-desktop-amd64.iso"
            port = await qemu.install_os(iso_url)
            logger.info(f"Installing OS. Monitor installation on VNC port {port}")

        # Block forever; the context manager handles shutdown on exit.
        try:
            await asyncio.Event().wait()
        except KeyboardInterrupt:
            logger.info("Shutting down...")


if __name__ == "__main__":
    asyncio.run(main())