Skip to content

Commit 76c46c1

Browse files
author
Chris Wiechmann
committed
Update of an existing transform improved
1 parent b1e9d4b commit 76c46c1

File tree

7 files changed

+258
-16
lines changed

7 files changed

+258
-16
lines changed

api-builder-plugin-fn-elasticsearch/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](http://keepachangelog.com/)
55
and this project adheres to [Semantic Versioning](http://semver.org/).
66

7+
## [2.2.0] 2022-03-11
8+
### Changed
9+
- When updating an existing transform, the last checkpoint of the existing transform is taken over as a query limitation to the new transform
10+
- This avoids re-indexing documents that have already been processed by the previous transform
11+
- FYI: Technically, a transform cannot be updated - a new transform is created and the previous one is stopped (and optionally deleted)
12+
713
## [2.1.2] 2022-02-02
814
### Security
915
- Updated API-Builder runtime to solve security issue

api-builder-plugin-fn-elasticsearch/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@axway-api-builder-ext/api-builder-plugin-fn-elasticsearch",
3-
"version": "2.1.2",
3+
"version": "2.2.0",
44
"description": "Integrate Elasticsearch into your API-Builder flow to combine search data for instance with other data available in your flow.",
55
"author": "Chris Wiechmann <cwiechmann@axway.com> (http://www.axway.com)",
66
"license": "Apache-2.0",

api-builder-plugin-fn-elasticsearch/src/actions/transform.js

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ async function putTransform(params, options) {
4141
delete params.deletePreviousTransform;
4242
}
4343

44+
var limitOnLastCheckpoint = true;
45+
if(params.limitOnLastCheckpoint != undefined) {
46+
limitOnLastCheckpoint = params.limitOnLastCheckpoint;
47+
delete params.limitOnLastCheckpoint;
48+
}
49+
4450
var idSuffix = `-${params.idSuffix}`
4551
if(params.idSuffix == undefined) {
4652
idSuffix = "";
@@ -56,34 +62,66 @@ async function putTransform(params, options) {
5662
var runningTransforms = [];
5763
for (i = 0; i < allTransforms.body.transforms.length; i++) {
5864
const transform = allTransforms.body.transforms[i];
59-
// Check if the Transform already exists, which means nothing to do
65+
// Check if the requested Transform (TransformId+ID-Suffix) already exists, which means nothing to do
6066
if(transform.id==`${params.transformId}${idSuffix}`) {
6167
options.logger.info(`Transform found: ${params.transformId}${idSuffix} already exists with state: ${transform.state}. To update this transform, please provide an idSuffix (e.g. v2)`);
6268
if(startTransform && transform.state != "started" && transform.state != "indexing") {
6369
options.logger.info(`Existing transform: ${params.transformId}${idSuffix} is not running, going to start it.`);
64-
await client.transform.startTransform( {transformId: transform.id}, { ignore: [404], maxRetries: 3 });
70+
await client.transform.startTransform( {transformId: transform.id}, { ignore: [404], maxRetries: 3 });
6571
}
6672
actualTransform = transform;
6773
} else {
68-
// Stop all other transforms
6974
if(transform.state == "started" || transform.state == "indexing") {
70-
await client.transform.stopTransform( {transformId: transform.id}, { ignore: [404], maxRetries: 3 });
75+
runningTransforms.push(transform);
7176
}
7277
}
7378
}
79+
// The transform with the same transformId (incl. the ID-Suffix) is found and running - Nothing to do
7480
if(actualTransform) return actualTransform;
75-
// Stop all running transforms, as we expect only one transform to run
76-
for (i = 0; i < runningTransforms.length; i++) {
77-
const runningTransform = runningTransforms[i];
78-
await client.transform.stopTransform( {transformId: runningTransform.id}, { ignore: [404], maxRetries: 3 });
79-
}
8081

82+
// If only one Transform is running with the transformId* it is considered as the currently active transform, which is used to take over the last checkpoint
83+
// as a query limitation to the new transform to avoid re-indexing everything again
84+
if(limitOnLastCheckpoint) {
85+
if(runningTransforms.length == 1) {
86+
var runningTransform = runningTransforms[0];
87+
var z = 0;
88+
// As long as the transform is still indexing, there is no completed checkpoint, hence we have to wait
89+
while(runningTransform.state == "indexing" && z < 12) {
90+
options.logger.warn(`Existing transform: ${runningTransform} is currently indexing. Waiting max. 60 seconds.`);
91+
await sleep(5000);
92+
var response = await client.transform.getTransformStats( {transformId: runningTransform.id}, { ignore: [404], maxRetries: 3 });
93+
runningTransform = response.body.transforms[0]; // Only one transform is expected as requested for specific transformId which is unique
94+
z++;
95+
}
96+
// If the transform is still indexing - Don't try to update it
97+
if(runningTransform.state == "indexing") {
98+
return options.setOutput('noUpdate', `No update possible, as the actual transform: ${runningTransform.id} is still indexing.`);
99+
}
100+
if(!runningTransform.checkpointing || !runningTransform.checkpointing.last) {
101+
return options.setOutput('noUpdate', `Existing transform: ${JSON.stringify(runningTransform)} has no last checkpoint. Cannot update transform.`);
102+
}
103+
var lastTimeStamp = runningTransform.checkpointing.last.time_upper_bound_millis;
104+
if(!lastTimeStamp) {
105+
return options.setOutput('noUpdate', `lastTimeStamp is missing in existing running transform: ${JSON.stringify(runningTransform)}. Cannot update transform.`);
106+
}
107+
// Take over the last timestamp as a query limitation to the new transform
108+
params.body.source.query = { "bool": { "should": [ { "range": { "@timestamp": { "gt": lastTimeStamp } } } ],"minimum_should_match": 1 } };
109+
} else if(runningTransforms.length > 1) {
110+
options.logger.warn(`Expected only ONE transform running with transformId: ${params.transformId}, but found ${runningTransforms.length}`);
111+
} else if(runningTransforms.length > 0) {
112+
options.logger.warn(`No running transforms found with transformId: ${params.transformId}. Cannot take over last checkpoint to new transform`);
113+
}
114+
}
115+
// Stop all other transforms that may still be running with the same transformId*, as we expect only one transform with the same transformId* to run
116+
for (var i = 0; i < runningTransforms.length; ++i) {
117+
await client.transform.stopTransform( {transformId: runningTransforms[i].id}, { ignore: [404], maxRetries: 3 });
118+
}
81119
params.transformId = `${params.transformId}${idSuffix}`;
82120
try {
83121
var putTransformResult = await client.transform.putTransform( params, { ignore: [404], maxRetries: 3 });
84122
} catch (e) {
85123
if(e.meta.statusCode == 409) {
86-
throw new Error(`Error creating the transform. The transform id: \'${params.transformId}\' was used previously. This includes deleted transforms.`);
124+
throw new Error(`Error creating the transform. The transform id: \'${params.transformId}\' was used previously. This includes even deleted transforms.`);
87125
} else {
88126
throw e;
89127
}
@@ -103,6 +141,10 @@ async function putTransform(params, options) {
103141
}
104142
}
105143

144+
/**
 * Pauses the calling async flow for the given duration.
 *
 * @param {number} millis - Delay in milliseconds, passed straight to setTimeout.
 * @returns {Promise<void>} Promise that resolves (with no value) once the timer fires.
 */
function sleep(millis) {
  // The function already returns a Promise and never awaits, so the
  // `async` wrapper was redundant; callers can still `await sleep(...)`.
  return new Promise((resolve) => setTimeout(resolve, millis));
}
147+
106148
module.exports = {
107149
putTransform
108150
};

api-builder-plugin-fn-elasticsearch/src/flow-nodes.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1135,6 +1135,14 @@ flow-nodes:
11351135
required: true
11361136
schema:
11371137
type: object
1138+
limitOnLastCheckpoint:
1139+
name: Limit to last checkpoint
1140+
description: If an existing transform is updated or replaced, the new transform starts without a checkpoint. It would therefore start from zero and index all data again, which would lead to duplicates. This switch controls whether the new transform is automatically constrained to the last checkpoint timestamp of the existing transform.
1141+
required: false
1142+
initialType: boolean
1143+
schema:
1144+
default: true
1145+
type: boolean
11381146
deletePreviousTransform:
11391147
name: Delete previous transform
11401148
description: If an existing transform with the same primary ID has been found which should be replaced, this switch decides whether the old transform should be deleted. If the previous transform is running, it will be stopped automatically.
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
{
2+
"body": {
3+
"count": 3,
4+
"transforms": [
5+
{
6+
"id": "traffic-summary-hourly-v2",
7+
"state": "indexing",
8+
"stats": {
9+
"pages_processed": 3,
10+
"documents_processed": 15800266,
11+
"documents_indexed": 610,
12+
"documents_deleted": 0,
13+
"trigger_count": 1,
14+
"index_time_in_ms": 55,
15+
"index_total": 2,
16+
"index_failures": 0,
17+
"search_time_in_ms": 11493,
18+
"search_total": 3,
19+
"search_failures": 0,
20+
"processing_time_in_ms": 11,
21+
"processing_total": 3,
22+
"delete_time_in_ms": 0,
23+
"exponential_avg_checkpoint_duration_ms": 11714.0,
24+
"exponential_avg_documents_indexed": 610.0,
25+
"exponential_avg_documents_processed": 1.5800266e7
26+
},
27+
"checkpointing": {
28+
"last": {
29+
"checkpoint": 1,
30+
"timestamp_millis": 1630056019784,
31+
"time_upper_bound_millis" : 1646913600000
32+
},
33+
"operations_behind": 9641,
34+
"changes_last_detected_at": 1630056019784
35+
}
36+
},
37+
{
38+
"id": "traffic-summary-hourly-v3",
39+
"state": "stopped",
40+
"stats": {
41+
"pages_processed": 3491,
42+
"documents_processed": 220077526,
43+
"documents_indexed": 11596,
44+
"documents_deleted": 0,
45+
"trigger_count": 1966,
46+
"index_time_in_ms": 1596288,
47+
"index_total": 874,
48+
"index_failures": 0,
49+
"search_time_in_ms": 6354187,
50+
"search_total": 3491,
51+
"search_failures": 0,
52+
"processing_time_in_ms": 1242,
53+
"processing_total": 3491,
54+
"delete_time_in_ms": 0,
55+
"exponential_avg_checkpoint_duration_ms": 8062.134487589088,
56+
"exponential_avg_documents_indexed": 14.361914811673138,
57+
"exponential_avg_documents_processed": 218420.17565343156
58+
},
59+
"checkpointing": {
60+
"last": {
61+
"checkpoint": 873,
62+
"timestamp_millis": 1629217091044,
63+
"time_upper_bound_millis": 1629216791044
64+
},
65+
"operations_behind": 30441542,
66+
"changes_last_detected_at": 1629217091044
67+
}
68+
},
69+
{
70+
"id": "traffic-summary-hourly-v4",
71+
"state": "stopped",
72+
"node": {
73+
"id": "3c2xbN6_TdapbspMzN889w",
74+
"name": "axway-elk-apim4elastic-elasticsearch-1",
75+
"ephemeral_id": "LWZT2OPfSO2ui_IRCqW9cA",
76+
"transport_address": "192.168.74.187:9300",
77+
"attributes": {
78+
79+
}
80+
},
81+
"stats": {
82+
"pages_processed": 1812,
83+
"documents_processed": 16129165,
84+
"documents_indexed": 954,
85+
"documents_deleted": 0,
86+
"trigger_count": 906,
87+
"index_time_in_ms": 4065,
88+
"index_total": 906,
89+
"index_failures": 0,
90+
"search_time_in_ms": 483416,
91+
"search_total": 1812,
92+
"search_failures": 0,
93+
"processing_time_in_ms": 0,
94+
"processing_total": 1812,
95+
"delete_time_in_ms": 0,
96+
"exponential_avg_checkpoint_duration_ms": 834.4249561332916,
97+
"exponential_avg_documents_indexed": 1.2735197277774921,
98+
"exponential_avg_documents_processed": 523.9206782252195
99+
},
100+
"checkpointing": {
101+
"last": {
102+
"checkpoint": 906,
103+
"timestamp_millis": 1630072061880,
104+
"time_upper_bound_millis": 1630071161880
105+
},
106+
"operations_behind": 7,
107+
"changes_last_detected_at": 1630072061007,
108+
"last_search_time": 1630072061007
109+
}
110+
}
111+
]
112+
},
113+
"statusCode": 200,
114+
"headers": {
115+
"x-elastic-product": "Elasticsearch",
116+
"content-type": "application/json; charset=UTF-8",
117+
"content-length": "530"
118+
},
119+
"meta": {
120+
"context": null,
121+
"request": {
122+
"params": {
123+
"method": "GET",
124+
"path": "/_transform/*/_stats",
125+
"body": null,
126+
"querystring": "",
127+
"headers": {
128+
"user-agent": "elasticsearch-js/7.14.0 (win32 10.0.19043-x64; Node.js v12.13.1)",
129+
"x-elastic-client-meta": "es=7.14.0,js=12.13.1,t=7.14.0,hc=12.13.1"
130+
},
131+
"timeout": 30000
132+
},
133+
"options": {
134+
"ignore": [
135+
404
136+
],
137+
"maxRetries": 3
138+
},
139+
"id": 1
140+
},
141+
"name": "elasticsearch-js",
142+
"connection": {
143+
"url": "http://api-env:9200/",
144+
"id": "http://api-env:9200/",
145+
"headers": {
146+
147+
},
148+
"deadCount": 0,
149+
"resurrectTimeout": 0,
150+
"_openRequests": 0,
151+
"status": "alive",
152+
"roles": {
153+
"master": true,
154+
"data": true,
155+
"ingest": true,
156+
"ml": false
157+
}
158+
},
159+
"attempts": 0,
160+
"aborted": false
161+
}
162+
}

api-builder-plugin-fn-elasticsearch/test/mock/transform/getTransformStatsResponseOneStarted.json

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"count": 3,
44
"transforms": [
55
{
6-
"id": "apigw-summary-transform-hourly",
6+
"id": "traffic-summary-hourly-v2",
77
"state": "started",
88
"stats": {
99
"pages_processed": 3,
@@ -27,14 +27,15 @@
2727
"checkpointing": {
2828
"last": {
2929
"checkpoint": 1,
30-
"timestamp_millis": 1630056019784
30+
"timestamp_millis": 1630056019784,
31+
"time_upper_bound_millis" : 1646913600000
3132
},
3233
"operations_behind": 9641,
3334
"changes_last_detected_at": 1630056019784
3435
}
3536
},
3637
{
37-
"id": "hourly_apigw_traffic-summary",
38+
"id": "traffic-summary-hourly-v3",
3839
"state": "stopped",
3940
"stats": {
4041
"pages_processed": 3491,
@@ -66,7 +67,7 @@
6667
}
6768
},
6869
{
69-
"id": "test-transform",
70+
"id": "traffic-summary-hourly-v4",
7071
"state": "stopped",
7172
"node": {
7273
"id": "3c2xbN6_TdapbspMzN889w",

api-builder-plugin-fn-elasticsearch/test/transform/Transform-Tests.js

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ describe('Transform tests', () => {
6060
expect(mockedDeleteTransform.callCount).to.equals(0); // There is nothing to delete
6161
});
6262

63-
it('should pass - 1 running transform (is the actual transform) - Created & Started new transform - Old stopped', async () => {
63+
it('should pass - 1 running transform (is the actual transform) - Created & Started new transform - Old stopped - Checkpoint taken over as query limitation to new transform', async () => {
6464
const mockedGetTransformStats = setupElasticsearchMock(client, 'transform.getTransformStats', './test/mock/transform/getTransformStatsResponseOneStarted.json', false);
6565
const mockedPutTransform = setupElasticsearchMock(client, 'transform.putTransform', './test/mock/transform/putTransformResponse.json', false);
6666
const mockedDeleteTransform = setupElasticsearchMock(client, 'transform.deleteTransform', './test/mock/transform/stopTransformResponse.json', false);
@@ -76,11 +76,34 @@ describe('Transform tests', () => {
7676
expect(output).to.equal('next');
7777
expect(mockedGetTransformStats.callCount).to.equals(1); // should be called once to get all transforms
7878
expect(mockedPutTransform.callCount).to.equals(1); // a new transform should be created
79+
// Transform body should have been extended with a query limitation
80+
expect(mockedPutTransform.lastCall.arg.body.source.query).to.deep.equals({ "bool": { "should": [ { "range": { "@timestamp": { "gt": 1646913600000 } } } ],"minimum_should_match": 1 } });
7981
expect(mockedStartTransform.callCount).to.equals(1); // and started
8082
expect(mockedDeleteTransform.callCount).to.equals(0); // There is nothing to delete
8183
expect(mockedStopTransform.callCount).to.equals(1); // The previously running transform must be stopped
8284
});
8385

86+
it('should return with noUpdate as the actual transform is still indexing', async () => {
87+
const mockedGetTransformStats = setupElasticsearchMock(client, 'transform.getTransformStats', './test/mock/transform/getTransformStatsResponseOneIndexing.json', false);
88+
const mockedPutTransform = setupElasticsearchMock(client, 'transform.putTransform', './test/mock/transform/putTransformResponse.json', false);
89+
const mockedDeleteTransform = setupElasticsearchMock(client, 'transform.deleteTransform', './test/mock/transform/stopTransformResponse.json', false);
90+
const mockedStartTransform = setupElasticsearchMock(client, 'transform.startTransform', './test/mock/transform/startTransformResponse.json', false);
91+
const mockedStopTransform = setupElasticsearchMock(client, 'transform.stopTransform', './test/mock/transform/stopTransformResponse.json', false);
92+
93+
const inputParameter = {
94+
transformId: 'traffic-summary-hourly',
95+
idSuffix: "v1",
96+
body: JSON.parse(fs.readFileSync('./test/mock/transform/putTransformRequestBody.json')) };
97+
const { value, output } = await flowNode.putTransform(inputParameter);
98+
99+
expect(output).to.equal('noUpdate');
100+
expect(mockedGetTransformStats.callCount).to.equals(13); // should be called multiple times (12 + 1), while waiting for indexing to be completed
101+
expect(mockedPutTransform.callCount).to.equals(0); // No new transform should be created
102+
expect(mockedStartTransform.callCount).to.equals(0); // Nothing needs to be started
103+
expect(mockedDeleteTransform.callCount).to.equals(0); // There is nothing to delete
104+
expect(mockedStopTransform.callCount).to.equals(0); // There is nothing to stop
105+
}).timeout(100000);
106+
84107
it('should pass with valid parameters - Transform-ID not found - Will just create new Job', async () => {
85108
const mockedGetTransformStats = setupElasticsearchMock(client, 'transform.getTransformStats', './test/mock/transform/getTransformStatsResponseZeroResult.json', false);
86109
const mockedStartTransform = setupElasticsearchMock(client, 'transform.startTransform', './test/mock/transform/startTransformResponse.json', false);

0 commit comments

Comments
 (0)