File size: 8,477 Bytes
75bea1c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
"""Quality evaluation for responses."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any
from urllib.parse import urlparse


@dataclass
class QualityScore:
    """Per-dimension quality scores for one generated response.

    Every numeric field is a float in the closed interval [0, 1].
    """

    relevance: float  # how well the answer addresses the query
    completeness: float  # whether the answer covers the question fully
    accuracy: float  # estimated factual accuracy
    clarity: float  # readability of the answer
    sourcing: float  # strength of the source citations
    overall: float  # weighted aggregate of the scores above
    feedback: list[str]  # human-readable notes explaining the scores


class QualityEvaluator:
    """Evaluates the quality of generated responses.

    All scores are heuristic estimates in [0, 1]. ``evaluate`` combines five
    sub-scores into a weighted overall score, which ``is_acceptable`` compares
    against ``min_quality_threshold``.
    """

    # Common function/question words excluded from keyword-overlap matching.
    _STOPWORDS = frozenset({
        "the", "a", "an", "is", "are", "was", "were",
        "what", "how", "when", "where", "why", "who",
    })

    def __init__(self, min_quality_threshold: float = 0.6):
        """Initialize the evaluator.

        Args:
            min_quality_threshold: Minimum acceptable overall quality score.
        """
        self.min_quality_threshold = min_quality_threshold

    def evaluate(
        self,
        query: str,
        answer: str,
        sources: list[dict[str, str]] | None = None,
        reasoning_steps: list[str] | None = None,
    ) -> QualityScore:
        """Evaluate the quality of a response.

        Args:
            query: Original user query.
            answer: Generated answer.
            sources: List of source citations (each may carry a "url" key).
            reasoning_steps: Reasoning steps taken while producing the answer.

        Returns:
            QualityScore with per-dimension scores and human-readable feedback.
        """
        feedback: list[str] = []

        relevance = self._evaluate_relevance(query, answer)
        if relevance < 0.5:
            feedback.append("Answer may not be relevant to the question")

        completeness = self._evaluate_completeness(query, answer)
        if completeness < 0.5:
            feedback.append("Answer appears incomplete")

        accuracy = self._evaluate_accuracy(sources, reasoning_steps)
        if accuracy < 0.5:
            feedback.append("Accuracy could not be verified with sources")

        clarity = self._evaluate_clarity(answer)
        if clarity < 0.5:
            feedback.append("Answer could be clearer")

        sourcing = self._evaluate_sourcing(answer, sources)
        if sourcing < 0.5:
            feedback.append("More sources would improve credibility")

        # Weighted average: relevance and accuracy carry the most weight.
        overall = (
            relevance * 0.25
            + completeness * 0.2
            + accuracy * 0.25
            + clarity * 0.15
            + sourcing * 0.15
        )

        # Lead the feedback list with the overall verdict.
        if overall >= self.min_quality_threshold:
            feedback.insert(0, "Response meets quality standards")
        else:
            feedback.insert(0, "Response may need refinement")

        return QualityScore(
            relevance=relevance,
            completeness=completeness,
            accuracy=accuracy,
            clarity=clarity,
            sourcing=sourcing,
            overall=overall,
            feedback=feedback,
        )

    def is_acceptable(self, score: QualityScore) -> bool:
        """Check whether a quality score meets the configured threshold.

        Args:
            score: Quality score to check.

        Returns:
            True if ``score.overall`` is at or above ``min_quality_threshold``.
        """
        return score.overall >= self.min_quality_threshold

    def _evaluate_relevance(self, query: str, answer: str) -> float:
        """Estimate answer relevance via keyword overlap with the query.

        Args:
            query: User query.
            answer: Generated answer.

        Returns:
            Relevance score (0-1).
        """
        if not answer:
            return 0.0

        # Simple keyword matching on whitespace tokens, case-insensitive.
        query_words = set(query.lower().split()) - self._STOPWORDS
        answer_words = set(answer.lower().split()) - self._STOPWORDS

        # A query made entirely of stopwords gives no signal; stay neutral.
        if not query_words:
            return 0.5

        # 0.3 base credit plus the fraction of query keywords echoed back.
        overlap = len(query_words & answer_words)
        return min(1.0, overlap / len(query_words) + 0.3)

    def _evaluate_completeness(self, query: str, answer: str) -> float:
        """Estimate answer completeness from length and explanation cues.

        Args:
            query: User query.
            answer: Generated answer.

        Returns:
            Completeness score (0-1).
        """
        if not answer:
            return 0.0

        query_words = len(query.split())
        answer_words = len(answer.split())

        # Longer queries typically need longer answers; require >= 20 words.
        expected_min = max(20, query_words * 3)

        if answer_words < expected_min:
            # Proportional credit for short/partial answers.
            return answer_words / expected_min

        # Explanatory connectives suggest the answer justifies its claims.
        explanation_markers = ["because", "since", "therefore", "this means", "in other words"]
        has_explanation = any(marker in answer.lower() for marker in explanation_markers)

        score = 0.7
        if has_explanation:
            score += 0.2
        if answer_words > expected_min * 2:
            score += 0.1

        return min(1.0, score)

    def _evaluate_accuracy(
        self,
        sources: list[dict[str, str]] | None,
        reasoning_steps: list[str] | None,
    ) -> float:
        """Estimate accuracy from the amount of sourcing and reasoning.

        Args:
            sources: List of sources.
            reasoning_steps: Reasoning steps.

        Returns:
            Accuracy score (0-1), capped at 0.9 because true factual
            accuracy cannot be verified here.
        """
        score = 0.3  # Base score.

        # Each source adds 0.1, up to +0.3.
        if sources:
            score += min(0.3, len(sources) * 0.1)

        # Each reasoning step adds 0.1, up to +0.3 — suggests careful analysis.
        if reasoning_steps:
            score += min(0.3, len(reasoning_steps) * 0.1)

        return min(0.9, score)

    def _evaluate_clarity(self, answer: str) -> float:
        """Estimate answer clarity from sentence length, structure and hedging.

        Args:
            answer: Generated answer.

        Returns:
            Clarity score (0-1).
        """
        if not answer:
            return 0.0

        score = 0.5

        # Average sentence length. Empty fragments are filtered out so a
        # trailing period does not inflate the sentence count (the previous
        # version divided by the raw split length, skewing the average).
        sentences = [part for part in answer.split(".") if part.strip()]
        if sentences:
            avg_sentence_length = len(answer.split()) / len(sentences)
            # Readable prose averages roughly 10-30 words per sentence.
            if 10 <= avg_sentence_length <= 30:
                score += 0.2

        # Visible structure (paragraphs, bullet/numbered lists) aids clarity.
        if "\n" in answer:
            score += 0.1
        if any(marker in answer for marker in ["-", "•", "1.", "2."]):
            score += 0.1

        # Heavy hedging ("might", "maybe", ...) reads as uncertain.
        hedge_words = ["might", "perhaps", "maybe", "possibly", "could"]
        hedge_count = sum(1 for word in hedge_words if word in answer.lower())
        if hedge_count > 3:
            score -= 0.1

        return min(1.0, max(0.0, score))

    def _evaluate_sourcing(
        self,
        answer: str,
        sources: list[dict[str, str]] | None,
    ) -> float:
        """Estimate source quality from count, diversity and authority.

        Args:
            answer: Generated answer.
            sources: List of sources.

        Returns:
            Sourcing score (0-1).
        """
        if not sources:
            return 0.2

        score = 0.3

        # Each source adds 0.1, up to +0.3.
        score += min(0.3, len(sources) * 0.1)

        urls = [s.get("url", "") for s in sources]

        # Count distinct domains; diversity suggests independent corroboration.
        # urlparse is imported once at module level (previously re-imported on
        # every loop iteration with a broad except swallowing all errors).
        domains: set[str] = set()
        for url in urls:
            if not url:
                continue
            try:
                domains.add(urlparse(url).netloc)
            except ValueError:
                # Malformed URL (e.g. invalid IPv6 literal); skip it.
                pass

        if len(domains) > 1:
            score += 0.2

        # One-time bonus if any source looks authoritative.
        reliable_indicators = (".gov", ".edu", "wikipedia.org")
        if any(ind in url.lower() for url in urls for ind in reliable_indicators):
            score += 0.1

        return min(1.0, score)