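"""
Interactive test harness for the question-answering agent.

Loads GAIA-style questions from data/metadata.jsonl, runs them through the
agent graph, scores the answers against the expected ones, and writes a
summary report to test_report.json.
"""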
import json
import time
from collections import defaultdict

from dotenv import load_dotenv

# Load environment variables before importing the agent module,
# so API keys are available when it initializes.
load_dotenv()

from langchain_core.messages import HumanMessage
from agents.agent import build_graph, analyze_question_type

def load_test_questions(file_path="data/metadata.jsonl"):
    """Load test questions from the metadata.jsonl file."""
    questions = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines so json.loads doesn't raise
            data = json.loads(line)
            questions.append({
                "task_id": data["task_id"],
                "question": data["Question"],
                "expected_answer": data["Final answer"],
                "level": data["Level"],
                "tools": data.get("Annotator Metadata", {}).get("Tools", ""),
                "steps": data.get("Annotator Metadata", {}).get("Number of steps", ""),
            })
    return questions
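
# Each metadata.jsonl line is expected to look roughly like the following
# (schema inferred from the keys read above; the values are illustrative
# placeholders, not taken from the real dataset):
#
# {"task_id": "task-001", "Question": "...", "Final answer": "...",
#  "Level": 1,
#  "Annotator Metadata": {"Tools": "...", "Number of steps": "..."}}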

def categorize_questions(questions):
    """Categorize questions by question type and by difficulty level."""
    categories = defaultdict(list)
    levels = defaultdict(list)
    for q in questions:
        # Group by question type, as classified by the agent's own heuristic
        q_type = analyze_question_type(q["question"])
        categories[q_type].append(q)
        # Group by difficulty level
        levels[f"Level_{q['level']}"].append(q)
    return categories, levels

def test_single_question(graph, question_data, verbose=True):
    """Run a single question through the graph and return a result record."""
    if verbose:
        print(f"\n{'='*80}")
        print(f"Testing Task ID: {question_data['task_id']}")
        print(f"Level: {question_data['level']}")
        print(f"Question: {question_data['question'][:150]}...")
        print(f"Expected Answer: {question_data['expected_answer']}")

    start_time = time.time()
    messages = [HumanMessage(content=question_data['question'])]
    try:
        result = graph.invoke({"messages": messages})
        answer = result["messages"][-1].content
    except Exception as e:
        # Record the failure instead of letting one question abort the run.
        answer = f"ERROR: {e}"
    execution_time = time.time() - start_time

    # Lenient scoring: containment subsumes exact match, so a single
    # substring check suffices. Note it can over-credit short answers
    # (e.g. expected "3" matches inside "30").
    expected = question_data['expected_answer'].strip().lower()
    is_correct = expected in answer.strip().lower()

    if verbose:
        print(f"Agent Answer: {answer}")
        print(f"Execution Time: {execution_time:.2f} seconds")
        print(f"Match: {'✅' if is_correct else '❌'}")

    return {
        "task_id": question_data['task_id'],
        "answer": answer,
        "expected": question_data['expected_answer'],
        "is_correct": is_correct,
        "execution_time": execution_time,
    }

def test_by_category(graph, category_name, category_questions):
    """Test all questions in a specific category and print a summary."""
    print(f"\n{'='*80}")
    print(f"Testing Category: {category_name}")
    print(f"Number of questions: {len(category_questions)}")
    print('='*80)

    results = []
    correct_count = 0
    for i, q in enumerate(category_questions):
        print(f"\nQuestion {i+1}/{len(category_questions)}")
        result = test_single_question(graph, q, verbose=True)
        results.append(result)
        if result['is_correct']:
            correct_count += 1

    # Per-category summary
    accuracy = (correct_count / len(category_questions)) * 100 if category_questions else 0
    avg_time = sum(r['execution_time'] for r in results) / len(results) if results else 0
    print(f"\n{'-'*40}")
    print(f"Category: {category_name} - Summary")
    print(f"Accuracy: {accuracy:.1f}% ({correct_count}/{len(category_questions)})")
    print(f"Average execution time: {avg_time:.2f} seconds")
    print(f"{'-'*40}\n")
    return results

def generate_report(all_results):
    """Generate a comprehensive test report and save it to test_report.json."""
    print("\n" + "="*80)
    print("COMPREHENSIVE TEST REPORT")
    print("="*80)

    # Overall statistics
    total_questions = sum(len(results) for results in all_results.values())
    total_correct = sum(sum(1 for r in results if r['is_correct']) for results in all_results.values())
    overall_accuracy = (total_correct / total_questions) * 100 if total_questions > 0 else 0
    print("\nOVERALL STATISTICS:")
    print(f"Total Questions: {total_questions}")
    print(f"Correct Answers: {total_correct}")
    print(f"Overall Accuracy: {overall_accuracy:.1f}%")

    # Per-category breakdown
    print("\n\nCATEGORY BREAKDOWN:")
    print("-"*50)
    for category, results in all_results.items():
        if results:
            correct = sum(1 for r in results if r['is_correct'])
            accuracy = (correct / len(results)) * 100
            avg_time = sum(r['execution_time'] for r in results) / len(results)
            print(f"\n{category}:")
            print(f"  Questions: {len(results)}")
            print(f"  Correct: {correct}")
            print(f"  Accuracy: {accuracy:.1f}%")
            print(f"  Avg Time: {avg_time:.2f}s")

    # Details for failed questions
    print("\n\nFAILED QUESTIONS DETAILS:")
    print("-"*50)
    for category, results in all_results.items():
        failed = [r for r in results if not r['is_correct']]
        if failed:
            print(f"\n{category}:")
            for r in failed:
                print(f"  Task ID: {r['task_id']}")
                print(f"  Expected: {r['expected']}")
                print(f"  Got: {r['answer']}")
                print()

    # Save the raw results to disk
    with open('test_report.json', 'w', encoding='utf-8') as f:
        json.dump(all_results, f, indent=2)
    print("\n\nDetailed report saved to: test_report.json")

def main():
    # Build the agent graph
    print("Building agent graph...")
    graph = build_graph()

    # Load and categorize the test questions
    print("Loading test questions...")
    test_questions = load_test_questions()
    categories, levels = categorize_questions(test_questions)

    # Test options
    print("\n\nTest Options:")
    print("1. Test all questions")
    print("2. Test by category")
    print("3. Test by difficulty level")
    print("4. Test specific question IDs")
    print("5. Test first N questions")
    choice = input("\nSelect option (1-5): ").strip()

    all_results = {}

    if choice == "1":
        # Test all questions, grouped by category
        for category, questions in categories.items():
            all_results[category] = test_by_category(graph, category, questions)
    elif choice == "2":
        # Test a specific category
        print("\nAvailable categories:")
        for i, cat in enumerate(categories.keys()):
            print(f"{i+1}. {cat} ({len(categories[cat])} questions)")
        cat_choice = int(input("\nSelect category: ")) - 1
        selected_category = list(categories.keys())[cat_choice]
        all_results[selected_category] = test_by_category(
            graph, selected_category, categories[selected_category])
    elif choice == "3":
        # Test a specific difficulty level
        print("\nAvailable levels:")
        for i, level in enumerate(levels.keys()):
            print(f"{i+1}. {level} ({len(levels[level])} questions)")
        level_choice = int(input("\nSelect level: ")) - 1
        selected_level = list(levels.keys())[level_choice]
        all_results[selected_level] = test_by_category(
            graph, selected_level, levels[selected_level])
    elif choice == "4":
        # Test specific task IDs
        task_ids = [tid.strip() for tid in input("\nEnter task IDs (comma-separated): ").split(',')]
        selected_questions = [q for q in test_questions if q['task_id'] in task_ids]
        all_results["Selected Questions"] = test_by_category(
            graph, "Selected Questions", selected_questions)
    elif choice == "5":
        # Test the first N questions
        n = int(input("\nEnter number of questions to test: "))
        selected_questions = test_questions[:n]
        all_results[f"First {n} Questions"] = test_by_category(
            graph, f"First {n} Questions", selected_questions)
    else:
        print("Invalid option.")
        return

    # Generate the comprehensive report
    generate_report(all_results)


if __name__ == "__main__":
    main()