FiberGate / scripts /01_convert_labelstudio.py
AzizMiladi's picture
chore: git mv scripts, UI, dev tools, docs into folders
70c46cc
Raw
History Blame
10.3 kB
"""
STEP 1 — Convert a Label Studio JSON export to the LayoutLMv3 training format.
Produces (in the data2 folder next to this script): annotations.json, train.json,
val.json, test.json and label_mappings.json
"""
import json
import os
import random
from pathlib import Path
import sys
from urllib.parse import unquote, urlparse
# ── CONFIG ──────────────────────────────────────────────────────────────────
# Name of the Label Studio export and of the main asset directory.
LABEL_STUDIO_JSON = "project-14-at-2026-05-11-01-35-876abcf8.json"
IMAGES_ROOT = "processed_dataref"
# Generated files go to a "data2" folder located beside this script.
OUTPUT_DIR = str(Path(__file__).resolve().with_name("data2"))

# Split fractions and the seed used when shuffling the records.
TRAIN_RATIO, VAL_RATIO, TEST_RATIO = 0.7, 0.15, 0.15
RANDOM_SEED = 42

# Document classes; the id of a class is its position in this list.
DOC_CLASSES = [
    "Autorisation",
    "Certificat",
    "Mandat",
    "PlanMasse",
    "PlanSituation",
    "fiche",
]
DOC2ID = dict(zip(DOC_CLASSES, range(len(DOC_CLASSES))))

# Field labels (for extraction); the id of a label is its position in this list.
FIELD_LABELS = [
    "O",  # Outside — no field
    "Reference_Urbanisme",
    "DLPI",
    "Disposition_Mandat",
    "Nombre_Logement_Lot_MacroLot",
    "Nb_log_pro",
    "Nb_log_res",
    "nb_log_totale",
    "cabinet_conseil",
    "Representant_Nom_Complet",
    "Representant_Telephone",
    "Representant_Email",
    "Batiment_Adresse",
]
FIELD2ID = {label: index for index, label in enumerate(FIELD_LABELS)}
def normalize_text(value):
    """Return *value* with every whitespace run collapsed to one space.

    Leading and trailing whitespace is dropped; a falsy value (``None``,
    ``""``) yields ``""``.
    """
    if not value:
        return ""
    words = value.split()
    return " ".join(words)
def get_asset_roots():
    """List the existing asset directories located beside this script.

    Each of them may host ``<class>/images`` and ``<class>/ocr`` trees.
    Several locations are probed because different Label Studio exports point
    at different rasterisation runs.

    Returns:
        list[Path]: the probed locations that exist, without duplicates, in
        probing order.
    """
    base = Path(__file__).resolve().parent
    probe_order = (
        base / IMAGES_ROOT,
        base / IMAGES_ROOT / "processed_DataSet1",
        base / "processed",
        base / "processed_dataref",
        base / "processed_dataset2",
    )
    # dict.fromkeys drops repeated paths while keeping first-seen order.
    return [location for location in dict.fromkeys(probe_order) if location.exists()]
def get_relative_image_path(item):
    """Extract the image path carried by the task's ``data["image"]`` URL.

    Only the path component of the URL is kept; its leading slashes are
    removed and percent-escapes are decoded.

    Returns:
        Path | None: the relative path, or ``None`` when the task has no
        image URL or the URL has an empty path.
    """
    url = item["data"].get("image", "")
    if url:
        url_path = urlparse(url).path.lstrip("/")
        if url_path:
            return Path(unquote(url_path))
    return None
def read_ocr_text(ocr_path):
    """Return the text stored in an OCR JSON file, or ``""`` when unavailable.

    Args:
        ocr_path: path of the OCR JSON file.

    Returns:
        str: the first non-empty string found under ``"full_text"`` and then
        ``"text"``. ``""`` is returned when the file cannot be opened, is not
        valid UTF-8, is not valid JSON, is not a JSON object, or holds no text
        string under those keys.
    """
    try:
        with open(ocr_path, encoding="utf-8") as f:
            ocr_data = json.load(f)
    except (OSError, ValueError):
        # ValueError is the common base of json.JSONDecodeError and
        # UnicodeDecodeError, so a badly encoded file is tolerated just like a
        # malformed one instead of aborting the run.
        return ""
    if not isinstance(ocr_data, dict):
        return ""
    for key in ("full_text", "text"):
        text = ocr_data.get(key)
        # Callers pass the result to string helpers, so only return strings.
        if isinstance(text, str) and text:
            return text
    return ""
