# zhenghan.qiu
# add requirement.txt
# f2506d5
import gradio as gr
import cv2
import matplotlib
import numpy as np
import os
import time
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from scipy.ndimage import convolve
# from ppd.utils.set_seed import set_seed
# from ppd.utils.align_depth_func import recover_metric_depth_ransac
# from ppd.utils.depth2pcd import depth2pcd
# from moge.model.v2 import MoGeModel
# from ppd.models.ppd import PixelPerfectDepth
# try:
# import spaces
# HUGGINFACE_SPACES_INSTALLED = True
# except ImportError:
# HUGGINFACE_SPACES_INSTALLED = False
# css = """
# #img-display-container {
# max-height: 100vh;
# }
# #img-display-input {
# max-height: 100vh;
# }
# #img-display-output {
# max-height: 100vh;
# }
# #download {
# height: 62px;
# }
# #img-display-output .image-slider-image {
# object-fit: contain !important;
# width: 100% !important;
# height: 100% !important;
# }
# # """
# set_seed(666)
# DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# default_steps = 20
# model = PixelPerfectDepth(sampling_steps=default_steps)
# ckpt_path = hf_hub_download(
# repo_id="gangweix/Pixel-Perfect-Depth",
# filename="ppd.pth",
# repo_type="model"
# )
# state_dict = torch.load(ckpt_path, map_location="cpu")
# model.load_state_dict(state_dict, strict=False)
# model = model.eval()
# model = model.to(DEVICE)
# moge_model = MoGeModel.from_pretrained("Ruicheng/moge-2-vitl-normal").eval()
# moge_model = moge_model.to(DEVICE)
# Module-level caches shared across Gradio callbacks
# (originally: "used to hold the currently displayed image, one global copy").
current_img = None      # last zoomed preview image shown in the UI (set by update_image)
normalize_hist = None   # last normalized / filtered histogram cube (set by update_hist)
g_est_nosie = None      # last estimated noise map (original spelling kept for compatibility)
def readRAW(path):
    """Load a 31x40x64 histogram cube from a raw capture file.

    Two on-disk layouts are supported, distinguished purely by file size:
    exactly 31*40*64 int16 values, or RAW10 packing where every 5 bytes
    hold four 10-bit pixels (4 high bytes + 1 byte of packed low 2-bit pairs).

    Returns a (31, 40, 64) ndarray (int16 for the first layout, uint16 for RAW10).
    """
    size = os.path.getsize(path)
    print('filesize: ', size)
    if size == 31 * 40 * 64 * 2:
        # Plain little-endian int16 dump.
        data = np.fromfile(path, dtype=np.int16)
    else:
        # RAW10: unpack 5-byte groups into four 10-bit pixel values.
        with open(path, "rb") as fh:
            packed = np.frombuffer(fh.read(), dtype=np.uint8)
        usable = (packed.shape[0] // 5) * 5  # drop any trailing partial group
        groups = packed[:usable].reshape(-1, 5)
        high = groups[:, :4].astype(np.uint16)  # 8 MSBs of each pixel
        low_byte = groups[:, 4]                 # four packed 2-bit LSB pairs
        pixels = [
            (high[:, j] << 2) | ((low_byte >> (2 * j)) & 0x03)
            for j in range(4)
        ]
        data = np.stack(pixels, axis=1).flatten()
    return data.reshape(31, 40, 64)
def on_image_click(evt: gr.SelectData):
    """Mark the clicked pixel on the cached preview image with a red dot.

    Returns a copy of the cached image with the marker drawn, or None when
    no image has been loaded yet.
    """
    global current_img
    if current_img is None:
        return None
    click_x, click_y = evt.index  # Gradio reports (x, y) display coordinates
    marked = current_img.copy()
    # Draw a filled red circle at the click position.
    cv2.circle(marked, (click_x, click_y), 5, (255, 0, 0), -1)
    return marked
def update_image(img):
    """Cache *img* in the module-level ``current_img`` read by on_image_click."""
    global current_img
    current_img = img
def update_hist(hist):
    """Cache *hist* in the module-level ``normalize_hist`` read by the histogram plot."""
    global normalize_hist
    normalize_hist = hist
def load_bin(file):
    """Load an uploaded .bin/.raw capture and build the zoomed total-count preview.

    Side effects: caches the preview via update_image() and the normalized
    histogram cube via update_hist().

    Returns:
        (raw_hist, img_tc_zoomed): the float32 31x40x64 histogram cube and a
        16x-zoomed uint8 preview of per-pixel total counts.
    """
    raw_hist = readRAW(file.name).astype(np.float32)
    # Bins 62/63 encode the shot count (high word * 1024 + low word).
    multishot = (raw_hist[...,62]*1024 + raw_hist[...,63])
    normalize_data = 1 / multishot * 1/1024
    nor_hist = (raw_hist[...,:-2]) * normalize_data[...,np.newaxis]
    # Row 0 carries embedded metadata, hence the [1:] slice.
    # NOTE(review): nor_hist already dropped the last two bins above; the extra
    # [..., :-2] here trims two more data bins — confirm this is intended.
    img = np.log1p(np.sum(nor_hist[1:, :, :-2], axis=2))
    norm_img = (img - img.min()) / (img.max() - img.min() + 1e-6)
    # Upscale 16x via a Kronecker product so individual pixels are visible.
    img_tc_zoomed = np.kron((norm_img * 255).astype(np.uint8),
                            np.ones((16, 16), dtype=np.uint8))
    update_image(img_tc_zoomed)
    update_hist(nor_hist)
    return raw_hist,img_tc_zoomed
# Shared matplotlib colormap used by gray_to_color_zoom to render 1-channel maps.
cmap = matplotlib.colormaps.get_cmap('viridis')
def gray_to_color_zoom(img_1ch):
    """Colorize a single-channel map with the shared colormap and upscale 16x.

    Args:
        img_1ch: HxW array, uint8 or float.

    Returns:
        (16H)x(16W)x3 uint8 RGB image.
    """
    lo = img_1ch.min()
    hi = img_1ch.max()
    scaled = (img_1ch - lo) / (hi - lo + 1e-6)
    rgb = (cmap(scaled)[..., :3] * 255).astype(np.uint8)  # keep RGB, drop alpha
    # Nearest-neighbour 16x upscaling in both spatial dimensions.
    zoomed = np.repeat(np.repeat(rgb, 16, axis=0), 16, axis=1)
    return zoomed
def main(share=True):
    """Build and launch the Gradio demo.

    The UI lets the user upload a raw SPAD histogram capture, inspect
    per-pixel histograms by clicking the preview image, and run the
    ToF / peak extraction pipeline.

    Args:
        share: forwarded to Gradio launch(); True creates a public link.
    """
    print("Initializing Demo...")
    title = "# VisionICs 3D DEMO"
    description = """ 上传 `.bin/.raw` 文件,点击图像像素查看该像素的直方图 """

    def estimate_noise(hist, noise_filter_steps):
        """Estimate a per-pixel noise floor from the weaker sorted bins.

        NOTE(review): currently unused by on_submit — kept for experimentation.
        """
        # Sort bins descending along the bin axis and keep the weakest tail.
        noise_hist = np.sort(hist, axis=2)[..., ::-1][..., 32:]
        lower_bound = np.median(noise_hist, axis=2)
        # Floor = median of the weak bins plus a tunable multiple of their std.
        est_noise = lower_bound + noise_filter_steps * np.std(noise_hist, axis=2)
        return est_noise

    def mean_pool_same_axis2(arr, k=3):
        """Sliding mean along the bin axis (axis 2); output shape == input shape.

        Edges are padded with the per-line median (np.pad mode='median').
        """
        pad = k // 2
        arr_pad = np.pad(arr, ((0, 0), (0, 0), (pad, pad)), mode='median')
        out = np.zeros_like(arr)
        for i in range(arr.shape[2]):
            out[:, :, i] = np.mean(arr_pad[:, :, i:i + k], axis=2)
        return out

    def median_pool_same_axis2(arr, k=12):
        """Sliding median along the bin axis (axis 2); output shape == input shape.

        Edges are padded with the per-line median (np.pad mode='median').
        """
        pad = k // 2
        arr_pad = np.pad(arr, ((0, 0), (0, 0), (pad, pad)), mode='median')
        out = np.zeros_like(arr)
        for i in range(arr.shape[2]):
            out[:, :, i] = np.median(arr_pad[:, :, i:i + k], axis=2)
        return out

    def min_pool_same_axis2(arr, k=12):
        """Sliding minimum along the bin axis (axis 2); output shape == input shape.

        NOTE(review): currently unused by on_submit — kept for experimentation.
        """
        pad = k // 2
        arr_pad = np.pad(arr, ((0, 0), (0, 0), (pad, pad)), mode='median')
        out = np.zeros_like(arr)
        for i in range(arr.shape[2]):
            out[:, :, i] = np.min(arr_pad[:, :, i:i + k], axis=2)
        return out

    def plot_pixel_histogram(evt: gr.SelectData, raw_hist, show_filter_hist):
        """Plot the raw (and optionally filtered) histogram of the clicked pixel."""
        x, y = evt.index  # Gradio reports (x, y) in display coordinates
        # The preview is zoomed 16x; map back to sensor coordinates.
        x = x // 16
        y = y // 16
        # Row 0 of the raw cube is an embedded header row, hence y + 1.
        ego_tof_hist = raw_hist[y + 1, x, :]
        fig = make_subplots(specs=[[{"secondary_y": True}]])
        fig.add_trace(go.Scatter(y=ego_tof_hist, mode="lines+markers", name="Raw"),
                      secondary_y=False)
        # Bug fix: ego_tof was previously undefined when the branch below was
        # skipped, making the title f-string raise NameError.
        ego_tof = None
        if normalize_hist is not None and show_filter_hist:
            ego_normalize_hist = normalize_hist[y, x, :]
            ego_tof = np.argmax(ego_normalize_hist)
            fig.add_trace(go.Scatter(y=ego_normalize_hist, mode="lines+markers",
                                     name="Filtered"), secondary_y=True)
            fig.add_vline(
                x=ego_tof,
                line_dash="dash",
                line_width=2
            )
        fig.update_layout(
            title=f"Pixel ({x}, {y}) 在所有 {ego_tof} ",
            xaxis_title="帧索引 (T)",
            yaxis_title="强度值",
        )
        return fig

    def on_submit(image, cycle_steps, neighbor_filter_steps, noise_filter_steps,
                  apply_scatter_filter, apply_ref_filter, apply_noise_filter,
                  tof_range_min_steps, tof_range_max_steps, request: gr.Request = None):
        """Run the ToF extraction pipeline.

        Normalizes the histogram cube by shot count, subtracts the expected
        dark-count level, optionally removes scatter via median-pool noise
        estimation + per-group triangle thresholding, applies a 3-D neighbour
        consistency filter, and extracts per-pixel ToF and peak maps.

        Returns [img_tof_zoomed, img_peak_zoomed] as colorized zoomed images.
        NOTE(review): apply_ref_filter is currently unused.
        """
        global g_est_nosie
        raw_hist = image[1:, ...].copy()  # drop the embedded header row
        low, high = [tof_range_min_steps, tof_range_max_steps]
        t0 = time.perf_counter()
        # Bins 62/63 encode the shot count (high word * 1024 + low word).
        multishot = (raw_hist[..., 62] * 1024 + raw_hist[..., 63])
        normalize_data = 1 / multishot * cycle_steps * 1 / 1024
        nor_hist = (raw_hist[..., :-2]) * normalize_data[..., np.newaxis]
        # Subtract the expected dark-count contribution per bin.
        dcr_cps = 4000           # dark count rate, counts/s
        bin_size_ns = 0.25
        total_bin = 62
        integ_time_s = total_bin * bin_size_ns * cycle_steps * 1e-9
        count_per_bin = dcr_cps * integ_time_s * normalize_data
        nor_hist = nor_hist - (count_per_bin)[..., np.newaxis]
        nor_hist[nor_hist < 0] = 0
        filter_hist = np.zeros_like(nor_hist)
        # Zero bins outside the user-selected ToF search window.
        nor_hist[..., :low] = 0
        nor_hist[..., high:] = 0
        if apply_scatter_filter:
            # Estimate and remove the scatter/noise floor.
            est_nosie = median_pool_same_axis2(nor_hist, k=12)
            est_nosie = est_nosie + noise_filter_steps * np.sqrt(est_nosie)
            g_est_nosie = est_nosie
            nor_hist = nor_hist - est_nosie
            nor_hist[nor_hist < 0] = 0
            # Triangle-threshold each group of 3 bins to keep only strong returns.
            bin_range = 3
            for i in range(0, 62, bin_range):
                bin_slice = nor_hist[..., i:i + bin_range]
                ratio = 1 / (np.max(bin_slice) - np.min(bin_slice)) * 255
                data = (bin_slice - np.min(bin_slice)) * ratio
                # cv2.threshold returns (threshold_value, binarized); only the
                # computed threshold value is used, mapped back to data units.
                thresh, _binarized = cv2.threshold(
                    data.flatten().astype(np.uint8), 0, 255,
                    cv2.THRESH_BINARY + cv2.THRESH_TRIANGLE)
                mask = bin_slice > thresh / ratio
                filter_hist[..., i:i + bin_range] = bin_slice * mask
        t1 = time.perf_counter()
        print("elapsed:", (t1 - t0) * 1e3, "milliseconds")
        mask_peak_filter = 1
        filter_hist = filter_hist * 25e3
        # Keep a bin only if enough of its 26 3-D neighbours are also active.
        kernel = np.ones((3, 3, 3), dtype=np.float32)
        kernel[1, 1, 1] = 0  # exclude the centre voxel itself
        mask_filter = (filter_hist > 1).astype(np.uint8)
        out = convolve(mask_filter, kernel, mode='nearest', cval=0.0)
        mask = out >= neighbor_filter_steps
        filter_hist = np.where(mask, filter_hist, 0)
        # Zero the edge bins to suppress boundary artefacts.
        edge_range = 3
        filter_hist[..., :edge_range] = 0
        nor_hist[..., :edge_range] = 0
        filter_hist[..., -edge_range:] = 0
        nor_hist[..., -edge_range:] = 0
        if apply_scatter_filter:
            tof = np.argmax(filter_hist, axis=2)
        else:
            tof = np.argmax(nor_hist, axis=2)
        peak = np.take_along_axis(nor_hist, tof[..., None], axis=2)[..., 0]
        update_hist(filter_hist)
        if apply_noise_filter:
            # Reject peaks that do not clear a locally estimated noise floor.
            est_nosie = mean_pool_same_axis2(nor_hist, 8)
            est_nosie = est_nosie + noise_filter_steps * np.sqrt(est_nosie)
            th = np.take_along_axis(est_nosie, tof[..., None], axis=2)[..., 0]
        else:
            th = 0
        mask_nosie = peak > th
        tof = tof * mask_nosie * mask_peak_filter
        img_tof_zoomed = gray_to_color_zoom(tof)
        peak = np.log1p(peak)  # compress dynamic range for display
        img_peak_zoomed = gray_to_color_zoom(peak)
        return [img_tof_zoomed, img_peak_zoomed]

    def draw_slice(input_image, slice_steps):
        """Render one denoised bin slice and its triangle-threshold mask.

        NOTE(review): unlike on_submit, this keeps all 64 bins, so slices 62/63
        show the embedded shot-count words — confirm intended.
        """
        raw_hist = input_image[1:, ...]  # drop the embedded header row
        print(slice_steps)
        multishot = (raw_hist[..., 62] * 1024 + raw_hist[..., 63])
        normalize_data = 1 / multishot * 25e4
        nor_hist = (raw_hist) * normalize_data[..., np.newaxis]
        # Remove the scatter/noise floor (fixed 0.4-sigma margin).
        est_nosie = median_pool_same_axis2(nor_hist, k=12)
        est_nosie = est_nosie + 0.4 * np.sqrt(est_nosie)
        nor_hist = nor_hist - est_nosie
        nor_hist[nor_hist < 0] = 0
        slice_img = nor_hist[..., slice_steps]
        bin_slice = nor_hist[..., slice_steps]
        ratio = 1 / (np.max(bin_slice) - np.min(bin_slice)) * 255
        data = (bin_slice - np.min(bin_slice)) * ratio
        nonzero = data[data > 0]  # threshold over non-zero samples only
        # NOTE(review): cv2.threshold raises if the slice is entirely zero.
        thresh, _binarized = cv2.threshold(
            nonzero.flatten().astype(np.uint8), 0, 255,
            cv2.THRESH_BINARY + cv2.THRESH_TRIANGLE)
        mask = bin_slice > thresh / ratio
        rgb_img = gray_to_color_zoom(slice_img)
        rgb_mask_img = gray_to_color_zoom(mask.astype(np.float32))
        print(mask.shape)
        return rgb_img, rgb_mask_img

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown(title)
        gr.Markdown(description)
        gr.Markdown("### Simple Elegant Algorithm")
        file_input = gr.File(label="上传 .raw/.bin/.txt 文件", file_types=[".raw", ".bin", ".txt"])
        input_image = gr.State()
        with gr.Row():
            # Left: total-count preview; right: per-pixel histogram plot.
            with gr.Column():
                total_count_image = gr.Image(label="Total Count Image", image_mode="RGB", type='numpy', elem_id='img-display-input')
            with gr.Column():
                histogram = gr.Plot(label="像素直方图")
        with gr.Row():
            # Pipeline outputs: ToF map and peak-intensity map.
            with gr.Column():
                tof_image = gr.Image(label="ToF Image", image_mode="RGB", type='numpy', elem_id='img-display-input')
            with gr.Column():
                peak_image = gr.Image(label="Peak Image", image_mode="RGB", type='numpy', elem_id='img-display-input')
        with gr.Row():
            with gr.Column():
                submit_btn = gr.Button(value="Predict")
                with gr.Accordion(label="Settings", open=False):
                    show_filter_hist = gr.Checkbox(label="Show Filter HIST", value=False)
                    # Label fixed: this slider sets the cycle count used for
                    # normalization (it previously carried a copy-pasted label).
                    cycle_steps = gr.Slider(label="Cycle Steps", minimum=1, maximum=262144, value=25e4, step=1)
                    # Labels fixed: min/max were swapped in the original.
                    tof_range_min_steps = gr.Slider(label="ToF Range Min Steps", minimum=0, maximum=62, value=5, step=1)
                    tof_range_max_steps = gr.Slider(label="ToF Range Max Steps", minimum=0, maximum=62, value=60, step=1)
                    apply_scatter_filter = gr.Checkbox(label="Apply scatter filter points", value=True)
                    apply_ref_filter = gr.Checkbox(label="Apply reflect filter points", value=False)
                    neighbor_filter_steps = gr.Slider(label="reflect filter Steps", minimum=1, maximum=26, value=12, step=1)
                    apply_noise_filter = gr.Checkbox(label="Apply noise filter points", value=False)
                    noise_filter_steps = gr.Slider(label="noise filter Steps (STD)", minimum=0, maximum=1, value=0.3, step=0.01)
        with gr.Row():
            with gr.Column():
                slice_steps = gr.Slider(label="Slice Steps", minimum=0, maximum=63, value=0, step=1)
                slice_image = gr.Image(label="Slice Image", image_mode="RGB", type='numpy')
            with gr.Column():
                binary_th_steps = gr.Slider(label="Binary Steps", minimum=0, maximum=256, value=128, step=1)
                slice_histogram = gr.Image(label="Slice Image", image_mode="RGB", type='numpy')
                mask_image = gr.Image(label="Mask Image", image_mode="RGB", type='numpy')
        file_input.change(load_bin, inputs=file_input, outputs=[input_image, total_count_image])
        total_count_image.select(plot_pixel_histogram, inputs=[input_image, show_filter_hist], outputs=[histogram])
        total_count_image.select(
            fn=on_image_click,
            outputs=total_count_image
        )
        submit_btn.click(
            # Bug fix: the reset lambda returned 6 values for 2 outputs.
            fn=lambda: [None, None],
            outputs=[tof_image, peak_image]
        ).then(
            fn=on_submit,
            inputs=[input_image, cycle_steps, neighbor_filter_steps, noise_filter_steps,
                    apply_scatter_filter, apply_ref_filter, apply_noise_filter,
                    tof_range_min_steps, tof_range_max_steps],
            outputs=[tof_image, peak_image]
        )
        slice_steps.change(
            draw_slice,
            inputs=[input_image, slice_steps],
            outputs=[slice_image, mask_image]
        )
    demo.queue().launch(share=share)
# Entry point: run the demo locally (share=False disables the public Gradio link).
if __name__ == '__main__':
    main(share=False)
def mean_pool_same_axis2(arr, k=3):
    """Sliding mean along the bin axis (axis 2); output shape == input shape.

    Edges are padded with the per-line median (np.pad mode='median').

    Args:
        arr: HxWxC array.
        k: window width in bins.

    Returns:
        Array of the same shape and dtype as ``arr``.
    """
    half = k // 2
    padded = np.pad(arr, ((0, 0), (0, 0), (half, half)), mode='median')
    pooled = np.zeros_like(arr)
    for ch in range(arr.shape[2]):
        pooled[:, :, ch] = padded[:, :, ch:ch + k].mean(axis=2)
    return pooled
def median_pool_same_axis2(arr, k=12):
    """Sliding median along the bin axis (axis 2); output shape == input shape.

    Edges are padded with the per-line median (np.pad mode='median').

    Args:
        arr: HxWxC array.
        k: window width in bins.

    Returns:
        Array of the same shape and dtype as ``arr``.
    """
    half = k // 2
    padded = np.pad(arr, ((0, 0), (0, 0), (half, half)), mode='median')
    pooled = np.zeros_like(arr)
    for ch in range(arr.shape[2]):
        pooled[:, :, ch] = np.median(padded[:, :, ch:ch + k], axis=2)
    return pooled
# raw_hist (30,40,64)
# bin_range = 3
# multishot = (raw_hist[...,62]*1024 + raw_hist[...,63])
# normalize_data = 1 / multishot *cycle_steps * 1/1024
# nor_hist = (raw_hist[...,:-2]) * normalize_data[...,np.newaxis]
# filter_hist = np.zeros_like(nor_hist)
# est_nosie = median_pool_same_axis2(nor_hist,k=12)
# sqrt_nosie = np.sqrt(est_nosie)
# est_nosie = est_nosie + noise_filter_steps * sqrt_nosie
# g_est_nosie = est_nosie
# nor_hist = nor_hist - est_nosie
# nor_hist[nor_hist<0] = 0
# for i in range(0,62,bin_range):
# map = (nor_hist[...,i:i+bin_range])
# ratio = 1/(np.max(map)-np.min(map))*255
# data = (map-np.min(map)) * ratio
# _, otsu_thresh = cv2.threshold(data.flatten().astype(np.uint8), 0, 255, cv2.THRESH_BINARY + cv2.THRESH_TRIANGLE)
# mask = map > ( _ ) / ratio
# filter_map = map * mask
# # filter_map = map * mask
# filter_hist[...,i:i+bin_range] = filter_map
# tof = np.argmax(filter_hist,axis=2)
# peak = np.take_along_axis(nor_hist, tof[..., None], axis=2)[..., 0]