| from __future__ import annotations |
|
|
| from collections import Counter |
| from pathlib import Path |
| from typing import Iterable |
|
|
| from .geometry import bbox_local, centroid_local, polygon_area_local, polygon_perimeter_local |
| from .models import BoundaryCandidate, SiteSelection |
|
|
| UNIT_CODES = { |
| "0": "unitless", |
| "4": "millimeters", |
| "6": "meters", |
| } |
|
|
| BOUNDARY_HINTS = ("site boundary", "site boundry", "boundary", "boundry", "plot", "property") |
| CONTEXT_HINTS = { |
| "road": "roads/access", |
| "surround": "surroundings", |
| "building": "existing building", |
| "built": "built-up", |
| "veget": "vegetation", |
| "tree": "vegetation", |
| "sea": "water/sea", |
| "water": "water/sea", |
| "contour": "contours", |
| } |
|
|
|
|
| def parse_dxf(path: str | Path) -> dict[str, object]: |
| source = Path(path) |
| pairs = _read_pairs(source) |
| unit = _detect_unit(pairs) |
| scale = 0.001 if unit == "millimeters" else 1.0 |
| layers = _extract_layers(pairs) |
| polylines = _extract_lwpolylines(pairs) |
| boundary_candidates: list[BoundaryCandidate] = [] |
| layer_counts: Counter[str] = Counter() |
| context_layers: Counter[str] = Counter() |
|
|
| for layer, closed, raw_points in polylines: |
| layer_counts[layer] += 1 |
| category = _context_category(layer) |
| if category: |
| context_layers[category] += 1 |
| points_m = [(x * scale, y * scale) for x, y in raw_points] |
| if _is_boundary_layer(layer) and len(points_m) >= 3: |
| closed_points = points_m if points_m[0] == points_m[-1] else points_m + [points_m[0]] |
| area = polygon_area_local(closed_points) |
| perimeter = polygon_perimeter_local(closed_points) |
| if area <= 0: |
| continue |
| boundary_candidates.append( |
| BoundaryCandidate( |
| id=f"{source.stem}:{len(boundary_candidates) + 1}", |
| source_file=source.name, |
| layer_name=layer, |
| unit=unit, |
| area_sqm=area, |
| perimeter_m=perimeter, |
| vertex_count=len(points_m), |
| is_closed=closed or points_m[0] == points_m[-1], |
| bbox=bbox_local(points_m), |
| confidence_reason="Layer name matched a boundary/site hint.", |
| points_m=points_m, |
| ) |
| ) |
| boundary_candidates.sort(key=lambda c: c.area_sqm, reverse=True) |
| return { |
| "source_file": source.name, |
| "unit": unit, |
| "layers": sorted(layers), |
| "layer_counts": dict(layer_counts), |
| "context_layers": dict(context_layers), |
| "boundary_candidates": boundary_candidates, |
| } |
|
|
|
|
| def candidate_to_selection(candidate: BoundaryCandidate, anchor_lat: float, anchor_lon: float) -> SiteSelection: |
| return SiteSelection( |
| id=f"S-{candidate.id}", |
| selection_type="dxf_boundary", |
| coordinate_mode="anchored_local_cad", |
| geometry_geojson=None, |
| local_geometry=candidate.points_m, |
| anchor_lat=anchor_lat, |
| anchor_lon=anchor_lon, |
| radius_m=None, |
| area_sqm=candidate.area_sqm, |
| perimeter_m=candidate.perimeter_m, |
| centroid=centroid_local(candidate.points_m), |
| bbox=candidate.bbox, |
| unit_source=f"DXF {candidate.unit}", |
| accuracy_label="CAD local boundary with public-data anchor", |
| source_files=[candidate.source_file], |
| selected_boundary_id=candidate.id, |
| limitations=[ |
| "CAD geometry uses local drawing coordinates unless georeferenced.", |
| "Public climate, sun, and OSM data are based on the anchor point, not exact CAD georeferencing.", |
| "CAD boundary is only as reliable as the uploaded drawing.", |
| ], |
| ) |
|
|
|
|
| def _read_pairs(path: Path) -> list[tuple[str, str]]: |
| lines = path.read_text(errors="ignore").splitlines() |
| return [ |
| (lines[index].strip(), lines[index + 1].strip()) |
| for index in range(0, len(lines) - 1, 2) |
| ] |
|
|
|
|
| def _detect_unit(pairs: list[tuple[str, str]]) -> str: |
| for idx, (code, value) in enumerate(pairs): |
| if code == "9" and value == "$INSUNITS": |
| for c2, v2 in pairs[idx + 1 : idx + 8]: |
| if c2 == "70": |
| return UNIT_CODES.get(v2.strip(), f"unit-code-{v2.strip()}") |
| return "unknown" |
|
|
|
|
| def _extract_layers(pairs: list[tuple[str, str]]) -> set[str]: |
| layers = set() |
| for idx, (code, value) in enumerate(pairs): |
| if code == "0" and value == "LAYER": |
| for c2, v2 in pairs[idx : idx + 30]: |
| if c2 == "2": |
| layers.add(v2) |
| break |
| return layers |
|
|
|
|
| def _extract_lwpolylines(pairs: list[tuple[str, str]]) -> Iterable[tuple[str, bool, list[tuple[float, float]]]]: |
| index = 0 |
| while index < len(pairs): |
| code, value = pairs[index] |
| if code == "0" and value == "LWPOLYLINE": |
| layer = "0" |
| closed = False |
| xs: list[float] = [] |
| ys: list[float] = [] |
| index += 1 |
| while index < len(pairs) and pairs[index][0] != "0": |
| c2, v2 = pairs[index] |
| if c2 == "8": |
| layer = v2 |
| elif c2 == "70": |
| try: |
| closed = bool(int(v2.strip()) & 1) |
| except ValueError: |
| closed = False |
| elif c2 == "10": |
| try: |
| xs.append(float(v2)) |
| except ValueError: |
| pass |
| elif c2 == "20": |
| try: |
| ys.append(float(v2)) |
| except ValueError: |
| pass |
| index += 1 |
| yield layer, closed, list(zip(xs, ys)) |
| continue |
| index += 1 |
|
|
|
|
| def _is_boundary_layer(layer_name: str) -> bool: |
| lower = layer_name.lower() |
| return any(hint in lower for hint in BOUNDARY_HINTS) |
|
|
|
|
| def _context_category(layer_name: str) -> str | None: |
| lower = layer_name.lower() |
| for hint, category in CONTEXT_HINTS.items(): |
| if hint in lower: |
| return category |
| return None |
|
|