Jiahua0's picture
Upload folder using huggingface_hub
ff47419 verified
import struct
import numpy as np
import png
import re
import sys
import csv
from PIL import Image
import h5py
# Magic number found in the first 4 bytes of a .flo file when they are
# interpreted as a float; checked when READING a file.
FLO_TAG_FLOAT = 202021.25
# The same first 4 bytes spelled as ASCII text; emitted when WRITING a file.
FLO_TAG_STRING = "PIEH"
# .flo format: components whose magnitude exceeds this threshold are "unknown".
FLO_UNKNOWN_FLOW_THRESH = 1e9
# Value stored in a .flo file to represent an unknown flow component.
FLO_UNKNOWN_FLOW = 1e10
def readFlowFile(filepath):
    """Load an optical flow field, picking the decoder from the file extension.

    Recognised extensions: .flo (Sintel), .png (KITTI), .npy (numpy),
    .pfm (FlyingThings3D) and .flo5 (Spring).

    filepath: path to the flow file
    returns: whatever the matching reader returns (flow with shape
        height x width x 2; positions without groundtruth are np.nan)
    raises: ValueError if the extension is not one of the above
    """
    # Each extension is handled by its dedicated reader; first match wins.
    if filepath.endswith(".flo"):
        return readFloFlow(filepath)
    if filepath.endswith(".png"):
        return readPngFlow(filepath)
    if filepath.endswith(".npy"):
        return readNpyFlow(filepath)
    if filepath.endswith(".pfm"):
        return readPfmFlow(filepath)
    if filepath.endswith(".flo5"):
        return readFlo5Flow(filepath)
    raise ValueError(f"readFlowFile: Unknown file format for {filepath}")
def writeFlowFile(flow, filepath):
    """Store an optical flow field, picking the encoder from the file extension.

    Recognised extensions: .flo (Sintel), .png (KITTI), .npy (numpy) and .flo5.

    flow: optical flow with shape height x width x 2; invalid values should be
        represented as np.nan
    filepath: destination path
    raises: ValueError for an empty path or an unrecognised extension,
        IOError if flow does not have shape height x width x 2
    """
    if not filepath:
        raise ValueError("writeFlowFile: empty filepath")

    shape = flow.shape
    if not (len(shape) == 3 and shape[2] == 2):
        raise IOError(
            f"writeFlowFile {filepath}: expected shape height x width x 2 but received {shape}"
        )

    # Taller-than-wide input is only reported, never rejected.
    rows, cols = shape[0], shape[1]
    if rows > cols:
        print(
            f"write flo file {filepath}: Warning: Are you writing an upright image? Expected shape height x width x 2, got {shape}"
        )

    if filepath.endswith(".flo"):
        return writeFloFlow(flow, filepath)
    if filepath.endswith(".png"):
        return writePngFlow(flow, filepath)
    if filepath.endswith(".npy"):
        return writeNpyFile(flow, filepath)
    if filepath.endswith(".flo5"):
        return writeFlo5File(flow, filepath)
    raise ValueError(f"writeFlowFile: Unknown file format for {filepath}")
def readFloFlow(filepath):
    """read optical flow from file stored in .flo file format as used in the Sintel dataset (Butler et al., 2012)

    filepath: path to file where to read from
    returns: flow as a float64 numpy array with shape height x width x 2;
        unknown values (magnitude above FLO_UNKNOWN_FLOW_THRESH) are np.nan
    raises: IOError if the path is None, lacks the .flo extension, or the file
        has a wrong tag, illegal dimensions, or is too short / too long

    ---

    ".flo" file format used for optical flow evaluation

    Stores 2-band float image for horizontal (u) and vertical (v) flow components.
    Floats are stored in little-endian order.
    A flow value is considered "unknown" if either |u| or |v| is greater than 1e9.

    bytes   contents

    0-3     tag: "PIEH" in ASCII, which in little endian happens to be the float 202021.25
            (just a sanity check that floats are represented correctly)
    4-7     width as an integer
    8-11    height as an integer
    12-end  data (width*height*2*4 bytes total)
            the float values for u and v, interleaved, in row order, i.e.,
            u[row0,col0], v[row0,col0], u[row0,col1], v[row0,col1], ...
    """
    if filepath is None:
        raise IOError("read flo file: empty filename")

    if not filepath.endswith(".flo"):
        raise IOError(f"read flo file ({filepath}): extension .flo expected")

    n_bands = 2
    with open(filepath, "rb") as stream:
        header = stream.read(12)
        if len(header) != 12:
            raise IOError(f"read flo file({filepath}): file is too short")

        # The format is little-endian by definition, so decode it explicitly
        # instead of relying on the byte order of the host machine.
        tag, width, height = struct.unpack("<fii", header)

        if tag != FLO_TAG_FLOAT:
            raise IOError(
                f"read flo file({filepath}): wrong tag (expected the 'PIEH' magic number)"
            )

        # another sanity check to see that integers were read correctly (99999 should do the trick...)
        if width < 1 or width > 99999:
            raise IOError(f"read flo file({filepath}): illegal width {width}")

        if height < 1 or height > 99999:
            raise IOError(f"read flo file({filepath}): illegal height {height}")

        n_bytes = height * width * n_bands * 4
        payload = stream.read(n_bytes)
        # read() returns fewer bytes (never None) when the file ends early.
        if len(payload) != n_bytes:
            raise IOError(f"read flo file({filepath}): file is too short")

        if stream.read(1) != b"":
            raise IOError(f"read flo file({filepath}): file is too long")

    # frombuffer yields a read-only view; astype makes a writable float64 copy.
    flow = np.frombuffer(payload, dtype="<f4").astype(np.float64)
    flow = flow.reshape((height, width, n_bands))

    # unknown values are set to nan
    flow[np.abs(flow) > FLO_UNKNOWN_FLOW_THRESH] = np.nan

    return flow
