# Listing metadata (from the file viewer): size 5,596 bytes, id fbb20ff.
import numpy as np
import json
import os
import re
from difflib import SequenceMatcher
from tqdm import tqdm
# --- CONFIGURATION ---
# Index file read by main(); process_single_entry() receives one (key, value)
# pair from it per sequence.
INPUT_TRAIN_JSON = "./train.json"
# Folder that each entry's "feat_p" value is joined onto to locate its NPZ file.
NPZ_FOLDER = "./npz_data"
# Destination folder for the generated per-sequence JSON files.
OUTPUT_DIR = "./unity_ready_json"
# Multiplier used to turn label start_t/end_t values into frame indices; also
# written to each output file under "fps".
FPS = 30.0
# Styles
# Integer ids stored per frame under the "s" key of the output JSON.
STYLE_FILLIAN = 0
STYLE_BIBOO = 1
STYLE_ANNY = 2 # Default when the filename matches neither Biboo nor Fillian
STYLE_LAPWING = 3 # Vlog override applied to frames covered by vlog labels
def fuzzy_match_name(text, target, threshold=0.75):
    """Return True if any word in *text* approximately matches *target*.

    *text* is lower-cased and split on every run of characters outside
    ``a-z``; each resulting token of at least three characters is compared
    to *target* with ``difflib.SequenceMatcher``. The comparison is
    case-insensitive on both sides.

    Args:
        text: String to search (for example a filename). ``None`` or an
            empty string never matches.
        target: Name to look for.
        threshold: Minimum ``SequenceMatcher.ratio()`` (0..1) that counts
            as a match.

    Returns:
        True if at least one token reaches *threshold*, otherwise False.
    """
    if not text:
        return False

    # The tokens are lower-cased, so the target must be as well; otherwise a
    # target such as "Biboo" could never reach the threshold.
    wanted = target.lower()
    tokens = re.split(r'[^a-z]+', text.lower())
    return any(
        # Very short tokens (e.g. "a", "of", "01"-style leftovers) are skipped
        # because they match short names too easily.
        len(token) >= 3
        and SequenceMatcher(None, token, wanted).ratio() >= threshold
        for token in tokens
    )
def get_base_style(filename):
    """
    Pick the sequence-wide style id from the NPZ filename.

    Precedence: a Biboo match wins, then a Fillian match (fuzzy, or the
    literal substring "filian"); everything else falls back to Anny.
    """
    lowered = filename.lower()

    # Biboo is tested first so it takes priority when both names appear.
    if fuzzy_match_name(lowered, "biboo", threshold=0.8):
        return STYLE_BIBOO

    # The substring test also catches "filian" glued to other letters,
    # which the token-based fuzzy match would miss.
    looks_like_fillian = (
        fuzzy_match_name(lowered, "fillian", threshold=0.8)
        or "filian" in lowered
    )

    # Anny is the default for every other name.
    return STYLE_FILLIAN if looks_like_fillian else STYLE_ANNY
def is_vlog_label(label_entry):
    """
    Check whether a label indicates vlogging / handheld camera.

    Exclusion takes priority: a "proc_label" containing 'place camera back'
    or 'end of vlog' is never treated as vlogging, even if it (or one of the
    categories) mentions 'vlog'.

    Args:
        label_entry: Label dict; reads the optional keys "proc_label"
            (string) and "act_cat" (iterable of strings). Missing or null
            values are treated as empty.

    Returns:
        True if the label counts as vlogging, otherwise False.
    """
    # `or ""` covers both a missing key and an explicit JSON null.
    proc_label = (label_entry.get("proc_label") or "").lower()

    # 1. EXCLUSION RULES (if these exist, it is NOT vlogging)
    if "place camera back" in proc_label or "end of vlog" in proc_label:
        return False

    # 2. INCLUSION RULES
    if "vlog" in proc_label:
        return True

    # Null category lists and non-string entries are ignored rather than
    # raising, so one malformed label cannot abort a whole conversion run.
    categories = label_entry.get("act_cat") or ()
    return any(
        isinstance(cat, str) and "vlog" in cat.lower()
        for cat in categories
    )
def is_transition_label(label_entry):
    """Check whether this is a generic transition label.

    Returns True when the label's "proc_label" contains 'transition'
    (case-insensitive). A missing or null "proc_label" yields False.
    """
    # `or ""` covers both a missing key and an explicit JSON null.
    proc = (label_entry.get("proc_label") or "").lower()
    return "transition" in proc
