trajectory-viewer / postprocessing.py
mshamrai's picture
fix: delete and add enter press
298be74 unverified
import json
from datetime import datetime, timedelta
import base64
from collections import Counter
import cv2
# Time offset used when choosing which video frame to sample around an event.
delta = timedelta(milliseconds=400)

# Keys as typed without Shift, and (position-aligned) what each yields with Shift.
UNSHIFTED = "`1234567890-=[]\\;',./"
SHIFTED = "~!@#$%^&*()_+{}|:\"<>?"
SHIFT_MAP = {plain: shifted for plain, shifted in zip(UNSHIFTED, SHIFTED)}
def get_video_start_time(base_path):
    """Read the recording start time and video file name from ``screen_events.json``.

    Args:
        base_path: Directory object supporting the ``/`` operator (e.g. a
            ``pathlib.Path``) that contains ``screen_events.json``.

    Returns:
        A ``(start_time, video_file_name)`` tuple. ``start_time`` is the first
        entry's ``data.start_time`` parsed with ``datetime.fromisoformat``;
        ``video_file_name`` is the last ``/``-separated component of the first
        entry's ``data.filename`` with its extension replaced by ``.mp4``.
    """
    # JSON is UTF-8 by specification; do not rely on the platform locale encoding.
    with open(base_path / "screen_events.json", "r", encoding="utf-8") as f:
        screen_events = json.load(f)

    first_entry = screen_events[0]["data"]
    recorded_name = first_entry["filename"].split("/")[-1]
    # The video is looked up under the same stem but with an .mp4 extension.
    video_file_name = recorded_name.rsplit(".", 1)[0] + ".mp4"
    return datetime.fromisoformat(first_entry["start_time"]), video_file_name
def get_drag_delta(event_data):
    """Return the straight-line distance covered by a drag.

    Args:
        event_data: Mapping with the drag endpoints under ``x1``, ``y1``
            (start) and ``x2``, ``y2`` (end).

    Returns:
        The integer ``0`` when start and end coincide, otherwise the Euclidean
        distance between the two points as a float.
    """
    x_from, y_from = event_data["x1"], event_data["y1"]
    x_to, y_to = event_data["x2"], event_data["y2"]

    moved = x_from != x_to or y_from != y_to
    if not moved:
        return 0

    dx = x_to - x_from
    dy = y_to - y_from
    return (dx ** 2 + dy ** 2) ** 0.5
def get_scroll_delta_direction(scrolls):
    """Collapse a batch of scroll events into a net amount and a direction.

    Args:
        scrolls: Sequence of scroll events, each carrying its direction under
            ``["data"]["direction"]``.

    Returns:
        A ``(delta, direction)`` tuple where ``direction`` is the most frequent
        direction in the batch and ``delta`` is its count minus the count of
        the opposite direction.

    Raises:
        ValueError: If the most frequent direction is not one of ``up``,
            ``down``, ``left`` or ``right``.
    """
    opposite_of = {"down": "up", "up": "down", "left": "right", "right": "left"}

    tally = Counter(scroll["data"]["direction"] for scroll in scrolls)
    dominant, dominant_count = tally.most_common(1)[0]

    if dominant not in opposite_of:
        raise ValueError(f"Unknown scroll direction: {dominant}")

    # Counter yields 0 for a direction that never occurred.
    net = dominant_count - tally[opposite_of[dominant]]
    return net, dominant
def apply_modifiers_macos_abc(key_char: str, modifiers: dict) -> str:
    """
    Translate an unshifted key plus Shift/CapsLock state into the typed character.

    Letters are upper-cased when either ``shift`` or ``caps_lock`` is set in
    ``modifiers``. Any other key is looked up in ``SHIFT_MAP`` only when
    ``shift`` is set; keys missing from the map, and all keys when ``shift``
    is not set, come back unchanged. An empty ``key_char`` is returned as is.
    """
    if not key_char:
        return key_char

    shift_held = modifiers.get("shift", False)
    caps_on = modifiers.get("caps_lock", False)

    if key_char.isalpha():
        # Letters: either modifier produces upper case.
        return key_char.upper() if (shift_held or caps_on) else key_char

    # Digits, punctuation, symbols: only Shift changes the output.
    return SHIFT_MAP.get(key_char, key_char) if shift_held else key_char
