File size: 8,509 Bytes
5b6e847
 
 
 
 
 
1bbe15b
5b6e847
 
 
1bbe15b
 
 
5b6e847
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1bbe15b
5b6e847
 
 
1bbe15b
5b6e847
 
 
 
 
 
 
 
 
 
 
 
1bbe15b
5b6e847
1bbe15b
5b6e847
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
1bbe15b
5b6e847
 
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
 
1bbe15b
5b6e847
 
 
1bbe15b
5b6e847
 
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
1bbe15b
5b6e847
 
 
 
 
1bbe15b
5b6e847
 
 
1bbe15b
5b6e847
 
 
 
 
 
 
 
 
1bbe15b
5b6e847
 
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
 
 
 
1bbe15b
5b6e847
 
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
 
 
 
1bbe15b
5b6e847
 
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
 
 
 
 
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
 
 
1bbe15b
5b6e847
 
 
1bbe15b
5b6e847
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
1bbe15b
5b6e847
 
 
 
 
 
1bbe15b
5b6e847
 
1bbe15b
5b6e847
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
"""
Firewall Decision Engine for VDHF

Controls output delivery based on verification score and threshold.
"""

from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum

from config.settings import FIREWALL_THRESHOLD
from core.verifier import VerificationResult, ClaimVerifier
from core.claim_extractor import Claim


class FirewallDecision(Enum):
    """Outcomes the firewall can assign to a response."""

    # The response is safe to deliver.
    PASS = "PASS"
    # The response is held back and prompt refinement is triggered.
    REGENERATE = "REGENERATE"


@dataclass
class FirewallResult:
    """Outcome of one firewall evaluation together with the counts behind it."""
    # Verdict reached for the response.
    decision: FirewallDecision
    # Fraction of claims that were supported.
    support_ratio: float
    # Threshold the support ratio was compared against.
    threshold: float
    # Number of claims that were evaluated.
    total_claims: int
    # Number of claims marked as supported.
    supported_claims: int
    # Number of claims not marked as supported.
    unsupported_claims: int
    # Per-claim verification results the counts were derived from.
    verification_results: List[VerificationResult]

    @property
    def is_safe(self) -> bool:
        """Return True when the decision is ``FirewallDecision.PASS``."""
        return self.decision == FirewallDecision.PASS

    def __str__(self) -> str:
        """Render a three-line, human-readable summary of the result."""
        if self.is_safe:
            status = "✓ PASSED"
        else:
            status = "✗ BLOCKED"

        headline = f"Firewall Decision: {status}\n"
        ratio_line = f"  Support Ratio: {self.support_ratio:.2%} (threshold: {self.threshold:.2%})\n"
        claims_line = f"  Claims: {self.supported_claims}/{self.total_claims} supported"
        return headline + ratio_line + claims_line


class FirewallScoringModule:
    """
    Scores a response from its per-claim verification results.

    The headline metric is the support ratio:
    SupportRatio = (Number of SUPPORTED claims) / (Total number of claims)
    """

    def __init__(self, threshold: float = FIREWALL_THRESHOLD):
        # Kept on the instance; the scoring methods below do not read it.
        self.threshold = threshold

    def compute_support_ratio(
        self,
        verification_results: List[VerificationResult]
    ) -> float:
        """
        Return the fraction of verification results marked as supported.

        Args:
            verification_results: List of VerificationResult objects

        Returns:
            Support ratio (0.0 to 1.0); 1.0 when there are no results
        """
        # Nothing was claimed, so nothing can be unsupported.
        if not verification_results:
            return 1.0

        supported_count = len(
            [outcome for outcome in verification_results if outcome.is_supported]
        )
        return supported_count / len(verification_results)

    def get_detailed_scores(
        self,
        verification_results: List[VerificationResult]
    ) -> Dict[str, Any]:
        """
        Build a breakdown of counts and score statistics.

        Args:
            verification_results: List of VerificationResult objects

        Returns:
            Dictionary with the support ratio, claim counts, average
            similarity/entailment and min/max similarity. With no results
            the ratio is 1.0 and every statistic is 0.0.
        """
        def _mean(values):
            # Average of a list, falling back to 0.0 for an empty one.
            return sum(values) / len(values) if values else 0.0

        claim_count = len(verification_results)
        supported_count = len(
            [outcome for outcome in verification_results if outcome.is_supported]
        )
        similarities = [outcome.similarity_score for outcome in verification_results]
        entailments = [outcome.entailment_score for outcome in verification_results]

        if claim_count > 0:
            ratio = supported_count / claim_count
        else:
            ratio = 1.0

        breakdown: Dict[str, Any] = {}
        breakdown['support_ratio'] = ratio
        breakdown['total_claims'] = claim_count
        breakdown['supported_claims'] = supported_count
        breakdown['unsupported_claims'] = claim_count - supported_count
        breakdown['average_similarity'] = _mean(similarities)
        breakdown['average_entailment'] = _mean(entailments)
        breakdown['min_similarity'] = min(similarities) if similarities else 0.0
        breakdown['max_similarity'] = max(similarities) if similarities else 0.0
        return breakdown


