"""Hieroglyph line-direction annotation tool (Gradio app for a HF Space).

Streams KIU images + reference HTML from a Hugging Face dataset, crops
individual "Line" instances using segment metadata extracted from a RAR
archive, and stores annotations as per-instance JSON files under a
persistent /data volume so they survive Space restarts.
"""

import gradio as gr
import json
import os
import pickle
from PIL import Image
import tempfile
import time
import io
import shutil
import html
from datasets import load_dataset
import rarfile

# ========== CONFIGURATION ==========
HF_DATASET_REPO = "alyex/karnak-data-app"

# Persistent directories (survive Space restarts)
HF_ANNOTATIONS_DIR = "/data/annotations"
HF_HARD_NEGATIVES_DIR = "/data/hard_negatives"
SEGMENT_EXTRACT_DIR = "/data/kiu_segment_metadata"

# Temporary cache (doesn't need to persist)
CACHE_DIR = os.path.join(tempfile.gettempdir(), "karnak_cache")

os.makedirs(HF_ANNOTATIONS_DIR, exist_ok=True)
os.makedirs(HF_HARD_NEGATIVES_DIR, exist_ok=True)
os.makedirs(SEGMENT_EXTRACT_DIR, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)

MANIFEST_FILE = os.path.join(CACHE_DIR, "manifest.pkl")


