|
|
import laspy |
|
|
import numpy as np |
|
|
import cv2 |
|
|
from math import pi |
|
|
from multiprocessing import Process, shared_memory, cpu_count |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
# --- Fixed-point and multiprocessing settings -------------------------------
# Number of fractional bits in the fixed-point pixel coordinates; this value
# is handed to cv2.circle as its ``shift`` argument.
PRECISION = 16
# Multiplier that converts a pixel coordinate into fixed-point units.
FACTOR = 1 << PRECISION
# Number of drawing processes: one per CPU core, capped at four.
NUM_WORKERS = min(4, cpu_count())
# Name of the shared-memory segment that backs the drawing canvas.
SHARED_MEMORY_NAME = 'shared_canvas'

# --- Rendering settings -----------------------------------------------------
# Output image height in pixels; the projection uses twice this as the width.
IMAGE_HEIGHT = 4096
# Scale factor for the distance-dependent point radius.
POINT_SIZE = 3
# When True, point colours are multiplied by 256 before drawing.
SCALE_COLORS = False
# When False, the 16-bit canvas is reduced to 8 bits before it is saved.
FULL_RES = False
# Upper bound used when clipping the per-point radii.
MAX_RADIUS = 30
|
|
|
|
|
|
|
|
def create_shared_memory_array(data, name, dtype):
    """Create a named shared-memory segment and copy ``data`` into it.

    Args:
        data: NumPy array whose shape and contents seed the segment.
        name: Name under which the segment is created.
        dtype: Element type of the array stored in the segment. ``data`` is
            cast to this type on copy.

    Returns:
        The ``SharedMemory`` handle. The caller owns it and is responsible
        for closing and unlinking the segment.

    Raises:
        FileExistsError: If a segment called ``name`` already exists.
    """
    print("[STEP 7] Creating shared memory...")

    target_dtype = np.dtype(dtype)
    # Size the segment for the *destination* dtype: ``data.nbytes`` is too
    # small whenever ``dtype`` is wider than ``data.dtype``.
    required = int(np.prod(data.shape, dtype=np.int64)) * target_dtype.itemsize
    # SharedMemory rejects a size of zero, so reserve at least one byte.
    shm = shared_memory.SharedMemory(
        create=True, size=max(required, 1), name=name
    )

    shared_array = None
    try:
        shared_array = np.ndarray(
            data.shape, dtype=target_dtype, buffer=shm.buf
        )
        # ``[...]`` also works for zero-dimensional arrays, unlike ``[:]``.
        shared_array[...] = data
    except BaseException:
        # Drop the view first (close() fails while the buffer is exported),
        # then remove the segment so a failed call does not leak the name.
        shared_array = None
        shm.close()
        shm.unlink()
        raise
    # Release the temporary view so the caller can close the handle later.
    shared_array = None

    print("[STEP 7] Shared memory initialized.")
    return shm
|
|
|
|
|
|
|
|
def release_shared_memory(name):
    """Detach from and remove the shared-memory segment called ``name``.

    A fresh handle is attached by name, closed, and then unlinked.

    Args:
        name: Name of an existing shared-memory segment.
    """
    print("[STEP 12] Releasing shared memory...")

    segment = shared_memory.SharedMemory(name=name)
    segment.close()
    segment.unlink()

    print("[STEP 12] Shared memory released.")
