|
|
import gradio as gr |
|
|
import json |
|
|
import os |
|
|
import pickle |
|
|
from PIL import Image |
|
|
import tempfile |
|
|
import time |
|
|
import io |
|
|
import shutil |
|
|
import html |
|
|
|
|
|
from datasets import load_dataset |
|
|
import rarfile |
|
|
|
|
|
|
|
|
# Hugging Face dataset that streams the KIU records (reference HTML + page image).
HF_DATASET_REPO = "alyex/karnak-data-app"

# Persistent-storage paths (HF Spaces mounts persistent storage at /data).
HF_ANNOTATIONS_DIR = "/data/annotations"  # one JSON file per annotated instance
HF_HARD_NEGATIVES_DIR = "/data/hard_negatives"  # flagged bad detections (crop image + metadata)
SEGMENT_EXTRACT_DIR = "/data/kiu_segment_metadata"  # JSON files extracted from the RAR archive

# Ephemeral cache directory for the pickled manifest.
CACHE_DIR = os.path.join(tempfile.gettempdir(), "karnak_cache")

# Create all working directories up front so later writes cannot fail on a
# missing parent.
os.makedirs(HF_ANNOTATIONS_DIR, exist_ok=True)
os.makedirs(HF_HARD_NEGATIVES_DIR, exist_ok=True)
os.makedirs(SEGMENT_EXTRACT_DIR, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)

