| """ |
Training script for Vietnamese POS Tagger using CRF with Hydra config.

Supports three CRF trainers:
    - python-crfsuite: Original Python bindings to CRFsuite
    - crfsuite-rs: Rust bindings to CRFsuite (pip install crfsuite)
    - underthesea-core: Underthesea's native Rust CRF implementation

Supports two datasets:
    - VLSP 2013 POS (local, tab-separated word\\tTAG format)
    - UDD-1 (HuggingFace, Universal Dependencies)

Usage:
    python src/train_pos.py
    python src/train_pos.py data=udd1
    python src/train_pos.py model.trainer=crfsuite-rs
    python src/train_pos.py model.c1=0.5 model.c2=0.01
    python src/train_pos.py model.features.bigram=false

Feature ablation:
    python src/train_pos.py model.features.form=false
    python src/train_pos.py model.features.type=false
    python src/train_pos.py model.features.morphology=false
    python src/train_pos.py model.features.left=false
    python src/train_pos.py model.features.right=false
    python src/train_pos.py model.features.bigram=false
    python src/train_pos.py model.features.dictionary=false
| """ |
|
|
| import logging |
| import platform |
| import re |
| import time |
| from abc import ABC, abstractmethod |
| from datetime import datetime |
| from pathlib import Path |
|
|
| import hydra |
| import psutil |
| import yaml |
| from omegaconf import DictConfig, OmegaConf |
| from sklearn.metrics import accuracy_score, classification_report |
|
|
# Module-level logger, named after this module; used by every function below.
log = logging.getLogger(__name__)
|
|
|
|
| |
| |
| |
|
|
# Feature templates, grouped by the config switch that enables them.
#
# Template syntax: ``T[i]`` is the token at offset ``i`` from the current
# position, ``T[i,j]`` combines several offsets, and an optional ``.attr``
# suffix names a transformation applied to the token text.
FEATURE_GROUPS = {
    # Surface form of the current token.
    "form": [
        "T[0]",
        "T[0].lower",
    ],
    # Character-class flags of the current token.
    "type": [
        "T[0].istitle",
        "T[0].isupper",
        "T[0].isdigit",
        "T[0].isalpha",
    ],
    # Leading / trailing character slices of the current token.
    "morphology": [
        "T[0].prefix2",
        "T[0].prefix3",
        "T[0].suffix2",
        "T[0].suffix3",
    ],
    # One and two tokens to the left.
    "left": [
        "T[-1]",
        "T[-1].lower",
        "T[-1].istitle",
        "T[-1].isupper",
        "T[-2]",
        "T[-2].lower",
    ],
    # One and two tokens to the right.
    "right": [
        "T[1]",
        "T[1].lower",
        "T[1].istitle",
        "T[1].isupper",
        "T[2]",
        "T[2].lower",
    ],
    # Current token combined with its immediate neighbour.
    "bigram": [
        "T[-1,0]",
        "T[0,1]",
    ],
    # Dictionary membership of the current token and of both bigrams.
    "dictionary": [
        "T[0].is_in_dict",
        "T[-1,0].is_in_dict",
        "T[0,1].is_in_dict",
    ],
}
|
|
|
|
def get_active_templates(features_cfg):
    """Collect the feature templates of every enabled feature group.

    Groups are visited in ``FEATURE_GROUPS`` order. A group that is missing
    from ``features_cfg`` counts as enabled.

    Args:
        features_cfg: Mapping-like object with a ``get(name, default)``
            method, keyed by feature group name.

    Returns:
        A flat list of template strings.
    """
    return [
        template
        for group, group_templates in FEATURE_GROUPS.items()
        if features_cfg.get(group, True)
        for template in group_templates
    ]
|
|
|
|
def get_active_groups(features_cfg):
    """List the names of the feature groups that are switched on.

    A group that is missing from ``features_cfg`` counts as enabled; names
    are returned in ``FEATURE_GROUPS`` order.
    """
    enabled = []
    for group in FEATURE_GROUPS:
        if features_cfg.get(group, True):
            enabled.append(group)
    return enabled
