Spaces:
Sleeping
Sleeping
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| from functools import reduce | |
| from PIL import Image | |
| def detect_graph_boundaries(img): | |
| height, width = img.shape[:2] | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV) | |
| col_sums = np.sum(thresh, axis=0) / 255 | |
| is_line = col_sums > (height * 0.40) | |
| line_indices = np.where(is_line)[0] | |
| start_x = 0 | |
| if len(line_indices) > 0: | |
| left_lines = [x for x in line_indices if x < width * 0.2 and x > 5] | |
| if left_lines: | |
| start_x = left_lines[0] | |
| end_x = width - 1 | |
| if len(line_indices) > 0: | |
| right_margin = width * 0.95 | |
| right_lines = [x for x in line_indices if x > right_margin] | |
| if right_lines: | |
| end_x = right_lines[-1] | |
| # Create debug image | |
| debug_img = img.copy() | |
| cv2.line(debug_img, (int(start_x), 0), (int(start_x), height), (0, 255, 0), 3) | |
| cv2.line(debug_img, (int(end_x), 0), (int(end_x), height), (0, 0, 255), 3) | |
| return int(start_x), int(end_x), debug_img | |
| def extract_color_pixels( | |
| image, color="green", mode="dominant", threshold=0, difference=10 | |
| ): | |
| """ | |
| Process an image and extract only pixels of a specific color. | |
| Display them on a black background. | |
| Args: | |
| image: PIL Image object | |
| color: str, one of 'red', 'green', or 'blue' | |
| mode: str, detection mode - 'dominant', 'difference', or 'strict' | |
| threshold: int, minimum value for the target color channel (0-255) | |
| difference: int/float, parameter meaning depends on mode | |
| Returns: | |
| tuple: (PIL Image object with only specified color pixels, color_mask array) | |
| """ | |
| # Convert image to RGB if it's not already | |
| if image.mode != "RGB": | |
| image = image.convert("RGB") | |
| # Convert to numpy array for easier manipulation | |
| img_array = np.array(image) | |
| # Create a black background with the same dimensions | |
| result_array = np.zeros_like(img_array) | |
| # Extract RGB channels | |
| red = img_array[:, :, 0].astype(np.float32) | |
| green = img_array[:, :, 1].astype(np.float32) | |
| blue = img_array[:, :, 2].astype(np.float32) | |
| # Create mask based on selected color and mode | |
| if mode == "dominant": | |
| # Simply check if the target color is the highest channel | |
| if color == "red": | |
| color_mask = (red >= green) & (red >= blue) & (red > threshold) | |
| elif color == "green": | |
| color_mask = (green >= red) & (green >= blue) & (green > threshold) | |
| elif color == "blue": | |
| color_mask = (blue >= red) & (blue >= green) & (blue > threshold) | |
| elif mode == "difference": | |
| # Target color must be higher than others by a certain absolute difference | |
| if color == "red": | |
| color_mask = ( | |
| (red > threshold) | |
| & (red > green + difference) | |
| & (red > blue + difference) | |
| ) | |
| elif color == "green": | |
| color_mask = ( | |
| (green > threshold) | |
| & (green > red + difference) | |
| & (green > blue + difference) | |
| ) | |
| elif color == "blue": | |
| color_mask = ( | |
| (blue > threshold) | |
| & (blue > red + difference) | |
| & (blue > green + difference) | |
| ) | |
| elif mode == "strict": | |
| # Target color must be significantly higher (percentage-based) | |
| dominance_factor = 1.0 + (difference / 100.0) | |
| if color == "red": | |
| color_mask = ( | |
| (red > threshold) | |
| & (red > green * dominance_factor) | |
| & (red > blue * dominance_factor) | |
| ) | |
| elif color == "green": | |
| color_mask = ( | |
| (green > threshold) | |
| & (green > red * dominance_factor) | |
| & (green > blue * dominance_factor) | |
| ) | |
| elif color == "blue": | |
| color_mask = ( | |
| (blue > threshold) | |
| & (blue > red * dominance_factor) | |
| & (blue > green * dominance_factor) | |
| ) | |
| else: | |
| raise ValueError("Mode must be 'dominant', 'difference', or 'strict'") | |
| # Apply mask to keep only target color pixels | |
| result_array[color_mask] = img_array[color_mask] | |
| # Convert back to PIL Image | |
| result_image = Image.fromarray(result_array.astype("uint8")) | |
| return result_image, color_mask | |
| def extract_line_mask( | |
| img_cropped, line_color, saturation_factor, gap_fill_size, noise_threshold | |
| ): | |
| # Boost Saturation | |
| hsv_pre = cv2.cvtColor(img_cropped, cv2.COLOR_BGR2HSV) | |
| h, s, v = cv2.split(hsv_pre) | |
| s = np.clip(s.astype(np.float32) * saturation_factor, 0, 255).astype(np.uint8) | |
| hsv = cv2.merge((h, s, v)) | |
| # Convert OpenCV BGR (boosted) to PIL RGB | |
| boosted_bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR) | |
| img_rgb = cv2.cvtColor(boosted_bgr, cv2.COLOR_BGR2RGB) | |
| pil_image = Image.fromarray(img_rgb) | |
| target_color = "green" | |
| if line_color == "Red": | |
| target_color = "red" | |
| elif line_color == "Blue (Cyan)": | |
| target_color = "blue" | |
| diff_val = 20 | |
| if line_color == "Green": | |
| diff_val = 30 | |
| _, color_mask = extract_color_pixels( | |
| pil_image, | |
| color=target_color, | |
| mode="difference", | |
| threshold=40, | |
| difference=diff_val, | |
| ) | |
| # Convert boolean mask to uint8 | |
| mask = np.zeros_like(img_cropped[:, :, 0], dtype=np.uint8) | |
| mask[color_mask] = 255 | |
| debug_image = None | |
| # Additional processing for Green (White removal) | |
| if line_color == "Green": | |
| original_bgr = img_cropped | |
| original_hsv = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2HSV) | |
| _, orig_s, orig_v = cv2.split(original_hsv) | |
| white_mask = (orig_v > 200) & (orig_s < 50) | |
| mask_before_white_removal = mask.copy() | |
| mask[white_mask] = 0 | |
| # Create debug visualization | |
| debug_image = img_cropped.copy() | |
| debug_image[mask > 0] = [0, 255, 0] | |
| removed_white = white_mask & (mask_before_white_removal > 0) | |
| debug_image[removed_white] = [0, 0, 255] | |
| if mask is None: | |
| return None, None | |
| # Noise/Gap cleanup | |
| contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| mask_clean = np.zeros_like(mask) | |
| for cnt in contours: | |
| if cv2.contourArea(cnt) > (noise_threshold * 0.5): | |
| cv2.drawContours(mask_clean, [cnt], -1, 255, -1) | |
| mask = mask_clean | |
| if gap_fill_size > 0: | |
| k_h = np.ones((1, gap_fill_size), np.uint8) | |
| close_h = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k_h) | |
| k_v = np.ones((gap_fill_size, 1), np.uint8) | |
| close_v = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k_v) | |
| mask = cv2.bitwise_or(close_h, close_v) | |
| mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((2, 2), np.uint8)) | |
| return mask, debug_image | |
| def generate_curve_data(mask, name_upper, name_lower): | |
| height, width = mask.shape | |
| data = [] | |
| for x in range(width): | |
| col = mask[:, x] | |
| indices = np.where(col > 0)[0] | |
| val_top, val_bot = None, None | |
| if len(indices) > 0: | |
| y_min, y_max = indices[0], indices[-1] | |
| graph_y_top = height - y_min | |
| graph_y_bot = height - y_max | |
| val_top = graph_y_top | |
| val_bot = graph_y_bot | |
| data.append({"X": x, name_upper: val_top, name_lower: val_bot}) | |
| df = pd.DataFrame(data) | |
| df[name_upper] = df[name_upper].interpolate( | |
| method="linear", limit=3, limit_area="inside" | |
| ) | |
| df[name_lower] = df[name_lower].interpolate( | |
| method="linear", limit=3, limit_area="inside" | |
| ) | |
| df[name_upper] = df[name_upper].bfill().ffill() | |
| df[name_lower] = df[name_lower].bfill().ffill() | |
| return df | |
| def process_uploaded_image( | |
| file_bytes, | |
| sat_factor, | |
| gap_size, | |
| noise_threshold, | |
| crop_enabled, | |
| total_duration, | |
| travel_gradient_threshold=30, | |
| ): | |
| file_bytes = np.asarray(bytearray(file_bytes), dtype=np.uint8) | |
| img_orig = cv2.imdecode(file_bytes, 1) | |
| debug_img_bounds = img_orig.copy() | |
| sx, ex = 0, img_orig.shape[1] | |
| if crop_enabled: | |
| sx, ex, debug_img_bounds = detect_graph_boundaries(img_orig) | |
| img_working = img_orig[:, sx:ex] | |
| else: | |
| img_working = img_orig | |
| if img_working.shape[1] == 0: | |
| return None, None, None, "Crop failed.", {} | |
| configs = [ | |
| ("Red", "Red", ("Travel", "C1")), | |
| ("Green", "Green", ("Resistance", "C2")), | |
| ("Blue (Cyan)", "Blue", ("Current", "C3")), | |
| ] | |
| dfs = [] | |
| debug_images = {} | |
| debug_images["Boundaries"] = debug_img_bounds | |
| height, width = img_working.shape[:2] | |
| for color_key, _, col_names in configs: | |
| mask, debug_img = extract_line_mask( | |
| img_working, color_key, sat_factor, gap_size, noise_threshold | |
| ) | |
| if mask is not None: | |
| if debug_img is not None and color_key == "Green": | |
| debug_images[color_key + " (White Removal)"] = cv2.cvtColor( | |
| debug_img, cv2.COLOR_BGR2RGB | |
| ) | |
| colored_mask_clean = np.zeros_like(img_working) | |
| colored_mask_clean[mask > 0] = [0, 255, 0] | |
| overlay_clean = cv2.addWeighted( | |
| img_working, 0.7, colored_mask_clean, 0.3, 0 | |
| ) | |
| debug_images[color_key + " (Cleaned Overlay)"] = cv2.cvtColor( | |
| overlay_clean, cv2.COLOR_BGR2RGB | |
| ) | |
| colored_mask = np.zeros_like(img_working) | |
| colored_mask[mask > 0] = [0, 255, 0] | |
| overlay = cv2.addWeighted(img_working, 0.7, colored_mask, 0.3, 0) | |
| debug_images[color_key] = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB) | |
| df_curve = generate_curve_data(mask, col_names[0], col_names[1]) | |
| dfs.append(df_curve) | |
| else: | |
| df_empty = pd.DataFrame( | |
| {"X": range(width), col_names[0]: np.nan, col_names[1]: np.nan} | |
| ) | |
| dfs.append(df_empty) | |
| if dfs: | |
| final_df = reduce( | |
| lambda left, right: pd.merge(left, right, on="X", how="outer"), dfs | |
| ) | |
| cols = ["X", "Travel", "C1", "Resistance", "C2", "Current", "C3"] | |
| existing_cols = [c for c in cols if c in final_df.columns] | |
| if "X" in final_df.columns: | |
| # === UPDATED TIME CALCULATION === | |
| # Calculates strict linear time: Pixel 0 = 0ms, Pixel Last = total_duration | |
| final_df["Time (ms)"] = (final_df["X"] / (width - 1)) * total_duration | |
| existing_cols.insert(1, "Time (ms)") | |
| else: | |
| return None, None, None, "X-axis alignment failed.", {} | |
| # IMPROVED BASELINE CLEANUP - Remove dotted reference lines | |
| baselines = {} | |
| for col in ["Travel", "Current"]: | |
| if col in final_df.columns: | |
| # Calculate baseline from first 60 entries | |
| first_60 = final_df[col].head(60) | |
| if first_60.notna().any(): | |
| initial_baseline = first_60.mean(skipna=True) | |
| if col == "Travel": | |
| # Identify outliers: points < 98% of initial baseline | |
| outlier_threshold = initial_baseline * 0.98 | |
| valid_points = first_60[first_60 >= outlier_threshold] | |
| if valid_points.notna().any(): | |
| baseline_val = valid_points.mean(skipna=True) | |
| else: | |
| baseline_val = initial_baseline | |
| else: | |
| baseline_val = initial_baseline | |
| else: | |
| valid_idx = final_df[col].first_valid_index() | |
| if valid_idx is not None: | |
| baseline_val = final_df.loc[valid_idx, col] | |
| else: | |
| continue | |
| baselines[col] = baseline_val | |
| # Find minimum value (dotted reference line level) | |
| min_val = final_df[col].min(skipna=True) | |
| # Set values near minimum to NaN | |
| threshold = min_val + (baseline_val - min_val) * 0.15 | |
| final_df.loc[final_df[col] < threshold, col] = np.nan | |
| # Abrupt Change (Gradient) Filter | |
| if col == "Travel": | |
| gradient_threshold = travel_gradient_threshold | |
| diff = final_df[col].diff().abs() | |
| mask_abrupt = diff > gradient_threshold | |
| final_df.loc[mask_abrupt, col] = np.nan | |
| # Time-Based Baseline Tolerances | |
| # 1. Start (0-30ms) | |
| mask_start = final_df["Time (ms)"] < 30 | |
| threshold_start = baseline_val * 0.98 | |
| mask_remove_start = mask_start & (final_df[col] < threshold_start) | |
| final_df.loc[mask_remove_start, col] = np.nan | |
| # 2. End (Last 50ms) | |
| max_time = final_df["Time (ms)"].max() | |
| mask_end = final_df["Time (ms)"] > (max_time - 50) | |
| threshold_end = baseline_val * 0.98 | |
| mask_remove_end = mask_end & (final_df[col] < threshold_end) | |
| final_df.loc[mask_remove_end, col] = np.nan | |
| # 3. Center (100-300ms) | |
| mask_center = (final_df["Time (ms)"] >= 100) & ( | |
| final_df["Time (ms)"] <= 300 | |
| ) | |
| threshold_center = baseline_val * 1.05 | |
| mask_remove_center = mask_center & (final_df[col] < threshold_center) | |
| final_df.loc[mask_remove_center, col] = np.nan | |
| # 4. Main (30-350ms) excluding Center | |
| mask_main_pre = (final_df["Time (ms)"] >= 30) & ( | |
| final_df["Time (ms)"] < 100 | |
| ) | |
| mask_main_post = (final_df["Time (ms)"] > 300) & ( | |
| final_df["Time (ms)"] <= 350 | |
| ) | |
| mask_remove_main_pre = mask_main_pre & (final_df[col] < baseline_val) | |
| mask_remove_main_post = mask_main_post & (final_df[col] < baseline_val) | |
| final_df.loc[mask_remove_main_pre, col] = np.nan | |
| final_df.loc[mask_remove_main_post, col] = np.nan | |
| # Fill gaps | |
| final_df[col] = ( | |
| final_df[col] | |
| .interpolate(method="linear", limit=3, limit_area="inside") | |
| .bfill() | |
| .ffill() | |
| ) | |
| # CROSS-CHANNEL BASELINE CONSTRAINTS | |
| if "Travel" in baselines: | |
| travel_base = baselines["Travel"] | |
| if "Current" in final_df.columns: | |
| mask = final_df["Current"] < travel_base | |
| final_df.loc[mask, "Current"] = np.nan | |
| final_df["Current"] = ( | |
| final_df["Current"] | |
| .interpolate(method="linear", limit=3, limit_area="inside") | |
| .bfill() | |
| .ffill() | |
| ) | |
| # AUXILIARY CURVE LOGIC | |
| pairs = [("C1", "Travel"), ("C2", "Resistance"), ("C3", "Current")] | |
| for lower, upper in pairs: | |
| if lower in final_df.columns and upper in final_df.columns: | |
| if not final_df[upper].isnull().all(): | |
| invalid_mask = final_df[lower] > final_df[upper] | |
| final_df.loc[invalid_mask, lower] = np.nan | |
| # Final global cleanup (excluding Resistance) | |
| for col in ["Travel", "Current", "C1", "C3"]: | |
| if col in final_df.columns: | |
| final_df[col] = ( | |
| final_df[col] | |
| .interpolate(method="linear", limit=3, limit_area="inside") | |
| .bfill() | |
| .ffill() | |
| ) | |
| return final_df[existing_cols], debug_images, (sx, ex), None, baselines | |
| return None, None, None, "No data extracted.", {} |