|
|
|
|
|
|
|
|
def draw_points(start, length, canvas_shape, theta, phi, radius, color):
    """Draw a contiguous slice of points as filled circles on the shared canvas.

    Attaches to the segment named ``SHARED_MEMORY_NAME``, views it as a
    ``uint16`` array of shape ``canvas_shape`` and draws every point whose
    index lies in ``[start, start + length)``. Points whose centre falls
    outside the canvas are skipped.

    Args:
        start: Index of the first point to draw.
        length: Number of points to draw.
        canvas_shape: Shape of the canvas; index 0 bounds ``phi`` and index 1
            bounds ``theta``.
        theta: Per-point horizontal centre coordinates, in fixed-point units
            with ``PRECISION`` fractional bits.
        phi: Per-point vertical centre coordinates, same units.
        radius: Per-point circle radii, same units.
        color: Per-point colour triples, passed to ``cv2.circle`` in the
            stored channel order.
    """
    shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME)
    canvas = None
    try:
        canvas = np.ndarray(canvas_shape, dtype=np.uint16, buffer=shm.buf)

        # Loop-invariant bounds, expressed in fixed-point units.
        x_limit = canvas_shape[1] * FACTOR
        y_limit = canvas_shape[0] * FACTOR

        for i in range(start, start + length):
            x = int(theta[i])
            y = int(phi[i])
            if not (0 <= x < x_limit and 0 <= y < y_limit):
                continue

            px = color[i]
            cv2.circle(
                canvas,
                (x, y),
                int(radius[i]),
                color=(int(px[0]), int(px[1]), int(px[2])),
                thickness=cv2.FILLED,
                # Centre and radius carry PRECISION fractional bits.
                shift=PRECISION,
            )
    finally:
        # Drop the array view before closing: close() fails while the
        # underlying buffer is still exported.
        canvas = None
        shm.close()
