| from __future__ import annotations |
|
|
| import asyncio |
| import base64 |
| import json |
| from dataclasses import dataclass |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any |
|
|
| import pytest |
|
|
| from hf_bucket_mcp.server import ( |
| HuggingFaceBucketUpload, |
| LIST_FILES_DESCRIPTION, |
| READ_FILE_DESCRIPTION, |
| _bucket_file_handle, |
| _bucket_file_url, |
| _parse_bucket_file_ref, |
| _safe_bucket_path, |
| _size_display, |
| _uploaded_at, |
| mcp, |
| ) |
| from mcp.types import AudioContent, ImageContent, TextContent |
|
|
|
|
class FakeApi:
    """Test double passed as ``api``; remembers every token given to ``whoami``."""

    def __init__(self) -> None:
        # Tokens received by ``whoami``, in call order.
        self.tokens: list[str | bool | None] = []

    def whoami(self, token: str | bool | None = None) -> dict[str, str]:
        """Record ``token`` and answer with the fixed identity ``alice``."""
        identity = {"name": "alice"}
        self.tokens.append(token)
        return identity
|
|
|
|
@dataclass
class BucketItem:
    """Listing entry returned by ``InMemoryBucket.list_bucket_tree``."""

    # Path of the entry inside the bucket.
    path: str
    # Size in bytes.
    size: int
    # Entry kind; defaults to a regular file.
    type: str = "file"
    # Modification timestamp, when one is known.
    last_modified: datetime | None = None
|
|
|
|
class InMemoryBucket:
    """In-memory replacement for the bucket functions handed to the provider.

    Each method asserts the token is ``"hf_test"``; every method except
    ``create_bucket`` also asserts the bucket id is ``"alice/home"``.
    """

    def __init__(self) -> None:
        # Bucket ids passed to ``create_bucket``, in call order.
        self.created: list[str] = []
        # Stored file contents keyed by bucket path.
        self.files: dict[str, bytes] = {}

    def create_bucket(
        self,
        bucket_id: str,
        *,
        token: str | bool | None = None,
        **_: Any,
    ) -> None:
        """Record that ``bucket_id`` was created; extra keyword args are ignored."""
        assert token == "hf_test"
        self.created.append(bucket_id)

    def batch_bucket_files(
        self,
        bucket_id: str,
        *,
        add: list[tuple[bytes, str]],
        token: str | bool | None = None,
    ) -> None:
        """Store every ``(data, path)`` pair from ``add``, overwriting existing paths."""
        assert bucket_id == "alice/home"
        assert token == "hf_test"
        self.files.update({path: data for data, path in add})

    def list_bucket_tree(
        self,
        bucket_id: str,
        *,
        recursive: bool,
        token: str | bool | None = None,
    ) -> list[BucketItem]:
        """Return one ``BucketItem`` per stored file, all with a fixed timestamp."""
        assert bucket_id == "alice/home"
        assert recursive is True
        assert token == "hf_test"
        # Every listed entry reports the same fixed modification time.
        modified = datetime(2026, 5, 4, tzinfo=timezone.utc)
        return [
            BucketItem(path=path, size=len(data), last_modified=modified)
            for path, data in self.files.items()
        ]

    def download_bucket_files(
        self,
        bucket_id: str,
        *,
        files: list[tuple[str, str]],
        raise_on_missing_files: bool = False,
        token: str | bool | None = None,
    ) -> None:
        """Write each requested stored file to its local path.

        Unknown remote paths are skipped unless ``raise_on_missing_files`` is
        true, in which case ``FileNotFoundError`` is raised with the remote path.
        """
        assert bucket_id == "alice/home"
        assert token == "hf_test"
        for remote_path, local_path in files:
            if remote_path in self.files:
                Path(local_path).write_bytes(self.files[remote_path])
            elif raise_on_missing_files:
                raise FileNotFoundError(remote_path)
|
|
|
|
def provider(bucket: InMemoryBucket) -> HuggingFaceBucketUpload:
    """Build a provider backed by ``bucket`` and a fresh ``FakeApi``.

    Use this when the test does not need to inspect the API double;
    otherwise call ``provider_with_api`` with an explicit ``FakeApi``.
    """
    return provider_with_api(bucket, FakeApi())
