| from __future__ import annotations |
|
|
| import io |
| import html |
| import re |
| import struct |
| import tempfile |
| import zipfile |
| from dataclasses import dataclass |
| from pathlib import Path |
| from xml.etree import ElementTree as ET |
|
|
| import numpy as np |
|
|
|
|
class KmlToTabError(RuntimeError):
    """Raised when a KML file cannot be converted to MapInfo TAB.

    Every failure surfaced by this module (empty input, missing pyogrio,
    unreadable KML, unsupported geometry, failed TAB write) is reported
    through this exception type.
    """
|
|
|
|
@dataclass(frozen=True)
class TabLayerSummary:
    """Immutable description of one TAB layer written during a conversion."""

    # Layer name, also used as the stem of the generated ``.tab`` file.
    name: str
    # Geometry type name of the layer (one of the ``_WKB_GEOMETRY_TYPES`` values).
    geometry_type: str
    # Number of features written to the layer.
    feature_count: int
|
|
|
|
@dataclass(frozen=True)
class TabConversionResult:
    """Immutable outcome of ``convert_kml_to_tab_zip``."""

    # ZIP archive (in memory) containing every file of the output directory.
    zip_bytes: bytes
    # One summary per layer, in the order the layers were written.
    layers: list[TabLayerSummary]
    # Sorted names of the files found in the output directory.
    files: list[str]
|
|
|
|
# Base WKB geometry type codes recognised by ``_wkb_geometry_type``, mapped to
# the geometry type names used to group features and to name output layers.
# "GeometryCollection" is recognised here but rejected later by
# ``_group_geometry_indexes``.
_WKB_GEOMETRY_TYPES = {
    1: "Point",
    2: "LineString",
    3: "Polygon",
    4: "MultiPoint",
    5: "MultiLineString",
    6: "MultiPolygon",
    7: "GeometryCollection",
}
|
|
|
|
def convert_kml_to_tab_zip(kml_bytes: bytes, source_name: str) -> TabConversionResult:
    """Convert a single KML document into MapInfo TAB tables bundled in a ZIP.

    The payload is written to a temporary directory, read back through
    ``pyogrio.raw.read`` with geometries forced to 2D, split by geometry type
    and written once per type with the "MapInfo File" driver.  Every file
    found in the output directory is then packed into an in-memory ZIP.

    Args:
        kml_bytes: Raw content of the KML file.
        source_name: Original file name; only its sanitised stem is used, to
            name the temporary input file and the output layers.

    Returns:
        A ``TabConversionResult`` with the ZIP bytes, one summary per written
        layer and the sorted names of the generated files.

    Raises:
        KmlToTabError: If the payload is empty, pyogrio cannot be imported,
            the KML cannot be read, no geometry is found, a geometry is
            invalid or unsupported, or a layer cannot be written.
    """
    if not kml_bytes:
        raise KmlToTabError("Le fichier KML est vide.")

    ogr = _import_pyogrio()
    base_name = _safe_stem(source_name)

    # Everything below stays inside the ``with`` block: the generated files
    # only exist for as long as the temporary directory does.
    with tempfile.TemporaryDirectory(prefix="kml_to_tab_") as scratch:
        scratch_root = Path(scratch)
        kml_path = scratch_root / f"{base_name}.kml"
        tab_dir = scratch_root / "tab_output"
        tab_dir.mkdir()
        kml_path.write_bytes(kml_bytes)

        # NOTE(review): no layer is passed to read(); presumably only the
        # first layer of a multi-folder KML is converted -- confirm.
        try:
            meta, _fids, wkb_values, columns = ogr.raw.read(kml_path, force_2d=True)
        except Exception as exc:
            raise KmlToTabError(f"Lecture du KML impossible: {exc}") from exc

        if wkb_values is None or len(wkb_values) == 0:
            raise KmlToTabError("Aucune geometrie exploitable trouvee dans le KML.")

        # Enrich the attribute table with columns parsed from the
        # <description> elements (left untouched when counts do not match).
        field_names, columns = _append_kml_fields(
            meta.get("fields"),
            columns,
            kml_bytes,
            len(wkb_values),
        )
        target_crs = meta.get("crs") or "EPSG:4326"
        target_encoding = meta.get("encoding") or "UTF-8"
        rows_by_type = _group_geometry_indexes(wkb_values)
        group_total = len(rows_by_type)

        summaries: list[TabLayerSummary] = []
        for geometry_type, rows in rows_by_type.items():
            layer_name = _layer_name(base_name, geometry_type, group_total)
            tab_path = tab_dir / f"{layer_name}.tab"
            subset_geometry = wkb_values[rows]
            subset_columns = [column[rows] for column in columns]

            try:
                ogr.raw.write(
                    tab_path,
                    subset_geometry,
                    subset_columns,
                    field_names,
                    driver="MapInfo File",
                    geometry_type=geometry_type,
                    crs=target_crs,
                    encoding=target_encoding,
                )
            except Exception as exc:
                raise KmlToTabError(
                    f"Ecriture du TAB impossible pour la couche {layer_name}: {exc}"
                ) from exc

            summaries.append(
                TabLayerSummary(
                    name=layer_name,
                    geometry_type=geometry_type,
                    feature_count=len(rows),
                )
            )

        produced = sorted(entry.name for entry in tab_dir.iterdir() if entry.is_file())
        archive = _zip_directory(tab_dir)
        return TabConversionResult(zip_bytes=archive, layers=summaries, files=produced)
