# Dexter's picture
# Upload folder using huggingface_hub
# 36c95ba verified
import argparse
import os
import sys
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import kornia as tgm
def load_data(root_path, sequence_name, frame_id):
    """Load the image, depth map and pinhole camera for one frame of a sequence.

    Files are looked up as ``<root_path>/<kind>/<sequence_name>/frame_XXXX<ext>``
    where ``<kind>``/``<ext>`` is ``clean``/``.png`` for the image,
    ``depth``/``.dpt`` for the depth and ``camdata_left``/``.cam`` for the
    camera, and ``XXXX`` is ``frame_id`` zero-padded to four digits.

    Args:
        root_path: directory that contains the ``clean``, ``depth`` and
            ``camdata_left`` folders.
        sequence_name: name of the sequence sub-folder.
        frame_id: integer index of the frame to load.

    Returns:
        A tuple ``(image_tensor, depth, camera)`` holding the outputs of
        ``load_image``, ``load_depth`` and ``tgm.utils.create_pinhole``.
    """
    frame_stem = 'frame_%04d' % frame_id

    def _frame_path(kind, extension):
        # every modality shares the same <kind>/<sequence>/<frame> layout
        return os.path.join(root_path, kind, sequence_name, frame_stem + extension)

    # read the pixel data first, then the depth map
    image_tensor = load_image(_frame_path('clean', '.png'))
    depth = load_depth(_frame_path('depth', '.dpt'))

    # the pinhole camera is built for the spatial size of the loaded image
    height, width = image_tensor.shape[-2:]
    intrinsics, extrinsics = load_camera_data(_frame_path('camdata_left', '.cam'))
    camera = tgm.utils.create_pinhole(intrinsics, extrinsics, height, width)

    return image_tensor, depth, camera
def load_depth(file_name):
    """Read a depth map with the Sintel SDK and return it as a float tensor.

    Args:
        file_name: path of the depth file to read.

    Returns:
        A float ``torch.Tensor`` with two singleton dimensions prepended to
        the shape of the array returned by ``sintel_io.depth_read``.

    Raises:
        AssertionError: if ``file_name`` is not an existing file.
    """
    if os.path.isfile(file_name):
        # imported here rather than at module level, so the SDK only has to be
        # importable by the time a depth file is actually read
        import sintel_io

        depth_map = sintel_io.depth_read(file_name)
        depth_tensor = torch.from_numpy(depth_map)
        return depth_tensor.view(1, 1, *depth_map.shape).float()

    raise AssertionError(f"Invalid file {file_name}")
def load_camera_data(file_name):
    """Read the camera matrices of a frame with the Sintel SDK.

    Args:
        file_name: path of the camera file to read.

    Returns:
        The ``(intrinsic, extrinsic)`` pair produced by ``sintel_io.cam_read``.

    Raises:
        AssertionError: if ``file_name`` is not an existing file.
    """
    file_exists = os.path.isfile(file_name)
    if not file_exists:
        raise AssertionError(f"Invalid file {file_name}")

    # imported here rather than at module level, so the SDK only has to be
    # importable by the time a camera file is actually read
    import sintel_io

    camera_matrices = sintel_io.cam_read(file_name)
    intrinsic_matrix, extrinsic_matrix = camera_matrices
    return intrinsic_matrix, extrinsic_matrix
def load_image(file_name):
    """Load an image with OpenCV and convert it to a float torch.Tensor.

    Args:
        file_name: path of the image file to read.

    Returns:
        The image as a float tensor scaled by ``1 / 255`` with a batch
        dimension of size one prepended (1xCxHxW).

    Raises:
        AssertionError: if ``file_name`` is not an existing file, or if
            OpenCV cannot decode it.
    """
    if not os.path.isfile(file_name):
        raise AssertionError(f"Invalid file {file_name}")

    # load image with OpenCV; imread reports a decoding failure by returning
    # None instead of raising, so check it before handing the data on
    img = cv2.imread(file_name, cv2.IMREAD_COLOR)
    if img is None:
        raise AssertionError(f"Could not decode image {file_name}")

    # convert image to torch tensor and rescale the 8-bit values
    tensor = tgm.utils.image_to_tensor(img).float() / 255.0
    return tensor.view(1, *tensor.shape)  # 1xCxHxW
