import time
from typing import Dict, Any, List, Optional

from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer


def calculate_confidence(answer: str) -> float:
    """
    Calculate a heuristic confidence score for a generated answer.

    Starts at 1.0 and applies multiplicative penalties for hedged
    language, unusual answer length, and missing financial vocabulary.
    The result is clamped to [0.1, 1.0].
    """
    confidence = 1.0
    answer_lower = answer.lower()

    # Reduce confidence for uncertain language (markers are lowercase
    # so they match the lowercased answer)
    uncertainty_markers = [
        "might", "may", "could", "possibly", "perhaps",
        "i think", "probably", "likely", "seems", "appears"
    ]
    for marker in uncertainty_markers:
        if marker in answer_lower:
            confidence *= 0.9

    # Reduce confidence for very short or very long answers
    words = answer.split()
    if len(words) < 5:
        confidence *= 0.8
    elif len(words) > 100:
        confidence *= 0.9

    # Reduce confidence if the answer contains no financial terms
    financial_terms = [
        "revenue", "profit", "loss", "income", "expense",
        "asset", "liability", "equity", "cash", "stock",
        "share", "dividend", "market", "financial", "fiscal"
    ]
    if not any(term in answer_lower for term in financial_terms):
        confidence *= 0.7

    return max(0.1, min(confidence, 1.0))
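# Example (a minimal sketch; the sample answer is made up for
# illustration). For the three-word answer "Revenue likely grew.",
# the hedge "likely" gives a 0.9 penalty and the short length a 0.8
# penalty, so the score is roughly 1.0 * 0.9 * 0.8 = 0.72; the
# financial term "revenue" avoids the additional 0.7 penalty.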
# Lazily initialized module-level cache so repeated similarity calls
# do not reload the model from disk
_embedding_model: Optional[SentenceTransformer] = None


def calculate_semantic_similarity(text1: str, text2: str) -> float:
    """
    Calculate semantic similarity between two texts using sentence embeddings.
    """
    global _embedding_model
    if _embedding_model is None:
        _embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = _embedding_model.encode([text1, text2])
    similarity = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
    return float(similarity)
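# Example (a sketch; exact scores depend on the model weights, so the
# value will vary slightly across versions):
#
#   sim = calculate_semantic_similarity(
#       "Net revenue increased 10% year over year.",
#       "Annual sales grew by ten percent.",
#   )
#   # cosine similarity falls in roughly [-1, 1]; paraphrases like the
#   # pair above should score well above unrelated sentences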
def evaluate_response(query: str, answer: str, chunks: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Evaluate the quality of the generated response.

    Returns basic length statistics, the heuristic confidence score, and
    a keyword-overlap relevance score over the retrieved chunks (0.0 when
    no chunks are provided).
    """
    confidence = calculate_confidence(answer)
    metrics = {
        "confidence": confidence,
        "answer_length": len(answer.split()),
        "query_length": len(query.split()),
    }
    if chunks:
        metrics["num_chunks"] = len(chunks)

        # Split the query into terms, excluding common stop words
        stop_words = {'what', 'was', 'is', 'are', 'in', 'the', 'a', 'an', 'and', 'or'}
        query_terms = [term.lower() for term in query.split()
                       if term.lower() not in stop_words]

        # Score each chunk by the fraction of query terms it contains
        chunk_scores = []
        for chunk in chunks:
            chunk_lower = chunk.lower()
            matches = sum(1 for term in query_terms if term in chunk_lower)
            chunk_scores.append(matches / len(query_terms) if query_terms else 0.0)

        # Average the per-chunk scores
        metrics["chunk_relevance"] = sum(chunk_scores) / len(chunks)
    else:
        metrics["chunk_relevance"] = 0.0
    return metrics
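# Example (a minimal sketch with made-up inputs): for the query
# "What was the net revenue in 2023?", the stop words "what", "was",
# "the", and "in" are dropped, leaving ["net", "revenue", "2023?"].
# A chunk containing "net revenue" but not the literal token "2023?"
# matches 2 of 3 terms, so it scores about 0.67; chunk_relevance is
# the average of these per-chunk scores.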
def evaluate_models(questions: List[str], answers: List[str], rag_fn, ft_fn) -> List[Dict]:
    """
    Evaluate and compare RAG and fine-tuned models.

    For each question, records both models' answers alongside the ground
    truth and each model's wall-clock response time in seconds.
    """
    results = []
    for q, a in zip(questions, answers):
        start = time.time()
        rag_answer = rag_fn(q)
        rag_time = time.time() - start

        start = time.time()
        ft_answer = ft_fn(q)
        ft_time = time.time() - start

        results.append({
            "question": q,
            "ground_truth": a,
            "rag_answer": rag_answer,
            "rag_time": round(rag_time, 2),
            "ft_answer": ft_answer,
            "ft_time": round(ft_time, 2),
        })
    return results
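

# Minimal end-to-end sketch. The two callables below are hypothetical
# stand-ins (the real RAG and fine-tuned pipelines live elsewhere in the
# app); they only illustrate the expected str -> str interface.
if __name__ == "__main__":
    def fake_rag_fn(question: str) -> str:
        return "Net revenue was $1.2B, driven by market growth."

    def fake_ft_fn(question: str) -> str:
        return "Revenue probably reached about $1.2B."

    questions = ["What was the net revenue in 2023?"]
    answers = ["Net revenue was $1.2 billion."]

    for row in evaluate_models(questions, answers, fake_rag_fn, fake_ft_fn):
        rag_metrics = evaluate_response(row["question"], row["rag_answer"])
        ft_metrics = evaluate_response(row["question"], row["ft_answer"])
        print(row["question"])
        print("  RAG:", row["rag_answer"], rag_metrics)
        print("  FT: ", row["ft_answer"], ft_metrics)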