def handle_key_press(key_char, key_modifiers_state):
    """Classify a key press as typed text, a single key press, or a hotkey.

    Args:
        key_char: Name or character of the pressed key.
        key_modifiers_state: Mapping of modifier name (``ctrl``, ``alt``,
            ``cmd``, ``shift``, ``fn``, ``caps_lock``) to a truthy/falsy flag.
            A missing or ``None`` entry is treated as "not pressed".

    Returns:
        A ``(value, action_type)`` tuple where ``action_type`` is one of:

        * ``"write"``  - ``value`` is the text to append; the sentinel
          ``"delete"`` is returned for the delete key.
        * ``"press"``  - ``value`` is the key name (``"enter"`` for return).
        * ``"hotkey"`` - ``value`` is a ``+``-joined combination such as
          ``"Ctrl+Shift+c"``.
    """
    # NOTE(review): these keys are resolved before modifiers are inspected, so
    # e.g. Cmd+Tab is reported as a plain "\t" write - confirm this is intended.
    special_keys = {
        "space": (" ", "write"),
        "return": ("enter", "press"),
        "tab": ("\t", "write"),
        "delete": ("delete", "write"),
    }
    if key_char in special_keys:
        return special_keys[key_char]

    def is_pressed(modifier):
        # Truthiness instead of `== False`: a missing/None flag means "not pressed".
        return bool(key_modifiers_state.get(modifier))

    command_modifier_held = any(is_pressed(mod) for mod in ("ctrl", "alt", "cmd", "fn"))

    if not command_modifier_held:
        if is_pressed("shift") or is_pressed("caps_lock"):
            # TODO: fix for special characters
            return apply_modifiers_macos_abc(key_char, key_modifiers_state), "write"
        return key_char, "write"

    labels = (
        ("ctrl", "Ctrl"),
        ("alt", "Alt"),
        ("cmd", "Cmd"),
        ("shift", "Shift"),
        ("fn", "Fn"),
    )
    parts = [label for modifier, label in labels if is_pressed(modifier)]
    parts.append(key_char)
    return "+".join(parts), "hotkey"
def _make_write_event(text, start_timestamp, end_timestamp):
    """Build the synthetic ``write`` event covering one run of typed text."""
    return {
        "timestamp": datetime.fromisoformat(start_timestamp),
        "end_timestamp": datetime.fromisoformat(end_timestamp),
        "data": {
            "event_type": "write",
            "text": text,
        },
    }


