"""
Advanced Response Optimizer v2.0
Next-Generation AI Response Quality & Learning System.

Features:
    * Multi-factor response ranking (10+ factors)
    * Semantic context understanding
    * Adaptive response generation
    * Real-time feedback learning
    * Response quality scoring
    * Topic-based response specialization
    * Confidence calibration
    * Response synthesis from multiple sources
"""
|
| |
|
| | import json
|
| | import logging
|
| | import re
|
| | import math
|
| | from pathlib import Path
|
| | from typing import Dict, List, Tuple, Optional, Any
|
| | from datetime import datetime
|
| | from collections import defaultdict, Counter
|
| | import hashlib
|
| |
|
| | logger = logging.getLogger(__name__)
|
| |
|
class ResponseQualityScorer:
    """Scores response quality based on multiple weighted factors.

    Each response is rated on seven factors (relevance, completeness,
    clarity, accuracy, freshness, source quality, confidence); the
    overall score is the weighted sum defined by ``factor_weights``
    (weights sum to 1.0).
    """

    # Advice emitted by _suggest_improvements for factors below 0.6.
    _IMPROVEMENT_HINTS = {
        'relevance': 'Response could be more directly relevant to the query',
        'completeness': 'Response could provide a more complete answer',
        'clarity': 'Response formatting could be clearer (use structure, examples)',
        'accuracy': 'Verify factual accuracy with authoritative sources',
        'freshness': 'Consider using more current information',
        'source_quality': 'Use higher-quality authoritative sources',
        'confidence': 'Model confidence could be improved with better training data',
    }

    def __init__(self):
        # response-id -> list of historical quality scores (persisted).
        self.quality_history = defaultdict(list)
        # Relative importance of each quality factor; sums to 1.0.
        self.factor_weights = {
            'relevance': 0.25,
            'completeness': 0.15,
            'clarity': 0.15,
            'accuracy': 0.20,
            'freshness': 0.08,
            'source_quality': 0.10,
            'confidence': 0.07
        }
        self.load_quality_data()

    def load_quality_data(self):
        """Load historical quality scores from disk (best-effort)."""
        try:
            quality_file = Path('noahski_data/response_quality.json')
            if quality_file.exists():
                with open(quality_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self.quality_history = defaultdict(list, data.get('history', {}))
                # NOTE: original message contained a mojibake-garbled emoji
                # that split the source line; replaced with plain text.
                logger.info("Loaded quality scores for %d responses",
                            len(self.quality_history))
        except Exception as e:
            # Missing or corrupt history is not fatal; start fresh.
            logger.warning(f"Could not load quality data: {e}")

    def save_quality_data(self):
        """Persist quality scores to disk (best-effort)."""
        try:
            quality_file = Path('noahski_data/response_quality.json')
            quality_file.parent.mkdir(parents=True, exist_ok=True)
            with open(quality_file, 'w', encoding='utf-8') as f:
                json.dump({
                    'history': dict(self.quality_history),
                    'updated': datetime.now().isoformat()
                }, f, indent=2)
        except Exception as e:
            logger.warning(f"Could not save quality data: {e}")

    def score_response(self,
                       query: str,
                       response: str,
                       sources: List[Dict] = None,
                       context: Dict = None) -> Dict:
        """Score a response on multiple quality factors.

        Args:
            query: The user query the response answers.
            response: The response text to evaluate.
            sources: Optional list of source dicts; ``url``/``domain``/
                ``published`` keys are consulted when present.
            context: Optional extra context; ``context['confidence']``
                feeds the confidence factor (default 0.5).

        Returns:
            Dict with ``overall_score`` (weighted sum in [0, 1]),
            per-factor ``factor_scores``, the ``weights`` used, a
            ``quality_level`` label, ``improvements`` suggestions,
            and an ISO ``timestamp``.
        """
        scores = {
            'relevance': self._score_relevance(query, response),
            'completeness': self._score_completeness(query, response),
            'clarity': self._score_clarity(response),
            'accuracy': self._score_accuracy(response, sources, context),
            'freshness': self._score_freshness(response, sources),
            'source_quality': self._score_source_quality(sources) if sources else 0.5,
            'confidence': context.get('confidence', 0.5) if context else 0.5,
        }

        overall_score = sum(
            scores.get(factor, 0) * weight
            for factor, weight in self.factor_weights.items()
        )

        return {
            'overall_score': overall_score,
            'factor_scores': scores,
            'weights': self.factor_weights,
            'quality_level': self._classify_quality(overall_score),
            'improvements': self._suggest_improvements(scores),
            'timestamp': datetime.now().isoformat()
        }

    def _score_relevance(self, query: str, response: str) -> float:
        """Score query/response keyword overlap (Jaccard-style, stop words removed)."""
        query_words = set(query.lower().split())
        response_words = set(response.lower().split())

        stop_words = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
                      'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could',
                      'should', 'may', 'might', 'must', 'can', 'and', 'or', 'but', 'in',
                      'on', 'at', 'to', 'for', 'of', 'with', 'by', 'from', 'as', 'it',
                      'that', 'this', 'which', 'who', 'what', 'where', 'when', 'why', 'how'}

        query_key = query_words - stop_words
        response_key = response_words - stop_words

        # Query made entirely of stop words: no signal either way.
        if not query_key:
            return 0.5

        intersection = len(query_key & response_key)
        union = len(query_key | response_key)

        # 1.5x scaling rewards partial overlap; capped at 1.0.
        return min(1.0, intersection / union * 1.5) if union > 0 else 0.0

    def _score_completeness(self, query: str, response: str) -> float:
        """Score whether the response looks like a full answer to the question."""
        query_lower = query.lower()
        response_lower = response.lower()

        # English and German interrogatives (has_question is currently
        # computed but not used in the final formula; kept for parity).
        question_words = ['who', 'what', 'where', 'when', 'why', 'how', 'wer', 'was', 'wo', 'wann', 'warum', 'wie']

        has_question = any(qw in query_lower for qw in question_words)
        has_answer = len(response) > 50

        # Hedging language (English and German) weakens the answer.
        hedge_words = ['maybe', 'perhaps', 'possibly', 'might', 'could', 'may be', 'vielleicht', 'mΓΆglich']
        hedge_score = 1.0 - (sum(1 for hw in hedge_words if hw in response_lower) * 0.1)

        # Responses of ~500+ chars earn full length credit.
        length_score = min(1.0, len(response) / 500)

        completeness = (
            (0.4 if has_answer else 0) +
            (0.4 * hedge_score) +
            (0.2 * length_score)
        )

        return min(1.0, completeness)

    def _score_clarity(self, response: str) -> float:
        """Score clarity: sentence length, visible structure, lexical diversity."""
        sentences = re.split(r'[.!?]+', response)
        sentences = [s.strip() for s in sentences if s.strip()]

        if not sentences:
            return 0.1

        # Ideal average sentence length is ~17.5 words.
        avg_length = sum(len(s.split()) for s in sentences) / len(sentences)
        length_score = 1.0 - abs(avg_length - 17.5) / 30
        length_score = max(0, min(1.0, length_score))

        # Line breaks or bullet markers suggest deliberate structure.
        has_breaks = '\n' in response or 'β’' in response or '- ' in response
        structure_score = 0.8 if has_breaks else 0.6

        # Unique-word ratio, scaled so ~67% uniqueness already maxes out.
        words = response.lower().split()
        unique_words = len(set(words))
        diversity_score = unique_words / len(words) if words else 0.5
        diversity_score = min(1.0, diversity_score * 1.5)

        clarity = (
            length_score * 0.3 +
            structure_score * 0.3 +
            diversity_score * 0.4
        )

        return min(1.0, clarity)

    def _score_accuracy(self, response: str, sources: List[Dict], context: Dict) -> float:
        """Score plausibility of accuracy from source count and diversity.

        NOTE(review): ``response`` and ``context`` are currently unused
        in the formula; kept for interface stability.
        """
        if not sources:
            return 0.6

        # More distinct source types -> small diversity bonus (max +0.1).
        source_types = set(s.get('source', 'unknown') for s in sources)
        diversity_bonus = min(0.1, len(source_types) * 0.05)

        # Multiple sources are treated as weak corroboration.
        agreement_score = 0.7 if len(sources) > 1 else 0.5

        accuracy = agreement_score + diversity_bonus
        return min(1.0, accuracy)

    def _score_freshness(self, response: str, sources: List[Dict]) -> float:
        """Score how current the information appears to be."""
        if not sources:
            return 0.5

        # Explicit currency markers in the response text.
        current_year_keywords = ['2024', '2025', '2026', 'recent', 'latest', 'new']
        has_current = any(kw in response.lower() for kw in current_year_keywords)

        # Extract 20xx years from source publication dates.
        # str() guards against non-string 'published' values, which the
        # original code hid behind a bare except.
        dates = []
        for source in sources:
            published = source.get('published', '')
            if published:
                year_match = re.search(r'20\d{2}', str(published))
                if year_match:
                    dates.append(int(year_match.group()))

        if dates:
            # Normalize the newest year against 2026.
            recent_score = min(1.0, max(dates) / 2026)
        else:
            recent_score = 0.5

        freshness = (recent_score * 0.7 + (0.3 if has_current else 0))
        return min(1.0, freshness)

    def _score_source_quality(self, sources: List[Dict]) -> float:
        """Score average trustworthiness of cited sources by domain."""
        if not sources:
            return 0.5

        quality_scores = []

        # Hand-tuned trust scores for well-known domains.
        trusted_domains = {
            'wikipedia': 0.85,
            'github': 0.85,
            'stackoverflow': 0.90,
            'medium': 0.70,
            'arxiv': 0.90,
            'scholar.google': 0.95,
            'bbc': 0.85,
            'cnn': 0.80,
            'guardian': 0.80,
            'nytimes': 0.85
        }

        for source in sources:
            domain = source.get('domain', '').lower()
            url = source.get('url', '').lower()

            # Unknown domains get a neutral 0.6.
            url_score = 0.6
            for trusted, score in trusted_domains.items():
                if trusted in domain or trusted in url:
                    url_score = score
                    break

            quality_scores.append(url_score)

        return sum(quality_scores) / len(quality_scores) if quality_scores else 0.6

    def _classify_quality(self, score: float) -> str:
        """Map an overall score in [0, 1] onto a coarse quality label."""
        if score >= 0.85:
            return 'excellent'
        elif score >= 0.70:
            return 'good'
        elif score >= 0.55:
            return 'fair'
        elif score >= 0.40:
            return 'poor'
        else:
            return 'very_poor'

    def _suggest_improvements(self, scores: Dict) -> List[str]:
        """Return an improvement hint for every factor scoring below 0.6."""
        return [
            self._IMPROVEMENT_HINTS[factor]
            for factor, score in scores.items()
            if score < 0.6 and factor in self._IMPROVEMENT_HINTS
        ]
