1- package net . delirius .jmeter .backendlistener .elasticsearch ;
1+ package io . github . delirius325 .jmeter .backendlistener .elasticsearch ;
22
33import java .io .IOException ;
44import java .text .ParseException ;
55import java .text .SimpleDateFormat ;
66import java .time .LocalDateTime ;
77import java .time .format .DateTimeFormatter ;
8- import java .util .Calendar ;
9- import java .util .Date ;
10- import java .util .HashMap ;
11- import java .util .List ;
12- import java .util .Map ;
8+ import java .util .*;
139
10+ import com .google .gson .Gson ;
1411import org .apache .commons .io .IOUtils ;
1512import org .apache .commons .lang .StringUtils ;
13+ import org .apache .http .HttpEntity ;
1614import org .apache .http .HttpHost ;
15+ import org .apache .http .HttpStatus ;
16+ import org .apache .http .entity .ContentType ;
17+ import org .apache .http .nio .entity .NStringEntity ;
1718import org .apache .jmeter .assertions .AssertionResult ;
1819import org .apache .jmeter .config .Arguments ;
1920import org .apache .jmeter .samplers .SampleResult ;
2021import org .apache .jmeter .threads .JMeterContextService ;
2122import org .apache .jmeter .util .JMeterUtils ;
2223import org .apache .jmeter .visualizers .backend .AbstractBackendListenerClient ;
2324import org .apache .jmeter .visualizers .backend .BackendListenerContext ;
24- import org .elasticsearch .action .bulk .BulkRequest ;
25- import org .elasticsearch .action .bulk .BulkResponse ;
26- import org .elasticsearch .action .index .IndexRequest ;
25+ import org .elasticsearch .client .Response ;
2726import org .elasticsearch .client .RestClient ;
28- import org .elasticsearch .client .RestHighLevelClient ;
29- import org .elasticsearch .common .unit .TimeValue ;
30- import org .elasticsearch .common .xcontent .XContentType ;
3127import org .slf4j .Logger ;
3228import org .slf4j .LoggerFactory ;
3329
@@ -51,11 +47,11 @@ public class ElasticsearchBackend extends AbstractBackendListenerClient {
5147 private static final long DEFAULT_TIMEOUT_MS = 200L ;
5248 private static final Logger logger = LoggerFactory .getLogger (ElasticsearchBackend .class );
5349
54- private RestHighLevelClient client ;
50+ private List <String > bulkRequestList ;
51+ private RestClient client ;
5552 private String index ;
5653 private int buildNumber ;
5754 private int bulkSize ;
58- private BulkRequest bulkRequest ;
5955 private long timeoutMs ;
6056
6157 @ Override
@@ -75,23 +71,30 @@ public Arguments getDefaultParameters() {
7571 @ Override
7672 public void setupTest (BackendListenerContext context ) throws Exception {
7773 try {
78- this .index = context .getParameter (ES_INDEX );
79- this .bulkSize = Integer .parseInt (context .getParameter (ES_BULK_SIZE ));
74+ //
75+ String host = context .getParameter (ES_HOST );
76+ int port = Integer .parseInt (context .getParameter (ES_PORT ));
77+ this .index = context .getParameter (ES_INDEX );
78+ this .bulkSize = Integer .parseInt (context .getParameter (ES_BULK_SIZE ));
8079 this .timeoutMs = JMeterUtils .getPropDefault (ES_TIMEOUT_MS , DEFAULT_TIMEOUT_MS );
81- this .buildNumber = (JMeterUtils .getProperty (ElasticsearchBackend .BUILD_NUMBER ) != null
82- && JMeterUtils .getProperty (ElasticsearchBackend .BUILD_NUMBER ).trim () != "" )
83- ? Integer .parseInt (JMeterUtils .getProperty (ElasticsearchBackend .BUILD_NUMBER )) : 0 ;
84- String host = context .getParameter (ES_HOST );
85- int port = Integer .parseInt (context .getParameter (ES_PORT ));
86- this .client = new RestHighLevelClient (
87- RestClient .builder (
88- new HttpHost (host , port , context .getParameter (ES_SCHEME , "http" )))
89- .setRequestConfigCallback (requestConfigBuilder ->
90- requestConfigBuilder
91- .setConnectTimeout (5000 )
92- .setSocketTimeout ((int )timeoutMs ))
93- .setMaxRetryTimeoutMillis (60000 ));
94- this .bulkRequest = new BulkRequest ().timeout (TimeValue .timeValueMillis (timeoutMs ));
80+ this .buildNumber = (JMeterUtils .getProperty (ElasticsearchBackend .BUILD_NUMBER ) != null && JMeterUtils .getProperty (ElasticsearchBackend .BUILD_NUMBER ).trim () != "" )
81+ ? Integer .parseInt (JMeterUtils .getProperty (ElasticsearchBackend .BUILD_NUMBER )) : 0 ;
82+
83+ // Build RestHighLevelClient & BulkRequest
84+
85+ this .client = RestClient .builder (new HttpHost (context .getParameter (ES_HOST ), port , context .getParameter (ES_SCHEME )))
86+ .setRequestConfigCallback (requestConfigBuilder -> requestConfigBuilder .setConnectTimeout (5000 )
87+ .setSocketTimeout ((int ) timeoutMs ))
88+ .setFailureListener (new RestClient .FailureListener () {
89+ @ Override
90+ public void onFailure (HttpHost host ) {
91+ throw new IllegalStateException ();
92+ }
93+ })
94+ .setMaxRetryTimeoutMillis (60000 )
95+ .build ();
96+ this .bulkRequestList = new LinkedList <String >();
97+
9598 super .setupTest (context );
9699 } catch (Exception e ) {
97100 throw new IllegalStateException ("Unable to setup connectivity to ES" , e );
@@ -101,41 +104,53 @@ public void setupTest(BackendListenerContext context) throws Exception {
101104 @ Override
102105 public void handleSampleResults (List <SampleResult > results , BackendListenerContext context ) {
103106 for (SampleResult sr : results ) {
104- this . bulkRequest . add (
105- new IndexRequest ( this . index , "SampleResult" ). source (this .getElasticData (sr , context ),
106- XContentType . JSON ) );
107+ Gson gson = new Gson ();
108+ String json = gson . toJson (this .getElasticData (sr , context ));
109+ this . bulkRequestList . add ( json );
107110 }
108111
109- if (this .bulkRequest . numberOfActions () >= this .bulkSize ) {
112+ if (this .bulkRequestList . size () >= this .bulkSize ) {
110113 try {
111- sendRequest (bulkRequest );
114+ sendRequest (this . bulkRequestList );
112115 } catch (Exception e ) {
113116 logger .error ("Error sending data to ES, data will be lost" , e );
114117 } finally {
115- this .bulkRequest = new BulkRequest (). timeout ( TimeValue . timeValueMillis ( timeoutMs ) );
118+ this .bulkRequestList . clear ( );
116119 }
117120 }
118121 }
119122
120123 @ Override
121124 public void teardownTest (BackendListenerContext context ) throws Exception {
122- if (this .bulkRequest . numberOfActions () > 0 ) {
123- sendRequest (bulkRequest );
125+ if (this .bulkRequestList . size () > 0 ) {
126+ sendRequest (this . bulkRequestList );
124127 }
125128 IOUtils .closeQuietly (client );
126129 super .teardownTest (context );
127130 }
128131
129- private void sendRequest (BulkRequest bulkRequest ) throws IOException {
130- BulkResponse bulkResponse = this .client .bulk (bulkRequest );
131- if (bulkResponse .hasFailures ()) {
132+ private void sendRequest (List <String > bulkList ) throws IOException {
133+ String actionMetaData = String .format ("{ \" index\" : { \" _index\" : \" %s\" , \" _type\" : \" %s\" } }%n" , this .index , "SampleResult" );
134+
135+ StringBuilder bulkRequestBody = new StringBuilder ();
136+ for (String bulkItem : bulkList ) {
137+ bulkRequestBody .append (actionMetaData );
138+ bulkRequestBody .append (bulkItem );
139+ bulkRequestBody .append ("\n " );
140+ }
141+
142+ HttpEntity entity = new NStringEntity (bulkRequestBody .toString (), ContentType .APPLICATION_JSON );
143+ try {
144+ Response response = client .performRequest ("POST" , "/your_index/your_type/_bulk" , Collections .emptyMap (), entity );
145+ if (response .getStatusLine ().getStatusCode () != HttpStatus .SC_OK ) {
146+ if (logger .isErrorEnabled ()) {
147+ logger .error ("ElasticSearch Backend Listener failed to write results for index {}" , this .index );
148+ }
149+ }
150+ } catch (Exception e ) {
132151 if (logger .isErrorEnabled ()) {
133- logger .error ("Failed to write a result on {}: {}" ,
134- index , bulkResponse .buildFailureMessage ());
152+ logger .error ("ElasticSearch Backend Listener was unable to perform request to the ElasticSearch engine." );
135153 }
136- } else {
137- logger .debug ("Wrote {} results in {}." ,
138- index );
139154 }
140155 }
141156
@@ -176,9 +191,8 @@ public Map<String, Object> getElasticData(SampleResult sr, BackendListenerContex
176191 Date elapsedTime = getElapsedTime (false );
177192 if (elapsedTime != null )
178193 jsonObject .put ("ElapsedTime" , elapsedTime );
179- jsonObject .put ("ResponseCode" , (sr .isResponseCodeOK () &&
180- StringUtils .isNumeric (sr .getResponseCode ())) ?
181- sr .getResponseCode () : context .getParameter (ES_STATUS_CODE ));
194+ jsonObject .put ("ResponseCode" , (sr .isResponseCodeOK () && StringUtils .isNumeric (sr .getResponseCode ()))
195+ ? sr .getResponseCode () : context .getParameter (ES_STATUS_CODE ));
182196
183197 //all assertions
184198 AssertionResult [] assertionResults = sr .getAssertionResults ();
0 commit comments