| | import os |
| | import pandas as pd |
| | import threading |
| | import gradio as gr |
| | from config import OUTPUT_DIR |
| |
|
| | |
# Folder containing the blob images; created at import time if it is missing.
IMAGE_FOLDER = os.path.join(OUTPUT_DIR, "blobs")
os.makedirs(IMAGE_FOLDER, exist_ok=True)

# Annotation state. CSV_FILE stays None until switch_tile_csv() or
# refresh_image_list() assigns a path; the in-memory table starts empty.
CSV_FILE = None
df_annotations = pd.DataFrame(columns=["blob_id", "human_ocr"])

# Initial image list: entries of IMAGE_FOLDER whose name contains "_margin"
# and ends with an image extension (case-insensitive).
all_images = [
    entry
    for entry in os.listdir(IMAGE_FOLDER)
    if '_margin' in entry and entry.lower().endswith(('.png', '.jpg', '.jpeg'))
]
all_images_paths = [os.path.join(IMAGE_FOLDER, entry) for entry in all_images]
# Position of the image currently shown within all_images_paths.
current_index = 0
| |
|
| | |
def switch_tile_csv(selected_tile):
    """Point the annotation state at the CSV that belongs to *selected_tile*.

    The CSV path is ``OUTPUT_DIR/annotations_<tile name>.csv`` where the tile
    name is the basename of ``selected_tile["tile_path"]`` without extension.
    The module-level ``CSV_FILE`` and ``df_annotations`` are replaced.

    Args:
        selected_tile: Mapping providing a ``"tile_path"`` entry.

    Returns:
        The path of the tile's annotation CSV. The file is not created here,
        so it may not exist yet.
    """
    global CSV_FILE, df_annotations

    tile_filename = os.path.basename(selected_tile["tile_path"])
    tile_name, _ = os.path.splitext(tile_filename)
    CSV_FILE = os.path.join(OUTPUT_DIR, f"annotations_{tile_name}.csv")

    if os.path.exists(CSV_FILE):
        # keep_default_na=False keeps empty annotations as "" (and text such
        # as "NA" or "null" verbatim) instead of converting them to NaN.
        df_annotations = pd.read_csv(
            CSV_FILE,
            dtype={"blob_id": str, "human_ocr": str},
            keep_default_na=False,
        )
    else:
        # Start from an empty table; otherwise the previous tile's rows would
        # be written into this tile's CSV on the next save.
        df_annotations = pd.DataFrame(columns=["blob_id", "human_ocr"])

    return CSV_FILE
| |
|
| | |
def get_progress_text():
    """Return a 1-based "Image X of N" label, or a notice when no images are loaded."""
    if all_images_paths:
        position = current_index + 1
        total = len(all_images_paths)
        return f"Image {position} of {total}"
    return "No images loaded"
| |
|
def get_current_image_path():
    """Return the path at ``current_index``, or None when the index is out of range."""
    out_of_range = current_index < 0 or current_index >= len(all_images_paths)
    if out_of_range:
        return None
    return all_images_paths[current_index]
| |
|
def get_annotation_for_image(image_path):
    """Return the stored annotation text for *image_path*.

    The blob id is the file's basename with ``"_margin"`` removed. When
    several rows share that blob id, the last one is used.

    Args:
        image_path: Path of a ``*_margin*`` image file.

    Returns:
        The annotation as a string, or ``""`` when there is no row for the
        image or the stored value is missing (NaN/None).
    """
    blob_id = os.path.basename(image_path).replace("_margin", "")
    matches = df_annotations.loc[df_annotations["blob_id"] == blob_id, "human_ocr"]
    if matches.empty:
        return ""
    value = matches.iloc[-1]
    # An empty annotation reloaded from CSV can come back as NaN; str(NaN)
    # would otherwise show the literal text "nan" to the annotator.
    return "" if pd.isna(value) else str(value)
| |
|
def is_annotated_or_deleted(image_path):
    """Return True if the image has a non-blank stored value.

    Any non-blank value counts, including the ``"DELETED"`` marker. The blob
    id is the file's basename with ``"_margin"`` removed; when several rows
    share it, the last one decides.

    Args:
        image_path: Path of a ``*_margin*`` image file.

    Returns:
        False when there is no row, the value is missing (NaN/None), or the
        value is empty/whitespace only; True otherwise.
    """
    blob_id = os.path.basename(image_path).replace("_margin", "")
    matches = df_annotations.loc[df_annotations["blob_id"] == blob_id, "human_ocr"]
    if matches.empty:
        return False
    value = matches.iloc[-1]
    # A blank annotation reloaded from CSV can be NaN; without this check it
    # would stringify to "nan" and wrongly count as processed.
    if pd.isna(value):
        return False
    return str(value).strip() != ""
