Spaces:
Runtime error
Runtime error
| """ | |
| ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| β ADVANCED RESPONSE OPTIMIZER v2.0 β | |
| β Next-Generation AI Response Quality & Learning System β | |
| β β | |
| β Features: β | |
| β β’ Multi-Factor Response Ranking (10+ factors) β | |
| β β’ Semantic Context Understanding β | |
| β β’ Adaptive Response Generation β | |
| β β’ Real-Time Feedback Learning β | |
| β β’ Response Quality Scoring β | |
| β β’ Topic-Based Response Specialization β | |
| β β’ Confidence Calibration β | |
| β β’ Response Synthesis from Multiple Sources β | |
| ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| """ | |
| import json | |
| import logging | |
| import re | |
| import math | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple, Optional, Any | |
| from datetime import datetime | |
| from collections import defaultdict, Counter | |
| import hashlib | |
| logger = logging.getLogger(__name__) | |
class ResponseQualityScorer:
    """Scores response quality based on multiple factors.

    Seven per-factor scores (each nominally in 0..1) are blended into a
    single overall score using ``factor_weights`` (weights sum to 1.0).
    Historical assessments are persisted best-effort to
    ``noahski_data/response_quality.json``.
    """

    def __init__(self):
        # response-id -> list of past assessments (reloaded from disk if present)
        self.quality_history = defaultdict(list)
        self.factor_weights = {
            'relevance': 0.25,       # How relevant to the query
            'completeness': 0.15,    # Does it answer fully
            'clarity': 0.15,         # Is it clear/understandable
            'accuracy': 0.20,        # Factual correctness
            'freshness': 0.08,       # Currency of information
            'source_quality': 0.10,  # Quality of sources
            'confidence': 0.07       # Model confidence
        }
        self.load_quality_data()

    def load_quality_data(self):
        """Load historical quality scores (best-effort; a missing file is fine)."""
        try:
            quality_file = Path('noahski_data/response_quality.json')
            if quality_file.exists():
                with open(quality_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self.quality_history = defaultdict(list, data.get('history', {}))
                logger.info(f"β Loaded quality scores for {len(self.quality_history)} responses")
        except Exception as e:
            # Persistence problems must never break scoring itself.
            logger.warning(f"Could not load quality data: {e}")

    def save_quality_data(self):
        """Save quality scores to disk (best-effort)."""
        try:
            quality_file = Path('noahski_data/response_quality.json')
            quality_file.parent.mkdir(parents=True, exist_ok=True)
            with open(quality_file, 'w', encoding='utf-8') as f:
                json.dump({
                    'history': dict(self.quality_history),
                    'updated': datetime.now().isoformat()
                }, f, indent=2)
        except Exception as e:
            logger.warning(f"Could not save quality data: {e}")

    def score_response(self,
                       query: str,
                       response: str,
                       sources: Optional[List[Dict]] = None,
                       context: Optional[Dict] = None) -> Dict:
        """Score a response on multiple quality factors.

        Args:
            query: The user query the response answers.
            response: The response text to assess.
            sources: Optional source dicts (may carry 'published', 'domain',
                'url' and 'source' keys).
            context: Optional extra context; only 'confidence' is read here.

        Returns:
            Dict with 'overall_score', per-factor 'factor_scores', the
            'weights' used, a coarse 'quality_level', 'improvements'
            suggestions and a 'timestamp'.
        """
        scores = {}
        # 1. Relevance (lexical overlap between query and response)
        scores['relevance'] = self._score_relevance(query, response)
        # 2. Completeness (does it answer the question fully)
        scores['completeness'] = self._score_completeness(query, response)
        # 3. Clarity (readability, structure, vocabulary diversity)
        scores['clarity'] = self._score_clarity(response)
        # 4. Accuracy (source count/diversity used as a proxy)
        scores['accuracy'] = self._score_accuracy(response, sources, context)
        # 5. Freshness (recency of information)
        scores['freshness'] = self._score_freshness(response, sources)
        # 6. Source quality; neutral 0.5 when no sources were supplied
        scores['source_quality'] = self._score_source_quality(sources) if sources else 0.5
        # 7. Model confidence forwarded from context (neutral 0.5 default)
        scores['confidence'] = context.get('confidence', 0.5) if context else 0.5
        # Weighted sum; weights total 1.0, so this stays in 0..1
        overall_score = sum(
            scores.get(factor, 0) * weight
            for factor, weight in self.factor_weights.items()
        )
        return {
            'overall_score': overall_score,
            'factor_scores': scores,
            'weights': self.factor_weights,
            'quality_level': self._classify_quality(overall_score),
            'improvements': self._suggest_improvements(scores),
            'timestamp': datetime.now().isoformat()
        }

    def _score_relevance(self, query: str, response: str) -> float:
        """Jaccard similarity of content words (stop words removed), boosted 1.5x."""
        query_words = set(query.lower().split())
        response_words = set(response.lower().split())
        # Remove common stop words so only content words are compared
        stop_words = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
                      'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could',
                      'should', 'may', 'might', 'must', 'can', 'and', 'or', 'but', 'in',
                      'on', 'at', 'to', 'for', 'of', 'with', 'by', 'from', 'as', 'it',
                      'that', 'this', 'which', 'who', 'what', 'where', 'when', 'why', 'how'}
        query_key = query_words - stop_words
        response_key = response_words - stop_words
        if not query_key:
            # Query is all stop words: nothing meaningful to compare against
            return 0.5
        intersection = len(query_key & response_key)
        union = len(query_key | response_key)
        return min(1.0, intersection / union * 1.5) if union > 0 else 0

    def _score_completeness(self, query: str, response: str) -> float:
        """Score if the response fully answers the question."""
        query_lower = query.lower()
        response_lower = response.lower()
        # English + German question markers.
        # NOTE(review): has_question is computed but does not feed the formula
        # below — presumably a planned refinement; confirm before removing.
        question_words = ['who', 'what', 'where', 'when', 'why', 'how', 'wer', 'was', 'wo', 'wann', 'warum', 'wie']
        has_question = any(qw in query_lower for qw in question_words)
        has_answer = len(response) > 50  # Minimum response length
        # Penalize hedging language (0.1 deduction per hedge word found)
        hedge_words = ['maybe', 'perhaps', 'possibly', 'might', 'could', 'may be', 'vielleicht', 'mΓΆglich']
        hedge_score = 1.0 - (sum(1 for hw in hedge_words if hw in response_lower) * 0.1)
        length_score = min(1.0, len(response) / 500)  # Optimal length ~500 chars
        completeness = (
            (0.4 if has_answer else 0) +
            (0.4 * hedge_score) +
            (0.2 * length_score)
        )
        return min(1.0, completeness)

    def _score_clarity(self, response: str) -> float:
        """Score response clarity (structure, readability, vocabulary)."""
        # Sentence segmentation on terminal punctuation
        sentences = re.split(r'[.!?]+', response)
        sentences = [s.strip() for s in sentences if s.strip()]
        if not sentences:
            return 0.1
        # Average sentence length (ideal: 15-20 words)
        avg_length = sum(len(s.split()) for s in sentences) / len(sentences)
        length_score = 1.0 - abs(avg_length - 17.5) / 30
        length_score = max(0, min(1.0, length_score))
        # Paragraph structure (has newlines/bullets/breaks)
        has_breaks = '\n' in response or 'β’' in response or '- ' in response
        structure_score = 0.8 if has_breaks else 0.6
        # Vocabulary diversity (unique/total word ratio, boosted 1.5x)
        words = response.lower().split()
        unique_words = len(set(words))
        diversity_score = unique_words / len(words) if words else 0.5
        diversity_score = min(1.0, diversity_score * 1.5)
        clarity = (
            length_score * 0.3 +
            structure_score * 0.3 +
            diversity_score * 0.4
        )
        return min(1.0, clarity)

    def _score_accuracy(self, response: str, sources: List[Dict], context: Dict) -> float:
        """Score factual accuracy using source count/diversity as a proxy."""
        if not sources:
            return 0.6  # Default if no sources
        # More distinct source types -> small confidence bonus (cap 0.1)
        source_types = set(s.get('source', 'unknown') for s in sources)
        diversity_bonus = min(0.1, len(source_types) * 0.05)
        # Multiple sources treated as implicit agreement
        agreement_score = 0.7 if len(sources) > 1 else 0.5
        accuracy = agreement_score + diversity_bonus
        return min(1.0, accuracy)

    def _score_freshness(self, response: str, sources: List[Dict]) -> float:
        """Score currency/freshness of information."""
        if not sources:
            return 0.5
        # Heuristic: does the text mention current years or recency words
        current_year_keywords = ['2024', '2025', '2026', 'recent', 'latest', 'new']
        has_current = any(kw in response.lower() for kw in current_year_keywords)
        # Extract a 20xx year from each source's 'published' field
        dates = []
        for source in sources:
            published = source.get('published', '')
            if published:
                try:
                    year_match = re.search(r'20\d{2}', published)
                    if year_match:
                        dates.append(int(year_match.group()))
                except (TypeError, ValueError):
                    # Bug fix: was a bare `except:` that swallowed everything
                    # (even KeyboardInterrupt). Only non-string / unparsable
                    # 'published' values are expected here — skip them.
                    pass
        if dates:
            recent_score = min(1.0, max(dates) / 2026)  # Current year weighting
        else:
            recent_score = 0.5
        freshness = (recent_score * 0.7 + (0.3 if has_current else 0))
        return min(1.0, freshness)

    def _score_source_quality(self, sources: List[Dict]) -> float:
        """Score quality of cited sources via a trusted-domain lookup table."""
        if not sources:
            return 0.5
        quality_scores = []
        trusted_domains = {
            'wikipedia': 0.85,
            'github': 0.85,
            'stackoverflow': 0.90,
            'medium': 0.70,
            'arxiv': 0.90,
            'scholar.google': 0.95,
            'bbc': 0.85,
            'cnn': 0.80,
            'guardian': 0.80,
            'nytimes': 0.85
        }
        for source in sources:
            domain = source.get('domain', '').lower()
            url = source.get('url', '').lower()
            url_score = 0.6  # Default for unknown domains
            for trusted, score in trusted_domains.items():
                if trusted in domain or trusted in url:
                    url_score = score
                    break
            quality_scores.append(url_score)
        return sum(quality_scores) / len(quality_scores) if quality_scores else 0.6

    def _classify_quality(self, score: float) -> str:
        """Map a 0..1 overall score onto a coarse quality label."""
        if score >= 0.85:
            return 'excellent'
        elif score >= 0.70:
            return 'good'
        elif score >= 0.55:
            return 'fair'
        elif score >= 0.40:
            return 'poor'
        else:
            return 'very_poor'

    def _suggest_improvements(self, scores: Dict) -> List[str]:
        """Suggest improvements for every factor scoring below 0.6."""
        suggestions = []
        for factor, score in scores.items():
            if score < 0.6:
                if factor == 'relevance':
                    suggestions.append('Response could be more directly relevant to the query')
                elif factor == 'completeness':
                    suggestions.append('Response could provide a more complete answer')
                elif factor == 'clarity':
                    suggestions.append('Response formatting could be clearer (use structure, examples)')
                elif factor == 'accuracy':
                    suggestions.append('Verify factual accuracy with authoritative sources')
                elif factor == 'freshness':
                    suggestions.append('Consider using more current information')
                elif factor == 'source_quality':
                    suggestions.append('Use higher-quality authoritative sources')
                elif factor == 'confidence':
                    suggestions.append('Model confidence could be improved with better training data')
        return suggestions
