Spaces:
Paused
Paused
"""
Emperor SEG Space

Takes a manga image and returns a black-and-white mask of its text.
Models: ogkalu/comic-text-segmenter-yolov8m + ogkalu/comic-speech-bubble-detector-yolov8m
Both are YOLOv8 models trained on 8k+ manga/webtoon/manhwa images.
"""
| import io | |
| import base64 | |
| import numpy as np | |
| import gradio as gr | |
| from PIL import Image | |
| import torch | |
# -- Model loading ---------------------------------------------------------
print("โณ Loading YOLOv8 manga models...", flush=True)


def _load_yolo_model(label: str, repo_id: str, filename: str):
    """Download one YOLO checkpoint from the Hugging Face Hub and load it.

    Args:
        label: Human-readable model name used in the log messages.
        repo_id: Hugging Face repository that hosts the weights file.
        filename: Name of the weights file inside that repository.

    Returns:
        The loaded ``YOLO`` object, or ``None`` when the import, the download
        or the model construction raised. Failure is deliberately
        best-effort: it is logged and the caller decides what to do with a
        missing model.
    """
    try:
        # Imported lazily so a missing package only disables the model
        # instead of crashing the whole app at import time.
        from ultralytics import YOLO
        from huggingface_hub import hf_hub_download

        weights_path = hf_hub_download(repo_id=repo_id, filename=filename)
        model = YOLO(weights_path)
        print(f"โ {label} loaded ({repo_id})", flush=True)
        return model
    except Exception as e:
        print(f"โ ๏ธ {label} failed: {e}", flush=True)
        return None


# Direct text detection (letters + SFX).
text_model = _load_yolo_model(
    "Text segmenter",
    "ogkalu/comic-text-segmenter-yolov8m",
    "comic-text-segmenter.pt",
)
# Speech-bubble detection.
bubble_model = _load_yolo_model(
    "Bubble detector",
    "ogkalu/comic-speech-bubble-detector-yolov8m",
    "comic-speech-bubble-detector.pt",
)

if text_model is None and bubble_model is None:
    print("๐ Both models failed โ will use OpenCV fallback only", flush=True)
else:
    print("โ Models ready!", flush=True)
# -- Build the mask from YOLO segmentation results ---------------------------
def _yolo_seg_to_mask(results, w: int, h: int) -> np.ndarray:
    """Merge YOLO instance masks into a single full-size mask.

    Args:
        results: Iterable of result objects. Each exposes ``masks``, which is
            either ``None`` or an object whose ``data`` holds one 2-D mask
            per detected instance (values 0-1).
        w: Width of the output mask in pixels.
        h: Height of the output mask in pixels.

    Returns:
        A ``uint8`` array of shape ``(h, w)`` holding the element-wise maximum
        of every instance mask scaled to 0-255; all zeros when no result
        carries masks.
    """
    mask = np.zeros((h, w), dtype=np.uint8)
    for r in results:
        if r.masks is None:
            continue
        instances = r.masks.data
        if len(instances) == 0:
            continue
        # Scale every instance to 0-255 and merge them *before* resizing:
        # nearest-neighbour resize commutes with element-wise max, so one
        # resize per result gives the same pixels as one resize per instance.
        stack = (instances.cpu().numpy() * 255).astype(np.uint8)
        merged = stack.max(axis=0)
        # NOTE(review): the merged mask is stretched straight to (w, h). If
        # the model returns masks at a padded inference resolution this could
        # shift them slightly - confirm against the Ultralytics Masks docs.
        resized = np.array(Image.fromarray(merged).resize((w, h), Image.NEAREST))
        mask = np.maximum(mask, resized)
    return mask
