From e8c85ebeecf4e233efc0b6d28849c33891992783 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 18:19:04 +0100 Subject: [PATCH 1/5] Disable ergo throttle defaults --- .github/workflows/e2e-test.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index 472ab9b22..24776cfc7 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -178,6 +178,10 @@ jobs: - 6379:6379 ircd: image: ghcr.io/ergochat/ergo:stable + env: + ERGO__SERVER__IP_LIMITS_COUNT: "false" + ERGO__SERVER__IP_LIMITS_THROTTLE: "false" + ERGO__SERVER__IP_LIMITS_MAX_CONNECTIONS_PER_WINDOW: "32000" ports: - 6667:6667 steps: From e9a47d2eb32d6bb99eb99159fed18dd57b12c9fe Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 18:19:22 +0100 Subject: [PATCH 2/5] Add a test for checking many joins to a channel --- spec/e2e/scaling.spec.ts | 72 ++++++++++++++++++++++++++++++++++++++++ spec/util/e2e-test.ts | 7 ++-- spec/util/homerunner.ts | 14 +++++--- 3 files changed, 86 insertions(+), 7 deletions(-) create mode 100644 spec/e2e/scaling.spec.ts diff --git a/spec/e2e/scaling.spec.ts b/spec/e2e/scaling.spec.ts new file mode 100644 index 000000000..731b43ad9 --- /dev/null +++ b/spec/e2e/scaling.spec.ts @@ -0,0 +1,72 @@ +import { TestIrcServer } from "matrix-org-irc"; +import { IrcBridgeE2ETest } from "../util/e2e-test"; +import { describe, it, expect } from "@jest/globals"; +import { delay } from "../../src/promiseutil"; + +function createUserSet(count: number) { + const localparts: string[] = []; + for (let index = 0; index < count; index++) { + localparts.push(TestIrcServer.generateUniqueNick(`alice-c${index}`)); + } + return localparts; +} + +describe('Bridge scaling test', () => { + let testEnv: IrcBridgeE2ETest; + beforeEach(async () => { + testEnv = await IrcBridgeE2ETest.createTestEnv({ + matrixLocalparts: [TestIrcServer.generateUniqueNick("alice")], + matrixSynclessLocalparts: createUserSet(80), + ircNicks: ['bob'], + traceToFile: true, + }); + await testEnv.setUp(); + }); + afterEach(() => { + return testEnv?.tearDown(); + }); + it('should be able to connect many users to a single channel', async () => { + const channel = `#${TestIrcServer.generateUniqueNick("test")}`; + const { homeserver } = testEnv; + const alice = homeserver.users[0].client; + const { bob } = testEnv.ircTest.clients; + + // Create the channel + await bob.join(channel); + + const adminRoomId = await testEnv.createAdminRoomHelper(alice); + const cRoomId = await testEnv.joinChannelHelper(alice, adminRoomId, channel); + + // And finally wait for bob to appear. + const bobUserId = `@irc_${bob.nick}:${homeserver.domain}`; + await alice.waitForRoomEvent( + {eventType: 'm.room.member', sender: bobUserId, stateKey: bobUserId, roomId: cRoomId} + ); + + // Send some messages + const aliceMsg = bob.waitForEvent('message', 10000); + const bobMsg = alice.waitForRoomEvent( + {eventType: 'm.room.message', sender: bobUserId, roomId: cRoomId} + ); + alice.sendText(cRoomId, "Hello bob!"); + await aliceMsg; + bob.say(channel, "Hi alice!"); + await bobMsg; + + const ircJoins: {channel: string, nick: string}[] = []; + bob.on('join', (joinChannel, nick) => ircJoins.push({channel: joinChannel, nick})); + + // Have all the Matrix users join + const usersToJoin = homeserver.users.filter(u => testEnv.opts.matrixSynclessLocalparts?.includes(u.localpart)) + for (const mxUser of usersToJoin) { + await mxUser.client.joinRoom(cRoomId); + } + while (ircJoins.length < usersToJoin.length) { + await delay(2000); + } + // Now check that all the users joined. + for (const mxUser of usersToJoin) { + expect(ircJoins).toContainEqual({ channel, nick: `M-${mxUser.localpart}`}); + } + }, 100_000); +}); diff --git a/spec/util/e2e-test.ts b/spec/util/e2e-test.ts index a94fcbff5..e0463d6f9 100644 --- a/spec/util/e2e-test.ts +++ b/spec/util/e2e-test.ts @@ -24,6 +24,7 @@ const IRCBRIDGE_TEST_REDIS_URL = process.env.IRCBRIDGE_TEST_REDIS_URL; interface Opts { matrixLocalparts?: string[]; + matrixSynclessLocalparts?: string[]; ircNicks?: string[]; timeout?: number; config?: Partial, @@ -181,11 +182,11 @@ export class IrcBridgeE2ETest { } const workerID = parseInt(process.env.JEST_WORKER_ID ?? '0'); - const { matrixLocalparts, config } = opts; + const { matrixLocalparts, matrixSynclessLocalparts, config } = opts; const ircTest = new TestIrcServer(); const [postgresDb, homeserver] = await Promise.all([ this.createDatabase(), - createHS(["ircbridge_bot", ...matrixLocalparts || []], workerID), + createHS(["ircbridge_bot", ...matrixLocalparts || []], workerID, matrixSynclessLocalparts), ircTest.setUp(opts.ircNicks), ]); const redisUri = IRCBRIDGE_TEST_REDIS_URL && `${IRCBRIDGE_TEST_REDIS_URL}/${workerID}`; @@ -256,7 +257,7 @@ export class IrcBridgeE2ETest { }, membershipLists: { enabled: true, - floodDelayMs: 100, + floodDelayMs: 0, global: { ircToMatrix: { incremental: true, diff --git a/spec/util/homerunner.ts b/spec/util/homerunner.ts index 5837a8e6c..8c0617c9d 100644 --- a/spec/util/homerunner.ts +++ b/spec/util/homerunner.ts @@ -22,7 +22,7 @@ export interface ComplementHomeServer { hsToken: string; senderLocalpart: string; }; - users: {userId: string, accessToken: string, deviceId: string, client: E2ETestMatrixClient}[] + users: {userId: string, localpart: string, accessToken: string, deviceId: string, client: E2ETestMatrixClient}[] } // Ensure we don't clash with other tests. @@ -50,7 +50,9 @@ async function waitForHomerunner() { } } -export async function createHS(localparts: string[] = [], workerId: number): Promise { +export async function createHS( + localparts: string[], workerId: number, localpartsNoSync: string[] = [] +): Promise { const appPort = 49152 + workerId; await waitForHomerunner(); // Ensure we never use the same port twice. @@ -80,20 +82,24 @@ export async function createHS(localparts: string[] = [], workerId: number): Pro ...asRegistration, URL: `http://host.docker.internal:${AppserviceConfig.port}`, }], - Users: localparts.map(localpart => ({Localpart: localpart, DisplayName: localpart})), + Users: [ + ...localparts, + ...localpartsNoSync + ].map(localpart => ({Localpart: localpart, DisplayName: localpart})), }], } }); const [homeserverName, homeserver] = Object.entries(blueprintResponse.homeservers)[0]; const users = Object.entries(homeserver.AccessTokens).map(([userId, accessToken]) => ({ userId: userId, + localpart: userId.slice(1).split(':', 2)[0], accessToken, deviceId: homeserver.DeviceIDs[userId], client: new E2ETestMatrixClient(homeserver.BaseURL, accessToken), })); // Start syncing proactively. - await Promise.all(users.map(u => u.client.start())); + await Promise.all(users.filter(u => localparts.includes(u.localpart)).map(u => u.client.start())); return { users, id: blueprint, From a8e2b51c60ed274bb151e9d16f3f55c6deff4e0b Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 18:19:58 +0100 Subject: [PATCH 3/5] Run the queue immediately if it is not processing anything --- src/util/Queue.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/util/Queue.ts b/src/util/Queue.ts index ea7424713..6176d67d3 100644 --- a/src/util/Queue.ts +++ b/src/util/Queue.ts @@ -108,7 +108,9 @@ export class Queue { item: thing, defer: defer }); - if (!this.intervalMs) { + + // If we are not currently processing anything, consume immediately. + if (!this.intervalMs || this.processing === null) { // always process stuff asyncly, never syncly. process.nextTick(() => { this.consume(); From 2ff8cd0aa9b1871ddbb74eb07de233abe35a7f46 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 18:23:54 +0100 Subject: [PATCH 4/5] Add some comments --- spec/e2e/scaling.spec.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spec/e2e/scaling.spec.ts b/spec/e2e/scaling.spec.ts index 731b43ad9..68eafba1a 100644 --- a/spec/e2e/scaling.spec.ts +++ b/spec/e2e/scaling.spec.ts @@ -53,6 +53,7 @@ describe('Bridge scaling test', () => { bob.say(channel, "Hi alice!"); await bobMsg; + // Track all the joins that we see. const ircJoins: {channel: string, nick: string}[] = []; bob.on('join', (joinChannel, nick) => ircJoins.push({channel: joinChannel, nick})); @@ -61,9 +62,12 @@ describe('Bridge scaling test', () => { for (const mxUser of usersToJoin) { await mxUser.client.joinRoom(cRoomId); } + + // We now need to wait for all the expected joins on the IRC side. while (ircJoins.length < usersToJoin.length) { await delay(2000); } + // Now check that all the users joined. for (const mxUser of usersToJoin) { expect(ircJoins).toContainEqual({ channel, nick: `M-${mxUser.localpart}`}); From b23eb87fffd6947663613244b9ab147950bbbdbb Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 18:42:45 +0100 Subject: [PATCH 5/5] More sync testing --- spec/e2e/scaling.spec.ts | 68 ++++++++++++++++++++++++++++++---------- spec/util/e2e-test.ts | 5 +++ 2 files changed, 57 insertions(+), 16 deletions(-) diff --git a/spec/e2e/scaling.spec.ts b/spec/e2e/scaling.spec.ts index 68eafba1a..391b43bfa 100644 --- a/spec/e2e/scaling.spec.ts +++ b/spec/e2e/scaling.spec.ts @@ -1,4 +1,4 @@ -import { TestIrcServer } from "matrix-org-irc"; +import { ChanData, TestIrcServer } from "matrix-org-irc"; import { IrcBridgeE2ETest } from "../util/e2e-test"; import { describe, it, expect } from "@jest/globals"; import { delay } from "../../src/promiseutil"; @@ -43,19 +43,45 @@ describe('Bridge scaling test', () => { {eventType: 'm.room.member', sender: bobUserId, stateKey: bobUserId, roomId: cRoomId} ); - // Send some messages - const aliceMsg = bob.waitForEvent('message', 10000); - const bobMsg = alice.waitForRoomEvent( - {eventType: 'm.room.message', sender: bobUserId, roomId: cRoomId} - ); - alice.sendText(cRoomId, "Hello bob!"); - await aliceMsg; - bob.say(channel, "Hi alice!"); - await bobMsg; + // Have all the Matrix users join + const usersToJoin = homeserver.users.filter(u => testEnv.opts.matrixSynclessLocalparts?.includes(u.localpart)) + for (const mxUser of usersToJoin) { + await mxUser.client.joinRoom(cRoomId); + } + + // We now need to wait for all the expected joins on the IRC side. + const chanData = bob.chanData(channel, false); + if (!chanData) { + throw Error('Expected to have channel data for channel'); + } - // Track all the joins that we see. - const ircJoins: {channel: string, nick: string}[] = []; - bob.on('join', (joinChannel, nick) => ircJoins.push({channel: joinChannel, nick})); + do { + await delay(500); + } while (chanData?.users.size < homeserver.users.length) + + // Now check that all the users joined. + for (const mxUser of usersToJoin) { + expect(chanData.users.keys()).toContain(`M-${mxUser.localpart}`) + } + }, 100_000); + + it('should be able to sync many users on startup', async () => { + const channel = `#${TestIrcServer.generateUniqueNick("test")}`; + const { homeserver } = testEnv; + const alice = homeserver.users[0].client; + const { bob } = testEnv.ircTest.clients; + + // Create the channel + await bob.join(channel); + + const adminRoomId = await testEnv.createAdminRoomHelper(alice); + const cRoomId = await testEnv.joinChannelHelper(alice, adminRoomId, channel); + + // And finally wait for bob to appear. + const bobUserId = `@irc_${bob.nick}:${homeserver.domain}`; + await alice.waitForRoomEvent( + {eventType: 'm.room.member', sender: bobUserId, stateKey: bobUserId, roomId: cRoomId} + ); // Have all the Matrix users join const usersToJoin = homeserver.users.filter(u => testEnv.opts.matrixSynclessLocalparts?.includes(u.localpart)) @@ -63,14 +89,24 @@ describe('Bridge scaling test', () => { await mxUser.client.joinRoom(cRoomId); } + // Now kill the bridge + await testEnv.recreateBridge(); + await testEnv.setUp(); + + // We now need to wait for all the expected joins on the IRC side. - while (ircJoins.length < usersToJoin.length) { - await delay(2000); + const chanData = bob.chanData(channel, false); + if (!chanData) { + throw Error('Expected to have channel data for channel'); } + do { + await delay(500); + } while (chanData?.users.size < homeserver.users.length) + // Now check that all the users joined. for (const mxUser of usersToJoin) { - expect(ircJoins).toContainEqual({ channel, nick: `M-${mxUser.localpart}`}); + expect(chanData.users.keys()).toContain(`M-${mxUser.localpart}`) } }, 100_000); }); diff --git a/spec/util/e2e-test.ts b/spec/util/e2e-test.ts index e0463d6f9..84d3e4d30 100644 --- a/spec/util/e2e-test.ts +++ b/spec/util/e2e-test.ts @@ -247,6 +247,11 @@ export class IrcBridgeE2ETest { displayName: "$NICK", joinAttempts: 3, }, + ircClients: { + ...IrcServer.DEFAULT_CONFIG.ircClients, + // Set a sensibly high max. + maxClients: 100_000, + }, dynamicChannels: { enabled: true, createAlias: true,