Spaces:
Sleeping
Sleeping
"""
STEP 1 — Convert a Label Studio JSON export to the LayoutLMv3 training format.

Produces: data2/annotations.json, data2/train.json, data2/val.json,
data2/test.json and data2/label_mappings.json
"""
| import json | |
| import os | |
| import random | |
| from pathlib import Path | |
| import sys | |
| from urllib.parse import unquote, urlparse | |
# ── CONFIG ──────────────────────────────────────────────────────────────────
# File name of the Label Studio export to convert.
LABEL_STUDIO_JSON = "project-14-at-2026-05-11-01-35-876abcf8.json"
# Name of the primary asset directory holding images and OCR files.
IMAGES_ROOT = "processed_dataref"
# Output folder: a `data2` directory sitting beside this script.
OUTPUT_DIR = str(Path(__file__).resolve().with_name("data2"))

# Dataset split proportions and the seed that makes the shuffle repeatable.
TRAIN_RATIO = 0.7
VAL_RATIO = 0.15
TEST_RATIO = 0.15
RANDOM_SEED = 42

# Document classes; the position in this list is the class id.
DOC_CLASSES = [
    "Autorisation",
    "Certificat",
    "Mandat",
    "PlanMasse",
    "PlanSituation",
    "fiche",
]
DOC2ID = dict(zip(DOC_CLASSES, range(len(DOC_CLASSES))))

# Field labels used for extraction; index 0 ("O") means "outside any field".
FIELD_LABELS = [
    "O",
    "Reference_Urbanisme",
    "DLPI",
    "Disposition_Mandat",
    "Nombre_Logement_Lot_MacroLot",
    "Nb_log_pro",
    "Nb_log_res",
    "nb_log_totale",
    "cabinet_conseil",
    "Representant_Nom_Complet",
    "Representant_Telephone",
    "Representant_Email",
    "Batiment_Adresse",
]
FIELD2ID = dict(zip(FIELD_LABELS, range(len(FIELD_LABELS))))
def normalize_text(value):
    """Collapse each whitespace run in *value* to one space and trim both ends.

    A falsy *value* (``None`` or ``""``) yields an empty string.
    """
    if not value:
        return ""
    tokens = value.split()
    return " ".join(tokens)
def get_asset_roots():
    """List the existing directories that may contain ``<class>/images`` and
    ``<class>/ocr`` trees.

    A fixed set of locations next to this script is probed, because different
    Label Studio exports point at different rasterisation runs. Directories
    that do not exist are dropped, duplicates are reported once, and the
    probing order is preserved in the result.
    """
    base = Path(__file__).resolve().parent
    images_root = base / IMAGES_ROOT
    possible = (
        images_root,
        images_root / "processed_DataSet1",
        base / "processed",
        base / "processed_dataref",
        base / "processed_dataset2",
    )
    found = []
    for directory in possible:
        if directory in found or not directory.exists():
            continue
        found.append(directory)
    return found
def get_relative_image_path(item):
    """Return the path component of a task's image URL as a relative ``Path``.

    Args:
        item: A Label Studio task dict; the URL is read from
            ``item["data"]["image"]``.

    Returns:
        The percent-decoded URL path without leading slashes, or ``None``
        when the task has no image URL or the URL has no usable path.
    """
    image_url = item["data"].get("image", "")
    if not image_url:
        return None
    # Decode BEFORE stripping slashes: an encoded slash ("%2F") at the start
    # would otherwise survive the strip and decode into an absolute path,
    # and joining a root with an absolute path silently discards the root.
    decoded_path = unquote(urlparse(image_url).path).lstrip("/")
    if not decoded_path:
        return None
    return Path(decoded_path)
def read_ocr_text(ocr_path):
    """Best-effort read of the recognised text stored in an OCR JSON file.

    Args:
        ocr_path: Path of the OCR JSON file.

    Returns:
        The value of the ``"full_text"`` key, falling back to ``"text"``.
        An empty string is returned when the file cannot be opened, is not
        valid UTF-8 JSON, is not a JSON object, or holds no string text.
    """
    try:
        with open(ocr_path, encoding="utf-8") as f:
            ocr_data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError,
        # so a file in the wrong encoding is treated like any unreadable file.
        return ""
    if not isinstance(ocr_data, dict):
        return ""
    text = ocr_data.get("full_text") or ocr_data.get("text") or ""
    # Callers treat the result as a string; ignore non-string payloads.
    return text if isinstance(text, str) else ""
