| | import ast |
| | import os |
| | import re |
| | from copy import deepcopy |
| | from csv import QUOTE_NONNUMERIC |
| |
|
| | import cv2 |
| | import pandas as pd |
| | from rich.table import Table |
| |
|
| | from src.logger import console, logger |
| | from src.schemas.constants import ( |
| | BONUS_SECTION_PREFIX, |
| | DEFAULT_SECTION_KEY, |
| | MARKING_VERDICT_TYPES, |
| | ) |
| | from src.utils.parsing import ( |
| | get_concatenated_response, |
| | open_evaluation_with_validation, |
| | parse_fields, |
| | parse_float_or_fraction, |
| | ) |
| |
|
| |
|
class AnswerMatcher:
    """Resolves the answer type for one answer-key item and scores marked answers.

    Supported answer types -
    "standard": a single correct answer string e.g. "A"
    "multiple-correct": a list of two or more equally-correct strings e.g. ["A", "B"]
    "multiple-correct-weighted": a list of [answer, score] pairs e.g. [["A", 2], ["B", 1]]
    """

    def __init__(self, answer_item, section_marking_scheme):
        self.section_marking_scheme = section_marking_scheme
        self.answer_item = answer_item
        self.answer_type = self.validate_and_get_answer_type(answer_item)
        self.set_defaults_from_scheme(section_marking_scheme)

    @staticmethod
    def is_a_marking_score(answer_element):
        # Scores may be ints or strings (fraction strings like "1/2" are parsed
        # later by parse_float_or_fraction). isinstance replaces the former
        # type(x) == ... checks; bool is excluded to keep the original
        # int-only semantics (bool is a subclass of int).
        return isinstance(answer_element, str) or (
            isinstance(answer_element, int) and not isinstance(answer_element, bool)
        )

    @staticmethod
    def is_standard_answer(answer_element):
        # A standard answer is any non-empty string.
        return isinstance(answer_element, str) and len(answer_element) >= 1

    def validate_and_get_answer_type(self, answer_item):
        """Classify the answer item into one of the supported answer types.

        Raises:
            Exception: when the item matches none of the supported shapes.
        """
        if self.is_standard_answer(answer_item):
            return "standard"
        elif isinstance(answer_item, list):
            if (
                # Multiple correct answers: a list of two or more answer strings.
                len(answer_item) >= 2
                and all(
                    self.is_standard_answer(answers_or_score)
                    for answers_or_score in answer_item
                )
            ):
                return "multiple-correct"
            elif (
                # Weighted answers: a non-empty list of [answer, score] pairs.
                len(answer_item) >= 1
                and all(
                    isinstance(answer_and_score, list) and len(answer_and_score) == 2
                    for answer_and_score in answer_item
                )
                and all(
                    self.is_standard_answer(allowed_answer)
                    and self.is_a_marking_score(answer_score)
                    for allowed_answer, answer_score in answer_item
                )
            ):
                return "multiple-correct-weighted"

        logger.critical(
            f"Unable to determine answer type for answer item: {answer_item}"
        )
        raise Exception("Unable to determine answer type")

    def set_defaults_from_scheme(self, section_marking_scheme):
        """Copy marking values from the section scheme, expanding per-answer verdict keys."""
        answer_type = self.answer_type
        self.empty_val = section_marking_scheme.empty_val
        answer_item = self.answer_item
        # deepcopy so the per-answer keys added below don't leak into the shared scheme.
        self.marking = deepcopy(section_marking_scheme.marking)
        if answer_type == "standard":
            # Section defaults are used as-is.
            pass
        elif answer_type == "multiple-correct":
            # Every allowed answer gets the section's "correct" marking.
            for allowed_answer in answer_item:
                self.marking[f"correct-{allowed_answer}"] = self.marking["correct"]
        elif answer_type == "multiple-correct-weighted":
            # Each allowed answer carries its own custom score.
            for allowed_answer, answer_score in answer_item:
                self.marking[f"correct-{allowed_answer}"] = parse_float_or_fraction(
                    answer_score
                )

    def get_marking_scheme(self):
        return self.section_marking_scheme

    def get_section_explanation(self):
        """Return the text shown in the explanation table's Section column."""
        answer_type = self.answer_type
        if answer_type in ["standard", "multiple-correct"]:
            return self.section_marking_scheme.section_key
        elif answer_type == "multiple-correct-weighted":
            return f"Custom: {self.marking}"

    def get_verdict_marking(self, marked_answer):
        """Return (verdict, score delta) for the given marked answer."""
        answer_type = self.answer_type
        question_verdict = "incorrect"
        if answer_type == "standard":
            question_verdict = self.get_standard_verdict(marked_answer)
        elif answer_type == "multiple-correct":
            question_verdict = self.get_multiple_correct_verdict(marked_answer)
        elif answer_type == "multiple-correct-weighted":
            question_verdict = self.get_multiple_correct_weighted_verdict(marked_answer)
        return question_verdict, self.marking[question_verdict]

    def get_standard_verdict(self, marked_answer):
        """Verdict for a single-string answer: unmarked, correct, or incorrect."""
        allowed_answer = self.answer_item
        if marked_answer == self.empty_val:
            return "unmarked"
        elif marked_answer == allowed_answer:
            return "correct"
        else:
            return "incorrect"

    def get_multiple_correct_verdict(self, marked_answer):
        """Verdict for a list of equally-correct answers."""
        allowed_answers = self.answer_item
        if marked_answer == self.empty_val:
            return "unmarked"
        elif marked_answer in allowed_answers:
            # Per-answer verdict key e.g. "correct-A" (same score for all answers).
            return f"correct-{marked_answer}"
        else:
            return "incorrect"

    def get_multiple_correct_weighted_verdict(self, marked_answer):
        """Verdict for weighted [answer, score] pairs; key selects the custom score."""
        allowed_answers = [
            allowed_answer for allowed_answer, _answer_score in self.answer_item
        ]
        if marked_answer == self.empty_val:
            return "unmarked"
        elif marked_answer in allowed_answers:
            return f"correct-{marked_answer}"
        else:
            return "incorrect"

    def __str__(self):
        return f"{self.answer_item}"
