English
Shanci's picture
Upload folder using huggingface_hub
26225c5 verified
import os
import h5py
import torch
import socket
import numpy as np
from time import time
from datetime import datetime
from src.utils.tensor import tensor_idx, cast_numpyfy
from src.utils.sparse import dense_to_csr, csr_to_dense
# Public API of this module: the names exported by a wildcard import
# (`from <module> import *`)
__all__ = [
'date_time_string', 'dated_dir', 'host_data_root', 'save_tensor',
'load_tensor', 'save_tensor_dict', 'load_tensor_dict', 'save_dense_to_csr',
'load_csr_to_dense']
def date_time_string():
    """Returns a string holding the current date and time, as given by
    `datetime.now()`. Useful for creating an output file or directory.

    The format is `year-month-day_hour-minute-second`. Fields are not
    zero-padded, e.g. `2023-4-7_9-5-30`.

    :return: str
    """
    # Read the clock only once, so that all fields describe the same
    # instant. Querying `datetime.now()` for each field could mix values
    # from either side of a second, minute or midnight rollover
    now = datetime.now()
    date = f'{now.year}-{now.month}-{now.day}'
    # Not named `time`, to avoid shadowing the `time` function imported
    # at module level
    clock = f'{now.hour}-{now.minute}-{now.second}'
    return f'{date}_{clock}'
def dated_dir(root, create=False):
    """Build the path of a sub-directory of `root` whose name is the
    current date and time, as produced by `date_time_string`.

    :param root: str
        Parent directory
    :param create: bool
        If True, the directory is created if it does not exist yet
    :return: str
        Path to the dated directory
    """
    path = os.path.join(root, date_time_string())

    # Nothing to create: either not requested, or already there
    if not create:
        return path
    if os.path.exists(path):
        return path

    os.makedirs(path, exist_ok=True)
    return path
#TODO: remove this for deployment !
def host_data_root():
    """Look up the dataset root directory associated with the machine
    the code is running on, based on its host name.

    :return: str
        Dataset root directory known for this host
    :raises NotImplementedError: if the host name is not a known one
    """
    # Hosts identified by their exact name
    known_roots = {
        'DEL2001W017': '/media/drobert-admin/DATA2/datasets',
        'HP-2010S002': '/var/data/drobert/datasets',
        '9c81b1a54ad8': '/raid/dataset/pointcloud/data'}

    hostname = socket.gethostname()

    if hostname in known_roots:
        return known_roots[hostname]

    # Hosts identified by the suffix of their name
    if hostname.endswith('sis.cnes.fr'):
        return '/home/qt/robertda/scratch/datasets'

    raise NotImplementedError(
        f"Unknown host '{hostname}', cannot set DATA_ROOT")
def save_tensor(x, f, key, fp_dtype=torch.float):
    """Write a torch.Tensor as a dataset of an HDF5 file.

    :param x: torch.Tensor
    :param f: h5 file path, or already-open h5py.File or h5py.Group
        If a path is passed, the file is opened in 'w' mode, which
        overwrites any existing file at this location
    :param key: str
        h5py.Dataset key under which the tensor is saved
    :param fp_dtype: torch dtype
        Passed to `cast_numpyfy`: data type to which floating point
        tensors should be cast before saving
    :return: None
    """
    if isinstance(f, (h5py.File, h5py.Group)):
        assert isinstance(x, torch.Tensor)
        array = cast_numpyfy(x, fp_dtype=fp_dtype)
        f.create_dataset(key, data=array, dtype=array.dtype)
        return

    # `f` is a path: open the file and recurse on the open handle
    with h5py.File(f, 'w') as file:
        save_tensor(x, file, key, fp_dtype=fp_dtype)