def get_image_path(item):
    """Resolve the local image file of a Label Studio task.

    The same file name can exist under several asset roots (see
    ``get_asset_roots``), so every root is probed and the candidates are
    scored instead of running a global recursive file-name search:

    * 1 point for any candidate that is an existing file;
    * +2 when the candidate is the path taken from the task's image URL;
    * +4 when the OCR JSON of the same root contains exactly the task's OCR
      text (after whitespace normalisation).

    The highest score wins; on a tie the first candidate found is kept.

    Args:
        item: one Label Studio task; ``item["data"]`` is read for the keys
            ``image``, ``image_file``, ``doc_class`` and ``ocr``.

    Returns:
        str | None: path of the best candidate, or ``None`` if no candidate
        file exists in any root.
    """
    data = item["data"]
    # ``or ""`` also covers keys that are present but null in the export.
    image_file = data.get("image_file") or ""
    doc_class = data.get("doc_class") or ""
    expected_ocr_text = normalize_text(data.get("ocr", ""))
    relative_image_path = get_relative_image_path(item)
    ocr_name = f"{Path(image_file).stem}.json"

    best_candidate = None
    best_score = -1
    for root in get_asset_roots():
        url_candidate = None
        candidate_paths = []
        if relative_image_path is not None:
            url_candidate = root / relative_image_path
            candidate_paths.append(url_candidate)
        if doc_class and image_file:
            candidate_paths.append(root / doc_class / "images" / image_file)

        # Drop duplicates and keep real files only: a directory that happens
        # to match the URL path must never be returned as an image.
        existing = [p for p in dict.fromkeys(candidate_paths) if p.is_file()]
        if not existing:
            continue

        # The OCR file depends on the root, not on the candidate, so it is
        # read and compared once per root.
        ocr_bonus = 0
        if expected_ocr_text:
            ocr_path = root / doc_class / "ocr" / ocr_name
            if ocr_path.is_file():
                local_ocr_text = normalize_text(read_ocr_text(ocr_path))
                if local_ocr_text == expected_ocr_text:
                    ocr_bonus = 4

        for candidate_path in existing:
            score = 1 + ocr_bonus
            if candidate_path == url_candidate:
                score += 2
            # Strict comparison: the first candidate wins a tie.
            if score > best_score:
                best_candidate = candidate_path
                best_score = score

    return str(best_candidate) if best_candidate is not None else None
def get_ocr_path(item):
    """Resolve the local OCR JSON file of a Label Studio task.

    Every asset root is probed for ``<doc_class>/ocr/<image stem>.json``.
    When the task carries OCR text, the file whose content matches that text
    (after whitespace normalisation) is preferred, which is the same signal
    ``get_image_path`` weighs most heavily; this keeps the image and the OCR
    file from being picked out of different mirrored roots. Without a match,
    the first existing file is returned.

    Args:
        item: one Label Studio task; ``item["data"]`` is read for the keys
            ``image_file``, ``doc_class`` and ``ocr``.

    Returns:
        str | None: path of the chosen OCR file, or ``None`` if no root holds
        one.
    """
    data = item["data"]
    # ``or ""`` also covers keys that are present but null in the export.
    doc_class = data.get("doc_class") or ""
    image_file = data.get("image_file") or ""
    expected_ocr_text = normalize_text(data.get("ocr", ""))
    ocr_name = f"{Path(image_file).stem}.json"

    first_existing = None
    for root in get_asset_roots():
        candidate = root / doc_class / "ocr" / ocr_name
        if not candidate.is_file():
            continue
        if not expected_ocr_text:
            # Nothing to compare against: keep the first-hit behaviour.
            return str(candidate)
        if normalize_text(read_ocr_text(candidate)) == expected_ocr_text:
            return str(candidate)
        if first_existing is None:
            first_existing = candidate
    return str(first_existing) if first_existing is not None else None
