import gradio as gr
import json
import os
import pickle
from PIL import Image
import tempfile
import time
import io
import shutil
import html
from datasets import load_dataset
import rarfile
# ========== CONFIGURATION ==========
# Hugging Face dataset repo streamed for KIU images + reference HTML.
HF_DATASET_REPO = "alyex/karnak-data-app"
# Persistent directories (survive Space restarts)
HF_ANNOTATIONS_DIR = "/data/annotations"        # one JSON file per annotated instance
HF_HARD_NEGATIVES_DIR = "/data/hard_negatives"  # flagged bad detections (image + metadata)
SEGMENT_EXTRACT_DIR = "/data/kiu_segment_metadata"  # JSON files extracted from the RAR
# Temporary cache (doesn't need to persist)
CACHE_DIR = os.path.join(tempfile.gettempdir(), "karnak_cache")
os.makedirs(HF_ANNOTATIONS_DIR, exist_ok=True)
os.makedirs(HF_HARD_NEGATIVES_DIR, exist_ok=True)
os.makedirs(SEGMENT_EXTRACT_DIR, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)
# Pickled manifest of all 'Line' instances; rebuilt on startup and re-dumped
# after every annotation so progress survives within a session.
MANIFEST_FILE = os.path.join(CACHE_DIR, "manifest.pkl")
class DataManager:
    """Handles all data loading and caching.

    Responsibilities:
      * extract / load per-KIU segment-metadata JSON files (from a RAR archive),
      * build a flat manifest of all 'Line' instances found in that metadata,
      * open the Hugging Face dataset in streaming mode for on-demand image/HTML lookup.
    """

    def __init__(self):
        self.dataset = None       # streaming HF dataset handle; None if init failed
        self.segment_data = {}    # JSON filename stem (e.g. "KIU_00001_segments") -> parsed dict
        self.manifest = []        # flat list of 'Line' instance dicts (see _build_manifest)
        self._load_segment_metadata()
        self._build_manifest()
        self._init_streaming_dataset()

    def _init_streaming_dataset(self):
        """Initialize streaming dataset"""
        print("Initializing streaming dataset...")
        try:
            # streaming=True avoids downloading the whole dataset to disk;
            # records are fetched lazily when iterated in get_kiu_data().
            self.dataset = load_dataset(HF_DATASET_REPO, split="train", streaming=True)
            print(f"✅ Streaming dataset initialized")
        except Exception as e:
            # Best-effort: the UI still works for browsing the manifest
            # without images/HTML when the dataset is unavailable.
            print(f"❌ Failed to load dataset: {e}")
            self.dataset = None

    def _load_segment_metadata(self):
        """Load segment metadata from RAR file containing multiple JSON files.

        Fast path: if SEGMENT_EXTRACT_DIR already contains extracted JSON
        files, load those and skip the RAR entirely. Otherwise extract the
        archive (requires the external `unrar` tool) and load the results.
        Failures are logged and leave self.segment_data empty.
        """
        RAR_FILE = "kiu_segment_metadata.rar"
        EXTRACT_DIR = SEGMENT_EXTRACT_DIR
        # Check if already extracted
        if os.path.exists(EXTRACT_DIR) and os.listdir(EXTRACT_DIR):
            json_files = [f for f in os.listdir(EXTRACT_DIR) if f.endswith('.json')]
            if json_files:
                print(f"Loading cached segment metadata from {len(json_files)} files...")
                self.segment_data = {}
                for json_file in json_files:
                    filepath = os.path.join(EXTRACT_DIR, json_file)
                    try:
                        with open(filepath, 'r') as f:
                            data = json.load(f)
                        # Use filename as key (e.g., "KIU_00001_segments")
                        key = json_file.replace('.json', '')
                        self.segment_data[key] = data
                    except Exception as e:
                        # Skip unreadable/corrupt files; keep loading the rest.
                        print(f"⚠️ Error loading {json_file}: {e}")
                print(f"✅ Loaded {len(self.segment_data)} segment entries from cache")
                return
        # Extract from RAR
        if not os.path.exists(RAR_FILE):
            print(f"❌ {RAR_FILE} not found in Space root")
            print(f"Current directory: {os.getcwd()}")
            print(f"Files available: {os.listdir('.')}")
            return
        print(f"Extracting {RAR_FILE}...")
        try:
            # Set rarfile to use unrar (the external binary must be installed,
            # e.g. via packages.txt on a HF Space).
            rarfile.UNRAR_TOOL = "unrar"
            with rarfile.RarFile(RAR_FILE) as rf:
                # List all files
                all_files = [f.filename for f in rf.infolist()]
                json_files = [f for f in all_files if f.endswith('.json')]
                print(f"Found {len(json_files)} JSON files in RAR")
                if not json_files:
                    print(f"❌ No JSON files found in RAR")
                    print(f"Files in RAR: {all_files[:10]}...")  # Show first 10
                    return
                # Extract all JSON files
                os.makedirs(EXTRACT_DIR, exist_ok=True)
                for json_file in json_files:
                    try:
                        rf.extract(json_file, EXTRACT_DIR)
                    except Exception as e:
                        print(f"⚠️ Error extracting {json_file}: {e}")
                        # Fallback: read the member directly and write the
                        # bytes ourselves (handles path-related extract errors).
                        try:
                            with rf.open(json_file) as f:
                                content = f.read()
                            # Get just the filename without path
                            filename = os.path.basename(json_file)
                            output_path = os.path.join(EXTRACT_DIR, filename)
                            with open(output_path, 'wb') as out:
                                out.write(content)
                        except Exception as e2:
                            print(f"❌ Failed to extract {json_file}: {e2}")
                            continue
            # Now load all extracted JSON files
            self.segment_data = {}
            extracted_files = [f for f in os.listdir(EXTRACT_DIR) if f.endswith('.json')]
            print(f"Loading {len(extracted_files)} extracted JSON files...")
            for json_file in extracted_files:
                filepath = os.path.join(EXTRACT_DIR, json_file)
                try:
                    with open(filepath, 'r') as f:
                        data = json.load(f)
                    # Use filename as key (e.g., "KIU_00001_segments")
                    key = json_file.replace('.json', '')
                    self.segment_data[key] = data
                except Exception as e:
                    print(f"⚠️ Error loading {json_file}: {e}")
            print(f"✅ Extracted and loaded {len(self.segment_data)} segment entries")
        except rarfile.RarCannotExec as e:
            # The unrar binary is missing; nothing we can do programmatically.
            print(f"❌ RAR tool not available: {e}")
            print("⚠️ SOLUTION: Ensure packages.txt contains 'unrar'")
            print(" Or extract manually and upload JSON files to:")
            print(f" {EXTRACT_DIR}/")
        except Exception as e:
            print(f"❌ Error with RAR file: {e}")
            import traceback
            traceback.print_exc()

    def _build_manifest(self):
        """Build manifest of all line instances from multiple JSON files.

        Each manifest entry is a dict with keys: kiu_id, instance_id,
        crop_coords, direction, annotated, line_number, segment_key.
        The manifest is pickled to MANIFEST_FILE at the end.
        """
        if not self.segment_data:
            print("❌ No segment data - cannot build manifest")
            return
        print("Building manifest from segment metadata...")
        self.manifest = []
        # self.segment_data is now a dict where:
        #   key   = "KIU_00001_segments" (filename without .json)
        #   value = the JSON content for that KIU
        for key, segment_info in self.segment_data.items():
            if not isinstance(segment_info, dict):
                continue
            # Extract KIU ID from key (e.g., "KIU_00001_segments" -> "00001")
            kiu_id = None
            if 'KIU_' in key:
                try:
                    # Extract the numeric part after KIU_
                    parts = key.split('_')
                    for part in parts:
                        # NOTE(review): the second clause is redundant —
                        # "00001".isdigit() is already True.
                        if part.isdigit() or (part.startswith('0') and part[1:].isdigit()):
                            kiu_id = part
                            break
                except:  # NOTE(review): bare except; kiu_id stays None and the data-level fallback below runs
                    pass
            # Also check if kiu_id is in the data itself
            if not kiu_id and 'kiu_id' in segment_info:
                kiu_id = str(segment_info['kiu_id'])
            if not kiu_id:
                print(f"⚠️ Could not extract KIU ID from key: {key}")
                continue
            # Ensure it's zero-padded to 5 digits
            kiu_id = str(kiu_id).zfill(5)
            # Process line instances
            instances = segment_info.get('instances', [])
            for instance in instances:
                if instance.get('class') == 'Line':
                    self.manifest.append({
                        'kiu_id': kiu_id,
                        'instance_id': instance.get('instance_id'),
                        'crop_coords': instance.get('crop_coords', [0, 0, 100, 100]),
                        'direction': instance.get('direction', ''),
                        'annotated': instance.get('annotated', False),
                        'line_number': instance.get('line_number'),
                        'segment_key': key
                    })
        print(f"✅ Built manifest with {len(self.manifest)} line instances")
        # Save manifest
        with open(MANIFEST_FILE, 'wb') as f:
            pickle.dump(self.manifest, f)

    def get_kiu_data(self, kiu_id):
        """
        Stream through dataset to find KIU.
        Returns: (html_content, image_pil) or (None, None)

        NOTE(review): this is a linear scan over the streamed dataset on
        every lookup — O(dataset size) per call. Assumes each record has
        'kiu_id', 'html', and raw 'image' bytes — confirm against the repo.
        """
        if not self.dataset:
            return None, None
        target_kiu = str(kiu_id).zfill(5)
        try:
            # Stream through dataset
            for item in self.dataset:
                if item['kiu_id'] == target_kiu:
                    html_content = item['html']
                    # Convert bytes to PIL Image
                    try:
                        image_pil = Image.open(io.BytesIO(item['image']))
                        return html_content, image_pil
                    except Exception as e:
                        # Still return the HTML so the reviewer has context.
                        print(f"❌ Error decoding image bytes for KIU {target_kiu}: {e}")
                        return html_content, None
            print(f"❌ KIU {target_kiu} not found in dataset")
            return None, None
        except Exception as e:
            print(f"❌ Error streaming dataset: {e}")
            return None, None
