Spaces:
Running
on
Zero
Running
on
Zero
File size: 3,763 Bytes
b0120da |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
import os
import fsspec
import pyarrow as pa
# pyarrow needs the initialization from this import
import pyarrow.dataset # pyright: ignore
import typer
from pyarrow.lib import ArrowInvalid
from rich.progress import track
def is_valid_arrow_file(path: str) -> bool:
    """Report whether ``path`` can be opened as an Arrow-format dataset.

    Args:
        path: Location handed to ``pyarrow.dataset.dataset``.

    Returns:
        ``True`` when the dataset is constructed without error, ``False``
        when pyarrow raises ``ArrowInvalid``. Any other exception propagates
        to the caller.
    """
    try:
        # Only the act of opening matters; the dataset object is discarded.
        pa.dataset.dataset(path, format="arrow")
    except ArrowInvalid:
        return False
    return True
# Scheme prefix used throughout this file to recognise S3 paths.
S3_PREFIX = "s3://"

# Typer application object; functions decorated with ``@app.command()``
# register themselves on it.
app = typer.Typer()
def get_fs(path: str, s3_profile: str | None = None) -> fsspec.AbstractFileSystem:
    """Pick the fsspec filesystem that matches ``path``.

    Args:
        path: Path to inspect; one starting with ``S3_PREFIX`` selects S3,
            anything else selects the local ``"file"`` filesystem.
        s3_profile: Optional profile name passed to the S3 filesystem as
            ``profile=``. Not used for non-S3 paths.

    Returns:
        The filesystem object returned by ``fsspec.filesystem``.
    """
    if not path.startswith(S3_PREFIX):
        return fsspec.filesystem("file")
    if s3_profile is None:
        # No profile requested: do not pass the ``profile`` keyword at all.
        return fsspec.filesystem("s3")
    return fsspec.filesystem("s3", profile=s3_profile)
@app.command()
def print_local_to_delete(
    blob_dir: str, local_dirs: list[str], s3_profile: str = "blt"
):
    """List local files that also exist under ``blob_dir`` on S3.

    A local file is selected when it is not a symlink and its path relative
    to its ``local_dirs`` entry equals some blob path relative to
    ``blob_dir``. The number of selected files is printed and their paths are
    written, one per line, to ``/tmp/files_to_delete.txt``. This command does
    not delete anything itself.

    Args:
        blob_dir: S3 directory, must start with ``s3://`` and end with ``/``.
        local_dirs: Local directories, each must end with ``/``.
        s3_profile: Profile name passed to the S3 filesystem.

    Raises:
        AssertionError: If a directory argument is malformed, or if any blob
            whose name does not end in ``.complete`` (case-insensitive)
            reports a size of zero.
    """
    # NOTE: validation relies on ``assert`` and is skipped under ``python -O``.
    for s in local_dirs:
        assert s.endswith("/"), "Dirs must end with /"
    assert blob_dir.endswith("/"), "Dirs must end with /"
    # The relative-path slice below subtracts len(S3_PREFIX); that offset is
    # only meaningful when blob_dir actually carries the prefix.
    assert blob_dir.startswith(S3_PREFIX), f"blob_dir must start with {S3_PREFIX}"

    blob_fs = fsspec.filesystem("s3", profile=s3_profile)
    blob_files = blob_fs.find(blob_dir)
    # Refuse to continue if a non-marker blob is empty.
    for f in track(blob_files):
        size = blob_fs.info(f)["Size"]
        if not f.lower().endswith(".complete"):
            assert size != 0, f"Size was invalidly zero for {f}"

    # Listed blob paths are expected to lack the scheme, so the part to strip
    # is blob_dir minus its "s3://" prefix.
    blob_offset = len(blob_dir) - len(S3_PREFIX)
    blob_relative_paths = {f[blob_offset:] for f in blob_files}

    local_fs = fsspec.filesystem("file")
    files_to_delete = []
    for local_dir in local_dirs:
        for f in local_fs.find(local_dir):
            # relpath (rather than slicing by len(local_dir)) keeps the key
            # correct when local_dir is relative or not normalized, since the
            # local filesystem lists absolute paths.
            relative_path = os.path.relpath(f, local_dir)
            if relative_path in blob_relative_paths and not os.path.islink(f):
                files_to_delete.append(f)

    print(len(files_to_delete))
    with open("/tmp/files_to_delete.txt", "w") as out:
        out.writelines(f"{file}\n" for file in files_to_delete)
@app.command()
def compare_local_to_blob(
    source_dirs: list[str], dst_dir: str, s3_profile: str = "blt"
):
    """Compare files under local ``source_dirs`` with files under ``dst_dir``.

    Local files are collected by path relative to their source directory,
    skipping symlinks, empty files that are not ``.COMPLETE``/``.complete``
    markers, and ``.arrow`` files rejected by ``is_valid_arrow_file``. The
    resulting set is compared with the relative paths found under ``dst_dir``
    and summary counts are printed.

    Args:
        source_dirs: Non-empty list of local directories, each ending in ``/``.
        dst_dir: S3 directory, must start with ``s3://`` and end with ``/``.
        s3_profile: Profile name passed to the S3 filesystem.

    Raises:
        AssertionError: If an argument is malformed or a ``.COMPLETE`` /
            ``.complete`` file is not empty.
    """
    # NOTE: validation relies on ``assert`` and is skipped under ``python -O``.
    for s in source_dirs:
        assert s.endswith("/"), "Dirs must end with /"
    assert dst_dir.endswith("/"), "Dirs must end with /"
    assert len(source_dirs) != 0
    assert dst_dir.startswith(S3_PREFIX)

    local_fs = fsspec.filesystem("file")
    dst_fs = fsspec.filesystem("s3", profile=s3_profile)

    source_to_files = {}
    all_local_files = set()
    for s in source_dirs:
        skipped = []
        kept = source_to_files.setdefault(s, [])
        for f in local_fs.find(s):
            if os.path.islink(f):
                continue
            size = os.path.getsize(f)
            if f.endswith((".COMPLETE", ".complete")):
                assert size == 0, ".COMPLETE files should be empty"
            elif size == 0:
                # Empty non-marker files are not counted as local files.
                skipped.append(f)
                continue
            if f.endswith(".arrow") and not is_valid_arrow_file(f):
                skipped.append(f)
                continue
            kept.append(f)
            # relpath (rather than slicing by len(s)) keeps the key correct
            # when s is relative or not normalized, since the local
            # filesystem lists absolute paths.
            all_local_files.add(os.path.relpath(f, s))
        print(s, len(source_to_files[s]), "skipped", len(skipped), skipped[:10])

    dst_files = dst_fs.find(dst_dir)
    print(dst_dir, len(dst_files))

    # Listed blob paths are expected to lack the scheme, so the part to strip
    # is dst_dir minus its "s3://" prefix.
    dst_offset = len(dst_dir) - len(S3_PREFIX)
    dst_file_set = {f[dst_offset:] for f in dst_files}

    diff = all_local_files.symmetric_difference(dst_file_set)
    print("Local files", len(all_local_files))
    print("DST Files", len(dst_file_set))
    print("Symmetric difference", len(diff))
    dst_only_files = dst_file_set - all_local_files
    print("DST only", len(dst_only_files), list(dst_only_files)[:10])
# Run the Typer CLI only when this file is executed as a script.
if __name__ == "__main__":
    app()
|