Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from pathlib import Path | |
| from typing import Any | |
| import pandas as pd | |
def _build_transformer(source_crs: Any, target_crs: str | None):
    """Return a pyproj transformer from ``source_crs`` to ``target_crs``, or ``None``.

    ``None`` means "do not reproject". It is returned when either CRS is
    missing/empty, when pyproj cannot be imported, when either CRS cannot be
    parsed, when both CRS compare equal, or when the transformer itself
    cannot be created. The transformer is built with ``always_xy=True``.
    """
    if not (source_crs and target_crs):
        return None

    # pyproj is treated as optional: any import failure disables reprojection.
    try:
        from pyproj import CRS, Transformer
    except Exception:
        return None

    try:
        crs_from = CRS.from_user_input(source_crs)
        crs_to = CRS.from_user_input(target_crs)
    except Exception:
        return None

    # Identical systems need no transformation.
    if crs_from == crs_to:
        return None

    try:
        return Transformer.from_crs(crs_from, crs_to, always_xy=True)
    except Exception:
        return None
def _field_name(field: Any) -> str:
    """Extract the name of a field descriptor as a string.

    Three descriptor shapes are handled, in this order: an object exposing a
    ``name`` attribute, a non-empty list/tuple whose first item is the name,
    and anything else, which is used as the name directly. A falsy name
    yields the empty string.
    """
    if hasattr(field, "name"):
        candidate = field.name
    elif isinstance(field, (list, tuple)) and field:
        candidate = field[0]
    else:
        candidate = field
    return str(candidate) if candidate else ""
def _iter_features_pyshp(shapefile_path: Path, *, target_crs: str | None) -> list[tuple[dict[str, Any], Any]]:
    """Read every feature of a shapefile using pyshp.

    Args:
        shapefile_path: Path to any member of the shapefile set; the ``.shp``,
            ``.shx``, ``.dbf`` and ``.prj`` siblings are derived from it by
            swapping the suffix.
        target_crs: CRS to reproject geometries into. Reprojection only
            happens when a ``.prj`` file exists and ``_build_transformer``
            returns a transformer for its contents.

    Returns:
        A list of ``(properties, geometry)`` pairs. ``geometry`` is a shapely
        geometry, or ``None`` when the record exposes no coordinates.

    Raises:
        FileNotFoundError: the ``.shp`` file does not exist.
        RuntimeError: any other failure while reading (the original exception
            is chained), including the case where records failed and none
            could be read.
    """
    import shapefile
    from shapely.geometry import shape
    from shapely.ops import transform as shapely_transform

    shp_path, shx_path, dbf_path, prj_path = (
        shapefile_path.with_suffix(extension) for extension in (".shp", ".shx", ".dbf", ".prj")
    )

    def _describe_sidecars() -> str:
        # Size (or bare presence) of each companion file, used in error messages.
        def _status(candidate: Path) -> str:
            if not candidate.exists():
                return f"{candidate.name}=missing"
            try:
                return f"{candidate.name}={candidate.stat().st_size}"
            except Exception:
                return f"{candidate.name}=present"

        companions = (shp_path, shx_path, dbf_path, prj_path, shapefile_path.with_suffix(".cpg"))
        return ", ".join(_status(companion) for companion in companions)

    if not shp_path.exists():
        raise FileNotFoundError(f"Shapefile nao encontrado: {shp_path}")

    handles = []
    reader = None
    try:
        # The .shp is mandatory; .shx and .dbf are handed over only when present.
        streams = {}
        for key, member in (("shp", shp_path), ("shx", shx_path), ("dbf", dbf_path)):
            if key == "shp" or member.exists():
                stream = open(member, "rb")
                handles.append(stream)
            else:
                stream = None
            streams[key] = stream

        # NOTE(review): attribute text is always decoded as UTF-8 with
        # replacement; the .cpg sidecar is only reported in error messages
        # and never consulted for the encoding - confirm this is intended.
        reader = shapefile.Reader(**streams, encoding="utf-8", encodingErrors="replace")

        # The first entry of reader.fields is skipped.
        field_names = [_field_name(descriptor) for descriptor in reader.fields[1:]]

        source_crs = None
        if prj_path.exists():
            try:
                source_crs = prj_path.read_text(encoding="utf-8", errors="replace")
            except Exception:
                # Retry with the platform default encoding.
                source_crs = prj_path.read_text(errors="replace")
        transformer = _build_transformer(source_crs, target_crs)

        features = []
        record_errors = []
        for index, shape_record in enumerate(reader.iterShapeRecords()):
            try:
                values = list(shape_record.record)
                # Pad short records so every declared field receives a value.
                padding = [None] * (len(field_names) - len(values))
                properties = dict(zip(field_names, values + padding))

                geo = getattr(shape_record.shape, "__geo_interface__", None)
                if geo and geo.get("coordinates"):
                    geometry = shape(geo)
                    if transformer is not None:
                        geometry = shapely_transform(transformer.transform, geometry)
                else:
                    geometry = None
                features.append((properties, geometry))
            except Exception as exc:
                # Unreadable records are skipped; only the first five
                # failures are remembered for the error message.
                if len(record_errors) < 5:
                    record_errors.append(f"registro {index}: {exc!r}")
                continue

        if record_errors and not features:
            detalhe = "; ".join(record_errors)
            raise RuntimeError(f"nenhum registro legivel via pyshp ({detalhe})")
        return features
    except Exception as exc:
        raise RuntimeError(
            f"pyshp falhou para {shapefile_path.name}: {exc!r}; arquivos: {_describe_sidecars()}"
        ) from exc
    finally:
        # Best-effort cleanup: reader first, then every file opened above.
        closables = ([reader] if reader is not None else []) + handles
        for closable in closables:
            try:
                closable.close()
            except Exception:
                pass
