| | import os |
| | import numpy as np |
| |
|
| | INDENT = " " |
| |
|
| |
|
| | def print_result(result): |
| | print("\n".join(result)) |
| |
|
| |
|
| | def get_result(result): |
| | return "\n".join(result) |
| |
|
| |
|
| | def save_to_file(result, scene_name, language, folder=""): |
| | file_name = os.path.join( |
| | folder, |
| | f"{scene_name}".upper() + "_" + "_".join(language.lower().split(" ")) + ".bddl", |
| | ) |
| | with open(file_name, "w") as f: |
| | f.write(result) |
| | return file_name |
| |
|
| |
|
| | class _PDDLDefinition: |
| | def __init__(self, func, problem_name="", domain="robosuite"): |
| | self.func = func |
| | self.problem_name = problem_name |
| | self.domain = domain |
| |
|
| | def __call__(self, *args, **kwargs): |
| | meta_strings = [] |
| | content_strings = [] |
| | meta_strings.append(f"(define (problem {self.problem_name})") |
| | content_strings.append(f"(:domain {self.domain})") |
| | content_strings += self.func(*args, **kwargs) |
| | content_strings = [INDENT + each_line for each_line in content_strings] |
| | meta_strings += content_strings |
| | meta_strings.append(")\n") |
| | return meta_strings |
| |
|
| |
|
| | def PDDLDefinition(func=None, problem_name=""): |
| | if func: |
| | return _PDDLDefinition(func) |
| | else: |
| |
|
| | def wrapper(func): |
| | return _PDDLDefinition(func, problem_name) |
| |
|
| | return wrapper |
| |
|
| |
|
| | class Language: |
| | def __init__(self, func): |
| | self.func = func |
| |
|
| | def __call__(self, *args, **kwargs): |
| | strings = [] |
| | language = kwargs["language"] |
| | strings.append(f"(:language {language})") |
| | del kwargs["language"] |
| | strings += self.func(*args, **kwargs) |
| | |
| | return strings |
| |
|
| |
|
| | class _LogicalState: |
| | def __init__(self, func, state_type=""): |
| | self.func = func |
| | self.state_type = state_type |
| |
|
| | def __call__(self, *args, **kwargs): |
| | strings = [] |
| | strings.append(f"(:{self.state_type}") |
| | strings += self.func(*args, **kwargs) |
| | strings.append(")\n") |
| | |
| | return strings |
| |
|
| |
|
| | def LogicalState(func=None, state_type="PLACEHOLDER"): |
| | if func: |
| | return _LogicalState(func) |
| | else: |
| |
|
| | def wrapper(func): |
| | return _LogicalState(func, state_type) |
| |
|
| | return wrapper |
| |
|
| |
|
| | |
| |
|
| |
|
| | class RegionWrapper: |
| | def __init__(self, func): |
| | self.func = func |
| |
|
| | def __call__(self, *args, **kwargs): |
| | strings = [] |
| | strings.append("(:regions") |
| | strings += self.func(*args, **kwargs) |
| | strings.append(")\n") |
| | strings = [INDENT + each_line for each_line in strings] |
| | return strings |
| |
|
| |
|
| | class Region: |
| | def __init__(self, func): |
| | self.func = func |
| |
|
| | def __call__(self, *args, **kwargs): |
| | assert "target" in kwargs, "target property has to be defined!" |
| | assert "region_name" in kwargs, "region name has to be defined" |
| | strings = [] |
| | region_name = kwargs["region_name"] |
| | strings.append(f"({region_name}") |
| | del kwargs["region_name"] |
| | strings += self.func(*args, **kwargs) |
| | strings.append(")") |
| | strings = [INDENT + each_line for each_line in strings] |
| | return strings |
| |
|
| |
|
| | |
| | class _ObjectDict: |
| | def __init__(self, func, object_type): |
| | self.func = func |
| | self.object_type = object_type |
| |
|
| | def __call__(self, *args, **kwargs): |
| | strings = [] |
| | strings.append(f"(:{self.object_type}") |
| | strings += self.func(*args, **kwargs) |
| | strings.append(")\n") |
| | |
| | return strings |
| |
|
| |
|
| | def ObjectDict(func=None, object_type="objects"): |
| | if func: |
| | return _ObjectDict(func) |
| | else: |
| |
|
| | def wrapper(func): |
| | return _ObjectDict(func, object_type) |
| |
|
| | return wrapper |
| |
|
| |
|
| | @ObjectDict(object_type="fixtures") |
| | def get_fixtures(**kwargs): |
| | return get_dict_string(**kwargs) |
| |
|
| |
|
| | @ObjectDict(object_type="objects") |
| | def get_objects(**kwargs): |
| | return get_dict_string(**kwargs) |
| |
|
| |
|
| | @ObjectDict(object_type="obj_of_interest") |
| | def get_objects_of_interest(l): |
| | return get_list_string(l) |
| |
|
| |
|
| | def general_get_str_func(v): |
| | if type(v) is list: |
| | return get_list_string(v) |
| | elif type(v) is tuple: |
| | return get_tuple_string(v) |
| | elif type(v) is dict: |
| | return get_dict_string(v) |
| | elif type(v) is int or type(v) is float: |
| | return str(v) |
| | elif type(v) is str: |
| | return v |
| |
|
| |
|
| | def get_dict_string(**kwargs): |
| | strings = [] |
| | for k, v in kwargs.items(): |
| | assert type(v) is list |
| | object_names = " ".join(v) |
| | strings.append(f"{object_names} - {k}") |
| | strings = [INDENT + each_line for each_line in strings] |
| | return strings |
| |
|
| |
|
| | def get_list_string(l): |
| | strings = [] |
| | assert type(l) is list |
| | for v in l: |
| | strings.append(general_get_str_func(v)) |
| | strings = [INDENT + each_line for each_line in strings] |
| | return strings |
| |
|
| |
|
| | def get_tuple_string(t): |
| | strings = [] |
| | assert type(t) is tuple |
| | return "(" + " ".join([general_get_str_func(v) for v in t]) + ")" |
| |
|
| |
|
| | def get_logical_expression_string(l): |
| | strings = [] |
| | assert type(l) is list |
| | for v in l: |
| | strings.append(general_get_str_func(v)) |
| | strings = [INDENT + each_line for each_line in strings] |
| | return strings |
| |
|
| |
|
| | def get_property_string(**kwargs): |
| | strings = [] |
| | for k, v in kwargs.items(): |
| | if type(v) is str: |
| | strings.append(f"{INDENT}(:{k} {v})") |
| | else: |
| | strings.append(f"{INDENT}(:{k} (") |
| | strings += [INDENT + INDENT + new_v for new_v in general_get_str_func(v)] |
| | strings.append(f"{INDENT}{INDENT})") |
| | strings.append(f"{INDENT})") |
| | strings = [INDENT + each_line for each_line in strings] |
| | return strings |
| |
|
| |
|
| | def get_prediate_string(predicates): |
| | |
| | assert type(predicates) |
| | strings = [] |
| |
|
| |
|
| | @LogicalState(state_type="init") |
| | def get_init_state(l): |
| | return get_logical_expression_string(l) |
| |
|
| |
|
| | @LogicalState(state_type="goal") |
| | def get_goal_state(l): |
| | return get_logical_expression_string(l) |
| |
|
| |
|
| | @Region |
| | def get_xy_region(**kwargs): |
| | assert "ranges" in kwargs |
| | for v in kwargs["ranges"]: |
| | assert len(v) == 4 |
| | return get_property_string(**kwargs) |
| |
|
| |
|
| | @Region |
| | def get_object_affordance_region(**kwargs): |
| | new_kwargs = {"target": kwargs["target"]} |
| | return get_property_string(**new_kwargs) |
| |
|
| |
|
| | @RegionWrapper |
| | def region_module(xy_region_kwargs_list=None, affordance_region_kwargs_list=None): |
| | strings = [] |
| | if xy_region_kwargs_list is not None: |
| | for fixture_kwargs in xy_region_kwargs_list: |
| | strings += get_xy_region(**fixture_kwargs) |
| | if affordance_region_kwargs_list is not None: |
| | for fixture_kwargs in affordance_region_kwargs_list: |
| | strings += get_object_affordance_region(**fixture_kwargs) |
| | return strings |
| |
|
| |
|
| | def object_naming_mapping(category_name, object_id): |
| | if category_name == "table": |
| | if object_id > 1: |
| | raise ValueError("Table can only be one for the moment.") |
| | return "main_table" |
| | elif category_name == "kitchen_table": |
| | if object_id > 1: |
| | raise ValueError("Kitchen table can only be one for the moment.") |
| | return "kitchen_table" |
| | elif category_name == "floor": |
| | if object_id > 1: |
| | raise ValueError("Floor can only be one.") |
| | return "floor" |
| | elif category_name == "coffee_table": |
| | if object_id > 1: |
| | raise ValueError("Coffee table can only be one for the moment.") |
| | return "coffee_table" |
| | elif category_name == "living_room_table": |
| | if object_id > 1: |
| | raise ValueError("Living room table can only be one for the moment.") |
| | return "living_room_table" |
| | elif category_name == "study_table": |
| | if object_id > 1: |
| | raise ValueError("Study table can only be one for the moment.") |
| | return "study_table" |
| | else: |
| | return f"{category_name}_{object_id}" |
| |
|
| |
|
| | def retrieve_fixture_property(category_name): |
| | """Retrieve fixture property""" |
| | property_dict = {} |
| | return property_dict |
| |
|
| |
|
| | def get_affordance_region_kwargs_list_from_fixture_info(fixture_info_dict): |
| | kwargs_list = [] |
| | for k, v in fixture_info_dict.items(): |
| | for item in v: |
| | kwargs_list.append({"target": k, "region_name": item}) |
| | return kwargs_list |
| |
|
| |
|
| | def get_xy_region_kwargs_list_from_regions_info(regions_info_dict): |
| | kwargs_list = [] |
| | for k, v in regions_info_dict.items(): |
| | assert type(v) is dict |
| | kwargs = { |
| | "region_name": k, |
| | } |
| | kwargs.update(v) |
| | kwargs_list.append(kwargs) |
| | return kwargs_list |
| |
|
| |
|
| | def get_object_dict(objects_num_info): |
| | """ |
| | A dctionary of objects that has "category_name": number_of_objects |
| | """ |
| | object_dict = {} |
| | for category_name, num_objects in objects_num_info.items(): |
| | object_dict[category_name] = [] |
| | for object_id in range(1, num_objects + 1): |
| | object_dict[category_name].append( |
| | object_naming_mapping(category_name, object_id) |
| | ) |
| | return object_dict |
| |
|
| |
|
| | @PDDLDefinition(problem_name="LIBERO_Tabletop_Manipulation") |
| | @Language |
| | def tabletop_task_suites_generator( |
| | xy_region_kwargs_list, |
| | affordance_region_kwargs_list, |
| | fixture_object_dict, |
| | movable_object_dict, |
| | objects_of_interest, |
| | init_states, |
| | goal_states, |
| | ): |
| | result = [] |
| | result += region_module( |
| | xy_region_kwargs_list=xy_region_kwargs_list, |
| | affordance_region_kwargs_list=affordance_region_kwargs_list, |
| | ) |
| | result += get_fixtures(**fixture_object_dict) |
| | result += get_objects(**movable_object_dict) |
| | result += get_objects_of_interest(objects_of_interest) |
| | result += get_init_state(init_states) |
| | result += get_goal_state(goal_states) |
| | return result |
| |
|
| |
|
| | @PDDLDefinition(problem_name="LIBERO_Kitchen_Tabletop_Manipulation") |
| | @Language |
| | def kitchen_table_task_suites_generator( |
| | xy_region_kwargs_list, |
| | affordance_region_kwargs_list, |
| | fixture_object_dict, |
| | movable_object_dict, |
| | objects_of_interest, |
| | init_states, |
| | goal_states, |
| | ): |
| | result = [] |
| | result += region_module( |
| | xy_region_kwargs_list=xy_region_kwargs_list, |
| | affordance_region_kwargs_list=affordance_region_kwargs_list, |
| | ) |
| | result += get_fixtures(**fixture_object_dict) |
| | result += get_objects(**movable_object_dict) |
| | result += get_objects_of_interest(objects_of_interest) |
| | result += get_init_state(init_states) |
| | result += get_goal_state(goal_states) |
| | return result |
| |
|
| |
|
| | @PDDLDefinition(problem_name="LIBERO_Floor_Manipulation") |
| | @Language |
| | def floor_task_suites_generator( |
| | xy_region_kwargs_list, |
| | affordance_region_kwargs_list, |
| | fixture_object_dict, |
| | movable_object_dict, |
| | objects_of_interest, |
| | init_states, |
| | goal_states, |
| | ): |
| | result = [] |
| | result += region_module( |
| | xy_region_kwargs_list=xy_region_kwargs_list, |
| | affordance_region_kwargs_list=affordance_region_kwargs_list, |
| | ) |
| | result += get_fixtures(**fixture_object_dict) |
| | result += get_objects(**movable_object_dict) |
| | result += get_objects_of_interest(objects_of_interest) |
| | result += get_init_state(init_states) |
| | result += get_goal_state(goal_states) |
| | return result |
| |
|
| |
|
| | @PDDLDefinition(problem_name="LIBERO_Coffee_Table_Manipulation") |
| | @Language |
| | def coffee_table_task_suites_generator( |
| | xy_region_kwargs_list, |
| | affordance_region_kwargs_list, |
| | fixture_object_dict, |
| | movable_object_dict, |
| | objects_of_interest, |
| | init_states, |
| | goal_states, |
| | ): |
| | result = [] |
| | result += region_module( |
| | xy_region_kwargs_list=xy_region_kwargs_list, |
| | affordance_region_kwargs_list=affordance_region_kwargs_list, |
| | ) |
| | result += get_fixtures(**fixture_object_dict) |
| | result += get_objects(**movable_object_dict) |
| | result += get_objects_of_interest(objects_of_interest) |
| | result += get_init_state(init_states) |
| | result += get_goal_state(goal_states) |
| | return result |
| |
|
| |
|
| | @PDDLDefinition(problem_name="LIBERO_Study_Tabletop_Manipulation") |
| | @Language |
| | def study_table_task_suites_generator( |
| | xy_region_kwargs_list, |
| | affordance_region_kwargs_list, |
| | fixture_object_dict, |
| | movable_object_dict, |
| | objects_of_interest, |
| | init_states, |
| | goal_states, |
| | ): |
| | result = [] |
| | result += region_module( |
| | xy_region_kwargs_list=xy_region_kwargs_list, |
| | affordance_region_kwargs_list=affordance_region_kwargs_list, |
| | ) |
| | result += get_fixtures(**fixture_object_dict) |
| | result += get_objects(**movable_object_dict) |
| | result += get_objects_of_interest(objects_of_interest) |
| | result += get_init_state(init_states) |
| | result += get_goal_state(goal_states) |
| | return result |
| |
|
| |
|
| | @PDDLDefinition(problem_name="LIBERO_Living_Room_Tabletop_Manipulation") |
| | @Language |
| | def living_room_table_task_suites_generator( |
| | xy_region_kwargs_list, |
| | affordance_region_kwargs_list, |
| | fixture_object_dict, |
| | movable_object_dict, |
| | objects_of_interest, |
| | init_states, |
| | goal_states, |
| | ): |
| | result = [] |
| | result += region_module( |
| | xy_region_kwargs_list=xy_region_kwargs_list, |
| | affordance_region_kwargs_list=affordance_region_kwargs_list, |
| | ) |
| | result += get_fixtures(**fixture_object_dict) |
| | result += get_objects(**movable_object_dict) |
| | result += get_objects_of_interest(objects_of_interest) |
| | result += get_init_state(init_states) |
| | result += get_goal_state(goal_states) |
| | return result |
| |
|