|
|
|
|
def _import_pyogrio():
    """Import pyogrio lazily and return the module.

    Raises:
        KmlToTabError: If pyogrio cannot be imported.
    """
    try:
        import pyogrio
    except ImportError as exc:
        message = (
            "La conversion TAB requiert pyogrio/GDAL. "
            "Installe les dependances avec `pip install -r requirements.txt`."
        )
        raise KmlToTabError(message) from exc
    else:
        return pyogrio
|
|
|
|
| def _safe_stem(filename: str) -> str: |
| stem = Path(filename or "converted").stem |
| stem = re.sub(r"[^A-Za-z0-9_-]+", "_", stem).strip("_") |
| return (stem or "converted")[:60] |
|
|
|
|
| def _layer_name(base_name: str, geometry_type: str, group_count: int) -> str: |
| if group_count == 1: |
| return base_name |
| suffix = re.sub(r"(?<!^)(?=[A-Z])", "_", geometry_type).lower() |
| return f"{base_name}_{suffix}"[:80] |
|
|
|
|
| def _group_geometry_indexes(geometry) -> dict[str, list[int]]: |
| groups: dict[str, list[int]] = {} |
|
|
| for index, wkb in enumerate(geometry): |
| geometry_type = _wkb_geometry_type(wkb) |
| groups.setdefault(geometry_type, []).append(index) |
|
|
| unsupported = sorted( |
| geometry_type |
| for geometry_type in groups |
| if geometry_type == "GeometryCollection" |
| ) |
| if unsupported: |
| raise KmlToTabError( |
| "Les GeometryCollection ne sont pas supportees pour l'export TAB." |
| ) |
|
|
| return groups |
|
|
|
|
def _append_kml_fields(fields, field_data, kml_bytes: bytes, feature_count: int):
    """Append columns parsed from KML descriptions to the attribute table.

    Args:
        fields: Existing field names.
        field_data: Existing columns, one per field.
        kml_bytes: Raw KML content, parsed again to extract descriptions.
        feature_count: Number of features read from the KML.

    Returns:
        A ``(fields, field_data)`` pair.  The inputs are returned unchanged
        when the number of parsed placemarks differs from *feature_count* or
        when no extra column was found; otherwise the field names come back
        as an object array and the columns as a list, both extended with the
        new entries.
    """
    records = _extract_kml_records(kml_bytes)
    # Records are matched to features by position, so the counts must agree.
    if len(records) != feature_count:
        return fields, field_data

    field_pairs = _output_field_pairs(records, fields)
    if not field_pairs:
        return fields, field_data

    added_names = [target for _origin, target in field_pairs]
    # Output names are unique, so columns can be built directly in pair order.
    added_columns = [_record_column(records, origin) for origin, _target in field_pairs]

    merged_names = np.array([*fields, *added_names], dtype=object)
    merged_columns = [*field_data, *added_columns]
    return merged_names, merged_columns
