| import argparse |
| import errant |
| import json |
| import re |
| import spacy |
| from bisect import bisect |
| from operator import itemgetter |
| from string import punctuation |
|
|
| |
def main():
    """Convert a BEA2019-style JSON file into an M2 file.

    Every line of ``args.json_file`` is parsed as one JSON object with a
    "text" string and an "edits" list of ``(coder, edits)`` pairs. For each
    coder the text is split into paragraphs and sentences, the character
    edits are turned into token edits, and one M2 block per sentence is
    written to ``args.out`` containing the edits of all coders.
    """
    args = parse_args()
    print("Loading resources...")
    # spaCy pipeline used for tokenisation and sentence splitting.
    nlp = spacy.load("en")
    # ERRANT annotator sharing the same spaCy pipeline.
    annotator = errant.load("en", nlp)
    # Punctuation normalisation table. Every key and value is one character,
    # so translating never shifts the character offsets of the edits.
    norm_dict = {
        "’": "'",
        "´": "'",
        "‘": "'",
        "′": "'",
        "`": "'",
        '“': '"',
        '”': '"',
        '˝': '"',
        '¨': '"',
        '„': '"',
        '『': '"',
        '』': '"',
        '–': '-',
        '—': '-',
        '―': '-',
        '¬': '-',
        '、': ',',
        # NOTE(review): in the reviewed text the next five keys were plain
        # ASCII (e.g. ',' -> ','), which makes the entries no-ops. Fullwidth
        # forms are assumed to be the intended keys -- confirm.
        '\uff0c': ',',
        '\uff1a': ':',
        '\uff1b': ';',
        '\uff1f': '?',
        '\uff01': '!',
        'ِ': ' ',
        '\u200b': ' ',
    }
    # str.translate needs code points as keys.
    norm_dict = {ord(k): v for k, v in norm_dict.items()}

    print("Preprocessing files...")
    # Both files are managed by the with-statement so the M2 output is always
    # flushed and closed, even if an essay fails to process.
    with open(args.json_file, encoding="utf-8") as data, \
            open(args.out, "w", encoding="utf-8") as out_m2:
        for line in data:
            line = json.loads(line)
            text = line["text"].translate(norm_dict)
            # coder id -> list of sentence dicts ("orig", "cor", "edits").
            coder_dict = {}
            for coder, edits in line["edits"]:
                coder_sents = coder_dict.setdefault(coder, [])
                # Split the essay into paragraphs with paragraph-relative edits.
                para_info = get_paras(text, edits, norm_dict)
                for orig_para, para_edits in para_info:
                    # Collapse whitespace and fix up the character offsets.
                    orig_para, para_edits = clean_para(orig_para, para_edits)
                    if not orig_para:
                        continue  # Ignore empty paragraphs.
                    # Tokenise, then convert character edits to token edits.
                    orig_para = nlp(orig_para)
                    para_edits = get_token_edits(orig_para, para_edits, nlp)
                    # Split the paragraph into sentences with their edits.
                    sents = get_sents(orig_para, para_edits, args.sents)
                    coder_sents.extend(sents)

            coder_ids = sorted(coder_dict.keys())
            # The sentences of coder 0 drive the output; this assumes a coder
            # with id 0 is present in every essay (KeyError otherwise).
            for sent_id, sent in enumerate(coder_dict[0]):
                orig_str = " ".join(sent["orig"])
                out_m2.write("S " + orig_str + "\n")
                orig = annotator.parse(orig_str)
                for coder_id in coder_ids:
                    coder_sent = coder_dict[coder_id][sent_id]
                    cor = annotator.parse(" ".join(coder_sent["cor"]))
                    if args.gold:
                        # Order by orig end offset, ties broken by orig start.
                        gold_edits = sorted(coder_sent["edits"],
                                            key=itemgetter(1, 0))
                        m2_edits = []
                        for gold_edit in gold_edits:
                            # Reorder to [o_start, o_end, c_start, c_end, type].
                            gold_edit = (gold_edit[:2] + gold_edit[-2:]
                                         + [gold_edit[2]])
                            if gold_edit[-1] == "D":
                                # Detection edits are never minimised.
                                gold_edit = annotator.import_edit(
                                    orig, cor, gold_edit,
                                    min=False, old_cat=args.old_cats)
                            else:
                                gold_edit = annotator.import_edit(
                                    orig, cor, gold_edit,
                                    not args.no_min, args.old_cats)
                                # Skip edits that were minimised to nothing.
                                if gold_edit.o_start == gold_edit.o_end and \
                                        not gold_edit.c_str:
                                    continue
                            m2_edits.append(gold_edit)
                    elif args.auto:
                        m2_edits = annotator.annotate(
                            orig, cor, args.lev, args.merge)
                    else:
                        continue
                    # A coder without edits gets an explicit noop edit.
                    if not m2_edits:
                        out_m2.write(noop_edit(coder_id) + "\n")
                    for edit in m2_edits:
                        out_m2.write(edit.to_m2(coder_id) + "\n")
                # Blank line terminates the M2 block of this sentence.
                out_m2.write("\n")