class AdvancedResponseRanker:
    """Ranks and selects best responses from multiple candidates"""

    def __init__(self):
        # Shared scorer instance plus an in-memory log of past rankings.
        self.scorer = ResponseQualityScorer()
        self.ranking_history = defaultdict(list)

    def rank_candidates(self,
                        query: str,
                        candidates: List[Dict],
                        context: Dict = None) -> List[Dict]:
        """
        Rank multiple response candidates and return sorted by quality
        Each candidate should have:
        - 'response' or 'content': the response text
        - 'source': source identifier
        - 'confidence': model confidence
        - 'sources': list of sources used
        """
        if not candidates:
            return []
        scored = []
        for cand in candidates:
            # Candidates may carry their text under either key; skip empties.
            text = cand.get('response') or cand.get('content', '')
            if not text:
                continue
            merged_ctx = {**context, **cand} if context else cand
            assessment = self.scorer.score_response(
                query=query,
                response=text,
                sources=cand.get('sources', []),
                context=merged_ctx
            )
            # Small additive bonuses: freshness (max 0.05 effective) and a
            # usage-frequency boost of 0.01 per prior use, capped at 0.1.
            freshness_bonus = assessment['factor_scores'].get('freshness', 0) * 0.05
            usage_boost = min(0.1, cand.get('uses', 0) * 0.01)
            rank_score = assessment['overall_score'] + freshness_bonus + usage_boost
            entry = dict(cand)
            entry.update({
                'quality_score': assessment['overall_score'],
                'overall_rank_score': rank_score,
                'quality_details': assessment,
                'improvements': assessment['improvements'],
            })
            scored.append(entry)
        # Best first.
        scored.sort(key=lambda item: item['overall_rank_score'], reverse=True)
        # Remember who won this query for later analysis.
        if scored:
            winner = scored[0]
            key = hashlib.md5(query.encode()).hexdigest()[:8]
            self.ranking_history[key].append({
                'timestamp': datetime.now().isoformat(),
                'top_source': winner.get('source'),
                'top_score': winner['overall_rank_score'],
                'num_candidates': len(scored),
            })
        return scored

    def get_best_response(self, query: str, candidates: List[Dict], context: Dict = None) -> Optional[Dict]:
        """Get single best response from candidates"""
        ordered = self.rank_candidates(query, candidates, context)
        if not ordered:
            return None
        return ordered[0]
