"""
STEP 1 — Convert Label Studio JSON export to LayoutLMv3 training format.

Produces: data2/annotations.json + data2/train.json + data2/val.json
          + data2/test.json + data2/label_mappings.json
"""

import json
import os
import random
from pathlib import Path
import sys
from urllib.parse import parse_qs, unquote, urlparse

# ── CONFIG ──────────────────────────────────────────────────────────────────
LABEL_STUDIO_JSON = "project-14-at-2026-05-11-01-35-876abcf8.json"
IMAGES_ROOT = "processed_dataref"
OUTPUT_DIR = str(Path(__file__).resolve().parent / "data2")
TRAIN_RATIO = 0.7
VAL_RATIO = 0.15
TEST_RATIO = 0.15  # not read by main(): the test split receives the remainder
RANDOM_SEED = 42

# Fallback page size (pixels) used when no annotation result carries
# original_width / original_height.
DEFAULT_IMAGE_WIDTH = 1654
DEFAULT_IMAGE_HEIGHT = 2339

# Document classes
DOC_CLASSES = [
    "Autorisation",
    "Certificat",
    "Mandat",
    "PlanMasse",
    "PlanSituation",
    "fiche",
]
DOC2ID = {c: i for i, c in enumerate(DOC_CLASSES)}

# Field labels (for extraction)
FIELD_LABELS = [
    "O",  # Outside — no field
    "Reference_Urbanisme",
    "DLPI",
    "Disposition_Mandat",
    "Nombre_Logement_Lot_MacroLot",
    "Nb_log_pro",
    "Nb_log_res",
    "nb_log_totale",
    "cabinet_conseil",
    "Representant_Nom_Complet",
    "Representant_Telephone",
    "Representant_Email",
    "Batiment_Adresse",
]
FIELD2ID = {f: i for i, f in enumerate(FIELD_LABELS)}


def normalize_text(value):
    """Collapse every run of whitespace in *value* to one space.

    A falsy value (``None``, ``""``) yields ``""``.
    """
    return " ".join((value or "").split())


def get_asset_roots():
    """Return the existing candidate asset directories next to this script.

    Each root is expected to host ``<doc_class>/images`` and
    ``<doc_class>/ocr`` trees. Different Label Studio exports point at
    different rasterisation runs, so all of them are searched. Duplicates
    (``IMAGES_ROOT`` may equal one of the literal names) are removed while
    the declaration order is kept.
    """
    script_dir = Path(__file__).resolve().parent
    candidates = [
        script_dir / IMAGES_ROOT,
        script_dir / IMAGES_ROOT / "processed_DataSet1",
        script_dir / "processed",
        script_dir / "processed_dataref",
        script_dir / "processed_dataset2",
    ]
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return [c for c in dict.fromkeys(candidates) if c.exists()]


def get_relative_image_path(item):
    """Extract the image path referenced by the task's ``image`` URL.

    Returns a relative ``Path`` (leading slashes stripped, percent-escapes
    decoded), or ``None`` when the task has no usable image URL.

    Label Studio local-storage URLs have the form
    ``/data/local-files/?d=<path>``: the real path lives in the ``d`` query
    parameter, so it is read from there for such URLs.
    """
    image_url = item["data"].get("image", "")
    if not image_url:
        return None

    parsed = urlparse(image_url)
    relative_path = ""
    if parsed.path.rstrip("/").endswith("local-files"):
        # parse_qs already percent-decodes the value.
        local_files = parse_qs(parsed.query).get("d")
        if local_files:
            relative_path = local_files[0].lstrip("/")
    if not relative_path:
        relative_path = unquote(parsed.path.lstrip("/"))

    if not relative_path:
        return None
    return Path(relative_path)


def read_ocr_text(ocr_path):
    """Return the text stored in an OCR JSON file, or ``""`` on any problem.

    The text is read from the ``full_text`` key, falling back to ``text``.
    Unreadable files, invalid JSON/encoding, non-dict payloads and non-string
    values all yield ``""`` (best-effort lookup).
    """
    try:
        with open(ocr_path, encoding="utf-8") as f:
            ocr_data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return ""
    if not isinstance(ocr_data, dict):
        return ""
    text = ocr_data.get("full_text") or ocr_data.get("text") or ""
    return text if isinstance(text, str) else ""


def _ocr_json_path(root, doc_class, image_stem):
    """Build the OCR JSON path for an image stem under one asset root."""
    return root / doc_class / "ocr" / f"{image_stem}.json"


def _ocr_text_matches(ocr_path, expected_text):
    """Tell whether the OCR file's normalized text equals *expected_text*."""
    if not expected_text or not ocr_path.is_file():
        return False
    return normalize_text(read_ocr_text(ocr_path)) == expected_text


