Text Classification
Transformers
Safetensors
Chinese
chinese
ai-text-detection
ensemble
bert
roberta
qwen
lora
research
dataset
Instructions to use LUCIFerace/enhanced-replica-model-pack with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use LUCIFerace/enhanced-replica-model-pack with Transformers:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("text-classification", model="LUCIFerace/enhanced-replica-model-pack")

# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("LUCIFerace/enhanced-replica-model-pack", dtype="auto")
- Notebooks
- Google Colab
- Kaggle
| from __future__ import annotations | |
| import json | |
| import pickle | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| from .rules import _DEFAULT_ENGINE | |
def text_features(text: str) -> Dict[str, float]:
    """Compute the hand-crafted surface features for a single text.

    The rule engine (``_DEFAULT_ENGINE.score``) supplies a score and a mapping
    of hits whose values are summed; its third return value is ignored. The
    remaining features are punctuation / newline densities relative to the
    character length, plus a 0/1 flag for list-like markers ("1.", "2." or
    "第一").

    Both the ASCII and the full-width forms of the comma and semicolon are
    counted, so Chinese punctuation contributes to the ratios.

    Args:
        text: Input text; coerced with ``str()`` first.

    Returns:
        Dict with the keys ``f_ai_rule_score``, ``f_ai_rule_hits``, ``f_len``,
        ``f_comma_ratio``, ``f_semicolon_ratio``, ``f_newline_ratio`` and
        ``f_list_pattern``, all floats.
    """
    text = str(text)
    length = len(text)
    denom = max(length, 1)  # keeps the ratios defined for empty text
    score, hits, _ = _DEFAULT_ENGINE.score(text)
    # "\uff0c" and "\uff1b" are the full-width comma and semicolon. They are
    # written as escapes so Unicode normalisation of this file cannot silently
    # collapse them into their ASCII twins (which would double-count ASCII
    # punctuation and miss the Chinese forms entirely).
    comma_count = text.count(",") + text.count("\uff0c")
    semicolon_count = text.count(";") + text.count("\uff1b")
    newline_count = text.count("\n")
    list_pattern = 1.0 if ("1." in text or "2." in text or "第一" in text) else 0.0
    return {
        "f_ai_rule_score": float(score),
        "f_ai_rule_hits": float(sum(hits.values())),
        "f_len": float(length),
        "f_comma_ratio": float(comma_count / denom),
        "f_semicolon_ratio": float(semicolon_count / denom),
        "f_newline_ratio": float(newline_count / denom),
        "f_list_pattern": list_pattern,
    }
def feature_matrix(texts: pd.Series) -> np.ndarray:
    """Stack ``text_features`` of every text into a 2-D float matrix.

    Args:
        texts: Series of texts; each element is cast to ``str``.

    Returns:
        Array of shape ``(len(texts), 7)``. Columns follow the fixed order
        listed in ``columns`` below. An empty series yields shape ``(0, 7)``
        (not ``(0,)``), so a downstream ``x @ w`` with a 7-element weight
        vector stays well-defined.
    """
    # Column order is part of the contract: weight vectors elsewhere are
    # positional.
    columns = (
        "f_ai_rule_score",
        "f_ai_rule_hits",
        "f_len",
        "f_comma_ratio",
        "f_semicolon_ratio",
        "f_newline_ratio",
        "f_list_pattern",
    )
    rows = []
    for t in texts.astype(str).tolist():
        feats = text_features(t)
        rows.append([feats[name] for name in columns])
    # reshape is a no-op for non-empty input and fixes the rank for empty input.
    return np.asarray(rows, dtype=float).reshape(-1, len(columns))