|
|
|
|
| def _extract_kml_records(kml_bytes: bytes) -> list[dict[str, object]]: |
| try: |
| root = ET.fromstring(kml_bytes) |
| except ET.ParseError: |
| return [] |
|
|
| records: list[dict[str, object]] = [] |
|
|
| for element in root.iter(): |
| tag = _local_name(element) |
| if tag != "Placemark" or not _placemark_has_geometry(element): |
| continue |
|
|
| description = _child_text(element, "description") |
| record = _parse_structured_description(description) |
| value = float("nan") if record else _parse_measurement_value(description) |
| if not record and np.isfinite(value): |
| record["value"] = value |
|
|
| records.append(record) |
|
|
| return records |
|
|
|
|
def _output_field_pairs(
    records: list[dict[str, object]],
    existing_fields,
) -> list[tuple[str, str]]:
    """Pair every record key with a collision-free output column name.

    A key that does not clash (case-insensitively) with an existing field or
    an earlier key keeps its name.  A clashing key is renamed from
    ``description_<key>`` through ``_unique_field_name``.

    Returns:
        ``(source_name, output_name)`` tuples in column order.
    """
    taken = {str(existing).lower() for existing in existing_fields}
    pairs: list[tuple[str, str]] = []

    for source_name in _record_field_names(records):
        lowered = source_name.lower()
        if lowered in taken:
            # _unique_field_name registers the new name in ``taken`` itself.
            target = _unique_field_name(f"description_{source_name}", taken)
        else:
            target = source_name
            taken.add(lowered)
        pairs.append((source_name, target))

    return pairs
|
|
|
|
def _unique_field_name(name: str, used: set[str]) -> str:
    """Return a sanitised column name that is not yet present in *used*.

    The name is sanitised and capped at 31 characters.  On a clash, numeric
    suffixes ``_2``, ``_3``, ... are tried, shortening the stem so the result
    still fits in 31 characters.  The lower-cased result is added to *used*.
    """
    stem = _safe_column_name(name)[:31]
    attempt = stem
    number = 1

    while attempt.lower() in used:
        number += 1
        tail = "_" + str(number)
        attempt = stem[: 31 - len(tail)] + tail

    used.add(attempt.lower())
    return attempt
|
|
|
|
| def _record_field_names(records: list[dict[str, object]]) -> list[str]: |
| names: list[str] = [] |
|
|
| if any("value" in record for record in records): |
| names.append("value") |
|
|
| for record in records: |
| for name in record: |
| if name != "value" and name not in names: |
| names.append(name) |
|
|
| return names |
|
|
|
|
| def _record_column(records: list[dict[str, object]], name: str): |
| values = [record.get(name, "") for record in records] |
| if name == "value": |
| return np.array( |
| [value if value != "" else float("nan") for value in values], |
| dtype=float, |
| ) |
| return np.array([_stringify_field_value(value) for value in values], dtype=object) |
|
|
|
|
| def _stringify_field_value(value: object) -> str: |
| if value is None: |
| return "" |
| if isinstance(value, float) and not np.isfinite(value): |
| return "" |
| return str(value) |
|
|
|
|
def _placemark_has_geometry(placemark) -> bool:
    """Tell whether *placemark* or any of its descendants is a geometry element."""
    geometry_tags = {
        "Point",
        "LineString",
        "LinearRing",
        "Polygon",
        "MultiGeometry",
        "Model",
        "Track",
        "MultiTrack",
    }
    for node in placemark.iter():
        if _local_name(node) in geometry_tags:
            return True
    return False