|
|
| |
def parse_args(argv=None):
    """Parse the command line options of the converter.

    Args:
        argv: Optional list of argument strings. When None (the default),
            argparse reads the arguments from ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace`` with the attributes json_file,
        auto, gold, out, sents, no_min, old_cats, lev and merge.
    """
    parser = argparse.ArgumentParser(
        description="Convert BEA2019 Shared Task style JSON to M2 format.",
        formatter_class=argparse.RawTextHelpFormatter,
        usage="%(prog)s [-h] (-auto | -gold) [options] json_file -out <out_name>")
    parser.add_argument(
        "json_file",
        help="Path to a JSON file, one JSON essay per line.")
    # Exactly one of -auto / -gold must be given.
    type_group = parser.add_mutually_exclusive_group(required=True)
    type_group.add_argument(
        "-auto",
        help="Extract edits automatically.",
        action="store_true")
    type_group.add_argument(
        "-gold",
        help="Use existing edit alignments.",
        action="store_true")
    parser.add_argument(
        "-out",
        help="The output filepath.",
        required=True)
    parser.add_argument(
        "-sents",
        help="The text is already sentence tokenised.",
        action="store_true")
    parser.add_argument(
        "-no_min",
        help="Do not minimise edit spans (gold only).",
        action="store_true")
    parser.add_argument(
        "-old_cats",
        help="Preserve old error types (gold only); i.e. turn off the classifier.",
        action="store_true")
    parser.add_argument(
        "-lev",
        help="Align using standard Levenshtein.",
        action="store_true")
    parser.add_argument(
        "-merge",
        help="Choose a merging strategy for automatic alignment.\n"
             "rules: Use a rule-based merging strategy (default)\n"
             "all-split: Merge nothing: MSSDI -> M, S, S, D, I\n"
             "all-merge: Merge adjacent non-matches: MSSDI -> M, SSDI\n"
             "all-equal: Merge adjacent same-type non-matches: MSSDI -> M, SS, D, I",
        choices=["rules", "all-split", "all-merge", "all-equal"],
        default="rules")
    return parser.parse_args(argv)
|
|
| |
| |
| |
| |
def get_paras(text, edits, norm_dict):
    """Split an essay into paragraphs and localise its character edits.

    Args:
        text: The essay text; paragraphs are the runs between newlines.
        edits: Character edits ``[start, end, cor]`` or
            ``[start, end, cor, type]`` with offsets into ``text``. A ``cor``
            of None marks a detection edit.
        norm_dict: Translation table (for ``str.translate``) applied to the
            correction strings.

    Returns:
        A list of ``(paragraph, para_edits)`` tuples. Each paragraph edit is
        ``[start, end, type, cor]`` with paragraph-relative offsets, where
        type is "D" for detection edits and otherwise the given type or "C".
        Edits that do not fall entirely inside one paragraph are dropped, as
        are detection edits that contain a correction edit.
    """
    para_info = []
    # Every maximal run of non-newline characters is one paragraph.
    for para in re.finditer("[^\n]+", text):
        para_start, para_end = para.span(0)
        para_edits = []
        # Spans of correction (not detection) edits in this paragraph.
        cor_spans = []
        for edit in edits:
            # Only keep edits that lie completely inside this paragraph.
            if edit[0] < para_start or edit[1] > para_end:
                continue
            new_edit = [edit[0] - para_start, edit[1] - para_start,
                        "C", edit[2]]
            if edit[2] is None:
                new_edit[2] = "D"
            else:
                # Correction edit: normalise the string, keep an explicit
                # type if there is one, and remember the span.
                new_edit[3] = edit[2].translate(norm_dict)
                if len(edit) == 4:
                    new_edit[2] = edit[3]
                cor_spans.append(new_edit[:2])
            para_edits.append(new_edit)

        def contains_correction(det_edit):
            # Non-empty corrections count when they lie within the detection
            # span; insertions only when strictly inside it.
            return any(
                (start != end and start >= det_edit[0] and end <= det_edit[1])
                or (start == end and det_edit[0] < start and end < det_edit[1])
                for start, end in cor_spans)

        # Drop detection edits that overlap a correction edit.
        new_para_edits = [
            edit for edit in para_edits
            if not (edit[2] == "D" and contains_correction(edit))]
        para_info.append((para.group(0), new_para_edits))
    return para_info
