Spaces:
Sleeping
Sleeping
| import os, mimetypes | |
| from typing import List, Tuple | |
| import pdfplumber | |
| from docx import Document as DocxDocument | |
| from PIL import Image | |
| import pytesseract | |
| from settings import ALLOWED_EXT, ALLOWED_MIME, MAX_UPLOAD_MB, ENABLE_AV_SCAN, CLAMD_UNIX_SOCKET, CLAMD_NETWORK | |
| from privacy import redact_text | |
| # --- Optional AV scan (clamd) --- | |
def _clamd_scan(path: str) -> bool:
    """Scan *path* with ClamAV (via ``clamd``) when AV scanning is enabled.

    The scan is best-effort and fails OPEN: if scanning is disabled, no
    clamd endpoint is configured, the ``clamd`` package or daemon is
    unavailable, or the reply has an unexpected shape, the file is treated
    as clean. Those situations are logged so a fail-open is never invisible.

    Args:
        path: Filesystem path of the file to scan.

    Returns:
        ``False`` only when clamd answered with a status other than ``"OK"``
        for the file; ``True`` in every other case.
    """
    if not ENABLE_AV_SCAN:
        return True

    # Local import, mirroring the lazy ``import clamd`` below.
    import logging

    log = logging.getLogger(__name__)
    try:
        import clamd

        # Prefer the UNIX socket; fall back to the TCP endpoint.
        if CLAMD_UNIX_SOCKET:
            cd = clamd.ClamdUnixSocket(CLAMD_UNIX_SOCKET)
        elif CLAMD_NETWORK:
            host, port = CLAMD_NETWORK
            cd = clamd.ClamdNetworkSocket(host, port)
        else:
            log.warning(
                "AV scan enabled but no clamd endpoint configured; "
                "skipping scan of %s", path
            )
            return True

        res = cd.scan(path)
        # Expected: {'/path/file': ('OK', 'OK')} or ('FOUND','Eicar-Test-Signature')
        if not isinstance(res, dict) or not res:
            log.warning(
                "Unexpected clamd reply for %s: %r; treating as clean", path, res
            )
            return True
        verdict = next(iter(res.values()))[0]
        return verdict == "OK"
    except Exception:
        # AV unavailable or misbehaving: fail open, but leave a trace.
        log.warning(
            "AV scan unavailable for %s; failing open", path, exc_info=True
        )
        return True
def _check_allowed(path: str) -> tuple[bool, str]:
    """Decide whether the file at *path* may be processed.

    Checks run in this order: extension against ``ALLOWED_EXT``, the MIME
    type guessed from the file name against ``ALLOWED_MIME``, the size on
    disk against ``MAX_UPLOAD_MB``, and finally the optional antivirus scan.

    Args:
        path: Filesystem path of the candidate file.

    Returns:
        ``(True, "ok")`` when every check passes, otherwise
        ``(False, reason)`` with a human-readable rejection reason.
    """
    ext = os.path.splitext(path.lower())[1]
    if ext not in ALLOWED_EXT:
        return False, f"Extension {ext} not allowed."

    # guess_type() inspects the file NAME only; it does not sniff content.
    mime, _ = mimetypes.guess_type(path)
    if mime not in ALLOWED_MIME:
        return False, f"MIME {mime} not allowed."

    try:
        size_bytes = os.path.getsize(path)
    except OSError as exc:
        # A missing or unreadable file is a rejection, not a crash.
        return False, f"File not accessible ({exc.strerror or exc})."

    size_mb = size_bytes / (1024 * 1024)
    if size_mb > MAX_UPLOAD_MB:
        return False, f"File too large ({size_mb:.1f}MB > {MAX_UPLOAD_MB}MB)."

    if not _clamd_scan(path):
        return False, "Antivirus scan failed."
    return True, "ok"
| def _read_text_file(path: str) -> str: | |
| with open(path, "r", encoding="utf-8", errors="ignore") as f: | |
| return f.read() | |
def _read_docx(path: str) -> str:
    """Return the text of every paragraph in a .docx file, newline-joined."""
    document = DocxDocument(path)
    paragraph_texts = (paragraph.text for paragraph in document.paragraphs)
    return "\n".join(paragraph_texts)
def _read_pdf(path: str) -> str:
    """Return the text of each PDF page, joined with newlines.

    A page for which ``extract_text()`` yields nothing contributes an empty
    string, so the number of joined segments equals the number of pages.
    """
    with pdfplumber.open(path) as pdf:
        page_texts = [page.extract_text() or "" for page in pdf.pages]
    return "\n".join(page_texts)
def _read_image_ocr(path: str) -> str:
    """Run Tesseract OCR on the image at *path* and return the recognised text.

    Args:
        path: Filesystem path of the image file.

    Returns:
        The string produced by ``pytesseract.image_to_string``.
    """
    # Use the image as a context manager so its underlying file handle is
    # closed as soon as OCR is done instead of lingering until GC.
    with Image.open(path) as img:
        return pytesseract.image_to_string(img)
def extract_text_from_files(filepaths: List[str]) -> List[Tuple[str, str]]:
    """
    Returns a list of (safe_name, redacted_text) for approved files.

    Each path is validated with ``_check_allowed``, dispatched to a reader
    by its lower-cased extension, and the extracted text is passed through
    ``redact_text``. Files that are rejected, fail to parse, or yield only
    whitespace are skipped; rejections and failures are logged and never
    abort the rest of the batch.

    Args:
        filepaths: Paths to process; ``None`` or an empty list yields ``[]``.

    Returns:
        ``(basename, redacted_text)`` tuples in input order for the files
        that produced non-blank text.
    """
    # Local import, matching the file's existing lazy-import style.
    import logging

    log = logging.getLogger(__name__)
    text_exts = {".txt", ".md", ".csv"}
    image_exts = {".png", ".jpg", ".jpeg", ".webp"}

    results: List[Tuple[str, str]] = []
    for fp in filepaths or []:
        try:
            # Validation lives inside the guard so that one bad entry
            # (missing file, non-string path) cannot crash the batch.
            ok, reason = _check_allowed(fp)
            if not ok:
                log.warning("Skipping %r: %s", fp, reason)
                continue

            ext = os.path.splitext(fp.lower())[1]
            if ext in text_exts:
                txt = _read_text_file(fp)
            elif ext == ".docx":
                txt = _read_docx(fp)
            elif ext == ".pdf":
                txt = _read_pdf(fp)
            elif ext in image_exts:
                txt = _read_image_ocr(fp)
            else:
                # Allowed by policy but no reader is registered for it.
                txt = ""

            if txt and txt.strip():
                results.append((os.path.basename(fp), redact_text(txt)))
        except Exception:
            # Best-effort batch: record the failure and move on.
            log.warning(
                "Failed to extract text from %r; skipping", fp, exc_info=True
            )
            continue
    return results