# dcrm-analysis-api / dcrm/image_processing.py.backup
# Aditya Adaki
# Add DCRM Analysis API
# fdcec08
import cv2
import numpy as np
import pandas as pd
from functools import reduce
def detect_graph_boundaries(img):
    """Find the x positions of the left and right vertical frame lines.

    A column is treated as a frame line when more than 40% of its pixels have
    a grayscale value at or below 200. The left boundary is the first such
    column with ``5 < x < 0.2 * width``; the right boundary is the last such
    column with ``x > 0.95 * width``. When no column qualifies on a side, the
    image edge (``0`` or ``width - 1``) is used instead.

    Args:
        img: BGR image array as produced by OpenCV.

    Returns:
        ``(start_x, end_x, debug_img)`` where the first two are plain ints and
        ``debug_img`` is a copy of ``img`` with a green vertical line drawn at
        ``start_x`` and a red one at ``end_x``.
    """
    rows, cols = img.shape[:2]

    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, dark = cv2.threshold(grayscale, 200, 255, cv2.THRESH_BINARY_INV)

    # Number of dark pixels in each column (the mask holds 0 or 255).
    dark_per_column = np.sum(dark, axis=0) / 255
    candidates = np.flatnonzero(dark_per_column > (rows * 0.40))

    # Left frame line: skip the outermost few pixels, stay in the left fifth.
    left_hits = candidates[(candidates > 5) & (candidates < cols * 0.2)]
    left = left_hits[0] if left_hits.size > 0 else 0

    # Right frame line: only the outermost 5% of the width is considered.
    right_hits = candidates[candidates > cols * 0.95]
    right = right_hits[-1] if right_hits.size > 0 else cols - 1

    left, right = int(left), int(right)

    # Visualise the detected boundaries on a copy of the input.
    annotated = img.copy()
    cv2.line(annotated, (left, 0), (left, rows), (0, 255, 0), 3)
    cv2.line(annotated, (right, 0), (right, rows), (0, 0, 255), 3)
    return left, right, annotated