|
|
| |
| |
| |
| |
def clean_para(para, edits):
    """Normalise the whitespace of a paragraph and keep edit offsets in sync.

    All whitespace characters become spaces, runs of spaces are collapsed to
    a single space, leading whitespace is removed, and edit spans that begin
    or end with a space are tightened.

    Args:
        para: The paragraph string.
        edits: Edits whose first two items are character offsets into
            ``para``. The edits are updated in place.

    Returns:
        A ``(para, edits)`` tuple with the cleaned paragraph and the same
        edit list.
    """
    # Turn every kind of whitespace into a plain space (length-preserving).
    para = re.sub(r"\s", " ", para)
    # Remove one space of a double space at a time, so edits that sit
    # between several spaces keep a consistent offset. The pattern is written
    # as " {2}" so it cannot be damaged by whitespace collapsing.
    double_space = re.compile(r" {2}")
    match = double_space.search(para)
    while match:
        ws_start = match.start()
        para = para[:ws_start] + para[ws_start + 1:]
        # Shift every offset that lies after the removed character.
        for edit in edits:
            if edit[0] > ws_start:
                edit[0] -= 1
            if edit[1] > ws_start:
                edit[1] -= 1
        match = double_space.search(para)
    # At most one leading space is left at this point.
    if para.startswith(" "):
        para = para.lstrip()
        for edit in edits:
            # Offsets can never become negative.
            edit[0] = max(edit[0] - 1, 0)
            edit[1] = max(edit[1] - 1, 0)
    # Strip a leading/trailing space from the span of each edit.
    for edit in edits:
        if edit[0] == edit[1]:
            continue  # Insertions cover no text.
        orig = para[edit[0]:edit[1]]
        # NOTE: a span that is exactly one space matches both tests and ends
        # up with start > end.
        if orig.startswith(" "):
            edit[0] += 1
        if orig.endswith(" "):
            edit[1] -= 1
    return para, edits
|
|
| |
| |
| |
| |
def get_token_edits(para, edits, nlp):
    """Convert the character edits of a paragraph into token edits.

    Args:
        para: The parsed paragraph; it must provide ``.text``, token
            slicing and tokens with ``.idx`` / ``.text`` (a spaCy Doc in
            ``main``).
        edits: Edits ``[char_start, char_end, type, cor]``; a ``cor`` of
            None is replaced by the original text of the span. The list and
            its edits are modified in place.
        nlp: Callable used to tokenise correction strings.

    Returns:
        The same list, now holding ``[tok_start, tok_end, type, cor]`` with
        a space-joined, tokenised ``cor``. Edits that had to be expanded to
        token boundaries and then overlap a previous edit are removed.
    """
    tok_starts, tok_ends = get_all_tok_starts_and_ends(para)
    prev_tok_end = 0
    # Positions (not values) of the edits to delete afterwards; looking an
    # edit up by value could hit a different edit that compares equal.
    overlap_edit_ids = []
    for edit_id, edit in enumerate(edits):
        # Detection edits carry no correction: use the original text.
        if edit[3] is None:
            edit[3] = para.text[edit[0]:edit[1]]
        span = convert_char_to_tok(edit[0], edit[1], tok_starts, tok_ends)
        # Four items mean the span had to be expanded to token boundaries;
        # span[2] and span[3] are the expanded character offsets.
        if len(span) == 4:
            # The expansion may run into the previous edit: drop such edits.
            if span[0] < prev_tok_end:
                overlap_edit_ids.append(edit_id)
                continue
            # Add the text gained by the expansion to the correction.
            left = para.text[span[2]:edit[0]]
            right = para.text[edit[1]:span[3]]
            edit[3] = (left + edit[3] + right).strip()
        prev_tok_end = span[1]
        # Replace the character span by the token span.
        edit[0] = span[0]
        edit[1] = span[1]
        if edit[2] != "D":
            # Tokenise the correction string.
            edit[3] = " ".join(tok.text for tok in nlp(edit[3].strip()))
        else:
            # Detection edits equal the tokenised original.
            edit[3] = " ".join(tok.text for tok in para[edit[0]:edit[1]])
    # Delete from the back so the remaining positions stay valid.
    for edit_id in reversed(overlap_edit_ids):
        del edits[edit_id]
    return edits