|
|
|
|
def _child_text(element, child_name: str) -> str:
    """Return the whitespace-normalised text of a direct child element.

    All text nested under the first direct child whose local name is
    *child_name* is concatenated; every run of whitespace (newlines included)
    collapses to one space.  Returns ``""`` when there is no such child.
    """
    child = _first_child(element, child_name)
    if child is not None:
        words = "".join(child.itertext()).split()
        return " ".join(words)
    return ""
|
|
|
|
| def _first_child(element, child_name: str): |
| for child in element: |
| if _local_name(child) == child_name: |
| return child |
| return None |
|
|
|
|
| def _local_name(element) -> str: |
| return str(element.tag).rsplit("}", maxsplit=1)[-1] |
|
|
|
|
| def _parse_measurement_value(description: str) -> float: |
| match = re.match( |
| r"(?s)^.*?\s+(-?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?)\s*$", |
| description, |
| ) |
| if not match: |
| return float("nan") |
| return float(match.group(1)) |
|
|
|
|
| def _parse_structured_description(description: str) -> dict[str, str]: |
| text = html.unescape(description or "") |
| text = re.sub(r"(?i)<br\s*/?>", "\n", text) |
| text = re.sub(r"(?i)</?b>", "", text) |
| text = re.sub(r"<[^>]+>", "", text) |
|
|
| record: dict[str, str] = {} |
| for line in text.splitlines(): |
| if ":" not in line: |
| continue |
|
|
| key, value = line.split(":", maxsplit=1) |
| column = _safe_column_name(key) |
| value = value.strip() |
| if not column or not value: |
| continue |
| record[column] = value |
|
|
| return record |
|
|
|
|
| def _safe_column_name(name: str) -> str: |
| column = html.unescape(name or "").strip() |
| column = re.sub(r"\s+", "_", column) |
| column = re.sub(r"[^A-Za-z0-9_]+", "_", column).strip("_") |
| if not column: |
| return "" |
| if column[0].isdigit(): |
| column = f"field_{column}" |
| return column[:31] |
|
|
|
|
def _wkb_geometry_type(wkb: bytes) -> str:
    """Return the geometry type name encoded in a WKB header.

    Only the 5-byte header is inspected: one byte-order marker followed by a
    32-bit geometry type code.  Dimension information is ignored, whether it
    is encoded as high flag bits (EWKB / OGR 2.5D) or as an ISO thousands
    offset (e.g. 1001 for Point Z, 3003 for Polygon ZM).

    Args:
        wkb: WKB-encoded geometry.

    Returns:
        One of the names listed in ``_WKB_GEOMETRY_TYPES``.

    Raises:
        KmlToTabError: If the buffer is missing or shorter than a WKB header,
            or if the base geometry type is not supported.
    """
    if not wkb or len(wkb) < 5:
        raise KmlToTabError("Geometrie WKB invalide dans le KML.")

    # Byte-order marker: 1 means little-endian, anything else is read as
    # big-endian.
    endian = "<" if wkb[0] == 1 else ">"
    raw_code = struct.unpack(f"{endian}I", wkb[1:5])[0]

    # Clear the Z / M / SRID flag bits first, then fold the ISO dimension
    # offset (1000 = Z, 2000 = M, 3000 = ZM) down to the base type code.
    base_code = (raw_code & 0x0FFFFFFF) % 1000
    geometry_type = _WKB_GEOMETRY_TYPES.get(base_code)

    if geometry_type is None:
        raise KmlToTabError(f"Type de geometrie non supporte: WKB {raw_code}.")

    return geometry_type
|
|
|
|
| def _zip_directory(directory: Path) -> bytes: |
| zip_buffer = io.BytesIO() |
| with zipfile.ZipFile(zip_buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: |
| for path in sorted(directory.iterdir()): |
| if path.is_file(): |
| zf.write(path, arcname=path.name) |
| return zip_buffer.getvalue() |
|
|