Skip to content

Commit 0e5dfc9

Browse files
committed
feat: emit events request:connecting, request:connected, request:body-writing, request:body-written, request:body-reading, request:body-read
1 parent 7a79ae4 commit 0e5dfc9

File tree

2 files changed

+72
-36
lines changed

2 files changed

+72
-36
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ pipeline(
3737
Get insights into the part downloads and write to file directly without stream if it is smaller than 1 TiB:
3838

3939
```js
40-
const {createWriteStream} = require('node:fs');
4140
const {download} = require('s3-getobject-accelerator');
4241

4342
const d = download({bucket: 'bucket', key: 'key', version: 'optional version'}, {partSizeInMegabytes: 8, concurrency: 4});

index.js

Lines changed: 72 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ const {LRUCache} = require('lru-cache');
1111

1212
const EVENT_NAME_REQUEST_NAME_RESOLVING = 'request:name-resolving';
1313
const EVENT_NAME_REQUEST_NAME_RESOLVED = 'request:name-resolved';
14+
const EVENT_NAME_REQUEST_CONNECTING = 'request:connecting';
15+
const EVENT_NAME_REQUEST_CONNECTED = 'request:connected';
16+
const EVENT_NAME_REQUEST_BODY_WRITING = 'request:body-writing';
17+
const EVENT_NAME_REQUEST_BODY_WRITTEN = 'request:body-written';
18+
const EVENT_NAME_REQUEST_BODY_READING = 'request:body-reading';
19+
const EVENT_NAME_REQUEST_BODY_READ = 'request:body-read';
1420
const EVENT_NAME_REQUEST_RETRYING = 'request:retrying';
1521
const EVENT_NAME_OBJECT_DOWNLOADING = 'object:downloading';
1622
const EVENT_NAME_PART_DOWNLOADING = 'part:downloading';
@@ -21,6 +27,12 @@ const EVENT_NAME_PART_DONE = 'part:done';
2127
const EVENT_NAMES = [
2228
EVENT_NAME_REQUEST_NAME_RESOLVING,
2329
EVENT_NAME_REQUEST_NAME_RESOLVED,
30+
EVENT_NAME_REQUEST_CONNECTING,
31+
EVENT_NAME_REQUEST_CONNECTED,
32+
EVENT_NAME_REQUEST_BODY_WRITING,
33+
EVENT_NAME_REQUEST_BODY_WRITTEN,
34+
EVENT_NAME_REQUEST_BODY_READING,
35+
EVENT_NAME_REQUEST_BODY_READ,
2436
EVENT_NAME_REQUEST_RETRYING,
2537
EVENT_NAME_OBJECT_DOWNLOADING,
2638
EVENT_NAME_PART_DOWNLOADING,
@@ -83,13 +95,17 @@ const MAX_RETRY_DELAY_IN_SECONDS = 20;
8395
let imdsTokenCache = undefined;
8496
let imdsRegionCache = undefined;
8597
let imdsCredentialsCache = undefined;
98+
let lastRequestNo = 1;
99+
let lastDownloadNo = 1;
86100
const dnsCache = new LRUCache({max: 1000, ttl: DNS_RECORD_MAX_TTL_IN_SECONDS*1000});
87101
const resolverCache = new LRUCache({max: 100});
88102

89103
exports.clearCache = () => {
90104
imdsTokenCache = undefined;
91105
imdsRegionCache = undefined;
92106
imdsCredentialsCache = undefined;
107+
lastRequestNo = 1;
108+
lastDownloadNo = 1;
93109
dnsCache.clear();
94110
resolverCache.clear();
95111
};
@@ -125,6 +141,9 @@ function fetchResolver(timeoutInMilliseconds) {
125141

126142
function request(nodemodule, requestOptions, body, timeoutOptions, contextOptions, cb) {
127143
const resolver = fetchResolver(timeoutOptions.resolveTimeoutInMilliseconds);
144+
const requestNo = lastRequestNo++;
145+
const traceId = (contextOptions.traceId) ? `${contextOptions.traceId}:request=${requestNo}` : `request=${requestNo}`;
146+
128147
requestOptions.lookup = (hostname, options, cb) => {
129148
if (typeof options === 'function') {
130149
cb = options;
@@ -151,7 +170,7 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
151170
let record;
152171
while ((record = cache.shift()) !== undefined) {
153172
if (record.expiredAt > now) {
154-
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_NAME_RESOLVED, {cached: true});
173+
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_NAME_RESOLVED, {traceId, hostname, family, address: record.address, cached: true});
155174
if (options.all === true) {
156175
return cb(null, [{address: record.address, family}]);
157176
} else {
@@ -162,7 +181,7 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
162181
dnsCache.delete(dnsCacheKey);
163182
}
164183

165-
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_NAME_RESOLVING);
184+
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_NAME_RESOLVING, {traceId, hostname, family});
166185
resolver[fn](hostname, {ttl: true}, (err, records) => {
167186
if (err) {
168187
cb(err);
@@ -176,7 +195,7 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
176195
if (records.length > 1) {
177196
dnsCache.set(dnsCacheKey, records.slice(1).map(record => ({address: record.address, expiredAt: now+(Math.min(Math.max(record.ttl, DNS_RECORD_MAX_TTL_IN_SECONDS), DNS_RECORD_MIN_TTL_IN_SECONDS)*1000)})));
178197
}
179-
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_NAME_RESOLVED, {cached: false});
198+
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_NAME_RESOLVED, {traceId, hostname, family, address: record.address, cached: false});
180199
if (options.all === true) {
181200
cb(null, [{address: record.address, family}]);
182201
} else {
@@ -205,7 +224,9 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
205224
clearTimeout(readTimeoutId);
206225
clearTimeout(writeTimeoutId);
207226
};
227+
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_CONNECTING, {traceId, hostname: requestOptions.hostname, method: requestOptions.method, path: requestOptions.path});
208228
const req = nodemodule.request(requestOptions, (res) => {
229+
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_CONNECTED, {traceId});
209230
if (timeoutOptions.readTimeoutInMilliseconds > 0) {
210231
readTimeoutId = setTimeout(() => {
211232
clearTimeouts();
@@ -225,6 +246,7 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
225246
}, timeoutOptions.dataTimeoutInMilliseconds);
226247
}
227248
};
249+
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_BODY_READING, {traceId});
228250
resetDataTimeout();
229251
res.on('data', chunk => {
230252
resetDataTimeout();
@@ -237,6 +259,7 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
237259
}
238260
});
239261
res.on('end', () => {
262+
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_BODY_READ, {traceId});
240263
clearTimeouts();
241264
if (cbcalled === false) {
242265
cbcalled = true;
@@ -271,7 +294,9 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
271294
req.destroy(new WriteTimeoutError());
272295
}, timeoutOptions.writeTimeoutInMilliseconds);
273296
}
297+
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_BODY_WRITING, {traceId});
274298
req.write(body, () => {
299+
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_BODY_WRITTEN, {traceId});
275300
clearTimeout(writeTimeoutId);
276301
});
277302
}
@@ -280,33 +305,33 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
280305
exports.request = request;
281306

282307
function retryrequest(nodemodule, requestOptions, body, retryOptions, timeoutOptions, contextOptions, cb) {
283-
let attempt = 1;
284-
const retry = (err) => {
285-
attempt++;
308+
const getTraceId = (attempt) => (contextOptions.traceId) ? `${contextOptions.traceId}:attempt=${attempt}` : `attempt=${attempt}`;
309+
const retry = (erredAttempt, err) => {
310+
const attempt = erredAttempt+1;
286311
if (attempt <= retryOptions.maxAttempts) {
287312
const maxRetryDelayInSeconds = ('maxRetryDelayInSeconds' in retryOptions) ? retryOptions.maxRetryDelayInSeconds : MAX_RETRY_DELAY_IN_SECONDS;
288313
const delayInMilliseconds = Math.min(Math.random() * Math.pow(2, attempt-1), maxRetryDelayInSeconds) * 1000;
289-
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_RETRYING, {attempt, delayInMilliseconds, err});
314+
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_RETRYING, {traceId: getTraceId(attempt), attempt, delayInMilliseconds, err});
290315
setTimeout(() => {
291316
if (requestOptions.signal) {
292317
if (requestOptions.signal.aborted === true) {
293318
cb(requestOptions.signal.reason);
294319
} else {
295-
req();
320+
req(attempt);
296321
}
297322
} else {
298-
req();
323+
req(attempt);
299324
}
300325
}, delayInMilliseconds);
301326
} else {
302327
cb(err);
303328
}
304329
};
305-
const req = () => {
306-
request(nodemodule, requestOptions, body, timeoutOptions, contextOptions, (err, res, body) => {
330+
const req = (attempt) => {
331+
request(nodemodule, requestOptions, body, timeoutOptions, {...contextOptions, traceId: getTraceId(attempt)}, (err, res, body) => {
307332
if (err) {
308333
if (RETRIABLE_NETWORK_ERROR_CODES.includes(err.code) || RETRIABLE_ERROR_NAMES.includes(err.name)) {
309-
retry(err);
334+
retry(attempt, err);
310335
} else {
311336
cb(err);
312337
}
@@ -316,7 +341,7 @@ function retryrequest(nodemodule, requestOptions, body, retryOptions, timeoutOpt
316341
const err = new Error(`status code: ${res.statusCode}\n${body.toString('utf8')}`);
317342
err.statusCode = res.statusCode;
318343
err.body = body;
319-
retry(err);
344+
retry(attempt, err);
320345
} else {
321346
let message = `status code: ${res.statusCode}`;
322347
if (res.headers['content-type']) {
@@ -325,15 +350,15 @@ function retryrequest(nodemodule, requestOptions, body, retryOptions, timeoutOpt
325350
const err = new Error(message);
326351
err.statusCode = res.statusCode;
327352
err.body = body;
328-
retry(err);
353+
retry(attempt, err);
329354
}
330355
} else {
331356
cb(null, res, body);
332357
}
333358
}
334359
});
335360
};
336-
req();
361+
req(1);
337362
}
338363
exports.retryrequest = retryrequest;
339364

@@ -537,7 +562,7 @@ function escapeKey(string) { // source https://github.com/aws/aws-sdk-js/blob/64
537562
});
538563
}
539564

540-
function getObject({Bucket, Key, VersionId, PartNumber, Range}, {requestTimeoutInMilliseconds, resolveTimeoutInMilliseconds, connectionTimeoutInMilliseconds, readTimeoutInMilliseconds, dataTimeoutInMilliseconds, writeTimeoutInMilliseconds, v2AwsSdkCredentials, endpointHostname, agent, emitter}, cb) {
565+
function getObject({Bucket, Key, VersionId, PartNumber, Range}, {v2AwsSdkCredentials, endpointHostname, agent}, retryOptions, timeoutOptions, contextOptions, cb) {
541566
const ac = new AbortController();
542567
const qs = {};
543568
const headers = {};
@@ -567,7 +592,7 @@ function getObject({Bucket, Key, VersionId, PartNumber, Range}, {requestTimeoutI
567592
signal: ac.signal,
568593
agent
569594
}, credentials);
570-
retryrequest(https, options, undefined, {maxAttempts: 5}, {requestTimeoutInMilliseconds, resolveTimeoutInMilliseconds, connectionTimeoutInMilliseconds, readTimeoutInMilliseconds, dataTimeoutInMilliseconds, writeTimeoutInMilliseconds}, {emitter}, (err, res, body) => {
595+
retryrequest(https, options, undefined, retryOptions, timeoutOptions, contextOptions, (err, res, body) => {
571596
if (err) {
572597
cb(err);
573598
} else {
@@ -672,6 +697,10 @@ exports.download = ({bucket, key, version}, {partSizeInMegabytes, concurrency, r
672697
}
673698
}
674699

700+
const retryOptions = {maxAttempts: 5};
701+
const timeoutOptions = {requestTimeoutInMilliseconds, resolveTimeoutInMilliseconds, connectionTimeoutInMilliseconds, readTimeoutInMilliseconds, dataTimeoutInMilliseconds, writeTimeoutInMilliseconds};
702+
const downloadNo = lastDownloadNo++;
703+
675704
const emitter = new EventEmitter();
676705
const partSizeInBytes = mapPartSizeInBytes(partSizeInMegabytes);
677706
let stream = null;
@@ -710,9 +739,17 @@ exports.download = ({bucket, key, version}, {partSizeInMegabytes, concurrency, r
710739
}
711740
}
712741

742+
function getTraceId(partNo) {
743+
if (partNo !== undefined) {
744+
return `download=${downloadNo}:part=${partNo}`;
745+
} else {
746+
return `download=${downloadNo}`;
747+
}
748+
}
749+
713750
function writePart(partNo, chunk, cb) {
714751
if (lastWrittenPartNo === (partNo-1)) {
715-
emitter.emit(EVENT_NAME_PART_WRITING, {partNo});
752+
emitter.emit(EVENT_NAME_PART_WRITING, {traceId: getTraceId(partNo), partNo});
716753
write(chunk, () => {
717754
lastWrittenPartNo = partNo;
718755
if (lastWrittenPartNo === partsToDownload) {
@@ -740,7 +777,7 @@ exports.download = ({bucket, key, version}, {partSizeInMegabytes, concurrency, r
740777
const endByte = Math.min(startByte+partSizeInBytes-1, bytesToDownload-1); // inclusive
741778
params.Range = `bytes=${startByte}-${endByte}`;
742779
}
743-
const req = getObject(params, {requestTimeoutInMilliseconds, resolveTimeoutInMilliseconds, connectionTimeoutInMilliseconds, readTimeoutInMilliseconds, dataTimeoutInMilliseconds, writeTimeoutInMilliseconds, v2AwsSdkCredentials, endpointHostname, agent, emitter}, (err, data) => {
780+
const req = getObject(params, {v2AwsSdkCredentials, endpointHostname, agent}, retryOptions, timeoutOptions, {emitter, traceId: getTraceId(partNo)}, (err, data) => {
744781
delete partsDownloading[partNo];
745782
if (err) {
746783
cb(err);
@@ -754,14 +791,14 @@ exports.download = ({bucket, key, version}, {partSizeInMegabytes, concurrency, r
754791
function downloadNextPart() {
755792
if (nextPartNo <= partsToDownload) {
756793
const partNo = nextPartNo++;
757-
emitter.emit(EVENT_NAME_PART_DOWNLOADING, {partNo});
794+
emitter.emit(EVENT_NAME_PART_DOWNLOADING, {traceId: getTraceId(partNo), partNo});
758795
downloadPart(partNo, (err, data) => {
759796
if (err) {
760797
abortDownloads(err);
761798
} else {
762-
emitter.emit(EVENT_NAME_PART_DOWNLOADED, {partNo});
799+
emitter.emit(EVENT_NAME_PART_DOWNLOADED, {traceId: getTraceId(partNo), partNo});
763800
writePart(partNo, data.Body, () => {
764-
emitter.emit(EVENT_NAME_PART_DONE, {partNo});
801+
emitter.emit(EVENT_NAME_PART_DONE, {traceId: getTraceId(partNo), partNo});
765802
process.nextTick(downloadNextPart);
766803
});
767804
}
@@ -791,7 +828,7 @@ exports.download = ({bucket, key, version}, {partSizeInMegabytes, concurrency, r
791828
const endByte = partSizeInBytes-1; // inclusive
792829
params.Range = `bytes=0-${endByte}`;
793830
}
794-
partsDownloading[1] = getObject(params, {requestTimeoutInMilliseconds, resolveTimeoutInMilliseconds, connectionTimeoutInMilliseconds, readTimeoutInMilliseconds, dataTimeoutInMilliseconds, writeTimeoutInMilliseconds, v2AwsSdkCredentials, endpointHostname, agent, emitter}, (err, data) => {
831+
partsDownloading[1] = getObject(params, {v2AwsSdkCredentials, endpointHostname, agent}, retryOptions, timeoutOptions, {emitter, traceId: `download=${downloadNo}:part=1`}, (err, data) => {
795832
delete partsDownloading[1];
796833
if (err) {
797834
if (err.code === 'InvalidRange') {
@@ -826,41 +863,41 @@ exports.download = ({bucket, key, version}, {partSizeInMegabytes, concurrency, r
826863
}
827864

828865
function start() {
829-
emitter.emit(EVENT_NAME_PART_DOWNLOADING, {partNo: 1});
866+
emitter.emit(EVENT_NAME_PART_DOWNLOADING, {traceId: getTraceId(1), partNo: 1});
830867
meta((err, metadata, body) => {
831868
if (err) {
832869
stream.destroy(err);
833870
} else {
834-
emitter.emit(EVENT_NAME_OBJECT_DOWNLOADING, metadata);
871+
emitter.emit(EVENT_NAME_OBJECT_DOWNLOADING, {traceId: getTraceId(), ...metadata});
835872
if (partSizeInBytes === null) {
836-
emitter.emit(EVENT_NAME_PART_DOWNLOADED, {partNo: 1});
873+
emitter.emit(EVENT_NAME_PART_DOWNLOADED, {traceId: getTraceId(1), partNo: 1});
837874
if ('parts' in metadata && metadata.parts > 1) {
838-
emitter.emit(EVENT_NAME_PART_WRITING, {partNo: 1});
875+
emitter.emit(EVENT_NAME_PART_WRITING, {traceId: getTraceId(1), partNo: 1});
839876
write(body, () => {
840-
emitter.emit(EVENT_NAME_PART_DONE, {partNo: 1});
877+
emitter.emit(EVENT_NAME_PART_DONE, {traceId: getTraceId(1), partNo: 1});
841878
lastWrittenPartNo = 1;
842879
nextPartNo = 2;
843880
partsToDownload = metadata.parts;
844881
startDownloadingParts();
845882
});
846883
} else {
847-
emitter.emit(EVENT_NAME_PART_WRITING, {partNo: 1});
884+
emitter.emit(EVENT_NAME_PART_WRITING, {traceId: getTraceId(1), partNo: 1});
848885
stream.end(body, () => {
849-
emitter.emit(EVENT_NAME_PART_DONE, {partNo: 1});
886+
emitter.emit(EVENT_NAME_PART_DONE, {traceId: getTraceId(1), partNo: 1});
850887
});
851888
}
852889
} else {
853-
emitter.emit(EVENT_NAME_PART_DOWNLOADED, {partNo: 1});
890+
emitter.emit(EVENT_NAME_PART_DOWNLOADED, {traceId: getTraceId(1), partNo: 1});
854891
bytesToDownload = metadata.lengthInBytes;
855892
if (bytesToDownload <= partSizeInBytes) {
856-
emitter.emit(EVENT_NAME_PART_WRITING, {partNo: 1});
893+
emitter.emit(EVENT_NAME_PART_WRITING, {traceId: getTraceId(1), partNo: 1});
857894
stream.end(body, () => {
858-
emitter.emit(EVENT_NAME_PART_DONE, {partNo: 1});
895+
emitter.emit(EVENT_NAME_PART_DONE, {traceId: getTraceId(1), partNo: 1});
859896
});
860897
} else {
861-
emitter.emit(EVENT_NAME_PART_WRITING, {partNo: 1});
898+
emitter.emit(EVENT_NAME_PART_WRITING, {traceId: getTraceId(1), partNo: 1});
862899
write(body, () => {
863-
emitter.emit(EVENT_NAME_PART_DONE, {partNo: 1});
900+
emitter.emit(EVENT_NAME_PART_DONE, {traceId: getTraceId(1), partNo: 1});
864901
lastWrittenPartNo = 1;
865902
nextPartNo = 2;
866903
partsToDownload = Math.ceil(bytesToDownload/partSizeInBytes);

0 commit comments

Comments
 (0)