|
|
|
|
def provider_with_api(
    bucket: InMemoryBucket,
    api: FakeApi,
) -> HuggingFaceBucketUpload:
    """Build a provider backed by ``bucket`` that talks to the given ``api``.

    All four bucket operations are routed to ``bucket`` and the token getter
    always returns ``"hf_test"``, the token the in-memory bucket asserts on.
    """
    return HuggingFaceBucketUpload(
        # Identity / auth side.
        api=api,
        token_getter=lambda _: "hf_test",
        # Storage side, served entirely from memory.
        create_bucket_fn=bucket.create_bucket,
        batch_bucket_files_fn=bucket.batch_bucket_files,
        list_bucket_tree_fn=bucket.list_bucket_tree,
        download_bucket_files_fn=bucket.download_bucket_files,
    )
|
|
|
|
@pytest.mark.parametrize(
    ("name", "expected"),
    [
        # Already-clean names pass through unchanged.
        ("report.txt", "report.txt"),
        ("nested/report.txt", "nested/report.txt"),
        # Backslashes come back as forward slashes.
        (r"windows\path.txt", "windows/path.txt"),
        # "." and ".." segments are dropped from the result.
        ("./nested/../report.txt", "nested/report.txt"),
        # A leading slash is removed.
        ("/leading/slash.txt", "leading/slash.txt"),
    ],
)
def test_safe_bucket_path_normalizes_upload_names(name: str, expected: str) -> None:
    """``_safe_bucket_path`` maps each upload name to the expected bucket path."""
    normalized = _safe_bucket_path(name)
    assert normalized == expected
|
|
|
|
@pytest.mark.parametrize(
    "name",
    ["", ".", "..", "/", "./../"],
)
def test_safe_bucket_path_rejects_empty_paths(name: str) -> None:
    """Empty names and names made only of ".", ".." and "/" raise ``ValueError``."""
    # The error message is expected to contain "valid name".
    with pytest.raises(ValueError, match="valid name"):
        _safe_bucket_path(name)
|
|
|
|
@pytest.mark.parametrize(
    ("size", "expected"),
    [
        # Below 1024 the size is shown in whole bytes.
        (0, "0 B"),
        (1023, "1023 B"),
        # Exact powers of 1024 switch to the next unit with one decimal.
        (1024, "1.0 KB"),
        (1024 * 1024, "1.0 MB"),
    ],
)
def test_size_display(size: int, expected: str) -> None:
    """``_size_display`` renders a byte count as the expected human-readable text."""
    rendered = _size_display(size)
    assert rendered == expected
|
|
|
|
def test_uploaded_at_uses_first_available_timestamp() -> None:
    """``_uploaded_at`` returns the ISO-8601 form of the item's ``last_modified``."""
    timestamp = datetime(2026, 5, 4, 12, 30, tzinfo=timezone.utc)
    item = BucketItem(path="file.txt", size=1, last_modified=timestamp)

    reported = _uploaded_at(item)

    assert reported == timestamp.isoformat()
|
|
|
|
def test_bucket_file_url_encodes_paths_for_hub_resolve() -> None:
    """The resolve URL percent-encodes both "/" and spaces in the file path."""
    url = _bucket_file_url("alice/home", "runs/run 1/image.png")

    assert url == (
        "https://huggingface.co/buckets/alice/home/resolve/runs%2Frun%201%2Fimage.png"
    )
|
|
|
|
def test_bucket_file_handle_uses_hf_bucket_scheme() -> None:
    """The handle is ``hf://buckets/<bucket id>/<path>`` with the path left as-is."""
    handle = _bucket_file_handle("alice/home", "runs/run-1/image.png")

    assert handle == "hf://buckets/alice/home/runs/run-1/image.png"
