import gradio as gr
import cv2
import matplotlib
import numpy as np
import os
import time
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from scipy.ndimage import convolve

# Module-level caches shared between Gradio callbacks.
current_img = None      # image currently shown in the "Total Count" panel
normalize_hist = None   # last normalized/filtered histogram cube (H, W, bins)
g_est_nosie = None      # last noise-floor estimate (name kept for compatibility)


def readRAW(path):
    """Read a sensor dump and return it as an array of shape (31, 40, 64).

    Two on-disk layouts are supported:
    - 16-bit: file size exactly 31*40*64*2 bytes, interpreted as int16.
    - RAW10:  packed 10-bit data, 5 bytes -> 4 pixels (4 high bytes followed
      by one byte carrying the four 2-bit LSB pairs).

    Raises ValueError (from reshape) if the decoded pixel count does not
    match 31*40*64.
    """
    filesize = os.path.getsize(path)
    print('filesize: ', filesize)
    if filesize == 31 * 40 * 64 * 2:
        output = np.fromfile(path, dtype=np.int16)
    else:
        with open(path, "rb") as f:
            raw_data = f.read()
        raw10 = np.frombuffer(raw_data, dtype=np.uint8)
        # Each 5-byte block packs four 10-bit pixels; drop any trailing partial block.
        n_blocks = raw10.shape[0] // 5
        raw10 = raw10[:n_blocks * 5].reshape(-1, 5)
        B0 = raw10[:, 0].astype(np.uint16)
        B1 = raw10[:, 1].astype(np.uint16)
        B2 = raw10[:, 2].astype(np.uint16)
        B3 = raw10[:, 3].astype(np.uint16)
        B4 = raw10[:, 4]
        p0 = (B0 << 2) | ((B4 >> 0) & 0x03)
        p1 = (B1 << 2) | ((B4 >> 2) & 0x03)
        p2 = (B2 << 2) | ((B4 >> 4) & 0x03)
        p3 = (B3 << 2) | ((B4 >> 6) & 0x03)
        output = np.stack([p0, p1, p2, p3], axis=1).flatten()
    return output.reshape(31, 40, 64)


def on_image_click(evt: gr.SelectData):
    """Draw a red marker at the clicked pixel on the cached display image."""
    global current_img
    if current_img is None:
        return None
    x, y = evt.index  # note: Gradio reports (x, y), not (row, col)
    img = current_img.copy()
    cv2.circle(img, (x, y), 5, (255, 0, 0), -1)
    return img


def update_image(img):
    """Cache the image currently shown so click handlers can annotate it."""
    global current_img
    current_img = img


def update_hist(hist):
    """Cache the latest histogram cube for the per-pixel plot callback."""
    global normalize_hist
    normalize_hist = hist


def load_bin(file):
    """Load a .raw/.bin file and build the total-count preview image.

    Returns (raw_hist, preview) where raw_hist is the (31, 40, 64) float32
    cube and preview is the 16x-zoomed log-intensity image.
    """
    raw_hist = readRAW(file.name).astype(np.float32)
    # The last two bins encode the shot count: high word * 1024 + low word.
    multishot = (raw_hist[..., 62] * 1024 + raw_hist[..., 63])
    normalize_data = 1 / multishot * 1 / 1024
    nor_hist = (raw_hist[..., :-2]) * normalize_data[..., np.newaxis]
    # NOTE(review): nor_hist already excludes the last 2 bins; slicing :-2
    # again drops 2 more. Row 0 (embedded header) is skipped for display.
    img = np.log1p(np.sum(nor_hist[1:, :, :-2], axis=2))
    norm_img = (img - img.min()) / (img.max() - img.min() + 1e-6)
    # Nearest-neighbour 16x zoom so individual pixels are clickable.
    img_tc_zoomed = np.kron((norm_img * 255).astype(np.uint8),
                            np.ones((16, 16), dtype=np.uint8))
    update_image(img_tc_zoomed)
    # NOTE(review): this cube still contains the header row, while the
    # display image starts at row 1 — verify the row alignment in
    # plot_pixel_histogram against this caller.
    update_hist(nor_hist)
    return raw_hist, img_tc_zoomed


cmap = matplotlib.colormaps.get_cmap('viridis')


def gray_to_color_zoom(img_1ch):
    """Colorize a single-channel image with viridis and zoom 16x."""
    img_norm = (img_1ch - img_1ch.min()) / (img_1ch.max() - img_1ch.min() + 1e-6)
    color_img = cmap(img_norm)[..., :3]  # keep RGB, drop alpha
    color_img = (color_img * 255).astype(np.uint8)
    color_img = np.repeat(np.repeat(color_img, 16, axis=0), 16, axis=1)
    return color_img