def heuristic_scores(texts: pd.Series, kind: str = "bert") -> np.ndarray:
    """Score texts with a fixed, hand-set linear model over ``feature_matrix``.

    The weights are UNCALIBRATED and meant for demos / quick sanity checks
    only. ``kind == "bert"`` selects one weight vector; any other value
    selects the alternative one. The bias is shared.

    Returns:
        Sigmoid of the linear logits (logits are clipped to [-20, 20] first),
        one value per input text.
    """
    bert_weights = np.array([1.5, 0.3, 0.0002, 0.1, 0.2, 0.05, 0.25], dtype=float)
    fallback_weights = np.array([1.2, 0.2, 0.00025, 0.12, 0.18, 0.06, 0.22], dtype=float)
    weights = bert_weights if kind == "bert" else fallback_weights
    bias = -0.15

    features = feature_matrix(texts)
    bounded = np.clip(features @ weights + bias, -20.0, 20.0)
    return 1.0 / (1.0 + np.exp(-bounded))
| def _require_module(module_name: str): | |
| try: | |
| return __import__(module_name) | |
| except Exception as e: | |
| raise RuntimeError( | |
| f"Missing dependency `{module_name}`. " | |
| f"Please install via env/conda_environment.yml and env/环境准备.md." | |
| ) from e | |
def _resolve_device(device: str) -> str:
    """Normalise a device request into a concrete device string.

    Args:
        device: ``"auto"`` (or a falsy value) picks ``"cuda"`` when CUDA is
            available and ``"cpu"`` otherwise. Any other value is lower-cased,
            stripped and returned as-is after validation.

    Returns:
        The resolved device string.

    Raises:
        RuntimeError: If torch cannot be imported, or if any CUDA device
            (``"cuda"`` or an indexed form such as ``"cuda:0"``) is requested
            while CUDA is not available.
    """
    torch = _require_module("torch")
    requested = (device or "auto").strip().lower()
    if requested == "auto":
        return "cuda" if torch.cuda.is_available() else "cpu"
    # Prefix match so indexed devices ("cuda:1") fail here with a clear
    # message instead of later inside ``model.to(...)``.
    if requested.startswith("cuda") and not torch.cuda.is_available():
        raise RuntimeError(f"Requested device={requested} but CUDA is not available.")
    return requested
| def _sigmoid(x: np.ndarray) -> np.ndarray: | |
| return 1.0 / (1.0 + np.exp(-np.clip(x, -20.0, 20.0))) | |
def train_tfidf_lr_baseline(
    train_df: pd.DataFrame,
    dev_df: pd.DataFrame,
    seed: int = 42,
) -> tuple[dict, np.ndarray]:
    """Fit a character n-gram TF-IDF + logistic-regression baseline.

    Args:
        train_df: Frame with a ``text`` column and an integer-castable
            ``label`` column.
        dev_df: Frame with a ``text`` column to score.
        seed: Passed to ``LogisticRegression`` as ``random_state``.

    Returns:
        ``(model_payload, dev_scores)`` where the payload holds the fitted
        vectorizer and classifier under mode ``"tfidf_lr"`` and ``dev_scores``
        are the positive-class scores for ``dev_df``.

    Raises:
        RuntimeError: If scikit-learn cannot be imported.
    """
    _require_module("sklearn")
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.linear_model import LogisticRegression

    tfidf = TfidfVectorizer(analyzer="char", ngram_range=(2, 5), min_df=2, max_features=50000)
    train_features = tfidf.fit_transform(train_df["text"].astype(str).tolist())
    train_labels = train_df["label"].astype(int).to_numpy()

    classifier = LogisticRegression(max_iter=2000, random_state=seed)
    classifier.fit(train_features, train_labels)

    dev_features = tfidf.transform(dev_df["text"].astype(str).tolist())
    if hasattr(classifier, "predict_proba"):
        dev_scores = classifier.predict_proba(dev_features)[:, 1]
    else:
        # Fallback for estimators that only expose a margin.
        dev_scores = _sigmoid(classifier.decision_function(dev_features))

    payload = {
        "mode": "tfidf_lr",
        "vectorizer": tfidf,
        "classifier": classifier,
        "model_name": "tfidf_logreg",
    }
    return payload, dev_scores
