Medica_DecisionSupportAI / upload_ingest.py
Rajan Sharma
Create upload_ingest.py
1a93711 verified
raw
history blame
3.13 kB
import os, mimetypes
from typing import List, Tuple
import pdfplumber
from docx import Document as DocxDocument
from PIL import Image
import pytesseract
from settings import ALLOWED_EXT, ALLOWED_MIME, MAX_UPLOAD_MB, ENABLE_AV_SCAN, CLAMD_UNIX_SOCKET, CLAMD_NETWORK
from privacy import redact_text
# --- Optional AV scan (clamd) ---
def _clamd_scan(path: str) -> bool:
    """Ask a clamd daemon whether the file at *path* is clean.

    Returns ``True`` when scanning is disabled (``ENABLE_AV_SCAN`` falsy),
    when neither ``CLAMD_UNIX_SOCKET`` nor ``CLAMD_NETWORK`` is configured,
    when the daemon's verdict is ``"OK"``, or when anything raises while
    importing or talking to clamd (fail-open). Returns ``False`` for any
    other verdict.
    """
    if not ENABLE_AV_SCAN:
        return True

    try:
        # Imported lazily so the clamd package is only needed when scanning
        # is switched on; a failed import falls through to the handler below.
        import clamd

        if CLAMD_UNIX_SOCKET:
            client = clamd.ClamdUnixSocket(CLAMD_UNIX_SOCKET)
        elif CLAMD_NETWORK:
            host, port = CLAMD_NETWORK
            client = clamd.ClamdNetworkSocket(host, port)
        else:
            client = None

        if not client:
            # No endpoint configured: nothing to ask, treat as clean.
            return True

        report = client.scan(path)
        # Expected shape: {'/path/file': ('OK', 'OK')} or
        # {'/path/file': ('FOUND', 'Eicar-Test-Signature')}.
        if isinstance(report, dict):
            # Only the first entry's status is inspected.
            verdict = next(iter(report.values()))[0]
        else:
            verdict = "OK"
        return verdict == "OK"
    except Exception:
        # AV unavailable or misbehaving: fail open.
        return True
def _check_allowed(path: str) -> tuple[bool, str]:
    """Decide whether the file at *path* may be ingested.

    Checks run in this order and stop at the first failure:

    1. lower-cased file extension must be in ``ALLOWED_EXT``;
    2. MIME type guessed by ``mimetypes`` must be in ``ALLOWED_MIME``;
    3. file size must not exceed ``MAX_UPLOAD_MB`` megabytes;
    4. ``_clamd_scan`` must report the file as clean.

    Returns:
        ``(True, "ok")`` when every check passes, otherwise
        ``(False, reason)`` with a human-readable explanation.
    """
    ext = os.path.splitext(path.lower())[1]
    if ext not in ALLOWED_EXT:
        return False, f"Extension {ext} not allowed."

    # guess_type works from the file name only; it does not sniff content.
    mime, _ = mimetypes.guess_type(path)
    if mime not in ALLOWED_MIME:
        return False, f"MIME {mime} not allowed."

    try:
        size_bytes = os.path.getsize(path)
    except OSError:
        # A missing or unreadable file is reported as a rejection instead of
        # raising, so the (allowed, reason) contract holds for every input.
        return False, "File is missing or unreadable."

    size_mb = size_bytes / (1024 * 1024)
    if size_mb > MAX_UPLOAD_MB:
        return False, f"File too large ({size_mb:.1f}MB > {MAX_UPLOAD_MB}MB)."

    if not _clamd_scan(path):
        return False, "Antivirus scan failed."
    return True, "ok"
def _read_text_file(path: str) -> str:
    """Return the full contents of a text file decoded as UTF-8.

    A leading UTF-8 byte-order mark, if present, is stripped so it does not
    show up as a stray ``\\ufeff`` at the start of the text. Bytes that are
    not valid UTF-8 are dropped rather than raising.
    """
    # "utf-8-sig" decodes exactly like "utf-8" but also consumes a BOM.
    with open(path, "r", encoding="utf-8-sig", errors="ignore") as f:
        return f.read()
def _read_docx(path: str) -> str:
    """Return the text of each entry in the document's ``paragraphs``, newline-joined."""
    paragraphs = DocxDocument(path).paragraphs
    return "\n".join(para.text for para in paragraphs)
def _read_pdf(path: str) -> str:
    """Return the extracted text of every PDF page, one page per line group.

    Pages for which ``extract_text()`` yields nothing contribute an empty
    string, so the page count is still reflected in the newline separators.
    """
    with pdfplumber.open(path) as document:
        page_texts = [page.extract_text() or "" for page in document.pages]
    return "\n".join(page_texts)
def _read_image_ocr(path: str) -> str:
    """Run Tesseract OCR on the image at *path* and return the recognised text."""
    # Image.open keeps the underlying file open; the context manager closes
    # it once OCR is done instead of leaking one handle per upload.
    with Image.open(path) as img:
        return pytesseract.image_to_string(img)
def extract_text_from_files(filepaths: List[str]) -> List[Tuple[str, str]]:
    """Extract and redact text from every approved upload.

    Each path is vetted with ``_check_allowed`` and then read with the
    extractor matching its extension (plain text, .docx, .pdf, or OCR for
    images). Files that are rejected, yield no non-blank text, or raise
    during processing are skipped; a warning is logged for rejections and
    failures so dropped uploads are traceable.

    Args:
        filepaths: Paths of uploaded files; ``None`` or empty yields ``[]``.

    Returns:
        A list of ``(basename, redacted_text)`` tuples, in input order, for
        the files that produced text.
    """
    import logging  # local import so this function carries its own dependency

    log = logging.getLogger(__name__)
    text_exts = {".txt", ".md", ".csv"}
    image_exts = {".png", ".jpg", ".jpeg", ".webp"}

    results: List[Tuple[str, str]] = []
    for fp in filepaths or []:
        try:
            # The approval check sits inside the try so that a failure while
            # vetting one file cannot abort the rest of the batch.
            ok, reason = _check_allowed(fp)
            if not ok:
                log.warning("Skipping upload: %s", reason)
                continue

            ext = os.path.splitext(fp.lower())[1]
            if ext in text_exts:
                txt = _read_text_file(fp)
            elif ext == ".docx":
                txt = _read_docx(fp)
            elif ext == ".pdf":
                txt = _read_pdf(fp)
            elif ext in image_exts:
                txt = _read_image_ocr(fp)
            else:
                txt = ""

            if txt and txt.strip():
                results.append((os.path.basename(fp), redact_text(txt)))
        except Exception as exc:
            # Best-effort per file. Only the exception type is logged because
            # messages and tracebacks could echo file names or contents that
            # have not been through redaction.
            log.warning(
                "Skipping upload: text extraction failed (%s)",
                type(exc).__name__,
            )
            continue
    return results