from functools import reduce

import cv2
import numpy as np
import pandas as pd

# HSV windows (OpenCV hue scale, as used with cv2.COLOR_BGR2HSV) per supported
# line colour. Red wraps around the hue axis and therefore needs two windows.
_HSV_RANGES = {
    "Green": [((35, 20, 100), (75, 255, 255))],
    "Blue (Cyan)": [((80, 20, 100), (100, 255, 255))],
    "Red": [((0, 20, 100), (10, 255, 255)), ((170, 20, 100), (180, 255, 255))],
}


def detect_graph_boundaries(img):
    """Find the left and right vertical border lines of the plot area.

    A column counts as a "line" when more than 40% of its pixels have a gray
    value of 200 or less. The left boundary is the first such column with
    ``5 < x < 0.2 * width``; the right boundary is the last such column with
    ``x > 0.95 * width``.

    Args:
        img: BGR image array (it is passed to ``cv2.COLOR_BGR2GRAY``).

    Returns:
        ``(start_x, end_x, debug_img)``. ``start_x`` falls back to ``0`` and
        ``end_x`` to ``width - 1`` when no matching column exists.
        ``debug_img`` is a copy of ``img`` with a green line drawn at
        ``start_x`` and a red line at ``end_x``.
    """
    height, width = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV)

    # Number of "dark" pixels in every column.
    col_sums = np.sum(thresh, axis=0) / 255
    line_indices = np.flatnonzero(col_sums > height * 0.40)

    left_lines = line_indices[(line_indices > 5) & (line_indices < width * 0.2)]
    start_x = int(left_lines[0]) if left_lines.size else 0

    right_lines = line_indices[line_indices > width * 0.95]
    end_x = int(right_lines[-1]) if right_lines.size else width - 1

    # Debug image: green = detected start, red = detected end (BGR colours).
    debug_img = img.copy()
    cv2.line(debug_img, (start_x, 0), (start_x, height), (0, 255, 0), 3)
    cv2.line(debug_img, (end_x, 0), (end_x, height), (0, 0, 255), 3)

    return start_x, end_x, debug_img


def _channel_dominance(line_color, b, g, r):
    """Return a boolean array marking pixels whose BGR channels favour the colour.

    ``b``, ``g`` and ``r`` must be signed arrays so the differences cannot
    wrap around.
    """
    if line_color == "Green":
        return ((g - b) > 20) & ((g - r) > 10)
    if line_color == "Blue (Cyan)":
        return (b - r) > 20
    # "Red"
    return ((r - g) > 20) & ((r - b) > 20)


def extract_line_mask(img_cropped, line_color, saturation_factor, gap_fill_size, noise_threshold):
    """Build a binary mask of the pixels belonging to one coloured curve.

    Args:
        img_cropped: BGR image array.
        line_color: One of ``"Green"``, ``"Blue (Cyan)"`` or ``"Red"``.
        saturation_factor: Multiplier applied to the HSV saturation channel
            before thresholding (the result is clipped to 0..255).
        gap_fill_size: Length in pixels of the horizontal and vertical
            closing kernels used to bridge gaps; values below 1 disable the
            gap filling step.
        noise_threshold: Contours whose area is not greater than half of this
            value are discarded.

    Returns:
        A ``uint8`` mask (0 or 255) with the same height and width as
        ``img_cropped``, or ``None`` when ``line_color`` is not supported.
    """
    hsv_ranges = _HSV_RANGES.get(line_color)
    if hsv_ranges is None:
        return None

    # Boost saturation so that pale lines fall inside the HSV windows.
    h, s, v = cv2.split(cv2.cvtColor(img_cropped, cv2.COLOR_BGR2HSV))
    s = np.clip(s.astype(np.float32) * saturation_factor, 0, 255).astype(np.uint8)
    hsv = cv2.merge((h, s, v))

    # Signed copies of the boosted BGR channels for the dominance test.
    b, g, r = (
        channel.astype(np.int16)
        for channel in cv2.split(cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR))
    )

    mask_hsv = None
    for lower, upper in hsv_ranges:
        window = cv2.inRange(hsv, np.array(lower), np.array(upper))
        mask_hsv = window if mask_hsv is None else cv2.bitwise_or(mask_hsv, window)

    dominance = _channel_dominance(line_color, b, g, r)
    mask_channel = np.where(dominance, 255, 0).astype(np.uint8)
    mask = cv2.bitwise_and(mask_hsv, mask_channel)

    # Noise cleanup: redraw (filled) only the contours that are large enough.
    # The threshold is halved to keep thin spikes.
    # findContours returns 2 values on OpenCV 4.x and 3 on 3.x; the contour
    # list is the second-to-last element in both cases.
    # NOTE(review): cv2.contourArea can be 0 for 1-pixel-wide components, so
    # such components would be dropped even with noise_threshold == 0 - confirm
    # this is intended.
    contours = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
    mask_clean = np.zeros_like(mask)
    for cnt in contours:
        if cv2.contourArea(cnt) > (noise_threshold * 0.5):
            cv2.drawContours(mask_clean, [cnt], -1, 255, -1)
    mask = mask_clean

    # Gap filling. int() keeps np.ones happy when a float (e.g. 3.0) is passed.
    gap = int(gap_fill_size)
    if gap > 0:
        close_h = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((1, gap), np.uint8))
        close_v = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((gap, 1), np.uint8))
        mask = cv2.bitwise_or(close_h, close_v)

    # A 2x2 close (rather than 3x3) avoids merging nearby spikes too
    # aggressively. Deliberately no MORPH_OPEN afterwards: opening erodes the
    # mask and would delete the 1-2 px tips of sharp spikes.
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((2, 2), np.uint8))

    return mask


