Skip to content

Commit d1a4951

Browse files
committed
Update specs to match new code
1 parent e75a421 commit d1a4951

File tree

9 files changed

+81
-25
lines changed

9 files changed

+81
-25
lines changed

lib/logstash/inputs/s3.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base
4646
# unless otherwise configured.
4747
config :region, :validate => :string, :default => 'us-east-1'
4848

49+
config :access_key_id, :validate => :string, :default => nil
50+
51+
config :secret_access_key, :validate => :string, :default => nil
52+
4953
# If specified, the prefix of filenames in the bucket must match (not a regexp)
5054
config :prefix, :validate => :string, :default => nil
5155

lib/logstash/inputs/s3/sincedb.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,13 @@ def reseed(remote_file)
9999
private
100100
attr_reader :options
101101

102-
def start_bookkeeping
102+
def start_bookkeeping
103+
raise 'never let this run'
103104
@stopped.make_false
104105

105106
Thread.new do
106107
LogStash::Util.set_thread_name("S3 input, sincedb periodic fsync")
107-
Stud.interval(1) { periodic_sync }
108+
Stud.interval(1) { puts 'Running bookkeeper'; periodic_sync }
108109
end
109110
end
110111

spec/inputs/s3/event_processor_spec.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,29 @@
66
require "thread"
77

88
describe LogStash::Inputs::S3::EventProcessor do
9+
let(:logstash_inputs_s3) { double("logstash-inputs-s3") }
10+
let(:include_object_properties) { true }
11+
let(:logger) { double("Logger").as_null_object }
912
let(:metadata) { { "s3" => { "bucket_name" => "bucket-land" } } }
1013
let(:encoded_line) { LogStash::Json.dump({ "message" => "Hello World" }) }
1114
let(:codec) { LogStash::Codecs::JSON.new }
1215
let(:queue) { Queue.new }
13-
16+
let(:remote_file_data) { { "bucket_name" => "bucket-land" } }
17+
1418
before do
15-
described_class.new(codec, queue).process(encoded_line, metadata)
19+
described_class.new(logstash_inputs_s3, codec, queue, include_object_properties, logger).process(encoded_line, metadata, remote_file_data)
1620
end
1721

1822
subject { queue.pop }
1923

24+
it "queue should have things in it" do
25+
expect(queue).not_to be_empty
26+
end
27+
28+
it "Event object should not be nil" do
29+
expect(subject).not_to be_nil
30+
end
31+
2032
it "uses the codec and insert the event to the queue" do
2133
expect(subject["message"]).to eq("Hello World")
2234
end

spec/inputs/s3/poller_spec.rb

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,23 @@
66
require "rspec/wait"
77

88
describe LogStash::Inputs::S3::Poller do
9-
let(:sincedb) { double("sincedb") }
9+
let(:sincedb) { double("sincedb").as_null_object }
10+
let(:logger) { double("logger").as_null_object }
1011
let(:bucket_name) { "my-stuff" }
1112
let(:bucket) { Aws::S3::Bucket.new(:stub_responses => true, :name => bucket_name) }
13+
let(:remote_objects) { double("remote_objects") }
1214
let(:objects) { [OpenStruct.new({:key => "myobject", :last_modified => Time.now-60, :body => "Nooo" })] }
1315

1416
before :each do
15-
allow(bucket).to receive(:objects).with(anything).and_return(objects)
17+
allow(bucket).to receive(:objects).with(anything).and_return(remote_objects)
18+
allow(remote_objects).to receive(:limit).with(anything) do |num|
19+
expect(num).to be_a(Integer)
20+
expect(num).to be > 0
21+
objects
22+
end
1623
end
1724

18-
subject { described_class.new(bucket) }
25+
subject { described_class.new(bucket, sincedb, logger) }
1926

2027
it "lists the files from the remote host" do
2128
retrieved_objects = []

spec/inputs/s3/post_processor_spec.rb

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
require "stud/temporary"
66

