File size: 5,516 Bytes
6a5b8d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""

Test suite for Outline VPN implementation

"""

import os
import sys
import asyncio
import unittest
import tempfile
from unittest.mock import Mock, patch
from pathlib import Path

# Add project root to path
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)

from core.outline_config import OutlineManager, UserConfig
from core.outline_server import OutlineServer
from core.shadowsocks_protocol import ShadowsocksProtocol
from core.ipsec_manager import IPsecManager
from core.nat_engine import NATEngine
from core.traffic_router import TrafficRouter

class TestOutlineVPN(unittest.TestCase):
    def setUp(self):
        self.temp_dir = tempfile.mkdtemp()
        self.config_path = os.path.join(self.temp_dir, 'test_config.json')
        self.manager = OutlineManager(self.config_path)
        
    def tearDown(self):
        import shutil
        shutil.rmtree(self.temp_dir)

    def test_user_management(self):
        """Test user creation and key management"""
        # Add user
        user = self.manager.add_user('testuser')
        self.assertIsNotNone(user.access_key)
        self.assertEqual(user.user_id, 'testuser')
        self.assertIsNone(user.data_limit)
        
        # Test user retrieval
        found_user = self.manager.get_user_by_key(user.access_key)
        self.assertEqual(found_user.user_id, 'testuser')
        
        # Test user removal
        self.assertTrue(self.manager.remove_user('testuser'))
        self.assertIsNone(self.manager.get_user_by_key(user.access_key))

    def test_bandwidth_tracking(self):
        """Test bandwidth usage tracking"""
        user = self.manager.add_user('bandwidthuser', data_limit=1000)
        self.manager.update_user_bandwidth('bandwidthuser', 500)
        self.assertEqual(user.bandwidth_usage, 500)
        self.assertTrue(user.is_active)
        
        # Test data limit enforcement
        self.manager.update_user_bandwidth('bandwidthuser', 600)
        self.assertFalse(user.is_active)

class TestShadowsocksProtocol(unittest.TestCase):
    def setUp(self):
        self.protocol = ShadowsocksProtocol("test_key")

    def test_encryption(self):
        """Test packet encryption and decryption"""
        test_data = b"Hello, World!"
        encrypted = self.protocol._encrypt_packet(test_data)
        decrypted = self.protocol._decrypt_packet(encrypted)
        self.assertEqual(test_data, decrypted)

class TestOutlineServer(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self):
        config = {
            "server": {
                "host": "127.0.0.1",
                "port": 8388,
                "virtual_network": "10.0.0.0/24"
            }
        }
        self.server = OutlineServer(config)

    async def test_server_start_stop(self):
        """Test server startup and shutdown"""
        # Mock the actual server binding
        with patch('asyncio.start_server'):
            startup_task = asyncio.create_task(self.server.start())
            await asyncio.sleep(0.1)  # Give time for server to "start"
            self.assertTrue(self.server.is_running)
            
            await self.server.stop()
            self.assertFalse(self.server.is_running)
            await startup_task

    async def test_client_handling(self):
        """Test client connection handling"""
        # Add a test user
        self.server.outline_manager.add_user('testuser')
        
        # Mock reader/writer
        reader = Mock()
        writer = Mock()
        writer.get_extra_info.return_value = ('127.0.0.1', 12345)
        
        # Test connection handling
        await self.server._handle_client(reader, writer)
        writer.close.assert_called_once()

class TestNATEngine(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self):
        self.nat = NATEngine()

    async def test_nat_session(self):
        """Test NAT session creation and tracking"""
        session = self.nat.create_session(
            virtual_ip="10.0.0.2",
            virtual_port=1234,
            real_ip="192.168.1.1",
            real_port=80
        )
        self.assertIsNotNone(session)
        self.assertEqual(session.virtual_ip, "10.0.0.2")
        
        # Test session lookup
        found = self.nat.lookup_session("192.168.1.1", 80)
        self.assertEqual(found.virtual_ip, "10.0.0.2")

class TestTrafficRouter(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self):
        config = {
            "vpn_host": "127.0.0.1",
            "vpn_port": 8388,
            "virtual_network": "10.0.0.0/24"
        }
        self.router = TrafficRouter(config)

    async def test_traffic_routing(self):
        """Test traffic routing functionality"""
        # Mock connection
        reader = Mock()
        writer = Mock()
        writer.get_extra_info.return_value = ('10.0.0.2', 1234)
        
        # Test connection handling
        with patch('asyncio.open_connection'):
            await self.router._handle_client_connection(reader, writer)
            writer.close.assert_called_once()

def run_tests():
    """Run all tests"""
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromModule(sys.modules[__name__])
    runner = unittest.TextTestRunner(verbosity=2)
    return runner.run(suite)

if __name__ == '__main__':
    run_tests()