def extract_line_mask(img_cropped, line_color, saturation_factor, gap_fill_size, noise_threshold):
    """Build a binary mask of the pixels that belong to one colored trace.

    The image saturation is multiplied by ``saturation_factor`` first. A pixel
    is kept when it falls inside the HSV range for ``line_color`` *and* the
    matching BGR channel dominates the others by a fixed margin. Small blobs
    are then discarded and, if requested, gaps are closed.

    Args:
        img_cropped: BGR image array.
        line_color: One of ``"Green"``, ``"Blue (Cyan)"`` or ``"Red"``.
        saturation_factor: Multiplier applied to the S channel (clipped to 255).
        gap_fill_size: Length of the 1-D horizontal/vertical closing kernels;
            values ``<= 0`` disable gap filling.
        noise_threshold: Contours whose area is not greater than half this
            value are dropped.

    Returns:
        A ``uint8`` mask (0 / 255), or ``None`` when ``line_color`` is not one
        of the supported names.
    """
    # Saturation boost, then derive signed BGR planes from the boosted image
    # so channel differences cannot wrap around.
    hue, sat, val = cv2.split(cv2.cvtColor(img_cropped, cv2.COLOR_BGR2HSV))
    sat = np.clip(sat.astype(np.float32) * saturation_factor, 0, 255).astype(np.uint8)
    boosted_hsv = cv2.merge((hue, sat, val))
    blue, green, red = (
        plane.astype(np.int16)
        for plane in cv2.split(cv2.cvtColor(boosted_hsv, cv2.COLOR_HSV2BGR))
    )

    if line_color == "Green":
        hue_hit = cv2.inRange(boosted_hsv, np.array([35, 20, 100]), np.array([75, 255, 255]))
        dominant = ((green - blue) > 20) & ((green - red) > 10)
    elif line_color == "Blue (Cyan)":
        hue_hit = cv2.inRange(boosted_hsv, np.array([80, 20, 100]), np.array([100, 255, 255]))
        dominant = (blue - red) > 20
    elif line_color == "Red":
        # Red hue wraps around the HSV circle, so two bands are combined.
        low_band = cv2.inRange(boosted_hsv, np.array([0, 20, 100]), np.array([10, 255, 255]))
        high_band = cv2.inRange(boosted_hsv, np.array([170, 20, 100]), np.array([180, 255, 255]))
        hue_hit = cv2.bitwise_or(low_band, high_band)
        dominant = ((red - green) > 20) & ((red - blue) > 20)
    else:
        return None

    channel_hit = dominant.astype(np.uint8) * 255
    trace = cv2.bitwise_and(hue_hit, channel_hit)

    # Noise cleanup: redraw (filled) only the blobs that are large enough.
    # Half the nominal threshold is used so thin spikes survive.
    min_area = noise_threshold * 0.5
    blobs, _ = cv2.findContours(trace, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cleaned = np.zeros_like(trace)
    for blob in blobs:
        if cv2.contourArea(blob) > min_area:
            cv2.drawContours(cleaned, [blob], -1, 255, -1)
    trace = cleaned

    if gap_fill_size > 0:
        # Close horizontally and vertically on the same input, then union.
        horizontal = cv2.morphologyEx(
            trace, cv2.MORPH_CLOSE, np.ones((1, gap_fill_size), np.uint8)
        )
        vertical = cv2.morphologyEx(
            trace, cv2.MORPH_CLOSE, np.ones((gap_fill_size, 1), np.uint8)
        )
        trace = cv2.bitwise_or(horizontal, vertical)
        # A small 2x2 close (rather than 3x3) avoids merging nearby spikes.
        trace = cv2.morphologyEx(trace, cv2.MORPH_CLOSE, np.ones((2, 2), np.uint8))

    # No MORPH_OPEN pass here on purpose: opening erodes 1-2 px spike tips.
    return trace
def generate_curve_data(mask, name_upper, name_lower):
    """Convert a binary trace mask into per-column upper/lower curve values.

    Each column of ``mask`` is scanned for non-zero pixels. Y values are
    measured upward from the bottom of the mask (``height - row``).

    * If the lit pixels of a column span at most 10 rows, the column is
      treated as a single thin line and its mid-point is assigned to either
      the upper or the lower curve, whichever was last seen closer to it
      (the upper curve when the lower one has not been seen yet).
    * Otherwise (a vertical spike or a filled area) the topmost lit pixel
      goes to the upper curve and the bottommost to the lower curve.

    Columns without data are filled by linear interpolation, with leading and
    trailing gaps taking the nearest known value.

    Args:
        mask: 2-D array; any value ``> 0`` counts as a trace pixel.
        name_upper: Column name for the upper curve.
        name_lower: Column name for the lower curve.

    Returns:
        A ``pandas.DataFrame`` with an integer ``"X"`` column (0..width-1) and
        two float64 columns named ``name_upper`` and ``name_lower``. A curve
        that is never observed is returned as all-NaN.
    """
    height, width = mask.shape
    proximity_thresh = 10

    # Pre-allocate float arrays so both columns are numeric even when a curve
    # is never observed; an object-dtype column of None values cannot be
    # interpolated reliably.
    upper_vals = np.full(width, np.nan, dtype=np.float64)
    lower_vals = np.full(width, np.nan, dtype=np.float64)
    prev_top, prev_bot = None, None

    for x in range(width):
        lit_rows = np.flatnonzero(mask[:, x] > 0)
        if lit_rows.size == 0:
            continue

        y_min, y_max = int(lit_rows[0]), int(lit_rows[-1])
        val_top, val_bot = None, None

        if y_max - y_min <= proximity_thresh:
            # Thin line: one value, attributed to the nearer tracked curve.
            current_val = height - int((y_min + y_max) / 2)
            if prev_bot is None:
                val_top = current_val
            elif prev_top is None:
                val_bot = current_val
            elif abs(current_val - prev_top) <= abs(current_val - prev_bot):
                val_top = current_val
            else:
                val_bot = current_val
        else:
            # Vertical spike or filled area: keep both extremes.
            val_top = height - y_min
            val_bot = height - y_max

        if val_top is not None:
            upper_vals[x] = val_top
            prev_top = val_top
        if val_bot is not None:
            lower_vals[x] = val_bot
            prev_bot = val_bot

    df = pd.DataFrame(
        {
            "X": np.arange(width, dtype=np.int64),
            name_upper: upper_vals,
            name_lower: lower_vals,
        }
    )

    # Linear interpolation is used because it cannot overshoot at sharp spikes.
    df[name_upper] = df[name_upper].interpolate(method='linear', limit_direction='both')
    df[name_lower] = df[name_lower].interpolate(method='linear', limit_direction='both')
    return df
def process_uploaded_image(file_bytes, sat_factor, gap_size, noise_threshold, crop_enabled, total_duration):
    """Digitize the red, green and cyan traces of an uploaded graph image.

    Pipeline: decode the bytes, optionally crop to the detected graph frame,
    extract one mask per color, convert each mask into an upper/lower curve
    pair, merge everything on the pixel column ``X``, add a time axis and
    clean the curves.

    Args:
        file_bytes: Encoded image content (bytes-like).
        sat_factor: Saturation multiplier forwarded to ``extract_line_mask``.
        gap_size: Gap-fill kernel length forwarded to ``extract_line_mask``.
        noise_threshold: Noise area threshold forwarded to
            ``extract_line_mask``.
        crop_enabled: When true, crop horizontally to the boundaries found by
            ``detect_graph_boundaries``.
        total_duration: Time span, in the unit of the ``'Time (ms)'`` column,
            that the full width of the working image represents.

    Returns:
        A 4-tuple ``(data, debug_images, (sx, ex), error)``. On success
        ``error`` is ``None``, ``data`` is a DataFrame with the columns
        ``X``, ``Time (ms)``, ``Travel``, ``C1``, ``Resistance``, ``C2``,
        ``Current``, ``C3``, and ``debug_images`` maps ``"Boundaries"`` and
        each color key to an image. On failure the first three items are
        ``None`` and ``error`` is a short message.
    """
    failure = (None, None, None)

    # 1. Decode image. cv2.imdecode rejects an empty buffer and returns None
    #    for data it cannot decode, so both are reported as an error tuple.
    raw = np.asarray(bytearray(file_bytes), dtype=np.uint8)
    if raw.size == 0:
        return (*failure, "Could not decode image.")
    img_orig = cv2.imdecode(raw, 1)
    if img_orig is None:
        return (*failure, "Could not decode image.")

    # 2. Crop
    debug_img_bounds = img_orig.copy()
    sx, ex = 0, img_orig.shape[1]
    if crop_enabled:
        sx, ex, debug_img_bounds = detect_graph_boundaries(img_orig)
        img_working = img_orig[:, sx:ex]
    else:
        img_working = img_orig
    if img_working.shape[1] == 0:
        return (*failure, "Crop failed.")

    # 3. Process colors: (mask color key, (main curve, auxiliary curve)).
    configs = [
        ("Red", ("Travel", "C1")),
        ("Green", ("Resistance", "C2")),
        ("Blue (Cyan)", ("Current", "C3")),
    ]
    width = img_working.shape[1]
    dfs = []
    # NOTE(review): the boundary image stays in BGR order while the overlays
    # below are converted to RGB - confirm the consumer expects that.
    debug_images = {"Boundaries": debug_img_bounds}

    for color_key, (main_col, aux_col) in configs:
        mask = extract_line_mask(img_working, color_key, sat_factor, gap_size, noise_threshold)
        if mask is None:
            # Keep the column layout stable even without a mask.
            dfs.append(pd.DataFrame({"X": range(width), main_col: np.nan, aux_col: np.nan}))
            continue

        # Green-tinted overlay showing which pixels were picked up.
        colored_mask = np.zeros_like(img_working)
        colored_mask[mask > 0] = [0, 255, 0]
        overlay = cv2.addWeighted(img_working, 0.7, colored_mask, 0.3, 0)
        debug_images[color_key] = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)
        dfs.append(generate_curve_data(mask, main_col, aux_col))

    # 4. Merge
    if not dfs:
        return (*failure, "No data extracted.")
    final_df = reduce(lambda left, right: pd.merge(left, right, on='X', how='outer'), dfs)

    cols = ['X', 'Travel', 'C1', 'Resistance', 'C2', 'Current', 'C3']
    existing_cols = [c for c in cols if c in final_df.columns]
    if 'X' not in final_df.columns:
        return (*failure, "X-axis alignment failed.")

    # A curve that was never observed may arrive as an object column of None;
    # coerce every curve column to a numeric dtype so the mean, comparison
    # and interpolation steps below are well defined.
    for col in existing_cols:
        if col != 'X':
            final_df[col] = pd.to_numeric(final_df[col], errors='coerce')

    # STEP A: time axis - spread total_duration evenly over the pixel columns.
    time_per_pixel = total_duration / width
    final_df['Time (ms)'] = final_df['X'] * time_per_pixel
    existing_cols.insert(1, 'Time (ms)')

    # STEP B: baselines and cleanup for the curves expected to start at rest.
    for col in ['Current', 'Travel']:
        if col not in final_df.columns:
            continue

        # Baseline = mean over the first 30 time units, falling back to the
        # first valid sample when that window holds no data.
        baseline_val = 0
        start_window = final_df[final_df['Time (ms)'] <= 30]
        if not start_window.empty and start_window[col].notna().any():
            baseline_val = start_window[col].mean()
        else:
            valid_idx = final_df[col].first_valid_index()
            if valid_idx is not None:
                baseline_val = final_df.loc[valid_idx, col]

        # Relaxed clipping: only samples below 80% of a positive baseline are
        # discarded, so small oscillations around the baseline survive.
        if baseline_val > 0:
            final_df.loc[final_df[col] < (baseline_val * 0.8), col] = np.nan
        final_df[col] = final_df[col].interpolate(method='linear', limit_direction='both')

    # STEP C: the auxiliary curve is the bottom edge of a thick line and must
    # never lie above its main curve at the same X. The comparison is
    # position-wise (not against a global minimum) so spikes are preserved.
    pairs = [('C1', 'Travel'), ('C2', 'Resistance'), ('C3', 'Current')]
    for lower, upper in pairs:
        if lower in final_df.columns and upper in final_df.columns:
            if not final_df[upper].isnull().all():
                invalid_mask = final_df[lower] > final_df[upper]
                final_df.loc[invalid_mask, lower] = np.nan

    # Final global cleanup: refill whatever the steps above blanked out.
    for col in ['Current', 'Travel', 'C1', 'C2', 'C3']:
        if col in final_df.columns:
            final_df[col] = final_df[col].interpolate(method='linear', limit_direction='both')

    return final_df[existing_cols], debug_images, (sx, ex), None