def clip_and_convert_tensor(tensor):
    """Convert a torch.Tensor to an 8-bit image array.

    The tensor is multiplied by 255, converted to a NumPy image with
    ``tgm.utils.tensor_to_image``, clipped to ``[0, 255]`` and cast to
    ``uint8``.

    Args:
        tensor: the image tensor to convert.

    Returns:
        The clipped image as a ``uint8`` NumPy array.
    """
    # scale up before leaving torch, then saturate and quantise in NumPy
    scaled_image = tgm.utils.tensor_to_image(255.0 * tensor)
    return np.clip(scaled_image, 0, 255).astype('uint8')
class InvDepth(nn.Module):
    """Learnable inverse-depth map kept within ``[1 / max_depth, 1 / min_depth]``.

    The map is stored as a single ``1x1xHxW`` parameter ``w``; calling the
    module returns that parameter clamped to the valid inverse-depth range.
    """

    def __init__(self, height, width, min_depth=0.50, max_depth=25.0):
        """Create the parameter for a ``height`` x ``width`` inverse-depth map."""
        super().__init__()
        # taking the reciprocal swaps the roles of the two depth bounds
        self._min_range, self._max_range = 1.0 / max_depth, 1.0 / min_depth
        self.w = nn.Parameter(self._init_weights(height, width))

    def _init_weights(self, height, width):
        """Draw uniform random values from the lowest tenth of the valid range."""
        lower = self._min_range
        upper = self._min_range + (self._max_range - self._min_range) * 0.1
        noise = torch.rand(1, 1, height, width)
        # noise == 0 maps to `upper`; values approach `lower` as noise -> 1
        return (lower - upper) * noise + upper

    def forward(self):
        """Return the inverse-depth parameter clamped to the valid range."""
        return torch.clamp(self.w, min=self._min_range, max=self._max_range)
def _build_arg_parser():
    """Create the command-line parser for the depth regression example."""
    parser = argparse.ArgumentParser(description='Depth Regression with photometric loss.')
    # data settings
    parser.add_argument('--input-dir', type=str, required=True, help='the path to the directory with the input data.')
    parser.add_argument('--output-dir', type=str, required=True, help='the path to output the results.')
    parser.add_argument(
        '--num-iterations', type=int, default=1000, metavar='N', help='number of training iterations (default: 1000)'
    )
    parser.add_argument('--sequence-name', type=str, default='alley_1', help='the name of the sequence.')
    parser.add_argument('--frame-ref-id', type=int, default=1, help='the id for the reference image in the sequence.')
    parser.add_argument('--frame-i-id', type=int, default=2, help='the id for the image i in the sequence.')
    # optimization parameters
    parser.add_argument('--lr', type=float, default=1e-3, metavar='LR', help='learning rate (default: 1e-3)')
    # device parameters
    parser.add_argument('--cuda', action='store_true', default=False, help='enables CUDA training')
    parser.add_argument('--seed', type=int, default=666, metavar='S', help='random seed (default: 666)')
    parser.add_argument(
        '--log-interval',
        type=int,
        default=10,
        metavar='N',
        help='how many batches to wait before logging training status',
    )
    parser.add_argument(
        '--log-interval-vis',
        type=int,
        default=100,
        metavar='N',
        help='how many batches to wait before visual logging training status',
    )
    return parser


