File size: 4,442 Bytes
b5dd333 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
import pandas as pd
import numpy as np
import json
import os
def temporal_fill_gaps_in_csv(csv_path, ranges=None):
"""
Fills missing face entries by interpolating between known detections in specified frame ranges.
If no ranges are provided, it uses the full range from min to max frame.
Args:
csv_path (str): Path to identity-specific CSV file.
ranges (list of tuple): Optional list of (start_frame, end_frame) to limit interpolation.
Saves the result as a new CSV with '_filled' appended to the filename.
"""
df = pd.read_csv(csv_path)
if df.empty:
print(f"β Empty CSV: {csv_path}")
return
df_filled = df.copy()
new_rows = []
# Default to full frame range
if ranges is None:
min_frame = int(df['frame'].min())
max_frame = int(df['frame'].max())
ranges = [(min_frame, max_frame)]
for start, end in ranges:
range_df = df[(df['frame'] >= start) & (df['frame'] <= end)].copy()
present_frames = set(range_df['frame'].tolist())
missing_frames = [f for f in range(start, end + 1) if f not in present_frames]
if len(range_df) < 2:
print(f"β οΈ Skipping range ({start}-{end}) β insufficient anchor frames.")
continue
start_row = range_df.sort_values("frame").iloc[0]
end_row = range_df.sort_values("frame").iloc[-1]
for frame_num in missing_frames:
t = (frame_num - start) / (end - start)
interp_row = start_row.copy()
interp_row['frame'] = frame_num
# Interpolate bounding box
for col in ['x1', 'y1', 'x2', 'y2']:
interp_row[col] = (1 - t) * start_row[col] + t * end_row[col]
# Interpolate landmarks
try:
lm_start = np.array(eval(start_row['landmarks']))
lm_end = np.array(eval(end_row['landmarks']))
lm_interp = (1 - t) * lm_start + t * lm_end
interp_row['landmarks'] = str(lm_interp.tolist())
except:
interp_row['landmarks'] = "[]"
new_rows.append(interp_row)
if new_rows:
df_filled = pd.concat([df_filled, pd.DataFrame(new_rows)], ignore_index=True)
df_filled = df_filled.sort_values(by="frame").reset_index(drop=True)
output_path = csv_path.replace(".csv", "_filled.csv")
df_filled.to_csv(output_path, index=False)
print(f"β
Gaps filled and saved to: {output_path}")
return output_path
def temporal_smooth_csv(csv_path, window_size=5):
"""
Applies temporal smoothing to bounding boxes and landmarks in a face CSV.
Args:
csv_path (str): Path to the input CSV with frame-wise face data.
window_size (int): Size of the moving average window (must be odd).
Returns:
str: Path to the smoothed CSV.
"""
assert window_size % 2 == 1, "Window size must be odd."
df = pd.read_csv(csv_path)
if df.empty:
print(f"β CSV is empty: {csv_path}")
return None
df = df.sort_values("frame").reset_index(drop=True)
half_window = window_size // 2
smoothed_rows = []
for i in range(len(df)):
window_df = df[max(0, i - half_window): min(len(df), i + half_window + 1)]
# Smooth bounding boxes
x1 = int(window_df["x1"].mean())
y1 = int(window_df["y1"].mean())
x2 = int(window_df["x2"].mean())
y2 = int(window_df["y2"].mean())
# Smooth landmarks if they exist
landmarks = []
for l in window_df.get("landmark_2d_106", window_df.get("landmarks", "[]")):
try:
parsed = np.array(json.loads(l))
if parsed.ndim == 2:
landmarks.append(parsed)
except Exception:
continue
if landmarks:
landmarks_mean = np.mean(landmarks, axis=0)
landmarks_str = json.dumps(landmarks_mean.tolist())
else:
landmarks_str = "[]"
row = df.iloc[i].copy()
row["x1"], row["y1"], row["x2"], row["y2"] = x1, y1, x2, y2
row["landmark_2d_106"] = landmarks_str
smoothed_rows.append(row)
smoothed_df = pd.DataFrame(smoothed_rows)
out_path = csv_path.replace(".csv", "_smoothed.csv")
smoothed_df.to_csv(out_path, index=False)
print(f"β
Smoothed CSV saved to: {out_path}")
return out_path
|