|
|
|
|
@pytest.mark.parametrize(
    ("name", "expected"),
    [
        # Plain bucket-relative path.
        ("notes/readme.md", "notes/readme.md"),
        # A plain name containing "%2F" is returned unchanged.
        ("nested%2Fliteral.txt", "nested%2Fliteral.txt"),
        # hf:// handle for the same bucket.
        ("hf://buckets/alice/home/notes/readme.md", "notes/readme.md"),
        # Resolve URL with a percent-encoded path.
        (
            "https://huggingface.co/buckets/alice/home/resolve/notes%2Freadme.md",
            "notes/readme.md",
        ),
        # Resolve URL with a literal "/" in the path.
        (
            "https://huggingface.co/buckets/alice/home/resolve/notes/readme.md",
            "notes/readme.md",
        ),
    ],
)
def test_parse_bucket_file_ref_accepts_supported_read_formats(
    name: str,
    expected: str,
) -> None:
    """Each supported reference form resolves to the bucket-relative path."""
    parsed = _parse_bucket_file_ref(name, bucket_id="alice/home")
    assert parsed == expected
|
|
|
|
@pytest.mark.parametrize(
    "name",
    [
        # URL on a different host.
        "https://example.com/file.txt",
        # huggingface.co URL under /datasets/ rather than /buckets/.
        "https://huggingface.co/datasets/alice/home/resolve/main/file.txt",
        # Handle naming the bucket but no file.
        "hf://buckets/alice/home",
        # Handle and URL pointing at a bucket other than "alice/home".
        "hf://buckets/bob/home/file.txt",
        "https://huggingface.co/buckets/bob/home/resolve/file.txt",
    ],
)
def test_parse_bucket_file_ref_rejects_unsupported_read_formats(name: str) -> None:
    """Each of these references raises ``ValueError`` when parsed for "alice/home"."""
    with pytest.raises(ValueError):
        _parse_bucket_file_ref(name, bucket_id="alice/home")
|
|
|
|
def test_provider_stores_and_lists_files() -> None:
    """Storing one file creates the bucket, saves the bytes and returns its listing."""
    api = FakeApi()
    bucket = InMemoryBucket()
    upload = provider_with_api(bucket, api)
    encoded = base64.b64encode(b"# Hi").decode()

    # The upload name carries a leading "../" that must not survive storage.
    files = upload.on_store(
        [{"name": "../notes/readme.md", "data": encoded}],
        ctx=None,
    )

    expected_entry = {
        "name": "notes/readme.md",
        "type": "text/markdown",
        "size": 4,
        "size_display": "4 B",
        "uploaded_at": "2026-05-04T00:00:00+00:00",
        "url": "https://huggingface.co/buckets/alice/home/resolve/notes%2Freadme.md",
        "hf_handle": "hf://buckets/alice/home/notes/readme.md",
    }
    assert bucket.created == ["alice/home"]
    # ``whoami`` is expected to have been called twice with the caller token.
    assert api.tokens == ["hf_test", "hf_test"]
    assert bucket.files == {"notes/readme.md": b"# Hi"}
    assert files == [expected_entry]
|
|
|
|
def test_provider_resolves_bucket_namespace_with_caller_token_once_per_bucket() -> None:
    """Resolving the bucket id twice creates the bucket only once."""
    api = FakeApi()
    bucket = InMemoryBucket()
    upload = provider_with_api(bucket, api)

    first = upload.bucket_id_for(ctx=None)
    second = upload.bucket_id_for(ctx=None)

    assert first == "alice/home"
    assert second == "alice/home"
    # ``whoami`` runs on every resolution, each time with the caller token...
    assert api.tokens == ["hf_test", "hf_test"]
    # ...but the bucket itself is created a single time.
    assert bucket.created == ["alice/home"]
|
|
|
|
def test_provider_reads_text_and_binary_files() -> None:
    """Text comes back under ``content``; undecodable bytes under ``content_base64``."""
    bucket = InMemoryBucket()
    upload = provider(bucket)
    bucket.files.update({"hello.txt": b"hello", "image.bin": b"\xff\x00"})

    text_result = upload.on_read("hello.txt", ctx=None)
    assert text_result["content"] == "hello"
    assert (
        text_result["url"]
        == "https://huggingface.co/buckets/alice/home/resolve/hello.txt"
    )
    assert text_result["hf_handle"] == "hf://buckets/alice/home/hello.txt"

    binary_result = upload.on_read("image.bin", ctx=None)
    # "/wA=" is the base64 encoding of b"\xff\x00".
    assert binary_result["content_base64"] == "/wA="
    assert "content" not in binary_result
