| |
| import argparse |
| import json |
| import math |
| from collections import defaultdict |
| from pathlib import Path |
|
|
|
|
# Canonical label substitutions applied during normalization:
# raw label seen in the data -> normalized label.
LABEL_MAP = {"反复折返": "折返"}

# Time-related fields of the structured assistant output schema.
# NOTE(review): not referenced elsewhere in this chunk — presumably consumed
# by another module; confirm before removing.
TIME_FIELDS = [
    "elapsed_seconds_in_current_behavior",
    "estimated_remaining_seconds",
    "full_remaining_seconds",
    "expected_end_time",
]

# Ordered keys copied from the structured assistant output into the compact
# payload embedded in the QA user message (see make_qa_example).
OUTPUT_FIELDS = [
    "current_behavior",
    "is_transition",
    "elapsed_seconds_in_current_behavior",
    "estimated_remaining_seconds",
    "full_remaining_seconds",
    "expected_end_time",
    "next_possible_behavior",
    "stage_index",
    "total_stages",
    "sequence_so_far",
]
|
|
|
|
def normalize_label(value):
    """Map a raw label to its canonical form; unknown labels pass through."""
    canonical = LABEL_MAP.get(value)
    return value if canonical is None else canonical
|
|
|
|
def normalize_tree(value):
    """Recursively normalize labels inside a JSON-like structure.

    Strings in label-bearing dict fields and label-list fields are mapped
    through normalize_label; other containers are walked recursively, and a
    bare string is normalized directly. Non-container, non-string values are
    returned unchanged.
    """
    if isinstance(value, str):
        return normalize_label(value)
    if isinstance(value, list):
        return [normalize_tree(element) for element in value]
    if not isinstance(value, dict):
        return value

    scalar_label_keys = {"current_behavior", "next_possible_behavior", "label"}
    list_label_keys = {"full_sequence_order", "label_space"}
    normalized = {}
    for field, item in value.items():
        if field in scalar_label_keys and isinstance(item, str):
            normalized[field] = normalize_label(item)
        elif field in list_label_keys and isinstance(item, list):
            normalized[field] = [
                normalize_label(entry) if isinstance(entry, str) else entry
                for entry in item
            ]
        else:
            normalized[field] = normalize_tree(item)
    return normalized
|
|
|
|
def read_chat_jsonl(path):
    """Yield parsed JSON objects from a JSONL file, skipping blank lines.

    Raises:
        ValueError: chained from json.JSONDecodeError, identifying the
            offending file and 1-based line number.
    """
    with path.open(encoding="utf-8") as handle:
        for number, raw in enumerate(handle, start=1):
            if not raw.strip():
                continue
            try:
                record = json.loads(raw)
            except json.JSONDecodeError as exc:
                raise ValueError(f"{path}:{number} is not valid JSONL: {exc}") from exc
            yield record
|
|
|
|
def get_assistant_json(example):
    """Return the final (assistant) message content as a decoded object.

    The content may already be decoded, in which case it is returned as-is.
    """
    payload = example["messages"][-1]["content"]
    if isinstance(payload, str):
        return json.loads(payload)
    return payload
|
|
|
|
def set_assistant_json(example, data):
    """Serialize data compactly (UTF-8 preserved) into the last message's content."""
    serialized = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
    example["messages"][-1]["content"] = serialized
|
|
|
|
def clean_example(example):
    """Return a normalized copy of one chat example.

    All label strings are mapped through LABEL_MAP, every JSON-encoded message
    content is re-serialized compactly after normalization, and the assistant
    (last) message always ends up as a compact JSON string — even if the input
    stored it as an already-decoded object.
    """
    example = normalize_tree(example)
    for message in example.get("messages", []):
        content = message.get("content")
        if not isinstance(content, str):
            continue
        try:
            parsed = json.loads(content)
        except json.JSONDecodeError:
            # Plain-prose content is intentionally left untouched; only
            # decode errors are expected here since content is a str.
            continue
        message["content"] = json.dumps(normalize_tree(parsed), ensure_ascii=False, separators=(",", ":"))
    # Guarantee the assistant payload is stored as a compact JSON string,
    # covering the case where it arrived as a decoded object and was
    # skipped by the loop above.
    set_assistant_json(example, normalize_tree(get_assistant_json(example)))
    return example
