dcrm-analysis-api / dcrm /image_processing.py
Aditya Adaki
Add DCRM Analysis API
fdcec08
import cv2
import numpy as np
import pandas as pd
from functools import reduce
from PIL import Image
def detect_graph_boundaries(img):
    """Find the left and right vertical border lines of the plotted graph.

    The image is converted to grayscale and inverse-thresholded at 200, so
    dark pixels become foreground. A column is treated as a vertical line
    when more than 40% of its rows are foreground.

    Args:
        img: Image array; it is handled as BGR (converted with
            ``cv2.COLOR_BGR2GRAY``).

    Returns:
        tuple: ``(start_x, end_x, debug_img)`` where ``start_x`` is the first
        line column with ``5 < x < 0.2 * width`` (0 if there is none),
        ``end_x`` is the last line column with ``x > 0.95 * width``
        (``width - 1`` if there is none) and ``debug_img`` is a copy of
        ``img`` with both boundaries drawn as 3-pixel-wide vertical lines.
    """
    rows, cols = img.shape[:2]

    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    dark_pixels = cv2.threshold(grayscale, 200, 255, cv2.THRESH_BINARY_INV)[1]

    # Number of dark pixels per column (foreground pixels are stored as 255).
    dark_per_column = np.sum(dark_pixels, axis=0) / 255
    candidates = np.where(dark_per_column > (rows * 0.40))[0]

    # Left border: skip the outermost 5 columns, search the first 20%.
    near_left = candidates[(candidates > 5) & (candidates < cols * 0.2)]
    # Right border: search only the last 5% of the width.
    near_right = candidates[candidates > cols * 0.95]

    left_edge = int(near_left[0]) if near_left.size else 0
    right_edge = int(near_right[-1]) if near_right.size else cols - 1

    # Debug image: left boundary in (0, 255, 0), right boundary in (0, 0, 255).
    annotated = img.copy()
    for x_pos, colour in ((left_edge, (0, 255, 0)), (right_edge, (0, 0, 255))):
        cv2.line(annotated, (x_pos, 0), (x_pos, rows), colour, 3)

    return left_edge, right_edge, annotated
def extract_color_pixels(
    image, color="green", mode="dominant", threshold=0, difference=10
):
    """Keep only the pixels of one colour, on a black background.

    Args:
        image: PIL Image object; converted to RGB when it is in another mode.
        color: str, one of 'red', 'green', or 'blue'.
        mode: str, detection mode:
            'dominant'   - the target channel is >= both other channels;
            'difference' - the target channel exceeds both other channels by
                           more than ``difference`` (absolute value);
            'strict'     - the target channel exceeds both other channels
                           multiplied by ``1 + difference / 100``.
        threshold: int, the target channel must be strictly greater than this
            value (0-255) in every mode.
        difference: int/float, meaning depends on ``mode`` (unused for
            'dominant').

    Returns:
        tuple: (PIL Image with only the selected pixels, boolean color_mask
        array with one entry per pixel).

    Raises:
        ValueError: if ``color`` or ``mode`` is not one of the supported
            values. Both are checked before the image is touched.
    """
    channel_index = {"red": 0, "green": 1, "blue": 2}
    # Validate up front: previously an unknown colour left the mask
    # unassigned and surfaced as an UnboundLocalError further down.
    if color not in channel_index:
        raise ValueError("Color must be 'red', 'green', or 'blue'")
    if mode not in ("dominant", "difference", "strict"):
        raise ValueError("Mode must be 'dominant', 'difference', or 'strict'")

    # Convert image to RGB if it's not already
    if image.mode != "RGB":
        image = image.convert("RGB")

    img_array = np.array(image)

    # Work in float32 so channel arithmetic cannot wrap around in uint8.
    channels = img_array.astype(np.float32)
    target_idx = channel_index[color]
    target = channels[:, :, target_idx]
    other_a, other_b = (
        channels[:, :, idx] for idx in range(3) if idx != target_idx
    )

    if mode == "dominant":
        # Target colour is (one of) the highest channel(s).
        color_mask = (target >= other_a) & (target >= other_b)
    elif mode == "difference":
        # Target colour leads both other channels by an absolute margin.
        color_mask = (target > other_a + difference) & (
            target > other_b + difference
        )
    else:  # "strict"
        # Target colour leads both other channels by a percentage margin.
        dominance_factor = 1.0 + (difference / 100.0)
        color_mask = (target > other_a * dominance_factor) & (
            target > other_b * dominance_factor
        )
    color_mask &= target > threshold

    # Copy the selected pixels onto a black canvas of the same shape.
    result_array = np.zeros_like(img_array)
    result_array[color_mask] = img_array[color_mask]

    result_image = Image.fromarray(result_array.astype("uint8"))
    return result_image, color_mask
