SheGuard / api /extract_report.py
3v324v23's picture
Fix: over-extraction bug (label-aware lookahead)
6e847b8
# api/extract_report.py β€” BATTLE-HARDENED VERSION
"""
WHY THIS FILE EXISTS:
Handles every realistic image a health worker might upload:
- Screenshot (dark or light theme, phone or computer)
- Photo of paper (possibly skewed, shadowed, crumpled)
- Scanned document (high resolution, clean)
- Low-light photo (clinic with poor lighting)
- Rotated image (phone held sideways)
- Colored background (yellow/blue/green clinic forms)
- Low contrast (faded ink, old paper)
- Glare/reflection (phone flash on glossy paper)
- Handwritten numbers on ruled paper
- Partial image (only part of the form captured)
STRATEGY β€” ensemble preprocessing + ensemble OCR:
We run MULTIPLE preprocessing pipelines Γ— MULTIPLE Tesseract configs,
score each combination by how many medically plausible values were found,
and return the best result. This is far more robust than any single pipeline.
"""
import base64
import io
import re
import pytesseract
from PIL import Image
import cv2
import numpy as np
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Tuple
router = APIRouter()
import platform
# ── Tesseract path (Cross-platform) ───────────────────────────────────────────
if platform.system() == "Windows":
pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
# Note: On Linux/Docker, 'tesseract' is expected to be in the system PATH.
# ── Schemas ───────────────────────────────────────────────────────────────────
class ExtractRequest(BaseModel):
image_base64: str
image_type: str = "image/jpeg"
class ExtractedVisit(BaseModel):
age: Optional[float] = None
systolic_bp: Optional[float] = None
diastolic_bp: Optional[float] = None
blood_sugar: Optional[float] = None
body_temp: Optional[float] = None
heart_rate: Optional[float] = None
visit_date: Optional[str] = None
class ExtractResponse(BaseModel):
visits: List[ExtractedVisit]
patient_id: Optional[str] = None
notes: Optional[str] = None
confidence: float = 1.0
raw_text: Optional[str] = None
# ── Medical ranges ────────────────────────────────────────────────────────────
# WHY: Values outside these ranges are OCR errors, not real readings.
# We DISCARD (not clamp) out-of-range values β€” a clamped wrong value
# is still wrong and could mislead the risk model.
RANGES = {
"age": (10, 60),
"systolic_bp": (70, 200),
"diastolic_bp": (40, 130),
"blood_sugar": (3.0, 20.0),
"body_temp": (35.0, 42.0),
"heart_rate": (40, 160),
}
# Field label aliases β€” every way a clinic might write each field name
ROW_ALIASES = {
"age": ["age", "years", "yr", "patient age", "age (yrs)", "age:", "a.g.e", "age/yrs"],
"systolic_bp": ["systolic", "sbp", "sys", "systolic bp", "bp sys", "bp(sys)",
"upper bp", "s.b.p", "s bp", "syst", "bp systolic", "bp - sys"],
"diastolic_bp": ["diastolic", "dbp", "dia", "diastolic bp", "bp dia", "bp(dia)",
"lower bp", "d.b.p", "d bp", "diast", "bp diastolic", "bp - dia"],
"blood_sugar": ["blood sugar", "bs", "glucose", "bg", "blood glucose", "bld sugar",
"sugar", "b.s", "rbs", "fbs", "ppbs", "glu", "glc", "sugr"],
"body_temp": ["temp", "temperature", "body temp", "tmp", "fever", "t(c)",
"body temperature", "b.temp", "t (c)", "t(c)", "bt", "temp:"],
"heart_rate": ["heart rate", "hr", "pulse", "bpm", "heartrate", "h.r",
"heart", "p/r", "pr", "pulse rate", "hr:", "beats/min"],
}
# Lines containing these words are SKIPPED during parsing
# WHY: "Field V1 V2 V3" contains "1","2","3" which would be misread as values
SKIP_KEYWORDS = [
"field", "visit", "v1", "v2", "v3", "v 1", "v 2", "v 3",
"column", "header", "parameter", "reading", "measurement",
"no.", "sl.", "s.no", "item"
]
# ═══════════════════════════════════════════════════════════════════════════════
# IMAGE PREPROCESSING PIPELINES
# Each targets a different image problem. We run several and pick the best.
# ═══════════════════════════════════════════════════════════════════════════════
def to_gray(pil_image: Image.Image) -> np.ndarray:
"""Convert PIL image to OpenCV grayscale array."""
return cv2.cvtColor(np.array(pil_image.convert("RGB")), cv2.COLOR_RGB2GRAY)
def smart_upscale(gray: np.ndarray, target_width: int = 1400) -> np.ndarray:
"""
WHY: Tesseract needs text to be at least 20-30px tall to read reliably.
Small phone screenshots or thumbnails have tiny text that OCR misses.
We upscale to target_width using bicubic interpolation (preserves sharp text edges).
"""
h, w = gray.shape
if w < target_width:
scale = target_width / w
gray = cv2.resize(gray, None, fx=scale, fy=scale,
interpolation=cv2.INTER_CUBIC)
return gray
def detect_and_invert_if_dark(gray: np.ndarray) -> np.ndarray:
"""
WHY: Tesseract is trained on black text on white background.
Dark-theme screenshots (WhatsApp dark mode, VS Code, etc.) have
white text on dark background β€” Tesseract reads nothing without inversion.
DETECTION: Mean pixel brightness < 127 = most pixels are dark = dark background.
We bitwise-NOT the image: every dark pixel becomes bright and vice versa.
"""
if np.mean(gray) < 127:
return cv2.bitwise_not(gray)
return gray
def remove_shadow(gray: np.ndarray) -> np.ndarray:
"""
WHY: Phone photos of paper often have a shadow from the hand or phone body.
This creates uneven brightness β€” one corner is darker than the rest.
HOW (background subtraction):
1. Dilate heavily β†’ text pixels expand and "fill in", leaving only background
2. Median blur β†’ smooth the background estimate
3. Subtract from original β†’ removes the background brightness variation
4. Normalise β†’ stretch to full 0-255 range
"""
dilated = cv2.dilate(gray, np.ones((7, 7), np.uint8))
bg = cv2.medianBlur(dilated, 21)
diff = 255 - cv2.absdiff(gray, bg)
norm = cv2.normalize(diff, None, 0, 255, cv2.NORM_MINMAX)
return norm.astype(np.uint8)
def enhance_contrast_clahe(gray: np.ndarray) -> np.ndarray:
"""
WHY CLAHE (Contrast Limited Adaptive Histogram Equalisation):
Standard histogram equalisation boosts contrast globally but creates artifacts.
CLAHE works on small image tiles separately β€” it enhances faded ink without
blowing out the background, and handles images with varying contrast.
clipLimit=2.0 β€” prevents over-amplifying noise in flat regions
tileGridSize β€” small enough to be local, large enough for patterns
"""
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
return clahe.apply(gray)
def deskew(gray: np.ndarray) -> np.ndarray:
"""
WHY: Paper placed on a table and photographed is rarely perfectly flat.
Even a 3-5 degree rotation significantly confuses Tesseract.
HOW: Find all text pixel coordinates, fit a minimum-area bounding rectangle,
read its angle, and rotate the image to correct the skew.
We skip correction if the detected angle > 15 degrees (likely wrong detection).
"""
_, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
coords = np.column_stack(np.where(binary > 0))
if len(coords) < 100:
return gray
angle = cv2.minAreaRect(coords)[-1]
if angle < -45:
angle = 90 + angle
if abs(angle) > 15:
return gray # skip unreliable angle
(h, w) = gray.shape
M = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0)
return cv2.warpAffine(gray, M, (w, h),
flags=cv2.INTER_CUBIC,
borderMode=cv2.BORDER_REPLICATE)
def remove_ruled_lines(gray: np.ndarray) -> np.ndarray:
"""
WHY: Handwritten reports often use ruled (lined) paper.
Horizontal lines can break through characters, confusing Tesseract's
text segmentation. We detect and remove long horizontal line structures.
HOW: A very wide, 1-pixel-tall kernel detects structures that are long
horizontally = ruled lines. We subtract those from the image.
"""
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (40, 1))
lines = cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel, iterations=2)
return cv2.subtract(gray, lines)
# ── 9 Preprocessing pipelines ─────────────────────────────────────────────────
def pipeline_standard(gray: np.ndarray) -> np.ndarray:
"""Pipeline A: Clean printed document, white paper, good lighting."""
gray = smart_upscale(gray)
gray = detect_and_invert_if_dark(gray)
gray = cv2.fastNlMeansDenoising(gray, h=10)
return cv2.adaptiveThreshold(gray, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 31, 10)
def pipeline_shadow(gray: np.ndarray) -> np.ndarray:
"""Pipeline B: Paper photo with shadow or uneven lighting."""
gray = smart_upscale(gray)
gray = detect_and_invert_if_dark(gray)
gray = remove_shadow(gray)
gray = cv2.fastNlMeansDenoising(gray, h=8)
return cv2.adaptiveThreshold(gray, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 41, 12)
def pipeline_low_contrast(gray: np.ndarray) -> np.ndarray:
"""Pipeline C: Faded ink, old paper, poor scan quality."""
gray = smart_upscale(gray)
gray = detect_and_invert_if_dark(gray)
gray = enhance_contrast_clahe(gray)
gray = cv2.fastNlMeansDenoising(gray, h=12)
# Unsharp mask: enhances edges by subtracting blurred version
blurred = cv2.GaussianBlur(gray, (0, 0), 3)
gray = cv2.addWeighted(gray, 1.5, blurred, -0.5, 0)
return cv2.adaptiveThreshold(gray, 255,
cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 21, 8)
def pipeline_deskewed(gray: np.ndarray) -> np.ndarray:
"""Pipeline D: Rotated or skewed image (phone held sideways)."""
gray = smart_upscale(gray)
gray = detect_and_invert_if_dark(gray)
gray = deskew(gray)
gray = cv2.fastNlMeansDenoising(gray, h=10)
return cv2.adaptiveThreshold(gray, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 31, 10)
def pipeline_ruled_paper(gray: np.ndarray) -> np.ndarray:
"""Pipeline E: Handwritten on ruled/lined paper."""
gray = smart_upscale(gray)
gray = detect_and_invert_if_dark(gray)
gray = remove_shadow(gray)
gray = remove_ruled_lines(gray)
gray = enhance_contrast_clahe(gray)
gray = cv2.fastNlMeansDenoising(gray, h=15)
return cv2.adaptiveThreshold(gray, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 25, 8)
def pipeline_high_noise(gray: np.ndarray) -> np.ndarray:
"""Pipeline F: Low-light photo, dim room, heavily compressed JPEG."""
gray = smart_upscale(gray, target_width=1800)
gray = detect_and_invert_if_dark(gray)
gray = cv2.fastNlMeansDenoising(gray, h=20,
templateWindowSize=7, searchWindowSize=21)
gray = enhance_contrast_clahe(gray)
# Morphological close: reconnects character strokes broken by noise
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel)
_, out = cv2.threshold(gray, 0, 255,
cv2.THRESH_BINARY + cv2.THRESH_OTSU)
return out
def pipeline_screenshot(gray: np.ndarray) -> np.ndarray:
"""
Pipeline G: Screenshot (phone, computer, WhatsApp, EHR system).
"""
gray = smart_upscale(gray)
gray = detect_and_invert_if_dark(gray)
_, out = cv2.threshold(gray, 0, 255,
cv2.THRESH_BINARY + cv2.THRESH_OTSU)
return out
def pipeline_inverted(gray: np.ndarray) -> np.ndarray:
"""Pipeline H: Explicit inversion for white-on-dark text."""
return cv2.bitwise_not(gray)
def pipeline_color_form(pil_image: Image.Image) -> np.ndarray:
"""
Pipeline I: Colored background (yellow paper, green header, blue stripes).
"""
img_bgr = cv2.cvtColor(np.array(pil_image.convert("RGB")), cv2.COLOR_RGB2BGR)
lab = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB)
l_chan = lab[:, :, 0]
l_chan = smart_upscale(l_chan)
l_chan = detect_and_invert_if_dark(l_chan)
l_chan = enhance_contrast_clahe(l_chan)
l_chan = cv2.fastNlMeansDenoising(l_chan, h=10)
return cv2.adaptiveThreshold(l_chan, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 31, 10)
PIPELINE_FNS = {
"standard": pipeline_standard,
"shadow": pipeline_shadow,
"low_contrast": pipeline_low_contrast,
"deskewed": pipeline_deskewed,
"ruled_paper": pipeline_ruled_paper,
"high_noise": pipeline_high_noise,
"screenshot": pipeline_screenshot,
"inverted": pipeline_inverted,
}
# Pipeline order per detected image type
PIPELINE_ORDERS = {
"screenshot": ["screenshot", "inverted", "standard"],
"dark_bg": ["inverted", "screenshot", "standard"],
"low_contrast": ["low_contrast", "shadow", "standard"],
"colored_form": ["color_form", "low_contrast", "standard"],
"camera_photo": ["shadow", "deskewed", "ruled_paper"],
"standard": ["standard", "shadow", "low_contrast"],
}
# ═══════════════════════════════════════════════════════════════════════════════
# IMAGE TYPE DETECTOR
# ═══════════════════════════════════════════════════════════════════════════════
def detect_image_type(gray: np.ndarray, pil_image: Image.Image) -> str:
"""Heuristically classify the image to select optimal pipeline order."""
mean_b = float(np.mean(gray))
std_b = float(np.std(gray))
h, w = gray.shape
if std_b > 75 and (mean_b < 70 or mean_b > 185):
return "screenshot"
if mean_b < 85:
return "dark_bg"
if std_b < 35:
return "low_contrast"
img_hsv = cv2.cvtColor(np.array(pil_image.convert("RGB")), cv2.COLOR_RGB2HSV)
mean_sat = float(np.mean(img_hsv[:, :, 1]))
if mean_sat > 40:
return "colored_form"
if w > 2000 or h > 2000:
return "camera_photo"
return "standard"
# ═══════════════════════════════════════════════════════════════════════════════
# OCR ENGINE
# ═══════════════════════════════════════════════════════════════════════════════
OCR_CONFIGS = [
"--psm 6 --oem 3",
"--psm 4 --oem 3",
"--psm 11 --oem 3",
"--psm 3 --oem 3",
]
def run_ocr_best(processed_img: np.ndarray) -> str:
"""Run Tesseract and return output with most digits."""
pil = Image.fromarray(processed_img)
best_text = ""
best_digits = 0
for config in OCR_CONFIGS:
try:
text = pytesseract.image_to_string(pil, config=config)
digits = sum(c.isdigit() for c in text)
if digits > best_digits:
best_digits = digits
best_text = text
except Exception:
continue
return best_text
# ═══════════════════════════════════════════════════════════════════════════════
# PARSERS
# ═══════════════════════════════════════════════════════════════════════════════
def find_numbers(text: str) -> List[float]:
"""Extract all integers and decimals from a string."""
return [float(m) for m in re.findall(r'\d+\.?\d*', text)]
def match_label(line: str) -> Optional[str]:
"""Check if a line contains a known field name alias."""
ll = line.lower()
for field, aliases in ROW_ALIASES.items():
for alias in aliases:
if alias in ll:
return field
return None
def is_skip_line(line: str) -> bool:
"""Returns True if this line is a header or label-only row to skip."""
ll = line.lower()
return any(kw in ll for kw in SKIP_KEYWORDS)
def parse_table_format(raw_text: str) -> List[dict]:
"""Parser A -- Row-per-field table layout."""
lines = [l.strip() for l in raw_text.split('\n') if l.strip()]
field_vals = {}
max_visits = 0
for idx, line in enumerate(lines):
if is_skip_line(line):
continue
field = match_label(line)
if field is None:
continue
# Robust lookahead: look for numbers in current line + next 2 lines
# BUT: STOP if we see ANOTHER known label in the next lines
search_chunk = line
for offset in [1, 2]:
if idx + offset < len(lines):
next_line = lines[idx + offset]
# If the next line has IT'S OWN label, stop search here
if match_label(next_line) is not None:
break
search_chunk += " " + next_line
numbers = find_numbers(search_chunk)
lo, hi = RANGES[field]
valid = [n for n in numbers if lo <= n <= hi]
if valid:
if field in field_vals:
field_vals[field].extend(valid)
else:
field_vals[field] = valid
max_visits = max(max_visits, len(field_vals[field]))
if not field_vals:
return []
results = []
for i in range(max_visits):
visit = {}
for field in RANGES.keys():
vals = field_vals.get(field, [])
visit[field] = vals[i] if i < len(vals) else None
results.append(visit)
return results
def parse_column_format(raw_text: str) -> List[dict]:
"""Parser B -- Column-per-visit layout."""
lines = [l.strip() for l in raw_text.split('\n') if l.strip()]
header_idx = -1
header_fields = []
for i, line in enumerate(lines):
# Relaxed split for mobile OCR
parts = re.split(r'\s{1,}|\t', line)
known = [match_label(p) for p in parts]
if sum(f is not None for f in known) >= 2:
header_idx = i
header_fields = known
break
if header_idx == -1:
return []
visits = []
for line in lines[header_idx + 1:header_idx + 10]:
if is_skip_line(line):
continue
parts = re.split(r'\s{1,}|\t', line)
if not parts: continue
visit = {}
for col_idx, field in enumerate(header_fields):
if field is None or col_idx >= len(parts):
continue
nums = find_numbers(parts[col_idx])
if nums:
lo, hi = RANGES[field]
val = nums[0]
if lo <= val <= hi:
visit[field] = val
if visit:
visits.append(visit)
return visits
def parse_key_value_format(raw_text: str) -> List[dict]:
"""Parser C -- Key-value format."""
lines = [l.strip() for l in raw_text.split('\n') if l.strip()]
visit = {}
for line in lines:
if is_skip_line(line):
continue
field = match_label(line)
if field is None:
continue
bp_match = re.search(r'(\d{2,3})\s*/\s*(\d{2,3})', line)
if bp_match and field in ("systolic_bp", "diastolic_bp"):
s, d = float(bp_match.group(1)), float(bp_match.group(2))
if 70 <= s <= 200: visit["systolic_bp"] = s
if 40 <= d <= 130: visit["diastolic_bp"] = d
continue
nums = find_numbers(line)
lo, hi = RANGES[field]
valid = [n for n in nums if lo <= n <= hi]
if valid:
visit[field] = valid[0]
return [visit] if visit else []
def score_visits(visits: List[dict]) -> int:
"""Count total non-None extracted values across all visits."""
return sum(1 for v in visits for val in v.values() if val is not None)
def best_parse(raw_text: str) -> Tuple[List[dict], str]:
"""Try all parsers and return the one that extracted the most data."""
candidates = [
(parse_table_format(raw_text), "row-per-field"),
(parse_column_format(raw_text), "column-per-visit"),
(parse_key_value_format(raw_text),"key-value"),
]
return max(candidates, key=lambda x: score_visits(x[0]))
def extract_patient_id(raw_text: str) -> Optional[str]:
"""Find common patient ID patterns in OCR text."""
patterns = [
r'PT[-\s]?\d{4}[-\s]?\d+',
r'Patient\s*(ID|No\.?|Number)[:\s]+([\w-]+)',
r'Reg(?:istration)?\s*(No\.?|#|:)\s*([\w-]+)',
r'ANC\s*(?:No\.?|#|:)?\s*([\w-]+)',
r'Card\s*(?:No\.?|#)[:\s]*([\w-]+)',
r'P/(\d+)',
r'MRN[:\s]+([\w-]+)',
]
for pattern in patterns:
m = re.search(pattern, raw_text, re.IGNORECASE)
if m:
groups = [g for g in m.groups() if g]
return groups[-1] if groups else m.group(0)
return None
# ═══════════════════════════════════════════════════════════════════════════════
# MAIN ENDPOINT
# ═══════════════════════════════════════════════════════════════════════════════
@router.post("/extract-report", response_model=ExtractResponse)
async def extract_report(request: ExtractRequest):
"""POST /extract-report β€” Offline OCR extraction."""
try:
image_bytes = base64.b64decode(request.image_base64)
pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"Could not decode image: {e}"
)
gray = to_gray(pil_image)
img_type = detect_image_type(gray, pil_image)
pipelines = PIPELINE_ORDERS.get(img_type, PIPELINE_ORDERS["standard"])
best_visits = []
best_score = 0
best_text = ""
best_pipeline = "none"
best_parser = "none"
all_texts = []
for pipeline_name in pipelines:
try:
if pipeline_name == "color_form":
processed = pipeline_color_form(pil_image)
else:
fn = PIPELINE_FNS.get(pipeline_name, pipeline_standard)
processed = fn(to_gray(pil_image))
except Exception:
continue
try:
raw_text = run_ocr_best(processed)
except Exception:
continue
all_texts.append(raw_text[:200])
visits, parser_name = best_parse(raw_text)
score = score_visits(visits)
if score > best_score:
best_score = score
best_visits = visits
best_text = raw_text
best_pipeline = pipeline_name
best_parser = parser_name
# 4. Handle complete failure
if not best_visits or best_score == 0:
fallback_text = all_texts[0][:200] if all_texts else "No text extracted"
return ExtractResponse(
visits = [],
patient_id = None,
notes = (
f"Image type detected: {img_type}. "
"No structured data could be extracted. "
"Check image quality or unusual layout."
),
confidence = 0.0,
raw_text = fallback_text,
)
# 5. Build ExtractedVisit objects
extracted = [
ExtractedVisit(
age = v.get("age"),
systolic_bp = v.get("systolic_bp"),
diastolic_bp = v.get("diastolic_bp"),
blood_sugar = v.get("blood_sugar"),
body_temp = v.get("body_temp"),
heart_rate = v.get("heart_rate"),
)
for v in best_visits
]
total_possible = len(extracted) * 6
total_filled = sum(
1 for v in extracted
for val in [v.age, v.systolic_bp, v.diastolic_bp,
v.blood_sugar, v.body_temp, v.heart_rate]
if val is not None
)
confidence = round(total_filled / max(total_possible, 1), 2)
notes = (
f"Type: {img_type} | "
f"Pipe: {best_pipeline} | "
f"Parser: {best_parser} | "
f"{total_filled}/{total_possible} fields."
)
return ExtractResponse(
visits = extracted,
patient_id = extract_patient_id(best_text),
notes = notes,
confidence = confidence,
raw_text = best_text[:500],
)