import cv2 import numpy as np import gradio as gr from skimage.filters import threshold_multiotsu from scipy.ndimage import binary_fill_holes, binary_opening import matplotlib.pyplot as plt import logging # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def enhanced_aggregate_segmentation(image): """Advanced concrete aggregate segmentation pipeline""" try: # Convert to grayscale if needed if len(image.shape) == 3: gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) else: gray = image.copy() # Step 1: Contrast enhancement clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8,8)) enhanced = clahe.apply(gray) # Step 2: Multi-level Otsu thresholding thresholds = threshold_multiotsu(enhanced, classes=3) regions = np.digitize(enhanced, bins=thresholds) # Step 3: Aggregate mask creation aggregate_mask = (regions == 2).astype(np.uint8) # Assuming aggregates are brightest # Step 4: Morphological refinement kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5)) cleaned = cv2.morphologyEx(aggregate_mask*255, cv2.MORPH_CLOSE, kernel, iterations=2) cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_OPEN, kernel, iterations=1) filled = binary_fill_holes(cleaned > 127) # Step 5: Final mask processing final_mask = binary_opening(filled, structure=np.ones((3,3))).astype(np.uint8) # Create output images if len(image.shape) == 3: aggregates = image.copy() aggregates[final_mask == 0] = 0 mask_vis = cv2.applyColorMap((final_mask*255).astype(np.uint8), cv2.COLORMAP_JET) mask_vis = cv2.addWeighted(image, 0.7, mask_vis, 0.3, 0) else: aggregates = np.zeros_like(image) aggregates[final_mask == 1] = image[final_mask == 1] mask_vis = cv2.applyColorMap((final_mask*255).astype(np.uint8), cv2.COLORMAP_JET) return aggregates, mask_vis except Exception as e: logger.error(f"Segmentation error: {str(e)}") error_img = np.zeros_like(image) if len(image.shape) == 2 else np.zeros((*image.shape[:2], 3)) return error_img, error_img def process_image(image): """Process image for Gradio interface""" try: # Convert from Gradio's RGB to OpenCV's BGR if isinstance(image, np.ndarray) and image.shape[2] == 3: image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) aggregates, mask_vis = enhanced_aggregate_segmentation(image) # Convert back to RGB for display if len(aggregates.shape) == 3: aggregates = cv2.cvtColor(aggregates, cv2.COLOR_BGR2RGB) mask_vis = cv2.cvtColor(mask_vis, cv2.COLOR_BGR2RGB) return aggregates, mask_vis except Exception as e: logger.error(f"Processing error: {str(e)}") error_img = np.zeros((512,512,3), dtype=np.uint8) return error_img, error_img # Create Gradio interface with gr.Blocks(title="Concrete Aggregate Analyzer") as app: gr.Markdown(""" ## Concrete Aggregate Segmentation Analyzer Upload an image of concrete surface for aggregate analysis """) with gr.Row(): with gr.Column(): input_img = gr.Image(label="Input Image", type="numpy") process_btn = gr.Button("Analyze", variant="primary") with gr.Column(): aggregates_img = gr.Image(label="Detected Aggregates") mask_img = gr.Image(label="Segmentation Visualization") debug_output = gr.Textbox(label="Processing Log") def process_with_logging(img): logger.handlers.clear() log_messages = [] class LogHandler(logging.Handler): def emit(self, record): log_messages.append(f"{record.levelname}: {record.getMessage()}") logger.addHandler(LogHandler()) results = process_image(img) return (*results, "\n".join(log_messages)) process_btn.click( fn=process_with_logging, inputs=input_img, outputs=[aggregates_img, mask_img, debug_output] ) if __name__ == "__main__": logger.info("Starting concrete aggregate analyzer...") app.launch( server_name="0.0.0.0", server_port=7860, share=False )