def load_tensor(f, key=None, idx=None):
    """Read a torch.Tensor from an HDF5 file, as written by
    `save_tensor`. Optionally, only some rows are returned.

    Note that the whole dataset is read into memory before `idx` is
    applied.

    :param f: h5 file path, or already-open h5py.File, h5py.Group or
        h5py.Dataset
    :param key: str
        h5py.Dataset key under which the tensor was saved. Must be
        provided if `f` is not already a h5py.Dataset object
    :param idx: int, list, numpy.ndarray, torch.Tensor
        Rows to select from the tensor. Supports fancy indexing. If
        None or empty, the whole tensor is returned
    :return: torch.Tensor
        Non-floating-point tensors are returned as int64
    """
    # `f` is a path: open the file read-only and recurse on the handle
    if not isinstance(f, (h5py.File, h5py.Group, h5py.Dataset)):
        with h5py.File(f, 'r') as file:
            return load_tensor(file, key=key, idx=idx)

    dataset = f if isinstance(f, h5py.Dataset) else f[key]
    rows = tensor_idx(idx)

    x = torch.from_numpy(dataset[:])
    if rows is not None and rows.shape[0] != 0:
        x = x[rows]

    # By default, convert int16 and int32 to int64, might cause issues
    # for tensor indexing otherwise
    return x if x.is_floating_point() else x.long()
def save_tensor_dict(d, f, key, fp_dtype=torch.float):
    """Write a dictionary of torch.Tensors to an HDF5 file. A group
    named `key` is created, holding one dataset per tensor.

    :param d: dictionary of torch.Tensors
        Values that are not torch.Tensors are silently skipped
    :param f: h5 file path, or already-open h5py.File or h5py.Group
        If a path is passed, the file is opened in 'w' mode, which
        overwrites any existing file at this location
    :param key: str
        h5py.Group key under which the tensor dictionary is saved
    :param fp_dtype: torch dtype
        Data type to which floating point tensors should be cast before
        saving
    :return: None
    """
    if isinstance(f, (h5py.File, h5py.Group)):
        group = f.create_group(key)
        tensors = (
            (k, v) for k, v in d.items() if isinstance(v, torch.Tensor))
        for name, tensor in tensors:
            save_tensor(tensor, group, name, fp_dtype=fp_dtype)
        return

    # `f` is a path: open the file and recurse on the open handle
    with h5py.File(f, 'w') as file:
        save_tensor_dict(d, file, key, fp_dtype=fp_dtype)
def load_tensor_dict(f, idx=None):
    """Load a dictionary of torch.Tensor from an HDF5 file.

    Every entry directly under `f` is read with `load_tensor`, so `f`
    is expected to directly hold the datasets (e.g. the group created
    by `save_tensor_dict`).

    :param f: h5 file path, or already-open h5py.File or h5py.Group
    :param idx: int, list, numpy.ndarray, torch.Tensor
        Used to select and read only some rows of each tensor.
        Supports fancy indexing
    :return: dict
        Maps each key found in `f` to the corresponding torch.Tensor
    """
    if not isinstance(f, (h5py.File, h5py.Group)):
        # Open read-only: 'w' mode would truncate the file we are
        # trying to read. The loaded dictionary must also be returned
        # to the caller, and `idx` forwarded to the recursive call
        with h5py.File(f, 'r') as file:
            return load_tensor_dict(file, idx=idx)

    return {k: load_tensor(f[k], key=None, idx=idx) for k in f.keys()}
def save_dense_to_csr(x, f, fp_dtype=torch.float):
    """Convert a 2D tensor to CSR format with `dense_to_csr` and write
    it to an HDF5 file. Four datasets are created in `f`: 'pointers',
    'columns', 'values' and 'shape'.

    :param x: 2D torch.Tensor
    :param f: h5 file path, or already-open h5py.File or h5py.Group
        If a path is passed, the file is opened in 'w' mode, which
        overwrites any existing file at this location
    :param fp_dtype: torch dtype
        Data type to which floating point tensors should be cast before
        saving
    :return: None
    """
    # `f` is a path: open the file and recurse on the open handle
    if not isinstance(f, (h5py.File, h5py.Group)):
        with h5py.File(f, 'w') as file:
            save_dense_to_csr(x, file, fp_dtype=fp_dtype)
        return

    assert isinstance(x, torch.Tensor) and x.dim() == 2

    pointers, columns, values = dense_to_csr(x)
    csr_parts = (
        ('pointers', pointers),
        ('columns', columns),
        ('values', values))
    for name, tensor in csr_parts:
        save_tensor(tensor, f, name, fp_dtype=fp_dtype)

    # The dense shape is stored too, to be passed back to
    # `csr_to_dense` when loading
    f.create_dataset('shape', data=np.array(x.shape))