def get_image_path(item):
    """Pick the local image file that best matches a Label Studio task.

    Every directory returned by ``get_asset_roots()`` is probed for up to two
    candidate files: the path taken from the task's image URL, and
    ``<root>/<doc_class>/images/<image_file>``. Each candidate that exists on
    disk is scored:

    * 1 point for existing,
    * +2 when it is the URL-derived path,
    * +4 when ``<root>/<doc_class>/ocr/<image stem>.json`` exists and its
      whitespace-normalised text equals the task's own ``ocr`` text.

    The highest score wins; on a tie the candidate found first is kept.

    Returns the winning path as a string, or ``None`` when no candidate exists.
    """
    task_data = item["data"]
    image_file = task_data.get("image_file", "")
    doc_class = task_data.get("doc_class", "")
    expected_ocr_text = normalize_text(task_data.get("ocr", ""))
    relative_image_path = get_relative_image_path(item)
    ocr_name = f"{Path(image_file).stem}.json"

    winner = None
    winner_score = -1
    for root in get_asset_roots():
        url_based = None
        options = []
        if relative_image_path is not None:
            url_based = root / relative_image_path
            options.append(url_based)
        if doc_class and image_file:
            options.append(root / doc_class / "images" / image_file)

        visited = set()
        for option in options:
            if option in visited:
                continue
            visited.add(option)
            if not option.exists():
                continue

            points = 1
            if url_based is not None and option == url_based:
                points += 2

            ocr_file = root / doc_class / "ocr" / ocr_name
            if ocr_file.exists() and expected_ocr_text:
                if normalize_text(read_ocr_text(ocr_file)) == expected_ocr_text:
                    points += 4

            # Strict comparison: an equal score never displaces an earlier hit.
            if points > winner_score:
                winner, winner_score = option, points

    return None if winner is None else str(winner)
def get_ocr_path(item):
    """Locate the OCR JSON file that belongs to a Label Studio task.

    Each asset root is checked in order for
    ``<root>/<doc_class>/ocr/<image stem>.json``; the first file that exists
    is returned as a string, or ``None`` when no root contains it.
    """
    task_data = item["data"]
    doc_class = task_data.get("doc_class", "")
    ocr_name = Path(task_data.get("image_file", "")).stem + ".json"
    possible = (root / doc_class / "ocr" / ocr_name for root in get_asset_roots())
    hit = next((path for path in possible if path.exists()), None)
    return None if hit is None else str(hit)
def resolve_labelstudio_path():
    """Resolve the path of the Label Studio JSON export to convert.

    Priority:
    - CLI argument ``sys.argv[1]``, if it exists as given (a relative value
      is resolved against the current working directory) or relative to the
      directory containing this script
    - ``LABEL_STUDIO_JSON`` next to this script
    - alphabetically first ``project-*.json`` next to this script
    - alphabetically first ``*.json`` next to this script

    Returns:
        The resolved path as a string.

    Raises:
        FileNotFoundError: No candidate file could be found; the message
            lists the files present in the script directory.
    """
    script_dir = Path(__file__).resolve().parent

    # CLI override
    if len(sys.argv) > 1:
        candidate = sys.argv[1]
        # os.path.exists already resolves relative paths against the cwd.
        if os.path.exists(candidate):
            return candidate
        # try relative to the script location
        script_candidate = script_dir / candidate
        if script_candidate.exists():
            return str(script_candidate)
        # Do not fall back silently: the caller asked for a specific file.
        print(f"Warning: '{candidate}' not found; falling back to auto-detection.")

    # configured constant relative to the script location
    configured = script_dir / LABEL_STUDIO_JSON
    if configured.exists():
        return str(configured)

    # glob() yields entries in arbitrary order; sort so that the same file is
    # picked on every run and on every platform.
    candidates = sorted(script_dir.glob("project-*.json"))
    if not candidates:
        candidates = sorted(script_dir.glob("*.json"))
    if candidates:
        chosen = str(candidates[0])
        print(f"Auto-detected Label Studio JSON: {chosen}")
        return chosen

    # nothing found — provide helpful context
    files = [p.name for p in script_dir.iterdir() if p.is_file()]
    raise FileNotFoundError(
        f"Label Studio JSON '{LABEL_STUDIO_JSON}' not found next to the script in '{script_dir}'. Files there: {files}")
def convert_bbox(x_pct, y_pct, w_pct, h_pct, img_w, img_h):
    """Convert Label Studio % coords to absolute pixel coords [x0,y0,x1,y1].

    Args:
        x_pct, y_pct: Top-left corner, as percentages of the image size.
        w_pct, h_pct: Box width and height, as percentages of the image size.
        img_w, img_h: Image width and height in pixels.

    Returns:
        ``[x0, y0, x1, y1]`` as integers (fractions truncated), clamped to
        the image bounds ``0..img_w`` and ``0..img_h``.
    """
    def clamp(value, upper):
        # Boxes dragged past the image edge export percentages below 0 or
        # above 100; keep the resulting pixels inside the image.
        return min(max(value, 0), int(upper))

    x0 = clamp(int(x_pct / 100 * img_w), img_w)
    y0 = clamp(int(y_pct / 100 * img_h), img_h)
    x1 = clamp(int((x_pct + w_pct) / 100 * img_w), img_w)
    y1 = clamp(int((y_pct + h_pct) / 100 * img_h), img_h)
    return [x0, y0, x1, y1]
