Factor Studios commited on
Commit
6195e16
·
verified ·
1 Parent(s): 9fb0319

Update http_storage.py

Browse files
Files changed (1) hide show
  1. http_storage.py +475 -295
http_storage.py CHANGED
@@ -1,336 +1,516 @@
1
- import json
 
 
 
 
 
 
 
 
2
  import numpy as np
3
- from typing import Dict, Any, Optional, Union
4
- import threading
5
  import time
6
- import hashlib
7
- import logging
8
  import os
9
- import shutil
10
- import uuid
11
- from pathlib import Path
12
-
13
- class HTTPGPUStorage:
14
- """
15
- HTTP-based GPU storage client that replaces WebSocket functionality.
16
- Maintains the same interface as WebSocketGPUStorage for backward compatibility.
17
- """
18
-
19
- # Singleton instance
20
- _instance = None
21
- _lock = threading.Lock()
22
 
23
- def __new__(cls, storage_path: str = "storage"):
24
- with cls._lock:
25
- if cls._instance is None:
26
- cls._instance = super().__new__(cls)
27
- # Convert to absolute path if relative
28
- if not os.path.isabs(storage_path):
29
- storage_path = os.path.abspath(storage_path)
30
- cls._instance._init_singleton(storage_path)
31
- return cls._instance
32
 
33
- def _init_singleton(self, storage_path: str):
34
- """Initialize the singleton instance with local storage"""
35
- if hasattr(self, 'initialized'):
36
- return
37
-
38
- # Setup storage paths
39
- self.base_path = Path(storage_path)
40
- self.vram_path = self.base_path / "vram_blocks"
41
- self.models_path = self.base_path / "models"
42
- self.cache_path = self.base_path / "cache"
43
- self.state_path = self.base_path / "states"
44
 
45
- # Create directories
46
- for path in [self.vram_path, self.models_path, self.cache_path, self.state_path]:
47
- path.mkdir(parents=True, exist_ok=True)
 
 
48
 
49
- self.lock = threading.Lock()
50
- self._closing = False
51
- self.error_count = 0
52
- self.last_error_time = 0
53
- self.session_id = str(uuid.uuid4())
54
-
55
- # Tensor and model registries (maintained for compatibility)
56
- self.tensor_registry: Dict[str, Dict[str, Any]] = {}
57
- self.model_registry: Dict[str, Dict[str, Any]] = {}
58
- self.resource_monitor = {
59
- 'vram_used': 0,
60
- 'active_tensors': 0,
61
- 'loaded_models': set()
62
- }
 
 
 
 
 
 
 
 
63
 
64
- # Initialize local storage monitoring
65
- self.storage_monitor = {
66
- 'total_size': 0,
67
- 'last_access': time.time(),
68
- 'disk_usage': os.path.getsize(str(self.base_path)) if os.path.exists(str(self.base_path)) else 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
  }
70
-
71
- # Initialize session
72
- self._create_session()
73
- self.initialized = True
74
-
75
- def __init__(self, storage_path: str = "storage"):
76
- """This will actually just return the singleton instance.
77
- The actual initialization happens in __new__ and _init_singleton"""
78
- pass
79
 
80
- def _create_session(self):
81
- """Initialize local storage session"""
82
- try:
83
- # Create status file to track session
84
- status_path = self.base_path / "session_status.json"
85
- status_data = {
86
- "session_id": self.session_id,
87
- "created_at": time.time(),
88
- "resource_limits": {
89
- "max_vram_gb": 40, # A100 size
90
- "max_models": 5,
91
- "max_batch_size": 32
92
- }
93
- }
94
 
95
- with open(status_path, 'w') as f:
96
- json.dump(status_data, f, indent=2)
 
97
 
98
- logging.info(f"Local storage session created: {self.session_id}")
99
- return True
 
100
 
101
- except Exception as e:
102
- logging.error(f"Failed to create HTTP session: {e}")
103
- self.error_count += 1
104
- self.last_error_time = time.time()
105
- return False
106
 
107
- def _check_storage(self) -> Dict[str, Any]:
108
- """Check local storage status and usage"""
109
- try:
110
- # Update storage monitoring
111
- self.storage_monitor.update({
112
- 'total_size': sum(f.stat().st_size for f in self.base_path.rglob('*') if f.is_file()),
113
- 'last_access': time.time(),
114
- 'disk_usage': os.path.getsize(str(self.base_path)) if os.path.exists(str(self.base_path)) else 0
115
  })