|
| |
|
| |
|
class AdvancedResponseRanker:
    """Ranks response candidates and picks the strongest one.

    Each candidate is scored by ResponseQualityScorer, then nudged by
    a freshness bonus and a usage-based popularity boost before the
    final sort.
    """

    def __init__(self):
        self.scorer = ResponseQualityScorer()
        # query-hash -> list of ranking decisions, for later analysis.
        self.ranking_history = defaultdict(list)

    def rank_candidates(self,
                        query: str,
                        candidates: List[Dict],
                        context: Dict = None) -> List[Dict]:
        """
        Rank multiple response candidates and return sorted by quality

        Each candidate should have:
        - 'response' or 'content': the response text
        - 'source': source identifier
        - 'confidence': model confidence
        - 'sources': list of sources used
        """
        if not candidates:
            return []

        scored = []
        for cand in candidates:
            text = cand.get('response') or cand.get('content', '')
            if not text:
                # Nothing to rank without response text.
                continue

            merged_ctx = {**context, **cand} if context else cand
            quality = self.scorer.score_response(
                query=query,
                response=text,
                sources=cand.get('sources', []),
                context=merged_ctx
            )

            # Small extra reward for fresh info, on top of the weighted score.
            freshness_bonus = 0.05 * quality['factor_scores'].get('freshness', 0)
            # Popularity boost from prior uses, capped at +0.1.
            usage_boost = min(0.1, 0.01 * cand.get('uses', 0))

            enriched = dict(cand)
            enriched['quality_score'] = quality['overall_score']
            enriched['overall_rank_score'] = (
                quality['overall_score'] + freshness_bonus + usage_boost
            )
            enriched['quality_details'] = quality
            enriched['improvements'] = quality['improvements']
            scored.append(enriched)

        scored.sort(key=lambda item: item['overall_rank_score'], reverse=True)

        # Remember which source won for this query.
        if scored:
            winner = scored[0]
            query_key = hashlib.md5(query.encode()).hexdigest()[:8]
            self.ranking_history[query_key].append({
                'timestamp': datetime.now().isoformat(),
                'top_source': winner.get('source'),
                'top_score': winner['overall_rank_score'],
                'num_candidates': len(scored)
            })

        return scored

    def get_best_response(self, query: str, candidates: List[Dict], context: Dict = None) -> Optional[Dict]:
        """Get single best response from candidates"""
        ordered = self.rank_candidates(query, candidates, context)
        return ordered[0] if ordered else None
