Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import numpy as np | |
| import plotly.graph_objs as go | |
| from scipy.ndimage import convolve | |
| import os | |
| import matplotlib.pyplot as plt | |
| from scipy.signal import find_peaks | |
| from scipy.ndimage import gaussian_filter1d | |
| import cv2 | |
| def readRAW(path): | |
| filesize = os.path.getsize(path) | |
| print(filesize) | |
| if filesize == 31*40*64*2: | |
| output = np.fromfile(path, dtype=np.int16) | |
| else: | |
| with open(path, "rb") as f: | |
| raw_data = f.read() | |
| raw10 = np.frombuffer(raw_data, dtype=np.uint8) | |
| n_blocks = raw10.shape[0] // 5 | |
| raw10 = raw10[:n_blocks * 5].reshape(-1, 5) | |
| B0 = raw10[:, 0].astype(np.uint16) | |
| B1 = raw10[:, 1].astype(np.uint16) | |
| B2 = raw10[:, 2].astype(np.uint16) | |
| B3 = raw10[:, 3].astype(np.uint16) | |
| B4 = raw10[:, 4] | |
| p0 = (B0 << 2) | ((B4 >> 0) & 0x03) | |
| p1 = (B1 << 2) | ((B4 >> 2) & 0x03) | |
| p2 = (B2 << 2) | ((B4 >> 4) & 0x03) | |
| p3 = (B3 << 2) | ((B4 >> 6) & 0x03) | |
| output = np.stack([p0, p1, p2, p3], axis=1).flatten() | |
| # output = np.fromfile(path, dtype=np.int16).reshape(31,40,64*2) | |
| # output = np.fromfile(path, dtype=np.int16).reshape(30,40,64) | |
| return output.reshape(31,40,64) | |
| def load_bin(file): | |
| # raw_hist = readRAW(file.name)[1:,...].astype(np.float32) | |
| raw_hist = readRAW(file.name).astype(np.float32) | |
| print("raw_hist shape:", raw_hist[0,0,:]) | |
| # raw_hist = raw_hist[::-1, ::-1, :] | |
| print("raw_hist shape:", raw_hist[0,0,:]) | |
| # raw_hist = readRAW(file.name) | |
| # 默认显示一张 sum 图像 | |
| multishot = (raw_hist[...,62]*1024 + raw_hist[...,63]) | |
| # multishot[multishot==0] = 20e3 | |
| # normalize_data = 1 / multishot * 20e3 | |
| normalize_data = 1 / multishot * 4e4 * 1/1023 | |
| nor_hist = (raw_hist) * normalize_data[...,np.newaxis] | |
| # nor_hist = (raw_hist) | |
| img = np.sum(nor_hist[1:, :, :-2], axis=2) | |
| img = np.log(img +1) | |
| norm_img = (img - img.min()) / (img.max()) | |
| img_uint8 = (norm_img * 255).astype(np.uint8) | |
| img_tc_zoomed = np.repeat(np.repeat(img_uint8, 16, axis=0), 16, axis=1) | |
| img = np.argmax(nor_hist[1:, :, 15:-2], axis=2)+15 | |
| nosie_est = np.mean(nor_hist[1:, :, -6:-3],axis=2) | |
| th = nosie_est + 3*np.sqrt(nosie_est) | |
| peak = np.max(nor_hist[1:, :, 5:-2], axis=2) | |
| mask = peak > th | |
| # img = img * mask | |
| print('std of tof' , np.std(img.flatten()),'std of peak' , np.std(peak.flatten())) | |
| norm_img = (img - img.min()) / (img.max() + 1e-8) | |
| img_uint8 = (norm_img * 255).astype(np.uint8) | |
| img_tof_zoomed = np.repeat(np.repeat(img_uint8, 16, axis=0), 16, axis=1) | |
| return img_tc_zoomed,img_tof_zoomed, raw_hist, nor_hist | |
| def plot_pixel_histogram(evt: gr.SelectData, raw_hist, nor_hist): | |
| # print("evt:", evt) | |
| x, y = evt.index # Gradio SelectData 对象 | |
| x = x // 16 | |
| y = y // 16 | |
| raw_hist = raw_hist - np.min(raw_hist[...,:-5],axis=2)[...,np.newaxis] | |
| raw_hist[raw_hist<0] = 0 | |
| rm_scatter_hist = np.zeros_like(raw_hist) | |
| # r=1 | |
| # for i in range(r,62): | |
| # range_hist = raw_hist[...,i] | |
| # data = range_hist | |
| # _, otsu_thresh = cv2.threshold(data.flatten().astype(np.uint8), 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) | |
| # mask = range_hist > _ | |
| # filter_map = range_hist * mask | |
| # raw_hist[...,i] = filter_map | |
| raw_values = raw_hist[y+1, x, :] | |
| raw_values1 = raw_hist[y+2, x, :] | |
| raw_values2 = raw_hist[y, x, :] | |
| raw_values3 = raw_hist[y+1, x+1, :] | |
| raw_values4 = raw_hist[y+1, x-1, :] | |
| tof = np.argmax(nor_hist[y+1, x, 10:-5]) + 10 | |
| tof_map = np.argmax(nor_hist[1:, :, 5:-5], axis=2) | |
| kernel = np.array(([1,2,1]), dtype=np.int32) | |
| result_conv = convolve(raw_values, kernel, mode='constant', cval=0) | |
| # result_conv = data | |
| I4 = tof | |
| I3 = I4-1 | |
| I5 = I4+1 | |
| C3 = result_conv[I3] | |
| C4 = result_conv[I4] | |
| C5 = result_conv[I5] | |
| shift_mat = (C5-C3)/(4.0 * C4 -2.0 * C3 - 2.0 * C5) | |
| sr_tof = (tof + shift_mat ) * 500 * 0.15 | |
| noise = np.mean(nor_hist[1:,...,:3],axis=2) | |
| range_hist = 3 | |
| nor_hist[nor_hist>3e3] = 3e3 | |
| epsilon=1e-10 | |
| array = (nor_hist[y+1, x, tof-range_hist:tof+range_hist+1]) - noise[y,x] | |
| safe_array = np.where(array <= 0, epsilon, array) | |
| sim_values = (safe_array) | |
| array = (nor_hist[1:, :, tof-range_hist:tof+range_hist+1]) - noise[...,np.newaxis] | |
| safe_array = np.where(array <= 0, epsilon, array) | |
| histogram_sim = (safe_array) | |
| print(sim_values.shape, histogram_sim.shape,noise.shape) | |
| img = np.tensordot(sim_values,histogram_sim, axes=(0, 2)) | |
| # img = np.log10(img) | |
| print(np.max(img)) | |
| # img[img<0] = 0 | |
| img = img/np.max(img+1e-7)*255 | |
| print('selected value: ',img[y,x],img.shape) | |
| # img = np.zeros((30,40)) | |
| # for i in range(30): | |
| # for j in range(40): | |
| # tof_ = np.argmax(nor_hist[i+1, j, :-2]) | |
| # # sim_values = nor_hist[i+1, j, tof_-range_hist:tof_+range_hist+1] | |
| # array = (nor_hist[i+1, j, tof_-range_hist:tof_+range_hist+1]) - noise[i,j] | |
| # safe_array = np.where(array <= 0, epsilon, array) | |
| # # print(safe_array.shape) | |
| # if safe_array.shape[0]==0: | |
| # continue | |
| # sim_values = (safe_array) | |
| # # histogram_sim = nor_hist[1:, :, tof_-range_hist:tof_+range_hist+1] | |
| # array = (nor_hist[1:, :, tof_-range_hist:tof_+range_hist+1]) - noise[...,np.newaxis] | |
| # safe_array = np.where(array <= 0, epsilon, array) | |
| # if safe_array.shape[0]==0: | |
| # continue | |
| # histogram_sim = (safe_array) | |
| # # | |
| # img_ = np.tensordot(sim_values,histogram_sim, axes=(0, 2)) | |
| # img_ = np.log(img_) | |
| # img_[img_<0] = 0 | |
| # img_ = img_/np.max(img_+1e-7)*255 | |
| # _, otsu_thresh = cv2.threshold(img_.astype(np.uint8), 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) | |
| # if (img_[i,j]-_)>0: | |
| # # if np.std(img_.flatten())<50: | |
| # img[i,j] = 255 | |
| # # print(i,j,img_[i,j],_) | |
| # else: | |
| # img[i,j] = 0 | |
| # # print(i,j,img_[i,j],_,'remove') | |
| # if i==y and j==x: | |
| # print(i,j,'sim ',img_[i,j],' th ',_,'selected') | |
| # # img = np.zeros((30,40)) | |
| # img = img * tof_map | |
| norm_img = (img - img.min()) / (img.max() + 1e-8) | |
| img_uint8 = (norm_img * 255).astype(np.uint8) | |
| img_tof_zoomed = np.repeat(np.repeat(img_uint8, 16, axis=0), 16, axis=1) | |
| vctEmbd = raw_hist[:1,:,:].flatten().astype(np.int32) >> 2 | |
| fRX_Temp = (vctEmbd[15] << 3) + vctEmbd[14] | |
| LDVCC = (((((vctEmbd[65] << 8) + vctEmbd[64])) - 1024) / 1024 * 1.7 * 0.9638 + 1.42) * 6 | |
| fTx_Temp = (((vctEmbd[67] << 8) + vctEmbd[66] - 1024) / 5.34 + 30) | |
| BVD = vctEmbd[23] | |
| # fTx_Temp = float(vctEmbd[61]+((vctEmbd[63] & 0xc0) << 2)) * 0.178 - 38.18 | |
| # LDVCC = ((((vctEmbd[63]&0x30)<<4) + vctEmbd[60] - 110) * 13.7 + 5000) / 1000 | |
| y_min = np.min(raw_values[:-2]) - 10 | |
| y_max = np.max(raw_values[:-2]) + 10 | |
| CUSTOM_COLORS = [ | |
| "#1f77b4", # 蓝 | |
| "#ff7f0e", # 橙 | |
| "#2ca02c", # 绿 | |
| "#d62728", # 红 | |
| "#9467bd", # 紫 | |
| ] | |
| dash_styles = ["solid", "dash", "dot", "dashdot"] | |
| fig = go.Figure() | |
| # fig.add_trace(go.Scatter(y=raw_values, mode="lines+markers")) | |
| # fig.add_trace(go.Scatter(y=raw_values1, mode="lines+markers")) | |
| # fig.add_trace(go.Scatter(y=raw_values2, mode="lines+markers")) | |
| # fig.add_trace(go.Scatter(y=raw_values3, mode="lines+markers")) | |
| # fig.add_trace(go.Scatter(y=raw_values4, mode="lines+markers")) | |
| # 取默认颜色序列 | |
| # colorway = fig.layout.colorway | |
| # if colorway is None: | |
| # colorway = go.Figure().layout.colorway # fallback | |
| start = 5 | |
| range_num = 4 | |
| hist_list = [raw_values1,raw_values2,raw_values3,raw_values4] | |
| ego_tof = np.argmax(raw_values[start:-5 ]) +start | |
| color = CUSTOM_COLORS[0] | |
| fig.add_trace(go.Scatter(y=raw_values, mode="lines+markers",line_color=color)) | |
| fig.add_vline( | |
| x=ego_tof, | |
| line_color=color, | |
| line_dash="solid", | |
| line_width=2 | |
| ) | |
| ego_tof_hist = raw_values[ego_tof-range_num:ego_tof+range_num+1] | |
| ego_tof_hist = ego_tof_hist - np.min(ego_tof_hist) | |
| ego_tof_hist = ego_tof_hist/np.linalg.norm(ego_tof_hist) | |
| ego_tof_neighbor_hist =[] | |
| ego_tof_neighbor_proj = [] | |
| neighbor_tof_ego_proj = [] | |
| for i,v in enumerate(hist_list): | |
| neighbor_tof = np.argmax(v[start:-5])+start | |
| neighbor_hist = v[ego_tof-range_num:ego_tof+range_num+1] | |
| neighbor_hist = neighbor_hist - np.min(neighbor_hist) | |
| neighbor_hist = neighbor_hist/np.linalg.norm(neighbor_hist) | |
| ego_tof_neighbor_hist.append(neighbor_hist) | |
| neighbor_tof_ego_hist = raw_values[neighbor_tof-range_num:neighbor_tof+range_num+1] | |
| neighbor_tof_ego_hist = neighbor_tof_ego_hist - np.min(neighbor_tof_ego_hist) | |
| neighbor_tof_ego_hist = neighbor_tof_ego_hist/np.linalg.norm(neighbor_tof_ego_hist) | |
| neighbor_tof_neighbor_hist = v[neighbor_tof-range_num:neighbor_tof+range_num+1] | |
| neighbor_tof_neighbor_hist = neighbor_tof_neighbor_hist - np.min(neighbor_tof_neighbor_hist) | |
| neighbor_tof_neighbor_hist = neighbor_tof_neighbor_hist/np.linalg.norm(neighbor_tof_neighbor_hist) | |
| # print('neighbor_hist','ego_tof_hist',neighbor_hist,ego_tof_hist,np.dot(neighbor_hist,ego_tof_hist)) | |
| # print('neighbor_tof_ego_hist','neighbor_tof_neighbor_hist',neighbor_tof_ego_hist,neighbor_tof_neighbor_hist,np.dot(neighbor_tof_ego_hist,neighbor_tof_neighbor_hist)) | |
| ego_tof_neighbor_proj.append(np.dot(neighbor_hist,ego_tof_hist)) | |
| neighbor_tof_ego_proj.append(np.dot(neighbor_tof_ego_hist,neighbor_tof_neighbor_hist)) | |
| color = CUSTOM_COLORS[i % len(CUSTOM_COLORS)+1] | |
| # fig.add_trace(go.Scatter(y=v, mode="lines+markers",line_color=color)) | |
| # fig.add_vline( | |
| # x=(neighbor_tof), | |
| # line_color=color, | |
| # line_dash=dash_styles[i % 4], | |
| # line_width=2 | |
| # ) | |
| fig.update_layout( | |
| title=f"Pixel ({x}, {y}) 在所有 {raw_values.shape[0]} 帧的强度变化 {f'ToF: {sr_tof:.1f} mm'} {f'RX: {fRX_Temp} °C'} {f'TX: {fTx_Temp:.2f} °C'} {f'LDVCC: {LDVCC:.2f} V'} {f'BVD: {BVD} V'}", | |
| xaxis_title="帧索引 (T)", | |
| yaxis_title="强度值", | |
| yaxis=dict( | |
| range=[y_min, y_max]) # Set the min and max for y-axis | |
| ) | |
| print('ego_tof_neighbor_proj',ego_tof_neighbor_proj) | |
| print('neighbor_tof_ego_proj',neighbor_tof_ego_proj) | |
| ego_tof_neighbor_hist = np.mean(np.array(ego_tof_neighbor_hist),axis=0) | |
| print(ego_tof_neighbor_hist) | |
| ego_tof_neighbor_hist = ego_tof_neighbor_hist/np.linalg.norm(ego_tof_neighbor_hist) | |
| print('mean ',np.dot(ego_tof_neighbor_hist,ego_tof_hist)) | |
| fig.add_trace(go.Scatter(y=ego_tof_neighbor_hist, mode="lines")) | |
| fig.add_trace(go.Scatter(y=ego_tof_hist, mode="lines+markers")) | |
| return fig, img_tof_zoomed,img | |
| # def plot_depth(nor_hist): | |
| # kernel = np.array([[1,1,1],[1,1,1],[1,1,1]]) | |
| # # Create an empty array to store the results | |
| # output = np.zeros((96, 240, 254)) | |
| # # Perform the convolution along the first two axes (height and width) | |
| # for i in range(254): | |
| # output[:, :, i] = convolve(nor_hist[:, :, i], kernel, mode='constant', cval=0) | |
| # modulate1 = np.arange(1,181,1) | |
| # modulate = modulate1 * modulate1 /(180*180) | |
| # arr = output[...,:180] * modulate | |
| # tc_bin = np.sum(arr,axis=(0,1)) | |
| # max_id = np.argmax(tc_bin[:-2]) | |
| # # modulate = np.concatenate([a, b,c]) | |
| # pad_head = np.ones(max_id-4) | |
| # expand_kernel = np.arange(1,13,1) * 0.01 | |
| # pad_tail = np.ones((180-len(pad_head)-len(expand_kernel))) | |
| # expand_filter = np.concatenate([pad_head, expand_kernel,pad_tail]) | |
| # arr_expandfilter = arr * expand_filter | |
| # tof = np.argmax(arr,axis=2) | |
| # tof_filter = np.argmax(arr_expandfilter,axis=2) | |
| # return tof, tof_filter | |
| def find_bimodal_threshold(data, bins=50, sigma=2): | |
| """ | |
| 查找双峰直方图的阈值 | |
| 参数: | |
| data: 输入数据 | |
| bins: 直方图分组数 | |
| sigma: 高斯平滑参数 | |
| 返回: | |
| threshold: 计算得到的阈值 | |
| peak_indices: 峰值位置 | |
| hist: 直方图数据 | |
| bin_edges: 分组边界 | |
| """ | |
| # 计算直方图 | |
| hist, bin_edges = np.histogram(data, bins=bins, density=True) | |
| bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 | |
| # 对直方图进行高斯平滑 | |
| smoothed_hist = gaussian_filter1d(hist, sigma=sigma) | |
| # 查找峰值 | |
| peak_indices, peak_properties = find_peaks(smoothed_hist, height=0.01, distance=10) | |
| if len(peak_indices) >= 2: | |
| # 找到两个主要峰值 | |
| peak_heights = smoothed_hist[peak_indices] | |
| sorted_peaks = peak_indices[np.argsort(peak_heights)[-2:]] | |
| sorted_peaks = np.sort(sorted_peaks) | |
| # 在两个峰值之间找到最低点作为阈值 | |
| valley_region = smoothed_hist[sorted_peaks[0]:sorted_peaks[1]] | |
| if len(valley_region) > 0: | |
| valley_index = np.argmin(valley_region) + sorted_peaks[0] | |
| threshold = bin_centers[valley_index] | |
| else: | |
| threshold = bin_centers[sorted_peaks[0]] | |
| else: | |
| print("警告: 未检测到明显的双峰分布") | |
| threshold = np.median(data) | |
| return threshold, peak_indices, hist, bin_edges | |
| def draw_histogram(evt: gr.SelectData,text, bins): | |
| # 解析输入数据 | |
| try: | |
| data = text.flatten() | |
| except: | |
| return None | |
| x, y = evt.index # Gradio SelectData 对象 | |
| x = x // 16 | |
| y = y // 16 | |
| # 使用OpenCV的Otsu阈值 | |
| _, otsu_thresh = cv2.threshold(data.astype(np.uint8), 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) | |
| # 查找双峰阈值 | |
| # threshold, peak_indices, hist, bin_edges = find_bimodal_threshold(data) | |
| # print(f"检测到 {len(peak_indices)} 个峰值") | |
| hist, bin_edges = np.histogram(data, bins=bins) | |
| indices = np.arange(len(hist)) | |
| total_weight = np.sum(hist) | |
| centroid = np.sum(indices * hist) / total_weight | |
| print('ostu threshold: ',_,'data std',np.std(data),' centroid, ',centroid, 'diff ', np.abs(_-centroid)) | |
| # plt.figure() | |
| # # 绘制原始直方图 | |
| # plt.hist(data, bins=50, alpha=0.7, color='skyblue', edgecolor='black', | |
| # label='数据分布', density=True) | |
| # # 画直方图 | |
| plt.figure() | |
| plt.hist(data, bins=bins, density=False) | |
| plt.xlabel("Value") | |
| plt.ylabel("Count") | |
| plt.title("Histogram") | |
| # 绘制阈值线 | |
| plt.axvline(x=_, color='red', linestyle='--', linewidth=3, | |
| label=f'双峰阈值: {_:.2f}') | |
| plt.axvline(x=data[y*40+x], color='green', linestyle='--', linewidth=3, | |
| label=f'Seelcted: {_:.2f}') | |
| # plt.legend() | |
| return plt | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## 上传 31,40,64 int16 `.bin/.raw` 文件,点击图像像素查看该像素的 64 帧直方图") | |
| file_input = gr.File(label="上传 .raw/.bin 文件", file_types=[".raw", ".bin"]) | |
| image_tc_display = gr.Image(interactive=True, label="tc") | |
| image_tof_display = gr.Image(interactive=True, label="tof") | |
| histogram = gr.Plot(label="像素强度曲线") | |
| raw_hist = gr.State() | |
| nor_hist = gr.State() | |
| img_state = gr.State() # ✅ 保存你点击后的数组(替代原来的 img = []) | |
| bins_slider = gr.Slider(5, 200, value=64, step=1, label="Bins") | |
| image_sim_display = gr.Image(interactive=True, label="sim") | |
| sim_histogram = gr.Plot(label="相似性直方图") | |
| file_input.change(load_bin, inputs=file_input, outputs=[image_tc_display, image_tof_display, raw_hist, nor_hist]) | |
| image_tof_display.select(plot_pixel_histogram, inputs=[ raw_hist, nor_hist], outputs=[histogram,image_sim_display,img_state]) | |
| # 3️⃣ 用数组 + bins 重新画直方图 | |
| image_tof_display.select( | |
| draw_histogram, | |
| inputs=[img_state, gr.State(16)], | |
| outputs=sim_histogram | |
| ) | |
| # # 3️⃣ 用数组 + bins 重新画直方图 | |
| # bins_slider.change( | |
| # draw_histogram, | |
| # inputs=[img_state, bins_slider], | |
| # outputs=sim_histogram | |
| # ) | |
| # gr.Interface( | |
| # fn=draw_histogram, | |
| # inputs=[ | |
| # img, | |
| # gr.Slider(5, 200, value=64, step=1, label="Bins") | |
| # ], | |
| # outputs=gr.Plot(), | |
| # ) | |
| # demo.launch(share=True) | |
| demo.launch(share=False) |