DavMelchi committed on
Commit
cc3d5fb
·
1 Parent(s): b75a50f

Parse structured KML descriptions into TAB fields

Browse files
Files changed (2) hide show
  1. tests/test_kml_to_tab.py +58 -0
  2. utils/kml_to_tab.py +114 -12
tests/test_kml_to_tab.py CHANGED
@@ -80,6 +80,29 @@ RSRP_KML = b"""<?xml version="1.0" encoding="UTF-8"?>
80
  </kml>
81
  """
82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83
 
84
  def test_convert_kml_to_tab_zip_splits_mixed_geometries():
85
  result = convert_kml_to_tab_zip(MIXED_KML, "mixed sample.kml")
@@ -123,6 +146,41 @@ def test_convert_kml_to_tab_zip_preserves_measurement_values():
123
  assert data["value"].tolist() == [-105.599998474121, -116.099998474121]
124
 
125
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
126
  def test_convert_kml_to_tab_zip_rejects_empty_input():
127
  with pytest.raises(KmlToTabError, match="vide"):
128
  convert_kml_to_tab_zip(b"", "empty.kml")
 
80
  </kml>
81
  """
82
 
83
+ STRUCTURED_DESCRIPTION_KML = b"""<?xml version="1.0" encoding="UTF-8"?>
84
+ <kml xmlns="http://www.opengis.net/kml/2.2">
85
+ <Document>
86
+ <Placemark>
87
+ <name>Busaiteen_1</name>
88
+ <description>&lt;b&gt;Sector Details:&lt;/b&gt;&lt;br&gt;&lt;b&gt;code:&lt;/b&gt; 1004&lt;br&gt;&lt;b&gt;name:&lt;/b&gt; Busaiteen_1&lt;br&gt;&lt;b&gt;Azimut:&lt;/b&gt; 30&lt;br&gt;&lt;b&gt;Longitude:&lt;/b&gt; 50.606051&lt;br&gt;&lt;b&gt;Latitude:&lt;/b&gt; 26.261322&lt;br&gt;&lt;b&gt;size:&lt;/b&gt; 1000&lt;br&gt;&lt;b&gt;color:&lt;/b&gt; 7fff0000&lt;br&gt;</description>
89
+ <Polygon>
90
+ <outerBoundaryIs>
91
+ <LinearRing>
92
+ <coordinates>
93
+ 50.606051,26.261322,0
94
+ 50.607051,26.261322,0
95
+ 50.607051,26.262322,0
96
+ 50.606051,26.261322,0
97
+ </coordinates>
98
+ </LinearRing>
99
+ </outerBoundaryIs>
100
+ </Polygon>
101
+ </Placemark>
102
+ </Document>
103
+ </kml>
104
+ """
105
+
106
 
107
  def test_convert_kml_to_tab_zip_splits_mixed_geometries():
108
  result = convert_kml_to_tab_zip(MIXED_KML, "mixed sample.kml")
 
146
  assert data["value"].tolist() == [-105.599998474121, -116.099998474121]
147
 
148
 
149
def test_convert_kml_to_tab_zip_expands_structured_description_fields():
    """Each ``key: value`` line of a structured description becomes a TAB field.

    The fixture's description carries its own ``name`` entry; the test expects
    it to surface as ``description_name`` (presumably because the layer
    already exposes a ``name`` field -- confirm against the converter).
    """
    expected_columns = {
        "code": "1004",
        "description_name": "Busaiteen_1",
        "Azimut": "30",
        "Longitude": "50.606051",
        "Latitude": "26.261322",
        "size": "1000",
        "color": "7fff0000",
    }

    converted = convert_kml_to_tab_zip(STRUCTURED_DESCRIPTION_KML, "sectors.kml")

    with tempfile.TemporaryDirectory() as scratch_name:
        scratch_dir = Path(scratch_name)
        with zipfile.ZipFile(BytesIO(converted.zip_bytes)) as archive:
            archive.extractall(scratch_dir)

        # Read while the temporary directory still exists.
        metadata, _fids, _geometry, field_data = pyogrio.raw.read(
            scratch_dir / "sectors.tab"
        )

    fields = metadata["fields"].tolist()
    data = dict(zip(fields, field_data))

    # A structured description must not also produce the numeric "value" column.
    assert "value" not in fields
    for column in expected_columns:
        assert column in fields
    for column, cell in expected_columns.items():
        assert data[column].tolist() == [cell]
182
+
183
+
184
  def test_convert_kml_to_tab_zip_rejects_empty_input():
185
  with pytest.raises(KmlToTabError, match="vide"):
186
  convert_kml_to_tab_zip(b"", "empty.kml")
utils/kml_to_tab.py CHANGED
@@ -1,6 +1,7 @@
1
  from __future__ import annotations
2
 
3
  import io
 
4
  import re
5
  import struct
6
  import tempfile
@@ -165,18 +166,24 @@ def _append_kml_fields(fields, field_data, kml_bytes: bytes, feature_count: int)
165
  if len(records) != feature_count:
166
  return fields, field_data
167
 
168
- existing = {str(field).lower() for field in fields}
169
- extra_names = [name for name in ["value"] if name.lower() not in existing]
170
- if not extra_names:
171
  return fields, field_data
172
 
173
  extra_values = {
174
- "value": np.array([record["value"] for record in records], dtype=float),
 
175
  }
176
 
177
  return (
178
- np.array([*fields, *extra_names], dtype=object),
179
- [*field_data, *[extra_values[name] for name in extra_names]],
 
 
 
 
 
 
180
  )
181
 
182
 
@@ -194,17 +201,80 @@ def _extract_kml_records(kml_bytes: bytes) -> list[dict[str, object]]:
194
  continue
195
 
196
  description = _child_text(element, "description")
197
- value = _parse_measurement_value(description)
 
 
 
198
 
199
- records.append(
200
- {
201
- "value": value,
202
- }
203
- )
204
 
205
  return records
206
 
207
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
208
  def _placemark_has_geometry(placemark) -> bool:
209
  geometry_tags = {
210
  "Point",
@@ -247,6 +317,38 @@ def _parse_measurement_value(description: str) -> float:
247
  return float(match.group(1))
248
 
249
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
250
  def _wkb_geometry_type(wkb: bytes) -> str:
251
  if not wkb or len(wkb) < 5:
252
  raise KmlToTabError("Geometrie WKB invalide dans le KML.")
 
1
  from __future__ import annotations
2
 
3
  import io
4
+ import html
5
  import re
6
  import struct
7
  import tempfile
 
166
  if len(records) != feature_count:
167
  return fields, field_data
168
 
169
+ field_pairs = _output_field_pairs(records, fields)
170
+ if not field_pairs:
 
171
  return fields, field_data
172
 
173
  extra_values = {
174
+ output_name: _record_column(records, source_name)
175
+ for source_name, output_name in field_pairs
176
  }
177
 
178
  return (
179
+ np.array(
180
+ [*fields, *[output_name for _source, output_name in field_pairs]],
181
+ dtype=object,
182
+ ),
183
+ [
184
+ *field_data,
185
+ *[extra_values[output_name] for _source, output_name in field_pairs],
186
+ ],
187
  )
188
 
189
 
 
201
  continue
202
 
203
  description = _child_text(element, "description")
204
+ record = _parse_structured_description(description)
205
+ value = float("nan") if record else _parse_measurement_value(description)
206
+ if not record and np.isfinite(value):
207
+ record["value"] = value
208
 
209
+ records.append(record)
 
 
 
 
210
 
211
  return records
212
 
213
 
214
def _output_field_pairs(
    records: list[dict[str, object]],
    existing_fields,
) -> list[tuple[str, str]]:
    """Pair every record field with the column name it is written under.

    Args:
        records: Per-feature dictionaries of parsed description fields.
        existing_fields: Field names already present on the layer.

    Returns:
        ``(source_name, output_name)`` tuples in the order produced by
        ``_record_field_names``. A source name that clashes
        (case-insensitively) with an existing or previously assigned name is
        emitted as a unique ``description_<source_name>`` variant instead.
    """
    taken = {str(field).lower() for field in existing_fields}
    mapping: list[tuple[str, str]] = []

    for source_name in _record_field_names(records):
        lowered = source_name.lower()
        if lowered not in taken:
            taken.add(lowered)
            mapping.append((source_name, source_name))
            continue

        # _unique_field_name records the name it returns in ``taken`` itself.
        renamed = _unique_field_name(f"description_{source_name}", taken)
        mapping.append((source_name, renamed))

    return mapping
230
+
231
+
232
def _unique_field_name(name: str, used: set[str]) -> str:
    """Return a sanitized column name that is not yet in ``used``.

    The name is sanitized with ``_safe_column_name`` and capped at 31
    characters. While the lower-cased candidate is already taken, a numeric
    suffix (``_2``, ``_3``, ...) is appended, shortening the stem so the
    result still fits in 31 characters.

    Side effect: the lower-cased result is added to ``used``.
    """
    stem = _safe_column_name(name)[:31]
    result = stem
    counter = 1

    while result.lower() in used:
        counter += 1
        tail = f"_{counter}"
        result = stem[: 31 - len(tail)] + tail

    used.add(result.lower())
    return result
244
+
245
+
246
+ def _record_field_names(records: list[dict[str, object]]) -> list[str]:
247
+ names: list[str] = []
248
+
249
+ if any("value" in record for record in records):
250
+ names.append("value")
251
+
252
+ for record in records:
253
+ for name in record:
254
+ if name != "value" and name not in names:
255
+ names.append(name)
256
+
257
+ return names
258
+
259
+
260
+ def _record_column(records: list[dict[str, object]], name: str):
261
+ values = [record.get(name, "") for record in records]
262
+ if name == "value":
263
+ return np.array(
264
+ [value if value != "" else float("nan") for value in values],
265
+ dtype=float,
266
+ )
267
+ return np.array([_stringify_field_value(value) for value in values], dtype=object)
268
+
269
+
270
+ def _stringify_field_value(value: object) -> str:
271
+ if value is None:
272
+ return ""
273
+ if isinstance(value, float) and not np.isfinite(value):
274
+ return ""
275
+ return str(value)
276
+
277
+
278
  def _placemark_has_geometry(placemark) -> bool:
279
  geometry_tags = {
280
  "Point",
 
317
  return float(match.group(1))
318
 
319
 
320
def _parse_structured_description(description: str) -> dict[str, str]:
    """Extract ``key: value`` pairs from an HTML-flavoured KML description.

    HTML entities are decoded, ``<br>`` tags become line breaks and all other
    tags are removed. Each remaining line is split at its first colon; the key
    is sanitized with ``_safe_column_name``. Lines without a colon, or whose
    sanitized key or stripped value is empty, are ignored. When a key repeats,
    the last value wins.

    Args:
        description: Raw description text; a falsy value is treated as empty.

    Returns:
        Mapping of sanitized column name to stripped value (possibly empty).
    """
    plain = html.unescape(description or "")
    for pattern, replacement in (
        (r"(?i)<br\s*/?>", "\n"),  # line breaks delimit entries
        (r"(?i)</?b>", ""),        # bold markers around keys
        (r"<[^>]+>", ""),          # any remaining markup
    ):
        plain = re.sub(pattern, replacement, plain)

    parsed: dict[str, str] = {}
    for line in plain.splitlines():
        raw_key, colon, raw_value = line.partition(":")
        if not colon:
            continue

        column = _safe_column_name(raw_key)
        cell = raw_value.strip()
        if column and cell:
            parsed[column] = cell

    return parsed
339
+
340
+
341
+ def _safe_column_name(name: str) -> str:
342
+ column = html.unescape(name or "").strip()
343
+ column = re.sub(r"\s+", "_", column)
344
+ column = re.sub(r"[^A-Za-z0-9_]+", "_", column).strip("_")
345
+ if not column:
346
+ return ""
347
+ if column[0].isdigit():
348
+ column = f"field_{column}"
349
+ return column[:31]
350
+
351
+
352
  def _wkb_geometry_type(wkb: bytes) -> str:
353
  if not wkb or len(wkb) < 5:
354
  raise KmlToTabError("Geometrie WKB invalide dans le KML.")