| | |
| | import pytest |
| | import logging |
| | import hashlib |
| | from unittest.mock import patch, MagicMock, ANY |
| | import requests |
| |
|
| | from ankigen_core.utils import ( |
| | get_logger, |
| | ResponseCache, |
| | fetch_webpage_text, |
| | setup_logging, |
| | ) |
| |
|
| |
|
| | |
| |
|
| |
|
def test_get_logger_returns_logger_instance():
    """get_logger must hand back a logging.Logger object."""
    result = get_logger()
    assert isinstance(result, logging.Logger)
| |
|
| |
|
def test_get_logger_is_singleton():
    """Repeated get_logger() calls must yield the identical object."""
    first, second = get_logger(), get_logger()
    assert first is second
| |
|
| |
|
def test_setup_logging_configures_handlers(capsys):
    """Test that setup_logging (called via get_logger) configures handlers
    and basic logging works. This is a more integrated test.
    """
    from ankigen_core import utils

    # Force a fresh logger so setup_logging actually runs for this test.
    original_logger_instance = utils._logger_instance
    utils._logger_instance = None

    try:
        logger = get_logger()

        # setup_logging is expected to attach at least one handler.
        assert len(logger.handlers) >= 1

        test_message = "Test INFO message for logging"
        logger.info(test_message)
        captured = capsys.readouterr()
        assert test_message in captured.out
    finally:
        # Restore the module singleton even if an assertion above fails;
        # otherwise a failure here would leak state into later tests.
        utils._logger_instance = original_logger_instance
| |
|
| |
|
| | |
| |
|
| |
|
def test_response_cache_set_and_get():
    """Round-trip two distinct entries through ResponseCache.set/get."""
    cache = ResponseCache(maxsize=2)
    entries = [
        ("What is Python?", "gpt-test", {"answer": "A programming language"}),
        ("What is Java?", "gpt-test", {"answer": "Another programming language"}),
    ]

    # Store both entries first, then verify each one reads back unchanged.
    for prompt, model, response in entries:
        cache.set(prompt, model, response)

    for prompt, model, response in entries:
        assert cache.get(prompt, model) == response
| |
|
| |
|
def test_response_cache_get_non_existent():
    """A lookup for a key that was never stored must return None."""
    assert ResponseCache().get("NonExistentPrompt", "test-model") is None
| |
|
| |
|
def test_response_cache_key_creation_indirectly():
    """The same prompt with different models must map to distinct cache slots."""
    cache = ResponseCache(maxsize=5)
    shared_prompt = "Key test prompt 1"
    per_model = {
        "model-a": "Response for model A",
        "model-b": "Response for model B",
    }

    for model, response in per_model.items():
        cache.set(shared_prompt, model, response)

    assert cache.get(shared_prompt, "model-a") == per_model["model-a"]
    assert cache.get(shared_prompt, "model-b") == per_model["model-b"]
    # The two entries must not overwrite each other.
    assert cache.get(shared_prompt, "model-a") != per_model["model-b"]
| |
|
| |
|
def test_response_cache_lru_eviction_simple():
    """Test basic LRU eviction if maxsize is hit.
    Focus on the fact that old items might be evicted.
    """
    cache = ResponseCache(maxsize=1)
    prompt1 = "Prompt One"
    model1 = "m1"
    response1 = "Resp One"

    prompt2 = "Prompt Two"
    model2 = "m2"
    response2 = "Resp Two"

    cache.set(prompt1, model1, response1)
    assert cache.get(prompt1, model1) == response1

    # Insert a second entry into a maxsize=1 cache; depending on the
    # implementation this may displace the first entry from the LRU layer.
    cache.set(prompt2, model2, response2)

    # The most recently set entry must always be retrievable.
    assert cache.get(prompt2, model2) == response2

    # Second scenario: verify retrieval order with a fresh maxsize=1 cache.
    cache_lru = ResponseCache(maxsize=1)
    cache_lru.set("p1", "m", "r1")
    cache_lru.set("p2", "m", "r2")

    _ = cache_lru.get("p2", "m")
    retrieved_p1_after_p2_get = cache_lru.get(
        "p1", "m"
    )

    # NOTE(review): "r1" is still retrievable even though maxsize=1 —
    # this implies ResponseCache keeps a backing store beyond the LRU
    # bound (the maxsize presumably applies only to the memoized get
    # path). Confirm against the ResponseCache implementation.
    assert retrieved_p1_after_p2_get == "r1"
