from functools import reduce

import cv2
import numpy as np
import pandas as pd
from PIL import Image


def detect_graph_boundaries(img):
    """Locate the left and right vertical border lines of a plotted graph.

    A column counts as a "line" when more than 40% of its pixels are darker
    than gray level 200 (inverse binary threshold).

    Args:
        img: BGR image array (it is passed to ``cv2.COLOR_BGR2GRAY``).

    Returns:
        tuple: ``(start_x, end_x, debug_img)`` where ``start_x`` is the first
        line column with ``5 < x < 0.2 * width`` (``0`` if none), ``end_x`` is
        the last line column with ``x > 0.95 * width`` (``width - 1`` if
        none), and ``debug_img`` is a copy of ``img`` with both boundaries
        drawn as 3-px vertical lines.
    """
    height, width = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV)

    # Number of "dark" pixels per column.
    col_sums = np.sum(thresh, axis=0) / 255
    line_indices = np.flatnonzero(col_sums > (height * 0.40))

    # Left border: skip the outermost 5 px, look only in the left 20%.
    left_lines = line_indices[(line_indices > 5) & (line_indices < width * 0.2)]
    start_x = int(left_lines[0]) if left_lines.size else 0

    # Right border: look only in the right 5%.
    right_lines = line_indices[line_indices > width * 0.95]
    end_x = int(right_lines[-1]) if right_lines.size else width - 1

    # Debug image: start boundary in (0, 255, 0), end boundary in (0, 0, 255).
    debug_img = img.copy()
    cv2.line(debug_img, (start_x, 0), (start_x, height), (0, 255, 0), 3)
    cv2.line(debug_img, (end_x, 0), (end_x, height), (0, 0, 255), 3)

    return start_x, end_x, debug_img


def extract_color_pixels(
    image, color="green", mode="dominant", threshold=0, difference=10
):
    """Keep only the pixels of one colour and paint the rest black.

    Args:
        image: PIL Image object (converted to RGB if it is not already).
        color: str, one of 'red', 'green', or 'blue'.
        mode: str, detection mode:
            'dominant'   - target channel is >= both other channels;
            'difference' - target channel exceeds both others by more than
                           ``difference`` (absolute);
            'strict'     - target channel exceeds both others multiplied by
                           ``1 + difference / 100``.
        threshold: int, the target channel must be strictly greater than this
            value (0-255).
        difference: int/float, meaning depends on ``mode`` (unused for
            'dominant').

    Returns:
        tuple: (PIL Image with only the selected pixels on a black
        background, boolean ``color_mask`` array of shape (H, W)).

    Raises:
        ValueError: if ``mode`` or ``color`` is not one of the accepted values.
    """
    if image.mode != "RGB":
        image = image.convert("RGB")

    img_array = np.array(image)
    result_array = np.zeros_like(img_array)

    # float32 so that "channel + difference" cannot wrap around like uint8.
    channels = {
        "red": img_array[:, :, 0].astype(np.float32),
        "green": img_array[:, :, 1].astype(np.float32),
        "blue": img_array[:, :, 2].astype(np.float32),
    }

    if mode not in ("dominant", "difference", "strict"):
        raise ValueError("Mode must be 'dominant', 'difference', or 'strict'")
    if color not in channels:
        # Previously an unknown colour fell through every branch and failed
        # later with an UnboundLocalError on ``color_mask``.
        raise ValueError("Color must be 'red', 'green', or 'blue'")

    target = channels[color]
    others = [chan for name, chan in channels.items() if name != color]

    color_mask = target > threshold
    if mode == "dominant":
        for other in others:
            color_mask &= target >= other
    elif mode == "difference":
        for other in others:
            color_mask &= target > other + difference
    else:  # "strict"
        dominance_factor = 1.0 + (difference / 100.0)
        for other in others:
            color_mask &= target > other * dominance_factor

    result_array[color_mask] = img_array[color_mask]
    result_image = Image.fromarray(result_array.astype("uint8"))

    return result_image, color_mask


