File size: 4,821 Bytes
88d2f2a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""Cross-reference clustering tests (mocked LLM)."""

from __future__ import annotations

from datetime import datetime, timedelta, timezone

import pytest

from polyglot_alpha.ingestion.cross_reference import (
    content_hash,
    cluster_with_llm,
    cross_reference,
    filter_recent,
    heuristic_cluster,
)
from polyglot_alpha.ingestion.models import RawEvent


def _ev(source: str, title: str, summary: str = "", lang: str = "zh", url: str | None = None) -> RawEvent:
    """Build a ``RawEvent`` stamped with the current UTC time for use in tests.

    Args:
        source: Outlet name stored on the event (e.g. ``"Caixin"``).
        title: Headline text.
        summary: Optional summary text; defaults to the empty string.
        lang: Language code stored on the event; defaults to ``"zh"``.
        url: Explicit URL. When omitted (or empty), a synthetic
            ``https://example.com/<source>/<digest>`` URL is generated.

    Returns:
        A ``RawEvent`` whose ``published_at`` is ``datetime.now`` in UTC.
    """
    import hashlib  # local import so the helper does not depend on file-level imports

    if not url:
        # Derive the path from a content digest instead of the builtin hash():
        # str hashes are salted per interpreter process, which made the
        # generated URLs differ from one test run to the next, and the former
        # "% 1000" bucket could map two different titles to the same URL.
        digest = hashlib.sha256(title.encode("utf-8")).hexdigest()[:12]
        url = f"https://example.com/{source.lower()}/{digest}"

    return RawEvent(
        source=source,
        title=title,
        summary=summary,
        url=url,
        published_at=datetime.now(tz=timezone.utc),
        language=lang,
    )


def test_content_hash_is_stable_and_url_order_independent() -> None:
    """``content_hash`` gives the same 64-character value whatever the URL order."""
    title = "PBOC cuts RRR"
    urls_forward = ["https://a.com/1", "https://b.com/2"]
    urls_backward = ["https://b.com/2", "https://a.com/1"]

    digest_forward = content_hash(title, urls_forward)
    digest_backward = content_hash(title, urls_backward)

    assert digest_forward == digest_backward
    assert len(digest_forward) == 64


def test_filter_recent_drops_old_events() -> None:
    """With a one-hour window, a five-hour-old event is dropped and a fresh one kept."""
    five_hours_ago = datetime.now(tz=timezone.utc) - timedelta(hours=5)
    outdated = RawEvent(
        source="Reuters",
        title="Old news",
        summary="",
        url="https://reuters.com/old",
        published_at=five_hours_ago,
        language="en",
    )
    current = _ev("Caixin", "新鲜事件 - PBOC cuts RRR fresh")

    survivors = filter_recent([current, outdated], window=timedelta(hours=1))

    # Only the event published "now" falls inside the window.
    assert survivors == [current]


def test_heuristic_cluster_requires_two_sources() -> None:
    """``heuristic_cluster`` confirms an event only when two distinct sources report it."""
    headline = "PBOC announces RRR cut to support economy growth"
    caixin_first = _ev("Caixin", headline)
    caixin_second = _ev("Caixin", headline)
    xinhua = _ev("Xinhua", headline)

    # The same outlet repeating itself is not a confirmation.
    assert heuristic_cluster([caixin_first, caixin_second]) == []

    # Two different outlets with matching headlines yield exactly one event.
    clusters = heuristic_cluster([caixin_first, xinhua])
    assert len(clusters) == 1

    (cluster,) = clusters
    assert cluster.sources_count == 2
    assert "Caixin" not in cluster.all_sources  # all_sources holds URLs not source names
    assert len(cluster.all_sources) == 2
    assert len(cluster.content_hash) == 64


@pytest.mark.asyncio
async def test_cluster_with_llm_uses_mock_caller() -> None:
    """``cluster_with_llm`` turns the mocked LLM's clusters into confirmed events.

    The mock returns one two-item cluster and one single-item cluster; only the
    two-item cluster is expected to come back as a confirmed event.
    """
    events = [
        _ev("Caixin", "PBOC cuts RRR by 0.5 pct", "央行降准 0.5 个百分点"),
        _ev("Xinhua", "China central bank lowers reserve requirement"),
        _ev("Reuters", "Earnings beat at TSMC", "Unrelated", lang="en"),
    ]
    seen_prompts: list[str] = []

    async def fake_llm(prompt: str) -> dict:
        # Record the prompt instead of asserting here. cluster_with_llm recovers
        # when the callable raises (see the fallback test in this file), so an
        # AssertionError raised inside the mock could be swallowed and the real
        # cause of the failure hidden.
        seen_prompts.append(prompt)
        return {
            "clusters": [
                {
                    "cluster_id": "rrr-cut",
                    "item_ids": [0, 1],
                    "primary_title": "PBOC cuts RRR by 0.5pct",
                    "summary": "Two outlets confirm the cut.",
                },
                # Single-source cluster should be filtered out by guardrail.
                {
                    "cluster_id": "tsmc",
                    "item_ids": [2],
                    "primary_title": "TSMC earnings",
                    "summary": "",
                },
            ]
        }

    confirmed = await cluster_with_llm(events, llm=fake_llm)

    # Prompt checks run in the test body so a failure is reported directly.
    assert seen_prompts, "the mocked LLM callable was never invoked"
    assert all("PBOC" in prompt for prompt in seen_prompts)

    assert len(confirmed) == 1
    only = confirmed[0]
    assert only.sources_count == 2
    assert only.primary_title == "PBOC cuts RRR by 0.5pct"
    assert "zh" in only.languages or "en" in only.languages


@pytest.mark.asyncio
async def test_cluster_with_llm_falls_back_on_error() -> None:
    """A raising LLM callable still yields one confirmed two-source event."""
    shared_title = "Shared keywords reserve requirement reduction"
    events = [_ev(outlet, shared_title) for outlet in ("Caixin", "Xinhua")]

    async def failing_llm(prompt: str) -> dict:
        raise RuntimeError("LLM unavailable")

    results = await cluster_with_llm(events, llm=failing_llm)

    # The error must not propagate; the two matching events are still confirmed.
    assert len(results) == 1
    assert results[0].sources_count == 2


@pytest.mark.asyncio
async def test_cross_reference_end_to_end() -> None:
    """``cross_reference`` returns one event hashed from the LLM title and input URLs."""
    llm_title = "PBOC cuts RRR 0.5pct"
    events = [
        _ev("Caixin", "PBOC announces RRR cut 0.5 percentage points"),
        _ev("Xinhua", "PBOC cuts RRR by 0.5 percentage points"),
    ]

    async def fake_llm(prompt: str) -> dict:
        # A single cluster covering both input events.
        cluster = {
            "cluster_id": "c0",
            "item_ids": [0, 1],
            "primary_title": llm_title,
            "summary": "Confirmed.",
        }
        return {"clusters": [cluster]}

    results = await cross_reference(events, llm=fake_llm)
    assert len(results) == 1

    expected_hash = content_hash(llm_title, [item.url for item in events])
    assert results[0].content_hash == expected_hash