| |
|
def is_deleted(image_path):
    """Return True when the last stored value for the image is "DELETED"."""
    blob_id = os.path.basename(image_path).replace("_margin", "")
    recorded = df_annotations.loc[df_annotations["blob_id"] == blob_id, "human_ocr"]
    if recorded.empty:
        return False
    latest = str(recorded.values[-1]).strip()
    return latest == "DELETED"
| |
|
def all_processed():
    """Return True when no image in ``all_images_paths`` is still unprocessed.

    An image counts as processed when ``is_annotated_or_deleted`` is true for
    it. An empty image list yields True.
    """
    for path in all_images_paths:
        if not is_annotated_or_deleted(path):
            return False
    return True
| |
|
def find_next_unprocessed_index(start):
    """Return the index of the next unprocessed image after *start*.

    The search begins at ``start + 1``, wraps around the end of the list and
    checks *start* itself last. Returns None when every image is annotated or
    deleted, or when the list is empty.
    """
    total = len(all_images_paths)
    for step in range(1, total + 1):
        candidate = (start + step) % total
        if not is_annotated_or_deleted(all_images_paths[candidate]):
            return candidate
    return None
| |
|
| | |
def save_annotation(user_text):
    """Store *user_text* as the annotation of the active image and persist it.

    The text is stripped (None or empty becomes ``""``) and written to the
    ``human_ocr`` column for the image's blob id, which is its basename with
    ``"_margin"`` removed. A new row is appended when the blob id is not in
    the table yet. The whole table is then written to ``CSV_FILE``.

    Args:
        user_text: Annotation entered by the user; may be None or empty.

    Returns:
        None. Nothing happens when there is no active image.
    """
    global df_annotations
    img_path = get_current_image_path()
    if not img_path:
        return

    blob_id = os.path.basename(img_path).replace("_margin", "")
    text_value = user_text.strip() if user_text else ""

    matches = df_annotations["blob_id"] == blob_id
    if matches.any():
        # Update every matching row: the readers use the LAST match, so
        # updating only the first would hide the new value behind a duplicate.
        df_annotations.loc[matches, "human_ocr"] = text_value
    else:
        df_annotations = pd.concat(
            [df_annotations, pd.DataFrame([{"blob_id": blob_id, "human_ocr": text_value}])],
            ignore_index=True,
        )

    # With no CSV selected, to_csv(None) would only build and return a string;
    # skip it so the "nothing is written" case is explicit.
    if CSV_FILE is not None:
        df_annotations.to_csv(CSV_FILE, index=False)
| |
|
def save_and_next(user_text):
    """Save the annotation for the active image, then move to the next one.

    After saving, the index jumps to the next unprocessed image. When every
    image is processed, it resets to 0 and an "All images annotated." message
    is made visible.

    Returns:
        A 5-tuple: image path, its stored annotation, a ``gr.update`` for the
        status message, the image path again, and the progress text. When no
        image is active, placeholder values are returned instead.
    """
    global current_index
    if get_current_image_path() is None:
        no_images = gr.update(visible=True, value="No images available.")
        return None, "", no_images, "No image loaded", "No images loaded"

    save_annotation(user_text)

    finished = all_processed()
    if finished:
        current_index = 0
    else:
        upcoming = find_next_unprocessed_index(current_index)
        current_index = 0 if upcoming is None else upcoming

    shown = get_current_image_path()
    stored_text = get_annotation_for_image(shown)
    if finished:
        status = gr.update(visible=True, value="All images annotated.")
    else:
        status = gr.update(visible=False)
    return shown, stored_text, status, shown, get_progress_text()
| |
|
def previous_image():
    """Step back one image, wrapping from the first image to the last.

    Returns:
        A 5-tuple: image path, its stored annotation, a ``gr.update`` hiding
        the status message, the image path again, and the progress text. When
        no images are loaded, placeholder values are returned instead.
    """
    global current_index
    total = len(all_images_paths)
    if total == 0:
        no_images = gr.update(visible=True, value="No images available.")
        return None, "", no_images, "No image loaded", "No images loaded"

    current_index = (current_index - 1) % total
    shown = get_current_image_path()
    stored_text = get_annotation_for_image(shown)
    return shown, stored_text, gr.update(visible=False), shown, get_progress_text()
