Text Classification
Transformers
Safetensors
Chinese
chinese
ai-text-detection
ensemble
bert
roberta
qwen
lora
research
dataset
Instructions to use LUCIFerace/enhanced-replica-model-pack with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use LUCIFerace/enhanced-replica-model-pack with Transformers:
# Use a pipeline as a high-level helper from transformers import pipeline pipe = pipeline("text-classification", model="LUCIFerace/enhanced-replica-model-pack")# Load model directly from transformers import AutoModel model = AutoModel.from_pretrained("LUCIFerace/enhanced-replica-model-pack", dtype="auto") - Notebooks
- Google Colab
- Kaggle
File size: 8,859 Bytes
b09b696 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 | #!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""E03: Rule-based module evaluation.
This script evaluates the pattern-based rule engine on DS01 dev/test,
producing per-sample hit records and per-rule precision statistics.
"""
from __future__ import annotations
import argparse
import logging
import sys
from pathlib import Path
import numpy as np
import pandas as pd
# Allow imports from source root package
REPO_ROOT = Path(__file__).resolve()
while REPO_ROOT != REPO_ROOT.parent and not (REPO_ROOT / "src").exists():
REPO_ROOT = REPO_ROOT.parent
for _candidate in (REPO_ROOT, REPO_ROOT / "src"):
_candidate_str = str(_candidate)
if _candidate.exists() and _candidate_str not in sys.path:
sys.path.insert(0, _candidate_str)
from enhanced_replica.cli_args import add_base_args, resolve_arg, setup_logging
from enhanced_replica.data_utils import get_ds_meta, load_dataset_manifest, load_dataset_splits, SPLITS
from enhanced_replica.io_utils import (
create_run_context,
prepare_primary_ds_for_train,
save_pred_files,
write_csv,
write_run_manifest,
write_run_report,
write_yaml_minimal,
)
from enhanced_replica.rules import _DEFAULT_ENGINE, PatternRule, RuleEngine
def evaluate_rules_on_split(df: pd.DataFrame, engine: RuleEngine) -> pd.DataFrame:
"""Apply rule engine to a DataFrame and return expanded hit records."""
records = []
for _, row in df.iterrows():
text = str(row["text"])
label = int(row["label"])
score, hit_map, _ = engine.score(text)
explain = engine.explain(text)
# Flatten: one row per rule hit, plus one summary row even if no hits
if not explain:
records.append({
"record_id": row["record_id"],
"split": row.get("split", ""),
"label": label,
"length_char": row.get("length_char", len(text)),
"rule_score": score,
"rid": "",
"rule_name": "",
"category": "",
"match_count": 0,
"weight": 0.0,
"matches": "",
})
else:
for item in explain:
records.append({
"record_id": row["record_id"],
"split": row.get("split", ""),
"label": label,
"length_char": row.get("length_char", len(text)),
"rule_score": score,
"rid": item["rid"],
"rule_name": item["name"],
"category": item["category"],
"match_count": item["count"],
"weight": item["weight"],
"matches": " | ".join(str(m) for m in item["matches"]),
})
return pd.DataFrame(records)
def compute_rule_precision(hit_df: pd.DataFrame, rules: list[PatternRule]) -> list[dict]:
"""Compute per-rule precision, recall, and hit rates.
Computes both match-level precision (legacy) and sample-level precision
to guard against extreme long-text outliers skewing the metric.
"""
rows = []
for r in rules:
if not r.enabled:
continue
rid = r.rid
sub = hit_df[hit_df["rid"] == rid]
total_hits = int(sub["match_count"].sum())
tp = int(sub[sub["label"] == 1]["match_count"].sum())
fp = int(sub[sub["label"] == 0]["match_count"].sum())
# Precision at match level (legacy)
precision_match = tp / (tp + fp) if (tp + fp) > 0 else 0.0
# Unique samples that fired this rule
n_ai_fired = int(sub[sub["label"] == 1]["record_id"].nunique())
n_human_fired = int(sub[sub["label"] == 0]["record_id"].nunique())
# Sample-level precision: each sample counts once regardless of hit count
precision_sample = n_ai_fired / (n_ai_fired + n_human_fired) if (n_ai_fired + n_human_fired) > 0 else 0.0
# Denominators: total samples in each class for this split set
total_ai = int(hit_df[hit_df["label"] == 1]["record_id"].nunique())
total_human = int(hit_df[hit_df["label"] == 0]["record_id"].nunique())
recall_ai = n_ai_fired / total_ai if total_ai > 0 else 0.0
fpr_human = n_human_fired / total_human if total_human > 0 else 0.0
rows.append({
"rid": rid,
"rule_name": r.name,
"category": r.category,
"weight": r.weight,
"total_match_hits": total_hits,
"tp_matches": tp,
"fp_matches": fp,
"precision_match": round(precision_match, 4),
"precision_sample": round(precision_sample, 4),
"recall_ai": round(recall_ai, 4),
"fpr_human": round(fpr_human, 4),
"ai_samples_fired": n_ai_fired,
"human_samples_fired": n_human_fired,
"total_ai_samples": total_ai,
"total_human_samples": total_human,
})
return rows
def run_e03(args: argparse.Namespace) -> dict:
setup_logging(args.log_level)
logger = logging.getLogger("E03")
ctx = create_run_context(eid="E03", output_root=Path(args.output_root), run_name=args.run_name)
logger.info(f"E03 Rule module eval start | run_name={ctx.run_name} | smoke={args.smoke}")
manifest = load_dataset_manifest(Path(args.manifest_file))
ds_meta = get_ds_meta(manifest, "DS01")
logger.info(f"Dataset: {ds_meta['dataset_id']} | dir={ds_meta['dataset_dir']}")
splits = prepare_primary_ds_for_train(ds_meta, smoke=args.smoke, seed=args.seed)
for sp in SPLITS:
logger.info(f" {sp}: {len(splits[sp])} rows")
engine = _DEFAULT_ENGINE
logger.info(f"Active rules: {sum(1 for r in engine.rules if r.enabled)}")
all_hits = []
pred_splits = {}
split_metrics = {}
for sp in SPLITS:
if splits[sp].empty:
logger.warning(f"{sp} is empty, skipping.")
continue
hit_df = evaluate_rules_on_split(splits[sp], engine)
hit_df["split"] = sp
all_hits.append(hit_df)
# Build prediction split for downstream consumers (E04/E05/E07)
pred_df = splits[sp][["record_id", "text", "label", "source", "split", "length_char"]].copy()
pred_df["score"] = pred_df["text"].apply(lambda t: engine.score(t)[0])
pred_df["pred"] = (pred_df["score"] >= 0.5).astype(int)
pred_splits[sp] = pred_df
# Aggregate per-split quick stats
fired = hit_df[hit_df["rid"] != ""]
n_fired = fired["record_id"].nunique()
n_total = len(splits[sp])
split_metrics[sp] = {"samples": n_total, "fired": n_fired, "coverage": round(n_fired / n_total, 4) if n_total else 0.0}
logger.info(f"{sp} coverage: {n_fired}/{n_total} ({split_metrics[sp]['coverage']:.2%})")
if not all_hits:
raise ValueError("No dev/test data available for rule evaluation.")
combined_hits = pd.concat(all_hits, ignore_index=True)
rule_precisions = compute_rule_precision(combined_hits, engine.rules)
# Save outputs
save_pred_files(ctx, pred_splits)
combined_hits.to_csv(ctx.run_dir / "rule_hits.csv", index=False, encoding="utf-8-sig")
write_csv(ctx.run_dir / "rule_precision.csv", rule_precisions)
# Determine determinative candidates (high sample-level precision + reasonable coverage)
determinative_candidates = [
r for r in rule_precisions
if r["precision_sample"] >= 0.85 and r["recall_ai"] >= 0.05
]
logger.info(f"Determinative candidates (precision_sample>=0.85 & recall_ai>=0.05): {len(determinative_candidates)}")
for c in determinative_candidates:
logger.info(f" - {c['rid']}: precision_sample={c['precision_sample']}, recall_ai={c['recall_ai']}")
config = {
"seed": args.seed,
"smoke": args.smoke,
"active_rules": sum(1 for r in engine.rules if r.enabled),
"determinative_candidates": len(determinative_candidates),
}
write_yaml_minimal(ctx.config_file, config)
result = {
"dev_coverage": split_metrics.get("dev", {}).get("coverage", 0.0),
"test_coverage": split_metrics.get("test", {}).get("coverage", 0.0),
"determinative_candidates": len(determinative_candidates),
}
write_run_manifest(ctx, status="success", payload=result)
write_run_report(ctx, status="success", config=config, payload=result)
logger.info("E03 complete.")
return result
def main() -> int:
parser = argparse.ArgumentParser(description="E03 Rule module evaluation")
parser = add_base_args(parser)
args = parser.parse_args()
try:
run_e03(args)
return 0
except Exception as e:
logging.getLogger("E03").error(f"ERROR: {e}", exc_info=True)
raise
if __name__ == "__main__":
raise SystemExit(main())
|