|
|
import ast
import json
import os

import numpy as np
import pandas as pd
|
|
|
|
|
def temporal_fill_gaps_in_csv(csv_path, ranges=None):
    """
    Fill missing frames in a face CSV by linear interpolation between detections.

    For every frame number inside a range that has no row in the CSV, a new row
    is synthesised from the two detections that bracket it (the closest known
    frame before and after). The box columns ``x1``, ``y1``, ``x2``, ``y2`` and
    the ``landmarks`` column are interpolated; every other column is copied
    from the earlier of the two detections. Frames that lie before the first
    or after the last detection of a range are not extrapolated: they repeat
    the nearest detection.

    If no ranges are provided, the full range from the minimum to the maximum
    frame number in the CSV is used.

    Args:
        csv_path (str): Path to identity-specific CSV file. Must contain the
            columns ``frame``, ``x1``, ``y1``, ``x2`` and ``y2``.
        ranges (list of tuple): Optional list of inclusive
            (start_frame, end_frame) integer pairs to limit interpolation.
            Ranges containing fewer than two distinct detected frames are
            skipped with a printed warning.

    Returns:
        str | None: Path of the written CSV (the input name with ``_filled``
        inserted before the extension), or None when the input CSV is empty.
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        print(f"❌ Empty CSV: {csv_path}")
        return None

    if ranges is None:
        ranges = [(int(df['frame'].min()), int(df['frame'].max()))]

    box_cols = ['x1', 'y1', 'x2', 'y2']
    new_rows = []

    for start, end in ranges:
        in_range = df[(df['frame'] >= start) & (df['frame'] <= end)]
        # One anchor per frame number, in ascending order, so that every gap
        # has a well-defined pair of neighbouring detections.
        anchors = (
            in_range.sort_values("frame", kind="stable")
            .drop_duplicates(subset="frame")
            .reset_index(drop=True)
        )
        if len(anchors) < 2:
            print(f"⚠️ Skipping range ({start}-{end}) — insufficient anchor frames.")
            continue

        anchor_frames = anchors['frame'].to_numpy()
        known_frames = set(anchor_frames.tolist())
        last_idx = len(anchors) - 1
        parsed_landmarks = {}  # anchor index -> parsed array (or None), parsed once

        for frame_num in range(start, end + 1):
            if frame_num in known_frames:
                continue

            # Index of the first anchor after frame_num, clamped so (lo, hi)
            # is always a valid neighbouring pair even outside the anchors.
            hi = int(np.searchsorted(anchor_frames, frame_num))
            hi = min(max(hi, 1), last_idx)
            lo = hi - 1
            lo_row, hi_row = anchors.iloc[lo], anchors.iloc[hi]

            # Fraction of the way from the lower to the upper anchor, measured
            # in actual frame numbers; clamped to hold the nearest detection
            # for frames outside the anchored span.
            t = (frame_num - anchor_frames[lo]) / (anchor_frames[hi] - anchor_frames[lo])
            t = min(max(t, 0.0), 1.0)

            template = hi_row if t >= 1.0 else lo_row
            # Object dtype so float/str values can be stored regardless of the
            # dtype pandas picked for the source row.
            interp_row = template.astype(object)
            interp_row['frame'] = frame_num
            for col in box_cols:
                interp_row[col] = (1 - t) * lo_row[col] + t * hi_row[col]

            for idx, anchor in ((lo, lo_row), (hi, hi_row)):
                if idx not in parsed_landmarks:
                    parsed_landmarks[idx] = _parse_landmarks_literal(anchor.get('landmarks'))
            interp_row['landmarks'] = _blend_landmarks(
                parsed_landmarks[lo], parsed_landmarks[hi], t
            )

            new_rows.append(interp_row)

    df_filled = df
    if new_rows:
        df_filled = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
        df_filled = df_filled.sort_values(by="frame").reset_index(drop=True)

    # splitext only touches the real extension; str.replace would rewrite every
    # ".csv" in the path and would overwrite the input when there is none.
    root, ext = os.path.splitext(csv_path)
    output_path = f"{root}_filled{ext}"
    df_filled.to_csv(output_path, index=False)
    print(f"✅ Gaps filled and saved to: {output_path}")
    return output_path


def _parse_landmarks_literal(raw):
    """Parse a landmarks cell (text of a nested numeric list) into a float array.

    Returns None when the cell is missing or is not a well-formed numeric literal.
    """
    try:
        # NOTE(security): this used to be eval(); CSV contents are external
        # data, so only plain Python literals are accepted now.
        return np.array(ast.literal_eval(raw), dtype=float)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return None


def _blend_landmarks(lm_lo, lm_hi, t):
    """Return the text of the landmarks blended at fraction t, or "[]" if unusable."""
    if lm_lo is None or lm_hi is None:
        return "[]"
    try:
        return str(((1 - t) * lm_lo + t * lm_hi).tolist())
    except ValueError:
        # The two anchors carry landmark arrays of incompatible shapes.
        return "[]"
|
|
|
|
|
def temporal_smooth_csv(csv_path, window_size=5):
    """
    Apply a centred moving average to bounding boxes and landmarks in a face CSV.

    Rows are sorted by ``frame``; for each row the window covers up to
    ``window_size // 2`` rows on either side (fewer at the ends of the file).
    The box columns ``x1``, ``y1``, ``x2``, ``y2`` are replaced by the window
    mean truncated to an integer. Landmarks are read from the
    ``landmark_2d_106`` column when present, otherwise from ``landmarks``;
    cells that are not valid JSON or are not 2-D arrays are ignored. The
    averaged landmarks are written as JSON to ``landmark_2d_106`` (``"[]"``
    when the window holds no usable landmarks).

    Args:
        csv_path (str): Path to the input CSV with frame-wise face data.
        window_size (int): Size of the moving average window (must be odd).

    Returns:
        str | None: Path to the smoothed CSV (the input name with
        ``_smoothed`` inserted before the extension), or None when the input
        CSV is empty.

    Raises:
        AssertionError: If ``window_size`` is not odd.
    """
    # Raised explicitly (same exception type as the former ``assert``) so the
    # check is not stripped when Python runs with -O.
    if window_size % 2 != 1:
        raise AssertionError("Window size must be odd.")

    df = pd.read_csv(csv_path)
    if df.empty:
        print(f"❌ CSV is empty: {csv_path}")
        return None

    df = df.sort_values("frame").reset_index(drop=True)
    half_window = window_size // 2
    total = len(df)
    box_cols = ["x1", "y1", "x2", "y2"]

    # Pick the landmark source column once and parse each cell a single time,
    # instead of re-parsing the same JSON for every window it falls into.
    if "landmark_2d_106" in df.columns:
        raw_cells = df["landmark_2d_106"].tolist()
    elif "landmarks" in df.columns:
        raw_cells = df["landmarks"].tolist()
    else:
        raw_cells = []
    parsed_cells = [_parse_landmarks_json(cell) for cell in raw_cells]

    smoothed_rows = []
    for i in range(total):
        lo = max(0, i - half_window)
        hi = min(total, i + half_window + 1)
        window_df = df.iloc[lo:hi]

        row = df.iloc[i].copy()
        for col in box_cols:
            row[col] = int(window_df[col].mean())

        usable = [lm for lm in parsed_cells[lo:hi] if lm is not None]
        if usable:
            row["landmark_2d_106"] = json.dumps(np.mean(usable, axis=0).tolist())
        else:
            row["landmark_2d_106"] = "[]"

        smoothed_rows.append(row)

    smoothed_df = pd.DataFrame(smoothed_rows)

    # splitext only touches the real extension; str.replace would rewrite every
    # ".csv" in the path and would overwrite the input when there is none.
    root, ext = os.path.splitext(csv_path)
    out_path = f"{root}_smoothed{ext}"
    smoothed_df.to_csv(out_path, index=False)
    print(f"✅ Smoothed CSV saved to: {out_path}")
    return out_path


def _parse_landmarks_json(cell):
    """Parse a JSON landmarks cell into a 2-D array; return None if unusable."""
    try:
        parsed = np.array(json.loads(cell))
    except (ValueError, TypeError, RecursionError):
        return None
    return parsed if parsed.ndim == 2 else None
|
|
|
|
|
|
|
|
|