def extract_line_mask(
    img_cropped, line_color, saturation_factor, gap_fill_size, noise_threshold
):
    """Build a cleaned binary mask of one coloured curve in a BGR image.

    Steps: boost saturation, select pixels whose target channel dominates
    (``extract_color_pixels`` in 'difference' mode), optionally drop
    white-ish pixels for the green curve, remove small contours, and close
    small gaps.

    Args:
        img_cropped: BGR image array.
        line_color: "Red", "Green" or "Blue (Cyan)"; any other value is
            treated like green for channel selection.
        saturation_factor: multiplier applied to the HSV saturation channel.
        gap_fill_size: length in pixels of the horizontal/vertical closing
            kernels; values < 1 disable gap filling.
        noise_threshold: contours with area <= ``noise_threshold * 0.5`` are
            discarded.

    Returns:
        tuple: ``(mask, debug_image)``. ``mask`` is a uint8 array (0/255) the
        size of ``img_cropped``. ``debug_image`` is a BGR visualisation of
        the white-removal step for "Green", otherwise ``None``.
    """
    # Boost saturation in HSV space, then go back to RGB for the PIL helper.
    hue, sat, val = cv2.split(cv2.cvtColor(img_cropped, cv2.COLOR_BGR2HSV))
    sat = np.clip(sat.astype(np.float32) * saturation_factor, 0, 255).astype(
        np.uint8
    )
    boosted_bgr = cv2.cvtColor(cv2.merge((hue, sat, val)), cv2.COLOR_HSV2BGR)
    pil_image = Image.fromarray(cv2.cvtColor(boosted_bgr, cv2.COLOR_BGR2RGB))

    target_color = {"Red": "red", "Blue (Cyan)": "blue"}.get(line_color, "green")
    diff_val = 30 if line_color == "Green" else 20

    _, color_mask = extract_color_pixels(
        pil_image,
        color=target_color,
        mode="difference",
        threshold=40,
        difference=diff_val,
    )

    # Boolean mask -> 0/255 uint8 mask as expected by OpenCV.
    mask = color_mask.astype(np.uint8) * 255

    debug_image = None

    # Additional processing for Green: remove bright, unsaturated (white-ish)
    # pixels, judged on the ORIGINAL (non-boosted) image.
    if line_color == "Green":
        _, orig_s, orig_v = cv2.split(cv2.cvtColor(img_cropped, cv2.COLOR_BGR2HSV))
        white_mask = (orig_v > 200) & (orig_s < 50)

        mask_before_white_removal = mask.copy()
        mask[white_mask] = 0

        # Debug visualisation: kept pixels in (0, 255, 0), pixels removed by
        # the white filter in (0, 0, 255).
        debug_image = img_cropped.copy()
        debug_image[mask > 0] = [0, 255, 0]
        removed_white = white_mask & (mask_before_white_removal > 0)
        debug_image[removed_white] = [0, 0, 255]

    # Noise cleanup: redraw (filled) only the contours that are large enough.
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    mask_clean = np.zeros_like(mask)
    min_area = noise_threshold * 0.5
    for cnt in contours:
        if cv2.contourArea(cnt) > min_area:
            cv2.drawContours(mask_clean, [cnt], -1, 255, -1)
    mask = mask_clean

    # Gap cleanup: close horizontally and vertically, merge, then smooth.
    # NOTE(review): the source lost its indentation; the final 2x2 close is
    # assumed to belong to the gap-fill branch - confirm against the original.
    kernel_len = int(gap_fill_size)  # kernel dimensions must be integers
    if kernel_len > 0:
        k_h = np.ones((1, kernel_len), np.uint8)
        close_h = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k_h)
        k_v = np.ones((kernel_len, 1), np.uint8)
        close_v = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k_v)
        mask = cv2.bitwise_or(close_h, close_v)
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((2, 2), np.uint8))

    return mask, debug_image


def _fill_short_gaps(series):
    """Interpolate up to 3 interior NaNs per gap, then back/forward-fill the rest."""
    return (
        series.interpolate(method="linear", limit=3, limit_area="inside")
        .bfill()
        .ffill()
    )


