# AIGC_detector / app.py
# khs
# fix: 重新分配进度比例,模型推理0-95%,段落分析95-100%,修复日志频率和进度跳跃问题
# 6f6a0f2
import json
import re
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple
import fitz
import gradio as gr
import numpy as np
import torch
from transformers import AutoModel, AutoModelForSequenceClassification, AutoTokenizer
try:
import joblib
except Exception:
joblib = None
try:
import docx
except Exception:
docx = None
# UI label -> model identifier. "mba_local_pack" is a sentinel (not a model id)
# that routes analysis through the local MBA tree-model pack.
MODEL_CHOICES = {
    "paperpass-v3(默认,论文场景优先)": "yibo365/paperpass-v3",
    "AIGC_detector_zhv3(新版中文检测)": "yuchuantian/aigc_detector_zhv3",
    "AIGC_detector_zhv2(兜底)": "yuchuantian/AIGC_detector_zhv2",
    "mba-aigc-detector(实验版,需本地模型包)": "mba_local_pack",
}
DEFAULT_MODEL_LABEL = "paperpass-v3(默认,论文场景优先)"
# Paragraph risk above this value is labelled high risk.
RISK_THRESHOLD = 0.75
# Paragraph rebuilding limits, measured in characters.
MIN_PARAGRAPH_CHARS = 80
TARGET_PARAGRAPH_CHARS = 420
MAX_PARAGRAPH_CHARS = 900
# Tokenizer window length / stride (passed as max_length / stride).
WINDOW_MAX_LENGTH = 512
WINDOW_STRIDE = 192
# Windows per forward pass, and paragraphs per tokenizer call.
WINDOW_BATCH_SIZE = 64
PARAGRAPH_CHUNK_SIZE = 32
# Number of history records kept on disk.
MAX_HISTORY_ITEMS = 30
CALIBRATION_PATH = Path("calibration/model.json")
MBA_MODELS_DIR = Path("models/mba")
HISTORY_PATH = Path("history/analysis_records.json")
EXPORT_DIR = Path("exports")
# Single-entry cache of the most recently loaded tokenizer/classifier pair.
CURRENT_MODEL_NAME = None
CURRENT_TOKENIZER = None
CURRENT_MODEL = None
# Lazily populated by init_mba_pack().
MBA_STATE = {
    "ready": False,
    "extractor_tokenizer": None,
    "extractor_model": None,
    "tree_models": {},
}
# Best-effort thread setup: re-applies the reported thread count (4 when the
# reported value is falsy, never below 1). Any failure is deliberately ignored.
try:
    torch.set_num_threads(max(1, (torch.get_num_threads() or 4)))
except Exception:
    pass
