Factor Studios commited on
Commit
7b251d0
·
verified ·
1 Parent(s): 92b38f0

Update http_storage.py

Browse files
Files changed (1) hide show
  1. http_storage.py +41 -92
http_storage.py CHANGED
@@ -414,113 +414,62 @@ class HTTPGPUStorage:
414
  logging.error(f"Error during inference for model {model_name}: {str(e)}")
415
  return None
416
 
417
- def wait_for_connection(self, timeout: float = 30.0) -> bool:
418
- """Wait for HTTP connection to be established (compatibility method)"""
419
- # For HTTP, we just check if we can make a request
420
  try:
421
- if not self.session_token:
422
- return self._create_session()
423
-
424
- # Test connection with a simple request
425
- response = self._make_request('GET', '/cache/connection_test')
426
- return response is not None
427
-
428
  except Exception as e:
429
- logging.error(f"Connection test failed: {e}")
430
  return False
431
 
432
  def is_connected(self) -> bool:
433
- """Check if HTTP connection is active (compatibility method)"""
434
- return self.session_token is not None and not self._closing
435
 
436
  def get_connection_status(self) -> Dict[str, Any]:
437
- """Get detailed connection status"""
438
- return {
439
- "connected": self.is_connected(),
440
- "closing": self._closing,
441
- "error_count": self.error_count,
442
- "base_url": self.base_url,
443
- "last_error_time": self.last_error_time,
444
- "loaded_models": list(self.resource_monitor['loaded_models']),
445
- "session_id": self.session_id
446
- }
447
 
448
- def set_keep_alive(self, enabled: bool):
449
- """Set keep-alive mode (compatibility method for HTTP)"""
450
- # HTTP connections are stateless, so this is a no-op
451
- pass
452
 
453
  def reconnect(self):
454
- """Reconnect to server (recreate session for HTTP)"""
455
- self.session_token = None
456
- self.session_id = None
457
- return self._create_session()
 
 
 
 
 
 
 
 
 
 
458
 
459
  def close(self):
460
  """Close HTTP client"""
461
  self._closing = True
462
- if self.http_session:
463
- self.http_session.close()
464
-
465
- # Additional methods for multi-chip coordination
466
- def transfer_between_chips(self, src_chip: int, dst_chip: int, data_id: str) -> Optional[str]:
467
- """Transfer data between chips via HTTP API"""
468
- try:
469
- request_data = {"data_id": data_id}
470
-
471
- response = self._make_request(
472
- 'POST',
473
- f'/chips/{src_chip}/transfer/{dst_chip}',
474
- json=request_data
475
- )
476
-
477
- if response and response.get('status') == 'success':
478
- return response.get('new_data_id')
479
- else:
480
- logging.error(f"Chip transfer failed: {response.get('message', 'Unknown error')}")
481
- return None
482
-
483
- except Exception as e:
484
- logging.error(f"Error in chip transfer: {str(e)}")
485
- return None
486
-
487
- def create_sync_barrier(self, barrier_id: str, num_participants: int) -> bool:
488
- """Create synchronization barrier via HTTP API"""
489
- try:
490
- request_data = {"num_participants": num_participants}
491
-
492
- response = self._make_request(
493
- 'POST',
494
- f'/sync/barrier/{barrier_id}',
495
- json=request_data
496
- )
497
-
498
- return response and response.get('status') == 'success'
499
-
500
- except Exception as e:
501
- logging.error(f"Error creating sync barrier: {str(e)}")
502
- return False
503
-
504
- def wait_sync_barrier(self, barrier_id: str) -> bool:
505
- """Wait at synchronization barrier via HTTP API"""
506
- try:
507
- response = self._make_request('PUT', f'/sync/barrier/{barrier_id}/wait')
508
-
509
- if response:
510
- status = response.get('status')
511
- if status == 'released':
512
- return True
513
- elif status == 'waiting':
514
- # In a real implementation, this might poll or use long-polling
515
- time.sleep(0.1) # Brief delay before next check
516
- return False
517
-
518
- return False
519
-
520
- except Exception as e:
521
- logging.error(f"Error waiting at sync barrier: {str(e)}")
522
- return False
523
 
524
  # Compatibility alias for existing code
525
  WebSocketGPUStorage = HTTPGPUStorage
526
 
 
 
414
  logging.error(f"Error during inference for model {model_name}: {str(e)}")
415
  return None
416
 
417
+ def ping(self) -> bool:
418
+ """Ping the server to check connection status."""
 
419
  try:
420
+ response = self._make_request('GET', '/status')
421
+ return response and response.get('status') == 'ok'
 
 
 
 
 
422
  except Exception as e:
423
+ logging.error(f"Ping failed: {e}")
424
  return False
425
 
426
  def is_connected(self) -> bool:
427
+ """Check if the client is connected to the server."""
428
+ return self.ping()
429
 
430
  def get_connection_status(self) -> Dict[str, Any]:
431
+ """Get detailed connection status."""
432
+ if self.is_connected():
433
+ return {"status": "connected", "session_id": self.session_id}
434
+ else:
435
+ return {"status": "disconnected", "error_count": self.error_count}
 
 
 
 
 
436
 
437
+ def set_keep_alive(self, interval: int):
438
+ """Set keep-alive interval (compatibility method)."""
439
+ logging.info(f"Keep-alive interval set to {interval} seconds (HTTP client does not use websockets).")
 
440
 
441
  def reconnect(self):
442
+ """Attempt to reconnect (compatibility method)."""
443
+ logging.info("Attempting to reconnect HTTP client...")
444
+ self._create_session()
445
+
446
+ def wait_for_connection(self, timeout: float = 30.0) -> bool:
447
+ """Wait for HTTP connection to be established (compatibility method)"""
448
+ start_time = time.time()
449
+ while time.time() - start_time < timeout:
450
+ if self.is_connected():
451
+ logging.info("HTTP connection established.")
452
+ return True
453
+ time.sleep(1) # Wait for 1 second before retrying
454
+ logging.error("HTTP connection not established within timeout.")
455
+ return False
456
 
457
  def close(self):
458
  """Close HTTP client"""
459
  self._closing = True
460
+ logging.info("HTTP client is closing.")
461
+ # Invalidate session on server side if possible
462
+ if self.session_token:
463
+ try:
464
+ self.http_session.post(f"{self.api_base}/sessions/invalidate",
465
+ headers={'Authorization': f'Bearer {self.session_token}'},
466
+ timeout=5)
467
+ except Exception as e:
468
+ logging.warning(f"Failed to invalidate session on server: {e}")
469
+ self.http_session.close()
470
+ HTTPGPUStorage._instance = None # Clear singleton instance
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
471
 
472
  # Compatibility alias for existing code
473
  WebSocketGPUStorage = HTTPGPUStorage
474
 
475
+