|
|
|
|
|
|
|
|
def project_point_cloud(input_file):
    """Project a LAS point cloud onto an equirectangular image and save it.

    The points are converted to spherical angles around the origin, drawn as
    filled circles by ``NUM_WORKERS`` processes onto a canvas held in shared
    memory, and the result is written to ``output_image.jpg``.

    Args:
        input_file: Either a path string or an object whose ``name``
            attribute is the path of the LAS file.

    Returns:
        The path of the written image file.

    Raises:
        gr.Error: If the LAS file cannot be read or contains no points.
    """
    print("[STEP 1] Reading LAS file...")
    # Accept both a file-like upload object (``.name``) and a plain path.
    las_path = getattr(input_file, "name", input_file)
    try:
        point_cloud = laspy.read(las_path)
    except Exception as e:
        # Raise instead of returning the message: the Gradio output is an
        # image file path, so a returned string would be treated as a path.
        raise gr.Error(f"Failed to read LAS file: {e}") from e
    print("[STEP 1] LAS file loaded successfully.")

    height, width = IMAGE_HEIGHT, IMAGE_HEIGHT * 2
    output_shape = (height, width, 3)
    total_points = point_cloud.header.point_count
    print(f"[STEP 2] Point cloud shape: {total_points} points")
    if total_points == 0:
        # min()/max() below are undefined for an empty cloud.
        raise gr.Error("LAS file contains no points.")

    print("[STEP 3] Initializing blank canvas...")
    canvas = np.zeros(output_shape, dtype=np.uint16)

    print("[STEP 4] Computing 3D distances...")
    x = np.asarray(point_cloud.x)
    y = np.asarray(point_cloud.y)
    z = np.asarray(point_cloud.z)
    xyz = np.vstack((x, y, z)).T
    distances = np.linalg.norm(xyz, axis=1)
    # Avoid dividing by zero for points sitting exactly on the origin.
    distances[distances == 0] = 1e-6

    print("[STEP 5] Normalizing distances...")
    nearest = distances.min()
    span = distances.max() - nearest
    if span > 0:
        dist_norm = (distances - nearest) / span
    else:
        # All points are equidistant: treat every point as "nearest" rather
        # than dividing by zero and producing NaNs.
        dist_norm = np.zeros_like(distances)

    print("[STEP 6] Calculating spherical projection angles...")
    theta = ((np.arctan2(y, x) + pi) / (2 * pi)) * width * FACTOR
    theta = np.mod(theta, width * FACTOR).astype(np.uint64)

    z_norm = np.clip(z / distances, -1.0, 1.0)
    phi = ((np.arccos(z_norm) / pi) * height * FACTOR).astype(np.uint64)

    print("[STEP 6.5] Calculating radii for depth effect...")
    radii = (FACTOR * POINT_SIZE * (1 - dist_norm) + 1).astype(np.uint64)
    # The radii are fixed-point values (cv2.circle is called with
    # shift=PRECISION), so the cap has to be scaled by FACTOR as well;
    # clipping to a bare MAX_RADIUS would squash every point to a sub-pixel
    # radius and remove the depth effect.
    radii = np.clip(radii, 1, MAX_RADIUS * FACTOR)

    print("[STEP 6.6] Extracting and adjusting point colors...")
    colors = np.stack(
        [point_cloud.blue, point_cloud.green, point_cloud.red], axis=-1
    )
    if SCALE_COLORS:
        # Widen before scaling so values saturate at 65535 instead of
        # wrapping around in uint16.
        colors = np.clip(colors.astype(np.uint32) * 256, 0, 65535)
    colors = colors.astype(np.uint16)

    shm = create_shared_memory_array(canvas, SHARED_MEMORY_NAME, np.uint16)
    # The local copy is no longer needed once it lives in shared memory.
    del canvas

    final_image = None
    try:
        print("[STEP 8] Launching drawing processes...")
        processes = []
        for i in range(NUM_WORKERS):
            start = i * total_points // NUM_WORKERS
            end = (i + 1) * total_points // NUM_WORKERS
            p = Process(target=draw_points, args=(
                start, end - start, output_shape, theta, phi, radii, colors
            ))
            p.start()
            processes.append(p)

        print("[STEP 9] Waiting for drawing to complete...")
        for p in processes:
            p.join()
        print("[STEP 9] All drawing processes finished.")

        failed = [p for p in processes if p.exitcode != 0]
        if failed:
            # Surface worker crashes; the image may be only partially drawn.
            print(
                f"[STEP 9] Warning: {len(failed)} drawing process(es) "
                "exited abnormally."
            )

        print("[STEP 10] Retrieving final image from shared memory...")
        final_image = np.ndarray(
            output_shape, dtype=np.uint16, buffer=shm.buf
        )
        if not FULL_RES:
            print("[STEP 10.1] Downsampling image to 8-bit for display...")
            # Same result as dividing by 256 and truncating, without the
            # large float64 temporary.
            final_image = (final_image >> 8).astype(np.uint8)

        output_file = "output_image.jpg"
        print(f"[STEP 11] Saving image to: {output_file}")
        # NOTE(review): with FULL_RES enabled this hands 16-bit data to a
        # JPEG writer; confirm OpenCV handles that depth as intended.
        cv2.imwrite(output_file, final_image)
    finally:
        # Always free the named segment, even on failure, so the next call
        # can create it again. The view must go before the handle closes.
        final_image = None
        try:
            shm.close()
        finally:
            release_shared_memory(SHARED_MEMORY_NAME)

    print("[STEP 13] Projection pipeline complete.")
    return output_file
|
|
|
|
|
|
|
|
|
|
|
def main(input_file):
    """Entry point wired into the Gradio interface.

    Hands the uploaded file to ``project_point_cloud`` and returns whatever
    that function returns.
    """
    result = project_point_cloud(input_file)
    return result
|
|
|
|
|
|
|
|
# Gradio UI: one LAS file upload in, one projected image (by file path) out.
_las_upload = gr.File(label="Upload LAS File")
_projected_image = gr.Image(type="filepath", label="Projected Image")

iface = gr.Interface(
    fn=main,
    inputs=_las_upload,
    outputs=_projected_image,
    title="Equirectangular Point Cloud Projection",
    description="Upload a LAS point cloud file and project it into a 360° equirectangular image.",
)


# Start the web UI only when the file is run as a script.
if __name__ == "__main__":
    iface.launch()
|
|
|