def extract_line_mask(
img_cropped, line_color, saturation_factor, gap_fill_size, noise_threshold
):
# Build a binary (0/255, uint8) mask of one coloured curve in a graph image.
#
# Args:
#   img_cropped: image array, handled as BGR (converted with COLOR_BGR2HSV).
#   line_color: "Red", "Green" or "Blue (Cyan)". Any other value falls back
#       to the green target colour with the default difference of 20.
#   saturation_factor: multiplier applied to the HSV saturation channel
#       before colour detection (result clipped to 0-255).
#   gap_fill_size: kernel length used for the horizontal and vertical
#       morphological closing; values <= 0 skip that step.
#   noise_threshold: contours whose area is not greater than half of this
#       value are dropped from the mask.
#
# Returns:
#   (mask, debug_image). debug_image is only built for "Green"; otherwise
#   it is None.
#
# NOTE(review): indentation is missing in this copy of the file, so block
# nesting has to be inferred from the data flow; see the notes below.
#
# Boost saturation so faint curve colours separate better from the background
hsv_pre = cv2.cvtColor(img_cropped, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv_pre)
s = np.clip(s.astype(np.float32) * saturation_factor, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
# Convert the boosted HSV image back to BGR, then to RGB for PIL
boosted_bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
img_rgb = cv2.cvtColor(boosted_bgr, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(img_rgb)
# Map the UI colour label to the channel name used by extract_color_pixels
target_color = "green"
if line_color == "Red":
target_color = "red"
elif line_color == "Blue (Cyan)":
target_color = "blue"
# Green uses a larger channel margin (30) than the other colours (20)
diff_val = 20
if line_color == "Green":
diff_val = 30
# Only the boolean mask is used; the filtered PIL image is discarded
_, color_mask = extract_color_pixels(
pil_image,
color=target_color,
mode="difference",
threshold=40,
difference=diff_val,
)
# Convert boolean mask to uint8 (0 / 255)
mask = np.zeros_like(img_cropped[:, :, 0], dtype=np.uint8)
mask[color_mask] = 255
debug_image = None
# Additional processing for Green (White removal): bright, low-saturation
# pixels of the ORIGINAL (un-boosted) image are cleared from the mask
if line_color == "Green":
original_bgr = img_cropped
original_hsv = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2HSV)
_, orig_s, orig_v = cv2.split(original_hsv)
white_mask = (orig_v > 200) & (orig_s < 50)
mask_before_white_removal = mask.copy()
mask[white_mask] = 0
# Create debug visualization: kept pixels are painted [0, 255, 0], pixels
# removed by the white filter [0, 0, 255]. It reads white_mask and
# mask_before_white_removal, which are only assigned in the Green branch.
debug_image = img_cropped.copy()
debug_image[mask > 0] = [0, 255, 0]
removed_white = white_mask & (mask_before_white_removal > 0)
debug_image[removed_white] = [0, 0, 255]
# NOTE(review): mask is always an ndarray here (assigned from np.zeros_like
# above), so this guard never fires.
if mask is None:
return None, None
# Noise/Gap cleanup: redraw (filled) only the external contours whose area
# exceeds half of noise_threshold
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
mask_clean = np.zeros_like(mask)
for cnt in contours:
if cv2.contourArea(cnt) > (noise_threshold * 0.5):
cv2.drawContours(mask_clean, [cnt], -1, 255, -1)
mask = mask_clean
# Close gaps separately along rows and columns, then combine both results
if gap_fill_size > 0:
k_h = np.ones((1, gap_fill_size), np.uint8)
close_h = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k_h)
k_v = np.ones((gap_fill_size, 1), np.uint8)
close_v = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k_v)
mask = cv2.bitwise_or(close_h, close_v)
# NOTE(review): with the indentation lost it cannot be told from here
# whether this final 2x2 closing runs always or only when gap_fill_size > 0
# - confirm against the original file.
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((2, 2), np.uint8))
return mask, debug_image
def generate_curve_data(mask, name_upper, name_lower):
    """Convert a binary curve mask into per-column upper/lower curve heights.

    For every pixel column the first and last foreground rows (``mask > 0``)
    are located and converted to graph coordinates measured from the bottom
    of the image (``height - row``). Columns without foreground pixels start
    out as NaN; gaps of up to 3 consecutive columns that lie between valid
    samples are linearly interpolated, and whatever is still missing is
    back-filled and then forward-filled.

    Args:
        mask: 2-D array; values greater than 0 are foreground.
        name_upper: Column name for the topmost foreground pixel per column.
        name_lower: Column name for the bottommost foreground pixel per column.

    Returns:
        pandas.DataFrame: one row per pixel column with the columns ``"X"``
        (int64 column index), ``name_upper`` and ``name_lower``. The two
        curve columns are always float64; they are all-NaN when the mask has
        no foreground pixels at all.
    """
    height, width = mask.shape

    # Start from NaN so the curve columns are numeric even for an empty mask
    # (None placeholders would produce object-dtype columns that cannot be
    # interpolated).
    upper = np.full(width, np.nan)
    lower = np.full(width, np.nan)

    if mask.size:
        lit = mask > 0
        has_pixels = lit.any(axis=0)
        # argmax returns the first True per column; scanning the row-reversed
        # mask yields the last True instead.
        first_row = lit.argmax(axis=0)
        last_row = height - 1 - lit[::-1].argmax(axis=0)
        upper[has_pixels] = height - first_row[has_pixels]
        lower[has_pixels] = height - last_row[has_pixels]

    df = pd.DataFrame(
        {
            "X": np.arange(width, dtype=np.int64),
            name_upper: upper,
            name_lower: lower,
        }
    )

    for column in (name_upper, name_lower):
        df[column] = (
            df[column]
            .interpolate(method="linear", limit=3, limit_area="inside")
            .bfill()
            .ffill()
        )
    return df