def load_csr_to_dense(f, idx=None, verbose=False):
    """Read an HDF5 file or group produced using `save_dense_to_csr` and
    return the dense tensor. An optional idx can be passed to only
    return the corresponding rows of the dense tensor.

    :param f: h5 file path, or already-open h5py.File or h5py.Group
        Must hold the 'pointers', 'columns', 'values' and 'shape'
        datasets
    :param idx: int, list, numpy.ndarray, torch.Tensor
        Used to select only some rows of the dense tensor. Supports
        fancy indexing. If None or empty, all rows are returned
    :param verbose: bool
        If True, the time spent in each step is printed
    :return: 2D torch.Tensor built by `csr_to_dense`
    """
    KEYS = ['pointers', 'columns', 'values', 'shape']

    # `f` is a path: open the file read-only and recurse on the handle
    if not isinstance(f, (h5py.File, h5py.Group)):
        with h5py.File(f, 'r') as file:
            out = load_csr_to_dense(file, idx=idx, verbose=verbose)
        return out

    assert all(k in f.keys() for k in KEYS)

    idx = tensor_idx(idx)

    # No row selection: read everything and convert
    if idx is None or idx.shape[0] == 0:
        start = time()
        pointers = load_tensor(f['pointers'])
        columns = load_tensor(f['columns'])
        values = load_tensor(f['values'])
        shape = load_tensor(f['shape'])
        if verbose:
            print(f'load_csr_to_dense read all : {time() - start:0.5f}s')
        start = time()
        out = csr_to_dense(pointers, columns, values, shape=shape)
        if verbose:
            print(f'load_csr_to_dense csr_to_dense : {time() - start:0.5f}s')
        return out

    # Read only pointers start and end indices based on idx
    start = time()
    ptr_start = load_tensor(f['pointers'], idx=idx)
    ptr_end = load_tensor(f['pointers'], idx=idx + 1)
    if verbose:
        print(f'load_csr_to_dense read ptr : {time() - start:0.5f}s')

    # Create the new pointers: cumulative number of stored values of
    # the selected rows, in the order of idx
    start = time()
    pointers = torch.cat([
        torch.zeros(1, dtype=ptr_start.dtype),
        torch.cumsum(ptr_end - ptr_start, 0)])
    if verbose:
        print(f'load_csr_to_dense pointers : {time() - start:0.5f}s')

    # Create the indexing tensor to select and order values.
    # Simply, we could have used a list of slices but we want to
    # avoid for loops and list concatenations to benefit from torch
    # capabilities. For each selected value, we compute its offset
    # inside its row (position in the new value array minus the new
    # pointer of its row), then add the pointer of the row in the file
    start = time()
    sizes = pointers[1:] - pointers[:-1]
    val_idx = torch.arange(pointers[-1])
    val_idx -= torch.arange(pointers[-1] + 1)[
        pointers[:-1]].repeat_interleave(sizes)
    val_idx += ptr_start.repeat_interleave(sizes)
    if verbose:
        print(f'load_csr_to_dense val_idx : {time() - start:0.5f}s')

    # Read the columns and values, now we have computed the val_idx.
    # The indexing is done here rather than through the `idx` argument
    # of `load_tensor`, because `load_tensor` treats an empty index as
    # "read everything". When the selected rows hold no stored value,
    # val_idx is empty and all columns and values would be returned,
    # which is inconsistent with the pointers computed above.
    # Make sure to update the output shape too, since the rows have been
    # indexed
    start = time()
    columns = load_tensor(f['columns'])[val_idx]
    values = load_tensor(f['values'])[val_idx]
    shape = load_tensor(f['shape'])
    shape[0] = idx.shape[0]
    if verbose:
        print(f'load_csr_to_dense read values : {time() - start:0.5f}s')

    start = time()
    out = csr_to_dense(pointers, columns, values, shape=shape)
    if verbose:
        print(f'load_csr_to_dense csr_to_dense : {time() - start:0.5f}s')

    return out