"""Convert recorded input-event trajectories and a screen video into a conversation.

The module reads ``screen_events.json``, ``app_events.json`` and
``trajectories.json`` from a recording directory, merges low-level input
events into higher-level actions (write / press / hotkey / scroll / drag /
...), and interleaves them with video frames grabbed shortly before and
after each action.
"""

import json
from datetime import datetime, timedelta
import base64
from collections import Counter
import cv2

# Offset used when picking a frame "just before" / "just after" an event.
delta = timedelta(seconds=0.4)

UNSHIFTED = "`1234567890-=[]\\;',./"
SHIFTED = "~!@#$%^&*()_+{}|:\"<>?"
SHIFT_MAP = dict(zip(UNSHIFTED, SHIFTED))

# Scroll direction -> the direction that cancels it out.
_OPPOSITE_SCROLL_DIRECTION = {
    "down": "up",
    "up": "down",
    "left": "right",
    "right": "left",
}

# Modifiers that turn a key press into a hotkey instead of typed text.
_HOTKEY_MODIFIERS = ("ctrl", "alt", "cmd", "fn")


def get_video_start_time(base_path):
    """Return the recording start time and the video file name.

    Reads the first entry of ``screen_events.json`` under *base_path*.

    Args:
        base_path: Directory object supporting the ``/`` operator.

    Returns:
        ``(start, file_name)`` where *start* is the parsed ``start_time`` and
        *file_name* is the base name of ``filename`` with its extension
        replaced by ``.mp4``.
    """
    # Explicit encoding: JSON is UTF-8 and the locale default may differ.
    with open(base_path / "screen_events.json", "r", encoding="utf-8") as f:
        timestamps = json.load(f)
    first = timestamps[0]["data"]
    start_timestamp = first["start_time"]
    video_file_name = first["filename"].split("/")[-1]
    # change extension to .mp4
    video_file_name = video_file_name.rsplit(".", 1)[0] + ".mp4"
    return datetime.fromisoformat(start_timestamp), video_file_name


def get_drag_delta(event_data):
    """Return the Euclidean distance between a drag's start and end points.

    Args:
        event_data: Mapping with ``x1``, ``y1``, ``x2`` and ``y2``.
    """
    dx = event_data["x2"] - event_data["x1"]
    dy = event_data["y2"] - event_data["y1"]
    return (dx ** 2 + dy ** 2) ** 0.5


def get_scroll_delta_direction(scrolls):
    """Collapse raw scroll events into a net click count and a direction.

    The dominant direction is the most frequent one; the count is reduced by
    the number of scrolls in the opposite direction.

    Args:
        scrolls: Non-empty sequence of events with ``["data"]["direction"]``.

    Returns:
        ``(net_clicks, direction)``.

    Raises:
        ValueError: If the dominant direction is not up/down/left/right.
        IndexError: If *scrolls* is empty.
    """
    direction_counts = Counter(scroll["data"]["direction"] for scroll in scrolls)
    most_common_direction, count = direction_counts.most_common(1)[0]
    try:
        opposite = _OPPOSITE_SCROLL_DIRECTION[most_common_direction]
    except KeyError:
        raise ValueError(
            f"Unknown scroll direction: {most_common_direction}"
        ) from None
    # Named ``net_clicks`` so the module-level ``delta`` is not shadowed.
    net_clicks = count - direction_counts.get(opposite, 0)
    return net_clicks, most_common_direction


def apply_modifiers_macos_abc(key_char: str, modifiers: dict) -> str:
    """
    Convert unshifted key_char + modifiers (Shift, CapsLock)
    to actual macOS ABC output.
    """
    if not key_char:
        return key_char

    shift = modifiers.get("shift", False)
    caps = modifiers.get("caps_lock", False)

    # LETTERS
    if key_char.isalpha():
        return key_char.upper() if (shift or caps) else key_char

    # NON-LETTERS (digits, punctuation, symbols)
    if shift:
        return SHIFT_MAP.get(key_char, key_char)

    return key_char


