| | """ |
| | |
| | OMRChecker |
| | |
| | Author: Udayraj Deshmukh |
| | Github: https://github.com/Udayraj123 |
| | |
| | """ |
| | from src.constants import FIELD_TYPES |
| | from src.core import ImageInstanceOps |
| | from src.logger import logger |
| | from src.processors.manager import PROCESSOR_MANAGER |
| | from src.utils.parsing import ( |
| | custom_sort_output_columns, |
| | open_template_with_defaults, |
| | parse_fields, |
| | ) |
| |
|
| |
|
| | class Template: |
| | def __init__(self, template_path, tuning_config): |
| | self.path = template_path |
| | self.image_instance_ops = ImageInstanceOps(tuning_config) |
| |
|
| | json_object = open_template_with_defaults(template_path) |
| | ( |
| | custom_labels_object, |
| | field_blocks_object, |
| | output_columns_array, |
| | pre_processors_object, |
| | self.bubble_dimensions, |
| | self.global_empty_val, |
| | self.options, |
| | self.page_dimensions, |
| | ) = map( |
| | json_object.get, |
| | [ |
| | "customLabels", |
| | "fieldBlocks", |
| | "outputColumns", |
| | "preProcessors", |
| | "bubbleDimensions", |
| | "emptyValue", |
| | "options", |
| | "pageDimensions", |
| | ], |
| | ) |
| |
|
| | self.parse_output_columns(output_columns_array) |
| | self.setup_pre_processors(pre_processors_object, template_path.parent) |
| | self.setup_field_blocks(field_blocks_object) |
| | self.parse_custom_labels(custom_labels_object) |
| |
|
| | non_custom_columns, all_custom_columns = ( |
| | list(self.non_custom_labels), |
| | list(custom_labels_object.keys()), |
| | ) |
| |
|
| | if len(self.output_columns) == 0: |
| | self.fill_output_columns(non_custom_columns, all_custom_columns) |
| |
|
| | self.validate_template_columns(non_custom_columns, all_custom_columns) |
| |
|
| | def parse_output_columns(self, output_columns_array): |
| | self.output_columns = parse_fields(f"Output Columns", output_columns_array) |
| |
|
| | def setup_pre_processors(self, pre_processors_object, relative_dir): |
| | |
| | self.pre_processors = [] |
| | for pre_processor in pre_processors_object: |
| | ProcessorClass = PROCESSOR_MANAGER.processors[pre_processor["name"]] |
| | pre_processor_instance = ProcessorClass( |
| | options=pre_processor["options"], |
| | relative_dir=relative_dir, |
| | image_instance_ops=self.image_instance_ops, |
| | ) |
| | self.pre_processors.append(pre_processor_instance) |
| |
|
| | def setup_field_blocks(self, field_blocks_object): |
| | |
| | self.field_blocks = [] |
| | self.all_parsed_labels = set() |
| | for block_name, field_block_object in field_blocks_object.items(): |
| | self.parse_and_add_field_block(block_name, field_block_object) |
| |
|
| | def parse_custom_labels(self, custom_labels_object): |
| | all_parsed_custom_labels = set() |
| | self.custom_labels = {} |
| | for custom_label, label_strings in custom_labels_object.items(): |
| | parsed_labels = parse_fields(f"Custom Label: {custom_label}", label_strings) |
| | parsed_labels_set = set(parsed_labels) |
| | self.custom_labels[custom_label] = parsed_labels |
| |
|
| | missing_custom_labels = sorted( |
| | parsed_labels_set.difference(self.all_parsed_labels) |
| | ) |
| | if len(missing_custom_labels) > 0: |
| | logger.critical( |
| | f"For '{custom_label}', Missing labels - {missing_custom_labels}" |
| | ) |
| | raise Exception( |
| | f"Missing field block label(s) in the given template for {missing_custom_labels} from '{custom_label}'" |
| | ) |
| |
|
| | if not all_parsed_custom_labels.isdisjoint(parsed_labels_set): |
| | |
| | logger.critical( |
| | f"field strings overlap for labels: {label_strings} and existing custom labels: {all_parsed_custom_labels}" |
| | ) |
| | raise Exception( |
| | f"The field strings for custom label '{custom_label}' overlap with other existing custom labels" |
| | ) |
| |
|
| | all_parsed_custom_labels.update(parsed_labels) |
| |
|
| | self.non_custom_labels = self.all_parsed_labels.difference( |
| | all_parsed_custom_labels |
| | ) |
| |
|
| | def fill_output_columns(self, non_custom_columns, all_custom_columns): |
| | all_template_columns = non_custom_columns + all_custom_columns |
| | |
| | self.output_columns = sorted( |
| | all_template_columns, key=custom_sort_output_columns |
| | ) |
| |
|
| | def validate_template_columns(self, non_custom_columns, all_custom_columns): |
| | output_columns_set = set(self.output_columns) |
| | all_custom_columns_set = set(all_custom_columns) |
| |
|
| | missing_output_columns = sorted( |
| | output_columns_set.difference(all_custom_columns_set).difference( |
| | self.all_parsed_labels |
| | ) |
| | ) |
| | if len(missing_output_columns) > 0: |
| | logger.critical(f"Missing output columns: {missing_output_columns}") |
| | raise Exception( |
| | f"Some columns are missing in the field blocks for the given output columns" |
| | ) |
| |
|
| | all_template_columns_set = set(non_custom_columns + all_custom_columns) |
| | missing_label_columns = sorted( |
| | all_template_columns_set.difference(output_columns_set) |
| | ) |
| | if len(missing_label_columns) > 0: |
| | logger.warning( |
| | f"Some label columns are not covered in the given output columns: {missing_label_columns}" |
| | ) |
| |
|
| | def parse_and_add_field_block(self, block_name, field_block_object): |
| | field_block_object = self.pre_fill_field_block(field_block_object) |
| | block_instance = FieldBlock(block_name, field_block_object) |
| | self.field_blocks.append(block_instance) |
| | self.validate_parsed_labels(field_block_object["fieldLabels"], block_instance) |
| |
|
| | def pre_fill_field_block(self, field_block_object): |
| | if "fieldType" in field_block_object: |
| | field_block_object = { |
| | **field_block_object, |
| | **FIELD_TYPES[field_block_object["fieldType"]], |
| | } |
| | else: |
| | field_block_object = {**field_block_object, "fieldType": "__CUSTOM__"} |
| |
|
| | return { |
| | "direction": "vertical", |
| | "emptyValue": self.global_empty_val, |
| | "bubbleDimensions": self.bubble_dimensions, |
| | **field_block_object, |
| | } |
| |
|
| | def validate_parsed_labels(self, field_labels, block_instance): |
| | parsed_field_labels, block_name = ( |
| | block_instance.parsed_field_labels, |
| | block_instance.name, |
| | ) |
| | field_labels_set = set(parsed_field_labels) |
| | if not self.all_parsed_labels.isdisjoint(field_labels_set): |
| | |
| | logger.critical( |
| | f"An overlap found between field string: {field_labels} in block '{block_name}' and existing labels: {self.all_parsed_labels}" |
| | ) |
| | raise Exception( |
| | f"The field strings for field block {block_name} overlap with other existing fields" |
| | ) |
| | self.all_parsed_labels.update(field_labels_set) |
| |
|
| | page_width, page_height = self.page_dimensions |
| | block_width, block_height = block_instance.dimensions |
| | [block_start_x, block_start_y] = block_instance.origin |
| |
|
| | block_end_x, block_end_y = ( |
| | block_start_x + block_width, |
| | block_start_y + block_height, |
| | ) |
| |
|
| | if ( |
| | block_end_x >= page_width |
| | or block_end_y >= page_height |
| | or block_start_x < 0 |
| | or block_start_y < 0 |
| | ): |
| | raise Exception( |
| | f"Overflowing field block '{block_name}' with origin {block_instance.origin} and dimensions {block_instance.dimensions} in template with dimensions {self.page_dimensions}" |
| | ) |
| |
|
| | def __str__(self): |
| | return str(self.path) |
| |
|
| |
|
| | class FieldBlock: |
| | def __init__(self, block_name, field_block_object): |
| | self.name = block_name |
| | self.shift = 0 |
| | self.setup_field_block(field_block_object) |
| |
|
| | def setup_field_block(self, field_block_object): |
| | |
| | ( |
| | bubble_dimensions, |
| | bubble_values, |
| | bubbles_gap, |
| | direction, |
| | field_labels, |
| | field_type, |
| | labels_gap, |
| | origin, |
| | self.empty_val, |
| | ) = map( |
| | field_block_object.get, |
| | [ |
| | "bubbleDimensions", |
| | "bubbleValues", |
| | "bubblesGap", |
| | "direction", |
| | "fieldLabels", |
| | "fieldType", |
| | "labelsGap", |
| | "origin", |
| | "emptyValue", |
| | ], |
| | ) |
| | self.parsed_field_labels = parse_fields( |
| | f"Field Block Labels: {self.name}", field_labels |
| | ) |
| | self.origin = origin |
| | self.bubble_dimensions = bubble_dimensions |
| | self.calculate_block_dimensions( |
| | bubble_dimensions, |
| | bubble_values, |
| | bubbles_gap, |
| | direction, |
| | labels_gap, |
| | ) |
| | self.generate_bubble_grid( |
| | bubble_values, |
| | bubbles_gap, |
| | direction, |
| | field_type, |
| | labels_gap, |
| | ) |
| |
|
| | def calculate_block_dimensions( |
| | self, |
| | bubble_dimensions, |
| | bubble_values, |
| | bubbles_gap, |
| | direction, |
| | labels_gap, |
| | ): |
| | _h, _v = (1, 0) if (direction == "vertical") else (0, 1) |
| |
|
| | values_dimension = int( |
| | bubbles_gap * (len(bubble_values) - 1) + bubble_dimensions[_h] |
| | ) |
| | fields_dimension = int( |
| | labels_gap * (len(self.parsed_field_labels) - 1) + bubble_dimensions[_v] |
| | ) |
| | self.dimensions = ( |
| | [fields_dimension, values_dimension] |
| | if (direction == "vertical") |
| | else [values_dimension, fields_dimension] |
| | ) |
| |
|
| | def generate_bubble_grid( |
| | self, |
| | bubble_values, |
| | bubbles_gap, |
| | direction, |
| | field_type, |
| | labels_gap, |
| | ): |
| | _h, _v = (1, 0) if (direction == "vertical") else (0, 1) |
| | self.traverse_bubbles = [] |
| | |
| | lead_point = [float(self.origin[0]), float(self.origin[1])] |
| | for field_label in self.parsed_field_labels: |
| | bubble_point = lead_point.copy() |
| | field_bubbles = [] |
| | for bubble_value in bubble_values: |
| | field_bubbles.append( |
| | Bubble(bubble_point.copy(), field_label, field_type, bubble_value) |
| | ) |
| | bubble_point[_h] += bubbles_gap |
| | self.traverse_bubbles.append(field_bubbles) |
| | lead_point[_v] += labels_gap |
| |
|
| |
|
| | class Bubble: |
| | """ |
| | Container for a Point Box on the OMR |
| | |
| | field_label is the point's property- field to which this point belongs to |
| | It can be used as a roll number column as well. (eg roll1) |
| | It can also correspond to a single digit of integer type Q (eg q5d1) |
| | """ |
| |
|
| | def __init__(self, pt, field_label, field_type, field_value): |
| | self.x = round(pt[0]) |
| | self.y = round(pt[1]) |
| | self.field_label = field_label |
| | self.field_type = field_type |
| | self.field_value = field_value |
| |
|
| | def __str__(self): |
| | return str([self.x, self.y]) |
| |
|