| |
|
| |
|
| | |
| |
|
| |
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success(mock_requests_get):
    """Test successful webpage fetching and text extraction."""
    mock_response = MagicMock()
    mock_response.text = """
    <html>
        <head><title>Test Page</title></head>
        <body>
            <header>Ignore this</header>
            <script>console.log("ignore scripts");</script>
            <main>
                <h1>Main Title</h1>
                <p>This is the first paragraph.</p>
                <p>Second paragraph with extra spaces.</p>
                <div>Div content</div>
            </main>
            <footer>Ignore footer too</footer>
        </body>
    </html>
    """
    mock_response.raise_for_status = MagicMock()
    mock_requests_get.return_value = mock_response

    url = "http://example.com/test"
    extracted_text = fetch_webpage_text(url)

    # Compare the headers dict directly: pytest.approx is designed for
    # numeric comparisons and mis-handles string-valued mappings.
    mock_requests_get.assert_called_once_with(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        },
        timeout=15,
    )
    mock_response.raise_for_status.assert_called_once()

    # Only <main> content should survive extraction; header, footer and
    # scripts are expected to be stripped.
    expected_lines = [
        "Main Title",
        "This is the first paragraph.",
        "Second paragraph with extra spaces.",
        "Div content",
    ]
    actual_lines = extracted_text.split("\n")

    # Direct list equality: pytest renders a readable per-line diff,
    # which replaces the manual length check + index loop.
    assert actual_lines == expected_lines
| |
|
| |
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_network_error(mock_requests_get):
    """Test handling of network errors during webpage fetching."""
    mock_requests_get.side_effect = requests.exceptions.RequestException(
        "Test Network Error"
    )

    url = "http://example.com/network-error"
    # fetch_webpage_text is expected to translate the requests error
    # into a ConnectionError carrying the original message.
    with pytest.raises(ConnectionError, match="Test Network Error"):
        fetch_webpage_text(url)

    # Compare the headers dict directly: pytest.approx is designed for
    # numeric comparisons and mis-handles string-valued mappings.
    mock_requests_get.assert_called_once_with(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        },
        timeout=15,
    )
| |
|
| |
|
| | |
@patch("ankigen_core.utils.BeautifulSoup")
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_parsing_error(mock_requests_get, mock_beautiful_soup):
    """Test handling of HTML parsing errors (simulated by BeautifulSoup raising error)."""
    mock_response = MagicMock()
    mock_response.text = "<html><body>Invalid HTML?</body></html>"
    mock_response.raise_for_status = MagicMock()
    mock_requests_get.return_value = mock_response

    # Any failure inside the parser should surface as RuntimeError.
    mock_beautiful_soup.side_effect = Exception("Test Parsing Error")

    url = "http://example.com/parsing-error"
    with pytest.raises(RuntimeError, match="Failed to parse HTML content"):
        fetch_webpage_text(url)

    # Compare the headers dict directly: pytest.approx is designed for
    # numeric comparisons and mis-handles string-valued mappings.
    mock_requests_get.assert_called_once_with(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        },
        timeout=15,
    )
    # The parser must have been attempted at least once; assert_called()
    # is the idiomatic form of call_count > 0.
    mock_beautiful_soup.assert_called()
