Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| from urllib.parse import urlparse | |
| from app.schemas.document_verification import ( | |
| LiveQRVerification, | |
| QRAnalysis, | |
| QRAnalysisItem, | |
| QRBoundingBox, | |
| QRTextConsistency, | |
| ) | |
| from app.services.live_qr_verifier import LiveQRVerifier | |
| from app.services.source_url_analyzer import SourceURLAnalyzer | |
| class QRCodeAnalyzer: | |
| def __init__(self) -> None: | |
| self.url_analyzer = SourceURLAnalyzer() | |
| self.live_verifier = LiveQRVerifier() | |
| def analyze( | |
| self, | |
| page_images: list[str], | |
| run_qr: bool, | |
| run_live_qr_check: bool, | |
| extracted_fields: dict[str, str] | None = None, | |
| ) -> QRAnalysis: | |
| warnings: list[str] = [] | |
| flags: list[str] = [] | |
| if not run_qr: | |
| if run_live_qr_check: | |
| warnings.append("QR live browsing is disabled; BitCheck only performs structural QR URL analysis.") | |
| return self._empty(warnings=warnings) | |
| if run_live_qr_check: | |
| warnings.append("QR live browsing is disabled; BitCheck only performs structural QR URL analysis.") | |
| cv2 = self._load_cv2() | |
| if cv2 is None: | |
| warnings.append("OpenCV QRCodeDetector is unavailable; optional pyzbar decoding will be attempted if installed.") | |
| items: list[QRAnalysisItem] = [] | |
| barcodes_found = False | |
| seen: set[tuple[str, int]] = set() | |
| for page_index, image_path in enumerate(page_images, start=1): | |
| if cv2 is not None: | |
| image = cv2.imread(str(Path(image_path))) | |
| if image is None: | |
| warnings.append(f"Could not read page image for QR analysis: {image_path}") | |
| continue | |
| for data, bbox in self._detect_qr_cv2(cv2, image): | |
| key = (data, page_index) | |
| if not data or key in seen: | |
| continue | |
| seen.add(key) | |
| items.append(self._build_item(data, page_index, bbox, False, extracted_fields or {})) | |
| pyzbar_items, pyzbar_barcodes = self._detect_with_pyzbar(image_path, page_index, seen, False, extracted_fields or {}) | |
| items.extend(pyzbar_items) | |
| barcodes_found = barcodes_found or pyzbar_barcodes | |
| consistency = self._qr_text_consistency(items, extracted_fields or {}) | |
| risk = max([item.url_analysis.risk_score for item in items if item.url_analysis] + [consistency.risk_score, 0.0]) | |
| live_risks = [item.live_verification.risk_score for item in items if item.live_verification and item.live_verification.risk_score is not None] | |
| if live_risks: | |
| risk = max(risk, max(live_risks)) | |
| for item in items: | |
| if item.url_analysis: | |
| flags.extend(item.url_analysis.flags) | |
| if item.live_verification: | |
| flags.extend(item.live_verification.flags) | |
| flags.extend(consistency.mismatch_flags) | |
| return QRAnalysis( | |
| checked=True, | |
| qr_found=bool(items), | |
| barcodes_found=barcodes_found, | |
| items=items, | |
| qr_text_consistency=consistency, | |
| risk_score=round(min(risk, 1.0), 2), | |
| flags=list(dict.fromkeys(flags)), | |
| warnings=warnings, | |
| ) | |
| def _build_item( | |
| self, | |
| data: str, | |
| page: int, | |
| bbox: QRBoundingBox, | |
| run_live_qr_check: bool, | |
| extracted_fields: dict[str, str], | |
| ) -> QRAnalysisItem: | |
| data_type = self._classify_data(data) | |
| url_analysis = self.url_analyzer.analyze(data) if data_type == "url" else None | |
| live_verification: LiveQRVerification | None = None | |
| if url_analysis: | |
| eligible = url_analysis.scheme in {"http", "https"} and not url_analysis.is_private_or_internal | |
| live_verification = self.live_verifier.not_requested(eligible=eligible) | |
| return QRAnalysisItem( | |
| type="QR_CODE", | |
| data=data, | |
| data_type=data_type, | |
| page=page, | |
| bbox=bbox, | |
| url_analysis=url_analysis, | |
| live_verification=live_verification, | |
| ) | |
| def _detect_qr_cv2(self, cv2, image) -> list[tuple[str, QRBoundingBox]]: | |
| detector = cv2.QRCodeDetector() | |
| results: list[tuple[str, QRBoundingBox]] = [] | |
| try: | |
| ok, decoded_info, points, _ = detector.detectAndDecodeMulti(image) | |
| if ok and points is not None: | |
| for data, point_set in zip(decoded_info, points): | |
| results.append((data, self._bbox_from_points(point_set))) | |
| return results | |
| except Exception: | |
| pass | |
| try: | |
| data, points, _ = detector.detectAndDecode(image) | |
| if data and points is not None: | |
| results.append((data, self._bbox_from_points(points[0]))) | |
| except Exception: | |
| pass | |
| return results | |
| def _detect_with_pyzbar( | |
| self, | |
| image_path: str, | |
| page: int, | |
| seen: set[tuple[str, int]], | |
| run_live_qr_check: bool, | |
| extracted_fields: dict[str, str], | |
| ) -> tuple[list[QRAnalysisItem], bool]: | |
| try: | |
| from PIL import Image | |
| from pyzbar.pyzbar import decode | |
| except Exception: | |
| return [], False | |
| items: list[QRAnalysisItem] = [] | |
| barcodes_found = False | |
| try: | |
| decoded = decode(Image.open(image_path)) | |
| except Exception: | |
| return [], False | |
| for item in decoded: | |
| data = item.data.decode("utf-8", errors="ignore") | |
| key = (data, page) | |
| if key in seen: | |
| continue | |
| seen.add(key) | |
| rect = item.rect | |
| bbox = QRBoundingBox(x=rect.left, y=rect.top, width=rect.width, height=rect.height) | |
| if item.type == "QRCODE": | |
| items.append(self._build_item(data, page, bbox, run_live_qr_check, extracted_fields)) | |
| else: | |
| barcodes_found = True | |
| return items, barcodes_found | |
| def _qr_text_consistency(self, items: list[QRAnalysisItem], fields: dict[str, str]) -> QRTextConsistency: | |
| matched: list[str] = [] | |
| mismatches: list[str] = [] | |
| searchable = " ".join(item.data for item in items).lower() | |
| for key, value in fields.items(): | |
| if value and str(value).lower() in searchable: | |
| matched.append(key) | |
| return QRTextConsistency( | |
| checked=True, | |
| matched_document_fields=matched, | |
| mismatch_flags=mismatches, | |
| risk_score=0.0 if not mismatches else 0.4, | |
| ) | |
| def _bbox_from_points(self, points) -> QRBoundingBox: | |
| xs = [int(point[0]) for point in points] | |
| ys = [int(point[1]) for point in points] | |
| return QRBoundingBox(x=min(xs), y=min(ys), width=max(xs) - min(xs), height=max(ys) - min(ys)) | |
| def _classify_data(self, data: str) -> str: | |
| parsed = urlparse(data) | |
| if parsed.scheme in {"http", "https"} and parsed.netloc: | |
| return "url" | |
| if data.lower().startswith("mailto:") or "@" in data: | |
| return "email" | |
| if data.lower().startswith("tel:") or data.replace("+", "").replace("-", "").replace(" ", "").isdigit(): | |
| return "phone" | |
| if data.strip(): | |
| return "plain_text" | |
| return "unknown" | |
| def _empty(self, warnings: list[str] | None = None) -> QRAnalysis: | |
| return QRAnalysis( | |
| checked=True, | |
| qr_found=False, | |
| barcodes_found=False, | |
| items=[], | |
| qr_text_consistency=QRTextConsistency( | |
| checked=True, | |
| matched_document_fields=[], | |
| mismatch_flags=[], | |
| risk_score=0.0, | |
| ), | |
| risk_score=0.0, | |
| flags=[], | |
| warnings=warnings or [], | |
| ) | |
| def _load_cv2(self): | |
| try: | |
| import cv2 | |
| return cv2 | |
| except Exception: | |
| return None | |