"""Gradio app that estimates AIGC (AI-generated content) risk for documents.

Pipeline: extract text (PDF / DOCX / TXT / MD or pasted text) -> rebuild
paragraphs from sentences -> score every paragraph with a detector model ->
blend the detector score with simple text statistics -> aggregate
document-level features -> optionally map them through a linear calibration
model -> render, store history and export the result.
"""

import json
import re
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, Iterator, List, Optional, Tuple

import fitz
import gradio as gr
import numpy as np
import torch
from transformers import AutoModel, AutoModelForSequenceClassification, AutoTokenizer

# Optional dependencies: the app degrades gracefully when they are missing.
try:
    import joblib
except Exception:
    joblib = None

try:
    import docx
except Exception:
    docx = None


# UI label -> Hugging Face model id ("mba_local_pack" selects the local pack).
MODEL_CHOICES = {
    "paperpass-v3(默认,论文场景优先)": "yibo365/paperpass-v3",
    "AIGC_detector_zhv3(新版中文检测)": "yuchuantian/aigc_detector_zhv3",
    "AIGC_detector_zhv2(兜底)": "yuchuantian/AIGC_detector_zhv2",
    "mba-aigc-detector(实验版,需本地模型包)": "mba_local_pack",
}
DEFAULT_MODEL_LABEL = "paperpass-v3(默认,论文场景优先)"
MBA_MODEL_ID = "mba_local_pack"

# Input-mode labels shared by the UI radio and analyze_document().
INPUT_MODE_AUTO = "自动(有文本优先)"
INPUT_MODE_FILE = "文件上传"
INPUT_MODE_TEXT = "文本输入"

RISK_THRESHOLD = 0.75
MIN_PARAGRAPH_CHARS = 80
TARGET_PARAGRAPH_CHARS = 420
MAX_PARAGRAPH_CHARS = 900
WINDOW_MAX_LENGTH = 512
WINDOW_STRIDE = 192
WINDOW_BATCH_SIZE = 64
PARAGRAPH_CHUNK_SIZE = 32
MAX_HISTORY_ITEMS = 30

CALIBRATION_PATH = Path("calibration/model.json")
MBA_MODELS_DIR = Path("models/mba")
HISTORY_PATH = Path("history/analysis_records.json")
EXPORT_DIR = Path("exports")

# Single-slot cache for the most recently loaded classifier.
CURRENT_MODEL_NAME = None
CURRENT_TOKENIZER = None
CURRENT_MODEL = None

MBA_STATE = {
    "ready": False,
    "extractor_tokenizer": None,
    "extractor_model": None,
    "tree_models": {},
}

# Sentence terminators.  The full-width forms are spelled as escapes so the
# patterns survive tooling that normalises full-width punctuation to ASCII
# (which previously left duplicated ASCII characters in these sets).
_SENTENCE_SPLIT_RE = re.compile(r"(?<=[。!?;\uff01\uff1f\uff1b])\s*")
_SENTENCE_VARIANCE_RE = re.compile(r"[。!?\uff01\uff1f]")
_CJK_CHAR_RE = re.compile(r"[\u4e00-\u9fff]")
_PAGE_NUMBER_RES = [
    re.compile(p, flags=re.IGNORECASE)
    for p in (
        r"^第\s*\d+\s*页$",
        r"^\d+\s*/\s*\d+$",
        r"^[-—]*\s*\d{1,4}\s*[-—]*$",
        r"^page\s*\d+$",
    )
]
_LFS_POINTER_PREFIX = b"version https://git-lfs.github.com/spec/v1"
_EMPTY_EXTRACT_META = {"total_pages": None, "used_pages": None, "page_truncated": False}

try:
    # Re-applies the current thread count (falling back to 4 if it reads as 0).
    torch.set_num_threads(max(1, (torch.get_num_threads() or 4)))
except Exception:
    pass