def _iter_features_fiona(shapefile_path: Path, *, target_crs: str | None) -> list[tuple[dict[str, Any], Any]]:
    """Read every feature of a vector file using fiona.

    Args:
        shapefile_path: Path handed to ``fiona.open``.
        target_crs: CRS to reproject geometries into. The source CRS is taken
            from the dataset (``crs_wkt``, falling back to ``crs``);
            reprojection is skipped when ``_build_transformer`` returns
            ``None``.

    Returns:
        A list of ``(properties, geometry)`` pairs. ``geometry`` is a shapely
        geometry, or ``None`` when the feature carries no geometry.
    """
    import fiona
    from shapely.geometry import shape
    from shapely.ops import transform as shapely_transform

    with fiona.open(shapefile_path) as source:
        transformer = _build_transformer(source.crs_wkt or source.crs, target_crs)

        def _as_geometry(raw: Any) -> Any:
            # Build the shapely geometry and reproject it when required.
            if not raw:
                return None
            built = shape(raw)
            if transformer is None:
                return built
            return shapely_transform(transformer.transform, built)

        features = [
            (dict(feature.get("properties") or {}), _as_geometry(feature.get("geometry")))
            for feature in source
        ]
    return features
def _load_features(shapefile_path: Path, *, target_crs: str | None = "EPSG:4326") -> list[tuple[dict[str, Any], Any]]:
    """Load ``(properties, geometry)`` pairs, trying pyshp first and fiona second.

    Args:
        shapefile_path: Path of the shapefile to read.
        target_crs: CRS passed through to the reader for reprojection.

    Returns:
        The feature list produced by the first reader that succeeds.

    Raises:
        Exception: when both readers fail, the error raised by the pyshp
            reader is re-raised (same object, same type). Where the
            interpreter supports exception notes, the fiona failure is
            attached as a note so it is not lost from the traceback.
    """
    try:
        return _iter_features_pyshp(shapefile_path, target_crs=target_crs)
    except Exception as exc:
        # Keep a reference: the name bound by ``as`` is cleared after the handler.
        pyshp_error = exc

    try:
        return _iter_features_fiona(shapefile_path, target_crs=target_crs)
    except Exception as fiona_error:
        # The pyshp error stays the primary one. It may carry an explicit
        # cause, which hides the implicit context from the traceback, so the
        # fallback failure is recorded as a note when ``add_note`` exists.
        add_note = getattr(pyshp_error, "add_note", None)
        if callable(add_note):
            add_note(f"fallback via fiona tambem falhou: {fiona_error!r}")
        raise pyshp_error
def load_vector_dataframe(shapefile_path: str | Path, *, target_crs: str | None = "EPSG:4326") -> pd.DataFrame:
    """Load a shapefile into a pandas ``DataFrame``.

    Each feature becomes one row holding its attribute values plus a
    ``geometry`` column with the geometry returned by ``_load_features``.

    Args:
        shapefile_path: Path to the shapefile; ``~`` is expanded and the path
            is resolved before use.
        target_crs: CRS passed through to ``_load_features``.

    Raises:
        FileNotFoundError: the resolved path does not exist.
    """
    path = Path(shapefile_path).expanduser().resolve()
    if not path.exists():
        raise FileNotFoundError(f"Shapefile nao encontrado: {path}")

    # A "geometry" attribute, if any, is overwritten by the actual geometry.
    rows = [
        {**properties, "geometry": geometry}
        for properties, geometry in _load_features(path, target_crs=target_crs)
    ]
    return pd.DataFrame(rows)
