import os import cv2 import tempfile import spaces import gradio as gr import numpy as np import torch import matplotlib import matplotlib.pyplot as plt from PIL import Image, ImageDraw from typing import Iterable from gradio.themes import Soft from gradio.themes.utils import colors, fonts, sizes from transformers import ( Sam3Model, Sam3Processor, Sam3VideoModel, Sam3VideoProcessor, Sam3TrackerModel, Sam3TrackerProcessor ) import json from datetime import datetime import threading import queue import uuid # ============ THEME SETUP ============ colors.steel_blue = colors.Color( name="steel_blue", c50="#EBF3F8", c100="#D3E5F0", c200="#A8CCE1", c300="#7DB3D2", c400="#529AC3", c500="#4682B4", c600="#3E72A0", c700="#36638C", c800="#2E5378", c900="#264364", c950="#1E3450", ) class CustomBlueTheme(Soft): def __init__( self, *, primary_hue: colors.Color | str = colors.gray, secondary_hue: colors.Color | str = colors.steel_blue, neutral_hue: colors.Color | str = colors.slate, text_size: sizes.Size | str = sizes.text_lg, font: fonts.Font | str | Iterable[fonts.Font | str] = ( fonts.GoogleFont("Outfit"), "Arial", "sans-serif", ), font_mono: fonts.Font | str | Iterable[fonts.Font | str] = ( fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace", ), ): super().__init__( primary_hue=primary_hue, secondary_hue=secondary_hue, neutral_hue=neutral_hue, text_size=text_size, font=font, font_mono=font_mono, ) super().set( background_fill_primary="*primary_50", background_fill_primary_dark="*primary_900", body_background_fill="linear-gradient(135deg, *primary_200, *primary_100)", body_background_fill_dark="linear-gradient(135deg, *primary_900, *primary_800)", button_primary_text_color="white", button_primary_text_color_hover="white", button_primary_background_fill="linear-gradient(90deg, *secondary_500, *secondary_600)", button_primary_background_fill_hover="linear-gradient(90deg, *secondary_600, *secondary_700)", button_primary_background_fill_dark="linear-gradient(90deg, *secondary_600, *secondary_700)", button_primary_background_fill_hover_dark="linear-gradient(90deg, *secondary_500, *secondary_600)", slider_color="*secondary_500", slider_color_dark="*secondary_600", block_title_text_weight="600", block_border_width="3px", block_shadow="*shadow_drop_lg", button_primary_shadow="*shadow_drop_lg", button_large_padding="11px", color_accent_soft="*primary_100", block_label_background_fill="*primary_200", ) app_theme = CustomBlueTheme() # ============ GLOBAL SETUP ============ device = "cuda" if torch.cuda.is_available() else "cpu" print(f"🖥️ Using compute device: {device}") # History storage HISTORY_DIR = "processing_history" os.makedirs(HISTORY_DIR, exist_ok=True) HISTORY_FILE = os.path.join(HISTORY_DIR, "history.json") # Background processing queue processing_queue = queue.Queue() processing_results = {} # Load models print("⏳ Loading SAM3 Models permanently into memory...") try: print(" ... Loading Image Text Model") IMG_MODEL = Sam3Model.from_pretrained("DiffusionWave/sam3").to(device) IMG_PROCESSOR = Sam3Processor.from_pretrained("DiffusionWave/sam3") print(" ... Loading Image Tracker Model") TRK_MODEL = Sam3TrackerModel.from_pretrained("DiffusionWave/sam3").to(device) TRK_PROCESSOR = Sam3TrackerProcessor.from_pretrained("DiffusionWave/sam3") print(" ... Loading Video Model") VID_MODEL = Sam3VideoModel.from_pretrained("DiffusionWave/sam3").to(device, dtype=torch.bfloat16) VID_PROCESSOR = Sam3VideoProcessor.from_pretrained("DiffusionWave/sam3") print("✅ All Models loaded successfully!") except Exception as e: print(f"❌ CRITICAL ERROR LOADING MODELS: {e}") IMG_MODEL = IMG_PROCESSOR = TRK_MODEL = TRK_PROCESSOR = VID_MODEL = VID_PROCESSOR = None # ============ HISTORY MANAGEMENT ============ def load_history(): """Load processing history from JSON file""" if os.path.exists(HISTORY_FILE): try: with open(HISTORY_FILE, 'r') as f: return json.load(f) except: return [] return [] def save_history(history_item): """Save a new history item""" history = load_history() history.insert(0, history_item) # Add to beginning history = history[:100] # Keep last 100 items with open(HISTORY_FILE, 'w') as f: json.dump(history, f, indent=2) def get_history_display(): """Format history for display""" history = load_history() if not history: return "Chưa có lịch sử xử lý nào" display_text = "" for i, item in enumerate(history[:50], 1): status_emoji = "✅" if item['status'] == 'completed' else "❌" display_text += f"{status_emoji} **{item['type'].upper()}** - {item['timestamp']}\n" display_text += f" Prompt: {item['prompt']}\n" if item.get('output_path'): display_text += f" File: `{os.path.basename(item['output_path'])}`\n" display_text += "\n" return display_text # ============ UTILITY FUNCTIONS ============ def apply_mask_overlay(base_image, mask_data, opacity=0.5): """Draws segmentation masks on top of an image.""" if isinstance(base_image, np.ndarray): base_image = Image.fromarray(base_image) base_image = base_image.convert("RGBA") if mask_data is None or len(mask_data) == 0: return base_image.convert("RGB") if isinstance(mask_data, torch.Tensor): mask_data = mask_data.cpu().numpy() mask_data = mask_data.astype(np.uint8) if mask_data.ndim == 4: mask_data = mask_data[0] if mask_data.ndim == 3 and mask_data.shape[0] == 1: mask_data = mask_data[0] num_masks = mask_data.shape[0] if mask_data.ndim == 3 else 1 if mask_data.ndim == 2: mask_data = [mask_data] num_masks = 1 try: color_map = matplotlib.colormaps["rainbow"].resampled(max(num_masks, 1)) except AttributeError: import matplotlib.cm as cm color_map = cm.get_cmap("rainbow").resampled(max(num_masks, 1)) rgb_colors = [tuple(int(c * 255) for c in color_map(i)[:3]) for i in range(num_masks)] composite_layer = Image.new("RGBA", base_image.size, (0, 0, 0, 0)) for i, single_mask in enumerate(mask_data): mask_bitmap = Image.fromarray((single_mask * 255).astype(np.uint8)) if mask_bitmap.size != base_image.size: mask_bitmap = mask_bitmap.resize(base_image.size, resample=Image.NEAREST) fill_color = rgb_colors[i] color_fill = Image.new("RGBA", base_image.size, fill_color + (0,)) mask_alpha = mask_bitmap.point(lambda v: int(v * opacity) if v > 0 else 0) color_fill.putalpha(mask_alpha) composite_layer = Image.alpha_composite(composite_layer, color_fill) return Image.alpha_composite(base_image, composite_layer).convert("RGB") def draw_points_on_image(image, points): """Draws red dots on the image to indicate click locations.""" if isinstance(image, np.ndarray): image = Image.fromarray(image) draw_img = image.copy() draw = ImageDraw.Draw(draw_img) for pt in points: x, y = pt r = 8 draw.ellipse((x-r, y-r, x+r, y+r), fill="red", outline="white", width=4) return draw_img # ============ BACKGROUND PROCESSING WORKER ============ def background_worker(): """Background thread that processes jobs from queue""" while True: try: job = processing_queue.get() if job is None: break job_id = job['id'] job_type = job['type'] processing_results[job_id] = {'status': 'processing', 'progress': 0} try: if job_type == 'image': result = process_image_job(job) elif job_type == 'video': result = process_video_job(job) elif job_type == 'click': result = process_click_job(job) processing_results[job_id] = { 'status': 'completed', 'result': result, 'progress': 100 } # Save to history save_history({ 'id': job_id, 'type': job_type, 'prompt': job.get('prompt', 'N/A'), 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'status': 'completed', 'output_path': result.get('output_path') }) except Exception as e: processing_results[job_id] = { 'status': 'error', 'error': str(e), 'progress': 0 } save_history({ 'id': job_id, 'type': job_type, 'prompt': job.get('prompt', 'N/A'), 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'status': 'error', 'error': str(e) }) except Exception as e: print(f"Worker error: {e}") # Start background worker worker_thread = threading.Thread(target=background_worker, daemon=True) worker_thread.start() # ============ JOB PROCESSORS ============ @spaces.GPU def process_image_job(job): """Process image segmentation job""" source_img = job['image'] text_query = job['prompt'] conf_thresh = job.get('conf_thresh', 0.5) if isinstance(source_img, str): source_img = Image.open(source_img) pil_image = source_img.convert("RGB") model_inputs = IMG_PROCESSOR(images=pil_image, text=text_query, return_tensors="pt").to(device) with torch.no_grad(): inference_output = IMG_MODEL(**model_inputs) processed_results = IMG_PROCESSOR.post_process_instance_segmentation( inference_output, threshold=conf_thresh, mask_threshold=0.5, target_sizes=model_inputs.get("original_sizes").tolist() )[0] annotation_list = [] raw_masks = processed_results['masks'].cpu().numpy() raw_scores = processed_results['scores'].cpu().numpy() for idx, mask_array in enumerate(raw_masks): label_str = f"{text_query} ({raw_scores[idx]:.2f})" annotation_list.append((mask_array, label_str)) # Save output output_path = os.path.join(HISTORY_DIR, f"{job['id']}_result.jpg") result_img = apply_mask_overlay(pil_image, raw_masks) result_img.save(output_path) return { 'image': (pil_image, annotation_list), 'output_path': output_path } @spaces.GPU def process_video_job(job): """Process video segmentation job""" source_vid = job['video'] text_query = job['prompt'] frame_limit = job.get('frame_limit', 60) video_cap = cv2.VideoCapture(source_vid) vid_fps = video_cap.get(cv2.CAP_PROP_FPS) vid_w = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) vid_h = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) video_frames = [] counter = 0 while video_cap.isOpened(): ret, frame = video_cap.read() if not ret or (frame_limit > 0 and counter >= frame_limit): break video_frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) counter += 1 video_cap.release() session = VID_PROCESSOR.init_video_session(video=video_frames, inference_device=device, dtype=torch.bfloat16) session = VID_PROCESSOR.add_text_prompt(inference_session=session, text=text_query) output_path = os.path.join(HISTORY_DIR, f"{job['id']}_result.mp4") video_writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), vid_fps, (vid_w, vid_h)) total_frames = len(video_frames) for frame_idx, model_out in enumerate(VID_MODEL.propagate_in_video_iterator(inference_session=session, max_frame_num_to_track=total_frames)): post_processed = VID_PROCESSOR.postprocess_outputs(session, model_out) f_idx = model_out.frame_idx original_pil = Image.fromarray(video_frames[f_idx]) if 'masks' in post_processed: detected_masks = post_processed['masks'] if detected_masks.ndim == 4: detected_masks = detected_masks.squeeze(1) final_frame = apply_mask_overlay(original_pil, detected_masks) else: final_frame = original_pil video_writer.write(cv2.cvtColor(np.array(final_frame), cv2.COLOR_RGB2BGR)) # Update progress progress = int((frame_idx + 1) / total_frames * 100) processing_results[job['id']]['progress'] = progress video_writer.release() return {'output_path': output_path} @spaces.GPU def process_click_job(job): """Process click segmentation job""" input_image = job['image'] points_state = job['points'] labels_state = job['labels'] if isinstance(input_image, str): input_image = Image.open(input_image) input_points = [[points_state]] input_labels = [[labels_state]] inputs = TRK_PROCESSOR(images=input_image, input_points=input_points, input_labels=input_labels, return_tensors="pt").to(device) with torch.no_grad(): outputs = TRK_MODEL(**inputs, multimask_output=False) masks = TRK_PROCESSOR.post_process_masks(outputs.pred_masks.cpu(), inputs["original_sizes"], binarize=True)[0] final_img = apply_mask_overlay(input_image, masks[0]) final_img = draw_points_on_image(final_img, points_state) output_path = os.path.join(HISTORY_DIR, f"{job['id']}_result.jpg") final_img.save(output_path) return { 'image': final_img, 'output_path': output_path } # ============ UI HANDLERS ============ def submit_image_job(source_img, text_query, conf_thresh): """Submit image segmentation job to background queue""" if source_img is None or not text_query: return None, "❌ Vui lòng cung cấp ảnh và prompt", "" job_id = str(uuid.uuid4()) job = { 'id': job_id, 'type': 'image', 'image': source_img, 'prompt': text_query, 'conf_thresh': conf_thresh } processing_queue.put(job) return None, f"✅ Đã thêm vào hàng chờ (ID: {job_id[:8]}). Đang xử lý...", job_id def check_image_status(job_id): """Check status of image processing job""" if not job_id or job_id not in processing_results: return None, "Không tìm thấy công việc" result = processing_results[job_id] if result['status'] == 'processing': return None, f"⏳ Đang xử lý... {result['progress']}%" elif result['status'] == 'completed': return result['result']['image'], "✅ Hoàn thành!" else: return None, f"❌ Lỗi: {result.get('error', 'Unknown')}" def submit_video_job(source_vid, text_query, frame_limit, time_limit): """Submit video segmentation job to background queue""" if not source_vid or not text_query: return None, "❌ Vui lòng cung cấp video và prompt", "" job_id = str(uuid.uuid4()) job = { 'id': job_id, 'type': 'video', 'video': source_vid, 'prompt': text_query, 'frame_limit': frame_limit, 'time_limit': time_limit } processing_queue.put(job) return None, f"✅ Đã thêm vào hàng chờ (ID: {job_id[:8]}). Đang xử lý...", job_id def check_video_status(job_id): """Check status of video processing job""" if not job_id or job_id not in processing_results: return None, "Không tìm thấy công việc" result = processing_results[job_id] if result['status'] == 'processing': return None, f"⏳ Đang xử lý... {result['progress']}%" elif result['status'] == 'completed': return result['result']['output_path'], "✅ Hoàn thành!" else: return None, f"❌ Lỗi: {result.get('error', 'Unknown')}" def image_click_handler(image, evt: gr.SelectData, points_state, labels_state): """Handle click events for interactive segmentation""" x, y = evt.index if points_state is None: points_state = [] if labels_state is None: labels_state = [] points_state.append([x, y]) labels_state.append(1) # Process immediately (can be changed to background if needed) job_id = str(uuid.uuid4()) job = { 'id': job_id, 'type': 'click', 'image': image, 'points': points_state, 'labels': labels_state } try: result = process_click_job(job) return result['image'], points_state, labels_state except Exception as e: print(f"Click error: {e}") return image, points_state, labels_state # ============ GRADIO INTERFACE ============ custom_css=""" #col-container { margin: 0 auto; max-width: 1200px; } #main-title h1 { font-size: 2.1em !important; } .history-box { max-height: 600px; overflow-y: auto; } """ with gr.Blocks(css=custom_css, theme=app_theme) as demo: with gr.Column(elem_id="col-container"): gr.Markdown("# **SAM3: Segment Anything Model 3** 🚀", elem_id="main-title") gr.Markdown("Xử lý ảnh/video với **background processing** - không cần chờ đợi!") with gr.Tabs(): # ===== IMAGE SEGMENTATION TAB ===== with gr.Tab("📷 Image Segmentation"): with gr.Row(): with gr.Column(scale=1): image_input = gr.Image(label="Upload Image", type="pil", height=350) txt_prompt_img = gr.Textbox(label="Text Prompt", placeholder="e.g., cat, face, car wheel") with gr.Accordion("Advanced Settings", open=False): conf_slider = gr.Slider(0.0, 1.0, value=0.45, step=0.05, label="Confidence Threshold") btn_submit_img = gr.Button("🚀 Submit Job (Background)", variant="primary") btn_check_img = gr.Button("🔍 Check Status", variant="secondary") job_id_img = gr.Textbox(label="Job ID", visible=False) with gr.Column(scale=1.5): image_result = gr.AnnotatedImage(label="Segmented Result", height=410) status_img = gr.Textbox(label="Status", interactive=False) btn_submit_img.click( fn=submit_image_job, inputs=[image_input, txt_prompt_img, conf_slider], outputs=[image_result, status_img, job_id_img] ) btn_check_img.click( fn=check_image_status, inputs=[job_id_img], outputs=[image_result, status_img] ) # ===== VIDEO SEGMENTATION TAB ===== with gr.Tab("🎥 Video Segmentation"): with gr.Row(): with gr.Column(): video_input = gr.Video(label="Upload Video", format="mp4", height=320) txt_prompt_vid = gr.Textbox(label="Text Prompt", placeholder="e.g., person running, red car") with gr.Row(): frame_limiter = gr.Slider(10, 500, value=60, step=10, label="Max Frames") time_limiter = gr.Radio([60, 120, 180], value=60, label="Timeout (seconds)") btn_submit_vid = gr.Button("🚀 Submit Job (Background)", variant="primary") btn_check_vid = gr.Button("🔍 Check Status", variant="secondary") job_id_vid = gr.Textbox(label="Job ID", visible=False) with gr.Column(): video_result = gr.Video(label="Processed Video") status_vid = gr.Textbox(label="Status", interactive=False) btn_submit_vid.click( fn=submit_video_job, inputs=[video_input, txt_prompt_vid, frame_limiter, time_limiter], outputs=[video_result, status_vid, job_id_vid] ) btn_check_vid.click( fn=check_video_status, inputs=[job_id_vid], outputs=[video_result, status_vid] ) # ===== CLICK SEGMENTATION TAB ===== with gr.Tab("👆 Click Segmentation"): with gr.Row(): with gr.Column(scale=1): img_click_input = gr.Image(type="pil", label="Upload Image", interactive=True, height=450) gr.Markdown("**Hướng dẫn:** Click vào đối tượng bạn muốn phân đoạn") with gr.Row(): img_click_clear = gr.Button("🔄 Clear Points & Reset", variant="primary") st_click_points = gr.State([]) st_click_labels = gr.State([]) with gr.Column(scale=1): img_click_output = gr.Image(type="pil", label="Result Preview", height=450, interactive=False) img_click_input.select( image_click_handler, inputs=[img_click_input, st_click_points, st_click_labels], outputs=[img_click_output, st_click_points, st_click_labels] ) img_click_clear.click( lambda: (None, [], []), outputs=[img_click_output, st_click_points, st_click_labels] ) # ===== HISTORY TAB ===== with gr.Tab("📜 Lịch Sử Xử Lý"): with gr.Row(): with gr.Column(): btn_refresh_history = gr.Button("🔄 Refresh History", variant="primary") history_display = gr.Markdown(value=get_history_display(), elem_classes="history-box") with gr.Accordion("Hướng dẫn", open=False): gr.Markdown(""" ### Lịch sử lưu: - ✅ **Hoàn thành**: File đã được xử lý thành công - ❌ **Lỗi**: Xử lý thất bại - Tất cả file output được lưu trong thư mục `processing_history/` - Hệ thống giữ lại 100 lịch sử gần nhất """) btn_refresh_history.click( fn=get_history_display, outputs=[history_display] ) # ===== BATCH PROCESSING TAB ===== with gr.Tab("⚙️ Batch Processing"): gr.Markdown("### Xử lý hàng loạt (Coming Soon)") gr.Markdown(""" Tính năng này sẽ cho phép bạn: - Upload nhiều ảnh/video cùng lúc - Tự động xử lý tuần tự - Download tất cả kết quả dưới dạng ZIP """) if __name__ == "__main__": demo.launch( css=custom_css, theme=app_theme, ssr_mode=False, mcp_server=True, show_error=True )