import ast
import json
import os

import numpy as np
import pandas as pd

# Bounding-box columns that are interpolated / smoothed numerically.
_BBOX_COLUMNS = ("x1", "y1", "x2", "y2")


def _suffixed_path(csv_path, suffix):
    """Return ``csv_path`` with ``suffix`` inserted just before the extension.

    Unlike ``str.replace(".csv", ...)`` this only touches the final extension,
    so directory names containing ".csv" are left alone and a file with a
    different (or missing) extension never maps back onto the input path.
    """
    root, ext = os.path.splitext(csv_path)
    return f"{root}{suffix}{ext}"


def _parse_landmarks(value):
    """Parse a serialized landmark list into a float array.

    Accepts JSON text first and falls back to a Python literal (e.g. tuples).
    Returns None for missing cells (NaN), unparsable text, or ragged /
    non-numeric data.
    """
    if not isinstance(value, str):
        return None
    try:
        raw = json.loads(value)
    except ValueError:
        # NOTE(security): the original code called eval() on this CSV cell.
        # ast.literal_eval only accepts literals, so file contents can no
        # longer execute arbitrary code.
        try:
            raw = ast.literal_eval(value)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return None
    try:
        return np.asarray(raw, dtype=float)
    except (ValueError, TypeError):
        return None


def _interpolate_landmarks(start_value, end_value, t):
    """Linearly blend two serialized landmark sets; return "[]" if impossible."""
    lm_start = _parse_landmarks(start_value)
    lm_end = _parse_landmarks(end_value)
    if lm_start is None or lm_end is None or lm_start.shape != lm_end.shape:
        return "[]"
    return json.dumps(((1 - t) * lm_start + t * lm_end).tolist())