|
|
|
|
def test_provider_read_missing_file_raises() -> None:
    """Reading a name that is not in the bucket raises ``FileNotFoundError``."""
    empty_bucket = InMemoryBucket()
    upload = provider(empty_bucket)

    with pytest.raises(FileNotFoundError):
        upload.on_read("missing.txt", ctx=None)
|
|
|
|
def test_provider_reads_from_bucket_url() -> None:
    """``on_read`` accepts a resolve URL and reports the decoded bucket path."""
    bucket = InMemoryBucket()
    upload = provider(bucket)
    bucket.files["notes/readme.md"] = b"hello"
    resolve_url = "https://huggingface.co/buckets/alice/home/resolve/notes%2Freadme.md"

    result = upload.on_read(resolve_url, ctx=None)

    assert result["name"] == "notes/readme.md"
    assert result["content"] == "hello"
|
|
|
|
def test_provider_reads_images_as_mcp_content_blocks() -> None:
    """A PNG read yields a JSON metadata text block followed by an image block."""
    bucket = InMemoryBucket()
    upload = provider(bucket)
    bucket.files["image.png"] = b"\x89PNG\r\n\x1a\n"

    result = upload.on_read("image.png", ctx=None)

    assert len(result.content) == 2
    text_block, image_block = result.content
    assert isinstance(text_block, TextContent)
    assert isinstance(image_block, ImageContent)

    metadata = json.loads(text_block.text)
    expected_metadata = {
        "name": "image.png",
        "size": 8,
        "type": "image/png",
        # The timestamp value is not pinned; only the key must be present.
        "uploaded_at": metadata["uploaded_at"],
        "url": "https://huggingface.co/buckets/alice/home/resolve/image.png",
        "hf_handle": "hf://buckets/alice/home/image.png",
    }
    assert metadata == expected_metadata
    assert result.structured_content == metadata
    assert image_block.mimeType == "image/png"
    # Base64 of the 8 stored bytes.
    assert image_block.data == "iVBORw0KGgo="
|
|
|
|
def test_provider_reads_audio_as_mcp_content_blocks() -> None:
    """A WAV read yields a JSON metadata text block followed by an audio block."""
    bucket = InMemoryBucket()
    upload = provider(bucket)
    bucket.files["audio.wav"] = b"RIFF"

    result = upload.on_read("audio.wav", ctx=None)

    assert len(result.content) == 2
    text_block, audio_block = result.content
    assert isinstance(text_block, TextContent)
    assert isinstance(audio_block, AudioContent)

    metadata = json.loads(text_block.text)
    # NOTE(review): "audio/x-wav" presumably comes from an extension-based MIME
    # lookup; if that lookup uses a table that varies between environments the
    # expected value may need adjusting — confirm against the server code.
    expected_metadata = {
        "name": "audio.wav",
        "size": 4,
        "type": "audio/x-wav",
        # The timestamp value is not pinned; only the key must be present.
        "uploaded_at": metadata["uploaded_at"],
        "url": "https://huggingface.co/buckets/alice/home/resolve/audio.wav",
        "hf_handle": "hf://buckets/alice/home/audio.wav",
    }
    assert metadata == expected_metadata
    assert result.structured_content == metadata
    assert audio_block.mimeType == "audio/x-wav"
    # Base64 of b"RIFF".
    assert audio_block.data == "UklGRg=="
|
|
|
|
def test_file_tool_descriptions_explain_gradio_urls() -> None:
    """The registered file tools expose the module's description constants."""

    async def collect() -> dict[str, str]:
        # ``list_tools`` is a coroutine, so it is awaited inside a helper.
        tools = await mcp.list_tools()
        return {tool.name: tool.description for tool in tools}

    by_name = asyncio.run(collect())

    assert by_name["list_files"] == LIST_FILES_DESCRIPTION
    assert by_name["read_file"] == READ_FILE_DESCRIPTION
|
|