def train_tfidf_svm_baseline(
    train_df: pd.DataFrame,
    dev_df: pd.DataFrame,
    seed: int = 42,
) -> tuple[dict, np.ndarray]:
    """Fit a character n-gram TF-IDF + linear SVM baseline.

    Args:
        train_df: Frame with a ``text`` column and an integer-castable
            ``label`` column.
        dev_df: Frame with a ``text`` column to score.
        seed: Passed to ``LinearSVC`` as ``random_state`` so repeated runs are
            reproducible, mirroring ``train_tfidf_lr_baseline``.

    Returns:
        ``(model_payload, dev_scores)`` where the payload holds the fitted
        vectorizer and classifier under mode ``"tfidf_svm"``. ``dev_scores``
        are the SVM decision values squashed through ``_sigmoid``; they are
        monotone in the margin but are not calibrated probabilities.

    Raises:
        RuntimeError: If scikit-learn cannot be imported.
    """
    _require_module("sklearn")
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.svm import LinearSVC

    vectorizer = TfidfVectorizer(analyzer="char", ngram_range=(2, 5), min_df=2, max_features=50000)
    x_train = vectorizer.fit_transform(train_df["text"].astype(str).tolist())
    y_train = train_df["label"].astype(int).to_numpy()

    clf = LinearSVC(random_state=seed)
    clf.fit(x_train, y_train)

    x_dev = vectorizer.transform(dev_df["text"].astype(str).tolist())
    dev_scores = _sigmoid(clf.decision_function(x_dev))

    model_payload = {
        "mode": "tfidf_svm",
        "vectorizer": vectorizer,
        "classifier": clf,
        "model_name": "tfidf_linearsvc",
    }
    return model_payload, dev_scores
class _TokenizedDataset:
    """Map-style dataset that tokenizes one text per access.

    Each item is a dict of ``torch.long`` tensors built from the tokenizer
    output (truncated to ``max_len``, not padded), plus a ``labels`` tensor
    when labels were supplied. Padding is left to the collate function, e.g.
    ``DataCollatorWithPadding``.
    """

    def __init__(self, texts: List[str], labels: Optional[np.ndarray], tokenizer, max_len: int):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self) -> int:
        return len(self.texts)

    def __getitem__(self, idx: int):
        torch = _require_module("torch")
        encoded = self.tokenizer(self.texts[idx], truncation=True, max_length=self.max_len)
        sample = {}
        for field, values in encoded.items():
            sample[field] = torch.tensor(values, dtype=torch.long)
        if self.labels is None:
            return sample
        sample["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample
def _make_transformer_classifier(base_model, hidden_size: int, num_labels: int = 2, dropout: float = 0.3, intermediate: int = 512):
    """Wrap ``base_model`` in a classifier with a one-hidden-layer MLP head.

    The head is dropout -> Linear(hidden_size, intermediate) -> ReLU ->
    Linear(intermediate, num_labels), applied to the hidden state at sequence
    position 0 of ``base_model``'s ``last_hidden_state``.

    Args:
        base_model: Encoder module whose output exposes ``last_hidden_state``.
        hidden_size: Width of the encoder's hidden states.
        num_labels: Number of output classes.
        dropout: Dropout probability applied before the head.
        intermediate: Width of the head's hidden layer.

    Returns:
        An ``nn.Module`` whose ``forward`` returns an object with ``loss``
        (cross-entropy when ``labels`` is given, else ``None``) and ``logits``
        attributes.

    Raises:
        RuntimeError: If torch cannot be imported.
    """
    from types import SimpleNamespace

    _require_module("torch")  # surfaces the project's install hint when torch is absent
    import torch.nn as nn
    import torch.nn.functional as F

    class TransformerClassifier(nn.Module):
        # Attribute names below are state_dict key prefixes and are read back
        # when the model is saved/loaded elsewhere; do not rename them.
        def __init__(self):
            super().__init__()
            self.base = base_model
            self.dropout = nn.Dropout(dropout)
            self.intermediate = nn.Linear(hidden_size, intermediate)
            self.activation = nn.ReLU()
            self.classifier = nn.Linear(intermediate, num_labels)

        def forward(self, input_ids=None, attention_mask=None, labels=None, **kwargs):
            outputs = self.base(input_ids=input_ids, attention_mask=attention_mask, **kwargs)
            cls = outputs.last_hidden_state[:, 0, :]
            x = self.dropout(cls)
            x = self.intermediate(x)
            x = self.activation(x)
            logits = self.classifier(x)
            loss = F.cross_entropy(logits, labels) if labels is not None else None
            # A plain namespace gives the same ``.loss`` / ``.logits`` access
            # without manufacturing a brand-new class on every forward pass.
            return SimpleNamespace(loss=loss, logits=logits)

    return TransformerClassifier()
def _build_transformer_loader(
    texts: List[str],
    labels: Optional[np.ndarray],
    tokenizer,
    max_len: int,
    batch_size: int,
    shuffle: bool,
):
    """Create a ``DataLoader`` over ``_TokenizedDataset`` for the given texts.

    Batches are collated with ``DataCollatorWithPadding`` configured to pad
    every sequence to ``max_len``. ``labels`` may be ``None`` for inference.

    Raises:
        RuntimeError: If torch cannot be imported.
    """
    _require_module("torch")  # dependency check only; the module object is not needed here
    from torch.utils.data import DataLoader
    from transformers import DataCollatorWithPadding

    dataset = _TokenizedDataset(texts, labels, tokenizer, max_len)
    pad_to_max_len = DataCollatorWithPadding(tokenizer, padding="max_length", max_length=max_len)
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        collate_fn=pad_to_max_len,
    )