|
| |
|
| |
|
class AdaptiveResponseGenerator:
    """Generates responses with an adaptive style based on context.

    A style profile controls formality, expected length, tone and
    whether code/references are appropriate in the output.
    """

    def __init__(self):
        # Named style profiles consulted by adapt_response().
        self.style_profiles = {
            'technical': {
                'formal': True,
                'use_code': True,
                'use_references': True,
                'length': 'long',
                'tone': 'precise'
            },
            'casual': {
                'formal': False,
                'use_code': False,
                'use_references': False,
                'length': 'medium',
                'tone': 'friendly'
            },
            'educational': {
                'formal': True,
                'use_code': True,
                'use_references': True,
                'length': 'medium',
                'tone': 'explanatory',
                'include_examples': True
            },
            'concise': {
                'formal': False,
                'use_code': False,
                'use_references': False,
                'length': 'short',
                'tone': 'direct'
            }
        }

    def detect_style_preference(self, query: str, context: Dict = None) -> str:
        """Detect the response style the user likely prefers.

        Simple keyword heuristics on the query text; falls back to
        'casual'. ``context`` is accepted for interface stability but
        currently unused.
        """
        query_lower = query.lower()

        if any(word in query_lower for word in ['code', 'programming', 'technical', 'implement']):
            return 'technical'
        elif any(word in query_lower for word in ['example', 'explain', 'teach', 'learn', 'how to']):
            return 'educational'
        elif any(word in query_lower for word in ['quick', 'brief', 'tl;dr', 'summarize', 'short']):
            return 'concise'
        else:
            return 'casual'

    def adapt_response(self, response: str, style: str = 'casual') -> str:
        """Adapt a response to the given style profile.

        Formal styles swap casual connectives for formal ones, short
        styles truncate to the first two sentences, and the friendly
        tone decorates the first matching keyword with an emoji.
        Unknown styles fall back to 'casual'.
        """
        profile = self.style_profiles.get(style, self.style_profiles['casual'])

        if profile['formal'] and not any(word in response for word in ['However', 'Therefore', 'Furthermore']):
            # BUG FIX: use word boundaries so words merely containing
            # 'but'/'so' (e.g. 'debut', 'also') are not mangled, as the
            # previous naive str.replace did.
            response = re.sub(r'\bbut\s', 'However, ', response)
            response = re.sub(r'\bso\s', 'Therefore, ', response)

        if profile['length'] == 'short' and len(response) > 300:
            # Keep only the first two sentences for short styles.
            sentences = response.split('.')
            response = '. '.join(sentences[:2]) + '.'

        if profile['tone'] == 'friendly':
            # Decorate the first matching keyword with an emoji.
            # (Literals were mojibake-garbled in the source; restored.)
            emojis = {'help': '🙂', 'good': '✨', 'code': '💻', 'learn': '📚'}
            for keyword, emoji in emojis.items():
                if keyword in response.lower():
                    response = response.replace(keyword, f'{emoji} {keyword}')
                    break

        return response
