| import numpy as np |
| import json |
| import os |
| import re |
| from difflib import SequenceMatcher |
| from tqdm import tqdm |
|
|
| |
# ---------------------------------------------------------------------------
# Paths & playback configuration
# ---------------------------------------------------------------------------
INPUT_TRAIN_JSON = "./train.json"    # index JSON: entry id -> entry metadata
NPZ_FOLDER = "./npz_data"            # folder holding .npz files with 'poses'/'trans'/'betas' arrays
OUTPUT_DIR = "./unity_ready_json"    # one compact JSON per sequence is written here
FPS = 30.0                           # used to convert label timestamps (seconds) -> frame indices


# Per-frame style ids written into the "s" field of each output frame
# (consumed downstream, presumably by Unity — confirm against the reader).
STYLE_FILLIAN = 0
STYLE_BIBOO = 1
STYLE_ANNY = 2
STYLE_LAPWING = 3                    # handheld/vlog-camera style override
|
|
def fuzzy_match_name(text, target, threshold=0.75):
    """Return True if any alphabetic token of *text* fuzzily matches *target*.

    *text* is lower-cased and split on non-letter runs; tokens shorter than
    3 characters are ignored.  Similarity is difflib.SequenceMatcher.ratio()
    compared against *threshold*.
    """
    tokens = re.split(r'[^a-z]+', text.lower())
    return any(
        SequenceMatcher(None, token, target).ratio() >= threshold
        for token in tokens
        if len(token) >= 3
    )
|
|
def get_base_style(filename):
    """Pick the global style id from the source filename.

    Priority: Biboo -> Fillian -> Anny (default fallback).
    """
    lowered = filename.lower()

    if fuzzy_match_name(lowered, "biboo", threshold=0.8):
        return STYLE_BIBOO

    is_fillian = (
        fuzzy_match_name(lowered, "fillian", threshold=0.8)
        or "filian" in lowered
    )
    if is_fillian:
        return STYLE_FILLIAN

    # Nothing matched: Anny is the default style.
    return STYLE_ANNY
|
|
def is_vlog_label(label_entry):
    """Return True if the label marks vlogging / handheld-camera footage.

    Wrap-up labels ('place camera back', 'end of vlog') are explicitly
    excluded even though they mention 'vlog'.
    """
    text = label_entry.get("proc_label", "").lower()

    # Wrap-up phrases beat everything else.
    for stop_phrase in ("place camera back", "end of vlog"):
        if stop_phrase in text:
            return False

    if "vlog" in text:
        return True

    # Fall back to the action-category list, if the entry carries one.
    categories = label_entry.get("act_cat", [])
    return any("vlog" in cat.lower() for cat in categories)
|
|
def is_transition_label(label_entry):
    """Return True when the processed label describes a generic transition."""
    return "transition" in label_entry.get("proc_label", "").lower()
|
|
def _load_motion_arrays(npz_path, npz_filename):
    """Load (poses, trans, betas, num_frames) from an .npz motion file.

    A leading batch axis (ndim == 3) on poses/trans is squeezed away.
    Returns None and logs when the file cannot be read or lacks a key.
    """
    try:
        # np.load returns an NpzFile that keeps a file handle open; the
        # context manager closes it deterministically (the original leaked it).
        with np.load(npz_path) as data:
            poses = data['poses']
            trans = data['trans']
            betas = data['betas']

        if poses.ndim == 3:
            poses = poses[0]
        if trans.ndim == 3:
            trans = trans[0]

        return poses, trans, betas, poses.shape[0]
    except Exception as e:
        print(f"โ Error loading {npz_filename}: {e}")
        return None