def resolve_labelstudio_path():
    """Locate the Label Studio JSON export to convert.

    Lookup order (the first existing file wins):
      1. the CLI argument ``sys.argv[1]``, taken as given (absolute, or
         relative to the current working directory) and then relative to the
         script directory;
      2. ``LABEL_STUDIO_JSON`` next to the script;
      3. the alphabetically first ``project-*.json`` next to the script;
      4. the alphabetically first ``*.json`` next to the script.

    A CLI argument that does not name an existing file is ignored and the
    lookup continues with step 2.

    Returns:
        str: path of the export file.

    Raises:
        FileNotFoundError: if no lookup step finds a file; the message lists
            the files present in the script directory.
    """
    script_dir = Path(__file__).resolve().parent

    # CLI override
    if len(sys.argv) > 1:
        candidate = sys.argv[1]
        # A relative path is resolved against the current working directory
        # by the OS, so no explicit join with os.getcwd() is needed.
        if os.path.isfile(candidate):
            return candidate
        script_candidate = script_dir / candidate
        if script_candidate.is_file():
            return str(script_candidate)

    # configured constant relative to the script location
    configured = script_dir / LABEL_STUDIO_JSON
    if configured.is_file():
        return str(configured)

    # glob() yields entries in arbitrary order; sorting makes the
    # auto-detected "first match" the same on every run and machine.
    candidates = sorted(p for p in script_dir.glob("project-*.json") if p.is_file())
    if not candidates:
        candidates = sorted(p for p in script_dir.glob("*.json") if p.is_file())
    if candidates:
        chosen = str(candidates[0])
        print(f"Auto-detected Label Studio JSON: {chosen}")
        return chosen

    # nothing found — provide helpful context
    files = [p.name for p in script_dir.iterdir() if p.is_file()]
    raise FileNotFoundError(
        f"Label Studio JSON '{LABEL_STUDIO_JSON}' not found next to the script in '{script_dir}'. Files there: {files}")
def convert_bbox(x_pct, y_pct, w_pct, h_pct, img_w, img_h):
    """Turn a Label Studio percentage box into pixel corners.

    Args:
        x_pct, y_pct: top-left corner, in percent of the image size.
        w_pct, h_pct: box width and height, in percent of the image size.
        img_w, img_h: image size in pixels.

    Returns:
        list[int]: ``[x0, y0, x1, y1]`` in pixels, each value truncated
        toward zero by ``int()``.
    """
    right_pct = x_pct + w_pct
    bottom_pct = y_pct + h_pct
    corners = (
        x_pct / 100 * img_w,
        y_pct / 100 * img_h,
        right_pct / 100 * img_w,
        bottom_pct / 100 * img_h,
    )
    return [int(corner) for corner in corners]
