Spaces:
Running
Running
| import json | |
| import re | |
| import time | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple | |
| import fitz | |
| import gradio as gr | |
| import numpy as np | |
| import torch | |
| from transformers import AutoModel, AutoModelForSequenceClassification, AutoTokenizer | |
| try: | |
| import joblib | |
| except Exception: | |
| joblib = None | |
| try: | |
| import docx | |
| except Exception: | |
| docx = None | |
# UI label -> model identifier. Identifiers are handed to ``from_pretrained`` by
# ``get_or_load_model``; the special value "mba_local_pack" selects the local
# tree-model ensemble handled by ``init_mba_pack`` / ``detector_score_mba``.
MODEL_CHOICES = {
    "paperpass-v3(默认,论文场景优先)": "yibo365/paperpass-v3",
    "AIGC_detector_zhv3(新版中文检测)": "yuchuantian/aigc_detector_zhv3",
    "AIGC_detector_zhv2(兜底)": "yuchuantian/AIGC_detector_zhv2",
    "mba-aigc-detector(实验版,需本地模型包)": "mba_local_pack",
}
# Label used when the requested label is not a key of MODEL_CHOICES.
DEFAULT_MODEL_LABEL = "paperpass-v3(默认,论文场景优先)"
# Paragraph risk strictly above this value is labelled high risk.
RISK_THRESHOLD = 0.75
# Paragraph rebuilding limits (character counts): paragraphs shorter than MIN
# are dropped, a paragraph is closed once it reaches TARGET, and it never grows
# past MAX.
MIN_PARAGRAPH_CHARS = 80
TARGET_PARAGRAPH_CHARS = 420
MAX_PARAGRAPH_CHARS = 900
# Tokenizer windowing: ``max_length`` and ``stride`` passed to the tokenizer.
WINDOW_MAX_LENGTH = 512
WINDOW_STRIDE = 192
# Number of token windows per forward pass.
WINDOW_BATCH_SIZE = 64
# Number of paragraphs tokenized together.
PARAGRAPH_CHUNK_SIZE = 32
# Maximum number of records kept in the history file.
MAX_HISTORY_ITEMS = 30
# On-disk locations, relative to the working directory.
CALIBRATION_PATH = Path("calibration/model.json")
MBA_MODELS_DIR = Path("models/mba")
HISTORY_PATH = Path("history/analysis_records.json")
EXPORT_DIR = Path("exports")
# Single-slot cache for the most recently loaded classifier (see get_or_load_model).
CURRENT_MODEL_NAME = None
CURRENT_TOKENIZER = None
CURRENT_MODEL = None
# Lazily populated by init_mba_pack(); "ready" flips to True after a successful load.
MBA_STATE = {
    "ready": False,
    "extractor_tokenizer": None,
    "extractor_model": None,
    "tree_models": {},
}
# Re-applies the thread count torch currently reports (4 if it reports 0);
# any failure is deliberately ignored.
try:
    torch.set_num_threads(max(1, (torch.get_num_threads() or 4)))
except Exception:
    pass