def writeFloFlow(flow, filepath):
    """
    write optical flow in .flo format to file as used in the Sintel dataset (Butler et al., 2012)

    flow: optical flow with shape height x width x 2; np.nan entries are
        stored as FLO_UNKNOWN_FLOW. The input array is not modified.
    filepath: optical flow file path to be saved
    raises: IOError if fewer bytes than expected could be written

    ---

    ".flo" file format used for optical flow evaluation

    Stores 2-band float image for horizontal (u) and vertical (v) flow components.
    Floats are stored in little-endian order.
    A flow value is considered "unknown" if either |u| or |v| is greater than 1e9.

    bytes   contents

    0-3     tag: "PIEH" in ASCII, which in little endian happens to be the float 202021.25
            (just a sanity check that floats are represented correctly)
    4-7     width as an integer
    8-11    height as an integer
    12-end  data (width*height*2*4 bytes total)
            the float values for u and v, interleaved, in row order, i.e.,
            u[row0,col0], v[row0,col0], u[row0,col1], v[row0,col1], ...
    """
    flow = np.asarray(flow)
    height, width, nBands = flow.shape

    # The format mandates little-endian data, so encode it explicitly rather
    # than using the native byte order of the host machine.
    header = FLO_TAG_STRING.encode("ascii") + struct.pack("<ii", width, height)

    # np.where builds a new array, leaving the caller's flow untouched.
    payload = np.where(np.isnan(flow), FLO_UNKNOWN_FLOW, flow).astype("<f4").tobytes()

    with open(filepath, "wb") as f:
        # write header
        if f.write(header) != len(header):
            raise IOError(f"write flo file {filepath}: problem writing header")

        # write content (rows in order, u and v interleaved per pixel)
        if f.write(payload) != height * width * nBands * 4:
            raise IOError(f"write flo file {filepath}: problem writing flow data")
def readPngFlow(filepath):
    """Decode optical flow from a png file as used in the KITTI 12 (Geiger et al., 2012) and KITTI 15 (Menze et al., 2015) dataset.

    filepath: path to file where to read from
    returns: flow as a numpy array with shape height x width x 2. Pixels whose
        third png channel is 0 are marked invalid and returned as np.nan
    """
    # adapted from https://github.com/liruoteng/OpticalFlowToolkit
    reader = png.Reader(filename=filepath)
    direct = reader.asDirect()
    rows = list(direct[2])
    (width, height) = direct[3]["size"]

    # Every png row interleaves three samples per pixel: u, v, validity.
    decoded = np.zeros((height, width, 3), dtype=np.float64)
    for row_idx, row in enumerate(rows):
        for channel in range(3):
            decoded[row_idx, :, channel] = row[channel::3]

    missing = decoded[:, :, 2] == 0
    # Undo the storage encoding: stored = 64 * flow + 2**15.
    decoded[:, :, 0:2] = (decoded[:, :, 0:2] - 2**15) / 64.0
    decoded[missing, 0:2] = np.nan
    return decoded[:, :, :2]
