LAS2Pano / app.py
OttoYu's picture
Update app.py
f664a16 verified
import laspy
import numpy as np
import cv2
from math import pi
from multiprocessing import Process, shared_memory, cpu_count
import gradio as gr
# === Constants ===
PRECISION = 16
FACTOR = 2 ** PRECISION
NUM_WORKERS = min(cpu_count(), 4)
SHARED_MEMORY_NAME = 'shared_canvas'
# === Parameters ===
IMAGE_HEIGHT = 4096 # Default image height
POINT_SIZE = 3
SCALE_COLORS = False
FULL_RES = False
MAX_RADIUS = 30
def create_shared_memory_array(data, name, dtype):
print("[STEP 7] Creating shared memory...")
shm = shared_memory.SharedMemory(create=True, size=data.nbytes, name=name)
shared_array = np.ndarray(data.shape, dtype=dtype, buffer=shm.buf)
shared_array[:] = data
print("[STEP 7] Shared memory initialized.")
return shm
def release_shared_memory(name):
print("[STEP 12] Releasing shared memory...")
shm = shared_memory.SharedMemory(name=name)
shm.close()
shm.unlink()
print("[STEP 12] Shared memory released.")
def draw_points(start, length, canvas_shape, theta, phi, radius, color):
shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME)
canvas = np.ndarray(canvas_shape, dtype=np.uint16, buffer=shm.buf)
for i in range(start, start + length):
if 0 <= theta[i] < canvas_shape[1] * FACTOR and 0 <= phi[i] < canvas_shape[0] * FACTOR:
cv2.circle(
canvas,
(int(theta[i]), int(phi[i])),
int(radius[i]),
color=(int(color[i][0]), int(color[i][1]), int(color[i][2])),
thickness=cv2.FILLED,
shift=PRECISION
)
def project_point_cloud(input_file):
print("[STEP 1] Reading LAS file...")
try:
point_cloud = laspy.read(input_file.name)
except Exception as e:
return f"Failed to read LAS file: {e}"
print("[STEP 1] LAS file loaded successfully.")
height, width = IMAGE_HEIGHT, IMAGE_HEIGHT * 2
output_shape = (height, width, 3)
total_points = point_cloud.header.point_count
print(f"[STEP 2] Point cloud shape: {total_points} points")
# === Step 3: Create blank canvas ===
print("[STEP 3] Initializing blank canvas...")
canvas = np.zeros(output_shape, dtype=np.uint16)
# === Step 4: Compute distances ===
print("[STEP 4] Computing 3D distances...")
xyz = np.vstack((point_cloud.x, point_cloud.y, point_cloud.z)).T
distances = np.linalg.norm(xyz, axis=1)
distances[distances == 0] = 1e-6 # Avoid divide by zero
# === Step 5: Normalize distances for depth-aware radius ===
print("[STEP 5] Normalizing distances...")
dist_norm = (distances - distances.min()) / (distances.max() - distances.min())
# === Step 6: Compute angles ===
print("[STEP 6] Calculating spherical projection angles...")
theta = ((np.arctan2(point_cloud.y, point_cloud.x) + pi) / (2 * pi)) * width * FACTOR
theta = np.mod(theta, width * FACTOR).astype(np.uint64)
z_norm = np.clip(point_cloud.z / distances, -1.0, 1.0)
phi = ((np.arccos(z_norm) / pi) * height * FACTOR).astype(np.uint64)
# === Step 6.5: Compute radius based on distance ===
print("[STEP 6.5] Calculating radii for depth effect...")
radii = (FACTOR * POINT_SIZE * (1 - dist_norm) + 1).astype(np.uint64)
radii = np.clip(radii, 1, MAX_RADIUS)
# === Step 6.6: Compute colors ===
print("[STEP 6.6] Extracting and adjusting point colors...")
colors = np.stack([point_cloud.blue, point_cloud.green, point_cloud.red], axis=-1).astype(np.uint16)
if SCALE_COLORS:
colors *= 256
colors = np.clip(colors, 0, 65535)
# === Step 7: Create shared memory canvas ===
shm = create_shared_memory_array(canvas, SHARED_MEMORY_NAME, np.uint16)
# === Step 8: Start multiprocessing drawing ===
print("[STEP 8] Launching drawing processes...")
processes = []
for i in range(NUM_WORKERS):
start = int(i * total_points / NUM_WORKERS)
end = int((i + 1) * total_points / NUM_WORKERS)
p = Process(target=draw_points, args=(
start, end - start, output_shape, theta, phi, radii, colors
))
p.start()
processes.append(p)
print("[STEP 9] Waiting for drawing to complete...")
for p in processes:
p.join()
print("[STEP 9] All drawing processes finished.")
# === Step 10: Convert canvas back ===
print("[STEP 10] Retrieving final image from shared memory...")
final_image = np.ndarray(output_shape, dtype=np.uint16, buffer=shm.buf)
if not FULL_RES:
print("[STEP 10.1] Downsampling image to 8-bit for display...")
final_image = (final_image / 256).astype(np.uint8)
# === Step 11: Save image ===
output_file = "output_image.jpg"
print(f"[STEP 11] Saving image to: {output_file}")
cv2.imwrite(output_file, final_image)
# === Step 12: Cleanup shared memory ===
release_shared_memory(SHARED_MEMORY_NAME)
print("[STEP 13] Projection pipeline complete.")
return output_file
# === Gradio Interface ===
def main(input_file):
return project_point_cloud(input_file)
iface = gr.Interface(
fn=main,
inputs=gr.File(label="Upload LAS File"),
outputs=gr.Image(type="filepath", label="Projected Image"),
title="Equirectangular Point Cloud Projection",
description="Upload a LAS point cloud file and project it into a 360° equirectangular image."
)
if __name__ == "__main__":
iface.launch()