# -- Build the mask from YOLO detection results (boxes only) -----------------
def _yolo_det_to_mask(results, w: int, h: int, pad: int = 6) -> np.ndarray:
    """Rasterise YOLO bounding boxes into a filled mask.

    Used when the model is a detector rather than a segmenter: every box is
    painted as a solid rectangle.

    Args:
        results: Iterable of result objects. Each exposes ``boxes``, which is
            either ``None`` or an object whose ``xyxy`` converts (via
            ``.cpu().numpy()``) to rows of ``x1, y1, x2, y2``.
        w: Width of the output mask in pixels.
        h: Height of the output mask in pixels.
        pad: Pixels added on every side of each box before filling.

    Returns:
        A ``uint8`` array of shape ``(h, w)`` with 255 inside the padded boxes
        and 0 elsewhere.
    """
    mask = np.zeros((h, w), dtype=np.uint8)
    for r in results:
        if r.boxes is None:
            continue
        for box in r.boxes.xyxy.cpu().numpy():
            x1, y1, x2, y2 = map(int, box[:4])
            # Grow the box by ``pad`` and clip it to the image; both corners
            # are inclusive, hence the ``- 1`` on the upper bounds.
            left = max(0, x1 - pad)
            top = max(0, y1 - pad)
            right = min(w - 1, x2 + pad)
            bottom = min(h - 1, y2 + pad)
            if right < left or bottom < top:
                # Box lies completely outside the image: nothing to paint.
                continue
            mask[top:bottom + 1, left:right + 1] = 255
    return mask