def process_item(item):
    """Convert one Label Studio task into a training record.

    Only the first annotation that is neither cancelled nor empty is used.
    Every ``rectanglelabels`` result in it becomes one pixel box with one
    label (the first entry of ``value["rectanglelabels"]``, or ``"O"`` when
    that list is missing or empty).

    Args:
        item: one task of the Label Studio export, with a ``data`` dict and an
            optional ``annotations`` list.

    Returns:
        dict | None: the training record, or ``None`` when the task has no
        usable annotation. Labels absent from ``FIELD2ID`` get id 0 and
        document classes absent from ``DOC2ID`` get id -1.
    """
    data = item["data"]
    doc_class = data.get("doc_class", "")
    ocr_text = data.get("ocr", "")
    image_file = data.get("image_file", "")

    # Skip unannotated or cancelled tasks ("annotations" may be null).
    valid_anns = [
        a for a in item.get("annotations") or []
        if not a.get("was_cancelled") and a.get("result")
    ]
    if not valid_anns:
        return None
    results = valid_anns[0]["result"]  # take first valid annotation

    # Take the image size from the first result that carries it: the first
    # result is not necessarily a rectangle, and a result without
    # dimensions must not force the defaults onto the real boxes.
    # Defaults: presumably an A4 page rasterised at 200 DPI — TODO confirm.
    img_w, img_h = 1654, 2339
    for r in results:
        if r.get("original_width") and r.get("original_height"):
            img_w, img_h = r["original_width"], r["original_height"]
            break

    # Extract bounding boxes and their labels.
    boxes = []
    labels = []
    for r in results:
        if r.get("type") != "rectanglelabels":
            continue
        v = r["value"]
        boxes.append(convert_bbox(v["x"], v["y"], v["width"], v["height"], img_w, img_h))
        names = v.get("rectanglelabels")
        labels.append(names[0] if names else "O")

    return {
        "id": item["id"],
        "image_file": image_file,
        "image_path": get_image_path(item),
        "ocr_path": get_ocr_path(item),
        "doc_class": doc_class,
        "doc_class_id": DOC2ID.get(doc_class, -1),
        "ocr_text": ocr_text,
        "image_width": img_w,
        "image_height": img_h,
        "boxes": boxes,
        "box_labels": labels,
        "box_label_ids": [FIELD2ID.get(label, 0) for label in labels],
    }
def main():
    """Run the whole conversion and write the output files to ``OUTPUT_DIR``.

    Steps:
      1. load the Label Studio export found by ``resolve_labelstudio_path``;
      2. convert every task with ``process_item``, dropping tasks that yield
         no record;
      3. write all records to ``annotations.json``;
      4. shuffle the records with ``RANDOM_SEED`` and write ``train.json``,
         ``val.json`` and ``test.json`` (the test split receives whatever is
         left after the train and validation shares);
      5. write the class and field label mappings to ``label_mappings.json``.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    export_path = resolve_labelstudio_path()
    with open(export_path, encoding="utf-8") as f:
        data = json.load(f)
    print(f"Total tasks: {len(data)}")

    records = []
    skipped = 0
    for item in data:
        rec = process_item(item)
        if rec:
            records.append(rec)
        else:
            skipped += 1
    print(f"Converted: {len(records)} | Skipped (unannotated): {skipped}")

    # Records are still written when their image could not be located;
    # report them so the problem is visible at conversion time.
    unresolved = sum(1 for rec in records if not rec["image_path"])
    if unresolved:
        print(f"Warning: {unresolved} record(s) have no resolved image_path")

    # Save full annotations
    with open(f"{OUTPUT_DIR}/annotations.json", "w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False, indent=2)

    # Train / Val / Test split. A private generator gives the same
    # permutation as seeding the module-level one, without altering the
    # global random state.
    random.Random(RANDOM_SEED).shuffle(records)
    n = len(records)
    n_train = int(n * TRAIN_RATIO)
    n_val = int(n * VAL_RATIO)
    splits = [
        ("train", records[:n_train]),
        ("val", records[n_train:n_train + n_val]),
        ("test", records[n_train + n_val:]),
    ]
    for split_name, split_data in splits:
        path = f"{OUTPUT_DIR}/{split_name}.json"
        with open(path, "w", encoding="utf-8") as f:
            json.dump(split_data, f, ensure_ascii=False, indent=2)
        print(f" {split_name}: {len(split_data)} samples → {path}")

    # Save label mappings (explicit encoding, like the other output files).
    mappings = {
        "doc_classes": DOC_CLASSES,
        "doc2id": DOC2ID,
        "field_labels": FIELD_LABELS,
        "field2id": FIELD2ID,
    }
    with open(f"{OUTPUT_DIR}/label_mappings.json", "w", encoding="utf-8") as f:
        json.dump(mappings, f, indent=2)

    # Report the real destination: OUTPUT_DIR is relative to the script, not
    # to the current working directory.
    print(f"\n✅ Done! Files saved to {OUTPUT_DIR}")
    print(" annotations.json, train.json, val.json, test.json, label_mappings.json")
# Run the conversion only when this file is executed as a script, so that
# importing it (for example to reuse its helpers) has no side effects.
if __name__ == "__main__":
    main()