Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import math | |
| import tempfile | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Iterable | |
| import numpy as np | |
| import trimesh | |
| from scipy import ndimage | |
| from skimage import measure | |
| from llm_parser import DEFAULT_LOCAL_MODEL, parse_prompt_with_local_llm | |
| from parser import PromptSpec, parse_prompt | |
@dataclass
class BuildArtifacts:
    """Output file locations and metadata produced by one pipeline run.

    The class must be a dataclass: ``run_pipeline`` constructs it with keyword
    arguments, which a plain class with bare annotations would reject.
    """

    # Path of the exported colored point cloud.
    ply_path: str
    # Path of the exported surface mesh.
    glb_path: str
    # Build statistics and the parsed spec, as assembled by ``run_pipeline``.
    summary: dict
# Multiplier applied to the base hull dimensions, looked up by the parsed
# spec's ``scale`` value.
SCALE_FACTORS = dict(
    small=1.0,
    medium=1.35,
    large=1.85,
)
| def _sample_box_surface(center, size, density: int, label: int) -> tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| cx, cy, cz = center | |
| sx, sy, sz = size | |
| n = max(4, density) | |
| u = np.linspace(-0.5, 0.5, n) | |
| vv = np.linspace(-0.5, 0.5, n) | |
| pts = [] | |
| normals = [] | |
| labels = [] | |
| for ax in (-1, 1): | |
| x = np.full((n, n), cx + ax * sx / 2) | |
| y, z = np.meshgrid(u * sy + cy, vv * sz + cz) | |
| pts.append(np.column_stack([x.ravel(), y.ravel(), z.ravel()])) | |
| normals.append(np.tile([ax, 0, 0], (n * n, 1))) | |
| labels.append(np.full(n * n, label)) | |
| for ay in (-1, 1): | |
| y = np.full((n, n), cy + ay * sy / 2) | |
| x, z = np.meshgrid(u * sx + cx, vv * sz + cz) | |
| pts.append(np.column_stack([x.ravel(), y.ravel(), z.ravel()])) | |
| normals.append(np.tile([0, ay, 0], (n * n, 1))) | |
| labels.append(np.full(n * n, label)) | |
| for az in (-1, 1): | |
| z = np.full((n, n), cz + az * sz / 2) | |
| x, y = np.meshgrid(u * sx + cx, vv * sy + cy) | |
| pts.append(np.column_stack([x.ravel(), y.ravel(), z.ravel()])) | |
| normals.append(np.tile([0, 0, az], (n * n, 1))) | |
| labels.append(np.full(n * n, label)) | |
| return np.vstack(pts), np.vstack(normals), np.concatenate(labels) | |
| def _sample_ellipsoid_surface(center, radii, density: int, label: int) -> tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| cx, cy, cz = center | |
| rx, ry, rz = radii | |
| nu = max(16, density * 3) | |
| nv = max(10, density * 2) | |
| u = np.linspace(0, 2 * math.pi, nu, endpoint=False) | |
| v = np.linspace(-math.pi / 2, math.pi / 2, nv) | |
| uu, vv = np.meshgrid(u, v) | |
| x = cx + rx * np.cos(vv) * np.cos(uu) | |
| y = cy + ry * np.cos(vv) * np.sin(uu) | |
| z = cz + rz * np.sin(vv) | |
| pts = np.column_stack([x.ravel(), y.ravel(), z.ravel()]) | |
| normals = np.column_stack([ | |
| (x - cx).ravel() / max(rx, 1e-6), | |
| (y - cy).ravel() / max(ry, 1e-6), | |
| (z - cz).ravel() / max(rz, 1e-6), | |
| ]) | |
| normals /= np.linalg.norm(normals, axis=1, keepdims=True) + 1e-8 | |
| labels = np.full(len(pts), label) | |
| return pts, normals, labels | |
| def _sample_cylinder_surface(center, radius, length, axis: str, density: int, label: int) -> tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| cx, cy, cz = center | |
| nt = max(18, density * 4) | |
| nl = max(6, density) | |
| theta = np.linspace(0, 2 * math.pi, nt, endpoint=False) | |
| line = np.linspace(-length / 2, length / 2, nl) | |
| tt, ll = np.meshgrid(theta, line) | |
| if axis == "x": | |
| x = cx + ll | |
| y = cy + radius * np.cos(tt) | |
| z = cz + radius * np.sin(tt) | |
| normals = np.column_stack([np.zeros(x.size), np.cos(tt).ravel(), np.sin(tt).ravel()]) | |
| elif axis == "y": | |
| x = cx + radius * np.cos(tt) | |
| y = cy + ll | |
| z = cz + radius * np.sin(tt) | |
| normals = np.column_stack([np.cos(tt).ravel(), np.zeros(x.size), np.sin(tt).ravel()]) | |
| else: | |
| x = cx + radius * np.cos(tt) | |
| y = cy + radius * np.sin(tt) | |
| z = cz + ll | |
| normals = np.column_stack([np.cos(tt).ravel(), np.sin(tt).ravel(), np.zeros(x.size)]) | |
| pts = np.column_stack([x.ravel(), y.ravel(), z.ravel()]) | |
| labels = np.full(len(pts), label) | |
| return pts, normals, labels | |
def build_particle_blueprint(
    prompt: str,
    detail: int = 24,
    parser_mode: str = "heuristic",
    model_id: str | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, PromptSpec, str]:
    """Turn a text prompt into a labelled surface point cloud of a vehicle.

    The prompt is parsed into a ``PromptSpec`` and each part is sampled as a
    primitive surface. Label ids: 0 hull, 1 cockpit, 2 cargo bay, 3 wings,
    4 engines, 5 fin, 6 landing gear.

    Args:
        prompt: Free-text description handed to the parser.
        detail: Sampling resolution; values below 6 are raised to 6.
        parser_mode: A value starting with ``"local"`` selects the local LLM
            parser; anything else (including empty/None) uses the heuristic
            parser.
        model_id: Model for the local LLM parser; defaults to
            ``DEFAULT_LOCAL_MODEL``.

    Returns:
        ``(points, normals, labels, spec, parser_backend)`` where the arrays
        are float32, float32 and int32 respectively.

    Raises:
        KeyError: If ``spec.scale`` is not a key of ``SCALE_FACTORS``.
    """
    parser_mode = (parser_mode or "heuristic").strip().lower()
    parser_backend = "heuristic"
    if parser_mode.startswith("local"):
        resolved_model = model_id or DEFAULT_LOCAL_MODEL
        spec = parse_prompt_with_local_llm(prompt, model_id=resolved_model)
        parser_backend = f"local_llm:{resolved_model}"
    else:
        spec = parse_prompt(prompt)

    scale = SCALE_FACTORS[spec.scale]
    # Coerce to int: a float (e.g. from a UI slider) would otherwise reach
    # np.linspace as a non-integer sample count and raise.
    density = max(6, int(detail))

    parts = []
    normals = []
    labels = []

    def add(sample) -> None:
        """Append one ``(points, normals, labels)`` triple to the accumulators."""
        sample_points, sample_normals, sample_labels = sample
        parts.append(sample_points)
        normals.append(sample_normals)
        labels.append(sample_labels)

    hull_len = 2.8 * scale
    hull_w = 1.2 * scale
    hull_h = 0.8 * scale

    # Hull (label 0).
    if spec.hull_style == "rounded":
        add(_sample_ellipsoid_surface((0.0, 0.0, 0.0), (hull_len / 2, hull_w / 2, hull_h / 2), density, 0))
    elif spec.hull_style == "sleek":
        # A slim ellipsoid nose combined with a box-shaped rear body.
        add(_sample_ellipsoid_surface((0.12 * scale, 0.0, 0.0), (hull_len / 2.3, hull_w / 2.8, hull_h / 2.6), density, 0))
        add(_sample_box_surface((-0.15 * scale, 0.0, -0.02 * scale), (hull_len * 0.52, hull_w * 0.5, hull_h * 0.55), density // 2, 0))
    else:
        add(_sample_box_surface((0.0, 0.0, 0.0), (hull_len, hull_w, hull_h), density, 0))

    # Cockpit (label 1).
    cockpit_center = (hull_len / 2 - hull_len * spec.cockpit_ratio * 0.8, 0.0, hull_h * 0.14)
    cockpit_radii = (hull_len * spec.cockpit_ratio, hull_w * 0.22, hull_h * 0.24)
    add(_sample_ellipsoid_surface(cockpit_center, cockpit_radii, density // 2, 1))

    # Cargo bay (label 2), only above a minimum ratio.
    if spec.cargo_ratio > 0.16:
        cargo_center = (-hull_len * 0.18, 0.0, -hull_h * 0.06)
        cargo_size = (hull_len * spec.cargo_ratio, hull_w * 0.76, hull_h * 0.6)
        add(_sample_box_surface(cargo_center, cargo_size, density // 2, 2))

    # Wings (label 3), mirrored about the xz plane.
    if spec.wing_span > 0:
        wing_length = hull_len * 0.34
        wing_width = hull_w * 0.18
        wing_height = hull_h * 0.08
        yoff = hull_w * 0.45 + wing_width * 0.6
        for side in (-1, 1):
            wing_center = (-0.1 * scale, side * yoff, -0.04 * scale)
            add(_sample_box_surface(wing_center, (wing_length, wing_width, wing_height), max(6, density // 3), 3))

    # Engines (label 4), spread across the rear of the hull.
    engine_radius = 0.14 * scale if spec.object_type != "fighter" else 0.1 * scale
    engine_length = 0.48 * scale
    if spec.engine_count == 1:
        # np.linspace(-a, a, 1) returns [-a], which would mount a lone engine
        # at the port edge; a single engine belongs on the centreline.
        engine_y_positions = np.array([0.0])
    else:
        engine_y_positions = np.linspace(-hull_w * 0.32, hull_w * 0.32, spec.engine_count)
    engine_x = -hull_len / 2 + engine_length * 0.3
    for ypos in engine_y_positions:
        add(_sample_cylinder_surface((engine_x, ypos, 0.0), engine_radius, engine_length, "x", max(6, density // 3), 4))

    # Dorsal fin (label 5).
    if spec.fin_height > 0:
        fin_center = (-hull_len * 0.25, 0.0, hull_h * 0.42)
        fin_size = (hull_len * 0.18, hull_w * 0.1, hull_h * max(spec.fin_height, 0.12))
        add(_sample_box_surface(fin_center, fin_size, max(6, density // 3), 5))

    # Landing gear (label 6): four vertical struts under the hull.
    if spec.landing_gear:
        for gx in (-hull_len * 0.18, hull_len * 0.12):
            for gy in (-hull_w * 0.28, hull_w * 0.28):
                strut_center = (gx, gy, -hull_h * 0.45)
                add(_sample_cylinder_surface(strut_center, 0.04 * scale, 0.22 * scale, "z", max(5, density // 5), 6))

    points = np.vstack(parts)
    point_normals = np.vstack(normals)
    point_labels = np.concatenate(labels)

    if spec.asymmetry > 0:
        # Warp only the +y half in z; normals are left as sampled.
        mask = points[:, 1] > 0
        points[mask, 2] += spec.asymmetry * np.sin(points[mask, 0] * 2.0)

    return (
        points.astype(np.float32),
        point_normals.astype(np.float32),
        point_labels.astype(np.int32),
        spec,
        parser_backend,
    )
def points_to_mesh(points: np.ndarray, pitch: float = 0.08, padding: int = 5, sigma: float = 1.2, level: float = 0.11) -> trimesh.Trimesh:
    """Reconstruct a surface mesh from a point cloud via a blurred occupancy grid.

    Points are binned into a voxel grid, the counts are Gaussian-smoothed, and
    an iso-surface is extracted with marching cubes.

    Args:
        points: ``(N, 3)`` array of positions.
        pitch: Requested voxel edge length. If the padded cloud would need more
            than 192 cells along an axis, a coarser pitch is used so the whole
            cloud still fits in the grid.
        padding: Empty voxels added around the cloud on every side.
        sigma: Standard deviation (in voxels) of the Gaussian smoothing.
        level: Iso-value of the smoothed density at which the surface is taken.

    Returns:
        The cleaned, hole-filled (best effort) and smoothed (best effort) mesh.

    Raises:
        ValueError: If ``points`` is empty, or (from marching cubes) if
            ``level`` lies outside the range of the smoothed grid.
    """
    import logging  # function-scope so the module's import block is untouched

    log = logging.getLogger(__name__)
    max_cells = 192

    points = np.asarray(points)
    if points.size == 0:
        raise ValueError("points_to_mesh requires at least one point")

    lo = points.min(axis=0)
    hi = points.max(axis=0)
    # Cap the grid by coarsening the pitch rather than by clipping indices:
    # clipping would pile every far-side point onto the boundary layer and
    # truncate the mesh. Cells per axis are ceil(span / pitch + 2 * padding)
    # + 1, so the span may cover at most (max_cells - 1 - 2 * padding) cells.
    usable_cells = max(1, max_cells - 1 - 2 * padding)
    pitch = max(pitch, float((hi - lo).max()) / usable_cells)

    mins = lo - padding * pitch
    maxs = hi + padding * pitch
    dims = np.ceil((maxs - mins) / pitch).astype(int) + 1
    # Lower bound keeps tiny clouds meshable; the upper bound is only a safety
    # net for rounding (or extreme padding) after the pitch adjustment above.
    dims = np.clip(dims, 24, max_cells)

    grid = np.zeros(tuple(dims.tolist()), dtype=np.float32)
    coords = ((points - mins) / pitch).astype(int)
    coords = np.clip(coords, 0, dims - 1)
    # np.add.at accumulates correctly when several points share a voxel.
    np.add.at(grid, (coords[:, 0], coords[:, 1], coords[:, 2]), 1.0)
    grid = ndimage.gaussian_filter(grid, sigma=sigma)

    verts, faces, normals, _ = measure.marching_cubes(grid, level=level)
    verts = verts * pitch + mins  # grid indices back to world coordinates

    mesh = trimesh.Trimesh(vertices=verts, faces=faces, vertex_normals=normals, process=True)
    mesh.update_faces(mesh.nondegenerate_faces())
    mesh.update_faces(mesh.unique_faces())
    mesh.remove_unreferenced_vertices()

    # Hole filling and smoothing are cosmetic: a failure is logged, not fatal.
    try:
        mesh.fill_holes()
    except Exception:
        log.warning("fill_holes failed; continuing with the unfilled mesh", exc_info=True)
    try:
        trimesh.smoothing.filter_humphrey(mesh, iterations=2)
    except Exception:
        log.warning("Humphrey smoothing failed; continuing with the unsmoothed mesh", exc_info=True)
    return mesh
def export_point_cloud_as_ply(points: np.ndarray, labels: np.ndarray, path: str) -> str:
    """Write the point cloud to ``path``, colored by part label.

    Args:
        points: Point positions.
        labels: Integer label per point; the color is chosen by label modulo
            the palette size (7 entries).
        path: Destination file path.

    Returns:
        ``path``, unchanged.
    """
    # Row i is the RGB color for points whose label is i (modulo the table size).
    palette = np.array(
        [
            (170, 170, 180),
            (120, 180, 255),
            (255, 190, 120),
            (180, 180, 255),
            (255, 120, 120),
            (200, 255, 180),
            (255, 255, 180),
        ],
        dtype=np.uint8,
    )
    palette_rows = labels % len(palette)
    cloud = trimesh.points.PointCloud(vertices=points, colors=palette[palette_rows])
    cloud.export(path)
    return path
def export_mesh_as_glb(mesh: trimesh.Trimesh, path: str) -> str:
    """Paint the mesh a uniform grey and export it to ``path``.

    Note that this overwrites ``mesh.visual.vertex_colors`` on the mesh that
    is passed in.

    Returns:
        ``path``, unchanged.
    """
    grey_rgba = np.array([[185, 190, 200, 255]], dtype=np.uint8)
    vertex_total = len(mesh.vertices)
    # One identical RGBA row per vertex.
    mesh.visual.vertex_colors = np.tile(grey_rgba, (vertex_total, 1))
    mesh.export(path)
    return path
def run_pipeline(
    prompt: str,
    detail: int = 24,
    voxel_pitch: float = 0.08,
    parser_mode: str = "heuristic",
    model_id: str | None = None,
) -> BuildArtifacts:
    """Run the full prompt -> point cloud -> mesh -> files pipeline.

    Args:
        prompt: Text description of the object to build.
        detail: Sampling resolution forwarded to ``build_particle_blueprint``.
        voxel_pitch: Voxel size forwarded to ``points_to_mesh``.
        parser_mode: Parser selection forwarded to ``build_particle_blueprint``.
        model_id: Optional model id forwarded to ``build_particle_blueprint``.

    Returns:
        A ``BuildArtifacts`` with the two exported file paths and a summary
        dict. Files are written into a freshly created temporary directory,
        which this function does not remove.
    """
    blueprint = build_particle_blueprint(
        prompt,
        detail=detail,
        parser_mode=parser_mode,
        model_id=model_id,
    )
    # The sampled normals are not needed for meshing or export.
    points, _normals, labels, spec, parser_backend = blueprint
    mesh = points_to_mesh(points, pitch=voxel_pitch)

    workdir = Path(tempfile.mkdtemp(prefix="particle_blueprint_"))
    ply_path = str(workdir / "blueprint.ply")
    glb_path = str(workdir / "mesh.glb")
    export_point_cloud_as_ply(points, labels, ply_path)
    export_mesh_as_glb(mesh, glb_path)

    summary = {
        "prompt": prompt,
        "parser_backend": parser_backend,
        "spec": spec.to_dict(),
        "point_count": int(len(points)),
        "vertex_count": int(len(mesh.vertices)),
        "face_count": int(len(mesh.faces)),
        "bounds": mesh.bounds.round(3).tolist(),
        "voxel_pitch": voxel_pitch,
    }
    return BuildArtifacts(ply_path=ply_path, glb_path=glb_path, summary=summary)