def process_uploaded_image(
file_bytes,
sat_factor,
gap_size,
noise_threshold,
crop_enabled,
total_duration,
travel_gradient_threshold=30,
):
# Decode an uploaded graph image and digitise its red, green and blue curves
# into a single DataFrame keyed by pixel column "X".
#
# Args:
#   file_bytes: encoded image bytes, decoded with cv2.imdecode (flag 1).
#   sat_factor, gap_size, noise_threshold: forwarded to extract_line_mask as
#       saturation_factor, gap_fill_size and noise_threshold.
#   crop_enabled: when truthy, the image is cropped horizontally to the
#       columns returned by detect_graph_boundaries.
#   total_duration: time value assigned to the last pixel column when the
#       "Time (ms)" column is computed.
#   travel_gradient_threshold: largest absolute step allowed between
#       consecutive "Travel" samples; larger steps are blanked.
#
# Returns:
#   5-tuple (data, debug_images, (sx, ex), error, baselines). On success
#   error is None; on failure the first three entries are None, error is a
#   message string and baselines is {}.
#
# NOTE(review): indentation is missing in this copy of the file, so block
# nesting has to be inferred; the ambiguous spots are flagged below.
file_bytes = np.asarray(bytearray(file_bytes), dtype=np.uint8)
# NOTE(review): the decode result is used unchecked - confirm how
# undecodable uploads should be reported to the caller.
img_orig = cv2.imdecode(file_bytes, 1)
debug_img_bounds = img_orig.copy()
# Default horizontal bounds: the full image width
sx, ex = 0, img_orig.shape[1]
if crop_enabled:
sx, ex, debug_img_bounds = detect_graph_boundaries(img_orig)
img_working = img_orig[:, sx:ex]
else:
img_working = img_orig
# An empty column slice means the detected bounds left nothing to process
if img_working.shape[1] == 0:
return None, None, None, "Crop failed.", {}
# (colour label passed to extract_line_mask, unused short name,
#  (upper-curve column, lower-curve column))
configs = [
("Red", "Red", ("Travel", "C1")),
("Green", "Green", ("Resistance", "C2")),
("Blue (Cyan)", "Blue", ("Current", "C3")),
]
dfs = []
debug_images = {}
debug_images["Boundaries"] = debug_img_bounds
height, width = img_working.shape[:2]
for color_key, _, col_names in configs:
mask, debug_img = extract_line_mask(
img_working, color_key, sat_factor, gap_size, noise_threshold
)
if mask is not None:
if debug_img is not None and color_key == "Green":
debug_images[color_key + " (White Removal)"] = cv2.cvtColor(
debug_img, cv2.COLOR_BGR2RGB
)
# NOTE(review): with the indentation lost it cannot be told from here
# whether the "(Cleaned Overlay)" image is produced for every colour or
# only inside the Green branch above - confirm against the original file.
colored_mask_clean = np.zeros_like(img_working)
colored_mask_clean[mask > 0] = [0, 255, 0]
overlay_clean = cv2.addWeighted(
img_working, 0.7, colored_mask_clean, 0.3, 0
)
debug_images[color_key + " (Cleaned Overlay)"] = cv2.cvtColor(
overlay_clean, cv2.COLOR_BGR2RGB
)
# Second overlay, built from the same mask in the same way as the
# "(Cleaned Overlay)" above, stored under the plain colour key
colored_mask = np.zeros_like(img_working)
colored_mask[mask > 0] = [0, 255, 0]
overlay = cv2.addWeighted(img_working, 0.7, colored_mask, 0.3, 0)
debug_images[color_key] = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)
df_curve = generate_curve_data(mask, col_names[0], col_names[1])
dfs.append(df_curve)
else:
# No mask for this colour: keep the columns, filled with NaN
df_empty = pd.DataFrame(
{"X": range(width), col_names[0]: np.nan, col_names[1]: np.nan}
)
dfs.append(df_empty)
if dfs:
# Join the per-colour frames on the pixel column
final_df = reduce(
lambda left, right: pd.merge(left, right, on="X", how="outer"), dfs
)
# Output column order; only columns that actually exist are returned
cols = ["X", "Travel", "C1", "Resistance", "C2", "Current", "C3"]
existing_cols = [c for c in cols if c in final_df.columns]
if "X" in final_df.columns:
# === UPDATED TIME CALCULATION ===
# Calculates strict linear time: Pixel 0 = 0ms, Pixel Last = total_duration
# NOTE(review): width - 1 is zero for a one-column image - confirm that
# this case cannot reach here.
final_df["Time (ms)"] = (final_df["X"] / (width - 1)) * total_duration
existing_cols.insert(1, "Time (ms)")
else:
return None, None, None, "X-axis alignment failed.", {}
# IMPROVED BASELINE CLEANUP - Remove dotted reference lines
baselines = {}
for col in ["Travel", "Current"]:
if col in final_df.columns:
# Calculate baseline from first 60 entries
first_60 = final_df[col].head(60)
if first_60.notna().any():
initial_baseline = first_60.mean(skipna=True)
if col == "Travel":
# Identify outliers: points < 98% of initial baseline are left out of
# the refined Travel baseline
outlier_threshold = initial_baseline * 0.98
valid_points = first_60[first_60 >= outlier_threshold]
if valid_points.notna().any():
baseline_val = valid_points.mean(skipna=True)
else:
baseline_val = initial_baseline
else:
baseline_val = initial_baseline
else:
# No data in the first 60 rows: fall back to the first valid sample, or
# skip this column when it has no valid sample at all
valid_idx = final_df[col].first_valid_index()
if valid_idx is not None:
baseline_val = final_df.loc[valid_idx, col]
else:
continue
baselines[col] = baseline_val
# Find minimum value (dotted reference line level)
min_val = final_df[col].min(skipna=True)
# Set values near minimum to NaN: everything below min + 15% of the
# (baseline - min) span is blanked
threshold = min_val + (baseline_val - min_val) * 0.15
final_df.loc[final_df[col] < threshold, col] = np.nan
# Abrupt Change (Gradient) Filter: blank samples that differ from the
# previous row by more than travel_gradient_threshold
if col == "Travel":
gradient_threshold = travel_gradient_threshold
diff = final_df[col].diff().abs()
mask_abrupt = diff > gradient_threshold
final_df.loc[mask_abrupt, col] = np.nan
# Time-Based Baseline Tolerances
# NOTE(review): with the indentation lost it cannot be told from here
# whether the four tolerance windows below apply to "Travel" only or to
# both columns - confirm against the original file.
# 1. Start (0-30ms): blank values below 98% of the baseline
mask_start = final_df["Time (ms)"] < 30
threshold_start = baseline_val * 0.98
mask_remove_start = mask_start & (final_df[col] < threshold_start)
final_df.loc[mask_remove_start, col] = np.nan
# 2. End (Last 50ms): blank values below 98% of the baseline
max_time = final_df["Time (ms)"].max()
mask_end = final_df["Time (ms)"] > (max_time - 50)
threshold_end = baseline_val * 0.98
mask_remove_end = mask_end & (final_df[col] < threshold_end)
final_df.loc[mask_remove_end, col] = np.nan
# 3. Center (100-300ms): blank values below 105% of the baseline
mask_center = (final_df["Time (ms)"] >= 100) & (
final_df["Time (ms)"] <= 300
)
threshold_center = baseline_val * 1.05
mask_remove_center = mask_center & (final_df[col] < threshold_center)
final_df.loc[mask_remove_center, col] = np.nan
# 4. Main (30-350ms) excluding Center: blank values below the baseline
mask_main_pre = (final_df["Time (ms)"] >= 30) & (
final_df["Time (ms)"] < 100
)
mask_main_post = (final_df["Time (ms)"] > 300) & (
final_df["Time (ms)"] <= 350
)
mask_remove_main_pre = mask_main_pre & (final_df[col] < baseline_val)
mask_remove_main_post = mask_main_post & (final_df[col] < baseline_val)
final_df.loc[mask_remove_main_pre, col] = np.nan
final_df.loc[mask_remove_main_post, col] = np.nan
# Fill gaps: interpolate up to 3 consecutive interior NaNs, then back-fill
# and forward-fill whatever is still missing
final_df[col] = (
final_df[col]
.interpolate(method="linear", limit=3, limit_area="inside")
.bfill()
.ffill()
)
# CROSS-CHANNEL BASELINE CONSTRAINTS: blank "Current" samples that fall
# below the Travel baseline, then fill the gaps again
if "Travel" in baselines:
travel_base = baselines["Travel"]
if "Current" in final_df.columns:
mask = final_df["Current"] < travel_base
final_df.loc[mask, "Current"] = np.nan
final_df["Current"] = (
final_df["Current"]
.interpolate(method="linear", limit=3, limit_area="inside")
.bfill()
.ffill()
)
# AUXILIARY CURVE LOGIC: a lower-curve sample may not exceed its upper
# curve; offending samples are blanked
pairs = [("C1", "Travel"), ("C2", "Resistance"), ("C3", "Current")]
for lower, upper in pairs:
if lower in final_df.columns and upper in final_df.columns:
if not final_df[upper].isnull().all():
invalid_mask = final_df[lower] > final_df[upper]
final_df.loc[invalid_mask, lower] = np.nan
# Final global cleanup (excluding Resistance)
for col in ["Travel", "Current", "C1", "C3"]:
if col in final_df.columns:
final_df[col] = (
final_df[col]
.interpolate(method="linear", limit=3, limit_area="inside")
.bfill()
.ffill()
)
return final_df[existing_cols], debug_images, (sx, ex), None, baselines
# Reached only when no per-colour frame was collected
return None, None, None, "No data extracted.", {}