diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index 472ab9b22..24776cfc7 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -178,6 +178,10 @@ jobs: - 6379:6379 ircd: image: ghcr.io/ergochat/ergo:stable + env: + ERGO__SERVER__IP_LIMITS_COUNT: "false" + ERGO__SERVER__IP_LIMITS_THROTTLE: "false" + ERGO__SERVER__IP_LIMITS_MAX_CONNECTIONS_PER_WINDOW: "32000" ports: - 6667:6667 steps: diff --git a/spec/e2e/scaling.spec.ts b/spec/e2e/scaling.spec.ts new file mode 100644 index 000000000..391b43bfa --- /dev/null +++ b/spec/e2e/scaling.spec.ts @@ -0,0 +1,112 @@ +import { ChanData, TestIrcServer } from "matrix-org-irc"; +import { IrcBridgeE2ETest } from "../util/e2e-test"; +import { describe, it, expect } from "@jest/globals"; +import { delay } from "../../src/promiseutil"; + +function createUserSet(count: number) { + const localparts: string[] = []; + for (let index = 0; index < count; index++) { + localparts.push(TestIrcServer.generateUniqueNick(`alice-c${index}`)); + } + return localparts; +} + +describe('Bridge scaling test', () => { + let testEnv: IrcBridgeE2ETest; + beforeEach(async () => { + testEnv = await IrcBridgeE2ETest.createTestEnv({ + matrixLocalparts: [TestIrcServer.generateUniqueNick("alice")], + matrixSynclessLocalparts: createUserSet(80), + ircNicks: ['bob'], + traceToFile: true, + }); + await testEnv.setUp(); + }); + afterEach(() => { + return testEnv?.tearDown(); + }); + it('should be able to connect many users to a single channel', async () => { + const channel = `#${TestIrcServer.generateUniqueNick("test")}`; + const { homeserver } = testEnv; + const alice = homeserver.users[0].client; + const { bob } = testEnv.ircTest.clients; + + // Create the channel + await bob.join(channel); + + const adminRoomId = await testEnv.createAdminRoomHelper(alice); + const cRoomId = await testEnv.joinChannelHelper(alice, adminRoomId, channel); + + // And finally wait for bob to appear. + const bobUserId = `@irc_${bob.nick}:${homeserver.domain}`; + await alice.waitForRoomEvent( + {eventType: 'm.room.member', sender: bobUserId, stateKey: bobUserId, roomId: cRoomId} + ); + + // Have all the Matrix users join + const usersToJoin = homeserver.users.filter(u => testEnv.opts.matrixSynclessLocalparts?.includes(u.localpart)) + for (const mxUser of usersToJoin) { + await mxUser.client.joinRoom(cRoomId); + } + + // We now need to wait for all the expected joins on the IRC side. + const chanData = bob.chanData(channel, false); + if (!chanData) { + throw Error('Expected to have channel data for channel'); + } + + do { + await delay(500); + } while (chanData?.users.size < homeserver.users.length) + + // Now check that all the users joined. + for (const mxUser of usersToJoin) { + expect(chanData.users.keys()).toContain(`M-${mxUser.localpart}`) + } + }, 100_000); + + it('should be able to sync many users on startup', async () => { + const channel = `#${TestIrcServer.generateUniqueNick("test")}`; + const { homeserver } = testEnv; + const alice = homeserver.users[0].client; + const { bob } = testEnv.ircTest.clients; + + // Create the channel + await bob.join(channel); + + const adminRoomId = await testEnv.createAdminRoomHelper(alice); + const cRoomId = await testEnv.joinChannelHelper(alice, adminRoomId, channel); + + // And finally wait for bob to appear. + const bobUserId = `@irc_${bob.nick}:${homeserver.domain}`; + await alice.waitForRoomEvent( + {eventType: 'm.room.member', sender: bobUserId, stateKey: bobUserId, roomId: cRoomId} + ); + + // Have all the Matrix users join + const usersToJoin = homeserver.users.filter(u => testEnv.opts.matrixSynclessLocalparts?.includes(u.localpart)) + for (const mxUser of usersToJoin) { + await mxUser.client.joinRoom(cRoomId); + } + + // Now kill the bridge + await testEnv.recreateBridge(); + await testEnv.setUp(); + + + // We now need to wait for all the expected joins on the IRC side. + const chanData = bob.chanData(channel, false); + if (!chanData) { + throw Error('Expected to have channel data for channel'); + } + + do { + await delay(500); + } while (chanData?.users.size < homeserver.users.length) + + // Now check that all the users joined. + for (const mxUser of usersToJoin) { + expect(chanData.users.keys()).toContain(`M-${mxUser.localpart}`) + } + }, 100_000); +}); diff --git a/spec/util/e2e-test.ts b/spec/util/e2e-test.ts index a94fcbff5..84d3e4d30 100644 --- a/spec/util/e2e-test.ts +++ b/spec/util/e2e-test.ts @@ -24,6 +24,7 @@ const IRCBRIDGE_TEST_REDIS_URL = process.env.IRCBRIDGE_TEST_REDIS_URL; interface Opts { matrixLocalparts?: string[]; + matrixSynclessLocalparts?: string[]; ircNicks?: string[]; timeout?: number; config?: Partial, @@ -181,11 +182,11 @@ export class IrcBridgeE2ETest { } const workerID = parseInt(process.env.JEST_WORKER_ID ?? '0'); - const { matrixLocalparts, config } = opts; + const { matrixLocalparts, matrixSynclessLocalparts, config } = opts; const ircTest = new TestIrcServer(); const [postgresDb, homeserver] = await Promise.all([ this.createDatabase(), - createHS(["ircbridge_bot", ...matrixLocalparts || []], workerID), + createHS(["ircbridge_bot", ...matrixLocalparts || []], workerID, matrixSynclessLocalparts), ircTest.setUp(opts.ircNicks), ]); const redisUri = IRCBRIDGE_TEST_REDIS_URL && `${IRCBRIDGE_TEST_REDIS_URL}/${workerID}`; @@ -246,6 +247,11 @@ export class IrcBridgeE2ETest { displayName: "$NICK", joinAttempts: 3, }, + ircClients: { + ...IrcServer.DEFAULT_CONFIG.ircClients, + // Set a sensibly high max. + maxClients: 100_000, + }, dynamicChannels: { enabled: true, createAlias: true, @@ -256,7 +262,7 @@ export class IrcBridgeE2ETest { }, membershipLists: { enabled: true, - floodDelayMs: 100, + floodDelayMs: 0, global: { ircToMatrix: { incremental: true, diff --git a/spec/util/homerunner.ts b/spec/util/homerunner.ts index 5837a8e6c..8c0617c9d 100644 --- a/spec/util/homerunner.ts +++ b/spec/util/homerunner.ts @@ -22,7 +22,7 @@ export interface ComplementHomeServer { hsToken: string; senderLocalpart: string; }; - users: {userId: string, accessToken: string, deviceId: string, client: E2ETestMatrixClient}[] + users: {userId: string, localpart: string, accessToken: string, deviceId: string, client: E2ETestMatrixClient}[] } // Ensure we don't clash with other tests. @@ -50,7 +50,9 @@ async function waitForHomerunner() { } } -export async function createHS(localparts: string[] = [], workerId: number): Promise { +export async function createHS( + localparts: string[], workerId: number, localpartsNoSync: string[] = [] +): Promise { const appPort = 49152 + workerId; await waitForHomerunner(); // Ensure we never use the same port twice. @@ -80,20 +82,24 @@ export async function createHS(localparts: string[] = [], workerId: number): Pro ...asRegistration, URL: `http://host.docker.internal:${AppserviceConfig.port}`, }], - Users: localparts.map(localpart => ({Localpart: localpart, DisplayName: localpart})), + Users: [ + ...localparts, + ...localpartsNoSync + ].map(localpart => ({Localpart: localpart, DisplayName: localpart})), }], } }); const [homeserverName, homeserver] = Object.entries(blueprintResponse.homeservers)[0]; const users = Object.entries(homeserver.AccessTokens).map(([userId, accessToken]) => ({ userId: userId, + localpart: userId.slice(1).split(':', 2)[0], accessToken, deviceId: homeserver.DeviceIDs[userId], client: new E2ETestMatrixClient(homeserver.BaseURL, accessToken), })); // Start syncing proactively. - await Promise.all(users.map(u => u.client.start())); + await Promise.all(users.filter(u => localparts.includes(u.localpart)).map(u => u.client.start())); return { users, id: blueprint, diff --git a/src/util/Queue.ts b/src/util/Queue.ts index ea7424713..6176d67d3 100644 --- a/src/util/Queue.ts +++ b/src/util/Queue.ts @@ -108,7 +108,9 @@ export class Queue { item: thing, defer: defer }); - if (!this.intervalMs) { + + // If we are not currently processing anything, consume immediately. + if (!this.intervalMs || this.processing === null) { // always process stuff asyncly, never syncly. process.nextTick(() => { this.consume();