def _predict_with_runtime(
    model,
    tokenizer,
    texts: List[str],
    device: str,
    max_len: int,
    batch_size: int,
) -> np.ndarray:
    """Score ``texts`` with ``model`` and return class-1 softmax probabilities.

    Side effects: the model is switched to eval mode and moved to ``device``;
    neither is undone afterwards, so a caller that continues training must
    re-enable train mode itself.

    Returns:
        1-D float array with one probability per text, in input order; an
        empty float array when the loader yields no batches.
    """
    torch = _require_module("torch")
    batches = _build_transformer_loader(
        texts=texts,
        labels=None,
        tokenizer=tokenizer,
        max_len=max_len,
        batch_size=batch_size,
        shuffle=False,
    )
    model.eval()
    model.to(device)

    collected: List[np.ndarray] = []
    with torch.no_grad():
        for batch in batches:
            batch.pop("labels", None)  # labels must never reach the forward pass here
            inputs = {name: tensor.to(device) for name, tensor in batch.items()}
            logits = model(**inputs).logits
            positive = torch.softmax(logits, dim=-1)[:, 1]
            collected.append(positive.detach().cpu().numpy())

    if collected:
        return np.concatenate(collected).astype(float)
    return np.array([], dtype=float)
def _save_transformer_model(model, tokenizer, model_dir: Path, use_custom_head: bool = False) -> None:
    """Write the tokenizer and model into ``model_dir`` (created if missing).

    Without a custom head the model is saved via ``save_pretrained``. With a
    custom head three artefacts are written instead:

    * ``pytorch_model.bin`` - the full ``state_dict`` (base + head);
    * the base model's config, when ``model.base.config`` exists;
    * ``model_meta.json`` - head dimensions and dropout, read back from the
      head's layers.
    """
    target = Path(model_dir)
    target.mkdir(parents=True, exist_ok=True)
    tokenizer.save_pretrained(target)

    if not use_custom_head:
        model.save_pretrained(target)
        return

    torch = _require_module("torch")
    torch.save(model.state_dict(), target / "pytorch_model.bin")
    if hasattr(model, "base") and hasattr(model.base, "config"):
        model.base.config.save_pretrained(target)

    head_meta = {
        "use_custom_head": True,
        "hidden_size": model.intermediate.in_features,
        "intermediate": model.classifier.in_features,
        "dropout": float(model.dropout.p),
    }
    (target / "model_meta.json").write_text(json.dumps(head_meta, indent=2), encoding="utf-8")