class AdaptiveResponseGenerator:
    """Generates responses with adaptive style based on context.

    A style profile (technical / casual / educational / concise) is picked
    from keywords in the query, then the response text is lightly rewritten
    to match that profile.
    """

    def __init__(self):
        # Per-style knobs read by adapt_response (formal, length, tone, ...).
        self.style_profiles = {
            'technical': {
                'formal': True,
                'use_code': True,
                'use_references': True,
                'length': 'long',
                'tone': 'precise'
            },
            'casual': {
                'formal': False,
                'use_code': False,
                'use_references': False,
                'length': 'medium',
                'tone': 'friendly'
            },
            'educational': {
                'formal': True,
                'use_code': True,
                'use_references': True,
                'length': 'medium',
                'tone': 'explanatory',
                'include_examples': True
            },
            'concise': {
                'formal': False,
                'use_code': False,
                'use_references': False,
                'length': 'short',
                'tone': 'direct'
            }
        }

    def detect_style_preference(self, query: str, context: Dict = None) -> str:
        """Detect what response style the user prefers.

        Keyword checks run in priority order (technical > educational >
        concise); 'casual' is the fallback. `context` is accepted for
        interface compatibility but not consulted here.
        """
        query_lower = query.lower()
        if any(word in query_lower for word in ['code', 'programming', 'technical', 'implement']):
            return 'technical'
        elif any(word in query_lower for word in ['example', 'explain', 'teach', 'learn', 'how to']):
            return 'educational'
        elif any(word in query_lower for word in ['quick', 'brief', 'tl;dr', 'summarize', 'short']):
            return 'concise'
        else:
            return 'casual'

    def adapt_response(self, response: str, style: str = 'casual') -> str:
        """Adapt response text to the given style profile.

        Unknown styles fall back to 'casual'. Formal styles swap casual
        connectors for formal ones, 'short' styles truncate to two
        sentences, and the 'friendly' tone decorates one keyword with an
        emoji.
        """
        profile = self.style_profiles.get(style, self.style_profiles['casual'])
        if profile['formal'] and not any(word in response for word in ['However', 'Therefore', 'Furthermore']):
            # Bug fix: str.replace('but ', ...) matched inside words, turning
            # 'debut ' into 'deHowever, ' and 'also ' into 'alTherefore, '.
            # \b restricts the swap to whole words.
            response = re.sub(r'\bbut ', 'However, ', response)
            response = re.sub(r'\bso ', 'Therefore, ', response)
        if profile['length'] == 'short' and len(response) > 300:
            # Keep only the first two sentences for concise styles.
            sentences = response.split('.')
            response = '. '.join(sentences[:2]) + '.'
        if profile['tone'] == 'friendly':
            emojis = {'help': 'π', 'good': 'β¨', 'code': 'π»', 'learn': 'π'}
            for keyword, emoji in emojis.items():
                # Bug fix: the old code tested `keyword in response.lower()`
                # but then replaced case-sensitively, so e.g. 'Help' was
                # detected yet never decorated. Match and replace
                # case-insensitively, preserving the original casing.
                pattern = re.compile(re.escape(keyword), re.IGNORECASE)
                if pattern.search(response):
                    response = pattern.sub(lambda m: f'{emoji} {m.group(0)}', response)
                    break  # decorate only the first matching keyword
        return response
