Spaces:
Running
Running
| import json | |
| from pathlib import Path | |
| from zipfile import ZipFile | |
| from typing import List, Dict, Any | |
| from tempfile import TemporaryDirectory | |
def validate_zip(submission_track: str, submission_zip: str) -> None:
    """
    Validates the submission format and contents

    The archive is unpacked into a temporary directory (removed again on
    return) and handed to the validator that matches the track.

    Args:
        submission_track: the track of the submission, one of `NOTSOFAR-SC`,
            `NOTSOFAR-MC`, `DASR-Constrained-LM` or `DASR-Unconstrained-LM`
        submission_zip: path to the submission zip file
    Raises:
        ValueError: if the track is unknown, the file is not a readable zip
            archive, or the extracted contents fail the track's validation
    """
    # Imported locally: the module-level import only brings in ZipFile.
    from zipfile import BadZipFile

    notsofar_tracks = ('NOTSOFAR-SC', 'NOTSOFAR-MC')
    dasr_tracks = ('DASR-Constrained-LM', 'DASR-Unconstrained-LM')

    # Reject an unknown track up front so nothing is extracted for it.
    if submission_track not in notsofar_tracks + dasr_tracks:
        raise ValueError(f'Invalid submission track: {submission_track}')

    with TemporaryDirectory() as temp_dir:
        # NOTE(review): no limit on the uncompressed size is enforced here;
        # consider inspecting the members first if uploads are untrusted.
        try:
            with ZipFile(submission_zip, 'r') as submission_zip_file:
                submission_zip_file.extractall(temp_dir)
        except BadZipFile as err:
            # Keep the documented contract: a broken archive is a ValueError.
            raise ValueError(f'Invalid submission zip: {err}') from err

        submission_dir = Path(temp_dir)
        if submission_track in notsofar_tracks:
            validate_notsofar_submission(submission_dir=submission_dir)
        else:
            validate_dasr_submission(submission_dir=submission_dir)
def validate_notsofar_submission(submission_dir: Path) -> None:
    """
    Validates NOTSOFAR submission format and contents

    Expects `tc_orc_wer_hyp.json` and `tcp_wer_hyp.json` directly inside
    `submission_dir`. Each file must hold a JSON list whose entries are
    objects containing the keys `session_id`, `words`, `speaker`,
    `start_time` and `end_time`. Only the presence of the keys is checked,
    not their values.

    Args:
        submission_dir: path to the submission directory
    Raises:
        ValueError: if a file is missing, is not valid JSON
            (`json.JSONDecodeError` is a `ValueError` subclass), is not a
            list, or has an entry that is not an object with all required keys
    """
    submission_file_names = ['tc_orc_wer_hyp.json', 'tcp_wer_hyp.json']
    fields = ['session_id', 'words', 'speaker', 'start_time', 'end_time']

    for file_name in submission_file_names:
        file_path = submission_dir / file_name
        # is_file() rather than exists(): a directory carrying the expected
        # name is reported as missing instead of failing later in open().
        if not file_path.is_file():
            raise ValueError(f'Missing {file_name}')

        # JSON interchange is UTF-8; do not depend on the platform default.
        with open(file_path, 'r', encoding='utf-8') as json_file:
            json_data: Any = json.load(json_file)

        if not isinstance(json_data, list):
            raise ValueError(f'Invalid `{file_name}` format, expecting a list of entries')

        for data in json_data:
            # Without this check a string entry would be matched by substring,
            # a list entry by element, and a number would raise TypeError.
            if not isinstance(data, dict):
                raise ValueError(f'Invalid `{file_name}` format, expecting each entry to be a JSON object')
            if not all(field in data for field in fields):
                raise ValueError(f'Invalid `{file_name}` format, fields: {fields} are required in each entry')
def validate_dasr_submission(submission_dir: Path) -> None:
    """
    Validates DASR submission format and contents

    Expects a `dev` directory inside `submission_dir` holding `chime6.json`,
    `dipco.json`, `mixer6.json` and `notsofar1.json`. Each file must hold a
    JSON list whose entries are objects containing the keys `session_id`,
    `words`, `speaker`, `start_time` and `end_time`. Only the presence of the
    keys is checked, not their values.

    Args:
        submission_dir: path to the submission directory
    Raises:
        ValueError: if the `dev` directory or a file is missing, a file is
            not valid JSON (`json.JSONDecodeError` is a `ValueError`
            subclass), is not a list, or has an entry that is not an object
            with all required keys
    """
    submission_file_names = ['chime6.json', 'dipco.json', 'mixer6.json', 'notsofar1.json']
    fields = ['session_id', 'words', 'speaker', 'start_time', 'end_time']

    dev_dir = submission_dir / 'dev'
    # is_dir() rather than exists(): a plain file named `dev` does not count.
    if not dev_dir.is_dir():
        raise ValueError('Missing dev directory, expecting a directory named `dev` with the submission files in it.')

    for file_name in submission_file_names:
        file_path = dev_dir / file_name
        # is_file() rather than exists(): a directory carrying the expected
        # name is reported as missing instead of failing later in open().
        if not file_path.is_file():
            raise ValueError(f'Missing {file_name}')

        # JSON interchange is UTF-8; do not depend on the platform default.
        with open(file_path, 'r', encoding='utf-8') as json_file:
            json_data: Any = json.load(json_file)

        if not isinstance(json_data, list):
            raise ValueError(f'Invalid `{file_name}` format, expecting a list of entries')

        for data in json_data:
            # Without this check a string entry would be matched by substring,
            # a list entry by element, and a number would raise TypeError.
            if not isinstance(data, dict):
                raise ValueError(f'Invalid `{file_name}` format, expecting each entry to be a JSON object')
            if not all(field in data for field in fields):
                raise ValueError(f'Invalid `{file_name}` format, fields: {fields} are required in each entry')