116
- return {"status": "ok", "monitor": self.storage_monitor}
117
- except Exception as e:
118
- logging.error(f"Error checking storage: {e}")
119
- return {"status": "error", "message": str(e)}
120
 
121
- def store_tensor(self, tensor_id: str, data: np.ndarray, model_size: Optional[int] = None) -> bool:
122
- """Store tensor data in local storage"""
 
 
123
  try:
124
- if data is None:
125
- raise ValueError("Cannot store None tensor")
126
-
127
- # Calculate tensor metadata
128
- tensor_shape = data.shape
129
- tensor_dtype = str(data.dtype)
130
- tensor_size = data.nbytes
131
-
132
- # Save tensor data
133
- tensor_path = self.vram_path / f"{tensor_id}.npy"
134
- np.save(str(tensor_path), data)
135
-
136
- # Save metadata
137
- metadata = {
138
- 'shape': tensor_shape,
139
- 'dtype': tensor_dtype,
140
- 'size': tensor_size,
141
- 'timestamp': time.time(),
142
- 'model_size': model_size if model_size is not None else -1
143
  }
144
 
145
- metadata_path = self.vram_path / f"{tensor_id}_meta.json"
146
- with open(metadata_path, 'w') as f:
147
- json.dump(metadata, f)
148
-
149
- # Update tensor registry
150
- with self.lock:
151
- self.tensor_registry[tensor_id] = metadata
152
- self.resource_monitor['vram_used'] += tensor_size
153
- self.resource_monitor['active_tensors'] += 1
154
- return True
155
 
156
- except Exception as e:
157
- logging.error(f"Error storing tensor {tensor_id}: {str(e)}")
158
- return False
159
-
160
- def load_tensor(self, tensor_id: str) -> Optional[np.ndarray]:
161
- """Load tensor data from local storage"""
162
- try:
163
- tensor_path = self.vram_path / f"{tensor_id}.npy"
164
- metadata_path = self.vram_path / f"{tensor_id}_meta.json"
165
-
166
- # Check if tensor files exist
167
- if not tensor_path.exists() or not metadata_path.exists():
168
- logging.warning(f"Tensor {tensor_id} not found in local storage")
169
- return None
170
-
171
- # Load metadata
172
- with open(metadata_path, 'r') as f:
173
- metadata = json.load(f)
174
-
175
- # Load tensor data
176
- arr = np.load(str(tensor_path))
177
-
178
- # Update registry if not present
179
- if tensor_id not in self.tensor_registry:
180
- with self.lock:
181
- self.tensor_registry[tensor_id] = metadata
182
-
183
- return arr
184
 
185
- except Exception as e:
186
- logging.error(f"Error loading tensor {tensor_id}: {str(e)}")
187
- return None
188
-
189
- def store_state(self, component: str, state_id: str, state_data: Dict[str, Any]) -> bool:
190
- """Store component state in local storage"""
191
- try:
192
- # Create component directory if needed
193
- component_dir = self.state_path / component
194
- component_dir.mkdir(parents=True, exist_ok=True)
195
-
196
- # Save state data with timestamp
197
- state_file = component_dir / f"{state_id}.json"
198
- data_to_save = {
199
- "data": state_data,
200
- "timestamp": time.time()
201
- }
202
-
203
- with open(state_file, 'w') as f:
204
- json.dump(data_to_save, f, indent=2)
205
-
206
- return True
207
 
208
- except Exception as e:
209
- logging.error(f"Error storing state for {component}/{state_id}: {str(e)}")
210
- return False
211
-
212
- def load_state(self, component: str, state_id: str) -> Optional[Dict[str, Any]]:
213
- """Load component state from local storage"""
214
- try:
215
- state_file = self.state_path / component / f"{state_id}.json"
 
 
 
 
 
 
 
 
 
 
 
 
 
216
 
217
- if not state_file.exists():
218
- logging.warning(f"State file not found for {component}/{state_id}")
219
- return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
 
221
- with open(state_file, 'r') as f:
222
- saved_data = json.load(f)
 
 
223
 
224
- return saved_data.get('data')
 
 
 
 
225
 