class AnnotationApp:
    """Main annotation application.

    Tracks a cursor (``current_index``) into ``DataManager.manifest``,
    writes one JSON file per annotated instance into HF_ANNOTATIONS_DIR,
    and renders the (image, html, info, line_number) tuple consumed by the
    Gradio UI. All ``*_and_next`` / navigation methods return that tuple.
    """

    def __init__(self):
        self.data_mgr = DataManager()
        self.current_index = 0
        self.history = []  # undo stack: [{'index': int, 'item': manifest-entry copy}]
        # Jump to first unannotated so the user starts where work remains.
        self.current_index = self._find_first_unannotated()

    # ---------- small pure helpers ----------

    def _needs_annotation(self, item):
        """Return True if this manifest item still needs human attention."""
        if not item.get('annotated', False):
            return True
        # TTB items flagged for reclassification count as unfinished.
        if item.get('direction') == 'TTB_NEEDS_RECLASSIFICATION':
            return True
        return False

    @staticmethod
    def _parse_line_number(line_number):
        """Parse an optional line-number string; returns int or None.

        Guards against non-numeric user input (the original unguarded
        int() call crashed the save handler with ValueError).
        """
        if line_number and line_number.strip():
            try:
                return int(line_number.strip())
            except ValueError:
                pass
        return None

    @staticmethod
    def _crop_to_coords(full_image, crop_coords):
        """Crop a PIL image to [x1, y1, x2, y2], clamping to image bounds.

        Returns the cropped image, or None if the clamped box is degenerate.
        Shared by load_current() and flag_hard_negative().
        """
        x1, y1, x2, y2 = map(int, crop_coords)
        x1 = max(0, min(x1, full_image.width))
        y1 = max(0, min(y1, full_image.height))
        x2 = max(0, min(x2, full_image.width))
        y2 = max(0, min(y2, full_image.height))
        if x2 > x1 and y2 > y1:
            return full_image.crop((x1, y1, x2, y2))
        return None

    def _persist_manifest(self):
        """Dump the in-memory manifest to MANIFEST_FILE (pickle)."""
        with open(MANIFEST_FILE, 'wb') as f:
            pickle.dump(self.data_mgr.manifest, f)

    # ---------- cursor search ----------

    def _find_first_unannotated(self):
        """Find first unannotated item (index 0 if everything is done)."""
        for idx, item in enumerate(self.data_mgr.manifest):
            if self._needs_annotation(item):
                return idx
        return 0

    def _find_next_unannotated(self, from_index=None):
        """Find next unannotated item, searching forward then wrapping around."""
        if not self.data_mgr.manifest:
            return 0
        start = from_index if from_index is not None else self.current_index
        # Search forward
        for idx in range(start + 1, len(self.data_mgr.manifest)):
            if self._needs_annotation(self.data_mgr.manifest[idx]):
                return idx
        # Wrap around
        for idx in range(0, start):
            if self._needs_annotation(self.data_mgr.manifest[idx]):
                return idx
        return start

    # ---------- statistics ----------

    def get_statistics(self):
        """Calculate annotation statistics over the whole manifest."""
        manifest = self.data_mgr.manifest
        if not manifest:
            return {
                'total': 0, 'remaining': 0, 'processed': 0, 'progress_pct': 0,
                'ltr': 0, 'rtl': 0, 'ttb_ltr': 0, 'ttb_rtl': 0,
                'skip': 0, 'unclear': 0, 'ttb_reclass': 0
            }
        total = len(manifest)
        remaining = sum(1 for item in manifest if self._needs_annotation(item))
        processed = total - remaining
        progress = (processed / total * 100) if total > 0 else 0

        def count(direction):
            return sum(1 for i in manifest if i.get('direction') == direction)

        return {
            'total': total,
            'remaining': remaining,
            'processed': processed,
            'progress_pct': progress,
            'ltr': count('LTR'),
            'rtl': count('RTL'),
            'ttb_ltr': count('TTB_LTR'),
            'ttb_rtl': count('TTB_RTL'),
            'skip': count('Skip'),
            'unclear': count('Unclear'),
            'ttb_reclass': count('TTB_NEEDS_RECLASSIFICATION')
        }

    # ---------- persistence ----------

    def _get_annotation_path(self, item):
        """Get annotation file path for a manifest item."""
        filename = f"KIU_{item['kiu_id']}_instance_{item['instance_id']}.json"
        return os.path.join(HF_ANNOTATIONS_DIR, filename)

    def _save_annotation(self, item, direction, line_number):
        """Save annotation to disk; returns the annotation file path."""
        annotation_file = self._get_annotation_path(item)
        data = {
            'kiu_id': item['kiu_id'],
            'instance_id': item['instance_id'],
            'direction': direction,
            'annotated': True,
            'line_number': self._parse_line_number(line_number),
            'timestamp': time.time(),
            'crop_coords': item['crop_coords']
        }
        with open(annotation_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        return annotation_file

    # ---------- display ----------

    def load_current(self):
        """
        Load and display the current instance.
        Returns: (image, html, info, line_number)
        """
        manifest = self.data_mgr.manifest
        if not manifest or self.current_index >= len(manifest):
            return (
                None,
                "<div style='padding:20px;text-align:center;'>"
                "⚠️ No items available</div>",
                "No items",
                ""
            )
        item = manifest[self.current_index]
        kiu_id = item['kiu_id']
        print(f"\n{'='*60}")
        print(f"Loading: Index {self.current_index + 1}/{len(manifest)}")
        print(f"KIU: {kiu_id}, Instance: {item['instance_id']}")
        # Get data from dataset (streaming)
        html_content, full_image = self.data_mgr.get_kiu_data(kiu_id)
        # Crop image
        cropped_image = None
        if full_image:
            try:
                cropped_image = self._crop_to_coords(full_image, item['crop_coords'])
                if cropped_image is not None:
                    print(f"✅ Cropped: {cropped_image.size}")
                    # Resize if too tall so the widget stays usable.
                    if cropped_image.height > 800:
                        ratio = 800 / cropped_image.height
                        new_w = int(cropped_image.width * ratio)
                        cropped_image = cropped_image.resize((new_w, 800), Image.LANCZOS)
                else:
                    print(f"❌ Invalid crop coords")
            except Exception as e:
                print(f"❌ Crop error: {e}")
        else:
            print(f"❌ No image for KIU {kiu_id}")
        # Format HTML (escaped + truncated for display in the reference pane)
        if html_content and html_content.strip():
            escaped = html.escape(html_content[:5000])
            if len(html_content) > 5000:
                escaped += "\n\n... (truncated)"
            html_display = (
                f"<div><b>📄 KIU {kiu_id} Reference HTML</b>"
                f"<pre style='white-space:pre-wrap;'>{escaped}</pre></div>"
            )
        else:
            html_display = (
                f"<div>⚠️ No HTML content available for KIU {kiu_id}</div>"
            )
        # Load existing annotation (to pre-fill the line-number box)
        annotation_file = self._get_annotation_path(item)
        existing_line_num = ""
        if os.path.exists(annotation_file):
            try:
                with open(annotation_file, 'r') as f:
                    ann = json.load(f)
                if ann.get('line_number'):
                    existing_line_num = str(ann['line_number'])
            except (OSError, json.JSONDecodeError, ValueError):
                # Unreadable annotation file: leave the box empty.
                pass
        # Build info display
        stats = self.get_statistics()
        direction = item.get('direction', '')
        annotated = item.get('annotated', False)
        # Status indicator
        if annotated:
            if direction in ['LTR', 'RTL', 'TTB_LTR', 'TTB_RTL']:
                status = f"✅ {direction}"
            elif direction == 'TTB_NEEDS_RECLASSIFICATION':
                status = "⚠️ NEEDS RECLASS"
            elif direction == 'Skip':
                status = "⏭️ Skipped"
            elif direction == 'Unclear':
                status = "❓ Unclear"
            else:
                status = f"✅ {direction}"
        else:
            status = "⏳ Pending"
        line_info = f" | Line #{existing_line_num}" if existing_line_num else ""
        info = f"""
**Instance {self.current_index + 1} / {stats['total']}** | KIU {kiu_id} | Instance {item['instance_id']}{line_info} | {status}
**Progress:** {stats['processed']}/{stats['total']} ({stats['progress_pct']:.1f}%) | **Remaining:** {stats['remaining']}
**Direction Counts:**
LTR: {stats['ltr']} | RTL: {stats['rtl']} | TTB+LTR: {stats['ttb_ltr']} | TTB+RTL: {stats['ttb_rtl']}
Skipped: {stats['skip']} | Unclear: {stats['unclear']} | Needs Reclass: {stats['ttb_reclass']}
"""
        print(f"{'='*60}\n")
        return cropped_image, html_display, info, existing_line_num

    # ---------- annotation actions ----------

    def annotate_and_next(self, direction, line_number):
        """Save annotation and move to next unannotated item."""
        manifest = self.data_mgr.manifest
        if not manifest:
            return self.load_current()
        # Save to history for undo
        self.history.append({
            'index': self.current_index,
            'item': manifest[self.current_index].copy()
        })
        # Save annotation
        item = manifest[self.current_index]
        self._save_annotation(item, direction, line_number)
        # Update manifest entry in memory
        item['direction'] = direction
        item['annotated'] = True
        parsed = self._parse_line_number(line_number)
        if parsed is not None:
            item['line_number'] = parsed
        self._persist_manifest()
        print(f"✅ Saved: {direction}")
        # Move to next unannotated
        self.current_index = self._find_next_unannotated()
        return self.load_current()

    def undo_last(self):
        """Undo last annotation: delete its file and restore the manifest entry."""
        if not self.history:
            print("⚠️ Nothing to undo")
            return self.load_current()
        last = self.history.pop()
        self.current_index = last['index']
        item = self.data_mgr.manifest[self.current_index]
        # Delete annotation file
        annotation_file = self._get_annotation_path(item)
        if os.path.exists(annotation_file):
            os.remove(annotation_file)
        # Restore manifest item
        self.data_mgr.manifest[self.current_index] = last['item']
        self._persist_manifest()
        print("✅ Undo successful")
        return self.load_current()

    def flag_hard_negative(self, line_number):
        """Flag current item as a bad detection; save its crop + metadata."""
        manifest = self.data_mgr.manifest
        if not manifest:
            return self.load_current()
        item = manifest[self.current_index]
        # Create directory
        dir_name = f"KIU_{item['kiu_id']}_instance_{item['instance_id']}"
        dir_path = os.path.join(HF_HARD_NEGATIVES_DIR, dir_name)
        os.makedirs(dir_path, exist_ok=True)
        # Get and save cropped image
        html_content, full_image = self.data_mgr.get_kiu_data(item['kiu_id'])
        if full_image:
            try:
                cropped = self._crop_to_coords(full_image, item['crop_coords'])
                if cropped is not None:
                    cropped.save(os.path.join(dir_path, "hard_negative.jpg"), "JPEG")
            except Exception as e:
                print(f"❌ Error saving hard negative image: {e}")
        # Save metadata
        metadata = {
            'kiu_id': item['kiu_id'],
            'instance_id': item['instance_id'],
            'crop_coords': item['crop_coords'],
            'flagged_as_hard_negative': True,
            'timestamp': time.time()
        }
        with open(os.path.join(dir_path, "metadata.json"), 'w') as f:
            json.dump(metadata, f, indent=2)
        # Save as annotation
        self._save_annotation(item, "HardNegative", line_number)
        item['direction'] = 'HardNegative'
        item['annotated'] = True
        self._persist_manifest()
        print("✅ Flagged as hard negative")
        # Move to next
        self.current_index = self._find_next_unannotated()
        return self.load_current()

    # ---------- navigation ----------

    def go_back(self):
        """Step to the previous manifest index (clamped at 0)."""
        self.current_index = max(0, self.current_index - 1)
        return self.load_current()

    def go_forward(self):
        """Step to the next manifest index (clamped at the end)."""
        manifest = self.data_mgr.manifest
        self.current_index = min(len(manifest) - 1, self.current_index + 1)
        return self.load_current()

    def jump_to_next_unannotated(self):
        """Jump the cursor to the next item still needing annotation."""
        self.current_index = self._find_next_unannotated()
        return self.load_current()

    def jump_to_index(self, target):
        """Jump to a 1-based manifest index given as a string; ignore bad input."""
        try:
            idx = int(target) - 1
            if 0 <= idx < len(self.data_mgr.manifest):
                self.current_index = idx
        except (TypeError, ValueError):
            pass
        return self.load_current()

    def jump_to_kiu(self, kiu_id):
        """Jump to the first manifest entry for the given KIU id."""
        target = kiu_id.strip().zfill(5)
        for idx, item in enumerate(self.data_mgr.manifest):
            if item['kiu_id'] == target:
                self.current_index = idx
                break
        return self.load_current()

    def jump_to_ttb_reclass(self):
        """Find next TTB item needing reclassification (forward, then wrap)."""
        manifest = self.data_mgr.manifest
        # Search forward
        for idx in range(self.current_index + 1, len(manifest)):
            if manifest[idx].get('direction') == 'TTB_NEEDS_RECLASSIFICATION':
                self.current_index = idx
                return self.load_current()
        # Wrap around
        for idx in range(0, self.current_index):
            if manifest[idx].get('direction') == 'TTB_NEEDS_RECLASSIFICATION':
                self.current_index = idx
                return self.load_current()
        print("⚠️ No TTB reclassification items found")
        return self.load_current()

    # ---------- export ----------

    def export_annotations(self):
        """Export all annotations to a single JSON file; returns a status string."""
        annotations = []
        for filename in os.listdir(HF_ANNOTATIONS_DIR):
            if filename.endswith('.json') and filename != 'all_annotations.json':
                filepath = os.path.join(HF_ANNOTATIONS_DIR, filename)
                try:
                    with open(filepath, 'r') as f:
                        annotations.append(json.load(f))
                except (OSError, json.JSONDecodeError) as e:
                    # Skip corrupt files but surface them in the log.
                    print(f"⚠️ Skipping {filename}: {e}")
        export_file = os.path.join(HF_ANNOTATIONS_DIR, "all_annotations.json")
        with open(export_file, 'w', encoding='utf-8') as f:
            json.dump(annotations, f, indent=2, ensure_ascii=False)
        return f"✅ Exported {len(annotations)} annotations to `{export_file}`"
# Initialize app (builds the manifest and opens the streaming dataset;
# runs at import time so handlers below can close over app_state).
app_state = AnnotationApp()

# Build Gradio UI
with gr.Blocks(title="⚡ Hieroglyph Annotation") as demo:
    gr.Markdown("# ⚡ Hieroglyph Direction Annotation Tool")

    # Check if data is loaded
    stats = app_state.get_statistics()
    data_loaded = len(app_state.data_mgr.manifest) > 0

    if not data_loaded:
        # Startup failed: show troubleshooting instructions instead of the tool.
        gr.HTML(f"""
        ⚠️ No Segment Data Loaded
        The segment metadata could not be loaded from the RAR file.
        Solution Options:
        - Install unrar tool:
        Add to your Space's Dockerfile or requirements:
        RUN apt-get update && apt-get install -y unrar
        - Upload extracted JSON directly:
        Extract kiu_segment_metadata.json locally and upload to:
        {SEGMENT_EXTRACT_DIR}/kiu_segment_metadata.json
        - Use a different format:
        Upload the JSON file as a regular file in your Space repo instead of RAR.
        Current Status:
        - Dataset: {HF_DATASET_REPO} - {'✅ Loaded' if app_state.data_mgr.dataset else '❌ Failed'}
        - Segment Metadata: ❌ Not loaded
        - Manifest: {len(app_state.data_mgr.manifest)} instances
        """)
    else:
        # Normal startup: show a compact progress summary banner.
        gr.HTML(f"""
        📊 System Status
        Dataset: {HF_DATASET_REPO}
        Total Instances: {stats['total']:,}
        Remaining: {stats['remaining']:,}
        Progress: {stats['progress_pct']:.1f}%
        ({stats['processed']:,} / {stats['total']:,} annotated)
        """)

    # Main layout: image + annotation buttons (left), reference HTML (right).
    with gr.Row():
        with gr.Column(scale=2):
            image_display = gr.Image(label="📸 Line Instance", type="pil", height=500)
            line_number_input = gr.Textbox(
                label="📝 Line Number (optional)",
                placeholder="Enter line number: 1, 2, 3...",
                max_lines=1
            )
            gr.Markdown("### 🎯 Annotation")
            with gr.Row():
                ltr_btn = gr.Button("➡️ LTR", variant="primary", size="lg")
                rtl_btn = gr.Button("⬅️ RTL", variant="primary", size="lg")
            with gr.Row():
                ttb_ltr_btn = gr.Button("⬇️➡️ TTB+LTR", variant="primary")
                ttb_rtl_btn = gr.Button("⬇️⬅️ TTB+RTL", variant="primary")
            with gr.Row():
                skip_btn = gr.Button("⏭️ Skip", variant="secondary")
                unclear_btn = gr.Button("❓ Unclear", variant="secondary")
                hard_neg_btn = gr.Button("🚫 Bad Detection", variant="stop")
        with gr.Column(scale=1):
            html_display = gr.HTML(label="📄 Reference")
            info_display = gr.Markdown()

    gr.Markdown("---")
    gr.Markdown("### 🧭 Navigation & Controls")
    with gr.Row():
        back_btn = gr.Button("⬅️ Previous")
        forward_btn = gr.Button("➡️ Next")
        undo_btn = gr.Button("↩️ Undo Last")
        next_unann_btn = gr.Button("⏭️ Next Unannotated", variant="primary")
        export_btn = gr.Button("💾 Export All", variant="secondary")
    with gr.Row():
        with gr.Column(scale=2):
            jump_input = gr.Textbox(label="Jump to Index", placeholder="e.g., 123")
        with gr.Column(scale=1):
            jump_btn = gr.Button("Go")
        with gr.Column(scale=2):
            kiu_input = gr.Textbox(label="Find KIU", placeholder="e.g., 00001")
        with gr.Column(scale=1):
            kiu_btn = gr.Button("Find")
        with gr.Column(scale=2):
            ttb_btn = gr.Button("Find TTB Reclass", variant="secondary")

    export_output = gr.Markdown()

    # Event handlers
    # Every annotation/navigation handler refreshes the same four widgets.
    outputs = [image_display, html_display, info_display, line_number_input]

    # Direction buttons: save the chosen direction (plus optional line number)
    # and advance to the next unannotated instance.
    ltr_btn.click(lambda ln: app_state.annotate_and_next("LTR", ln),
                  inputs=[line_number_input], outputs=outputs)
    rtl_btn.click(lambda ln: app_state.annotate_and_next("RTL", ln),
                  inputs=[line_number_input], outputs=outputs)
    ttb_ltr_btn.click(lambda ln: app_state.annotate_and_next("TTB_LTR", ln),
                      inputs=[line_number_input], outputs=outputs)
    ttb_rtl_btn.click(lambda ln: app_state.annotate_and_next("TTB_RTL", ln),
                      inputs=[line_number_input], outputs=outputs)
    skip_btn.click(lambda ln: app_state.annotate_and_next("Skip", ln),
                   inputs=[line_number_input], outputs=outputs)
    unclear_btn.click(lambda ln: app_state.annotate_and_next("Unclear", ln),
                      inputs=[line_number_input], outputs=outputs)
    hard_neg_btn.click(app_state.flag_hard_negative,
                       inputs=[line_number_input], outputs=outputs)

    # Navigation / utility buttons.
    back_btn.click(app_state.go_back, outputs=outputs)
    forward_btn.click(app_state.go_forward, outputs=outputs)
    undo_btn.click(app_state.undo_last, outputs=outputs)
    next_unann_btn.click(app_state.jump_to_next_unannotated, outputs=outputs)
    export_btn.click(app_state.export_annotations, outputs=[export_output])
    jump_btn.click(app_state.jump_to_index, inputs=[jump_input], outputs=outputs)
    kiu_btn.click(app_state.jump_to_kiu, inputs=[kiu_input], outputs=outputs)
    ttb_btn.click(app_state.jump_to_ttb_reclass, outputs=outputs)

    # Load initial state when the page opens.
    demo.load(app_state.load_current, outputs=outputs)
if __name__ == "__main__":
    # Print a startup banner with current progress before serving.
    stats = app_state.get_statistics()
    print("\n" + "="*80)
    print("⚡ HIEROGLYPH ANNOTATION TOOL")
    print("="*80)
    print(f"Dataset: {HF_DATASET_REPO}")
    print(f"Total Instances: {stats['total']:,}")
    print(f"Remaining: {stats['remaining']:,}")
    print(f"Progress: {stats['progress_pct']:.1f}%")
    print("="*80 + "\n")
    # BUG FIX: Blocks.launch() does not accept a `theme` kwarg (themes are
    # passed to gr.Blocks(theme=...) at construction); passing it here
    # raised TypeError on startup, so it has been removed.
    demo.launch(server_name="0.0.0.0", server_port=7860)