Spaces:
Running
Running
| import json | |
| import os | |
| import cv2 | |
| import numpy as np | |
| import math | |
| import heapq | |
| import datetime | |
| from moviepy.editor import ImageSequenceClip | |
| import tqdm | |
| from utils import get_scene_dir_path, OBJECT_PICTURE_DIR | |
class Video_Generator():
    """Renders scheduled object movements in a scene into a video."""

    def __init__(self):
        # Geometry and colours (BGR, as used by OpenCV) of the circular
        # "speech bubble" frame drawn around each moved object.
        self.frame_radius = 20
        self.frame_thickness = 1
        self.frame_outline_color = (0, 0, 0)        # black
        self.frame_filling_color = (255, 255, 255)  # white
        # (segment -> times drawn) counter; repeated segments get thicker.
        self.traj_count = {}
| def get_img_coord(self, points, camera_pose, image_width, image_height): | |
| transformation_matrix = compute_transformation_matrix(camera_pose) | |
| camera_positions = world_to_camera(points, transformation_matrix) | |
| projects_points = project_to_2d(camera_positions, camera_pose, image_width, image_height) | |
| return projects_points | |
    def draw_objframe(self, obj_type, point, background):
        """Draw a circular callout ("speech bubble") for *obj_type* at *point*.

        Mutates *background* in place: a filled circle offset up-left of the
        anchor point, two pointer lines converging on a red tip at the anchor,
        and the object's icon pasted inside the circle.
        """
        image = background
        obj_path = os.path.join(OBJECT_PICTURE_DIR, '{}.png'.format(obj_type))
        if not os.path.exists(obj_path):
            # Fall back to a generic icon when this object type has no picture.
            obj_path = os.path.join(OBJECT_PICTURE_DIR, 'Phone.png')
        # NOTE(review): assumes the fallback image exists — cv2.imread returns
        # None otherwise and cv2.resize below would raise; verify the asset dir.
        obj_img = cv2.imread(obj_path)
        #draw frame
        # Bubble centre sits up-left of the anchor so the pointer can reach it.
        center = (int(point[0]-1.2*self.frame_radius), int(point[1]-1.2*self.frame_radius))
        cv2.circle(image, center, self.frame_radius, self.frame_filling_color, -1)
        cv2.circle(image, center, self.frame_radius, self.frame_outline_color, self.frame_thickness)
        # Pointer: two line starts on the circle rim (pi/8 either side of the
        # down-right diagonal) converging on the anchor, capped with a red dot.
        theta = np.pi/8
        line_start1 = (int(center[0]+self.frame_radius*np.sin(theta)), int(center[1]+self.frame_radius*np.cos(theta)))
        line_start2 = (int(center[0]+self.frame_radius*np.cos(theta)), int(center[1]+self.frame_radius*np.sin(theta)))
        line_end = (int(center[0] + 1.2*self.frame_radius), int(center[1] + 1.2*self.frame_radius))
        cv2.line(image, line_start1, line_end, self.frame_outline_color, self.frame_thickness)
        cv2.line(image, line_start2, line_end, self.frame_outline_color, self.frame_thickness)
        cv2.circle(image, line_end, 3, (0,0,255), -1)
        #put object
        # Paste the icon centred on the bubble.  Both the icon slice and the
        # destination window are clipped symmetrically so the two slices keep
        # matching shapes even when the bubble is partly outside the image.
        obj_resized = cv2.resize(obj_img, (self.frame_radius, self.frame_radius))
        x_start = max(0, obj_resized.shape[0]//2-center[1])
        x_end = min(obj_resized.shape[0], obj_resized.shape[0]//2 + image.shape[0] - center[1])
        y_start = max(0, obj_resized.shape[1]//2-center[0])
        y_end = min(obj_resized.shape[1], obj_resized.shape[1]//2 + image.shape[1] - center[0])
        img_x_start = max(0, center[1]-obj_resized.shape[0]//2)
        img_x_end = min(image.shape[0], center[1]+obj_resized.shape[0]//2)
        img_y_start = max(0, center[0]-obj_resized.shape[1]//2)
        img_y_end = min(image.shape[1], center[0]+obj_resized.shape[1]//2)
        image[img_x_start:img_x_end, img_y_start: img_y_end] = obj_resized[x_start:x_end, y_start:y_end]
| def add_description(self, activity_name, time, obj_list, receptacle_list, background): | |
| image = background.copy() | |
| descrpition_width = 300 | |
| description_bg = np.zeros((background.shape[0], descrpition_width,3), np.uint8) | |
| res = np.hstack((image, description_bg)) | |
| font = cv2.FONT_HERSHEY_COMPLEX | |
| font_scale = 0.5 | |
| font_color = (0,0,0) | |
| thickness = 1 | |
| line_type = 8 | |
| # text_size = cv2.getTextSize(text,font, font_scale, line_type)[0] | |
| text_x = background.shape[0] + 10 | |
| text_y = descrpition_width//2 -50 | |
| text_y = 50 | |
| line_interval = 30 | |
| cv2.rectangle(res, (background.shape[1],0), (background.shape[1]+descrpition_width, background.shape[0]), (255,255,255),-1) | |
| texts = ['activity:', activity_name, 'time: ', str(time), 'object movement: '] | |
| for i, text in enumerate(texts): | |
| if i%2==0: | |
| text_x = background.shape[0] + 10 | |
| font_color = (0,0,0) | |
| else: | |
| text_x = background.shape[0] + 30 | |
| font_color = (0,0,255) | |
| cv2.putText(res, text, (text_x, text_y + i*line_interval), font, font_scale, font_color,thickness, line_type) | |
| start_line = 5 | |
| for i in range(len(obj_list)): | |
| obj = obj_list[i].split('|')[0] | |
| recep = receptacle_list[i].split('|')[0] | |
| # obj_move_text = '{} -> {}'.format(obj, recep) | |
| text_x = background.shape[0] + 120 | |
| font_color = (0,0,255) | |
| obj_text_size = cv2.getTextSize(obj, font, font_scale, thickness)[0][0] | |
| cv2.putText(res, obj, (text_x - 20 - obj_text_size, text_y + (start_line+i)*line_interval), font, font_scale, font_color,thickness, line_type) | |
| cv2.putText(res, '->', (text_x, text_y + (start_line+i)*line_interval), font, font_scale, font_color,thickness, line_type) | |
| cv2.putText(res, recep, (text_x + 40, text_y + (start_line+i)*line_interval), font, font_scale, font_color,thickness, line_type) | |
| return res | |
| def draw_traj(self, info, image): | |
| last_point = info['last_point'] | |
| point = info['point'] | |
| is_end = info['end'] | |
| is_arrow = info['arrow'] | |
| radius = 3 | |
| next_point = (int(point[0]), int(point[1])) | |
| if last_point is None: | |
| start_color = (0,0,255) | |
| end_color = (255, 255, 0) | |
| cv2.circle(image, next_point, radius, start_color, -1) | |
| return | |
| pre_point = (int(last_point[0]), int(last_point[1])) | |
| line_color = (0,0,0) | |
| line_thickness = 1 | |
| arrowcolor = (0,255,0) | |
| arrow_thickness = 1 | |
| #count | |
| count = self.traj_count.get((pre_point, next_point),0) | |
| self.traj_count[(pre_point, next_point)] = count + 1 | |
| count = self.traj_count.get((next_point, pre_point),0) | |
| self.traj_count[(next_point, pre_point)] = count + 1 | |
| step = 0.2 | |
| line_thickness = min(int(1 + count * step), 5) | |
| #draw | |
| cv2.line(image, pre_point, next_point, line_color, line_thickness) | |
| if is_arrow: | |
| cv2.arrowedLine(image, pre_point, next_point,arrowcolor,arrow_thickness,tipLength=1.5) | |
| if is_end: | |
| end_color = (255, 255, 0) | |
| cv2.circle(image, next_point, radius, end_color, -1) | |
    def get_multiobj_image(self, draw_infos, background):
        """Render the frame sequence for all objects moving at one timestamp.

        *draw_infos* holds, per moving object, its projected path points plus
        activity/time labels.  Paths are re-grouped by step index so all
        objects advance in lockstep, one video frame per step.

        Returns (image_list, last_background): the RGB frames for this
        timestamp, and the final background with trajectories drawn in so the
        caller can carry them over to the next timestamp.
        """
        image_list = []
        if len(draw_infos)<=0:
            return image_list, background
        # All entries of one timestamp share the activity name and time.
        activity_name = draw_infos[0]['activity']
        time = draw_infos[0]['time']
        object_list = [info['object'] for info in draw_infos]
        receptacle_list = [info['receptacle'] for info in draw_infos]
        # Transpose paths: image_infos[step] = per-object draw states for
        # that step.
        image_infos = []
        for draw_info in draw_infos:
            obj = draw_info['object'].split('|')[0]
            points = draw_info['points']
            last_point = None
            for point_num, point in enumerate(points):
                if point_num >= len(image_infos):
                    image_infos.append([])
                image_infos[point_num].append({
                    'object':obj,
                    'point':point,
                    'last_point':last_point,
                    'end':point_num == len(points)-1,
                    # Direction arrow drawn a third of the way along the path.
                    'arrow':point_num == len(points)//3
                })
                last_point = (point[0], point[1])
        image_with_traj = background.copy()
        for image_info in image_infos:
            #draw traj
            # Trajectory lines accumulate on image_with_traj across steps...
            for info in image_info:
                self.draw_traj(info, image_with_traj)
            #draw obj with frame
            # ...while object callouts are drawn on a per-frame copy only.
            image = image_with_traj.copy()
            for info in image_info:
                self.draw_objframe(info['object'], info['point'], image)
            image = self.add_description(activity_name, time, object_list, receptacle_list, image)
            # moviepy expects RGB frames; cv2 works in BGR.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image_list.append(image)
        return image_list, image_with_traj
| def get_receptacle_position_from_meta(self, receptacle_id, metadata): | |
| position = None | |
| objects = metadata['objects'] | |
| for obj in objects: | |
| if obj['objectId'] == receptacle_id: | |
| position = obj['position'] | |
| break | |
| return position | |
| def get_distance(self, point1, point2): | |
| return math.sqrt((point1['x'] - point2['x'])**2 + (point1['z'] - point2['z'])**2) | |
| def get_nearest_point(self, point, reachable_points): | |
| min_distance = 100000 | |
| nearest_point = None | |
| for rp in reachable_points: | |
| distance = self.get_distance(point, rp) | |
| if distance < min_distance: | |
| min_distance = distance | |
| nearest_point = rp | |
| return nearest_point | |
    def get_path(self, start_position, end_position, reachable_positions):
        """Plan a path from *start_position* to *end_position* over the
        scene's grid of reachable points.

        Both endpoints are snapped to their nearest reachable points, a
        best-first search is run over points within 0.25 units of each other,
        and [start_position, <grid points...>, end_position] is returned.

        NOTE(review): the priority term adds the coordinate offset from the
        CURRENT node rather than a heuristic towards the target, so this is
        closer to a uniform-cost search than true A* — confirm intent.
        NOTE(review): if the target is unreachable from the start, the
        back-tracking loop below raises KeyError on came_from.
        """
        res = []
        res.append(start_position)
        start_point = self.get_nearest_point(start_position, reachable_positions)
        target_point = self.get_nearest_point(end_position, reachable_positions)
        # point_id is a monotonic tie-breaker so heapq never compares dicts.
        point_id = 0
        open_list = [(0, point_id, start_point)]
        came_from = {tuple((start_point['x'], start_point['z'])):0}
        cost_so_far = {tuple((start_point['x'], start_point['z'])):0}
        while open_list:
            current = heapq.heappop(open_list)[-1]
            if current == target_point:
                break
            for next_point in reachable_positions:
                dis = self.get_distance(current, next_point)
                # Only points within the 0.25 grid spacing count as
                # neighbours (small epsilon for float error).
                if dis - 0.25 > 0.001:
                    continue
                new_cost = cost_so_far[tuple((current['x'], current['z']))] + 1
                if tuple((next_point['x'], next_point['z'])) not in cost_so_far or new_cost < cost_so_far[tuple((next_point['x'], next_point['z']))]:
                    cost_so_far[tuple((next_point['x'], next_point['z']))] = new_cost
                    priority = new_cost + abs(next_point['x'] - current['x']) + abs(next_point['z'] - current['z'])
                    point_id += 1
                    heapq.heappush(open_list, (priority, point_id, next_point))
                    came_from[tuple((next_point['x'], next_point['z']))] = current
        # Reconstruct the grid path by walking the predecessor chain backwards.
        path = []
        current = target_point
        while current != start_point:
            path.append(current)
            current = came_from[tuple((current['x'], current['z']))]
        path.append(start_point)
        path.reverse()
        res.extend(path)
        res.append(end_position)
        return res
| def init_obj_traj(self, metadata): | |
| res = {} | |
| for obj in metadata['objects']: | |
| if obj['pickupable']: | |
| parentReceptacle = obj['parentReceptacles'] | |
| if parentReceptacle is not None and 'Floor' in parentReceptacle: | |
| parentReceptacle.remove('Floor') | |
| res[obj['objectId']] = parentReceptacle[0] if (parentReceptacle is not None and len(parentReceptacle)) >0 else None | |
| return res | |
    def save_vedio(self, dynamic_info, background, camera_pose, metadata, reachable_positions, output_path):
        """Render all scheduled object movements into an mp4 at *output_path*.

        For each timestamp, plans a path for every object that actually
        changes receptacle, projects the path into image coordinates, renders
        the frames, and finally writes the concatenated frames with moviepy.
        """
        # Current receptacle of every pickupable object, updated as moves
        # are processed.
        object_traj = self.init_obj_traj(metadata)
        self.traj_count = {}
        image_list = []
        paths_list = []
        for time,timeinfo in tqdm.tqdm(dynamic_info.items()):
            draw_infos = []
            for info in timeinfo:
                info['time'] = time
                target_object_id = info['object']
                target_receptacle_id = info['receptacle']
                target_object_receptacle = object_traj[target_object_id]
                # Skip no-op moves (object already in the target receptacle).
                if target_object_receptacle == target_receptacle_id:
                    continue
                if target_object_receptacle is None:
                    # No known receptacle: start from the object's own position.
                    # NOTE(review): start_position stays unbound if the object
                    # id is missing from metadata — verify inputs upstream.
                    for obj in metadata['objects']:
                        if obj['objectId'] == target_object_id:
                            start_position = obj['position']
                            break
                else:
                    start_position = self.get_receptacle_position_from_meta(target_object_receptacle, metadata)
                end_position = self.get_receptacle_position_from_meta(target_receptacle_id, metadata)
                path = self.get_path(start_position, end_position, reachable_positions) # path includes start and end
                image_width = 300
                image_height = 300
                img_path = self.get_img_coord(path, camera_pose, image_width, image_height)
                paths_list.append(img_path)
                draw_info = {
                    'time':time,
                    'activity':info['activity'],
                    'object':target_object_id,
                    'receptacle':target_receptacle_id,
                    'points':img_path,
                }
                draw_infos.append(draw_info)
                # Record the move so later timestamps see the new location.
                object_traj[target_object_id] = target_receptacle_id
            time_images,image_with_traj = self.get_multiobj_image(draw_infos, background)
            # Carry the accumulated trajectories into the next timestamp.
            background = image_with_traj
            image_list.extend(time_images)
        clip = ImageSequenceClip(image_list, fps=30)
        clip.write_videofile(str(output_path), fps=30, codec="libx264")
| def get_dynamic_info(self, schedules): | |
| res = {} | |
| for day, day_schedules in schedules.items(): | |
| for activity in day_schedules: | |
| activity_name = activity['activity'] | |
| start_time = activity['start_time'] | |
| content = activity['content'] | |
| for c in content: | |
| c['activity'] = activity_name | |
| time = datetime.datetime.combine(day, start_time) | |
| if time not in res: | |
| res[time] = [] | |
| res[time].extend(content) | |
| return res | |
| def generate(self, schedules, scene_file_name, vedio_path): | |
| metadata, camera_pose, background, reachable_points = read_scene(scene_file_name) | |
| schekeys = list(schedules.keys()) | |
| schedules_filter = {} | |
| schedules_filter[schekeys[0]] = schedules[schekeys[0]] | |
| dynamic_info = self.get_dynamic_info(schedules_filter) | |
| self.save_vedio(dynamic_info, background, camera_pose, metadata, reachable_points, vedio_path) | |
def read_scene(scene_file_name):
    """Load a scene's metadata, camera pose, background image and reachable
    positions from its data directory.

    Returns (metadata, camera_pose, background, reachable_points).

    BUGFIX: the three JSON files were opened with open(...) and never closed
    (leaked file handles); they are now read inside `with` blocks.
    """
    data_dir = get_scene_dir_path(scene_file_name)

    def _load_json(name):
        # Read one UTF-8 JSON file from the scene directory, closing promptly.
        with open(os.path.join(data_dir, name), 'r', encoding='utf-8') as f:
            return json.load(f)

    metadata = _load_json('metadata.json')
    camera_pose = _load_json('camera_pose.json')
    background = cv2.imread(os.path.join(data_dir, 'background.png'))
    reachable_points = _load_json('reachablePositions.json')
    return metadata, camera_pose, background, reachable_points
def compute_transformation_matrix(camera_pose):
    """Build the 4x4 world->camera transform for a camera pose.

    Translates by the negated camera position, then rotates about the x axis
    by the pose's x rotation (degrees).  The y and z rotation components of
    the pose are not used here.
    """
    pos = camera_pose['position']
    theta = np.radians(camera_pose['rotation']['x'])
    cos_t, sin_t = np.cos(theta), np.sin(theta)
    # Translation: move the camera position to the origin.
    translate = np.eye(4)
    translate[0, 3] = -pos['x']
    translate[1, 3] = -pos['y']
    translate[2, 3] = -pos['z']
    # Rotation about the x axis.
    rotate_x = np.eye(4)
    rotate_x[1, 1] = cos_t
    rotate_x[1, 2] = -sin_t
    rotate_x[2, 1] = sin_t
    rotate_x[2, 2] = cos_t
    # Rotation applied after translation: camera = R @ T @ world.
    return rotate_x @ translate
def world_to_camera(positions, transformation_matrix):
    """Transform world-space points ({'x','y','z'} dicts) into camera space.

    Returns a list of homogeneous 4-vectors (numpy arrays), one per input
    point, in the same order.
    """
    return [
        transformation_matrix @ np.array([p['x'], p['y'], p['z'], 1])
        for p in positions
    ]
def project_to_2d(camera_positions, camera_pose, image_width, image_height):
    """Perspective-project camera-space points onto image pixel coordinates.

    Builds a pinhole projection from the camera's field of view, divides by
    depth, maps normalised device coordinates to pixels, and mirrors the x
    axis (image_width - x) to match the background image orientation.
    Returns a list of (x, y) pixel tuples.
    """
    focal = 1 / np.tan(np.radians(camera_pose['fieldOfView']) / 2)
    aspect = image_width / image_height
    projection = np.array([
        [focal / aspect, 0, 0, 0],
        [0, focal, 0, 0],
        [0, 0, 1, 0]
    ])
    pixel_points = []
    for cam_pos in camera_positions:
        ndc = projection @ cam_pos
        ndc = ndc / ndc[2]  # perspective divide
        px = (ndc[0] + 1) * image_width / 2
        py = (1 - ndc[1]) * image_height / 2
        pixel_points.append((image_width - px, py))
    return pixel_points