def _save_visualizations(output_dir, iter_idx, img_i_to_ref, img_ref, inv_depth):
    """Write the warped image, its blend with the reference, and the inverse depth as PNGs.

    The files are named ``warped_<iter>.png``, ``warped_both_<iter>.png`` and
    ``inv_depth_ref_<iter>.png`` inside ``output_dir``.
    """

    def _png_path(tag):
        return os.path.join(output_dir, f"{tag}_{iter_idx}.png")

    # merge warped and target image for visualization
    img_both_vis = clip_and_convert_tensor(0.5 * (img_i_to_ref + img_ref))
    img_i_to_ref_vis = clip_and_convert_tensor(img_i_to_ref)

    # normalise the inverse depth by its maximum so it spans the 8-bit range;
    # the epsilon guards against a division by zero
    inv_depth_vis = tgm.utils.tensor_to_image(inv_depth / (inv_depth.max() + 1e-6)).squeeze()
    inv_depth_vis = np.clip(255.0 * inv_depth_vis, 0, 255).astype('uint8')

    # save warped image and depth to disk
    cv2.imwrite(_png_path("warped"), img_i_to_ref_vis)
    cv2.imwrite(_png_path("warped_both"), img_both_vis)
    cv2.imwrite(_png_path("inv_depth_ref"), inv_depth_vis)


def DepthRegressionApp():
    """Optimise an inverse-depth map for a reference frame with a photometric loss.

    Parses the command line, loads a reference frame and a second frame of
    the requested sequence from ``<input-dir>/training``, and uses Adam to
    optimise an ``InvDepth`` parameter so that the second frame, warped by
    ``tgm.DepthWarper``, matches the reference frame under a mean L1 loss.
    The loss is printed every ``--log-interval`` iterations (and on the last
    one); visualisations are written to ``--output-dir`` every
    ``--log-interval-vis`` iterations. The output directory is created if it
    does not exist.
    """
    args = _build_arg_parser().parse_args()

    # define the device to use for inference
    use_cuda = args.cuda and torch.cuda.is_available()
    device = torch.device('cuda' if use_cuda else 'cpu')
    torch.manual_seed(args.seed)

    # configure sintel SDK path
    root_path = os.path.abspath(args.input_dir)
    sys.path.append(os.path.join(root_path, 'sdk/python'))

    # load the data
    root_dir = os.path.join(root_path, 'training')
    img_ref, _, cam_ref = load_data(root_dir, args.sequence_name, args.frame_ref_id)
    img_i, _, cam_i = load_data(root_dir, args.sequence_name, args.frame_i_id)

    # instantiate the depth warper from `kornia`
    warper = tgm.DepthWarper(cam_i)
    warper.compute_homographies(cam_ref)

    # create the inverse depth as a parameter to be optimized
    height, width = img_ref.shape[-2:]
    inv_depth_ref = InvDepth(height, width).to(device)

    # create optimizer
    optimizer = optim.Adam(inv_depth_ref.parameters(), lr=args.lr)

    # send data to device
    img_ref, img_i = img_ref.to(device), img_i.to(device)

    # cv2.imwrite does not create missing directories and only reports the
    # failure through its return value, so make sure the target exists
    os.makedirs(args.output_dir, exist_ok=True)

    # main training loop
    last_iter = args.num_iterations - 1
    for iter_idx in range(args.num_iterations):
        # compute the inverse depth and warp the source image
        img_i_to_ref = warper(inv_depth_ref(), img_i)

        # compute the photometric loss
        loss = F.l1_loss(img_i_to_ref, img_ref, reduction='none')
        loss = torch.mean(loss)

        # compute gradient and update optimizer parameters
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if iter_idx % args.log_interval == 0 or iter_idx == last_iter:
            print(f'Train iteration: {iter_idx}/{args.num_iterations}\tLoss: {loss.item():.6}')

        if iter_idx % args.log_interval_vis == 0:
            # re-evaluate with the freshly updated parameters; no gradients
            # are needed for visualisation, so skip building the graph
            with torch.no_grad():
                inv_depth = inv_depth_ref()
                img_i_to_ref = warper(inv_depth, img_i)
            _save_visualizations(args.output_dir, iter_idx, img_i_to_ref, img_ref, inv_depth)
# Run the depth regression example only when this file is executed as a script.
if __name__ == "__main__":
    DepthRegressionApp()