File size: 3,840 Bytes
2d67aa6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import os
import unittest

import torch
import torch.multiprocessing as mp
from accelerate.utils import set_seed

from specforge.distributed import init_distributed
from specforge.modeling.target.eagle3_target_model import (
    CustomEagle3TargetModel,
    HFEagle3TargetModel,
    SGLangEagle3TargetModel,
)
from tests.utils import get_available_port


@torch.no_grad()
def test_target_model_backend(rank, world_size, port, tp_size):
    os.environ["RANK"] = str(rank)
    os.environ["LOCAL_RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = str(port)

    init_distributed(tp_size=tp_size)
    set_seed(42)

    input_ids = torch.randint(0, 1000, (2, 256)).cuda()
    attention_mask = torch.ones_like(input_ids)
    loss_mask = torch.ones_like(input_ids)

    hf_target_model = HFEagle3TargetModel.from_pretrained(
        "unsloth/Llama-3.2-1B", torch_dtype=torch.float16, device="cuda"
    )
    hf_target_model.set_aux_hidden_states_layers()
    hf_out = hf_target_model.generate_eagle3_data(
        input_ids=input_ids,
        attention_mask=attention_mask,
        loss_mask=loss_mask,
    )
    del hf_target_model

    custom_target_model = CustomEagle3TargetModel.from_pretrained(
        "unsloth/Llama-3.2-1B", torch_dtype=torch.float16, device="cuda"
    )
    custom_target_model.set_aux_hidden_states_layers()
    custom_out = custom_target_model.generate_eagle3_data(
        input_ids=input_ids,
        attention_mask=attention_mask,
        loss_mask=loss_mask,
    )
    del custom_target_model

    # compare weights
    assert torch.allclose(
        hf_out.target, custom_out.target, atol=1e-5, rtol=1e-5
    ), f"Logits are not close: \nhf: {hf_out[0] - custom_out[0]}"
    assert torch.allclose(
        hf_out.loss_mask, custom_out.loss_mask, atol=1e-5, rtol=1e-5
    ), f"Logits are not close: \ndiff: {hf_out[1] - custom_out[1]}"
    assert torch.allclose(
        hf_out.input_ids, custom_out.input_ids, atol=1e-5, rtol=1e-5
    ), f"Logits are not close: \ndiff: {hf_out[1] - custom_out[1]}"
    assert torch.allclose(
        hf_out.hidden_states, custom_out.hidden_states, atol=1e-5, rtol=1e-5
    ), f"Logits are not close: \ndiff: {hf_out[1] - custom_out[1]}"

    sgl_target_model = SGLangEagle3TargetModel.from_pretrained(
        "unsloth/Llama-3.2-1B", torch_dtype=torch.float16, device="cuda"
    )
    sgl_target_model.set_aux_hidden_states_layers()
    sgl_out = sgl_target_model.generate_eagle3_data(
        input_ids=input_ids, attention_mask=attention_mask, loss_mask=loss_mask
    )
    del sgl_target_model

    assert torch.equal(hf_out.loss_mask, sgl_out.loss_mask)
    assert torch.equal(hf_out.input_ids, sgl_out.input_ids)
    assert torch.allclose(
        hf_out.hidden_states, sgl_out.hidden_states, atol=1e-1, rtol=1e-2
    ), f"Hidden states are not close, diff: \n{(hf_out.hidden_states - sgl_out.hidden_states).abs().max()}"
    assert torch.allclose(
        hf_out.target, sgl_out.target.half(), atol=1e-1, rtol=1e-2
    ), f"Target are not close, diff: \n{(hf_out.target - sgl_out.target).abs().max()}"


class TestTargetModelBackend(unittest.TestCase):

    def test_target_model_backend_dp(self):
        world_size = 2
        port = get_available_port()
        mp.spawn(
            test_target_model_backend, nprocs=world_size, args=(world_size, port, 1)
        )

    def test_target_model_backend_tp(self):
        world_size = 2
        port = get_available_port()
        mp.spawn(
            test_target_model_backend, nprocs=world_size, args=(world_size, port, 2)
        )


if __name__ == "__main__":
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(TestTargetModelBackend))
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)