def generate_curve_data(mask, name_upper, name_lower):
    """Convert a binary curve mask into per-column upper/lower envelope values.

    For each column ``x`` the topmost and bottommost non-zero rows are found
    and expressed as ``height - row`` (so larger values are higher in the
    image). Columns without any mask pixel are NaN before gap filling.

    Args:
        mask: 2-D array; values > 0 are curve pixels.
        name_upper: column name for the value derived from the topmost pixel.
        name_lower: column name for the value derived from the bottommost pixel.

    Returns:
        pandas.DataFrame with columns ``"X"``, ``name_upper`` and
        ``name_lower`` (the latter two always float), one row per image
        column, with gaps filled by ``_fill_short_gaps``.
    """
    height, width = mask.shape

    # Always-float columns: an all-empty mask used to yield object-dtype
    # columns of None, which pandas refuses/deprecates for interpolation.
    upper = np.full(width, np.nan)
    lower = np.full(width, np.nan)

    if height > 0 and width > 0:
        lit = mask > 0
        has_pixels = lit.any(axis=0)
        first_row = lit.argmax(axis=0)  # topmost lit row per column
        last_row = height - 1 - lit[::-1].argmax(axis=0)  # bottommost lit row
        upper[has_pixels] = height - first_row[has_pixels]
        lower[has_pixels] = height - last_row[has_pixels]

    df = pd.DataFrame(
        {
            "X": np.arange(width, dtype=np.int64),
            name_upper: upper,
            name_lower: lower,
        }
    )
    df[name_upper] = _fill_short_gaps(df[name_upper])
    df[name_lower] = _fill_short_gaps(df[name_lower])
    return df


def _estimate_baseline(series, reject_low_outliers):
    """Estimate a curve's baseline from its first 60 samples.

    Returns the mean of the first 60 samples (optionally after dropping
    samples below 98% of that mean), or the first valid value of the whole
    series when those 60 are all NaN, or ``None`` when the series is empty
    of valid data.
    """
    head = series.head(60)
    if head.notna().any():
        baseline = head.mean(skipna=True)
        if reject_low_outliers:
            kept = head[head >= baseline * 0.98]
            if kept.notna().any():
                baseline = kept.mean(skipna=True)
        return baseline

    first_valid = series.first_valid_index()
    if first_valid is None:
        return None
    return series.loc[first_valid]


