Spaces:
Sleeping
Sleeping
| import json | |
| from datetime import datetime, timedelta | |
| import base64 | |
| from collections import Counter | |
| import cv2 | |
# Time offset applied around an event when choosing which video frame to
# sample before or after it.
delta = timedelta(seconds=0.4)

# Characters typed by the digit/punctuation keys without Shift, and the
# characters the same keys type with Shift held, listed in matching order.
UNSHIFTED = "`1234567890-=[]\\;',./"
SHIFTED = "~!@#$%^&*()_+{}|:\"<>?"

# Lookup from an unshifted character to its shifted counterpart.
SHIFT_MAP = {plain: shifted for plain, shifted in zip(UNSHIFTED, SHIFTED)}
def get_video_start_time(base_path):
    """Read the recording start time and video file name from screen_events.json.

    Only the first entry of the JSON list is consulted. Its
    ``data.start_time`` is parsed as an ISO-8601 timestamp, and the last
    "/"-separated component of its ``data.filename`` is returned with its
    extension replaced by ".mp4".

    Args:
        base_path: Directory holding "screen_events.json"; must support the
            "/" path operator.

    Returns:
        A ``(start_datetime, video_file_name)`` tuple.
    """
    with open(base_path / "screen_events.json", "r") as events_file:
        first_data = json.load(events_file)[0]["data"]

    start_iso = first_data["start_time"]
    recorded_name = first_data["filename"].split("/")[-1]
    # The recorded name may carry a different extension; swap it for .mp4.
    mp4_name = recorded_name.rsplit(".", 1)[0] + ".mp4"
    return datetime.fromisoformat(start_iso), mp4_name
def get_drag_delta(event_data):
    """Return the straight-line length of a drag gesture.

    Args:
        event_data: Mapping with the drag's start point ("x1", "y1") and
            end point ("x2", "y2").

    Returns:
        The integer 0 when both points coincide, otherwise the Euclidean
        distance between them as a float.
    """
    x_from, y_from = event_data["x1"], event_data["y1"]
    x_to, y_to = event_data["x2"], event_data["y2"]

    moved = x_from != x_to or y_from != y_to
    if not moved:
        return 0

    dx = x_to - x_from
    dy = y_to - y_from
    return (dx ** 2 + dy ** 2) ** 0.5
def get_scroll_delta_direction(scrolls):
    """Summarise a burst of scroll events as a net count and a direction.

    The dominant direction is the one that occurs most often in ``scrolls``.
    The net count is its number of occurrences minus the number of
    occurrences of the opposite direction.

    Args:
        scrolls: Non-empty sequence of scroll events, each carrying its
            direction under ``["data"]["direction"]``.

    Returns:
        A ``(net_count, dominant_direction)`` tuple.

    Raises:
        ValueError: If the dominant direction is not one of "up", "down",
            "left" or "right".
    """
    opposite_of = {"down": "up", "up": "down", "left": "right", "right": "left"}

    tally = Counter(scroll["data"]["direction"] for scroll in scrolls)
    dominant, dominant_count = tally.most_common(1)[0]

    if dominant not in opposite_of:
        raise ValueError(f"Unknown scroll direction: {dominant}")

    net_count = dominant_count - tally.get(opposite_of[dominant], 0)
    return net_count, dominant
def apply_modifiers_macos_abc(key_char: str, modifiers: dict) -> str:
    """
    Translate an unshifted key_char into the text it produces once the
    Shift and Caps Lock state in modifiers is taken into account.

    Alphabetic input is upper-cased when either "shift" or "caps_lock" is
    set. Any other input is looked up in SHIFT_MAP when "shift" is set and
    returned unchanged if it has no entry there. Empty input is returned
    as-is.
    """
    if not key_char:
        return key_char

    shift_held = modifiers.get("shift", False)

    if key_char.isalpha():
        caps_on = modifiers.get("caps_lock", False)
        return key_char.upper() if (shift_held or caps_on) else key_char

    # Digits, punctuation and symbols: only Shift changes the output.
    return SHIFT_MAP.get(key_char, key_char) if shift_held else key_char