def handle_key_press(key_char, key_modifiers_state):
    """Classify a key press and return ``(value, action_type)``.

    *action_type* is ``"write"`` for typed text (including the ``"delete"``
    marker the caller treats as a backspace), ``"press"`` for the return key,
    and ``"hotkey"`` for a combination such as ``"Cmd+c"``.

    Args:
        key_char: Key name or character as recorded.
        key_modifiers_state: Mapping of modifier name to a truthy/falsy flag.
            Missing or ``None`` entries are treated as "not pressed".
    """
    if key_char == "space":
        return " ", "write"
    if key_char == "return":
        return "enter", "press"
    if key_char == "tab":
        return "\t", "write"
    if key_char == "delete":
        return "delete", "write"

    def pressed(mod):
        # Truthiness instead of ``== False`` so a missing key is not "held".
        return bool(key_modifiers_state.get(mod))

    hotkey_modifier_held = any(pressed(mod) for mod in _HOTKEY_MODIFIERS)

    if not hotkey_modifier_held:
        if pressed("shift") or pressed("caps_lock"):
            # TODO: fix for special characters
            return apply_modifiers_macos_abc(key_char, key_modifiers_state), "write"
        return key_char, "write"

    parts = []
    if pressed("ctrl"):
        parts.append("Ctrl")
    if pressed("alt"):
        parts.append("Alt")
    if pressed("cmd"):
        parts.append("Cmd")
    if pressed("shift"):
        parts.append("Shift")
    if pressed("fn"):
        parts.append("Fn")
    parts.append(key_char)
    return "+".join(parts), "hotkey"


def _make_write_event(text, start_timestamp, end_timestamp):
    """Build the merged "write" event for a run of typed characters."""
    return {
        "timestamp": datetime.fromisoformat(start_timestamp),
        "end_timestamp": datetime.fromisoformat(end_timestamp),
        "data": {
            "event_type": "write",
            "text": text,
        },
    }


def trajectory_iter(base_path):
    """Yield high-level events from ``trajectories.json`` under *base_path*.

    Consecutive typed characters are merged into one ``write`` event, raw
    scrolls into one ``scroll`` event with a net click count, drags shorter
    than 5 units and ``key_modifier`` events are dropped, and every other
    event is passed through with its ``event_type`` copied into ``data``.

    Yields:
        Dicts with ``timestamp`` (datetime), optionally ``end_timestamp``,
        and ``data``.
    """
    with open(base_path / "trajectories.json", "r", encoding="utf-8") as f:
        trajectories = json.load(f)

    for entry in trajectories:
        write_string = ""
        write_start_timestamp = None
        write_end_timestamp = None
        last_event_timestamp = None

        for step in entry["trajectory"]:
            event = step["event"]["original"]
            event_type = event["event_type"]

            # Any non-keyboard event ends the current run of typed text.
            if write_string and event_type not in ("key_press", "key_modifier"):
                yield _make_write_event(
                    write_string, write_start_timestamp, write_end_timestamp
                )
                write_string = ""
                write_start_timestamp = None
                write_end_timestamp = None

            if event_type == "key_modifier":
                continue
            # Tiny drags are dropped.
            if event_type == "drag" and get_drag_delta(event["data"]) < 5:
                continue

            if event_type == "scroll":
                scroll_delta, scroll_direction = get_scroll_delta_direction(
                    event["originals"]
                )
                if scroll_delta > 0:
                    last_event_timestamp = event["end_time"]
                    first_scroll = event["originals"][0]["data"]
                    yield {
                        "timestamp": datetime.fromisoformat(event["timestamp"]),
                        "end_timestamp": datetime.fromisoformat(event["end_time"]),
                        "data": {
                            "event_type": event_type,
                            "clicks": scroll_delta,
                            "direction": scroll_direction,
                            "x": first_scroll["x"],
                            "y": first_scroll["y"],
                        },
                    }

            elif event_type == "key_press":
                key_text, action_type = handle_key_press(
                    event["data"]["key_char"], event["data"]["modifiers"]
                )

                if action_type == "write":
                    if key_text == "delete":
                        write_string = write_string[:-1]
                    else:
                        write_string += key_text

                    if write_string:
                        if write_start_timestamp is None:
                            write_start_timestamp = event["timestamp"]
                        write_end_timestamp = event["timestamp"]
                    else:
                        # Buffer emptied: drop stale timestamps so a later
                        # write event cannot start before events already
                        # emitted in between.
                        write_start_timestamp = None
                        write_end_timestamp = None
                    continue

                # "press" and "hotkey" both end the current run of text.
                if write_string:
                    yield _make_write_event(
                        write_string, write_start_timestamp, write_end_timestamp
                    )
                    write_string = ""
                    write_start_timestamp = None
                    write_end_timestamp = None

                last_event_timestamp = event["timestamp"]
                payload_key = "key" if action_type == "press" else "combination"
                yield {
                    "timestamp": datetime.fromisoformat(event["timestamp"]),
                    "data": {
                        "event_type": action_type,
                        payload_key: key_text,
                    },
                }

            elif event_type == "drag":
                event["data"]["event_type"] = event_type
                # The drag starts at the previous event; if there is none,
                # fall back to the drag's own timestamp instead of failing
                # on fromisoformat(None).
                drag_start = (
                    last_event_timestamp
                    if last_event_timestamp is not None
                    else event["timestamp"]
                )
                yield {
                    "timestamp": datetime.fromisoformat(drag_start),
                    "end_timestamp": datetime.fromisoformat(event["timestamp"]),
                    "data": event["data"],
                }
                last_event_timestamp = event["timestamp"]

            else:
                event["data"]["event_type"] = event_type
                last_event_timestamp = event["timestamp"]
                yield {
                    "timestamp": datetime.fromisoformat(event["timestamp"]),
                    "data": event["data"],
                }

        # NOTE(review): flushed per trajectory because the buffer is
        # re-initialised per trajectory — confirm this matches the intended
        # nesting of the original.
        if write_string:
            yield _make_write_event(
                write_string, write_start_timestamp, write_end_timestamp
            )