class DataManager:
    """Handles all data loading and caching."""

    def __init__(self):
        self.dataset = None       # streaming HF dataset, or None on failure
        self.segment_data = {}    # {"KIU_00001_segments": <parsed JSON>, ...}
        self.manifest = []        # flat list of "Line" instances to annotate
        self._load_segment_metadata()
        self._build_manifest()
        self._init_streaming_dataset()

    def _init_streaming_dataset(self):
        """Initialize the streaming dataset (no full download)."""
        print("Initializing streaming dataset...")
        try:
            self.dataset = load_dataset(HF_DATASET_REPO, split="train", streaming=True)
            print("✅ Streaming dataset initialized")
        except Exception as e:
            print(f"❌ Failed to load dataset: {e}")
            self.dataset = None

    def _load_segment_metadata(self):
        """Load segment metadata from a RAR file containing multiple JSON files.

        Populates ``self.segment_data`` keyed by JSON filename without the
        ``.json`` suffix (e.g. "KIU_00001_segments"). Uses the previously
        extracted cache in SEGMENT_EXTRACT_DIR when available.
        """
        RAR_FILE = "kiu_segment_metadata.rar"
        EXTRACT_DIR = SEGMENT_EXTRACT_DIR

        # Check if already extracted
        if os.path.exists(EXTRACT_DIR) and os.listdir(EXTRACT_DIR):
            json_files = [f for f in os.listdir(EXTRACT_DIR) if f.endswith('.json')]
            if json_files:
                print(f"Loading cached segment metadata from {len(json_files)} files...")
                self.segment_data = {}
                for json_file in json_files:
                    filepath = os.path.join(EXTRACT_DIR, json_file)
                    try:
                        with open(filepath, 'r') as f:
                            data = json.load(f)
                        # Use filename as key (e.g., "KIU_00001_segments")
                        key = json_file.replace('.json', '')
                        self.segment_data[key] = data
                    except Exception as e:
                        print(f"⚠️ Error loading {json_file}: {e}")
                print(f"✅ Loaded {len(self.segment_data)} segment entries from cache")
                return

        # Extract from RAR
        if not os.path.exists(RAR_FILE):
            print(f"❌ {RAR_FILE} not found in Space root")
            print(f"Current directory: {os.getcwd()}")
            print(f"Files available: {os.listdir('.')}")
            return

        print(f"Extracting {RAR_FILE}...")
        try:
            # Set rarfile to use unrar
            rarfile.UNRAR_TOOL = "unrar"
            with rarfile.RarFile(RAR_FILE) as rf:
                # List all files
                all_files = [f.filename for f in rf.infolist()]
                json_files = [f for f in all_files if f.endswith('.json')]
                print(f"Found {len(json_files)} JSON files in RAR")

                if not json_files:
                    print("❌ No JSON files found in RAR")
                    print(f"Files in RAR: {all_files[:10]}...")  # Show first 10
                    return

                # Extract all JSON files
                os.makedirs(EXTRACT_DIR, exist_ok=True)
                for json_file in json_files:
                    try:
                        rf.extract(json_file, EXTRACT_DIR)
                    except Exception as e:
                        print(f"⚠️ Error extracting {json_file}: {e}")
                        # Try reading directly (some archives fail extract()
                        # but still allow streaming the member)
                        try:
                            with rf.open(json_file) as f:
                                content = f.read()
                            # Get just the filename without path
                            filename = os.path.basename(json_file)
                            output_path = os.path.join(EXTRACT_DIR, filename)
                            with open(output_path, 'wb') as out:
                                out.write(content)
                        except Exception as e2:
                            print(f"❌ Failed to extract {json_file}: {e2}")
                            continue

                # Now load all extracted JSON files
                self.segment_data = {}
                extracted_files = [f for f in os.listdir(EXTRACT_DIR) if f.endswith('.json')]
                print(f"Loading {len(extracted_files)} extracted JSON files...")
                for json_file in extracted_files:
                    filepath = os.path.join(EXTRACT_DIR, json_file)
                    try:
                        with open(filepath, 'r') as f:
                            data = json.load(f)
                        # Use filename as key (e.g., "KIU_00001_segments")
                        key = json_file.replace('.json', '')
                        self.segment_data[key] = data
                    except Exception as e:
                        print(f"⚠️ Error loading {json_file}: {e}")

                print(f"✅ Extracted and loaded {len(self.segment_data)} segment entries")

        except rarfile.RarCannotExec as e:
            print(f"❌ RAR tool not available: {e}")
            print("⚠️ SOLUTION: Ensure packages.txt contains 'unrar'")
            print("   Or extract manually and upload JSON files to:")
            print(f"   {EXTRACT_DIR}/")
        except Exception as e:
            print(f"❌ Error with RAR file: {e}")
            import traceback
            traceback.print_exc()

    def _build_manifest(self):
        """Build the manifest of all "Line" instances from the loaded metadata.

        Each manifest entry records the KIU id, instance id, crop coordinates
        and current annotation state. The manifest is pickled to MANIFEST_FILE.
        """
        if not self.segment_data:
            print("❌ No segment data - cannot build manifest")
            return

        print("Building manifest from segment metadata...")
        self.manifest = []

        # self.segment_data is a dict where:
        #   key   = "KIU_00001_segments" (filename without .json)
        #   value = the JSON content for that KIU
        for key, segment_info in self.segment_data.items():
            if not isinstance(segment_info, dict):
                continue

            # Extract KIU ID from key (e.g., "KIU_00001_segments" -> "00001")
            kiu_id = None
            if 'KIU_' in key:
                try:
                    # Extract the numeric part after KIU_
                    parts = key.split('_')
                    for part in parts:
                        if part.isdigit() or (part.startswith('0') and part[1:].isdigit()):
                            kiu_id = part
                            break
                except Exception:
                    pass

            # Also check if kiu_id is in the data itself
            if not kiu_id and 'kiu_id' in segment_info:
                kiu_id = str(segment_info['kiu_id'])

            if not kiu_id:
                print(f"⚠️ Could not extract KIU ID from key: {key}")
                continue

            # Ensure it's zero-padded to 5 digits
            kiu_id = str(kiu_id).zfill(5)

            # Process line instances
            instances = segment_info.get('instances', [])
            for instance in instances:
                if instance.get('class') == 'Line':
                    self.manifest.append({
                        'kiu_id': kiu_id,
                        'instance_id': instance.get('instance_id'),
                        'crop_coords': instance.get('crop_coords', [0, 0, 100, 100]),
                        'direction': instance.get('direction', ''),
                        'annotated': instance.get('annotated', False),
                        'line_number': instance.get('line_number'),
                        'segment_key': key
                    })

        print(f"✅ Built manifest with {len(self.manifest)} line instances")

        # Save manifest
        with open(MANIFEST_FILE, 'wb') as f:
            pickle.dump(self.manifest, f)

    def get_kiu_data(self, kiu_id):
        """Stream through the dataset to find a KIU.

        Returns:
            (html_content, image_pil) or (None, None) if not found / no dataset.
        """
        if not self.dataset:
            return None, None

        target_kiu = str(kiu_id).zfill(5)
        try:
            # Stream through dataset (linear scan; each `for` restarts the stream)
            for item in self.dataset:
                if item['kiu_id'] == target_kiu:
                    html_content = item['html']
                    # Convert bytes to PIL Image
                    try:
                        image_pil = Image.open(io.BytesIO(item['image']))
                        return html_content, image_pil
                    except Exception as e:
                        print(f"❌ Error decoding image bytes for KIU {target_kiu}: {e}")
                        return html_content, None
            print(f"❌ KIU {target_kiu} not found in dataset")
            return None, None
        except Exception as e:
            print(f"❌ Error streaming dataset: {e}")
            return None, None