|
|
| |
| |
def get_all_tok_starts_and_ends(spacy_doc):
    """Return the character start and end offsets of all tokens.

    Args:
        spacy_doc: Iterable of tokens that have ``.idx`` (character start)
            and ``.text``.

    Returns:
        A ``(tok_starts, tok_ends)`` tuple of two lists, where each end is
        ``tok.idx + len(tok.text)``.
    """
    # Single pass, so one-shot iterables work as well.
    spans = [(tok.idx, tok.idx + len(tok.text)) for tok in spacy_doc]
    tok_starts = [start for start, _ in spans]
    tok_ends = [end for _, end in spans]
    return tok_starts, tok_ends
|
|
| |
| |
| |
| |
| |
def convert_char_to_tok(start, end, all_starts, all_ends):
    """Map a character span onto a token span.

    Args:
        start: Character start offset of the edit.
        end: Character end offset of the edit.
        all_starts: Ascending character start offsets of all tokens.
        all_ends: Ascending character end offsets of all tokens.

    Returns:
        ``[tok_start, tok_end]`` when the span matches token boundaries, or
        ``[tok_start, tok_end, char_start, char_end]`` when the span had to
        be expanded to the nearest token boundaries; the last two items are
        the expanded character offsets.
    """
    # Without tokens every span collapses to the empty token span.
    if not all_starts:
        return [0, 0]
    if start == end:
        # Insertion before or at the first token.
        if not start or start <= all_starts[0]:
            return [0, 0]
        # Insertion at or after the end of the last token.
        if start >= all_ends[-1]:
            return [len(all_starts), len(all_starts)]
        # Insertion at the beginning of a token.
        if start in all_starts:
            tok = all_starts.index(start)
            return [tok, tok]
        # Insertion at the end of a token.
        if start in all_ends:
            tok = all_ends.index(start) + 1
            return [tok, tok]
        # Otherwise the insertion is inside a token: expand below.
    elif start in all_starts and end in all_ends:
        # The span matches complete tokens.
        return [all_starts.index(start), all_ends.index(end) + 1]
    # Expand the span to the nearest token boundaries.
    if start not in all_starts:
        # Clamp at 0 so a start before the first token does not wrap around
        # to the last token through index -1.
        start = all_starts[max(bisect(all_starts, start) - 1, 0)]
    if end not in all_ends:
        # An end beyond the last token uses the end of the last token.
        nearest = min(bisect(all_ends, end), len(all_ends) - 1)
        end = all_ends[nearest]
    # Keep the expanded character offsets as well.
    return [all_starts.index(start), all_ends.index(end) + 1, start, end]
