from __future__ import annotations import json import pickle from pathlib import Path from typing import Dict, List, Optional, Tuple import numpy as np import pandas as pd from .rules import _DEFAULT_ENGINE def text_features(text: str) -> Dict[str, float]: text = str(text) length = len(text) score, hits, _ = _DEFAULT_ENGINE.score(text) comma_count = text.count(",") + text.count(",") semicolon_count = text.count(";") + text.count(";") newline_count = text.count("\n") list_pattern = 1.0 if ("1." in text or "2." in text or "第一" in text) else 0.0 return { "f_ai_rule_score": float(score), "f_ai_rule_hits": float(sum(hits.values())), "f_len": float(length), "f_comma_ratio": float(comma_count / max(length, 1)), "f_semicolon_ratio": float(semicolon_count / max(length, 1)), "f_newline_ratio": float(newline_count / max(length, 1)), "f_list_pattern": list_pattern, } def feature_matrix(texts: pd.Series) -> np.ndarray: rows = [] for t in texts.astype(str).tolist(): f = text_features(t) rows.append( [ f["f_ai_rule_score"], f["f_ai_rule_hits"], f["f_len"], f["f_comma_ratio"], f["f_semicolon_ratio"], f["f_newline_ratio"], f["f_list_pattern"], ] ) return np.asarray(rows, dtype=float) def heuristic_scores(texts: pd.Series, kind: str = "bert") -> np.ndarray: """Return heuristic baseline scores (UNCALIBRATED WEIGHTS – for demo/quick checks only).""" x = feature_matrix(texts) if kind == "bert": w = np.array([1.5, 0.3, 0.0002, 0.1, 0.2, 0.05, 0.25], dtype=float) else: w = np.array([1.2, 0.2, 0.00025, 0.12, 0.18, 0.06, 0.22], dtype=float) b = -0.15 logits = x @ w + b probs = 1.0 / (1.0 + np.exp(-np.clip(logits, -20.0, 20.0))) return probs def _require_module(module_name: str): try: return __import__(module_name) except Exception as e: raise RuntimeError( f"Missing dependency `{module_name}`. " f"Please install via env/conda_environment.yml and env/环境准备.md." ) from e def _resolve_device(device: str) -> str: torch = _require_module("torch") d = (device or "auto").lower() if d == "auto": return "cuda" if torch.cuda.is_available() else "cpu" if d == "cuda" and not torch.cuda.is_available(): raise RuntimeError("Requested device=cuda but CUDA is not available.") return d def _sigmoid(x: np.ndarray) -> np.ndarray: return 1.0 / (1.0 + np.exp(-np.clip(x, -20.0, 20.0))) def train_tfidf_lr_baseline( train_df: pd.DataFrame, dev_df: pd.DataFrame, seed: int = 42, ) -> tuple[dict, np.ndarray]: _require_module("sklearn") from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.linear_model import LogisticRegression vectorizer = TfidfVectorizer(analyzer="char", ngram_range=(2, 5), min_df=2, max_features=50000) x_train = vectorizer.fit_transform(train_df["text"].astype(str).tolist()) y_train = train_df["label"].astype(int).to_numpy() clf = LogisticRegression(max_iter=2000, random_state=seed) clf.fit(x_train, y_train) x_dev = vectorizer.transform(dev_df["text"].astype(str).tolist()) if hasattr(clf, "predict_proba"): dev_scores = clf.predict_proba(x_dev)[:, 1] else: raw = clf.decision_function(x_dev) dev_scores = _sigmoid(raw) model_payload = { "mode": "tfidf_lr", "vectorizer": vectorizer, "classifier": clf, "model_name": "tfidf_logreg", } return model_payload, dev_scores def train_tfidf_svm_baseline( train_df: pd.DataFrame, dev_df: pd.DataFrame, ) -> tuple[dict, np.ndarray]: _require_module("sklearn") from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.svm import LinearSVC vectorizer = TfidfVectorizer(analyzer="char", ngram_range=(2, 5), min_df=2, max_features=50000) x_train = vectorizer.fit_transform(train_df["text"].astype(str).tolist()) y_train = train_df["label"].astype(int).to_numpy() clf = LinearSVC() clf.fit(x_train, y_train) x_dev = vectorizer.transform(dev_df["text"].astype(str).tolist()) raw = clf.decision_function(x_dev) dev_scores = _sigmoid(raw) model_payload = { "mode": "tfidf_svm", "vectorizer": vectorizer, "classifier": clf, "model_name": "tfidf_linearsvc", } return model_payload, dev_scores class _TokenizedDataset: """Dataset that returns tokenized inputs as tensors (suitable for DataCollatorWithPadding).""" def __init__(self, texts: List[str], labels: Optional[np.ndarray], tokenizer, max_len: int): self.texts = texts self.labels = labels self.tokenizer = tokenizer self.max_len = max_len def __len__(self) -> int: return len(self.texts) def __getitem__(self, idx: int): torch = _require_module("torch") enc = self.tokenizer(self.texts[idx], truncation=True, max_length=self.max_len) item = {k: torch.tensor(v, dtype=torch.long) for k, v in enc.items()} if self.labels is not None: item["labels"] = torch.tensor(self.labels[idx], dtype=torch.long) return item def _make_transformer_classifier(base_model, hidden_size: int, num_labels: int = 2, dropout: float = 0.3, intermediate: int = 512): """Factory for a custom transformer classifier with an MLP head.""" torch = _require_module("torch") import torch.nn as nn import torch.nn.functional as F class TransformerClassifier(nn.Module): def __init__(self): super().__init__() self.base = base_model self.dropout = nn.Dropout(dropout) self.intermediate = nn.Linear(hidden_size, intermediate) self.activation = nn.ReLU() self.classifier = nn.Linear(intermediate, num_labels) def forward(self, input_ids=None, attention_mask=None, labels=None, **kwargs): outputs = self.base(input_ids=input_ids, attention_mask=attention_mask, **kwargs) cls = outputs.last_hidden_state[:, 0, :] x = self.dropout(cls) x = self.intermediate(x) x = self.activation(x) logits = self.classifier(x) loss = None if labels is not None: loss = F.cross_entropy(logits, labels) return type("Out", (object,), {"loss": loss, "logits": logits})() return TransformerClassifier() def _build_transformer_loader( texts: List[str], labels: Optional[np.ndarray], tokenizer, max_len: int, batch_size: int, shuffle: bool, ): torch = _require_module("torch") from torch.utils.data import DataLoader from transformers import DataCollatorWithPadding ds = _TokenizedDataset(texts, labels, tokenizer, max_len) collator = DataCollatorWithPadding(tokenizer, padding="max_length", max_length=max_len) return DataLoader(ds, batch_size=batch_size, shuffle=shuffle, collate_fn=collator) def _predict_with_runtime( model, tokenizer, texts: List[str], device: str, max_len: int, batch_size: int, ) -> np.ndarray: torch = _require_module("torch") loader = _build_transformer_loader( texts=texts, labels=None, tokenizer=tokenizer, max_len=max_len, batch_size=batch_size, shuffle=False, ) model.eval() model.to(device) score_chunks: List[np.ndarray] = [] with torch.no_grad(): for batch in loader: if "labels" in batch: batch.pop("labels") for k in batch: batch[k] = batch[k].to(device) out = model(**batch) probs = torch.softmax(out.logits, dim=-1)[:, 1] score_chunks.append(probs.detach().cpu().numpy()) if not score_chunks: return np.array([], dtype=float) return np.concatenate(score_chunks).astype(float) def _save_transformer_model(model, tokenizer, model_dir: Path, use_custom_head: bool = False) -> None: model_dir = Path(model_dir) model_dir.mkdir(parents=True, exist_ok=True) tokenizer.save_pretrained(model_dir) if use_custom_head: torch = _require_module("torch") torch.save(model.state_dict(), model_dir / "pytorch_model.bin") if hasattr(model, "base") and hasattr(model.base, "config"): model.base.config.save_pretrained(model_dir) meta = { "use_custom_head": True, "hidden_size": model.intermediate.in_features, "intermediate": model.classifier.in_features, "dropout": float(model.dropout.p), } with open(model_dir / "model_meta.json", "w", encoding="utf-8") as f: json.dump(meta, f, indent=2) else: model.save_pretrained(model_dir) def train_transformer_classifier( train_df: pd.DataFrame, dev_df: pd.DataFrame, model_id: str, model_output_dir: Path, seed: int = 42, device: str = "auto", epochs: int = 2, batch_size: int = 8, eval_batch_size: int = 16, learning_rate: float = 2e-5, max_len: int = 256, weight_decay: float = 0.01, warmup_ratio: float = 0.06, gradient_accumulation_steps: int = 1, use_custom_head: bool = True, custom_head_dropout: float = 0.3, custom_head_intermediate: int = 512, save_best: bool = True, early_stopping_patience: int | None = None, use_amp: bool = True, ) -> tuple[dict, np.ndarray]: torch = _require_module("torch") transformers = _require_module("transformers") from transformers import ( AutoModel, AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup, ) from .metrics import best_threshold_by_f1, binary_metrics np.random.seed(seed) torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) resolved_device = _resolve_device(device) tokenizer = AutoTokenizer.from_pretrained(model_id) if use_custom_head: base = AutoModel.from_pretrained(model_id) hidden_size = base.config.hidden_size model = _make_transformer_classifier( base_model=base, hidden_size=hidden_size, num_labels=2, dropout=custom_head_dropout, intermediate=custom_head_intermediate, ) else: model = AutoModelForSequenceClassification.from_pretrained(model_id, num_labels=2) model.to(resolved_device) train_loader = _build_transformer_loader( texts=train_df["text"].astype(str).tolist(), labels=train_df["label"].astype(int).to_numpy(), tokenizer=tokenizer, max_len=max_len, batch_size=batch_size, shuffle=True, ) optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay) grad_acc = max(1, int(gradient_accumulation_steps)) total_steps = max(1, len(train_loader) * max(1, epochs) // grad_acc) warmup_steps = int(total_steps * max(0.0, warmup_ratio)) scheduler = get_linear_schedule_with_warmup( optimizer, num_warmup_steps=warmup_steps, num_training_steps=total_steps ) step_count = 0 model.train() optimizer.zero_grad(set_to_none=True) # AMP setup scaler = None if use_amp and resolved_device == "cuda": scaler = torch.amp.GradScaler("cuda") best_dev_f1 = -1.0 best_state: Optional[Dict[str, object]] = None best_dev_scores: Optional[np.ndarray] = None patience = early_stopping_patience epochs_no_improve = 0 training_log = [] for epoch in range(1, max(1, epochs) + 1): epoch_losses = [] for batch in train_loader: labels = batch.pop("labels").to(resolved_device) for k in batch: batch[k] = batch[k].to(resolved_device) if scaler is not None: with torch.amp.autocast("cuda"): out = model(**batch, labels=labels) loss = out.loss / grad_acc scaler.scale(loss).backward() else: out = model(**batch, labels=labels) loss = out.loss / grad_acc loss.backward() if loss is not None: epoch_losses.append(loss.item() * grad_acc) step_count += 1 if step_count % grad_acc == 0: if scaler is not None: scaler.step(optimizer) scaler.update() else: optimizer.step() scheduler.step() optimizer.zero_grad(set_to_none=True) avg_train_loss = float(np.mean(epoch_losses)) if epoch_losses else 0.0 # Per-epoch dev evaluation dev_scores = _predict_with_runtime( model=model, tokenizer=tokenizer, texts=dev_df["text"].astype(str).tolist(), device=resolved_device, max_len=max_len, batch_size=eval_batch_size, ) threshold = best_threshold_by_f1(dev_df["label"].astype(int).to_numpy(), dev_scores) m = binary_metrics(dev_df["label"].astype(int).to_numpy(), dev_scores, threshold) dev_f1 = m["f1"] training_log.append({"epoch": epoch, "train_loss": avg_train_loss, "dev_f1": dev_f1}) improved = False if save_best and dev_f1 > best_dev_f1: best_dev_f1 = dev_f1 best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()} best_dev_scores = dev_scores.copy() improved = True if patience is not None: if improved: epochs_no_improve = 0 else: epochs_no_improve += 1 if epochs_no_improve >= patience: break if save_best and best_state is not None: model.load_state_dict(best_state) model_dir = model_output_dir / "hf_model_best" else: model_dir = model_output_dir / "hf_model" _save_transformer_model(model, tokenizer, model_dir, use_custom_head=use_custom_head) # Persist training log if training_log: import csv log_path = model_output_dir / "training_log.csv" log_path.parent.mkdir(parents=True, exist_ok=True) with open(log_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=["epoch", "train_loss", "dev_f1"]) writer.writeheader() writer.writerows(training_log) final_dev_scores = best_dev_scores if save_best and best_dev_scores is not None else dev_scores model_payload = { "mode": "transformer", "backend": "transformers", "model_name": model_id, "model_id": model_id, "model_dir": str(model_dir), "max_len": int(max_len), "eval_batch_size": int(eval_batch_size), "use_custom_head": use_custom_head, } return model_payload, final_dev_scores _TRANSFORMER_RUNTIME_CACHE: Dict[Tuple[str, str], Tuple[object, object]] = {} def clear_transformer_cache() -> None: """Clear the global transformer runtime cache (useful in long-lived notebooks/services).""" _TRANSFORMER_RUNTIME_CACHE.clear() def _load_transformer_runtime(model_dir: Path, device: str): key = (str(model_dir), device) if key in _TRANSFORMER_RUNTIME_CACHE: return _TRANSFORMER_RUNTIME_CACHE[key] from transformers import AutoModel, AutoModelForSequenceClassification, AutoTokenizer model_dir = Path(model_dir) tokenizer = AutoTokenizer.from_pretrained(model_dir) meta_path = model_dir / "model_meta.json" if meta_path.exists(): with open(meta_path, "r", encoding="utf-8") as f: meta = json.load(f) base = AutoModel.from_pretrained(model_dir) hidden_size = meta.get("hidden_size", base.config.hidden_size) intermediate = meta.get("intermediate", 512) dropout = meta.get("dropout", 0.3) model = _make_transformer_classifier( base_model=base, hidden_size=hidden_size, num_labels=2, dropout=dropout, intermediate=intermediate, ) torch = _require_module("torch") state_dict = torch.load(model_dir / "pytorch_model.bin", map_location="cpu", weights_only=True) model.load_state_dict(state_dict) else: model = AutoModelForSequenceClassification.from_pretrained(model_dir) model.to(device) _TRANSFORMER_RUNTIME_CACHE[key] = (model, tokenizer) return model, tokenizer def predict_with_model_payload( model_payload: dict, df: pd.DataFrame, device: str = "auto", eval_batch_size: Optional[int] = None, max_len: Optional[int] = None, ) -> np.ndarray: mode = model_payload.get("mode", "") if mode == "transformer": resolved_device = _resolve_device(device) model_dir = Path(model_payload["model_dir"]) model, tokenizer = _load_transformer_runtime(model_dir=model_dir, device=resolved_device) bs = int(eval_batch_size or model_payload.get("eval_batch_size", 16)) ml = int(max_len or model_payload.get("max_len", 256)) return _predict_with_runtime( model=model, tokenizer=tokenizer, texts=df["text"].astype(str).tolist(), device=resolved_device, max_len=ml, batch_size=bs, ) if mode in {"tfidf_lr", "tfidf_svm"}: vec = model_payload["vectorizer"] clf = model_payload["classifier"] x = vec.transform(df["text"].astype(str).tolist()) if hasattr(clf, "predict_proba"): return clf.predict_proba(x)[:, 1] raw = clf.decision_function(x) return _sigmoid(raw) raise ValueError(f"Unsupported model payload mode: {mode}") def predict_with_mlp_payload(mlp_payload: dict, x: np.ndarray, device: str = "cpu") -> np.ndarray: """Run inference with a lightweight PyTorch MLP saved in E07 payload format. Parameters ---------- mlp_payload : dict Must contain keys ``in_dim``, ``hidden_dims``, ``state_dict``. x : np.ndarray Feature matrix (n_samples, n_features). Will be z-scored externally before calling this. device : str Target torch device. Returns ------- np.ndarray Sigmoid probabilities of shape (n_samples,). """ torch = _require_module("torch") nn = torch.nn class MLP(nn.Module): def __init__(self, in_dim: int, hidden_dims: tuple[int, ...]): super().__init__() layers = [] prev = in_dim for h in hidden_dims: layers.append(nn.Linear(prev, h)) layers.append(nn.ReLU()) layers.append(nn.Dropout(0.2)) prev = h layers.append(nn.Linear(prev, 1)) self.net = nn.Sequential(*layers) def forward(self, x): return self.net(x).squeeze(-1) in_dim = int(mlp_payload["in_dim"]) hidden_dims = tuple(mlp_payload["hidden_dims"]) model = MLP(in_dim, hidden_dims) model.load_state_dict(mlp_payload["state_dict"]) resolved_device = _resolve_device(device) model.to(resolved_device) model.eval() tx = torch.tensor(x, dtype=torch.float32, device=resolved_device) with torch.no_grad(): logits = model(tx).cpu().numpy() return _sigmoid(logits) def train_fusion_logistic( x: np.ndarray, y: np.ndarray, steps: int = 300, lr: float = 0.05, seed: int = 42 ) -> tuple[np.ndarray, float]: """Train a simple logistic regression on joint features (numpy only).""" np.random.seed(seed) n_features = x.shape[1] w = np.zeros(n_features, dtype=float) b = 0.0 for _ in range(steps): logits = x @ w + b p = _sigmoid(logits) grad_w = x.T @ (p - y) / len(y) grad_b = np.mean(p - y) w -= lr * grad_w b -= lr * grad_b return w, float(b) def fusion_predict_score(x: np.ndarray, w: np.ndarray, b: float) -> np.ndarray: """Predict with trained fusion logistic weights.""" return _sigmoid(x @ w + b) def save_model_payload(path: Path, model_payload: dict) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("wb") as f: pickle.dump(model_payload, f) def load_model_payload(path: Path) -> dict: with path.open("rb") as f: return pickle.load(f)