Skip to content

Commit 85933cc

Browse files
authored
Merge pull request #6 from AKST/removes-end-listeners
Removes end event listens
2 parents 4b6f114 + 76208aa commit 85933cc

File tree

1 file changed

+62
-11
lines changed

1 file changed

+62
-11
lines changed

lib/stream-to-async-iterator.js

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,15 @@ export const states = {
1111
errored: Symbol('errored'),
1212
};
1313

14+
/*
15+
* A contract for a promise that requires a clean up
16+
* function be called after the promise finishes.
17+
*/
18+
type PromiseWithCleanUp<T> = {
19+
promise: Promise<T>,
20+
cleanup: () => void,
21+
}
22+
1423
/**
1524
* @typedef {Object} StreamAsyncToIterator~Options
1625
* @property {number} [size] - the size of each read from the stream for each iteration
@@ -106,9 +115,22 @@ export default class StreamAsyncToIterator {
106115
*/
107116
async next(): Promise<Iteration> {
108117
if (this._state === states.notReadable) {
118+
const read = this._untilReadable();
119+
const end = this._untilEnd();
120+
109121
//need to wait until the stream is readable or ended
110-
await Promise.race([this._untilReadable(), this._untilEnd()]);
111-
return this.next();
122+
try {
123+
await Promise.race([read.promise, end.promise]);
124+
return this.next();
125+
}
126+
catch (e) {
127+
throw e
128+
}
129+
finally {
130+
//need to clean up any hanging event listeners
131+
read.cleanup()
132+
end.cleanup()
133+
}
112134
} else if (this._state === states.ended) {
113135
return {done: true, value: null};
114136
} else if (this._state === states.errored) {
@@ -133,34 +155,63 @@ export default class StreamAsyncToIterator {
133155
* @private
134156
* @returns {Promise}
135157
*/
136-
_untilReadable(): Promise<void> {
137-
return new Promise((resolve, reject) => {
138-
const handleReadable = () => {
158+
_untilReadable(): PromiseWithCleanUp<void> {
159+
//let is used here instead of const because the exact reference is
160+
//required to remove it, this is why it is not a curried function that
161+
//accepts resolve & reject as parameters.
162+
let eventListener = null;
163+
164+
const promise = new Promise((resolve, reject) => {
165+
eventListener = () => {
139166
this._state = states.readable;
140167
this._rejections.delete(reject);
168+
169+
// we set this to null to info the clean up not to do anything
170+
eventListener = null;
141171
resolve();
142172
};
143173

144-
this._stream.once('readable', handleReadable);
174+
//on is used here instead of once, because
175+
//the listener is remove afterwards anyways.
176+
this._stream.once('readable', eventListener);
145177
this._rejections.add(reject);
146178
});
179+
180+
const cleanup = () => {
181+
if (eventListener == null) return;
182+
this._stream.removeListener('readable', eventListener);
183+
};
184+
185+
return { cleanup, promise }
147186
}
148187

149188
/**
150189
* Waits until the stream is ended. Rejects if the stream errored out.
151190
* @private
152191
* @returns {Promise}
153192
*/
154-
_untilEnd(): Promise<void> {
155-
return new Promise((resolve, reject) => {
156-
const handleEnd = () => {
193+
_untilEnd(): PromiseWithCleanUp<void> {
194+
let eventListener = null;
195+
196+
const promise = new Promise((resolve, reject) => {
197+
eventListener = () => {
157198
this._state = states.ended;
158199
this._rejections.delete(reject);
200+
201+
eventListener = null
159202
resolve();
160203
};
161-
this._stream.once('end', handleEnd);
204+
205+
this._stream.once('end', eventListener);
162206
this._rejections.add(reject);
163-
})
207+
});
208+
209+
const cleanup = () => {
210+
if (eventListener == null) return;
211+
this._stream.removeListener('end', eventListener);
212+
};
213+
214+
return { cleanup, promise }
164215
}
165216
}
166217

0 commit comments

Comments
 (0)