|
|
|
|
def write_jsonl(path, rows):
    """Write rows as compact JSON, one object per line, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    encoded = (json.dumps(row, ensure_ascii=False, separators=(",", ":")) for row in rows)
    with path.open("w", encoding="utf-8") as sink:
        sink.writelines(line + "\n" for line in encoded)
|
|
|
|
def percentile(values, q):
    """Return the q-th percentile (q in [0, 1]) via linear interpolation.

    An empty sequence yields 0.0. The input is not mutated.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    position = q * (len(ordered) - 1)
    lower = math.floor(position)
    upper = math.ceil(position)
    if lower == upper:
        return float(ordered[lower])
    # Weighted blend of the two neighbors straddling the fractional index.
    return float(ordered[lower] * (upper - position) + ordered[upper] * (position - lower))
|
|
|
|
def build_thresholds(clean_train):
    """Build per-label elapsed-time thresholds from the training split.

    Each label's threshold is the 95th percentile of its observed
    elapsed_seconds_in_current_behavior values, floored at 5 seconds.
    A "__default__" entry holds the global p95 over all values, floored
    at 30 seconds.
    """
    elapsed_by_label = defaultdict(list)
    all_elapsed = []
    for example in clean_train:
        payload = get_assistant_json(example)
        seconds = payload.get("elapsed_seconds_in_current_behavior")
        if isinstance(seconds, (int, float)):
            elapsed_by_label[payload.get("current_behavior")].append(float(seconds))
            all_elapsed.append(float(seconds))
    thresholds = {}
    for label, samples in elapsed_by_label.items():
        thresholds[label] = max(5.0, percentile(samples, 0.95))
    thresholds["__default__"] = max(30.0, percentile(all_elapsed, 0.95))
    return thresholds
|
|
|
|
def used_areas(sequence):
    """Return the bathroom areas touched by any label in the behavior sequence.

    Area order follows the fixed mapping below; labels outside the mapping
    are ignored. A None/empty sequence yields an empty list.
    """
    area_to_labels = {
        "门": {"进入", "门锁", "靠近门", "离开"},
        "马桶": {"靠近马桶", "马桶盖", "马桶垫纸", "坐下", "坐用马桶", "站用马桶", "卷筒厕纸", "起身", "冲水"},
        "洗手池": {"靠近洗手池", "洗手", "刷牙"},
        "垃圾桶": {"垃圾桶"},
    }
    seen = {entry.get("label") for entry in sequence or []}
    touched = []
    for area, labels in area_to_labels.items():
        if not seen.isdisjoint(labels):
            touched.append(area)
    return touched
|
|
|
|
def qa_target(assistant, thresholds):
    """Derive the QA answer dict from a structured assistant payload.

    occupied: sequence is non-empty, current behavior is not "离开", and the
        remaining time (when numeric) is still positive.
    time_to_free_minutes: full_remaining_seconds in minutes, clamped at zero
        and rounded to 2 decimals.
    is_abnormal: current behavior exceeded its label threshold, or the
        sequence shows frequent switching (12+ entries).
    """
    sequence = assistant.get("sequence_so_far") or []
    current = assistant.get("current_behavior")
    remaining = assistant.get("full_remaining_seconds")
    elapsed = assistant.get("elapsed_seconds_in_current_behavior")

    remaining_is_number = isinstance(remaining, (int, float))
    occupied = bool(sequence) and current != "离开" and (not remaining_is_number or remaining > 0)

    minutes_left = round(max(0.0, float(remaining or 0.0)) / 60.0, 2)

    limit = thresholds.get(current, thresholds.get("__default__", 120.0))
    overlong = isinstance(elapsed, (int, float)) and elapsed > limit
    switching = len(sequence) >= 12

    return {
        "occupied": occupied,
        "time_to_free_minutes": minutes_left,
        "used_areas": used_areas(sequence),
        "is_abnormal": bool(overlong or switching),
    }