def trajectory_iter(base_path):
    """Yield simplified events from ``trajectories.json``.

    Consecutive "write" key presses are merged into a single ``write`` event,
    key-modifier events and drags shorter than 5 units are dropped, and scroll
    batches are reduced to a net click count and direction.

    Args:
        base_path: Directory object supporting the ``/`` operator that
            contains ``trajectories.json``.

    Yields:
        Dicts with a ``timestamp`` (``datetime``), a ``data`` dict holding at
        least ``event_type``, and - for ``write``, ``scroll`` and ``drag``
        events - an ``end_timestamp``.
    """
    with open(base_path / "trajectories.json", "r", encoding="utf-8") as f:
        trajectories = json.load(f)

    for entry in trajectories:
        # Pending typed text and the timestamps of its first/last key press.
        write_string = ""
        write_start_timestamp = None
        write_end_timestamp = None
        last_event_timestamp = None

        for step in entry["trajectory"]:
            event = step["event"]["original"]
            event_type = event["event_type"]

            # Any non-keyboard event ends the current run of typed text.
            if event_type not in ("key_press", "key_modifier"):
                if write_string:
                    yield _make_write_event(write_string, write_start_timestamp, write_end_timestamp)
                # Reset even when the buffer is empty (e.g. everything was
                # deleted) so a later run does not inherit a stale start time.
                write_string = ""
                write_start_timestamp = None
                write_end_timestamp = None

            # Modifier-only events and negligible drags carry no action.
            if event_type == "key_modifier" or (
                event_type == "drag" and get_drag_delta(event["data"]) < 5
            ):
                continue

            if event_type == "scroll":
                scroll_delta, scroll_direction = get_scroll_delta_direction(event["originals"])
                # Scroll batches whose directions cancel out are skipped.
                if scroll_delta > 0:
                    last_event_timestamp = event["end_time"]
                    first_scroll = event["originals"][0]["data"]
                    yield {
                        "timestamp": datetime.fromisoformat(event["timestamp"]),
                        "end_timestamp": datetime.fromisoformat(event["end_time"]),
                        "data": {
                            "event_type": event_type,
                            "clicks": scroll_delta,
                            "direction": scroll_direction,
                            "x": first_scroll["x"],
                            "y": first_scroll["y"],
                        },
                    }
            elif event_type == "key_press":
                key_char_processed, action_type = handle_key_press(
                    event["data"]["key_char"], event["data"]["modifiers"]
                )
                if action_type == "write":
                    if key_char_processed == "delete":
                        # Slicing an empty string is a no-op, so no guard is needed.
                        write_string = write_string[:-1]
                    else:
                        write_string += key_char_processed
                    if write_start_timestamp is None:
                        write_start_timestamp = event["timestamp"]
                    write_end_timestamp = event["timestamp"]
                else:
                    # A key press ("press") or hotkey interrupts the typed run.
                    if write_string:
                        yield _make_write_event(write_string, write_start_timestamp, write_end_timestamp)
                    write_string = ""
                    write_start_timestamp = None
                    write_end_timestamp = None
                    last_event_timestamp = event["timestamp"]
                    value_key = "key" if action_type == "press" else "combination"
                    yield {
                        "timestamp": datetime.fromisoformat(event["timestamp"]),
                        "data": {
                            "event_type": action_type,
                            value_key: key_char_processed,
                        },
                    }
            elif event_type == "drag":
                event["data"]["event_type"] = event_type
                # The drag spans from the previous emitted event to its own
                # timestamp; with no previous event, fall back to the drag's
                # timestamp instead of crashing on fromisoformat(None).
                drag_start = (
                    last_event_timestamp
                    if last_event_timestamp is not None
                    else event["timestamp"]
                )
                yield {
                    "timestamp": datetime.fromisoformat(drag_start),
                    "end_timestamp": datetime.fromisoformat(event["timestamp"]),
                    "data": event["data"],
                }
                last_event_timestamp = event["timestamp"]
            else:
                event["data"]["event_type"] = event_type
                last_event_timestamp = event["timestamp"]
                yield {
                    "timestamp": datetime.fromisoformat(event["timestamp"]),
                    "data": event["data"],
                }

        # Emit text still pending when the trajectory ends.
        if write_string:
            yield _make_write_event(write_string, write_start_timestamp, write_end_timestamp)
