Skip to content

Commit 909787d

Browse files
yaauiejgoughkaisecheng
authored
Final flush (#223)
- Decorate flushed events from multiline codecs - Fix missing metadata from the last event Fixed: #153 Co-authored-by: Jonathan Gough <jonathanpgough@gmail.com> Co-authored-by: Kaise Cheng <kaise.cheng@elastic.co>
1 parent 5768c61 commit 909787d

File tree

4 files changed

+23
-15
lines changed

4 files changed

+23
-15
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.8.3
2+
- Fix missing `metadata` and `type` of the last event [#223](https://github.com/logstash-plugins/logstash-input-s3/pull/223)
3+
14
## 3.8.2
25
- Refactor: read sincedb time once per bucket listing [#233](https://github.com/logstash-plugins/logstash-input-s3/pull/233)
36

lib/logstash/inputs/s3.rb

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -235,30 +235,34 @@ def process_local_log(queue, filename, object)
235235
@logger.debug('Event is metadata, updating the current cloudfront metadata', :event => event)
236236
update_metadata(metadata, event)
237237
else
238-
decorate(event)
239-
240-
if @include_object_properties
241-
event.set("[@metadata][s3]", object.data.to_h)
242-
else
243-
event.set("[@metadata][s3]", {})
244-
end
245-
246-
event.set("[@metadata][s3][key]", object.key)
247-
event.set(@cloudfront_version_key, metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil?
248-
event.set(@cloudfront_fields_key, metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil?
249-
250-
queue << event
238+
push_decoded_event(queue, metadata, object, event)
251239
end
252240
end
253241
end
254242
# #ensure any stateful codecs (such as multi-line ) are flushed to the queue
255243
@codec.flush do |event|
256-
queue << event
244+
push_decoded_event(queue, metadata, object, event)
257245
end
258246

259247
return true
260248
end # def process_local_log
261249

250+
def push_decoded_event(queue, metadata, object, event)
251+
decorate(event)
252+
253+
if @include_object_properties
254+
event.set("[@metadata][s3]", object.data.to_h)
255+
else
256+
event.set("[@metadata][s3]", {})
257+
end
258+
259+
event.set("[@metadata][s3][key]", object.key)
260+
event.set(@cloudfront_version_key, metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil?
261+
event.set(@cloudfront_fields_key, metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil?
262+
263+
queue << event
264+
end
265+
262266
def event_is_metadata?(event)
263267
return false unless event.get("message").class == String
264268
line = event.get("message")

logstash-input-s3.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-s3'
4-
s.version = '3.8.2'
4+
s.version = '3.8.3'
55
s.licenses = ['Apache-2.0']
66
s.summary = "Streams events from files in a S3 bucket"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/inputs/s3_spec.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@
329329
events = fetch_events(config)
330330
expect(events.size).to eq(events_to_process)
331331
expect(events[0].get("[@metadata][s3][key]")).to eql log.key
332+
expect(events[1].get("[@metadata][s3][key]")).to eql log.key
332333
end
333334

334335
it "deletes the temporary file" do

0 commit comments

Comments
 (0)