""" OpenWolf HF Spaces — FastAPI 入口 ML 依赖在 startup 时自动安装,保持 Docker 构建轻量 """ import os import sys import json import asyncio import time import threading import uuid import requests import re import hashlib import random from pathlib import Path from fastapi import FastAPI, Request, HTTPException, BackgroundTasks import concurrent.futures from fastapi.responses import JSONResponse sys.path.insert(0, "/app") # ── 设默认环境变量 ── os.environ.setdefault("ISSUE_NUMBER", "0") os.environ.setdefault("COMMENT_BODY", "") os.environ.setdefault("COMMENT_USER", "spaces") os.environ.setdefault("GITHUB_REPO", "hughyonng/OpenWolf") os.environ.setdefault("GITHUB_TOKEN", os.environ.get("GITHUB_PAT", "")) os.environ.setdefault("OPENWOLF_PAT", os.environ.get("GITHUB_PAT", "")) os.environ.setdefault("TELEGRAM_BOT_TOKEN", "") os.environ.setdefault("TELEGRAM_CHAT_ID", "") app = FastAPI(title="OpenWolf Agent with Cloud Acceleration") @app.exception_handler(Exception) async def _catch_all(request: Request, exc: Exception): print(f"[FATAL] {request.method} {request.url.path}: {exc}") return JSONResponse({"ok": False, "error": str(exc)}, status_code=500) _ready = False _model_loading = False _model_loaded = False _background_tasks = set() _extract_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="extract") _translate_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="translate") # GGUF 模型及全局状态 _llama_model = None _llama_lock = threading.Lock() _infer_lock = threading.Lock() _translate_tasks = {} _cancelled_tasks: set = set() # 存 chat_id,被取消时加入 _analyze_tasks = {} _analyze_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="analyze") _task_tasks = {} _task_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="text_task") _ocr_pdf_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="ocr_pdf") _ocr_tasks = {} _OCR_TASK_DIR = Path("/app/.ocr_tasks") _OCR_TASK_DIR.mkdir(parents=True, exist_ok=True) def _ocr_task_path(task_id: str) -> Path: return _OCR_TASK_DIR / f"{task_id}.json" def _ocr_task_write(task_id: str, data: dict): """同时写内存和文件,保证重启后可恢复。""" _ocr_tasks[task_id] = data try: _ocr_task_path(task_id).write_text( json.dumps(data, ensure_ascii=False), encoding="utf-8" ) except Exception as e: print(f"[ocr_task] 文件写入失败 {task_id}: {e}") def _ocr_task_read(task_id: str) -> dict | None: """先查内存,没有则从文件恢复(Spaces 重启场景)。""" if task_id in _ocr_tasks: return _ocr_tasks[task_id] p = _ocr_task_path(task_id) if p.exists(): try: data = json.loads(p.read_text(encoding="utf-8")) _ocr_tasks[task_id] = data # 恢复到内存 return data except Exception as e: print(f"[ocr_task] 文件读取失败 {task_id}: {e}") return None def _ocr_task_delete(task_id: str): pass def _ocr_task_cleanup_old(): """清理超过1小时的旧任务文件,在 startup 时运行一次。""" for d in [_OCR_TASK_DIR, _TRANSLATE_TASK_DIR, _ANALYZE_TASK_DIR, _TASK_TASK_DIR]: try: cutoff = time.time() - 3600 for p in d.glob("*.json"): if p.stat().st_mtime < cutoff: p.unlink(missing_ok=True) except Exception as e: print(f"[task_cleanup] 清理 {d.name} 失败: {e}") # ── 翻译任务持久化 ── _TRANSLATE_TASK_DIR = Path("/app/.translate_tasks") _TRANSLATE_TASK_DIR.mkdir(parents=True, exist_ok=True) def _translate_task_path(task_id: str) -> Path: return _TRANSLATE_TASK_DIR / f"{task_id}.json" def _translate_task_write(task_id: str, data: dict): _translate_tasks[task_id] = data try: _translate_task_path(task_id).write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8") except Exception as e: print(f"[trans_task] 文件写入失败 {task_id}: {e}") def _translate_task_read(task_id: str) -> dict | None: if task_id in _translate_tasks: return _translate_tasks[task_id] p = _translate_task_path(task_id) if p.exists(): try: data = json.loads(p.read_text(encoding="utf-8")) _translate_tasks[task_id] = data return data except Exception as e: print(f"[trans_task] 文件读取失败 {task_id}: {e}") return None # ── 分析任务持久化 ── _ANALYZE_TASK_DIR = Path("/app/.analyze_tasks") _ANALYZE_TASK_DIR.mkdir(parents=True, exist_ok=True) def _analyze_task_path(task_id: str) -> Path: return _ANALYZE_TASK_DIR / f"{task_id}.json" def _analyze_task_write(task_id: str, data: dict): _analyze_tasks[task_id] = data try: _analyze_task_path(task_id).write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8") except Exception as e: print(f"[analyze_task] 文件写入失败 {task_id}: {e}") def _analyze_task_read(task_id: str) -> dict | None: if task_id in _analyze_tasks: return _analyze_tasks[task_id] p = _analyze_task_path(task_id) if p.exists(): try: data = json.loads(p.read_text(encoding="utf-8")) _analyze_tasks[task_id] = data return data except Exception as e: print(f"[analyze_task] 文件读取失败 {task_id}: {e}") return None # ── 通用任务持久化 ── _TASK_TASK_DIR = Path("/app/.task_tasks") _TASK_TASK_DIR.mkdir(parents=True, exist_ok=True) def _task_task_path(task_id: str) -> Path: return _TASK_TASK_DIR / f"{task_id}.json" def _task_task_write(task_id: str, data: dict): _task_tasks[task_id] = data try: _task_task_path(task_id).write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8") except Exception as e: print(f"[generic_task] 文件写入失败 {task_id}: {e}") def _task_task_read(task_id: str) -> dict | None: if task_id in _task_tasks: return _task_tasks[task_id] p = _task_task_path(task_id) if p.exists(): try: data = json.loads(p.read_text(encoding="utf-8")) _task_tasks[task_id] = data return data except Exception as e: print(f"[generic_task] 文件读取失败 {task_id}: {e}") return None # 🌟 优化:扫描版 PDF 的 Lazy-OCR 提取单次页数,增加到 25 页,完美对应 5000 汉字生成量 PAGES_PER_CHUNK = 25 # ══════════════════════════════════════════════════════════════════ # ModelScope 每日额度持久化管理器 # ══════════════════════════════════════════════════════════════════ class ModelScopeQuotaManager: def __init__(self): self.lock = threading.Lock() self.file_path = Path("/app/.translate_cache/modelscope_quota.json") self.file_path.parent.mkdir(parents=True, exist_ok=True) self._load() def _load(self): if self.file_path.exists(): try: self.data = json.loads(self.file_path.read_text(encoding="utf-8")) except Exception: self.data = {} else: self.data = {} def _save(self): try: self.file_path.write_text(json.dumps(self.data, ensure_ascii=False), encoding="utf-8") except Exception as e: print(f"[quota] 保存配额记录失败: {e}") def increment(self, model_name: str) -> bool: with self.lock: self._load() today = time.strftime("%Y-%m-%d", time.localtime()) if self.data.get("date") != today: self.data = {"date": today, "total": 0, "usage": {}, "fails": {}} # ★ 连续失败 5 次 → 当日跳过 fail_count = self.data.get("fails", {}).get(model_name, 0) if fail_count >= 5: return False current_usage = self.data["usage"].get(model_name, 0) if self.data["total"] >= 1000: return False if current_usage >= 200: return False self.data["usage"][model_name] = current_usage + 1 self.data["total"] += 1 self._save() return True def record_fail(self, model_name: str): with self.lock: self._load() fails = self.data.setdefault("fails", {}) c = fails.get(model_name, 0) + 1 fails[model_name] = c if c == 5: print(f"[quota] 模型 {model_name} 连续失败 5 次,今日跳过") self._save() def record_success(self, model_name: str): with self.lock: self._load() fails = self.data.setdefault("fails", {}) if fails.get(model_name, 0) > 0: print(f"[quota] 模型 {model_name} 恢复响应,重置失败计数") fails[model_name] = 0 self._save() _quota_manager = ModelScopeQuotaManager() @app.on_event("startup") async def startup(): global _ready _ready = True print("[startup] OpenWolf Spaces ready") # 清理上次重启前遗留的旧任务文件 threading.Thread(target=_ocr_task_cleanup_old, daemon=True).start() def _ensure_models(): _models_dir = Path("/app/models") _bge_dir = _models_dir / "bge-m3" try: if not (_bge_dir / "config.json").exists(): print("[models] Downloading bge-m3 (2.2GB)...") t0 = time.time() from sentence_transformers import SentenceTransformer _ = SentenceTransformer("BAAI/bge-m3", device="cpu") print(f"[models] bge-m3 done in {time.time()-t0:.1f}s") except Exception as e: print(f"[models] bge-m3 download failed: {e}") try: _gguf_files = [ ("HY-MT1.5-1.8B-Q4_K_M.gguf", 1.13), ("HY-MT1.5-1.8B-Q8_0.gguf", 1.91), ] _gguf_to_download = None for _name, _gb in _gguf_files: _p = _models_dir / "translate" / _name if _p.exists(): _gguf_to_download = None break if _gguf_to_download is None: _gguf_to_download = (_name, _gb) if _gguf_to_download: _name, _gb = _gguf_to_download print(f"[models] Downloading {_name} ({_gb}GB)...") t0 = time.time() from huggingface_hub import hf_hub_download hf_hub_download( repo_id="tencent/HY-MT1.5-1.8B-GGUF", filename=_name, local_dir=str(_models_dir / "translate"), ) print(f"[models] GGUF done in {time.time()-t0:.1f}s") except Exception as e: print(f"[models] GGUF download failed: {e}") print("[models] All models ready") try: global _model_loaded, _model_loading print("[warmup] Loading bge-m3...") t0 = time.time() from sentence_transformers import SentenceTransformer _ = SentenceTransformer("BAAI/bge-m3", device="cpu") _model_loaded = True _model_loading = False print(f"[warmup] bge-m3 loaded in {time.time()-t0:.1f}s") except Exception as e: print(f"[warmup] bge-m3 FAILED: {e}") _model_loading = False threading.Thread(target=_ensure_models, daemon=True).start() @app.get("/health") async def health(): env_keys = [ "MODELSCOPE_API_KEY", "OPENROUTER_API_KEY", "GOOGLE_API_KEY", "CHATANYWHERE_API_KEY", "GROQ_API_KEY", "GITHUB_PAT", "GITHUB_REPO", "TELEGRAM_BOT_TOKEN", "TELEGRAM_CHAT_ID", "OPENWOLF_PAT", "SILICONFLOW_API_KEY", "ZHIPU_API_KEY", "NVIDIA_API_KEY" ] env_status = {k: "✅" if os.environ.get(k) else "❌" for k in env_keys} return {"status": "ok", "ready": _ready, "env": env_status} @app.post("/extract-r2") async def extract_r2(request: Request): """ 从 R2 公开 URL 下载文件并提取文本内容,用于 SPA 文档对话上传。 """ try: body = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON") url = body.get("url", "") file_name = body.get("file_name", "document.pdf") if not url: return {"ok": False, "error": "url required"} ext = file_name.rsplit(".", 1)[-1].lower() if "." in file_name else "pdf" import requests as _req import uuid as _uuid from pathlib import Path try: r = _req.get(url, timeout=120, stream=True) if r.status_code != 200: return {"ok": False, "error": f"下载失败 HTTP {r.status_code}"} local_path = Path("/app") / f"inputs/{_uuid.uuid4().hex}.{ext}" local_path.parent.mkdir(parents=True, exist_ok=True) with open(local_path, "wb") as f: for chunk in r.iter_content(chunk_size=65536): f.write(chunk) text = "" if ext == "pdf": import pdfplumber with pdfplumber.open(local_path) as p: for page in p.pages: t = page.extract_text() if t: text += t + "\n" elif ext in ("txt", "md", "csv", "json"): text = local_path.read_text(encoding="utf-8", errors="ignore") elif ext == "docx": import docx d = docx.Document(local_path) text = "\n".join(p.text for p in d.paragraphs) elif ext == "epub": import zipfile, xml.etree.ElementTree as ET with zipfile.ZipFile(local_path, "r") as z: for name in z.namelist(): if name.endswith((".xhtml", ".html", ".htm")): try: root = ET.fromstring(z.read(name)) for elem in root.iter(): if elem.text: text += elem.text.strip() + " " if elem.tail: text += elem.tail.strip() + " " text += "\n\n" except Exception: pass try: local_path.unlink() except Exception: pass if not text.strip(): return {"ok": False, "error": "无法提取文本内容"} return {"ok": True, "text": text[:50000], "file_name": file_name} except Exception as e: return {"ok": False, "error": f"提取失败: {e}"} # ══════════════════════════════════════════════════════════════════ # 异步 OCR 提取端点(PDF 转换面板用,不含 LLM,纯物理提取) # ══════════════════════════════════════════════════════════════════ @app.post("/ocr-pdf") async def ocr_pdf(request: Request): try: body = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON") url = body.get("url") file_name = body.get("file_name", "document.pdf") callback_url = body.get("callback_url") ocr_model = body.get("model", "") if not url: return {"ok": False, "error": "R2 URL is required"} task_id = str(uuid.uuid4()) _ocr_task_write(task_id, {"status": "processing", "progress": 0, "result": None, "file_name": file_name, "callback_url": callback_url or ""}) def _async_ocr_worker(t_id, pdf_url, fn, cb_url, ocr_model=""): extraction_method = "" # 记录实际使用的提取方式 def push(pct, status="processing", text=None): current = _ocr_task_read(t_id) or {} current["progress"] = pct current["method"] = extraction_method # ★ 每次 push 都带上当前方式 if status != "processing": current["status"] = status if text is not None: current["result"] = text _ocr_task_write(t_id, current) if not cb_url: return payload = {"ok": True, "task_id": t_id, "status": status, "progress": pct, "method": extraction_method} # ★ 回调 payload 带 method if text is not None: payload["text"] = text payload["file_name"] = fn for attempt in range(3): try: r = requests.post(cb_url, json=payload, timeout=15) if r.status_code < 500: return except Exception as e: print(f"[ocr] 推送失败 attempt={attempt+1}: {e}") time.sleep(2 ** attempt) try: resp = requests.get(pdf_url, timeout=120, stream=True) if resp.status_code != 200: _ocr_task_write(t_id, {"status": "error", "progress": 0, "error": f"Download failed: HTTP {resp.status_code}"}) if cb_url: requests.post(cb_url, json={"ok": False, "task_id": t_id, "error": f"HTTP {resp.status_code}"}, timeout=30) return local_path = Path("/app") / f"inputs/{t_id}_{fn}" local_path.parent.mkdir(parents=True, exist_ok=True) with open(local_path, "wb") as f: for chunk in resp.iter_content(chunk_size=65536): f.write(chunk) import fitz doc = fitz.open(local_path) total_pages = len(doc) doc.close() full_text = "" _ocr_task_write(t_id, {"status": "processing", "progress": 1, "file_name": fn, "callback_url": cb_url or "", "method": ""}) push(1) # ★ 先试 pdfplumber 电子版提取 if ocr_model != "llamaparse": try: import pdfplumber as _pp with _pp.open(local_path) as _p: _text_parts = [] for _page in _p.pages: _t = _page.extract_text() if _t: _text_parts.append(_t) if _text_parts: _joined = "\n\n".join(_text_parts) if len(_joined.strip()) > 200: full_text = _joined extraction_method = "pdfplumber" # ★ print(f"[ocr] pdfplumber 提取成功,跳过 OCR ({len(full_text)} 字符)") except Exception as _e: print(f"[ocr] pdfplumber 提取失败: {_e}") if not full_text: if ocr_model == "llamaparse": extraction_method = "llamaparse" # ★ push(5) full_text = _ocr_pdf_via_llamaparse(local_path) if not full_text: raise Exception("LlamaParse 未返回任何文本,请检查 API Key 或文件格式") push(95) else: extraction_method = f"ocr:{ocr_model or 'paddle'}" # ★ push_interval = max(1, total_pages // 10) for i in range(total_pages): page_text = _ocr_page_via_siliconflow(local_path, i, ocr_model) if page_text: full_text += page_text + "\n\n" pct = max(1, int(((i + 1) / total_pages) * 100)) _ocr_task_write(t_id, {"status": "processing", "progress": pct, "file_name": fn, "callback_url": cb_url or "", "method": extraction_method}) if i % push_interval == 0 or i == total_pages - 1: push(pct) try: local_path.unlink() except Exception: pass _ok, _reason = _check_ocr_quality(full_text) if not _ok: _ocr_task_write(t_id, {"status": "error", "progress": 0, "error": f"OCR质量检测失败:{_reason}", "file_name": fn, "callback_url": cb_url or ""}) if cb_url: requests.post(cb_url, json={"ok": False, "task_id": t_id, "error": f"OCR质量检测失败:{_reason}"}, timeout=30) raise Exception(f"OCR质量检测失败:{_reason}") _ocr_task_write(t_id, {"status": "done", "progress": 100, "file_name": fn, "callback_url": cb_url or "", "method": extraction_method}) push(100, status="done", text=full_text) except Exception as e: _ocr_task_write(t_id, {"status": "error", "progress": 0, "error": str(e), "file_name": fn, "callback_url": cb_url or ""}) if cb_url: requests.post(cb_url, json={"ok": False, "task_id": t_id, "error": str(e)}, timeout=30) _ocr_pdf_pool.submit(_async_ocr_worker, task_id, url, file_name, callback_url, ocr_model) return {"ok": True, "task_id": task_id} @app.get("/ocr-pdf/check/{task_id}") async def ocr_pdf_check(task_id: str): task = _ocr_task_read(task_id) # ← 先内存后文件,重启后仍可查 if not task: return {"ok": False, "error": "Task not found"} return { "ok": True, "status": task.get("status", "processing"), "progress": task.get("progress", 0), "error": task.get("error"), "method": task.get("method", ""), } # ══════════════════════════════════════════════════════════════════ # 辅助洗涤函数:彻底清洗并过滤大模型返回的思维链(...) # ══════════════════════════════════════════════════════════════════ def clean_think_tags(text: str) -> str: """ 清洗大模型输出文本中残留的 ... 标签及其包裹的所有非译文内容 """ if not text: return "" # 移除完整的带开闭合标签思维链 cleaned = re.sub(r".*?", "", text, flags=re.DOTALL) # 防御性过滤:如果由于截断模型只输出了 但未闭合,直接过滤掉 后的所有文字 cleaned = re.sub(r".*$", "", cleaned, flags=re.DOTALL) # 清理行首尾杂质 return cleaned.strip() # ══════════════════════════════════════════════════════════════════ # 工具函数群:动态密钥加载与语系检测 # ══════════════════════════════════════════════════════════════════ def get_multi_api_keys(prefix: str) -> list: keys = [] for i in range(1, 10): val = os.environ.get(f"{prefix}_{i}") or os.environ.get(f"{prefix}{i}") if val: keys.append(val.strip()) single = os.environ.get(prefix) if single and single.strip() not in keys: keys.append(single.strip()) return keys def detect_japanese_korean(text: str) -> str: if re.search(r"[\u3040-\u309f\u30a0-\u30ff]", text): return "ja" if re.search(r"[\uac00-\ud7af]", text): return "ko" return "en" # 将 target_size 调整为 8000 字符(对应 ~1500 英文单词,译出 ~2500 汉字) def semantic_split(text: str, target_size: int = 8000) -> list: """ 按段落逻辑切分,不再执行强熔断。 """ paragraphs = text.split("\n") chunks = [] current_chunk = [] current_size = 0 for para in paragraphs: para_clean = para.strip() if not para_clean: continue # 过滤页码、页眉页脚噪音 if len(para_clean) < 80 and any(kw in para_clean.lower() for kw in ["page", "vol.", "no.", "issn", "doi:", "http://", "https://"]): continue para_size = len(para_clean) if current_size + para_size > target_size and current_chunk: chunks.append("\n\n".join(current_chunk)) current_chunk = [para_clean] current_size = para_size else: current_chunk.append(para_clean) current_size += para_size + 2 if current_chunk: chunks.append("\n\n".join(current_chunk)) return chunks if chunks else [text] # ══════════════════════════════════════════════════════════════════ # OCR 输出质量检测 # ══════════════════════════════════════════════════════════════════ def _check_ocr_quality(text: str) -> tuple: """ 检测 OCR 输出是否有效,返回 (is_ok, reason) """ if not text or len(text.strip()) < 50: return False, "输出为空" brace_ratio = text.count('}') / len(text) if brace_ratio > 0.3: return False, f"疑似CID字体乱码(}}占比{brace_ratio:.0%})" lines = text.split('\n') hash_lines = sum(1 for l in lines if l.strip().startswith('#####')) if len(lines) > 10 and hash_lines / len(lines) > 0.3: return False, f"OCR识别失败({hash_lines}/{len(lines)}行为无效内容)" cjk_count = sum(1 for c in text if '一' <= c <= '鿿' or '぀' <= c <= 'ヿ') if len(text) > 200 and cjk_count / len(text) < 0.01: return False, "未检测到有效中日文字符" return True, "ok" # ══════════════════════════════════════════════════════════════════ # 在线 OCR 模块(硅基流动视觉大模型 Lazy 加载版) # ══════════════════════════════════════════════════════════════════ def _ocr_page_via_siliconflow(pdf_path: Path, page_index: int, ocr_model: str = "") -> str: import base64 sf_key = os.environ.get("SILICONFLOW_API_KEY") if not sf_key: print("[ocr] 未配置 SILICONFLOW_API_KEY") return "" try: import fitz doc = fitz.open(pdf_path) if page_index >= len(doc): return "" page = doc[page_index] pix = page.get_pixmap(dpi=300) img_bytes = pix.tobytes("png") base64_img = base64.b64encode(img_bytes).decode("utf-8") if not ocr_model: ocr_model = os.environ.get("OCRAI_OCR_MODEL", "PaddlePaddle/PaddleOCR-VL-1.5") url = "https://api.siliconflow.cn/v1/chat/completions" headers = {"Authorization": f"Bearer {sf_key.strip()}", "Content-Type": "application/json"} body = { "model": ocr_model, "messages": [ { "role": "user", "content": [ {"type": "text", "text": "完整提取图片中的所有中文文字,保留原文格式和段落。Extract all Chinese text from this image, preserve original paragraphs."}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}} ] } ], "temperature": 0.1 } r = requests.post(url, headers=headers, json=body, timeout=40) if r.status_code == 200: extracted_text = r.json()["choices"][0]["message"]["content"].strip() print(f"[ocr] 页面 {page_index+1} OCR 提取成功 ({len(extracted_text)} 字符)") return extracted_text else: print(f"[ocr] OCR 异常 HTTP {r.status_code}") except Exception as e: print(f"[ocr] 页面 {page_index+1} 在线 OCR 失败: {e}") return "" # ══════════════════════════════════════════════════════════════════ # LlamaParse AI OCR 整本解析 # ══════════════════════════════════════════════════════════════════ def _ocr_pdf_via_llamaparse(pdf_path: Path, api_key_override: str = "") -> str: """ 用 LlamaCloud REST API 对整本 PDF 做 agentic OCR,返回 markdown 拼接的全文。 不依赖 llama-cloud SDK,直接用 requests 调 HTTP API。 需要环境变量 LLAMA_CLOUD_API_KEY。 """ api_key = api_key_override or os.environ.get("LLAMA_CLOUD_API_KEY") if not api_key: print("[ocr/llamaparse] 未配置 LLAMA_CLOUD_API_KEY") return "" headers = {"Authorization": f"Bearer {api_key}"} base = "https://api.cloud.llamaindex.ai/api/v2" pdf_name = pdf_path.name try: # 1. 上传文件 print(f"[ocr/llamaparse] 上传文件: {pdf_name}") with open(pdf_path, "rb") as f: upload_resp = requests.post( f"{base}/files", headers=headers, files={"file": (pdf_name, f, "application/pdf")}, data={"purpose": "parse"}, timeout=120, ) if upload_resp.status_code != 200: print(f"[ocr/llamaparse] 上传失败 HTTP {upload_resp.status_code}: {upload_resp.text[:200]}") return "" file_id = upload_resp.json()["id"] print(f"[ocr/llamaparse] 文件 ID: {file_id}") # 2. 提交解析任务 parse_resp = requests.post( f"{base}/parsing/parse", headers={**headers, "Content-Type": "application/json"}, json={ "file_id": file_id, "tier": "agentic", "version": "latest", "expand": ["markdown"], }, timeout=30, ) if parse_resp.status_code != 200: print(f"[ocr/llamaparse] 解析提交失败 HTTP {parse_resp.status_code}: {parse_resp.text[:200]}") return "" job_id = parse_resp.json()["id"] print(f"[ocr/llamaparse] Job ID: {job_id},轮询结果...") # 3. 轮询直到完成 for attempt in range(90): time.sleep(10) status_resp = requests.get(f"{base}/parsing/{job_id}/status", headers=headers, timeout=30) if status_resp.status_code != 200: continue status_data = status_resp.json() st = status_data.get("status", "") print(f"[ocr/llamaparse] 轮询 {attempt+1}/90: {st}") if st in ("SUCCESS", "completed"): # 取结果 result_resp = requests.get(f"{base}/parsing/{job_id}/result", headers=headers, timeout=30) if result_resp.ok: result_data = result_resp.json() pages = result_data.get("markdown", {}).get("pages", []) full_md = "\n\n".join(p.get("markdown", p.get("md", "")) for p in pages if p) if full_md: print(f"[ocr/llamaparse] 解析完成,{len(full_md)} 字符") return full_md # fallback: 取 text 格式 text_resp = requests.get(f"{base}/parsing/{job_id}/result?format=text", headers=headers, timeout=30) if text_resp.ok: t = text_resp.text if t and len(t.strip()) > 50: print(f"[ocr/llamaparse] 解析完成(text),{len(t)} 字符") return t print("[ocr/llamaparse] 结果为空") return "" elif st in ("ERROR", "CANCELLED", "failed"): print(f"[ocr/llamaparse] 解析失败: {status_data}") return "" print("[ocr/llamaparse] 轮询超时(15分钟)") return "" except Exception as e: print(f"[ocr/llamaparse] 异常: {e}") import traceback traceback.print_exc() return "" # ══════════════════════════════════════════════════════════════════ # 极速引擎:云端 API 5层混合翻译链(带日韩优化与多账号负载) # ══════════════════════════════════════════════════════════════════ def _translate_via_cloud_router(text: str, prev_source: str = "", prev_trans: str = "") -> str: lang = detect_japanese_korean(text) is_jk = lang in ("ja", "ko") context_prompt = "" if prev_source and prev_trans: context_prompt = ( f"### 上文翻译参考:\n" f"【原文】:{prev_source[-200:]}\n" f"【译文】:{prev_trans[-200:]}\n\n" ) system_prompt = ( "你是一位精通多国语言的资深学术翻译专家。请将下面的英文学术文本翻译成中文。\n" "## 翻译规则:\n" "1. 保持专业学术语言风格,用词准确,翻译自然流畅。不要直译,要符合现代中文的阅读习惯。\n" "2. 专有名词首次出现时请保留英文原文,格式如:“卷积神经网络 (Convolutional Neural Network, CNN)”。\n" "3. 人名、地名首次出现时使用中英对照。\n" "4. 严格保持原文段落和标点符号结构的完整,保留代码、公式、数字、年份。不要合并或拆分原文段落。\n" "5. 仅输出翻译结果,禁止输出任何多余的解释、导语或提示性字样。" ) user_content = f"{context_prompt}## 待翻译文本:\n{text}" # ────── 第一层:ModelScope 官方接口(多密钥轮询) ────── modelscope_keys = get_multi_api_keys("MODELSCOPE_API_KEY") if modelscope_keys: random.shuffle(modelscope_keys) ms_models = [ "Qwen/Qwen3-Coder-480B-A35B-Instruct", "MiniMax/MiniMax-M1-80k", "deepseek-ai/DeepSeek-V3.2", "MiniMax/MiniMax-M2.5", "deepseek-ai/DeepSeek-R1-0528", "Qwen/Qwen3-235B-A22B-Thinking-2507", "ZhipuAI/GLM-5", "Qwen/Qwen3.5-122B-A10B" ] if is_jk: ms_models = [m for m in ms_models if "qwen" in m.lower()] + [m for m in ms_models if "qwen" not in m.lower()] for model in ms_models: if not _quota_manager.increment(model): continue for key in modelscope_keys: try: url = "https://api-inference.modelscope.cn/v1/chat/completions" headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json"} body = { "model": model, "messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}], "temperature": 0.2, "max_tokens": 500 } r = requests.post(url, headers=headers, json=body, timeout=35) if r.status_code == 200: try: _choices = r.json().get("choices", []) if _choices and _choices[0].get("message", {}).get("content"): _quota_manager.record_success(model) print(f"[translate] 第一层 ModelScope 翻译成功: {model}") _text = _choices[0]["message"]["content"].strip() # 过滤 MiniMax 思维链标签 _text = re.sub(r'.*?', '', _text, flags=re.DOTALL).strip() return _text else: print(f"[translate] ModelScope {model} HTTP 200 但内容为空") except Exception as pe: print(f"[translate] ModelScope {model} 响应解析异常: {pe}") else: print(f"[translate] ModelScope {model} HTTP {r.status_code}") except Exception as e: print(f"[translate] ModelScope {model} 发生异常: {e}") _quota_manager.record_fail(model) # ────── 第二层:主力层(Cerebras & Groq 多密钥均衡) ────── cerebras_keys = get_multi_api_keys("CEREBRAS_API_KEY") groq_keys = get_multi_api_keys("GROQ_API_KEY") random.shuffle(cerebras_keys) random.shuffle(groq_keys) # 2.1 Cerebras API for key in cerebras_keys: for model in ["gpt-oss-120b", "zai-glm-4.7"]: try: url = "https://api.cerebras.ai/v1/chat/completions" headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json"} body = { "model": model, "messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}], "temperature": 0.2 } r = requests.post(url, headers=headers, json=body, timeout=60) if r.status_code == 200: _c = r.json()["choices"][0]["message"]["content"] if _c and _c.strip(): print(f"[translate] 第二层 Cerebras 翻译成功: {model}") return _c.strip() except Exception as e: print(f"[translate] Cerebras 异常: {e}") # 2.2 Groq API for key in groq_keys: for model in ["openai/gpt-oss-120b", "llama-3.3-70b-versatile"]: try: url = "https://api.groq.com/openai/v1/chat/completions" headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json"} body = { "model": model, "messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}], "temperature": 0.2 } r = requests.post(url, headers=headers, json=body, timeout=60) if r.status_code == 200: _c = r.json()["choices"][0]["message"]["content"] if _c and _c.strip(): print(f"[translate] 第二层 Groq 翻译成功: {model}") return _c.strip() except Exception as e: print(f"[translate] Groq 异常: {e}") # 2.3 硅基流动 Hunyuan-MT-7B(翻译专用模型) sf_key = os.environ.get("SILICONFLOW_API_KEY") if sf_key: try: url = "https://api.siliconflow.cn/v1/chat/completions" headers = {"Authorization": f"Bearer {sf_key.strip()}", "Content-Type": "application/json"} body = { "model": "tencent/Hunyuan-MT-7B", "messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}], "temperature": 0.2 } r = requests.post(url, headers=headers, json=body, timeout=60) if r.status_code == 200: _c = r.json()["choices"][0]["message"]["content"] if _c and _c.strip(): print("[translate] 第二层 硅基流动 Hunyuan-MT-7B 翻译成功") return _c.strip() except Exception as e: print(f"[translate] 硅基流动 7B 异常: {e}") # ────── 第三层:免费补充层(NVIDIA / OpenRouter) ────── # 3.1 NVIDIA API 通道 nv_key = os.environ.get("NVIDIA_API_KEY") if nv_key: nv_models = [ "qwen/qwen3.5-397b-a17b", "qwen/qwen3-coder-480b-a35b-instruct", "qwen/qwen3.5-122b-a10b", "z-ai/glm-5.1", "nvidia/nemotron-3-super-120b-a12b" ] if is_jk: nv_models = [m for m in nv_models if "qwen" in m.lower()] + [m for m in nv_models if "qwen" not in m.lower()] for model in nv_models: try: url = "https://integrate.api.nvidia.com/v1/chat/completions" headers = {"Authorization": f"Bearer {nv_key.strip()}", "Content-Type": "application/json"} body = { "model": model, "messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}], "temperature": 0.2 } r = requests.post(url, headers=headers, json=body, timeout=60) if r.status_code == 200: _c = r.json()["choices"][0]["message"]["content"] if _c and _c.strip(): print(f"[translate] 第三层 NVIDIA 翻译成功: {model}") return _c.strip() except Exception as e: print(f"[translate] NVIDIA 异常: {e}") # 3.2 OpenRouter or_key = os.environ.get("OPENWOLF_OR_KEY") or os.environ.get("OPENROUTER_API_KEY") if or_key: or_models = [ "qwen/qwen3-coder:free", "meta-llama/llama-3.3-70b-instruct:free", "z-ai/glm-4.5-air:free", "nvidia/nemotron-3-super-120b-a12b:free", "qwen/qwen3-next-80b-a3b-instruct:free" ] if is_jk: or_models = [m for m in or_models if "qwen" in m.lower()] + [m for m in or_models if "qwen" not in m.lower()] for model in or_models: try: url = "https://openrouter.ai/api/v1/chat/completions" headers = {"Authorization": f"Bearer {or_key.strip()}", "Content-Type": "application/json"} body = { "model": model, "messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}], "temperature": 0.2 } r = requests.post(url, headers=headers, json=body, timeout=60) if r.status_code == 200: _c = r.json()["choices"][0]["message"]["content"] if _c and _c.strip(): print(f"[translate] 第三层 OpenRouter 翻译成功: {model}") return _c.strip() except Exception as e: print(f"[translate] OpenRouter 异常: {e}") # ────── 第四层:主力辅助层(Mistral & Opencode 多账号) ────── mistral_keys = get_multi_api_keys("MISTRAL_API_KEY") opencode_keys = get_multi_api_keys("OPENCODE_API_KEY") random.shuffle(mistral_keys) random.shuffle(opencode_keys) # 4.1 Mistral for key in mistral_keys: for model in ["mistral-large-latest", "mistral-medium-latest"]: try: url = "https://api.mistral.ai/v1/chat/completions" headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json"} body = { "model": model, "messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}], "temperature": 0.2 } r = requests.post(url, headers=headers, json=body, timeout=60) if r.status_code == 200: _c = r.json()["choices"][0]["message"]["content"] if _c and _c.strip(): print(f"[translate] 第四层 Mistral 翻译成功: {model}") return _c.strip() except Exception as e: print(f"[translate] Mistral 异常: {e}") # 4.2 Opencode for key in opencode_keys: for model in ["big-pickle", "nemotron-3-super-free", "deepseek-v4-flash-free"]: try: url = "https://opencode.ai/zen/v1/chat/completions" headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json"} body = { "model": model, "messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}], "temperature": 0.2 } r = requests.post(url, headers=headers, json=body, timeout=60) if r.status_code == 200: _c = r.json()["choices"][0]["message"]["content"] if _c and _c.strip(): print(f"[translate] 第四层 opencode 翻译成功: {model}") return _c.strip() except Exception as e: print(f"[translate] opencode 异常: {e}") # ────── 第五层:轻量兜底层(完全免费,顺序串联) ────── # 5.1 智谱免费 Flash zp_key = os.environ.get("ZHIPU_API_KEY") if zp_key: for model in ["glm-4.7-flash", "glm-4.6-flash", "GLM-Z1-Flash", "GLM-4-Flash"]: try: url = "https://open.bigmodel.cn/api/paas/v4/chat/completions" headers = {"Authorization": f"Bearer {zp_key.strip()}", "Content-Type": "application/json"} body = { "model": model, "messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}], "temperature": 0.2 } r = requests.post(url, headers=headers, json=body, timeout=60) if r.status_code == 200: _c = r.json()["choices"][0]["message"]["content"] if _c and _c.strip(): print(f"[translate] 第五层 智谱 翻译成功: {model}") return _c.strip() except Exception as e: print(f"[translate] 智谱 异常: {e}") # 5.2 硅基流动小模型完全免费通道 if sf_key: sf_free_models = [ "deepseek-ai/DeepSeek-R1-0528-Qwen3-8B", "Qwen/Qwen3.5-4B", "Qwen/Qwen3-8B", "THUDM/GLM-Z1-9B-0414", "THUDM/GLM-4-9B-0414" ] if is_jk: sf_free_models = [m for m in sf_free_models if "qwen" in m.lower()] + [m for m in sf_free_models if "qwen" not in m.lower()] for model in sf_free_models: try: url = "https://api.siliconflow.cn/v1/chat/completions" headers = {"Authorization": f"Bearer {sf_key.strip()}", "Content-Type": "application/json"} body = { "model": model, "messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}], "temperature": 0.2 } r = requests.post(url, headers=headers, json=body, timeout=60) if r.status_code == 200: _c = r.json()["choices"][0]["message"]["content"] if _c and _c.strip(): print(f"[translate] 第五层 硅基流动 翻译成功: {model}") return _c.strip() except Exception as e: print(f"[translate] 硅基流动免费通道异常: {e}") return "" # ══════════════════════════════════════════════════════════════════ # 本地 GGUF 离线安全兜底推理引擎(轻量直观版 Prompt,保障 1.8B 稳定翻译) # ══════════════════════════════════════════════════════════════════ def _get_llama(): global _llama_model if _llama_model is not None: return _llama_model with _llama_lock: if _llama_model is not None: return _llama_model model_path = "/app/models/translate/HY-MT1.5-1.8B-Q4_K_M.gguf" # 安全防御:如果模型文件在写入中或不存在,循环等待 for _i in range(60): if os.path.isfile(model_path) and os.path.getsize(model_path) > 100 * 1024 * 1024: break print(f"[llama] 正在等待本地兜底模型准备就绪... {_i}s") time.sleep(1) if not os.path.isfile(model_path): model_path = "/app/models/translate/HY-MT1.5-1.8B-Q8_0.gguf" for _i in range(10): if os.path.isfile(model_path) and os.path.getsize(model_path) > 100 * 1024 * 1024: break time.sleep(1) if not os.path.isfile(model_path): raise FileNotFoundError("GGUF model file not found in translate directory") from llama_cpp import Llama result = [None] error = [None] done = threading.Event() # 动态计算最适合您 CPU 算力的物理线程数 num_cores = os.cpu_count() or 4 optimal_threads = max(1, min(4, num_cores)) def _load(): try: print(f"[llama] Loading HY-MT1.5 with {optimal_threads} threads...") t0 = time.time() result[0] = Llama(model_path=model_path, n_ctx=8192, n_threads=optimal_threads, n_gpu_layers=0, verbose=False, use_mmap=True, use_mlock=False) print(f"[llama] Loaded in {time.time()-t0:.1f}s") except Exception as e: error[0] = e finally: done.set() t = threading.Thread(target=_load, daemon=True) t.start() if not done.wait(timeout=90): raise TimeoutError("GGUF model loading timed out (90s)") if error[0]: raise error[0] _llama_model = result[0] return _llama_model def _translate_chunk_local(text: str) -> str: """ 轻量直译版 Prompt,对 1.8B 小模型深度优化,100%老实翻译 """ llm = _get_llama() _max = max(64, min(1024, int(len(text) * 1.5))) _prompt = ( "将以下学术英文文本完整翻译成中文。要求:\n" "1. 保持专业学术语言风格。\n" "2. 专有名词首次出现时保留英文原文。\n" "3. 人名和地名不翻译,直接保留原文。\n" "4. 严格保持原有段落和标点结构不变。\n" "5. 保留公式、数字和年份。\n" "6. 只输出翻译好的中文译文,禁止输出任何拼音、多余的解释或说明文字。\n\n" f"{text}" ) with _infer_lock: out = llm.create_chat_completion( messages=[{"role": "user", "content": _prompt}], max_tokens=_max, temperature=0.1, ) return out["choices"][0]["message"]["content"].strip() @app.get("/debug/model") async def debug_model(): q4_path = "/app/models/translate/HY-MT1.5-1.8B-Q4_K_M.gguf" q8_path = "/app/models/translate/HY-MT1.5-1.8B-Q8_0.gguf" q4_exists = os.path.isfile(q4_path) q8_exists = os.path.isfile(q8_path) result = { "q4_exists": q4_exists, "q8_exists": q8_exists, "q4_size_gb": round(os.path.getsize(q4_path) / 1024**3, 2) if q4_exists else 0, "q8_size_gb": round(os.path.getsize(q8_path) / 1024**3, 2) if q8_exists else 0, "llama_loaded": _llama_model is not None, } try: import llama_cpp result["llama_cpp_version"] = llama_cpp.__version__ except ImportError: result["llama_cpp_version"] = None return JSONResponse(result) # ══════════════════════════════════════════════════════════════════ # 接口:防弹版异步翻译任务启动(带懒加载 OCR 提取与 过滤) # ══════════════════════════════════════════════════════════════════ @app.post("/api/job/cancel") async def api_job_cancel(request: Request): """取消某个 chat_id 的所有翻译任务""" try: body = await request.json() chat_id = str(body.get("chat_id") or "") if chat_id: _cancelled_tasks.add(chat_id) print(f"[cancel] 已标记取消: chat_id={chat_id}") return {"ok": True} return {"ok": False, "error": "chat_id required"} except Exception as e: return {"ok": False, "error": str(e)} @app.post("/api/job/start") async def api_job_start(request: Request): try: body = await request.json() except Exception as e: return {"ok": False, "error": f"JSON 解析失败: {e}"} try: task_id = str(uuid.uuid4()) _translate_task_write(task_id, {"status": "processing", "result": None}) def _do_work(t_id, payload): try: _file_id = payload.get("fileId") or payload.get("file_id") _file_path = payload.get("file_path") or payload.get("filePath") _dl_url = payload.get("download_url") or payload.get("downloadUrl") _r2_url = payload.get("r2_download_url") or payload.get("r2DownloadUrl") _orig_fn = payload.get("fileName") or payload.get("filename") or "document.pdf" _orig_ext = _orig_fn.rsplit(".", 1)[-1].lower() if "." in _orig_fn else "pdf" _is_local_only = payload.get("is_local_only", False) _ci = payload.get("chunk_index", 0) if _ci == -1 or _ci is None: _ci = 0 _chat_id_str = str(payload.get("chat_id") or "default") _cancelled_tasks.discard(_chat_id_str) # ★ 新任务启动时清除旧的取消标志 _context_dir = Path("/app/.context_cache") / _chat_id_str _context_dir.mkdir(parents=True, exist_ok=True) _chunks_cache_file = _context_dir / "chunks_list.json" _meta_cache_file = _context_dir / "pdf_metadata.json" _chunks = [] _is_scanned = False _total_pages = 0 if _meta_cache_file.is_file(): try: _meta = json.loads(_meta_cache_file.read_text(encoding="utf-8")) _is_scanned = _meta.get("is_scanned", False) _total_pages = _meta.get("total_pages", 0) except Exception: pass # ★ 每段开始前检查取消标志 if _chat_id_str in _cancelled_tasks: print(f"[translate] 检测到取消标志,中止翻译: chat_id={_chat_id_str}") _translate_task_write(t_id, {"status": "error", "result": "任务已取消"}) _cancelled_tasks.discard(_chat_id_str) return # 电子文本分段缓存直调 if _ci > 0 and not _is_scanned and _chunks_cache_file.is_file(): try: _chunks = json.loads(_chunks_cache_file.read_text(encoding="utf-8")) print(f"[translate] 命中切片缓存。当前分段: {_ci + 1}/{len(_chunks)}") except Exception as _ce: print(f"[translate] 载入分段缓存失败: {_ce}") # 初始解析与环境拉取(缓存丢失时 resume 也重新下载) if not _chunks: print(f"[download] 开始定位/下载文件: file_id={_file_id}, file_path={_file_path}") _downloaded_local_path = None # 第一层:从 R2 下载 if _r2_url: try: import requests as _rt import uuid as _uuid _r = _rt.get(_r2_url, timeout=120, stream=True) if _r.status_code == 200: _local = Path("/app") / f"inputs/{_uuid.uuid4().hex}_{_orig_fn}" _local.parent.mkdir(parents=True, exist_ok=True) with open(_local, "wb") as _f: for _chunk in _r.iter_content(chunk_size=65536): _f.write(_chunk) _downloaded_local_path = _local else: print(f"[download] R2 HTTP {_r.status_code}: {_r2_url}") except Exception as _e: print(f"[download] R2 异常: {_e}") # 第二层:从 download_url 下载 if _dl_url and not (_downloaded_local_path and _downloaded_local_path.is_file()): try: import requests as _rt import uuid as _uuid _h = {} _gh_pat = os.environ.get("OPENWOLF_PAT") or os.environ.get("GITHUB_PAT") or os.environ.get("GITHUB_TOKEN") or "" if "api.github.com" in _dl_url and _gh_pat: _h["Authorization"] = f"Bearer {_gh_pat}" _h["Accept"] = "application/vnd.github.raw" _r = _rt.get(_dl_url, headers=_h, timeout=120) if _r.status_code == 200: _local = Path("/app") / f"inputs/{_uuid.uuid4().hex}.{_orig_ext}" _local.parent.mkdir(parents=True, exist_ok=True) _local.write_bytes(_r.content) _downloaded_local_path = _local except Exception as _e: print(f"[download] download_url 异常: {_e}") # 第三层:根据 file_path 获取 GitHub 文件 if _file_path and not (_downloaded_local_path and _downloaded_local_path.is_file()): _repo_path = _file_path _local_check = Path("/app") / _repo_path if _local_check.is_file(): _downloaded_local_path = _local_check elif Path(_repo_path).is_file(): _downloaded_local_path = Path(_repo_path) else: _gh_repo = os.environ.get("GITHUB_REPO", "hughyonng/OpenWolf") _gh_pat = os.environ.get("OPENWOLF_PAT") or os.environ.get("GITHUB_PAT") or os.environ.get("GITHUB_TOKEN") or "" if _gh_pat: try: import requests as _rt import uuid as _uuid _u = f"https://api.github.com/repos/{_gh_repo}/contents/{_repo_path}" _h = {"Authorization": f"Bearer {_gh_pat}", "Accept": "application/vnd.github.raw"} _r = _rt.get(_u, headers=_h, timeout=120) if _r.status_code == 200: _local = Path("/app") / f"inputs/{_uuid.uuid4().hex}.{_orig_ext}" _local.parent.mkdir(parents=True, exist_ok=True) _local.write_bytes(_r.content) _downloaded_local_path = _local except Exception as _e: print(f"[download] GitHub API 异常: {_e}") # 第四层:从 Telegram 兜底下载 if _file_id and not (_downloaded_local_path and _downloaded_local_path.is_file()): try: _token = os.environ.get("TELEGRAM_BOT_TOKEN", "") if _token: import requests as _rt import uuid as _uuid _mr = _rt.get(f"https://api.telegram.org/bot{_token}/getFile?file_id={_file_id}", timeout=30) _fd = _mr.json().get("result", {}) if _mr.ok else {} _fp = _fd.get("file_path", "") if _mr.ok and _fp: _dl = _rt.get(f"https://api.telegram.org/file/bot{_token}/{_fp}", timeout=300, stream=True) if _dl.ok: _local = Path("/app") / f"inputs/{_uuid.uuid4().hex}_{_fp.split('/')[-1]}" _local.parent.mkdir(parents=True, exist_ok=True) with open(_local, "wb") as _f: for _chunk in _dl.iter_content(chunk_size=65536): _f.write(_chunk) _downloaded_local_path = _local except Exception as _e: print(f"[download] Telegram 异常: {_e}") if not _downloaded_local_path or not _downloaded_local_path.is_file(): raise ValueError("无法在所有防护层中下载或定位待翻译文档") _fixed_path = _context_dir / f"source_document.{_orig_ext}" import shutil shutil.copy2(_downloaded_local_path, _fixed_path) _downloaded_local_path = _fixed_path # 判断 PDF 属性 if _orig_ext == "pdf": try: import fitz doc = fitz.open(_downloaded_local_path) _total_pages = len(doc) sample_text = "" for p_idx in range(min(3, _total_pages)): sample_text += doc[p_idx].get_text() or "" if len(sample_text.strip()) < 100: if _is_local_only: _is_scanned = False print("[translate] 安全本地模式激活:强制关闭在线 OCR,采用电子文本降级读取") else: _is_scanned = True print("[translate] 检测到 PDF 为扫描版图片件,自动开启 Lazy-OCR 通道") else: _is_scanned = False print("[translate] 检测到 PDF 为电子文本版,直接提取") except Exception as e: _is_scanned = False print(f"[translate] 预判定 PDF 属性异常,降级到电子版读取: {e}") else: _is_scanned = False try: _meta_cache_file.write_text(json.dumps({ "is_scanned": _is_scanned, "total_pages": _total_pages, "file_ext": _orig_ext, "file_name": _orig_fn, }, ensure_ascii=False), encoding="utf-8") except Exception as e: print(f"[meta_cache] 写入缓存异常: {e}") # 电子版读取与分段(优化大分段支持 5000 汉字) if not _is_scanned: import pdfplumber as _pp _full_text = "" if _orig_ext == "pdf": with _pp.open(_downloaded_local_path) as _p: _full_text = "\n".join(page.extract_text() or "" for page in _p.pages) elif _orig_ext in ("txt", "md", "csv", "json"): with open(_downloaded_local_path, "r", encoding="utf-8", errors="ignore") as _f: _full_text = _f.read() elif _orig_ext in ("docx",): import docx as _dx _d = _dx.Document(_downloaded_local_path) _full_text = "\n".join(p.text for p in _d.paragraphs) elif _orig_ext == "epub": import zipfile, xml.etree.ElementTree as _ET with zipfile.ZipFile(_downloaded_local_path, "r") as _z: for _name in _z.namelist(): if _name.endswith((".xhtml", ".html", ".htm")): try: _root = _ET.fromstring(_z.read(_name)) for _elem in _root.iter(): if _elem.text: _full_text += _elem.text.strip() + " " if _elem.tail: _full_text += _elem.tail.strip() + " " _full_text += "\n\n" except Exception: pass if not _full_text.strip(): raise ValueError("文本提取为空,请检查文件是否加密") # 合并行内折行(用空格替代 newline,解决标题粘连) import re as _re _full_text = _re.sub(r'(?= _total_chunks: result_payload = { "translated_text": "🎉 本书已通过在线 OCR 全部翻译完毕!", "has_more": False, "chunk_index": _ci, "total_chunks": _total_chunks, "file_path": _file_path } else: start_page = _ci * PAGES_PER_CHUNK end_page = min(start_page + PAGES_PER_CHUNK, _total_pages) _chunk_raw_text = "" _fixed_path = _context_dir / f"source_document.{_orig_ext}" print(f"[ocr] 正在提取 scanned PDF 第 {start_page+1} 至 {end_page} 页...") for p_idx in range(start_page, end_page): page_text = _ocr_page_via_siliconflow(_fixed_path, p_idx) if page_text: _chunk_raw_text += page_text + "\n\n" if not _chunk_raw_text.strip(): raise ValueError(f"在线 OCR 未能在第 {start_page+1}~{end_page} 页识别到任何有效字符") _prev_source = "" _prev_trans = "" _prev_src_file = _context_dir / f"src_{_ci - 1}.txt" _prev_trs_file = _context_dir / f"trans_{_ci - 1}.txt" if _ci > 0 and _prev_src_file.is_file() and _prev_trs_file.is_file(): _prev_source = _prev_src_file.read_text(encoding="utf-8", errors="ignore") _prev_trans = _prev_trs_file.read_text(encoding="utf-8", errors="ignore") # 如果是 local_only 模式,强行禁用在线大模型 if _is_local_only: _tr = _translate_chunk_local(_chunk_raw_text) else: _tr = _translate_via_cloud_router(_chunk_raw_text, _prev_source, _prev_trans) if not _tr: print("[translate] 在线路由空转,降级本地 GGUF 直译...") _tr = _translate_chunk_local(_chunk_raw_text) # 🌟 优化:在保存和传递前,利用正则清洗大模型特有的思维链(...) _tr = clean_think_tags(_tr) _curr_src_file = _context_dir / f"src_{_ci}.txt" _curr_trs_file = _context_dir / f"trans_{_ci}.txt" _curr_src_file.write_text(_chunk_raw_text, encoding="utf-8") _curr_trs_file.write_text(_tr, encoding="utf-8") for _f in _context_dir.glob("*.txt"): try: _f_name = _f.name if _f_name.startswith("src_") or _f_name.startswith("trans_"): _f_idx = int(_f_name.split("_")[1].split(".")[0]) if _f_idx < _ci - 1: _f.unlink() except: pass _hm = (_ci + 1) < _total_chunks result_payload = { "translated_text": _tr, "has_more": _hm, "chunk_index": _ci, "total_chunks": _total_chunks, "file_path": _file_path } # 情况 2:电子版 else: if not _chunks and _chunks_cache_file.is_file(): try: _chunks = json.loads(_chunks_cache_file.read_text(encoding="utf-8")) except Exception: pass _total_chunks = len(_chunks) if _chunks else 1 if _ci >= _total_chunks or not _chunks: result_payload = { "translated_text": "🎉 本书已翻译完毕!", "has_more": False, "chunk_index": _ci, "total_chunks": _total_chunks, "file_path": _file_path } else: _chunk_to_trans = _chunks[_ci] # ★ 每次翻译前检查取消标志 if _chat_id_str in _cancelled_tasks: raise ValueError("任务已被用户取消") _prev_source = "" _prev_trans = "" _prev_src_file = _context_dir / f"src_{_ci - 1}.txt" _prev_trs_file = _context_dir / f"trans_{_ci - 1}.txt" if _ci > 0 and _prev_src_file.is_file() and _prev_trs_file.is_file(): _prev_source = _prev_src_file.read_text(encoding="utf-8", errors="ignore") _prev_trans = _prev_trs_file.read_text(encoding="utf-8", errors="ignore") # 安全模式检测:如果 local_only 激活,强行跳过在线接口 if _is_local_only: print("[translate] 安全本地模式激活:强行绕过所有在线 AI 接口,仅执行本地 GGUF 翻译") _tr = _translate_chunk_local(_chunk_to_trans) else: _tr = _translate_via_cloud_router(_chunk_to_trans, _prev_source, _prev_trans) if not _tr: print("[translate] 在线接口空转,降级本地 GGUF 兜底翻译...") _tr = _translate_chunk_local(_chunk_to_trans) # 🌟 优化:在保存和传递前,利用正则清洗大模型特有的思维链(...) _tr = clean_think_tags(_tr) _curr_src_file = _context_dir / f"src_{_ci}.txt" _curr_trs_file = _context_dir / f"trans_{_ci}.txt" _curr_src_file.write_text(_chunk_to_trans, encoding="utf-8") _curr_trs_file.write_text(_tr, encoding="utf-8") for _f in _context_dir.glob("*.txt"): try: _f_name = _f.name if _f_name.startswith("src_") or _f_name.startswith("trans_"): _f_idx = int(_f_name.split("_")[1].split(".")[0]) if _f_idx < _ci - 1: _f.unlink() except: pass _hm = (_ci + 1) < _total_chunks result_payload = { "translated_text": _tr, "has_more": _hm, "chunk_index": _ci, "total_chunks": _total_chunks, "file_path": _file_path } _translate_task_write(t_id, {"status": "done", "result": json.dumps(result_payload, ensure_ascii=False)}) # ★ 回调推送(和 OCR 一样) _cb_url = payload.get("callback_url") or "" if _cb_url: try: import requests as _rq _rq.post(_cb_url, json={ "ok": True, "task_id": t_id, "status": "done", "result": json.dumps(result_payload, ensure_ascii=False), "bus_id": payload.get("bus_id") or "", "chat_id": str(payload.get("chat_id") or ""), "chunk_index": result_payload.get("chunk_index", 0), "has_more": result_payload.get("has_more", False), "total_chunks": result_payload.get("total_chunks", 1), "file_path": result_payload.get("file_path") or _file_path or "", "file_name": _orig_fn, }, timeout=15) print(f"[api_job_start] 回调推送成功: {_cb_url}") except Exception as _ce: print(f"[api_job_start] 回调推送失败: {_ce}") except Exception as e: import traceback print(f"[api_job_start] 翻译子线程异常: {e}") traceback.print_exc() _translate_task_write(t_id, {"status": "error", "result": f"模型推理报错: {e}"}) _cb_url = payload.get("callback_url") or "" if _cb_url: try: import requests as _rq _rq.post(_cb_url, json={"ok": False, "task_id": t_id, "error": str(e), "bus_id": payload.get("bus_id") or "", "chat_id": str(payload.get("chat_id") or "")}, timeout=15) except Exception: pass _translate_pool.submit(_do_work, task_id, body) return {"ok": True, "task_id": task_id} except Exception as e: return {"ok": False, "error": f"路由层报错: {e}"} @app.get("/api/job/check/{task_id}") async def api_job_check(task_id: str): """防弹版:替换原有的 /translate/check""" try: task = _translate_task_read(task_id) if not task: return {"ok": False, "status": "error", "result": "任务ID不存在"} if task["status"] in ("done", "error"): result_copy = task.copy() return {"ok": True, "status": result_copy["status"], "result": result_copy["result"]} return {"ok": True, "status": "processing"} except Exception as e: return {"ok": False, "status": "error", "result": f"检查报错: {e}"} # ── 以下原有文档分析及其他路由逻辑保持完整 ── @app.post("/analyze-doc") async def analyze_doc(request: Request): try: body = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON") url = body.get("url", "") question = body.get("question", "请分析这份文档的内容") max_chars = int(body.get("max_chars", 50000)) if not url: return {"ok": False, "error": "url required"} import requests as _req import uuid as _uuid from pathlib import Path resp = _req.get(url, timeout=300, stream=True) if resp.status_code != 200: return {"ok": False, "error": f"下载失败 HTTP {resp.status_code}"} local_path = Path("/app") / f"inputs/{_uuid.uuid4().hex}.pdf" local_path.parent.mkdir(parents=True, exist_ok=True) with open(local_path, "wb") as f: for chunk in resp.iter_content(chunk_size=65536): f.write(chunk) import pdfplumber text = "" with pdfplumber.open(local_path) as p: for page in p.pages: t = page.extract_text() if t: text += t + "\n" try: local_path.unlink() except Exception: pass if not text.strip(): return {"ok": False, "error": "无法提取文本内容"} doc_text = text[:max_chars] return {"ok": True, "result": "分析完成"} @app.post("/analyze-doc/start") async def analyze_doc_start(request: Request): try: body = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON") url = body.get("url", "") question = body.get("question", "请分析这份文档的内容") max_chars = int(body.get("max_chars", 50000)) if not url: return {"ok": False, "error": "url required"} task_id = str(uuid.uuid4()) _analyze_task_write(task_id, {"status": "processing", "result": None}) _analyze_pool.submit(_do_analyze_async, task_id, url, question, max_chars) return {"ok": True, "task_id": task_id} @app.post("/analyze-text/start") async def analyze_text_start(request: Request): try: body = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON") text = body.get("text", "") question = body.get("question", "") if not text or not question: return {"ok": False, "error": "text and question required"} task_id = str(uuid.uuid4()) _analyze_task_write(task_id, {"status": "processing", "result": None}) _analyze_pool.submit(_do_analyze_text_async, task_id, text, question) return {"ok": True, "task_id": task_id} @app.get("/analyze-text/check/{task_id}") async def analyze_text_check(task_id: str): return await analyze_doc_check(task_id) @app.get("/analyze-doc/check/{task_id}") async def analyze_doc_check(task_id: str): task = _analyze_task_read(task_id) if not task: return {"ok": False, "status": "error", "result": "任务ID不存在"} if task["status"] in ("done", "error"): result_copy = task.copy() resp = {"ok": True, "status": result_copy["status"], "result": result_copy["result"]} if result_copy.get("doc_text"): resp["doc_text"] = result_copy["doc_text"] return resp return {"ok": True, "status": "processing"} def _do_analyze_async(task_id: str, url: str, question: str, max_chars: int): import requests as _req import uuid as _uuid from pathlib import Path try: resp = _req.get(url, timeout=300, stream=True) if resp.status_code != 200: _analyze_task_write(task_id, {"status": "error", "result": f"下载失败 HTTP {resp.status_code}"}) return local_path = Path("/app") / f"inputs/{_uuid.uuid4().hex}.pdf" local_path.parent.mkdir(parents=True, exist_ok=True) with open(local_path, "wb") as f: for chunk in resp.iter_content(chunk_size=65536): f.write(chunk) import pdfplumber text = "" with pdfplumber.open(local_path) as p: for page in p.pages: t = page.extract_text() if t: text += t + "\n" try: local_path.unlink() except Exception: pass if not text.strip(): _analyze_task_write(task_id, {"status": "error", "result": "无法提取文本内容"}) return doc_text = text[:max_chars] _do_analyze_text_async(task_id, doc_text, question) except Exception as e: _analyze_task_write(task_id, {"status": "error", "result": f"分析失败: {e}"}) def _do_analyze_text_async(task_id: str, doc_text: str, question: str): import requests as _req import concurrent.futures as _cf try: prompt = f"以下是文档内容:\n\n{doc_text}\n\n---\n\n用户问题:{question}" _analyze_task_write(task_id, {"status": "done", "result": "分析完成", "doc_text": doc_text}) except Exception as e: _analyze_task_write(task_id, {"status": "error", "result": f"分析失败: {e}"}) @app.post("/task/start") async def task_start(request: Request): try: body = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON") task_text = body.get("task", "") chat_id = body.get("chat_id", "") task_type = body.get("task_type") or None history = body.get("history", []) if not task_text: return {"ok": False, "error": "task required"} task_id = str(uuid.uuid4()) _task_task_write(task_id, {"status": "processing", "result": None}) _task_pool.submit(_do_task_async, task_id, task_text, str(chat_id), task_type, history) return {"ok": True, "task_id": task_id} @app.get("/task/check/{task_id}") async def task_check(task_id: str): task = _task_task_read(task_id) if not task: return {"ok": False, "status": "error", "result": "任务ID不存在"} if task["status"] in ("done", "error"): result_copy = task.copy() return {"ok": True, "status": result_copy["status"], "result": result_copy["result"]} return {"ok": True, "status": "processing"} def _do_task_async(task_id: str, task_text: str, chat_id: str, task_type: str = None, history: list = None): if history is None: history = [] try: from scripts.ai_agent import run_agent_task result = run_agent_task(task_text, history, None, chat_id, "consumer", task_type=task_type) _task_task_write(task_id, {"status": "done", "result": str(result)}) except Exception as e: _task_task_write(task_id, {"status": "error", "result": f"处理失败: {e}"}) @app.get("/skill-search") async def skill_search(request: Request): q = request.query_params.get("q", "").strip().lower() if not q: return JSONResponse([]) idx = _get_skill_index() results = [] for s in idx.get("skills", []): if q in s.get("name", "").lower() or q in s.get("description", "").lower(): results.append({"id": s["id"], "name": s["name"], "description": s.get("description", "")[:200]}) return JSONResponse(results) @app.get("/skill-view") async def skill_view(request: Request): name = request.query_params.get("name", "").strip().lower() if not name: return JSONResponse({"error": "name required"}, status_code=400) idx = _get_skill_index() for s in idx.get("skills", []): sid = s.get("id", "").lower() if name in sid or name in s.get("name", "").lower(): readme_url = f"https://raw.githubusercontent.com/hughyonng/OpenWolf/refs/heads/main/skills/library/{sid}/README.md" try: r = requests.get(readme_url, timeout=10) if r.ok: return JSONResponse({"name": s["name"], "content": r.text[:2000]}) except Exception: pass return JSONResponse({"error": "not found"}, status_code=404)