Update app.py
Browse files
app.py
CHANGED
|
@@ -132,6 +132,8 @@ class ControllerState:
|
|
| 132 |
self.tensor_servers: Dict[str, TensorServer] = {}
|
| 133 |
self.model_chunks: Dict[int, ModelChunk] = {}
|
| 134 |
self.is_model_loaded = False
|
|
|
|
|
|
|
| 135 |
self.operation_results: Dict[str, Dict] = {} # Track operation results from tensor servers
|
| 136 |
self.pending_operations: Dict[str, asyncio.Task] = {} # Track ongoing operations
|
| 137 |
|
|
@@ -163,18 +165,29 @@ async def split_model_weights():
|
|
| 163 |
num_chunks = num_servers # One chunk per server initially
|
| 164 |
|
| 165 |
chunk_size = math.ceil(file_size / num_chunks)
|
| 166 |
-
|
| 167 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 168 |
|
| 169 |
-
#
|
| 170 |
-
|
| 171 |
-
os.makedirs(chunks_dir, exist_ok=True)
|
| 172 |
|
| 173 |
# Split the file into chunks
|
| 174 |
with open(model_file, 'rb') as f:
|
| 175 |
chunk_sizes = [] # Track actual chunk sizes
|
| 176 |
for chunk_id in range(num_chunks):
|
| 177 |
-
chunk_path = os.path.join(chunks_dir, f"chunk_{chunk_id}.bin")
|
| 178 |
|
| 179 |
# Calculate chunk boundaries
|
| 180 |
start_pos = chunk_id * chunk_size
|
|
@@ -208,19 +221,28 @@ async def split_model_weights():
|
|
| 208 |
status="ready"
|
| 209 |
)
|
| 210 |
|
| 211 |
-
print(f"[INFO] Created chunk {chunk_id}: {current_chunk_size
|
| 212 |
|
| 213 |
# Verify distribution
|
| 214 |
total_size_actual = sum(chunk_sizes)
|
| 215 |
if total_size_actual != file_size:
|
| 216 |
-
print(f"[WARN] Total chunk size ({total_size_actual}) differs from original file size ({file_size})")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 217 |
|
| 218 |
print(f"\n[INFO] Distribution Summary:")
|
| 219 |
print(f"- Original file: {os.path.basename(model_file)}")
|
| 220 |
-
print(f"- Total size: {file_size
|
| 221 |
print(f"- Number of chunks: {len(state.model_chunks)}")
|
| 222 |
-
print(f"- Chunks directory: {chunks_dir}")
|
| 223 |
-
print(f"-
|
|
|
|
|
|
|
|
|
|
| 224 |
|
| 225 |
return True
|
| 226 |
|
|
@@ -361,15 +383,17 @@ async def send_chunk_to_server(server_url: str, chunk_id: int, chunk_info: Dict)
|
|
| 361 |
"""Send a model chunk to a tensor server"""
|
| 362 |
try:
|
| 363 |
print(f"[INFO] Sending chunk {chunk_id} to server {server_url}")
|
| 364 |
-
chunk_path = os.path.join(state.
|
| 365 |
|
| 366 |
if not os.path.exists(chunk_path):
|
| 367 |
raise Exception(f"Chunk file not found: {chunk_path}")
|
| 368 |
|
|
|
|
|
|
|
| 369 |
chunk_data = {
|
| 370 |
'chunk_id': chunk_id,
|
| 371 |
-
'files': [
|
| 372 |
-
'config':
|
| 373 |
}
|
| 374 |
|
| 375 |
async with aiohttp.ClientSession() as session:
|
|
@@ -570,6 +594,11 @@ async def download_model_files():
|
|
| 570 |
model_path = model_version_dir
|
| 571 |
print(f"[INFO] Successfully cloned model to {model_path}")
|
| 572 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 573 |
# Load and parse the config
|
| 574 |
config_path = os.path.join(model_path, "config.json")
|
| 575 |
if os.path.exists(config_path):
|
|
|
|
| 132 |
self.tensor_servers: Dict[str, TensorServer] = {}
|
| 133 |
self.model_chunks: Dict[int, ModelChunk] = {}
|
| 134 |
self.is_model_loaded = False
|
| 135 |
+
self.model_path: str = "" # Base path where model files are stored
|
| 136 |
+
self.chunks_dir: str = "" # Directory containing chunk files
|
| 137 |
self.operation_results: Dict[str, Dict] = {} # Track operation results from tensor servers
|
| 138 |
self.pending_operations: Dict[str, asyncio.Task] = {} # Track ongoing operations
|
| 139 |
|
|
|
|
| 165 |
num_chunks = num_servers # One chunk per server initially
|
| 166 |
|
| 167 |
chunk_size = math.ceil(file_size / num_chunks)
|
| 168 |
+
|
| 169 |
+
# Format sizes for display
|
| 170 |
+
def format_size(size_bytes):
|
| 171 |
+
if size_bytes >= 1024*1024*1024: # GB
|
| 172 |
+
return f"{size_bytes / (1024*1024*1024):.2f} GB"
|
| 173 |
+
elif size_bytes >= 1024*1024: # MB
|
| 174 |
+
return f"{size_bytes / (1024*1024):.2f} MB"
|
| 175 |
+
elif size_bytes >= 1024: # KB
|
| 176 |
+
return f"{size_bytes / 1024:.2f} KB"
|
| 177 |
+
else:
|
| 178 |
+
return f"{size_bytes} bytes"
|
| 179 |
+
|
| 180 |
+
print(f"[INFO] Model file size: {format_size(file_size)}")
|
| 181 |
+
print(f"[INFO] Creating {num_chunks} chunks of approximately {format_size(chunk_size)} each")
|
| 182 |
|
| 183 |
+
# Use the chunks directory from state
|
| 184 |
+
os.makedirs(state.chunks_dir, exist_ok=True)
|
|
|
|
| 185 |
|
| 186 |
# Split the file into chunks
|
| 187 |
with open(model_file, 'rb') as f:
|
| 188 |
chunk_sizes = [] # Track actual chunk sizes
|
| 189 |
for chunk_id in range(num_chunks):
|
| 190 |
+
chunk_path = os.path.join(state.chunks_dir, f"chunk_{chunk_id}.bin")
|
| 191 |
|
| 192 |
# Calculate chunk boundaries
|
| 193 |
start_pos = chunk_id * chunk_size
|
|
|
|
| 221 |
status="ready"
|
| 222 |
)
|
| 223 |
|
| 224 |
+
print(f"[INFO] Created chunk {chunk_id}: {format_size(current_chunk_size)} ({current_chunk_size:,} bytes)")
|
| 225 |
|
| 226 |
# Verify distribution
|
| 227 |
total_size_actual = sum(chunk_sizes)
|
| 228 |
if total_size_actual != file_size:
|
| 229 |
+
print(f"[WARN] Total chunk size ({format_size(total_size_actual)}) differs from original file size ({format_size(file_size)})")
|
| 230 |
+
print(f"[WARN] Difference: {format_size(abs(total_size_actual - file_size))}")
|
| 231 |
+
|
| 232 |
+
# Calculate statistics
|
| 233 |
+
avg_chunk_size = sum(chunk_sizes) / len(chunk_sizes) if chunk_sizes else 0
|
| 234 |
+
min_chunk_size = min(chunk_sizes) if chunk_sizes else 0
|
| 235 |
+
max_chunk_size = max(chunk_sizes) if chunk_sizes else 0
|
| 236 |
|
| 237 |
print(f"\n[INFO] Distribution Summary:")
|
| 238 |
print(f"- Original file: {os.path.basename(model_file)}")
|
| 239 |
+
print(f"- Total size: {format_size(file_size)} ({file_size:,} bytes)")
|
| 240 |
print(f"- Number of chunks: {len(state.model_chunks)}")
|
| 241 |
+
print(f"- Chunks directory: {state.chunks_dir}")
|
| 242 |
+
print(f"- Average chunk size: {format_size(avg_chunk_size)}")
|
| 243 |
+
print(f"- Smallest chunk: {format_size(min_chunk_size)}")
|
| 244 |
+
print(f"- Largest chunk: {format_size(max_chunk_size)}")
|
| 245 |
+
print(f"- Size variance: {((max_chunk_size - min_chunk_size) / avg_chunk_size * 100):.1f}%")
|
| 246 |
|
| 247 |
return True
|
| 248 |
|
|
|
|
| 383 |
"""Send a model chunk to a tensor server"""
|
| 384 |
try:
|
| 385 |
print(f"[INFO] Sending chunk {chunk_id} to server {server_url}")
|
| 386 |
+
chunk_path = os.path.join(state.chunks_dir, f"chunk_{chunk_id}.bin")
|
| 387 |
|
| 388 |
if not os.path.exists(chunk_path):
|
| 389 |
raise Exception(f"Chunk file not found: {chunk_path}")
|
| 390 |
|
| 391 |
+
# Get chunk metadata
|
| 392 |
+
chunk = state.model_chunks[chunk_id]
|
| 393 |
chunk_data = {
|
| 394 |
'chunk_id': chunk_id,
|
| 395 |
+
'files': [os.path.basename(chunk_path)],
|
| 396 |
+
'config': chunk.config
|
| 397 |
}
|
| 398 |
|
| 399 |
async with aiohttp.ClientSession() as session:
|
|
|
|
| 594 |
model_path = model_version_dir
|
| 595 |
print(f"[INFO] Successfully cloned model to {model_path}")
|
| 596 |
|
| 597 |
+
# Set model paths in state
|
| 598 |
+
state.model_path = model_path
|
| 599 |
+
state.chunks_dir = os.path.join(model_path, "chunks")
|
| 600 |
+
os.makedirs(state.chunks_dir, exist_ok=True)
|
| 601 |
+
|
| 602 |
# Load and parse the config
|
| 603 |
config_path = os.path.join(model_path, "config.json")
|
| 604 |
if os.path.exists(config_path):
|