| import asyncio |
| from bs4 import BeautifulSoup |
| from typing import Dict, Any |
| import os |
| import sys |
| import time |
| import csv |
| from tabulate import tabulate |
| from dataclasses import dataclass |
| from typing import List, Dict |
|
|
| parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| sys.path.append(parent_dir) |
| __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) |
|
|
| from crawl4ai.content_scraping_strategy import WebScrapingStrategy |
| from crawl4ai.content_scraping_strategy import WebScrapingStrategy as WebScrapingStrategyCurrent |
| |
|
|
| @dataclass |
| class TestResult: |
| name: str |
| success: bool |
| images: int |
| internal_links: int |
| external_links: int |
| markdown_length: int |
| execution_time: float |
|
|
| class StrategyTester: |
| def __init__(self): |
| self.new_scraper = WebScrapingStrategy() |
| self.current_scraper = WebScrapingStrategyCurrent() |
| with open(__location__ + '/sample_wikipedia.html', 'r', encoding='utf-8') as f: |
| self.WIKI_HTML = f.read() |
| self.results = {'new': [], 'current': []} |
| |
| def run_test(self, name: str, **kwargs) -> tuple[TestResult, TestResult]: |
| results = [] |
| for scraper in [self.new_scraper, self.current_scraper]: |
| start_time = time.time() |
| result = scraper._get_content_of_website_optimized( |
| url="https://en.wikipedia.org/wiki/Test", |
| html=self.WIKI_HTML, |
| **kwargs |
| ) |
| execution_time = time.time() - start_time |
| |
| test_result = TestResult( |
| name=name, |
| success=result['success'], |
| images=len(result['media']['images']), |
| internal_links=len(result['links']['internal']), |
| external_links=len(result['links']['external']), |
| markdown_length=len(result['markdown']), |
| execution_time=execution_time |
| ) |
| results.append(test_result) |
| |
| return results[0], results[1] |
|
|
| def run_all_tests(self): |
| test_cases = [ |
| ("Basic Extraction", {}), |
| ("Exclude Tags", {'excluded_tags': ['table', 'div.infobox', 'div.navbox']}), |
| ("Word Threshold", {'word_count_threshold': 50}), |
| ("CSS Selector", {'css_selector': 'div.mw-parser-output > p'}), |
| ("Link Exclusions", { |
| 'exclude_external_links': True, |
| 'exclude_social_media_links': True, |
| 'exclude_domains': ['facebook.com', 'twitter.com'] |
| }), |
| ("Media Handling", { |
| 'exclude_external_images': True, |
| 'image_description_min_word_threshold': 20 |
| }), |
| ("Text Only", { |
| 'only_text': True, |
| 'remove_forms': True |
| }), |
| ("HTML Cleaning", { |
| 'clean_html': True, |
| 'keep_data_attributes': True |
| }), |
| ("HTML2Text Options", { |
| 'html2text': { |
| 'skip_internal_links': True, |
| 'single_line_break': True, |
| 'mark_code': True, |
| 'preserve_tags': ['pre', 'code'] |
| } |
| }) |
| ] |
|
|
| all_results = [] |
| for name, kwargs in test_cases: |
| try: |
| new_result, current_result = self.run_test(name, **kwargs) |
| all_results.append((name, new_result, current_result)) |
| except Exception as e: |
| print(f"Error in {name}: {str(e)}") |
| |
| self.save_results_to_csv(all_results) |
| self.print_comparison_table(all_results) |
|
|
| def save_results_to_csv(self, all_results: List[tuple]): |
| csv_file = os.path.join(__location__, 'strategy_comparison_results.csv') |
| with open(csv_file, 'w', newline='') as f: |
| writer = csv.writer(f) |
| writer.writerow(['Test Name', 'Strategy', 'Success', 'Images', 'Internal Links', |
| 'External Links', 'Markdown Length', 'Execution Time']) |
| |
| for name, new_result, current_result in all_results: |
| writer.writerow([name, 'New', new_result.success, new_result.images, |
| new_result.internal_links, new_result.external_links, |
| new_result.markdown_length, f"{new_result.execution_time:.3f}"]) |
| writer.writerow([name, 'Current', current_result.success, current_result.images, |
| current_result.internal_links, current_result.external_links, |
| current_result.markdown_length, f"{current_result.execution_time:.3f}"]) |
|
|
| def print_comparison_table(self, all_results: List[tuple]): |
| table_data = [] |
| headers = ['Test Name', 'Strategy', 'Success', 'Images', 'Internal Links', |
| 'External Links', 'Markdown Length', 'Time (s)'] |
|
|
| for name, new_result, current_result in all_results: |
| |
| differences = [] |
| if new_result.images != current_result.images: differences.append('images') |
| if new_result.internal_links != current_result.internal_links: differences.append('internal_links') |
| if new_result.external_links != current_result.external_links: differences.append('external_links') |
| if new_result.markdown_length != current_result.markdown_length: differences.append('markdown') |
| |
| |
| new_row = [ |
| name, 'New', new_result.success, new_result.images, |
| new_result.internal_links, new_result.external_links, |
| new_result.markdown_length, f"{new_result.execution_time:.3f}" |
| ] |
| table_data.append(new_row) |
| |
| |
| current_row = [ |
| '', 'Current', current_result.success, current_result.images, |
| current_result.internal_links, current_result.external_links, |
| current_result.markdown_length, f"{current_result.execution_time:.3f}" |
| ] |
| table_data.append(current_row) |
| |
| |
| if differences: |
| table_data.append(['', '⚠️ Differences', ', '.join(differences), '', '', '', '', '']) |
| |
| |
| table_data.append([''] * len(headers)) |
|
|
| print("\nStrategy Comparison Results:") |
| print(tabulate(table_data, headers=headers, tablefmt='grid')) |
|
|
| if __name__ == "__main__": |
| tester = StrategyTester() |
| tester.run_all_tests() |