def temporal_fill_gaps_in_csv(csv_path, ranges=None):
    """
    Fills missing face entries by interpolating between known detections
    in specified frame ranges. If no ranges are provided, it uses the full
    range from min to max frame.

    Each missing frame is interpolated between the nearest known detection
    before it and the nearest known detection after it (within the range),
    weighted by the real frame distance to each. Missing frames that lie
    before the first or after the last detection of a range copy the values
    of that nearest detection. A frame is filled at most once, even when
    ranges overlap.

    Args:
        csv_path (str): Path to identity-specific CSV file.
        ranges (list of tuple): Optional list of (start_frame, end_frame),
            both inclusive, to limit interpolation.

    Returns:
        str | None: Path of the written CSV ('_filled' inserted before the
        file extension), or None when the input CSV has no rows.
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        print(f"❌ Empty CSV: {csv_path}")
        return None

    # Default to full frame range
    if ranges is None:
        ranges = [(int(df["frame"].min()), int(df["frame"].max()))]

    # Frames that already have a row; grows as gaps are filled so that
    # overlapping ranges cannot emit the same frame twice.
    known_frames = set(df["frame"].tolist())
    new_rows = []

    for start, end in ranges:
        in_range = df[(df["frame"] >= start) & (df["frame"] <= end)]
        # One anchor per frame, ordered by frame number.
        anchors = (
            in_range.sort_values("frame", kind="stable")
            .drop_duplicates("frame")
            .reset_index(drop=True)
        )
        if len(anchors) < 2:
            print(f"⚠️ Skipping range ({start}-{end}) — insufficient anchor frames.")
            continue

        anchor_frames = anchors["frame"].to_numpy()
        last = len(anchors) - 1

        for frame_num in range(start, end + 1):
            if frame_num in known_frames:
                continue
            known_frames.add(frame_num)

            # Locate the anchors bracketing this frame; clamp at the edges so
            # frames outside the detections reuse the nearest anchor.
            pos = int(np.searchsorted(anchor_frames, frame_num))
            lo, hi = max(pos - 1, 0), min(pos, last)
            prev_row = anchors.iloc[lo].to_dict()
            next_row = anchors.iloc[hi].to_dict()
            span = anchor_frames[hi] - anchor_frames[lo]
            t = (frame_num - anchor_frames[lo]) / span if span else 0.0

            # Start from the previous anchor so non-interpolated columns
            # are carried over. A dict avoids dtype-upcast issues that
            # arise when writing floats into an int-typed row Series.
            interp_row = dict(prev_row)
            interp_row["frame"] = frame_num

            # Interpolate bounding box
            for col in _BBOX_COLUMNS:
                interp_row[col] = (1 - t) * prev_row[col] + t * next_row[col]

            # Interpolate landmarks ("[]" when absent or unparsable)
            interp_row["landmarks"] = _interpolate_landmarks(
                prev_row.get("landmarks"), next_row.get("landmarks"), t
            )
            new_rows.append(interp_row)

    df_filled = df
    if new_rows:
        df_filled = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
    df_filled = df_filled.sort_values(by="frame", kind="stable").reset_index(drop=True)

    output_path = _suffixed_path(csv_path, "_filled")
    df_filled.to_csv(output_path, index=False)
    print(f"✅ Gaps filled and saved to: {output_path}")
    return output_path


def temporal_smooth_csv(csv_path, window_size=5):
    """
    Applies temporal smoothing to bounding boxes and landmarks in a face CSV.

    Rows are ordered by frame and each row is replaced by the mean over a
    centered window of rows (truncated at the start/end of the file).
    Bounding-box means are truncated to int. Landmarks are read from the
    'landmark_2d_106' column if present, otherwise from 'landmarks'; only
    2-D landmark arrays are averaged, and arrays whose shape differs from
    the reference shape of the window are ignored. The smoothed landmarks
    are always written to 'landmark_2d_106' and, when they were read from
    'landmarks', to that column as well so it does not keep stale
    unsmoothed values.

    Args:
        csv_path (str): Path to the input CSV with frame-wise face data.
        window_size (int): Size of the moving average window (must be odd).

    Returns:
        str | None: Path to the smoothed CSV ('_smoothed' inserted before
        the file extension), or None when the input CSV has no rows.

    Raises:
        AssertionError: If window_size is not a positive odd number.
    """
    assert window_size % 2 == 1, "Window size must be odd."
    assert window_size > 0, "Window size must be positive."

    df = pd.read_csv(csv_path)
    if df.empty:
        print(f"❌ CSV is empty: {csv_path}")
        return None

    df = df.sort_values("frame").reset_index(drop=True)
    half_window = window_size // 2
    n_rows = len(df)

    if "landmark_2d_106" in df.columns:
        source_col = "landmark_2d_106"
    elif "landmarks" in df.columns:
        source_col = "landmarks"
    else:
        source_col = None

    # Parse every landmark cell once instead of once per window.
    if source_col is None:
        parsed = [None] * n_rows
    else:
        parsed = [_parse_landmarks(cell) for cell in df[source_col]]
        parsed = [lm if lm is not None and lm.ndim == 2 else None for lm in parsed]

    smoothed_df = df.copy()

    # Smooth bounding boxes
    for col in _BBOX_COLUMNS:
        values = df[col]
        smoothed_df[col] = [
            int(values.iloc[max(0, i - half_window): i + half_window + 1].mean())
            for i in range(n_rows)
        ]

    # Smooth landmarks if they exist
    landmark_strs = []
    for i in range(n_rows):
        window = [
            lm
            for lm in parsed[max(0, i - half_window): i + half_window + 1]
            if lm is not None
        ]
        if not window:
            landmark_strs.append("[]")
            continue
        # Average only arrays of one shape: the current row's if it has
        # valid landmarks, else the first valid one in the window.
        ref_shape = parsed[i].shape if parsed[i] is not None else window[0].shape
        window = [lm for lm in window if lm.shape == ref_shape]
        landmark_strs.append(json.dumps(np.mean(window, axis=0).tolist()))

    smoothed_df["landmark_2d_106"] = landmark_strs
    if source_col == "landmarks":
        smoothed_df["landmarks"] = landmark_strs

    out_path = _suffixed_path(csv_path, "_smoothed")
    smoothed_df.to_csv(out_path, index=False)
    print(f"✅ Smoothed CSV saved to: {out_path}")
    return out_path