| |
|
def delete_and_next():
    """Mark the active image as DELETED and move to the next image.

    The marker is stored through ``save_annotation("DELETED")``, which writes
    the value for the image's blob id and persists the table. Navigation then
    works as in ``save_and_next``: jump to the next unprocessed image, or
    reset to index 0 with an "All images annotated." message when none remain.

    Returns:
        A 5-tuple: image path, its stored annotation, a ``gr.update`` for the
        status message, the image path again, and the progress text. When no
        image is active, placeholder values are returned instead.
    """
    global current_index
    img_path = get_current_image_path()
    if not img_path:
        return None, "", gr.update(visible=True, value="No images available."), "No image loaded", "No images loaded"

    # Reuse the shared upsert-and-write logic instead of duplicating it here.
    save_annotation("DELETED")

    if all_processed():
        current_index = 0
        img_path = get_current_image_path()
        annotation = get_annotation_for_image(img_path)
        return img_path, annotation, gr.update(visible=True, value="All images annotated."), img_path, get_progress_text()

    next_idx = find_next_unprocessed_index(current_index)
    current_index = next_idx if next_idx is not None else 0
    img_path = get_current_image_path()
    annotation = get_annotation_for_image(img_path)
    return img_path, annotation, gr.update(visible=False), img_path, get_progress_text()
| |
|
def save_and_exit(user_text):
    """Save the active image's annotation (if any) and terminate the process.

    Termination is done by ``os._exit(0)`` on a timer thread started with a
    one-second delay, so this function returns its outputs first (presumably
    to let the UI show "Session closed." -- confirm with the caller).

    Returns:
        A 5-tuple: None, ``""``, a ``gr.update`` showing "Session closed.",
        ``""``, and the progress text.
    """
    active = get_current_image_path()
    if active is not None:
        save_annotation(user_text)

    shutdown_timer = threading.Timer(1, lambda: os._exit(0))
    shutdown_timer.start()

    closed = gr.update(visible=True, value="Session closed.")
    return None, "", closed, "", get_progress_text()
| |
|
def refresh_image_list(selected_tile):
    """Reload the image list and prepare the annotation CSV for a tile.

    Sets ``CSV_FILE`` to ``OUTPUT_DIR/annotations_<tile name>.csv``. If the
    file is missing, an empty one with the ``blob_id``/``human_ocr`` header is
    created; otherwise it is loaded into ``df_annotations``. The image list is
    then rebuilt from ``IMAGE_FOLDER`` and the index reset to 0.

    Args:
        selected_tile: Mapping providing a ``"tile_path"`` entry.

    Returns:
        A 5-tuple: image path, its stored annotation, a ``gr.update`` for the
        status message, the image path again, and the progress text. When the
        folder has no matching images, placeholder values are returned.
    """
    global all_images_paths, current_index, df_annotations, CSV_FILE

    tile_filename = os.path.basename(selected_tile["tile_path"])
    tile_name, _ = os.path.splitext(tile_filename)
    CSV_FILE = os.path.join(OUTPUT_DIR, f"annotations_{tile_name}.csv")

    if not os.path.exists(CSV_FILE):
        df_annotations = pd.DataFrame(columns=["blob_id", "human_ocr"])
        df_annotations.to_csv(CSV_FILE, index=False)
    else:
        # keep_default_na=False keeps empty annotations as "" (and text such
        # as "NA" or "null" verbatim) instead of converting them to NaN,
        # which would later surface as the string "nan".
        df_annotations = pd.read_csv(
            CSV_FILE,
            dtype={"blob_id": str, "human_ocr": str},
            keep_default_na=False,
        )

    # Only files with an image extension whose name contains "_margin".
    all_images = [
        f for f in os.listdir(IMAGE_FOLDER)
        if f.lower().endswith(('.png', '.jpg', '.jpeg')) and '_margin' in f
    ]
    all_images_paths = [os.path.join(IMAGE_FOLDER, f) for f in all_images]
    current_index = 0

    if not all_images_paths:
        return None, "", gr.update(visible=True, value="No images available."), "No image loaded", "No images loaded"

    img_path = get_current_image_path()
    annotation = get_annotation_for_image(img_path)
    return img_path, annotation, gr.update(visible=False), img_path, get_progress_text()
| |
|
| |
|