# Pickled copy of the manifest; rewritten after every annotation so progress
# survives restarts.  NOTE(review): it lives under tempfile.gettempdir(), so it
# does NOT survive a container rebuild — only the per-instance JSON files do.
MANIFEST_FILE = os.path.join(CACHE_DIR, "manifest.pkl")
|
|
|
|
|
|
|
|
class DataManager:
    """Handles all data loading and caching.

    Responsibilities:
      - load per-KIU segment metadata (JSON files, optionally packed in a RAR)
      - build a flat manifest of annotatable 'Line' instances
      - stream the HF dataset to fetch the HTML + image of a single KIU
    """

    def __init__(self):
        self.dataset = None     # streaming HF dataset handle (None when loading failed)
        self.segment_data = {}  # metadata-file stem -> parsed JSON content
        self.manifest = []      # flat list of 'Line' instance records

        self._load_segment_metadata()
        self._build_manifest()
        self._init_streaming_dataset()

    def _init_streaming_dataset(self):
        """Initialize the streaming HF dataset; leave self.dataset = None on failure."""
        print("Initializing streaming dataset...")
        try:
            self.dataset = load_dataset(HF_DATASET_REPO, split="train", streaming=True)
            print("✅ Streaming dataset initialized")
        except Exception as e:
            print(f"❌ Failed to load dataset: {e}")
            self.dataset = None

    def _load_json_dir(self, directory):
        """Load every top-level *.json file in *directory* into self.segment_data.

        Each file is stored under its filename minus the '.json' suffix.
        Unreadable/corrupt files are reported and skipped.
        """
        for json_file in [f for f in os.listdir(directory) if f.endswith('.json')]:
            filepath = os.path.join(directory, json_file)
            try:
                with open(filepath, 'r') as f:
                    data = json.load(f)
                self.segment_data[json_file.replace('.json', '')] = data
            except Exception as e:
                print(f"⚠️ Error loading {json_file}: {e}")

    def _load_segment_metadata(self):
        """Load segment metadata from a RAR file containing multiple JSON files.

        Fast path: if the JSONs were already extracted on a previous run,
        load them straight from SEGMENT_EXTRACT_DIR and skip the archive.
        """
        RAR_FILE = "kiu_segment_metadata.rar"
        EXTRACT_DIR = SEGMENT_EXTRACT_DIR

        # Fast path: reuse previously extracted files.
        if os.path.exists(EXTRACT_DIR) and os.listdir(EXTRACT_DIR):
            json_files = [f for f in os.listdir(EXTRACT_DIR) if f.endswith('.json')]
            if json_files:
                print(f"Loading cached segment metadata from {len(json_files)} files...")
                self.segment_data = {}
                self._load_json_dir(EXTRACT_DIR)
                print(f"✅ Loaded {len(self.segment_data)} segment entries from cache")
                return

        if not os.path.exists(RAR_FILE):
            print(f"❌ {RAR_FILE} not found in Space root")
            print(f"Current directory: {os.getcwd()}")
            print(f"Files available: {os.listdir('.')}")
            return

        print(f"Extracting {RAR_FILE}...")

        try:
            # Force the external tool name; the Space installs it via packages.txt.
            rarfile.UNRAR_TOOL = "unrar"

            with rarfile.RarFile(RAR_FILE) as rf:
                all_files = [f.filename for f in rf.infolist()]
                json_files = [f for f in all_files if f.endswith('.json')]

                print(f"Found {len(json_files)} JSON files in RAR")

                if not json_files:
                    print(f"❌ No JSON files found in RAR")
                    print(f"Files in RAR: {all_files[:10]}...")
                    return

                os.makedirs(EXTRACT_DIR, exist_ok=True)

                # BUGFIX: the original extracted every entry TWICE — once via
                # rf.extract() (which preserves archive sub-paths that the
                # loader below never looks at) and then unconditionally again
                # via a manual byte copy.  A single flat copy into EXTRACT_DIR
                # is exactly what the loader reads.
                for json_file in json_files:
                    try:
                        with rf.open(json_file) as f:
                            content = f.read()
                        # Flatten archive paths: the loader scans only the
                        # top level of EXTRACT_DIR.
                        output_path = os.path.join(EXTRACT_DIR, os.path.basename(json_file))
                        with open(output_path, 'wb') as out:
                            out.write(content)
                    except Exception as e:
                        print(f"❌ Failed to extract {json_file}: {e}")
                        continue

            # Load whatever actually landed on disk.
            self.segment_data = {}
            extracted_files = [f for f in os.listdir(EXTRACT_DIR) if f.endswith('.json')]
            print(f"Loading {len(extracted_files)} extracted JSON files...")
            self._load_json_dir(EXTRACT_DIR)
            print(f"✅ Extracted and loaded {len(self.segment_data)} segment entries")

        except rarfile.RarCannotExec as e:
            # The 'unrar' binary is missing from the image.
            print(f"❌ RAR tool not available: {e}")
            print("⚠️ SOLUTION: Ensure packages.txt contains 'unrar'")
            print(" Or extract manually and upload JSON files to:")
            print(f" {EXTRACT_DIR}/")
        except Exception as e:
            print(f"❌ Error with RAR file: {e}")
            import traceback
            traceback.print_exc()

    def _build_manifest(self):
        """Build the flat manifest of all 'Line' instances from the segment metadata.

        The manifest is also pickled to MANIFEST_FILE so progress survives
        restarts within the same container.
        """
        if not self.segment_data:
            print("❌ No segment data - cannot build manifest")
            return

        print("Building manifest from segment metadata...")
        self.manifest = []

        for key, segment_info in self.segment_data.items():
            if not isinstance(segment_info, dict):
                continue

            # Derive the KIU id: prefer a numeric token embedded in the
            # metadata filename (e.g. 'KIU_00042_...'), then fall back to an
            # explicit 'kiu_id' field.
            kiu_id = None
            if 'KIU_' in key:
                # First all-digit token wins.  str.isdigit() already accepts
                # zero-padded values, so the original's extra startswith('0')
                # check was redundant and has been dropped.
                for part in key.split('_'):
                    if part.isdigit():
                        kiu_id = part
                        break

            if not kiu_id and 'kiu_id' in segment_info:
                kiu_id = str(segment_info['kiu_id'])

            if not kiu_id:
                print(f"⚠️ Could not extract KIU ID from key: {key}")
                continue

            # Normalise to the zero-padded 5-digit form used by the dataset.
            kiu_id = str(kiu_id).zfill(5)

            for instance in segment_info.get('instances', []):
                if instance.get('class') == 'Line':
                    self.manifest.append({
                        'kiu_id': kiu_id,
                        'instance_id': instance.get('instance_id'),
                        'crop_coords': instance.get('crop_coords', [0, 0, 100, 100]),
                        'direction': instance.get('direction', ''),
                        'annotated': instance.get('annotated', False),
                        'line_number': instance.get('line_number'),
                        'segment_key': key
                    })

        print(f"✅ Built manifest with {len(self.manifest)} line instances")

        # Persist so a restart can resume from the same state.
        with open(MANIFEST_FILE, 'wb') as f:
            pickle.dump(self.manifest, f)

    def get_kiu_data(self, kiu_id):
        """
        Stream through the dataset to find one KIU record.

        Returns: (html_content, image_pil) or (None, None).  If the image
        bytes cannot be decoded, returns (html_content, None).
        """
        if self.dataset is None:
            return None, None

        target_kiu = str(kiu_id).zfill(5)

        try:
            # Linear scan: the dataset is streamed, so there is no random access.
            for item in self.dataset:
                if item['kiu_id'] == target_kiu:
                    html_content = item['html']
                    try:
                        image_pil = Image.open(io.BytesIO(item['image']))
                        return html_content, image_pil
                    except Exception as e:
                        print(f"❌ Error decoding image bytes for KIU {target_kiu}: {e}")
                        return html_content, None

            print(f"❌ KIU {target_kiu} not found in dataset")
            return None, None

        except Exception as e:
            print(f"❌ Error streaming dataset: {e}")
            return None, None