|
| |
|
| |
|
class LearningFeedbackProcessor:
    """Processes user feedback to improve future responses.

    Feedback is kept per response id as three buckets:
    ``positive`` / ``negative`` thumb-style entries and numeric
    ``ratings``. The store is persisted to JSON after every write.
    """

    def __init__(self):
        # response_id -> {'positive': [...], 'negative': [...], 'ratings': [...]}
        self.feedback_data = defaultdict(lambda: {'positive': [], 'negative': [], 'ratings': []})
        # Reserved for future pattern learning; currently unused.
        self.pattern_learner = {}
        self.load_feedback()

    def load_feedback(self):
        """Load historical feedback from disk (best-effort)."""
        try:
            feedback_file = Path('noahski_data/response_feedback.json')
            if feedback_file.exists():
                with open(feedback_file, 'r', encoding='utf-8') as f:
                    self.feedback_data = defaultdict(
                        lambda: {'positive': [], 'negative': [], 'ratings': []},
                        json.load(f)
                    )
                # NOTE: original message contained a mojibake-garbled emoji
                # that split the source line; replaced with plain text.
                logger.info("Loaded feedback for %d response types",
                            len(self.feedback_data))
        except Exception as e:
            logger.warning(f"Could not load feedback: {e}")

    def save_feedback(self):
        """Persist feedback to disk (best-effort)."""
        try:
            feedback_file = Path('noahski_data/response_feedback.json')
            feedback_file.parent.mkdir(parents=True, exist_ok=True)
            with open(feedback_file, 'w', encoding='utf-8') as f:
                json.dump(dict(self.feedback_data), f, indent=2)
        except Exception as e:
            logger.warning(f"Could not save feedback: {e}")

    def record_feedback(self, response_id: str, rating: int, feedback_type: str, comment: str = ''):
        """Record one piece of user feedback for a response.

        Args:
            response_id: Identifier of the response being rated.
            rating: Numeric rating; only used when feedback_type == 'rating'.
            feedback_type: One of 'positive', 'negative', 'rating';
                anything else is silently ignored.
            comment: Optional free-text comment.

        Side effect: saves the whole feedback store to disk.
        """
        if feedback_type not in ('positive', 'negative', 'rating'):
            return

        entry = {
            'timestamp': datetime.now().isoformat(),
            'comment': comment
        }

        bucket = self.feedback_data[response_id]
        if feedback_type == 'positive':
            bucket['positive'].append(entry)
        elif feedback_type == 'negative':
            bucket['negative'].append(entry)
        else:  # 'rating'
            bucket['ratings'].append({**entry, 'rating': rating})

        self.save_feedback()
        # Mojibake emoji removed from the original log message.
        logger.info(f"Recorded {feedback_type} feedback for response: {response_id}")

    def get_response_performance(self, response_id: str) -> Dict:
        """Summarize feedback metrics for one response.

        Returns counts, a satisfaction rate (positive / thumbs total,
        None when no thumbs feedback exists), and the average numeric
        rating (None when no ratings exist).
        """
        feedback = self.feedback_data.get(response_id, {})

        positive_count = len(feedback.get('positive', []))
        negative_count = len(feedback.get('negative', []))
        ratings = feedback.get('ratings', [])

        avg_rating = sum(r['rating'] for r in ratings) / len(ratings) if ratings else None
        thumbs_total = positive_count + negative_count

        return {
            'response_id': response_id,
            'positive_feedback': positive_count,
            'negative_feedback': negative_count,
            'satisfaction_rate': positive_count / thumbs_total if thumbs_total > 0 else None,
            'average_rating': avg_rating,
            'total_feedbacks': thumbs_total + len(ratings)
        }

    def identify_improvement_opportunities(self) -> List[Dict]:
        """List response types whose feedback signals problems.

        Flags responses with satisfaction below 50% or an average
        rating below 3.0, sorted by sample count (most data first).
        """
        opportunities = []

        for response_id, feedback in self.feedback_data.items():
            performance = self.get_response_performance(response_id)

            if performance['satisfaction_rate'] is not None:
                if performance['satisfaction_rate'] < 0.5:
                    opportunities.append({
                        'response_id': response_id,
                        'issue': 'low_satisfaction',
                        'satisfaction': performance['satisfaction_rate'],
                        'samples': performance['total_feedbacks']
                    })

            if performance['average_rating'] is not None:
                if performance['average_rating'] < 3.0:
                    opportunities.append({
                        'response_id': response_id,
                        'issue': 'low_rating',
                        'average_rating': performance['average_rating'],
                        'samples': performance['total_feedbacks']
                    })

        return sorted(opportunities, key=lambda x: x.get('samples', 0), reverse=True)
