Spaces:
Sleeping
Sleeping
| from collections import deque | |
| from copy import deepcopy | |
| class GPContext: | |
| def __init__(self, grid): | |
| self.grid = deepcopy(grid) | |
| self.objects = [] | |
| self.top_object = None | |
| self.shift = None | |
| def extract_object(grid): | |
| ctx = GPContext(grid) | |
| rows, cols = len(ctx.grid), len(ctx.grid[0]) | |
| visited = [[False]*cols for _ in range(rows)] | |
| def bfs(r, c, val): | |
| queue = deque([(r, c)]) | |
| block = [] | |
| visited[r][c] = True | |
| while queue: | |
| x, y = queue.popleft() | |
| block.append((x, y)) | |
| for dx, dy in [(-1,0), (1,0), (0,-1), (0,1)]: | |
| nx, ny = x + dx, y + dy | |
| if 0 <= nx < rows and 0 <= ny < cols and not visited[nx][ny] and ctx.grid[nx][ny] == val: | |
| visited[nx][ny] = True | |
| queue.append((nx, ny)) | |
| return block | |
| for r in range(rows): | |
| for c in range(cols): | |
| if ctx.grid[r][c] != 0 and not visited[r][c]: | |
| block = bfs(r, c, ctx.grid[r][c]) | |
| top_row = min(x for x, y in block) | |
| value = ctx.grid[block[0][0]][block[0][1]] | |
| ctx.objects.append((top_row, value, block)) | |
| return ctx | |
| def sort_objects_by_column(ctx): | |
| ctx.objects.sort(key=lambda obj: min(y for x, y in obj[2])) | |
| return ctx | |
| def sort_object(ctx): | |
| ctx.objects.sort(key=lambda obj: obj[0]) | |
| return ctx | |
| def move_right_most_object(ctx): | |
| if not ctx.objects: | |
| return ctx.grid | |
| # Get rightmost object: object with largest column index | |
| rightmost_object = max(ctx.objects, key=lambda obj: max(y for x, y in obj[2])) | |
| _, value, block = rightmost_object | |
| for x, y in block: | |
| ctx.grid[x][y] = 0 | |
| shift = 0 | |
| cols = len(ctx.grid[0]) | |
| while True: | |
| can_move = True | |
| for x, y in block: | |
| new_y = y + shift + 1 | |
| if new_y >= cols or ctx.grid[x][new_y] != 0: | |
| can_move = False | |
| break | |
| if not can_move: | |
| break | |
| shift += 1 | |
| for x, y in block: | |
| ctx.grid[x][y + shift] = value | |
| return ctx.grid | |
| def move_left_most_object(ctx): | |
| if not ctx.objects: | |
| return ctx.grid | |
| # Get leftmost object: object with smallest column index | |
| leftmost_object = min(ctx.objects, key=lambda obj: min(y for x, y in obj[2])) | |
| _, value, block = leftmost_object | |
| for x, y in block: | |
| ctx.grid[x][y] = 0 | |
| shift = 0 | |
| while True: | |
| can_move = True | |
| for x, y in block: | |
| new_y = y - (shift + 1) | |
| if new_y < 0 or ctx.grid[x][new_y] != 0: | |
| can_move = False | |
| break | |
| if not can_move: | |
| break | |
| shift += 1 | |
| for x, y in block: | |
| ctx.grid[x][y - shift] = value | |
| return ctx.grid | |
| def move_bottom_most_object(ctx): | |
| if not ctx.objects: | |
| return ctx.grid | |
| # Get bottommost object (last one in the list after sorting by top_row) | |
| bottom_object = ctx.objects[-1] | |
| _, value, block = bottom_object | |
| # Remove it from the grid | |
| for x, y in block: | |
| ctx.grid[x][y] = 0 | |
| # Compute shift (same as top, move down) | |
| shift = 0 | |
| rows = len(ctx.grid) | |
| while True: | |
| can_move = True | |
| for x, y in block: | |
| new_x = x + shift + 1 | |
| if new_x >= rows or ctx.grid[new_x][y] != 0: | |
| can_move = False | |
| break | |
| if not can_move: | |
| break | |
| shift += 1 | |
| # Place object | |
| for x, y in block: | |
| ctx.grid[x + shift][y] = value | |
| return ctx.grid | |
| def move_top_most_object(ctx): | |
| if not ctx.objects: | |
| return ctx.grid | |
| # Get topmost object | |
| top_object = ctx.objects[0] | |
| _, value, block = top_object | |
| # Remove it from the grid | |
| for x, y in block: | |
| ctx.grid[x][y] = 0 | |
| # Compute shift | |
| shift = 0 | |
| rows = len(ctx.grid) | |
| while True: | |
| can_move = True | |
| for x, y in block: | |
| new_x = x + shift + 1 | |
| if new_x >= rows or ctx.grid[new_x][y] != 0: | |
| can_move = False | |
| break | |
| if not can_move: | |
| break | |
| shift += 1 | |
| # Place object | |
| for x, y in block: | |
| ctx.grid[x + shift][y] = value | |
| return ctx.grid | |
| def reverse_object_order(ctx): | |
| ctx.reversed_objects = list(reversed(ctx.objects)) | |
| return ctx | |
| # Step 3: Clear grid and place objects from top downward | |
| def place_objects(ctx): | |
| rows, cols = len(ctx.grid), len(ctx.grid[0]) | |
| new_grid = [[0]*cols for _ in range(rows)] | |
| current_row = 0 | |
| for _, value, block in ctx.reversed_objects: | |
| # Shift block so top of block aligns with current_row | |
| min_row = min(x for x, y in block) | |
| row_shift = current_row - min_row | |
| # Compute height of block to update current_row | |
| block_height = max(x for x, y in block) - min_row + 1 | |
| for x, y in block: | |
| new_x = x + row_shift | |
| if 0 <= new_x < rows: | |
| new_grid[new_x][y] = value | |
| current_row += block_height | |
| return new_grid | |
| from dsl import * | |
| concept_hierarchy = { | |
| "Above Below": { | |
| "remove below horizontal": ["flip_horizontal", "flip_vertical"], | |
| "Fill below the pattern": ["rotate_90", "rotate_180", "rotate_270"], | |
| "Move object": ["extract_object", "sort_objects", "move_top_most_object","extract_object""move_bottom_most_object","move_left_most_object","move_right_most_object"], | |
| "reverse object":[ "place_objects", "reverse_object_order","extract_object"] | |
| }, | |
| "Clean Up": { | |
| "Copy the grid content": ["find_center_pixel"] | |
| } | |
| } | |
| primitive_function_registry = { | |
| "move_top_most_object": move_top_most_object, | |
| "move_bottom_most_object": move_bottom_most_object, | |
| "move_right_most_object":move_right_most_object, | |
| "move_left_most_object":move_left_most_object, | |
| "extract_object": extract_object, | |
| "sort_objects": sort_object, | |
| "find_center_pixel":find_center_pixel, | |
| "extract_object": extract_object, | |
| "reverse_object_order": reverse_object_order, | |
| "place_objects": place_objects | |
| } | |
| def get_sub_concepts_and_functions(high_level_concepts): | |
| allowed_functions = [] | |
| for hlc in high_level_concepts: | |
| sub_concepts = concept_hierarchy.get(hlc, {}) | |
| for slc_funcs in sub_concepts.values(): | |
| allowed_functions.extend(slc_funcs) | |
| return list(set(allowed_functions)) | |
| import random | |
| TERMINALS = ["input_grid"] | |
| class Node: | |
| def __init__(self, value, children=None): | |
| self.value = value | |
| self.children = children if children else [] | |
| def __str__(self): | |
| if self.children: | |
| return f"{self.value}({', '.join(str(child) for child in self.children)})" | |
| return str(self.value) | |
| def evaluate(self, input_grid): | |
| if self.value == "input_grid": | |
| return input_grid | |
| child_values = [child.evaluate(input_grid) for child in self.children] | |
| func = primitive_function_registry.get(self.value) | |
| if func is None: | |
| raise ValueError(f"Unknown function: {self.value}") | |
| return func(*child_values) | |
| def generate_random_program(max_depth, current_depth=0, allowed_functions=None): | |
| if current_depth >= max_depth or (current_depth > 0 and random.random() < 0.2): | |
| return Node(random.choice(TERMINALS)) | |
| else: | |
| func_name = random.choice(allowed_functions) if allowed_functions else random.choice(list(primitive_function_registry.keys())) | |
| children = [generate_random_program(max_depth, current_depth+1, allowed_functions) for _ in range(1)] # assuming arity=1 | |
| return Node(func_name, children) | |
| def get_all_nodes(program): | |
| nodes = [program] | |
| for child in program.children: | |
| nodes.extend(get_all_nodes(child)) | |
| return nodes | |
| def crossover(parent1, parent2): | |
| import copy | |
| child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2) | |
| nodes1, nodes2 = get_all_nodes(child1), get_all_nodes(child2) | |
| point1, point2 = random.choice(nodes1), random.choice(nodes2) | |
| point1.value, point1.children, point2.value, point2.children = point2.value, point2.children, point1.value, point1.children | |
| return child1, child2 | |
| def mutation(program, max_depth, mutation_rate, allowed_functions): | |
| import copy | |
| mutant = copy.deepcopy(program) | |
| nodes = get_all_nodes(mutant) | |
| for node in nodes: | |
| if random.random() < mutation_rate: | |
| new_subtree = generate_random_program(max_depth=max_depth, current_depth=0, allowed_functions=allowed_functions) | |
| node.value = new_subtree.value | |
| node.children = new_subtree.children | |
| return mutant | |
| def tournament_selection(population, fitness_scores, k): | |
| selected = [] | |
| for _ in range(len(population)): | |
| participants = random.sample(list(zip(population, fitness_scores)), k) | |
| winner = max(participants, key=lambda x: x[1])[0] | |
| selected.append(winner) | |
| return selected | |
| class Generation: | |
| def __init__(self, best_fitness, population): | |
| self.best_fitness = best_fitness | |
| self.population = population | |
| def to_dict(self): | |
| return { | |
| "best_fitness": self.best_fitness, | |
| "population": [str(ind) for ind in self.population] | |
| } | |
| def evaluate_fitness(program, input_output_pairs): | |
| score = 0 | |
| for inp, expected in input_output_pairs: | |
| try: | |
| result = program.evaluate(inp) | |
| if result == expected: | |
| score += 1 | |
| except: | |
| continue | |
| return score | |
| def genetic_programming(input_output_pairs, population_size, generations, mutation_rate, crossover_rate, max_depth, predicted_HLCs): | |
| allowed_functions = get_sub_concepts_and_functions(predicted_HLCs) | |
| population = [generate_random_program(max_depth, allowed_functions=allowed_functions) for _ in range(population_size)] | |
| all_generations = [] | |
| best_program = None | |
| for gen in range(generations): | |
| fitness_scores = [evaluate_fitness(p, input_output_pairs) for p in population] | |
| best_fitness = max(fitness_scores) | |
| best_program = population[fitness_scores.index(best_fitness)] | |
| selected = tournament_selection(population, fitness_scores, k=3) | |
| next_generation = [] | |
| while len(next_generation) < population_size: | |
| if random.random() < crossover_rate and len(selected) >= 2: | |
| p1, p2 = random.sample(selected, 2) | |
| c1, c2 = crossover(p1, p2) | |
| next_generation.extend([c1, c2]) | |
| else: | |
| parent = random.choice(selected) | |
| child = mutation(parent, max_depth, mutation_rate, allowed_functions) | |
| next_generation.append(child) | |
| population = next_generation[:population_size] | |
| all_generations.append(Generation(best_fitness, population)) | |
| print(f"Generation {gen} - Best Fitness: {best_fitness}") | |
| return best_program, all_generations | |
| if __name__ == "__main__": | |
| if __name__ == "__main__": | |
| input_output_pairs = [ | |
| ] | |
| predicted_HLCs = [] | |
| best_program, generations = genetic_programming( | |
| input_output_pairs=input_output_pairs, | |
| population_size=500, | |
| generations=700, | |
| mutation_rate=0.2, | |
| crossover_rate=0.7, | |
| max_depth=3, | |
| predicted_HLCs=predicted_HLCs | |
| ) | |
| print("\nBest Program:", best_program) | |