Spaces:
Sleeping
Sleeping
| """ | |
| OpenWolf PDF Parser — Lightweight HF Space | |
| Only does PDF text extraction + chunk splitting. | |
| No LLM, no bge-m3, no heavy ML. | |
| """ | |
| import os, sys, json, urllib.parse, re | |
| from pathlib import Path | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import JSONResponse | |
# ASGI application object for the PDF parser service.
app = FastAPI(
    title="OpenWolf PDF Parser",
)
# NOTE(review): no route decorator is visible on this handler in this view of
# the file -- confirm how it is registered on `app`.
async def health():
    """Liveness probe: report that the service is up."""
    payload = {"status": "ok"}
    return payload
# NOTE(review): no route decorator is visible on this handler in this view of
# the file -- confirm how it is registered on `app`.
async def parse_pdf(request: Request):
    """Extract the full text of a PDF, downloading it from GitHub if needed.

    Expected JSON body::

        {"file_path": "inputs/xxx.pdf", "repo": "owner/repo", "pat": "ghp_xxx"}

    ``repo`` and ``pat`` fall back to the ``GITHUB_REPO`` / ``GITHUB_PAT``
    environment variables when the keys are absent from the body.  The file
    is looked up under ``/app``; when it is not there it is fetched from the
    GitHub contents API and cached at that location before parsing.

    Returns:
        ``{"ok": True, "text": ..., "chars": ...}`` on success, or a
        ``JSONResponse`` carrying ``{"ok": False, "error": ...}`` when the
        download fails.

    Raises:
        HTTPException: 400 when the body is not a JSON object, ``file_path``
            is missing, not a string or points outside ``/app``, or when a
            download is required and ``repo`` is missing or malformed.
    """
    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        raise HTTPException(400, "request body must be valid JSON") from None
    if not isinstance(body, dict):
        raise HTTPException(400, "request body must be a JSON object")

    file_path = body.get("file_path", "")
    repo = body.get("repo", os.environ.get("GITHUB_REPO", ""))
    pat = body.get("pat", os.environ.get("GITHUB_PAT", ""))
    if not file_path:
        raise HTTPException(400, "file_path required")
    if not isinstance(file_path, str):
        raise HTTPException(400, "file_path must be a string")

    # file_path comes from the client: an absolute path or ".." segments would
    # otherwise let the request read or overwrite files outside /app.
    app_root = Path("/app").resolve()
    local_path = (app_root / file_path).resolve()
    try:
        local_path.relative_to(app_root)
    except ValueError:
        raise HTTPException(400, "file_path must stay inside the app directory") from None

    if not local_path.exists():
        # Download from GitHub.
        # repo is interpolated into the URL, so only accept "owner/name".
        if not isinstance(repo, str) or not re.fullmatch(r"[\w.-]+/[\w.-]+", repo):
            raise HTTPException(400, "repo required in the form owner/repo")

        import requests as req

        encoded = "/".join(
            urllib.parse.quote(segment, safe="") for segment in file_path.split("/")
        )
        url = f"https://api.github.com/repos/{repo}/contents/{encoded}"
        headers = {"Accept": "application/vnd.github.raw"}
        if pat:
            headers["Authorization"] = f"token {pat}"
        # NOTE(review): requests.get is a blocking call inside an async
        # handler; consider running it in a worker thread.
        try:
            resp = req.get(url, headers=headers, timeout=120)
        except req.RequestException as exc:
            # Report transport failures the same way as HTTP failures below.
            return JSONResponse({"ok": False, "error": f"download failed: {exc}"})
        if resp.status_code != 200:
            return JSONResponse({"ok": False, "error": f"download failed HTTP {resp.status_code}"})
        local_path.parent.mkdir(parents=True, exist_ok=True)
        local_path.write_bytes(resp.content)

    # Extract text
    import pdfplumber

    with pdfplumber.open(local_path) as pdf:
        page_texts = [page.extract_text() for page in pdf.pages]
    # Pages without extractable text are skipped; every kept page ends in "\n".
    text = "".join(f"{page_text}\n" for page_text in page_texts if page_text)
    return {"ok": True, "text": text, "chars": len(text)}
# NOTE(review): no route decorator is visible on this handler in this view of
# the file -- confirm how it is registered on `app`.
async def get_chunks(request: Request):
    """Split text (or the text of a local file) into a stable list of chunks.

    Expected JSON body, either form::

        {"file_path": "...", "chunk_size": 4500}
        {"text": "...", "chunk_size": 4500}

    When ``text`` is empty, ``file_path`` is read from under ``/app``: PDFs
    go through pdfplumber, anything else is read as UTF-8 with undecodable
    bytes ignored.  A file that does not exist locally is NOT downloaded.

    Each chunk covers at most ``chunk_size`` characters and is cut at the
    last blank line (``"\\n\\n"``) inside its window when there is one;
    chunks are stripped and empty ones are dropped.

    Returns:
        ``{"ok": True, "chunks": [...], "total": ..., "total_chars": ...}``,
        or a ``JSONResponse`` carrying ``{"ok": False, "error": ...}`` when
        no text could be obtained.

    Raises:
        HTTPException: 400 when the body is not a JSON object, ``chunk_size``
            is not a positive integer, ``text`` / ``file_path`` is not a
            string, or ``file_path`` points outside ``/app``.
    """
    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        raise HTTPException(400, "request body must be valid JSON") from None
    if not isinstance(body, dict):
        raise HTTPException(400, "request body must be a JSON object")

    try:
        chunk_size = int(body.get("chunk_size", 4500))
    except (TypeError, ValueError, OverflowError):
        raise HTTPException(400, "chunk_size must be an integer") from None
    if chunk_size <= 0:
        # A non-positive window never advances the cursor below, so the
        # splitting loop would spin forever.
        raise HTTPException(400, "chunk_size must be a positive integer")

    text = body.get("text", "")
    if text and not isinstance(text, str):
        raise HTTPException(400, "text must be a string")

    if not text:
        file_path = body.get("file_path", "")
        if file_path:
            if not isinstance(file_path, str):
                raise HTTPException(400, "file_path must be a string")
            # file_path comes from the client: refuse absolute paths and ".."
            # segments that would read files outside /app.
            app_root = Path("/app").resolve()
            local_path = (app_root / file_path).resolve()
            try:
                local_path.relative_to(app_root)
            except ValueError:
                raise HTTPException(400, "file_path must stay inside the app directory") from None

            # Read local file
            if local_path.exists():
                if local_path.suffix.lower() == ".pdf":
                    import pdfplumber

                    with pdfplumber.open(local_path) as pdf:
                        text = "\n".join(page.extract_text() or "" for page in pdf.pages)
                else:
                    text = local_path.read_text(encoding="utf-8", errors="ignore")

    if not text:
        return JSONResponse({"ok": False, "error": "no text provided"})

    chunks = []
    total_chars = len(text)
    start = 0
    while start < total_chars:
        window_end = min(start + chunk_size, total_chars)
        # Prefer to cut at the last paragraph break inside the window.
        # NOTE(review): this also applies to the final window, so a text that
        # fits in one chunk is still split at its last blank line; kept as-is
        # so existing chunk boundaries stay stable.
        cut = text.rfind("\n\n", start, window_end)
        if cut <= start:
            # No break found (-1) or the break sits at the window start:
            # take the whole window so the cursor always moves forward.
            cut = window_end
        piece = text[start:cut].strip()
        if piece:
            chunks.append(piece)
        start = cut
    return {"ok": True, "chunks": chunks, "total": len(chunks), "total_chars": total_chars}