|
|
|
|
|
|
|
|
class AnnotationApp:
    """Main annotation application: navigation, persistence and statistics."""

    def __init__(self):
        self.data_mgr = DataManager()
        self.current_index = 0
        self.history = []  # stack of {'index', 'item'} snapshots for undo

        # Start at the first item that still needs annotation.
        self.current_index = self._find_first_unannotated()

    @staticmethod
    def _parse_line_number(line_number):
        """Return the line number as an int, or None for empty/non-numeric input.

        BUGFIX: the original called int() unguarded inside _save_annotation,
        so any non-numeric text in the line-number box crashed the save.
        """
        if not line_number or not line_number.strip():
            return None
        try:
            return int(line_number.strip())
        except ValueError:
            return None

    @staticmethod
    def _crop_instance(item, full_image):
        """Crop *item*'s bounding box out of *full_image*.

        Coordinates are clamped to the image bounds; returns None when the
        clamped box is degenerate (zero width or height).
        """
        x1, y1, x2, y2 = map(int, item['crop_coords'])
        x1 = max(0, min(x1, full_image.width))
        y1 = max(0, min(y1, full_image.height))
        x2 = max(0, min(x2, full_image.width))
        y2 = max(0, min(y2, full_image.height))
        if x2 > x1 and y2 > y1:
            return full_image.crop((x1, y1, x2, y2))
        return None

    def _needs_annotation(self, item):
        """Check if an item still needs annotation (never annotated, or flagged for reclass)."""
        if not item.get('annotated', False):
            return True
        if item.get('direction') == 'TTB_NEEDS_RECLASSIFICATION':
            return True
        return False

    def _find_first_unannotated(self):
        """Return the index of the first unannotated item (0 if none remain)."""
        for idx, item in enumerate(self.data_mgr.manifest):
            if self._needs_annotation(item):
                return idx
        return 0

    def _find_next_unannotated(self, from_index=None):
        """Return the index of the next unannotated item, wrapping around.

        Stays on *start* when everything is annotated.
        """
        if not self.data_mgr.manifest:
            return 0

        start = from_index if from_index is not None else self.current_index

        # Search forward from the current position...
        for idx in range(start + 1, len(self.data_mgr.manifest)):
            if self._needs_annotation(self.data_mgr.manifest[idx]):
                return idx

        # ...then wrap around to the beginning.
        for idx in range(0, start):
            if self._needs_annotation(self.data_mgr.manifest[idx]):
                return idx

        return start

    def get_statistics(self):
        """Calculate annotation statistics over the whole manifest."""
        manifest = self.data_mgr.manifest
        if not manifest:
            return {
                'total': 0, 'remaining': 0, 'processed': 0, 'progress_pct': 0,
                'ltr': 0, 'rtl': 0, 'ttb_ltr': 0, 'ttb_rtl': 0,
                'skip': 0, 'unclear': 0, 'ttb_reclass': 0
            }

        total = len(manifest)
        remaining = sum(1 for item in manifest if self._needs_annotation(item))
        processed = total - remaining
        progress = (processed / total * 100) if total > 0 else 0

        return {
            'total': total,
            'remaining': remaining,
            'processed': processed,
            'progress_pct': progress,
            'ltr': sum(1 for i in manifest if i.get('direction') == 'LTR'),
            'rtl': sum(1 for i in manifest if i.get('direction') == 'RTL'),
            'ttb_ltr': sum(1 for i in manifest if i.get('direction') == 'TTB_LTR'),
            'ttb_rtl': sum(1 for i in manifest if i.get('direction') == 'TTB_RTL'),
            'skip': sum(1 for i in manifest if i.get('direction') == 'Skip'),
            'unclear': sum(1 for i in manifest if i.get('direction') == 'Unclear'),
            'ttb_reclass': sum(1 for i in manifest if i.get('direction') == 'TTB_NEEDS_RECLASSIFICATION')
        }

    def _get_annotation_path(self, item):
        """Return the per-instance annotation file path for *item*."""
        filename = f"KIU_{item['kiu_id']}_instance_{item['instance_id']}.json"
        return os.path.join(HF_ANNOTATIONS_DIR, filename)

    def _save_annotation(self, item, direction, line_number):
        """Write the annotation for *item* to its JSON file and return the path."""
        annotation_file = self._get_annotation_path(item)

        data = {
            'kiu_id': item['kiu_id'],
            'instance_id': item['instance_id'],
            'direction': direction,
            'annotated': True,
            # Safe parse: bad input becomes None instead of raising.
            'line_number': self._parse_line_number(line_number),
            'timestamp': time.time(),
            'crop_coords': item['crop_coords']
        }

        with open(annotation_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        return annotation_file

    def load_current(self):
        """
        Load and display the current instance.

        Returns: (image, html, info, line_number) for the four Gradio outputs.
        """
        manifest = self.data_mgr.manifest

        if not manifest or self.current_index >= len(manifest):
            return (
                None,
                "<div style='padding: 20px; background: #fff3cd;'>⚠️ No items available</div>",
                "No items",
                ""
            )

        item = manifest[self.current_index]
        kiu_id = item['kiu_id']

        print(f"\n{'='*60}")
        print(f"Loading: Index {self.current_index + 1}/{len(manifest)}")
        print(f"KIU: {kiu_id}, Instance: {item['instance_id']}")

        # Fetch the reference HTML and the full page image for this KIU.
        html_content, full_image = self.data_mgr.get_kiu_data(kiu_id)

        # Crop the line instance out of the full page image.
        cropped_image = None
        if full_image is not None:
            try:
                cropped_image = self._crop_instance(item, full_image)
                if cropped_image is not None:
                    print(f"✅ Cropped: {cropped_image.size}")
                    # Cap very tall crops so the UI image stays readable.
                    if cropped_image.height > 800:
                        ratio = 800 / cropped_image.height
                        new_w = int(cropped_image.width * ratio)
                        cropped_image = cropped_image.resize((new_w, 800), Image.LANCZOS)
                else:
                    print(f"❌ Invalid crop coords")
            except Exception as e:
                print(f"❌ Crop error: {e}")
        else:
            print(f"❌ No image for KIU {kiu_id}")

        # Render the escaped (and truncated) reference HTML panel.
        if html_content and html_content.strip():
            escaped = html.escape(html_content[:5000])
            if len(html_content) > 5000:
                escaped += "\n\n... (truncated)"

            html_display = f"""
            <div style="padding: 12px; background: #f8f9fa; border-radius: 8px; border: 1px solid #dee2e6;">
                <div style="font-weight: bold; color: #495057; margin-bottom: 8px;">
                    📄 KIU {kiu_id} Reference HTML
                </div>
                <div style="max-height: 400px; overflow-y: auto; background: white;
                            padding: 10px; border-radius: 4px; font-family: 'Courier New', monospace;
                            font-size: 11px; line-height: 1.5; white-space: pre-wrap; word-wrap: break-word;">
{escaped}
                </div>
            </div>
            """
        else:
            html_display = f"""
            <div style='padding: 20px; background: #fff3cd; border-radius: 8px; border: 1px solid #ffc107;'>
                ⚠️ No HTML content available for KIU {kiu_id}
            </div>
            """

        # Pre-fill the line-number box from any existing annotation file.
        annotation_file = self._get_annotation_path(item)
        existing_line_num = ""
        if os.path.exists(annotation_file):
            try:
                with open(annotation_file, 'r') as f:
                    ann = json.load(f)
                if ann.get('line_number'):
                    existing_line_num = str(ann['line_number'])
            except (OSError, json.JSONDecodeError):
                # Corrupt or unreadable annotation: leave the box empty.
                pass

        # Compose the status/progress panel.
        stats = self.get_statistics()
        direction = item.get('direction', '')
        annotated = item.get('annotated', False)

        if annotated:
            if direction in ['LTR', 'RTL', 'TTB_LTR', 'TTB_RTL']:
                status = f"✅ {direction}"
            elif direction == 'TTB_NEEDS_RECLASSIFICATION':
                status = "⚠️ NEEDS RECLASS"
            elif direction == 'Skip':
                status = "⏭️ Skipped"
            elif direction == 'Unclear':
                status = "❓ Unclear"
            else:
                status = f"✅ {direction}"
        else:
            status = "⏳ Pending"

        line_info = f" | Line #{existing_line_num}" if existing_line_num else ""

        info = f"""
**Instance {self.current_index + 1} / {stats['total']}** | KIU {kiu_id} | Instance {item['instance_id']}{line_info} | {status}

**Progress:** {stats['processed']}/{stats['total']} ({stats['progress_pct']:.1f}%) | **Remaining:** {stats['remaining']}

**Direction Counts:**
LTR: {stats['ltr']} | RTL: {stats['rtl']} | TTB+LTR: {stats['ttb_ltr']} | TTB+RTL: {stats['ttb_rtl']}
Skipped: {stats['skip']} | Unclear: {stats['unclear']} | Needs Reclass: {stats['ttb_reclass']}
"""

        print(f"{'='*60}\n")

        return cropped_image, html_display, info, existing_line_num

    def annotate_and_next(self, direction, line_number):
        """Save the annotation for the current item and move to the next unannotated one."""
        manifest = self.data_mgr.manifest
        if not manifest:
            return self.load_current()

        # Snapshot for undo before mutating anything.
        self.history.append({
            'index': self.current_index,
            'item': manifest[self.current_index].copy()
        })

        item = manifest[self.current_index]
        self._save_annotation(item, direction, line_number)

        # Mirror the change into the in-memory manifest.
        item['direction'] = direction
        item['annotated'] = True
        parsed = self._parse_line_number(line_number)
        if parsed is not None:
            item['line_number'] = parsed

        # Persist the manifest so progress survives restarts.
        with open(MANIFEST_FILE, 'wb') as f:
            pickle.dump(manifest, f)

        print(f"✅ Saved: {direction}")

        self.current_index = self._find_next_unannotated()

        return self.load_current()

    def undo_last(self):
        """Undo the most recent annotation: delete its file and restore the snapshot."""
        if not self.history:
            print("⚠️ Nothing to undo")
            return self.load_current()

        last = self.history.pop()
        self.current_index = last['index']

        item = self.data_mgr.manifest[self.current_index]

        # Remove the on-disk annotation if it exists.
        annotation_file = self._get_annotation_path(item)
        if os.path.exists(annotation_file):
            os.remove(annotation_file)

        # Restore the pre-annotation manifest entry.
        self.data_mgr.manifest[self.current_index] = last['item']

        with open(MANIFEST_FILE, 'wb') as f:
            pickle.dump(self.data_mgr.manifest, f)

        print("✅ Undo successful")
        return self.load_current()

    def flag_hard_negative(self, line_number):
        """Flag the current item as a bad detection, saving its crop and metadata."""
        manifest = self.data_mgr.manifest
        if not manifest:
            return self.load_current()

        item = manifest[self.current_index]

        # One directory per flagged instance.
        dir_name = f"KIU_{item['kiu_id']}_instance_{item['instance_id']}"
        dir_path = os.path.join(HF_HARD_NEGATIVES_DIR, dir_name)
        os.makedirs(dir_path, exist_ok=True)

        # Save the cropped region as the hard-negative example.
        html_content, full_image = self.data_mgr.get_kiu_data(item['kiu_id'])
        if full_image is not None:
            try:
                cropped = self._crop_instance(item, full_image)
                if cropped is not None:
                    cropped.save(os.path.join(dir_path, "hard_negative.jpg"), "JPEG")
            except Exception as e:
                print(f"❌ Error saving hard negative image: {e}")

        metadata = {
            'kiu_id': item['kiu_id'],
            'instance_id': item['instance_id'],
            'crop_coords': item['crop_coords'],
            'flagged_as_hard_negative': True,
            'timestamp': time.time()
        }

        with open(os.path.join(dir_path, "metadata.json"), 'w') as f:
            json.dump(metadata, f, indent=2)

        # Also record it as a regular annotation so it counts as processed.
        self._save_annotation(item, "HardNegative", line_number)

        item['direction'] = 'HardNegative'
        item['annotated'] = True

        with open(MANIFEST_FILE, 'wb') as f:
            pickle.dump(manifest, f)

        print("✅ Flagged as hard negative")

        self.current_index = self._find_next_unannotated()
        return self.load_current()

    def go_back(self):
        """Step one item back (clamped at 0)."""
        self.current_index = max(0, self.current_index - 1)
        return self.load_current()

    def go_forward(self):
        """Step one item forward (clamped at the last index)."""
        manifest = self.data_mgr.manifest
        self.current_index = min(len(manifest) - 1, self.current_index + 1)
        return self.load_current()

    def jump_to_next_unannotated(self):
        """Jump straight to the next item needing annotation."""
        self.current_index = self._find_next_unannotated()
        return self.load_current()

    def jump_to_index(self, target):
        """Jump to a 1-based index typed by the user; ignore invalid input."""
        try:
            idx = int(target) - 1
            if 0 <= idx < len(self.data_mgr.manifest):
                self.current_index = idx
        except (TypeError, ValueError):
            # Non-numeric input: stay where we are.
            pass
        return self.load_current()

    def jump_to_kiu(self, kiu_id):
        """Jump to the first instance of the given KIU id (zero-padded to 5 digits)."""
        target = kiu_id.strip().zfill(5)
        for idx, item in enumerate(self.data_mgr.manifest):
            if item['kiu_id'] == target:
                self.current_index = idx
                break
        return self.load_current()

    def jump_to_ttb_reclass(self):
        """Find the next TTB item needing reclassification, wrapping around."""
        manifest = self.data_mgr.manifest

        # Search forward from the current position...
        for idx in range(self.current_index + 1, len(manifest)):
            if manifest[idx].get('direction') == 'TTB_NEEDS_RECLASSIFICATION':
                self.current_index = idx
                return self.load_current()

        # ...then wrap around to the beginning.
        for idx in range(0, self.current_index):
            if manifest[idx].get('direction') == 'TTB_NEEDS_RECLASSIFICATION':
                self.current_index = idx
                return self.load_current()

        print("⚠️ No TTB reclassification items found")
        return self.load_current()

    def export_annotations(self):
        """Merge all per-instance annotation files into a single JSON export."""
        annotations = []

        for filename in os.listdir(HF_ANNOTATIONS_DIR):
            if filename.endswith('.json') and filename != 'all_annotations.json':
                filepath = os.path.join(HF_ANNOTATIONS_DIR, filename)
                try:
                    with open(filepath, 'r') as f:
                        annotations.append(json.load(f))
                except (OSError, json.JSONDecodeError) as e:
                    # Skip corrupt files but say so instead of failing silently.
                    print(f"⚠️ Skipping {filename}: {e}")

        export_file = os.path.join(HF_ANNOTATIONS_DIR, "all_annotations.json")
        with open(export_file, 'w', encoding='utf-8') as f:
            json.dump(annotations, f, indent=2, ensure_ascii=False)

        return f"✅ Exported {len(annotations)} annotations to `{export_file}`"
|
|
|
|
|
|
|
|
|
|
|
# Single shared application instance; every Gradio callback below closes
# over it (the Space serves one annotator at a time).
app_state = AnnotationApp()


with gr.Blocks(title="⚡ Hieroglyph Annotation") as demo:

    gr.Markdown("# ⚡ Hieroglyph Direction Annotation Tool")

    # Progress snapshot taken once at UI build time (the per-item info panel
    # is refreshed live by load_current()).
    stats = app_state.get_statistics()
    data_loaded = len(app_state.data_mgr.manifest) > 0

    if not data_loaded:
        # Diagnostic banner: segment metadata failed to load, show fixes.
        gr.HTML(f"""
        <div style="background: #fff3cd; padding: 20px; border-radius: 10px; margin-bottom: 20px; border-left: 5px solid #ffc107;">
            <h3 style="margin-top: 0; color: #856404;">⚠️ No Segment Data Loaded</h3>
            <p><strong>The segment metadata could not be loaded from the RAR file.</strong></p>

            <h4>Solution Options:</h4>
            <ol>
                <li><strong>Install unrar tool:</strong>
                    <br>Add to your Space's Dockerfile or requirements:
                    <pre style="background: #f8f9fa; padding: 10px; border-radius: 5px; margin: 10px 0;">RUN apt-get update && apt-get install -y unrar</pre>
                </li>
                <li><strong>Upload extracted JSON directly:</strong>
                    <br>Extract <code>kiu_segment_metadata.json</code> locally and upload to:
                    <pre style="background: #f8f9fa; padding: 10px; border-radius: 5px; margin: 10px 0;">{SEGMENT_EXTRACT_DIR}/kiu_segment_metadata.json</pre>
                </li>
                <li><strong>Use a different format:</strong>
                    <br>Upload the JSON file as a regular file in your Space repo instead of RAR.
                </li>
            </ol>

            <p><strong>Current Status:</strong></p>
            <ul>
                <li>Dataset: {HF_DATASET_REPO} - {'✅ Loaded' if app_state.data_mgr.dataset else '❌ Failed'}</li>
                <li>Segment Metadata: ❌ Not loaded</li>
                <li>Manifest: {len(app_state.data_mgr.manifest)} instances</li>
            </ul>
        </div>
        """)
    else:
        # Status banner with totals and overall progress.
        gr.HTML(f"""
        <div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
                    color: white; padding: 15px; border-radius: 10px; margin-bottom: 20px;">
            <h3 style="margin: 0 0 10px 0;">📊 System Status</h3>
            <div style="display: grid; grid-template-columns: repeat(3, 1fr); gap: 10px;">
                <div><strong>Dataset:</strong> {HF_DATASET_REPO}</div>
                <div><strong>Total Instances:</strong> {stats['total']:,}</div>
                <div><strong>Remaining:</strong> {stats['remaining']:,}</div>
            </div>
            <div style="margin-top: 10px; background: rgba(255,255,255,0.2);
                        padding: 8px; border-radius: 5px;">
                <strong>Progress:</strong> {stats['progress_pct']:.1f}%
                ({stats['processed']:,} / {stats['total']:,} annotated)
            </div>
        </div>
        """)

    with gr.Row():
        # Left column: the cropped line image, line-number box and the
        # direction-annotation buttons.
        with gr.Column(scale=2):
            image_display = gr.Image(label="📸 Line Instance", type="pil", height=500)

            line_number_input = gr.Textbox(
                label="📝 Line Number (optional)",
                placeholder="Enter line number: 1, 2, 3...",
                max_lines=1
            )

            gr.Markdown("### 🎯 Annotation")

            with gr.Row():
                ltr_btn = gr.Button("➡️ LTR", variant="primary", size="lg")
                rtl_btn = gr.Button("⬅️ RTL", variant="primary", size="lg")

            with gr.Row():
                ttb_ltr_btn = gr.Button("⬇️➡️ TTB+LTR", variant="primary")
                ttb_rtl_btn = gr.Button("⬇️⬅️ TTB+RTL", variant="primary")

            with gr.Row():
                skip_btn = gr.Button("⏭️ Skip", variant="secondary")
                unclear_btn = gr.Button("❓ Unclear", variant="secondary")
                hard_neg_btn = gr.Button("🚫 Bad Detection", variant="stop")

        # Right column: the reference HTML and per-item progress info.
        with gr.Column(scale=1):
            html_display = gr.HTML(label="📄 Reference")

            info_display = gr.Markdown()

    gr.Markdown("---")
    gr.Markdown("### 🧭 Navigation & Controls")

    with gr.Row():
        back_btn = gr.Button("⬅️ Previous")
        forward_btn = gr.Button("➡️ Next")
        undo_btn = gr.Button("↩️ Undo Last")
        next_unann_btn = gr.Button("⏭️ Next Unannotated", variant="primary")
        export_btn = gr.Button("💾 Export All", variant="secondary")

    with gr.Row():
        with gr.Column(scale=2):
            jump_input = gr.Textbox(label="Jump to Index", placeholder="e.g., 123")
        with gr.Column(scale=1):
            jump_btn = gr.Button("Go")
        with gr.Column(scale=2):
            kiu_input = gr.Textbox(label="Find KIU", placeholder="e.g., 00001")
        with gr.Column(scale=1):
            kiu_btn = gr.Button("Find")
        with gr.Column(scale=2):
            ttb_btn = gr.Button("Find TTB Reclass", variant="secondary")

    export_output = gr.Markdown()

    # Every navigation/annotation callback refreshes the same four outputs,
    # matching the 4-tuple returned by AnnotationApp.load_current().
    outputs = [image_display, html_display, info_display, line_number_input]

    # Direction buttons: save the chosen direction, then advance.
    ltr_btn.click(lambda ln: app_state.annotate_and_next("LTR", ln),
                  inputs=[line_number_input], outputs=outputs)
    rtl_btn.click(lambda ln: app_state.annotate_and_next("RTL", ln),
                  inputs=[line_number_input], outputs=outputs)
    ttb_ltr_btn.click(lambda ln: app_state.annotate_and_next("TTB_LTR", ln),
                      inputs=[line_number_input], outputs=outputs)
    ttb_rtl_btn.click(lambda ln: app_state.annotate_and_next("TTB_RTL", ln),
                      inputs=[line_number_input], outputs=outputs)
    skip_btn.click(lambda ln: app_state.annotate_and_next("Skip", ln),
                   inputs=[line_number_input], outputs=outputs)
    unclear_btn.click(lambda ln: app_state.annotate_and_next("Unclear", ln),
                      inputs=[line_number_input], outputs=outputs)
    hard_neg_btn.click(app_state.flag_hard_negative,
                       inputs=[line_number_input], outputs=outputs)

    # Navigation controls.
    back_btn.click(app_state.go_back, outputs=outputs)
    forward_btn.click(app_state.go_forward, outputs=outputs)
    undo_btn.click(app_state.undo_last, outputs=outputs)
    next_unann_btn.click(app_state.jump_to_next_unannotated, outputs=outputs)
    export_btn.click(app_state.export_annotations, outputs=[export_output])

    jump_btn.click(app_state.jump_to_index, inputs=[jump_input], outputs=outputs)
    kiu_btn.click(app_state.jump_to_kiu, inputs=[kiu_input], outputs=outputs)
    ttb_btn.click(app_state.jump_to_ttb_reclass, outputs=outputs)

    # Populate the UI with the first pending item on page load.
    demo.load(app_state.load_current, outputs=outputs)
|
|
|
|
|
if __name__ == "__main__":
    # Print a startup summary before serving.
    stats = app_state.get_statistics()
    print("\n" + "="*80)
    print("⚡ HIEROGLYPH ANNOTATION TOOL")
    print("="*80)
    print(f"Dataset: {HF_DATASET_REPO}")
    print(f"Total Instances: {stats['total']:,}")
    print(f"Remaining: {stats['remaining']:,}")
    print(f"Progress: {stats['progress_pct']:.1f}%")
    print("="*80 + "\n")

    # BUGFIX: gr.Blocks.launch() has no 'theme' parameter, so passing
    # theme=gr.themes.Soft() raised TypeError at startup.  Themes are a
    # constructor argument: use gr.Blocks(theme=gr.themes.Soft(), ...) on the
    # Blocks definition above to keep the Soft theme.
    demo.launch(server_name="0.0.0.0", server_port=7860)