class LearningFeedbackProcessor:
    """Processes user feedback to improve future responses"""

    def __init__(self):
        # response-id -> {'positive': [...], 'negative': [...], 'ratings': [...]}
        self.feedback_data = defaultdict(lambda: {'positive': [], 'negative': [], 'ratings': []})
        self.pattern_learner = {}
        self.load_feedback()

    def load_feedback(self):
        """Read persisted feedback, tolerating a missing or unreadable file."""
        try:
            path = Path('noahski_data/response_feedback.json')
            if path.exists():
                with open(path, 'r', encoding='utf-8') as fh:
                    stored = json.load(fh)
                self.feedback_data = defaultdict(
                    lambda: {'positive': [], 'negative': [], 'ratings': []},
                    stored
                )
                logger.info(f"β Loaded feedback for {len(self.feedback_data)} response types")
        except Exception as e:
            logger.warning(f"Could not load feedback: {e}")

    def save_feedback(self):
        """Persist all feedback to disk, creating the data directory if needed."""
        try:
            path = Path('noahski_data/response_feedback.json')
            path.parent.mkdir(parents=True, exist_ok=True)
            with open(path, 'w', encoding='utf-8') as fh:
                json.dump(dict(self.feedback_data), fh, indent=2)
        except Exception as e:
            logger.warning(f"Could not save feedback: {e}")

    def record_feedback(self, response_id: str, rating: int, feedback_type: str, comment: str = ''):
        """Store one piece of user feedback and persist immediately.

        `rating` is only consulted when feedback_type == 'rating'; unknown
        feedback types are ignored.
        """
        if feedback_type not in ('positive', 'negative', 'rating'):
            return
        entry = {
            'timestamp': datetime.now().isoformat(),
            'comment': comment
        }
        bucket = self.feedback_data[response_id]
        if feedback_type == 'rating':
            bucket['ratings'].append({**entry, 'rating': rating})
        else:
            bucket[feedback_type].append(entry)
        self.save_feedback()
        logger.info(f"π Recorded {feedback_type} feedback for response: {response_id}")

    def get_response_performance(self, response_id: str) -> Dict:
        """Summarize counts, satisfaction rate and mean rating for one response."""
        record = self.feedback_data.get(response_id, {})
        n_pos = len(record.get('positive', []))
        n_neg = len(record.get('negative', []))
        ratings = record.get('ratings', [])
        votes = n_pos + n_neg
        mean_rating = sum(r['rating'] for r in ratings) / len(ratings) if ratings else None
        return {
            'response_id': response_id,
            'positive_feedback': n_pos,
            'negative_feedback': n_neg,
            # None when there are no up/down votes at all
            'satisfaction_rate': n_pos / votes if votes > 0 else None,
            'average_rating': mean_rating,
            'total_feedbacks': votes + len(ratings)
        }

    def identify_improvement_opportunities(self) -> List[Dict]:
        """List response types with low satisfaction or ratings, most-sampled first."""
        flagged = []
        for rid in self.feedback_data:
            perf = self.get_response_performance(rid)
            sat = perf['satisfaction_rate']
            # Below 50% thumbs-up -> needs attention
            if sat is not None and sat < 0.5:
                flagged.append({
                    'response_id': rid,
                    'issue': 'low_satisfaction',
                    'satisfaction': sat,
                    'samples': perf['total_feedbacks']
                })
            avg = perf['average_rating']
            # Mean star rating under 3/5 -> needs attention
            if avg is not None and avg < 3.0:
                flagged.append({
                    'response_id': rid,
                    'issue': 'low_rating',
                    'average_rating': avg,
                    'samples': perf['total_feedbacks']
                })
        flagged.sort(key=lambda item: item.get('samples', 0), reverse=True)
        return flagged