def writePngFlow(flow, filename):
    """Encode optical flow as a 16-bit png as used in the KITTI 12 (Geiger et al., 2012) and KITTI 15 (Menze et al., 2015) dataset.

    flow: optical flow in shape height x width x 2, invalid values should be represented as np.nan
    filename: path to file where to write to
    """
    # Storage encoding: stored = 64 * flow + 2**15.
    encoded = 64.0 * flow + 2**15
    height, width = encoded.shape[0], encoded.shape[1]

    # Third channel flags validity: 0 where either component is NaN, else 1.
    validity = np.ones([height, width, 1])
    has_nan = np.isnan(encoded[:, :, 0]) | np.isnan(encoded[:, :, 1])
    validity[has_nan] = 0

    encoded = np.concatenate([np.nan_to_num(encoded), validity], axis=-1)
    encoded = np.clip(encoded, 0, 2**16 - 1).astype(np.uint16)
    # One flat row of width * 3 samples per image row.
    png_rows = np.reshape(encoded, (-1, width * 3))

    with open(filename, "wb") as out:
        png.Writer(width=width, height=height, bitdepth=16, greyscale=False).write(
            out, png_rows
        )
def readNpyFlow(filepath):
    """Load an array stored in numpy's .npy format.

    filepath: file to read from
    returns: the result of np.load for that file
    """
    loaded = np.load(filepath)
    return loaded
def writeNpyFile(arr, filepath):
    """Store an array in numpy's .npy format.

    arr: numpy array to write
    filepath: file to write to
    """
    np.save(file=filepath, arr=arr)
def writeFlo5File(flow, filename):
    """Store flow as a gzip-compressed (level 5) HDF5 dataset named "flow".

    flow: data to store
    filename: path of the file to create (opened in "w" mode)
    """
    with h5py.File(filename, "w") as h5_file:
        h5_file.create_dataset(
            "flow",
            data=flow,
            compression="gzip",
            compression_opts=5,
        )
def readFlo5Flow(filename):
    """Load the "flow" dataset from an HDF5 (.flo5) file.

    filename: path of the file to read
    returns: the full contents of the "flow" dataset
    raises: IOError if the file has no "flow" key
    """
    with h5py.File(filename, "r") as h5_file:
        if "flow" in h5_file.keys():
            # [()] reads the whole dataset before the file is closed.
            return h5_file["flow"][()]
        raise IOError(
            f"File {filename} does not have a 'flow' key. Is this a valid flo5 file?"
        )
def readPfmFlow(filepath):
    """Load optical flow from a pfm file as used in the FlyingThings3D (Mayer et al., 2016) dataset.

    filepath: path to file where to read from
    returns: flow as a numpy array with shape height x width x 2.
    raises: IOError if the decoded array is not 3-dimensional with 3 channels
    """
    raw = readPfmFile(filepath)
    shape = raw.shape
    if len(shape) != 3 or shape[2] != 3:
        raise IOError(
            f"read pfm flow: PFM file has wrong shape (assumed to be w x h x 3): {shape}"
        )
    # Keep only the first two channels; the third is documented as all zeros.
    return raw[:, :, :2]
def readPfmFile(filepath):
    """Read a PFM image into a numpy array.

    adapted from https://lmb.informatik.uni-freiburg.de/resources/datasets/SceneFlowDatasets.en.html

    filepath: path of the PFM file
    returns: float32 array with shape height x width x 3 for a "PF" (colour)
        file or height x width for a "Pf" (greyscale) file, flipped vertically
        relative to the order in which rows are stored in the file
    raises: Exception if the magic line is neither "PF" nor "Pf" or the
        dimension line is malformed
    """
    # The context manager guarantees the handle is closed, including on the
    # error paths below.
    with open(filepath, "rb") as file:
        # Compare raw bytes so a non-ASCII magic line is reported as
        # "Not a PFM file." rather than as a decode error.
        header = file.readline().rstrip()
        if header == b"PF":
            color = True
        elif header == b"Pf":
            color = False
        else:
            raise Exception("Not a PFM file.")

        dim_match = re.match(r"^(\d+)\s(\d+)\s$", file.readline().decode("ascii"))
        if not dim_match:
            raise Exception("Malformed PFM header.")
        width, height = map(int, dim_match.groups())

        # Only the sign of the scale line is used: negative means little-endian.
        scale = float(file.readline().decode("ascii").rstrip())
        endian = "<" if scale < 0 else ">"

        data = np.fromfile(file, endian + "f")

    shape = (height, width, 3) if color else (height, width)
    data = np.reshape(data, shape)
    # The file's first row becomes the last row of the returned array.
    return np.flipud(data)