|
|
|
|
| |
| |
| |
|
|
def get_hardware_info():
    """Collect hardware and system information.

    Returns:
        A dict with the keys ``platform``, ``platform_release``,
        ``architecture``, ``python_version``, ``cpu_physical_cores``,
        ``cpu_logical_cores``, ``ram_total_gb`` and ``cpu_model``.
        ``cpu_model`` is always present: it is read from ``/proc/cpuinfo``
        on Linux and is ``"Unknown"`` whenever it cannot be determined.
    """
    info = {
        "platform": platform.system(),
        "platform_release": platform.release(),
        "architecture": platform.machine(),
        "python_version": platform.python_version(),
        "cpu_physical_cores": psutil.cpu_count(logical=False),
        "cpu_logical_cores": psutil.cpu_count(logical=True),
        "ram_total_gb": round(psutil.virtual_memory().total / (1024**3), 2),
        # Default so that every code path yields the same set of keys.
        "cpu_model": "Unknown",
    }

    if info["platform"] == "Linux":
        try:
            with open("/proc/cpuinfo", "r", encoding="utf-8", errors="replace") as f:
                for line in f:
                    if "model name" in line:
                        # Keep everything after the first colon so model
                        # names that themselves contain ':' are not cut.
                        _, _, model = line.partition(":")
                        info["cpu_model"] = model.strip()
                        break
        except OSError:
            # Best effort only: an unreadable /proc/cpuinfo keeps "Unknown".
            pass

    return info
