# (scraper residue, not part of the module — left as a comment so the file parses)
# Spaces:
# Sleeping
# Sleeping
"""Quality evaluation for responses."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any
@dataclass
class QualityScore:
    """Quality evaluation scores for a generated response.

    All component scores and ``overall`` are floats in [0, 1]; ``feedback``
    carries human-readable notes explaining the scores.
    """

    relevance: float  # 0-1: How relevant is the answer
    completeness: float  # 0-1: How complete is the answer
    accuracy: float  # 0-1: Estimated accuracy
    clarity: float  # 0-1: How clear is the answer
    sourcing: float  # 0-1: Quality of source citations
    overall: float  # 0-1: Overall quality score
    feedback: list[str]  # Specific feedback items
class QualityEvaluator:
    """Evaluates the quality of generated responses.

    Scores are cheap lexical/structural heuristics (word overlap, length,
    source counts) rather than model-based judgments; each component lands
    in [0, 1] and the overall score is their weighted average.
    """

    def __init__(self, min_quality_threshold: float = 0.6):
        """Initialize the evaluator.

        Args:
            min_quality_threshold: Minimum acceptable overall quality score.
        """
        self.min_quality_threshold = min_quality_threshold

    def evaluate(
        self,
        query: str,
        answer: str,
        sources: list[dict[str, str]] | None = None,
        reasoning_steps: list[str] | None = None,
    ) -> QualityScore:
        """Evaluate the quality of a response.

        Args:
            query: Original user query.
            answer: Generated answer.
            sources: List of source citations (dicts expected to carry a
                "url" key; see ``_evaluate_sourcing``).
            reasoning_steps: Reasoning steps taken while answering.

        Returns:
            QualityScore with per-dimension scores and feedback messages.
        """
        feedback: list[str] = []

        # Evaluate relevance
        relevance = self._evaluate_relevance(query, answer)
        if relevance < 0.5:
            feedback.append("Answer may not be relevant to the question")

        # Evaluate completeness
        completeness = self._evaluate_completeness(query, answer)
        if completeness < 0.5:
            feedback.append("Answer appears incomplete")

        # Evaluate accuracy (estimated only from source/reasoning counts)
        accuracy = self._evaluate_accuracy(sources, reasoning_steps)
        if accuracy < 0.5:
            feedback.append("Accuracy could not be verified with sources")

        # Evaluate clarity
        clarity = self._evaluate_clarity(answer)
        if clarity < 0.5:
            feedback.append("Answer could be clearer")

        # Evaluate sourcing
        sourcing = self._evaluate_sourcing(answer, sources)
        if sourcing < 0.5:
            feedback.append("More sources would improve credibility")

        # Weighted average; weights sum to 1.0 so overall stays in [0, 1].
        overall = (
            relevance * 0.25
            + completeness * 0.2
            + accuracy * 0.25
            + clarity * 0.15
            + sourcing * 0.15
        )

        # Lead the feedback list with the overall verdict.
        if overall >= self.min_quality_threshold:
            feedback.insert(0, "Response meets quality standards")
        else:
            feedback.insert(0, "Response may need refinement")

        return QualityScore(
            relevance=relevance,
            completeness=completeness,
            accuracy=accuracy,
            clarity=clarity,
            sourcing=sourcing,
            overall=overall,
            feedback=feedback,
        )

    def is_acceptable(self, score: QualityScore) -> bool:
        """Check if quality score is acceptable.

        Args:
            score: Quality score to check.

        Returns:
            True if ``score.overall`` meets the configured threshold.
        """
        return score.overall >= self.min_quality_threshold

    def _evaluate_relevance(self, query: str, answer: str) -> float:
        """Estimate answer relevance to the query via keyword overlap.

        Args:
            query: User query.
            answer: Generated answer.

        Returns:
            Relevance score (0-1): a 0.3 base plus the fraction of
            non-stopword query terms that appear verbatim in the answer.
        """
        if not answer:
            return 0.0

        # Simple keyword matching on whitespace-split, lowercased tokens.
        query_words = set(query.lower().split())
        answer_words = set(answer.lower().split())

        # Remove common function/question words so overlap measures content.
        stopwords = {"the", "a", "an", "is", "are", "was", "were", "what", "how", "when", "where", "why", "who"}
        query_words -= stopwords
        answer_words -= stopwords

        if not query_words:
            # Query was entirely stopwords; no signal either way.
            return 0.5

        overlap = len(query_words & answer_words)
        return min(1.0, overlap / len(query_words) + 0.3)  # Base score + overlap

    def _evaluate_completeness(self, query: str, answer: str) -> float:
        """Estimate answer completeness from length and explanation cues.

        Args:
            query: User query.
            answer: Generated answer.

        Returns:
            Completeness score (0-1).
        """
        if not answer:
            return 0.0

        # Check answer length relative to query complexity.
        query_words = len(query.split())
        answer_words = len(answer.split())

        # Longer queries typically need longer answers.
        expected_min = max(20, query_words * 3)
        if answer_words < expected_min:
            # Scale toward (but never past) the 0.7 base awarded below so a
            # slightly-short answer cannot outscore a long-enough one.
            # (The old bare ratio let 19/20 words score 0.95 while 21 words
            # scored only 0.7 — non-monotonic.)
            return 0.7 * (answer_words / expected_min)

        # Explanation markers suggest the answer justifies its claims.
        explanation_markers = ["because", "since", "therefore", "this means", "in other words"]
        has_explanation = any(marker in answer.lower() for marker in explanation_markers)

        score = 0.7
        if has_explanation:
            score += 0.2
        if answer_words > expected_min * 2:
            score += 0.1
        return min(1.0, score)

    def _evaluate_accuracy(
        self,
        sources: list[dict[str, str]] | None,
        reasoning_steps: list[str] | None,
    ) -> float:
        """Estimate accuracy from the amount of supporting evidence.

        Args:
            sources: List of sources backing the answer.
            reasoning_steps: Reasoning steps taken.

        Returns:
            Accuracy score (0-1), capped at 0.9 since true accuracy
            cannot be verified here.
        """
        score = 0.3  # Base score for an unsupported answer

        # More sources = higher potential accuracy (up to +0.3).
        if sources:
            score += min(0.3, len(sources) * 0.1)

        # Reasoning steps suggest careful analysis (up to +0.3).
        if reasoning_steps:
            score += min(0.3, len(reasoning_steps) * 0.1)

        # Cap at 0.9 since we can't truly verify accuracy.
        return min(0.9, score)

    def _evaluate_clarity(self, answer: str) -> float:
        """Estimate answer clarity from sentence length, structure, hedging.

        Args:
            answer: Generated answer.

        Returns:
            Clarity score (0-1).
        """
        if not answer:
            return 0.0

        score = 0.5

        # Average sentence length; the naive "." split also counts the empty
        # tail after a trailing period, which slightly lowers the average.
        sentences = answer.split(".")
        if sentences:
            avg_sentence_length = len(answer.split()) / len(sentences)
            # Ideal: 15-25 words per sentence; accept a wider readable range.
            if 10 <= avg_sentence_length <= 30:
                score += 0.2

        # Visible structure (paragraphs, bullets, numbering) aids clarity.
        if "\n" in answer:
            score += 0.1
        if any(marker in answer for marker in ["-", "•", "1.", "2."]):
            score += 0.1

        # Heavy hedging reads as uncertain (counts distinct hedge words).
        # NOTE(review): substring matching also hits e.g. "mighty" or
        # "couldn't" — presumably tolerable for a rough heuristic; confirm.
        hedge_words = ["might", "perhaps", "maybe", "possibly", "could"]
        hedge_count = sum(1 for word in hedge_words if word in answer.lower())
        if hedge_count > 3:
            score -= 0.1

        return min(1.0, max(0.0, score))

    def _evaluate_sourcing(
        self,
        answer: str,
        sources: list[dict[str, str]] | None,
    ) -> float:
        """Estimate source quality from count, diversity, and reliability.

        Args:
            answer: Generated answer (currently unused; kept for interface
                stability).
            sources: List of sources, each expected to carry a "url" key.

        Returns:
            Sourcing score (0-1); 0.2 floor when no sources are given.
        """
        if not sources:
            return 0.2

        from urllib.parse import urlparse  # hoisted out of the per-URL loop

        score = 0.3

        # More sources = better (up to +0.3).
        score += min(0.3, len(sources) * 0.1)

        # Collect distinct domains to reward diversity.
        urls = [s.get("url", "") for s in sources]
        domains = set()
        for url in urls:
            if url:
                try:
                    domains.add(urlparse(url).netloc)
                except ValueError:
                    # urlparse raises ValueError on malformed URLs (e.g. bad
                    # ports); skip those rather than fail the evaluation.
                    pass

        # Domain diversity bonus.
        if len(domains) > 1:
            score += 0.2

        # One-time bonus if any source looks authoritative.
        reliable_indicators = (".gov", ".edu", "wikipedia.org")
        if any(ind in url.lower() for url in urls for ind in reliable_indicators):
            score += 0.1

        return min(1.0, score)