def load_calibration_model() -> Dict:
    """Load the optional linear calibration model from ``CALIBRATION_PATH``.

    Returns:
        The parsed JSON object when the file exists, is a JSON object with
        ``model_type == "linear"``, contains ``feature_order``, ``coef`` and
        ``intercept``, has equally long ``feature_order`` and ``coef``, and a
        numeric ``intercept``. Otherwise an empty dict, which callers treat
        as "no calibration loaded".
    """
    if not CALIBRATION_PATH.exists():
        return {}
    try:
        data = json.loads(CALIBRATION_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Unreadable file, bad encoding or malformed JSON: run uncalibrated.
        return {}
    # A JSON array/scalar at the root has no ``.get``; reject it instead of
    # raising at import time.
    if not isinstance(data, dict):
        return {}
    required = {"feature_order", "coef", "intercept"}
    if data.get("model_type") != "linear" or not required.issubset(data.keys()):
        return {}
    try:
        # Mismatched lengths would make the dot product in
        # predict_kn_like_rate raise during an analysis.
        if len(data["feature_order"]) != len(data["coef"]):
            return {}
        float(data["intercept"])
    except (TypeError, ValueError):
        return {}
    return data


CALIBRATION_MODEL = load_calibration_model()
def ensure_history_file():
    """Create the history directory and an empty JSON list file if missing."""
    folder = HISTORY_PATH.parent
    folder.mkdir(parents=True, exist_ok=True)
    if HISTORY_PATH.exists():
        return
    HISTORY_PATH.write_text("[]", encoding="utf-8")
def load_history() -> List[Dict]:
    """Return the stored history records, or ``[]`` when unreadable.

    The history file is created first if it does not exist. Any read or
    parse failure, or a JSON root that is not a list, yields an empty list.
    """
    ensure_history_file()
    try:
        raw = HISTORY_PATH.read_text(encoding="utf-8")
        parsed = json.loads(raw)
    except Exception:
        return []
    return parsed if isinstance(parsed, list) else []
def save_history_item(item: Dict):
    """Prepend ``item`` to the history file, keeping the newest records only.

    At most ``MAX_HISTORY_ITEMS`` records are written back.
    """
    records = [item, *load_history()]
    payload = json.dumps(records[:MAX_HISTORY_ITEMS], ensure_ascii=False, indent=2)
    HISTORY_PATH.write_text(payload, encoding="utf-8")
def format_history_markdown() -> str:
    """Render the stored history as a numbered Markdown list.

    Records that are not JSON objects are skipped, and ``overall`` /
    ``kn_like`` values that cannot be read as numbers are shown as 0, so a
    hand-edited or partially corrupted history file cannot crash the UI
    (this function also runs while the interface is being built).

    Returns:
        The Markdown text, or a placeholder sentence when there is no
        usable record.
    """

    def _ratio(value) -> float:
        # ``None`` or free text would make the ``:.2%`` format spec raise.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    items = [x for x in load_history() if isinstance(x, dict)]
    if not items:
        return "暂无历史记录。"
    lines = ["# 历史分析记录"]
    for i, x in enumerate(items, 1):
        lines.append(
            f"{i}. `{x.get('time')}` | 文件: {x.get('source')} | 模型: {x.get('model')} | "
            f"综合风险: {_ratio(x.get('overall', 0)):.2%} | 预测知网率: {_ratio(x.get('kn_like', 0)):.2%} | 段落数: {x.get('paragraphs', 0)}"
        )
    return "\n".join(lines)
def get_or_load_model(model_name: str):
    """Return ``(tokenizer, model)`` for ``model_name``, loading on a cache miss.

    Only one model is cached at a time in the ``CURRENT_*`` module globals;
    requesting a different name replaces it. The model is put in eval mode
    and wrapped with ``torch.compile`` on a best-effort basis.
    """
    global CURRENT_MODEL_NAME, CURRENT_TOKENIZER, CURRENT_MODEL
    cache_hit = (
        CURRENT_MODEL_NAME == model_name
        and CURRENT_TOKENIZER is not None
        and CURRENT_MODEL is not None
    )
    if not cache_hit:
        tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
        model = AutoModelForSequenceClassification.from_pretrained(model_name)
        model.eval()
        # NOTE(review): this only guards the wrapping call itself; whether a
        # failure at first forward pass is covered should be confirmed.
        try:
            model = torch.compile(model)
        except Exception:
            pass
        CURRENT_MODEL_NAME = model_name
        CURRENT_TOKENIZER = tokenizer
        CURRENT_MODEL = model
    return CURRENT_TOKENIZER, CURRENT_MODEL
def normalize_text(text: str) -> str:
    """Collapse every whitespace run to one space and trim both ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def is_probable_page_number(line: str) -> bool:
    """Tell whether a stripped line looks like a bare page marker.

    Recognised shapes (case-insensitive): ``第 N 页``, ``N / M``, a 1-4 digit
    number optionally wrapped in dashes, and ``page N``.
    """
    candidate = line.strip()
    page_patterns = (
        r"^第\s*\d+\s*页$",
        r"^\d+\s*/\s*\d+$",
        r"^[-—]*\s*\d{1,4}\s*[-—]*$",
        r"^page\s*\d+$",
    )
    for pattern in page_patterns:
        if re.match(pattern, candidate, flags=re.IGNORECASE):
            return True
    return False
def clean_common_noise(line: str) -> str:
    """Normalize a raw extracted line, then squeeze space/tab runs to one space."""
    normalized = normalize_text(line)
    return re.sub(r"[ \t]+", " ", normalized)
def extract_pdf_text(file_path: str) -> Tuple[str, Dict]:
    """Extract body text from a PDF, dropping margins and repeated furniture.

    Per page, text blocks lying entirely in the top or bottom 6% of the page
    height are skipped, and lines that look like page numbers are removed.
    Afterwards, lines of at least 4 characters that recur among the first or
    last two lines of many pages (at least ``max(3, 40% of pages)``) are
    treated as running headers/footers and removed everywhere.

    Args:
        file_path: Path of the PDF to open.

    Returns:
        ``(text, meta)`` where pages are separated by a blank line and
        ``meta`` reports the page counts (no page is ever truncated).
    """
    doc = fitz.open(file_path)
    try:
        all_pages = len(doc)
        page_lines: List[List[str]] = []
        for idx in range(all_pages):
            page = doc[idx]
            rect = page.rect
            top_cut, bottom_cut = rect.height * 0.06, rect.height * 0.06
            blocks = page.get_text("blocks")
            lines = []
            # Order blocks top-to-bottom, then left-to-right.
            for b in sorted(blocks, key=lambda x: (round(x[1], 1), round(x[0], 1))):
                _, y0, _, y1, text, *_ = b
                if y1 <= top_cut or y0 >= rect.height - bottom_cut:
                    continue
                for raw in text.splitlines():
                    line = clean_common_noise(raw)
                    if line and not is_probable_page_number(line):
                        lines.append(line)
            page_lines.append(lines)
    finally:
        # Release the file handle even when a page fails to parse.
        doc.close()

    # Count on how many pages each candidate header/footer line appears.
    freq: Dict[str, int] = {}
    for lines in page_lines:
        for c in set(lines[:2] + lines[-2:]):
            if len(c) >= 4:
                freq[c] = freq.get(c, 0) + 1
    min_repeats = max(3, int(0.4 * all_pages))
    repeat_lines = {k for k, v in freq.items() if v >= min_repeats}

    # Page-number lines were already filtered while collecting ``lines``.
    merged = ["\n".join(ln for ln in lines if ln not in repeat_lines) for lines in page_lines]
    meta = {"total_pages": all_pages, "used_pages": all_pages, "page_truncated": False}
    return "\n\n".join(merged), meta
def extract_docx_text(file_path: str) -> Tuple[str, Dict]:
    """Read the non-empty paragraphs of a .docx file.

    Raises:
        RuntimeError: If the optional ``docx`` import failed at start-up.

    Returns:
        ``(text, meta)``; paragraphs are joined by blank lines and the page
        counts in ``meta`` are ``None``.
    """
    if docx is None:
        raise RuntimeError("当前环境缺少 python-docx。")
    document = docx.Document(file_path)
    kept: List[str] = []
    for para in document.paragraphs:
        cleaned = clean_common_noise(para.text)
        if cleaned:
            kept.append(cleaned)
    meta = {"total_pages": None, "used_pages": None, "page_truncated": False}
    return "\n\n".join(kept), meta
def extract_txt_text(file_path: str) -> Tuple[str, Dict]:
    """Read a plain-text file as UTF-8, silently dropping undecodable bytes.

    Returns:
        ``(text, meta)`` with ``None`` page counts in ``meta``.
    """
    content = Path(file_path).read_text(encoding="utf-8", errors="ignore")
    meta = {"total_pages": None, "used_pages": None, "page_truncated": False}
    return content, meta
def extract_document_text(upload_file) -> Tuple[str, Dict]:
    """Dispatch an uploaded file to the extractor matching its suffix.

    Args:
        upload_file: Either a filesystem path (``str`` or ``Path``) or an
            object exposing the path as ``.name``. Both shapes are accepted
            so the function works whichever form the upload widget supplies.

    Returns:
        ``(text, meta)`` as produced by the selected extractor.

    Raises:
        RuntimeError: If the suffix is not .pdf, .docx, .txt or .md.
    """
    # ``Path.name`` is only the final component, so path-like values must be
    # converted directly instead of going through ``.name``.
    if isinstance(upload_file, (str, Path)):
        path = str(upload_file)
    else:
        path = upload_file.name
    suffix = Path(path).suffix.lower()
    if suffix == ".pdf":
        return extract_pdf_text(path)
    if suffix == ".docx":
        return extract_docx_text(path)
    if suffix in {".txt", ".md"}:
        return extract_txt_text(path)
    raise RuntimeError("仅支持 pdf / docx / txt / md 文件。")
def split_sentences(text: str) -> List[str]:
    """Split text into sentences, keeping the terminating punctuation.

    Line breaks are flattened to spaces first. A sentence ends after ``。``
    or after ``!``, ``?``, ``;`` in either their ASCII or full-width form.
    The full-width forms are written as escapes so they cannot be silently
    folded to ASCII by an editor or a copy/paste.

    Returns:
        Whitespace-normalized, non-empty sentences in document order.
    """
    flat = re.sub(r"\n+", " ", text)
    # Zero-width split point right after a terminator, swallowing any spaces.
    parts = re.split(r"(?<=[。!?;\uff01\uff1f\uff1b])\s*", flat)
    sentences: List[str] = []
    for part in parts:
        cleaned = re.sub(r"\s+", " ", part).strip()
        if cleaned:
            sentences.append(cleaned)
    return sentences
def rebuild_paragraphs_from_sentences(sentences: List[str], min_chars=None, target_chars=None, max_chars=None) -> List[str]:
    """Greedily pack sentences into paragraphs of bounded length.

    Sentences are appended to the current paragraph until it has reached
    ``target_chars`` or the next sentence would push it past ``max_chars``;
    the paragraph is then closed. Closed paragraphs shorter than
    ``min_chars`` are discarded.

    A single sentence of ``max_chars`` or more is cut into ``max_chars``-sized
    pieces. Every full piece becomes its own paragraph and a shorter
    remainder starts the next paragraph, so no text is dropped (previously
    everything after the first ``max_chars`` characters was lost).

    Args:
        sentences: Sentences in document order.
        min_chars: Minimum paragraph length; defaults to ``MIN_PARAGRAPH_CHARS``.
        target_chars: Preferred paragraph length; defaults to
            ``TARGET_PARAGRAPH_CHARS``.
        max_chars: Hard upper bound; defaults to ``MAX_PARAGRAPH_CHARS``.

    Returns:
        The rebuilt paragraphs.

    Raises:
        ValueError: If ``max_chars`` is not positive.
    """
    min_chars = MIN_PARAGRAPH_CHARS if min_chars is None else min_chars
    target_chars = TARGET_PARAGRAPH_CHARS if target_chars is None else target_chars
    max_chars = MAX_PARAGRAPH_CHARS if max_chars is None else max_chars
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")

    paragraphs: List[str] = []
    cur: List[str] = []
    cur_len = 0

    def flush() -> None:
        # Close the paragraph under construction, keeping it only if long enough.
        nonlocal cur, cur_len
        if cur:
            p = re.sub(r"\s+", " ", " ".join(cur)).strip()
            if len(p) >= min_chars:
                paragraphs.append(p)
        cur, cur_len = [], 0

    for s in sentences:
        s_len = len(s)
        if s_len >= max_chars:
            flush()
            pieces = [s[i : i + max_chars] for i in range(0, s_len, max_chars)]
            tail = pieces.pop() if len(pieces[-1]) < max_chars else ""
            paragraphs.extend(pieces)
            if tail:
                # Let the remainder merge with the following sentences.
                cur, cur_len = [tail], len(tail)
            continue
        if cur and (cur_len >= target_chars or cur_len + s_len > max_chars):
            flush()
        cur.append(s)
        # Joining spaces are not counted, matching the limits' original meaning.
        cur_len += s_len
    flush()
    return paragraphs
def split_paragraphs(text: str) -> List[str]:
    """Split raw text into analysis paragraphs (sentence split, then re-pack)."""
    return rebuild_paragraphs_from_sentences(split_sentences(text))
def should_skip_paragraph(text: str) -> bool:
    """Decide whether a paragraph should be excluded from analysis.

    A paragraph is skipped when it is empty after whitespace normalization,
    when one of the back-matter headings (references, acknowledgements,
    appendix, author bio) occurs within its first 40 characters, when it has
    fewer than 20 CJK ideographs, or when more than 75% of its characters
    are digits, underscores or non-word characters.
    """
    body = re.sub(r"\s+", " ", text).strip()
    if not body:
        return True
    head = body[:40]
    if re.search(r"(参考文献|致谢|附录|作者简介)", head, flags=re.IGNORECASE):
        return True
    han_total = sum(1 for _ in re.finditer(r"[\u4e00-\u9fff]", body))
    if han_total < 20:
        return True
    noise_total = sum(1 for _ in re.finditer(r"[\d\W_]", body))
    return noise_total > len(body) * 0.75
def calc_repetition(text: str) -> float:
    """Return the share of repeated character bigrams in the normalized text.

    The value is ``1 - unique_bigrams / total_bigrams`` and ``0.0`` when the
    text is too short to contain a bigram.
    """
    body = re.sub(r"\s+", " ", text).strip()
    bigrams = [left + right for left, right in zip(body, body[1:])]
    if not bigrams:
        return 0.0
    return max(0.0, 1.0 - len(set(bigrams)) / len(bigrams))
def calc_sentence_variance(text: str) -> float:
    """Return the variance of sentence lengths, scaled into ``[0, 1]``.

    Sentences end at ``。`` or at ``!`` / ``?`` in ASCII or full-width form
    (the full-width forms are spelled as escapes so they survive any
    character folding). The population variance of the sentence lengths is
    divided by 900 and capped at 1.

    Returns:
        ``0.0`` when fewer than two non-empty sentences are found.
    """
    sents = [s.strip() for s in re.split(r"[。!?\uff01\uff1f]", text) if s.strip()]
    if len(sents) < 2:
        return 0.0
    lengths = [len(s) for s in sents]
    return float(min(np.var(lengths) / 900.0, 1.0))
def detector_scores_transformer_stream(
    texts: List[str],
    model_name: str,
    progress_cb,
    log_cb,
) -> List[float]:
    """Score each text with the sequence classifier, reporting progress.

    Texts are tokenized ``PARAGRAPH_CHUNK_SIZE`` at a time into overlapping
    windows; windows are run through the model ``WINDOW_BATCH_SIZE`` at a
    time. A text's score is ``0.75 * mean + 0.25 * max`` of the class-1
    probabilities of its windows.

    Args:
        texts: Paragraphs to score.
        model_name: Identifier passed to ``get_or_load_model``.
        progress_cb: Called with an integer percentage in the 0-95 range
            after every batch.
        log_cb: Called with a log message string.

    Returns:
        One score per input text, in input order (``[]`` for no input).
    """
    if not texts:
        return []
    tokenizer, model = get_or_load_model(model_name)
    n_texts = len(texts)
    # Ceiling division, never less than one chunk.
    total_chunks = max(1, -(-n_texts // PARAGRAPH_CHUNK_SIZE))
    all_scores: List[float] = []
    for ci, cstart in enumerate(range(0, n_texts, PARAGRAPH_CHUNK_SIZE), 1):
        cend = min(cstart + PARAGRAPH_CHUNK_SIZE, n_texts)
        chunk = texts[cstart:cend]
        log_cb(f"文本预处理 chunk {ci}/{total_chunks}(段落 {cstart+1}-{cend})")
        enc = tokenizer(
            chunk,
            truncation=True,
            max_length=WINDOW_MAX_LENGTH,
            stride=WINDOW_STRIDE,
            return_overflowing_tokens=True,
            padding=True,
            return_tensors="pt",
        )
        # Index of the source paragraph (within ``chunk``) for every window.
        sample_map = enc.pop("overflow_to_sample_mapping").tolist()
        window_count = len(sample_map)
        ai_probs = np.zeros(window_count, dtype=np.float32)
        batch_total = max(1, -(-window_count // WINDOW_BATCH_SIZE))
        with torch.inference_mode():
            for bi, lo in enumerate(range(0, window_count, WINDOW_BATCH_SIZE), 1):
                hi = min(lo + WINDOW_BATCH_SIZE, window_count)
                batch = {key: tensor[lo:hi] for key, tensor in enc.items()}
                logits = model(**batch).logits
                ai_probs[lo:hi] = torch.softmax(logits, dim=-1)[:, 1].cpu().numpy()
                # The model stage owns the 0-95% part of the progress bar.
                done_fraction = ((ci - 1) + (bi / batch_total)) / total_chunks
                progress_cb(round(done_fraction * 95))
                log_cb(f"模型前向 chunk {ci}/{total_chunks} batch {bi}/{batch_total}")
        per_paragraph: List[List[float]] = [[] for _ in chunk]
        for window_idx, sid in enumerate(sample_map):
            per_paragraph[sid].append(float(ai_probs[window_idx]))
        for vals in per_paragraph:
            arr = np.array(vals, dtype=np.float32)
            all_scores.append(float(0.75 * np.mean(arr) + 0.25 * np.max(arr)))
    return all_scores
def _extract_stat_features(text: str) -> np.ndarray:
    """Compute the 13 hand-crafted statistics fed to the MBA tree models.

    Feature order: average sentence length, sentence length std, comma
    ratio, period ratio, pronoun ratio, conjunction ratio, unique character
    ratio, average whitespace-token length, digit ratio, CJK character
    ratio, text length, burstiness (std / mean sentence length), formality
    keyword ratio. Ratios are relative to the character count.

    ``comma_ratio`` counts the ASCII comma and the full-width comma once
    each; previously the ASCII comma was counted twice and the full-width
    one never.

    NOTE(review): these definitions must match whatever was used when the
    tree models were trained -- confirm against the training code.
    """
    char_count = max(1, len(text))
    sentences = [s.strip() for s in re.split(r"[。!?\.\n]", text) if s.strip()]
    lens = [len(s) for s in sentences] if sentences else [0]
    avg_sentence_length = float(np.mean(lens))
    sentence_length_std = float(np.std(lens))
    # "\uff0c" is the full-width comma used in Chinese text.
    comma_ratio = (text.count(",") + text.count("\uff0c")) / char_count
    period_ratio = (text.count("。") + text.count(".")) / char_count
    pronouns = ["我", "你", "他", "她", "它", "我们", "你们", "他们"]
    conjunctions = ["和", "与", "或", "但是", "然而", "因此", "因为", "所以"]
    pronoun_ratio = sum(text.count(p) for p in pronouns) / char_count
    conjunction_ratio = sum(text.count(c) for c in conjunctions) / char_count
    unique_word_ratio = len(set(text)) / char_count
    words = text.split()
    avg_word_length = float(np.mean([len(w) for w in words])) if words else 0.0
    digit_ratio = sum(c.isdigit() for c in text) / char_count
    chinese_char_ratio = len(re.findall(r"[\u4e00-\u9fff]", text)) / char_count
    paragraph_length = float(len(text))
    burstiness = sentence_length_std / avg_sentence_length if avg_sentence_length > 0 else 0.0
    formality_keywords = ["研究", "分析", "策略", "管理", "企业", "市场", "发展"]
    formality_score = sum(text.count(w) for w in formality_keywords) / char_count
    return np.array(
        [
            avg_sentence_length,
            sentence_length_std,
            comma_ratio,
            period_ratio,
            pronoun_ratio,
            conjunction_ratio,
            unique_word_ratio,
            avg_word_length,
            digit_ratio,
            chinese_char_ratio,
            paragraph_length,
            burstiness,
            formality_score,
        ]
    )
def is_lfs_pointer(path: Path) -> bool:
    """Tell whether ``path`` is a Git LFS pointer stub rather than real data.

    Only the leading bytes are read and compared, so checking a large binary
    model file does not load or decode the whole file.

    Returns:
        ``True`` if the file starts with the Git LFS v1 pointer header,
        ``False`` otherwise or when the file cannot be read.
    """
    marker = b"version https://git-lfs.github.com/spec/v1"
    try:
        with Path(path).open("rb") as handle:
            return handle.read(len(marker)) == marker
    except OSError:
        return False
def init_mba_pack() -> Tuple[bool, str]:
    """Load the local MBA model pack once and cache it in ``MBA_STATE``.

    Returns:
        ``(True, "")`` when the pack is (already) loaded, otherwise
        ``(False, reason)`` with a user-facing explanation: joblib missing,
        model files missing, files being Git LFS pointers, or a load error.
    """
    if MBA_STATE["ready"]:
        return True, ""
    if joblib is None:
        return False, "当前环境缺少 joblib。"
    needed = [
        "select5_tree_d2_model.pkl",
        "select10_tree_d2_model.pkl",
        "select15_tree_d3_model.pkl",
        "select20_tree_d2_model.pkl",
        "bert_tree_d1_model.pkl",
    ]
    model_paths = [MBA_MODELS_DIR / filename for filename in needed]
    if not all(p.exists() for p in model_paths):
        return False, "缺少 mba 模型文件,请将模型文件放入 models/mba/。"
    if any(is_lfs_pointer(p) for p in model_paths):
        return False, "检测到 mba 模型文件是 Git LFS 指针,不是真实权重。"
    try:
        tok = AutoTokenizer.from_pretrained("hfl/chinese-roberta-wwm-ext")
        mdl = AutoModel.from_pretrained("hfl/chinese-roberta-wwm-ext")
        mdl.eval()
        # NOTE(review): joblib.load unpickles the files, so models/mba/ must
        # only ever contain trusted artifacts.
        trees = {}
        for filename in needed:
            trees[filename] = joblib.load(MBA_MODELS_DIR / filename)
        MBA_STATE.update(
            {
                "ready": True,
                "extractor_tokenizer": tok,
                "extractor_model": mdl,
                "tree_models": trees,
            }
        )
    except Exception as e:
        return False, f"加载 mba 模型包失败: {e}"
    return True, ""
def detector_score_mba(text: str) -> float:
    """Score one text with the MBA ensemble and return the highest probability.

    The first 512 characters are embedded with the extractor model (first
    token of the last hidden state). Trees whose file name contains
    ``bert_tree`` receive the embedding only; all others receive the
    statistical features followed by the embedding.

    Raises:
        RuntimeError: If the MBA pack cannot be initialised.
    """
    ok, msg = init_mba_pack()
    if not ok:
        raise RuntimeError(msg)
    tokenizer = MBA_STATE["extractor_tokenizer"]
    extractor = MBA_STATE["extractor_model"]
    inputs = tokenizer(text[:512], return_tensors="pt", max_length=512, truncation=True, padding=True)
    with torch.inference_mode():
        hidden = extractor(**inputs).last_hidden_state
        bert_feat = hidden[:, 0, :].cpu().numpy()[0]
    stat_feat = _extract_stat_features(text)
    combined = np.concatenate([stat_feat, bert_feat]).reshape(1, -1)
    bert2d = bert_feat.reshape(1, -1)
    probs = [
        float(tree.predict_proba(bert2d if "bert_tree" in name else combined)[0, 1])
        for name, tree in MBA_STATE["tree_models"].items()
    ]
    return float(max(probs))
def analyze_paragraph_with_detector(text: str, detector: float) -> Dict[str, float]:
    """Blend the detector score with text statistics into a paragraph risk.

    ``risk = 0.78 * detector + 0.12 * repetition + 0.10 * (1 - variance)``,
    clamped to ``[0, 1]``.

    Returns:
        A dict with ``detector``, ``repetition``, ``variance`` and ``risk``.
    """
    repetition = calc_repetition(text)
    variance = calc_sentence_variance(text)
    blended = detector * 0.78 + repetition * 0.12 + (1 - variance) * 0.10
    risk = float(min(max(blended, 0.0), 1.0))
    return {
        "detector": detector,
        "repetition": repetition,
        "variance": variance,
        "risk": risk,
    }
def clip01(v: float) -> float:
    """Clamp ``v`` to the closed interval ``[0, 1]`` and return a float."""
    if v < 0.0:
        return 0.0
    if v > 1.0:
        return 1.0
    return float(v)
def build_doc_features(risks: List[float], high_threshold: float = 0.75, mid_threshold: float = 0.55) -> Dict[str, float]:
    """Aggregate per-paragraph risks into document-level features.

    Args:
        risks: Paragraph risk values.
        high_threshold: Risks strictly above this count as high risk.
        mid_threshold: Risks strictly above this and at most
            ``high_threshold`` count as medium risk.

    Returns:
        ``overall`` (mean), ``p90`` (90th percentile), ``high_ratio``,
        ``mid_ratio`` and ``std``, each clamped to ``[0, 1]``. All values
        are ``0.0`` for an empty input instead of NaN.

    NOTE(review): ``analyze_document`` labels a paragraph medium risk above
    ``max(0.55, RISK_THRESHOLD - 0.15)``, which differs from the 0.55
    default here; the defaults are kept because a calibration model may
    depend on them -- confirm which cut-off is intended.
    """

    def _unit(value) -> float:
        return float(min(max(float(value), 0.0), 1.0))

    arr = np.asarray(risks, dtype=float)
    if arr.size == 0:
        # np.mean/np.percentile of an empty array give NaN and warnings.
        return {"overall": 0.0, "p90": 0.0, "high_ratio": 0.0, "mid_ratio": 0.0, "std": 0.0}
    high_mask = arr > high_threshold
    mid_mask = (arr > mid_threshold) & (arr <= high_threshold)
    return {
        "overall": _unit(np.mean(arr)),
        "p90": _unit(np.percentile(arr, 90)),
        "high_ratio": _unit(np.mean(high_mask)),
        "mid_ratio": _unit(np.mean(mid_mask)),
        "std": _unit(np.std(arr)),
    }
def predict_kn_like_rate(features: Dict[str, float]) -> float:
    """Map document features to the calibrated rate.

    Without a loaded calibration model the raw ``overall`` feature is
    returned. Otherwise the linear model ``coef . x + intercept`` is
    evaluated over ``feature_order`` (missing features count as 0) and the
    result is clamped to ``[0, 1]``.
    """
    if not CALIBRATION_MODEL:
        return features["overall"]
    ordered_names = CALIBRATION_MODEL["feature_order"]
    x = np.array([features.get(name, 0.0) for name in ordered_names], dtype=float)
    weights = np.array(CALIBRATION_MODEL["coef"], dtype=float)
    raw_score = float(np.dot(x, weights) + float(CALIBRATION_MODEL["intercept"]))
    return clip01(raw_score)
def build_filtered_details(blocks: List[Dict], level_filter: str) -> str:
    """Join the rendered paragraphs matching ``level_filter``.

    ``"全部"`` selects every block; any other value selects blocks whose
    ``risk_level`` equals it. Blocks are separated by a Markdown rule. A
    placeholder message is returned when nothing matches.
    """
    if level_filter == "全部":
        selected = blocks
    else:
        selected = [entry for entry in blocks if entry["risk_level"] == level_filter]
    if selected:
        return "\n\n---\n\n".join(entry["content"] for entry in selected)
    return f"当前筛选 `{level_filter}` 下暂无段落。"
def write_exports(state: Dict) -> Tuple[str, str]:
    """Write the analysis state as a JSON file and a Markdown file.

    Both files go to ``EXPORT_DIR`` and share a second-resolution timestamp
    in their names. The Markdown file holds the summary followed by the
    details text.

    Returns:
        ``(json_path, md_path)`` as strings.
    """
    EXPORT_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_name = f"analysis_{stamp}"
    json_path = EXPORT_DIR / f"{base_name}.json"
    md_path = EXPORT_DIR / f"{base_name}.md"
    json_body = json.dumps(state, ensure_ascii=False, indent=2)
    json_path.write_text(json_body, encoding="utf-8")
    md_body = state.get("summary", "") + "\n\n" + state.get("details", "")
    md_path.write_text(md_body, encoding="utf-8")
    return str(json_path), str(md_path)
def append_log(logs: List[str], message: str) -> str:
    """Append a ``[HH:MM:SS]``-prefixed entry to ``logs`` and return the full log text."""
    now_text = datetime.now().strftime("%H:%M:%S")
    entry = f"[{now_text}] {message}"
    logs.append(entry)
    return "\n".join(logs)
def analyze_document(input_mode, upload_file, pasted_text, model_label):
    """Run the whole analysis as a generator of UI updates.

    Every yielded value is the 8-tuple built by ``emit``: summary Markdown,
    details Markdown, history Markdown, status line, log text, state dict,
    JSON export path and Markdown export path.

    Args:
        input_mode: One of the input-mode radio labels.
        upload_file: Uploaded file object exposing ``.name``, or ``None``.
        pasted_text: Text pasted by the user (may be empty or ``None``).
        model_label: Key of ``MODEL_CHOICES``; unknown labels fall back to
            the default model.

    Differences from the earlier implementation:
        * ``torch.inference_mode()`` now wraps only the forward pass instead
          of spanning a ``yield``; the mode is thread-local state and a
          suspended generator is not guaranteed to resume on the same
          thread.
        * The history Markdown is read from disk once for the intermediate
          updates instead of on every yield.
        * File-extraction and model-loading failures are reported through
          the status/summary outputs instead of escaping as exceptions.
    """
    logs: List[str] = []
    last_status = "状态: 等待分析"
    history_snapshot = None

    def emit(summary="", details="", history_md=None, status=None, state=None, json_path=None, md_path=None):
        nonlocal last_status, history_snapshot
        if history_md is None:
            # History only changes when this run saves its record at the end,
            # so one read serves every intermediate update.
            if history_snapshot is None:
                history_snapshot = format_history_markdown()
            history_md = history_snapshot
        if status is None:
            status = last_status
        else:
            last_status = status
        return summary, details, history_md, status, "\n".join(logs), (state or {}), json_path, md_path

    def progress_cb(pct: int):
        nonlocal last_status
        pct = max(0, min(100, pct))
        last_status = f"状态: 推理中 {pct}%"

    def log_cb(message: str):
        append_log(logs, message)

    def classify(risk: float) -> Tuple[str, str]:
        # Single source of truth for the marker emoji and the level label.
        if risk > RISK_THRESHOLD:
            return "🔴", "高风险"
        if risk > max(0.55, RISK_THRESHOLD - 0.15):
            return "🟡", "中风险"
        return "🟢", "低风险"

    has_text = bool(normalize_text(pasted_text or ""))
    if input_mode == "文本输入" and not has_text:
        log_cb("等待输入:文本模式下未检测到文本内容。")
        yield emit(summary="请先粘贴文本内容。", status="状态: 等待输入")
        return
    if input_mode == "文件上传" and upload_file is None:
        log_cb("等待输入:文件模式下未检测到文件。")
        yield emit(summary="请先上传文件。", status="状态: 等待输入")
        return
    if upload_file is None and not has_text:
        log_cb("等待输入:未检测到文件和文本。")
        yield emit(summary="请先上传文件,或粘贴文本。", status="状态: 等待输入")
        return

    model_name = MODEL_CHOICES.get(model_label, MODEL_CHOICES[DEFAULT_MODEL_LABEL])
    log_cb(f"任务开始,模型={model_label}")
    if model_name == "mba_local_pack":
        ok, msg = init_mba_pack()
        if not ok:
            log_cb(f"模型不可用:{msg}")
            yield emit(summary=f"# 当前模型: {model_label}\n\n{msg}\n\n请切回其他模型。", status="状态: 模型不可用")
            return

    source = "pasted_text"
    use_text = input_mode == "文本输入" or (input_mode == "自动(有文本优先)" and has_text)
    if use_text:
        t_extract = time.time()
        raw_text = pasted_text
        extract_meta = {"total_pages": None, "used_pages": None, "page_truncated": False}
        paragraphs = split_paragraphs(raw_text)
        log_cb(f"输入来源:文本框,切分段落={len(paragraphs)},耗时={time.time()-t_extract:.2f}s")
    else:
        t_extract = time.time()
        source = Path(upload_file.name).name
        try:
            raw_text, extract_meta = extract_document_text(upload_file)
        except Exception as exc:
            # Unsupported suffix, missing parser, damaged file, ...
            log_cb(f"终止:文件解析失败:{exc}")
            yield emit(summary=f"文件解析失败:{exc}", status="状态: 文件解析失败")
            return
        paragraphs = [p for p in split_paragraphs(raw_text) if not should_skip_paragraph(p)]
        log_cb(f"输入来源:文件 {source},抽取+切分后段落={len(paragraphs)},耗时={time.time()-t_extract:.2f}s")
    if not paragraphs:
        log_cb("终止:未提取到有效段落。")
        yield emit(summary="未提取到可分析正文。", status="状态: 无有效段落")
        return

    t0 = time.time()
    risks, details = [], []
    total = len(paragraphs)
    log_cb("开始推理。")
    progress_cb(0)
    yield emit(status="状态: 开始分析... 0%")

    detector_scores = []
    t_model = time.time()
    if model_name == "mba_local_pack":
        detector_scores = [float(min(max(detector_score_mba(p) * 0.3, 0.0), 1.0)) for p in paragraphs]
        log_cb(f"MBA 推理完成,段落={total},耗时={time.time()-t_model:.2f}s")
        progress_cb(95)
        yield emit()
    else:
        try:
            tokenizer, model = get_or_load_model(model_name)
        except Exception as exc:
            log_cb(f"终止:模型加载失败:{exc}")
            yield emit(summary=f"# 当前模型: {model_label}\n\n模型加载失败:{exc}", status="状态: 模型不可用")
            return
        total_chunks = max(1, (len(paragraphs) + PARAGRAPH_CHUNK_SIZE - 1) // PARAGRAPH_CHUNK_SIZE)
        for ci, cstart in enumerate(range(0, len(paragraphs), PARAGRAPH_CHUNK_SIZE), 1):
            cend = min(cstart + PARAGRAPH_CHUNK_SIZE, len(paragraphs))
            chunk = paragraphs[cstart:cend]
            log_cb(f"文本预处理 chunk {ci}/{total_chunks}(段落 {cstart+1}-{cend})")
            yield emit()
            enc = tokenizer(
                chunk,
                truncation=True,
                max_length=WINDOW_MAX_LENGTH,
                stride=WINDOW_STRIDE,
                return_overflowing_tokens=True,
                padding=True,
                return_tensors="pt",
            )
            # Index of the source paragraph (within ``chunk``) for every window.
            sample_map = enc.pop("overflow_to_sample_mapping").tolist()
            window_count = len(sample_map)
            ai_probs = np.zeros(window_count, dtype=np.float32)
            batch_total = max(1, (window_count + WINDOW_BATCH_SIZE - 1) // WINDOW_BATCH_SIZE)
            for bi, s in enumerate(range(0, window_count, WINDOW_BATCH_SIZE), 1):
                e = min(s + WINDOW_BATCH_SIZE, window_count)
                batch = {k: v[s:e] for k, v in enc.items()}
                # Entered and left before the yield below, on one thread.
                with torch.inference_mode():
                    logits = model(**batch).logits
                    probs = torch.softmax(logits, dim=-1)[:, 1].cpu().numpy()
                ai_probs[s:e] = probs
                # The model stage owns the 0-95% part of the progress bar.
                global_batch_progress = ((ci - 1) + (bi / batch_total)) / total_chunks
                pct = round(global_batch_progress * 95)
                progress_cb(pct)
                log_cb(f"模型前向 chunk {ci}/{total_chunks} batch {bi}/{batch_total}")
                yield emit()
            buckets: List[List[float]] = [[] for _ in range(len(chunk))]
            for i, sid in enumerate(sample_map):
                buckets[sid].append(float(ai_probs[i]))
            for vals in buckets:
                arr = np.array(vals, dtype=np.float32)
                detector_scores.append(float(0.75 * np.mean(arr) + 0.25 * np.max(arr)))
        log_cb(f"模型前向完成,段落={total},耗时={time.time()-t_model:.2f}s")
        progress_cb(95)
        yield emit()

    last_pct = 95
    for i, p in enumerate(paragraphs, 1):
        score = analyze_paragraph_with_detector(p, detector_scores[i - 1])
        risks.append(score["risk"])
        level, risk_level = classify(score["risk"])
        details.append({
            "risk_level": risk_level,
            "content": f"""
{level} 段落 {i} AI风险: {score['risk']:.2%}
Detector: {score['detector']:.2%}
重复度: {score['repetition']:.2%}
句式稳定性: {1 - score['variance']:.2%}
{p}
""",
        })
        pct = 95 + round(i * 5 / total)
        if pct >= last_pct + 5 or i == total:
            last_pct = pct
            log_cb(f"进度 {pct}%({i}/{total})")
            progress_cb(pct)
            yield emit()

    f = build_doc_features(risks)
    kn_like = predict_kn_like_rate(f)
    elapsed = time.time() - t0
    speed = len(paragraphs) / max(elapsed, 1e-6)
    trunc_info = []
    if extract_meta.get("total_pages") is not None:
        trunc_info.append(f"页面截断: 否({extract_meta.get('used_pages')}/{extract_meta.get('total_pages')} 页)")
    trunc_info.append(f"段落截断: 否(分析 {len(paragraphs)}/{len(paragraphs)} 段)")
    mode_line = "当前模式: 原始风险率(未加载校准模型)" if not CALIBRATION_MODEL else "当前模式: 知网对齐预测率(已加载校准模型)"
    summary = f"""
# 当前模型: {model_label}
# 综合AI风险率: {f['overall']:.2%}
# 预测知网AIGC率: {kn_like:.2%}
高风险段落占比: {f['high_ratio']:.2%}
中风险段落占比: {f['mid_ratio']:.2%}
有效段落数: {len(paragraphs)}
平均速度: {speed:.2f} 段/秒
{mode_line}
{' | '.join(trunc_info)}
(说明:该结果为“风险分析与校准预测”,并非官方系统结果)
"""
    save_history_item({"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "source": source, "model": model_label, "overall": f["overall"], "kn_like": kn_like, "paragraphs": len(paragraphs)})
    history_md = format_history_markdown()
    details_text = build_filtered_details(details, "全部")
    log_cb(f"聚合完成:overall={f['overall']:.2%}, kn_like={kn_like:.2%}, 总耗时={elapsed:.2f}s, 速度={speed:.2f}段/秒")
    state = {
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "source": source,
        "model": model_label,
        "summary": summary,
        "details": details_text,
        "metrics": {"overall": f["overall"], "kn_like": kn_like, "paragraphs": len(paragraphs), "high_ratio": f["high_ratio"], "mid_ratio": f["mid_ratio"], "elapsed_sec": elapsed, "speed_para_per_sec": speed},
        "paragraphs": details,
    }
    json_path, md_path = write_exports(state)
    log_cb(f"导出完成:{Path(json_path).name}, {Path(md_path).name}")
    progress_cb(100)
    yield emit(summary=summary, details=details_text, history_md=history_md, status="状态: 分析完成 100%", state=state, json_path=json_path, md_path=md_path)
def apply_risk_filter(state: Dict, risk_filter: str):
    """Re-render the paragraph details of the last analysis for one risk level.

    Returns a prompt to run an analysis first when ``state`` is empty.
    """
    if state:
        return build_filtered_details(state.get("paragraphs", []), risk_filter)
    return "请先完成一次分析。"
def export_json(state: Dict):
    """Write fresh export files and return the JSON path (``None`` without state)."""
    if not state:
        return None
    return write_exports(state)[0]
def export_md(state: Dict):
    """Write fresh export files and return the Markdown path (``None`` without state)."""
    if not state:
        return None
    return write_exports(state)[1]
# Custom stylesheet passed to gr.Blocks(css=...). It defines the colour
# variables (with a dark-mode override) and styles the container, the
# ``panel`` and ``status-pill`` element classes used below, and the primary
# button.
GEEK_CSS = """
:root {
--bg: #f3f4ea;
--ink: #102015;
--panel: #fefef6;
--accent: #0f6b3f;
--accent2: #b57722;
}
@media (prefers-color-scheme: dark) {
:root {
--bg: #09110c;
--ink: #d7e7d8;
--panel: #0f1913;
--accent: #4bd38a;
--accent2: #efb24a;
}
}
.gradio-container {
background: radial-gradient(circle at 20% 20%, color-mix(in srgb, var(--bg) 85%, white 15%) 0%, var(--bg) 45%, color-mix(in srgb, var(--bg) 75%, black 25%) 100%);
color: var(--ink);
font-family: "IBM Plex Mono", "JetBrains Mono", monospace;
}
h1, h2, h3 {
letter-spacing: 0.4px;
}
.panel {
border: 2px solid var(--ink);
border-radius: 12px;
background: var(--panel);
box-shadow: 6px 6px 0 color-mix(in srgb, var(--ink) 18%, transparent);
}
.status-pill {
border: 2px dashed var(--accent2);
border-radius: 10px;
padding: 8px 10px;
background: color-mix(in srgb, var(--panel) 85%, var(--accent2) 15%);
}
button.primary {
background: linear-gradient(90deg, color-mix(in srgb, var(--accent) 70%, black 30%) 0%, var(--accent) 100%) !important;
border: 2px solid color-mix(in srgb, var(--accent) 40%, black 60%) !important;
}
"""
# Build the interface: inputs and actions in the left column, status, results,
# log, history and export files in the right column.
with gr.Blocks(theme=gr.themes.Base(), css=GEEK_CSS, title="论文AIGC风险检测系统") as demo:
    gr.Markdown("""
# 论文AIGC风险检测系统
支持 `PDF / Word(.docx) / 文本(.txt, .md)`,默认全文检测,支持直接粘贴文本。
""")
    # Receives the state dict yielded by analyze_document; read back by the
    # risk filter and the export buttons.
    analysis_state = gr.State({})
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 输入面板", elem_classes=["panel"])
            # These labels are compared literally inside analyze_document.
            input_mode = gr.Radio(["自动(有文本优先)", "文件上传", "文本输入"], value="自动(有文本优先)", label="输入模式")
            file_input = gr.File(file_types=[".pdf", ".docx", ".txt", ".md"], label="文件输入")
            pasted_text = gr.Textbox(lines=10, label="文本输入(可选)", placeholder="粘贴原文可覆盖文件输入")
            model = gr.Dropdown(list(MODEL_CHOICES.keys()), value=DEFAULT_MODEL_LABEL, label="检测引擎")
            run_btn = gr.Button("Run Analysis", variant="primary")
            # Values other than "全部" must equal the risk_level labels stored
            # per paragraph.
            risk_filter = gr.Radio(["全部", "高风险", "中风险", "低风险"], value="全部", label="风险筛选(分析后)")
            export_json_btn = gr.Button("导出 JSON")
            export_md_btn = gr.Button("导出 Markdown")
        with gr.Column(scale=2):
            status_out = gr.Markdown(value="状态: 等待分析", elem_classes=["status-pill"])
            summary_out = gr.Markdown(label="总览")
            log_out = gr.Textbox(label="实时日志终端", lines=12, interactive=False)
            details_out = gr.Markdown(label="段落详情")
            # The history file is read once here, while the UI is being built.
            history_out = gr.Markdown(label="历史记录", value=format_history_markdown())
            json_file_out = gr.File(label="JSON导出文件")
            md_file_out = gr.File(label="Markdown导出文件")
    # analyze_document is a generator; the order of ``outputs`` mirrors the
    # tuple it yields (summary, details, history, status, log, state, json, md).
    run_btn.click(
        fn=analyze_document,
        inputs=[input_mode, file_input, pasted_text, model],
        outputs=[summary_out, details_out, history_out, status_out, log_out, analysis_state, json_file_out, md_file_out],
        show_progress="hidden",
    )
    risk_filter.change(fn=apply_risk_filter, inputs=[analysis_state, risk_filter], outputs=[details_out])
    export_json_btn.click(fn=export_json, inputs=[analysis_state], outputs=[json_file_out])
    export_md_btn.click(fn=export_md, inputs=[analysis_state], outputs=[md_file_out])
# Starts the server as soon as this module is executed.
demo.launch()