77
describe LogStash::Inputs::S3::PostProcessor do
8-
let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object) }
8+
let(:logger) { double("logger").as_null_object }
9+
let(:gzip_pattern) { "*.gz" }
10+
let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object, logger, gzip_pattern) }
911
let(:s3_object) { double("s3_object",
1012
:key => "hola",
1113
:bucket_name => "mon-bucket",
@@ -16,15 +18,22 @@
1618
describe LogStash::Inputs::S3::PostProcessor::UpdateSinceDB do
1719
let(:ignore_older) { 3600 }
1820
let(:sincedb_path) { Stud::Temporary.file.path }
19-
let(:sincedb) { LogStash::Inputs::S3::SinceDB.new(sincedb_path, ignore_older) }
21+
let(:logger) { double("logger").as_null_object }
22+
23+
before do
24+
# Avoid starting the bookkeeping thread since it will keep running after the test
25+
allow_any_instance_of(LogStash::Inputs::S3::SinceDB).to receive(:start_bookkeeping)
26+
end
27+
28+
let(:sincedb) { LogStash::Inputs::S3::SinceDB.new(sincedb_path, ignore_older, logger) }
2029

2130
subject { described_class.new(sincedb) }
2231

2332
after :each do
2433
File.delete(sincedb_path)
2534
end
2635

27-
it "make the remote file as completed" do
36+
it "mark the remote file as completed" do
2837
subject.process(remote_file)
2938
expect(sincedb.processed?(remote_file)).to be_truthy
3039
end

spec/inputs/s3/proccessing_policy_validator_spec.rb

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
require "stud/temporary"
66

77
describe LogStash::Inputs::S3::ProcessingPolicyValidator do
8-
let(:remote_file) { RemoteFile.new(s3_object) }
8+
let(:logger) { double("logger").as_null_object }
9+
let(:gzip_pattern) { "*.gz" }
10+
let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object, logger, gzip_pattern) }
911
let(:s3_object) { double("s3_object", :key => "hola", :content_length => 20, :last_modified => Time.now-60) }
1012

1113
let(:validator_1) { LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEmptyFile }
@@ -15,16 +17,16 @@
1517
subject { described_class }
1618

1719
it "accepts multiples validator" do
18-
expect(subject.new(validator_1, validator_2).count).to eq(2)
20+
expect(subject.new(logger, validator_1, validator_2).count).to eq(2)
1921
end
2022

2123
it "accepts one validator" do
22-
expect(subject.new(validator_1).count).to eq(1)
24+
expect(subject.new(logger, validator_1).count).to eq(1)
2325
end
2426
end
2527

2628
context "#add_policy" do
27-
subject { described_class.new(validator_1) }
29+
subject { described_class.new(logger, validator_1) }
2830

2931
it "allows to add more validators" do
3032
expect(subject.count).to eq(1)
@@ -43,7 +45,7 @@
4345
end
4446

4547
context "#process?" do
46-
subject { described_class.new(validator_1, validator_2) }
48+
subject { described_class.new(logger, validator_1, validator_2) }
4749

4850
it "execute the validator in declarations order" do
4951
expect(validator_1).to receive(:process?).ordered.and_return(true)
@@ -133,7 +135,14 @@
133135
let(:older_than) { 3600 }
134136
let(:s3_object) { double("remote_file", :etag => "1234", :bucket_name => "mon-bucket", :key => "hola", :content_length => 100, :last_modified => Time.now) }
135137
let(:sincedb_path) { Stud::Temporary.file.path }
136-
let(:sincedb) { LogStash::Inputs::S3::SinceDB.new(sincedb_path, older_than) }
138+
let(:logger) { double("logger").as_null_object }
139+
140+
before do
141+
# Avoid starting the bookkeeping thread since it will keep running after the test
142+
allow_any_instance_of(LogStash::Inputs::S3::SinceDB).to receive(:start_bookkeeping)
143+
end
144+
145+
let(:sincedb) { LogStash::Inputs::S3::SinceDB.new(sincedb_path, older_than, logger) }
137146

