| |
| """ |
| Generate coarse-grained annotations by merging consecutive fine-grained segments |
| into composite actions (8-15s duration) using LLM. |
| |
| Input: annotations_v2/ (fine-grained, ~2-3s segments, 11 classes) |
| Output: annotations_coarse/ (coarse-grained, ~8-15s segments, ~6 classes) |
| |
| Does NOT modify annotations_v2/. |
| """ |
|
|
| import os |
| import json |
| import re |
| import time |
| import glob |
| import urllib.request |
| from collections import Counter |
|
|
| INPUT_DIR = "${PULSE_ROOT}/annotations_v2" |
| OUTPUT_DIR = "${PULSE_ROOT}/annotations_coarse" |
|
|
| API_URL = "https://api.chatanywhere.tech/v1/chat/completions" |
| API_KEYS = [ |
| "sk-MN5n1uEETyaky96fLJdHqZobXF1f7KmOrZHzwD3lt585asFQ", |
| "sk-YnYrtPdAXwlE12hRpi6dYqlE1RRVR3LDVBka6wKaefU4iQRY", |
| "sk-jOZtodDv6OxUOMu3NuJ8lzffjwBlshn9OHY5KSmqmPTtc9qs", |
| "sk-qAaKTKYIRF24btu1oQWgubWG4UdA92bILNtzOkHNEPAcCxdB", |
| "sk-MgCBBonblMrCFnSXd6fJZaBLTCfCJ5FjYZfSe2e46bgmyktk", |
| "sk-79e30kYRgduuf2fSU0Lsc814YjNkClXXzQqIbx0iLS40IOEH", |
| "sk-h9Tej4tW6AQC6fT0njfzrPKXEk6fBwpiSvvQd0aJAhw4UwLz", |
| "sk-k2QNHt5wAH26Fw8hZuPWuVXw8Psd1jX09qusiA6PdBj5Vzuu", |
| "sk-w7EkTblciNI44cwosHXi0PGZNUf1hnJmpzOQ85va9VPdAKbz", |
| "sk-Dexs5ZF7OjFCq7CZW45wJ8EKoGtIswv6rsLUMzUXXkWBDBBJ", |
| ] |
|
|
| SCENE_DESCRIPTIONS = { |
| "s1": "办公桌面整理与工作准备", |
| "s2": "快递打包发送", |
| "s3": "厨房调料整理", |
| "s4": "清理餐后桌面", |
| "s5": "餐前桌面布置", |
| "s6": "商务旅行行李箱打包", |
| "s7": "冲泡咖啡/饮品", |
| "s8": "晾衣架整理与衣物收纳", |
| } |
|
|
| COARSE_CATEGORIES = """粗粒度动作类别(共6类): |
| |
| 1. Manipulate - 操作物体(抓取、调整、放置某个物体的完整过程,包含拿起→操作→放下的组合) |
| 2. CleanOrganize - 清洁/整理(擦桌子、理线、整理桌面、叠衣服等持续性整理活动) |
| 3. Transfer - 搬运/传递(将物体从一个位置搬到另一个位置的过程) |
| 4. Assemble - 组装/连接/包装(封箱、贴胶带、盖盖子、插电源、拧瓶盖等需要精细对准的操作) |
| 5. FoodPrep - 食物/饮品准备(倒水、倒调料、搅拌、冲泡等与食物饮品相关的操作) |
| 6. Idle - 空闲/过渡(无明确操作的间隔) |
| """ |
|
|
| current_key_idx = 0 |
| call_count = 0 |
|
|
|
|
| def call_llm(prompt, max_tokens=1500, retries=3): |
| global current_key_idx, call_count |
| for attempt in range(retries * len(API_KEYS)): |
| key = API_KEYS[current_key_idx] |
| try: |
| data = json.dumps({ |
| "model": "gpt-4o-mini", |
| "messages": [{"role": "user", "content": prompt}], |
| "max_tokens": max_tokens, |
| "temperature": 0.1, |
| }).encode() |
| req = urllib.request.Request( |
| API_URL, data=data, |
| headers={"Content-Type": "application/json", "Authorization": f"Bearer {key}"} |
| ) |
| resp = urllib.request.urlopen(req, timeout=30) |
| result = json.loads(resp.read()) |
| call_count += 1 |
| return result["choices"][0]["message"]["content"] |
| except Exception as e: |
| err = str(e) |
| if any(k in err for k in ["429", "quota", "limit", "402", "403"]): |
| current_key_idx = (current_key_idx + 1) % len(API_KEYS) |
| else: |
| time.sleep(0.5) |
| current_key_idx = (current_key_idx + 1) % len(API_KEYS) |
| return None |
|
|
|
|
| def parse_ts(ts_str): |
| """Parse 'MM:SS' to seconds.""" |
| m = re.match(r'(\d+):(\d+)', ts_str.strip()) |
| if m: |
| return int(m.group(1)) * 60 + int(m.group(2)) |
| return 0 |
|
|
|
|
| def format_ts(sec): |
| """Format seconds to 'MM:SS'.""" |
| return f"{sec//60:02d}:{sec%60:02d}" |
|
|
|
|
| def merge_segments_with_llm(segments, scene_id): |
| """Use LLM to merge fine-grained segments into coarse composite actions.""" |
| scene_desc = SCENE_DESCRIPTIONS.get(scene_id, "日常活动") |
|
|
| |
| seg_lines = [] |
| for i, seg in enumerate(segments): |
| label = seg.get("action_label", "Idle") |
| seg_lines.append(f"{i+1}. [{seg['timestamp']}] {label}: {seg['task']}") |
| seg_text = "\n".join(seg_lines) |
|
|
| prompt = f"""你是一个动作标注专家。以下是一段"{scene_desc}"录制中的细粒度动作序列(每个2-3秒)。 |
| 请将相关的连续动作合并为粗粒度复合动作,每个复合动作持续5-15秒。 |
| |
| 合并规则: |
| - 围绕同一个物体的连续操作合并为一个(如"抓取杯子→调整→放下"合并为一个Manipulate) |
| - 连续的整理/清洁动作合并 |
| - 合并后的时间范围 = 第一个子动作的开始时间 到 最后一个子动作的结束时间 |
| - 如果中间有短暂Idle(≤3秒),可以包含进去 |
| - 每个复合动作必须从6个类别中选一个 |
| |
| {COARSE_CATEGORIES} |
| |
| 细粒度动作序列: |
| {seg_text} |
| |
| 请严格按以下JSON格式返回,不要添加任何额外文字: |
| [{{"timestamp": "MM:SS-MM:SS", "coarse_action": "类别名", "description": "简要描述这段复合动作", "fine_segments": [子动作编号列表]}}]""" |
|
|
| response = call_llm(prompt, max_tokens=2000) |
| if response is None: |
| return None |
|
|
| try: |
| match = re.search(r'\[.*\]', response, re.DOTALL) |
| if match: |
| results = json.loads(match.group()) |
| valid = [] |
| for r in results: |
| if all(k in r for k in ["timestamp", "coarse_action", "description"]): |
| |
| if r["coarse_action"] in {"Manipulate", "CleanOrganize", "Transfer", |
| "Assemble", "FoodPrep", "Idle"}: |
| valid.append(r) |
| return valid |
| except (json.JSONDecodeError, KeyError) as e: |
| print(f" Parse error: {e}") |
| return None |
|
|
|
|
| def process_file(input_path, vol, scenario): |
| """Process one annotation file.""" |
| data = json.load(open(input_path)) |
| segments = data["segments"] |
|
|
| if not segments: |
| return {"fine_segments": segments, "coarse_segments": []}, 0 |
|
|
| print(f" Merging {len(segments)} fine segments...") |
| coarse = merge_segments_with_llm(segments, scenario) |
|
|
| if coarse is None: |
| |
| print(f" LLM failed, using fallback merge") |
| coarse = fallback_merge(segments) |
|
|
| result = { |
| "fine_segments": segments, |
| "coarse_segments": coarse, |
| } |
| return result, len(coarse) |
|
|
|
|
| def fallback_merge(segments): |
| """Simple rule-based merging as fallback.""" |
| if not segments: |
| return [] |
|
|
| coarse = [] |
| group = [segments[0]] |
|
|
| for seg in segments[1:]: |
| |
| prev_ts = group[-1]["timestamp"] |
| curr_ts = seg["timestamp"] |
| m1 = re.match(r'(\d+:\d+)\s*-\s*(\d+:\d+)', prev_ts) |
| m2 = re.match(r'(\d+:\d+)\s*-\s*(\d+:\d+)', curr_ts) |
| if not m1 or not m2: |
| group.append(seg) |
| continue |
|
|
| prev_end = parse_ts(m1.group(2)) |
| curr_start = parse_ts(m2.group(1)) |
| gap = curr_start - prev_end |
|
|
| |
| group_start = parse_ts(re.match(r'(\d+:\d+)', group[0]["timestamp"]).group(1)) |
| curr_end = parse_ts(m2.group(2)) |
| group_duration = curr_end - group_start |
|
|
| if gap <= 3 and group_duration <= 15: |
| group.append(seg) |
| else: |
| |
| coarse.append(_emit_group(group)) |
| group = [seg] |
|
|
| if group: |
| coarse.append(_emit_group(group)) |
|
|
| return coarse |
|
|
|
|
| def _emit_group(group): |
| """Create a coarse segment from a group of fine segments.""" |
| m_start = re.match(r'(\d+:\d+)', group[0]["timestamp"]) |
| m_end = re.match(r'\d+:\d+\s*-\s*(\d+:\d+)', group[-1]["timestamp"]) |
| start = m_start.group(1) if m_start else "00:00" |
| end = m_end.group(1) if m_end else "00:00" |
|
|
| labels = [seg.get("action_label", "Idle") for seg in group] |
| label_counts = Counter(labels) |
| dominant = label_counts.most_common(1)[0][0] |
|
|
| |
| label_map = { |
| "Grasp": "Manipulate", "Place": "Manipulate", "Arrange": "CleanOrganize", |
| "Wipe": "CleanOrganize", "Fold": "CleanOrganize", "Transport": "Transfer", |
| "OpenClose": "Assemble", "TearCut": "Assemble", |
| "Pour": "FoodPrep", "Stir": "FoodPrep", "Idle": "Idle", |
| } |
| coarse_label = label_map.get(dominant, "Manipulate") |
|
|
| tasks = [seg["task"] for seg in group] |
| desc = tasks[0] if len(tasks) == 1 else f"{tasks[0]}...{tasks[-1]}" |
|
|
| return { |
| "timestamp": f"{start}-{end}", |
| "coarse_action": coarse_label, |
| "description": desc[:80], |
| "fine_segments": list(range(1, len(group) + 1)), |
| } |
|
|
|
|
| def main(): |
| os.makedirs(OUTPUT_DIR, exist_ok=True) |
|
|
| total_fine = 0 |
| total_coarse = 0 |
| total_files = 0 |
| coarse_labels = Counter() |
|
|
| for vol_dir in sorted(glob.glob(f"{INPUT_DIR}/v*")): |
| vol = os.path.basename(vol_dir) |
| out_dir = os.path.join(OUTPUT_DIR, vol) |
| os.makedirs(out_dir, exist_ok=True) |
|
|
| for ann_file in sorted(glob.glob(f"{vol_dir}/s*.json")): |
| scenario = os.path.basename(ann_file).replace(".json", "") |
| print(f"[{vol}/{scenario}]", flush=True) |
|
|
| result, n_coarse = process_file(ann_file, vol, scenario) |
|
|
| out_path = os.path.join(out_dir, f"{scenario}.json") |
| with open(out_path, "w", encoding="utf-8") as f: |
| json.dump(result, f, ensure_ascii=False, indent=2) |
|
|
| n_fine = len(result["fine_segments"]) |
| total_fine += n_fine |
| total_coarse += n_coarse |
| total_files += 1 |
|
|
| for seg in result["coarse_segments"]: |
| coarse_labels[seg["coarse_action"]] += 1 |
|
|
| print(f" {n_fine} fine → {n_coarse} coarse segments", flush=True) |
|
|
| print(f"\n{'='*60}") |
| print(f"Total: {total_files} files") |
| print(f" Fine segments: {total_fine}") |
| print(f" Coarse segments: {total_coarse}") |
| print(f" Compression: {total_fine/max(total_coarse,1):.1f}x") |
| print(f" API calls: {call_count}") |
|
|
| print(f"\n Coarse label distribution:") |
| for label, count in coarse_labels.most_common(): |
| print(f" {label:<20} {count:>5} ({count/max(total_coarse,1)*100:.1f}%)") |
|
|
| print(f"\n Output: {OUTPUT_DIR}") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|