from __future__ import annotations from pathlib import Path from typing import Any import pandas as pd def _build_transformer(source_crs: Any, target_crs: str | None): if not source_crs or not target_crs: return None try: from pyproj import CRS, Transformer except Exception: return None try: source = CRS.from_user_input(source_crs) target = CRS.from_user_input(target_crs) except Exception: return None if source == target: return None try: return Transformer.from_crs(source, target, always_xy=True) except Exception: return None def _field_name(field: Any) -> str: if hasattr(field, "name"): return str(getattr(field, "name") or "") if isinstance(field, (list, tuple)) and field: return str(field[0] or "") return str(field or "") def _iter_features_pyshp(shapefile_path: Path, *, target_crs: str | None) -> list[tuple[dict[str, Any], Any]]: import shapefile from shapely.geometry import shape from shapely.ops import transform as shapely_transform shp_path = shapefile_path.with_suffix(".shp") shx_path = shapefile_path.with_suffix(".shx") dbf_path = shapefile_path.with_suffix(".dbf") def _describe_sidecars() -> str: parts: list[str] = [] for candidate in (shp_path, shx_path, dbf_path, shapefile_path.with_suffix(".prj"), shapefile_path.with_suffix(".cpg")): if candidate.exists(): try: parts.append(f"{candidate.name}={candidate.stat().st_size}") except Exception: parts.append(f"{candidate.name}=present") else: parts.append(f"{candidate.name}=missing") return ", ".join(parts) if not shp_path.exists(): raise FileNotFoundError(f"Shapefile nao encontrado: {shp_path}") handles = [] reader = None try: shp_handle = open(shp_path, "rb") handles.append(shp_handle) shx_handle = open(shx_path, "rb") if shx_path.exists() else None if shx_handle is not None: handles.append(shx_handle) dbf_handle = open(dbf_path, "rb") if dbf_path.exists() else None if dbf_handle is not None: handles.append(dbf_handle) reader = shapefile.Reader( shp=shp_handle, shx=shx_handle, dbf=dbf_handle, encoding="utf-8", encodingErrors="replace", ) field_names = [_field_name(field) for field in reader.fields[1:]] source_crs = None prj_path = shapefile_path.with_suffix(".prj") if prj_path.exists(): try: source_crs = prj_path.read_text(encoding="utf-8", errors="replace") except Exception: source_crs = prj_path.read_text(errors="replace") transformer = _build_transformer(source_crs, target_crs) features: list[tuple[dict[str, Any], Any]] = [] record_errors: list[str] = [] for index, shape_record in enumerate(reader.iterShapeRecords()): try: values = list(shape_record.record) properties = { field_name: values[field_index] if field_index < len(values) else None for field_index, field_name in enumerate(field_names) } geometry_raw = getattr(shape_record.shape, "__geo_interface__", None) geometry = None if geometry_raw and geometry_raw.get("coordinates"): geometry = shape(geometry_raw) if transformer is not None: geometry = shapely_transform(transformer.transform, geometry) features.append((properties, geometry)) except Exception as exc: if len(record_errors) < 5: record_errors.append(f"registro {index}: {exc!r}") continue if not features and record_errors: detalhe = "; ".join(record_errors) raise RuntimeError(f"nenhum registro legivel via pyshp ({detalhe})") return features except Exception as exc: raise RuntimeError( f"pyshp falhou para {shapefile_path.name}: {exc!r}; arquivos: {_describe_sidecars()}" ) from exc finally: try: if reader is not None: reader.close() except Exception: pass for handle in handles: try: handle.close() except Exception: pass def _iter_features_fiona(shapefile_path: Path, *, target_crs: str | None) -> list[tuple[dict[str, Any], Any]]: import fiona from shapely.geometry import shape from shapely.ops import transform as shapely_transform features: list[tuple[dict[str, Any], Any]] = [] with fiona.open(shapefile_path) as source: transformer = _build_transformer(source.crs_wkt or source.crs, target_crs) for feature in source: properties = dict(feature.get("properties") or {}) geometry_raw = feature.get("geometry") geometry = None if geometry_raw: geometry = shape(geometry_raw) if transformer is not None: geometry = shapely_transform(transformer.transform, geometry) features.append((properties, geometry)) return features def _load_features(shapefile_path: Path, *, target_crs: str | None = "EPSG:4326") -> list[tuple[dict[str, Any], Any]]: pyshp_error = None try: return _iter_features_pyshp(shapefile_path, target_crs=target_crs) except Exception as exc: pyshp_error = exc try: return _iter_features_fiona(shapefile_path, target_crs=target_crs) except Exception: if pyshp_error is not None: raise pyshp_error raise def load_vector_dataframe(shapefile_path: str | Path, *, target_crs: str | None = "EPSG:4326") -> pd.DataFrame: path = Path(shapefile_path).expanduser().resolve() if not path.exists(): raise FileNotFoundError(f"Shapefile nao encontrado: {path}") rows: list[dict[str, Any]] = [] for properties, geometry in _load_features(path, target_crs=target_crs): row = dict(properties) row["geometry"] = geometry rows.append(row) return pd.DataFrame(rows) def load_vector_geojson( shapefile_path: str | Path, *, target_crs: str | None = "EPSG:4326", property_fields: tuple[str, ...] | list[str] | None = None, simplify_tolerance: float = 0.0, ) -> dict[str, Any]: from shapely.geometry import mapping path = Path(shapefile_path).expanduser().resolve() if not path.exists(): raise FileNotFoundError(f"Shapefile nao encontrado: {path}") feature_collection: dict[str, Any] = { "type": "FeatureCollection", "features": [], } wanted_fields = tuple(str(field).strip() for field in (property_fields or ()) if str(field).strip()) for properties_raw, geometry in _load_features(path, target_crs=target_crs): if geometry is None: continue if simplify_tolerance: geometry = geometry.simplify(float(simplify_tolerance), preserve_topology=True) properties = ( {field: properties_raw.get(field) for field in wanted_fields if field in properties_raw} if wanted_fields else dict(properties_raw) ) feature_collection["features"].append( { "type": "Feature", "properties": properties, "geometry": mapping(geometry), } ) return feature_collection def load_attribute_records( shapefile_path: str | Path, *, property_fields: tuple[str, ...] | list[str] | None = None, ) -> list[dict[str, Any]]: path = Path(shapefile_path).expanduser().resolve() dbf_path = path.with_suffix(".dbf") if not dbf_path.exists(): raise FileNotFoundError(f"DBF nao encontrado: {dbf_path}") wanted_fields = tuple(str(field).strip() for field in (property_fields or ()) if str(field).strip()) try: import shapefile except Exception: records: list[dict[str, Any]] = [] for properties, _ in _load_features(path, target_crs=None): if wanted_fields: records.append({field: properties.get(field) for field in wanted_fields if field in properties}) else: records.append(dict(properties)) return records with open(dbf_path, "rb") as dbf_handle: reader = shapefile.Reader(dbf=dbf_handle, encoding="utf-8", encodingErrors="replace") field_names = [_field_name(field) for field in reader.fields[1:]] records: list[dict[str, Any]] = [] for record in reader.iterRecords(): values = list(record) row = { field_name: values[index] if index < len(values) else None for index, field_name in enumerate(field_names) } if wanted_fields: row = {field: row.get(field) for field in wanted_fields if field in row} records.append(row) return records