class FirewallDecisionEngine:
    """
    Firewall Decision Engine

    Purpose:
    Control output delivery based on verification score.

    Decision Logic:
    If SupportRatio >= tau (the configured threshold):
        Allow the response to pass to the user
    Else:
        Trigger prompt refinement and regeneration
    """

    def __init__(self, threshold: float = FIREWALL_THRESHOLD):
        """
        Create an engine that compares support ratios against ``threshold``.

        Args:
            threshold: Minimum support ratio required for a PASS decision
        """
        self.threshold = threshold
        self.scoring_module = FirewallScoringModule(threshold)

    def evaluate(
        self,
        verification_results: List[VerificationResult]
    ) -> FirewallResult:
        """
        Evaluate whether response should pass the firewall.

        Args:
            verification_results: List of VerificationResult objects

        Returns:
            FirewallResult object with the decision, the support ratio,
            the threshold used and the supported/unsupported claim counts
        """
        support_ratio = self.scoring_module.compute_support_ratio(verification_results)

        # Meeting the threshold exactly counts as a pass.
        if support_ratio >= self.threshold:
            decision = FirewallDecision.PASS
        else:
            decision = FirewallDecision.REGENERATE

        total = len(verification_results)
        supported = sum(1 for r in verification_results if r.is_supported)

        return FirewallResult(
            decision=decision,
            support_ratio=support_ratio,
            threshold=self.threshold,
            total_claims=total,
            supported_claims=supported,
            unsupported_claims=total - supported,
            verification_results=verification_results
        )

    def get_unsupported_claims(
        self,
        firewall_result: FirewallResult
    ) -> List[Claim]:
        """
        Get list of unsupported claims for prompt refinement.

        Args:
            firewall_result: FirewallResult object

        Returns:
            List of unsupported Claim objects, in verification-result order
        """
        return [
            result.claim
            for result in firewall_result.verification_results
            if not result.is_supported
        ]

    def get_supported_claims(
        self,
        firewall_result: FirewallResult
    ) -> List[Claim]:
        """
        Get list of supported claims.

        Args:
            firewall_result: FirewallResult object

        Returns:
            List of supported Claim objects, in verification-result order
        """
        return [
            result.claim
            for result in firewall_result.verification_results
            if result.is_supported
        ]

    def get_verified_evidence(
        self,
        firewall_result: FirewallResult
    ) -> List[str]:
        """
        Get evidence that supports verified claims.

        Duplicates are removed. The returned order is the order in which
        each piece of evidence is first seen among the supported results,
        so repeated calls (and separate runs) produce the same list.

        Args:
            firewall_result: FirewallResult object

        Returns:
            List of unique evidence strings
        """
        # dict.fromkeys de-duplicates like a set but keeps insertion order;
        # list(set(...)) would return strings in an order that varies with
        # hash randomisation between interpreter runs.
        unique_evidence = dict.fromkeys(
            result.best_evidence
            for result in firewall_result.verification_results
            if result.is_supported and result.best_evidence
        )
        return list(unique_evidence)


class HallucinationFirewall:
    """
    Complete Hallucination Firewall

    Combines verification and decision-making into a single interface.
    """

    def __init__(
        self,
        similarity_threshold: Optional[float] = None,
        firewall_threshold: float = FIREWALL_THRESHOLD
    ):
        """
        Build the verifier and the decision engine.

        Args:
            similarity_threshold: Threshold handed to ClaimVerifier. When
                None, SIMILARITY_THRESHOLD from config.settings is used.
                An explicit 0.0 is honoured rather than replaced.
            firewall_threshold: Threshold handed to FirewallDecisionEngine
        """
        # Kept as a function-local import, as in the original code.
        from config.settings import SIMILARITY_THRESHOLD

        # Compare against None explicitly: `value or default` would discard
        # a deliberately supplied 0.0 because it is falsy.
        if similarity_threshold is None:
            similarity_threshold = SIMILARITY_THRESHOLD
        self.similarity_threshold = similarity_threshold
        self.firewall_threshold = firewall_threshold

        self.verifier = ClaimVerifier(self.similarity_threshold)
        self.decision_engine = FirewallDecisionEngine(firewall_threshold)

    def check_response(
        self,
        claims: List[Claim],
        evidence_list: List
    ) -> FirewallResult:
        """
        Check a response through the firewall.

        Args:
            claims: Extracted claims from response
            evidence_list: Retrieved evidence

        Returns:
            FirewallResult with decision
        """
        # Verify every claim against the evidence, then score the outcome.
        verification_results = self.verifier.verify_all_claims(claims, evidence_list)
        return self.decision_engine.evaluate(verification_results)

    def get_refinement_info(
        self,
        firewall_result: FirewallResult
    ) -> Dict[str, Any]:
        """
        Get information needed for prompt refinement.

        Args:
            firewall_result: FirewallResult from check_response

        Returns:
            Dictionary with the unsupported claims, supported claims,
            verified evidence, support ratio and a 'needs_regeneration'
            flag (True when the result is not safe)
        """
        engine = self.decision_engine
        return {
            'unsupported_claims': engine.get_unsupported_claims(firewall_result),
            'supported_claims': engine.get_supported_claims(firewall_result),
            'verified_evidence': engine.get_verified_evidence(firewall_result),
            'support_ratio': firewall_result.support_ratio,
            'needs_regeneration': not firewall_result.is_safe
        }