tre-1 / scripts /train_word_segmentation.py
rain1024's picture
Add word segmentation support and underthesea-core integration
5d8bdc8
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "python-crfsuite>=0.9.11",
# "crfsuite>=0.3.0",
# "datasets>=4.5.0",
# "scikit-learn>=1.6.1",
# "click>=8.0.0",
# "psutil>=5.9.0",
# "pyyaml>=6.0.0",
# "underthesea>=6.8.0",
# "underthesea-core @ file:///home/claude-user/projects/workspace_underthesea/underthesea-core-dev/extensions/underthesea_core/target/wheels/underthesea_core-1.0.7-cp312-cp312-manylinux_2_34_x86_64.whl",
# ]
# ///
# Note: underthesea-core trainer now uses crfsuite (LBFGS) for fast training
"""
Training script for Vietnamese Word Segmentation using CRF.
Supports 3 CRF trainers:
- python-crfsuite: Original Python bindings to CRFsuite
- crfsuite-rs: Rust bindings to CRFsuite (pip install crfsuite)
- underthesea-core: Underthesea's native Rust CRF implementation
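Models are saved to: models/word_segmentation/{version}/model.crfsuite
(the underthesea-core trainer writes model.crf in the same directory instead).
Uses BIO-style tagging at SYLLABLE level (only B and I are produced; there is no O tag):
- B: Beginning of a word (first syllable)
- I: Inside a word (continuation syllables)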
Usage:
uv run scripts/train_word_segmentation.py
uv run scripts/train_word_segmentation.py --trainer crfsuite-rs
uv run scripts/train_word_segmentation.py --trainer underthesea-core
uv run scripts/train_word_segmentation.py --version v1.1.0
"""
import os
import platform
import time
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
import click
import psutil
import yaml
from datasets import load_dataset
from sklearn.metrics import accuracy_score, classification_report, f1_score
from underthesea.pipeline.word_tokenize.regex_tokenize import tokenize as regex_tokenize
# Project root directory: the parent of the directory that contains this script.
PROJECT_ROOT = Path(__file__).parent.parent
# Names of the available trainers; used as the allowed values of the --trainer CLI option.
TRAINERS = ["python-crfsuite", "crfsuite-rs", "underthesea-core"]
def get_hardware_info():
    """Collect hardware and system information.

    Returns:
        dict with the keys ``platform``, ``platform_release``,
        ``architecture``, ``python_version``, ``cpu_physical_cores``,
        ``cpu_logical_cores``, ``ram_total_gb`` (total RAM in GiB, rounded to
        two decimals) and ``cpu_model``. ``cpu_model`` is read from
        ``/proc/cpuinfo`` on Linux and stays ``"Unknown"`` on any other
        platform or when the file cannot be read or has no "model name" line.
    """
    info = {
        "platform": platform.system(),
        "platform_release": platform.release(),
        "architecture": platform.machine(),
        "python_version": platform.python_version(),
        "cpu_physical_cores": psutil.cpu_count(logical=False),
        "cpu_logical_cores": psutil.cpu_count(logical=True),
        "ram_total_gb": round(psutil.virtual_memory().total / (1024**3), 2),
        # Always present so consumers never have to special-case a missing key.
        "cpu_model": "Unknown",
    }
    if info["platform"] != "Linux":
        return info
    try:
        with open("/proc/cpuinfo", "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                if "model name" in line:
                    # partition keeps everything after the first colon, so a
                    # model string that itself contains ":" is not truncated.
                    _, _, value = line.partition(":")
                    value = value.strip()
                    if value:
                        info["cpu_model"] = value
                    break
    except OSError:
        # Best effort only: an unreadable /proc/cpuinfo leaves "Unknown".
        pass
    return info
def format_duration(seconds):
    """Render a duration in seconds as a compact human-readable string.

    Below one minute the result is ``"<s>s"``, below one hour
    ``"<m>m <s>s"``, otherwise ``"<h>h <m>m <s>s"``. Seconds are always
    printed with two decimals.
    """
    if seconds < 60:
        return f"{seconds:.2f}s"

    leftover_seconds = seconds % 60
    if seconds < 3600:
        whole_minutes = int(seconds // 60)
        return f"{whole_minutes}m {leftover_seconds:.2f}s"

    whole_hours, within_hour = divmod(seconds, 3600)
    whole_minutes = int(within_hour // 60)
    return f"{int(whole_hours)}h {whole_minutes}m {leftover_seconds:.2f}s"
# Syllable-level feature templates.
# Each entry names one feature; the names mirror the dictionary keys built by
# extract_syllable_features. In the code visible in this file the list is only
# read for its length (logged as "num_feature_templates"); the feature
# extraction itself is hard-coded, so editing this list alone does not change
# which features are produced.
FEATURE_TEMPLATES = [
    # Current syllable
    "S[0]", # Syllable text
    "S[0].lower", # Lowercase
    "S[0].istitle", # Is title case
    "S[0].isupper", # Is all uppercase
    "S[0].isdigit", # Is digit
    "S[0].ispunct", # Is punctuation
    "S[0].len", # Length
    "S[0].prefix2", # First 2 chars
    "S[0].suffix2", # Last 2 chars
    # Previous syllables
    "S[-1]",
    "S[-1].lower",
    "S[-2]",
    "S[-2].lower",
    # Next syllables
    "S[1]",
    "S[1].lower",
    "S[2]",
    "S[2].lower",
    # Bigrams
    "S[-1,0]",
    "S[0,1]",
    # Trigrams
    "S[-1,0,1]",
]
def get_syllable_at(syllables, position, offset):
    """Return the syllable at ``position + offset``.

    Indices before the start of the sequence yield the sentinel
    ``"__BOS__"``; indices at or past the end yield ``"__EOS__"``.
    """
    target = position + offset
    if target < 0:
        return "__BOS__"
    if target < len(syllables):
        return syllables[target]
    return "__EOS__"
def is_punct(s):
    """Return True when ``s`` is exactly one character and that character is not alphanumeric."""
    if len(s) != 1:
        return False
    return not s.isalnum()
def extract_syllable_features(syllables, position):
    """Build the feature dictionary for the syllable at ``position``.

    The result maps feature names (``"S[0]"``, ``"S[-1].lower"``, ...) to
    string values. Context syllables outside the sequence are represented by
    the ``"__BOS__"`` / ``"__EOS__"`` sentinels returned by
    ``get_syllable_at``; sentinels are never lower-cased. Keys are inserted
    in a fixed order: current syllable, previous, next, bigrams, trigram.
    """
    sentinels = ("__BOS__", "__EOS__")

    def lowered(token):
        # Sentinels are kept verbatim; real syllables are lower-cased.
        return token if token in sentinels else token.lower()

    current = get_syllable_at(syllables, position, 0)
    prev1 = get_syllable_at(syllables, position, -1)
    prev2 = get_syllable_at(syllables, position, -2)
    next1 = get_syllable_at(syllables, position, 1)
    next2 = get_syllable_at(syllables, position, 2)

    if current in sentinels:
        # A current syllable equal to a sentinel gets neutral shape features.
        title_flag = upper_flag = digit_flag = punct_flag = "False"
        length_text = "0"
        head = tail = current
    else:
        title_flag = str(current.istitle())
        upper_flag = str(current.isupper())
        digit_flag = str(current.isdigit())
        punct_flag = str(is_punct(current))
        length_text = str(len(current))
        if len(current) >= 2:
            head, tail = current[:2], current[-2:]
        else:
            # Too short to slice: the syllable itself is the prefix/suffix.
            head = tail = current

    return {
        # Current syllable
        "S[0]": current,
        "S[0].lower": lowered(current),
        "S[0].istitle": title_flag,
        "S[0].isupper": upper_flag,
        "S[0].isdigit": digit_flag,
        "S[0].ispunct": punct_flag,
        "S[0].len": length_text,
        "S[0].prefix2": head,
        "S[0].suffix2": tail,
        # Previous syllables
        "S[-1]": prev1,
        "S[-1].lower": lowered(prev1),
        "S[-2]": prev2,
        "S[-2].lower": lowered(prev2),
        # Next syllables
        "S[1]": next1,
        "S[1].lower": lowered(next1),
        "S[2]": next2,
        "S[2].lower": lowered(next2),
        # Bigrams
        "S[-1,0]": f"{prev1}|{current}",
        "S[0,1]": f"{current}|{next1}",
        # Trigram
        "S[-1,0,1]": f"{prev1}|{current}|{next1}",
    }
def sentence_to_syllable_features(syllables):
    """Turn a syllable sequence into one list of ``"name=value"`` feature strings per syllable."""
    rows = []
    for index in range(len(syllables)):
        feature_map = extract_syllable_features(syllables, index)
        rows.append([f"{name}={value}" for name, value in feature_map.items()])
    return rows
def tokens_to_syllable_labels(tokens):
    """
    Flatten word tokens into syllables with matching B/I labels.

    Every token (e.g., "Thời hạn") is split into syllables with
    ``regex_tokenize``; the first syllable of a token is labelled 'B' and
    every following syllable 'I'. Returns ``(syllables, labels)``, two
    parallel lists.
    """
    syllables = []
    labels = []
    for token in tokens:
        pieces = regex_tokenize(token)
        for index, piece in enumerate(pieces):
            syllables.append(piece)
            labels.append("I" if index else "B")
    return syllables, labels
def labels_to_words(syllables, labels):
    """Rebuild words from parallel syllable and label sequences.

    A "B" label opens a new word; any other label extends the word that is
    currently open (a leading non-"B" label simply opens the first word).
    Syllables of one word are joined with a single space. Pairing stops at
    the shorter of the two sequences.
    """
    groups = []
    for syllable, tag in zip(syllables, labels):
        if tag == "B" or not groups:
            groups.append([syllable])
        else:
            groups[-1].append(syllable)
    return [" ".join(group) for group in groups]
def _word_spans(syllables, labels):
    """Return the ``(start, end)`` syllable spans of the words encoded by ``labels``.

    A "B" label opens a new word; any other label extends the current one.
    Only positions covered by both sequences are considered.
    """
    spans = []
    start = 0
    seen = 0
    for seen, (_, tag) in enumerate(zip(syllables, labels), start=1):
        # A "B" anywhere but the first position closes the word before it.
        if tag == "B" and seen > 1:
            spans.append((start, seen - 1))
            start = seen - 1
    if seen:
        spans.append((start, seen))
    return spans


def compute_word_metrics(y_true, y_pred, syllables_list):
    """Compute word-level precision, recall and F1.

    A predicted word counts as correct when a gold word covers exactly the
    same syllable span. Spans are derived directly from the labels, so the
    result does not depend on the text of the syllables (empty syllables or
    syllables containing whitespace cannot shift the span offsets).

    Args:
        y_true: gold label sequences, one per sentence.
        y_pred: predicted label sequences, one per sentence.
        syllables_list: syllable sequences, one per sentence; only their
            lengths are used.

    Returns:
        ``(precision, recall, f1)``; each value is 0 when its denominator
        is zero.
    """
    correct = 0
    total_pred = 0
    total_true = 0
    for syllables, true_labels, pred_labels in zip(syllables_list, y_true, y_pred):
        true_spans = _word_spans(syllables, true_labels)
        pred_spans = _word_spans(syllables, pred_labels)
        total_true += len(true_spans)
        total_pred += len(pred_spans)
        # Exact span matches between gold and predicted segmentation.
        correct += len(set(true_spans) & set(pred_spans))
    precision = correct / total_pred if total_pred > 0 else 0
    recall = correct / total_true if total_true > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
    return precision, recall, f1
def load_data():
    """Load the UDD-1 dataset and convert every split to syllable-level sequences.

    Returns:
        ``(train_data, val_data, test_data, stats)`` where each ``*_data`` is
        a list of ``(syllables, labels)`` pairs and ``stats`` holds the
        sentence and syllable counts of the three splits.
    """
    click.echo("Loading UDD-1 dataset...")
    dataset = load_dataset("undertheseanlp/UDD-1")

    def extract_syllable_sequences(split):
        # Items with no tokens, or whose tokens yield no syllables, are dropped.
        pairs = []
        for record in split:
            record_tokens = record["tokens"]
            if not record_tokens:
                continue
            syls, tags = tokens_to_syllable_labels(record_tokens)
            if not syls:
                continue
            pairs.append((syls, tags))
        return pairs

    def count_syllables(pairs):
        return sum(len(syls) for syls, _ in pairs)

    train_data = extract_syllable_sequences(dataset["train"])
    val_data = extract_syllable_sequences(dataset["validation"])
    test_data = extract_syllable_sequences(dataset["test"])

    # Statistics
    train_syls = count_syllables(train_data)
    val_syls = count_syllables(val_data)
    test_syls = count_syllables(test_data)
    click.echo(
        f"Loaded {len(train_data)} train ({train_syls} syllables), "
        f"{len(val_data)} val ({val_syls} syllables), "
        f"{len(test_data)} test ({test_syls} syllables) sentences"
    )

    stats = {
        "train_sentences": len(train_data),
        "train_syllables": train_syls,
        "val_sentences": len(val_data),
        "val_syllables": val_syls,
        "test_sentences": len(test_data),
        "test_syllables": test_syls,
    }
    return train_data, val_data, test_data, stats
# ============================================================================
# Trainer Abstraction
# ============================================================================
class CRFTrainerBase(ABC):
    """Common interface every CRF trainer backend implements."""

    # Identifier of the backend; subclasses override it.
    name: str = "base"

    @abstractmethod
    def train(self, X_train, y_train, output_path, c1, c2, max_iterations, verbose=True):
        """Fit a CRF on the given feature/label sequences and write the model to ``output_path``."""

    @abstractmethod
    def predict(self, model_path, X_test):
        """Load the model stored at ``model_path`` and return one label sequence per item of ``X_test``."""
class PythonCRFSuiteTrainer(CRFTrainerBase):
    """CRF backend built on python-crfsuite (imported as ``pycrfsuite``)."""

    name = "python-crfsuite"

    def train(self, X_train, y_train, output_path, c1, c2, max_iterations, verbose=True):
        """Train with ``pycrfsuite.Trainer`` and write the model to ``output_path``."""
        # Imported lazily so the other backends work without this package.
        import pycrfsuite

        hyperparameters = {
            "c1": c1,
            "c2": c2,
            "max_iterations": max_iterations,
            "feature.possible_transitions": True,
        }
        crf = pycrfsuite.Trainer(verbose=verbose)
        for features, tags in zip(X_train, y_train):
            crf.append(features, tags)
        crf.set_params(hyperparameters)
        crf.train(str(output_path))

    def predict(self, model_path, X_test):
        """Open the model at ``model_path`` and tag every sequence in ``X_test``."""
        import pycrfsuite

        tagger = pycrfsuite.Tagger()
        tagger.open(str(model_path))
        predictions = []
        for features in X_test:
            predictions.append(tagger.tag(features))
        return predictions
class CRFSuiteRsTrainer(CRFTrainerBase):
    """CRF backend built on crfsuite-rs (Rust bindings, ``pip install crfsuite``)."""

    name = "crfsuite-rs"

    def train(self, X_train, y_train, output_path, c1, c2, max_iterations, verbose=True):
        """Train with ``crfsuite.Trainer`` and write the model to ``output_path``.

        ``verbose`` is accepted for interface compatibility but not used here.
        """
        # Imported lazily so the other backends work without this package.
        import crfsuite

        hyperparameters = {
            "c1": c1,
            "c2": c2,
            "max_iterations": max_iterations,
            "feature.possible_transitions": True,
        }
        crf = crfsuite.Trainer()
        # Parameters are set before the training sequences are appended.
        crf.set_params(hyperparameters)
        for features, tags in zip(X_train, y_train):
            crf.append(features, tags)
        crf.train(str(output_path))

    def predict(self, model_path, X_test):
        """Load the model at ``model_path`` and tag every sequence in ``X_test``."""
        import crfsuite

        loaded = crfsuite.Model(str(model_path))
        predictions = []
        for features in X_test:
            predictions.append(loaded.tag(features))
        return predictions
class UndertheseaCoreTrainer(CRFTrainerBase):
    """Trainer using underthesea-core native Rust CRF with LBFGS optimization.

    This trainer uses the native underthesea-core Rust CRF implementation
    with L-BFGS optimization (per the original author's note, intended to
    match CRFsuite performance).

    Models are written with a ``.crf`` extension: a requested path ending in
    ``.crfsuite`` is mapped to the same path ending in ``.crf``, both when
    saving and when loading.

    Requires building underthesea-core from source:
        cd ~/projects/workspace_underthesea/underthesea-core-dev/extensions/underthesea_core
        uv venv && source .venv/bin/activate
        uv pip install maturin
        maturin develop --release
    """

    name = "underthesea-core"

    @staticmethod
    def _native_model_path(path):
        """Map a ``*.crfsuite`` path to ``*.crf``; return other paths unchanged (as ``str``).

        Only the trailing extension is rewritten, so a directory name that
        happens to contain ".crfsuite" is left intact.
        """
        suffix = ".crfsuite"
        text = str(path)
        if text.endswith(suffix):
            return text[: -len(suffix)] + ".crf"
        return text

    def _check_trainer_import(self):
        """Return the ``CRFTrainer`` class, trying both known import locations.

        Raises:
            ImportError: if neither location provides ``CRFTrainer``.
        """
        try:
            from underthesea_core import CRFTrainer
            return CRFTrainer
        except ImportError:
            pass
        try:
            from underthesea_core.underthesea_core import CRFTrainer
            return CRFTrainer
        except ImportError:
            pass
        raise ImportError(
            "CRFTrainer not available in underthesea_core.\n"
            "Build from source with LBFGS support:\n"
            " cd ~/projects/workspace_underthesea/underthesea-core-dev/extensions/underthesea_core\n"
            " source .venv/bin/activate && maturin develop --release"
        )

    def _check_tagger_import(self):
        """Return ``(CRFModel, CRFTagger)``, trying both known import locations.

        Raises:
            ImportError: if neither location provides both classes.
        """
        try:
            from underthesea_core import CRFModel, CRFTagger
            return CRFModel, CRFTagger
        except ImportError:
            pass
        try:
            from underthesea_core.underthesea_core import CRFModel, CRFTagger
            return CRFModel, CRFTagger
        except ImportError:
            pass
        raise ImportError("CRFModel/CRFTagger not available in underthesea_core")

    def train(self, X_train, y_train, output_path, c1, c2, max_iterations, verbose=True):
        """Train with underthesea-core's ``CRFTrainer`` and save the model.

        The model is saved to ``output_path`` with a trailing ``.crfsuite``
        replaced by ``.crf``; the path actually written is kept in
        ``self._model_path``.
        """
        CRFTrainer = self._check_trainer_import()
        # Use LBFGS (default, fast)
        trainer = CRFTrainer(
            loss_function="lbfgs",
            l1_penalty=c1,
            l2_penalty=c2,
            max_iterations=max_iterations,
            verbose=1 if verbose else 0,
        )
        model = trainer.train(X_train, y_train)
        saved_path = self._native_model_path(output_path)
        model.save(saved_path)
        # Remember the path that was actually written.
        self._model_path = saved_path

    def predict(self, model_path, X_test):
        """Load the model for ``model_path`` and tag every sequence in ``X_test``.

        ``model_path`` goes through the same ``.crfsuite`` -> ``.crf`` mapping
        used by ``train``, so passing the path given to ``train`` loads the
        file that was written. The argument is always honoured, even after a
        previous ``train`` call on this instance.
        """
        CRFModel, CRFTagger = self._check_tagger_import()
        model = CRFModel.load(self._native_model_path(model_path))
        tagger = CRFTagger.from_model(model)
        return [tagger.tag(xseq) for xseq in X_test]
def get_trainer(trainer_name: str) -> CRFTrainerBase:
    """Instantiate the trainer backend registered under ``trainer_name``.

    Raises:
        ValueError: if ``trainer_name`` is not one of the known backends.
    """
    trainers = {
        "python-crfsuite": PythonCRFSuiteTrainer,
        "crfsuite-rs": CRFSuiteRsTrainer,
        "underthesea-core": UndertheseaCoreTrainer,
    }
    trainer_cls = trainers.get(trainer_name)
    if trainer_cls is None:
        raise ValueError(f"Unknown trainer: {trainer_name}. Available: {list(trainers.keys())}")
    return trainer_cls()
# ============================================================================
# Metadata and CLI
# ============================================================================
def save_metadata(output_dir, version, trainer_name, data_stats, c1, c2, max_iterations, metrics, hw_info, training_time):
    """Save model metadata to ``metadata.yaml`` inside ``output_dir``.

    Args:
        output_dir: directory (``Path``) that receives ``metadata.yaml``.
        version: model version string.
        trainer_name: name of the trainer backend; stored as the framework
            and used to pick the recorded model file name.
        data_stats: dict with ``{train,val,test}_sentences`` and
            ``{train,val,test}_syllables`` counts.
        c1, c2, max_iterations: hyperparameters used for training.
        metrics: dict with ``syl_accuracy``, ``syl_f1``, ``word_precision``,
            ``word_recall`` and ``word_f1``; values are rounded to 4 decimals.
        hw_info: dict with at least ``platform`` and ``python_version``;
            ``cpu_model`` is optional.
        training_time: duration in seconds, rounded to 2 decimals.
    """
    # The underthesea-core trainer stores its model as "model.crf", not
    # "model.crfsuite", so record the file that actually exists on disk.
    model_file = "model.crf" if trainer_name == "underthesea-core" else "model.crfsuite"
    metadata = {
        "model": {
            "name": "Vietnamese Word Segmentation",
            "version": version,
            "type": "CRF (Conditional Random Field)",
            "framework": trainer_name,
            "tagging_scheme": "BIO",
        },
        "training": {
            "dataset": "undertheseanlp/UDD-1",
            "train_sentences": data_stats["train_sentences"],
            "train_syllables": data_stats["train_syllables"],
            "val_sentences": data_stats["val_sentences"],
            "val_syllables": data_stats["val_syllables"],
            "test_sentences": data_stats["test_sentences"],
            "test_syllables": data_stats["test_syllables"],
            "hyperparameters": {
                "c1": c1,
                "c2": c2,
                "max_iterations": max_iterations,
            },
            "duration_seconds": round(training_time, 2),
        },
        "performance": {
            "syllable_accuracy": round(metrics["syl_accuracy"], 4),
            "syllable_f1": round(metrics["syl_f1"], 4),
            "word_precision": round(metrics["word_precision"], 4),
            "word_recall": round(metrics["word_recall"], 4),
            "word_f1": round(metrics["word_f1"], 4),
        },
        "environment": {
            "platform": hw_info["platform"],
            "cpu_model": hw_info.get("cpu_model", "Unknown"),
            "python_version": hw_info["python_version"],
        },
        "files": {
            "model": model_file,
            "config": "../../../configs/word_segmentation.yaml",
        },
        "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "author": "undertheseanlp",
    }
    metadata_path = output_dir / "metadata.yaml"
    # allow_unicode=True emits non-ASCII characters verbatim, so the file is
    # written as UTF-8 regardless of the locale's default encoding.
    with open(metadata_path, "w", encoding="utf-8") as f:
        yaml.dump(metadata, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    click.echo(f"Metadata saved to {metadata_path}")
def get_default_version():
    """Return a version string built from the current local time, formatted ``YYYYMMDD_HHMMSS``."""
    now = datetime.now()
    return f"{now:%Y%m%d_%H%M%S}"
# CLI entry point. Options:
#   --trainer / -t     backend name, one of TRAINERS
#   --version / -v     model version; defaults to a timestamp
#   --output / -o      custom model path; overrides the version-based path
#                      (metadata.yaml is only written for the version-based path)
#   --c1, --c2         L1 / L2 regularization coefficients
#   --max-iterations   maximum number of training iterations
#   --wandb/--no-wandb enable Weights & Biases logging
# The function docstring is shown by click as the command help text, so it is
# kept as a single line.
@click.command()
@click.option(
    "--trainer", "-t",
    type=click.Choice(TRAINERS),
    default="python-crfsuite",
    help="CRF trainer to use",
    show_default=True,
)
@click.option(
    "--version", "-v",
    default=None,
    help="Model version (default: timestamp, e.g., 20260131_154530)",
)
@click.option(
    "--output", "-o",
    default=None,
    help="Custom output path (overrides version-based path)",
)
@click.option(
    "--c1",
    default=1.0,
    type=float,
    help="L1 regularization coefficient",
    show_default=True,
)
@click.option(
    "--c2",
    default=0.001,
    type=float,
    help="L2 regularization coefficient",
    show_default=True,
)
@click.option(
    "--max-iterations",
    default=100,
    type=int,
    help="Maximum training iterations",
    show_default=True,
)
@click.option(
    "--wandb/--no-wandb",
    default=False,
    help="Enable Weights & Biases logging",
)
def train(trainer, version, output, c1, c2, max_iterations, wandb):
    """Train Vietnamese Word Segmenter using CRF on UDD-1 dataset."""
    total_start_time = time.time()
    start_datetime = datetime.now()
    # Get trainer
    crf_trainer = get_trainer(trainer)
    # Use timestamp version if not specified
    if version is None:
        version = get_default_version()
    # Determine output directory
    if output:
        output_path = Path(output)
        output_dir = output_path.parent
    else:
        output_dir = PROJECT_ROOT / "models" / "word_segmentation" / version
        output_path = output_dir / "model.crfsuite"
    # Create the target directory for both cases so that a custom --output
    # path in a not-yet-existing directory does not fail when the model is
    # written at the end of training.
    output_dir.mkdir(parents=True, exist_ok=True)
    # Collect hardware info
    hw_info = get_hardware_info()
    click.echo("=" * 60)
    click.echo(f"Word Segmentation Training - {version}")
    click.echo("=" * 60)
    click.echo(f"Trainer: {trainer}")
    click.echo(f"Platform: {hw_info['platform']}")
    click.echo(f"CPU: {hw_info.get('cpu_model', 'Unknown')}")
    click.echo(f"Output: {output_path}")
    click.echo(f"Started: {start_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
    click.echo("=" * 60)
    # Load data
    train_data, val_data, test_data, data_stats = load_data()
    click.echo(f"\nTrain: {len(train_data)} sentences ({data_stats['train_syllables']} syllables)")
    click.echo(f"Validation: {len(val_data)} sentences ({data_stats['val_syllables']} syllables)")
    click.echo(f"Test: {len(test_data)} sentences ({data_stats['test_syllables']} syllables)")
    # Prepare training data
    click.echo("\nExtracting syllable-level features...")
    feature_start = time.time()
    X_train = [sentence_to_syllable_features(syls) for syls, _ in train_data]
    y_train = [labels for _, labels in train_data]
    click.echo(f"Feature extraction: {format_duration(time.time() - feature_start)}")
    # Train CRF
    click.echo(f"\nTraining CRF model with {trainer}...")
    # The "wandb" parameter is the CLI flag; the module is imported as "wb".
    use_wandb = wandb
    if use_wandb:
        try:
            import wandb as wb
            wb.init(project="word-segmentation-vietnamese", name=f"crf-{version}")
            wb.config.update({
                "trainer": trainer,
                "c1": c1,
                "c2": c2,
                "max_iterations": max_iterations,
                "num_feature_templates": len(FEATURE_TEMPLATES),
                "train_sentences": len(train_data),
                "val_sentences": len(val_data),
                "test_sentences": len(test_data),
                "version": version,
                "level": "syllable",
            })
        except ImportError:
            click.echo("wandb not installed, skipping logging", err=True)
            use_wandb = False
    crf_start = time.time()
    crf_trainer.train(X_train, y_train, output_path, c1, c2, max_iterations, verbose=True)
    crf_time = time.time() - crf_start
    click.echo(f"\nModel saved to {output_path}")
    click.echo(f"CRF training: {format_duration(crf_time)}")
    # Evaluation
    click.echo("\nEvaluating on test set...")
    X_test = [sentence_to_syllable_features(syls) for syls, _ in test_data]
    y_test = [labels for _, labels in test_data]
    syllables_test = [syls for syls, _ in test_data]
    y_pred = crf_trainer.predict(output_path, X_test)
    # Syllable-level metrics (computed over all syllables of all test sentences)
    y_test_flat = [label for labels in y_test for label in labels]
    y_pred_flat = [label for labels in y_pred for label in labels]
    syl_accuracy = accuracy_score(y_test_flat, y_pred_flat)
    syl_f1 = f1_score(y_test_flat, y_pred_flat, average="weighted")
    click.echo(f"\nSyllable-level Accuracy: {syl_accuracy:.4f}")
    click.echo(f"Syllable-level F1 (weighted): {syl_f1:.4f}")
    click.echo("\nSyllable-level Classification Report:")
    click.echo(classification_report(y_test_flat, y_pred_flat))
    # Word-level metrics
    precision, recall, word_f1 = compute_word_metrics(y_test, y_pred, syllables_test)
    click.echo("\nWord-level Metrics:")
    click.echo(f" Precision: {precision:.4f}")
    click.echo(f" Recall: {recall:.4f}")
    click.echo(f" F1: {word_f1:.4f}")
    total_time = time.time() - total_start_time
    # Collect metrics
    metrics = {
        "syl_accuracy": syl_accuracy,
        "syl_f1": syl_f1,
        "word_precision": precision,
        "word_recall": recall,
        "word_f1": word_f1,
    }
    # Save metadata (only for the version-based output location)
    if not output:
        save_metadata(output_dir, version, trainer, data_stats, c1, c2, max_iterations,
                      metrics, hw_info, total_time)
    # Show examples
    click.echo("\n" + "=" * 60)
    click.echo("Example predictions:")
    click.echo("=" * 60)
    for i in range(min(3, len(test_data))):
        syllables = syllables_test[i]
        true_words = labels_to_words(syllables, y_test[i])
        pred_words = labels_to_words(syllables, y_pred[i])
        click.echo(f"\nInput: {' '.join(syllables)}")
        click.echo(f"True: {' | '.join(true_words)}")
        click.echo(f"Pred: {' | '.join(pred_words)}")
    click.echo("\n" + "=" * 60)
    click.echo("Training Summary")
    click.echo("=" * 60)
    click.echo(f"Trainer: {trainer}")
    click.echo(f"Version: {version}")
    click.echo(f"Model: {output_path}")
    click.echo(f"Syllable Accuracy: {syl_accuracy:.4f}")
    click.echo(f"Word F1: {word_f1:.4f}")
    click.echo(f"Total time: {format_duration(total_time)}")
    click.echo("=" * 60)
    if use_wandb:
        wb.log(metrics)
        wb.finish()
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    train()