Skip to content

Commit 49669c5

Browse files
committed
added support for vnet flow logs
1 parent 0665bdf commit 49669c5

File tree

3 files changed

+204
-24
lines changed

3 files changed

+204
-24
lines changed

BlockBlobReader/src/consumer.js

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ async function setAppendBlobOffset(context, serviceBusTask, newOffset) {
218218

219219
async function nsgLogsHandler(context, msg, serviceBusTask) {
220220

221-
var jsonArray = [];
221+
let jsonArray = [];
222222
msg = msg.trim().replace(/(^,)|(,$)/g, ""); //removing trailing spaces,newlines and leftover commas
223223

224224
try {
@@ -236,7 +236,7 @@ async function nsgLogsHandler(context, msg, serviceBusTask) {
236236

237237
}
238238

239-
var eventsArr = [];
239+
let eventsArr = [];
240240
jsonArray.forEach(function (record) {
241241
let version = record.properties.Version;
242242
record.properties.flows.forEach(function (rule) {
@@ -282,6 +282,66 @@ async function nsgLogsHandler(context, msg, serviceBusTask) {
282282
return eventsArr;
283283
}
284284

285+
async function networkFlowLogsHandler(context, msg, serviceBusTask) {
286+
287+
let jsonArray = [];
288+
msg = msg.trim().replace(/(^,)|(,$)/g, ""); //removing trailing spaces,newlines and leftover commas
289+
290+
try {
291+
jsonArray = JSON.parse("[" + msg + "]");
292+
} catch(err) {
293+
let response = getParseableJsonArray(msg, context, serviceBusTask);
294+
jsonArray = response[0];
295+
let is_success = response[2];
296+
let newOffset = response[1] + serviceBusTask.startByte;
297+
if (is_success) {
298+
await setAppendBlobOffset(context, serviceBusTask, newOffset);
299+
} else {
300+
return jsonArray;
301+
}
302+
303+
}
304+
// Format: https://learn.microsoft.com/en-us/azure/network-watcher/vnet-flow-logs-overview#log-format
305+
let eventsArr = [];
306+
jsonArray.forEach(function (record) {
307+
record.flowRecords.flows.forEach(function (acl) {
308+
acl.flowGroups.forEach(function (rule) {
309+
rule.flowTuples.forEach(function (tuple) {
310+
let col = tuple.split(",");
311+
let event = {
312+
time: col[0], // Time stamp of when the flow occurred, in UNIX epoch format.
313+
flowLogGUID: record.flowLogGUID,
314+
category: record.category,
315+
flow_log_resource_id: record.flowLogResourceID,
316+
target_resource_id: record.targetResourceID,
317+
event_name: record.operationName,
318+
acl_id: acl.aclID,
319+
rule_name: rule.rule,
320+
mac: record.macAddress,
321+
src_ip: col[1],
322+
dest_IP: col[2],
323+
src_port: col[3],
324+
dest_port: col[4],
325+
protocol: col[5],
326+
flow_direction: col[6],
327+
flow_state: (col[7] === "" || col[7] === undefined) ? null : col[7],
328+
flow_encryption_status: (col[8] === "" || col[8] === undefined) ? null : col[8],
329+
num_packets_sent_src_to_dest: (col[9] === "" || col[9] === undefined) ? null : col[9],
330+
bytes_sent_src_to_dest: (col[10] === "" || col[10] === undefined) ? null : col[10],
331+
num_packets_sent_dest_to_src: (col[11] === "" || col[11] === undefined) ? null : col[11],
332+
bytes_sent_dest_to_src: (col[12] === "" || col[12] === undefined) ? null : col[12],
333+
version: record.flowLogVersion
334+
335+
}
336+
eventsArr.push(event);
337+
});
338+
});
339+
});
340+
});
341+
return eventsArr;
342+
}
343+
344+
285345
function jsonHandler(context,msg) {
286346
// it's assumed that json is well formed {},{}
287347
var jsonArray = [];
@@ -347,13 +407,13 @@ function messageHandler(serviceBusTask, context, sumoClient) {
347407
if (file_ext == serviceBusTask.blobName) {
348408
file_ext = "log";
349409
}
350-
var msghandler = {"log": logHandler, "csv": csvHandler, "json": jsonHandler, "blob": blobHandler, "nsg": nsgLogsHandler};
410+
var msghandler = {"log": logHandler, "csv": csvHandler, "json": jsonHandler, "blob": blobHandler, "nsg": nsgLogsHandler, "vnetflowlogs": networkFlowLogsHandler};
351411
if (!(file_ext in msghandler)) {
352412
context.log.error("Error in messageHandler: Unknown file extension - " + file_ext + " for blob: " + serviceBusTask.blobName);
353413
context.done();
354414
return;
355415
}
356-
if ((file_ext === "json") && (serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent")) {
416+
if ((file_ext === "json") && (serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent" || serviceBusTask.containerName === "insights-logs-flowlogflowevent")) {
357417
// because in json first block and last block remain as it is and azure service adds new block in 2nd last pos
358418
if ((serviceBusTask.endByte < JSON_BLOB_HEAD_BYTES + JSON_BLOB_TAIL_BYTES) || (serviceBusTask.endByte == serviceBusTask.startByte)) {
359419
context.done(); //rejecting first commit when no data is there data will always be atleast HEAD_BYTES+DATA_BYTES+TAIL_BYTES
@@ -365,7 +425,7 @@ function messageHandler(serviceBusTask, context, sumoClient) {
365425
} else {
366426
serviceBusTask.startByte -= 1; //to remove comma before json object
367427
}
368-
file_ext = "nsg";
428+
file_ext = serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent" ? "nsg" : "vnetflowlogs";
369429
}
370430
getBlockBlobService(context, serviceBusTask).then(function (blobService) {
371431
return getData(serviceBusTask, blobService, context).then(async function (msg) {
@@ -385,10 +445,10 @@ function messageHandler(serviceBusTask, context, sumoClient) {
385445
context.done(err);
386446
});
387447
} else {
388-
if (file_ext == "nsg") {
389-
messageArray = await nsgLogsHandler(context, msg, serviceBusTask);
448+
if (file_ext === "nsg" || file_ext === "vnetflowlogs") {
449+
messageArray = await msghandler[file_ext](context, msg, serviceBusTask);
390450
} else {
391-
messageArray = msghandler[file_ext](context,msg);
451+
messageArray = msghandler[file_ext](context, msg);
392452
}
393453
messageArray.forEach(function (msg) {
394454
sumoClient.addData(msg);

BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ async function setAppendBlobOffset(context, serviceBusTask, newOffset) {
218218

219219
async function nsgLogsHandler(context, msg, serviceBusTask) {
220220

221-
var jsonArray = [];
221+
let jsonArray = [];
222222
msg = msg.trim().replace(/(^,)|(,$)/g, ""); //removing trailing spaces,newlines and leftover commas
223223

224224
try {
@@ -236,7 +236,7 @@ async function nsgLogsHandler(context, msg, serviceBusTask) {
236236

237237
}
238238

239-
var eventsArr = [];
239+
let eventsArr = [];
240240
jsonArray.forEach(function (record) {
241241
let version = record.properties.Version;
242242
record.properties.flows.forEach(function (rule) {
@@ -282,6 +282,66 @@ async function nsgLogsHandler(context, msg, serviceBusTask) {
282282
return eventsArr;
283283
}
284284

285+
async function networkFlowLogsHandler(context, msg, serviceBusTask) {
286+
287+
let jsonArray = [];
288+
msg = msg.trim().replace(/(^,)|(,$)/g, ""); //removing trailing spaces,newlines and leftover commas
289+
290+
try {
291+
jsonArray = JSON.parse("[" + msg + "]");
292+
} catch(err) {
293+
let response = getParseableJsonArray(msg, context, serviceBusTask);
294+
jsonArray = response[0];
295+
let is_success = response[2];
296+
let newOffset = response[1] + serviceBusTask.startByte;
297+
if (is_success) {
298+
await setAppendBlobOffset(context, serviceBusTask, newOffset);
299+
} else {
300+
return jsonArray;
301+
}
302+
303+
}
304+
// Format: https://learn.microsoft.com/en-us/azure/network-watcher/vnet-flow-logs-overview#log-format
305+
let eventsArr = [];
306+
jsonArray.forEach(function (record) {
307+
record.flowRecords.flows.forEach(function (acl) {
308+
acl.flowGroups.forEach(function (rule) {
309+
rule.flowTuples.forEach(function (tuple) {
310+
let col = tuple.split(",");
311+
let event = {
312+
time: col[0], // Time stamp of when the flow occurred, in UNIX epoch format.
313+
flowLogGUID: record.flowLogGUID,
314+
category: record.category,
315+
flow_log_resource_id: record.flowLogResourceID,
316+
target_resource_id: record.targetResourceID,
317+
event_name: record.operationName,
318+
acl_id: acl.aclID,
319+
rule_name: rule.rule,
320+
mac: record.macAddress,
321+
src_ip: col[1],
322+
dest_IP: col[2],
323+
src_port: col[3],
324+
dest_port: col[4],
325+
protocol: col[5],
326+
flow_direction: col[6],
327+
flow_state: (col[7] === "" || col[7] === undefined) ? null : col[7],
328+
flow_encryption_status: (col[8] === "" || col[8] === undefined) ? null : col[8],
329+
num_packets_sent_src_to_dest: (col[9] === "" || col[9] === undefined) ? null : col[9],
330+
bytes_sent_src_to_dest: (col[10] === "" || col[10] === undefined) ? null : col[10],
331+
num_packets_sent_dest_to_src: (col[11] === "" || col[11] === undefined) ? null : col[11],
332+
bytes_sent_dest_to_src: (col[12] === "" || col[12] === undefined) ? null : col[12],
333+
version: record.flowLogVersion
334+
335+
}
336+
eventsArr.push(event);
337+
});
338+
});
339+
});
340+
});
341+
return eventsArr;
342+
}
343+
344+
285345
function jsonHandler(context,msg) {
286346
// it's assumed that json is well formed {},{}
287347
var jsonArray = [];
@@ -347,13 +407,13 @@ function messageHandler(serviceBusTask, context, sumoClient) {
347407
if (file_ext == serviceBusTask.blobName) {
348408
file_ext = "log";
349409
}
350-
var msghandler = {"log": logHandler, "csv": csvHandler, "json": jsonHandler, "blob": blobHandler, "nsg": nsgLogsHandler};
410+
var msghandler = {"log": logHandler, "csv": csvHandler, "json": jsonHandler, "blob": blobHandler, "nsg": nsgLogsHandler, "vnetflowlogs": networkFlowLogsHandler};
351411
if (!(file_ext in msghandler)) {
352412
context.log.error("Error in messageHandler: Unknown file extension - " + file_ext + " for blob: " + serviceBusTask.blobName);
353413
context.done();
354414
return;
355415
}
356-
if ((file_ext === "json") && (serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent")) {
416+
if ((file_ext === "json") && (serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent" || serviceBusTask.containerName === "insights-logs-flowlogflowevent")) {
357417
// because in json first block and last block remain as it is and azure service adds new block in 2nd last pos
358418
if ((serviceBusTask.endByte < JSON_BLOB_HEAD_BYTES + JSON_BLOB_TAIL_BYTES) || (serviceBusTask.endByte == serviceBusTask.startByte)) {
359419
context.done(); //rejecting first commit when no data is there data will always be atleast HEAD_BYTES+DATA_BYTES+TAIL_BYTES
@@ -365,7 +425,7 @@ function messageHandler(serviceBusTask, context, sumoClient) {
365425
} else {
366426
serviceBusTask.startByte -= 1; //to remove comma before json object
367427
}
368-
file_ext = "nsg";
428+
file_ext = serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent" ? "nsg" : "vnetflowlogs";
369429
}
370430
getBlockBlobService(context, serviceBusTask).then(function (blobService) {
371431
return getData(serviceBusTask, blobService, context).then(async function (msg) {
@@ -385,10 +445,10 @@ function messageHandler(serviceBusTask, context, sumoClient) {
385445
context.done(err);
386446
});
387447
} else {
388-
if (file_ext == "nsg") {
389-
messageArray = await nsgLogsHandler(context, msg, serviceBusTask);
448+
if (file_ext === "nsg" || file_ext === "vnetflowlogs") {
449+
messageArray = await msghandler[file_ext](context, msg, serviceBusTask);
390450
} else {
391-
messageArray = msghandler[file_ext](context,msg);
451+
messageArray = msghandler[file_ext](context, msg);
392452
}
393453
messageArray.forEach(function (msg) {
394454
sumoClient.addData(msg);

BlockBlobReader/target/dlqprocessor_build/DLQTaskConsumer/index.js

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ async function setAppendBlobOffset(context, serviceBusTask, newOffset) {
218218

219219
async function nsgLogsHandler(context, msg, serviceBusTask) {
220220

221-
var jsonArray = [];
221+
let jsonArray = [];
222222
msg = msg.trim().replace(/(^,)|(,$)/g, ""); //removing trailing spaces,newlines and leftover commas
223223

224224
try {
@@ -236,7 +236,7 @@ async function nsgLogsHandler(context, msg, serviceBusTask) {
236236

237237
}
238238

239-
var eventsArr = [];
239+
let eventsArr = [];
240240
jsonArray.forEach(function (record) {
241241
let version = record.properties.Version;
242242
record.properties.flows.forEach(function (rule) {
@@ -282,6 +282,66 @@ async function nsgLogsHandler(context, msg, serviceBusTask) {
282282
return eventsArr;
283283
}
284284

285+
async function networkFlowLogsHandler(context, msg, serviceBusTask) {
286+
287+
let jsonArray = [];
288+
msg = msg.trim().replace(/(^,)|(,$)/g, ""); //removing trailing spaces,newlines and leftover commas
289+
290+
try {
291+
jsonArray = JSON.parse("[" + msg + "]");
292+
} catch(err) {
293+
let response = getParseableJsonArray(msg, context, serviceBusTask);
294+
jsonArray = response[0];
295+
let is_success = response[2];
296+
let newOffset = response[1] + serviceBusTask.startByte;
297+
if (is_success) {
298+
await setAppendBlobOffset(context, serviceBusTask, newOffset);
299+
} else {
300+
return jsonArray;
301+
}
302+
303+
}
304+
// Format: https://learn.microsoft.com/en-us/azure/network-watcher/vnet-flow-logs-overview#log-format
305+
let eventsArr = [];
306+
jsonArray.forEach(function (record) {
307+
record.flowRecords.flows.forEach(function (acl) {
308+
acl.flowGroups.forEach(function (rule) {
309+
rule.flowTuples.forEach(function (tuple) {
310+
let col = tuple.split(",");
311+
let event = {
312+
time: col[0], // Time stamp of when the flow occurred, in UNIX epoch format.
313+
flowLogGUID: record.flowLogGUID,
314+
category: record.category,
315+
flow_log_resource_id: record.flowLogResourceID,
316+
target_resource_id: record.targetResourceID,
317+
event_name: record.operationName,
318+
acl_id: acl.aclID,
319+
rule_name: rule.rule,
320+
mac: record.macAddress,
321+
src_ip: col[1],
322+
dest_IP: col[2],
323+
src_port: col[3],
324+
dest_port: col[4],
325+
protocol: col[5],
326+
flow_direction: col[6],
327+
flow_state: (col[7] === "" || col[7] === undefined) ? null : col[7],
328+
flow_encryption_status: (col[8] === "" || col[8] === undefined) ? null : col[8],
329+
num_packets_sent_src_to_dest: (col[9] === "" || col[9] === undefined) ? null : col[9],
330+
bytes_sent_src_to_dest: (col[10] === "" || col[10] === undefined) ? null : col[10],
331+
num_packets_sent_dest_to_src: (col[11] === "" || col[11] === undefined) ? null : col[11],
332+
bytes_sent_dest_to_src: (col[12] === "" || col[12] === undefined) ? null : col[12],
333+
version: record.flowLogVersion
334+
335+
}
336+
eventsArr.push(event);
337+
});
338+
});
339+
});
340+
});
341+
return eventsArr;
342+
}
343+
344+
285345
function jsonHandler(context,msg) {
286346
// it's assumed that json is well formed {},{}
287347
var jsonArray = [];
@@ -347,13 +407,13 @@ function messageHandler(serviceBusTask, context, sumoClient) {
347407
if (file_ext == serviceBusTask.blobName) {
348408
file_ext = "log";
349409
}
350-
var msghandler = {"log": logHandler, "csv": csvHandler, "json": jsonHandler, "blob": blobHandler, "nsg": nsgLogsHandler};
410+
var msghandler = {"log": logHandler, "csv": csvHandler, "json": jsonHandler, "blob": blobHandler, "nsg": nsgLogsHandler, "vnetflowlogs": networkFlowLogsHandler};
351411
if (!(file_ext in msghandler)) {
352412
context.log.error("Error in messageHandler: Unknown file extension - " + file_ext + " for blob: " + serviceBusTask.blobName);
353413
context.done();
354414
return;
355415
}
356-
if ((file_ext === "json") && (serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent")) {
416+
if ((file_ext === "json") && (serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent" || serviceBusTask.containerName === "insights-logs-flowlogflowevent")) {
357417
// because in json first block and last block remain as it is and azure service adds new block in 2nd last pos
358418
if ((serviceBusTask.endByte < JSON_BLOB_HEAD_BYTES + JSON_BLOB_TAIL_BYTES) || (serviceBusTask.endByte == serviceBusTask.startByte)) {
359419
context.done(); //rejecting first commit when no data is there data will always be atleast HEAD_BYTES+DATA_BYTES+TAIL_BYTES
@@ -365,7 +425,7 @@ function messageHandler(serviceBusTask, context, sumoClient) {
365425
} else {
366426
serviceBusTask.startByte -= 1; //to remove comma before json object
367427
}
368-
file_ext = "nsg";
428+
file_ext = serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent" ? "nsg" : "vnetflowlogs";
369429
}
370430
getBlockBlobService(context, serviceBusTask).then(function (blobService) {
371431
return getData(serviceBusTask, blobService, context).then(async function (msg) {
@@ -385,10 +445,10 @@ function messageHandler(serviceBusTask, context, sumoClient) {
385445
context.done(err);
386446
});
387447
} else {
388-
if (file_ext == "nsg") {
389-
messageArray = await nsgLogsHandler(context, msg, serviceBusTask);
448+
if (file_ext === "nsg" || file_ext === "vnetflowlogs") {
449+
messageArray = await msghandler[file_ext](context, msg, serviceBusTask);
390450
} else {
391-
messageArray = msghandler[file_ext](context,msg);
451+
messageArray = msghandler[file_ext](context, msg);
392452
}
393453
messageArray.forEach(function (msg) {
394454
sumoClient.addData(msg);

0 commit comments

Comments
 (0)