def get_image_path(item):
    """Reconstruct the local image path from Label Studio data.

    Every root returned by ``get_asset_roots()`` is probed for two
    candidates: the path taken from the task's image URL and
    ``<doc_class>/images/<image_file>``. Existing candidates are scored:

    * +1 for existing,
    * +2 when it is the URL-derived path,
    * +4 when the root's OCR JSON for this image matches the task OCR text.

    The OCR comparison disambiguates mirrored roots that contain an image
    with the same file name. Returns the best candidate as a string (the
    earliest one on ties), or ``None`` when nothing exists.
    """
    data = item["data"]
    image_file = data.get("image_file", "") or ""
    doc_class = data.get("doc_class", "") or ""
    expected_ocr_text = normalize_text(data.get("ocr", ""))
    relative_image_path = get_relative_image_path(item)
    image_stem = Path(image_file).stem

    best_candidate = None
    best_score = -1
    for root in get_asset_roots():
        candidate_paths = []
        if relative_image_path is not None:
            candidate_paths.append(root / relative_image_path)
        if doc_class and image_file:
            candidate_paths.append(root / doc_class / "images" / image_file)

        # The OCR bonus depends on the root only; compute it lazily, once.
        ocr_bonus = None
        for candidate_path in dict.fromkeys(candidate_paths):
            if not candidate_path.is_file():
                continue
            score = 1
            if (
                relative_image_path is not None
                and candidate_path == root / relative_image_path
            ):
                score += 2
            if ocr_bonus is None:
                ocr_file = _ocr_json_path(root, doc_class, image_stem)
                matched = bool(image_stem) and _ocr_text_matches(
                    ocr_file, expected_ocr_text
                )
                ocr_bonus = 4 if matched else 0
            score += ocr_bonus
            if score > best_score:
                best_candidate = candidate_path
                best_score = score

    return str(best_candidate) if best_candidate is not None else None


def get_ocr_path(item):
    """Locate the OCR JSON that belongs to a Label Studio task.

    Prefers the file whose text matches the task's ``ocr`` field, so the
    result agrees with the root chosen by ``get_image_path``. Otherwise the
    first existing ``<root>/<doc_class>/ocr/<stem>.json`` is returned, or
    ``None`` when no root has one.
    """
    data = item["data"]
    doc_class = data.get("doc_class", "") or ""
    image_stem = Path(data.get("image_file", "") or "").stem
    expected_ocr_text = normalize_text(data.get("ocr", ""))

    fallback = None
    for root in get_asset_roots():
        candidate = _ocr_json_path(root, doc_class, image_stem)
        if not candidate.exists():
            continue
        if _ocr_text_matches(candidate, expected_ocr_text):
            return str(candidate)
        if fallback is None:
            fallback = candidate
    return str(fallback) if fallback is not None else None


def resolve_labelstudio_path():
    """Resolve the Label Studio JSON path.

    Priority:
      - CLI argument ``sys.argv[1]`` if it exists as given, relative to the
        current working directory, or relative to the script directory
      - ``LABEL_STUDIO_JSON`` next to the script
      - first (alphabetical) ``project-*.json`` next to the script
      - first (alphabetical) ``*.json`` next to the script

    Raises:
        FileNotFoundError: when none of the above exists; the message lists
            the files found next to the script.
    """
    script_dir = Path(__file__).resolve().parent

    # CLI override
    if len(sys.argv) > 1:
        candidate = sys.argv[1]
        if os.path.exists(candidate):
            return candidate
        # try relative to cwd
        cwd_candidate = os.path.join(os.getcwd(), candidate)
        if os.path.exists(cwd_candidate):
            return cwd_candidate
        # try relative to the script location
        script_candidate = script_dir / candidate
        if script_candidate.exists():
            return str(script_candidate)
        print(
            f"Warning: '{candidate}' not found; falling back to defaults.",
            file=sys.stderr,
        )

    # configured constant relative to the script location
    configured = script_dir / LABEL_STUDIO_JSON
    if configured.exists():
        return str(configured)

    # search next to the script; sorted so the pick is deterministic
    candidates = sorted(script_dir.glob("project-*.json"))
    if not candidates:
        candidates = sorted(script_dir.glob("*.json"))
    if candidates:
        chosen = str(candidates[0])
        print(f"Auto-detected Label Studio JSON: {chosen}")
        return chosen

    # nothing found — provide helpful context
    files = [p.name for p in script_dir.iterdir() if p.is_file()]
    raise FileNotFoundError(
        f"Label Studio JSON '{LABEL_STUDIO_JSON}' not found next to the "
        f"script in '{script_dir}'. Files there: {files}"
    )