| |
|
| |
|
class SectionMarkingScheme:
    """One section of the marking schema: its question list and verdict scores."""

    def __init__(self, section_key, section_scheme, empty_val):
        self.empty_val = empty_val
        self.section_key = section_key
        if section_key == DEFAULT_SECTION_KEY:
            # The default section has no explicit question list; the scheme
            # value itself is the marking mapping.
            self.questions = None
            self.marking = self.parse_scheme_marking(section_scheme)
        else:
            self.questions = parse_fields(section_key, section_scheme["questions"])
            self.marking = self.parse_scheme_marking(section_scheme["marking"])

    def __str__(self):
        return self.section_key

    def parse_scheme_marking(self, marking):
        """Parse each verdict's raw score into a number.

        Warns when a non-bonus section awards positive marks for an
        incorrect answer, which is usually a schema mistake.
        """
        is_bonus_section = self.section_key.startswith(BONUS_SECTION_PREFIX)
        parsed_marking = {}
        for verdict_type in MARKING_VERDICT_TYPES:
            verdict_marking = parse_float_or_fraction(marking[verdict_type])
            looks_suspicious = (
                verdict_type == "incorrect"
                and verdict_marking > 0
                and not is_bonus_section
            )
            if looks_suspicious:
                logger.warning(
                    f"Found positive marks({round(verdict_marking, 2)}) for incorrect answer in the schema '{self.section_key}'. For Bonus sections, add a prefix 'BONUS_' to them."
                )
            parsed_marking[verdict_type] = verdict_marking
        return parsed_marking

    def match_answer(self, marked_answer, answer_matcher):
        """Return (score delta, verdict) for the marked answer via the matcher."""
        verdict, delta = answer_matcher.get_verdict_marking(marked_answer)
        return delta, verdict