class AnnotationApp:
    """Main annotation application: navigation, persistence, statistics."""

    def __init__(self):
        self.data_mgr = DataManager()
        self.current_index = 0
        self.history = []  # undo stack of {'index', 'item'} snapshots
        # Jump to first unannotated
        self.current_index = self._find_first_unannotated()

    def _needs_annotation(self, item):
        """Check if an item still needs annotation."""
        if not item.get('annotated', False):
            return True
        if item.get('direction') == 'TTB_NEEDS_RECLASSIFICATION':
            return True
        return False

    def _find_first_unannotated(self):
        """Find the first unannotated item (0 if everything is done)."""
        for idx, item in enumerate(self.data_mgr.manifest):
            if self._needs_annotation(item):
                return idx
        return 0

    def _find_next_unannotated(self, from_index=None):
        """Find the next unannotated item, wrapping around if necessary."""
        if not self.data_mgr.manifest:
            return 0
        start = from_index if from_index is not None else self.current_index
        # Search forward
        for idx in range(start + 1, len(self.data_mgr.manifest)):
            if self._needs_annotation(self.data_mgr.manifest[idx]):
                return idx
        # Wrap around
        for idx in range(0, start):
            if self._needs_annotation(self.data_mgr.manifest[idx]):
                return idx
        return start

    def get_statistics(self):
        """Calculate annotation statistics for display."""
        manifest = self.data_mgr.manifest
        if not manifest:
            return {
                'total': 0, 'remaining': 0, 'processed': 0, 'progress_pct': 0,
                'ltr': 0, 'rtl': 0, 'ttb_ltr': 0, 'ttb_rtl': 0,
                'skip': 0, 'unclear': 0, 'ttb_reclass': 0
            }

        total = len(manifest)
        remaining = sum(1 for item in manifest if self._needs_annotation(item))
        processed = total - remaining
        progress = (processed / total * 100) if total > 0 else 0

        return {
            'total': total,
            'remaining': remaining,
            'processed': processed,
            'progress_pct': progress,
            'ltr': sum(1 for i in manifest if i.get('direction') == 'LTR'),
            'rtl': sum(1 for i in manifest if i.get('direction') == 'RTL'),
            'ttb_ltr': sum(1 for i in manifest if i.get('direction') == 'TTB_LTR'),
            'ttb_rtl': sum(1 for i in manifest if i.get('direction') == 'TTB_RTL'),
            'skip': sum(1 for i in manifest if i.get('direction') == 'Skip'),
            'unclear': sum(1 for i in manifest if i.get('direction') == 'Unclear'),
            'ttb_reclass': sum(1 for i in manifest
                               if i.get('direction') == 'TTB_NEEDS_RECLASSIFICATION')
        }

    def _get_annotation_path(self, item):
        """Get the per-instance annotation file path."""
        filename = f"KIU_{item['kiu_id']}_instance_{item['instance_id']}.json"
        return os.path.join(HF_ANNOTATIONS_DIR, filename)

    def _save_annotation(self, item, direction, line_number):
        """Save an annotation to disk and return the file path."""
        annotation_file = self._get_annotation_path(item)
        data = {
            'kiu_id': item['kiu_id'],
            'instance_id': item['instance_id'],
            'direction': direction,
            'annotated': True,
            'line_number': int(line_number) if line_number and line_number.strip() else None,
            'timestamp': time.time(),
            'crop_coords': item['crop_coords']
        }
        with open(annotation_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        return annotation_file

    def load_current(self):
        """Load and display the current instance.

        Returns:
            (image, html, info, line_number) matching the Gradio outputs.
        """
        manifest = self.data_mgr.manifest
        if not manifest or self.current_index >= len(manifest):
            return (
                None,
                "<div style='padding:20px; color:#b45309;'>⚠️ No items available</div>",
                "No items",
                ""
            )

        item = manifest[self.current_index]
        kiu_id = item['kiu_id']

        print(f"\n{'='*60}")
        print(f"Loading: Index {self.current_index + 1}/{len(manifest)}")
        print(f"KIU: {kiu_id}, Instance: {item['instance_id']}")

        # Get data from dataset (streaming)
        html_content, full_image = self.data_mgr.get_kiu_data(kiu_id)

        # Crop image
        cropped_image = None
        if full_image:
            try:
                x1, y1, x2, y2 = map(int, item['crop_coords'])
                # Clamp to image bounds
                x1 = max(0, min(x1, full_image.width))
                y1 = max(0, min(y1, full_image.height))
                x2 = max(0, min(x2, full_image.width))
                y2 = max(0, min(y2, full_image.height))
                if x2 > x1 and y2 > y1:
                    cropped_image = full_image.crop((x1, y1, x2, y2))
                    print(f"✅ Cropped: {cropped_image.size}")
                    # Resize if too tall (keeps aspect ratio)
                    if cropped_image.height > 800:
                        ratio = 800 / cropped_image.height
                        new_w = int(cropped_image.width * ratio)
                        cropped_image = cropped_image.resize((new_w, 800), Image.LANCZOS)
                else:
                    print("❌ Invalid crop coords")
            except Exception as e:
                print(f"❌ Crop error: {e}")
        else:
            print(f"❌ No image for KIU {kiu_id}")

        # Format HTML (escape so the reference markup is shown, not rendered)
        if html_content and html_content.strip():
            escaped = html.escape(html_content[:5000])
            if len(html_content) > 5000:
                escaped += "\n\n... (truncated)"
            html_display = f"""
            <div style="max-height:600px; overflow-y:auto; border:1px solid #ddd;
                        border-radius:8px; padding:12px; background:#fafafa;">
                <h4 style="margin-top:0;">📄 KIU {kiu_id} Reference HTML</h4>
                <pre style="white-space:pre-wrap; font-size:12px;">{escaped}</pre>
            </div>
            """
        else:
            html_display = f"""
            <div style="padding:20px; color:#b45309;">
                ⚠️ No HTML content available for KIU {kiu_id}
            </div>
            """

        # Load existing annotation (to prefill the line-number box)
        annotation_file = self._get_annotation_path(item)
        existing_line_num = ""
        if os.path.exists(annotation_file):
            try:
                with open(annotation_file, 'r') as f:
                    ann = json.load(f)
                if ann.get('line_number'):
                    existing_line_num = str(ann['line_number'])
            except Exception:
                pass

        # Build info display
        stats = self.get_statistics()
        direction = item.get('direction', '')
        annotated = item.get('annotated', False)

        # Status indicator
        if annotated:
            if direction in ['LTR', 'RTL', 'TTB_LTR', 'TTB_RTL']:
                status = f"✅ {direction}"
            elif direction == 'TTB_NEEDS_RECLASSIFICATION':
                status = "⚠️ NEEDS RECLASS"
            elif direction == 'Skip':
                status = "⏭️ Skipped"
            elif direction == 'Unclear':
                status = "❓ Unclear"
            else:
                status = f"✅ {direction}"
        else:
            status = "⏳ Pending"

        line_info = f" | Line #{existing_line_num}" if existing_line_num else ""
        info = f"""
**Instance {self.current_index + 1} / {stats['total']}** | KIU {kiu_id} | Instance {item['instance_id']}{line_info} | {status}

**Progress:** {stats['processed']}/{stats['total']} ({stats['progress_pct']:.1f}%) | **Remaining:** {stats['remaining']}

**Direction Counts:** LTR: {stats['ltr']} | RTL: {stats['rtl']} | TTB+LTR: {stats['ttb_ltr']} | TTB+RTL: {stats['ttb_rtl']}

Skipped: {stats['skip']} | Unclear: {stats['unclear']} | Needs Reclass: {stats['ttb_reclass']}
"""
        print(f"{'='*60}\n")
        return cropped_image, html_display, info, existing_line_num

    def annotate_and_next(self, direction, line_number):
        """Save an annotation and move to the next unannotated item."""
        manifest = self.data_mgr.manifest
        if not manifest:
            return self.load_current()

        # Save to history for undo
        self.history.append({
            'index': self.current_index,
            'item': manifest[self.current_index].copy()
        })

        # Save annotation
        item = manifest[self.current_index]
        self._save_annotation(item, direction, line_number)

        # Update manifest
        item['direction'] = direction
        item['annotated'] = True
        if line_number and line_number.strip():
            try:
                item['line_number'] = int(line_number.strip())
            except Exception:
                pass

        # Save manifest
        with open(MANIFEST_FILE, 'wb') as f:
            pickle.dump(manifest, f)

        print(f"✅ Saved: {direction}")

        # Move to next unannotated
        self.current_index = self._find_next_unannotated()
        return self.load_current()

    def undo_last(self):
        """Undo the last annotation (restores manifest + deletes the file)."""
        if not self.history:
            print("⚠️ Nothing to undo")
            return self.load_current()

        last = self.history.pop()
        self.current_index = last['index']
        item = self.data_mgr.manifest[self.current_index]

        # Delete annotation file
        annotation_file = self._get_annotation_path(item)
        if os.path.exists(annotation_file):
            os.remove(annotation_file)

        # Restore manifest item
        self.data_mgr.manifest[self.current_index] = last['item']

        # Save manifest
        with open(MANIFEST_FILE, 'wb') as f:
            pickle.dump(self.data_mgr.manifest, f)

        print("✅ Undo successful")
        return self.load_current()

    def flag_hard_negative(self, line_number):
        """Flag the current instance as a hard negative and save its crop."""
        manifest = self.data_mgr.manifest
        if not manifest:
            return self.load_current()

        item = manifest[self.current_index]

        # Create directory
        dir_name = f"KIU_{item['kiu_id']}_instance_{item['instance_id']}"
        dir_path = os.path.join(HF_HARD_NEGATIVES_DIR, dir_name)
        os.makedirs(dir_path, exist_ok=True)

        # Get and save cropped image
        html_content, full_image = self.data_mgr.get_kiu_data(item['kiu_id'])
        if full_image:
            try:
                x1, y1, x2, y2 = map(int, item['crop_coords'])
                x1 = max(0, min(x1, full_image.width))
                y1 = max(0, min(y1, full_image.height))
                x2 = max(0, min(x2, full_image.width))
                y2 = max(0, min(y2, full_image.height))
                if x2 > x1 and y2 > y1:
                    cropped = full_image.crop((x1, y1, x2, y2))
                    cropped.save(os.path.join(dir_path, "hard_negative.jpg"), "JPEG")
            except Exception as e:
                print(f"❌ Error saving hard negative image: {e}")

        # Save metadata
        metadata = {
            'kiu_id': item['kiu_id'],
            'instance_id': item['instance_id'],
            'crop_coords': item['crop_coords'],
            'flagged_as_hard_negative': True,
            'timestamp': time.time()
        }
        with open(os.path.join(dir_path, "metadata.json"), 'w') as f:
            json.dump(metadata, f, indent=2)

        # Save as annotation
        self._save_annotation(item, "HardNegative", line_number)
        item['direction'] = 'HardNegative'
        item['annotated'] = True

        with open(MANIFEST_FILE, 'wb') as f:
            pickle.dump(manifest, f)

        print("✅ Flagged as hard negative")

        # Move to next
        self.current_index = self._find_next_unannotated()
        return self.load_current()

    # Navigation methods
    def go_back(self):
        self.current_index = max(0, self.current_index - 1)
        return self.load_current()

    def go_forward(self):
        manifest = self.data_mgr.manifest
        self.current_index = min(len(manifest) - 1, self.current_index + 1)
        return self.load_current()

    def jump_to_next_unannotated(self):
        self.current_index = self._find_next_unannotated()
        return self.load_current()

    def jump_to_index(self, target):
        try:
            idx = int(target) - 1  # UI is 1-based
            if 0 <= idx < len(self.data_mgr.manifest):
                self.current_index = idx
        except Exception:
            pass
        return self.load_current()

    def jump_to_kiu(self, kiu_id):
        target = kiu_id.strip().zfill(5)
        for idx, item in enumerate(self.data_mgr.manifest):
            if item['kiu_id'] == target:
                self.current_index = idx
                break
        return self.load_current()

    def jump_to_ttb_reclass(self):
        """Find the next TTB item needing reclassification (wraps around)."""
        manifest = self.data_mgr.manifest
        # Search forward
        for idx in range(self.current_index + 1, len(manifest)):
            if manifest[idx].get('direction') == 'TTB_NEEDS_RECLASSIFICATION':
                self.current_index = idx
                return self.load_current()
        # Wrap around
        for idx in range(0, self.current_index):
            if manifest[idx].get('direction') == 'TTB_NEEDS_RECLASSIFICATION':
                self.current_index = idx
                return self.load_current()
        print("⚠️ No TTB reclassification items found")
        return self.load_current()

    def export_annotations(self):
        """Export all annotations to a single JSON file."""
        annotations = []
        for filename in os.listdir(HF_ANNOTATIONS_DIR):
            if filename.endswith('.json') and filename != 'all_annotations.json':
                filepath = os.path.join(HF_ANNOTATIONS_DIR, filename)
                try:
                    with open(filepath, 'r') as f:
                        annotations.append(json.load(f))
                except Exception:
                    pass

        export_file = os.path.join(HF_ANNOTATIONS_DIR, "all_annotations.json")
        with open(export_file, 'w', encoding='utf-8') as f:
            json.dump(annotations, f, indent=2, ensure_ascii=False)
        return f"✅ Exported {len(annotations)} annotations to `{export_file}`"


# Initialize app
app_state = AnnotationApp()

# Build Gradio UI.
# NOTE: `theme` must be passed to gr.Blocks(), not to demo.launch() —
# launch() has no `theme` parameter and would raise TypeError.
with gr.Blocks(title="⚡ Hieroglyph Annotation", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# ⚡ Hieroglyph Direction Annotation Tool")

    # Check if data is loaded
    stats = app_state.get_statistics()
    data_loaded = len(app_state.data_mgr.manifest) > 0

    if not data_loaded:
        gr.HTML(f"""
        <div style="border:2px solid #dc2626; border-radius:8px; padding:16px; background:#fef2f2;">
            <h3>⚠️ No Segment Data Loaded</h3>
            <p>The segment metadata could not be loaded from the RAR file.</p>
            <h4>Solution Options:</h4>
            <ol>
                <li><b>Install unrar tool:</b><br>
                    Add to your Space's Dockerfile or requirements:<br>
                    <code>RUN apt-get update &amp;&amp; apt-get install -y unrar</code></li>
                <li><b>Upload extracted JSON directly:</b><br>
                    Extract kiu_segment_metadata.json locally and upload to:<br>
                    <code>{SEGMENT_EXTRACT_DIR}/kiu_segment_metadata.json</code></li>
                <li><b>Use a different format:</b><br>
                    Upload the JSON file as a regular file in your Space repo instead of RAR.</li>
            </ol>
            <h4>Current Status:</h4>
        </div>
        """)
    else:
        gr.HTML(f"""
        <div style="border:1px solid #16a34a; border-radius:8px; padding:16px; background:#f0fdf4;">
            <h3>📊 System Status</h3>
            <p><b>Dataset:</b> {HF_DATASET_REPO}<br>
               <b>Total Instances:</b> {stats['total']:,}<br>
               <b>Remaining:</b> {stats['remaining']:,}<br>
               <b>Progress:</b> {stats['progress_pct']:.1f}% ({stats['processed']:,} / {stats['total']:,} annotated)</p>
        </div>
        """)

    with gr.Row():
        with gr.Column(scale=2):
            image_display = gr.Image(label="📸 Line Instance", type="pil", height=500)
            line_number_input = gr.Textbox(
                label="📝 Line Number (optional)",
                placeholder="Enter line number: 1, 2, 3...",
                max_lines=1
            )
            gr.Markdown("### 🎯 Annotation")
            with gr.Row():
                ltr_btn = gr.Button("➡️ LTR", variant="primary", size="lg")
                rtl_btn = gr.Button("⬅️ RTL", variant="primary", size="lg")
            with gr.Row():
                ttb_ltr_btn = gr.Button("⬇️➡️ TTB+LTR", variant="primary")
                ttb_rtl_btn = gr.Button("⬇️⬅️ TTB+RTL", variant="primary")
            with gr.Row():
                skip_btn = gr.Button("⏭️ Skip", variant="secondary")
                unclear_btn = gr.Button("❓ Unclear", variant="secondary")
                hard_neg_btn = gr.Button("🚫 Bad Detection", variant="stop")
        with gr.Column(scale=1):
            html_display = gr.HTML(label="📄 Reference")
            info_display = gr.Markdown()

    gr.Markdown("---")
    gr.Markdown("### 🧭 Navigation & Controls")
    with gr.Row():
        back_btn = gr.Button("⬅️ Previous")
        forward_btn = gr.Button("➡️ Next")
        undo_btn = gr.Button("↩️ Undo Last")
        next_unann_btn = gr.Button("⏭️ Next Unannotated", variant="primary")
        export_btn = gr.Button("💾 Export All", variant="secondary")
    with gr.Row():
        with gr.Column(scale=2):
            jump_input = gr.Textbox(label="Jump to Index", placeholder="e.g., 123")
        with gr.Column(scale=1):
            jump_btn = gr.Button("Go")
        with gr.Column(scale=2):
            kiu_input = gr.Textbox(label="Find KIU", placeholder="e.g., 00001")
        with gr.Column(scale=1):
            kiu_btn = gr.Button("Find")
        with gr.Column(scale=2):
            ttb_btn = gr.Button("Find TTB Reclass", variant="secondary")
    export_output = gr.Markdown()

    # Event handlers
    outputs = [image_display, html_display, info_display, line_number_input]

    ltr_btn.click(lambda ln: app_state.annotate_and_next("LTR", ln),
                  inputs=[line_number_input], outputs=outputs)
    rtl_btn.click(lambda ln: app_state.annotate_and_next("RTL", ln),
                  inputs=[line_number_input], outputs=outputs)
    ttb_ltr_btn.click(lambda ln: app_state.annotate_and_next("TTB_LTR", ln),
                      inputs=[line_number_input], outputs=outputs)
    ttb_rtl_btn.click(lambda ln: app_state.annotate_and_next("TTB_RTL", ln),
                      inputs=[line_number_input], outputs=outputs)
    skip_btn.click(lambda ln: app_state.annotate_and_next("Skip", ln),
                   inputs=[line_number_input], outputs=outputs)
    unclear_btn.click(lambda ln: app_state.annotate_and_next("Unclear", ln),
                      inputs=[line_number_input], outputs=outputs)
    hard_neg_btn.click(app_state.flag_hard_negative,
                       inputs=[line_number_input], outputs=outputs)

    back_btn.click(app_state.go_back, outputs=outputs)
    forward_btn.click(app_state.go_forward, outputs=outputs)
    undo_btn.click(app_state.undo_last, outputs=outputs)
    next_unann_btn.click(app_state.jump_to_next_unannotated, outputs=outputs)
    export_btn.click(app_state.export_annotations, outputs=[export_output])
    jump_btn.click(app_state.jump_to_index, inputs=[jump_input], outputs=outputs)
    kiu_btn.click(app_state.jump_to_kiu, inputs=[kiu_input], outputs=outputs)
    ttb_btn.click(app_state.jump_to_ttb_reclass, outputs=outputs)

    # Load initial state
    demo.load(app_state.load_current, outputs=outputs)


if __name__ == "__main__":
    stats = app_state.get_statistics()
    print("\n" + "=" * 80)
    print("⚡ HIEROGLYPH ANNOTATION TOOL")
    print("=" * 80)
    print(f"Dataset: {HF_DATASET_REPO}")
    print(f"Total Instances: {stats['total']:,}")
    print(f"Remaining: {stats['remaining']:,}")
    print(f"Progress: {stats['progress_pct']:.1f}%")
    print("=" * 80 + "\n")
    # theme is set on gr.Blocks above; launch() does not accept it
    demo.launch(server_name="0.0.0.0", server_port=7860)