|
|
import numpy as np |
|
|
from scipy.ndimage import convolve |
|
|
import os |
|
|
from numpy.lib.stride_tricks import sliding_window_view |
|
|
from numba import njit |
|
|
import cv2 |
|
|
|
|
|
def mipi_raw10_to_raw8_scaled(raw10_data): |
|
|
raw10_data = np.frombuffer(raw10_data, dtype=np.uint8) |
|
|
n_blocks = len(raw10_data) // 5 |
|
|
raw10_data = raw10_data[:n_blocks * 5].reshape(-1, 5) |
|
|
|
|
|
p0 = (raw10_data[:, 0].astype(np.uint16) << 2) | ((raw10_data[:, 4] >> 0) & 0x03) |
|
|
p1 = (raw10_data[:, 1].astype(np.uint16) << 2) | ((raw10_data[:, 4] >> 2) & 0x03) |
|
|
p2 = (raw10_data[:, 2].astype(np.uint16) << 2) | ((raw10_data[:, 4] >> 4) & 0x03) |
|
|
p3 = (raw10_data[:, 3].astype(np.uint16) << 2) | ((raw10_data[:, 4] >> 6) & 0x03) |
|
|
|
|
|
raw8_data = np.empty((n_blocks * 4 * 2,), dtype=np.uint8) |
|
|
raw8_data[0::8] = p0 & 0xFF |
|
|
raw8_data[1::8] = p0 >> 8 |
|
|
raw8_data[2::8] = p1 & 0xFF |
|
|
raw8_data[3::8] = p1 >> 8 |
|
|
raw8_data[4::8] = p2 & 0xFF |
|
|
raw8_data[5::8] = p2 >> 8 |
|
|
raw8_data[6::8] = p3 & 0xFF |
|
|
raw8_data[7::8] = p3 >> 8 |
|
|
|
|
|
return raw8_data.tobytes() |
|
|
|
|
|
def readRAW(path): |
|
|
filesize = os.path.getsize(path) |
|
|
|
|
|
with open(path, "rb") as f: |
|
|
raw_data = f.read() |
|
|
|
|
|
|
|
|
if filesize == 7372800: |
|
|
raw_data = mipi_raw10_to_raw8_scaled(raw_data) |
|
|
|
|
|
|
|
|
arr = np.frombuffer(raw_data, dtype=np.int16).reshape(96, 240, 256) |
|
|
|
|
|
|
|
|
reshaped = arr.reshape(*arr.shape[:-1], -1, 2) |
|
|
swapped = reshaped[..., ::-1] |
|
|
histogram_data = swapped.reshape(arr.shape) |
|
|
|
|
|
|
|
|
mapping = [0, 4, 1, 5, 2, 6, 3, 7] |
|
|
group_size = 8 |
|
|
num_groups = 12 |
|
|
output = np.empty_like(histogram_data) |
|
|
|
|
|
for g in range(num_groups): |
|
|
start = g * group_size |
|
|
end = start + group_size |
|
|
output[start:end, :, :] = histogram_data[start:end, :, :][mapping, :, :] |
|
|
|
|
|
return output.astype(np.int16) |
|
|
|
|
|
def binning_2x2_stride2(data): |
|
|
""" |
|
|
data: numpy array (96, 240, 256) |
|
|
return: numpy array (48, 120, 256) |
|
|
""" |
|
|
|
|
|
return data.reshape(48, 2, 120, 2, 256).sum(axis=(1, 3)) |
|
|
|
|
|
def binning_2x2_stride1(data): |
|
|
""" |
|
|
data: numpy array (96, 240, 256) |
|
|
return: numpy array (95, 239, 256) # 因为stride=1,边界少1行1列 |
|
|
""" |
|
|
|
|
|
binned = (data[:-1, :-1] + data[1:, :-1] + |
|
|
data[:-1, 1:] + data[1:, 1:]) |
|
|
|
|
|
return binned |
|
|
|
|
|
|
|
|
def ma_vectorized(data, kernel=[-2, -2, 1, 2, 2, 3, -1, -1]): |
|
|
kernel = np.array(kernel, dtype=np.float32) |
|
|
k = kernel.size |
|
|
kernel_sum = kernel.sum() |
|
|
|
|
|
|
|
|
data = np.asarray(data, dtype=np.float32) |
|
|
|
|
|
|
|
|
windows = np.lib.stride_tricks.sliding_window_view(data, window_shape=k, axis=2) |
|
|
|
|
|
|
|
|
smoothed = np.tensordot(windows, kernel, axes=([3],[0])) / kernel_sum |
|
|
smoothed[smoothed<0] = 0 |
|
|
|
|
|
|
|
|
pad_width = ((0,0), (0,0), (0,k-1)) |
|
|
smoothed = np.pad(smoothed, pad_width, mode='constant', constant_values=0) |
|
|
return smoothed |
|
|
|
|
|
|
|
|
def ma_vectorized_fast(data, kernel=[-2, -2, 1, 2, 2, 3, -1, -1]): |
|
|
kernel = np.array(kernel, dtype=np.float32) |
|
|
k = kernel.size |
|
|
kernel_sum = kernel.sum() |
|
|
|
|
|
if kernel_sum == 0: |
|
|
kernel_sum = 1 |
|
|
|
|
|
|
|
|
pad = k // 2 |
|
|
data_padded = np.pad(data, ((0,0),(0,0),(pad,pad)), mode='edge') |
|
|
|
|
|
|
|
|
def conv_1d(x): |
|
|
return np.convolve(x, kernel[::-1], mode='valid') / kernel_sum |
|
|
|
|
|
|
|
|
smoothed = np.apply_along_axis(conv_1d, 2, data_padded) |
|
|
|
|
|
smoothed = np.maximum(smoothed, 0) |
|
|
return smoothed |
|
|
|
|
|
BIN_SIZE = 180 |
|
|
MAX_PEAKS = 2 |
|
|
|
|
|
@njit |
|
|
def sum_hist(hist, length): |
|
|
return np.sum(hist[:length]) |
|
|
@njit |
|
|
def max_hist(hist, length): |
|
|
return np.max(hist[:length]) |
|
|
@njit |
|
|
def compute_centroid(hist, start_bin, end_bin): |
|
|
bins = np.arange(start_bin, end_bin + 1) |
|
|
values = hist[start_bin:end_bin + 1] |
|
|
total = np.sum(values) |
|
|
if total == 0: |
|
|
return (start_bin + end_bin) / 2.0 |
|
|
return np.sum(bins * values) / total |
|
|
|
|
|
@njit |
|
|
def find_peaks_hw(histograms, histograms_ma): |
|
|
""" |
|
|
输入: |
|
|
histograms: (H, W, 256) 原始直方图 |
|
|
histograms_ma: (H, W, 256) 平滑直方图 |
|
|
输出: |
|
|
tof_data: (H, W, MAX_PEAKS) 质心位置 |
|
|
peak_data: (H, W, MAX_PEAKS) 峰值强度 |
|
|
noise_data: (H, W) 噪声值 |
|
|
multishot_data: (H, W) 多拍信息 |
|
|
totalcount: (H, W) 总计数 |
|
|
nt_count_n: (H, W, MAX_PEAKS) NT计数 |
|
|
""" |
|
|
|
|
|
H, W, _ = histograms.shape |
|
|
|
|
|
|
|
|
tof_data = np.zeros((H, W, MAX_PEAKS), dtype=np.float32) |
|
|
peak_data = np.zeros((H, W, MAX_PEAKS), dtype=np.int32) |
|
|
noise_data = np.zeros((H, W), dtype=np.int16) |
|
|
multishot_data = np.zeros((H, W), dtype=np.int16) |
|
|
totalcount = np.zeros((H, W), dtype=np.int32) |
|
|
nt_count_n = np.zeros((H, W, MAX_PEAKS), dtype=np.int32) |
|
|
|
|
|
for i in range(H): |
|
|
for j in range(W): |
|
|
hist_raw = histograms[i, j] |
|
|
hist = histograms_ma[i, j] |
|
|
count = 0 |
|
|
bin_idx = 1 |
|
|
|
|
|
|
|
|
multishot = (int(hist_raw[254]) << 8) + (int(hist_raw[255]) >> 2) |
|
|
multishot_data[i, j] = multishot |
|
|
|
|
|
totalcount[i, j] = int(sum_hist(hist_raw, BIN_SIZE) << 13) // multishot |
|
|
|
|
|
|
|
|
noise_data[i, j] = int(sum_hist(hist, 8) + 4) >> 3 |
|
|
|
|
|
noise_i = max_hist(hist, 8) |
|
|
|
|
|
max_th = 50 |
|
|
noise_i = max_th if noise_i < max_th else noise_i * 3 |
|
|
th = 2 * noise_i - 24 / (2e4 * 4) * noise_i * noise_i |
|
|
|
|
|
|
|
|
while bin_idx < BIN_SIZE - 1 and count < MAX_PEAKS: |
|
|
|
|
|
while bin_idx < BIN_SIZE - 1 and hist[bin_idx] < th: |
|
|
bin_idx += 1 |
|
|
|
|
|
bin_idx -= 1 |
|
|
start_bin = bin_idx |
|
|
start_peak = hist[start_bin] |
|
|
|
|
|
|
|
|
while bin_idx + 1 < BIN_SIZE and ( |
|
|
hist[bin_idx] < hist[bin_idx - 1] or hist[bin_idx] < hist[bin_idx + 1] |
|
|
): |
|
|
bin_idx += 1 |
|
|
max_bin = bin_idx |
|
|
max_peak = hist[max_bin] |
|
|
|
|
|
|
|
|
while bin_idx + 1 < BIN_SIZE and ( |
|
|
hist[bin_idx] > th or hist[bin_idx] > start_peak or hist[bin_idx] > hist[bin_idx + 1] |
|
|
): |
|
|
bin_idx += 1 |
|
|
end_bin = bin_idx |
|
|
|
|
|
if ( |
|
|
start_bin == end_bin |
|
|
or start_bin == max_bin |
|
|
or max_bin == end_bin |
|
|
or (max_peak - start_peak) < 50 |
|
|
): |
|
|
bin_idx += 1 |
|
|
continue |
|
|
|
|
|
|
|
|
centroid = compute_centroid(hist, start_bin, end_bin) |
|
|
tof_data[i, j, count] = centroid |
|
|
peak_data[i, j, count] = (int(hist[max_bin]) << 13) // multishot |
|
|
|
|
|
|
|
|
nt_end_bin = max_bin - 10 |
|
|
nt_start_bin = nt_end_bin - 48 |
|
|
nt_start_bin = 0 if nt_start_bin < 0 else nt_start_bin |
|
|
nt_num = nt_end_bin - nt_start_bin |
|
|
est_nt = 48 * noise_data[i, j] if nt_num < 48 else 0 |
|
|
nt_count_n[i, j, count] = np.sum(hist[nt_start_bin:nt_start_bin + nt_num]) + est_nt |
|
|
|
|
|
count += 1 |
|
|
peak_data=peak_data*256/48000 |
|
|
return tof_data, peak_data, noise_data, multishot_data, totalcount, nt_count_n |
|
|
|
|
|
|
|
|
def local_threshold(img, window_size=15, C=2): |
|
|
h, w = img.shape |
|
|
out = np.zeros_like(img, dtype=np.uint8) |
|
|
pad = window_size // 2 |
|
|
padded = cv2.copyMakeBorder(img, pad, pad, pad, pad, cv2.BORDER_REFLECT) |
|
|
|
|
|
for i in range(h): |
|
|
for j in range(w): |
|
|
local_region = padded[i:i+window_size, j:j+window_size] |
|
|
|
|
|
thresh = np.mean(local_region,axis=(0,1)) - 0.1 |
|
|
out[i,j] = img[i,j] if img[i,j] > thresh else 0 |
|
|
|
|
|
return out |
|
|
|
|
|
def select_peaks_hw(tof_data, peak_data): |
|
|
""" |
|
|
输入: |
|
|
tof_data: (H, W, MAX_PEAKS) 质心位置 |
|
|
peak_data: (H, W, MAX_PEAKS) 峰值强度 |
|
|
输出: |
|
|
tof: (H, W) |
|
|
peak: (H, W) |
|
|
|
|
|
""" |
|
|
|
|
|
ref_set = np.zeros_like(peak_data) |
|
|
for i in range(MAX_PEAKS): |
|
|
ref_set[...,i] = peak_data[...,i]*tof_data[...,i]*tof_data[...,i] /1200 * 6 |
|
|
ref = np.log2(ref_set[...,i]) |
|
|
ref[ref<0] = 0 |
|
|
_, otsu_binary = cv2.threshold(ref.astype(np.uint8), 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ref_set[...,i] =(ref) |
|
|
|
|
|
ref = np.max(ref_set,axis=2) |
|
|
|
|
|
frame1 = ref_set[...,0] |
|
|
frame2 = ref_set[...,1] |
|
|
|
|
|
mask1 = np.zeros_like(frame1, dtype=np.uint8) |
|
|
mask2 = np.zeros_like(frame2, dtype=np.uint8) |
|
|
|
|
|
|
|
|
mask1[(frame1 > frame2) & (frame1 > 0)] = 1 |
|
|
mask2[(frame2 > frame1) & (frame2 > 0)] = 1 |
|
|
|
|
|
|
|
|
mask1[(frame1 == frame2) & (frame1 > 0)] = 1 |
|
|
|
|
|
tof = tof_data[...,0] * mask1 + tof_data[...,1] * mask2 |
|
|
peak = peak_data[...,0] * mask1 + peak_data[...,1] * mask2 |
|
|
|
|
|
return tof,peak,ref,ref_set |