|
|
| |
| |
| |
| |
| |
def get_sents(orig, edits, sent_tokenised):
    """Split a parsed paragraph and its token edits into sentences.

    Args:
        orig: The parsed paragraph; it must provide ``.sents``, ``len()``
            and token slicing (a spaCy Doc in ``main``).
        edits: Token edits ``[orig_start, orig_end, type, cor]`` relative to
            the paragraph. Kept edits are modified in place.
        sent_tokenised: If true, the paragraph is treated as one sentence.

    Returns:
        A list of dicts with the keys "orig" (token strings), "cor"
        (corrected token strings) and "edits" (sentence-relative edits, as
        returned by ``prepare_sent_edits_output``). Edits that start in one
        sentence and end in another are dropped.
    """
    sent_list = []
    # Only accept sentence boundaries that end in punctuation or that end
    # the paragraph; other sentences are merged into the following one.
    orig_sents = []
    start = 0
    for sent in orig.sents:
        if sent[-1].text[-1] in punctuation or sent.end == len(orig):
            orig_sents.append(orig[start:sent.end])
            start = sent.end
    # A single sentence needs no splitting.
    if len(orig_sents) == 1 or sent_tokenised:
        orig, cor, edits = prepare_sent_edits_output(orig, edits)
        sent_list.append({"orig": orig, "cor": cor, "edits": edits})
        return sent_list
    # Positions of the edits that were already assigned or dropped. A set is
    # used instead of a prefix counter so that an edit deferred to the next
    # sentence is not lost when a later edit is consumed first, and so that
    # an edit whose offsets were already rebased is never examined again.
    handled = set()
    last_sent_id = len(orig_sents) - 1
    for sent_id, orig_sent in enumerate(orig_sents):
        sent_edits = []
        for edit_id, edit in enumerate(edits):
            if edit_id in handled:
                continue
            starts_inside = orig_sent.start <= edit[0] < orig_sent.end
            if starts_inside and edit[1] > orig_sent.end:
                # Edits crossing the sentence boundary cannot be handled.
                handled.add(edit_id)
                continue
            if starts_inside:
                # Starts and ends inside this sentence.
                belongs = True
            elif edit[0] == edit[1] == orig_sent.end:
                # An insertion after the last token could belong to this or
                # to the next sentence. It is kept here if this is the last
                # sentence, the correction is empty, or the correction ends
                # in punctuation.
                belongs = (sent_id == last_sent_id or not edit[3]
                           or edit[3][-1] in punctuation)
            else:
                belongs = False
            if belongs:
                # Make the token span relative to the sentence.
                edit[0] -= orig_sent.start
                edit[1] -= orig_sent.start
                handled.add(edit_id)
                sent_edits.append(edit)
        # Sentences become lists of token strings; edits get cor spans.
        orig_sent, cor_sent, sent_edits = prepare_sent_edits_output(
            orig_sent, sent_edits)
        sent_list.append({"orig": orig_sent,
                          "cor": cor_sent,
                          "edits": sent_edits})
    return sent_list
|
|
| |
| |
| |
| |
def prepare_sent_edits_output(orig, edits):
    """Apply token edits to a sentence and record the corrected spans.

    Args:
        orig: Iterable of tokens that have a ``.text`` attribute.
        edits: Token edits ``[orig_start, orig_end, type, cor]`` where
            ``cor`` is a space-separated token string. Every edit is
            extended in place with ``cor_start`` and ``cor_end``.

    Returns:
        A tuple ``(orig, cor, edits)``: the original token strings, the
        corrected token strings and the same edit list, whose edits are now
        ``[orig_start, orig_end, type, cor, cor_start, cor_end]``.
    """
    orig = [tok.text for tok in orig]
    cor = orig[:]
    # Difference in length between cor and orig caused by earlier edits.
    offset = 0
    # The running offset is only valid when edits are applied from left to
    # right, so walk them in span order; the order of the list itself is
    # left untouched (sorted() is stable and does not reorder `edits`).
    for edit in sorted(edits, key=lambda e: (e[0], e[1])):
        cor_toks = edit[3].split()
        cor_start = edit[0] + offset
        cor[cor_start:edit[1] + offset] = cor_toks
        cor_end = cor_start + len(cor_toks)
        offset += len(cor_toks) - (edit[1] - edit[0])
        # Save the span of the correction in the corrected sentence.
        edit.extend([cor_start, cor_end])
    return orig, cor, edits
|
|
| |
| |
def noop_edit(id=0):
    """Return the M2 line of an explicit "no edit" annotation.

    Args:
        id: The annotator id appended as the last field (default 0). The
            name shadows the builtin but is kept for keyword callers.

    Returns:
        The noop annotation line, without a trailing newline.
    """
    return "A -1 -1|||noop|||-NONE-|||REQUIRED|||-NONE-|||" + str(id)
|
|
| |
# Run the conversion only when the file is executed as a script.
if __name__ == "__main__":
    main()