def handle_key_press(key_char, key_modifiers_state):
    """Classify a key press and return its ``(value, action_type)`` pair.

    The result is one of:

    * ``(text, "write")`` - the key contributes text. "space" and "tab" map
      to their whitespace characters, and "delete" is returned as the
      literal string "delete" for the caller to interpret.
    * ``("enter", "press")`` - the "return" key.
    * ``("Ctrl+Alt+Cmd+Shift+Fn+<key>", "hotkey")`` - at least one of ctrl,
      alt, cmd or fn is held; only the modifiers that are held appear, in
      that fixed order.

    A modifier counts as held only when its entry in
    ``key_modifiers_state`` is truthy, so a missing entry means "not held".

    Args:
        key_char: Name or character of the pressed key.
        key_modifiers_state: Mapping of modifier name ("ctrl", "alt", "cmd",
            "shift", "fn", "caps_lock") to a boolean.

    Returns:
        A ``(value, action_type)`` tuple as described above.
    """
    # NOTE(review): these named keys are translated regardless of which
    # modifiers are held (e.g. Cmd+space still yields a written space) -
    # confirm this is intended.
    named_keys = {
        "space": (" ", "write"),
        "return": ("enter", "press"),
        "tab": ("\t", "write"),
        "delete": ("delete", "write"),
    }
    if key_char in named_keys:
        return named_keys[key_char]

    def held(modifier):
        # Truthiness, not "== False": an absent modifier must not be
        # mistaken for a held one.
        return bool(key_modifiers_state.get(modifier))

    if not any(held(mod) for mod in ("ctrl", "alt", "cmd", "fn")):
        if held("shift") or held("caps_lock"):
            # TODO: fix for special characters
            return apply_modifiers_macos_abc(key_char, key_modifiers_state), "write"
        return key_char, "write"

    labels = (
        ("ctrl", "Ctrl"),
        ("alt", "Alt"),
        ("cmd", "Cmd"),
        ("shift", "Shift"),
        ("fn", "Fn"),
    )
    parts = [label for mod, label in labels if held(mod)]
    parts.append(key_char)
    return "+".join(parts), "hotkey"
def _build_write_event(text, start_iso, end_iso):
    """Return the synthetic "write" event for one run of typed text."""
    return {
        "timestamp": datetime.fromisoformat(start_iso),
        "end_timestamp": datetime.fromisoformat(end_iso),
        "data": {
            "event_type": "write",
            "text": text,
        },
    }


