# mesa-react / backend/app/core/shapefile_runtime.py
# Author: Guilherme Silberfarb Costa
# Commit 4678b81: "Speed up logradouro catalog loading"
from __future__ import annotations
from pathlib import Path
from typing import Any
import pandas as pd
def _build_transformer(source_crs: Any, target_crs: str | None):
if not source_crs or not target_crs:
return None
try:
from pyproj import CRS, Transformer
except Exception:
return None
try:
source = CRS.from_user_input(source_crs)
target = CRS.from_user_input(target_crs)
except Exception:
return None
if source == target:
return None
try:
return Transformer.from_crs(source, target, always_xy=True)
except Exception:
return None
def _field_name(field: Any) -> str:
if hasattr(field, "name"):
return str(getattr(field, "name") or "")
if isinstance(field, (list, tuple)) and field:
return str(field[0] or "")
return str(field or "")
def _iter_features_pyshp(shapefile_path: Path, *, target_crs: str | None) -> list[tuple[dict[str, Any], Any]]:
    """Read ``(properties, shapely geometry)`` pairs from a shapefile via pyshp.

    Opens the .shp/.shx/.dbf sidecars explicitly (the optional .shx/.dbf are
    tolerated when missing), reprojects geometries to *target_crs* when the
    .prj sidecar declares a different CRS, and skips individual unreadable
    records. Geometry is ``None`` for records without usable coordinates.

    Raises:
        FileNotFoundError: when the .shp file itself is missing.
        RuntimeError: when pyshp fails entirely (or no record is readable);
            the message includes a summary of the sidecar files present.
    """
    import shapefile
    from shapely.geometry import shape
    from shapely.ops import transform as shapely_transform
    shp_path = shapefile_path.with_suffix(".shp")
    shx_path = shapefile_path.with_suffix(".shx")
    dbf_path = shapefile_path.with_suffix(".dbf")
    def _describe_sidecars() -> str:
        # Summarize each sidecar as "name=size" / "present" / "missing"
        # so failure messages can diagnose incomplete shapefile uploads.
        parts: list[str] = []
        for candidate in (shp_path, shx_path, dbf_path, shapefile_path.with_suffix(".prj"), shapefile_path.with_suffix(".cpg")):
            if candidate.exists():
                try:
                    parts.append(f"{candidate.name}={candidate.stat().st_size}")
                except Exception:
                    # stat() can fail (e.g. permissions); still report presence.
                    parts.append(f"{candidate.name}=present")
            else:
                parts.append(f"{candidate.name}=missing")
        return ", ".join(parts)
    if not shp_path.exists():
        raise FileNotFoundError(f"Shapefile nao encontrado: {shp_path}")
    handles = []
    reader = None
    try:
        # Open the sidecars ourselves (pyshp accepts file objects) so that
        # a missing optional .shx or .dbf does not abort the read.
        shp_handle = open(shp_path, "rb")
        handles.append(shp_handle)
        shx_handle = open(shx_path, "rb") if shx_path.exists() else None
        if shx_handle is not None:
            handles.append(shx_handle)
        dbf_handle = open(dbf_path, "rb") if dbf_path.exists() else None
        if dbf_handle is not None:
            handles.append(dbf_handle)
        reader = shapefile.Reader(
            shp=shp_handle,
            shx=shx_handle,
            dbf=dbf_handle,
            encoding="utf-8",
            encodingErrors="replace",
        )
        # reader.fields[0] is pyshp's DeletionFlag pseudo-field; skip it.
        field_names = [_field_name(field) for field in reader.fields[1:]]
        source_crs = None
        prj_path = shapefile_path.with_suffix(".prj")
        if prj_path.exists():
            try:
                source_crs = prj_path.read_text(encoding="utf-8", errors="replace")
            except Exception:
                # Retry with the platform default encoding (e.g. on OS errors).
                source_crs = prj_path.read_text(errors="replace")
        # None when no .prj / no target / CRSs already match: coordinates pass through.
        transformer = _build_transformer(source_crs, target_crs)
        features: list[tuple[dict[str, Any], Any]] = []
        record_errors: list[str] = []
        for index, shape_record in enumerate(reader.iterShapeRecords()):
            try:
                values = list(shape_record.record)
                # Pad with None when a record has fewer values than fields.
                properties = {
                    field_name: values[field_index] if field_index < len(values) else None
                    for field_index, field_name in enumerate(field_names)
                }
                geometry_raw = getattr(shape_record.shape, "__geo_interface__", None)
                geometry = None
                if geometry_raw and geometry_raw.get("coordinates"):
                    geometry = shape(geometry_raw)
                    if transformer is not None:
                        geometry = shapely_transform(transformer.transform, geometry)
                features.append((properties, geometry))
            except Exception as exc:
                # Skip unreadable records but keep the first few errors for the
                # "nothing readable" diagnostic below.
                if len(record_errors) < 5:
                    record_errors.append(f"registro {index}: {exc!r}")
                continue
        if not features and record_errors:
            detalhe = "; ".join(record_errors)
            raise RuntimeError(f"nenhum registro legivel via pyshp ({detalhe})")
        return features
    except Exception as exc:
        # Wrap every failure with the sidecar summary so callers can diagnose
        # broken inputs from the message alone.
        raise RuntimeError(
            f"pyshp falhou para {shapefile_path.name}: {exc!r}; arquivos: {_describe_sidecars()}"
        ) from exc
    finally:
        # Best-effort cleanup of the reader and all raw file handles.
        try:
            if reader is not None:
                reader.close()
        except Exception:
            pass
        for handle in handles:
            try:
                handle.close()
            except Exception:
                pass
