Sarthak commited on
Commit
12d70ca
·
1 Parent(s): 8083c06

refactor(beam-utils): use direct file operations for beam volumes

Browse files

This commit refactors the Beam volume interaction logic to use direct file operations when running on the Beam platform. It removes the dependency on the `beam cp` command for file transfers, which simplifies the code and improves performance. It also adds checks to determine if the code is running on Beam and adjusts the file operations accordingly.

Files changed (1) hide show
  1. src/distiller/beam_utils.py +160 -103
src/distiller/beam_utils.py CHANGED
@@ -13,6 +13,7 @@ Features:
13
  - Distributed storage optimization
14
  """
15
 
 
16
  import json
17
  import logging
18
  import shutil
@@ -25,6 +26,52 @@ from typing import Any
25
  logger = logging.getLogger(__name__)
26
 
27
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  class BeamVolumeManager:
29
  """Manager for Beam distributed storage volumes using direct file operations."""
30
 
@@ -792,7 +839,7 @@ def download_evaluation_results_from_beam(
792
  local_results_dir: str = "code_model2vec/evaluation_results",
793
  ) -> bool:
794
  """
795
- Download evaluation result files from Beam volume to local directory using beam cp.
796
 
797
  Args:
798
  volume_name: Name of the Beam volume
@@ -806,46 +853,43 @@ def download_evaluation_results_from_beam(
806
  local_path = Path(local_results_dir)
807
  local_path.mkdir(parents=True, exist_ok=True)
808
 
809
- # Use beam cp to download individual JSON files
810
- remote_path = f"{volume_name}:{remote_results_dir}"
 
811
 
812
- # First, list files in the remote directory
813
- list_cmd = ["beam", "cp", "-r", "--list-only", remote_path]
814
- try:
815
- result = subprocess.run(list_cmd, capture_output=True, text=True, check=True) # noqa: S603
816
- remote_files = [line.strip() for line in result.stdout.split("\n") if line.strip().endswith(".json")]
817
- except subprocess.CalledProcessError:
818
- logger.warning(f"Could not list files in {remote_path}")
819
- remote_files = []
820
 
821
- # Download each JSON file individually
822
- downloaded_files = []
823
- for file_name in remote_files:
824
- if file_name.endswith(".json"):
825
- remote_file_path = f"{volume_name}:{remote_results_dir}/{file_name}"
826
- local_file_path = local_path / file_name
827
 
828
  try:
829
- download_cmd = ["beam", "cp", remote_file_path, str(local_file_path)]
830
- subprocess.run(download_cmd, check=True, capture_output=True) # noqa: S603
831
- downloaded_files.append(file_name)
832
- logger.info(f"📥 Downloaded: {file_name}")
833
 
834
  # Delete the file from Beam volume after successful download
835
- delete_cmd = ["beam", "rm", remote_file_path]
836
- try:
837
- subprocess.run(delete_cmd, check=True, capture_output=True) # noqa: S603
838
- logger.info(f"🗑️ Deleted from volume: {file_name}")
839
- except subprocess.CalledProcessError as e:
840
- logger.warning(f"⚠️ Could not delete {file_name} from volume: {e}")
841
 
842
- except subprocess.CalledProcessError as e:
843
- logger.warning(f"⚠️ Failed to download {file_name}: {e}")
844
 
845
- if downloaded_files:
846
- logger.info(f"✅ Downloaded {len(downloaded_files)} evaluation result files")
 
 
847
  return True
848
- logger.info("ℹ️ No new evaluation files to download")
 
 
 
 
849
  return True
850
 
851
  except Exception:
@@ -861,7 +905,7 @@ def download_specific_evaluation_file(
861
  file_prefix: str = "codesearchnet_eval",
862
  ) -> bool:
863
  """
864
- Download a specific evaluation or benchmark result file from Beam volume.
865
 
866
  Args:
867
  volume_name: Name of the Beam volume
@@ -881,28 +925,27 @@ def download_specific_evaluation_file(
881
  safe_model_name = model_name.replace("/", "_")
882
  filename = f"{file_prefix}_{safe_model_name}.json"
883
 
884
- remote_file_path = f"{volume_name}:{remote_results_dir}/{filename}"
 
885
  local_file_path = local_path / filename
886
 
887
- # Download the specific file
888
- download_cmd = ["beam", "cp", remote_file_path, str(local_file_path)]
889
- subprocess.run(download_cmd, check=True, capture_output=True) # noqa: S603
 
 
 
 
 
890
 
891
  logger.info(f"📥 Downloaded {file_prefix} results for {model_name}")
892
 
893
  # Delete the file from Beam volume after successful download
894
- delete_cmd = ["beam", "rm", remote_file_path]
895
- try:
896
- subprocess.run(delete_cmd, check=True, capture_output=True) # noqa: S603
897
- logger.info(f"🗑️ Deleted {file_prefix} results for {model_name} from volume")
898
- except subprocess.CalledProcessError as e:
899
- logger.warning(f"⚠️ Could not delete {filename} from volume: {e}")
900
 
901
  return True
902
 
903
- except subprocess.CalledProcessError:
904
- logger.warning(f"⚠️ No {file_prefix} results found for {model_name} on Beam")
905
- return False
906
  except Exception:
907
  logger.exception(f"❌ Error downloading {file_prefix} results for {model_name}")
908
  return False
@@ -914,7 +957,7 @@ def download_model_from_beam(
914
  local_dir: str,
915
  ) -> bool:
916
  """
917
- Download a model from Beam volume to local directory.
918
 
919
  Args:
920
  volume_name: Name of the Beam volume
@@ -928,22 +971,27 @@ def download_model_from_beam(
928
  local_path = Path(local_dir)
929
  local_path.mkdir(parents=True, exist_ok=True)
930
 
931
- # Use beam cp to download the model directory
932
- remote_path = f"{volume_name}:models/{model_name}"
933
  local_model_path = local_path / model_name
934
 
935
- download_cmd = ["beam", "cp", "-r", remote_path, str(local_model_path)]
936
- subprocess.run(download_cmd, check=True, capture_output=True) # noqa: S603
 
 
 
 
 
 
 
 
937
 
938
  logger.info(f"📥 Downloaded model {model_name} from Beam to {local_dir}")
939
  return True
940
 
941
- except subprocess.CalledProcessError as e:
942
  logger.warning(f"⚠️ Failed to download model {model_name} from Beam: {e}")
943
  return False
944
- except Exception:
945
- logger.exception(f"❌ Error downloading model {model_name} from Beam")
946
- return False
947
 
948
 
949
  def upload_model_to_beam(
@@ -952,7 +1000,7 @@ def upload_model_to_beam(
952
  local_dir: str,
953
  ) -> bool:
954
  """
955
- Upload a model from local directory to Beam volume.
956
 
957
  Args:
958
  volume_name: Name of the Beam volume
@@ -968,21 +1016,24 @@ def upload_model_to_beam(
968
  logger.error(f"❌ Local model directory does not exist: {local_dir}")
969
  return False
970
 
971
- # Use beam cp to upload the model directory
972
- remote_path = f"{volume_name}:models/{model_name}"
 
 
 
 
 
973
 
974
- upload_cmd = ["beam", "cp", "-r", str(local_path), remote_path]
975
- subprocess.run(upload_cmd, check=True, capture_output=True) # noqa: S603
 
976
 
977
  logger.info(f"📤 Uploaded model {model_name} to Beam from {local_dir}")
978
  return True
979
 
980
- except subprocess.CalledProcessError as e:
981
  logger.warning(f"⚠️ Failed to upload model {model_name} to Beam: {e}")
982
  return False
983
- except Exception:
984
- logger.exception(f"❌ Error uploading model {model_name} to Beam")
985
- return False
986
 
987
 
988
  def download_checkpoints_from_beam(
@@ -992,7 +1043,7 @@ def download_checkpoints_from_beam(
992
  local_checkpoints_dir: str = "code_model2vec/checkpoints",
993
  ) -> bool:
994
  """
995
- Download checkpoint files from Beam volume to local directory.
996
 
997
  Args:
998
  volume_name: Name of the Beam volume
@@ -1007,52 +1058,49 @@ def download_checkpoints_from_beam(
1007
  local_path = Path(local_checkpoints_dir)
1008
  local_path.mkdir(parents=True, exist_ok=True)
1009
 
 
 
 
 
 
 
 
 
1010
  # Build the pattern for files to download
1011
  if stage:
1012
  local_stage_dir = local_path / stage
1013
  local_stage_dir.mkdir(parents=True, exist_ok=True)
 
 
 
 
 
 
1014
  else:
1015
- pass
1016
-
1017
- # Use beam cp to download checkpoint files
1018
- remote_path = f"{volume_name}:{remote_checkpoints_dir}"
1019
-
1020
- # First, try to list files
1021
- list_cmd = ["beam", "cp", "-r", "--list-only", remote_path]
1022
- try:
1023
- result = subprocess.run(list_cmd, capture_output=True, text=True, check=True) # noqa: S603
1024
- remote_files = [
1025
- line.strip()
1026
- for line in result.stdout.split("\n")
1027
- if line.strip().endswith(".json") and "checkpoints_" in line.strip()
1028
- ]
1029
- except subprocess.CalledProcessError:
1030
- logger.warning(f"Could not list checkpoint files in {remote_path}")
1031
  remote_files = []
 
 
 
1032
 
1033
- # Filter by stage if specified
1034
- if stage:
1035
- remote_files = [f for f in remote_files if f"checkpoints_{stage}_" in f]
1036
-
1037
- # Download each checkpoint file
1038
  downloaded_files = []
1039
- for file_name in remote_files:
1040
- remote_file_path = f"{volume_name}:{remote_checkpoints_dir}/{file_name}"
1041
-
1042
  # Determine local subdirectory based on checkpoint stage
1043
- file_stage = file_name.split("_")[1] if "_" in file_name else "unknown"
1044
  local_stage_dir = local_path / file_stage
1045
  local_stage_dir.mkdir(parents=True, exist_ok=True)
1046
- local_file_path = local_stage_dir / file_name
1047
 
1048
  try:
1049
- download_cmd = ["beam", "cp", remote_file_path, str(local_file_path)]
1050
- subprocess.run(download_cmd, check=True, capture_output=True) # noqa: S603
1051
- downloaded_files.append(file_name)
1052
- logger.info(f"📥 Downloaded checkpoint: {file_name}")
 
1053
 
1054
- except subprocess.CalledProcessError as e:
1055
- logger.warning(f"⚠️ Failed to download checkpoint {file_name}: {e}")
1056
 
1057
  if downloaded_files:
1058
  logger.info(f"✅ Downloaded {len(downloaded_files)} checkpoint files")
@@ -1072,7 +1120,7 @@ def upload_checkpoints_to_beam(
1072
  remote_checkpoints_dir: str = "checkpoints",
1073
  ) -> bool:
1074
  """
1075
- Upload checkpoint files from local directory to Beam volume.
1076
 
1077
  Args:
1078
  volume_name: Name of the Beam volume
@@ -1089,6 +1137,10 @@ def upload_checkpoints_to_beam(
1089
  logger.warning(f"⚠️ Local checkpoints directory does not exist: {local_checkpoints_dir}")
1090
  return True # Not an error - no checkpoints to upload
1091
 
 
 
 
 
1092
  # Find checkpoint files to upload
1093
  if stage:
1094
  # Look in the stage subdirectory
@@ -1105,18 +1157,23 @@ def upload_checkpoints_to_beam(
1105
  logger.info(f"ℹ️ No checkpoint files found to upload for stage: {stage or 'all'}")
1106
  return True
1107
 
1108
- # Upload each checkpoint file
1109
  uploaded_files = []
1110
  for checkpoint_file in checkpoint_files:
1111
- remote_file_path = f"{volume_name}:{remote_checkpoints_dir}/{checkpoint_file.name}"
 
 
 
 
1112
 
1113
  try:
1114
- upload_cmd = ["beam", "cp", str(checkpoint_file), remote_file_path]
1115
- subprocess.run(upload_cmd, check=True, capture_output=True) # noqa: S603
 
1116
  uploaded_files.append(checkpoint_file.name)
1117
  logger.info(f"📤 Uploaded checkpoint: {checkpoint_file.name}")
1118
 
1119
- except subprocess.CalledProcessError as e:
1120
  logger.warning(f"⚠️ Failed to upload checkpoint {checkpoint_file.name}: {e}")
1121
 
1122
  if uploaded_files:
 
13
  - Distributed storage optimization
14
  """
15
 
16
+ # ruff: noqa: S603, S607, PLW1510
17
  import json
18
  import logging
19
  import shutil
 
26
  logger = logging.getLogger(__name__)
27
 
28
 
29
+ def _is_running_on_beam() -> bool:
30
+ """
31
+ Detect if we're running on Beam platform or locally.
32
+
33
+ On Beam, volumes are mounted as directories. Locally, we need to use beam CLI.
34
+ """
35
+ import os
36
+
37
+ # Check for Beam environment variables
38
+ beam_env_vars = [
39
+ "BEAM_TASK_ID",
40
+ "BEAM_FUNCTION_ID",
41
+ "BEAM_RUN_ID",
42
+ "BEAM_JOB_ID",
43
+ "BEAM_CONTAINER_ID",
44
+ ]
45
+
46
+ for env_var in beam_env_vars:
47
+ if os.environ.get(env_var):
48
+ return True
49
+
50
+ # Check for common Beam mount paths
51
+ beam_mount_paths = [
52
+ "/volumes", # Common Beam volume mount
53
+ "/mnt/beam",
54
+ "/var/beam",
55
+ "/beam",
56
+ ]
57
+
58
+ return any(Path(mount_path).exists() for mount_path in beam_mount_paths)
59
+
60
+
61
+ def _check_beam_cli_available() -> bool:
62
+ """
63
+ Check if beam CLI is available for local file operations.
64
+
65
+ Returns:
66
+ True if beam CLI is available, False otherwise
67
+ """
68
+ try:
69
+ result = subprocess.run(["beam", "--version"], capture_output=True, text=True, timeout=10)
70
+ return result.returncode == 0
71
+ except (FileNotFoundError, subprocess.TimeoutExpired):
72
+ return False
73
+
74
+
75
  class BeamVolumeManager:
76
  """Manager for Beam distributed storage volumes using direct file operations."""
77
 
 
839
  local_results_dir: str = "code_model2vec/evaluation_results",
840
  ) -> bool:
841
  """
842
+ Download evaluation result files from Beam volume to local directory.
843
 
844
  Args:
845
  volume_name: Name of the Beam volume
 
853
  local_path = Path(local_results_dir)
854
  local_path.mkdir(parents=True, exist_ok=True)
855
 
856
+ if _is_running_on_beam():
857
+ # Direct file operations when running on Beam
858
+ remote_path = Path(volume_name) / remote_results_dir
859
 
860
+ if not remote_path.exists():
861
+ logger.info("ℹ️ No evaluation results directory found on Beam")
862
+ return True
 
 
 
 
 
863
 
864
+ # Find and copy JSON result files
865
+ remote_files = list(remote_path.glob("*.json"))
866
+
867
+ downloaded_files = []
868
+ for result_file in remote_files:
869
+ local_file_path = local_path / result_file.name
870
 
871
  try:
872
+ shutil.copy2(result_file, local_file_path)
873
+ downloaded_files.append(result_file.name)
874
+ logger.info(f"📥 Downloaded: {result_file.name}")
 
875
 
876
  # Delete the file from Beam volume after successful download
877
+ result_file.unlink()
878
+ logger.info(f"🗑️ Deleted from volume: {result_file.name}")
 
 
 
 
879
 
880
+ except Exception as e:
881
+ logger.warning(f"⚠️ Failed to download {result_file.name}: {e}")
882
 
883
+ if downloaded_files:
884
+ logger.info(f"✅ Downloaded {len(downloaded_files)} evaluation result files")
885
+ return True
886
+ logger.info("ℹ️ No new evaluation files to download")
887
  return True
888
+
889
+ # When running locally, we cannot access Beam volumes directly
890
+ # This would require a proper Beam storage API or CLI tool
891
+ logger.info("ℹ️ Evaluation results download from local environment not supported")
892
+ logger.info("ℹ️ Evaluation results are only accessible when running on Beam platform")
893
  return True
894
 
895
  except Exception:
 
905
  file_prefix: str = "codesearchnet_eval",
906
  ) -> bool:
907
  """
908
+ Download a specific evaluation or benchmark result file from Beam volume using direct file operations.
909
 
910
  Args:
911
  volume_name: Name of the Beam volume
 
925
  safe_model_name = model_name.replace("/", "_")
926
  filename = f"{file_prefix}_{safe_model_name}.json"
927
 
928
+ # When running on Beam, the volume is mounted as a directory
929
+ remote_file_path = Path(volume_name) / remote_results_dir / filename
930
  local_file_path = local_path / filename
931
 
932
+ if not remote_file_path.exists():
933
+ logger.warning(f"⚠️ No {file_prefix} results found for {model_name} on Beam")
934
+ return False
935
+
936
+ # Copy the specific file
937
+ import shutil
938
+
939
+ shutil.copy2(remote_file_path, local_file_path)
940
 
941
  logger.info(f"📥 Downloaded {file_prefix} results for {model_name}")
942
 
943
  # Delete the file from Beam volume after successful download
944
+ remote_file_path.unlink()
945
+ logger.info(f"🗑️ Deleted {file_prefix} results for {model_name} from volume")
 
 
 
 
946
 
947
  return True
948
 
 
 
 
949
  except Exception:
950
  logger.exception(f"❌ Error downloading {file_prefix} results for {model_name}")
951
  return False
 
957
  local_dir: str,
958
  ) -> bool:
959
  """
960
+ Download a model from Beam volume to local directory using direct file operations.
961
 
962
  Args:
963
  volume_name: Name of the Beam volume
 
971
  local_path = Path(local_dir)
972
  local_path.mkdir(parents=True, exist_ok=True)
973
 
974
+ # When running on Beam, the volume is mounted as a directory
975
+ remote_model_path = Path(volume_name) / "models" / model_name
976
  local_model_path = local_path / model_name
977
 
978
+ if not remote_model_path.exists():
979
+ logger.warning(f"⚠️ Model {model_name} not found in Beam volume at {remote_model_path}")
980
+ return False
981
+
982
+ # Copy the model directory
983
+ import shutil
984
+
985
+ if local_model_path.exists():
986
+ shutil.rmtree(local_model_path)
987
+ shutil.copytree(remote_model_path, local_model_path)
988
 
989
  logger.info(f"📥 Downloaded model {model_name} from Beam to {local_dir}")
990
  return True
991
 
992
+ except Exception as e:
993
  logger.warning(f"⚠️ Failed to download model {model_name} from Beam: {e}")
994
  return False
 
 
 
995
 
996
 
997
  def upload_model_to_beam(
 
1000
  local_dir: str,
1001
  ) -> bool:
1002
  """
1003
+ Upload a model from local directory to Beam volume using direct file operations.
1004
 
1005
  Args:
1006
  volume_name: Name of the Beam volume
 
1016
  logger.error(f"❌ Local model directory does not exist: {local_dir}")
1017
  return False
1018
 
1019
+ # When running on Beam, the volume is mounted as a directory
1020
+ remote_models_dir = Path(volume_name) / "models"
1021
+ remote_models_dir.mkdir(parents=True, exist_ok=True)
1022
+ remote_model_path = remote_models_dir / model_name
1023
+
1024
+ # Copy the model directory
1025
+ import shutil
1026
 
1027
+ if remote_model_path.exists():
1028
+ shutil.rmtree(remote_model_path)
1029
+ shutil.copytree(local_path, remote_model_path)
1030
 
1031
  logger.info(f"📤 Uploaded model {model_name} to Beam from {local_dir}")
1032
  return True
1033
 
1034
+ except Exception as e:
1035
  logger.warning(f"⚠️ Failed to upload model {model_name} to Beam: {e}")
1036
  return False
 
 
 
1037
 
1038
 
1039
  def download_checkpoints_from_beam(
 
1043
  local_checkpoints_dir: str = "code_model2vec/checkpoints",
1044
  ) -> bool:
1045
  """
1046
+ Download checkpoint files from Beam volume to local directory using direct file operations.
1047
 
1048
  Args:
1049
  volume_name: Name of the Beam volume
 
1058
  local_path = Path(local_checkpoints_dir)
1059
  local_path.mkdir(parents=True, exist_ok=True)
1060
 
1061
+ # When running on Beam, the volume is mounted as a directory
1062
+ remote_base_path = Path(volume_name) / remote_checkpoints_dir
1063
+
1064
+ # If the remote path doesn't exist, there are no checkpoints to download
1065
+ if not remote_base_path.exists():
1066
+ logger.info(f"ℹ️ No checkpoint directory found at {remote_base_path}")
1067
+ return True
1068
+
1069
  # Build the pattern for files to download
1070
  if stage:
1071
  local_stage_dir = local_path / stage
1072
  local_stage_dir.mkdir(parents=True, exist_ok=True)
1073
+ # Look for files in stage-specific directory
1074
+ remote_stage_dir = remote_base_path / stage
1075
+ if remote_stage_dir.exists():
1076
+ remote_files = list(remote_stage_dir.glob(f"checkpoints_{stage}_*.json"))
1077
+ else:
1078
+ remote_files = []
1079
  else:
1080
+ # Look for all checkpoint files in all stage subdirectories
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1081
  remote_files = []
1082
+ for stage_dir in remote_base_path.iterdir():
1083
+ if stage_dir.is_dir():
1084
+ remote_files.extend(stage_dir.glob("checkpoints_*.json"))
1085
 
1086
+ # Copy each checkpoint file
 
 
 
 
1087
  downloaded_files = []
1088
+ for checkpoint_file in remote_files:
 
 
1089
  # Determine local subdirectory based on checkpoint stage
1090
+ file_stage = checkpoint_file.name.split("_")[1] if "_" in checkpoint_file.name else "unknown"
1091
  local_stage_dir = local_path / file_stage
1092
  local_stage_dir.mkdir(parents=True, exist_ok=True)
1093
+ local_file_path = local_stage_dir / checkpoint_file.name
1094
 
1095
  try:
1096
+ import shutil
1097
+
1098
+ shutil.copy2(checkpoint_file, local_file_path)
1099
+ downloaded_files.append(checkpoint_file.name)
1100
+ logger.info(f"📥 Downloaded checkpoint: {checkpoint_file.name}")
1101
 
1102
+ except Exception as e:
1103
+ logger.warning(f"⚠️ Failed to download checkpoint {checkpoint_file.name}: {e}")
1104
 
1105
  if downloaded_files:
1106
  logger.info(f"✅ Downloaded {len(downloaded_files)} checkpoint files")
 
1120
  remote_checkpoints_dir: str = "checkpoints",
1121
  ) -> bool:
1122
  """
1123
+ Upload checkpoint files from local directory to Beam volume using direct file operations.
1124
 
1125
  Args:
1126
  volume_name: Name of the Beam volume
 
1137
  logger.warning(f"⚠️ Local checkpoints directory does not exist: {local_checkpoints_dir}")
1138
  return True # Not an error - no checkpoints to upload
1139
 
1140
+ # When running on Beam, the volume is mounted as a directory
1141
+ remote_base_path = Path(volume_name) / remote_checkpoints_dir
1142
+ remote_base_path.mkdir(parents=True, exist_ok=True)
1143
+
1144
  # Find checkpoint files to upload
1145
  if stage:
1146
  # Look in the stage subdirectory
 
1157
  logger.info(f"ℹ️ No checkpoint files found to upload for stage: {stage or 'all'}")
1158
  return True
1159
 
1160
+ # Copy each checkpoint file
1161
  uploaded_files = []
1162
  for checkpoint_file in checkpoint_files:
1163
+ # Determine remote subdirectory based on checkpoint stage
1164
+ file_stage = checkpoint_file.name.split("_")[1] if "_" in checkpoint_file.name else "unknown"
1165
+ remote_stage_dir = remote_base_path / file_stage
1166
+ remote_stage_dir.mkdir(parents=True, exist_ok=True)
1167
+ remote_file_path = remote_stage_dir / checkpoint_file.name
1168
 
1169
  try:
1170
+ import shutil
1171
+
1172
+ shutil.copy2(checkpoint_file, remote_file_path)
1173
  uploaded_files.append(checkpoint_file.name)
1174
  logger.info(f"📤 Uploaded checkpoint: {checkpoint_file.name}")
1175
 
1176
+ except Exception as e:
1177
  logger.warning(f"⚠️ Failed to upload checkpoint {checkpoint_file.name}: {e}")
1178
 
1179
  if uploaded_files: