File size: 8,278 Bytes
f0743f4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import { expect } from '@playwright/test';

describe('LeaderElection with Redis', () => {
  let LeaderElection: typeof import('../LeaderElection').LeaderElection;
  let instances: InstanceType<typeof import('../LeaderElection').LeaderElection>[] = [];
  let keyvRedisClient: Awaited<typeof import('~/cache/redisClients')>['keyvRedisClient'];
  let ioredisClient: Awaited<typeof import('~/cache/redisClients')>['ioredisClient'];

  beforeAll(async () => {
    // Set up environment variables for Redis
    process.env.USE_REDIS = 'true';
    process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
    process.env.REDIS_KEY_PREFIX = 'LeaderElection-IntegrationTest';

    // Import modules after setting env vars
    const leaderElectionModule = await import('../LeaderElection');
    const redisClients = await import('~/cache/redisClients');

    LeaderElection = leaderElectionModule.LeaderElection;
    keyvRedisClient = redisClients.keyvRedisClient;
    ioredisClient = redisClients.ioredisClient;

    // Ensure Redis is connected
    if (!keyvRedisClient) {
      throw new Error('Redis client is not initialized');
    }

    // Wait for connection and topology discovery to complete
    await redisClients.keyvRedisClientReady;

    // Increase max listeners to handle many instances in tests
    process.setMaxListeners(200);
  });

  afterEach(async () => {
    await Promise.all(instances.map((instance) => instance.resign()));
    instances = [];

    // Clean up: clear the leader key directly from Redis
    if (keyvRedisClient) {
      await keyvRedisClient.del(LeaderElection.LEADER_KEY);
    }
  });

  afterAll(async () => {
    // Close both Redis clients to prevent hanging
    if (keyvRedisClient?.isOpen) await keyvRedisClient.disconnect();
    if (ioredisClient?.status === 'ready') await ioredisClient.quit();
  });

  describe('Test Case 1: Simulate shutdown of the leader', () => {
    it('should elect a new leader after the current leader resigns', async () => {
      // Create 100 instances
      instances = Array.from({ length: 100 }, () => new LeaderElection());

      // Call isLeader on all instances and get leadership status
      const resultsWithInstances = await Promise.all(
        instances.map(async (instance) => ({
          instance,
          isLeader: await instance.isLeader(),
        })),
      );

      // Find leader and followers
      const leaders = resultsWithInstances.filter((r) => r.isLeader);
      const followers = resultsWithInstances.filter((r) => !r.isLeader);
      const leader = leaders[0].instance;
      const nextLeader = followers[0].instance;

      // Verify only one is leader
      expect(leaders.length).toBe(1);

      // Verify getLeaderUUID matches the leader's UUID
      expect(await LeaderElection.getLeaderUUID()).toBe(leader.UUID);

      // Leader resigns
      await leader.resign();

      // Verify getLeaderUUID returns null after resignation
      expect(await LeaderElection.getLeaderUUID()).toBeNull();

      // Next instance to call isLeader should become the new leader
      expect(await nextLeader.isLeader()).toBe(true);
    }, 30000); // 30 second timeout for 100 instances
  });

  describe('Test Case 2: Simulate crash of the leader', () => {
    it('should allow re-election after leader crashes (lease expires)', async () => {
      // Mock config with short lease duration
      const clusterConfigModule = await import('../config');
      const originalConfig = { ...clusterConfigModule.clusterConfig };

      // Override config values for this test
      Object.assign(clusterConfigModule.clusterConfig, {
        LEADER_LEASE_DURATION: 2,
        LEADER_RENEW_INTERVAL: 4,
      });

      try {
        // Create 1 instance with mocked config
        const instance = new LeaderElection();
        instances.push(instance);

        // Become leader
        expect(await instance.isLeader()).toBe(true);

        // Verify leader UUID is set
        expect(await LeaderElection.getLeaderUUID()).toBe(instance.UUID);

        // Simulate crash by clearing refresh timer
        instance.clearRefreshTimer();

        // The instance no longer considers itself leader even though it still holds the key
        expect(await LeaderElection.getLeaderUUID()).toBe(instance.UUID);
        expect(await instance.isLeader()).toBe(false);

        // Wait for lease to expire (3 seconds > 2 second lease)
        await new Promise((resolve) => setTimeout(resolve, 3000));

        // Verify leader UUID is null after lease expiration
        expect(await LeaderElection.getLeaderUUID()).toBeNull();
      } finally {
        // Restore original config values
        Object.assign(clusterConfigModule.clusterConfig, originalConfig);
      }
    }, 15000); // 15 second timeout
  });

  describe('Test Case 3: Stress testing', () => {
    it('should ensure only one instance becomes leader even when multiple instances call electSelf() at once', async () => {
      // Create 10 instances
      instances = Array.from({ length: 10 }, () => new LeaderElection());

      // Call electSelf on all instances in parallel
      const results = await Promise.all(instances.map((instance) => instance['electSelf']()));

      // Verify only one returned true
      const successCount = results.filter((success) => success).length;
      expect(successCount).toBe(1);

      // Find the winning instance
      const winnerInstance = instances.find((_, index) => results[index]);

      // Verify getLeaderUUID matches the winner's UUID
      expect(await LeaderElection.getLeaderUUID()).toBe(winnerInstance?.UUID);
    }, 15000); // 15 second timeout
  });
});

