Spaces:
Sleeping
Sleeping
File size: 3,132 Bytes
1a93711 f051f2e 1a93711 f051f2e 1a93711 f051f2e 1a93711 f051f2e 1a93711 f051f2e 1a93711 f051f2e 1a93711 f051f2e 1a93711 f051f2e 1a93711 f051f2e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
import os, mimetypes
from typing import List, Tuple
import pdfplumber
from docx import Document as DocxDocument
from PIL import Image
import pytesseract
from settings import ALLOWED_EXT, ALLOWED_MIME, MAX_UPLOAD_MB, ENABLE_AV_SCAN, CLAMD_UNIX_SOCKET, CLAMD_NETWORK
from privacy import redact_text
# --- Optional AV scan (clamd) ---
def _clamd_scan(path: str, *, fail_open: bool = True) -> bool:
    """Scan *path* with a clamd daemon and report whether it is clean.

    Args:
        path: Filesystem path passed to the daemon's ``scan`` call.
        fail_open: Value returned when scanning is enabled but cannot be
            carried out (no endpoint configured, ``clamd`` not importable,
            connection error, malformed reply). Defaults to ``True``, which
            keeps the historical behaviour of treating an unavailable
            scanner as "clean". Pass ``False`` to reject files instead.

    Returns:
        ``True`` when scanning is disabled through ``ENABLE_AV_SCAN`` or the
        daemon's verdict is ``"OK"``; ``False`` for any other verdict;
        *fail_open* when no verdict could be obtained.
    """
    if not ENABLE_AV_SCAN:
        return True

    import logging

    log = logging.getLogger(__name__)
    try:
        # Imported lazily: clamd is only required when scanning is enabled.
        import clamd

        if CLAMD_UNIX_SOCKET:
            cd = clamd.ClamdUnixSocket(CLAMD_UNIX_SOCKET)
        elif CLAMD_NETWORK:
            host, port = CLAMD_NETWORK
            cd = clamd.ClamdNetworkSocket(host, port)
        else:
            log.warning("AV scan is enabled but no clamd endpoint is configured.")
            return fail_open

        res = cd.scan(path)
        # Expected: {'/path/file': ('OK', ...)} or ('FOUND', 'Eicar-Test-Signature')
        if not isinstance(res, dict):
            log.warning("Unexpected clamd reply for %s: %r", path, res)
            return fail_open
        verdict = next(iter(res.values()))[0]
    except Exception:
        # Best effort: an unreachable scanner or a malformed reply must not
        # crash the upload path, but it should be visible in the logs.
        log.warning("AV scan could not be completed for %s", path, exc_info=True)
        return fail_open
    return verdict == "OK"
def _check_allowed(path: str) -> tuple[bool, str]:
    """Decide whether the file at *path* may be processed.

    Checks run in this order and stop at the first failure: extension is in
    ``ALLOWED_EXT``, guessed MIME type is in ``ALLOWED_MIME``, size does not
    exceed ``MAX_UPLOAD_MB``, and ``_clamd_scan`` accepts the file.

    Args:
        path: Filesystem path of the candidate file.

    Returns:
        ``(True, "ok")`` when every check passes, otherwise ``(False, reason)``
        with a human-readable explanation of the first failed check.
    """
    ext = os.path.splitext(path.lower())[1]
    if ext not in ALLOWED_EXT:
        return False, f"Extension {ext} not allowed."

    # guess_type works from the file name only; the contents are not inspected.
    mime, _ = mimetypes.guess_type(path)
    if mime not in ALLOWED_MIME:
        return False, f"MIME {mime} not allowed."

    try:
        size_mb = os.path.getsize(path) / (1024 * 1024)
    except OSError as exc:
        # A missing or unreadable file is a rejection, not a crash, so one
        # bad path cannot abort a whole batch.
        return False, f"Cannot read file ({exc.strerror or exc})."
    if size_mb > MAX_UPLOAD_MB:
        return False, f"File too large ({size_mb:.1f}MB > {MAX_UPLOAD_MB}MB)."

    if not _clamd_scan(path):
        return False, "Antivirus scan failed."
    return True, "ok"
