Skip to content

Commit 8be9b96

Browse files
Boris Dorofeevnodkz
authored andcommitted
fix: single connection for queue-events
1 parent fc16802 commit 8be9b96

File tree

3 files changed

+15
-110
lines changed

3 files changed

+15
-110
lines changed

src/composeBull.ts

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ export function composeBull(
6565
const wrapMutation = composeFC(sc, opts)(wrapMutationFC, wrapQueueArgs);
6666
const wrapSubscription = composeFC(sc, opts)(wrapQueueSubsArgs);
6767

68-
const data = {
68+
const data: ComposeBullResult = {
6969
QueueTC: getQueueTC(sc, opts),
7070
JobTC: getJobTC(sc, opts),
7171
queryFields: {
@@ -96,22 +96,19 @@ export function composeBull(
9696
jobMoveToDelayed: wrapMutation(createJobMoveToDelayedFC),
9797
queuePepUp: wrapMutation(createQueuePepUpFC),
9898
},
99-
} as ComposeBullResult;
100-
101-
//if (opts?.redisEvents) {
102-
data.subscriptionFields = {
103-
onJobActive: wrapSubscription(createOnJobActiveFC),
104-
onJobCompleted: wrapSubscription(createOnJobCompletedFC),
105-
onJobDelayed: wrapSubscription(createOnJobDelayedFC),
106-
onJobFailed: wrapSubscription(createOnJobFailedFC),
107-
onJobProgress: wrapSubscription(createOnJobProgressFC),
108-
onJobRemoved: wrapSubscription(createOnJobRemovedFC),
109-
onJobStalled: wrapSubscription(createOnJobStalledFC),
110-
onJobWaiting: wrapSubscription(createOnJobWaitingFC),
111-
onQueuePaused: wrapSubscription(createOnQueuePausedFC),
112-
onQueueResumed: wrapSubscription(createOnQueueResumedFC),
99+
subscriptionFields: {
100+
onJobActive: wrapSubscription(createOnJobActiveFC),
101+
onJobCompleted: wrapSubscription(createOnJobCompletedFC),
102+
onJobDelayed: wrapSubscription(createOnJobDelayedFC),
103+
onJobFailed: wrapSubscription(createOnJobFailedFC),
104+
onJobProgress: wrapSubscription(createOnJobProgressFC),
105+
onJobRemoved: wrapSubscription(createOnJobRemovedFC),
106+
onJobStalled: wrapSubscription(createOnJobStalledFC),
107+
onJobWaiting: wrapSubscription(createOnJobWaitingFC),
108+
onQueuePaused: wrapSubscription(createOnQueuePausedFC),
109+
onQueueResumed: wrapSubscription(createOnQueueResumedFC),
110+
},
113111
};
114-
//}
115112

116113
return data;
117114
}

src/helpers/queueEventsListen.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { QueueEvents } from 'bullmq';
22
import { Options } from '../definitions';
3+
import { getBullConnection } from './getBullConnection';
34

45
export function getAsyncIterator(
56
prefix: string,
@@ -22,7 +23,7 @@ function getQueueEventsSingleton(prefix: string, queueName: string, opts: Option
2223

2324
const queueEvents = new QueueEvents(queueName, {
2425
prefix,
25-
connection: opts?.redisEvents,
26+
connection: getBullConnection(opts),
2627
});
2728

2829
queueEventsMap.set(fullName, queueEvents);

src/testAsyncIt.ts

Lines changed: 0 additions & 93 deletions
This file was deleted.

0 commit comments

Comments
 (0)