226
- except Exception as e:
227
- logging.error(f"Error loading state for {component}/{state_id}: {str(e)}")
228
- return None
 
 
 
 
 
 
 
229
 
230
- def cache_data(self, key: str, data: Any) -> bool:
231
- """Cache data via HTTP API"""
232
- try:
233
- request_data = {"data": data}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
234
 
235
- response = self._make_request(
236
- 'POST',
237
- f'/cache/{key}',
238
- json=request_data
239
- )
240
 
241
- return response and response.get('status') == 'success'
 
 
242
 
243
- except Exception as e:
244
- logging.error(f"Error caching data for key {key}: {str(e)}")
245
- return False
246
-
247
- def get_cached_data(self, key: str) -> Optional[Any]:
248
- """Get cached data via HTTP API"""
249
- try:
250
- response = self._make_request("GET", f"/cache/{key}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
251
 
252
- if response and response.get('status') == 'success':
253
- return response.get('data')
254
- return None
255
 
256
- except Exception as e:
257
- logging.error(f"Error getting cached data for key {key}: {str(e)}")
258
- return None
259
-
260
- def load_model(self, model_name: str, model_path: Optional[str] = None, model_data: Optional[Dict] = None) -> bool:
261
- """Load a model from local storage"""
262
- try:
263
- # Check if model is already loaded
264
- if self.is_model_loaded(model_name):
265
- logging.info(f"Model {model_name} already loaded")
266
- return True
267
 
268
- # Generate model directory path
269
- model_dir = self.models_path / model_name.replace('/', '_')
270
- model_dir.mkdir(parents=True, exist_ok=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
271
 
272
- # Clean up any existing files
273
- for existing_file in model_dir.glob('*'):
274
- try:
275
- if existing_file.is_file():
276
- existing_file.unlink()
277
- except Exception as e:
278
- logging.warning(f"Could not remove existing file {existing_file}: {e}")
279
 
280
- # Save model data if provided
281
- if model_data:
282
- model_config_path = model_dir / "config.json"
283
- with open(model_config_path, 'w') as f:
284
- json.dump(model_data, f, indent=2)
285
 
286
- # Update model registry
287
- with self.lock:
288
- self.model_registry[model_name] = {
289
- 'path': str(model_dir),
290
- 'config': model_data,
291
- 'loaded_at': time.time(),
292
- 'hash': self._calculate_model_hash(model_path) if model_path else None
293
- }
294
- self.resource_monitor['loaded_models'].add(model_name)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
295
 
296
- # Copy model files if path provided
297
- if model_path and os.path.exists(model_path):
298
- model_file_path = model_dir / "model.bin"
299
- shutil.copy2(model_path, model_file_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
300
 
301
- logging.info(f"Successfully loaded model {model_name} to local storage")
302
- return True
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
303
 
304
- except Exception as e:
305
- logging.error(f"Error loading model {model_name}: {str(e)}")
306
- return False
307
-
308
-
309
- def _calculate_model_hash(self, model_path: str) -> str:
310
- """Calculate SHA256 hash of model file"""
311
- try:
312
- sha256_hash = hashlib.sha256()
313
- with open(model_path, "rb") as f:
314
- for byte_block in iter(lambda: f.read(4096), b""):
315
- sha256_hash.update(byte_block)
316
- return sha256_hash.hexdigest()
317
- except Exception as e:
318
- logging.error(f"Error calculating model hash: {str(e)}")
319
- return ""
320
-
321
-
322
- def ping(self) -> bool:
323
- """Check if local storage is accessible"""
324
- try:
325
- # Check if all storage directories exist and are accessible
326
- for path in [self.vram_path, self.models_path, self.cache_path, self.state_path]:
327
- if not path.exists() or not os.access(str(path), os.R_OK | os.W_OK):
328
- return False
329
- return True
330
- except Exception as e:
331
- logging.error(f"Storage check failed: {e}")
332
- return False
333
- # Compatibility alias for existing code
334
- WebSocketGPUStorage = HTTPGPUStorage
335
 
 
 
336
 
 
1
+ """
2
+ Test AI integration with local storage and zero CPU memory usage.
3
+ All operations are performed through local storage with direct tensor core access.
4
+ """
5
+ import asyncio
6
+ from gpu_arch import Chip
7
+ from ai_http import AIAccelerator
8
+ from virtual_vram import VirtualVRAM
9
+ from PIL import Image
10
  import numpy as np
11
+ from http_storage import HTTPGPUStorage as LocalGPUStorage
 
12
  import time
 
 
13
  import os
14
+ import platform
15
+ import contextlib
16
+ import atexit
17
+ import logging
 
 
 
 
 
 
 
 
 
18
 
19
+ # Configure logging
20
+ logging.basicConfig(
21
+ level=logging.INFO,
22
+ format='%(asctime)s - %(levelname)s - %(message)s'
23
+ )
 
 
 
 
24
 
25
+ # Local storage manager
26
+ @contextlib.contextmanager
27
+ def storage_manager():
28
+ storage = None
29
+
30
+ try:
31
+ # Create new storage instance with local path
32
+ storage = LocalGPUStorage(storage_path="local_storage")
 
 
 
33
 
34
+ # Verify storage is accessible
35
+ if storage.ping():
36
+ logging.info("Successfully initialized local storage")
37
+ else:
38
+ raise RuntimeError("Local storage is not accessible")
39
 
40
+ yield storage
41
+
42
+ except Exception as e:
43
+ logging.error(f"Storage initialization error: {e}")
44
+ raise
45
+
46
+ try:
47
+ yield storage
48
+ except Exception as e:
49
+ logging.error(f"HTTP operation failed: {e}")
50
+ # Try to reconnect once if operation fails
51
+ if try_connect():
52
+ logging.info("Successfully reconnected to GPU storage server via HTTP")
53
+ yield storage
54
+ else:
55
+ raise
56
+ finally:
57
+ if storage:
58
+ try:
59
+ storage.close()
60
+ except:
61
+ pass
62
 
63
+ # Cleanup handler
64
+ def cleanup_resources():
65
+ try:
66
+ # Get the current storage instance if it exists
67
+ current_storage = LocalGPUStorage._instance
68
+ if current_storage is not None:
69
+ try:
70
+ # Clear any cached data
71
+ current_storage.resource_monitor['vram_used'] = 0
72
+ current_storage.resource_monitor['active_tensors'] = 0
73
+ current_storage.resource_monitor['loaded_models'].clear()
74
+ except Exception as e:
75
+ logging.error(f"Error cleaning up storage resources: {e}")
76
+ except Exception as e:
77
+ logging.error(f"Error in storage cleanup: {e}")
78
+
79
+ # Clear VRAM and other resources
80
+ import gc
81
+ gc.collect()
82
+
83
+ # Register enhanced cleanup handler
84
+ atexit.register(cleanup_resources)
85
+
86
+ def test_ai_integration():
87
+ print("\n--- Testing Local Storage-Based AI Integration with Zero CPU Usage ---")
88
+ from electron_speed import TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP, drift_velocity, speed_of_light_silicon
89
+
90
+ # Initialize components dictionary to store GPU resources
91
+ components = {
92
+ 'chips': [],
93
+ 'ai_accelerators': [],
94
+ 'model_id': None,
95
+ 'vram': None,
96
+ 'storage': None,
97
+ 'model_config': None,
98
+ 'tensor_registry': {},
99
+ 'initialized': False
100
+ }
101
+
102
+ # Initialize global tensor registry
103
+ global_tensor_registry = {
104
+ 'model_tensors': {},
105
+ 'runtime_tensors': {},
106
+ 'placeholder_tensors': {},
107
+ 'stats': {
108
+ 'total_vram_used': 0,
109
+ 'active_tensors': 0
110
  }
111
+ }
 
 
 
 
 
 
 
 
112
 
113
+ print(f"\nElectron-Speed Architecture Parameters:")
114
+ print(f"Target switches/sec: {TARGET_SWITCHES_PER_SEC:.2e}")
115
+ print(f"Transistors on chip: {TRANSISTORS_ON_CHIP:,}")
116
+ print(f"Electron drift velocity: {drift_velocity:.2e} m/s")
117
+ print(f"Percentage of light speed: {(drift_velocity/speed_of_light_silicon)*100:.2f}%")
118
+
119
+ # Test 1: Local Model Loading
120
+ print("\nTest 1: Model Loading with Local Storage")
121
+ try:
122
+ # Use storage manager for proper resource handling
123
+ with storage_manager() as storage:
124
+ components['storage'] = storage # Save storage reference
 
 
125
 
126
+ # Initialize virtual GPU stack with unlimited local storage
127
+ chip_for_loading = Chip(chip_id=0, vram_size_gb=None, storage=storage) # Unlimited VRAM
128
+ components['chips'].append(chip_for_loading)
129
 
130
+ # Initialize VRAM with local storage (unlimited)
131
+ vram = VirtualVRAM(storage=storage)
132
+ components['vram'] = vram
133
 
134
+ # Set up AI accelerator
135
+ ai_accelerator_for_loading = AIAccelerator(vram=vram, storage=storage)
136
+ ai_accelerator_for_loading.initialize_tensor_cores()
137
+ components['ai_accelerators'].append(ai_accelerator_for_loading)
 
138
 
139
+ # Initialize model registry in local storage (unlimited)
140
+ storage.store_state("model_registry", "state", {
141
+ "initialized": True,
142
+ "max_vram": None, # Unlimited VRAM
143
+ "active_models": {}
 
 
 
144
  })
 
 
 
 
145
 
146
+ # Load BLIP-2 Large model directly to HTTP storage
147
+ model_id = "microsoft/florence-2-large"
148
+ print(f"Loading model {model_id} directly to HTTP storage...")
149
+
150
  try:
151
+ # Simulate model loading (in real scenario, would load actual model)
152
+ model_data = {
153
+ "model_name": model_id,
154
+ "model_type": "florence-2-large",
155
+ "parameters": 771000000,
156
+ "architecture": "vision-language",
157
+ "loaded_at": time.time()
 
 
 
 
 
 
 
 
 
 
 
 
158
  }
159
 
160
+ # Load model with local storage verification
161
+ try:
162
+ # Verify storage is accessible
163
+ if not ai_accelerator_for_loading.storage.ping():
164
+ raise RuntimeError("Local storage not accessible")
 
 
 
 
 
165
 
166
+ # Calculate model size for proper VRAM allocation
167
+ model_size = model_data["parameters"] * 4 # 4 bytes per parameter (float32)
168
+ print(f"Model size: {model_size / (1024**3):.2f} GB")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
169
 
170
+ # Pre-allocate VRAM for model
171
+ ai_accelerator_for_loading.pre_allocate_vram(model_size)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
172
 
173
+ # Load model with local storage
174
+ success = ai_accelerator_for_loading.load_model(
175
+ model_id=model_id,
176
+ model=model_data,
177
+ processor=None,
178
+ verify_load=True
179
+ )
180
+ except Exception as e:
181
+ print(f"Exception during model loading: {str(e)}")
182
+ success = False
183
+
184
+ if success:
185
+ print(f"Model '{model_id}' loaded successfully to HTTP storage.")
186
+ assert ai_accelerator_for_loading.has_model(model_id), "Model not found in HTTP storage after loading."
187
+
188
+ # Store model parameters in components dict
189
+ components['model_id'] = model_id
190
+ components['model_size'] = model_size
191
+ components['model_config'] = model_data
192
+ else:
193
+ raise RuntimeError("Failed to load model via HTTP storage")
194
 
195
+ except Exception as e:
196
+ print(f"Detailed model loading error: {str(e)}")
197
+ print("Falling back to placeholder model mode...")
198
+ # Try loading with placeholder model
199
+ try:
200
+ # Match server-side model configuration
201
+ placeholder_model = {
202
+ "model_name": model_id,
203
+ "model_type": "placeholder",
204
+ "parameters": 1000000, # Small placeholder
205
+ "architecture": {
206
+ "type": "nvidia_ampere",
207
+ "features": ["tensor_cores", "ray_tracing", "dynamic_scheduling"]
208
+ },
209
+ "loaded_at": time.time(),
210
+ # Server-validated GPU architecture configuration
211
+ "num_sms": 108, # A100 config
212
+ "tensor_cores_per_sm": 4,
213
+ "cuda_cores_per_sm": 64,
214
+ "compute_capability": "8.0",
215
+ "vram_config": {
216
+ "size_gb": 40,
217
+ "bandwidth_gbps": 1555,
218
+ "cache_size_mb": 40,
219
+ "allocation": "dynamic"
220
+ }
221
+ }
222
 
223
+ # Validate required fields before loading
224
+ required_fields = ["num_sms", "tensor_cores_per_sm", "cuda_cores_per_sm"]
225
+ if not all(field in placeholder_model for field in required_fields):
226
+ raise ValueError(f"Missing required GPU architecture fields: {[f for f in required_fields if f not in placeholder_model]}")
227
 
228
+ success = ai_accelerator_for_loading.load_model(
229
+ model_id=model_id,
230
+ model=placeholder_model,
231
+ processor=None
232
+ )
233
 
234
+ if success:
235
+ components['model_id'] = model_id
236
+ components['model_config'] = placeholder_model
237
+ print("Successfully loaded placeholder model via HTTP")
238
+ else:
239
+ raise RuntimeError("Placeholder model loading also failed")
240
+
241
+ except Exception as e2:
242
+ print(f"Placeholder fallback also failed: {str(e2)}")
243
+ raise
244
 
245
+ except Exception as e:
246
+ print(f"Model loading test failed: {e}")
247
+ return
248
+
249
+ # Test 2: Multi-Chip Parallel Processing
250
+ print("\nTest 2: Parallel Processing across Multiple Chips")
251
+ num_chips = 4 # Using multiple chips for maximum parallelization
252
+ chips = []
253
+ ai_accelerators = []
254
+
255
+ try:
256
+ # Try to reuse existing connection with verification
257
+ shared_storage = None
258
+ max_connection_attempts = 3
259
+
260
+ for attempt in range(max_connection_attempts):
261
+ try:
262
+ if (components['storage'] and
263
+ components['storage'].is_connected()):
264
+ shared_storage = components['storage']
265
+ logging.info("Successfully reused existing HTTP connection")
266
+ break
267
+ else:
268
+ logging.warning("Existing connection unavailable, creating new HTTP connection...")
269
+ with http_storage_manager() as new_storage:
270
+ if new_storage and new_storage.is_connected():
271
+ components['storage'] = new_storage
272
+ shared_storage = new_storage
273
+ logging.info("Successfully established new HTTP connection")
274
+ break
275
+ except Exception as e:
276
+ logging.error(f"HTTP connection attempt {attempt + 1} failed: {e}")
277
+ if attempt < max_connection_attempts - 1:
278
+ time.sleep(2)
279
+ continue
280
+ raise RuntimeError(f"Failed to establish HTTP connection after {max_connection_attempts} attempts")
281
+
282
+ # Initialize high-performance chip array with HTTP storage
283
+ total_sms = 0
284
+ total_cores = 0
285
+
286
+ # Create optical interconnect for chip communication
287
+ from gpu_arch import OpticalInterconnect
288
+ optical_link = OpticalInterconnect(bandwidth_tbps=800, latency_ns=1)
289
+
290
+ # Reuse existing VRAM instance with shared storage
291
+ shared_vram = components['vram']
292
+ if shared_vram is None:
293
+ shared_vram = VirtualVRAM(storage=shared_storage)
294
+ shared_vram.storage = shared_storage
295
+
296
+ for i in range(num_chips):
297
+ # Configure each chip with shared HTTP storage
298
+ chip = Chip(chip_id=i, vram_size_gb=None, storage=shared_storage)
299
+ chips.append(chip)
300
 
301
+ # Connect chips in a ring topology
302
+ if i > 0:
303
+ chip.connect_chip(chips[i-1], optical_link)
 
 
304
 
305
+ # Initialize AI accelerator with shared resources
306
+ ai_accelerator = AIAccelerator(vram=shared_vram, storage=shared_storage)
307
+ ai_accelerators.append(ai_accelerator)
308
 
309
+ # Verify and potentially repair HTTP connection
310
+ max_retry = 3
311
+ for retry in range(max_retry):
312
+ try:
313
+ if not shared_storage.is_connected():
314
+ logging.warning(f"Connection check failed for chip {i}, attempt {retry + 1}")
315
+ shared_storage._create_session() # Attempt to reconnect
316
+ time.sleep(1)
317
+ continue
318
+
319
+ # Load model weights from HTTP storage (no CPU transfer)
320
+ success = ai_accelerator.load_model(components['model_id'], components['model_config'], None)
321
+ if success:
322
+ logging.info(f"Successfully initialized chip {i} with model via HTTP")
323
+ break
324
+ else:
325
+ raise RuntimeError("Model loading failed")
326
+
327
+ except Exception as e:
328
+ if retry < max_retry - 1:
329
+ logging.warning(f"Error initializing chip {i}, attempt {retry + 1}: {e}")
330
+ time.sleep(1)
331
+ continue
332
+ else:
333
+ logging.error(f"Failed to initialize chip {i} after {max_retry} attempts: {e}")
334
+ raise
335
 
336
+ # Track total processing units
337
+ total_sms += chip.num_sms
338
+ total_cores += chip.num_sms * chip.cores_per_sm
339
 
340
+ # Store chip configuration in HTTP storage
341
+ shared_storage.store_state(f"chips/{i}/config", "state", {
342
+ "num_sms": chip.num_sms,
343
+ "cores_per_sm": chip.cores_per_sm,
344
+ "total_cores": chip.num_sms * chip.cores_per_sm,
345
+ "connected_chips": [c.chip_id for c in chip.connected_chips]
346
+ })
347
+
348
+ print(f"Chip {i} initialized with HTTP storage and optical interconnect")
 
 
349
 
350
+ print(f"\nTotal Processing Units:")
351
+ print(f"- Streaming Multiprocessors: {total_sms:,}")
352
+ print(f"- CUDA Cores: {total_cores:,}")
353
+ print(f"- Electron-speed tensor cores: {total_cores * 8:,}")
354
+
355
+ # Test multi-chip parallel inference with local storage
356
+ print(f"\nRunning parallel inference simulation")
357
+
358
+ # Create test input data
359
+ test_image = np.random.rand(224, 224, 3).astype(np.float32)
360
+ print(f"Created test image with shape: {test_image.shape}")
361
+
362
+ # Store input image in local storage
363
+ input_tensor_id = "test_input_image"
364
+ if shared_storage.store_tensor(input_tensor_id, test_image):
365
+ print(f"Successfully stored test image in local storage")
366
+ else:
367
+ raise RuntimeError("Failed to store test image")
368
+
369
+ # Synchronize all chips through HTTP storage
370
+ start_time = time.time()
371
+
372
+ # Distribute workload across chips using HTTP storage
373
+ batch_size = test_image.shape[0] // num_chips if test_image.shape[0] >= num_chips else 1
374
+ results = []
375
+
376
+ for i, accelerator in enumerate(ai_accelerators):
377
+ try:
378
+ # Run inference using locally stored weights
379
+ result = accelerator.inference(components['model_id'], input_tensor_id)
380
+
381
+ if result is not None:
382
+ # Store result in local storage
383
+ result_id = f"results/chip_{i}/test_image"
384
+ if shared_storage.store_tensor(result_id, result):
385
+ results.append(result)
386
+ print(f"Chip {i} completed inference and stored result")
387
+ else:
388
+ print(f"Chip {i} inference succeeded but result storage failed")
389
+ else:
390
+ print(f"Chip {i} inference failed")
391
+
392
+ except Exception as e:
393
+ print(f"Error in chip {i} inference: {e}")
394
+
395
+ elapsed = time.time() - start_time
396
+
397
+ # Calculate performance metrics
398
+ ops_per_inference = total_cores * 1024 # FMA ops per core
399
+ from electron_speed import drift_velocity, TARGET_SWITCHES_PER_SEC
400
+ electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
401
+ theoretical_time = electron_transit_time * ops_per_inference / total_cores
402
+
403
+ print(f"\nHTTP-Based Multi-Chip Inference Results:")
404
+ print(f"- Chips used: {num_chips}")
405
+ print(f"- Results collected: {len(results)}")
406
+ print(f"- Total time: {elapsed:.4f}s")
407
+ print(f"- Theoretical electron-speed time: {theoretical_time:.6f}s")
408
+ print(f"- Speed ratio: {theoretical_time/elapsed:.2f}x theoretical")
409
+ print(f"- Operations per second: {ops_per_inference/elapsed:.2e}")
410
+
411
+ # Test 3: HTTP Storage Performance
412
+ print(f"\nTest 3: HTTP Storage Performance Evaluation")
413
+
414
+ # Test tensor storage/retrieval performance
415
+ test_sizes = [1024, 4096, 16384, 65536] # Different tensor sizes
416
+ storage_times = []
417
+ retrieval_times = []
418
+
419
+ for size in test_sizes:
420
+ test_tensor = np.random.rand(size).astype(np.float32)
421
+ tensor_id = f"perf_test_{size}"
422
 
423
+ # Test storage time
424
+ start = time.time()
425
+ success = shared_storage.store_tensor(tensor_id, test_tensor)
426
+ storage_time = time.time() - start
 
 
 
427
 
428
+ if success:
429
+ storage_times.append(storage_time)
 
 
 
430
 
431
+ # Test retrieval time
432
+ start = time.time()
433
+ retrieved = shared_storage.load_tensor(tensor_id)
434
+ retrieval_time = time.time() - start
435
+
436
+ if retrieved is not None and np.array_equal(test_tensor, retrieved):
437
+ retrieval_times.append(retrieval_time)
438
+ print(f"Size {size}: Store {storage_time:.4f}s, Retrieve {retrieval_time:.4f}s")
439
+ else:
440
+ print(f"Size {size}: Retrieval verification failed")
441
+ else:
442
+ print(f"Size {size}: Storage failed")
443
+
444
+ if storage_times and retrieval_times:
445
+ avg_storage = sum(storage_times) / len(storage_times)
446
+ avg_retrieval = sum(retrieval_times) / len(retrieval_times)
447
+ print(f"Average storage time: {avg_storage:.4f}s")
448
+ print(f"Average retrieval time: {avg_retrieval:.4f}s")
449
+
450
+ # Test 4: Multi-chip coordination via HTTP
451
+ print(f"\nTest 4: Multi-Chip Coordination via HTTP")
452
+
453
+ # Test cross-chip data transfer
454
+ test_data_id = "cross_chip_test_data"
455
+ test_data = np.array([1, 2, 3, 4, 5], dtype=np.float32)
456
+
457
+ if shared_storage.store_tensor(test_data_id, test_data):
458
+ print("Stored test data for cross-chip transfer")
459
 
460
+ # Transfer data between chips
461
+ new_data_id = shared_storage.transfer_between_chips(0, 1, test_data_id)
462
+ if new_data_id:
463
+ print(f"Successfully transferred data from chip 0 to chip 1: {new_data_id}")
464
+
465
+ # Verify transferred data
466
+ transferred_data = shared_storage.load_tensor(new_data_id)
467
+ if transferred_data is not None and np.array_equal(test_data, transferred_data):
468
+ print("Cross-chip transfer verification successful")
469
+ else:
470
+ print("Cross-chip transfer verification failed")
471
+ else:
472
+ print("Cross-chip transfer failed")
473
+
474
+ # Test synchronization barriers
475
+ barrier_id = "test_barrier"
476
+ num_participants = num_chips
477
+
478
+ if shared_storage.create_sync_barrier(barrier_id, num_participants):
479
+ print(f"Created synchronization barrier for {num_participants} participants")
480
 
481
+ # Simulate participants arriving at barrier
482
+ for i in range(num_participants):
483
+ result = shared_storage.wait_sync_barrier(barrier_id)
484
+ if i == num_participants - 1:
485
+ if result:
486
+ print("All participants reached barrier - synchronization successful")
487
+ else:
488
+ print("Barrier synchronization failed")
489
+ else:
490
+ print(f"Participant {i+1} reached barrier")
491
+
492
+ print(f"\nHTTP-based AI integration test completed successfully!")
493
+
494
+ # Final statistics
495
+ final_stats = {
496
+ "chips_initialized": len(chips),
497
+ "ai_accelerators": len(ai_accelerators),
498
+ "total_cores": total_cores,
499
+ "model_loaded": components['model_id'] is not None,
500
+ "storage_type": "HTTP",
501
+ "connection_status": shared_storage.get_connection_status()
502
+ }
503
+
504
+ print(f"\nFinal System Statistics:")
505
+ for key, value in final_stats.items():
506
+ print(f"- {key}: {value}")
507
 
508
+ except Exception as e:
509
+ print(f"Multi-chip processing test failed: {e}")
510
+ import traceback
511
+ traceback.print_exc()
512
+ return
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
513
 
514
+ if __name__ == "__main__":
515
+ test_ai_integration_http()
516