|
|
"""DXF file handling utilities for importing and exporting geometry. |
|
|
|
|
|
Based on REMB_Production_Final_Reliable.ipynb converter logic. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import ezdxf |
|
|
from shapely.geometry import Polygon, mapping, LineString |
|
|
from shapely.ops import unary_union, polygonize |
|
|
from typing import Optional, List, Tuple |
|
|
import io |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
def _read_dxf_document(dxf_content: bytes):
    """Parse raw DXF bytes into an ezdxf document, or return None.

    Strategies are tried in order until one succeeds:

    1. Write the bytes to a temporary ``.dxf`` file and call
       ``ezdxf.readfile`` on it.
    2. Decode the bytes as utf-8, latin-1, cp1252 and utf-16 in turn and
       pass the text to ``ezdxf.read``.
    3. Pass the raw bytes to ``ezdxf.read`` through a ``BytesIO`` stream.

    Every failure is logged; nothing is raised to the caller.
    """
    import contextlib
    import os
    import tempfile

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(mode='wb', suffix='.dxf', delete=False) as tmp:
            # Record the path before writing so the file is still removed
            # when the write itself fails.
            tmp_path = tmp.name
            tmp.write(dxf_content)
        doc = ezdxf.readfile(tmp_path)
        logger.info("Successfully loaded DXF using tempfile method")
        return doc
    except Exception as e:
        logger.warning(f"Tempfile method failed: {e}, trying stream methods")
    finally:
        # Best-effort cleanup: a file that is already gone or cannot be
        # removed must not mask the parse result.
        if tmp_path is not None:
            with contextlib.suppress(OSError):
                os.unlink(tmp_path)

    for encoding in ('utf-8', 'latin-1', 'cp1252', 'utf-16'):
        try:
            doc = ezdxf.read(io.StringIO(dxf_content.decode(encoding)))
        except Exception:
            # Wrong encoding or unparsable text: try the next candidate.
            continue
        logger.info(f"Successfully loaded DXF with {encoding} encoding")
        return doc

    try:
        doc = ezdxf.read(io.BytesIO(dxf_content))
    except Exception as final_error:
        logger.error(f"Failed to load DXF in any format: {final_error}")
        return None
    logger.info("Successfully loaded DXF in binary format")
    return doc


def _largest_closed_polyline(entities, label, extract_xy):
    """Return the largest valid polygon among closed polyline entities.

    Args:
        entities: Iterable of ezdxf polyline entities of a single DXF type.
        label: DXF type name, used only in the warning message.
        extract_xy: Callable returning the list of (x, y) vertices of one
            entity.

    Returns:
        The valid Polygon with the greatest non-zero area, or None.
    """
    largest = None
    max_area = 0.0
    for entity in entities:
        if not entity.is_closed:
            continue
        try:
            pts = extract_xy(entity)
            if len(pts) < 3:
                continue
            poly = Polygon(pts)
            if poly.is_valid and poly.area > max_area:
                largest, max_area = poly, poly.area
        except Exception as e:
            # One malformed entity must not abort the whole search.
            logger.warning(f"Failed to process {label}: {e}")
    return largest


def _largest_polygon_from_lines(msp):
    """Polygonize the LINE entities of *msp* and return the largest valid polygon.

    Returns None when there are no LINE entities or they do not form any
    polygon.
    """
    lines = list(msp.query('LINE'))
    if not lines:
        return None
    logger.info(f"Attempting to build polygon from {len(lines)} LINE entities")

    segments = [
        LineString([
            (line.dxf.start.x, line.dxf.start.y),
            (line.dxf.end.x, line.dxf.end.y),
        ])
        for line in lines
    ]
    polygons = list(polygonize(segments))
    if not polygons:
        logger.warning("Could not create polygons from LINE entities")
        return None
    logger.info(f"Found {len(polygons)} polygons from LINE entities")

    candidates = [p for p in polygons if p.is_valid and p.area > 0.0]
    return max(candidates, key=lambda p: p.area, default=None)


def load_boundary_from_dxf(dxf_content: bytes) -> Optional[Polygon]:
    """Load the site boundary polygon from raw DXF file content.

    Model space is searched for a boundary candidate in the order below,
    moving to the next source only when the previous one produced nothing:

    1. closed LWPOLYLINE entities
    2. closed POLYLINE entities
    3. polygons assembled from LINE entities with ``polygonize``

    Within a source, the valid polygon with the largest area wins.

    NOTE: the winning polygon is scaled by 1.5 in x and y about its
    centroid before it is returned, so the result is larger than the
    geometry drawn in the DXF.

    Args:
        dxf_content: Bytes content of DXF file

    Returns:
        The scaled Shapely Polygon, or None if the file cannot be parsed,
        no valid boundary is found, or an unexpected error occurs (errors
        are logged, never raised).
    """
    try:
        doc = _read_dxf_document(dxf_content)
        if doc is None:
            return None
        msp = doc.modelspace()

        largest = _largest_closed_polyline(
            msp.query('LWPOLYLINE'),
            'LWPOLYLINE',
            lambda e: list(e.get_points(format='xy')),
        )

        if largest is None:
            # Old-style POLYLINE entities expose their vertex locations via
            # points(); get_points() exists only on LWPOLYLINE.
            largest = _largest_closed_polyline(
                msp.query('POLYLINE'),
                'POLYLINE',
                lambda e: [(p[0], p[1]) for p in e.points()],
            )

        if largest is None:
            try:
                largest = _largest_polygon_from_lines(msp)
            except Exception as e:
                logger.warning(f"Failed to process LINE entities: {e}")

        if largest is None:
            logger.warning("No valid closed polylines found in DXF")
            return None

        from shapely import affinity

        largest = affinity.scale(largest, xfact=1.5, yfact=1.5, origin='centroid')
        # area / 10000 is reported as hectares, which presumes the drawing
        # units are metres -- TODO confirm against the source drawings.
        logger.info(f"Boundary loaded and scaled 1.5x: {largest.area/10000:.2f} ha")
        return largest

    except Exception as e:
        logger.error(f"Error loading DXF: {e}")
        return None
|
|
|
|
|
|
|
|
def _as_xy_pairs(seq):
    """Return *seq* as a list of (x, y) tuples, or None if it is malformed.

    *seq* must be a list or tuple whose items are lists/tuples with at
    least two elements; any extra elements (e.g. z) are dropped.
    """
    if not isinstance(seq, (list, tuple)):
        return None
    if not all(isinstance(p, (list, tuple)) and len(p) >= 2 for p in seq):
        return None
    return [(p[0], p[1]) for p in seq]


def export_to_dxf(geometries: List[dict], output_type: str = 'final') -> bytes:
    """Export GeoJSON-like geometry dicts to an ASCII DXF (R2010) document.

    Each item is a dict with a ``'geometry'`` mapping (``'type'`` and
    ``'coordinates'``) and an optional ``'properties'`` mapping whose
    ``'type'`` entry selects the target layer (unknown or missing types go
    to ``LOTS``). Supported geometry types:

    - ``Point``: drawn as a circle of radius 2.0
    - ``LineString``: drawn as an open LWPOLYLINE
    - ``Polygon``: exterior ring drawn as a closed LWPOLYLINE (interior
      rings are not exported)

    Coordinates may be lists or tuples, so the output of
    ``shapely.geometry.mapping`` is accepted as well as decoded JSON.
    Items with missing or malformed geometry are skipped.

    Args:
        geometries: List of geometry dicts with 'geometry' and 'properties'
        output_type: Type of output ('stage1', 'stage2', 'final').
            Currently not used by the export; kept for API compatibility.

    Returns:
        DXF file content as bytes, or ``b''`` if the export fails (the
        error is logged).
    """
    layer_colors = (
        ('BLOCKS', 5),
        ('LOTS', 3),
        ('PARKS', 2),
        ('SERVICE', 4),
        ('ROADS', 8),
        ('INFRASTRUCTURE', 1),
    )
    # Feature 'type' property -> DXF layer; built once, not per feature.
    layer_map = {
        'block': 'BLOCKS',
        'park': 'PARKS',
        'service': 'SERVICE',
        'xlnt': 'SERVICE',
        'road_network': 'ROADS',
        'connection': 'INFRASTRUCTURE',
        'transformer': 'INFRASTRUCTURE',
        'drainage': 'INFRASTRUCTURE',
        'lot': 'LOTS',
        'setback': 'LOTS',
    }

    try:
        doc = ezdxf.new('R2010')
        msp = doc.modelspace()
        for layer_name, color in layer_colors:
            doc.layers.add(layer_name, color=color)

        for item in geometries:
            geom = item.get('geometry')
            # 'properties' may be present but null; treat that like missing
            # instead of letting one feature abort the whole export.
            props = item.get('properties') or {}
            layer = layer_map.get(props.get('type', 'lot'), 'LOTS')

            if not geom or 'coordinates' not in geom:
                continue
            coords = geom['coordinates']
            if not isinstance(coords, (list, tuple)) or len(coords) == 0:
                continue
            geom_kind = geom.get('type', 'Polygon')

            if geom_kind == 'Point':
                if len(coords) >= 2:
                    msp.add_circle(
                        center=(coords[0], coords[1]),
                        radius=2.0,
                        dxfattribs={'layer': layer}
                    )

            elif geom_kind == 'LineString':
                points_2d = _as_xy_pairs(coords)
                if points_2d is not None and len(points_2d) >= 2:
                    msp.add_lwpolyline(
                        points_2d,
                        dxfattribs={'layer': layer, 'closed': False}
                    )

            elif geom_kind == 'Polygon':
                # Standard layout is a sequence of rings; also tolerate a
                # bare ring (sequence of points) passed directly.
                first = coords[0]
                is_ring_list = (
                    isinstance(first, (list, tuple))
                    and len(first) > 0
                    and isinstance(first[0], (list, tuple))
                )
                ring = first if is_ring_list else coords
                points_2d = _as_xy_pairs(ring)
                if points_2d is not None and len(points_2d) >= 3:
                    msp.add_lwpolyline(
                        points_2d,
                        dxfattribs={
                            'layer': layer,
                            'closed': True
                        }
                    )

        stream = io.StringIO()
        doc.write(stream, fmt='asc')
        return stream.getvalue().encode('utf-8')

    except Exception as e:
        logger.error(f"Error exporting DXF: {e}")
        return b''
|
|
|
|
|
|
|
|
def validate_dxf(dxf_content: bytes) -> Tuple[bool, str]:
    """Validate DXF file content and return a status tuple.

    The content is parsed with the same fallback chain used when loading a
    boundary: a temporary file read by ``ezdxf.readfile``, then text
    decoding with utf-8 / latin-1 / cp1252 / utf-16 passed to
    ``ezdxf.read``, then a raw ``BytesIO`` stream.

    "Valid" here only means the file parses and its model space contains
    at least one LWPOLYLINE, POLYLINE or LINE entity; it does not check
    that a closed boundary can actually be formed.

    Args:
        dxf_content: DXF file bytes

    Returns:
        (is_valid, message). The message summarises the entity counts on
        success and describes the failure otherwise. No exception is
        raised for unparsable input.
    """
    import contextlib
    import os
    import tempfile

    try:
        doc = None
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(mode='wb', suffix='.dxf', delete=False) as tmp:
                # Record the path before writing so the file is still
                # removed when the write itself fails.
                tmp_path = tmp.name
                tmp.write(dxf_content)
            doc = ezdxf.readfile(tmp_path)
        except Exception:
            # Fall through to the in-memory strategies below.
            doc = None
        finally:
            # Best-effort cleanup of the temporary file.
            if tmp_path is not None:
                with contextlib.suppress(OSError):
                    os.unlink(tmp_path)

        if doc is None:
            for encoding in ('utf-8', 'latin-1', 'cp1252', 'utf-16'):
                try:
                    doc = ezdxf.read(io.StringIO(dxf_content.decode(encoding)))
                    break
                except Exception:
                    # Wrong encoding or unparsable text: try the next one.
                    continue

        if doc is None:
            try:
                doc = ezdxf.read(io.BytesIO(dxf_content))
            except Exception as e:
                return False, f"Failed to parse DXF: {str(e)}"

        msp = doc.modelspace()

        # Single pass over the LWPOLYLINE entities for both counters.
        lwpolylines = 0
        closed_count = 0
        for entity in msp:
            if entity.dxftype() == 'LWPOLYLINE':
                lwpolylines += 1
                if entity.is_closed:
                    closed_count += 1
        polylines = len(list(msp.query('POLYLINE')))
        lines = len(list(msp.query('LINE')))

        if lwpolylines + polylines + lines == 0:
            return False, "No polylines or lines found in DXF"

        msg = f"Valid DXF: {lwpolylines} LWPOLYLINE ({closed_count} closed), {polylines} POLYLINE, {lines} LINE"
        return True, msg

    except Exception as e:
        return False, f"Invalid DXF: {str(e)}"
|
|
|