def convert_bbox(x_pct, y_pct, w_pct, h_pct, img_w, img_h):
    """Convert Label Studio % coords to absolute pixel coords [x0,y0,x1,y1].

    Values are truncated towards zero with ``int``.
    """
    x0 = int(x_pct / 100 * img_w)
    y0 = int(y_pct / 100 * img_h)
    x1 = int((x_pct + w_pct) / 100 * img_w)
    y1 = int((y_pct + h_pct) / 100 * img_h)
    return [x0, y0, x1, y1]


def _first_present(results, key, default):
    """Return the first truthy ``result[key]`` among *results*, else *default*."""
    for result in results:
        value = result.get(key)
        if value:
            return value
    return default


def process_item(item):
    """Convert one Label Studio task to a training record.

    Returns ``None`` when the task has no annotation that is both
    non-cancelled and non-empty. Otherwise the first such annotation is
    used and a dict with the image/OCR paths, the document class, the page
    size and the pixel boxes with their labels is returned.
    """
    data = item["data"]
    doc_class = data.get("doc_class", "")
    ocr_text = data.get("ocr", "")
    image_file = data.get("image_file", "")

    # Skip unannotated or cancelled
    valid_anns = [
        a
        for a in (item.get("annotations") or [])
        if not a.get("was_cancelled") and a.get("result")
    ]
    if not valid_anns:
        return None

    results = valid_anns[0]["result"]  # take first valid annotation

    # The first result may be a non-region result (e.g. "choices") without
    # page dimensions, so take them from the first result that has them.
    img_w = _first_present(results, "original_width", DEFAULT_IMAGE_WIDTH)
    img_h = _first_present(results, "original_height", DEFAULT_IMAGE_HEIGHT)

    # Extract bounding boxes
    boxes = []
    labels = []
    for r in results:
        if r.get("type") != "rectanglelabels":
            continue
        v = r["value"]
        # Convert with the dimensions recorded on this very result.
        box_w = r.get("original_width") or img_w
        box_h = r.get("original_height") or img_h
        boxes.append(
            convert_bbox(v["x"], v["y"], v["width"], v["height"], box_w, box_h)
        )
        labels.append(
            v["rectanglelabels"][0] if v.get("rectanglelabels") else "O"
        )

    return {
        "id": item["id"],
        "image_file": image_file,
        "image_path": get_image_path(item),
        "ocr_path": get_ocr_path(item),
        "doc_class": doc_class,
        "doc_class_id": DOC2ID.get(doc_class, -1),
        "ocr_text": ocr_text,
        "image_width": img_w,
        "image_height": img_h,
        "boxes": boxes,
        "box_labels": labels,
        # Labels missing from FIELD_LABELS map to id 0 ("O").
        "box_label_ids": [FIELD2ID.get(label, 0) for label in labels],
    }


def main():
    """Convert the export, then write annotations, splits and label maps."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    label_studio_json_path = resolve_labelstudio_path()
    with open(label_studio_json_path, encoding="utf-8") as f:
        data = json.load(f)
    print(f"Total tasks: {len(data)}")

    records = []
    skipped = 0
    for item in data:
        rec = process_item(item)
        if rec:
            records.append(rec)
        else:
            skipped += 1
    print(f"Converted: {len(records)} | Skipped (unannotated): {skipped}")

    # Save full annotations (before shuffling, so the task order is kept)
    with open(f"{OUTPUT_DIR}/annotations.json", "w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False, indent=2)

    # Train / Val / Test split
    random.seed(RANDOM_SEED)
    random.shuffle(records)
    n = len(records)
    n_train = int(n * TRAIN_RATIO)
    n_val = int(n * VAL_RATIO)
    train = records[:n_train]
    val = records[n_train:n_train + n_val]
    test = records[n_train + n_val:]  # remainder after train and val

    for split_name, split_data in [("train", train), ("val", val), ("test", test)]:
        path = f"{OUTPUT_DIR}/{split_name}.json"
        with open(path, "w", encoding="utf-8") as f:
            json.dump(split_data, f, ensure_ascii=False, indent=2)
        print(f" {split_name}: {len(split_data)} samples → {path}")

    # Save label mappings
    mappings = {
        "doc_classes": DOC_CLASSES,
        "doc2id": DOC2ID,
        "field_labels": FIELD_LABELS,
        "field2id": FIELD2ID,
    }
    with open(f"{OUTPUT_DIR}/label_mappings.json", "w", encoding="utf-8") as f:
        json.dump(mappings, f, indent=2)

    # OUTPUT_DIR sits next to the script, not necessarily under the cwd.
    print(f"\n✅ Done! Files saved to {OUTPUT_DIR}")
    print(" annotations.json, train.json, val.json, test.json, label_mappings.json")


if __name__ == "__main__":
    main()