def process_item(item):
    """Convert one Label Studio task to a training record.

    Args:
        item: A task dict from the Label Studio export, with a ``"data"``
            dict and an optional ``"annotations"`` list.

    Returns:
        A dict holding the task id, resolved image/OCR paths, document class
        and id (``-1`` for an unknown class), OCR text, image size, and the
        rectangle boxes with their label names and ids (``0`` for an unknown
        label). ``None`` is returned when the task has no annotation that is
        both non-cancelled and non-empty.
    """
    data = item["data"]
    doc_class = data.get("doc_class", "")
    ocr_text = data.get("ocr", "")
    image_file = data.get("image_file", "")

    # Skip unannotated or cancelled
    valid_anns = [
        a for a in item.get("annotations", [])
        if not a.get("was_cancelled") and a.get("result")
    ]
    if not valid_anns:
        return None
    results = valid_anns[0]["result"]  # take first valid annotation

    # Take the image size from the first result that actually carries it:
    # the first entry may be a result without dimensions, which would
    # otherwise force the fallback size onto every box.
    img_w = next(
        (r["original_width"] for r in results if r.get("original_width")), 1654
    )
    img_h = next(
        (r["original_height"] for r in results if r.get("original_height")), 2339
    )

    # Extract bounding boxes
    boxes = []
    labels = []
    for r in results:
        if r.get("type") != "rectanglelabels":
            continue
        v = r["value"]
        boxes.append(
            convert_bbox(v["x"], v["y"], v["width"], v["height"], img_w, img_h)
        )
        # A rectangle without a label is recorded as "O" (outside any field).
        labels.append(v["rectanglelabels"][0] if v.get("rectanglelabels") else "O")

    image_path = get_image_path(item)
    ocr_path = get_ocr_path(item)
    return {
        "id": item["id"],
        "image_file": image_file,
        "image_path": image_path,
        "ocr_path": ocr_path,
        "doc_class": doc_class,
        "doc_class_id": DOC2ID.get(doc_class, -1),
        "ocr_text": ocr_text,
        "image_width": img_w,
        "image_height": img_h,
        "boxes": boxes,
        "box_labels": labels,
        "box_label_ids": [FIELD2ID.get(label, 0) for label in labels],
    }
def main():
    """Convert the Label Studio export and write the dataset files.

    Writes into ``OUTPUT_DIR``: ``annotations.json`` (all converted records),
    ``train.json`` / ``val.json`` / ``test.json`` (a seeded random split) and
    ``label_mappings.json`` (class and field label tables).
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    label_studio_json_path = resolve_labelstudio_path()
    with open(label_studio_json_path, encoding="utf-8") as f:
        data = json.load(f)
    print(f"Total tasks: {len(data)}")

    records = []
    skipped = 0
    for item in data:
        rec = process_item(item)
        if rec:
            records.append(rec)
        else:
            skipped += 1
    print(f"Converted: {len(records)} | Skipped (unannotated): {skipped}")

    # Save full annotations (in export order, before shuffling)
    with open(f"{OUTPUT_DIR}/annotations.json", "w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False, indent=2)

    # Train / Val / Test split
    random.seed(RANDOM_SEED)
    random.shuffle(records)
    n = len(records)
    n_train = int(n * TRAIN_RATIO)
    n_val = int(n * VAL_RATIO)
    train = records[:n_train]
    val = records[n_train:n_train + n_val]
    # The test split receives every remaining record, so rounding in the two
    # sizes above never drops a sample.
    test = records[n_train + n_val:]
    for split_name, split_data in [("train", train), ("val", val), ("test", test)]:
        path = f"{OUTPUT_DIR}/{split_name}.json"
        with open(path, "w", encoding="utf-8") as f:
            json.dump(split_data, f, ensure_ascii=False, indent=2)
        print(f" {split_name}: {len(split_data)} samples → {path}")

    # Save label mappings
    mappings = {
        "doc_classes": DOC_CLASSES,
        "doc2id": DOC2ID,
        "field_labels": FIELD_LABELS,
        "field2id": FIELD2ID,
    }
    # Explicit encoding, consistent with every other file written here.
    with open(f"{OUTPUT_DIR}/label_mappings.json", "w", encoding="utf-8") as f:
        json.dump(mappings, f, indent=2)

    # Report the real destination: OUTPUT_DIR is anchored to the script
    # location, not to the current working directory.
    print(f"\n✅ Done! Files saved to {OUTPUT_DIR}")
    print(" annotations.json, train.json, val.json, test.json, label_mappings.json")


if __name__ == "__main__":
    main()