def get_video_fps(video_path):
    """Return the frame rate reported by OpenCV for a video file.

    Args:
        video_path: Path to the video; converted with ``str()`` for OpenCV.

    Returns:
        The value of ``cv2.CAP_PROP_FPS`` for the opened video.

    Raises:
        ValueError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise ValueError(f"Unable to open video file: {video_path}")
        return cap.get(cv2.CAP_PROP_FPS)
    finally:
        # Release the capture on every exit path, including errors.
        cap.release()
def get_video_frame(video_path, frame_number):
    """Read a single frame from a video file.

    Args:
        video_path: Path to the video; converted with ``str()`` for OpenCV.
        frame_number: Index passed to ``cv2.CAP_PROP_POS_FRAMES`` before reading.

    Returns:
        The frame returned by ``VideoCapture.read()``.

    Raises:
        ValueError: If the video cannot be opened or the frame cannot be read.
    """
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise ValueError(f"Unable to open video file: {video_path}")
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = cap.read()
    finally:
        # Release the capture on every exit path, including errors.
        cap.release()

    if not ret:
        raise ValueError(f"Unable to retrieve frame {frame_number} from video: {video_path}")
    return frame
def get_start_end_trajectory_timestamps(base_path):
    """Return the first and last times the recorder window appeared in ``app_events.json``.

    Only events whose ``data.action`` is ``"window_changed"`` and whose
    ``data.current_windows`` contains ``"🐾 Paw Recorder"`` are considered.

    Args:
        base_path: Directory object supporting the ``/`` operator that
            contains ``app_events.json``.

    Returns:
        A ``(start_timestamp, end_timestamp)`` tuple of ``datetime`` objects
        parsed from the first and last matching events.

    Raises:
        IndexError: If no matching event exists.
    """
    # The marker below is non-ASCII, so the file must be decoded as UTF-8
    # regardless of the platform's locale encoding.
    with open(base_path / "app_events.json", "r", encoding="utf-8") as f:
        app_events = json.load(f)

    paw_events = [
        event
        for event in app_events
        if event["data"]["action"] == "window_changed"
        and "🐾 Paw Recorder" in event["data"]["current_windows"]
    ]
    if not paw_events:
        # Same exception type as the bare index lookup, with a usable message.
        raise IndexError("No '🐾 Paw Recorder' window_changed events found in app_events.json")

    start_timestamp = datetime.fromisoformat(paw_events[0]["timestamp"])
    end_timestamp = datetime.fromisoformat(paw_events[-1]["timestamp"])
    return start_timestamp, end_timestamp
def get_start_frame(start_event_timestamp, video_start_timestamp, fps, trajectory_start_timestamp, video_path):
    """Fetch the video frame that represents the state before the first event.

    If the first event happens more than ``delta`` after the trajectory start,
    the frame at ``trajectory start + delta`` is used; otherwise the frame
    halfway between the trajectory start and the first event is used. Offsets
    are measured from ``video_start_timestamp`` and converted to a frame index
    with ``fps``.

    Returns:
        The frame returned by ``get_video_frame``.
    """
    first_event_is_late = start_event_timestamp > trajectory_start_timestamp + delta

    if first_event_is_late:
        offset_seconds = (trajectory_start_timestamp - video_start_timestamp + delta).total_seconds()
    else:
        half_gap = (start_event_timestamp - trajectory_start_timestamp).total_seconds() / 2
        lead_in = (trajectory_start_timestamp - video_start_timestamp).total_seconds()
        offset_seconds = half_gap + lead_in

    return get_video_frame(video_path, int(offset_seconds * fps))
def get_frame_before_event(event_timestamp, video_start_timestamp, fps, video_path, prev_event_timestamp):
    """Fetch the video frame that represents the state just before an event.

    If the previous event ended more than ``delta`` before this event, the
    frame at ``event - delta`` is used; otherwise the frame halfway between
    the previous event and this one is used. Offsets are measured from
    ``video_start_timestamp`` and converted to a frame index with ``fps``.

    Returns:
        The frame returned by ``get_video_frame``.
    """
    # The two cases are exhaustive, so no third fallback branch is needed.
    if prev_event_timestamp + delta < event_timestamp:
        time_diff = (event_timestamp - delta - video_start_timestamp).total_seconds()
    else:
        time_diff = (event_timestamp - prev_event_timestamp).total_seconds() / 2 + (prev_event_timestamp - video_start_timestamp).total_seconds()
    frame_number = int(time_diff * fps)
    return get_video_frame(video_path, frame_number)
def get_frame_after_event(event_timestamp, video_start_timestamp, fps, trajectory_end_timestamp, video_path, next_event_timestamp):
    """Fetch the video frame that represents the state just after an event.

    The frame at ``event + delta`` is used when that instant lies before both
    the trajectory end and the next event; otherwise the frame at the event's
    own timestamp is used. Offsets are measured from ``video_start_timestamp``
    and converted to a frame index with ``fps``.

    Returns:
        The frame returned by ``get_video_frame``.
    """
    shifted = event_timestamp + delta
    room_for_shift = shifted < trajectory_end_timestamp and shifted < next_event_timestamp
    sample_time = shifted if room_for_shift else event_timestamp

    offset_seconds = (sample_time - video_start_timestamp).total_seconds()
    return get_video_frame(video_path, int(offset_seconds * fps))
def get_end_timestamp(event):
    """Return the event's ``end_timestamp`` if it has one, else its ``timestamp``."""
    key = "end_timestamp" if "end_timestamp" in event else "timestamp"
    return event[key]