def process_single_entry(entry_id, entry_data):
    """Convert one index entry's NPZ motion file into a per-frame JSON file.

    The NPZ file named by ``entry_data["feat_p"]`` (relative to NPZ_FOLDER)
    must provide the arrays 'poses', 'trans' and 'betas'. Every frame gets
    the base style derived from the filename; frames covered by vlog labels
    (and by transition labels that directly follow a vlog label) are
    overridden with STYLE_LAPWING.

    The result is written to ``OUTPUT_DIR/<entry_id>_<npz name>.json`` as
    ``{"fps", "video_ref", "base_style_debug", "frames"}``, where each frame
    is ``{"i": index, "p": pose, "t": translation, "b": betas, "s": style}``
    with values rounded to 4 decimals.

    Args:
        entry_id: Key of the entry in the index; used as filename prefix.
        entry_data: Entry dict. Reads "feat_p", and optionally
            "frame_ann" -> "labels" and "video_ref_path".

    Returns:
        None. Entries without a "feat_p", with a missing NPZ file, or with
        an unreadable NPZ file are skipped (the last case prints an error).
    """
    npz_filename = entry_data.get("feat_p")
    if not npz_filename:
        # No feature path recorded: nothing to convert.
        return

    npz_path = os.path.join(NPZ_FOLDER, npz_filename)
    if not os.path.exists(npz_path):
        return

    # 1. Load Data
    try:
        # The context manager closes the underlying archive handle once the
        # arrays have been read into memory.
        with np.load(npz_path) as data:
            poses = data['poses']
            trans = data['trans']
            betas = data['betas']
        # A 3-D array is reduced to its first element along axis 0.
        if poses.ndim == 3:
            poses = poses[0]
        if trans.ndim == 3:
            trans = trans[0]
        num_frames = poses.shape[0]
    except Exception as e:
        # Best-effort batch conversion: report the bad file and move on.
        print(f"β Error loading {npz_filename}: {e}")
        return

    # 2. Determine Base Style
    base_style = get_base_style(npz_filename)

    # Initialize all frames with the Base Style
    frame_styles = np.full(num_frames, base_style, dtype=int)

    # 3. Apply Vlog Logic (State Machine Override)
    # "frame_ann" and "labels" may be absent or null; both mean "no labels".
    frame_ann = entry_data.get("frame_ann")
    raw_labels = frame_ann.get("labels") if isinstance(frame_ann, dict) else None

    # Sort labels by time (a null/missing start counts as 0).
    labels = sorted(raw_labels or [], key=lambda x: x.get("start_t") or 0.0)

    previous_was_vlog = False
    for label in labels:
        start_t = label.get("start_t") or 0.0
        end_t = label.get("end_t") or 0.0

        # Clamp the label's frame span to the available frames.
        s_f = max(0, int(start_t * FPS))
        e_f = min(num_frames, int(end_t * FPS))
        if e_f <= s_f:
            # Empty span: ignored, and it does not change the vlog state.
            continue

        if is_vlog_label(label):
            # Vlog Label -> Set to Lapwing Style
            frame_styles[s_f:e_f] = STYLE_LAPWING
            previous_was_vlog = True
        elif is_transition_label(label) and previous_was_vlog:
            # Transition immediately after Vlog -> Collapse Gap (Keep as Vlog).
            # The state stays True so chained transitions are covered too.
            frame_styles[s_f:e_f] = STYLE_LAPWING
        else:
            # Regular action OR "Place camera back" -> Reset to Base Style
            previous_was_vlog = False

    # 4. Construct Frame Data
    poses_list = np.round(poses, 4).tolist()
    trans_list = np.round(trans, 4).tolist()
    betas_list = np.round(betas, 4).tolist()
    styles_list = frame_styles.tolist()

    frames_data = [
        {
            "i": i,
            "p": poses_list[i],
            "t": trans_list[i],
            "b": betas_list,
            "s": styles_list[i],
        }
        for i in range(num_frames)
    ]

    # 5. Save JSON
    clean_name = os.path.splitext(npz_filename)[0]
    output_filename = f"{entry_id}_{clean_name}.json"
    output_path = os.path.join(OUTPUT_DIR, output_filename)

    # Create the target folder on demand: "feat_p" may contain
    # sub-directories, and this function may be called without main().
    output_folder = os.path.dirname(output_path)
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)

    style_debug = {
        STYLE_FILLIAN: "Fillian",
        STYLE_BIBOO: "Biboo",
    }.get(base_style, "Anny")

    wrapper = {
        "fps": FPS,
        "video_ref": entry_data.get("video_ref_path", ""),
        "base_style_debug": style_debug,
        "frames": frames_data,
    }

    # Compact separators keep the (potentially large) file small.
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(wrapper, f, separators=(',', ':'))
def main():
    """Convert every sequence listed in INPUT_TRAIN_JSON.

    Ensures OUTPUT_DIR exists, loads the index (a JSON object mapping
    entry ids to entry dicts) and calls process_single_entry() for each
    item, showing progress with tqdm. Returns early with a message if the
    index file does not exist.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    print(f"π Loading Train JSON: {INPUT_TRAIN_JSON}")
    if not os.path.exists(INPUT_TRAIN_JSON):
        print("β Train JSON not found.")
        return

    with open(INPUT_TRAIN_JSON, 'r', encoding='utf-8') as f:
        train_index = json.load(f)

    print(f"π Processing {len(train_index)} sequences...")
    for key, val in tqdm(train_index.items()):
        process_single_entry(key, val)

    print("✅ Conversion Complete.")
# Run the conversion only when executed as a script, not when imported.
if __name__ == "__main__":
    main()