def extract_line_mask(img_cropped, line_color, saturation_factor, gap_fill_size, noise_threshold):
    """Build a binary mask of the pixels that belong to one colored trace.

    The image saturation is multiplied by ``saturation_factor`` first. A pixel
    is kept when it falls inside the HSV range for ``line_color`` *and* the
    matching BGR channel dominates the others by a fixed margin. Small blobs
    are then discarded and, if requested, gaps are closed.

    Args:
        img_cropped: BGR image array.
        line_color: One of ``"Green"``, ``"Blue (Cyan)"`` or ``"Red"``.
        saturation_factor: Multiplier applied to the S channel (clipped to 255).
        gap_fill_size: Length of the 1-D horizontal/vertical closing kernels;
            values ``<= 0`` disable gap filling.
        noise_threshold: Contours whose area is not greater than half this
            value are dropped.

    Returns:
        A ``uint8`` mask (0 / 255), or ``None`` when ``line_color`` is not one
        of the supported names.
    """
    # Saturation boost, then derive signed BGR planes from the boosted image
    # so channel differences cannot wrap around.
    hue, sat, val = cv2.split(cv2.cvtColor(img_cropped, cv2.COLOR_BGR2HSV))
    sat = np.clip(sat.astype(np.float32) * saturation_factor, 0, 255).astype(np.uint8)
    boosted_hsv = cv2.merge((hue, sat, val))
    blue, green, red = (
        plane.astype(np.int16)
        for plane in cv2.split(cv2.cvtColor(boosted_hsv, cv2.COLOR_HSV2BGR))
    )

    if line_color == "Green":
        hue_hit = cv2.inRange(boosted_hsv, np.array([35, 20, 100]), np.array([75, 255, 255]))
        dominant = ((green - blue) > 20) & ((green - red) > 10)
    elif line_color == "Blue (Cyan)":
        hue_hit = cv2.inRange(boosted_hsv, np.array([80, 20, 100]), np.array([100, 255, 255]))
        dominant = (blue - red) > 20
    elif line_color == "Red":
        # Red hue wraps around the HSV circle, so two bands are combined.
        low_band = cv2.inRange(boosted_hsv, np.array([0, 20, 100]), np.array([10, 255, 255]))
        high_band = cv2.inRange(boosted_hsv, np.array([170, 20, 100]), np.array([180, 255, 255]))
        hue_hit = cv2.bitwise_or(low_band, high_band)
        dominant = ((red - green) > 20) & ((red - blue) > 20)
    else:
        return None

    channel_hit = dominant.astype(np.uint8) * 255
    trace = cv2.bitwise_and(hue_hit, channel_hit)

    # Noise cleanup: redraw (filled) only the blobs that are large enough.
    # Half the nominal threshold is used so thin spikes survive.
    min_area = noise_threshold * 0.5
    blobs, _ = cv2.findContours(trace, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cleaned = np.zeros_like(trace)
    for blob in blobs:
        if cv2.contourArea(blob) > min_area:
            cv2.drawContours(cleaned, [blob], -1, 255, -1)
    trace = cleaned

    if gap_fill_size > 0:
        # Close horizontally and vertically on the same input, then union.
        horizontal = cv2.morphologyEx(
            trace, cv2.MORPH_CLOSE, np.ones((1, gap_fill_size), np.uint8)
        )
        vertical = cv2.morphologyEx(
            trace, cv2.MORPH_CLOSE, np.ones((gap_fill_size, 1), np.uint8)
        )
        trace = cv2.bitwise_or(horizontal, vertical)
        # A small 2x2 close (rather than 3x3) avoids merging nearby spikes.
        trace = cv2.morphologyEx(trace, cv2.MORPH_CLOSE, np.ones((2, 2), np.uint8))

    # No MORPH_OPEN pass here on purpose: opening erodes 1-2 px spike tips.
    return trace
def generate_curve_data(mask, name_upper, name_lower):
    """Convert a binary trace mask into per-column upper/lower curve values.

    Each column of ``mask`` is scanned for non-zero pixels. Y values are
    measured upward from the bottom of the mask (``height - row``).

    * If the lit pixels of a column span at most 10 rows, the column is
      treated as a single thin line and its mid-point is assigned to either
      the upper or the lower curve, whichever was last seen closer to it
      (the upper curve when the lower one has not been seen yet).
    * Otherwise (a vertical spike or a filled area) the topmost lit pixel
      goes to the upper curve and the bottommost to the lower curve.

    Columns without data are filled by linear interpolation, with leading and
    trailing gaps taking the nearest known value.

    Args:
        mask: 2-D array; any value ``> 0`` counts as a trace pixel.
        name_upper: Column name for the upper curve.
        name_lower: Column name for the lower curve.

    Returns:
        A ``pandas.DataFrame`` with an integer ``"X"`` column (0..width-1) and
        two float64 columns named ``name_upper`` and ``name_lower``. A curve
        that is never observed is returned as all-NaN.
    """
    height, width = mask.shape
    proximity_thresh = 10

    # Pre-allocate float arrays so both columns are numeric even when a curve
    # is never observed; an object-dtype column of None values cannot be
    # interpolated reliably.
    upper_vals = np.full(width, np.nan, dtype=np.float64)
    lower_vals = np.full(width, np.nan, dtype=np.float64)
    prev_top, prev_bot = None, None

    for x in range(width):
        lit_rows = np.flatnonzero(mask[:, x] > 0)
        if lit_rows.size == 0:
            continue

        y_min, y_max = int(lit_rows[0]), int(lit_rows[-1])
        val_top, val_bot = None, None

        if y_max - y_min <= proximity_thresh:
            # Thin line: one value, attributed to the nearer tracked curve.
            current_val = height - int((y_min + y_max) / 2)
            if prev_bot is None:
                val_top = current_val
            elif prev_top is None:
                val_bot = current_val
            elif abs(current_val - prev_top) <= abs(current_val - prev_bot):
                val_top = current_val
            else:
                val_bot = current_val
        else:
            # Vertical spike or filled area: keep both extremes.
            val_top = height - y_min
            val_bot = height - y_max

        if val_top is not None:
            upper_vals[x] = val_top
            prev_top = val_top
        if val_bot is not None:
            lower_vals[x] = val_bot
            prev_bot = val_bot

    df = pd.DataFrame(
        {
            "X": np.arange(width, dtype=np.int64),
            name_upper: upper_vals,
            name_lower: lower_vals,
        }
    )

    # Linear interpolation is used because it cannot overshoot at sharp spikes.
    df[name_upper] = df[name_upper].interpolate(method='linear', limit_direction='both')
    df[name_lower] = df[name_lower].interpolate(method='linear', limit_direction='both')
    return df
def process_uploaded_image(file_bytes, sat_factor, gap_size, noise_threshold, crop_enabled, total_duration):
    """Digitize the red, green and cyan traces of an uploaded graph image.

    Pipeline: decode the bytes, optionally crop to the detected graph frame,
    extract one mask per color, convert each mask into an upper/lower curve
    pair, merge everything on the pixel column ``X``, add a time axis and
    clean the curves.

    Args:
        file_bytes: Encoded image content (bytes-like).
        sat_factor: Saturation multiplier forwarded to ``extract_line_mask``.
        gap_size: Gap-fill kernel length forwarded to ``extract_line_mask``.
        noise_threshold: Noise area threshold forwarded to
            ``extract_line_mask``.
        crop_enabled: When true, crop horizontally to the boundaries found by
            ``detect_graph_boundaries``.
        total_duration: Time span, in the unit of the ``'Time (ms)'`` column,
            that the full width of the working image represents.

    Returns:
        A 4-tuple ``(data, debug_images, (sx, ex), error)``. On success
        ``error`` is ``None``, ``data`` is a DataFrame with the columns
        ``X``, ``Time (ms)``, ``Travel``, ``C1``, ``Resistance``, ``C2``,
        ``Current``, ``C3``, and ``debug_images`` maps ``"Boundaries"`` and
        each color key to an image. On failure the first three items are
        ``None`` and ``error`` is a short message.
    """
    failure = (None, None, None)

    # 1. Decode image. cv2.imdecode rejects an empty buffer and returns None
    #    for data it cannot decode, so both are reported as an error tuple.
    raw = np.asarray(bytearray(file_bytes), dtype=np.uint8)
    if raw.size == 0:
        return (*failure, "Could not decode image.")
    img_orig = cv2.imdecode(raw, 1)
    if img_orig is None:
        return (*failure, "Could not decode image.")

    # 2. Crop
    debug_img_bounds = img_orig.copy()
    sx, ex = 0, img_orig.shape[1]
    if crop_enabled:
        sx, ex, debug_img_bounds = detect_graph_boundaries(img_orig)
        img_working = img_orig[:, sx:ex]
    else:
        img_working = img_orig
    if img_working.shape[1] == 0:
        return (*failure, "Crop failed.")

    # 3. Process colors: (mask color key, (main curve, auxiliary curve)).
    configs = [
        ("Red", ("Travel", "C1")),
        ("Green", ("Resistance", "C2")),
        ("Blue (Cyan)", ("Current", "C3")),
    ]
    width = img_working.shape[1]
    dfs = []
    # NOTE(review): the boundary image stays in BGR order while the overlays
    # below are converted to RGB - confirm the consumer expects that.
    debug_images = {"Boundaries": debug_img_bounds}

    for color_key, (main_col, aux_col) in configs:
        mask = extract_line_mask(img_working, color_key, sat_factor, gap_size, noise_threshold)
        if mask is None:
            # Keep the column layout stable even without a mask.
            dfs.append(pd.DataFrame({"X": range(width), main_col: np.nan, aux_col: np.nan}))
            continue

        # Green-tinted overlay showing which pixels were picked up.
        colored_mask = np.zeros_like(img_working)
        colored_mask[mask > 0] = [0, 255, 0]
        overlay = cv2.addWeighted(img_working, 0.7, colored_mask, 0.3, 0)
        debug_images[color_key] = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)
        dfs.append(generate_curve_data(mask, main_col, aux_col))

    # 4. Merge
    if not dfs:
        return (*failure, "No data extracted.")
    final_df = reduce(lambda left, right: pd.merge(left, right, on='X', how='outer'), dfs)

    cols = ['X', 'Travel', 'C1', 'Resistance', 'C2', 'Current', 'C3']
    existing_cols = [c for c in cols if c in final_df.columns]
    if 'X' not in final_df.columns:
        return (*failure, "X-axis alignment failed.")

    # A curve that was never observed may arrive as an object column of None;
    # coerce every curve column to a numeric dtype so the mean, comparison
    # and interpolation steps below are well defined.
    for col in existing_cols:
        if col != 'X':
            final_df[col] = pd.to_numeric(final_df[col], errors='coerce')

    # STEP A: time axis - spread total_duration evenly over the pixel columns.
    time_per_pixel = total_duration / width
    final_df['Time (ms)'] = final_df['X'] * time_per_pixel
    existing_cols.insert(1, 'Time (ms)')

    # STEP B: baselines and cleanup for the curves expected to start at rest.
    for col in ['Current', 'Travel']:
        if col not in final_df.columns:
            continue

        # Baseline = mean over the first 30 time units, falling back to the
        # first valid sample when that window holds no data.
        baseline_val = 0
        start_window = final_df[final_df['Time (ms)'] <= 30]
        if not start_window.empty and start_window[col].notna().any():
            baseline_val = start_window[col].mean()
        else:
            valid_idx = final_df[col].first_valid_index()
            if valid_idx is not None:
                baseline_val = final_df.loc[valid_idx, col]

        # Relaxed clipping: only samples below 80% of a positive baseline are
        # discarded, so small oscillations around the baseline survive.
        if baseline_val > 0:
            final_df.loc[final_df[col] < (baseline_val * 0.8), col] = np.nan
        final_df[col] = final_df[col].interpolate(method='linear', limit_direction='both')

    # STEP C: the auxiliary curve is the bottom edge of a thick line and must
    # never lie above its main curve at the same X. The comparison is
    # position-wise (not against a global minimum) so spikes are preserved.
    pairs = [('C1', 'Travel'), ('C2', 'Resistance'), ('C3', 'Current')]
    for lower, upper in pairs:
        if lower in final_df.columns and upper in final_df.columns:
            if not final_df[upper].isnull().all():
                invalid_mask = final_df[lower] > final_df[upper]
                final_df.loc[invalid_mask, lower] = np.nan

    # Final global cleanup: refill whatever the steps above blanked out.
    for col in ['Current', 'Travel', 'C1', 'C2', 'C3']:
        if col in final_df.columns:
            final_df[col] = final_df[col].interpolate(method='linear', limit_direction='both')

    return final_df[existing_cols], debug_images, (sx, ex), None