| |
|
| |
|
def test_fetch_webpage_text_empty_content():
    """Test handling when the extracted text is empty."""
    fake_response = MagicMock()
    fake_response.text = "<html><body><script>only script</script></body></html>"
    fake_response.raise_for_status = MagicMock()

    # Script-only markup should yield no extractable text at all.
    with patch("ankigen_core.utils.requests.get", return_value=fake_response):
        assert fetch_webpage_text("http://example.com/empty") == ""
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | |
| |
|
| |
|
def test_setup_logging_initialization():
    """Test that setup_logging initializes and returns a logger."""
    from ankigen_core import utils

    try:
        logger = setup_logging()
        assert isinstance(logger, logging.Logger)
        assert logger.name == "ankigen"
        # Two handlers are expected after setup (per the original assertion).
        assert len(logger.handlers) == 2
    finally:
        # Reset the module singleton in a finally block so a failing
        # assertion above cannot leak state into later tests.
        utils._logger_instance = None
| |
|
| |
|
def test_setup_logging_singleton():
    """Test that setup_logging returns the same logger instance if called again."""
    from ankigen_core import utils

    try:
        logger1 = setup_logging()
        logger2 = setup_logging()
        assert logger1 is logger2
    finally:
        # Reset the module singleton in a finally block so a failing
        # assertion above cannot leak state into later tests.
        utils._logger_instance = None
| |
|
| |
|
def test_get_logger_flow():
    """Test get_logger calls setup_logging if no instance exists, else returns existing."""
    from ankigen_core import utils

    utils._logger_instance = None
    try:
        # First call must create and store the singleton.
        logger1 = get_logger()
        assert utils._logger_instance is not None
        assert logger1 is utils._logger_instance

        # Second call must return the same stored instance.
        logger2 = get_logger()
        assert logger2 is logger1
    finally:
        # Reset in a finally block so a failing assertion above cannot
        # leak the modified singleton into later tests.
        utils._logger_instance = None
| |
|
| |
|
| | |
| |
|
| |
|
@pytest.fixture
def cache():
    """Provide a small two-slot ResponseCache for the cache tests below."""
    return ResponseCache(maxsize=2)
| |
|
| |
|
def test_response_cache_get_miss(cache):
    """An unknown (prompt, model) pair must miss and return None."""
    assert cache.get("non_existent_prompt", "model") is None
| |
|
| |
|
def test_response_cache_lru_eviction(cache):
    """Exercise the LRU layer of a maxsize=2 ResponseCache via its
    private _lru_cached_get statistics."""
    # Fill the two-slot cache.
    cache.set("p1", "m1", "r1")
    cache.set("p2", "m2", "r2")

    # Touch p1 so it counts as recently used.
    cache.get("p1", "m1")

    # Adding a third entry exceeds maxsize=2 at the LRU layer.
    cache.set("p3", "m3", "r3")

    # NOTE(review): both p1 and p3 remain retrievable after the third
    # set — this implies the backing store is not bounded by maxsize
    # (only the memoized get path is). Confirm against ResponseCache.
    assert cache.get("p1", "m1") == "r1"
    assert cache.get("p3", "m3") == "r3"

    # Issue a mix of lookups to populate hit/miss counters.
    cache.get("p1", "m1")
    cache.get("p2", "m2")
    cache.get(
        "p3", "m3"
    )

    # Inspect the functools LRU statistics of the private cached getter.
    # currsize == 2 shows the LRU layer is bounded at maxsize.
    cache_info = cache._lru_cached_get.cache_info()
    assert cache_info.hits >= 1
    assert cache_info.misses >= 1
    assert cache_info.currsize == 2

    # p2 must still be retrievable from the backing store.
    assert cache.get("p2", "m2") == "r2"
| |
|
| |
|
def test_response_cache_create_key(cache):
    """_create_key must be the MD5 hex digest of 'model:prompt'."""
    sample_prompt, sample_model = "test prompt", "test_model"
    digest = hashlib.md5(
        f"{sample_model}:{sample_prompt}".encode("utf-8")
    ).hexdigest()
    assert cache._create_key(sample_prompt, sample_model) == digest
| |
|
| |
|
| | |
| |
|
| |
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success_main_tag(mock_requests_get):
    """Text inside a <main> element is extracted."""
    fake = MagicMock()
    fake.status_code = 200
    fake.text = "<html><body><main> Main content here. </main></body></html>"
    mock_requests_get.return_value = fake

    result = fetch_webpage_text("http://example.com")

    assert "Main content here." in result
    mock_requests_get.assert_called_once_with(
        "http://example.com", headers=ANY, timeout=15
    )