def _iter_features_fiona(shapefile_path: Path, *, target_crs: str | None) -> list[tuple[dict[str, Any], Any]]:
    """Read ``(properties, shapely geometry)`` pairs from a shapefile via fiona.

    Geometries are reprojected to *target_crs* when the dataset declares a
    different CRS; features without geometry yield ``None`` in its place.
    """
    import fiona
    from shapely.geometry import shape
    from shapely.ops import transform as shapely_transform
    collected: list[tuple[dict[str, Any], Any]] = []
    with fiona.open(shapefile_path) as dataset:
        # Prefer the WKT form of the CRS; fall back to fiona's CRS mapping.
        reproject = _build_transformer(dataset.crs_wkt or dataset.crs, target_crs)
        for record in dataset:
            props = dict(record.get("properties") or {})
            raw_geometry = record.get("geometry")
            if raw_geometry:
                geom = shape(raw_geometry)
                if reproject is not None:
                    geom = shapely_transform(reproject.transform, geom)
            else:
                geom = None
            collected.append((props, geom))
    return collected
def _load_features(shapefile_path: Path, *, target_crs: str | None = "EPSG:4326") -> list[tuple[dict[str, Any], Any]]:
    """Load features with pyshp, falling back to fiona.

    On a double failure the pyshp error (the primary code path) is the one
    re-raised; the fiona error propagates only if no pyshp error was captured.
    """
    primary_error: Exception | None = None
    try:
        return _iter_features_pyshp(shapefile_path, target_crs=target_crs)
    except Exception as exc:
        primary_error = exc
    try:
        return _iter_features_fiona(shapefile_path, target_crs=target_crs)
    except Exception:
        if primary_error is None:
            raise
        # Surface the pyshp diagnostic, which carries the sidecar summary.
        raise primary_error
def load_vector_dataframe(shapefile_path: str | Path, *, target_crs: str | None = "EPSG:4326") -> pd.DataFrame:
    """Load a shapefile into a pandas DataFrame with a ``geometry`` column.

    Each attribute field becomes a column; shapely geometries (reprojected
    to *target_crs* when needed) land in ``geometry``.

    Raises:
        FileNotFoundError: when the resolved path does not exist.
    """
    resolved = Path(shapefile_path).expanduser().resolve()
    if not resolved.exists():
        raise FileNotFoundError(f"Shapefile nao encontrado: {resolved}")
    rows = [
        {**properties, "geometry": geometry}
        for properties, geometry in _load_features(resolved, target_crs=target_crs)
    ]
    return pd.DataFrame(rows)
