polyglot-alpha / tests /test_ipfs.py
licaomeng
deploy: main@8970ffb → HF Spaces (2026-05-27T05:19Z)
88d2f2a
"""Tests for :mod:`polyglot_alpha.ipfs`."""
from __future__ import annotations
import asyncio
import json
import os
from pathlib import Path
from unittest.mock import patch
import pytest
from polyglot_alpha import ipfs
_SAMPLE_CANDIDATE = {
"event_id": "evt-test-001",
"question_text": "Will it rain on demo day?",
"agent": "gemini",
"translation": "Test candidate body",
}
def test_canonical_json_is_deterministic() -> None:
    """Key insertion order must not change the canonical byte encoding."""
    reversed_keys = {"b": 2, "a": 1}
    sorted_keys = {"a": 1, "b": 2}
    assert ipfs._canonical_json_bytes(reversed_keys) == ipfs._canonical_json_bytes(sorted_keys)
def test_content_hash_changes_with_payload() -> None:
    """Distinct payloads yield distinct hashes, and a hash has length 64."""
    first_digest, second_digest = (
        ipfs._content_hash({"x": value}) for value in (1, 2)
    )
    assert first_digest != second_digest
    assert len(first_digest) == 64
def test_pin_candidate_falls_back_to_local_file(tmp_path, monkeypatch: pytest.MonkeyPatch) -> None:
    """With no PINATA_JWT, no W3S_TOKEN and no reachable daemon, pinning must fall back to a local file."""
    # Make sure neither PINATA_JWT nor W3S_TOKEN is set for this test.
    for token_var in ("PINATA_JWT", "W3S_TOKEN"):
        monkeypatch.delenv(token_var, raising=False)
    # Point the local-daemon probe at an unreachable port so it is forced to fail.
    monkeypatch.setattr(ipfs, "_LOCAL_DAEMON_URL", "http://127.0.0.1:1/api/v0/add")
    # Redirect the local pin directory to pytest's temporary directory.
    monkeypatch.setattr(ipfs, "_LOCAL_PIN_DIR", tmp_path)

    scheme = "ipfs-local://"
    pinned_uri = asyncio.run(ipfs.pin_candidate(_SAMPLE_CANDIDATE))
    assert pinned_uri.startswith(scheme)

    digest = pinned_uri.removeprefix(scheme)
    assert (tmp_path / f"{digest}.json").exists()
    # Content-addressing property holds.
    assert digest == ipfs._content_hash(_SAMPLE_CANDIDATE)
def test_pin_candidate_with_meta_marks_local_as_not_real_pin(
    tmp_path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """The ``ipfs-local://`` result from ``pin_candidate_with_meta`` is flagged as not a real pin."""
    for env_name in ("PINATA_JWT", "W3S_TOKEN"):
        monkeypatch.delenv(env_name, raising=False)
    # Same overrides as the other fallback tests: temp pin dir, unreachable daemon URL.
    overrides = (
        ("_LOCAL_PIN_DIR", tmp_path),
        ("_LOCAL_DAEMON_URL", "http://127.0.0.1:1/api/v0/add"),
    )
    for attr_name, replacement in overrides:
        monkeypatch.setattr(ipfs, attr_name, replacement)

    outcome = asyncio.run(ipfs.pin_candidate_with_meta(_SAMPLE_CANDIDATE))
    uri, is_real = outcome
    assert uri.startswith("ipfs-local://")
    assert is_real is False
def test_pin_candidate_idempotent_for_same_payload(
    tmp_path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Pinning the same payload twice must return the same URI both times."""
    for env_name in ("PINATA_JWT", "W3S_TOKEN"):
        monkeypatch.delenv(env_name, raising=False)
    monkeypatch.setattr(ipfs, "_LOCAL_PIN_DIR", tmp_path)
    monkeypatch.setattr(ipfs, "_LOCAL_DAEMON_URL", "http://127.0.0.1:1/api/v0/add")

    # Each pin runs in its own event loop, one after the other.
    first_uri, second_uri = [
        asyncio.run(ipfs.pin_candidate(_SAMPLE_CANDIDATE)) for _ in range(2)
    ]
    assert first_uri == second_uri  # SHA-based content addressing is stable