|
|
|
|
def make_qa_example(source_example, thresholds):
    """Build a QA-format chat example from one structured source example.

    The user message carries a compact projection of the assistant output
    (OUTPUT_FIELDS) plus four fixed questions; the assistant message holds
    the answers derived by qa_target.
    """
    assistant = get_assistant_json(source_example)
    compact = {field: assistant.get(field) for field in OUTPUT_FIELDS}
    user_payload = {
        "task": "qa_from_behavior_json",
        "model_output_json": compact,
        "questions": [
            "卫生间是否被占用?",
            "预计多长时间之后卫生间会空出?",
            "卫生间内哪些区域被使用过?",
            "卫生间是否存在异常?",
        ],
    }
    system_prompt = "你是卫生间状态问答模型。请只根据输入的行为分析 JSON 回答,并只输出 JSON:occupied(bool), time_to_free_minutes(number), used_areas(array), is_abnormal(bool)。"
    answers = qa_target(assistant, thresholds)
    dump = lambda obj: json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
    return {
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": dump(user_payload)},
            {"role": "assistant", "content": dump(answers)},
        ]
    }
|
|
|
|
def dataset_stats(rows):
    """Summarize a dataset split: example count, distinct sample ids, label counts.

    Assumes messages[1] is the user message whose JSON content may carry a
    "sample_id". Rows whose user content is missing or not parseable JSON
    still count toward num_examples but contribute no sample id.
    """
    label_counts = defaultdict(int)
    sample_ids = set()
    for example in rows:
        assistant = get_assistant_json(example)
        label_counts[assistant.get("current_behavior")] += 1
        try:
            user_payload = json.loads(example["messages"][1]["content"])
        except (KeyError, IndexError, TypeError, ValueError):
            # Best-effort id extraction: message missing, content not a
            # string, or not valid JSON (JSONDecodeError is a ValueError).
            continue
        if isinstance(user_payload, dict):
            sample_ids.add(user_payload.get("sample_id"))
    return {
        "num_examples": len(rows),
        "num_sample_ids": len(sample_ids),
        "label_counts": dict(sorted(label_counts.items())),
    }
|
|
|
|
def main():
    """CLI entry point: normalize both splits, derive QA examples, and emit
    struct/qa/mixed JSONL files plus a summary report."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--train", default="train.jsonl")
    parser.add_argument("--val", default="val.jsonl")
    parser.add_argument("--out-dir", default="data/processed")
    args = parser.parse_args()

    out_dir = Path(args.out_dir)
    train_rows = [clean_example(ex) for ex in read_chat_jsonl(Path(args.train))]
    val_rows = [clean_example(ex) for ex in read_chat_jsonl(Path(args.val))]

    # Thresholds are computed from the training split only.
    thresholds = build_thresholds(train_rows)
    train_qa_rows = [make_qa_example(ex, thresholds) for ex in train_rows]
    val_qa_rows = [make_qa_example(ex, thresholds) for ex in val_rows]

    write_jsonl(out_dir / "train_struct.jsonl", train_rows)
    write_jsonl(out_dir / "val_struct.jsonl", val_rows)
    write_jsonl(out_dir / "train_qa.jsonl", train_qa_rows)
    write_jsonl(out_dir / "val_qa.jsonl", val_qa_rows)
    write_jsonl(out_dir / "train_mixed.jsonl", train_rows + train_qa_rows)
    write_jsonl(out_dir / "val_mixed.jsonl", val_rows + val_qa_rows)

    summary = {
        "normalization": LABEL_MAP,
        "train_struct": dataset_stats(train_rows),
        "val_struct": dataset_stats(val_rows),
        "train_qa": {"num_examples": len(train_qa_rows)},
        "val_qa": {"num_examples": len(val_qa_rows)},
        "abnormal_elapsed_thresholds_p95": thresholds,
        "qa_schema": ["occupied", "time_to_free_minutes", "used_areas", "is_abnormal"],
    }
    # Render once; the same text goes to the summary file and stdout.
    rendered = json.dumps(summary, ensure_ascii=False, indent=2)
    (out_dir / "summary.json").write_text(rendered, encoding="utf-8")
    print(rendered)


if __name__ == "__main__":
    main()
|
|