Spaces:
Sleeping
Sleeping
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| from functools import reduce | |
| def detect_graph_boundaries(img): | |
| height, width = img.shape[:2] | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV) | |
| col_sums = np.sum(thresh, axis=0) / 255 | |
| is_line = col_sums > (height * 0.40) | |
| line_indices = np.where(is_line)[0] | |
| start_x = 0 | |
| if len(line_indices) > 0: | |
| left_lines = | |
| if left_lines: start_x = left_lines[0] | |
| end_x = width - 1 | |
| if len(line_indices) > 0: | |
| right_margin = width * 0.95 | |
| right_lines = | |
| if right_lines: end_x = right_lines[-1] | |
| # Create debug image | |
| debug_img = img.copy() | |
| cv2.line(debug_img, (int(start_x), 0), (int(start_x), height), (0, 255, 0), 3) | |
| cv2.line(debug_img, (int(end_x), 0), (int(end_x), height), (0, 0, 255), 3) | |
| return int(start_x), int(end_x), debug_img | |
| def extract_line_mask(img_cropped, line_color, saturation_factor, gap_fill_size, noise_threshold): | |
| # Boost Saturation | |
| hsv_pre = cv2.cvtColor(img_cropped, cv2.COLOR_BGR2HSV) | |
| h, s, v = cv2.split(hsv_pre) | |
| s = np.clip(s.astype(np.float32) * saturation_factor, 0, 255).astype(np.uint8) | |
| hsv = cv2.merge((h, s, v)) | |
| b, g, r = cv2.split(cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)) | |
| mask = None | |
| if line_color == "Green": | |
| lower = np.array([35, 20, 100]); upper = np.array([75, 255, 255]) | |
| mask_hsv = cv2.inRange(hsv, lower, upper) | |
| diff_gb = g.astype(np.int16) - b.astype(np.int16) | |
| diff_gr = g.astype(np.int16) - r.astype(np.int16) | |
| mask_channel = np.zeros_like(g, dtype=np.uint8) | |
| mask_channel[(diff_gb > 20) & (diff_gr > 10)] = 255 | |
| mask = cv2.bitwise_and(mask_hsv, mask_channel) | |
| elif line_color == "Blue (Cyan)": | |
| lower = np.array([80, 20, 100]); upper = np.array([100, 255, 255]) | |
| mask_hsv = cv2.inRange(hsv, lower, upper) | |
| diff_br = b.astype(np.int16) - r.astype(np.int16) | |
| mask_channel = np.zeros_like(b, dtype=np.uint8) | |
| mask_channel[diff_br > 20] = 255 | |
| mask = cv2.bitwise_and(mask_hsv, mask_channel) | |
| elif line_color == "Red": | |
| lower1 = np.array([0, 20, 100]); upper1 = np.array([10, 255, 255]) | |
| lower2 = np.array([170, 20, 100]); upper2 = np.array([180, 255, 255]) | |
| mask_hsv = cv2.bitwise_or(cv2.inRange(hsv, lower1, upper1), cv2.inRange(hsv, lower2, upper2)) | |
| diff_rg = r.astype(np.int16) - g.astype(np.int16) | |
| diff_rb = r.astype(np.int16) - b.astype(np.int16) | |
| mask_channel = np.zeros_like(r, dtype=np.uint8) | |
| mask_channel[(diff_rg > 20) & (diff_rb > 20)] = 255 | |
| mask = cv2.bitwise_and(mask_hsv, mask_channel) | |
| if mask is None: return None | |
| # Noise/Gap cleanup | |
| contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| mask_clean = np.zeros_like(mask) | |
| for cnt in contours: | |
| # Reduced noise threshold slightly to keep thin spikes | |
| if cv2.contourArea(cnt) > (noise_threshold * 0.5): | |
| cv2.drawContours(mask_clean, | |
| prev_top, prev_bot = None, None | |
| proximity_thresh = 10 | |
| for x in range(width): | |
| col = mask[:, x] | |
| indices = np.where(col > 0)[0] | |
| val_top, val_bot = None, None | |
| if len(indices) > 0: | |
| y_min, y_max = indices[0], indices[-1] | |
| graph_y_top = height - y_min | |
| graph_y_bot = height - y_max | |
| # If the line is thin, it's a single curve | |
| if abs(y_max - y_min) <= proximity_thresh: | |
| current_val = height - int((y_min + y_max) / 2) | |
| # Simple tracking to decide if it belongs to top or bottom curve if they were split previously | |
| if prev_top is None and prev_bot is None: val_top = current_val | |
| elif prev_top is not None and prev_bot is None: val_top = current_val | |
| elif prev_top is None and prev_bot is not None: val_bot = current_val | |
| else: | |
| if abs(current_val - prev_top) <= abs(current_val - prev_bot): val_top = current_val | |
| else: val_bot = current_val | |
| else: | |
| # Vertical line (spike) or filled area | |
| val_top = graph_y_top | |
| val_bot = graph_y_bot | |
| if val_top is not None: prev_top = val_top | |
| if val_bot is not None: prev_bot = val_bot | |
| data.append({"X": x, name_upper: val_top, name_lower: val_bot}) | |
| df = pd.DataFrame(data) | |
| # Using 'pchip' or 'linear' interpolation. | |
| # 'linear' is safer for sharp spikes. 'pchip' can overshoot. | |
| df[name_upper] = df[name_upper].interpolate(method='linear', limit_direction='both') | |
| df[name_lower] = df[name_lower].interpolate(method='linear', limit_direction='both') | |
| return df | |
| def process_uploaded_image(file_bytes, sat_factor, gap_size, noise_threshold, crop_enabled, total_duration): | |
| # 1. Decode Image | |
| file_bytes = np.asarray(bytearray(file_bytes), dtype=np.uint8) | |
| img_orig = cv2.imdecode(file_bytes, 1) | |
| # 2. Crop | |
| debug_img_bounds = img_orig.copy() | |
| sx, ex = 0, img_orig.shape[1] | |
| if crop_enabled: | |
| sx, ex, debug_img_bounds = detect_graph_boundaries(img_orig) | |
| img_working = img_orig[:, sx:ex] | |
| else: | |
| img_working = img_orig | |
| if img_working.shape[1] == 0: | |
| return None, None, None, "Crop failed." | |
| # 3. Process Colors | |
| configs = | |
| dfs = | |
| debug_images = {} | |
| debug_images["Boundaries"] = debug_img_bounds | |
| height, width = img_working.shape[:2] | |
| for color_key, _, col_names in configs: | |
| mask = extract_line_mask(img_working, color_key, sat_factor, gap_size, noise_threshold) | |
| if mask is not None: | |
| colored_mask = np.zeros_like(img_working) | |
| colored_mask[mask > 0] = | |
| existing_cols = | |
| if 'X' in final_df.columns: | |
| time_per_pixel = total_duration / width | |
| final_df['Time (ms)'] = final_df['X'] * time_per_pixel | |
| existing_cols.insert(1, 'Time (ms)') | |
| else: | |
| return None, None, None, "X-axis alignment failed." | |
| # STEP B: CALCULATE BASELINES & CLEANUP | |
| for col in | |
| if not start_window.empty and start_window[col].notna().any(): | |
| baseline_val = start_window[col].mean() | |
| else: | |
| valid_idx = final_df[col].first_valid_index() | |
| if valid_idx is not None: baseline_val = final_df.loc[valid_idx, col] | |
| # --- FIX 2: Relaxed Baseline Clipping --- | |
| # Previous code strictly deleted anything below baseline. | |
| # Spikes often oscillate. We allow small dips now. | |
| end_x = width - 1 | |
| if len(line_indices) > 0: | |
| right_margin = width * 0.95 | |
| right_lines = | |
| if right_lines: end_x = right_lines[-1] | |
| # Create debug image | |
| debug_img = img.copy() | |
| cv2.line(debug_img, (int(start_x), 0), (int(start_x), height), (0, 255, 0), 3) | |
| cv2.line(debug_img, (int(end_x), 0), (int(end_x), height), (0, 0, 255), 3) | |
| return int(start_x), int(end_x), debug_img | |
| def extract_line_mask(img_cropped, line_color, saturation_factor, gap_fill_size, noise_threshold): | |
| # Boost Saturation | |
| hsv_pre = cv2.cvtColor(img_cropped, cv2.COLOR_BGR2HSV) | |
| h, s, v = cv2.split(hsv_pre) | |
| s = np.clip(s.astype(np.float32) * saturation_factor, 0, 255).astype(np.uint8) | |
| hsv = cv2.merge((h, s, v)) | |
| b, g, r = cv2.split(cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)) | |
| mask = None | |
| if line_color == "Green": | |
| lower = np.array([35, 20, 100]); upper = np.array([75, 255, 255]) | |
| mask_hsv = cv2.inRange(hsv, lower, upper) | |
| diff_gb = g.astype(np.int16) - b.astype(np.int16) | |
| diff_gr = g.astype(np.int16) - r.astype(np.int16) | |
| mask_channel = np.zeros_like(g, dtype=np.uint8) | |
| mask_channel[(diff_gb > 20) & (diff_gr > 10)] = 255 | |
| mask = cv2.bitwise_and(mask_hsv, mask_channel) | |
| elif line_color == "Blue (Cyan)": | |
| lower = np.array([80, 20, 100]); upper = np.array([100, 255, 255]) | |
| mask_hsv = cv2.inRange(hsv, lower, upper) | |
| diff_br = b.astype(np.int16) - r.astype(np.int16) | |
| mask_channel = np.zeros_like(b, dtype=np.uint8) | |
| mask_channel[diff_br > 20] = 255 | |
| mask = cv2.bitwise_and(mask_hsv, mask_channel) | |
| elif line_color == "Red": | |
| lower1 = np.array([0, 20, 100]); upper1 = np.array([10, 255, 255]) | |
| lower2 = np.array([170, 20, 100]); upper2 = np.array([180, 255, 255]) | |
| mask_hsv = cv2.bitwise_or(cv2.inRange(hsv, lower1, upper1), cv2.inRange(hsv, lower2, upper2)) | |
| diff_rg = r.astype(np.int16) - g.astype(np.int16) | |
| diff_rb = r.astype(np.int16) - b.astype(np.int16) | |
| mask_channel = np.zeros_like(r, dtype=np.uint8) | |
| mask_channel[(diff_rg > 20) & (diff_rb > 20)] = 255 | |
| mask = cv2.bitwise_and(mask_hsv, mask_channel) | |
| if mask is None: return None | |
| # Noise/Gap cleanup | |
| contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| mask_clean = np.zeros_like(mask) | |
| for cnt in contours: | |
| # Reduced noise threshold slightly to keep thin spikes | |
| if cv2.contourArea(cnt) > (noise_threshold * 0.5): | |
| cv2.drawContours(mask_clean, | |
| prev_top, prev_bot = None, None | |
| proximity_thresh = 10 | |
| for x in range(width): | |
| col = mask[:, x] | |
| indices = np.where(col > 0)[0] | |
| val_top, val_bot = None, None | |
| if len(indices) > 0: | |
| y_min, y_max = indices[0], indices[-1] | |
| graph_y_top = height - y_min | |
| graph_y_bot = height - y_max | |
| # If the line is thin, it's a single curve | |
| if abs(y_max - y_min) <= proximity_thresh: | |
| current_val = height - int((y_min + y_max) / 2) | |
| # Simple tracking to decide if it belongs to top or bottom curve if they were split previously | |
| if prev_top is None and prev_bot is None: val_top = current_val | |
| elif prev_top is not None and prev_bot is None: val_top = current_val | |
| elif prev_top is None and prev_bot is not None: val_bot = current_val | |
| else: | |
| if abs(current_val - prev_top) <= abs(current_val - prev_bot): val_top = current_val | |
| else: val_bot = current_val | |
| else: | |
| # Vertical line (spike) or filled area | |
| val_top = graph_y_top | |
| val_bot = graph_y_bot | |
| if val_top is not None: prev_top = val_top | |
| if val_bot is not None: prev_bot = val_bot | |
| data.append({"X": x, name_upper: val_top, name_lower: val_bot}) | |
| df = pd.DataFrame(data) | |
| # Using 'pchip' or 'linear' interpolation. | |
| # 'linear' is safer for sharp spikes. 'pchip' can overshoot. | |
| df[name_upper] = df[name_upper].interpolate(method='linear', limit_direction='both') | |
| df[name_lower] = df[name_lower].interpolate(method='linear', limit_direction='both') | |
| return df | |
| def process_uploaded_image(file_bytes, sat_factor, gap_size, noise_threshold, crop_enabled, total_duration): | |
| # 1. Decode Image | |
| file_bytes = np.asarray(bytearray(file_bytes), dtype=np.uint8) | |
| img_orig = cv2.imdecode(file_bytes, 1) | |
| # 2. Crop | |
| debug_img_bounds = img_orig.copy() | |
| sx, ex = 0, img_orig.shape[1] | |
| if crop_enabled: | |
| sx, ex, debug_img_bounds = detect_graph_boundaries(img_orig) | |
| img_working = img_orig[:, sx:ex] | |
| else: | |
| img_working = img_orig | |
| if img_working.shape[1] == 0: | |
| return None, None, None, "Crop failed." | |
| # 3. Process Colors | |
| configs = | |
| dfs = | |
| debug_images = {} | |
| debug_images["Boundaries"] = debug_img_bounds | |
| height, width = img_working.shape[:2] | |
| for color_key, _, col_names in configs: | |
| mask = extract_line_mask(img_working, color_key, sat_factor, gap_size, noise_threshold) | |
| if mask is not None: | |
| colored_mask = np.zeros_like(img_working) | |
| colored_mask[mask > 0] = | |
| existing_cols = | |
| if 'X' in final_df.columns: | |
| time_per_pixel = total_duration / width | |
| final_df['Time (ms)'] = final_df['X'] * time_per_pixel | |
| existing_cols.insert(1, 'Time (ms)') | |
| else: | |
| return None, None, None, "X-axis alignment failed." | |
| # STEP B: CALCULATE BASELINES & CLEANUP | |
| for col in | |
| if not start_window.empty and start_window[col].notna().any(): | |
| baseline_val = start_window[col].mean() | |
| else: | |
| valid_idx = final_df[col].first_valid_index() | |
| if valid_idx is not None: baseline_val = final_df.loc[valid_idx, col] | |
| # --- FIX 2: Relaxed Baseline Clipping --- | |
| # Previous code strictly deleted anything below baseline. | |
| # Spikes often oscillate. We allow small dips now. | |
| # Only clip if it is essentially noise below 0 (assuming values are positive) | |
| # If your graph allows negative values, remove this line entirely. | |
| if baseline_val > 0: | |
| final_df.loc[final_df[col] < (baseline_val * 0.8), col] = np.nan | |
| final_df[col] = final_df[col].interpolate(method='linear', limit_direction='both') | |
| # --- FIX 3: REMOVED "Middle Section Cleanup" --- | |
| # The previous code deleted data between 120ms and 250ms if it was near the baseline. | |
| # This was the primary cause of spikes disappearing in that region. | |
| # The code block is deleted here. | |
| # STEP C: AUXILIARY CURVE LOGIC | |
| # Purpose: Clean auxiliary curves (C1, C2, C3) that represent the bottom edge of thick lines | |
| # The auxiliary curve should never be ABOVE the main curve at the same X position | |
| pairs = | |
| for lower, upper in pairs: | |
| if lower in final_df.columns and upper in final_df.columns: | |
| if not final_df[upper].isnull().all(): | |
| # FIX: Compare at same X position to preserve spikes | |
| # Previous logic: final_df[lower] > min_upper (global minimum) | |
| # - This deleted spike data because spike bottoms exceeded the global min | |
| # New logic: final_df[lower] > final_df[upper] (position-wise comparison) | |
| # - This only deletes truly invalid crossovers at the same X | |
| # - Preserves spikes where upper and lower legitimately differ | |
| invalid_mask = final_df[lower] > final_df[upper] | |
| final_df.loc[invalid_mask, lower] = np.nan | |
| # Final global cleanup | |
| for col in | |