Spaces:
Sleeping
Sleeping
| # coding=utf-8 | |
| # Copyright 2024 HuggingFace Inc. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import ast | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import traceback | |
| from pathlib import Path | |
| from typing import List | |
| import pytest | |
| from dotenv import load_dotenv | |
| from .utils.markers import require_run_all | |
class SubprocessCallException(Exception):
    """Raised by `run_command` when the launched command exits with a non-zero status."""
def run_command(command: List[str], return_stdout=False, env=None):
    """
    Run `command` with `subprocess.check_output`, merging stderr into stdout.

    Args:
        command: Program and arguments. `Path` entries are converted to `str`
            in a private copy; the caller's list is left untouched.
        return_stdout: When true, return the captured output decoded as UTF-8.
        env: Environment for the child process. Defaults to a copy of `os.environ`.

    Returns:
        The decoded output when `return_stdout` is true, otherwise `None`.

    Raises:
        SubprocessCallException: If the command exits with a non-zero status. The
            message contains the command line and its captured output.
    """
    # Build a fresh argv rather than rewriting the caller's list in place.
    argv = [str(part) if isinstance(part, Path) else part for part in command]
    if env is None:
        env = os.environ.copy()

    try:
        output = subprocess.check_output(argv, stderr=subprocess.STDOUT, env=env)
    except subprocess.CalledProcessError as e:
        # Decode leniently: undecodable bytes in the child's output must not
        # replace the real failure with a UnicodeDecodeError.
        captured = e.output.decode("utf-8", errors="replace") if isinstance(e.output, bytes) else str(e.output)
        raise SubprocessCallException(
            f"Command `{' '.join(map(str, argv))}` failed with the following error:\n\n{captured}"
        ) from e

    if return_stdout:
        return output.decode("utf-8") if isinstance(output, bytes) else output
    return None
class DocCodeExtractor:
    """Handles extraction and validation of Python code from markdown files."""

    @staticmethod
    def extract_python_code(content: str) -> List[str]:
        """
        Extract Python code blocks from markdown content.

        Args:
            content: Markdown text to scan.

        Returns:
            The stripped bodies of every fenced block tagged `python` or `py`,
            in document order. Empty when no such block is present.
        """
        pattern = r"```(?:python|py)\n(.*?)\n```"
        # DOTALL lets `.` span newlines so multi-line blocks are captured whole.
        return [match.group(1).strip() for match in re.finditer(pattern, content, re.DOTALL)]

    @staticmethod
    def create_test_script(code_blocks: List[str], tmp_dir: str) -> Path:
        """
        Create a temporary Python script from code blocks.

        Args:
            code_blocks: Source snippets, joined with a blank line between them.
            tmp_dir: Directory in which `test_script.py` is written.

        Returns:
            Path of the written script.

        Raises:
            AssertionError: If the joined code is empty.
        """
        combined_code = "\n\n".join(code_blocks)
        assert len(combined_code) > 0, "Code is empty!"
        tmp_file = Path(tmp_dir) / "test_script.py"
        with open(tmp_file, "w", encoding="utf-8") as f:
            f.write(combined_code)
        return tmp_file