def get_video_fps(video_path):
    """Return the FPS reported by OpenCV for *video_path*.

    Raises:
        ValueError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise ValueError(f"Unable to open video file: {video_path}")
        return cap.get(cv2.CAP_PROP_FPS)
    finally:
        # Release the capture on every path, including errors.
        cap.release()


def get_video_frame(video_path, frame_number):
    """Return frame *frame_number* of the video at *video_path*.

    Raises:
        ValueError: If the video cannot be opened or the frame cannot be read.
    """
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise ValueError(f"Unable to open video file: {video_path}")
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = cap.read()
    finally:
        cap.release()
    if not ret:
        raise ValueError(
            f"Unable to retrieve frame {frame_number} from video: {video_path}"
        )
    return frame


def get_start_end_trajectory_timestamps(base_path):
    """Return the timestamps of the first and last recorder-window events.

    Looks in ``app_events.json`` for ``window_changed`` events whose
    ``current_windows`` contains ``"🐾 Paw Recorder"``.

    Raises:
        IndexError: If no such event exists.
    """
    # UTF-8 is required: the window title being matched is non-ASCII.
    with open(base_path / "app_events.json", "r", encoding="utf-8") as f:
        app_events = json.load(f)
    paw_events = [
        event
        for event in app_events
        if event["data"]["action"] == "window_changed"
        and "🐾 Paw Recorder" in event["data"]["current_windows"]
    ]
    start_timestamp = datetime.fromisoformat(paw_events[0]["timestamp"])
    end_timestamp = datetime.fromisoformat(paw_events[-1]["timestamp"])
    return start_timestamp, end_timestamp


def _frame_index(seconds_from_video_start, fps):
    """Convert an offset in seconds to a frame index, never below 0."""
    return max(0, int(seconds_from_video_start * fps))


def get_start_frame(start_event_timestamp, video_start_timestamp, fps, trajectory_start_timestamp, video_path):
    """Return the frame shown before the first event.

    Uses ``trajectory_start + delta`` when the first event is later than
    that; otherwise the midpoint between trajectory start and first event.
    """
    if start_event_timestamp > trajectory_start_timestamp + delta:
        time_diff = (
            trajectory_start_timestamp - video_start_timestamp + delta
        ).total_seconds()
    else:
        time_diff = (
            (start_event_timestamp - trajectory_start_timestamp).total_seconds() / 2
            + (trajectory_start_timestamp - video_start_timestamp).total_seconds()
        )
    return get_video_frame(video_path, _frame_index(time_diff, fps))


def get_frame_before_event(event_timestamp, video_start_timestamp, fps, video_path, prev_event_timestamp):
    """Return a frame shortly before *event_timestamp*.

    Uses ``event - delta`` when the previous event is more than ``delta``
    earlier; otherwise the midpoint between the previous event and this one.
    """
    if prev_event_timestamp + delta < event_timestamp:
        time_diff = (event_timestamp - delta - video_start_timestamp).total_seconds()
    else:
        time_diff = (
            (event_timestamp - prev_event_timestamp).total_seconds() / 2
            + (prev_event_timestamp - video_start_timestamp).total_seconds()
        )
    return get_video_frame(video_path, _frame_index(time_diff, fps))


def get_frame_after_event(event_timestamp, video_start_timestamp, fps, trajectory_end_timestamp, video_path, next_event_timestamp):
    """Return a frame shortly after *event_timestamp*.

    Uses ``event + delta`` when that is still before both the trajectory end
    and the next event; otherwise the frame at the event itself.
    """
    after = event_timestamp + delta
    if after < trajectory_end_timestamp and after < next_event_timestamp:
        time_diff = (after - video_start_timestamp).total_seconds()
    else:
        time_diff = (event_timestamp - video_start_timestamp).total_seconds()
    return get_video_frame(video_path, _frame_index(time_diff, fps))


def get_end_timestamp(event):
    """Return the event's ``end_timestamp`` if present, else its ``timestamp``."""
    if "end_timestamp" in event:
        return event["end_timestamp"]
    return event["timestamp"]


