import logging
import mimetypes
import os
from typing import List, Tuple

import pdfplumber
from docx import Document as DocxDocument
from PIL import Image
import pytesseract

from settings import (
    ALLOWED_EXT,
    ALLOWED_MIME,
    MAX_UPLOAD_MB,
    ENABLE_AV_SCAN,
    CLAMD_UNIX_SOCKET,
    CLAMD_NETWORK,
)
from privacy import redact_text

_logger = logging.getLogger(__name__)


# --- Optional AV scan (clamd) ---
def _clamd_scan(path: str) -> bool:
    """Return True when *path* is considered clean by the optional clamd scan.

    The scan is skipped (and True returned) when ENABLE_AV_SCAN is falsy or
    when neither CLAMD_UNIX_SOCKET nor CLAMD_NETWORK is configured.  Any error
    while importing, connecting to, or querying clamd is logged and treated as
    "clean": the check deliberately fails open.
    """
    if not ENABLE_AV_SCAN:
        return True
    try:
        # Imported lazily so the dependency is only needed when scanning is on.
        import clamd

        cd = None
        if CLAMD_UNIX_SOCKET:
            cd = clamd.ClamdUnixSocket(CLAMD_UNIX_SOCKET)
        elif CLAMD_NETWORK:
            host, port = CLAMD_NETWORK
            cd = clamd.ClamdNetworkSocket(host, port)
        if cd is None:
            return True

        res = cd.scan(path)
        # Expected: {'/path/file': ('OK', 'OK')} or ('FOUND','Eicar-Test-Signature')
        if not isinstance(res, dict) or not res:
            # No usable verdict: keep the fail-open policy.
            return True
        verdict = next(iter(res.values()))[0]
        return verdict == "OK"
    except Exception:
        # If AV unavailable, fail open by default -- but leave a trace so an
        # unreachable scanner does not go unnoticed.
        _logger.warning(
            "Antivirus scan unavailable for %s; failing open", path, exc_info=True
        )
        return True


def _check_allowed(path: str) -> Tuple[bool, str]:
    """Validate *path* against the upload policy.

    Checks, in order: file extension (ALLOWED_EXT), MIME type guessed from the
    file name (ALLOWED_MIME), size on disk (MAX_UPLOAD_MB) and the optional
    antivirus scan.

    Returns:
        (True, "ok") when every check passes, otherwise (False, reason).
    """
    ext = os.path.splitext(path.lower())[1]
    if ext not in ALLOWED_EXT:
        return False, f"Extension {ext} not allowed."

    # NOTE: mimetypes only inspects the file name, not the file contents.
    mime, _ = mimetypes.guess_type(path)
    if mime not in ALLOWED_MIME:
        return False, f"MIME {mime} not allowed."

    try:
        size_mb = os.path.getsize(path) / (1024 * 1024)
    except OSError as exc:
        # A missing/unreadable file is a rejection, not a reason to abort the
        # caller's whole batch.
        return False, f"Cannot read file: {exc}"
    if size_mb > MAX_UPLOAD_MB:
        return False, f"File too large ({size_mb:.1f}MB > {MAX_UPLOAD_MB}MB)."

    if not _clamd_scan(path):
        return False, "Antivirus scan failed."
    return True, "ok"


def _read_text_file(path: str) -> str:
    """Return the contents of *path* decoded as UTF-8, dropping invalid bytes."""
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        return f.read()


def _read_docx(path: str) -> str:
    """Return the paragraph texts of a .docx file joined by newlines."""
    doc = DocxDocument(path)
    return "\n".join(p.text for p in doc.paragraphs)


def _read_pdf(path: str) -> str:
    """Return the text of every PDF page joined by newlines.

    Pages for which no text is extracted contribute an empty string.
    """
    with pdfplumber.open(path) as pdf:
        pages = [page.extract_text() or "" for page in pdf.pages]
    return "\n".join(pages)


def _read_image_ocr(path: str) -> str:
    """Return the text pytesseract recognises in the image at *path*."""
    # Context manager ensures the underlying file handle is closed after OCR.
    with Image.open(path) as img:
        return pytesseract.image_to_string(img)


def extract_text_from_files(filepaths: List[str]) -> List[Tuple[str, str]]:
    """Extract and redact text from every approved file in *filepaths*.

    Each path is validated with _check_allowed; rejected files, files whose
    extraction raises, and files yielding only whitespace are skipped (the
    first two are logged).  A falsy *filepaths* yields an empty list.

    Returns:
        A list of (safe_name, redacted_text) tuples, where safe_name is the
        base name of the path and redacted_text is the output of redact_text.
    """
    results: List[Tuple[str, str]] = []
    for fp in filepaths or []:
        ok, reason = _check_allowed(fp)
        if not ok:
            _logger.info("Skipping %s: %s", fp, reason)
            continue

        ext = os.path.splitext(fp.lower())[1]
        try:
            if ext in {".txt", ".md", ".csv"}:
                txt = _read_text_file(fp)
            elif ext == ".docx":
                txt = _read_docx(fp)
            elif ext == ".pdf":
                txt = _read_pdf(fp)
            elif ext in {".png", ".jpg", ".jpeg", ".webp"}:
                txt = _read_image_ocr(fp)
            else:
                # Allowed by policy but no reader is implemented for it.
                txt = ""
            if txt and txt.strip():
                results.append((os.path.basename(fp), redact_text(txt)))
        except Exception:
            # Best effort: one unreadable file must not abort the batch.
            _logger.warning("Failed to extract text from %s", fp, exc_info=True)
            continue
    return results