# -- OpenCV fallback (backup only) -------------------------------------------
def detect_text_opencv(img: Image.Image) -> np.ndarray:
    """Heuristic text mask built with classic OpenCV operations.

    Two passes paint padded, filled rectangles into one mask: the first looks
    for bright regions that contain some dark pixels, the second for clusters
    of adaptively-thresholded strokes.

    Args:
        img: Input image; it is converted to RGB and then to grayscale.

    Returns:
        A mask with the grayscale image's shape and dtype, 255 inside the
        painted rectangles and 0 elsewhere.
    """
    import cv2

    gray = cv2.cvtColor(np.array(img.convert("RGB")), cv2.COLOR_RGB2GRAY)
    h, w = gray.shape
    mask = np.zeros_like(gray)
    half_image_area = 0.5 * h * w

    def paint_box(x, y, box_w, box_h, pad):
        # Fill the bounding box, grown by ``pad`` and clamped to the image.
        top_left = (max(0, x - pad), max(0, y - pad))
        bottom_right = (min(w, x + box_w + pad), min(h, y + box_h + pad))
        cv2.rectangle(mask, top_left, bottom_right, 255, -1)

    # Pass 1: white speech bubbles.
    _, bright = cv2.threshold(gray, 220, 255, cv2.THRESH_BINARY)
    margin = 10
    # Blank the frame so bright page borders are not picked up.
    bright[:margin, :] = 0
    bright[-margin:, :] = 0
    bright[:, :margin] = 0
    bright[:, -margin:] = 0
    ellipse = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
    bright_closed = cv2.morphologyEx(bright, cv2.MORPH_CLOSE, ellipse, iterations=3)
    bubble_contours, _ = cv2.findContours(
        bright_closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    for contour in bubble_contours:
        x, y, box_w, box_h = cv2.boundingRect(contour)
        box_area = box_w * box_h
        if not 800 <= box_area <= half_image_area:
            continue
        aspect = box_w / max(box_h, 1)
        if not 0.12 <= aspect <= 8:
            continue
        patch = gray[y:y + box_h, x:x + box_w]
        # Require at least 2% dark pixels inside the bright region.
        dark_fraction = np.count_nonzero(patch < 100) / max(patch.size, 1)
        if dark_fraction < 0.02:
            continue
        paint_box(x, y, box_w, box_h, 8)

    # Pass 2: text drawn directly on the artwork.
    strokes = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 15, 10
    )
    speckle_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    strokes = cv2.morphologyEx(strokes, cv2.MORPH_OPEN, speckle_kernel)
    # Dilate with a wide and a tall kernel, then merge the two results.
    wide = cv2.dilate(
        strokes, cv2.getStructuringElement(cv2.MORPH_RECT, (25, 4)), iterations=2
    )
    tall = cv2.dilate(
        strokes, cv2.getStructuringElement(cv2.MORPH_RECT, (4, 12)), iterations=2
    )
    merged = cv2.bitwise_or(wide, tall)
    block_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (35, 25))
    blocks = cv2.morphologyEx(merged, cv2.MORPH_CLOSE, block_kernel, iterations=2)
    text_contours, _ = cv2.findContours(
        blocks, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    for contour in text_contours:
        x, y, box_w, box_h = cv2.boundingRect(contour)
        box_area = box_w * box_h
        aspect = box_w / max(box_h, 1)
        if not 60 <= box_area <= half_image_area:
            continue
        if not 0.05 <= aspect <= 20:
            continue
        paint_box(x, y, box_w, box_h, 6)

    return mask
# -- Core function ------------------------------------------------------------
def _run_yolo_model(model, label: str, img, w: int, h: int):
    """Run one YOLO model on ``img`` and return its mask, or ``None``.

    Instance masks are tried first; when they yield nothing the bounding
    boxes are rasterised instead. Inference errors are logged and reported as
    ``None`` so one broken model never aborts the whole request.
    """
    if model is None:
        return None
    try:
        results = model(img, imgsz=1024, conf=0.25, verbose=False)
        # Try segmentation first; fall back to boxes when there are no masks.
        model_mask = _yolo_seg_to_mask(results, w, h)
        if model_mask.max() == 0:
            model_mask = _yolo_det_to_mask(results, w, h)
        n = np.count_nonzero(model_mask)
        print(f"โ {label} model: {n} pixels detected", flush=True)
        return model_mask
    except Exception as e:
        print(f"โ ๏ธ {label} model inference error: {e}", flush=True)
        return None


def segment(image_b64: str) -> str:
    """Return a text mask for a base64-encoded image.

    Args:
        image_b64: The image as base64, either a data URI
            (``data:image/...;base64,...``) or the raw base64 payload.

    Returns:
        The mask as a PNG data URI (``data:image/png;base64,...``), or an
        empty string when the input is empty or any step raises. The image is
        shrunk to fit within 1024x1024 before detection and the mask has that
        shrunk size.
    """
    try:
        if not image_b64 or not image_b64.strip():
            return ""
        # Drop the "data:...;base64," prefix when a data URI was supplied.
        raw = image_b64.split(",")[1] if "," in image_b64 else image_b64
        img_bytes = base64.b64decode(raw)
        img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
        img.thumbnail((1024, 1024))
        w, h = img.size
        final_mask = np.zeros((h, w), dtype=np.uint8)

        # Text model first, then the speech-bubble model; their masks are
        # combined with an element-wise maximum.
        for label, model in (("Text", text_model), ("Bubble", bubble_model)):
            model_mask = _run_yolo_model(model, label, img, w, h)
            if model_mask is not None:
                final_mask = np.maximum(final_mask, model_mask)

        # Fallback when both models failed or returned nothing.
        if final_mask.max() == 0:
            print("๐ YOLO masks empty โ using OpenCV fallback", flush=True)
            final_mask = detect_text_opencv(img)

        # Final report.
        if final_mask.max() == 0:
            print("โ ๏ธ Final mask is empty โ no text detected", flush=True)
        else:
            pct = 100 * np.count_nonzero(final_mask) / final_mask.size
            print(f"โ Final mask: {pct:.1f}% coverage", flush=True)

        buf = io.BytesIO()
        Image.fromarray(final_mask).save(buf, format="PNG")
        return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode()
    except Exception as e:
        # API boundary: log the failure and signal it with an empty string.
        print(f"โ segment error: {e}", flush=True)
        import traceback; traceback.print_exc()
        return ""
# -- Gradio UI + API ----------------------------------------------------------
with gr.Blocks(title="Emperor SEG API") as demo:
    gr.Markdown("## ๐ฑ Emperor SEG Space\nText Segmentation API for Manga/Manhwa cleaning.")
    with gr.Row():
        with gr.Column():
            img_input = gr.Image(label="Input Image", type="pil")
            btn = gr.Button("Segment", variant="primary")
        with gr.Column():
            mask_output = gr.Image(label="Text Mask")

    def _demo_fn(img_pil):
        """Button callback: run ``segment`` on the uploaded image.

        The PIL image is PNG-encoded to base64 so the UI goes through exactly
        the same code path as the API. Returns the mask as a PIL image, or
        ``None`` when no image was supplied or ``segment`` returned nothing.
        """
        if img_pil is None:
            return None
        png_buffer = io.BytesIO()
        img_pil.save(png_buffer, format="PNG")
        encoded_input = base64.b64encode(png_buffer.getvalue()).decode()
        mask_uri = segment(encoded_input)
        if not mask_uri:
            return None
        # Strip the "data:image/png;base64," prefix before decoding.
        payload = mask_uri.split(",")[1]
        return Image.open(io.BytesIO(base64.b64decode(payload)))

    btn.click(_demo_fn, inputs=[img_input], outputs=[mask_output])
    # Expose ``segment`` under the "segment" API name.
    gr.api(segment, api_name="segment")

demo.launch(server_name="0.0.0.0", server_port=7860)