class TestDocs:
    """
    Test case for documentation code testing.

    For every documentation file found under `docs/source/en`, the Python code
    blocks are extracted, syntax-checked, concatenated into one script and run
    in a subprocess.
    """

    # NOTE(review): `require_run_all` is imported at the top of the file but is not
    # applied anywhere in this view; it presumably decorated this class - confirm.

    # A code block containing any of these substrings is skipped, not executed.
    _EXCLUDED_SNIPPETS = (
        "ToolCollection",
        "image_generation_tool",  # We don't want to run this expensive operation
        "from_langchain",  # Langchain is not a dependency
        "while llm_should_continue(memory):",  # This is pseudo code
        "ollama_chat/llama3.2",  # Exclude ollama building in guided tour
        "model = TransformersModel(model_id=model_id)",  # Exclude testing with transformers model
        "SmolagentsInstrumentor",  # Exclude telemetry since it needs additional installs
    )

    # Placeholder text in the docs -> environment variable that supplies its value.
    _ENV_PLACEHOLDERS = (
        ("<YOUR_HUGGINGFACEHUB_API_TOKEN>", "HF_TOKEN"),
        ("YOUR_ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY"),
    )

    @classmethod
    def setup_class(cls):
        """
        Locate the documentation files and prepare the scratch directory.

        Safe to call more than once: `pytest_generate_tests` calls it during
        collection and pytest calls it again before the first test, so an
        existing scratch directory is reused instead of creating a second one.

        Raises:
            ValueError: If the docs directory is missing or contains no `*.mdx` file.
        """
        cls.launch_args = ["python3"]
        cls.docs_dir = Path(__file__).parent.parent / "docs" / "source" / "en"
        cls.extractor = DocCodeExtractor()

        if not cls.docs_dir.exists():
            raise ValueError(f"Docs directory not found at {cls.docs_dir}")

        load_dotenv()

        cls.md_files = list(cls.docs_dir.rglob("*.mdx"))
        if not cls.md_files:
            raise ValueError(f"No markdown files found in {cls.docs_dir}")

        # Create the scratch directory last so a validation failure above does not
        # leave an orphaned directory behind, and reuse one from an earlier call.
        existing = getattr(cls, "_tmpdir", None)
        if existing is None or not os.path.isdir(existing):
            cls._tmpdir = tempfile.mkdtemp()

    @classmethod
    def teardown_class(cls):
        """Remove the scratch directory created by `setup_class`."""
        shutil.rmtree(cls._tmpdir)

    @pytest.fixture(autouse=True)
    def _setup(self):
        """Fixture to ensure temporary directory exists for each test."""
        os.makedirs(self._tmpdir, exist_ok=True)
        yield
        # Clean up test files after each test
        for file in Path(self._tmpdir).glob("*"):
            file.unlink()

    @classmethod
    def _fill_placeholders(cls, block: str) -> str:
        """Substitute credential and username placeholders in one code block."""
        for placeholder, env_var in cls._ENV_PLACEHOLDERS:
            value = os.getenv(env_var)
            # `str.replace` rejects None, so leave the placeholder in place when
            # the variable is unset instead of raising TypeError.
            if value is not None:
                block = block.replace(placeholder, value)
        return block.replace("{your_username}", "m-ric")

    def test_single_doc(self, doc_path: Path):
        """
        Test a single documentation file.

        Skips the test when the file has no runnable Python block, and fails it
        when the assembled script cannot be built or exits with an error.
        """
        with open(doc_path, "r", encoding="utf-8") as f:
            content = f.read()

        # Exclude blocks that take longer to run or add dependencies
        code_blocks = [
            block
            for block in self.extractor.extract_python_code(content)
            if not any(snippet in block for snippet in self._EXCLUDED_SNIPPETS)
        ]

        if len(code_blocks) == 0:
            pytest.skip(f"No Python code blocks found in {doc_path.name}")

        # Validate syntax of each block individually by parsing it
        for block in code_blocks:
            ast.parse(block)

        # Print before substituting placeholders so credentials never reach the log.
        print("\n\nCollected code block:==========\n".join(code_blocks))

        # Create and execute test script
        try:
            code_blocks = [self._fill_placeholders(block) for block in code_blocks]
            test_script = self.extractor.create_test_script(code_blocks, self._tmpdir)
            run_command(self.launch_args + [str(test_script)])
        except SubprocessCallException as e:
            pytest.fail(f"\nError while testing {doc_path.name}:\n{str(e)}")
        except Exception:
            pytest.fail(f"\nUnexpected error while testing {doc_path.name}:\n{traceback.format_exc()}")
def pytest_generate_tests(metafunc):
    """
    Parametrize `doc_path` with one test case per documentation file.

    Only acts on tests that request the `doc_path` fixture. The owning class's
    `setup_class` is called first when its `md_files` list has not been built yet.
    Each case is labelled with the file's stem.
    """
    if "doc_path" not in metafunc.fixturenames:
        return

    owner = metafunc.cls
    # Collection runs before pytest's own class setup, so build the list on demand.
    if not hasattr(owner, "md_files"):
        owner.setup_class()

    doc_files = owner.md_files
    labels = [path.stem for path in doc_files]
    metafunc.parametrize("doc_path", doc_files, ids=labels)