def parse_video(base_path):
    """Interleave trajectory events with the video frames around them.

    Yields, in order: the starting frame, then for each event (except the last
    one in the trajectory) the event's data and the frame after it, followed -
    when at least one whole second remains between this event and the next
    after subtracting ``2 * delta`` - by a ``wait(n)`` action and the frame
    before the next event. Iteration stops early once the next event ends
    after the trajectory end, and a ``mouse_click`` directly followed by a
    ``drag`` is skipped. A final ``finished`` action is always yielded.

    Yields:
        Dicts holding either an ``"event"`` or a ``"frame"`` key.
    """
    video_start, video_file_name = get_video_start_time(base_path)
    video_path = base_path / video_file_name
    fps = get_video_fps(video_path)
    traj_start, traj_end = get_start_end_trajectory_timestamps(base_path)
    events = list(trajectory_iter(base_path))

    yield {
        "frame": get_start_frame(events[0]["timestamp"], video_start, fps, traj_start, video_path),
    }

    for current, upcoming in zip(events, events[1:]):
        if get_end_timestamp(upcoming) > traj_end:
            break
        # The click that starts a drag is represented by the drag itself.
        if upcoming["data"]["event_type"] == "drag" and current["data"]["event_type"] == "mouse_click":
            continue

        current_end = get_end_timestamp(current)
        upcoming_start = upcoming["timestamp"]

        yield {
            "event": current["data"],
        }
        yield {
            "frame": get_frame_after_event(current_end, video_start, fps, traj_end, video_path, next_event_timestamp=upcoming_start),
        }

        idle = upcoming_start - current_end - 2 * delta
        seconds_between_events = int(idle.total_seconds())
        if seconds_between_events > 0:
            yield {
                "event": {"action": f"wait({seconds_between_events})"}
            }
            yield {
                "frame": get_frame_before_event(upcoming_start, video_start, fps, video_path, prev_event_timestamp=current_end),
            }

    yield {
        "event": {"action": "finished"}
    }
def parse_conversation(base_path):
    """Convert the output of ``parse_video`` into a chat-style message list.

    Event items become ``assistant`` messages whose text is the JSON-encoded
    event; frame items become ``user`` messages carrying the frame as a
    base64 JPEG data URL. Items with neither key are ignored.

    Returns:
        A list of ``{"role": ..., "content": [...]}`` dicts.
    """
    conversation = []
    for item in list(parse_video(base_path)):
        if 'event' in item:
            part = {"type": "text", "text": json.dumps(item['event'])}
            role = "assistant"
        elif 'frame' in item:
            _, jpeg_bytes = cv2.imencode('.jpg', item['frame'])
            data_url = "data:image/jpeg;base64," + base64.b64encode(jpeg_bytes).decode("utf-8")
            part = {"type": "image_url", "image_url": data_url}
            role = "user"
        else:
            continue
        conversation.append({"role": role, "content": [part]})
    return conversation