""" OMRChecker Author: Udayraj Deshmukh Github: https://github.com/Udayraj123 """ import os from csv import QUOTE_NONNUMERIC from pathlib import Path from time import time import cv2 import pandas as pd from rich.table import Table import json from src import constants from src.defaults import CONFIG_DEFAULTS from src.evaluation import EvaluationConfig, evaluate_concatenated_response from src.logger import console, logger from src.template import Template from src.utils.file import Paths, setup_dirs_for_paths, setup_outputs_for_template from src.utils.image import ImageUtils from src.utils.interaction import InteractionUtils, Stats from src.utils.parsing import get_concatenated_response, open_config_with_defaults # Load processors STATS = Stats() def entry_point(input_dir, args): if not os.path.exists(input_dir): raise Exception(f"Given input directory does not exist: '{input_dir}'") curr_dir = input_dir return process_dir(input_dir, curr_dir, args) def print_config_summary( curr_dir, omr_files, template, tuning_config, local_config_path, evaluation_config, args, ): logger.info("") table = Table(title="Current Configurations", show_header=False, show_lines=False) table.add_column("Key", style="cyan", no_wrap=True) table.add_column("Value", style="magenta") table.add_row("Directory Path", f"{curr_dir}") table.add_row("Count of Images", f"{len(omr_files)}") table.add_row("Set Layout Mode ", "ON" if args["setLayout"] else "OFF") pre_processor_names = [pp.__class__.__name__ for pp in template.pre_processors] table.add_row( "Markers Detection", "ON" if "CropOnMarkers" in pre_processor_names else "OFF", ) table.add_row("Auto Alignment", f"{tuning_config.alignment_params.auto_align}") table.add_row("Detected Template Path", f"{template}") if local_config_path: table.add_row("Detected Local Config", f"{local_config_path}") if evaluation_config: table.add_row("Detected Evaluation Config", f"{evaluation_config}") table.add_row( "Detected pre-processors", ", ".join(pre_processor_names), ) console.print(table, justify="center") def process_dir( root_dir, curr_dir, args, template=None, tuning_config=CONFIG_DEFAULTS, evaluation_config=None, ): # Update local tuning_config (in current recursion stack) local_config_path = curr_dir.joinpath(constants.CONFIG_FILENAME) if os.path.exists(local_config_path): tuning_config = open_config_with_defaults(local_config_path) # Update local template (in current recursion stack) local_template_path = curr_dir.joinpath(constants.TEMPLATE_FILENAME) local_template_exists = os.path.exists(local_template_path) if local_template_exists: template = Template( local_template_path, tuning_config, ) # Look for subdirectories for processing subdirs = [d for d in curr_dir.iterdir() if d.is_dir()] output_dir = Path(args["output_dir"], curr_dir.relative_to(root_dir)) paths = Paths(output_dir) # look for images in current dir to process exts = ("*.[pP][nN][gG]", "*.[jJ][pP][gG]", "*.[jJ][pP][eE][gG]") omr_files = sorted([f for ext in exts for f in curr_dir.glob(ext)]) # Exclude images (take union over all pre_processors) excluded_files = [] if template: for pp in template.pre_processors: excluded_files.extend(Path(p) for p in pp.exclude_files()) local_evaluation_path = curr_dir.joinpath(constants.EVALUATION_FILENAME) if not args["setLayout"] and os.path.exists(local_evaluation_path): if not local_template_exists: logger.warning( f"Found an evaluation file without a parent template file: {local_evaluation_path}" ) evaluation_config = EvaluationConfig( curr_dir, local_evaluation_path, template, tuning_config, ) excluded_files.extend( Path(exclude_file) for exclude_file in evaluation_config.get_exclude_files() ) omr_files = [f for f in omr_files if f not in excluded_files] if omr_files: if not template: logger.error( f"Found images, but no template in the directory tree \ of '{curr_dir}'. \nPlace {constants.TEMPLATE_FILENAME} in the \ appropriate directory." ) raise Exception( f"No template file found in the directory tree of {curr_dir}" ) setup_dirs_for_paths(paths) outputs_namespace = setup_outputs_for_template(paths, template) print_config_summary( curr_dir, omr_files, template, tuning_config, local_config_path, evaluation_config, args, ) if args["setLayout"]: show_template_layouts(omr_files, template, tuning_config) else: process_files( omr_files, template, tuning_config, evaluation_config, outputs_namespace, ) elif not subdirs: # Each subdirectory should have images or should be non-leaf logger.info( f"No valid images or sub-folders found in {curr_dir}.\ Empty directories not allowed." ) # recursively process sub-folders for d in subdirs: process_dir( root_dir, d, args, template, tuning_config, evaluation_config, ) def show_template_layouts(omr_files, template, tuning_config): for file_path in omr_files: file_name = file_path.name file_path = str(file_path) in_omr = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE) in_omr = template.image_instance_ops.apply_preprocessors( file_path, in_omr, template ) template_layout = template.image_instance_ops.draw_template_layout( in_omr, template, shifted=False, border=2 ) InteractionUtils.show( f"Template Layout: {file_name}", template_layout, 1, 1, config=tuning_config ) def process_files( omr_files, template, tuning_config, evaluation_config, outputs_namespace, ): start_time = int(time()) files_counter = 0 STATS.files_not_moved = 0 for file_path in omr_files: files_counter += 1 file_name = file_path.name in_omr = cv2.imread(str(file_path), cv2.IMREAD_GRAYSCALE) logger.info("") logger.info( f"({files_counter}) Opening image: \t'{file_path}'\tResolution: {in_omr.shape}" ) template.image_instance_ops.reset_all_save_img() template.image_instance_ops.append_save_img(1, in_omr) in_omr = template.image_instance_ops.apply_preprocessors( file_path, in_omr, template ) if in_omr is None: # Error OMR case new_file_path = outputs_namespace.paths.errors_dir.joinpath(file_name) outputs_namespace.OUTPUT_SET.append( [file_name] + outputs_namespace.empty_resp ) if check_and_move( constants.ERROR_CODES.NO_MARKER_ERR, file_path, new_file_path ): err_line = [ file_name, file_path, new_file_path, "NA", ] + outputs_namespace.empty_resp pd.DataFrame(err_line, dtype=str).T.to_csv( outputs_namespace.files_obj["Errors"], mode="a", quoting=QUOTE_NONNUMERIC, header=False, index=False, ) continue # uniquify file_id = str(file_name) save_dir = outputs_namespace.paths.save_marked_dir ( response_dict, final_marked, multi_marked, _, ) = template.image_instance_ops.read_omr_response( template, image=in_omr, name=file_id, save_dir=save_dir ) # TODO: move inner try catch here # concatenate roll nos, set unmarked responses, etc omr_response = get_concatenated_response(response_dict, template) if ( evaluation_config is None or not evaluation_config.get_should_explain_scoring() ): logger.info(f"Read Response: \n{omr_response}") score = 0 if evaluation_config is not None: score = evaluate_concatenated_response( omr_response, evaluation_config, file_path, outputs_namespace.paths.evaluation_dir ) logger.info( f"(/{files_counter}) Graded with score: {round(score, 2)}\t for file: '{file_id}'" ) else: logger.info(f"(/{files_counter}) Processed file: '{file_id}'") read_response_path = outputs_namespace.paths.output_dir.joinpath("read_response.json") with open(read_response_path, "w", encoding="utf-8") as f: json.dump(omr_response, f, indent=4) score_path = outputs_namespace.paths.output_dir.joinpath("score.txt") with open(score_path, "w", encoding="utf-8") as f: f.write(str(round(score, 2))) if tuning_config.outputs.show_image_level >= 2: InteractionUtils.show( f"Final Marked Bubbles : '{file_id}'", ImageUtils.resize_util_h( final_marked, int(tuning_config.dimensions.display_height * 1.3) ), 1, 1, config=tuning_config, ) resp_array = [] for k in template.output_columns: resp_array.append(omr_response[k]) outputs_namespace.OUTPUT_SET.append([file_name] + resp_array) if multi_marked == 0 or not tuning_config.outputs.filter_out_multimarked_files: STATS.files_not_moved += 1 new_file_path = save_dir.joinpath(file_id) # Enter into Results sheet- results_line = [file_name, file_path, new_file_path, score] + resp_array # Write/Append to results_line file(opened in append mode) pd.DataFrame(results_line, dtype=str).T.to_csv( outputs_namespace.files_obj["Results"], mode="a", quoting=QUOTE_NONNUMERIC, header=False, index=False, ) else: # multi_marked file logger.info(f"[{files_counter}] Found multi-marked file: '{file_id}'") new_file_path = outputs_namespace.paths.multi_marked_dir.joinpath(file_name) if check_and_move( constants.ERROR_CODES.MULTI_BUBBLE_WARN, file_path, new_file_path ): mm_line = [file_name, file_path, new_file_path, "NA"] + resp_array pd.DataFrame(mm_line, dtype=str).T.to_csv( outputs_namespace.files_obj["MultiMarked"], mode="a", quoting=QUOTE_NONNUMERIC, header=False, index=False, ) # else: # TODO: Add appropriate record handling here # pass print_stats(start_time, files_counter, tuning_config) def check_and_move(error_code, file_path, filepath2): # TODO: fix file movement into error/multimarked/invalid etc again STATS.files_not_moved += 1 return True def print_stats(start_time, files_counter, tuning_config): time_checking = max(1, round(time() - start_time, 2)) log = logger.info log("") log(f"{'Total file(s) moved': <27}: {STATS.files_moved}") log(f"{'Total file(s) not moved': <27}: {STATS.files_not_moved}") log("--------------------------------") log( f"{'Total file(s) processed': <27}: {files_counter} ({'Sum Tallied!' if files_counter == (STATS.files_moved + STATS.files_not_moved) else 'Not Tallying!'})" ) if tuning_config.outputs.show_image_level <= 0: log( f"\nFinished Checking {files_counter} file(s) in {round(time_checking, 1)} seconds i.e. ~{round(time_checking / 60, 1)} minute(s)." ) log( f"{'OMR Processing Rate': <27}: \t ~ {round(time_checking / files_counter, 2)} seconds/OMR" ) log( f"{'OMR Processing Speed': <27}: \t ~ {round((files_counter * 60) / time_checking, 2)} OMRs/minute" ) else: log(f"\n{'Total script time': <27}: {time_checking} seconds") if tuning_config.outputs.show_image_level <= 1: log( "\nTip: To see some awesome visuals, open config.json and increase 'show_image_level'" )