def _read_text_file(path: str) -> str:
    """Return the contents of the text file at *path* decoded as UTF-8.

    Undecodable bytes are dropped rather than raising. The ``utf-8-sig``
    codec is used so that a leading byte-order mark (common in CSV files
    exported from spreadsheets) is stripped instead of surfacing as a stray
    ``\\ufeff`` character at the start of the text; files without a BOM
    decode exactly as with plain ``utf-8``.

    Args:
        path: Filesystem path of the file to read.

    Returns:
        The decoded file contents.
    """
    with open(path, "r", encoding="utf-8-sig", errors="ignore") as f:
        return f.read()
def _read_docx(path: str) -> str:
    """Return the text of all paragraphs in the .docx at *path*, newline-joined."""
    document = DocxDocument(path)
    paragraph_texts = (para.text for para in document.paragraphs)
    return "\n".join(paragraph_texts)
def _read_pdf(path: str) -> str:
    """Return the extracted text of every page in the PDF at *path*.

    Page texts are joined with newlines; a page for which ``extract_text``
    yields nothing contributes an empty string.
    """
    with pdfplumber.open(path) as document:
        page_texts = [page.extract_text() or "" for page in document.pages]
    return "\n".join(page_texts)
def _read_image_ocr(path: str) -> str:
    """Run OCR on the image at *path* and return the recognised text.

    Args:
        path: Filesystem path of the image file.

    Returns:
        Whatever ``pytesseract.image_to_string`` produces for the image.
    """
    # Image.open keeps the underlying file open until the image is closed;
    # the context manager releases the handle even if OCR raises.
    with Image.open(path) as img:
        return pytesseract.image_to_string(img)
def extract_text_from_files(filepaths: List[str]) -> List[Tuple[str, str]]:
    """Extract and redact text from every acceptable file in *filepaths*.

    Each path is validated with ``_check_allowed``; rejected files, files
    whose extraction raises, and files yielding only whitespace are skipped.
    Every skip caused by a rejection or an error is logged as a warning so
    that dropped uploads are traceable.

    Args:
        filepaths: Paths of the candidate files. ``None`` or an empty list
            yields an empty result.

    Returns:
        A list of ``(safe_name, redacted_text)`` tuples for the approved
        files, where ``safe_name`` is the base name of the path and
        ``redacted_text`` is the extracted text passed through
        ``redact_text``.
    """
    import logging

    log = logging.getLogger(__name__)
    results: List[Tuple[str, str]] = []

    for fp in filepaths or []:
        name = os.path.basename(fp)

        try:
            ok, reason = _check_allowed(fp)
        except OSError as exc:
            # e.g. the file vanished or is unreadable; skip it, keep the batch.
            log.warning("Skipping %s: validation failed (%s)", name, exc)
            continue
        if not ok:
            log.warning("Skipping %s: %s", name, reason)
            continue

        ext = os.path.splitext(fp.lower())[1]
        try:
            if ext in {".txt", ".md", ".csv"}:
                txt = _read_text_file(fp)
            elif ext == ".docx":
                txt = _read_docx(fp)
            elif ext == ".pdf":
                txt = _read_pdf(fp)
            elif ext in {".png", ".jpg", ".jpeg", ".webp"}:
                txt = _read_image_ocr(fp)
            else:
                txt = ""
            if txt and txt.strip():
                results.append((name, redact_text(txt)))
        except Exception as exc:
            # Best effort per file. Only the exception type is logged, as a
            # precaution in case a parser message echoes unredacted content.
            log.warning(
                "Skipping %s: text extraction failed (%s)",
                name,
                type(exc).__name__,
            )
            continue
    return results
|