def load_vector_geojson(
    shapefile_path: str | Path,
    *,
    target_crs: str | None = "EPSG:4326",
    property_fields: tuple[str, ...] | list[str] | None = None,
    simplify_tolerance: float = 0.0,
) -> dict[str, Any]:
    """Build a GeoJSON FeatureCollection dict from a shapefile.

    Features without geometry are dropped. When *property_fields* is given,
    each feature's properties are restricted to those fields (missing ones
    are omitted). A positive *simplify_tolerance* simplifies each geometry
    topology-preservingly before serialization.

    Raises:
        FileNotFoundError: when the resolved path does not exist.
    """
    from shapely.geometry import mapping
    resolved = Path(shapefile_path).expanduser().resolve()
    if not resolved.exists():
        raise FileNotFoundError(f"Shapefile nao encontrado: {resolved}")
    # Normalize the requested field names, dropping blanks.
    selected = tuple(
        cleaned
        for cleaned in (str(field).strip() for field in (property_fields or ()))
        if cleaned
    )
    features: list[dict[str, Any]] = []
    for raw_props, geom in _load_features(resolved, target_crs=target_crs):
        if geom is None:
            continue
        if simplify_tolerance:
            geom = geom.simplify(float(simplify_tolerance), preserve_topology=True)
        if selected:
            props = {name: raw_props.get(name) for name in selected if name in raw_props}
        else:
            props = dict(raw_props)
        features.append(
            {
                "type": "Feature",
                "properties": props,
                "geometry": mapping(geom),
            }
        )
    return {"type": "FeatureCollection", "features": features}
def load_attribute_records(
    shapefile_path: str | Path,
    *,
    property_fields: tuple[str, ...] | list[str] | None = None,
) -> list[dict[str, Any]]:
    """Read attribute rows straight from the shapefile's .dbf sidecar.

    Avoids touching geometry entirely when pyshp is available; otherwise
    falls back to the full feature loader (without reprojection) and
    discards the geometries. When *property_fields* is given, each row is
    restricted to those fields (missing ones are dropped).

    Raises:
        FileNotFoundError: when the .dbf sidecar does not exist.
    """
    resolved = Path(shapefile_path).expanduser().resolve()
    dbf_path = resolved.with_suffix(".dbf")
    if not dbf_path.exists():
        raise FileNotFoundError(f"DBF nao encontrado: {dbf_path}")
    # Normalize the requested field names, dropping blanks.
    selected = tuple(
        cleaned
        for cleaned in (str(field).strip() for field in (property_fields or ()))
        if cleaned
    )
    def _restrict(row: dict[str, Any]) -> dict[str, Any]:
        # Apply the optional field filter to one record.
        if not selected:
            return dict(row)
        return {name: row.get(name) for name in selected if name in row}
    try:
        import shapefile
    except Exception:
        # pyshp unavailable: go through the geometry-capable loader instead.
        return [
            _restrict(properties)
            for properties, _geometry in _load_features(resolved, target_crs=None)
        ]
    with open(dbf_path, "rb") as dbf_handle:
        reader = shapefile.Reader(dbf=dbf_handle, encoding="utf-8", encodingErrors="replace")
        # reader.fields[0] is pyshp's DeletionFlag pseudo-field; skip it.
        names = [_field_name(field) for field in reader.fields[1:]]
        rows: list[dict[str, Any]] = []
        for record in reader.iterRecords():
            values = list(record)
            # Pad with None when a record has fewer values than fields.
            row = {
                name: values[position] if position < len(values) else None
                for position, name in enumerate(names)
            }
            rows.append(_restrict(row))
        return rows