Skip to content

Commit fee8f16

Browse files
committed
reverted consumer function changes
1 parent 931dfd8 commit fee8f16

File tree

6 files changed

+84
-201
lines changed

6 files changed

+84
-201
lines changed

BlockBlobReader/src/consumer.js

Lines changed: 17 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -227,40 +227,6 @@ function getBlockBlobService(context, task) {
227227
}
228228
})};
229229

230-
function getMessageSize(msg) {
231-
// increasing one to accommodate \n
232-
return 1 + Buffer.byteLength(JSON.stringify(msg), 'utf8');
233-
}
234-
235-
function getSplittedArray(messageArray,context){
236-
237-
let allChunks = [];
238-
let currentChunkSize = 0;
239-
let currentChunk = [];
240-
const maxChunkSize = 1*1024*1024; // 1MB
241-
for (const msg of messageArray) {
242-
let currentMsgSize = getMessageSize(msg);
243-
if (currentMsgSize > maxChunkSize) {
244-
context.log(`Warning: Ignoring msg of size: ${currentMsgSize} > maxChunkSize`)
245-
continue;
246-
}
247-
if (currentMsgSize + currentChunkSize > maxChunkSize) {
248-
allChunks.push(currentChunk);
249-
context.log(`Chunk created of size: ${currentChunkSize} length: ${currentChunk.length}`)
250-
currentChunk = [msg];
251-
currentChunkSize = currentMsgSize;
252-
} else {
253-
currentChunk.push(msg);
254-
currentChunkSize += currentMsgSize;
255-
}
256-
}
257-
if (currentChunk.length > 0) {
258-
context.log(`Chunk created of size: ${currentChunkSize} length: ${currentChunk.length}`)
259-
allChunks.push(currentChunk);
260-
}
261-
return allChunks;
262-
}
263-
264230
function messageHandler(serviceBusTask, context, sumoClient) {
265231
var file_ext = serviceBusTask.blobName.split(".").pop();
266232
if (file_ext == serviceBusTask.blobName) {
@@ -305,26 +271,10 @@ function messageHandler(serviceBusTask, context, sumoClient) {
305271
});
306272
} else {
307273
messageArray = msghandler[file_ext](context,msg);
308-
var splittedArrays = getSplittedArray(messageArray,context);
309-
context.log("No of chunks created: "+splittedArrays.length);
310-
for(splitArray of splittedArrays){
311-
splitArray.forEach(function(msg){
312-
sumoClient.addData(msg)
313-
})
314-
await sumoClient.flushAll();
315-
}
316-
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
317-
if (sumoClient.messagesFailed > 0) {
318-
context.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
319-
} else {
320-
context.log('Exiting now.');
321-
context.done();
322-
}
323-
}else{
324-
context.log("Messages Attempted: " + sumoClient.messagesAttempted);
325-
context.log("Messages Received: " + sumoClient.messagesReceived);
326-
context.done();
327-
}
274+
messageArray.forEach(function (msg) {
275+
sumoClient.addData(msg);
276+
});
277+
sumoClient.flushAll();
328278
}
329279
});
330280
}).catch(function (err) {
@@ -358,10 +308,21 @@ function servicebushandler(context, serviceBusTask) {
358308
};
359309
setSourceCategory(serviceBusTask, options);
360310
function failureHandler(msgArray, ctx) {
361-
//ctx.log("Failed to send to Sumo");
311+
ctx.log("Failed to send to Sumo");
312+
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
313+
ctx.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
314+
}
362315
}
363316
function successHandler(ctx) {
364-
//ctx.log('Successfully sent to Sumo');
317+
ctx.log('Successfully sent to Sumo', serviceBusTask);
318+
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
319+
if (sumoClient.messagesFailed > 0) {
320+
ctx.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
321+
} else {
322+
ctx.log('Exiting now.');
323+
ctx.done();
324+
}
325+
}
365326
}
366327

367328
sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler);

BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js

Lines changed: 17 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -227,40 +227,6 @@ function getBlockBlobService(context, task) {
227227
}
228228
})};
229229

