#!/usr/bin/env python3
"""
Augment training data by duplicating isolated atomic tokens.

The model struggles with single-token inputs because they appear rarely
as isolated training examples. This script identifies atomic tokens
and adds more isolated training pairs for them.

Run this AFTER train_tokeniser.py and BEFORE train_t5.py:
1. generate_syr_lat_pairs.py    -> syriac_*_corpus.jsonl
2. generate_clean_corpus.sh     -> syriac_*_clean_corpus.jsonl
3. train_tokeniser.py           -> src/tokeniser/
4. augment_atomic_tokens.py     -> syriac_*_augmented_corpus.jsonl (this script)
5. train_t5.py                  -> model
"""

import argparse
import json
from collections import Counter
from pathlib import Path

# Configuration
MIN_ISOLATED_COUNT = 200  # Ensure each atomic token appears at least this many times
_SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_TOKENIZER_PATH = str(_SCRIPT_DIR.parent / "tokeniser")  # src/tokeniser


def load_corpus(path: Path, strip_augmented: bool = False) -> list[dict]:
    """Load a JSONL corpus.

    Args:
        path: Path to the corpus file (one JSON object per line).
        strip_augmented: If True, drop entries whose
            ``transliteration.source`` is ``"augmented-atomic"``, i.e. the
            entries produced by a previous run of ``augment_corpus``.

    Returns:
        The decoded entries, in file order.
    """
    # The corpus is written with ensure_ascii=False, so it must be read as
    # UTF-8 regardless of the platform's default locale encoding.
    with open(path, encoding="utf-8") as f:
        # Blank lines (e.g. a trailing empty line) are not JSON; skip them.
        data = [json.loads(line) for line in f if line.strip()]

    if strip_augmented:
        # Remove entries that were added by previous augmentation runs
        data = [
            d
            for d in data
            if d["transliteration"].get("source") != "augmented-atomic"
        ]

    return data


def save_corpus(data: list[dict], path: Path) -> None:
    """Save a corpus as JSONL, one entry per line, encoded as UTF-8.

    Args:
        data: Entries to write.
        path: Destination file; it is overwritten if it exists.
    """
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(item, ensure_ascii=False) + "\n" for item in data)


def get_atomic_tokens(tokenizer, corpus: list[dict]) -> set[str]:
    """Find all inputs that tokenize to a single content token.

    SentencePiece adds a leading space token (▁), so we check for either
    1 token or 2 tokens where the first is the space prefix.

    Args:
        tokenizer: Callable returning an object with ``input_ids`` and
            providing ``convert_tokens_to_ids``.
        corpus: Entries with a ``transliteration.src`` string.

    Returns:
        The set of stripped source strings considered atomic.
    """
    space_token_id = tokenizer.convert_tokens_to_ids("▁")

    # Tokenize each distinct source once; repeated rows give the same answer.
    unique_sources = {item["transliteration"]["src"].strip() for item in corpus}

    atomic = set()
    for src in unique_sources:
        # Drop the final id, which is presumably the end-of-sequence token
        # appended by the tokenizer (the original comment was truncated).
        ids = tokenizer(src).input_ids[:-1]

        is_single_token = len(ids) == 1
        is_prefixed_single_token = len(ids) == 2 and ids[0] == space_token_id
        if is_single_token or is_prefixed_single_token:
            atomic.add(src)

    return atomic