def load_calibration_model() -> Dict:
    """Load the optional linear calibration model.

    Returns:
        The parsed model dict, or ``{}`` when the file is missing, unreadable,
        not a JSON object, not ``model_type == "linear"``, lacks one of
        ``feature_order`` / ``coef`` / ``intercept``, or has mismatched
        ``feature_order`` / ``coef`` lengths.
    """
    if not CALIBRATION_PATH.exists():
        return {}
    try:
        data = json.loads(CALIBRATION_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return {}
    if not isinstance(data, dict):
        return {}
    required = {"feature_order", "coef", "intercept"}
    if data.get("model_type") != "linear" or not required.issubset(data):
        return {}
    try:
        # A length mismatch would make np.dot raise at prediction time.
        if len(data["feature_order"]) != len(data["coef"]):
            return {}
    except TypeError:
        return {}
    return data


CALIBRATION_MODEL = load_calibration_model()


def ensure_history_file() -> None:
    """Create the history directory and an empty history file if absent."""
    HISTORY_PATH.parent.mkdir(parents=True, exist_ok=True)
    if not HISTORY_PATH.exists():
        HISTORY_PATH.write_text("[]", encoding="utf-8")


def load_history() -> List[Dict]:
    """Return stored history records (newest first); ``[]`` if unreadable."""
    ensure_history_file()
    try:
        data = json.loads(HISTORY_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return []
    if not isinstance(data, list):
        return []
    # Drop malformed entries so renderers can rely on dict records.
    return [item for item in data if isinstance(item, dict)]


def save_history_item(item: Dict) -> None:
    """Prepend *item* to the history file, keeping MAX_HISTORY_ITEMS records."""
    items = load_history()
    items.insert(0, item)
    HISTORY_PATH.write_text(
        json.dumps(items[:MAX_HISTORY_ITEMS], ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


def _format_pct(value) -> str:
    """Format *value* as a percentage; non-numeric values count as 0."""
    try:
        return f"{float(value):.2%}"
    except (TypeError, ValueError):
        return f"{0.0:.2%}"


def format_history_markdown() -> str:
    """Render the stored history as a Markdown numbered list."""
    items = load_history()
    if not items:
        return "暂无历史记录。"
    lines = ["# 历史分析记录"]
    for i, x in enumerate(items, 1):
        lines.append(
            f"{i}. `{x.get('time')}` | 文件: {x.get('source')} | 模型: {x.get('model')} | "
            f"综合风险: {_format_pct(x.get('overall', 0))} | "
            f"预测知网率: {_format_pct(x.get('kn_like', 0))} | 段落数: {x.get('paragraphs', 0)}"
        )
    return "\n".join(lines)


def get_or_load_model(model_name: str):
    """Return ``(tokenizer, model)`` for *model_name*, reusing the cached pair.

    Only one model is cached at a time; requesting a different name replaces it.
    """
    global CURRENT_MODEL_NAME, CURRENT_TOKENIZER, CURRENT_MODEL
    if (
        CURRENT_MODEL_NAME == model_name
        and CURRENT_TOKENIZER is not None
        and CURRENT_MODEL is not None
    ):
        return CURRENT_TOKENIZER, CURRENT_MODEL

    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    model.eval()
    try:
        # NOTE(review): torch.compile is lazy, so backend failures may only
        # surface on the first forward pass rather than here — confirm this
        # fallback is sufficient for the deployment environment.
        model = torch.compile(model)
    except Exception:
        pass

    CURRENT_MODEL_NAME = model_name
    CURRENT_TOKENIZER = tokenizer
    CURRENT_MODEL = model
    return tokenizer, model


def normalize_text(text: str) -> str:
    """Collapse every whitespace run to one space and strip the ends."""
    return re.sub(r"\s+", " ", text).strip()


def is_probable_page_number(line: str) -> bool:
    """Return True when *line* looks like a bare page-number marker."""
    s = line.strip()
    return any(pattern.match(s) for pattern in _PAGE_NUMBER_RES)


def clean_common_noise(line: str) -> str:
    """Normalise whitespace in a single extracted line."""
    # normalize_text already collapses spaces/tabs, so no second pass is needed.
    return normalize_text(line)


def extract_pdf_text(file_path: str) -> Tuple[str, Dict]:
    """Extract body text from a PDF.

    Blocks lying entirely inside the top or bottom 6% of a page are skipped,
    page-number-like lines are dropped, and lines of at least 4 characters
    that recur among the first/last two lines of many pages (at least
    ``max(3, 40% of pages)``) are removed as running headers/footers.

    Returns:
        ``(text, meta)`` where pages are separated by blank lines and *meta*
        reports ``total_pages`` / ``used_pages`` / ``page_truncated``.
    """
    page_lines: List[List[str]] = []
    # The context manager closes the document even if extraction fails.
    with fitz.open(file_path) as doc:
        all_pages = len(doc)
        for page in doc:
            height = page.rect.height
            top_cut = height * 0.06
            bottom_limit = height - height * 0.06
            blocks = sorted(
                page.get_text("blocks"),
                key=lambda b: (round(b[1], 1), round(b[0], 1)),
            )
            lines: List[str] = []
            for _, y0, _, y1, text, *_ in blocks:
                if y1 <= top_cut or y0 >= bottom_limit:
                    continue
                for raw in text.splitlines():
                    line = clean_common_noise(raw)
                    if line and not is_probable_page_number(line):
                        lines.append(line)
            page_lines.append(lines)

    freq: Counter = Counter()
    for lines in page_lines:
        freq.update(c for c in set(lines[:2] + lines[-2:]) if len(c) >= 4)
    min_repeats = max(3, int(0.4 * all_pages))
    repeat_lines = {line for line, count in freq.items() if count >= min_repeats}

    merged = [
        "\n".join(ln for ln in lines if ln not in repeat_lines)
        for lines in page_lines
    ]
    meta = {"total_pages": all_pages, "used_pages": all_pages, "page_truncated": False}
    return "\n\n".join(merged), meta


def extract_docx_text(file_path: str) -> Tuple[str, Dict]:
    """Extract non-empty paragraphs from a .docx file.

    Raises:
        RuntimeError: if python-docx is not installed.
    """
    if docx is None:
        raise RuntimeError("当前环境缺少 python-docx。")
    document = docx.Document(file_path)
    cleaned = (clean_common_noise(p.text) for p in document.paragraphs)
    paras = [text for text in cleaned if text]
    return "\n\n".join(paras), dict(_EMPTY_EXTRACT_META)


def extract_txt_text(file_path: str) -> Tuple[str, Dict]:
    """Read a plain-text file as UTF-8, ignoring undecodable bytes."""
    text = Path(file_path).read_text(encoding="utf-8", errors="ignore")
    return text, dict(_EMPTY_EXTRACT_META)


def _upload_path(upload_file) -> str:
    """Return the filesystem path of an uploaded file.

    Accepts both objects exposing ``.name`` and plain path strings.
    """
    return str(getattr(upload_file, "name", upload_file))


def extract_document_text(upload_file) -> Tuple[str, Dict]:
    """Dispatch text extraction on the upload's file extension.

    Raises:
        RuntimeError: for unsupported extensions.
    """
    path = _upload_path(upload_file)
    suffix = Path(path).suffix.lower()
    if suffix == ".pdf":
        return extract_pdf_text(path)
    if suffix == ".docx":
        return extract_docx_text(path)
    if suffix in {".txt", ".md"}:
        return extract_txt_text(path)
    raise RuntimeError("仅支持 pdf / docx / txt / md 文件。")


def split_sentences(text: str) -> List[str]:
    """Split *text* after sentence-ending punctuation (ASCII and full-width)."""
    flat = re.sub(r"\n+", " ", text)
    normalized = (normalize_text(part) for part in _SENTENCE_SPLIT_RE.split(flat))
    return [sentence for sentence in normalized if sentence]


def rebuild_paragraphs_from_sentences(sentences: List[str]) -> List[str]:
    """Greedily pack sentences into paragraphs of bounded length.

    A paragraph is closed once it has reached TARGET_PARAGRAPH_CHARS or when
    the next sentence would push it past MAX_PARAGRAPH_CHARS.  Closed
    paragraphs shorter than MIN_PARAGRAPH_CHARS are discarded.  A sentence of
    MAX_PARAGRAPH_CHARS or more is cut into MAX_PARAGRAPH_CHARS-sized slices;
    a shorter final slice seeds the next paragraph instead of being dropped.
    """
    paragraphs: List[str] = []
    cur: List[str] = []
    cur_len = 0

    def flush() -> None:
        nonlocal cur, cur_len
        if cur:
            paragraph = normalize_text(" ".join(cur))
            if len(paragraph) >= MIN_PARAGRAPH_CHARS:
                paragraphs.append(paragraph)
        cur, cur_len = [], 0

    for sentence in sentences:
        s_len = len(sentence)
        if s_len >= MAX_PARAGRAPH_CHARS:
            flush()
            pieces = [
                sentence[i : i + MAX_PARAGRAPH_CHARS]
                for i in range(0, s_len, MAX_PARAGRAPH_CHARS)
            ]
            tail = pieces.pop() if len(pieces[-1]) < MAX_PARAGRAPH_CHARS else ""
            paragraphs.extend(pieces)
            if tail:
                cur, cur_len = [tail], len(tail)
            continue

        if cur and (
            cur_len >= TARGET_PARAGRAPH_CHARS
            or cur_len + s_len > MAX_PARAGRAPH_CHARS
        ):
            flush()
        cur.append(sentence)
        cur_len += s_len

    flush()
    return paragraphs


def split_paragraphs(text: str) -> List[str]:
    """Split *text* into analysis paragraphs."""
    return rebuild_paragraphs_from_sentences(split_sentences(text))


def should_skip_paragraph(text: str) -> bool:
    """Return True for paragraphs that should not be scored.

    Skips empty text, paragraphs whose first 40 characters contain a
    back-matter heading keyword, paragraphs with fewer than 20 CJK characters,
    and paragraphs where digits/non-word characters exceed 75% of the text.
    """
    t = normalize_text(text)
    if not t:
        return True
    if re.search(r"(参考文献|致谢|附录|作者简介)", t[:40], flags=re.IGNORECASE):
        return True
    cn_chars = len(_CJK_CHAR_RE.findall(t))
    digit_punc = len(re.findall(r"[\d\W_]", t))
    return cn_chars < 20 or digit_punc > len(t) * 0.75


def calc_repetition(text: str) -> float:
    """Return 1 - (distinct character bigrams / total bigrams), in [0, 1]."""
    t = normalize_text(text)
    grams = [t[i : i + 2] for i in range(max(0, len(t) - 1))]
    if not grams:
        return 0.0
    return max(0.0, 1.0 - len(set(grams)) / len(grams))


def calc_sentence_variance(text: str) -> float:
    """Return sentence-length variance scaled by 900 and capped at 1.0."""
    sents = [s.strip() for s in _SENTENCE_VARIANCE_RE.split(text) if s.strip()]
    if len(sents) < 2:
        return 0.0
    return float(min(np.var([len(s) for s in sents]) / 900.0, 1.0))


def _iter_transformer_scores(
    texts: List[str],
    model_name: str,
    progress_cb: Callable[[int], None],
    log_cb: Callable[[str], None],
) -> Iterator[List[float]]:
    """Score *texts* with a sequence classifier, yielding as work progresses.

    Yields ``[]`` after every progress/log event (so a UI can refresh) and the
    list of per-paragraph scores each time a chunk of paragraphs completes.
    Long paragraphs are split into overlapping token windows; a paragraph's
    score is ``0.75 * mean + 0.25 * max`` of its windows' probabilities.
    Progress reported through *progress_cb* spans 0-95.
    """
    if not texts:
        return
    tokenizer, model = get_or_load_model(model_name)
    total_chunks = max(1, (len(texts) + PARAGRAPH_CHUNK_SIZE - 1) // PARAGRAPH_CHUNK_SIZE)

    for ci, cstart in enumerate(range(0, len(texts), PARAGRAPH_CHUNK_SIZE), 1):
        cend = min(cstart + PARAGRAPH_CHUNK_SIZE, len(texts))
        chunk = texts[cstart:cend]
        log_cb(f"文本预处理 chunk {ci}/{total_chunks}(段落 {cstart+1}-{cend})")
        yield []

        enc = tokenizer(
            chunk,
            truncation=True,
            max_length=WINDOW_MAX_LENGTH,
            stride=WINDOW_STRIDE,
            return_overflowing_tokens=True,
            padding=True,
            return_tensors="pt",
        )
        # Maps each token window back to the index of its paragraph in `chunk`.
        sample_map = enc.pop("overflow_to_sample_mapping").tolist()
        window_count = len(sample_map)
        ai_probs = np.zeros(window_count, dtype=np.float32)
        batch_total = max(1, (window_count + WINDOW_BATCH_SIZE - 1) // WINDOW_BATCH_SIZE)

        for bi, start in enumerate(range(0, window_count, WINDOW_BATCH_SIZE), 1):
            end = min(start + WINDOW_BATCH_SIZE, window_count)
            batch = {k: v[start:end] for k, v in enc.items()}
            # Keep inference_mode scoped to the forward pass: yielding from
            # inside the context would leave the thread-local mode active
            # while the generator is suspended.
            with torch.inference_mode():
                logits = model(**batch).logits
                # NOTE(review): assumes class index 1 is the "AI-generated"
                # label for every model in MODEL_CHOICES — confirm.
                ai_probs[start:end] = torch.softmax(logits, dim=-1)[:, 1].cpu().numpy()
            global_batch_progress = ((ci - 1) + (bi / batch_total)) / total_chunks
            progress_cb(round(global_batch_progress * 95))
            log_cb(f"模型前向 chunk {ci}/{total_chunks} batch {bi}/{batch_total}")
            yield []

        buckets: List[List[float]] = [[] for _ in range(len(chunk))]
        for i, sid in enumerate(sample_map):
            buckets[sid].append(float(ai_probs[i]))
        chunk_scores = []
        for vals in buckets:
            arr = np.array(vals, dtype=np.float32)
            chunk_scores.append(float(0.75 * np.mean(arr) + 0.25 * np.max(arr)))
        yield chunk_scores


def detector_scores_transformer_stream(
    texts: List[str],
    model_name: str,
    progress_cb,
    log_cb,
) -> List[float]:
    """Return one detector score per text, reporting progress via callbacks.

    Args:
        texts: Paragraphs to score.
        model_name: Hugging Face model id of a sequence classifier.
        progress_cb: Called with an integer percentage in 0-95.
        log_cb: Called with human-readable log messages.
    """
    scores: List[float] = []
    for part in _iter_transformer_scores(texts, model_name, progress_cb, log_cb):
        scores.extend(part)
    return scores


def _extract_stat_features(text: str) -> np.ndarray:
    """Return the 13 hand-crafted statistics consumed by the MBA tree models.

    The element order is part of the contract with the pre-trained trees.
    """
    char_count = max(1, len(text))
    sentences = [s.strip() for s in re.split(r"[。!?\.\n]", text) if s.strip()]
    lens = [len(s) for s in sentences] if sentences else [0]
    avg_sentence_length = float(np.mean(lens))
    sentence_length_std = float(np.std(lens))
    # ASCII comma plus full-width comma (U+FF0C); the previous code counted
    # the ASCII comma twice.
    # NOTE(review): confirm this matches the feature definition used when the
    # tree models were trained.
    comma_ratio = (text.count(",") + text.count("\uff0c")) / char_count
    period_ratio = (text.count("。") + text.count(".")) / char_count
    pronouns = ["我", "你", "他", "她", "它", "我们", "你们", "他们"]
    conjunctions = ["和", "与", "或", "但是", "然而", "因此", "因为", "所以"]
    pronoun_ratio = sum(text.count(p) for p in pronouns) / char_count
    conjunction_ratio = sum(text.count(c) for c in conjunctions) / char_count
    unique_word_ratio = len(set(text)) / char_count
    words = text.split()
    avg_word_length = float(np.mean([len(w) for w in words])) if words else 0.0
    digit_ratio = sum(c.isdigit() for c in text) / char_count
    chinese_char_ratio = len(_CJK_CHAR_RE.findall(text)) / char_count
    paragraph_length = float(len(text))
    burstiness = sentence_length_std / avg_sentence_length if avg_sentence_length > 0 else 0.0
    formality_words = ["研究", "分析", "策略", "管理", "企业", "市场", "发展"]
    formality_score = sum(text.count(w) for w in formality_words) / char_count
    return np.array(
        [
            avg_sentence_length,
            sentence_length_std,
            comma_ratio,
            period_ratio,
            pronoun_ratio,
            conjunction_ratio,
            unique_word_ratio,
            avg_word_length,
            digit_ratio,
            chinese_char_ratio,
            paragraph_length,
            burstiness,
            formality_score,
        ]
    )


def is_lfs_pointer(path: Path) -> bool:
    """Return True when *path* starts with the Git LFS pointer header."""
    try:
        # Read only the header; real weight files can be very large.
        with path.open("rb") as fh:
            return fh.read(len(_LFS_POINTER_PREFIX)) == _LFS_POINTER_PREFIX
    except OSError:
        return False


def init_mba_pack() -> Tuple[bool, str]:
    """Load the local MBA model pack once.

    Returns:
        ``(True, "")`` when the pack is ready, otherwise ``(False, reason)``.
    """
    if MBA_STATE["ready"]:
        return True, ""
    if joblib is None:
        return False, "当前环境缺少 joblib。"
    needed = [
        "select5_tree_d2_model.pkl",
        "select10_tree_d2_model.pkl",
        "select15_tree_d3_model.pkl",
        "select20_tree_d2_model.pkl",
        "bert_tree_d1_model.pkl",
    ]
    if any(not (MBA_MODELS_DIR / f).exists() for f in needed):
        return False, "缺少 mba 模型文件,请将模型文件放入 models/mba/。"
    if any(is_lfs_pointer(MBA_MODELS_DIR / f) for f in needed):
        return False, "检测到 mba 模型文件是 Git LFS 指针,不是真实权重。"
    try:
        tok = AutoTokenizer.from_pretrained("hfl/chinese-roberta-wwm-ext")
        mdl = AutoModel.from_pretrained("hfl/chinese-roberta-wwm-ext")
        mdl.eval()
        # SECURITY: joblib.load unpickles arbitrary objects; only place
        # trusted model files in MBA_MODELS_DIR.
        trees = {name: joblib.load(MBA_MODELS_DIR / name) for name in needed}
        MBA_STATE.update(
            {
                "ready": True,
                "extractor_tokenizer": tok,
                "extractor_model": mdl,
                "tree_models": trees,
            }
        )
        return True, ""
    except Exception as e:
        return False, f"加载 mba 模型包失败: {e}"


def detector_score_mba(text: str) -> float:
    """Return the maximum positive-class probability across the MBA trees.

    The ``bert_tree`` model receives only the first-token embedding; the other
    trees receive the statistics concatenated with that embedding.

    Raises:
        RuntimeError: if the MBA pack cannot be initialised.
    """
    ok, msg = init_mba_pack()
    if not ok:
        raise RuntimeError(msg)
    tok = MBA_STATE["extractor_tokenizer"]
    mdl = MBA_STATE["extractor_model"]
    inputs = tok(text[:512], return_tensors="pt", max_length=512, truncation=True, padding=True)
    with torch.inference_mode():
        bert_feat = mdl(**inputs).last_hidden_state[:, 0, :].cpu().numpy()[0]
    stat_feat = _extract_stat_features(text)
    combined = np.concatenate([stat_feat, bert_feat]).reshape(1, -1)
    bert2d = bert_feat.reshape(1, -1)
    probs = [
        float(tree.predict_proba(bert2d if "bert_tree" in name else combined)[0, 1])
        for name, tree in MBA_STATE["tree_models"].items()
    ]
    return float(max(probs))


def clip01(v: float) -> float:
    """Clamp *v* to the closed interval [0, 1]."""
    return float(min(max(v, 0.0), 1.0))


def analyze_paragraph_with_detector(text: str, detector: float) -> Dict[str, float]:
    """Blend the detector score with repetition and sentence-length variance.

    ``risk = clip01(0.78 * detector + 0.12 * repetition + 0.10 * (1 - variance))``.
    """
    repetition = calc_repetition(text)
    variance = calc_sentence_variance(text)
    risk = clip01(detector * 0.78 + repetition * 0.12 + (1 - variance) * 0.10)
    return {"detector": detector, "repetition": repetition, "variance": variance, "risk": risk}


def build_doc_features(risks: List[float]) -> Dict[str, float]:
    """Aggregate per-paragraph risks into document-level features.

    *risks* must be non-empty (callers return early when there are no
    paragraphs).
    """
    arr = np.array(risks, dtype=float)
    # NOTE(review): the medium band here is (0.55, 0.75], while paragraph
    # labelling uses max(0.55, RISK_THRESHOLD - 0.15) = 0.60 as the lower
    # bound.  Left unchanged because these values feed the calibration model;
    # confirm which cut-off is intended.
    return {
        "overall": clip01(float(np.mean(arr))),
        "p90": clip01(float(np.percentile(arr, 90))),
        "high_ratio": clip01(float(np.mean(arr > 0.75))),
        "mid_ratio": clip01(float(np.mean((arr > 0.55) & (arr <= 0.75)))),
        "std": clip01(float(np.std(arr))),
    }


def predict_kn_like_rate(features: Dict[str, float]) -> float:
    """Map document features to the calibrated rate.

    Falls back to ``features["overall"]`` when no calibration model is loaded.
    """
    if not CALIBRATION_MODEL:
        return features["overall"]
    x = np.array(
        [features.get(name, 0.0) for name in CALIBRATION_MODEL["feature_order"]],
        dtype=float,
    )
    coef = np.array(CALIBRATION_MODEL["coef"], dtype=float)
    return clip01(float(np.dot(x, coef)) + float(CALIBRATION_MODEL["intercept"]))


def build_filtered_details(blocks: List[Dict], level_filter: str) -> str:
    """Join paragraph detail blocks, optionally filtered by risk level."""
    if level_filter == "全部":
        selected = blocks
    else:
        selected = [b for b in blocks if b["risk_level"] == level_filter]
    if not selected:
        return f"当前筛选 `{level_filter}` 下暂无段落。"
    return "\n\n---\n\n".join(b["content"] for b in selected)


def write_exports(state: Dict) -> Tuple[str, str]:
    """Write *state* as JSON and its summary/details as Markdown.

    Returns:
        ``(json_path, md_path)`` as strings.  File names carry a
        second-resolution timestamp.
    """
    EXPORT_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    json_path = EXPORT_DIR / f"analysis_{stamp}.json"
    md_path = EXPORT_DIR / f"analysis_{stamp}.md"
    json_path.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
    md_path.write_text(
        state.get("summary", "") + "\n\n" + state.get("details", ""),
        encoding="utf-8",
    )
    return str(json_path), str(md_path)


def append_log(logs: List[str], message: str) -> str:
    """Append a timestamped *message* to *logs* and return the joined log."""
    stamp = datetime.now().strftime("%H:%M:%S")
    logs.append(f"[{stamp}] {message}")
    return "\n".join(logs)


def _classify_risk(risk: float) -> Tuple[str, str]:
    """Return ``(marker emoji, risk-level label)`` for a paragraph risk."""
    if risk > RISK_THRESHOLD:
        return "🔴", "高风险"
    if risk > max(0.55, RISK_THRESHOLD - 0.15):
        return "🟡", "中风险"
    return "🟢", "低风险"


def analyze_document(input_mode, upload_file, pasted_text, model_label):
    """Run the full analysis as a generator of UI updates.

    Each yielded tuple is ``(summary, details, history_md, status, log_text,
    state, json_path, md_path)``, matching the outputs wired to the run
    button.  Intermediate yields carry empty summary/details/state; only the
    final yield carries the results.

    Args:
        input_mode: One of the input-mode radio labels.
        upload_file: Uploaded file (object with ``.name`` or path) or None.
        pasted_text: Text pasted into the textbox (may be empty or None).
        model_label: Key of MODEL_CHOICES; unknown labels use the default.
    """
    logs: List[str] = []
    last_status = "状态: 等待分析"
    history_cache: Optional[str] = None

    def emit(summary="", details="", history_md=None, status=None, state=None,
             json_path=None, md_path=None):
        nonlocal last_status, history_cache
        if history_md is None:
            # History only changes at the end of a run, so read it once.
            if history_cache is None:
                history_cache = format_history_markdown()
            history_md = history_cache
        if status is None:
            status = last_status
        else:
            last_status = status
        return summary, details, history_md, status, "\n".join(logs), (state or {}), json_path, md_path

    def progress_cb(pct: int):
        nonlocal last_status
        pct = max(0, min(100, pct))
        last_status = f"状态: 推理中 {pct}%"

    def log_cb(message: str):
        append_log(logs, message)

    has_text = bool(normalize_text(pasted_text or ""))

    # --- Input validation -------------------------------------------------
    if input_mode == INPUT_MODE_TEXT and not has_text:
        log_cb("等待输入:文本模式下未检测到文本内容。")
        yield emit(summary="请先粘贴文本内容。", status="状态: 等待输入")
        return
    if input_mode == INPUT_MODE_FILE and upload_file is None:
        log_cb("等待输入:文件模式下未检测到文件。")
        yield emit(summary="请先上传文件。", status="状态: 等待输入")
        return
    if upload_file is None and not has_text:
        log_cb("等待输入:未检测到文件和文本。")
        yield emit(summary="请先上传文件,或粘贴文本。", status="状态: 等待输入")
        return

    model_name = MODEL_CHOICES.get(model_label, MODEL_CHOICES[DEFAULT_MODEL_LABEL])
    log_cb(f"任务开始,模型={model_label}")
    if model_name == MBA_MODEL_ID:
        ok, msg = init_mba_pack()
        if not ok:
            log_cb(f"模型不可用:{msg}")
            yield emit(
                summary=f"# 当前模型: {model_label}\n\n{msg}\n\n请切回其他模型。",
                status="状态: 模型不可用",
            )
            return

    # --- Text acquisition and paragraph splitting ---------------------------
    source = "pasted_text"
    use_text = input_mode == INPUT_MODE_TEXT or (input_mode == INPUT_MODE_AUTO and has_text)
    t_extract = time.perf_counter()
    if use_text:
        extract_meta = dict(_EMPTY_EXTRACT_META)
        paragraphs = split_paragraphs(pasted_text)
        log_cb(
            f"输入来源:文本框,切分段落={len(paragraphs)},"
            f"耗时={time.perf_counter()-t_extract:.2f}s"
        )
    else:
        try:
            source = Path(_upload_path(upload_file)).name
            raw_text, extract_meta = extract_document_text(upload_file)
        except Exception as exc:
            # UI boundary: report the failure instead of crashing the handler.
            log_cb(f"终止:文件解析失败:{exc}")
            yield emit(summary=f"文件解析失败:{exc}", status="状态: 文件解析失败")
            return
        paragraphs = [p for p in split_paragraphs(raw_text) if not should_skip_paragraph(p)]
        log_cb(
            f"输入来源:文件 {source},抽取+切分后段落={len(paragraphs)},"
            f"耗时={time.perf_counter()-t_extract:.2f}s"
        )

    if not paragraphs:
        log_cb("终止:未提取到有效段落。")
        yield emit(summary="未提取到可分析正文。", status="状态: 无有效段落")
        return

    # --- Detector stage (progress 0-95%) ------------------------------------
    t0 = time.perf_counter()
    total = len(paragraphs)
    log_cb("开始推理。")
    progress_cb(0)
    yield emit(status="状态: 开始分析... 0%")

    detector_scores: List[float] = []
    t_model = time.perf_counter()
    try:
        if model_name == MBA_MODEL_ID:
            # NOTE(review): the 0.3 scaling of MBA scores is kept from the
            # original code; its rationale is not visible here.
            detector_scores = [clip01(detector_score_mba(p) * 0.3) for p in paragraphs]
            log_cb(f"MBA 推理完成,段落={total},耗时={time.perf_counter()-t_model:.2f}s")
        else:
            for part in _iter_transformer_scores(paragraphs, model_name, progress_cb, log_cb):
                detector_scores.extend(part)
                yield emit()
            log_cb(f"模型前向完成,段落={total},耗时={time.perf_counter()-t_model:.2f}s")
    except Exception as exc:
        # UI boundary: surface model loading / inference failures to the user.
        log_cb(f"终止:模型推理失败:{exc}")
        yield emit(summary=f"模型推理失败:{exc}", status="状态: 推理失败")
        return
    progress_cb(95)
    yield emit()

    # --- Per-paragraph scoring (progress 95-100%) ---------------------------
    risks: List[float] = []
    details: List[Dict] = []
    last_pct = 95
    for i, (p, detector) in enumerate(zip(paragraphs, detector_scores), 1):
        score = analyze_paragraph_with_detector(p, detector)
        risks.append(score["risk"])
        marker, risk_level = _classify_risk(score["risk"])
        content = "\n\n".join(
            [
                f"{marker} 段落 {i}",
                f"AI风险: {score['risk']:.2%}",
                f"Detector: {score['detector']:.2%}",
                f"重复度: {score['repetition']:.2%}",
                f"句式稳定性: {1 - score['variance']:.2%}",
                p,
            ]
        )
        details.append({"risk_level": risk_level, "content": content})

        # Floor division keeps 100% reserved for the last paragraph.
        pct = 95 + (i * 5) // total
        if pct > last_pct or i == total:
            last_pct = pct
            log_cb(f"进度 {pct}%({i}/{total})")
            progress_cb(pct)
            yield emit()

    # --- Aggregation ----------------------------------------------------------
    features = build_doc_features(risks)
    kn_like = predict_kn_like_rate(features)
    elapsed = time.perf_counter() - t0
    speed = len(paragraphs) / max(elapsed, 1e-6)

    trunc_info = []
    if extract_meta.get("total_pages") is not None:
        trunc_info.append(
            f"页面截断: 否({extract_meta.get('used_pages')}/{extract_meta.get('total_pages')} 页)"
        )
    trunc_info.append(f"段落截断: 否(分析 {len(paragraphs)}/{len(paragraphs)} 段)")
    if CALIBRATION_MODEL:
        mode_line = "当前模式: 知网对齐预测率(已加载校准模型)"
    else:
        mode_line = "当前模式: 原始风险率(未加载校准模型)"

    summary = "\n\n".join(
        [
            f"# 当前模型: {model_label}",
            f"# 综合AI风险率: {features['overall']:.2%}",
            f"# 预测知网AIGC率: {kn_like:.2%}",
            f"高风险段落占比: {features['high_ratio']:.2%}",
            f"中风险段落占比: {features['mid_ratio']:.2%}",
            f"有效段落数: {len(paragraphs)}",
            f"平均速度: {speed:.2f} 段/秒",
            mode_line,
            " | ".join(trunc_info),
            "(说明:该结果为“风险分析与校准预测”,并非官方系统结果)",
        ]
    )

    try:
        save_history_item(
            {
                "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "source": source,
                "model": model_label,
                "overall": features["overall"],
                "kn_like": kn_like,
                "paragraphs": len(paragraphs),
            }
        )
    except OSError as exc:
        # History is best-effort; the analysis result is still shown.
        log_cb(f"历史记录保存失败:{exc}")
    history_md = format_history_markdown()
    details_text = build_filtered_details(details, "全部")
    log_cb(
        f"聚合完成:overall={features['overall']:.2%}, kn_like={kn_like:.2%}, "
        f"总耗时={elapsed:.2f}s, 速度={speed:.2f}段/秒"
    )

    state = {
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "source": source,
        "model": model_label,
        "summary": summary,
        "details": details_text,
        "metrics": {
            "overall": features["overall"],
            "kn_like": kn_like,
            "paragraphs": len(paragraphs),
            "high_ratio": features["high_ratio"],
            "mid_ratio": features["mid_ratio"],
            "elapsed_sec": elapsed,
            "speed_para_per_sec": speed,
        },
        "paragraphs": details,
    }
    try:
        json_path, md_path = write_exports(state)
        log_cb(f"导出完成:{Path(json_path).name}, {Path(md_path).name}")
    except OSError as exc:
        json_path = md_path = None
        log_cb(f"导出失败:{exc}")

    progress_cb(100)
    yield emit(
        summary=summary,
        details=details_text,
        history_md=history_md,
        status="状态: 分析完成 100%",
        state=state,
        json_path=json_path,
        md_path=md_path,
    )


def apply_risk_filter(state: Dict, risk_filter: str):
    """Re-render paragraph details for the selected risk level."""
    if not state:
        return "请先完成一次分析。"
    return build_filtered_details(state.get("paragraphs", []), risk_filter)


def export_json(state: Dict):
    """Write the exports for *state* and return the JSON path (None if empty)."""
    if not state:
        return None
    json_path, _ = write_exports(state)
    return json_path


def export_md(state: Dict):
    """Write the exports for *state* and return the Markdown path (None if empty)."""
    if not state:
        return None
    _, md_path = write_exports(state)
    return md_path


GEEK_CSS = """
:root {
  --bg: #f3f4ea;
  --ink: #102015;
  --panel: #fefef6;
  --accent: #0f6b3f;
  --accent2: #b57722;
}
@media (prefers-color-scheme: dark) {
  :root {
    --bg: #09110c;
    --ink: #d7e7d8;
    --panel: #0f1913;
    --accent: #4bd38a;
    --accent2: #efb24a;
  }
}
.gradio-container {
  background: radial-gradient(circle at 20% 20%, color-mix(in srgb, var(--bg) 85%, white 15%) 0%, var(--bg) 45%, color-mix(in srgb, var(--bg) 75%, black 25%) 100%);
  color: var(--ink);
  font-family: "IBM Plex Mono", "JetBrains Mono", monospace;
}
h1, h2, h3 {
  letter-spacing: 0.4px;
}
.panel {
  border: 2px solid var(--ink);
  border-radius: 12px;
  background: var(--panel);
  box-shadow: 6px 6px 0 color-mix(in srgb, var(--ink) 18%, transparent);
}
.status-pill {
  border: 2px dashed var(--accent2);
  border-radius: 10px;
  padding: 8px 10px;
  background: color-mix(in srgb, var(--panel) 85%, var(--accent2) 15%);
}
button.primary {
  background: linear-gradient(90deg, color-mix(in srgb, var(--accent) 70%, black 30%) 0%, var(--accent) 100%) !important;
  border: 2px solid color-mix(in srgb, var(--accent) 40%, black 60%) !important;
}
"""


with gr.Blocks(theme=gr.themes.Base(), css=GEEK_CSS, title="论文AIGC风险检测系统") as demo:
    gr.Markdown(
        "# 论文AIGC风险检测系统\n\n"
        "支持 `PDF / Word(.docx) / 文本(.txt, .md)`,默认全文检测,支持直接粘贴文本。"
    )
    analysis_state = gr.State({})
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 输入面板", elem_classes=["panel"])
            input_mode = gr.Radio(
                [INPUT_MODE_AUTO, INPUT_MODE_FILE, INPUT_MODE_TEXT],
                value=INPUT_MODE_AUTO,
                label="输入模式",
            )
            file_input = gr.File(file_types=[".pdf", ".docx", ".txt", ".md"], label="文件输入")
            pasted_text = gr.Textbox(
                lines=10,
                label="文本输入(可选)",
                placeholder="粘贴原文可覆盖文件输入",
            )
            model = gr.Dropdown(
                list(MODEL_CHOICES.keys()),
                value=DEFAULT_MODEL_LABEL,
                label="检测引擎",
            )
            run_btn = gr.Button("Run Analysis", variant="primary")
            risk_filter = gr.Radio(
                ["全部", "高风险", "中风险", "低风险"],
                value="全部",
                label="风险筛选(分析后)",
            )
            export_json_btn = gr.Button("导出 JSON")
            export_md_btn = gr.Button("导出 Markdown")
        with gr.Column(scale=2):
            status_out = gr.Markdown(value="状态: 等待分析", elem_classes=["status-pill"])
            summary_out = gr.Markdown(label="总览")
            log_out = gr.Textbox(label="实时日志终端", lines=12, interactive=False)
            details_out = gr.Markdown(label="段落详情")
            history_out = gr.Markdown(label="历史记录", value=format_history_markdown())
            json_file_out = gr.File(label="JSON导出文件")
            md_file_out = gr.File(label="Markdown导出文件")

    run_btn.click(
        fn=analyze_document,
        inputs=[input_mode, file_input, pasted_text, model],
        outputs=[
            summary_out,
            details_out,
            history_out,
            status_out,
            log_out,
            analysis_state,
            json_file_out,
            md_file_out,
        ],
        show_progress="hidden",
    )
    risk_filter.change(
        fn=apply_risk_filter,
        inputs=[analysis_state, risk_filter],
        outputs=[details_out],
    )
    export_json_btn.click(fn=export_json, inputs=[analysis_state], outputs=[json_file_out])
    export_md_btn.click(fn=export_md, inputs=[analysis_state], outputs=[md_file_out])

demo.launch()