Skip to content

Commit 32f4664

Browse files
committed
Revert "Add distribution check"
This reverts commit 87847af Signed-off-by: Vijayan Balasubramanian <balasvij@amazon.com>
1 parent 1702800 commit 32f4664

File tree

8 files changed

+10
-168
lines changed

8 files changed

+10
-168
lines changed

lib/logstash/outputs/opensearch.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
class LogStash::Outputs::OpenSearch < LogStash::Outputs::Base
6969
declare_threadsafe!
7070

71-
require "logstash/outputs/opensearch/distribution_checker"
7271
require "logstash/outputs/opensearch/http_client"
7372
require "logstash/outputs/opensearch/http_client_builder"
7473
require "logstash/plugin_mixins/opensearch/api_configs"
@@ -218,7 +217,7 @@ def register
218217

219218
@logger.info("New OpenSearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))
220219

221-
@client = build_client(DistributionChecker.new(@logger))
220+
@client = build_client
222221

223222
@after_successful_connection_thread = after_successful_connection do
224223
begin

lib/logstash/outputs/opensearch/distribution_checker.rb

Lines changed: 0 additions & 39 deletions
This file was deleted.

lib/logstash/outputs/opensearch/http_client.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,6 @@ def build_pool(options)
332332
adapter = build_adapter(options)
333333

334334
pool_options = {
335-
:distribution_checker => options[:distribution_checker],
336335
:sniffing => sniffing,
337336
:sniffer_delay => options[:sniffer_delay],
338337
:sniffing_path => options[:sniffing_path],

lib/logstash/outputs/opensearch/http_client/pool.rb

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
# GitHub history for details.
99

1010
require "concurrent/atomic/atomic_reference"
11-
require "logstash/plugin_mixins/opensearch/noop_distribution_checker"
1211

1312
module LogStash; module Outputs; class OpenSearch; class HttpClient;
1413
class Pool
@@ -41,7 +40,6 @@ def message
4140
end
4241

4342
attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path
44-
attr_reader :distribution_checker
4543

4644
ROOT_URI_PATH = '/'.freeze
4745

@@ -80,7 +78,6 @@ def initialize(logger, adapter, initial_urls=[], options={})
8078
@stopping = false
8179

8280
@last_version = Concurrent::AtomicReference.new
83-
@distribution_checker = options[:distribution_checker] || LogStash::PluginMixins::OpenSearch::NoopDistributionChecker::INSTANCE
8481
end
8582

8683
def start
@@ -239,8 +236,7 @@ def healthcheck!
239236
@state_mutex.synchronize do
240237
meta[:version] = version
241238
set_last_version(version, url)
242-
alive = @distribution_checker.is_supported?(self, url, @maximum_seen_major_version)
243-
meta[:state] = alive ? :alive : :dead
239+
meta[:state] = :alive
244240
end
245241
rescue HostUnreachableError, BadResponseCodeError => e
246242
logger.warn("Attempted to resurrect connection to dead OpenSearch instance, but got an error", url: url.sanitized.to_s, exception: e.class, message: e.message)
@@ -415,18 +411,9 @@ def return_connection(url)
415411
end
416412
end
417413

418-
def get_version_map(url)
419-
request = perform_request_to_url(url, :get, ROOT_URI_PATH)
420-
LogStash::Json.load(request.body)['version']
421-
end
422-
423414
def get_version(url)
424-
get_version_map(url)['number'] # e.g. "7.10.0"
425-
end
426-
427-
def get_distribution(url)
428-
version_map = get_version_map(url)
429-
version_map['distribution']
415+
request = perform_request_to_url(url, :get, ROOT_URI_PATH)
416+
LogStash::Json.load(request.body)["version"]["number"] # e.g. "7.10.0"
430417
end
431418

432419
def last_version

lib/logstash/outputs/opensearch/http_client_builder.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ def self.build(logger, hosts, params)
2424
client_settings[:proxy] = params["proxy"] if params["proxy"]
2525

2626
common_options = {
27-
:distribution_checker => params["distribution_checker"],
2827
:client_settings => client_settings,
2928
:metric => params["metric"],
3029
:resurrect_delay => params["resurrect_delay"]

lib/logstash/plugin_mixins/opensearch/common.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ module Common
2424
# Perform some OpenSearch options validations and Build the HttpClient.
2525
# Note that this methods may sets the @user, @password, @hosts and @client ivars as a side effect.
2626
# @return [HttpClient] the new http client
27-
def build_client(distribution_checker=nil)
28-
params["distribution_checker"] = distribution_checker
27+
def build_client
28+
2929
# the following 3 options validation & setup methods are called inside build_client
3030
# because they must be executed prior to building the client and logstash
3131
# monitoring and management rely on directly calling build_client

lib/logstash/plugin_mixins/opensearch/noop_distribution_checker.rb

Lines changed: 0 additions & 18 deletions
This file was deleted.

spec/unit/outputs/opensearch/http_client/pool_spec.rb

Lines changed: 4 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
let(:adapter) { LogStash::Outputs::OpenSearch::HttpClient::ManticoreAdapter.new(logger) }
1717
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] }
1818
let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests
19-
let(:node_versions) { [ "7.0.0" ] }
20-
let(:get_distribution) { "opensearch" }
19+
let(:node_versions) { [ "0.0.0" ] }
2120

2221
subject { described_class.new(logger, adapter, initial_urls, options) }
2322

@@ -32,7 +31,6 @@
3231
allow(::Manticore::Client).to receive(:new).and_return(manticore_double)
3332

3433
allow(subject).to receive(:get_version).with(any_args).and_return(*node_versions)
35-
allow(subject.distribution_checker).to receive(:get_distribution).and_return(get_distribution)
3634
end
3735

3836
after do
@@ -212,96 +210,13 @@
212210
end
213211

214212
it "picks the largest major version" do
215-
expect(subject.maximum_seen_major_version).to eq(7)
213+
expect(subject.maximum_seen_major_version).to eq(0)
216214
end
217215

218216
context "if there are nodes with multiple major versions" do
219-
let(:node_versions) { [ "0.0.0", "7.0.0" ] }
217+
let(:node_versions) { [ "0.0.0", "6.0.0" ] }
220218
it "picks the largest major version" do
221-
expect(subject.maximum_seen_major_version).to eq(7)
222-
end
223-
end
224-
end
225-
describe "distribution checking" do
226-
before(:each) do
227-
allow(subject).to receive(:health_check_request)
228-
end
229-
230-
let(:options) do
231-
super().merge(:distribution_checker => distribution_checker)
232-
end
233-
234-
context 'when DistributionChecker#is_supported? returns false' do
235-
let(:distribution_checker) { double('DistributionChecker', :is_supported? => false) }
236-
237-
it 'does not mark the URL as active' do
238-
subject.update_initial_urls
239-
expect(subject.alive_urls_count).to eq(0)
240-
end
241-
end
242-
243-
context 'when DistributionChecker#is_supported? returns true' do
244-
let(:distribution_checker) { double('DistributionChecker', :is_supported? => true) }
245-
246-
it 'marks the URL as active' do
247-
subject.update_initial_urls
248-
expect(subject.alive_urls_count).to eq(1)
249-
end
250-
end
251-
end
252-
describe 'distribution checking with cluster output' do
253-
let(:options) do
254-
super().merge(:distribution_checker => LogStash::Outputs::OpenSearch::DistributionChecker.new(logger))
255-
end
256-
257-
before(:each) do
258-
allow(subject).to receive(:health_check_request)
259-
end
260-
261-
context 'when using opensearch' do
262-
263-
context "cluster doesn't return a valid distribution" do
264-
let(:get_distribution) { nil }
265-
context "major version is not 7" do
266-
let(:node_versions) { [ "6.0.0" ] }
267-
268-
it 'marks the url as dead' do
269-
subject.update_initial_urls
270-
expect(subject.alive_urls_count).to eq(0)
271-
end
272-
273-
it 'logs message' do
274-
expect(subject.distribution_checker).to receive(:log_not_supported).once.and_call_original
275-
subject.update_initial_urls
276-
end
277-
end
278-
context "major version is 7" do
279-
let(:node_versions) { [ "7.10.2" ] }
280-
281-
it "marks the url as active" do
282-
subject.update_initial_urls
283-
expect(subject.alive_urls_count).to eq(1)
284-
end
285-
286-
it 'does not log message' do
287-
expect(subject.distribution_checker).to_not receive(:log_not_supported)
288-
subject.update_initial_urls
289-
end
290-
291-
end
292-
end
293-
context 'cluster returns valid distribution' do
294-
let(:get_distribution) { 'opensearch' }
295-
296-
it "marks the url as active" do
297-
subject.update_initial_urls
298-
expect(subject.alive_urls_count).to eq(1)
299-
end
300-
301-
it 'does not log message' do
302-
expect(subject.distribution_checker).to_not receive(:log_not_supported)
303-
subject.update_initial_urls
304-
end
219+
expect(subject.maximum_seen_major_version).to eq(6)
305220
end
306221
end
307222
end

0 commit comments

Comments
 (0)