def train_transformer_classifier(
    train_df: pd.DataFrame,
    dev_df: pd.DataFrame,
    model_id: str,
    model_output_dir: Path,
    seed: int = 42,
    device: str = "auto",
    epochs: int = 2,
    batch_size: int = 8,
    eval_batch_size: int = 16,
    learning_rate: float = 2e-5,
    max_len: int = 256,
    weight_decay: float = 0.01,
    warmup_ratio: float = 0.06,
    gradient_accumulation_steps: int = 1,
    use_custom_head: bool = True,
    custom_head_dropout: float = 0.3,
    custom_head_intermediate: int = 512,
    save_best: bool = True,
    early_stopping_patience: int | None = None,
    use_amp: bool = True,
) -> tuple[dict, np.ndarray]:
    """Fine-tune a transformer binary classifier and save it to disk.

    Each epoch trains on ``train_df`` and then scores ``dev_df``; the dev F1
    at the F1-optimal threshold is logged and drives best-checkpoint selection
    and early stopping.

    Args:
        train_df: Frame with ``text`` and integer-castable ``label`` columns.
        dev_df: Frame with the same columns, used for per-epoch evaluation.
        model_id: Identifier handed to ``from_pretrained``.
        model_output_dir: Directory receiving ``hf_model_best`` or
            ``hf_model`` and ``training_log.csv``; ``str`` is accepted too.
        seed: Seeds NumPy and torch (CPU and, when available, CUDA).
        device: Device request, resolved via ``_resolve_device``.
        epochs: Number of epochs; at least one epoch always runs.
        batch_size: Training batch size.
        eval_batch_size: Batch size for dev scoring.
        learning_rate: AdamW learning rate.
        max_len: Truncation / padding length in tokens.
        weight_decay: AdamW weight decay.
        warmup_ratio: Fraction of optimizer steps used for linear warm-up.
        gradient_accumulation_steps: Mini-batches per optimizer step. The
            step counter runs across epochs.
        use_custom_head: Use ``_make_transformer_classifier`` on ``AutoModel``
            instead of ``AutoModelForSequenceClassification``.
        custom_head_dropout: Dropout of the custom head.
        custom_head_intermediate: Hidden width of the custom head.
        save_best: Keep and save the weights of the best-dev-F1 epoch.
        early_stopping_patience: Stop after this many consecutive epochs
            without a dev-F1 improvement; ``None`` disables early stopping.
        use_amp: Use mixed precision when running on a CUDA device.

    Returns:
        ``(model_payload, dev_scores)``. The payload describes where and how
        the model was saved. The scores are those of the best epoch when
        ``save_best`` is set, otherwise those of the last epoch run.

    Raises:
        RuntimeError: If torch / transformers cannot be imported or the
            requested device is unavailable.
    """
    import csv

    torch = _require_module("torch")
    _require_module("transformers")
    from transformers import (
        AutoModel,
        AutoModelForSequenceClassification,
        AutoTokenizer,
        get_linear_schedule_with_warmup,
    )
    from .metrics import best_threshold_by_f1, binary_metrics

    model_output_dir = Path(model_output_dir)  # tolerate plain-string paths

    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    resolved_device = _resolve_device(device)
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    if use_custom_head:
        base = AutoModel.from_pretrained(model_id)
        model = _make_transformer_classifier(
            base_model=base,
            hidden_size=base.config.hidden_size,
            num_labels=2,
            dropout=custom_head_dropout,
            intermediate=custom_head_intermediate,
        )
    else:
        model = AutoModelForSequenceClassification.from_pretrained(model_id, num_labels=2)
    model.to(resolved_device)

    train_loader = _build_transformer_loader(
        texts=train_df["text"].astype(str).tolist(),
        labels=train_df["label"].astype(int).to_numpy(),
        tokenizer=tokenizer,
        max_len=max_len,
        batch_size=batch_size,
        shuffle=True,
    )
    # Dev inputs do not change between epochs; materialise them once.
    dev_texts = dev_df["text"].astype(str).tolist()
    dev_labels = dev_df["label"].astype(int).to_numpy()

    n_epochs = max(1, epochs)
    grad_acc = max(1, int(gradient_accumulation_steps))
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    total_steps = max(1, len(train_loader) * n_epochs // grad_acc)
    warmup_steps = int(total_steps * max(0.0, warmup_ratio))
    scheduler = get_linear_schedule_with_warmup(
        optimizer, num_warmup_steps=warmup_steps, num_training_steps=total_steps
    )

    # Mixed precision only applies on CUDA; the prefix match also covers
    # indexed devices such as "cuda:0".
    scaler = None
    if use_amp and resolved_device.startswith("cuda"):
        scaler = torch.amp.GradScaler("cuda")

    step_count = 0
    optimizer.zero_grad(set_to_none=True)

    best_dev_f1 = -1.0
    best_state: Optional[Dict[str, object]] = None
    best_dev_scores: Optional[np.ndarray] = None
    patience = early_stopping_patience
    epochs_no_improve = 0
    training_log = []

    for epoch in range(1, n_epochs + 1):
        # The per-epoch evaluation below leaves the model in eval mode, so
        # train mode must be restored at the top of EVERY epoch; otherwise
        # dropout is silently disabled from the second epoch on.
        model.train()
        epoch_losses = []
        for batch in train_loader:
            labels = batch.pop("labels").to(resolved_device)
            inputs = {k: v.to(resolved_device) for k, v in batch.items()}
            if scaler is not None:
                with torch.amp.autocast("cuda"):
                    out = model(**inputs, labels=labels)
                    loss = out.loss / grad_acc
                scaler.scale(loss).backward()
            else:
                out = model(**inputs, labels=labels)
                loss = out.loss / grad_acc
                loss.backward()
            # Log the un-scaled loss so values are comparable across grad_acc.
            epoch_losses.append(loss.item() * grad_acc)

            step_count += 1
            if step_count % grad_acc == 0:
                if scaler is not None:
                    scaler.step(optimizer)
                    scaler.update()
                else:
                    optimizer.step()
                scheduler.step()
                optimizer.zero_grad(set_to_none=True)

        avg_train_loss = float(np.mean(epoch_losses)) if epoch_losses else 0.0

        # Per-epoch dev evaluation
        dev_scores = _predict_with_runtime(
            model=model,
            tokenizer=tokenizer,
            texts=dev_texts,
            device=resolved_device,
            max_len=max_len,
            batch_size=eval_batch_size,
        )
        threshold = best_threshold_by_f1(dev_labels, dev_scores)
        dev_f1 = binary_metrics(dev_labels, dev_scores, threshold)["f1"]
        training_log.append({"epoch": epoch, "train_loss": avg_train_loss, "dev_f1": dev_f1})

        # Improvement is tracked independently of ``save_best`` so early
        # stopping reacts to the metric even when no checkpoint is kept.
        improved = dev_f1 > best_dev_f1
        if improved:
            best_dev_f1 = dev_f1
            if save_best:
                # Snapshot on CPU so the copy does not occupy device memory.
                best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
                best_dev_scores = dev_scores.copy()

        if patience is not None:
            epochs_no_improve = 0 if improved else epochs_no_improve + 1
            if epochs_no_improve >= patience:
                break

    if save_best and best_state is not None:
        model.load_state_dict(best_state)
        model_dir = model_output_dir / "hf_model_best"
    else:
        model_dir = model_output_dir / "hf_model"
    _save_transformer_model(model, tokenizer, model_dir, use_custom_head=use_custom_head)

    # Persist training log
    if training_log:
        log_path = model_output_dir / "training_log.csv"
        log_path.parent.mkdir(parents=True, exist_ok=True)
        with open(log_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["epoch", "train_loss", "dev_f1"])
            writer.writeheader()
            writer.writerows(training_log)

    final_dev_scores = best_dev_scores if save_best and best_dev_scores is not None else dev_scores
    model_payload = {
        "mode": "transformer",
        "backend": "transformers",
        "model_name": model_id,
        "model_id": model_id,
        "model_dir": str(model_dir),
        "max_len": int(max_len),
        "eval_batch_size": int(eval_batch_size),
        "use_custom_head": use_custom_head,
    }
    return model_payload, final_dev_scores
| _TRANSFORMER_RUNTIME_CACHE: Dict[Tuple[str, str], Tuple[object, object]] = {} | |
| def clear_transformer_cache() -> None: | |
| """Clear the global transformer runtime cache (useful in long-lived notebooks/services).""" | |
| _TRANSFORMER_RUNTIME_CACHE.clear() | |
def _load_transformer_runtime(model_dir: Path, device: str):
    """Load (or fetch from cache) the model and tokenizer saved in ``model_dir``.

    A ``model_meta.json`` file in the directory marks a custom-head model: the
    base is loaded with ``AutoModel``, the head is rebuilt from the metadata
    (with fallbacks for missing keys) and the full ``state_dict`` is restored
    from ``pytorch_model.bin``. Otherwise the directory is loaded with
    ``AutoModelForSequenceClassification``.

    Returns:
        ``(model, tokenizer)`` with the model moved to ``device``. The pair is
        memoised under ``(str(model_dir), device)``.
    """
    cache_key = (str(model_dir), device)
    cached = _TRANSFORMER_RUNTIME_CACHE.get(cache_key)
    if cached is not None:
        return cached

    from transformers import AutoModel, AutoModelForSequenceClassification, AutoTokenizer

    root = Path(model_dir)
    tokenizer = AutoTokenizer.from_pretrained(root)
    meta_file = root / "model_meta.json"

    if not meta_file.exists():
        model = AutoModelForSequenceClassification.from_pretrained(root)
    else:
        meta = json.loads(meta_file.read_text(encoding="utf-8"))
        base = AutoModel.from_pretrained(root)
        model = _make_transformer_classifier(
            base_model=base,
            hidden_size=meta.get("hidden_size", base.config.hidden_size),
            num_labels=2,
            dropout=meta.get("dropout", 0.3),
            intermediate=meta.get("intermediate", 512),
        )
        torch = _require_module("torch")
        # weights_only=True restricts unpickling to tensor data.
        weights = torch.load(root / "pytorch_model.bin", map_location="cpu", weights_only=True)
        model.load_state_dict(weights)

    model.to(device)
    runtime = (model, tokenizer)
    _TRANSFORMER_RUNTIME_CACHE[cache_key] = runtime
    return runtime
def predict_with_model_payload(
    model_payload: dict,
    df: pd.DataFrame,
    device: str = "auto",
    eval_batch_size: Optional[int] = None,
    max_len: Optional[int] = None,
) -> np.ndarray:
    """Score ``df["text"]`` with the model described by ``model_payload``.

    Supported ``mode`` values:

    * ``"transformer"`` - loads the saved model from ``model_dir`` and runs
      batched inference; ``eval_batch_size`` / ``max_len`` override the values
      stored in the payload (defaults 16 / 256) when truthy.
    * ``"tfidf_lr"`` / ``"tfidf_svm"`` - uses the payload's ``vectorizer`` and
      ``classifier``; ``predict_proba`` is preferred, otherwise the decision
      function is passed through ``_sigmoid``.

    Raises:
        ValueError: For any other (or missing) mode.
    """
    mode = model_payload.get("mode", "")
    if mode not in {"transformer", "tfidf_lr", "tfidf_svm"}:
        raise ValueError(f"Unsupported model payload mode: {mode}")

    if mode == "transformer":
        resolved_device = _resolve_device(device)
        model, tokenizer = _load_transformer_runtime(
            model_dir=Path(model_payload["model_dir"]),
            device=resolved_device,
        )
        return _predict_with_runtime(
            model=model,
            tokenizer=tokenizer,
            texts=df["text"].astype(str).tolist(),
            device=resolved_device,
            max_len=int(max_len or model_payload.get("max_len", 256)),
            batch_size=int(eval_batch_size or model_payload.get("eval_batch_size", 16)),
        )

    # TF-IDF family: both modes share the same inference path.
    vectorizer = model_payload["vectorizer"]
    classifier = model_payload["classifier"]
    features = vectorizer.transform(df["text"].astype(str).tolist())
    if hasattr(classifier, "predict_proba"):
        return classifier.predict_proba(features)[:, 1]
    return _sigmoid(classifier.decision_function(features))
def predict_with_mlp_payload(mlp_payload: dict, x: np.ndarray, device: str = "cpu") -> np.ndarray:
    """Run inference with a small PyTorch MLP stored in E07 payload format.

    Parameters
    ----------
    mlp_payload : dict
        Must provide ``in_dim``, ``hidden_dims`` and ``state_dict``.
    x : np.ndarray
        Feature matrix of shape (n_samples, n_features). No scaling happens
        here; the caller is expected to have z-scored it already.
    device : str
        Device request, resolved through ``_resolve_device``.

    Returns
    -------
    np.ndarray
        Sigmoid probabilities of shape (n_samples,).
    """
    torch = _require_module("torch")
    nn = torch.nn

    class MLP(nn.Module):
        def __init__(self, in_dim: int, hidden_dims: tuple[int, ...]):
            super().__init__()
            widths = (in_dim, *hidden_dims)
            blocks = []
            for fan_in, fan_out in zip(widths[:-1], widths[1:]):
                # Dropout is inert in eval mode but must stay: it occupies an
                # index in the Sequential and so shapes the state_dict keys.
                blocks += [nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(0.2)]
            blocks.append(nn.Linear(widths[-1], 1))
            self.net = nn.Sequential(*blocks)

        def forward(self, x):
            return self.net(x).squeeze(-1)

    network = MLP(int(mlp_payload["in_dim"]), tuple(mlp_payload["hidden_dims"]))
    network.load_state_dict(mlp_payload["state_dict"])

    target_device = _resolve_device(device)
    network.to(target_device)
    network.eval()

    features = torch.tensor(x, dtype=torch.float32, device=target_device)
    with torch.no_grad():
        raw = network(features).cpu().numpy()
    return _sigmoid(raw)
def train_fusion_logistic(
    x: np.ndarray, y: np.ndarray, steps: int = 300, lr: float = 0.05, seed: int = 42
) -> tuple[np.ndarray, float]:
    """Fit logistic regression on joint features by full-batch gradient descent.

    Uses NumPy only. Weights and bias start at zero and are updated ``steps``
    times with learning rate ``lr`` using the mean gradient of the log-loss.
    ``seed`` reseeds NumPy's global RNG; the optimisation itself draws no
    random numbers.

    Returns:
        ``(weights, bias)`` with ``weights`` of shape ``(x.shape[1],)``.
    """
    np.random.seed(seed)
    weights = np.zeros(x.shape[1], dtype=float)
    bias = 0.0
    n_samples = len(y)
    for _ in range(steps):
        residual = _sigmoid(x @ weights + bias) - y
        # Both gradients come from the same residual, computed before either
        # parameter is touched.
        weights -= lr * (x.T @ residual / n_samples)
        bias -= lr * np.mean(residual)
    return weights, float(bias)
def fusion_predict_score(x: np.ndarray, w: np.ndarray, b: float) -> np.ndarray:
    """Apply fusion logistic weights: sigmoid of ``x @ w + b``."""
    logits = x @ w + b
    return _sigmoid(logits)
def save_model_payload(path: Path, model_payload: dict) -> None:
    """Pickle ``model_payload`` to ``path``, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as sink:
        pickle.Pickler(sink).dump(model_payload)
def load_model_payload(path: Path) -> dict:
    """Read back a payload written by pickling to ``path``.

    NOTE(security): unpickling executes arbitrary code embedded in the file.
    Only load payloads from sources you trust.
    """
    with open(path, "rb") as source:
        return pickle.Unpickler(source).load()