def writePfmFile(image, filepath):
    """Write a float32 image to a PFM file.

    adapted from https://lmb.informatik.uni-freiburg.de/resources/datasets/SceneFlowDatasets.en.html

    image: float32 array with shape H x W x 3 (written as colour, "PF") or
        H x W / H x W x 1 (written as greyscale, "Pf")
    filepath: destination path
    raises: Exception if the dtype is not float32 or the shape is unsupported;
        the file is not created in that case
    """
    # Validate before opening so a rejected image leaves no empty file behind.
    if image.dtype.name != "float32":
        raise Exception("Image dtype must be float32.")

    if len(image.shape) == 3 and image.shape[2] == 3:  # color image
        color = True
    elif len(image.shape) == 2 or (
        len(image.shape) == 3 and image.shape[2] == 1
    ):  # greyscale
        color = False
    else:
        raise Exception("Image must have H x W x 3, H x W x 1 or H x W dimensions.")

    # Rows are stored in reverse vertical order.
    image = np.flipud(image)

    # A negative scale marks little-endian data.
    scale = 1
    endian = image.dtype.byteorder
    if endian == "<" or (endian == "=" and sys.byteorder == "little"):
        scale = -scale

    with open(filepath, "wb") as file:
        # Both alternatives must be bytes: the file is opened in binary mode.
        file.write(b"PF\n" if color else b"Pf\n")
        file.write(b"%d %d\n" % (image.shape[1], image.shape[0]))
        file.write(b"%f\n" % scale)
        # tobytes() serialises in C order even though flipud returned a view.
        file.write(image.tobytes())
def readDispFile(filepath):
    """Load a disparity (or disparity change) map, picking the decoder from the file extension.

    Recognised extensions: .png (KITTI), .npy (numpy), .pfm (FlyingThings3D)
    and .dsp5.

    filepath: path to the disparity file
    returns: whatever the matching reader returns (disparity with shape
        height x width; positions without groundtruth are np.nan)
    raises: ValueError if the extension is not one of the above
    """
    if filepath.endswith(".png"):
        return readPngDisp(filepath)
    if filepath.endswith(".npy"):
        # .npy files are format-agnostic, so the flow reader is reused.
        return readNpyFlow(filepath)
    if filepath.endswith(".pfm"):
        return readPfmDisp(filepath)
    if filepath.endswith(".dsp5"):
        return readDsp5Disp(filepath)
    raise ValueError(f"readDispFile: Unknown file format for {filepath}")
def readPngDisp(filepath):
    """Decode disparity from a png file as used in the KITTI 12 (Geiger et al., 2012) and KITTI 15 (Menze et al., 2015) dataset.

    filepath: path to file where to read from
    returns: disparity as a numpy array with shape height x width. Pixels
        stored as 0 are treated as invalid and returned as np.nan
    raises: IOError if the png rows do not hold exactly one sample per pixel
    """
    # adapted from https://github.com/liruoteng/OpticalFlowToolkit
    reader = png.Reader(filename=filepath)
    direct = reader.asDirect()
    rows = list(direct[2])
    (width, height) = direct[3]["size"]

    samples_per_pixel = len(rows[0]) // width
    if samples_per_pixel != 1:
        raise IOError("read png disp: assumed channels to be 1!")

    disp = np.zeros((height, width), dtype=np.float64)
    for row_idx, row in enumerate(rows):
        disp[row_idx, :] = row[:]

    disp[disp == 0] = np.nan
    # Stored values are the disparity multiplied by 256.
    return disp / 256.0
def readPfmDisp(filepath):
    """Load disparity or disparity change from a pfm file as used in the FlyingThings3D (Mayer et al., 2016) dataset.

    filepath: path to file where to read from
    returns: the 2-dimensional array decoded by readPfmFile
    raises: IOError if the decoded array is not 2-dimensional
    """
    loaded = readPfmFile(filepath)
    if len(loaded.shape) == 2:
        return loaded
    raise IOError(
        f"read pfm disp: PFM file has wrong shape (assumed to be w x h): {loaded.shape}"
    )
def writePngDisp(disp, filepath):
    """Encode disparity as a 16-bit greyscale png as used in the KITTI 12 (Geiger et al., 2012) and KITTI 15 (Menze et al., 2015) dataset.

    disp: disparity in shape height x width, invalid values should be represented as np.nan
    filepath: path to file where to write to
    """
    # Stored values are the disparity multiplied by 256.
    scaled = 256 * disp
    height, width = scaled.shape[0], scaled.shape[1]

    # Clip to the uint16 range, then map NaN to 0.
    clipped = np.clip(scaled, 0, 2**16 - 1)
    png_rows = np.reshape(np.nan_to_num(clipped).astype(np.uint16), (-1, width))

    with open(filepath, "wb") as out:
        png.Writer(width=width, height=height, bitdepth=16, greyscale=True).write(
            out, png_rows
        )