|
|
|
|
def format_duration(seconds):
    """Render a duration in seconds as ``"S.SSs"``, ``"Mm S.SSs"`` or ``"Hh Mm S.SSs"``.

    Durations under a minute show seconds only, durations under an hour show
    minutes and seconds, and anything longer also shows hours. Seconds are
    always printed with two decimals.
    """
    if seconds < 60:
        return f"{seconds:.2f}s"

    leftover_secs = seconds % 60
    if seconds < 3600:
        whole_minutes = int(seconds // 60)
        return f"{whole_minutes}m {leftover_secs:.2f}s"

    whole_hours = int(seconds // 3600)
    whole_minutes = int((seconds % 3600) // 60)
    return f"{whole_hours}h {whole_minutes}m {leftover_secs:.2f}s"
|
|
|
|
| |
| |
| |
|
|
def get_token_value(tokens, position, index):
    """Return the token at ``position + index``, or a boundary marker.

    Offsets that fall before the start of ``tokens`` yield ``"__BOS__"``;
    offsets at or past the end yield ``"__EOS__"``.
    """
    target = position + index
    if target < 0:
        return "__BOS__"
    if target < len(tokens):
        return tokens[target]
    return "__EOS__"
|
|
|
|
def apply_attribute(value, attribute, dictionary=None):
    """Apply a template attribute to a token value.

    Args:
        value: Token text, or one of the boundary markers ``"__BOS__"`` /
            ``"__EOS__"`` (which are always returned unchanged).
        attribute: ``None`` (identity), ``"lower"``, ``"istitle"``,
            ``"isupper"``, ``"isdigit"``, ``"isalpha"``, ``"is_in_dict"``,
            ``"prefixN"`` or ``"suffixN"`` (``N`` defaults to 2 when
            omitted). Any other attribute leaves ``value`` unchanged.
        dictionary: Optional container of known words, used only by
            ``"is_in_dict"``.

    Returns:
        The transformed string. Boolean attributes are returned as the
        strings ``"True"`` / ``"False"``.

    Raises:
        ValueError: If a ``prefix``/``suffix`` attribute has a non-integer
            length, e.g. ``"prefixX"``.
    """
    if value in ("__BOS__", "__EOS__") or attribute is None:
        return value

    if attribute == "lower":
        return value.lower()

    if attribute in ("istitle", "isupper", "isdigit", "isalpha"):
        # These attribute names match the str predicate methods one-to-one.
        return str(getattr(value, attribute)())

    if attribute == "is_in_dict":
        if not dictionary:
            return "False"
        # The dictionary built by this script holds lower-cased entries, so
        # a capitalised token must also be looked up lower-cased. The exact
        # form is tried first so case-preserving dictionaries still match.
        return str(value in dictionary or value.lower() in dictionary)

    if attribute.startswith(("prefix", "suffix")):
        # "prefix" and "suffix" are both six characters long.
        n = int(attribute[6:]) if len(attribute) > 6 else 2
        # Slicing already returns the whole string when it is shorter than n.
        return value[:n] if attribute.startswith("prefix") else value[-n:]

    return value
|
|
|
|
def parse_template(template):
    """Split a feature template such as ``T[0].lower`` into its parts.

    Returns:
        ``(indices, attribute)`` where ``indices`` is a list of integer
        offsets and ``attribute`` is the name after the dot, or ``None``
        when the template has no attribute. A string that does not start
        with a ``T[...]`` pattern yields ``(None, None)``.
    """
    parsed = re.match(r"T\[([^\]]+)\](?:\.(\w+))?", template)
    if parsed is None:
        return None, None
    raw_indices, attribute = parsed.groups()
    offsets = [int(part.strip()) for part in raw_indices.split(",")]
    return offsets, attribute
|
|
|
|
def extract_features(tokens, position, active_templates, dictionary=None):
    """Extract the features of the token at ``position``.

    Args:
        tokens: The sentence as a sequence of token strings.
        position: Index of the token to describe.
        active_templates: Template strings such as ``"T[0].lower"`` or
            ``"T[-1,0]"``. Templates that cannot be parsed are skipped.
        dictionary: Optional container of known words for ``is_in_dict``
            templates; without it those features are ``"False"``.

    Returns:
        A dict mapping each usable template string to its feature value.
        Single-offset templates give the (transformed) token. Multi-offset
        ``is_in_dict`` templates test the space-joined tokens against the
        dictionary. Other multi-offset templates give the tokens joined
        with ``"|"``, each transformed by the template's attribute if any.
    """
    features = {}
    for template in active_templates:
        indices, attribute = parse_template(template)
        if indices is None:
            continue

        values = [get_token_value(tokens, position, idx) for idx in indices]

        if len(values) == 1:
            features[template] = apply_attribute(values[0], attribute, dictionary)
        elif attribute == "is_in_dict":
            phrase = " ".join(values)
            # The dictionary built by this script holds lower-cased entries,
            # so also try the lower-cased phrase. The exact form is tried
            # first so case-preserving dictionaries still match.
            found = bool(dictionary) and (
                phrase in dictionary or phrase.lower() in dictionary
            )
            features[template] = str(found)
        else:
            # Apply the attribute to each token rather than dropping it; with
            # no attribute this is the plain "|"-joined n-gram.
            features[template] = "|".join(
                apply_attribute(v, attribute, dictionary) for v in values
            )
    return features
|
|
|
|
def sentence_to_features(tokens, active_templates, dictionary=None):
    """Turn a sentence into one list of ``"template=value"`` strings per token.

    Each inner list holds the features of the token at that position, in
    the order produced by ``extract_features``.
    """
    sequence = []
    for position in range(len(tokens)):
        token_features = extract_features(tokens, position, active_templates, dictionary)
        sequence.append([f"{k}={v}" for k, v in token_features.items()])
    return sequence
|
|
|
|
| |
| |
| |
|
|
def load_data(cfg):
    """Load the dataset selected by ``cfg.data.source``.

    ``"local"`` is handled by ``load_data_vlsp2013`` and ``"huggingface"``
    by ``load_data_udd1``; the chosen loader's result is returned as-is.

    Raises:
        ValueError: If ``cfg.data.source`` is any other value.
    """
    source = cfg.data.source
    if source == "local":
        return load_data_vlsp2013(cfg)
    if source == "huggingface":
        return load_data_udd1(cfg)
    raise ValueError(f"Unknown data source: {cfg.data.source}")
|
|
|
|
def load_data_vlsp2013(cfg):
    """Load VLSP 2013 POS dataset (tab-separated word\\tTAG format).

    Reads ``train.txt`` and ``test.txt`` from ``cfg.data.data_dir``. Each
    non-blank line holds one token and its tag separated by a tab; blank
    lines separate sentences.

    Returns:
        ``(train_data, None, test_data, stats)``. ``train_data`` and
        ``test_data`` are lists of ``(tokens, tags)`` pairs, the validation
        slot is ``None`` because this loader reads no validation file, and
        ``stats`` is a dict of sentence and token counts.
    """
    log.info("Loading VLSP 2013 POS dataset...")

    dataset_dir = Path(cfg.data.data_dir)

    def load_file(path):
        """Parse one word<TAB>TAG file into a list of (tokens, tags) pairs."""
        sentences = []
        current_tokens = []
        current_tags = []
        skipped = 0
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    # A blank line closes the sentence collected so far.
                    if current_tokens:
                        sentences.append((current_tokens, current_tags))
                        current_tokens = []
                        current_tags = []
                    continue
                parts = line.split("\t")
                if len(parts) == 2:
                    current_tokens.append(parts[0])
                    current_tags.append(parts[1])
                else:
                    # Still excluded, but counted so the loss is visible.
                    skipped += 1
        # The file may end without a trailing blank line.
        if current_tokens:
            sentences.append((current_tokens, current_tags))
        if skipped:
            log.warning(
                f"Skipped {skipped} malformed line(s) in {path} "
                f"(expected exactly two tab-separated fields)"
            )
        return sentences

    train_data = load_file(dataset_dir / "train.txt")
    test_data = load_file(dataset_dir / "test.txt")

    train_tokens = sum(len(toks) for toks, _ in train_data)
    test_tokens = sum(len(toks) for toks, _ in test_data)

    log.info(f"Loaded {len(train_data)} train ({train_tokens} tokens), "
             f"{len(test_data)} test ({test_tokens} tokens) sentences")

    return train_data, None, test_data, {
        "dataset": "VLSP-2013-POS",
        "train_sentences": len(train_data),
        "train_tokens": train_tokens,
        "val_sentences": 0,
        "val_tokens": 0,
        "test_sentences": len(test_data),
        "test_tokens": test_tokens,
    }
|
|
|
|
def load_data_udd1(cfg):
    """Load the UDD-1 dataset named by ``cfg.data.dataset`` via ``datasets``.

    Uses the ``train``, ``validation`` and ``test`` splits and reads the
    ``tokens`` and ``upos`` fields of every item. Items where either field
    is empty are left out.

    Returns:
        ``(train_data, val_data, test_data, stats)`` where each data entry
        is a list of ``(tokens, tags)`` pairs and ``stats`` is a dict of
        sentence and token counts.
    """
    from datasets import load_dataset

    log.info("Loading UDD-1 dataset...")
    dataset = load_dataset(cfg.data.dataset)

    def extract_sentences(split):
        """Keep the (tokens, upos) pair of each item that has both."""
        pairs = ((item["tokens"], item["upos"]) for item in split)
        return [(toks, tags) for toks, tags in pairs if toks and tags]

    def count_tokens(sentences):
        """Total number of tokens across all sentences."""
        return sum(len(toks) for toks, _ in sentences)

    train_data = extract_sentences(dataset["train"])
    val_data = extract_sentences(dataset["validation"])
    test_data = extract_sentences(dataset["test"])

    train_tokens = count_tokens(train_data)
    val_tokens = count_tokens(val_data)
    test_tokens = count_tokens(test_data)

    log.info(f"Loaded {len(train_data)} train ({train_tokens} tokens), "
             f"{len(val_data)} val ({val_tokens} tokens), "
             f"{len(test_data)} test ({test_tokens} tokens) sentences")

    stats = {
        "dataset": cfg.data.dataset,
        "train_sentences": len(train_data),
        "train_tokens": train_tokens,
        "val_sentences": len(val_data),
        "val_tokens": val_tokens,
        "test_sentences": len(test_data),
        "test_tokens": test_tokens,
    }
    return train_data, val_data, test_data, stats
|
|
|
|
| |
| |
| |
|
|
def load_dictionary():
    """Build a word set from Viet74K and the underthesea dictionary.

    Every word from ``DictionaryLoader("Viet74K.txt").words`` and from
    ``get_dictionary()`` is lower-cased and stripped of surrounding
    whitespace before being added.

    Returns:
        A ``set`` of normalised words.
    """
    from underthesea.corpus.readers.dictionary_loader import DictionaryLoader
    from underthesea.datasets import get_dictionary

    dictionary = set()

    def absorb(words):
        """Add the normalised form of each word to the dictionary."""
        dictionary.update(word.lower().strip() for word in words)

    absorb(DictionaryLoader("Viet74K.txt").words)
    absorb(get_dictionary())
    return dictionary
|
|
|
|
| |
| |
| |
|
|
class CRFTrainerBase(ABC):
    """Common interface implemented by every CRF backend in this script."""

    # Identifier of the backend; subclasses override it.
    name: str = "base"

    @abstractmethod
    def train(self, X_train, y_train, output_path, c1, c2, max_iterations, verbose=True):
        """Train a model on the given sequences and store it at ``output_path``."""

    @abstractmethod
    def predict(self, model_path, X_test):
        """Tag every feature sequence in ``X_test`` with the model at ``model_path``."""
|
|
|
|
class PythonCRFSuiteTrainer(CRFTrainerBase):
    """CRF backend built on the ``pycrfsuite`` package."""

    name = "python-crfsuite"

    def train(self, X_train, y_train, output_path, c1, c2, max_iterations, verbose=True):
        """Feed all sequences to a ``pycrfsuite.Trainer`` and train to ``output_path``."""
        import pycrfsuite

        params = {
            "c1": c1,
            "c2": c2,
            "max_iterations": max_iterations,
            "feature.possible_transitions": True,
        }
        trainer = pycrfsuite.Trainer(verbose=verbose)
        for features, labels in zip(X_train, y_train):
            trainer.append(features, labels)
        trainer.set_params(params)
        trainer.train(str(output_path))

    def predict(self, model_path, X_test):
        """Open the model at ``model_path`` and return one tag sequence per input."""
        import pycrfsuite

        tagger = pycrfsuite.Tagger()
        tagger.open(str(model_path))
        predictions = []
        for features in X_test:
            predictions.append(tagger.tag(features))
        return predictions
|
|
|
|
class CRFSuiteRsTrainer(CRFTrainerBase):
    """CRF backend built on the ``crfsuite`` package.

    ``verbose`` is accepted for interface compatibility but is not used.
    """

    name = "crfsuite-rs"

    def train(self, X_train, y_train, output_path, c1, c2, max_iterations, verbose=True):
        """Configure a ``crfsuite.Trainer``, feed all sequences, train to ``output_path``."""
        import crfsuite

        params = {
            "c1": c1,
            "c2": c2,
            "max_iterations": max_iterations,
            "feature.possible_transitions": True,
        }
        trainer = crfsuite.Trainer()
        # Parameters are set before any sequence is appended.
        trainer.set_params(params)
        for features, labels in zip(X_train, y_train):
            trainer.append(features, labels)
        trainer.train(str(output_path))

    def predict(self, model_path, X_test):
        """Load the model at ``model_path`` and return one tag sequence per input."""
        import crfsuite

        model = crfsuite.Model(str(model_path))
        predictions = []
        for features in X_test:
            predictions.append(model.tag(features))
        return predictions
|
|
|
|
class UndertheseaCoreTrainer(CRFTrainerBase):
    """CRF backend built on ``underthesea_core``.

    Models are stored with a ``.crf`` extension: a path ending in
    ``.crfsuite`` is rewritten to end in ``.crf`` for both saving and
    loading, so the same path can be passed to ``train`` and ``predict``.
    """

    name = "underthesea-core"

    @staticmethod
    def _resolve_model_path(path):
        """Return ``path`` as a string, with a trailing ``.crfsuite`` swapped for ``.crf``.

        Only the suffix is rewritten; a ``.crfsuite`` occurring elsewhere in
        the path (for example in a directory name) is left untouched.
        """
        path_str = str(path)
        suffix = ".crfsuite"
        if path_str.endswith(suffix):
            return path_str[: -len(suffix)] + ".crf"
        return path_str

    def _check_trainer_import(self):
        """Return ``CRFTrainer``, trying both known module layouts.

        Raises:
            ImportError: If neither location provides ``CRFTrainer``.
        """
        try:
            from underthesea_core import CRFTrainer
            return CRFTrainer
        except ImportError:
            pass
        try:
            from underthesea_core.underthesea_core import CRFTrainer
            return CRFTrainer
        except ImportError:
            pass
        raise ImportError("CRFTrainer not available in underthesea_core.")

    def _check_tagger_import(self):
        """Return ``(CRFModel, CRFTagger)``, trying both known module layouts.

        Raises:
            ImportError: If neither location provides both names.
        """
        try:
            from underthesea_core import CRFModel, CRFTagger
            return CRFModel, CRFTagger
        except ImportError:
            pass
        try:
            from underthesea_core.underthesea_core import CRFModel, CRFTagger
            return CRFModel, CRFTagger
        except ImportError:
            pass
        raise ImportError("CRFModel/CRFTagger not available in underthesea_core")

    def train(self, X_train, y_train, output_path, c1, c2, max_iterations, verbose=True):
        """Train a model and save it at the resolved form of ``output_path``.

        ``c1`` and ``c2`` are passed as ``l1_penalty`` and ``l2_penalty``.
        The path actually written is remembered in ``self._model_path``.
        """
        CRFTrainer = self._check_trainer_import()
        trainer = CRFTrainer(
            loss_function="lbfgs", l1_penalty=c1, l2_penalty=c2,
            max_iterations=max_iterations, verbose=1 if verbose else 0,
        )
        model = trainer.train(X_train, y_train)
        output_path_str = self._resolve_model_path(output_path)
        model.save(output_path_str)
        self._model_path = output_path_str

    def predict(self, model_path, X_test):
        """Load the model at the resolved ``model_path`` and tag every sequence.

        The explicit ``model_path`` argument is always honoured, so an
        instance that has already trained one model can still predict with
        a different one. The path is resolved the same way as in ``train``.
        """
        CRFModel, CRFTagger = self._check_tagger_import()
        model = CRFModel.load(self._resolve_model_path(model_path))
        tagger = CRFTagger.from_model(model)
        return [tagger.tag(xseq) for xseq in X_test]
|
|
|
|
def get_trainer(trainer_name: str) -> CRFTrainerBase:
    """Instantiate the CRF backend registered under ``trainer_name``.

    Raises:
        ValueError: If ``trainer_name`` is not one of the known backends;
            the message lists the available names.
    """
    trainers = {
        "python-crfsuite": PythonCRFSuiteTrainer,
        "crfsuite-rs": CRFSuiteRsTrainer,
        "underthesea-core": UndertheseaCoreTrainer,
    }
    trainer_cls = trainers.get(trainer_name)
    if trainer_cls is None:
        raise ValueError(f"Unknown trainer: {trainer_name}. Available: {list(trainers.keys())}")
    return trainer_cls()
|
|
|
|
| |
| |
| |
|
|
def save_metadata(output_dir, cfg, data_stats, active_groups, active_templates, accuracy, hw_info, training_time):
    """Save model metadata to ``metadata.yaml`` inside ``output_dir``.

    Args:
        output_dir: Directory (a ``Path``) that receives ``metadata.yaml``.
        cfg: Run configuration; ``cfg.model`` supplies ``trainer``, ``c1``,
            ``c2`` and ``max_iterations``.
        data_stats: Dict with the sentence/token counts of each split and
            optionally a ``dataset`` name.
        active_groups: Names of the enabled feature groups.
        active_templates: Enabled feature template strings.
        accuracy: Test accuracy; stored rounded to four decimals.
        hw_info: Dict with at least ``platform`` and ``python_version``;
            ``cpu_model`` is optional and defaults to ``"Unknown"``.
        training_time: Duration in seconds; stored rounded to two decimals.
    """
    model_cfg = cfg.model
    metadata = {
        "model": {
            "name": "Vietnamese POS Tagger",
            "type": "CRF (Conditional Random Field)",
            "framework": model_cfg.trainer,
        },
        "training": {
            "dataset": data_stats.get("dataset", "unknown"),
            "train_sentences": data_stats["train_sentences"],
            "train_tokens": data_stats["train_tokens"],
            "val_sentences": data_stats["val_sentences"],
            "val_tokens": data_stats["val_tokens"],
            "test_sentences": data_stats["test_sentences"],
            "test_tokens": data_stats["test_tokens"],
            "hyperparameters": {
                "c1": model_cfg.c1,
                "c2": model_cfg.c2,
                "max_iterations": model_cfg.max_iterations,
            },
            "feature_groups": active_groups,
            "num_feature_templates": len(active_templates),
            "feature_templates": active_templates,
            "duration_seconds": round(training_time, 2),
        },
        "performance": {
            # Cast to a built-in float: a NumPy scalar would be dumped by
            # yaml.dump as a python-object tag instead of a plain number.
            "test_accuracy": round(float(accuracy), 4),
        },
        "environment": {
            "platform": hw_info["platform"],
            "cpu_model": hw_info.get("cpu_model", "Unknown"),
            "python_version": hw_info["python_version"],
        },
        "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "author": "undertheseanlp",
    }

    metadata_path = output_dir / "metadata.yaml"
    # allow_unicode=True writes non-ASCII characters verbatim, so the file
    # must be opened as UTF-8 rather than with the locale's default encoding.
    with open(metadata_path, "w", encoding="utf-8") as f:
        yaml.dump(metadata, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    log.info(f"Metadata saved to {metadata_path}")
|
|
|
|
| |
| |
| |
|
|
@hydra.main(version_base=None, config_path="conf/pos", config_name="config")
def train(cfg: DictConfig):
    """Train a Vietnamese POS tagger with a CRF and evaluate it on the test set.

    Steps, in order: resolve the active feature templates and the trainer
    backend, create the output directory, load the dataset (and the
    dictionary if that feature group is enabled), extract features, train,
    predict on the test split, log accuracy and a classification report,
    write ``metadata.yaml``, and log a few example predictions plus a summary.

    Args:
        cfg: Hydra configuration. Reads ``cfg.model`` (``trainer``, ``c1``,
            ``c2``, ``max_iterations``, ``features``), ``cfg.data`` and
            ``cfg.output``.
    """
    total_start_time = time.time()
    start_datetime = datetime.now()

    log.info(f"Config:\n{OmegaConf.to_yaml(cfg)}")

    model_cfg = cfg.model

    # Resolve which feature groups/templates are enabled for this run.
    active_templates = get_active_templates(model_cfg.features)
    active_groups = get_active_groups(model_cfg.features)

    # Instantiate the backend early so an unknown trainer name fails
    # before any data is loaded.
    crf_trainer = get_trainer(model_cfg.trainer)

    # Resolve the output directory against the directory the script was
    # launched from, as reported by Hydra.
    original_cwd = Path(hydra.utils.get_original_cwd())
    output_dir = original_cwd / cfg.output
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / "model.crfsuite"

    # Hardware details are logged here and stored in the metadata later.
    hw_info = get_hardware_info()

    log.info("=" * 60)
    log.info("POS Tagger Training")
    log.info("=" * 60)
    log.info(f"Dataset: {cfg.data.name}")
    log.info(f"Trainer: {model_cfg.trainer}")
    log.info(f"Features: {active_groups} ({len(active_templates)} templates)")
    log.info(f"Platform: {hw_info['platform']}")
    log.info(f"CPU: {hw_info.get('cpu_model', 'Unknown')}")
    log.info(f"Output: {output_dir}")
    log.info(f"Started: {start_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
    log.info("=" * 60)

    # Load the dataset splits.
    # NOTE(review): val_data is only logged below; it is not used for
    # training or evaluation in this function.
    train_data, val_data, test_data, data_stats = load_data(cfg)

    log.info(f"Train: {len(train_data)} sentences ({data_stats['train_tokens']} tokens)")
    if val_data:
        log.info(f"Validation: {len(val_data)} sentences ({data_stats['val_tokens']} tokens)")
    log.info(f"Test: {len(test_data)} sentences ({data_stats['test_tokens']} tokens)")

    # The dictionary is only loaded when the "dictionary" feature group is on.
    dictionary = None
    if model_cfg.features.get("dictionary", True):
        log.info("Loading dictionary...")
        dictionary = load_dictionary()
        log.info(f"Dictionary: {len(dictionary)} words")

    # Build per-token feature lists and the matching tag sequences.
    log.info("Extracting features...")
    feature_start = time.time()
    X_train = [sentence_to_features(tokens, active_templates, dictionary) for tokens, _ in train_data]
    y_train = [tags for _, tags in train_data]
    log.info(f"Feature extraction: {format_duration(time.time() - feature_start)}")

    # Train the CRF; only this call is covered by crf_time.
    log.info(f"Training CRF model with {model_cfg.trainer}...")
    crf_start = time.time()
    crf_trainer.train(
        X_train, y_train, output_path,
        model_cfg.c1, model_cfg.c2, model_cfg.max_iterations,
        verbose=True,
    )
    crf_time = time.time() - crf_start
    # NOTE(review): the path logged here always ends in ".crfsuite", while
    # UndertheseaCoreTrainer saves to a ".crf" path instead.
    log.info(f"Model saved to {output_path}")
    log.info(f"CRF training: {format_duration(crf_time)}")

    # Evaluate on the test split.
    log.info("Evaluating on test set...")

    X_test = [sentence_to_features(tokens, active_templates, dictionary) for tokens, _ in test_data]
    y_test = [tags for _, tags in test_data]

    y_pred = crf_trainer.predict(output_path, X_test)

    # Flatten the per-sentence tag lists so metrics are computed per token.
    y_test_flat = [tag for tags in y_test for tag in tags]
    y_pred_flat = [tag for tags in y_pred for tag in tags]

    accuracy = accuracy_score(y_test_flat, y_pred_flat)

    log.info(f"Accuracy: {accuracy:.4f}")
    log.info(f"Classification Report:\n{classification_report(y_test_flat, y_pred_flat)}")

    # Total wall time is measured before the metadata is written because
    # it is stored in that metadata.
    total_time = time.time() - total_start_time

    # Persist the run description next to the model.
    save_metadata(output_dir, cfg, data_stats, active_groups, active_templates, accuracy, hw_info, total_time)

    # Show up to three test sentences with gold and predicted tags.
    log.info("=" * 60)
    log.info("Example predictions:")
    log.info("=" * 60)
    for i in range(min(3, len(test_data))):
        tokens = test_data[i][0]
        true_tags = y_test[i]
        pred_tags = y_pred[i]
        pairs_true = " ".join(f"{t}/{g}" for t, g in zip(tokens, true_tags))
        pairs_pred = " ".join(f"{t}/{g}" for t, g in zip(tokens, pred_tags))
        log.info(f"True: {pairs_true}")
        log.info(f"Pred: {pairs_pred}")

    log.info("=" * 60)
    log.info("Training Summary")
    log.info("=" * 60)
    log.info(f"Dataset: {cfg.data.name}")
    log.info(f"Trainer: {model_cfg.trainer}")
    log.info(f"Features: {active_groups} ({len(active_templates)} templates)")
    log.info(f"Model: {output_path}")
    log.info(f"Accuracy: {accuracy:.4f}")
    log.info(f"Total time: {format_duration(total_time)}")
    log.info("=" * 60)
|
|
|
|
# Script entry point: the Hydra-decorated train() is called without arguments.
if __name__ == "__main__":
    train()
|
|