File size: 8,088 Bytes
6a8a839
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
from pathlib import Path
from urllib.parse import urlparse

from app.schemas.document_verification import (
    LiveQRVerification,
    QRAnalysis,
    QRAnalysisItem,
    QRBoundingBox,
    QRTextConsistency,
)
from app.services.live_qr_verifier import LiveQRVerifier
from app.services.source_url_analyzer import SourceURLAnalyzer


class QRCodeAnalyzer:
    def __init__(self) -> None:
        self.url_analyzer = SourceURLAnalyzer()
        self.live_verifier = LiveQRVerifier()

    def analyze(
        self,
        page_images: list[str],
        run_qr: bool,
        run_live_qr_check: bool,
        extracted_fields: dict[str, str] | None = None,
    ) -> QRAnalysis:
        warnings: list[str] = []
        flags: list[str] = []
        if not run_qr:
            if run_live_qr_check:
                warnings.append("QR live browsing is disabled; BitCheck only performs structural QR URL analysis.")
            return self._empty(warnings=warnings)
        if run_live_qr_check:
            warnings.append("QR live browsing is disabled; BitCheck only performs structural QR URL analysis.")

        cv2 = self._load_cv2()
        if cv2 is None:
            warnings.append("OpenCV QRCodeDetector is unavailable; optional pyzbar decoding will be attempted if installed.")

        items: list[QRAnalysisItem] = []
        barcodes_found = False
        seen: set[tuple[str, int]] = set()
        for page_index, image_path in enumerate(page_images, start=1):
            if cv2 is not None:
                image = cv2.imread(str(Path(image_path)))
                if image is None:
                    warnings.append(f"Could not read page image for QR analysis: {image_path}")
                    continue
                for data, bbox in self._detect_qr_cv2(cv2, image):
                    key = (data, page_index)
                    if not data or key in seen:
                        continue
                    seen.add(key)
                    items.append(self._build_item(data, page_index, bbox, False, extracted_fields or {}))
            pyzbar_items, pyzbar_barcodes = self._detect_with_pyzbar(image_path, page_index, seen, False, extracted_fields or {})
            items.extend(pyzbar_items)
            barcodes_found = barcodes_found or pyzbar_barcodes

        consistency = self._qr_text_consistency(items, extracted_fields or {})
        risk = max([item.url_analysis.risk_score for item in items if item.url_analysis] + [consistency.risk_score, 0.0])
        live_risks = [item.live_verification.risk_score for item in items if item.live_verification and item.live_verification.risk_score is not None]
        if live_risks:
            risk = max(risk, max(live_risks))
        for item in items:
            if item.url_analysis:
                flags.extend(item.url_analysis.flags)
            if item.live_verification:
                flags.extend(item.live_verification.flags)
        flags.extend(consistency.mismatch_flags)

        return QRAnalysis(
            checked=True,
            qr_found=bool(items),
            barcodes_found=barcodes_found,
            items=items,
            qr_text_consistency=consistency,
            risk_score=round(min(risk, 1.0), 2),
            flags=list(dict.fromkeys(flags)),
            warnings=warnings,
        )

    def _build_item(
        self,
        data: str,
        page: int,
        bbox: QRBoundingBox,
        run_live_qr_check: bool,
        extracted_fields: dict[str, str],
    ) -> QRAnalysisItem:
        data_type = self._classify_data(data)
        url_analysis = self.url_analyzer.analyze(data) if data_type == "url" else None
        live_verification: LiveQRVerification | None = None
        if url_analysis:
            eligible = url_analysis.scheme in {"http", "https"} and not url_analysis.is_private_or_internal
            live_verification = self.live_verifier.not_requested(eligible=eligible)
        return QRAnalysisItem(
            type="QR_CODE",
            data=data,
            data_type=data_type,
            page=page,
            bbox=bbox,
            url_analysis=url_analysis,
            live_verification=live_verification,
        )

    def _detect_qr_cv2(self, cv2, image) -> list[tuple[str, QRBoundingBox]]:
        detector = cv2.QRCodeDetector()
        results: list[tuple[str, QRBoundingBox]] = []
        try:
            ok, decoded_info, points, _ = detector.detectAndDecodeMulti(image)
            if ok and points is not None:
                for data, point_set in zip(decoded_info, points):
                    results.append((data, self._bbox_from_points(point_set)))
                return results
        except Exception:
            pass
        try:
            data, points, _ = detector.detectAndDecode(image)
            if data and points is not None:
                results.append((data, self._bbox_from_points(points[0])))
        except Exception:
            pass
        return results

    def _detect_with_pyzbar(
        self,
        image_path: str,
        page: int,
        seen: set[tuple[str, int]],
        run_live_qr_check: bool,
        extracted_fields: dict[str, str],
    ) -> tuple[list[QRAnalysisItem], bool]:
        try:
            from PIL import Image
            from pyzbar.pyzbar import decode
        except Exception:
            return [], False
        items: list[QRAnalysisItem] = []
        barcodes_found = False
        try:
            decoded = decode(Image.open(image_path))
        except Exception:
            return [], False
        for item in decoded:
            data = item.data.decode("utf-8", errors="ignore")
            key = (data, page)
            if key in seen:
                continue
            seen.add(key)
            rect = item.rect
            bbox = QRBoundingBox(x=rect.left, y=rect.top, width=rect.width, height=rect.height)
            if item.type == "QRCODE":
                items.append(self._build_item(data, page, bbox, run_live_qr_check, extracted_fields))
            else:
                barcodes_found = True
        return items, barcodes_found

    def _qr_text_consistency(self, items: list[QRAnalysisItem], fields: dict[str, str]) -> QRTextConsistency:
        matched: list[str] = []
        mismatches: list[str] = []
        searchable = " ".join(item.data for item in items).lower()
        for key, value in fields.items():
            if value and str(value).lower() in searchable:
                matched.append(key)
        return QRTextConsistency(
            checked=True,
            matched_document_fields=matched,
            mismatch_flags=mismatches,
            risk_score=0.0 if not mismatches else 0.4,
        )

    def _bbox_from_points(self, points) -> QRBoundingBox:
        xs = [int(point[0]) for point in points]
        ys = [int(point[1]) for point in points]
        return QRBoundingBox(x=min(xs), y=min(ys), width=max(xs) - min(xs), height=max(ys) - min(ys))

    def _classify_data(self, data: str) -> str:
        parsed = urlparse(data)
        if parsed.scheme in {"http", "https"} and parsed.netloc:
            return "url"
        if data.lower().startswith("mailto:") or "@" in data:
            return "email"
        if data.lower().startswith("tel:") or data.replace("+", "").replace("-", "").replace(" ", "").isdigit():
            return "phone"
        if data.strip():
            return "plain_text"
        return "unknown"

    def _empty(self, warnings: list[str] | None = None) -> QRAnalysis:
        return QRAnalysis(
            checked=True,
            qr_found=False,
            barcodes_found=False,
            items=[],
            qr_text_consistency=QRTextConsistency(
                checked=True,
                matched_document_fields=[],
                mismatch_flags=[],
                risk_score=0.0,
            ),
            risk_score=0.0,
            flags=[],
            warnings=warnings or [],
        )

    def _load_cv2(self):
        try:
            import cv2

            return cv2
        except Exception:
            return None