def trajectory_iter(base_path):
    """Yield normalised events from every trajectory in trajectories.json.

    Each yielded item is a dict with a "timestamp" (datetime), a "data"
    dict containing an "event_type", and for some events an
    "end_timestamp". The raw events are transformed as follows:

    * Key presses classified as "write" by ``handle_key_press`` are
      buffered and merged into one "write" event; "delete" removes the last
      buffered character. The buffer is emitted when a non-keyboard event,
      a "press" key or a hotkey arrives, and at the end of each trajectory.
    * "key_modifier" events and drags whose ``get_drag_delta`` is below 5
      are dropped.
    * Scroll events are reduced to a net click count and direction and are
      dropped when that count is not positive.
    * A drag is reported as spanning from the timestamp of the previously
      emitted non-typing event of the same trajectory to its own timestamp
      (presumably the press that began the drag - TODO confirm).
    * Every other event is passed through with its "event_type" copied
      into its "data" dict (the loaded dict is modified in place).

    Args:
        base_path: Directory holding "trajectories.json"; must support the
            "/" path operator.

    Yields:
        Normalised event dicts in trajectory order.
    """
    with open(base_path / "trajectories.json", "r") as f:
        trajectories = json.load(f)

    for entry in trajectories:
        write_string = ""
        write_start_timestamp = None
        write_end_timestamp = None
        last_event_timestamp = None

        for step in entry["trajectory"]:
            event = step["event"]["original"]
            event_type = event["event_type"]

            # Any non-keyboard event ends the current run of typed text.
            if write_string and event_type not in ("key_press", "key_modifier"):
                yield _build_write_event(write_string, write_start_timestamp, write_end_timestamp)
                write_string = ""
                write_start_timestamp = None
                write_end_timestamp = None

            if event_type == "key_modifier":
                continue
            if event_type == "drag" and get_drag_delta(event["data"]) < 5:
                continue

            if event_type == "scroll":
                scroll_delta, scroll_direction = get_scroll_delta_direction(event["originals"])
                if scroll_delta > 0:
                    last_event_timestamp = event["end_time"]
                    yield {
                        "timestamp": datetime.fromisoformat(event["timestamp"]),
                        "end_timestamp": datetime.fromisoformat(event["end_time"]),
                        "data": {
                            "event_type": event_type,
                            "clicks": scroll_delta,
                            "direction": scroll_direction,
                            "x": event["originals"][0]["data"]["x"],
                            "y": event["originals"][0]["data"]["y"],
                        },
                    }
            elif event_type == "key_press":
                key_char = event["data"]["key_char"]
                key_char_processed, action_type = handle_key_press(key_char, event["data"]["modifiers"])
                if action_type == "write":
                    if key_char_processed == "delete":
                        write_string = write_string[:-1]
                    else:
                        write_string += key_char_processed
                    if write_string:
                        if write_start_timestamp is None:
                            write_start_timestamp = event["timestamp"]
                        write_end_timestamp = event["timestamp"]
                    else:
                        # The buffer is empty (e.g. everything typed was
                        # deleted): forget its timestamps so the next run of
                        # text does not inherit a stale start time.
                        write_start_timestamp = None
                        write_end_timestamp = None
                else:
                    # "press" and "hotkey" actions interrupt typing.
                    if write_string:
                        yield _build_write_event(write_string, write_start_timestamp, write_end_timestamp)
                        write_string = ""
                        write_start_timestamp = None
                        write_end_timestamp = None
                    last_event_timestamp = event["timestamp"]
                    value_key = "key" if action_type == "press" else "combination"
                    yield {
                        "timestamp": datetime.fromisoformat(event["timestamp"]),
                        "data": {
                            "event_type": action_type,
                            value_key: key_char_processed,
                        },
                    }
            elif event_type == "drag":
                event["data"]["event_type"] = event_type
                # A drag may be the first emitted event of a trajectory, in
                # which case there is no previous timestamp; fall back to
                # the drag's own timestamp instead of parsing None.
                drag_start = event["timestamp"] if last_event_timestamp is None else last_event_timestamp
                yield {
                    "timestamp": datetime.fromisoformat(drag_start),
                    "end_timestamp": datetime.fromisoformat(event["timestamp"]),
                    "data": event["data"],
                }
                last_event_timestamp = event["timestamp"]
            else:
                event["data"]["event_type"] = event_type
                last_event_timestamp = event["timestamp"]
                yield {
                    "timestamp": datetime.fromisoformat(event["timestamp"]),
                    "data": event["data"],
                }

        # Emit text still buffered when the trajectory ends, before the
        # buffer is reset for the next trajectory.
        if write_string:
            yield _build_write_event(write_string, write_start_timestamp, write_end_timestamp)