| |
|
| |
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success_article_tag(mock_requests_get):
    """Text inside an <article> element is extracted."""
    fake = MagicMock()
    fake.status_code = 200
    fake.text = (
        "<html><body><article> Article content. </article></body></html>"
    )
    mock_requests_get.return_value = fake

    assert "Article content." in fetch_webpage_text("http://example.com")
| |
|
| |
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success_body_fallback(mock_requests_get):
    """Without <main>/<article>, <body> text is used and scripts are dropped."""
    fake = MagicMock()
    fake.status_code = 200
    fake.text = (
        "<html><body> Body content only. <script>junk</script> </body></html>"
    )
    mock_requests_get.return_value = fake

    result = fetch_webpage_text("http://example.com")

    assert "Body content only." in result
    assert "junk" not in result
| |
|
| |
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_no_meaningful_text(mock_requests_get):
    """An empty <main> element yields an empty extraction result."""
    fake = MagicMock()
    fake.status_code = 200
    fake.text = "<html><body><main></main></body></html>"
    mock_requests_get.return_value = fake

    assert fetch_webpage_text("http://example.com") == ""
| |
|
| |
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_http_error(mock_requests_get):
    """An HTTP error from raise_for_status surfaces as ConnectionError."""
    fake = MagicMock()
    fake.status_code = 404
    fake.raise_for_status.side_effect = requests.exceptions.HTTPError(
        "Client Error: Not Found for url", response=fake
    )
    mock_requests_get.return_value = fake

    with pytest.raises(
        ConnectionError, match="Could not fetch URL: Client Error: Not Found for url"
    ):
        fetch_webpage_text("http://example.com")
| |
|
| |
|
@patch("ankigen_core.utils.BeautifulSoup")
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_bs_init_error(mock_requests_get, mock_beautiful_soup):
    """A BeautifulSoup construction failure surfaces as RuntimeError."""
    fake = MagicMock()
    fake.status_code = 200
    fake.text = "<html></html>"
    mock_requests_get.return_value = fake
    mock_beautiful_soup.side_effect = Exception("BS failed")

    with pytest.raises(
        RuntimeError, match="Failed to parse HTML content for http://example.com."
    ):
        fetch_webpage_text("http://example.com")
| |
|
| |
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_lxml_fallback(mock_requests_get):
    """When the lxml parser is unavailable, fetch_webpage_text must fall
    back to html.parser, log a warning, and still extract the text."""
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.text = "<html><body><main>LXML Test</main></body></html>"
    mock_requests_get.return_value = mock_response

    with patch("ankigen_core.utils.BeautifulSoup") as mock_bs_constructor:

        def bs_side_effect(text, parser_type):
            # Simulate lxml being missing; delegate html.parser calls to
            # the real BeautifulSoup so extraction still works.
            if parser_type == "lxml":
                raise ImportError("lxml not found")
            elif parser_type == "html.parser":
                from bs4 import BeautifulSoup as RealBeautifulSoup

                return RealBeautifulSoup(text, "html.parser")
            raise ValueError(f"Unexpected parser: {parser_type}")

        mock_bs_constructor.side_effect = bs_side_effect

        # Patch the shared logger so the fallback warning can be asserted.
        logger_instance = get_logger()
        with patch.object(logger_instance, "warning") as mock_logger_warning:
            text = fetch_webpage_text("http://example.com/lxmltest")
            assert "LXML Test" in text
            mock_logger_warning.assert_any_call(
                "lxml not found, using html.parser instead."
            )

        # Verify both parsers were attempted, in either order: lxml first
        # (which fails) and then the html.parser fallback.
        actual_parsers_used = [
            call[0][1] for call in mock_bs_constructor.call_args_list
        ]
        assert "lxml" in actual_parsers_used
        assert "html.parser" in actual_parsers_used
| |
|