230-
function getMessageSize(msg) {
231-
// increasing one to accommodate \n
232-
return 1 + Buffer.byteLength(JSON.stringify(msg), 'utf8');
233-
}
234-
235-
function getSplittedArray(messageArray,context){
236-
237-
let allChunks = [];
238-
let currentChunkSize = 0;
239-
let currentChunk = [];
240-
const maxChunkSize = 1*1024*1024; // 1MB
241-
for (const msg of messageArray) {
242-
let currentMsgSize = getMessageSize(msg);
243-
if (currentMsgSize > maxChunkSize) {
244-
context.log(`Warning: Ignoring msg of size: ${currentMsgSize} > maxChunkSize`)
245-
continue;
246-
}
247-
if (currentMsgSize + currentChunkSize > maxChunkSize) {
248-
allChunks.push(currentChunk);
249-
context.log(`Chunk created of size: ${currentChunkSize} length: ${currentChunk.length}`)
250-
currentChunk = [msg];
251-
currentChunkSize = currentMsgSize;
252-
} else {
253-
currentChunk.push(msg);
254-
currentChunkSize += currentMsgSize;
255-
}
256-
}
257-
if (currentChunk.length > 0) {
258-
context.log(`Chunk created of size: ${currentChunkSize} length: ${currentChunk.length}`)
259-
allChunks.push(currentChunk);
260-
}
261-
return allChunks;
262-
}
263-
264230
function messageHandler(serviceBusTask, context, sumoClient) {
265231
var file_ext = serviceBusTask.blobName.split(".").pop();
266232
if (file_ext == serviceBusTask.blobName) {
@@ -305,26 +271,10 @@ function messageHandler(serviceBusTask, context, sumoClient) {
305271
});
306272
} else {
307273
messageArray = msghandler[file_ext](context,msg);
308-
var splittedArrays = getSplittedArray(messageArray,context);
309-
context.log("No of chunks created: "+splittedArrays.length);
310-
for(splitArray of splittedArrays){
311-
splitArray.forEach(function(msg){
312-
sumoClient.addData(msg)
313-
})
314-
await sumoClient.flushAll();
315-
}
316-
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
317-
if (sumoClient.messagesFailed > 0) {
318-
context.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
319-
} else {
320-
context.log('Exiting now.');
321-
context.done();
322-
}
323-
}else{
324-
context.log("Messages Attempted: " + sumoClient.messagesAttempted);
325-
context.log("Messages Received: " + sumoClient.messagesReceived);
326-
context.done();
327-
}
274+
messageArray.forEach(function (msg) {
275+
sumoClient.addData(msg);
276+
});
277+
sumoClient.flushAll();
328278
}
329279
});
330280
}).catch(function (err) {
@@ -358,10 +308,21 @@ function servicebushandler(context, serviceBusTask) {
358308
};
359309
setSourceCategory(serviceBusTask, options);
360310
function failureHandler(msgArray, ctx) {
361-
//ctx.log("Failed to send to Sumo");
311+
ctx.log("Failed to send to Sumo");
312+
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
313+
ctx.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
314+
}
362315
}
363316
function successHandler(ctx) {
364-
//ctx.log('Successfully sent to Sumo');
317+
ctx.log('Successfully sent to Sumo', serviceBusTask);
318+
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
319+
if (sumoClient.messagesFailed > 0) {
320+
ctx.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
321+
} else {
322+
ctx.log('Exiting now.');
323+
ctx.done();
324+
}
325+
}
365326
}
366327

367328
sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler);

BlockBlobReader/target/consumer_build/BlobTaskConsumer/sumoclient.js

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ var url = require('node:url');
88

99
var bucket = require('./messagebucket');
1010
var sumoutils = require('./sumoutils.js');
11-
var { promisify } = require('node:util');
1211

1312