def _compute_frame_styles(entry_data, num_frames, base_style):
    """Build the per-frame style-id array (length == num_frames).

    Starts with base_style everywhere, then overrides vlog segments — and
    transitions immediately following them — with STYLE_LAPWING so the
    handheld-camera style persists through the hand-off.
    """
    frame_styles = np.full(num_frames, base_style, dtype=int)

    # Tolerate a missing or null "frame_ann"/"labels" (the original crashed
    # on frame_ann == None).
    frame_ann = entry_data.get("frame_ann") or {}
    labels = frame_ann.get("labels") or []

    previous_was_vlog = False
    # Chronological order makes 'previous_was_vlog' meaningful.
    for label in sorted(labels, key=lambda x: x.get("start_t", 0)):
        s_f = max(0, int(label.get("start_t", 0.0) * FPS))
        e_f = min(num_frames, int(label.get("end_t", 0.0) * FPS))
        # Zero-length / out-of-range labels are skipped entirely and do NOT
        # reset previous_was_vlog (matches original behavior).
        if e_f <= s_f:
            continue

        if is_vlog_label(label):
            frame_styles[s_f:e_f] = STYLE_LAPWING
            previous_was_vlog = True
        elif is_transition_label(label) and previous_was_vlog:
            # Keep the vlog style through the transition out of it.
            # previous_was_vlog intentionally stays True, so a chain of
            # transitions keeps the style (matches original logic).
            frame_styles[s_f:e_f] = STYLE_LAPWING
        else:
            previous_was_vlog = False

    return frame_styles


def process_single_entry(entry_id, entry_data):
    """Convert one train-index entry into a compact Unity-ready JSON file.

    Output schema: {"fps", "video_ref", "base_style_debug", "frames": [...]}
    where each frame is {"i": index, "p": pose, "t": translation,
    "b": betas (full vector, duplicated per frame), "s": style id}.
    Silently skips entries whose .npz is missing; logs unreadable files.
    """
    npz_filename = entry_data.get("feat_p")
    if not npz_filename:
        # BUGFIX: a missing 'feat_p' used to crash os.path.join(..., None).
        return
    npz_path = os.path.join(NPZ_FOLDER, npz_filename)

    if not os.path.exists(npz_path):
        return

    loaded = _load_motion_arrays(npz_path, npz_filename)
    if loaded is None:
        return
    poses, trans, betas, num_frames = loaded

    base_style = get_base_style(npz_filename)
    frame_styles = _compute_frame_styles(entry_data, num_frames, base_style)

    # Round once up front; 4 decimals keeps the JSON compact.
    poses_list = np.round(poses, 4).tolist()
    trans_list = np.round(trans, 4).tolist()
    betas_list = np.round(betas, 4).tolist()
    styles_list = frame_styles.tolist()

    frames_data = [
        {
            "i": i,
            "p": poses_list[i],
            "t": trans_list[i],
            "b": betas_list,   # betas are per-sequence; repeated to keep the consumer's schema
            "s": styles_list[i],
        }
        for i in range(num_frames)
    ]

    clean_name = os.path.splitext(npz_filename)[0]
    output_path = os.path.join(OUTPUT_DIR, f"{entry_id}_{clean_name}.json")

    style_debug = "Anny"
    if base_style == STYLE_FILLIAN:
        style_debug = "Fillian"
    elif base_style == STYLE_BIBOO:
        style_debug = "Biboo"

    wrapper = {
        "fps": FPS,
        "video_ref": entry_data.get("video_ref_path", ""),
        "base_style_debug": style_debug,
        "frames": frames_data,
    }

    # Compact separators keep file size down for long sequences.
    with open(output_path, 'w') as f:
        json.dump(wrapper, f, separators=(',', ':'))
|
|
def main():
    """Load the train index JSON and convert every sequence it lists."""
    # exist_ok avoids the check-then-create race of the original
    # os.path.exists() + os.makedirs() pair.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    print(f"๐ Loading Train JSON: {INPUT_TRAIN_JSON}")
    if not os.path.exists(INPUT_TRAIN_JSON):
        print("โ Train JSON not found.")
        return

    with open(INPUT_TRAIN_JSON, 'r') as f:
        train_index = json.load(f)

    print(f"๐ Processing {len(train_index)} sequences...")
    for key, val in tqdm(train_index.items()):
        process_single_entry(key, val)

    # BUGFIX: this message was split across two source lines, which is a
    # SyntaxError for a plain (non-triple-quoted) string literal.
    print("โ Conversion Complete.")
|
|
# Script entry point: run the full conversion only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()