def generate_curve_data(mask, name_upper, name_lower):
    """Convert a binary curve mask into per-column upper/lower curve values.

    For every column the first and last non-zero rows are located. When they
    are at most 10 px apart the column is treated as a single thin curve whose
    value (``height - midpoint``) is assigned to the upper or lower curve,
    whichever had the closer previous value (the upper curve when no lower
    value has been seen yet, and on ties). Otherwise the column is treated as
    a spike or filled area and both curves are set (``height - first_row`` and
    ``height - last_row``). Missing values are filled by linear interpolation
    in both directions.

    Args:
        mask: 2-D array; values greater than 0 are curve pixels.
        name_upper: Column name for the upper curve.
        name_lower: Column name for the lower curve.

    Returns:
        A DataFrame with columns ``"X"``, ``name_upper`` and ``name_lower``
        and one row per mask column. Curve columns are always ``float64``;
        a curve without any data stays all-NaN.
    """
    height, width = mask.shape
    proximity_thresh = 10

    # Float arrays with NaN (instead of Python None) guarantee a numeric dtype
    # even when a curve is never populated, so interpolate() always works.
    upper_vals = np.full(width, np.nan)
    lower_vals = np.full(width, np.nan)
    prev_top, prev_bot = None, None

    for x in range(width):
        rows = np.flatnonzero(mask[:, x] > 0)
        if rows.size == 0:
            continue

        y_min, y_max = int(rows[0]), int(rows[-1])
        val_top, val_bot = None, None

        if y_max - y_min <= proximity_thresh:
            # Thin line: a single curve passes through this column.
            current_val = height - (y_min + y_max) // 2

            # Simple tracking to decide which curve the point continues.
            if prev_bot is None:
                val_top = current_val
            elif prev_top is None:
                val_bot = current_val
            elif abs(current_val - prev_top) <= abs(current_val - prev_bot):
                val_top = current_val
            else:
                val_bot = current_val
        else:
            # Vertical line (spike) or filled area: both edges are meaningful.
            val_top = height - y_min
            val_bot = height - y_max

        if val_top is not None:
            upper_vals[x] = val_top
            prev_top = val_top
        if val_bot is not None:
            lower_vals[x] = val_bot
            prev_bot = val_bot

    df = pd.DataFrame(
        {
            "X": np.arange(width, dtype=np.int64),
            name_upper: upper_vals,
            name_lower: lower_vals,
        }
    )

    # 'linear' is safer than 'pchip' for sharp spikes because it cannot overshoot.
    df[name_upper] = df[name_upper].interpolate(method='linear', limit_direction='both')
    df[name_lower] = df[name_lower].interpolate(method='linear', limit_direction='both')

    return df