1413
var metadataMap = {"category":"X-Sumo-Category","sourceName":"X-Sumo-Name","sourceHost":"X-Sumo-Host"};
@@ -174,24 +173,27 @@ SumoClient.prototype.flushBucketToSumo = function(metaKey) {
174173

175174
if (curOptions.compress_data) {
176175
curOptions.headers['Content-Encoding'] = 'gzip';
177-
const gzipPromise = promisify(zlib.gzip);
178-
return gzipPromise(msgArray.join('\n')).then(function(compressed_data){
179-
//self.context.log("gzip successful");
176+
177+
return zlib.gzip(msgArray.join('\n'),function(e,compressed_data){
178+
if (!e) {
179+
self.context.log("gzip successful");
180180
return sumoutils.p_retryMax(httpSend,self.MaxAttempts,self.RetryInterval,[msgArray,compressed_data])
181181
.then(()=> {
182182
self.context.log("Succesfully sent " + self.messagesSent + " messages to Sumo");
183-
self.success_callback(self.context);
184-
}).catch((err) => {
183+
self.success_callback(self.context);}
184+
)
185+
.catch((err) => {
185186
self.messagesFailed += msgArray.length;
186187
self.messagesAttempted += msgArray.length;
187188
self.context.log("Failed to send after retries: " + self.MaxAttempts + " " + JSON.stringify(err) + ' messagesAttempted: ' + self.messagesAttempted + ' messagesReceived: ' + self.messagesReceived);
188189
self.failure_callback(msgArray,self.context);
189190
});
190-
}).catch((e)=> {
191+
} else {
191192
self.messagesFailed += msgArray.length;
192193
self.messagesAttempted += msgArray.length;
193194
self.context.log("Failed to gzip: " + JSON.stringify(e) + ' messagesAttempted: ' + self.messagesAttempted + ' messagesReceived: ' + self.messagesReceived);
194195
self.failure_callback(msgArray,self.context);
196+
}
195197
});
196198
} else {
197199
//self.context.log('Send raw data to Sumo');
@@ -210,13 +212,11 @@ SumoClient.prototype.flushBucketToSumo = function(metaKey) {
210212
/**
211213
* Flush all internal buckets to Sumo
212214
*/
213-
SumoClient.prototype.flushAll = async function() {
215+
SumoClient.prototype.flushAll = function() {
214216
var self = this;
215-
var allPromises = [];
216217
this.dataMap.forEach( function(buffer,key,dataMap) {
217-
allPromises.push(self.flushBucketToSumo(key));
218+
self.flushBucketToSumo(key);
218219
});
219-
await Promise.all(allPromises);
220220
};
221221

222222
SumoClient.prototype.addData = function(data) {

BlockBlobReader/target/dlqprocessor_build/DLQTaskConsumer/index.js

Lines changed: 17 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -227,40 +227,6 @@ function getBlockBlobService(context, task) {
227227
}
228228
})};
229229

230-
function getMessageSize(msg) {
231-
// increasing one to accommodate \n
232-
return 1 + Buffer.byteLength(JSON.stringify(msg), 'utf8');
233-
}
234-
235-
function getSplittedArray(messageArray,context){
236-
237-
let allChunks = [];
238-
let currentChunkSize = 0;
239-
let currentChunk = [];
240-
const maxChunkSize = 1*1024*1024; // 1MB
241-
for (const msg of messageArray) {
242-
let currentMsgSize = getMessageSize(msg);
243-
if (currentMsgSize > maxChunkSize) {
244-
context.log(`Warning: Ignoring msg of size: ${currentMsgSize} > maxChunkSize`)
245-
continue;
246-
}
247-
if (currentMsgSize + currentChunkSize > maxChunkSize) {
248-
allChunks.push(currentChunk);
249-
context.log(`Chunk created of size: ${currentChunkSize} length: ${currentChunk.length}`)
250-
currentChunk = [msg];
251-
currentChunkSize = currentMsgSize;
252-
} else {
253-
currentChunk.push(msg);
254-
currentChunkSize += currentMsgSize;
255-
}
256-
}
257-
if (currentChunk.length > 0) {
258-
context.log(`Chunk created of size: ${currentChunkSize} length: ${currentChunk.length}`)
259-
allChunks.push(currentChunk);
260-
}
261-
return allChunks;
262-
}
263-
264230
function messageHandler(serviceBusTask, context, sumoClient) {
265231
var file_ext = serviceBusTask.blobName.split(".").pop();
266232
if (file_ext == serviceBusTask.blobName) {
@@ -305,26 +271,10 @@ function messageHandler(serviceBusTask, context, sumoClient) {
305271
});
306272
} else {
307273
messageArray = msghandler[file_ext](context,msg);
308-
var splittedArrays = getSplittedArray(messageArray,context);
309-
context.log("No of chunks created: "+splittedArrays.length);
310-
for(splitArray of splittedArrays){
311-
splitArray.forEach(function(msg){
312-
sumoClient.addData(msg)
313-
})
314-
await sumoClient.flushAll();
315-
}
316-
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
317-
if (sumoClient.messagesFailed > 0) {
318-
context.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
319-
} else {
320-
context.log('Exiting now.');
321-
context.done();
322-
}
323-
}else{
324-
context.log("Messages Attempted: " + sumoClient.messagesAttempted);
325-
context.log("Messages Received: " + sumoClient.messagesReceived);
326-
context.done();
327-
}
274+
messageArray.forEach(function (msg) {
275+
sumoClient.addData(msg);
276+
});
277+
sumoClient.flushAll();
328278
}
329279
});
330280
}).catch(function (err) {
@@ -358,10 +308,21 @@ function servicebushandler(context, serviceBusTask) {
358308
};
359309
setSourceCategory(serviceBusTask, options);
360310
function failureHandler(msgArray, ctx) {
361-
//ctx.log("Failed to send to Sumo");
311+
ctx.log("Failed to send to Sumo");
312+
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
313+
ctx.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
314+
}
362315
}
363316
function successHandler(ctx) {
364-
//ctx.log('Successfully sent to Sumo');
317+
ctx.log('Successfully sent to Sumo', serviceBusTask);
318+
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
319+
if (sumoClient.messagesFailed > 0) {
320+
ctx.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
321+
} else {
322+
ctx.log('Exiting now.');
323+
ctx.done();
324+
}
325+
}
365326
}
366327

367328
sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler);

0 commit comments

Comments
 (0)