bitcheck-document / app /services /qr_code_analyzer.py
AI Assistant
Update Bitcheck Document Service and test.html
37a1755
from pathlib import Path
from urllib.parse import urlparse
from app.schemas.document_verification import (
LiveQRVerification,
QRAnalysis,
QRAnalysisItem,
QRBoundingBox,
QRTextConsistency,
)
from app.services.live_qr_verifier import LiveQRVerifier
from app.services.source_url_analyzer import SourceURLAnalyzer
class QRCodeAnalyzer:
def __init__(self) -> None:
self.url_analyzer = SourceURLAnalyzer()
self.live_verifier = LiveQRVerifier()
def analyze(
self,
page_images: list[str],
run_qr: bool,
run_live_qr_check: bool,
extracted_fields: dict[str, str] | None = None,
) -> QRAnalysis:
warnings: list[str] = []
flags: list[str] = []
if not run_qr:
if run_live_qr_check:
warnings.append("QR live browsing is disabled; BitCheck only performs structural QR URL analysis.")
return self._empty(warnings=warnings)
if run_live_qr_check:
warnings.append("QR live browsing is disabled; BitCheck only performs structural QR URL analysis.")
cv2 = self._load_cv2()
if cv2 is None:
warnings.append("OpenCV QRCodeDetector is unavailable; optional pyzbar decoding will be attempted if installed.")
items: list[QRAnalysisItem] = []
barcodes_found = False
seen: set[tuple[str, int]] = set()
for page_index, image_path in enumerate(page_images, start=1):
if cv2 is not None:
image = cv2.imread(str(Path(image_path)))
if image is None:
warnings.append(f"Could not read page image for QR analysis: {image_path}")
continue
for data, bbox in self._detect_qr_cv2(cv2, image):
key = (data, page_index)
if not data or key in seen:
continue
seen.add(key)
items.append(self._build_item(data, page_index, bbox, False, extracted_fields or {}))
pyzbar_items, pyzbar_barcodes = self._detect_with_pyzbar(image_path, page_index, seen, False, extracted_fields or {})
items.extend(pyzbar_items)
barcodes_found = barcodes_found or pyzbar_barcodes
consistency = self._qr_text_consistency(items, extracted_fields or {})
risk = max([item.url_analysis.risk_score for item in items if item.url_analysis] + [consistency.risk_score, 0.0])
live_risks = [item.live_verification.risk_score for item in items if item.live_verification and item.live_verification.risk_score is not None]
if live_risks:
risk = max(risk, max(live_risks))
for item in items:
if item.url_analysis:
flags.extend(item.url_analysis.flags)
if item.live_verification:
flags.extend(item.live_verification.flags)
flags.extend(consistency.mismatch_flags)
return QRAnalysis(
checked=True,
qr_found=bool(items),
barcodes_found=barcodes_found,
items=items,
qr_text_consistency=consistency,
risk_score=round(min(risk, 1.0), 2),
flags=list(dict.fromkeys(flags)),
warnings=warnings,
)
def _build_item(
self,
data: str,
page: int,
bbox: QRBoundingBox,
run_live_qr_check: bool,
extracted_fields: dict[str, str],
) -> QRAnalysisItem:
data_type = self._classify_data(data)
url_analysis = self.url_analyzer.analyze(data) if data_type == "url" else None
live_verification: LiveQRVerification | None = None
if url_analysis:
eligible = url_analysis.scheme in {"http", "https"} and not url_analysis.is_private_or_internal
live_verification = self.live_verifier.not_requested(eligible=eligible)
return QRAnalysisItem(
type="QR_CODE",
data=data,
data_type=data_type,
page=page,
bbox=bbox,
url_analysis=url_analysis,
live_verification=live_verification,
)
def _detect_qr_cv2(self, cv2, image) -> list[tuple[str, QRBoundingBox]]:
detector = cv2.QRCodeDetector()
results: list[tuple[str, QRBoundingBox]] = []
try:
ok, decoded_info, points, _ = detector.detectAndDecodeMulti(image)
if ok and points is not None:
for data, point_set in zip(decoded_info, points):
results.append((data, self._bbox_from_points(point_set)))
return results
except Exception:
pass
try:
data, points, _ = detector.detectAndDecode(image)
if data and points is not None:
results.append((data, self._bbox_from_points(points[0])))
except Exception:
pass
return results
def _detect_with_pyzbar(
self,
image_path: str,
page: int,
seen: set[tuple[str, int]],
run_live_qr_check: bool,
extracted_fields: dict[str, str],
) -> tuple[list[QRAnalysisItem], bool]:
try:
from PIL import Image
from pyzbar.pyzbar import decode
except Exception:
return [], False
items: list[QRAnalysisItem] = []
barcodes_found = False
try:
decoded = decode(Image.open(image_path))
except Exception:
return [], False
for item in decoded:
data = item.data.decode("utf-8", errors="ignore")
key = (data, page)
if key in seen:
continue
seen.add(key)
rect = item.rect
bbox = QRBoundingBox(x=rect.left, y=rect.top, width=rect.width, height=rect.height)
if item.type == "QRCODE":
items.append(self._build_item(data, page, bbox, run_live_qr_check, extracted_fields))
else:
barcodes_found = True
return items, barcodes_found
def _qr_text_consistency(self, items: list[QRAnalysisItem], fields: dict[str, str]) -> QRTextConsistency:
matched: list[str] = []
mismatches: list[str] = []
searchable = " ".join(item.data for item in items).lower()
for key, value in fields.items():
if value and str(value).lower() in searchable:
matched.append(key)
return QRTextConsistency(
checked=True,
matched_document_fields=matched,
mismatch_flags=mismatches,
risk_score=0.0 if not mismatches else 0.4,
)
def _bbox_from_points(self, points) -> QRBoundingBox:
xs = [int(point[0]) for point in points]
ys = [int(point[1]) for point in points]
return QRBoundingBox(x=min(xs), y=min(ys), width=max(xs) - min(xs), height=max(ys) - min(ys))
def _classify_data(self, data: str) -> str:
parsed = urlparse(data)
if parsed.scheme in {"http", "https"} and parsed.netloc:
return "url"
if data.lower().startswith("mailto:") or "@" in data:
return "email"
if data.lower().startswith("tel:") or data.replace("+", "").replace("-", "").replace(" ", "").isdigit():
return "phone"
if data.strip():
return "plain_text"
return "unknown"
def _empty(self, warnings: list[str] | None = None) -> QRAnalysis:
return QRAnalysis(
checked=True,
qr_found=False,
barcodes_found=False,
items=[],
qr_text_consistency=QRTextConsistency(
checked=True,
matched_document_fields=[],
mismatch_flags=[],
risk_score=0.0,
),
risk_score=0.0,
flags=[],
warnings=warnings or [],
)
def _load_cv2(self):
try:
import cv2
return cv2
except Exception:
return None