def augment_corpus(
    corpus: list[dict], tokenizer, min_count: int = MIN_ISOLATED_COUNT
) -> list[dict]:
    """Augment corpus with more isolated atomic token examples.

    Every atomic source (see ``get_atomic_tokens``) that occurs fewer than
    ``min_count`` times gets enough copies appended to reach ``min_count``.
    Copies are built from the first corpus entry with that source and are
    tagged ``"source": "augmented-atomic"``.

    Args:
        corpus: Base entries; the list itself is not modified.
        tokenizer: Tokenizer passed through to ``get_atomic_tokens``.
        min_count: Target number of occurrences per atomic source.

    Returns:
        A new list: the original entries followed by the augmented ones,
        grouped by source in order of first appearance in ``corpus``.
    """
    # Get atomic tokens
    atomic_tokens = get_atomic_tokens(tokenizer, corpus)
    print(f"Found {len(atomic_tokens)} atomic tokens")

    # Count current isolated occurrences
    src_counts = Counter(item["transliteration"]["src"].strip() for item in corpus)

    # Find atomic tokens that need augmentation
    need_augmentation = {
        src: min_count - src_counts[src]
        for src in atomic_tokens
        if src_counts[src] < min_count
    }
    print(f"Need to augment {len(need_augmentation)} tokens")

    # First entry seen for each source that needs copies. Walking the corpus
    # (rather than the set) keeps the output order reproducible across runs.
    templates: dict[str, dict] = {}
    for item in corpus:
        entry = item["transliteration"]
        src = entry["src"].strip()
        if src in need_augmentation and src not in templates:
            templates[src] = entry

    # Create augmentation entries; each copy is a fresh dict so later
    # mutation of one entry cannot affect the others.
    augmented = [
        {
            "transliteration": {
                "src": entry["src"],
                "tgt": entry["tgt"],
                "title": "word",
                "dialect": entry.get("dialect", "unknown"),
                "source": "augmented-atomic",
            }
        }
        for src, entry in templates.items()
        for _ in range(need_augmentation[src])
    ]

    print(f"Adding {len(augmented)} augmented entries")
    return corpus + augmented


def main() -> None:
    """Command-line entry point: augment the west and east corpora.

    For each dialect, loads the existing augmented corpus (with earlier
    augmented entries stripped) or, failing that, the clean corpus from the
    script's directory, augments it and writes
    ``syriac_<dialect>_augmented_corpus.jsonl`` there.
    """
    parser = argparse.ArgumentParser(
        description="Augment corpus with atomic token examples"
    )
    parser.add_argument(
        "--tokenizer",
        default=DEFAULT_TOKENIZER_PATH,
        help=f"Path to tokenizer (default: {DEFAULT_TOKENIZER_PATH})",
    )
    parser.add_argument(
        "--min-count",
        type=int,
        default=MIN_ISOLATED_COUNT,
        help=f"Minimum isolated occurrences per atomic token (default: {MIN_ISOLATED_COUNT})",
    )
    args = parser.parse_args()

    # Imported here rather than at module level so that --help and importing
    # the helper functions do not require loading the transformers package.
    from transformers import AutoTokenizer

    print(f"Loading tokenizer from {args.tokenizer}...")
    tokenizer = AutoTokenizer.from_pretrained(args.tokenizer)

    data_dir = _SCRIPT_DIR

    for dialect in ["west", "east"]:
        clean_path = data_dir / f"syriac_{dialect}_clean_corpus.jsonl"
        augmented_path = data_dir / f"syriac_{dialect}_augmented_corpus.jsonl"

        print(f"\n=== Processing {dialect.capitalize()} corpus ===")

        # Try to load from augmented file (stripping old augmented entries) or clean file
        if augmented_path.exists():
            print(
                f"Loading from {augmented_path.name} (stripping old augmented entries)..."
            )
            corpus = load_corpus(augmented_path, strip_augmented=True)
        elif clean_path.exists():
            print(f"Loading from {clean_path.name}...")
            corpus = load_corpus(clean_path)
        else:
            print(f"ERROR: Neither {clean_path.name} nor {augmented_path.name} found!")
            continue

        print(f"Base corpus size: {len(corpus)}")

        augmented = augment_corpus(corpus, tokenizer, min_count=args.min_count)
        print(f"Augmented size: {len(augmented)}")

        save_corpus(augmented, augmented_path)
        print(f"Saved to {augmented_path}")

    print("\nDone! Run train_t5.py to train the T5 model using the augmented corpus.")


if __name__ == "__main__":
    main()