def process_uploaded_image(
    file_bytes,
    sat_factor,
    gap_size,
    noise_threshold,
    crop_enabled,
    total_duration,
    travel_gradient_threshold=30,
):
    """Digitise the red, green and blue curves of an uploaded graph image.

    Args:
        file_bytes: encoded image bytes (anything ``bytearray()`` accepts).
        sat_factor: saturation multiplier forwarded to ``extract_line_mask``.
        gap_size: gap-fill kernel length forwarded to ``extract_line_mask``.
        noise_threshold: noise area threshold forwarded to
            ``extract_line_mask``.
        crop_enabled: when true, crop horizontally to the borders found by
            ``detect_graph_boundaries``.
        total_duration: time in ms assigned to the last pixel column; column
            0 is 0 ms and time is linear in between.
        travel_gradient_threshold: "Travel" samples whose absolute difference
            to the previous sample exceeds this value are discarded.

    Returns:
        tuple: ``(dataframe, debug_images, (start_x, end_x), error, baselines)``.
        On success ``error`` is ``None``; ``dataframe`` has the columns "X",
        "Time (ms)", "Travel", "C1", "Resistance", "C2", "Current", "C3";
        ``debug_images`` maps a label to an image array; ``baselines`` maps
        "Travel"/"Current" to their estimated baseline. On failure the first
        three items are ``None``, ``error`` is a message string and
        ``baselines`` is ``{}``.
    """
    buffer = np.asarray(bytearray(file_bytes), dtype=np.uint8)
    # imdecode returns None for undecodable data (and may raise on an empty
    # buffer), so report both cases instead of crashing on ``None.copy()``.
    img_orig = cv2.imdecode(buffer, cv2.IMREAD_COLOR) if buffer.size else None
    if img_orig is None:
        return None, None, None, "Could not decode image.", {}

    debug_img_bounds = img_orig.copy()
    sx, ex = 0, img_orig.shape[1]

    if crop_enabled:
        sx, ex, debug_img_bounds = detect_graph_boundaries(img_orig)
        img_working = img_orig[:, sx:ex]
    else:
        img_working = img_orig

    if img_working.shape[1] == 0:
        return None, None, None, "Crop failed.", {}

    # (colour key understood by extract_line_mask, (upper, lower) column names)
    configs = [
        ("Red", ("Travel", "C1")),
        ("Green", ("Resistance", "C2")),
        ("Blue (Cyan)", ("Current", "C3")),
    ]

    dfs = []
    debug_images = {"Boundaries": debug_img_bounds}
    width = img_working.shape[1]

    for color_key, (upper_name, lower_name) in configs:
        mask, debug_img = extract_line_mask(
            img_working, color_key, sat_factor, gap_size, noise_threshold
        )

        if mask is None:
            dfs.append(
                pd.DataFrame(
                    {"X": range(width), upper_name: np.nan, lower_name: np.nan}
                )
            )
            continue

        if debug_img is not None and color_key == "Green":
            debug_images[color_key + " (White Removal)"] = cv2.cvtColor(
                debug_img, cv2.COLOR_BGR2RGB
            )

        # The same overlay is published under both keys (kept for callers
        # that read either one); it is computed only once.
        tint = np.zeros_like(img_working)
        tint[mask > 0] = [0, 255, 0]
        overlay = cv2.addWeighted(img_working, 0.7, tint, 0.3, 0)
        overlay_rgb = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)
        debug_images[color_key + " (Cleaned Overlay)"] = overlay_rgb
        debug_images[color_key] = overlay_rgb.copy()

        dfs.append(generate_curve_data(mask, upper_name, lower_name))

    if not dfs:
        return None, None, None, "No data extracted.", {}

    final_df = reduce(
        lambda left, right: pd.merge(left, right, on="X", how="outer"), dfs
    )

    cols = ["X", "Travel", "C1", "Resistance", "C2", "Current", "C3"]
    existing_cols = [c for c in cols if c in final_df.columns]

    if "X" not in final_df.columns:
        return None, None, None, "X-axis alignment failed.", {}

    # Strict linear time: pixel 0 = 0 ms, last pixel = total_duration.
    # max(..., 1) avoids 0/0 for a one-pixel-wide image.
    final_df["Time (ms)"] = (final_df["X"] / max(width - 1, 1)) * total_duration
    existing_cols.insert(1, "Time (ms)")
    time_ms = final_df["Time (ms)"]

    # Baseline cleanup - remove dotted reference lines.
    baselines = {}
    for col in ("Travel", "Current"):
        if col not in final_df.columns:
            continue

        baseline_val = _estimate_baseline(
            final_df[col], reject_low_outliers=(col == "Travel")
        )
        if baseline_val is None:
            continue
        baselines[col] = baseline_val

        # Drop everything in the lowest 15% of the [min, baseline] range
        # (the level of the dotted reference line).
        min_val = final_df[col].min(skipna=True)
        floor = min_val + (baseline_val - min_val) * 0.15
        final_df.loc[final_df[col] < floor, col] = np.nan

        # Abrupt change (gradient) filter, Travel only.
        if col == "Travel":
            jump = final_df[col].diff().abs()
            final_df.loc[jump > travel_gradient_threshold, col] = np.nan

        # Time-based baseline tolerances: (time window, minimum allowed value).
        # Applied in order; each pass reads the column as left by the last.
        windows = [
            # 1. Start (0-30 ms)
            (time_ms < 30, baseline_val * 0.98),
            # 2. End (last 50 ms)
            (time_ms > (time_ms.max() - 50), baseline_val * 0.98),
            # 3. Center (100-300 ms)
            ((time_ms >= 100) & (time_ms <= 300), baseline_val * 1.05),
            # 4. Main (30-350 ms) excluding Center
            ((time_ms >= 30) & (time_ms < 100), baseline_val),
            ((time_ms > 300) & (time_ms <= 350), baseline_val),
        ]
        for window, minimum in windows:
            final_df.loc[window & (final_df[col] < minimum), col] = np.nan

        final_df[col] = _fill_short_gaps(final_df[col])

    # Cross-channel constraint: Current may not fall below the Travel baseline.
    if "Travel" in baselines and "Current" in final_df.columns:
        below_travel = final_df["Current"] < baselines["Travel"]
        final_df.loc[below_travel, "Current"] = np.nan
        final_df["Current"] = _fill_short_gaps(final_df["Current"])

    # Auxiliary curves: a lower envelope may never exceed its upper envelope.
    for lower, upper in (("C1", "Travel"), ("C2", "Resistance"), ("C3", "Current")):
        if lower not in final_df.columns or upper not in final_df.columns:
            continue
        if final_df[upper].isnull().all():
            continue
        final_df.loc[final_df[lower] > final_df[upper], lower] = np.nan

    # Final global cleanup (excluding Resistance and C2).
    for col in ("Travel", "Current", "C1", "C3"):
        if col in final_df.columns:
            final_df[col] = _fill_short_gaps(final_df[col])

    return final_df[existing_cols], debug_images, (sx, ex), None, baselines