LUCIFerace's picture
Add files using upload-large-folder tool
4a0f6a5 verified
from __future__ import annotations
import json
import pickle
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from .rules import _DEFAULT_ENGINE
def text_features(text: str) -> Dict[str, float]:
text = str(text)
length = len(text)
score, hits, _ = _DEFAULT_ENGINE.score(text)
comma_count = text.count(",") + text.count(",")
semicolon_count = text.count(";") + text.count(";")
newline_count = text.count("\n")
list_pattern = 1.0 if ("1." in text or "2." in text or "第一" in text) else 0.0
return {
"f_ai_rule_score": float(score),
"f_ai_rule_hits": float(sum(hits.values())),
"f_len": float(length),
"f_comma_ratio": float(comma_count / max(length, 1)),
"f_semicolon_ratio": float(semicolon_count / max(length, 1)),
"f_newline_ratio": float(newline_count / max(length, 1)),
"f_list_pattern": list_pattern,
}
def feature_matrix(texts: pd.Series) -> np.ndarray:
rows = []
for t in texts.astype(str).tolist():
f = text_features(t)
rows.append(
[
f["f_ai_rule_score"],
f["f_ai_rule_hits"],
f["f_len"],
f["f_comma_ratio"],
f["f_semicolon_ratio"],
f["f_newline_ratio"],
f["f_list_pattern"],
]
)
return np.asarray(rows, dtype=float)
def heuristic_scores(texts: pd.Series, kind: str = "bert") -> np.ndarray:
"""Return heuristic baseline scores (UNCALIBRATED WEIGHTS – for demo/quick checks only)."""
x = feature_matrix(texts)
if kind == "bert":
w = np.array([1.5, 0.3, 0.0002, 0.1, 0.2, 0.05, 0.25], dtype=float)
else:
w = np.array([1.2, 0.2, 0.00025, 0.12, 0.18, 0.06, 0.22], dtype=float)
b = -0.15
logits = x @ w + b
probs = 1.0 / (1.0 + np.exp(-np.clip(logits, -20.0, 20.0)))
return probs
def _require_module(module_name: str):
try:
return __import__(module_name)
except Exception as e:
raise RuntimeError(
f"Missing dependency `{module_name}`. "
f"Please install via env/conda_environment.yml and env/环境准备.md."
) from e
def _resolve_device(device: str) -> str:
torch = _require_module("torch")
d = (device or "auto").lower()
if d == "auto":
return "cuda" if torch.cuda.is_available() else "cpu"
if d == "cuda" and not torch.cuda.is_available():
raise RuntimeError("Requested device=cuda but CUDA is not available.")
return d
def _sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(x, -20.0, 20.0)))
def train_tfidf_lr_baseline(
train_df: pd.DataFrame,
dev_df: pd.DataFrame,
seed: int = 42,
) -> tuple[dict, np.ndarray]:
_require_module("sklearn")
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
vectorizer = TfidfVectorizer(analyzer="char", ngram_range=(2, 5), min_df=2, max_features=50000)
x_train = vectorizer.fit_transform(train_df["text"].astype(str).tolist())
y_train = train_df["label"].astype(int).to_numpy()
clf = LogisticRegression(max_iter=2000, random_state=seed)
clf.fit(x_train, y_train)
x_dev = vectorizer.transform(dev_df["text"].astype(str).tolist())
if hasattr(clf, "predict_proba"):
dev_scores = clf.predict_proba(x_dev)[:, 1]
else:
raw = clf.decision_function(x_dev)
dev_scores = _sigmoid(raw)
model_payload = {
"mode": "tfidf_lr",
"vectorizer": vectorizer,
"classifier": clf,
"model_name": "tfidf_logreg",
}
return model_payload, dev_scores
def train_tfidf_svm_baseline(
train_df: pd.DataFrame,
dev_df: pd.DataFrame,
) -> tuple[dict, np.ndarray]:
_require_module("sklearn")
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC
vectorizer = TfidfVectorizer(analyzer="char", ngram_range=(2, 5), min_df=2, max_features=50000)
x_train = vectorizer.fit_transform(train_df["text"].astype(str).tolist())
y_train = train_df["label"].astype(int).to_numpy()
clf = LinearSVC()
clf.fit(x_train, y_train)
x_dev = vectorizer.transform(dev_df["text"].astype(str).tolist())
raw = clf.decision_function(x_dev)
dev_scores = _sigmoid(raw)
model_payload = {
"mode": "tfidf_svm",
"vectorizer": vectorizer,
"classifier": clf,
"model_name": "tfidf_linearsvc",
}
return model_payload, dev_scores
class _TokenizedDataset:
"""Dataset that returns tokenized inputs as tensors (suitable for DataCollatorWithPadding)."""
def __init__(self, texts: List[str], labels: Optional[np.ndarray], tokenizer, max_len: int):
self.texts = texts
self.labels = labels
self.tokenizer = tokenizer
self.max_len = max_len
def __len__(self) -> int:
return len(self.texts)
def __getitem__(self, idx: int):
torch = _require_module("torch")
enc = self.tokenizer(self.texts[idx], truncation=True, max_length=self.max_len)
item = {k: torch.tensor(v, dtype=torch.long) for k, v in enc.items()}
if self.labels is not None:
item["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
return item
def _make_transformer_classifier(base_model, hidden_size: int, num_labels: int = 2, dropout: float = 0.3, intermediate: int = 512):
"""Factory for a custom transformer classifier with an MLP head."""
torch = _require_module("torch")
import torch.nn as nn
import torch.nn.functional as F
class TransformerClassifier(nn.Module):
def __init__(self):
super().__init__()
self.base = base_model
self.dropout = nn.Dropout(dropout)
self.intermediate = nn.Linear(hidden_size, intermediate)
self.activation = nn.ReLU()
self.classifier = nn.Linear(intermediate, num_labels)
def forward(self, input_ids=None, attention_mask=None, labels=None, **kwargs):
outputs = self.base(input_ids=input_ids, attention_mask=attention_mask, **kwargs)
cls = outputs.last_hidden_state[:, 0, :]
x = self.dropout(cls)
x = self.intermediate(x)
x = self.activation(x)
logits = self.classifier(x)
loss = None
if labels is not None:
loss = F.cross_entropy(logits, labels)
return type("Out", (object,), {"loss": loss, "logits": logits})()
return TransformerClassifier()
def _build_transformer_loader(
texts: List[str],
labels: Optional[np.ndarray],
tokenizer,
max_len: int,
batch_size: int,
shuffle: bool,
):
torch = _require_module("torch")
from torch.utils.data import DataLoader
from transformers import DataCollatorWithPadding
ds = _TokenizedDataset(texts, labels, tokenizer, max_len)
collator = DataCollatorWithPadding(tokenizer, padding="max_length", max_length=max_len)
return DataLoader(ds, batch_size=batch_size, shuffle=shuffle, collate_fn=collator)
def _predict_with_runtime(
model,
tokenizer,
texts: List[str],
device: str,
max_len: int,
batch_size: int,
) -> np.ndarray:
torch = _require_module("torch")
loader = _build_transformer_loader(
texts=texts,
labels=None,
tokenizer=tokenizer,
max_len=max_len,
batch_size=batch_size,
shuffle=False,
)
model.eval()
model.to(device)
score_chunks: List[np.ndarray] = []
with torch.no_grad():
for batch in loader:
if "labels" in batch:
batch.pop("labels")
for k in batch:
batch[k] = batch[k].to(device)
out = model(**batch)
probs = torch.softmax(out.logits, dim=-1)[:, 1]
score_chunks.append(probs.detach().cpu().numpy())
if not score_chunks:
return np.array([], dtype=float)
return np.concatenate(score_chunks).astype(float)
def _save_transformer_model(model, tokenizer, model_dir: Path, use_custom_head: bool = False) -> None:
model_dir = Path(model_dir)
model_dir.mkdir(parents=True, exist_ok=True)
tokenizer.save_pretrained(model_dir)
if use_custom_head:
torch = _require_module("torch")
torch.save(model.state_dict(), model_dir / "pytorch_model.bin")
if hasattr(model, "base") and hasattr(model.base, "config"):
model.base.config.save_pretrained(model_dir)
meta = {
"use_custom_head": True,
"hidden_size": model.intermediate.in_features,
"intermediate": model.classifier.in_features,
"dropout": float(model.dropout.p),
}
with open(model_dir / "model_meta.json", "w", encoding="utf-8") as f:
json.dump(meta, f, indent=2)
else:
model.save_pretrained(model_dir)
def train_transformer_classifier(
train_df: pd.DataFrame,
dev_df: pd.DataFrame,
model_id: str,
model_output_dir: Path,
seed: int = 42,
device: str = "auto",
epochs: int = 2,
batch_size: int = 8,
eval_batch_size: int = 16,
learning_rate: float = 2e-5,
max_len: int = 256,
weight_decay: float = 0.01,
warmup_ratio: float = 0.06,
gradient_accumulation_steps: int = 1,
use_custom_head: bool = True,
custom_head_dropout: float = 0.3,
custom_head_intermediate: int = 512,
save_best: bool = True,
early_stopping_patience: int | None = None,
use_amp: bool = True,
) -> tuple[dict, np.ndarray]:
torch = _require_module("torch")
transformers = _require_module("transformers")
from transformers import (
AutoModel,
AutoModelForSequenceClassification,
AutoTokenizer,
get_linear_schedule_with_warmup,
)
from .metrics import best_threshold_by_f1, binary_metrics
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
resolved_device = _resolve_device(device)
tokenizer = AutoTokenizer.from_pretrained(model_id)
if use_custom_head:
base = AutoModel.from_pretrained(model_id)
hidden_size = base.config.hidden_size
model = _make_transformer_classifier(
base_model=base,
hidden_size=hidden_size,
num_labels=2,
dropout=custom_head_dropout,
intermediate=custom_head_intermediate,
)
else:
model = AutoModelForSequenceClassification.from_pretrained(model_id, num_labels=2)
model.to(resolved_device)
train_loader = _build_transformer_loader(
texts=train_df["text"].astype(str).tolist(),
labels=train_df["label"].astype(int).to_numpy(),
tokenizer=tokenizer,
max_len=max_len,
batch_size=batch_size,
shuffle=True,
)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
grad_acc = max(1, int(gradient_accumulation_steps))
total_steps = max(1, len(train_loader) * max(1, epochs) // grad_acc)
warmup_steps = int(total_steps * max(0.0, warmup_ratio))
scheduler = get_linear_schedule_with_warmup(
optimizer, num_warmup_steps=warmup_steps, num_training_steps=total_steps
)
step_count = 0
model.train()
optimizer.zero_grad(set_to_none=True)
# AMP setup
scaler = None
if use_amp and resolved_device == "cuda":
scaler = torch.amp.GradScaler("cuda")
best_dev_f1 = -1.0
best_state: Optional[Dict[str, object]] = None
best_dev_scores: Optional[np.ndarray] = None
patience = early_stopping_patience
epochs_no_improve = 0
training_log = []
for epoch in range(1, max(1, epochs) + 1):
epoch_losses = []
for batch in train_loader:
labels = batch.pop("labels").to(resolved_device)
for k in batch:
batch[k] = batch[k].to(resolved_device)
if scaler is not None:
with torch.amp.autocast("cuda"):
out = model(**batch, labels=labels)
loss = out.loss / grad_acc
scaler.scale(loss).backward()
else:
out = model(**batch, labels=labels)
loss = out.loss / grad_acc
loss.backward()
if loss is not None:
epoch_losses.append(loss.item() * grad_acc)
step_count += 1
if step_count % grad_acc == 0:
if scaler is not None:
scaler.step(optimizer)
scaler.update()
else:
optimizer.step()
scheduler.step()
optimizer.zero_grad(set_to_none=True)
avg_train_loss = float(np.mean(epoch_losses)) if epoch_losses else 0.0
# Per-epoch dev evaluation
dev_scores = _predict_with_runtime(
model=model,
tokenizer=tokenizer,
texts=dev_df["text"].astype(str).tolist(),
device=resolved_device,
max_len=max_len,
batch_size=eval_batch_size,
)
threshold = best_threshold_by_f1(dev_df["label"].astype(int).to_numpy(), dev_scores)
m = binary_metrics(dev_df["label"].astype(int).to_numpy(), dev_scores, threshold)
dev_f1 = m["f1"]
training_log.append({"epoch": epoch, "train_loss": avg_train_loss, "dev_f1": dev_f1})
improved = False
if save_best and dev_f1 > best_dev_f1:
best_dev_f1 = dev_f1
best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
best_dev_scores = dev_scores.copy()
improved = True
if patience is not None:
if improved:
epochs_no_improve = 0
else:
epochs_no_improve += 1
if epochs_no_improve >= patience:
break
if save_best and best_state is not None:
model.load_state_dict(best_state)
model_dir = model_output_dir / "hf_model_best"
else:
model_dir = model_output_dir / "hf_model"
_save_transformer_model(model, tokenizer, model_dir, use_custom_head=use_custom_head)
# Persist training log
if training_log:
import csv
log_path = model_output_dir / "training_log.csv"
log_path.parent.mkdir(parents=True, exist_ok=True)
with open(log_path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=["epoch", "train_loss", "dev_f1"])
writer.writeheader()
writer.writerows(training_log)
final_dev_scores = best_dev_scores if save_best and best_dev_scores is not None else dev_scores
model_payload = {
"mode": "transformer",
"backend": "transformers",
"model_name": model_id,
"model_id": model_id,
"model_dir": str(model_dir),
"max_len": int(max_len),
"eval_batch_size": int(eval_batch_size),
"use_custom_head": use_custom_head,
}
return model_payload, final_dev_scores
_TRANSFORMER_RUNTIME_CACHE: Dict[Tuple[str, str], Tuple[object, object]] = {}
def clear_transformer_cache() -> None:
"""Clear the global transformer runtime cache (useful in long-lived notebooks/services)."""
_TRANSFORMER_RUNTIME_CACHE.clear()
def _load_transformer_runtime(model_dir: Path, device: str):
key = (str(model_dir), device)
if key in _TRANSFORMER_RUNTIME_CACHE:
return _TRANSFORMER_RUNTIME_CACHE[key]
from transformers import AutoModel, AutoModelForSequenceClassification, AutoTokenizer
model_dir = Path(model_dir)
tokenizer = AutoTokenizer.from_pretrained(model_dir)
meta_path = model_dir / "model_meta.json"
if meta_path.exists():
with open(meta_path, "r", encoding="utf-8") as f:
meta = json.load(f)
base = AutoModel.from_pretrained(model_dir)
hidden_size = meta.get("hidden_size", base.config.hidden_size)
intermediate = meta.get("intermediate", 512)
dropout = meta.get("dropout", 0.3)
model = _make_transformer_classifier(
base_model=base,
hidden_size=hidden_size,
num_labels=2,
dropout=dropout,
intermediate=intermediate,
)
torch = _require_module("torch")
state_dict = torch.load(model_dir / "pytorch_model.bin", map_location="cpu", weights_only=True)
model.load_state_dict(state_dict)
else:
model = AutoModelForSequenceClassification.from_pretrained(model_dir)
model.to(device)
_TRANSFORMER_RUNTIME_CACHE[key] = (model, tokenizer)
return model, tokenizer
def predict_with_model_payload(
model_payload: dict,
df: pd.DataFrame,
device: str = "auto",
eval_batch_size: Optional[int] = None,
max_len: Optional[int] = None,
) -> np.ndarray:
mode = model_payload.get("mode", "")
if mode == "transformer":
resolved_device = _resolve_device(device)
model_dir = Path(model_payload["model_dir"])
model, tokenizer = _load_transformer_runtime(model_dir=model_dir, device=resolved_device)
bs = int(eval_batch_size or model_payload.get("eval_batch_size", 16))
ml = int(max_len or model_payload.get("max_len", 256))
return _predict_with_runtime(
model=model,
tokenizer=tokenizer,
texts=df["text"].astype(str).tolist(),
device=resolved_device,
max_len=ml,
batch_size=bs,
)
if mode in {"tfidf_lr", "tfidf_svm"}:
vec = model_payload["vectorizer"]
clf = model_payload["classifier"]
x = vec.transform(df["text"].astype(str).tolist())
if hasattr(clf, "predict_proba"):
return clf.predict_proba(x)[:, 1]
raw = clf.decision_function(x)
return _sigmoid(raw)
raise ValueError(f"Unsupported model payload mode: {mode}")
def predict_with_mlp_payload(mlp_payload: dict, x: np.ndarray, device: str = "cpu") -> np.ndarray:
"""Run inference with a lightweight PyTorch MLP saved in E07 payload format.
Parameters
----------
mlp_payload : dict
Must contain keys ``in_dim``, ``hidden_dims``, ``state_dict``.
x : np.ndarray
Feature matrix (n_samples, n_features). Will be z-scored externally before calling this.
device : str
Target torch device.
Returns
-------
np.ndarray
Sigmoid probabilities of shape (n_samples,).
"""
torch = _require_module("torch")
nn = torch.nn
class MLP(nn.Module):
def __init__(self, in_dim: int, hidden_dims: tuple[int, ...]):
super().__init__()
layers = []
prev = in_dim
for h in hidden_dims:
layers.append(nn.Linear(prev, h))
layers.append(nn.ReLU())
layers.append(nn.Dropout(0.2))
prev = h
layers.append(nn.Linear(prev, 1))
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x).squeeze(-1)
in_dim = int(mlp_payload["in_dim"])
hidden_dims = tuple(mlp_payload["hidden_dims"])
model = MLP(in_dim, hidden_dims)
model.load_state_dict(mlp_payload["state_dict"])
resolved_device = _resolve_device(device)
model.to(resolved_device)
model.eval()
tx = torch.tensor(x, dtype=torch.float32, device=resolved_device)
with torch.no_grad():
logits = model(tx).cpu().numpy()
return _sigmoid(logits)
def train_fusion_logistic(
x: np.ndarray, y: np.ndarray, steps: int = 300, lr: float = 0.05, seed: int = 42
) -> tuple[np.ndarray, float]:
"""Train a simple logistic regression on joint features (numpy only)."""
np.random.seed(seed)
n_features = x.shape[1]
w = np.zeros(n_features, dtype=float)
b = 0.0
for _ in range(steps):
logits = x @ w + b
p = _sigmoid(logits)
grad_w = x.T @ (p - y) / len(y)
grad_b = np.mean(p - y)
w -= lr * grad_w
b -= lr * grad_b
return w, float(b)
def fusion_predict_score(x: np.ndarray, w: np.ndarray, b: float) -> np.ndarray:
"""Predict with trained fusion logistic weights."""
return _sigmoid(x @ w + b)
def save_model_payload(path: Path, model_payload: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("wb") as f:
pickle.dump(model_payload, f)
def load_model_payload(path: Path) -> dict:
with path.open("rb") as f:
return pickle.load(f)