def get_video_fps(video_path):
    """Return the frame rate OpenCV reports for the video at video_path.

    Args:
        video_path: Location of the video file; converted with ``str()``
            before being handed to ``cv2.VideoCapture``.

    Returns:
        The value of ``cv2.CAP_PROP_FPS`` for the opened video.

    Raises:
        ValueError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise ValueError(f"Unable to open video file: {video_path}")
        return cap.get(cv2.CAP_PROP_FPS)
    finally:
        # Always release the capture handle, including on error paths.
        cap.release()
def get_video_frame(video_path, frame_number):
    """Decode and return a single frame of a video.

    Args:
        video_path: Location of the video file; converted with ``str()``
            before being handed to ``cv2.VideoCapture``.
        frame_number: Index the capture is positioned at (via
            ``cv2.CAP_PROP_POS_FRAMES``) before reading.

    Returns:
        The frame object returned by ``VideoCapture.read()``.

    Raises:
        ValueError: If the video cannot be opened or the frame cannot be
            read.
    """
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise ValueError(f"Unable to open video file: {video_path}")
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = cap.read()
    finally:
        # Always release the capture handle, including on error paths.
        cap.release()

    if not ret:
        raise ValueError(f"Unable to retrieve frame {frame_number} from video: {video_path}")
    return frame
def get_start_end_trajectory_timestamps(base_path):
    """Return the first and last time the recorder window appeared.

    Scans app_events.json for events whose action is "window_changed" and
    whose current_windows contain "🐾 Paw Recorder", and returns the
    timestamps of the first and the last such event.

    Args:
        base_path: Directory holding "app_events.json"; must support the
            "/" path operator.

    Returns:
        A ``(start_datetime, end_datetime)`` tuple.
    """
    with open(base_path / "app_events.json", "r") as events_file:
        app_events = json.load(events_file)

    recorder_events = []
    for event in app_events:
        details = event["data"]
        if details["action"] != "window_changed":
            continue
        if "🐾 Paw Recorder" not in details["current_windows"]:
            continue
        recorder_events.append(event)

    first_event, last_event = recorder_events[0], recorder_events[-1]
    return (
        datetime.fromisoformat(first_event["timestamp"]),
        datetime.fromisoformat(last_event["timestamp"]),
    )
def get_start_frame(start_event_timestamp, video_start_timestamp, fps, trajectory_start_timestamp, video_path):
    """Return the video frame representing the screen before the first event.

    If the first event happens more than ``delta`` after the trajectory
    start, the frame at ``trajectory start + delta`` is used. Otherwise the
    frame halfway between the trajectory start and the first event is used.

    Args:
        start_event_timestamp: Time of the first trajectory event.
        video_start_timestamp: Time at which the video recording began.
        fps: Frame rate used to convert seconds into a frame index.
        trajectory_start_timestamp: Time at which the trajectory began.
        video_path: Location of the video file.

    Returns:
        The frame returned by ``get_video_frame`` for the chosen index.
    """
    first_event_is_late = start_event_timestamp > trajectory_start_timestamp + delta

    if first_event_is_late:
        seconds_into_video = (trajectory_start_timestamp - video_start_timestamp + delta).total_seconds()
    else:
        half_gap = (start_event_timestamp - trajectory_start_timestamp).total_seconds() / 2
        seconds_into_video = half_gap + (trajectory_start_timestamp - video_start_timestamp).total_seconds()

    return get_video_frame(video_path, int(seconds_into_video * fps))
def get_frame_before_event(event_timestamp, video_start_timestamp, fps, video_path, prev_event_timestamp):
    """Return the video frame showing the screen just before an event.

    If the previous event ended more than ``delta`` before this event, the
    frame at ``event_timestamp - delta`` is used. Otherwise the two events
    are too close together and the frame halfway between them is used.

    Args:
        event_timestamp: Time of the event of interest.
        video_start_timestamp: Time at which the video recording began.
        fps: Frame rate used to convert seconds into a frame index.
        video_path: Location of the video file.
        prev_event_timestamp: Time at which the preceding event ended.

    Returns:
        The frame returned by ``get_video_frame`` for the chosen index.
    """
    # The two cases are exhaustive, so no further fallback branch is needed.
    if prev_event_timestamp + delta < event_timestamp:
        time_diff = (event_timestamp - delta - video_start_timestamp).total_seconds()
    else:
        time_diff = (event_timestamp - prev_event_timestamp).total_seconds() / 2 + (prev_event_timestamp - video_start_timestamp).total_seconds()

    frame_number = int(time_diff * fps)
    return get_video_frame(video_path, frame_number)
def get_frame_after_event(event_timestamp, video_start_timestamp, fps, trajectory_end_timestamp, video_path, next_event_timestamp):
    """Return the video frame showing the screen just after an event.

    The frame at ``event_timestamp + delta`` is used when that moment lies
    before both the trajectory end and the next event. Otherwise the frame
    at ``event_timestamp`` itself is used.

    Args:
        event_timestamp: Time at which the event of interest ended.
        video_start_timestamp: Time at which the video recording began.
        fps: Frame rate used to convert seconds into a frame index.
        trajectory_end_timestamp: Time at which the trajectory ended.
        video_path: Location of the video file.
        next_event_timestamp: Time of the following event.

    Returns:
        The frame returned by ``get_video_frame`` for the chosen index.
    """
    shifted = event_timestamp + delta
    room_to_shift = shifted < trajectory_end_timestamp and shifted < next_event_timestamp
    sample_time = shifted if room_to_shift else event_timestamp

    seconds_into_video = (sample_time - video_start_timestamp).total_seconds()
    return get_video_frame(video_path, int(seconds_into_video * fps))
def get_end_timestamp(event):
    """Return the event's "end_timestamp", or its "timestamp" if it has none."""
    chosen_key = "end_timestamp" if "end_timestamp" in event else "timestamp"
    return event[chosen_key]