def parse_video(base_path):
    """Yield alternating ``{"frame": ...}`` and ``{"event": ...}`` items.

    Starts with the frame before the first event, then for each event that
    has a successor: the event, a frame after it, an optional ``wait(n)``
    pseudo-event when the gap is at least one second, and a frame before the
    successor. Stops once the successor ends after the trajectory end, and
    always finishes with ``{"event": {"action": "finished"}}``.

    Raises:
        IndexError: If the trajectory contains no events.
    """
    start_timestamp, video_file_name = get_video_start_time(base_path)
    video_path = base_path / video_file_name
    fps = get_video_fps(video_path)
    trajectory_start_timestamp, trajectory_end_timestamp = (
        get_start_end_trajectory_timestamps(base_path)
    )

    trajectory = list(trajectory_iter(base_path))
    start_event = trajectory[0]
    yield {
        "frame": get_start_frame(
            start_event["timestamp"],
            start_timestamp,
            fps,
            trajectory_start_timestamp,
            video_path,
        ),
    }

    for event, next_event in zip(trajectory, trajectory[1:]):
        if get_end_timestamp(next_event) > trajectory_end_timestamp:
            break
        # Skip a click followed directly by a drag; the drag event already
        # starts at the previous event's timestamp.
        if (
            next_event["data"]["event_type"] == "drag"
            and event["data"]["event_type"] == "mouse_click"
        ):
            continue

        event_end = get_end_timestamp(event)
        yield {"event": event["data"]}
        yield {
            "frame": get_frame_after_event(
                event_end,
                start_timestamp,
                fps,
                trajectory_end_timestamp,
                video_path,
                next_event_timestamp=next_event["timestamp"],
            ),
        }

        # Whole seconds between the "after" frame and the next "before" frame.
        seconds_between_events = int(
            (next_event["timestamp"] - event_end - 2 * delta).total_seconds()
        )
        if seconds_between_events > 0:
            yield {"event": {"action": f"wait({seconds_between_events})"}}

        # NOTE(review): emitted unconditionally (not only after a wait) —
        # confirm against the intended nesting of the original.
        yield {
            "frame": get_frame_before_event(
                next_event["timestamp"],
                start_timestamp,
                fps,
                video_path,
                prev_event_timestamp=event_end,
            ),
        }

    yield {"event": {"action": "finished"}}


def parse_conversation(base_path):
    """Build a chat-style message list from the recording in *base_path*.

    Events become ``assistant`` text messages holding the JSON-encoded event;
    frames become ``user`` messages holding a base64 JPEG data URL.

    Raises:
        ValueError: If a frame cannot be JPEG-encoded.
    """
    conversation = []
    for item in parse_video(base_path):
        if "event" in item:
            conversation.append({
                "role": "assistant",
                "content": [{"type": "text", "text": json.dumps(item["event"])}],
            })
        elif "frame" in item:
            ok, buffer = cv2.imencode(".jpg", item["frame"])
            if not ok:
                raise ValueError("Unable to encode frame as JPEG")
            encoded_image = base64.b64encode(buffer).decode("utf-8")
            conversation.append({
                "role": "user",
                "content": [{
                    "type": "image_url",
                    "image_url": "data:image/jpeg;base64," + encoded_image,
                }],
            })
    return conversation