describe('LeaderElection without Redis', () => {
  let LeaderElection: typeof import('../LeaderElection').LeaderElection;
  let instances: InstanceType<typeof import('../LeaderElection').LeaderElection>[] = [];

  beforeAll(async () => {
    // Set up environment variables for non-Redis mode
    process.env.USE_REDIS = 'false';

    // Reset all modules to force re-evaluation with new env vars
    jest.resetModules();

    // Import modules after setting env vars and resetting modules
    const leaderElectionModule = await import('../LeaderElection');
    LeaderElection = leaderElectionModule.LeaderElection;
  });

  afterEach(async () => {
    await Promise.all(instances.map((instance) => instance.resign()));
    instances = [];
  });

  afterAll(() => {
    // Restore environment variables
    process.env.USE_REDIS = 'true';

    // Reset all modules to ensure next test runs get fresh imports
    jest.resetModules();
  });

  it('should allow all instances to be leaders when USE_REDIS is false', async () => {
    // Create 10 instances
    instances = Array.from({ length: 10 }, () => new LeaderElection());

    // Call isLeader on all instances
    const results = await Promise.all(instances.map((instance) => instance.isLeader()));

    // Verify all instances report themselves as leaders
    expect(results.every((isLeader) => isLeader)).toBe(true);
    expect(results.filter((isLeader) => isLeader).length).toBe(10);
  });

  it('should return null for getLeaderUUID when USE_REDIS is false', async () => {
    // Create a few instances
    instances = Array.from({ length: 3 }, () => new LeaderElection());

    // Call isLeader on all instances to make them "leaders"
    await Promise.all(instances.map((instance) => instance.isLeader()));

    // Verify getLeaderUUID returns null in non-Redis mode
    expect(await LeaderElection.getLeaderUUID()).toBeNull();
  });

  it('should allow resign() to be called without throwing errors', async () => {
    // Create multiple instances
    instances = Array.from({ length: 5 }, () => new LeaderElection());

    // Make them all leaders
    await Promise.all(instances.map((instance) => instance.isLeader()));

    // Call resign on all instances - should not throw
    await expect(
      Promise.all(instances.map((instance) => instance.resign())),
    ).resolves.not.toThrow();

    // Verify they're still leaders after resigning (since there's no shared state)
    const results = await Promise.all(instances.map((instance) => instance.isLeader()));
    expect(results.every((isLeader) => isLeader)).toBe(true);
  });
});