class AdvancedResponseOptimizer:
    """Master optimizer combining all response improvement techniques"""

    def __init__(self):
        # Compose the full pipeline: scoring, ranking, style adaptation, feedback.
        self.scorer = ResponseQualityScorer()
        self.ranker = AdvancedResponseRanker()
        self.generator = AdaptiveResponseGenerator()
        self.feedback_processor = LearningFeedbackProcessor()
        logger.info("π Advanced Response Optimizer v2.0 initialized")

    def optimize_response(self,
                          query: str,
                          candidates: List[Dict],
                          context: Dict = None) -> Dict:
        """
        Optimize response selection and generation
        Returns best response with quality metrics and improvement suggestions
        """
        if not candidates:
            return self._error_response("No response candidates provided")
        ordered = self.ranker.rank_candidates(query, candidates, context)
        if not ordered:
            return self._error_response("Could not rank response candidates")
        # Winner is the highest-ranked candidate.
        winner = ordered[0]
        text = winner.get('response') or winner.get('content', '')
        # Pick a style from the query, then rewrite the text to match it.
        style = self.generator.detect_style_preference(query, context)
        adapted = self.generator.adapt_response(text, style)
        details = winner['quality_details']
        return {
            'success': True,
            'content': adapted,
            'source': winner.get('source'),
            'quality': {
                'overall_score': winner['overall_rank_score'],
                'quality_level': details['quality_level'],
                'factor_scores': details['factor_scores'],
                'improvements': details['improvements']
            },
            'style_adapted': style,
            'ranking_position': 1,  # This is the best response
            'total_alternatives': len(ordered),
            'confidence': winner.get('confidence', 0.5)
        }

    def _error_response(self, error_msg: str) -> Dict:
        """Uniform failure payload for optimize_response."""
        return {
            'success': False,
            'content': error_msg,
            'type': 'error'
        }

    def improve_batch_responses(self, query_response_pairs: List[Tuple[str, str]]) -> Dict:
        """
        Improve multiple response pairs and return analysis
        Useful for batch optimization of training data
        """
        report = {
            'total_processed': len(query_response_pairs),
            'responses': [],
            'avg_initial_quality': 0,
            'avg_final_quality': 0
        }
        before = []
        after = []
        for query, response in query_response_pairs:
            baseline = self.scorer.score_response(query, response)['overall_score']
            before.append(baseline)
            # Run the single original response through the full optimizer.
            seed = [{'response': response, 'source': 'original', 'confidence': baseline}]
            outcome = self.optimize_response(query, seed)
            if outcome['success']:
                rescored = self.scorer.score_response(query, outcome['content'])
                after.append(rescored['overall_score'])
                preview = query[:50] + '...' if len(query) > 50 else query
                report['responses'].append({
                    'query': preview,
                    'initial_score': baseline,
                    'final_score': rescored['overall_score'],
                    'improvement': rescored['overall_score'] - baseline,
                    'quality_level': rescored['quality_level']
                })
        if before:
            report['avg_initial_quality'] = sum(before) / len(before)
        if after:
            report['avg_final_quality'] = sum(after) / len(after)
        report['overall_improvement'] = report['avg_final_quality'] - report['avg_initial_quality']
        return report
# Global instance shared by importers of this module
response_optimizer = AdvancedResponseOptimizer()

if __name__ == '__main__':
    # Smoke test when executed as a script.
    # Bug fix: basicConfig is a module-level function on `logging`, not a
    # Logger method — `logger.basicConfig(...)` raised AttributeError.
    logging.basicConfig(level=logging.INFO)
    test_candidates = [
        {'response': 'This is a great response', 'source': 'test1', 'confidence': 0.8},
        {'response': 'This is another very detailed response with more information', 'source': 'test2', 'confidence': 0.85},
    ]
    result = response_optimizer.optimize_response('What is Python?', test_candidates)
    print(json.dumps(result, indent=2))