Spaces:
Runtime error
Runtime error
| from Map import MapIn,MapOut | |
| from Axis import AxisFinder | |
| import cv2 | |
| import config | |
| import time | |
| from ast import literal_eval as make_tuple | |
| def run_axis_finder(total_execution_time): | |
| maps_list = [] | |
| lines_list = [] | |
| axis_division_cond = True | |
| maps_processing_queue = [] | |
| map_id = 0 | |
| iteration = 0 | |
| # axis finding process (1.1) | |
| config.log_axis_finding_settings() | |
| config.log(f"------Axis Finder------") | |
| start_time = time.time() | |
| while axis_division_cond: | |
| config.log(f"----Step {iteration}----") | |
| if (map_id == 0): | |
| # read map and make masks | |
| mymap = MapIn(config.MAIN_MAP_ADDR,config.MAIN_MAP_FILLED_BLOCK_MASK,config.MAIN_MAP_FILLED_F_F_MASK, | |
| config.PARCELS_COUNT,config.ARCH,map_id=map_id) | |
| # set random values for carbon | |
| # mymap.correct_input() | |
| else: | |
| mymap = maps_processing_queue.pop(0) | |
| # makes axis_finder object to split | |
| myaxis = AxisFinder(mymap) | |
| # Arch Selection on start division is the same | |
| if mymap.arch_choice == config.ArchStyles.Customized: | |
| # axis_center = myaxis.get_center(mymap.block_mask) | |
| # mymap.set_map_axis_center(axis_center) | |
| cv2.imshow(f'Map{mymap.map_id}', mymap.frame) | |
| cv2.setMouseCallback(f'Map{mymap.map_id}', AxisFinder.click_callback) | |
| cv2.waitKey(0) | |
| cv2.destroyAllWindows() | |
| p_f = AxisFinder.clicked_points.pop(0) | |
| p_s = AxisFinder.clicked_points.pop(0) | |
| points=[] | |
| points.append((1,p_f,p_s,p_f,p_s)) | |
| elif map_id == 0 and mymap.arch_choice == config.ArchStyles.Human_Centered_AI: | |
| p_f = () | |
| if config.ARCH_FIRST_INPUT == None: | |
| cv2.imshow(f'Map{mymap.map_id}', mymap.frame) | |
| cv2.setMouseCallback(f'Map{mymap.map_id}', AxisFinder.click_callback) | |
| cv2.waitKey(0) | |
| cv2.destroyAllWindows() | |
| p_f = AxisFinder.clicked_points.pop(0) | |
| else: | |
| p_f = config.ARCH_FIRST_INPUT | |
| points = myaxis.iterate_throughall(mymap,p_f) | |
| elif map_id == 0: | |
| points = myaxis.iterate_throughall(mymap) | |
| else: | |
| points = myaxis.iterate_old_boundries_new_york() | |
| # dump axis if hit fixed_facility | |
| if points[-1][0] == -1: | |
| if not maps_processing_queue: | |
| axis_division_cond = False | |
| continue | |
| # draw the found line carbon_fitness | |
| config.log(f"Map div best score:{points[-1][0]} map_id:{mymap.map_id}") | |
| config.log(f"Map div best access split fitness:{points[-1][5][0]}") | |
| config.log(f"Map div best area split fitness:{points[-1][5][1]}") | |
| config.log(f"Map div best fixed facilities fitness:{points[-1][5][2]}") | |
| config.log(f"Map div best carbon fitness:{points[-1][5][3]}") | |
| lines_list.append((points[-1][3:5],mymap.map_id)) | |
| # make map for split two parts (add line as access) | |
| split_mask_u,split_mask_d = mymap.line_split_mask_maker(points[-1][1],points[-1][2]) | |
| line_mask = mymap.line_mask_maker(points[-1][1],points[-1][2]) | |
| up_map = MapIn(split_mask_u,mymap,line_mask,map_id+1,0,points[-1][1:3]) | |
| down_map = MapIn(split_mask_d,mymap,line_mask,map_id+2,1,points[-1][1:3]) | |
| parcels = AxisFinder.cal_split_parcels(up_map,down_map,mymap.parcel_cnt) | |
| up_map.parcel_cnt,down_map.parcel_cnt = parcels | |
| config.log(f"Map:{map_id+1} Parcels:{up_map.parcel_cnt}") | |
| config.log(f"Map:{map_id+2} Parcels:{down_map.parcel_cnt}") | |
| # check finishing condition | |
| up_feasibility, down_feasibility = (up_map.isfeasible(), down_map.isfeasible()) | |
| # add new maps to processing queue | |
| if up_feasibility: | |
| config.log(f"Map:{map_id+1} Added!") | |
| up_map.set_line_point(points[-1][1:]) | |
| maps_processing_queue.append(up_map) | |
| # was the last axis so add axis to final answer | |
| else: | |
| config.log(f"Map:{map_id+1} Added As Answer") | |
| up_map.set_line_point(points[-1][1:]) | |
| maps_list.append(up_map) | |
| if down_feasibility: | |
| config.log(f"Map:{map_id+2} Added!") | |
| down_map.set_line_point(points[-1][1:]) | |
| maps_processing_queue.append(down_map) | |
| else: | |
| config.log(f"Map:{map_id+2} Added As Answer") | |
| down_map.set_line_point(points[-1][1:]) | |
| maps_list.append(down_map) | |
| if not maps_processing_queue: | |
| axis_division_cond = False | |
| map_id += 2 | |
| iteration+=1 | |
| config.log('-'*8) | |
| # sort maps by size | |
| maps_list.sort(key=lambda x: x.curr_size,reverse=True) | |
| cv2.destroyAllWindows() | |
| # export split result | |
| config.log(f"------Axis Result------") | |
| export_map = MapOut(config.MAIN_MAP_ADDR,lines_list) | |
| export_map.draw_axis() | |
| export_map.draw_collision() | |
| export_map.report() | |
| config.log("--- Axis Finder Finished In %s seconds ---" % (time.time() - start_time)) | |
| total_execution_time += time.time() - start_time | |
| return total_execution_time | |
| def gradio_inference(access_ratio,start_point,carbon_weight): | |
| config.ACCESS_RATIO = float(access_ratio) | |
| config.ARCH_FIRST_INPUT = make_tuple(start_point) | |
| config.A_CARBON_WEIGHT = int(carbon_weight) | |
| run_axis_finder(0) | |
| image = cv2.imread(f'outputs/collision_map.bmp') | |
| image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
| return image | |
| if __name__ == "__main__": | |
| run_axis_finder(0) |