def parse_video(base_path):
    """Yield an alternating stream of video frames and trajectory events.

    The stream starts with one ``{"frame": ...}`` item for the initial
    screen. Then, for each pair of consecutive events, it yields the
    earlier event's data, a frame taken after it, an optional
    ``wait(<seconds>)`` action when the gap between the two events (minus
    twice ``delta``) is at least one whole second, and a frame taken before
    the later event. Iteration stops at the first pair whose later event
    ends after the trajectory end. A "mouse_click" directly followed by a
    "drag" is skipped. The stream always ends with a "finished" action.

    Args:
        base_path: Directory holding the recording's JSON files and video;
            must support the "/" path operator.

    Yields:
        Dicts with either a "frame" key or an "event" key.
    """
    video_start, video_file_name = get_video_start_time(base_path)
    video_path = base_path / video_file_name
    fps = get_video_fps(video_path)
    trajectory_start, trajectory_end = get_start_end_trajectory_timestamps(base_path)
    events = list(trajectory_iter(base_path))

    first_event = events[0]
    yield {
        "frame": get_start_frame(first_event["timestamp"], video_start, fps, trajectory_start, video_path),
    }

    # Walk the events as (current, upcoming) pairs; the final event is only
    # ever seen in the "upcoming" position.
    for current, upcoming in zip(events, events[1:]):
        if get_end_timestamp(upcoming) > trajectory_end:
            break
        click_then_drag = (
            upcoming["data"]["event_type"] == "drag"
            and current["data"]["event_type"] == "mouse_click"
        )
        if click_then_drag:
            continue

        yield {"event": current["data"]}

        current_end = get_end_timestamp(current)
        yield {
            "frame": get_frame_after_event(
                current_end,
                video_start,
                fps,
                trajectory_end,
                video_path,
                next_event_timestamp=upcoming["timestamp"],
            ),
        }

        idle_seconds = int((upcoming["timestamp"] - current_end - 2 * delta).total_seconds())
        if idle_seconds > 0:
            yield {"event": {"action": f"wait({idle_seconds})"}}

        yield {
            "frame": get_frame_before_event(
                upcoming["timestamp"],
                video_start,
                fps,
                video_path,
                prev_event_timestamp=current_end,
            ),
        }

    yield {"event": {"action": "finished"}}
def parse_conversation(base_path):
    """Convert a recording into a list of chat-style messages.

    Items produced by ``parse_video`` are mapped in order: an "event" item
    becomes an assistant message whose text is the JSON-encoded event, and
    a "frame" item becomes a user message holding the frame as a
    base64-encoded JPEG data URL.

    Args:
        base_path: Directory passed through to ``parse_video``.

    Returns:
        A list of ``{"role": ..., "content": [...]}`` message dicts.

    Raises:
        ValueError: If a frame cannot be JPEG-encoded.
    """
    conversation = []
    # Consume the generator directly so decoded frames are encoded one at a
    # time instead of all being held in memory first.
    for item in parse_video(base_path):
        if 'event' in item:
            conversation.append({"role": "assistant", "content": [{"type": "text", "text": json.dumps(item['event'])}]})
        elif 'frame' in item:
            encoded_ok, buffer = cv2.imencode('.jpg', item['frame'])
            if not encoded_ok:
                # Do not emit a message carrying an invalid image payload.
                raise ValueError("Unable to encode video frame as JPEG")
            encoded_image = base64.b64encode(buffer).decode("utf-8")
            conversation.append({"role": "user", "content": [{"type": "image_url", "image_url": "data:image/jpeg;base64," + encoded_image}]})
    return conversation