138147
subject { described_class.new(sincedb) }
139148

spec/inputs/s3/processor_spec.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,22 @@
1111
let(:post_processors) { [post_processor_1, post_processor_2] }
1212

1313
let(:validator) { LogStash::Inputs::S3::ProcessingPolicyValidator.new(LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEmptyFile) }
14-
let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object) }
14+
let(:logger) { double("Logger").as_null_object }
15+
let(:gzip_pattern) { "*.gz" }
16+
let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object, logger, gzip_pattern) }
1517
let(:s3_object) { double("s3_object",
18+
:data => { "bucket_name" => "mon-bucket" },
1619
:key => "hola",
1720
:bucket_name => "mon-bucket",
1821
:content_length => 20,
1922
:last_modified => Time.now-60) }
2023

21-
subject { described_class.new(validator, event_processor, post_processors) }
24+
subject { described_class.new(validator, event_processor, logger, post_processors) }
2225

2326
context "When handling remote file" do
24-
context "when the file is not valid to process" do
27+
context "when the file is not valid to process (because content_length = 0)" do
2528
let(:s3_object) { double("s3_object",
29+
:data => { "bucket_name" => "mon-bucket" },
2630
:key => "hola",
2731
:content_length => 0,
2832
:last_modified => Time.now-60) }
@@ -44,7 +48,7 @@
4448

4549
it "send the file content to the event processor" do
4650
subject.handle(remote_file)
47-
expect(event_processor).to have_received(:process).with(content, { "s3" => hash_including(metadata["s3"])})
51+
expect(event_processor).to have_received(:process).with(content, { "s3" => hash_including(metadata["s3"])}, s3_object.data)
4852
end
4953

5054
it "sends the file to all post processors" do

spec/inputs/s3/remote_file_spec.rb

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22
require "logstash/inputs/s3/remote_file"
33

44
describe LogStash::Inputs::S3::RemoteFile do
5-
subject { described_class.new(s3_object) }
5+
let(:logger) { double("logger").as_null_object }
6+
let(:gzip_pattern) { "\.gz(ip)?$" }
7+
8+
subject { described_class.new(s3_object, logger, gzip_pattern) }
69

710
context "#compressed_gzip?" do
8-
context "when `content_type` is `application/gzip`" do
11+
context "when remote object key ends in .gz" do
912
let(:s3_object) { double("s3_object",
1013
:content_type => "application/gzip",
11-
:key => "hola",
14+
:key => "hola.gz",
1215
:content_length => 20,
1316
:last_modified => Time.now-60) }
1417

@@ -17,7 +20,7 @@
1720
end
1821
end
1922

20-
context "when `content_type` is not `application/gzip`" do
23+
context "when remote object key ends in something else" do
2124
let(:s3_object) { double("s3_object",
2225
:content_type => "text/plain",
2326
:key => "hola",

spec/integration/retrieve_logs_spec.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# encoding: utf-8
22
require "logstash/inputs/s3"
3+
require "logstash/inputs/s3/sincedb"
34
require_relative "../support/matcher_helpers"
45
require_relative "../support/s3_input_test_helper"
56
require "stud/temporary"
@@ -16,6 +17,12 @@
1617

1718
describe "Retrieve logs from S3", :tags => :integration do
1819
let(:queue) { Queue.new }
20+
let(:stub_since_db) { double("since_db") }
21+
22+
before do
23+
# Stub this out so that we can avoid starting the bookkeeper thread which doesn't die
24+
allow(LogStash::Inputs::S3::SinceDB).to receive(:new).with(anything).and_return(stub_since_db)
25+
end
1926

2027
let(:plugin) { LogStash::Inputs::S3.new(plugin_config) }
2128

@@ -35,7 +42,7 @@
3542
let(:region) { REGION }
3643

3744
let(:plugin_config) do
38-
super.merge({
45+
super().merge({
3946
"access_key_id" => access_key_id,
4047
"secret_access_key" => secret_access_key,
4148
"region" => region,

0 commit comments

Comments (0)