def writeDsp5File(disp, filename):
    """Store disp as a gzip-compressed (level 5) HDF5 dataset named "disparity".

    disp: data to store
    filename: path of the file to create (opened in "w" mode)
    """
    with h5py.File(filename, "w") as h5_file:
        h5_file.create_dataset(
            "disparity",
            data=disp,
            compression="gzip",
            compression_opts=5,
        )
def readDsp5Disp(filename):
    """Load the "disparity" dataset from an HDF5 (.dsp5) file.

    filename: path of the file to read
    returns: the full contents of the "disparity" dataset
    raises: IOError if the file has no "disparity" key
    """
    with h5py.File(filename, "r") as h5_file:
        if "disparity" in h5_file.keys():
            # [()] reads the whole dataset before the file is closed.
            return h5_file["disparity"][()]
        raise IOError(
            f"File {filename} does not have a 'disparity' key. Is this a valid dsp5 file?"
        )
def writeDispFile(disp, filepath):
    """write disparity to file. Supports png (KITTI), npy (numpy) and dsp5 file format.

    disp: disparity with shape height x width. Invalid values should be represented as np.nan
    filepath: file path where to write the disparity
    raises: ValueError for an empty path or an unrecognised extension,
        IOError if disp is not 2-dimensional
    """
    if not filepath:
        raise ValueError("writeDispFile: empty filepath")

    if len(disp.shape) != 2:
        raise IOError(
            f"writeDispFile {filepath}: expected shape height x width but received {disp.shape}"
        )

    # Taller-than-wide input is only reported, never rejected.
    if disp.shape[0] > disp.shape[1]:
        print(
            f"writeDispFile {filepath}: Warning: Are you writing an upright image? Expected shape height x width, got {disp.shape}"
        )

    if filepath.endswith(".png"):
        writePngDisp(disp, filepath)
    elif filepath.endswith(".npy"):
        writeNpyFile(disp, filepath)
    elif filepath.endswith(".dsp5"):
        writeDsp5File(disp, filepath)
    else:
        # Fail loudly instead of silently writing nothing, matching the
        # behaviour of writeFlowFile and readDispFile.
        raise ValueError(f"writeDispFile: Unknown file format for {filepath}")
def readKITTIObjMap(filepath):
    """Load a png image and return a boolean mask of its non-zero pixels.

    filepath: path to a .png file (enforced with an assert)
    returns: boolean numpy array, True where the pixel value is > 0
    """
    assert filepath.endswith(".png")
    pixels = np.asarray(Image.open(filepath))
    return pixels > 0
def readKITTIIntrinsics(filepath, image=2):
    """Read entries of a 3x3 matrix K from a space-delimited calibration text file.

    The file is scanned for a row whose first token is "K_XX:" (XX being the
    zero-padded image index); the remaining tokens are parsed as a row-major
    3x3 float32 matrix. If several rows match, the last one is used.

    filepath: path to a .txt file (enforced with an assert)
    image: index used to build the "K_XX:" row label
    returns: float32 array [K[0, 0], K[1, 1], K[0, 2], K[1, 2]]
    raises: ValueError if no matching row exists or it does not hold 9 values
    """
    assert filepath.endswith(".txt")

    wanted = f"K_{image:02d}:"
    K = None
    with open(filepath) as f:
        for row in csv.reader(f, delimiter=" "):
            # Blank lines yield empty rows; skip them instead of indexing.
            if not row or row[0] != wanted:
                continue
            # Repeated or trailing spaces produce empty tokens; drop them.
            values = [token for token in row[1:] if token]
            K = np.array(values, dtype=np.float32).reshape(3, 3)

    if K is None:
        raise ValueError(f"readKITTIIntrinsics: no '{wanted}' entry in {filepath}")

    kvec = np.array([K[0, 0], K[1, 1], K[0, 2], K[1, 2]])
    return kvec
def writePngMapFile(map_, filename):
    """Convert an array to a PIL image and save it under the given filename.

    map_: array passed to Image.fromarray
    filename: destination path passed to the image's save method
    """
    picture = Image.fromarray(map_)
    picture.save(filename)