def load_calibration_model() -> Dict:
    """Load the optional linear calibration model from ``CALIBRATION_PATH``.

    Returns:
        The parsed JSON object when the file exists, decodes to a dict whose
        ``model_type`` is ``"linear"`` and which has the keys
        ``feature_order``, ``coef`` and ``intercept``; otherwise ``{}``.
    """
    if not CALIBRATION_PATH.exists():
        return {}
    try:
        data = json.loads(CALIBRATION_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Unreadable file, bad encoding or malformed JSON: calibration is
        # optional, so fall back to "no model".
        return {}
    # Valid JSON is not necessarily an object (it may be a list, string, ...);
    # calling .get() on those would raise at import time.
    if not isinstance(data, dict):
        return {}
    required = {"feature_order", "coef", "intercept"}
    if data.get("model_type") != "linear" or not required.issubset(data):
        return {}
    return data
# Evaluated once at import time; an empty dict means no usable calibration model.
CALIBRATION_MODEL = load_calibration_model()
def ensure_history_file():
    """Create the history directory and seed an empty JSON list if the file is missing."""
    history_dir = HISTORY_PATH.parent
    history_dir.mkdir(parents=True, exist_ok=True)
    if HISTORY_PATH.exists():
        return
    HISTORY_PATH.write_text("[]", encoding="utf-8")
def load_history() -> List[Dict]:
    """Return the stored history list, or ``[]`` when it is unreadable or not a list."""
    ensure_history_file()
    try:
        parsed = json.loads(HISTORY_PATH.read_text(encoding="utf-8"))
    except Exception:
        return []
    return parsed if isinstance(parsed, list) else []
def save_history_item(item: Dict):
    """Prepend ``item`` to the history and persist the newest ``MAX_HISTORY_ITEMS`` records."""
    newest_first = [item, *load_history()]
    payload = json.dumps(newest_first[:MAX_HISTORY_ITEMS], ensure_ascii=False, indent=2)
    HISTORY_PATH.write_text(payload, encoding="utf-8")
def format_history_markdown() -> str:
    """Render the stored history as a numbered Markdown list (newest first as stored)."""
    records = load_history()
    if not records:
        return "暂无历史记录。"
    rows = [
        f"{idx}. `{rec.get('time')}` | 文件: {rec.get('source')} | 模型: {rec.get('model')} | "
        f"综合风险: {rec.get('overall', 0):.2%} | 预测知网率: {rec.get('kn_like', 0):.2%} | 段落数: {rec.get('paragraphs', 0)}"
        for idx, rec in enumerate(records, 1)
    ]
    return "\n".join(["# 历史分析记录", *rows])
def get_or_load_model(model_name: str):
    """Return ``(tokenizer, model)`` for ``model_name``, reusing the cached pair when possible.

    Only one model is cached at a time; requesting a different name replaces
    the cached tokenizer and model.
    """
    global CURRENT_MODEL_NAME, CURRENT_TOKENIZER, CURRENT_MODEL
    cache_hit = (
        CURRENT_MODEL_NAME == model_name
        and CURRENT_TOKENIZER is not None
        and CURRENT_MODEL is not None
    )
    if cache_hit:
        return CURRENT_TOKENIZER, CURRENT_MODEL

    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
    classifier = AutoModelForSequenceClassification.from_pretrained(model_name)
    classifier.eval()
    try:
        # Optional optimisation; keep the plain model if wrapping fails.
        classifier = torch.compile(classifier)
    except Exception:
        pass

    CURRENT_MODEL_NAME, CURRENT_TOKENIZER, CURRENT_MODEL = model_name, tokenizer, classifier
    return tokenizer, classifier
def normalize_text(text: str) -> str:
    """Collapse every whitespace run into a single space and trim both ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def is_probable_page_number(line: str) -> bool:
    """Tell whether the stripped line looks like a standalone page-number marker."""
    candidate = line.strip()
    for pattern in (
        r"^第\s*\d+\s*页$",
        r"^\d+\s*/\s*\d+$",
        r"^[-—]*\s*\d{1,4}\s*[-—]*$",
        r"^page\s*\d+$",
    ):
        if re.match(pattern, candidate, flags=re.IGNORECASE):
            return True
    return False
def clean_common_noise(line: str) -> str:
    """Normalize whitespace in ``line``, then squeeze any remaining space/tab runs."""
    normalized = normalize_text(line)
    return re.sub(r"[ \t]+", " ", normalized)
def extract_pdf_text(file_path: str) -> Tuple[str, Dict]:
    """Extract body text from a PDF, dropping likely headers, footers and page numbers.

    Per page, text blocks lying entirely inside the top or bottom 6% of the
    page height are skipped and lines that look like page numbers are removed.
    Afterwards, lines of at least 4 characters that appear among the first or
    last two kept lines of a page on at least ``max(3, 40% of pages)`` pages
    are treated as running headers/footers and removed everywhere.

    Args:
        file_path: Path of the PDF file.

    Returns:
        ``(text, meta)``: pages joined by blank lines, and a dict with
        ``total_pages``, ``used_pages`` and ``page_truncated``.
    """
    page_lines: List[List[str]] = []
    # The context manager closes the document even when extraction raises;
    # previously the handle was never closed.
    with fitz.open(file_path) as doc:
        all_pages = len(doc)
        for page in doc:
            height = page.rect.height
            top_cut = bottom_cut = height * 0.06
            lines = []
            blocks = page.get_text("blocks")
            # Reading order: top-to-bottom, then left-to-right.
            for block in sorted(blocks, key=lambda b: (round(b[1], 1), round(b[0], 1))):
                _, y0, _, y1, text, *_ = block
                if y1 <= top_cut or y0 >= height - bottom_cut:
                    continue
                for raw in text.splitlines():
                    line = clean_common_noise(raw)
                    if line and not is_probable_page_number(line):
                        lines.append(line)
            page_lines.append(lines)

    # Count, per candidate line, on how many pages it sits at a page edge.
    freq: Dict[str, int] = {}
    for lines in page_lines:
        for candidate in set(lines[:2] + lines[-2:]):
            if len(candidate) >= 4:
                freq[candidate] = freq.get(candidate, 0) + 1
    min_repeats = max(3, int(0.4 * all_pages))
    repeat_lines = {line for line, count in freq.items() if count >= min_repeats}

    # Page-number lines were already filtered while collecting, so only the
    # repeated header/footer lines need removing here.
    merged = ["\n".join(ln for ln in lines if ln not in repeat_lines) for lines in page_lines]
    meta = {"total_pages": all_pages, "used_pages": all_pages, "page_truncated": False}
    return "\n\n".join(merged), meta
def extract_docx_text(file_path: str) -> Tuple[str, Dict]:
    """Read the non-empty, cleaned paragraphs of a .docx file joined by blank lines.

    Raises:
        RuntimeError: when python-docx could not be imported.
    """
    if docx is None:
        raise RuntimeError("当前环境缺少 python-docx。")
    document = docx.Document(file_path)
    cleaned = (clean_common_noise(par.text) for par in document.paragraphs)
    body = "\n\n".join(text for text in cleaned if text)
    meta = {"total_pages": None, "used_pages": None, "page_truncated": False}
    return body, meta
def extract_txt_text(file_path: str) -> Tuple[str, Dict]:
    """Read a text file as UTF-8 (undecodable bytes are ignored) plus page-less metadata."""
    content = Path(file_path).read_text(encoding="utf-8", errors="ignore")
    meta = {"total_pages": None, "used_pages": None, "page_truncated": False}
    return content, meta
def extract_document_text(upload_file) -> Tuple[str, Dict]:
    """Dispatch to the extractor matching the uploaded file's lower-cased suffix.

    Raises:
        RuntimeError: for any suffix other than .pdf, .docx, .txt or .md.
    """
    path = upload_file.name
    ext = Path(path).suffix.lower()
    if ext == ".pdf":
        extractor = extract_pdf_text
    elif ext == ".docx":
        extractor = extract_docx_text
    elif ext in (".txt", ".md"):
        extractor = extract_txt_text
    else:
        raise RuntimeError("仅支持 pdf / docx / txt / md 文件。")
    return extractor(path)
def split_sentences(text: str) -> List[str]:
    """Split text after sentence-ending punctuation and return the non-empty, normalized pieces."""
    flat = re.sub(r"\n+", " ", text)
    pieces = re.split(r"(?<=[。!?!?;;])\s*", flat)
    cleaned = (normalize_text(piece) for piece in pieces)
    return [sentence for sentence in cleaned if sentence]
def rebuild_paragraphs_from_sentences(sentences: List[str]) -> List[str]:
    """Greedily pack sentences into paragraphs of bounded length.

    Sentences accumulate until the running length reaches
    ``TARGET_PARAGRAPH_CHARS`` or adding the next one would exceed
    ``MAX_PARAGRAPH_CHARS``; the accumulated paragraph is then emitted if it
    has at least ``MIN_PARAGRAPH_CHARS`` characters (shorter ones are dropped).

    A single sentence of ``MAX_PARAGRAPH_CHARS`` or more is cut into
    full-size slices that are emitted directly. Any shorter tail starts the
    next paragraph instead of being discarded; the previous implementation
    kept only the first slice and silently lost the rest of the sentence.

    Args:
        sentences: Sentences in document order.

    Returns:
        The rebuilt paragraphs in document order.
    """
    paragraphs: List[str] = []
    pending: List[str] = []
    pending_len = 0

    def flush_pending() -> None:
        """Emit the accumulated sentences (if long enough) and reset the accumulator."""
        nonlocal pending, pending_len
        if pending:
            paragraph = normalize_text(" ".join(pending))
            if len(paragraph) >= MIN_PARAGRAPH_CHARS:
                paragraphs.append(paragraph)
        pending, pending_len = [], 0

    for sentence in sentences:
        sentence_len = len(sentence)
        if sentence_len >= MAX_PARAGRAPH_CHARS:
            flush_pending()
            # End offset of the last full-size slice.
            full_end = sentence_len - sentence_len % MAX_PARAGRAPH_CHARS
            for start in range(0, full_end, MAX_PARAGRAPH_CHARS):
                paragraphs.append(sentence[start : start + MAX_PARAGRAPH_CHARS])
            remainder = sentence[full_end:]
            if remainder:
                # Carry the tail forward so no text is lost.
                pending, pending_len = [remainder], len(remainder)
            continue
        if pending and (
            pending_len >= TARGET_PARAGRAPH_CHARS
            or pending_len + sentence_len > MAX_PARAGRAPH_CHARS
        ):
            flush_pending()
        pending.append(sentence)
        pending_len += sentence_len

    flush_pending()
    return paragraphs
def split_paragraphs(text: str) -> List[str]:
    """Split raw text into sentences and regroup them into analysis paragraphs."""
    return rebuild_paragraphs_from_sentences(split_sentences(text))
def should_skip_paragraph(text: str) -> bool:
    """Decide whether a paragraph should be excluded from analysis.

    A paragraph is skipped when it is empty after normalization, when one of
    the back-matter keywords occurs in its first 40 characters, when it has
    fewer than 20 CJK characters, or when more than 75% of its characters are
    digits, underscores or non-word characters.
    """
    cleaned = normalize_text(text)
    if not cleaned:
        return True
    heading = cleaned[:40]
    if re.search(r"(参考文献|致谢|附录|作者简介)", heading, flags=re.IGNORECASE) is not None:
        return True
    han_count = len(re.findall(r"[\u4e00-\u9fff]", cleaned))
    if han_count < 20:
        return True
    symbol_count = len(re.findall(r"[\d\W_]", cleaned))
    return symbol_count > len(cleaned) * 0.75
def calc_repetition(text: str) -> float:
    """Return 1 minus the share of distinct character bigrams (0.0 when there are none)."""
    cleaned = normalize_text(text)
    bigrams = [left + right for left, right in zip(cleaned, cleaned[1:])]
    if not bigrams:
        return 0.0
    distinct_share = len(set(bigrams)) / len(bigrams)
    return max(0.0, 1.0 - distinct_share)
def calc_sentence_variance(text: str) -> float:
    """Variance of sentence lengths divided by 900 and capped at 1.0.

    Returns 0.0 when fewer than two non-empty sentences are found.
    """
    lengths = [len(part.strip()) for part in re.split(r"[。!?!?]", text) if part.strip()]
    if len(lengths) < 2:
        return 0.0
    scaled = np.var(lengths) / 900.0
    return float(min(scaled, 1.0))
def detector_scores_transformer_stream(
    texts: List[str],
    model_name: str,
    progress_cb,
    log_cb,
) -> List[float]:
    """Score each text with the sequence classifier using overlapping token windows.

    Texts are tokenized ``PARAGRAPH_CHUNK_SIZE`` at a time into windows of at
    most ``WINDOW_MAX_LENGTH`` tokens with stride ``WINDOW_STRIDE``. Windows
    are run in batches of ``WINDOW_BATCH_SIZE``; the softmax probability at
    class index 1 is taken per window, and each text's score is
    ``0.75 * mean + 0.25 * max`` over its windows.

    Args:
        texts: Paragraphs to score.
        model_name: Name passed to ``get_or_load_model``.
        progress_cb: Called with an integer percentage (0-95) after each batch.
        log_cb: Called with a log message per chunk and per batch.

    Returns:
        One score per input text, in input order (``[]`` for empty input).
    """
    if not texts:
        return []
    tokenizer, model = get_or_load_model(model_name)
    paragraph_total = len(texts)
    total_chunks = max(1, (paragraph_total + PARAGRAPH_CHUNK_SIZE - 1) // PARAGRAPH_CHUNK_SIZE)
    all_scores: List[float] = []

    chunk_starts = range(0, paragraph_total, PARAGRAPH_CHUNK_SIZE)
    for chunk_no, first in enumerate(chunk_starts, 1):
        last = min(first + PARAGRAPH_CHUNK_SIZE, paragraph_total)
        chunk = texts[first:last]
        log_cb(f"文本预处理 chunk {chunk_no}/{total_chunks}(段落 {first+1}-{last})")
        enc = tokenizer(
            chunk,
            truncation=True,
            max_length=WINDOW_MAX_LENGTH,
            stride=WINDOW_STRIDE,
            return_overflowing_tokens=True,
            padding=True,
            return_tensors="pt",
        )
        # Maps every window back to the index of its paragraph inside the chunk.
        sample_map = enc.pop("overflow_to_sample_mapping").tolist()
        window_count = len(sample_map)
        ai_probs = np.zeros(window_count, dtype=np.float32)
        batch_total = max(1, (window_count + WINDOW_BATCH_SIZE - 1) // WINDOW_BATCH_SIZE)

        with torch.inference_mode():
            for batch_no, lo in enumerate(range(0, window_count, WINDOW_BATCH_SIZE), 1):
                hi = min(lo + WINDOW_BATCH_SIZE, window_count)
                window_batch = {key: tensor[lo:hi] for key, tensor in enc.items()}
                logits = model(**window_batch).logits
                ai_probs[lo:hi] = torch.softmax(logits, dim=-1)[:, 1].cpu().numpy()
                # The model stage occupies 0-95% of the overall progress.
                done_fraction = ((chunk_no - 1) + (batch_no / batch_total)) / total_chunks
                progress_cb(round(done_fraction * 95))
                log_cb(f"模型前向 chunk {chunk_no}/{total_chunks} batch {batch_no}/{batch_total}")

        per_paragraph: List[List[float]] = [[] for _ in chunk]
        for window_idx, sid in enumerate(sample_map):
            per_paragraph[sid].append(float(ai_probs[window_idx]))
        for vals in per_paragraph:
            arr = np.array(vals, dtype=np.float32)
            all_scores.append(float(0.75 * np.mean(arr) + 0.25 * np.max(arr)))
    return all_scores
def _extract_stat_features(text: str) -> np.ndarray:
char_count = max(1, len(text))
sentences = [s.strip() for s in re.split(r"[。!?\.\n]", text) if s.strip()]
lens = [len(s) for s in sentences] if sentences else [0]
avg_sentence_length = float(np.mean(lens))
sentence_length_std = float(np.std(lens))
comma_ratio = (text.count(",") + text.count(",")) / char_count
period_ratio = (text.count("。") + text.count(".")) / char_count
pronouns = ["我", "你", "他", "她", "它", "我们", "你们", "他们"]
conjunctions = ["和", "与", "或", "但是", "然而", "因此", "因为", "所以"]
pronoun_ratio = sum(text.count(p) for p in pronouns) / char_count
conjunction_ratio = sum(text.count(c) for c in conjunctions) / char_count
unique_word_ratio = len(set(text)) / char_count
words = text.split()
avg_word_length = float(np.mean([len(w) for w in words])) if words else 0.0
digit_ratio = sum(c.isdigit() for c in text) / char_count
chinese_char_ratio = len(re.findall(r"[\u4e00-\u9fff]", text)) / char_count
paragraph_length = float(len(text))
burstiness = sentence_length_std / avg_sentence_length if avg_sentence_length > 0 else 0.0
formality_score = sum(text.count(w) for w in ["研究", "分析", "策略", "管理", "企业", "市场", "发展"]) / char_count
return np.array([avg_sentence_length, sentence_length_std, comma_ratio, period_ratio, pronoun_ratio, conjunction_ratio, unique_word_ratio, avg_word_length, digit_ratio, chinese_char_ratio, paragraph_length, burstiness, formality_score])
def is_lfs_pointer(path: Path) -> bool:
    """Tell whether ``path`` is a Git LFS pointer file rather than real content.

    Only the leading bytes are read, so large weight files are not loaded
    into memory just to inspect their header.

    Args:
        path: File to inspect.

    Returns:
        True when the file starts with the Git LFS v1 spec line; False
        otherwise, including when the file cannot be opened or read.
    """
    prefix = b"version https://git-lfs.github.com/spec/v1"
    try:
        with path.open("rb") as handle:
            return handle.read(len(prefix)) == prefix
    except OSError:
        return False
def init_mba_pack() -> Tuple[bool, str]:
    """Load the local MBA pack (feature extractor plus tree models) once.

    Returns:
        ``(True, "")`` when the pack is ready (``MBA_STATE`` is populated on
        first success), otherwise ``(False, reason)``.
    """
    if MBA_STATE["ready"]:
        return True, ""
    if joblib is None:
        return False, "当前环境缺少 joblib。"

    needed = [
        "select5_tree_d2_model.pkl",
        "select10_tree_d2_model.pkl",
        "select15_tree_d3_model.pkl",
        "select20_tree_d2_model.pkl",
        "bert_tree_d1_model.pkl",
    ]
    model_paths = [MBA_MODELS_DIR / filename for filename in needed]
    if not all(path.exists() for path in model_paths):
        return False, "缺少 mba 模型文件,请将模型文件放入 models/mba/。"
    if any(is_lfs_pointer(path) for path in model_paths):
        return False, "检测到 mba 模型文件是 Git LFS 指针,不是真实权重。"

    try:
        tok = AutoTokenizer.from_pretrained("hfl/chinese-roberta-wwm-ext")
        mdl = AutoModel.from_pretrained("hfl/chinese-roberta-wwm-ext")
        mdl.eval()
        # NOTE: joblib.load unpickles the files; only load model files you trust.
        trees = {}
        for filename, path in zip(needed, model_paths):
            trees[filename] = joblib.load(path)
        MBA_STATE.update(
            {"ready": True, "extractor_tokenizer": tok, "extractor_model": mdl, "tree_models": trees}
        )
    except Exception as exc:
        return False, f"加载 mba 模型包失败: {exc}"
    return True, ""
def detector_score_mba(text: str) -> float:
    """Score ``text`` with the MBA pack and return the highest class-1 probability.

    The first-token hidden state of the extractor (computed on the first 512
    characters) is the feature vector for the ``bert_tree`` model; every other
    tree receives the statistical features concatenated with it.

    Raises:
        RuntimeError: when the MBA pack cannot be initialised.
    """
    ok, msg = init_mba_pack()
    if not ok:
        raise RuntimeError(msg)

    tokenizer = MBA_STATE["extractor_tokenizer"]
    encoder = MBA_STATE["extractor_model"]
    inputs = tokenizer(text[:512], return_tensors="pt", max_length=512, truncation=True, padding=True)
    with torch.inference_mode():
        hidden = encoder(**inputs).last_hidden_state
        bert_feat = hidden[:, 0, :].cpu().numpy()[0]

    stat_feat = _extract_stat_features(text)
    combined = np.concatenate([stat_feat, bert_feat]).reshape(1, -1)
    bert_only = bert_feat.reshape(1, -1)

    probs = [
        float(tree.predict_proba(bert_only if "bert_tree" in name else combined)[0, 1])
        for name, tree in MBA_STATE["tree_models"].items()
    ]
    return float(max(probs))
def analyze_paragraph_with_detector(text: str, detector: float) -> Dict[str, float]:
    """Blend the detector score with repetition and sentence-length stability.

    The risk is ``0.78 * detector + 0.12 * repetition + 0.10 * (1 - variance)``
    clamped to [0, 1]; the individual components are returned alongside it.
    """
    repetition = calc_repetition(text)
    variance = calc_sentence_variance(text)
    blended = detector * 0.78 + repetition * 0.12 + (1 - variance) * 0.10
    risk = float(min(max(blended, 0.0), 1.0))
    return {
        "detector": detector,
        "repetition": repetition,
        "variance": variance,
        "risk": risk,
    }
def clip01(v: float) -> float:
    """Clamp ``v`` to the closed interval [0, 1] and return it as a float."""
    if v < 0.0:
        return 0.0
    if v > 1.0:
        return 1.0
    return float(v)
def build_doc_features(risks: List[float]) -> Dict[str, float]:
    """Aggregate paragraph risks into document-level features, each clamped to [0, 1].

    Returns a dict with ``overall`` (mean), ``p90`` (90th percentile),
    ``high_ratio`` (share above 0.75), ``mid_ratio`` (share in (0.55, 0.75])
    and ``std`` (standard deviation).
    """
    # NOTE(review): the 0.55 mid-risk cut-off here differs from the
    # per-paragraph label cut-off used in analyze_document,
    # max(0.55, RISK_THRESHOLD - 0.15) — confirm which one is intended.
    arr = np.array(risks, dtype=float)
    is_high = arr > 0.75
    is_mid = (arr > 0.55) & (arr <= 0.75)
    raw = {
        "overall": np.mean(arr),
        "p90": np.percentile(arr, 90),
        "high_ratio": np.mean(is_high),
        "mid_ratio": np.mean(is_mid),
        "std": np.std(arr),
    }
    return {name: clip01(float(value)) for name, value in raw.items()}
def predict_kn_like_rate(features: Dict[str, float]) -> float:
    """Apply the linear calibration model to the document features.

    Without a calibration model the raw ``overall`` feature is returned.
    Features missing from ``features`` count as 0.0; the prediction is
    clamped to [0, 1].
    """
    if not CALIBRATION_MODEL:
        return features["overall"]
    order = CALIBRATION_MODEL["feature_order"]
    x = np.array([features.get(name, 0.0) for name in order], dtype=float)
    weights = np.array(CALIBRATION_MODEL["coef"], dtype=float)
    bias = float(CALIBRATION_MODEL["intercept"])
    return clip01(float(np.dot(x, weights) + bias))
def build_filtered_details(blocks: List[Dict], level_filter: str) -> str:
    """Join the ``content`` of the blocks matching ``level_filter`` with Markdown rules.

    The filter ``"全部"`` keeps every block; any other value keeps the blocks
    whose ``risk_level`` equals it. A placeholder message is returned when
    nothing matches.
    """
    if level_filter == "全部":
        selected = blocks
    else:
        selected = [block for block in blocks if block["risk_level"] == level_filter]
    if selected:
        return "\n\n---\n\n".join(block["content"] for block in selected)
    return f"当前筛选 `{level_filter}` 下暂无段落。"
def write_exports(state: Dict) -> Tuple[str, str]:
    """Write the analysis state as a JSON file and a Markdown file under ``EXPORT_DIR``.

    Both files share a name based on the current time (second resolution).

    Returns:
        ``(json_path, md_path)`` as strings.
    """
    EXPORT_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    json_path = EXPORT_DIR / f"analysis_{stamp}.json"
    md_path = EXPORT_DIR / f"analysis_{stamp}.md"

    json_payload = json.dumps(state, ensure_ascii=False, indent=2)
    json_path.write_text(json_payload, encoding="utf-8")

    markdown_payload = state.get("summary", "") + "\n\n" + state.get("details", "")
    md_path.write_text(markdown_payload, encoding="utf-8")
    return str(json_path), str(md_path)
def append_log(logs: List[str], message: str) -> str:
    """Append a ``[HH:MM:SS]``-stamped entry to ``logs`` and return the whole log as text."""
    entry = f"[{datetime.now().strftime('%H:%M:%S')}] {message}"
    logs.append(entry)
    return "\n".join(logs)
def analyze_document(input_mode, upload_file, pasted_text, model_label):
    """Run one analysis as a generator of UI updates.

    Every yielded value is an 8-tuple matching the click handler's outputs:
    ``(summary, details, history markdown, status, log text, state dict,
    json export path, markdown export path)``. Intermediate yields carry an
    empty summary/details and only refresh status and log; the final yield
    carries the full result.

    Progress: model inference occupies 0-95%, the per-paragraph aggregation
    95-100%.

    Args:
        input_mode: One of the input-mode radio labels.
        upload_file: Uploaded file object (its ``name`` is the path) or None.
        pasted_text: Text pasted by the user, possibly empty or None.
        model_label: Key of ``MODEL_CHOICES``; unknown labels fall back to
            the default model.
    """
    logs: List[str] = []
    last_status = "状态: 等待分析"

    def emit(summary="", details="", history_md=None, status=None, state=None, json_path=None, md_path=None):
        """Build one output tuple; an omitted status reuses the last one."""
        nonlocal last_status
        if history_md is None:
            history_md = format_history_markdown()
        if status is None:
            status = last_status
        else:
            last_status = status
        return summary, details, history_md, status, "\n".join(logs), (state or {}), json_path, md_path

    def progress_cb(pct: int):
        nonlocal last_status
        pct = max(0, min(100, pct))
        last_status = f"状态: 推理中 {pct}%"

    def log_cb(message: str):
        append_log(logs, message)

    # ---- input validation -------------------------------------------------
    has_text = bool(normalize_text(pasted_text or ""))
    if input_mode == "文本输入" and not has_text:
        log_cb("等待输入:文本模式下未检测到文本内容。")
        yield emit(summary="请先粘贴文本内容。", status="状态: 等待输入")
        return
    if input_mode == "文件上传" and upload_file is None:
        log_cb("等待输入:文件模式下未检测到文件。")
        yield emit(summary="请先上传文件。", status="状态: 等待输入")
        return
    if upload_file is None and not has_text:
        log_cb("等待输入:未检测到文件和文本。")
        yield emit(summary="请先上传文件,或粘贴文本。", status="状态: 等待输入")
        return

    model_name = MODEL_CHOICES.get(model_label, MODEL_CHOICES[DEFAULT_MODEL_LABEL])
    log_cb(f"任务开始,模型={model_label}")
    if model_name == "mba_local_pack":
        ok, msg = init_mba_pack()
        if not ok:
            log_cb(f"模型不可用:{msg}")
            yield emit(summary=f"# 当前模型: {model_label}\n\n{msg}\n\n请切回其他模型。", status="状态: 模型不可用")
            return

    # ---- text acquisition and paragraph splitting -------------------------
    source = "pasted_text"
    use_text = input_mode == "文本输入" or (input_mode == "自动(有文本优先)" and has_text)
    if use_text:
        t_extract = time.time()
        raw_text = pasted_text
        extract_meta = {"total_pages": None, "used_pages": None, "page_truncated": False}
        paragraphs = split_paragraphs(raw_text)
        log_cb(f"输入来源:文本框,切分段落={len(paragraphs)},耗时={time.time()-t_extract:.2f}s")
    else:
        t_extract = time.time()
        source = Path(upload_file.name).name
        try:
            raw_text, extract_meta = extract_document_text(upload_file)
        except Exception as exc:
            # Unsupported suffix, missing optional dependency or a corrupt
            # file: report it through the UI instead of letting the error
            # escape the generator with no status or log update.
            log_cb(f"终止:文件解析失败:{exc}")
            yield emit(summary=f"文件解析失败:{exc}", status="状态: 文件解析失败")
            return
        paragraphs = [p for p in split_paragraphs(raw_text) if not should_skip_paragraph(p)]
        log_cb(f"输入来源:文件 {source},抽取+切分后段落={len(paragraphs)},耗时={time.time()-t_extract:.2f}s")
    if not paragraphs:
        log_cb("终止:未提取到有效段落。")
        yield emit(summary="未提取到可分析正文。", status="状态: 无有效段落")
        return

    # ---- model inference (0-95%) ------------------------------------------
    t0 = time.time()
    risks, details = [], []
    total = len(paragraphs)
    log_cb("开始推理。")
    progress_cb(0)
    yield emit(status="状态: 开始分析... 0%")
    t_model = time.time()
    if model_name == "mba_local_pack":
        detector_scores = [float(min(max(detector_score_mba(p) * 0.3, 0.0), 1.0)) for p in paragraphs]
        log_cb(f"MBA 推理完成,段落={total},耗时={time.time()-t_model:.2f}s")
        progress_cb(95)
        yield emit()
    else:
        tokenizer, model = get_or_load_model(model_name)
        total_chunks = max(1, (len(paragraphs) + PARAGRAPH_CHUNK_SIZE - 1) // PARAGRAPH_CHUNK_SIZE)
        detector_scores = []
        for ci, cstart in enumerate(range(0, len(paragraphs), PARAGRAPH_CHUNK_SIZE), 1):
            cend = min(cstart + PARAGRAPH_CHUNK_SIZE, len(paragraphs))
            chunk = paragraphs[cstart:cend]
            log_cb(f"文本预处理 chunk {ci}/{total_chunks}(段落 {cstart+1}-{cend})")
            yield emit()
            enc = tokenizer(
                chunk,
                truncation=True,
                max_length=WINDOW_MAX_LENGTH,
                stride=WINDOW_STRIDE,
                return_overflowing_tokens=True,
                padding=True,
                return_tensors="pt",
            )
            # Maps every window back to its paragraph index inside the chunk.
            sample_map = enc.pop("overflow_to_sample_mapping").tolist()
            window_count = len(sample_map)
            ai_probs = np.zeros(window_count, dtype=np.float32)
            batch_total = max(1, (window_count + WINDOW_BATCH_SIZE - 1) // WINDOW_BATCH_SIZE)
            for bi, s in enumerate(range(0, window_count, WINDOW_BATCH_SIZE), 1):
                e = min(s + WINDOW_BATCH_SIZE, window_count)
                batch = {k: v[s:e] for k, v in enc.items()}
                # inference_mode is a thread-local context manager. It wraps
                # only the forward pass so the generator never yields while
                # it is active; a suspended generator would otherwise keep
                # the mode enabled until it is resumed or discarded.
                with torch.inference_mode():
                    logits = model(**batch).logits
                    probs = torch.softmax(logits, dim=-1)[:, 1].cpu().numpy()
                ai_probs[s:e] = probs
                global_batch_progress = ((ci - 1) + (bi / batch_total)) / total_chunks
                progress_cb(round(global_batch_progress * 95))
                log_cb(f"模型前向 chunk {ci}/{total_chunks} batch {bi}/{batch_total}")
                yield emit()
            buckets: List[List[float]] = [[] for _ in range(len(chunk))]
            for i, sid in enumerate(sample_map):
                buckets[sid].append(float(ai_probs[i]))
            for vals in buckets:
                arr = np.array(vals, dtype=np.float32)
                detector_scores.append(float(0.75 * np.mean(arr) + 0.25 * np.max(arr)))
        log_cb(f"模型前向完成,段落={total},耗时={time.time()-t_model:.2f}s")
        progress_cb(95)
        yield emit()

    # ---- per-paragraph aggregation (95-100%) ------------------------------
    last_pct = 95
    mid_threshold = max(0.55, RISK_THRESHOLD - 0.15)
    for i, (p, detector) in enumerate(zip(paragraphs, detector_scores), 1):
        score = analyze_paragraph_with_detector(p, detector)
        risks.append(score["risk"])
        if score["risk"] > RISK_THRESHOLD:
            level, risk_level = "🔴", "高风险"
        elif score["risk"] > mid_threshold:
            level, risk_level = "🟡", "中风险"
        else:
            level, risk_level = "🟢", "低风险"
        details.append({
            "risk_level": risk_level,
            "content": f"""
{level} 段落 {i} AI风险: {score['risk']:.2%}
Detector: {score['detector']:.2%}
重复度: {score['repetition']:.2%}
句式稳定性: {1 - score['variance']:.2%}
{p}
""",
        })
        # Floor division keeps 100% for the very last paragraph; emit only
        # when the integer percentage advances (at most five updates).
        pct = 95 + (5 * i) // total
        if pct > last_pct:
            last_pct = pct
            log_cb(f"进度 {pct}%({i}/{total})")
            progress_cb(pct)
            yield emit()

    # ---- document-level summary, history and exports ----------------------
    f = build_doc_features(risks)
    kn_like = predict_kn_like_rate(f)
    elapsed = time.time() - t0
    speed = len(paragraphs) / max(elapsed, 1e-6)
    trunc_info = []
    if extract_meta.get("total_pages") is not None:
        trunc_info.append(f"页面截断: 否({extract_meta.get('used_pages')}/{extract_meta.get('total_pages')} 页)")
    trunc_info.append(f"段落截断: 否(分析 {len(paragraphs)}/{len(paragraphs)} 段)")
    mode_line = "当前模式: 原始风险率(未加载校准模型)" if not CALIBRATION_MODEL else "当前模式: 知网对齐预测率(已加载校准模型)"
    summary = f"""
# 当前模型: {model_label}
# 综合AI风险率: {f['overall']:.2%}
# 预测知网AIGC率: {kn_like:.2%}
高风险段落占比: {f['high_ratio']:.2%}
中风险段落占比: {f['mid_ratio']:.2%}
有效段落数: {len(paragraphs)}
平均速度: {speed:.2f} 段/秒
{mode_line}
{' | '.join(trunc_info)}
(说明:该结果为“风险分析与校准预测”,并非官方系统结果)
"""
    save_history_item({"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "source": source, "model": model_label, "overall": f["overall"], "kn_like": kn_like, "paragraphs": len(paragraphs)})
    history_md = format_history_markdown()
    details_text = build_filtered_details(details, "全部")
    log_cb(f"聚合完成:overall={f['overall']:.2%}, kn_like={kn_like:.2%}, 总耗时={elapsed:.2f}s, 速度={speed:.2f}段/秒")
    state = {
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "source": source,
        "model": model_label,
        "summary": summary,
        "details": details_text,
        "metrics": {"overall": f["overall"], "kn_like": kn_like, "paragraphs": len(paragraphs), "high_ratio": f["high_ratio"], "mid_ratio": f["mid_ratio"], "elapsed_sec": elapsed, "speed_para_per_sec": speed},
        "paragraphs": details,
    }
    json_path, md_path = write_exports(state)
    log_cb(f"导出完成:{Path(json_path).name}, {Path(md_path).name}")
    progress_cb(100)
    yield emit(summary=summary, details=details_text, history_md=history_md, status="状态: 分析完成 100%", state=state, json_path=json_path, md_path=md_path)
def apply_risk_filter(state: Dict, risk_filter: str):
    """Re-render the paragraph details of a finished analysis for the chosen risk level."""
    if state:
        return build_filtered_details(state.get("paragraphs", []), risk_filter)
    return "请先完成一次分析。"
def export_json(state: Dict):
    """Write fresh export files and return the JSON path, or None without a finished analysis."""
    if state:
        return write_exports(state)[0]
    return None
def export_md(state: Dict):
    """Write fresh export files and return the Markdown path, or None without a finished analysis."""
    if state:
        return write_exports(state)[1]
    return None
# Custom stylesheet passed to gr.Blocks(css=...). Defines light/dark colour
# variables and styles the container, headings, the "panel" and "status-pill"
# element classes, and the primary button.
GEEK_CSS = """
:root {
--bg: #f3f4ea;
--ink: #102015;
--panel: #fefef6;
--accent: #0f6b3f;
--accent2: #b57722;
}
@media (prefers-color-scheme: dark) {
:root {
--bg: #09110c;
--ink: #d7e7d8;
--panel: #0f1913;
--accent: #4bd38a;
--accent2: #efb24a;
}
}
.gradio-container {
background: radial-gradient(circle at 20% 20%, color-mix(in srgb, var(--bg) 85%, white 15%) 0%, var(--bg) 45%, color-mix(in srgb, var(--bg) 75%, black 25%) 100%);
color: var(--ink);
font-family: "IBM Plex Mono", "JetBrains Mono", monospace;
}
h1, h2, h3 {
letter-spacing: 0.4px;
}
.panel {
border: 2px solid var(--ink);
border-radius: 12px;
background: var(--panel);
box-shadow: 6px 6px 0 color-mix(in srgb, var(--ink) 18%, transparent);
}
.status-pill {
border: 2px dashed var(--accent2);
border-radius: 10px;
padding: 8px 10px;
background: color-mix(in srgb, var(--panel) 85%, var(--accent2) 15%);
}
button.primary {
background: linear-gradient(90deg, color-mix(in srgb, var(--accent) 70%, black 30%) 0%, var(--accent) 100%) !important;
border: 2px solid color-mix(in srgb, var(--accent) 40%, black 60%) !important;
}
"""
# Gradio UI: an input column (mode, file, text, model, actions) and an output
# column (status, summary, log, details, history, export files).
# NOTE(review): the nesting below is reconstructed because the original
# indentation was lost; confirm which components belong to which column.
with gr.Blocks(theme=gr.themes.Base(), css=GEEK_CSS, title="论文AIGC风险检测系统") as demo:
    gr.Markdown("""
# 论文AIGC风险检测系统
支持 `PDF / Word(.docx) / 文本(.txt, .md)`,默认全文检测,支持直接粘贴文本。
""")
    # Holds the state dict of the last finished analysis for filtering/export.
    analysis_state = gr.State({})
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 输入面板", elem_classes=["panel"])
            input_mode = gr.Radio(["自动(有文本优先)", "文件上传", "文本输入"], value="自动(有文本优先)", label="输入模式")
            file_input = gr.File(file_types=[".pdf", ".docx", ".txt", ".md"], label="文件输入")
            pasted_text = gr.Textbox(lines=10, label="文本输入(可选)", placeholder="粘贴原文可覆盖文件输入")
            model = gr.Dropdown(list(MODEL_CHOICES.keys()), value=DEFAULT_MODEL_LABEL, label="检测引擎")
            run_btn = gr.Button("Run Analysis", variant="primary")
            risk_filter = gr.Radio(["全部", "高风险", "中风险", "低风险"], value="全部", label="风险筛选(分析后)")
            export_json_btn = gr.Button("导出 JSON")
            export_md_btn = gr.Button("导出 Markdown")
        with gr.Column(scale=2):
            status_out = gr.Markdown(value="状态: 等待分析", elem_classes=["status-pill"])
            summary_out = gr.Markdown(label="总览")
            log_out = gr.Textbox(label="实时日志终端", lines=12, interactive=False)
            details_out = gr.Markdown(label="段落详情")
            history_out = gr.Markdown(label="历史记录", value=format_history_markdown())
            json_file_out = gr.File(label="JSON导出文件")
            md_file_out = gr.File(label="Markdown导出文件")
    # analyze_document is a generator; the outputs order matches its yielded tuples.
    run_btn.click(
        fn=analyze_document,
        inputs=[input_mode, file_input, pasted_text, model],
        outputs=[summary_out, details_out, history_out, status_out, log_out, analysis_state, json_file_out, md_file_out],
        show_progress="hidden",
    )
    risk_filter.change(fn=apply_risk_filter, inputs=[analysis_state, risk_filter], outputs=[details_out])
    export_json_btn.click(fn=export_json, inputs=[analysis_state], outputs=[json_file_out])
    export_md_btn.click(fn=export_md, inputs=[analysis_state], outputs=[md_file_out])
demo.launch()