def process_uploaded_image(file_bytes, sat_factor, gap_size, noise_threshold, crop_enabled, total_duration):
    """Digitise the red, green and cyan curves of an uploaded graph image.

    Args:
        file_bytes: Encoded image file content (anything ``bytearray()``
            accepts as a byte source).
        sat_factor: Saturation multiplier forwarded to ``extract_line_mask``.
        gap_size: Gap-fill kernel length forwarded to ``extract_line_mask``.
        noise_threshold: Noise area threshold forwarded to
            ``extract_line_mask``.
        crop_enabled: When true, crop the image horizontally to the
            boundaries found by ``detect_graph_boundaries``.
        total_duration: Value spread linearly over the working image width to
            produce the ``"Time (ms)"`` column.

    Returns:
        ``(dataframe, debug_images, (start_x, end_x), None)`` on success, or
        ``(None, None, None, error_message)`` on failure. ``dataframe`` holds
        ``X``, ``Time (ms)``, ``Travel``, ``C1``, ``Resistance``, ``C2``,
        ``Current`` and ``C3``. ``debug_images`` maps ``"Boundaries"`` and
        each colour key to an image array.
    """
    # 1. Decode image. imdecode rejects an empty buffer and returns None for
    #    data it cannot decode; report both through the error tuple.
    encoded = np.asarray(bytearray(file_bytes), dtype=np.uint8)
    if encoded.size == 0:
        return None, None, None, "Could not decode image."
    img_orig = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
    if img_orig is None:
        return None, None, None, "Could not decode image."

    # 2. Crop
    if crop_enabled:
        sx, ex, debug_img_bounds = detect_graph_boundaries(img_orig)
        img_working = img_orig[:, sx:ex]
    else:
        sx, ex = 0, img_orig.shape[1]
        debug_img_bounds = img_orig.copy()
        img_working = img_orig

    if img_working.shape[1] == 0:
        return None, None, None, "Crop failed."

    # 3. Process colours: (mask colour key, (upper column, lower column))
    configs = [
        ("Red", ("Travel", "C1")),
        ("Green", ("Resistance", "C2")),
        ("Blue (Cyan)", ("Current", "C3")),
    ]

    dfs = []
    # NOTE(review): "Boundaries" stays in BGR channel order while the
    # per-colour overlays below are converted to RGB - confirm the consumer
    # expects that.
    debug_images = {"Boundaries": debug_img_bounds}
    width = img_working.shape[1]

    for color_key, (upper_name, lower_name) in configs:
        mask = extract_line_mask(img_working, color_key, sat_factor, gap_size, noise_threshold)

        if mask is None:
            dfs.append(pd.DataFrame({"X": range(width), upper_name: np.nan, lower_name: np.nan}))
            continue

        # Debug overlay: detected pixels tinted green on top of the image.
        colored_mask = np.zeros_like(img_working)
        colored_mask[mask > 0] = [0, 255, 0]
        overlay = cv2.addWeighted(img_working, 0.7, colored_mask, 0.3, 0)
        debug_images[color_key] = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)

        dfs.append(generate_curve_data(mask, upper_name, lower_name))

    # 4. Merge
    if not dfs:
        return None, None, None, "No data extracted."

    final_df = reduce(lambda left, right: pd.merge(left, right, on='X', how='outer'), dfs)

    # STEP A: GENERATE TIME COLUMN
    cols = ['X', 'Travel', 'C1', 'Resistance', 'C2', 'Current', 'C3']
    existing_cols = [c for c in cols if c in final_df.columns]

    if 'X' not in final_df.columns:
        return None, None, None, "X-axis alignment failed."

    time_per_pixel = total_duration / width
    final_df['Time (ms)'] = final_df['X'] * time_per_pixel
    existing_cols.insert(1, 'Time (ms)')

    # STEP B: CALCULATE BASELINES & CLEANUP
    for col in ('Current', 'Travel'):
        if col not in final_df.columns:
            continue

        # Baseline: mean over the first 30 time units, falling back to the
        # first valid sample of the column.
        baseline_val = 0
        start_window = final_df[final_df['Time (ms)'] <= 30]
        if not start_window.empty and start_window[col].notna().any():
            baseline_val = start_window[col].mean()
        else:
            valid_idx = final_df[col].first_valid_index()
            if valid_idx is not None:
                baseline_val = final_df.loc[valid_idx, col]

        # Relaxed baseline clipping: spikes often oscillate, so dips down to
        # 80% of the baseline are kept; only values below that are treated as
        # noise and re-interpolated. This assumes positive values - remove the
        # clipping if the graph can legitimately go negative.
        if baseline_val > 0:
            final_df.loc[final_df[col] < (baseline_val * 0.8), col] = np.nan
            final_df[col] = final_df[col].interpolate(method='linear', limit_direction='both')

    # Deliberately no "middle section" cleanup here: deleting near-baseline
    # data in a fixed time window made spikes in that region disappear.

    # STEP C: AUXILIARY CURVE LOGIC
    # C1/C2/C3 are the bottom edge of thick lines and must never lie above
    # their main curve. The comparison is per X position (not against the
    # global minimum of the main curve) so that legitimate spikes survive.
    pairs = [('C1', 'Travel'), ('C2', 'Resistance'), ('C3', 'Current')]
    for lower, upper in pairs:
        if lower not in final_df.columns or upper not in final_df.columns:
            continue
        if final_df[upper].isnull().all():
            continue
        invalid_mask = final_df[lower] > final_df[upper]
        final_df.loc[invalid_mask, lower] = np.nan

    # Final global cleanup: fill every gap opened by the steps above.
    for col in ('Current', 'Travel', 'C1', 'C2', 'C3'):
        if col in final_df.columns:
            final_df[col] = final_df[col].interpolate(method='linear', limit_direction='both')

    return final_df[existing_cols], debug_images, (sx, ex), None