def main(share=True):
    """Build and launch the Gradio demo UI."""
    print("Initializing Demo...")
    title = "# VisionICs 3D DEMO"
    description = """
    上传 `.bin/.raw` 文件,点击图像像素查看该像素的直方图
    """

    def estimate_noise(hist, noise_filter_steps):
        # Currently unused alternative noise estimator: rank bins per pixel
        # and use the lower half's median + a std-scaled margin.
        noise_hist = np.sort(hist, axis=2)[..., ::-1][..., 32:]
        lower_bound = np.median(noise_hist, axis=2)
        est_noise = (lower_bound + noise_filter_steps * np.std(noise_hist, axis=2))
        return est_noise

    def mean_pool_same_axis2(arr, k=3):
        """Same-size mean pooling along axis 2 (median-padded at the edges)."""
        pad = k // 2
        arr_pad = np.pad(arr, ((0, 0), (0, 0), (pad, pad)), mode='median')
        H, W, C = arr.shape
        out = np.zeros_like(arr)
        for i in range(C):
            window = arr_pad[:, :, i: i + k]
            out[:, :, i] = np.mean(window, axis=2)
        return out

    def median_pool_same_axis2(arr, k=12):
        """Same-size median pooling along axis 2 (median-padded at the edges)."""
        pad = k // 2
        arr_pad = np.pad(arr, ((0, 0), (0, 0), (pad, pad)), mode='median')
        H, W, C = arr.shape
        out = np.zeros_like(arr)
        for i in range(C):
            window = arr_pad[:, :, i: i + k]
            out[:, :, i] = np.median(window, axis=2)
        return out

    def min_pool_same_axis2(arr, k=12):
        """Same-size min pooling along axis 2 (median-padded at the edges)."""
        pad = k // 2
        arr_pad = np.pad(arr, ((0, 0), (0, 0), (pad, pad)), mode='median')
        H, W, C = arr.shape
        out = np.zeros_like(arr)
        for i in range(C):
            window = arr_pad[:, :, i: i + k]
            out[:, :, i] = np.min(window, axis=2)
        return out

    def plot_pixel_histogram(evt: gr.SelectData, raw_hist, show_filter_hist):
        """Plot the raw (and optionally filtered) histogram of a clicked pixel."""
        x, y = evt.index
        # The display image is zoomed 16x; map back to sensor coordinates.
        x = x // 16
        y = y // 16
        # Row 0 of the raw cube is an embedded header, so display row y maps
        # to raw row y + 1.
        ego_tof_hist = raw_hist[y + 1, x, :]
        fig = make_subplots(specs=[[{"secondary_y": True}]])
        fig.add_trace(go.Scatter(y=ego_tof_hist, mode="lines+markers", name="Raw"),
                      secondary_y=False)
        # Fix: ego_tof must exist even when the filtered trace is not shown,
        # since the title below always references it.
        ego_tof = None
        if normalize_hist is not None and show_filter_hist:
            ego_normalize_hist = normalize_hist[y, x, :]
            ego_tof = np.argmax(ego_normalize_hist)
            fig.add_trace(go.Scatter(y=ego_normalize_hist, mode="lines+markers",
                                     name="Filtered"), secondary_y=True)
            fig.add_vline(x=ego_tof, line_dash="dash", line_width=2)
        fig.update_layout(
            title=f"Pixel ({x}, {y}) 在所有 {ego_tof} ",
            xaxis_title="帧索引 (T)",
            yaxis_title="强度值",
        )
        return fig

    def on_submit(image, cycle_steps, neighbor_filter_steps, noise_filter_steps,
                  apply_scatter_filter, apply_ref_filter, apply_noise_filter,
                  tof_range_min_steps, tof_range_max_steps,
                  request: gr.Request = None):
        """Run the full ToF/peak extraction pipeline and return zoomed images."""
        global g_est_nosie
        raw_hist = image[1:, ...].copy()  # drop the embedded header row
        low, high = tof_range_min_steps, tof_range_max_steps
        t0 = time.perf_counter()
        multishot = (raw_hist[..., 62] * 1024 + raw_hist[..., 63])
        normalize_data = 1 / multishot * cycle_steps * 1 / 1024
        nor_hist = (raw_hist[..., :-2]) * normalize_data[..., np.newaxis]
        # Subtract the expected dark-count contribution per bin.
        dcr_cps = 4000        # dark count rate, counts/s — TODO confirm
        bin_size_ns = 0.25
        total_bin = 62
        integ_time_s = total_bin * bin_size_ns * cycle_steps * 1e-9
        count_per_bin = dcr_cps * integ_time_s * normalize_data
        nor_hist = nor_hist - count_per_bin[..., np.newaxis]
        nor_hist[nor_hist < 0] = 0
        filter_hist = np.zeros_like(nor_hist)
        # Zero out bins outside the configured ToF range.
        nor_hist[..., :low] = 0
        nor_hist[..., high:] = 0
        if apply_scatter_filter:
            est_noise = median_pool_same_axis2(nor_hist, k=12)
            est_noise = est_noise + noise_filter_steps * np.sqrt(est_noise)
            g_est_nosie = est_noise
            nor_hist = nor_hist - est_noise
            nor_hist[nor_hist < 0] = 0
            # Per-chunk adaptive binarization (triangle threshold) along bins.
            bin_range = 3
            for i in range(0, 62, bin_range):
                chunk = nor_hist[..., i:i + bin_range]
                ratio = 1 / (np.max(chunk) - np.min(chunk)) * 255
                data = (chunk - np.min(chunk)) * ratio
                # cv2.threshold's first return is the computed threshold value.
                thresh_val, _binary = cv2.threshold(
                    data.flatten().astype(np.uint8), 0, 255,
                    cv2.THRESH_BINARY + cv2.THRESH_TRIANGLE)
                mask = chunk > thresh_val / ratio
                filter_hist[..., i:i + bin_range] = chunk * mask
        t1 = time.perf_counter()
        print("elapsed:", (t1 - t0) * 1e3, "milliseconds")
        mask_peak_filter = 1
        filter_hist = filter_hist * 25e3
        # Keep only voxels with enough active 3D neighbours (26-connectivity).
        kernel = np.ones((3, 3, 3), dtype=np.float32)
        kernel[1, 1, 1] = 0
        mask_filter = (filter_hist > 1).astype(np.uint8)
        out = convolve(mask_filter, kernel, mode='nearest', cval=0.0)
        mask = out >= neighbor_filter_steps
        print(filter_hist, out, mask)
        filter_hist = np.where(mask, filter_hist, 0)
        # Suppress the first/last bins, which are edge artifacts.
        edge_range = 3
        filter_hist[..., :edge_range] = 0
        nor_hist[..., :edge_range] = 0
        filter_hist[..., -edge_range:] = 0
        nor_hist[..., -edge_range:] = 0
        if apply_scatter_filter:
            tof = np.argmax(filter_hist, axis=2)
        else:
            tof = np.argmax(nor_hist, axis=2)
        peak = np.take_along_axis(nor_hist, tof[..., None], axis=2)[..., 0]
        update_hist(filter_hist)
        if apply_noise_filter:
            est_noise = mean_pool_same_axis2(nor_hist, 8)
            est_noise = est_noise + noise_filter_steps * np.sqrt(est_noise)
            th = np.take_along_axis(est_noise, tof[..., None], axis=2)[..., 0]
        else:
            th = 0
        mask_noise = peak > th
        tof = tof * mask_noise * mask_peak_filter
        img_tof_zoomed = gray_to_color_zoom(tof)
        peak = np.log1p(peak)
        img_peak_zoomed = gray_to_color_zoom(peak)
        return [img_tof_zoomed, img_peak_zoomed]

    def draw_slice(input_image, slice_steps):
        """Visualize one histogram bin and its triangle-threshold mask."""
        raw_hist = input_image[1:, ...]  # drop the embedded header row
        print(slice_steps)
        multishot = (raw_hist[..., 62] * 1024 + raw_hist[..., 63])
        normalize_data = 1 / multishot * 25e4
        nor_hist = raw_hist * normalize_data[..., np.newaxis]
        est_noise = median_pool_same_axis2(nor_hist, k=12)
        est_noise = est_noise + 0.4 * np.sqrt(est_noise)
        nor_hist = nor_hist - est_noise
        nor_hist[nor_hist < 0] = 0
        slice_img = nor_hist[..., slice_steps]
        chunk = nor_hist[..., slice_steps]
        ratio = 1 / (np.max(chunk) - np.min(chunk)) * 255
        data = (chunk - np.min(chunk)) * ratio
        nonzero = data[data > 0]  # exclude zeros from the threshold estimate
        thresh_val, _binary = cv2.threshold(
            nonzero.flatten().astype(np.uint8), 0, 255,
            cv2.THRESH_BINARY + cv2.THRESH_TRIANGLE)
        mask = chunk > thresh_val / ratio
        rgb_img = gray_to_color_zoom(slice_img)
        rgb_mask_img = gray_to_color_zoom(mask.astype(np.float32))
        print(mask.shape)
        return rgb_img, rgb_mask_img

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown(title)
        gr.Markdown(description)
        gr.Markdown("### Simple Elegant Algorithm")
        file_input = gr.File(label="上传 .raw/.bin/.txt 文件",
                             file_types=[".raw", ".bin", ".txt"])
        input_image = gr.State()
        with gr.Row():
            # Left: input image; right: per-pixel histogram.
            with gr.Column():
                total_count_image = gr.Image(label="Total Count Image",
                                             image_mode="RGB", type='numpy',
                                             elem_id='img-display-input')
            with gr.Column():
                histogram = gr.Plot(label="像素直方图")
        with gr.Row():
            with gr.Column():
                tof_image = gr.Image(label="ToF Image", image_mode="RGB",
                                     type='numpy', elem_id='img-display-input')
            with gr.Column():
                peak_image = gr.Image(label="Peak Image", image_mode="RGB",
                                      type='numpy', elem_id='img-display-input')
        with gr.Row():
            with gr.Column():
                submit_btn = gr.Button(value="Predict")
                with gr.Accordion(label="Settings", open=False):
                    show_filter_hist = gr.Checkbox(label="Show Filter HIST", value=False)
                    # Fix: was mislabeled "reflect filter Steps" (duplicate label).
                    cycle_steps = gr.Slider(label="Cycle Steps", minimum=1,
                                            maximum=262144, value=25e4, step=1)
                    # Fix: min/max labels were swapped.
                    tof_range_min_steps = gr.Slider(label="ToF Range Min Steps",
                                                    minimum=0, maximum=62, value=5, step=1)
                    tof_range_max_steps = gr.Slider(label="ToF Range Max Steps",
                                                    minimum=0, maximum=62, value=60, step=1)
                    apply_scatter_filter = gr.Checkbox(label="Apply scatter filter points", value=True)
                    apply_ref_filter = gr.Checkbox(label="Apply reflect filter points", value=False)
                    neighbor_filter_steps = gr.Slider(label="reflect filter Steps",
                                                      minimum=1, maximum=26, value=12, step=1)
                    apply_noise_filter = gr.Checkbox(label="Apply noise filter points", value=False)
                    noise_filter_steps = gr.Slider(label="noise filter Steps (STD)",
                                                   minimum=0, maximum=1, value=0.3, step=0.01)
        with gr.Row():
            with gr.Column():
                slice_steps = gr.Slider(label="Slice Steps", minimum=0,
                                        maximum=63, value=0, step=1)
                slice_image = gr.Image(label="Slice Image", image_mode="RGB", type='numpy')
            with gr.Column():
                binary_th_steps = gr.Slider(label="Binary Steps", minimum=0,
                                            maximum=256, value=128, step=1)
                slice_histogram = gr.Image(label="Slice Image", image_mode="RGB", type='numpy')
                mask_image = gr.Image(label="Mask Image", image_mode="RGB", type='numpy')

        file_input.change(load_bin, inputs=file_input,
                          outputs=[input_image, total_count_image])
        total_count_image.select(plot_pixel_histogram,
                                 inputs=[input_image, show_filter_hist],
                                 outputs=[histogram])
        total_count_image.select(fn=on_image_click, outputs=total_count_image)
        submit_btn.click(
            # Fix: the reset lambda returned 6 values for 2 outputs.
            fn=lambda: [None, None],
            outputs=[tof_image, peak_image],
        ).then(
            fn=on_submit,
            inputs=[input_image, cycle_steps, neighbor_filter_steps,
                    noise_filter_steps, apply_scatter_filter, apply_ref_filter,
                    apply_noise_filter, tof_range_min_steps, tof_range_max_steps],
            outputs=[tof_image, peak_image],
        )
        slice_steps.change(draw_slice, inputs=[input_image, slice_steps],
                           outputs=[slice_image, mask_image])

    demo.queue().launch(share=share)


# Module-level copies of the pooling helpers (also defined inside main());
# kept so external importers that use them keep working.
def mean_pool_same_axis2(arr, k=3):
    """Same-size mean pooling along axis 2 (median-padded at the edges)."""
    pad = k // 2
    arr_pad = np.pad(arr, ((0, 0), (0, 0), (pad, pad)), mode='median')
    H, W, C = arr.shape
    out = np.zeros_like(arr)
    for i in range(C):
        window = arr_pad[:, :, i: i + k]
        out[:, :, i] = np.mean(window, axis=2)
    return out


def median_pool_same_axis2(arr, k=12):
    """Same-size median pooling along axis 2 (median-padded at the edges)."""
    pad = k // 2
    arr_pad = np.pad(arr, ((0, 0), (0, 0), (pad, pad)), mode='median')
    H, W, C = arr.shape
    out = np.zeros_like(arr)
    for i in range(C):
        window = arr_pad[:, :, i: i + k]
        out[:, :, i] = np.median(window, axis=2)
    return out


if __name__ == '__main__':
    main(share=False)