Spaces:
Sleeping
Sleeping
| import argparse | |
| import csv | |
| import json | |
| import sys | |
| from collections import defaultdict | |
| from pathlib import Path | |
| from typing import Dict, Iterable, List | |
# Lift the csv module's default per-field size cap so very long fields do
# not abort parsing. The limit is stored in a C long; on platforms where a
# C long is narrower than sys.maxsize the first call raises OverflowError,
# so fall back to the largest value a 32-bit signed long can hold.
try:
    csv.field_size_limit(sys.maxsize)
except OverflowError:
    csv.field_size_limit(2**31 - 1)
# Expected column order of one token row. Rows are matched to these names
# positionally, so the order here must mirror the order in the input files.
COLS = [
    # Sentence / token addressing.
    "sent_id", "token_id",
    # Per-token CoNLL-U-style annotation columns.
    "word", "lemma", "upos", "xpos", "feats", "head", "deprel", "deps", "misc",
    # Semantic-role-labelling columns.
    "predicate_sense", "semantic_role",
]
| def _iter_rows(path: Path) -> Iterable[List[str]]: | |
| with path.open("r", encoding="utf-8", newline="") as f: | |
| sample = f.read(4096) | |
| f.seek(0) | |
| delimiter = "\t" if "\t" in sample else "," | |
| reader = csv.reader(f, delimiter=delimiter) | |
| for row in reader: | |
| if not row: | |
| continue | |
| if len(row) == 1 and not row[0].strip(): | |
| continue | |
| if row[0].startswith("#"): | |
| continue | |
| yield row | |
def read_conllu_srl(path: Path) -> List[Dict[str, str]]:
    """Read token rows from ``path`` and return them as dicts keyed by ``COLS``.

    A row is kept only if it has at least ``len(COLS)`` fields and both its
    sentence id and token id consist solely of digits once surrounding
    whitespace is ignored. Extra trailing fields are discarded. The stored
    values are the raw field strings (not stripped).
    """
    width = len(COLS)
    parsed: List[Dict[str, str]] = []
    for fields in _iter_rows(path):
        if len(fields) < width:
            continue
        sent_id, token_id = fields[0].strip(), fields[1].strip()
        # Non-numeric ids (e.g. a header line) are not token rows.
        if not (sent_id.isdigit() and token_id.isdigit()):
            continue
        parsed.append(dict(zip(COLS, fields[:width])))
    return parsed
| def _join_tokens(tokens_with_misc: List[Dict[str, str]]) -> str: | |
| chunks: List[str] = [] | |
| for token in tokens_with_misc: | |
| chunks.append(token["word"]) | |
| if "SpaceAfter=No" not in token.get("misc", "_"): | |
| chunks.append(" ") | |
| return "".join(chunks).strip() | |
def flatten_to_corpus(records: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Collapse token records into one flat document per sentence.

    Records are grouped by ``sent_id``; sentences are emitted in numeric
    ``sent_id`` order and their tokens in numeric ``token_id`` order. Each
    document has ``id``, ``text`` (from ``_join_tokens``) and ``predicate``
    (the first ``predicate_sense`` that is neither empty nor ``_``, else
    ``""``), plus one key per semantic role whose value is the
    space-joined words carrying that role.
    """
    by_sentence: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    for record in records:
        by_sentence[record["sent_id"]].append(record)

    documents: List[Dict[str, str]] = []
    for sent_id in sorted(by_sentence, key=int):
        tokens = sorted(by_sentence[sent_id], key=lambda item: int(item["token_id"]))
        text = _join_tokens(tokens)

        # Only the first real sense in token order is kept as the predicate.
        senses = (token.get("predicate_sense", "_") for token in tokens)
        predicate = next((s for s in senses if s and s != "_"), "")

        role_words: Dict[str, List[str]] = defaultdict(list)
        for token in tokens:
            label = token.get("semantic_role", "_")
            if label and label != "_":
                role_words[label].append(token.get("word", ""))

        doc: Dict[str, str] = {"id": sent_id, "text": text, "predicate": predicate}
        for role_name, words in role_words.items():
            doc[role_name] = " ".join(words)
        documents.append(doc)
    return documents
def main() -> None:
    """Command-line entry point: convert every ``*.csv`` file to one JSON corpus.

    Files in ``--input-dir`` are processed in sorted path order; the
    documents from all of them are concatenated and written to ``--output``
    as indented UTF-8 JSON. The output's parent directory is created if
    needed. Both options default to locations under the ``data`` directory
    one level above this script's directory.
    """
    data_root = Path(__file__).resolve().parents[1] / "data"

    parser = argparse.ArgumentParser(
        description="Parse CoNLL-U SRL data to flattened JSON corpus"
    )
    parser.add_argument(
        "--input-dir",
        type=Path,
        default=data_root,
        help="Directory containing CoNLL-U style TSV/CSV files",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=data_root / "corpus.json",
        help="Output JSON corpus path",
    )
    options = parser.parse_args()

    documents: List[Dict[str, str]] = []
    for source in sorted(options.input_dir.glob("*.csv")):
        documents.extend(flatten_to_corpus(read_conllu_srl(source)))

    target = options.output
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as sink:
        json.dump(documents, sink, ensure_ascii=False, indent=2)
    print(f"Wrote {len(documents)} documents to {target}")
# Run the converter only when this file is executed as a script, so that
# importing the module does not parse arguments or write any files.
if __name__ == "__main__":
    main()