def load_vector_geojson(
    shapefile_path: str | Path,
    *,
    target_crs: str | None = "EPSG:4326",
    property_fields: tuple[str, ...] | list[str] | None = None,
    simplify_tolerance: float = 0.0,
) -> dict[str, Any]:
    """Load a shapefile as a GeoJSON-style ``FeatureCollection`` dictionary.

    Args:
        shapefile_path: Path to the shapefile; ``~`` is expanded and the path
            is resolved before use.
        target_crs: CRS passed through to ``_load_features``.
        property_fields: Optional attribute names to keep. Names are
            stringified and stripped, blank names are ignored, and names not
            present on a feature are omitted from it. When no usable name
            remains, every attribute is kept.
        simplify_tolerance: When truthy, each geometry is simplified with
            this tolerance using ``preserve_topology=True``.

    Returns:
        ``{"type": "FeatureCollection", "features": [...]}``. Features whose
        geometry is ``None`` are skipped.

    Raises:
        FileNotFoundError: the resolved path does not exist.
    """
    from shapely.geometry import mapping

    path = Path(shapefile_path).expanduser().resolve()
    if not path.exists():
        raise FileNotFoundError(f"Shapefile nao encontrado: {path}")

    cleaned_names = (str(field).strip() for field in (property_fields or ()))
    wanted_fields = tuple(name for name in cleaned_names if name)

    features = []
    for properties_raw, geometry in _load_features(path, target_crs=target_crs):
        if geometry is None:
            continue
        if simplify_tolerance:
            geometry = geometry.simplify(float(simplify_tolerance), preserve_topology=True)

        if wanted_fields:
            properties = {
                field: properties_raw[field]
                for field in wanted_fields
                if field in properties_raw
            }
        else:
            properties = dict(properties_raw)

        features.append(
            {
                "type": "Feature",
                "properties": properties,
                "geometry": mapping(geometry),
            }
        )

    return {"type": "FeatureCollection", "features": features}
def load_attribute_records(
    shapefile_path: str | Path,
    *,
    property_fields: tuple[str, ...] | list[str] | None = None,
) -> list[dict[str, Any]]:
    """Load only the attribute table (``.dbf``) of a shapefile.

    Args:
        shapefile_path: Path to any member of the shapefile set; the ``.dbf``
            sibling is derived by swapping the suffix.
        property_fields: Optional attribute names to keep. Names are
            stringified and stripped, blank names are ignored, and names not
            present in a record are omitted from it. When no usable name
            remains, every attribute is kept.

    Returns:
        One dictionary per record, mapping field name to value.

    Raises:
        FileNotFoundError: the ``.dbf`` file does not exist.
    """
    path = Path(shapefile_path).expanduser().resolve()
    dbf_path = path.with_suffix(".dbf")
    if not dbf_path.exists():
        raise FileNotFoundError(f"DBF nao encontrado: {dbf_path}")

    cleaned_names = (str(field).strip() for field in (property_fields or ()))
    wanted_fields = tuple(name for name in cleaned_names if name)

    def _select(row: dict[str, Any]) -> dict[str, Any]:
        # Apply the optional field filter to one record.
        if not wanted_fields:
            return dict(row)
        return {field: row[field] for field in wanted_fields if field in row}

    try:
        import shapefile
    except Exception:
        # pyshp unavailable: fall back to the generic feature loader and
        # discard the geometries.
        return [_select(properties) for properties, _ in _load_features(path, target_crs=None)]

    with open(dbf_path, "rb") as dbf_handle:
        reader = shapefile.Reader(dbf=dbf_handle, encoding="utf-8", encodingErrors="replace")
        # The first entry of reader.fields is skipped.
        field_names = [_field_name(descriptor) for descriptor in reader.fields[1:]]
        records = []
        for record in reader.iterRecords():
            values = list(record)
            # Pad short records so every declared field receives a value.
            padding = [None] * (len(field_names) - len(values))
            records.append(_select(dict(zip(field_names, values + padding))))
    return records