| |
| """ |
| Augment training data by duplicating isolated atomic tokens. |
| |
| The model struggles with single-token inputs because they appear rarely as isolated |
| training examples. This script identifies atomic tokens and adds more isolated |
| training pairs for them. |
| |
| Run this AFTER train_tokeniser.py and BEFORE train_t5.py: |
| 1. generate_syr_lat_pairs.py -> syriac_*_corpus.jsonl |
| 2. generate_clean_corpus.sh -> syriac_*_clean_corpus.jsonl |
| 3. train_tokeniser.py -> src/tokeniser/ |
| 4. augment_atomic_tokens.py -> syriac_*_augmented_corpus.jsonl (this script) |
| 5. train_t5.py -> model |
| """ |
|
|
| import argparse |
| import json |
| from collections import Counter |
| from pathlib import Path |
|
|
| from transformers import AutoTokenizer |
|
|
| |
# Default target for how many isolated examples each atomic token should have
# in the corpus; used as the default of --min-count and of augment_corpus().
MIN_ISOLATED_COUNT = 200
# Directory containing this script; main() reads and writes the corpus files here.
_SCRIPT_DIR = Path(__file__).resolve().parent
# Default tokenizer location: a "tokeniser" directory beside this script's directory.
DEFAULT_TOKENIZER_PATH = str(_SCRIPT_DIR.parent / "tokeniser")
|
|
|
|
def load_corpus(path: Path, strip_augmented: bool = False) -> list[dict]:
    """Load a JSONL corpus into a list of records.

    The file is always decoded as UTF-8 rather than the platform default,
    because the corpus contains non-ASCII text that is written out unescaped.
    Blank lines (for example a trailing empty line) are skipped instead of
    being handed to the JSON parser.

    Args:
        path: Path to the corpus file, one JSON object per line.
        strip_augmented: If True, drop records whose
            ``transliteration.source`` is ``"augmented-atomic"``, i.e. the
            entries added by a previous run of this script.

    Returns:
        The parsed records in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(path, encoding="utf-8") as f:
        data = [json.loads(line) for line in f if line.strip()]

    if strip_augmented:
        # Remove the output of earlier augmentation runs so that re-running
        # the script does not stack duplicates on top of duplicates.
        data = [
            d for d in data if d["transliteration"].get("source") != "augmented-atomic"
        ]

    return data
|
|
|
|
def save_corpus(data: list[dict], path: Path) -> None:
    """Write records to ``path`` as JSONL, one JSON object per line.

    The file is written as UTF-8 explicitly: records are serialised with
    ``ensure_ascii=False``, so non-ASCII text is emitted as-is and must not
    depend on the platform's default encoding.

    Args:
        data: Records to serialise, in output order.
        path: Destination file; an existing file is overwritten.
    """
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(item, ensure_ascii=False) + "\n" for item in data)
|
|
|
|
def get_atomic_tokens(tokenizer, corpus: list[dict]) -> set[str]:
    """Collect the corpus inputs that encode to exactly one content token.

    Each record's ``transliteration.src`` is stripped of surrounding
    whitespace and tokenized. The last id of the encoding is discarded
    (presumably the end-of-sequence marker the tokenizer appends -- confirm
    against the tokenizer config). An input counts as atomic when what remains
    is a single id, or two ids whose first is the SentencePiece space
    prefix "▁".

    Args:
        tokenizer: Callable tokenizer exposing ``convert_tokens_to_ids`` and
            returning an object with ``input_ids`` when called on a string.
        corpus: Records with a ``transliteration.src`` string.

    Returns:
        The set of stripped source strings judged atomic.
    """
    prefix_id = tokenizer.convert_tokens_to_ids("▁")

    def _is_atomic(text: str) -> bool:
        # Ignore the final id; only the content ids matter here.
        content_ids = tokenizer(text).input_ids[:-1]
        if len(content_ids) == 1:
            return True
        # Two ids are still atomic when the first is only the space prefix.
        return len(content_ids) == 2 and content_ids[0] == prefix_id

    candidates = (record["transliteration"]["src"].strip() for record in corpus)
    return {text for text in candidates if _is_atomic(text)}
|
|
|
|
def augment_corpus(
    corpus: list[dict], tokenizer, min_count: int = MIN_ISOLATED_COUNT
) -> list[dict]:
    """Return the corpus with extra copies of under-represented atomic inputs.

    An atomic input (as determined by ``get_atomic_tokens``) that occurs fewer
    than ``min_count`` times as a stripped ``src`` is topped up to
    ``min_count`` occurrences. The copies are built from the first corpus
    record with that source and are tagged ``"source": "augmented-atomic"``.

    Args:
        corpus: Records with a ``transliteration`` dict holding ``src``/``tgt``.
        tokenizer: Tokenizer passed through to ``get_atomic_tokens``.
        min_count: Target number of occurrences per atomic input.

    Returns:
        A new list: the original records followed by the generated copies.
        The input list itself is not modified.
    """
    atomic_tokens = get_atomic_tokens(tokenizer, corpus)
    print(f"Found {len(atomic_tokens)} atomic tokens")

    # How often each stripped source string appears in the corpus.
    occurrences = Counter(
        record["transliteration"]["src"].strip() for record in corpus
    )

    # Atomic inputs below the target, mapped to the number of copies missing.
    shortfall = {}
    for token in atomic_tokens:
        missing = min_count - occurrences[token]
        if missing > 0:
            shortfall[token] = missing
    print(f"Need to augment {len(shortfall)} tokens")

    # The first record seen for each such input serves as the copy template.
    template_for = {}
    for record in corpus:
        pair = record["transliteration"]
        key = pair["src"].strip()
        if key in shortfall:
            template_for.setdefault(key, pair)

    extra = []
    for token, missing in shortfall.items():
        if token not in template_for:
            continue
        template = template_for[token]
        extra.extend(
            {
                "transliteration": {
                    "src": template["src"],
                    "tgt": template["tgt"],
                    "title": "word",
                    "dialect": template.get("dialect", "unknown"),
                    "source": "augmented-atomic",
                }
            }
            for _ in range(missing)
        )

    print(f"Adding {len(extra)} augmented entries")
    return corpus + extra
|
|
|
|
def main():
    """Command-line entry point: augment the west and east corpora.

    For each dialect, the base corpus is taken from the existing augmented
    file (with earlier augmented entries stripped) when it exists, otherwise
    from the clean corpus. The result is written to the augmented file in the
    script directory. A dialect with neither file is reported and skipped.
    """
    cli = argparse.ArgumentParser(
        description="Augment corpus with atomic token examples"
    )
    cli.add_argument(
        "--tokenizer",
        default=DEFAULT_TOKENIZER_PATH,
        help=f"Path to tokenizer (default: {DEFAULT_TOKENIZER_PATH})",
    )
    cli.add_argument(
        "--min-count",
        type=int,
        default=MIN_ISOLATED_COUNT,
        help=f"Minimum isolated occurrences per atomic token (default: {MIN_ISOLATED_COUNT})",
    )
    options = cli.parse_args()

    print(f"Loading tokenizer from {options.tokenizer}...")
    tokenizer = AutoTokenizer.from_pretrained(options.tokenizer)

    for dialect in ("west", "east"):
        clean_file = _SCRIPT_DIR / f"syriac_{dialect}_clean_corpus.jsonl"
        output_file = _SCRIPT_DIR / f"syriac_{dialect}_augmented_corpus.jsonl"

        print(f"\n=== Processing {dialect.capitalize()} corpus ===")

        # Prefer a previous output file so reruns work without the clean corpus;
        # its old augmented entries are removed before augmenting again.
        if output_file.exists():
            print(
                f"Loading from {output_file.name} (stripping old augmented entries)..."
            )
            base = load_corpus(output_file, strip_augmented=True)
        elif clean_file.exists():
            print(f"Loading from {clean_file.name}...")
            base = load_corpus(clean_file)
        else:
            print(f"ERROR: Neither {clean_file.name} nor {output_file.name} found!")
            continue

        print(f"Base corpus size: {len(base)}")

        result = augment_corpus(base, tokenizer, min_count=options.min_count)
        print(f"Augmented size: {len(result)}")

        save_corpus(result, output_file)
        print(f"Saved to {output_file}")

    print("\nDone! Run train_t5.py to train the T5 model using the augmented corpus.")
|
|
|
|
# Run the augmentation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|