|
| |
|
| |
|
class AdvancedResponseOptimizer:
    """Master optimizer combining all response improvement techniques.

    Composes the scorer, ranker, adaptive generator and feedback
    processor into a single entry point (``optimize_response``).
    """

    def __init__(self):
        self.scorer = ResponseQualityScorer()
        self.ranker = AdvancedResponseRanker()
        self.generator = AdaptiveResponseGenerator()
        self.feedback_processor = LearningFeedbackProcessor()
        # Mojibake emoji removed from the original log message.
        logger.info("Advanced Response Optimizer v2.0 initialized")

    def optimize_response(self,
                          query: str,
                          candidates: List[Dict],
                          context: Dict = None) -> Dict:
        """Select, style-adapt and annotate the best candidate response.

        Args:
            query: The user query.
            candidates: Candidate dicts with 'response'/'content',
                'source', 'confidence' and optionally 'sources'.
            context: Optional extra context forwarded to the ranker.

        Returns:
            On success: dict with 'success'=True, the adapted 'content',
            quality metrics and style metadata. On failure: an error
            dict from _error_response.
        """
        if not candidates:
            return self._error_response("No response candidates provided")

        ranked = self.ranker.rank_candidates(query, candidates, context)

        if not ranked:
            return self._error_response("Could not rank response candidates")

        best = ranked[0]
        response_text = best.get('response') or best.get('content', '')

        # Detect the preferred style and adapt the winning response.
        style = self.generator.detect_style_preference(query, context)
        optimized_response = self.generator.adapt_response(response_text, style)

        return {
            'success': True,
            'content': optimized_response,
            'source': best.get('source'),
            'quality': {
                'overall_score': best['overall_rank_score'],
                'quality_level': best['quality_details']['quality_level'],
                'factor_scores': best['quality_details']['factor_scores'],
                'improvements': best['quality_details']['improvements']
            },
            'style_adapted': style,
            'ranking_position': 1,
            'total_alternatives': len(ranked),
            'confidence': best.get('confidence', 0.5)
        }

    def _error_response(self, error_msg: str) -> Dict:
        """Build a uniform error payload."""
        return {
            'success': False,
            'content': error_msg,
            'type': 'error'
        }

    def improve_batch_responses(self, query_response_pairs: List[Tuple[str, str]]) -> Dict:
        """Optimize a batch of (query, response) pairs and report gains.

        Useful for batch optimization of training data. Returns a
        summary with per-pair before/after scores and batch averages.
        """
        improvements = {
            'total_processed': len(query_response_pairs),
            'responses': [],
            'avg_initial_quality': 0,
            'avg_final_quality': 0,
            # ROBUSTNESS FIX: always present, even when no pair succeeds
            # (previously the key was missing for empty/failed batches).
            'overall_improvement': 0,
        }

        initial_scores = []
        final_scores = []

        for query, response in query_response_pairs:
            quality = self.scorer.score_response(query, response)
            initial_score = quality['overall_score']
            initial_scores.append(initial_score)

            # Re-run the single original response through the optimizer.
            candidates = [{'response': response, 'source': 'original', 'confidence': initial_score}]
            optimized = self.optimize_response(query, candidates)

            if optimized['success']:
                final_quality = self.scorer.score_response(query, optimized['content'])
                final_scores.append(final_quality['overall_score'])

                improvements['responses'].append({
                    'query': query[:50] + '...' if len(query) > 50 else query,
                    'initial_score': initial_score,
                    'final_score': final_quality['overall_score'],
                    'improvement': final_quality['overall_score'] - initial_score,
                    'quality_level': final_quality['quality_level']
                })

        if initial_scores:
            improvements['avg_initial_quality'] = sum(initial_scores) / len(initial_scores)
        if final_scores:
            improvements['avg_final_quality'] = sum(final_scores) / len(final_scores)
            improvements['overall_improvement'] = improvements['avg_final_quality'] - improvements['avg_initial_quality']

        return improvements
|
| |
|
| |
|
| |
|
# Module-level singleton so importers share one optimizer instance.
response_optimizer = AdvancedResponseOptimizer()

if __name__ == '__main__':
    # BUG FIX: basicConfig lives on the logging module, not on a Logger
    # instance — the original `logger.basicConfig(...)` raised
    # AttributeError before any demo output was produced.
    logging.basicConfig(level=logging.INFO)

    # Smoke-test the optimizer with two synthetic candidates.
    test_candidates = [
        {'response': 'This is a great response', 'source': 'test1', 'confidence': 0.8},
        {'response': 'This is another very detailed response with more information', 'source': 'test2', 'confidence': 0.85},
    ]

    result = response_optimizer.optimize_response('What is Python?', test_candidates)
    print(json.dumps(result, indent=2))
|
| |
|