Spaces:
Sleeping
Sleeping
| import random | |
| import heapq | |
| import time | |
| import pickle | |
| from collections import deque, Counter | |
| class Variable(): | |
| ACROSS = "across" | |
| DOWN = "down" | |
| def __init__(self, i, j, direction, length): | |
| """Create a new variable with starting point, direction, and length.""" | |
| self.i = i | |
| self.j = j | |
| self.direction = direction | |
| self.length = length | |
| self.cells = [] | |
| for k in range(self.length): | |
| self.cells.append( | |
| (self.i + (k if self.direction == Variable.DOWN else 0), | |
| self.j + (k if self.direction == Variable.ACROSS else 0)) | |
| ) | |
| def __hash__(self): | |
| return hash((self.i, self.j, self.direction, self.length)) | |
| def __eq__(self, other): | |
| return ( | |
| (self.i == other.i) and | |
| (self.j == other.j) and | |
| (self.direction == other.direction) and | |
| (self.length == other.length) | |
| ) | |
| def __str__(self): | |
| return f"({self.i}, {self.j}) {self.direction} : {self.length}" | |
| def __repr__(self): | |
| direction = repr(self.direction) | |
| return f"Variable({self.i}, {self.j}, {direction}, {self.length})" | |
| class Crossword(): | |
| def __init__(self, grid, words_file, file_path = True): | |
| self.structure = [] | |
| self.height = len(grid) # the number of rows in the grid | |
| self.width = len(grid[0]) # the number of columns in the grid | |
| for i in range(len(grid)): | |
| row = [] | |
| for j in range(len(grid[0])): | |
| if grid[i][j] == '': | |
| row.append(False) | |
| else: | |
| row.append(True) | |
| self.structure.append(row) | |
| if not file_path: | |
| self.words = [word.upper() for word in words_file] | |
| else: | |
| # Save vocabulary list | |
| with open(words_file) as f: | |
| self.words = set(f.read().upper().splitlines()) # to remove all the duplicates | |
| self.words = list(self.words) | |
| for _ in range(5): | |
| random.shuffle(self.words) | |
| self.words = set(self.words) | |
| # Determine variable set | |
| self.variables = set() | |
| for i in range(self.height): | |
| for j in range(self.width): | |
| # Vertical words | |
| starts_word = ( | |
| self.structure[i][j] | |
| and (i == 0 or not self.structure[i - 1][j]) | |
| ) | |
| if starts_word: | |
| length = 1 | |
| for k in range(i + 1, self.height): | |
| if self.structure[k][j]: | |
| length += 1 | |
| else: | |
| break | |
| if length > 1: | |
| self.variables.add(Variable( | |
| i=i, j=j, | |
| direction=Variable.DOWN, | |
| length=length | |
| )) | |
| # Horizontal words | |
| starts_word = ( | |
| self.structure[i][j] | |
| and (j == 0 or not self.structure[i][j - 1]) | |
| ) | |
| if starts_word: | |
| length = 1 | |
| for k in range(j + 1, self.width): | |
| if self.structure[i][k]: | |
| length += 1 | |
| else: | |
| break | |
| if length > 1: | |
| self.variables.add(Variable( | |
| i=i, j=j, | |
| direction=Variable.ACROSS, | |
| length=length | |
| )) | |
| # Compute overlaps for each word | |
| # For any pair of variables v1, v2, their overlap is either: | |
| # None, if the two variables do not overlap; or | |
| # (i, j), where v1's ith character overlaps v2's jth character | |
| self.overlaps = dict() | |
| self.overlaps_positions = dict() | |
| for v1 in self.variables: | |
| for v2 in self.variables: | |
| if v1 == v2: | |
| continue | |
| cells1 = v1.cells | |
| cells2 = v2.cells | |
| intersection = set(cells1).intersection(cells2) | |
| if not intersection: | |
| self.overlaps[v1, v2] = None | |
| else: | |
| intersection = intersection.pop() | |
| self.overlaps[v1, v2] = ( | |
| cells1.index(intersection), | |
| cells2.index(intersection) | |
| ) | |
| for cell in cells1: | |
| for cell_ in cells2: | |
| if cell == cell_: | |
| self.overlaps_positions[v1, v2] = cell | |
| break | |
| def neighbors(self, var): | |
| """Given a variable, return set of overlapping variables.""" | |
| return set( | |
| v for v in self.variables | |
| if v != var and self.overlaps[v, var] | |
| ) | |
| class CrosswordCreator(): | |
| def __init__(self, crossword, max_secs = 30, do_random = False): | |
| """ | |
| Create new CSP crossword generate. | |
| """ | |
| self.crossword = crossword | |
| self.back_track_count = 0 | |
| self.memoized_back_track_count = 0 | |
| self.states = [] | |
| self.do_random = do_random | |
| self.memoization_cache = dict() | |
| self.t_revise_time = 0 | |
| self.t_revise_called = 0 | |
| self.consistency_elapsed_time = 0 | |
| self.consistency_called = 0 | |
| self.assignment = {} | |
| self.assignment_stack = [] | |
| self.assignment_order_list = [] | |
| self.start_time = 0 | |
| self.max_time_limit = max_secs | |
| self.var_len_domain = {} | |
| for var in self.crossword.variables: | |
| ans_len = var.length | |
| if ans_len not in self.var_len_domain.keys(): | |
| self.var_len_domain[ans_len] = self.get_required_length_answers(ans_len) | |
| # setting the domains for each variables (faster approach) | |
| self.domains = { | |
| var: [self.var_len_domain[var.length]] for var in self.crossword.variables | |
| } | |
| # # setting up the domains for each of the variables | |
| # self.domains = { | |
| # var: [self.get_required_length_answers(var.length)] | |
| # for var in self.crossword.variables | |
| # } | |
| self.temp_assignment = dict() | |
| while self.select_unassigned_variable(self.temp_assignment) is not None: | |
| var = self.select_unassigned_variable(self.temp_assignment) | |
| answer = 'A' * var.length | |
| self.temp_assignment[var] = answer | |
| self.assignment_order_list.append(var) | |
| # enforcing the node consistency here | |
| def get_required_length_answers(self, ans_length): | |
| output = [] | |
| for word in self.crossword.words: | |
| if len(word) == ans_length: | |
| output.append(word.upper()) | |
| # random.shuffle(output) | |
| # output = output[:5000] | |
| # using set for the domains, do randomizing here is no advantage | |
| return set(output) # lets get the domain answers in list format | |
| def letter_grid(self, assignment): | |
| """ | |
| Return 2D array representing a given assignment. | |
| """ | |
| letters = [ | |
| [None for _ in range(self.crossword.width)] | |
| for _ in range(self.crossword.height) | |
| ] | |
| for variable, word in assignment.items(): | |
| direction = variable.direction | |
| for k in range(len(word)): | |
| i = variable.i + (k if direction == Variable.DOWN else 0) | |
| j = variable.j + (k if direction == Variable.ACROSS else 0) | |
| letters[i][j] = word[k] | |
| return letters | |
| def print(self, assignment): | |
| """ | |
| Print crossword assignment to the terminal. | |
| """ | |
| letters = self.letter_grid(assignment) | |
| for i in range(self.crossword.height): | |
| for j in range(self.crossword.width): | |
| if self.crossword.structure[i][j]: | |
| print(letters[i][j] or " ", end="") | |
| else: | |
| print("██", end="") | |
| print() | |
| def write_info(self, message, filename = 'log_info.txt', filemode = 'a'): | |
| with open(filename, filemode) as f: | |
| f.write(message) | |
| def print_domain_info(self): | |
| ''' | |
| print the variable information with its present domain size. | |
| ''' | |
| for var in self.crossword.variables: | |
| print(f"Cell Position: ({var.i, var.j}) | Direction: {var.direction.upper()} | Length: {var.length} --> Domain Size: {len(self.domains[var][-1])}") | |
| ### here starts the main solving category | |
| def solve(self): | |
| """ | |
| Enforce node and arc consistency, and then solve the CSP. | |
| """ | |
| # print("Before: Domain sizes Arc-Consistency (Pre-Processing Step): ") | |
| # self.print_domain_info() | |
| self.ac3() | |
| # print("\nAfter: Domain sizes Arc-Consistency (Pre-Processing Step): ") | |
| # self.print_domain_info() | |
| self.start_time = time.time() | |
| return self.backtrack(dict()) | |
| def enforce_node_consistency(self): | |
| """ | |
| Update `self.domains` such that each variable is node-consistent. | |
| (Remove any values that are inconsistent with a variable's unary | |
| constraints; in this case, the length of the word.) | |
| """ | |
| for variable in self.crossword.variables: | |
| valid_words = set() | |
| for word in self.domains[variable][-1]: | |
| if len(word) == variable.length: | |
| valid_words.add(word) | |
| self.domains[variable][-1] = valid_words | |
| def revise(self, x, y, forward_checking = False): | |
| """ | |
| Make variable `x` arc consistent with variable `y`. | |
| To do so, remove values from `self.domains[x]` for which there is no | |
| possible corresponding value for `y` in `self.domains[y]`. | |
| Return True if a revision was made to the domain of `x`; return | |
| False if no revision was made. | |
| """ | |
| start_t = time.time() | |
| revised = False | |
| overlap = self.crossword.overlaps[x, y] | |
| if overlap: | |
| y_chars = set(word[overlap[1]] for word in self.domains[y][-1]) # Use a set for faster membership tests | |
| x_domain = self.domains[x][-1] | |
| # Optimize: Use list comprehension for faster filtering | |
| x_domain = {word for word in x_domain if word[overlap[0]] in y_chars} | |
| if len(x_domain) < len(self.domains[x][-1]): | |
| revised = True | |
| if forward_checking: | |
| self.domains[x].append(x_domain) | |
| else: | |
| self.domains[x][-1] = x_domain | |
| end_t = time.time() | |
| self.t_revise_time += end_t - start_t | |
| self.t_revise_called += 1 | |
| return revised | |
| def ac3(self, arcs=None, f_checking = False): | |
| if arcs is None: | |
| arcs = deque([(v1, v2) for v1 in self.crossword.variables for v2 in self.crossword.neighbors(v1)]) | |
| else: | |
| arcs = deque(arcs) | |
| revised_arcs = set() | |
| while arcs: | |
| x, y = arcs.popleft() # Efficient pop from the left | |
| # check if the arc has already been revised | |
| if (x, y) in revised_arcs: | |
| continue | |
| if self.revise(x, y, forward_checking = f_checking): | |
| if len(self.domains[x][-1]) == 0: | |
| return False | |
| revised_arcs.add((x, y)) | |
| for z in self.crossword.neighbors(x) - {y}: | |
| arcs.append((z, x)) | |
| revised_arcs.add((z, x)) | |
| return True | |
| # assigment_complete checkup - longcut way | |
| def assignment_complete(self, assignment): | |
| """ | |
| Return True if `assignment` is complete (i.e., assigns a value to each | |
| crossword variable); return False otherwise. | |
| """ | |
| # self.ASSIGNMENT_COUNT += 1 | |
| self.back_track_count += 1 | |
| self.states.append(assignment.copy()) | |
| # if len(assignment.keys()) / len(self.crossword.variables) >= 9.0: | |
| # return True | |
| complete = True | |
| vars_in_assignment = set(var for var in assignment) | |
| # Checking if all vars in the crossword has been assigned | |
| if vars_in_assignment != self.crossword.variables: | |
| complete = False | |
| for var in assignment: | |
| # making sure no var is empty | |
| # assert isinstance(self.assignment[var], str) | |
| if not assignment[var]: | |
| complete = False | |
| return complete | |
| # phind AI | |
| def consistent(self, assignment): | |
| """ | |
| Return True if `assignment` is consistent (i.e., words fit in crossword | |
| puzzle without conflicting characters); return False otherwise. | |
| """ | |
| start_t = time.time() | |
| values = set() | |
| for var, word in assignment.items(): | |
| if word in values or len(word) != var.length: | |
| end_t = time.time() | |
| self.consistency_elapsed_time += end_t - start_t | |
| self.consistency_called += 1 | |
| return False | |
| values.add(word) | |
| for neighbor in self.crossword.neighbors(var): | |
| overlap = self.crossword.overlaps[var, neighbor] | |
| if neighbor in assignment: | |
| if assignment[var][overlap[0]] != assignment[neighbor][overlap[1]]: | |
| end_t = time.time() | |
| self.consistency_elapsed_time += end_t - start_t | |
| self.consistency_called += 1 | |
| return False | |
| end_t = time.time() | |
| self.consistency_elapsed_time += end_t - start_t | |
| self.consistency_called += 1 | |
| return True | |
| def order_domain_values(self, var, assignment, temp_var_domain): | |
| # start_t = time.time() | |
| values_penalty = Counter() | |
| for neighbor in self.crossword.neighbors(var): | |
| if neighbor not in assignment: | |
| overlap = self.crossword.overlaps[var, neighbor] | |
| neighbor_list = [value[overlap[1]] for value in list(self.domains[neighbor][-1])] | |
| for value in temp_var_domain: | |
| letter_to_be_searched = neighbor_list.count(value[overlap[0]]) | |
| values_penalty[value] += len(neighbor_list) - letter_to_be_searched | |
| priority_queue = [(-values_penalty[value], value) for value in temp_var_domain] | |
| heapq.heapify(priority_queue) | |
| # end_t = time.time() | |
| # print("Ordering the domain values: ", end_t - start_t) | |
| return [value for _, value in priority_queue] | |
| # smart-choosing approach | |
| def select_unassigned_variable(self, assignment): | |
| ## DEBUG purpose | |
| if len(assignment.keys()) == len(self.crossword.variables): | |
| return None | |
| # if it is choosing unassigned variable for the firs time then | |
| var_penalty = {} | |
| if len(assignment.keys()) == 0: | |
| for var in self.crossword.variables: | |
| var_penalty[var] = len(self.crossword.neighbors(var)) | |
| vars = sorted(var_penalty, key = lambda v: var_penalty[v], reverse = True) | |
| return vars[0] # -> if choosing for the first time, then choose the one with the maximum attached neighbors | |
| else: | |
| # lets find the slot that has the most engaging assignment at first, | |
| # if not engaging assignment is found then look for the next in the assignment stack | |
| for var in self.crossword.variables: | |
| var_penalty[var] = [0, 0] | |
| if var not in assignment: | |
| for neighbor in self.crossword.neighbors(var): | |
| if neighbor in assignment: | |
| var_penalty[var][0] += 1 | |
| var_penalty[var][1] = var_penalty[var][0] / var.length | |
| for var, (count, percentage) in var_penalty.items(): | |
| if count > 1: | |
| vars = sorted(var_penalty, key = lambda v: var_penalty[v][1], reverse = True) | |
| return vars[0] | |
| var_penalty = {} | |
| for var_assignment in self.assignment_stack: | |
| var_penalty = {} | |
| for neighbor in self.crossword.neighbors(var_assignment): | |
| if neighbor not in assignment: | |
| var_penalty[neighbor] = len(self.crossword.neighbors(neighbor)) | |
| if len(var_penalty.keys()) != 0: | |
| vars = sorted(var_penalty, key = lambda v: var_penalty[v], reverse = True) | |
| return vars[0] | |
| if len(var_penalty.keys()) == 0: | |
| for var in self.crossword.variables: | |
| if var not in assignment: | |
| var_penalty[var] = len(self.crossword.neighbors(var)) | |
| vars = sorted(var_penalty, key = lambda v: var_penalty[v], reverse = True) | |
| return vars[0] | |
| else: | |
| vars = sorted(var_penalty, key = lambda v: var_penalty[v], reverse = True) | |
| return vars[0] | |
| def backtrack(self, assignment, assigned_var = None): | |
| """ | |
| Using Backtracking Search, take as input a partial assignment for the | |
| crossword and return a complete assignment if possible to do so. | |
| `assignment` is a mapping from variables (keys) to words (values). | |
| If no assignment is possible, return None. | |
| """ | |
| if (time.time() - self.start_time) > self.max_time_limit: | |
| print("Exceed Time Limit") | |
| return assignment | |
| if self.assignment_complete(assignment): | |
| # print(assignment) | |
| return assignment # base case | |
| # lets have some caching done here | |
| assignment_key = frozenset(assignment.items()) | |
| if assignment_key in self.memoization_cache: | |
| # self.ASSIGNMENT_COUNT += 1 | |
| self.memoized_back_track_count += 1 | |
| return self.memoization_cache[assignment_key] | |
| # selecting a new variable | |
| var = self.select_unassigned_variable(assignment) | |
| did_shorten_domain = False | |
| temp_domain_var = self.domains[var][-1].copy() | |
| self.write_info(message = f"{var} | Before: {len(temp_domain_var)}\n") | |
| if len(assignment.keys()) > 0: | |
| for variable in self.crossword.variables: | |
| if var != variable and variable in assignment.keys(): | |
| overlap = self.crossword.overlaps[var, variable] | |
| if overlap: | |
| ref_cross_section_word = assignment[variable] | |
| ref_intersection_letter = ref_cross_section_word[overlap[1]] | |
| # Filter the words in the domain of var | |
| did_shorten_domain = True | |
| temp_domain_var = {word for word in temp_domain_var if word[overlap[0]] == ref_intersection_letter} | |
| self.write_info(message = f"{var} | After: {len(temp_domain_var)}\n") | |
| if did_shorten_domain: | |
| if len(temp_domain_var) == 0: | |
| self.memoization_cache[assignment_key] = None | |
| return None | |
| else: | |
| self.domains[var].append(temp_domain_var) | |
| revised_neighbor_list = [] | |
| arc_list = deque([(neighbor, var) for neighbor in self.crossword.neighbors(var)]) | |
| revised_arcs = set() | |
| while arc_list: | |
| x, y = arc_list.pop() | |
| if (x, y) in revised_arcs: | |
| continue | |
| if self.revise(x, y, forward_checking = True): | |
| revised_neighbor_list.append(x) | |
| for z in self.crossword.neighbors(x) - {y}: | |
| arc_list.append((z, x)) | |
| revised_arcs.add((z, x)) | |
| for var_ in self.crossword.variables: | |
| if var_ not in self.assignment: | |
| if len(self.domains[var_][-1]) == 0: | |
| for n in revised_neighbor_list: | |
| self.domains[n].pop() | |
| revised_neighbor_list = [] | |
| return None | |
| # lets introduce the randomness in iterating the values of the assigned variabel | |
| if self.do_random: | |
| shuffled_curr_domain = list(self.domains[var][-1].copy()) | |
| random.shuffle(shuffled_curr_domain) | |
| domain_values = shuffled_curr_domain if self.do_random else list(self.domains[var][-1]) | |
| # print(var, len(domain_values)) | |
| # new_assignment = assignment.copy() | |
| for value in domain_values: | |
| # print(var, value) | |
| self.write_info(message = f"{var} || Answer: {value}\n") | |
| # new_assignment[var] = value | |
| assignment[var] = value | |
| if self.consistent(assignment): | |
| self.assignment_stack.append(var) | |
| result = self.backtrack(assignment, var) | |
| if result is not None: | |
| self.memoization_cache[assignment_key] = result | |
| return result | |
| self.assignment_stack.pop() | |
| for n in revised_neighbor_list: | |
| self.domains[n].pop() | |
| revised_neighbor_list = set() | |
| if did_shorten_domain: | |
| self.domains[var].pop() | |
| self.memoization_cache[assignment_key] = None | |
| # if assigned_var is not None: | |
| assignment.pop(var) | |
| return None | |
| def solve_grid(grid_json_data, | |
| answer_list_path, | |
| hash_map_path, | |
| time_limit = 40): # time limit in seconds | |
| rows = grid_json_data['size']['rows'] | |
| cols = grid_json_data['size']['cols'] | |
| grid_alt = grid_json_data['grid'] | |
| grid = grid_json_data['grid'] | |
| main_grid = [['.'] * cols for row in range(rows)] | |
| reshaped_grid = [grid[i:i + rows] for i in range(0, len(grid), rows)] | |
| gridnums_2d = [grid_json_data['gridnums'][i:i + rows] for i in range(0, len(grid), rows)] | |
| grid = [] | |
| for i in reshaped_grid: | |
| row = [] | |
| for j in i: | |
| if j == '.': | |
| row.append('') | |
| else: | |
| row.append('A') | |
| grid.append(row) | |
| crossword = Crossword(grid, answer_list_path) | |
| creator = CrosswordCreator(crossword, max_secs = time_limit, do_random = True) | |
| assignment = creator.solve() | |
| if assignment is not None: | |
| print("Solved Successfully !!!") | |
| for variable, answer in assignment.items(): | |
| ith_row, jth_col = variable.i, variable.j | |
| ans_length = variable.length | |
| ans_direction = variable.direction | |
| if ans_direction == 'across': | |
| for char_index, k in enumerate(range(jth_col, jth_col + ans_length)): | |
| main_grid[ith_row][k] = answer[char_index] | |
| elif ans_direction == 'down': | |
| for char_index, k in enumerate(range(ith_row, ith_row + ans_length)): | |
| main_grid[k][jth_col] = answer[char_index] | |
| else: | |
| print("Didn't solve the given grid within framed time limit.") | |
| return None | |
| grid_json_data['grid'] = [element for row in main_grid for element in row] | |
| for i in range(0,len(grid_alt)): | |
| if(grid_alt[i]=='.'): | |
| if(grid_json_data['grid'][i]!='.'): | |
| grid_json_data['grid'][i]='.' | |
| else: | |
| if(grid_json_data['grid'][i]=='.'): | |
| grid_json_data['grid'][i] = ' ' | |
| print("Loading the Answer-Clue Hash Map !!!") | |
| with open(hash_map_path, 'rb') as f: | |
| ans_clue_map = pickle.load(f) | |
| grid_json_data['answers'] = { | |
| "across": [], | |
| "down": [] | |
| } | |
| across_list = [] | |
| down_list = [] | |
| for variable, answer in assignment.items(): | |
| grid_num = gridnums_2d[variable.i][variable.j] | |
| clue = random.choice(ans_clue_map[answer.lower()]).capitalize() | |
| if variable.direction == 'across': | |
| across_list.append([grid_num, f"{grid_num}. {clue.capitalize()}", answer.upper()]) | |
| elif variable.direction == 'down': | |
| down_list.append([grid_num, f"{grid_num}. {clue.capitalize()}", answer.upper()]) | |
| across_list.sort(key = lambda x: x[0]) | |
| down_list.sort(key = lambda x: x[0]) | |
| for data in across_list: | |
| grid_json_data['clues']['across'].append(data[1]) | |
| grid_json_data['answers']['across'].append(data[2]) | |
| for data in down_list: | |
| grid_json_data['clues']['down'].append(data[1]) | |
| grid_json_data['answers']['down'].append(data[2]) | |
| return grid_json_data | |