| |
|
| |
|
class EvaluationConfig:
    """Parses and validates the evaluation schema: answer key, marking schemes,
    and scoring-explanation options.

    Note: this instance will be reused for multiple omr sheets.
    """

    def __init__(self, curr_dir, evaluation_path, template, tuning_config):
        self.path = evaluation_path
        evaluation_json = open_evaluation_with_validation(evaluation_path)
        options, marking_schemes, source_type = map(
            evaluation_json.get, ["options", "marking_schemes", "source_type"]
        )
        self.should_explain_scoring = options.get("should_explain_scoring", False)
        self.has_non_default_section = False
        self.exclude_files = []
        self.enable_evaluation_table_to_csv = options.get(
            "enable_evaluation_table_to_csv", False
        )

        if source_type == "csv":
            csv_path = curr_dir.joinpath(options["answer_key_csv_path"])
            if not os.path.exists(csv_path):
                logger.warning(f"Answer key csv does not exist at: '{csv_path}'.")

            answer_key_image_path = options.get("answer_key_image_path", None)
            if os.path.exists(csv_path):
                # Read the answer key from csv: two columns (question, answer),
                # with answer cells parsed into strings/lists by the converter.
                answer_key = pd.read_csv(
                    csv_path,
                    header=None,
                    names=["question", "answer"],
                    converters={"question": str, "answer": self.parse_answer_column},
                )

                self.questions_in_order = answer_key["question"].to_list()
                answers_in_order = answer_key["answer"].to_list()
            elif not answer_key_image_path:
                raise Exception(f"Answer key csv not found at '{csv_path}'")
            else:
                # Fallback: derive the answer key by reading a pre-marked OMR image.
                image_path = str(curr_dir.joinpath(answer_key_image_path))
                if not os.path.exists(image_path):
                    raise Exception(f"Answer key image not found at '{image_path}'")

                logger.debug(
                    f"Attempting to generate answer key from image: '{image_path}'"
                )
                in_omr = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
                in_omr = template.image_instance_ops.apply_preprocessors(
                    image_path, in_omr, template
                )
                if in_omr is None:
                    raise Exception(
                        f"Could not read answer key from image {image_path}"
                    )
                (
                    response_dict,
                    _final_marked,
                    _multi_marked,
                    _multi_roll,
                ) = template.image_instance_ops.read_omr_response(
                    template,
                    image=in_omr,
                    name=image_path,
                    save_dir=None,
                )
                omr_response = get_concatenated_response(response_dict, template)

                # Matches one-or-more empty-value characters (or a fully empty
                # string when the template's empty value is "").
                empty_val = template.global_empty_val
                empty_answer_regex = (
                    rf"{re.escape(empty_val)}+" if empty_val != "" else r"^$"
                )

                if "questions_in_order" in options:
                    self.questions_in_order = self.parse_questions_in_order(
                        options["questions_in_order"]
                    )
                    # Every keyed question must have a readable (non-empty) answer.
                    empty_answered_questions = [
                        question
                        for question in self.questions_in_order
                        if re.search(empty_answer_regex, omr_response[question])
                    ]
                    if len(empty_answered_questions) > 0:
                        logger.error(
                            f"Found empty answers for questions: {empty_answered_questions}, empty value used: '{empty_val}'"
                        )
                        raise Exception(
                            f"Found empty answers in file '{image_path}'. Please check your template again in the --setLayout mode."
                        )
                else:
                    logger.warning(
                        "questions_in_order not provided, proceeding to use non-empty values as answer key"
                    )
                    self.questions_in_order = sorted(
                        question
                        for (question, answer) in omr_response.items()
                        if not re.search(empty_answer_regex, answer)
                    )
                answers_in_order = [
                    omr_response[question] for question in self.questions_in_order
                ]
        else:
            # Inline answer key given directly in the evaluation json.
            self.questions_in_order = self.parse_questions_in_order(
                options["questions_in_order"]
            )
            answers_in_order = options["answers_in_order"]

        self.validate_questions(answers_in_order)

        # Map each question to its (non-default) section scheme; unmapped
        # questions fall back to the default scheme.
        self.section_marking_schemes, self.question_to_scheme = {}, {}
        for section_key, section_scheme in marking_schemes.items():
            section_marking_scheme = SectionMarkingScheme(
                section_key, section_scheme, template.global_empty_val
            )
            if section_key != DEFAULT_SECTION_KEY:
                self.section_marking_schemes[section_key] = section_marking_scheme
                for q in section_marking_scheme.questions:
                    self.question_to_scheme[q] = section_marking_scheme
                self.has_non_default_section = True
            else:
                self.default_marking_scheme = section_marking_scheme

        self.validate_marking_schemes()

        self.question_to_answer_matcher = self.parse_answers_and_map_questions(
            answers_in_order
        )
        self.validate_answers(answers_in_order, tuning_config)

    def __str__(self):
        return str(self.path)

    def prepare_and_validate_omr_response(self, omr_response):
        """Reset the explanation table and ensure the response covers every keyed question.

        Raises:
            Exception: when any question from the answer key is missing in the response.
        """
        self.reset_explanation_table()

        omr_response_questions = set(omr_response.keys())
        all_questions = set(self.questions_in_order)
        missing_questions = sorted(all_questions.difference(omr_response_questions))
        if len(missing_questions) > 0:
            logger.critical(f"Missing OMR response for: {missing_questions}")
            raise Exception(
                "Some questions are missing in the OMR response for the given answer key"
            )

        # Warn about "q"-prefixed response fields that have no answer key entry.
        prefixed_omr_response_questions = {
            k for k in omr_response.keys() if k.startswith("q")
        }
        missing_prefixed_questions = sorted(
            prefixed_omr_response_questions.difference(all_questions)
        )
        if len(missing_prefixed_questions) > 0:
            logger.warning(
                f"No answer given for potential questions in OMR response: {missing_prefixed_questions}"
            )

    def match_answer_for_question(self, current_score, question, marked_answer):
        """Return the score delta for one question, adding an explanation row if enabled."""
        answer_matcher = self.question_to_answer_matcher[question]
        question_verdict, delta = answer_matcher.get_verdict_marking(marked_answer)
        self.conditionally_add_explanation(
            answer_matcher,
            delta,
            marked_answer,
            question_verdict,
            question,
            current_score,
        )
        return delta

    def conditionally_print_explanation(self):
        if self.should_explain_scoring:
            console.print(self.explanation_table, justify="center")

    def conditionally_save_explanation_csv(self, file_path, evaluation_output_dir):
        """Append the explanation table rows to a per-file csv when enabled."""
        if self.enable_evaluation_table_to_csv:
            # NOTE(review): relies on rich's private Column._cells attribute —
            # verify this still works when upgrading rich.
            data = {col.header: col._cells for col in self.explanation_table.columns}

            output_path = os.path.join(
                evaluation_output_dir,
                f"{file_path.stem}_evaluation.csv",
            )

            pd.DataFrame(data, dtype=str).to_csv(
                output_path,
                mode="a",
                quoting=QUOTE_NONNUMERIC,
                index=False,
            )

    def get_should_explain_scoring(self):
        return self.should_explain_scoring

    def get_exclude_files(self):
        return self.exclude_files

    @staticmethod
    def parse_answer_column(answer_column):
        """Parse one csv answer cell into a string, a list of strings, or a weighted list.

        Empty cells pass through unchanged as empty strings (the previous
        answer_column[0] check raised IndexError on them).
        """
        if answer_column.startswith("["):
            # List syntax in the cell, e.g. ['A', 'B'] or [['A', 2], ['B', 1]].
            # Answer keys are trusted input and literal_eval only parses literals.
            parsed_answer = ast.literal_eval(answer_column)
        elif "," in answer_column:
            # Multiple correct answers given as a plain comma-separated string.
            parsed_answer = answer_column.split(",")
        else:
            # Single standard answer string.
            parsed_answer = answer_column
        return parsed_answer

    def parse_questions_in_order(self, questions_in_order):
        return parse_fields("questions_in_order", questions_in_order)

    def validate_answers(self, answers_in_order, tuning_config):
        """Reject answer keys containing multi-character answers when
        config.filter_out_multimarked_files is enabled (they would always be
        filtered out as multi-marked).
        """
        answer_matcher_map = self.question_to_answer_matcher
        if tuning_config.outputs.filter_out_multimarked_files:
            multi_marked_answer = False
            for question, answer_item in zip(self.questions_in_order, answers_in_order):
                answer_type = answer_matcher_map[question].answer_type
                if answer_type == "standard":
                    if len(answer_item) > 1:
                        multi_marked_answer = True
                if answer_type == "multiple-correct":
                    for single_answer in answer_item:
                        if len(single_answer) > 1:
                            multi_marked_answer = True
                            break
                if answer_type == "multiple-correct-weighted":
                    for single_answer, _answer_score in answer_item:
                        if len(single_answer) > 1:
                            multi_marked_answer = True
                            break

            if multi_marked_answer:
                raise Exception(
                    "Provided answer key contains multiple correct answer(s), but config.filter_out_multimarked_files is True. Scoring will get skipped."
                )

    def validate_questions(self, answers_in_order):
        """Ensure questions and answers line up one-to-one."""
        questions_in_order = self.questions_in_order
        len_questions_in_order, len_answers_in_order = len(questions_in_order), len(
            answers_in_order
        )
        if len_questions_in_order != len_answers_in_order:
            logger.critical(
                f"questions_in_order({len_questions_in_order}): {questions_in_order}\nanswers_in_order({len_answers_in_order}): {answers_in_order}"
            )
            raise Exception(
                f"Unequal lengths for questions_in_order and answers_in_order ({len_questions_in_order} != {len_answers_in_order})"
            )

    def validate_marking_schemes(self):
        """Check that sections don't claim overlapping questions and that every
        section question has an entry in the answer key.
        """
        section_marking_schemes = self.section_marking_schemes
        section_questions = set()
        for section_key, section_scheme in section_marking_schemes.items():
            if section_key == DEFAULT_SECTION_KEY:
                continue
            current_set = set(section_scheme.questions)
            if not section_questions.isdisjoint(current_set):
                raise Exception(
                    f"Section '{section_key}' has overlapping question(s) with other sections"
                )
            section_questions = section_questions.union(current_set)

        all_questions = set(self.questions_in_order)
        missing_questions = sorted(section_questions.difference(all_questions))
        if len(missing_questions) > 0:
            logger.critical(f"Missing answer key for: {missing_questions}")
            raise Exception(
                "Some questions are missing in the answer key for the given marking scheme"
            )

    def parse_answers_and_map_questions(self, answers_in_order):
        """Build an AnswerMatcher per question from its answer item and section scheme."""
        question_to_answer_matcher = {}
        for question, answer_item in zip(self.questions_in_order, answers_in_order):
            section_marking_scheme = self.get_marking_scheme_for_question(question)
            answer_matcher = AnswerMatcher(answer_item, section_marking_scheme)
            question_to_answer_matcher[question] = answer_matcher
            if (
                answer_matcher.answer_type == "multiple-correct-weighted"
                and section_marking_scheme.section_key != DEFAULT_SECTION_KEY
            ):
                # Weighted answer items carry their own scores, overriding the
                # custom section's marking for that question.
                logger.warning(
                    f"The custom scheme '{section_marking_scheme}' will not apply to question '{question}' as it will use the given answer weights {answer_item}"
                )
        return question_to_answer_matcher

    def reset_explanation_table(self):
        # Rebuild the table fresh for each sheet (this instance is reused).
        self.explanation_table = None
        self.prepare_explanation_table()

    def prepare_explanation_table(self):
        # Nothing to prepare when explanations are disabled.
        if not self.should_explain_scoring:
            return
        table = Table(title="Evaluation Explanation Table", show_lines=True)
        table.add_column("Question")
        table.add_column("Marked")
        table.add_column("Answer(s)")
        table.add_column("Verdict")
        table.add_column("Delta")
        table.add_column("Score")
        # The Section column is only shown when custom sections exist.
        if self.has_non_default_section:
            table.add_column("Section")
        self.explanation_table = table

    def get_marking_scheme_for_question(self, question):
        """Return the question's section scheme, falling back to the default scheme."""
        return self.question_to_scheme.get(question, self.default_marking_scheme)

    def conditionally_add_explanation(
        self,
        answer_matcher,
        delta,
        marked_answer,
        question_verdict,
        question,
        current_score,
    ):
        """Append one row to the explanation table when explanations are enabled."""
        if self.should_explain_scoring:
            next_score = current_score + delta
            # The trailing Section cell is dropped (None) when only the
            # default section exists, matching the column layout above.
            row = [
                item
                for item in [
                    question,
                    marked_answer,
                    str(answer_matcher),
                    str.title(question_verdict),
                    str(round(delta, 2)),
                    str(round(next_score, 2)),
                    (
                        answer_matcher.get_section_explanation()
                        if self.has_non_default_section
                        else None
                    ),
                ]
                if item is not None
            ]
            self.explanation_table.add_row(*row)
| |
|
| |
|
def evaluate_concatenated_response(
    concatenated_response, evaluation_config, file_path, evaluation_output_dir
):
    """Score one OMR sheet's concatenated response against the answer key.

    Validates the response, accumulates per-question marking deltas, then
    optionally prints/saves the scoring explanation. Returns the final score.
    """
    evaluation_config.prepare_and_validate_omr_response(concatenated_response)

    score = 0.0
    for question in evaluation_config.questions_in_order:
        # The running score is passed along so the explanation table can show it.
        delta = evaluation_config.match_answer_for_question(
            score, question, concatenated_response[question]
        )
        score += delta

    evaluation_config.conditionally_print_explanation()
    evaluation_config.conditionally_save_explanation_csv(
        file_path, evaluation_output_dir
    )

    return score
| |
|