
Commit c32c13f

pheherot authored and committed
Restructure for better large bucket support
* Use a synchronous queue to handle the processing of events in parallel
* Switch to aws-sdk v3
* Add a lot of logging
* Enable the use of the SDK's start_after parameter to fetch only new objects (useful where objects are stored in alphabetical order by time, such as S3 access logs); see the sketch after the change summary below
* Limit the batch size of the S3 request
1 parent b7f42d7 commit c32c13f

26 files changed: +1575 -1044 lines
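To make the start_after bullet concrete, here is a minimal sketch of the listing pattern using the aws-sdk-s3 v3 resource API; the region, bucket name, prefix, and key below are made-up placeholders, not values from this commit:

require "aws-sdk-s3" # the aws-sdk v3 gem for S3

s3 = Aws::S3::Resource.new(region: "us-east-1")
bucket = s3.bucket("my-access-logs") # hypothetical bucket

# S3 lists keys in lexicographic order, so passing the last processed key
# as `start_after` skips every object already seen, while `limit` caps the
# batch size the same way the poller below does.
bucket.objects(prefix: "logs/", start_after: "logs/2020-01-01-23-59-59").limit(1000).each do |object|
  puts object.key
end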

lib/logstash/inputs/s3.rb

Lines changed: 157 additions & 325 deletions
Large diffs are not rendered by default.
Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
# encoding: utf-8
require "logstash/inputs/base"

module LogStash module Inputs class S3 < LogStash::Inputs::Base
  # Take the raw event from the files and apply the codec
  # and the metadata.
  class EventProcessor
    def initialize(logstash_inputs_s3, codec, queue, include_object_properties, logger)
      @queue = queue
      @codec = codec
      @logstash_inputs_s3 = logstash_inputs_s3
      @include_object_properties = include_object_properties
      @logger = logger
      # NOTE: these mirror the plugin's `cloudfront_version_key` and
      # `cloudfront_fields_key` config defaults; without them the
      # `event.set` calls below would receive a nil field name.
      @cloudfront_version_key = "cloudfront_version"
      @cloudfront_fields_key = "cloudfront_fields"
    end

    def process(line, metadata, remote_file_data)
      @codec.decode(line) do |event|
        # We are making an assumption concerning the cloudfront
        # log format: the user will use the plain or the line codec,
        # and the message key will represent the actual line content.
        # If the event is only metadata, the event will be dropped.
        # This was the behavior of the pre-1.5 plugin.
        #
        # The line needs to go through the codec to replace
        # unknown bytes in the log stream before doing a regexp match, or
        # you will get an `Error: invalid byte sequence in UTF-8`.
        if event_is_metadata?(event)
          @logger.debug('Event is metadata, updating the current cloudfront metadata', :event => event)
          return update_metadata(metadata, event)
        end

        @logger.debug('Event is not metadata, pushing to queue', :event => event, :metadata => metadata)
        push_decoded_event(@queue, metadata, remote_file_data, event)
      end
    end

    private

    def push_decoded_event(queue, metadata, remote_file_data, event)
      @logstash_inputs_s3.send(:decorate, event)

      if @include_object_properties
        event.set("[@metadata][s3]", remote_file_data.to_h)
      else
        event.set("[@metadata][s3]", {})
      end

      # event.set("[@metadata][s3][key]", remote_file.key) # key should already be in remote_file_data.to_h
      event.set(@cloudfront_version_key, metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil?
      event.set(@cloudfront_fields_key, metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil?

      queue << event
    end

    def event_is_metadata?(event)
      return false unless event.get("message").is_a?(String)
      line = event.get("message")
      version_metadata?(line) || fields_metadata?(line)
    end

    def version_metadata?(line)
      line.start_with?('#Version: ')
    end

    def fields_metadata?(line)
      line.start_with?('#Fields: ')
    end

    def update_metadata(metadata, event)
      line = event.get('message').strip

      if version_metadata?(line)
        metadata[:cloudfront_version] = line.split(/#Version: (.+)/).last
      end

      if fields_metadata?(line)
        metadata[:cloudfront_fields] = line.split(/#Fields: (.+)/).last
      end
    end
  end
end end end
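A hypothetical wiring of this class, to show the flow; `plugin` stands in for the LogStash::Inputs::S3 instance (which supplies `decorate` and a logger), and the sample lines and object properties are invented, not taken from this diff:

require "logstash/codecs/plain"

queue = Queue.new                          # stands in for the pipeline queue
codec = LogStash::Codecs::Plain.new
metadata = {}                              # per-file cloudfront metadata accumulator
remote_file_data = { "key" => "logs/E123/access.log" } # placeholder object properties

processor = LogStash::Inputs::S3::EventProcessor.new(plugin, codec, queue, true, plugin.logger)

# Header lines update `metadata` in place instead of being queued...
processor.process("#Version: 1.0", metadata, remote_file_data)
# ...while data lines are decorated and pushed onto the queue.
processor.process("2020-01-01 00:00:00 GET /index.html 200", metadata, remote_file_data)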

lib/logstash/inputs/s3/patch.rb

Lines changed: 0 additions & 20 deletions
This file was deleted.

lib/logstash/inputs/s3/poller.rb

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
# encoding: utf-8
require "logstash/inputs/base"

require "logstash/inputs/s3/remote_file"
require "stud/interval"

module LogStash module Inputs class S3 < LogStash::Inputs::Base
  class Poller
    DEFAULT_OPTIONS = {
      :polling_interval => 1,
      :use_start_after => false,
      :batch_size => 1000,
      :buckets_options => {},
      :gzip_pattern => "\.gz(ip)?$"
    }

    def initialize(bucket, sincedb, logger, options = {})
      @bucket = bucket
      @sincedb = sincedb
      @logger = logger
      @stopped = false

      @options = DEFAULT_OPTIONS.merge(options)
      # Without this assignment, RemoteFile.new below would always
      # receive nil for the gzip pattern.
      @gzip_pattern = @options[:gzip_pattern]
      @last_key_fetched = nil if @options[:use_start_after]
    end

    def run(&block)
      Stud.interval(options[:polling_interval]) do
        Stud.stop! if stop?

        retrieved_count = retrieve_objects(&block)

        # If we retrieved a full batch, there are probably still
        # more objects to retrieve, so don't wait for the next interval.
        redo if retrieved_count == options[:batch_size]
      end
    end

    def stop
      @stopped = true
    end

    private
    attr_reader :options

    def retrieve_objects(&block)
      @logger.debug("Retrieving objects from S3", :options => options)

      retrieved_count = 0
      remote_objects.limit(options[:batch_size]).each do |object|
        return if stop?

        block.call(RemoteFile.new(object, @logger, @gzip_pattern))

        if options[:use_start_after]
          @last_key_fetched = object.key
          @logger.debug("Setting last_key_fetched", :last_key_fetched => @last_key_fetched)
        end

        retrieved_count += 1
      end

      retrieved_count
    end

    def remote_objects
      @logger.info("Instantiating S3 object collection", :bucket_listing_options => bucket_listing_options)
      @bucket.objects(bucket_listing_options)
    end

    def bucket_listing_options
      output = {}

      if options[:use_start_after]
        if @last_key_fetched
          @logger.debug("Setting start_after to last_key_fetched",
                        :last_key_fetched => @last_key_fetched)
          output[:start_after] = @last_key_fetched
        elsif (oldest_key = @sincedb.oldest_key)
          @logger.debug("Setting start_after to SinceDB.oldest_key", :oldest_key => oldest_key)
          output[:start_after] = oldest_key
        else
          @logger.debug("use_start_after is enabled but no previous key was found in the " +
                        "sincedb and @last_key_fetched is nil. Starting from the beginning " +
                        "of the bucket.")
        end
      else
        @logger.debug("use_start_after is disabled, relying on last_modified to filter seen objects")
      end

      output.merge(options[:buckets_options])
    end

    def stop?
      @stopped
    end
  end
end end end
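For illustration, this is roughly how the poller might be driven; `sincedb` and `logger` are assumed collaborators from elsewhere in the plugin, and the region and bucket name are invented:

require "aws-sdk-s3"

bucket = Aws::S3::Resource.new(region: "us-east-1").bucket("my-access-logs")

poller = LogStash::Inputs::S3::Poller.new(bucket, sincedb, logger,
                                          :use_start_after => true,
                                          :batch_size => 500)

# `run` loops on the polling interval, yielding a RemoteFile per new object,
# until another thread calls `poller.stop`.
poller.run do |remote_file|
  handle(remote_file) # placeholder for validate + download + decode
end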
Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
# encoding: utf-8
require "logstash/inputs/base"

require "fileutils"

module LogStash module Inputs class S3 < LogStash::Inputs::Base
  class PostProcessor
    class UpdateSinceDB
      def initialize(sincedb)
        @sincedb = sincedb
      end

      def process(remote_file)
        @sincedb.completed(remote_file)
      end
    end

    class BackupLocally
      def initialize(backup_to_dir)
        @backup_dir = backup_to_dir
      end

      def process(remote_file)
        destination = File.join(@backup_dir, remote_file.key)

        if File.exist?(destination)
          destination = File.join(@backup_dir, "#{remote_file.key}_#{remote_file.version}")
        end

        # Keys can contain "/", so make sure the destination directory exists.
        FileUtils.mkdir_p(File.dirname(destination))

        case remote_file.file
        when StringIO
          # Open in write mode; the default read-only mode would raise.
          File.open(destination, "w") { |f| f.write(remote_file.file.read) }
        when File
          FileUtils.cp(remote_file.file.path, destination)
        end
      end
    end

    class BackupToBucket
      attr_reader :backup_bucket, :backup_prefix

      def initialize(backup_bucket, backup_prefix = nil)
        @backup_bucket = backup_bucket
        @backup_prefix = backup_prefix
      end

      def process(remote_file)
        remote_file.remote_object.copy_to(destination(remote_file))
      end

      def destination(remote_file)
        "#{@backup_bucket}/#{rename(remote_file.key)}"
      end

      def rename(key)
        backup_prefix.nil? ? key : "#{backup_prefix}#{key}"
      end
    end

    class MoveToBucket < BackupToBucket
      def process(remote_file)
        remote_file.remote_object.move_to(destination(remote_file))
      end
    end

    class DeleteFromSourceBucket
      def process(remote_file)
        remote_file.remote_object.delete
      end
    end
  end
end end end
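These classes share a one-method `process(remote_file)` interface, so a caller can chain any combination once a file has been ingested. A sketch of that orchestration; the list and the `sincedb`/`remote_file` names are assumptions, not code from this diff:

post_processors = [
  LogStash::Inputs::S3::PostProcessor::UpdateSinceDB.new(sincedb),
  LogStash::Inputs::S3::PostProcessor::MoveToBucket.new("processed-logs", "done/")
]

# Run every configured step after a file has been read successfully;
# here the file is recorded in the sincedb, then moved to a backup bucket.
post_processors.each { |processor| processor.process(remote_file) }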
Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
# encoding: utf-8
require "logstash/inputs/base"

module LogStash module Inputs class S3 < LogStash::Inputs::Base
  class ProcessingPolicyValidator
    class SkipEndingDirectory
      ENDING_DIRECTORY_STRING = "/"

      def self.process?(remote_file)
        !remote_file.key.end_with?(ENDING_DIRECTORY_STRING)
      end
    end

    class SkipEmptyFile
      def self.process?(remote_file)
        remote_file.content_length > 0
      end
    end

    class IgnoreNewerThan
      def initialize(seconds)
        @seconds = seconds
      end

      def process?(remote_file)
        Time.now - remote_file.last_modified >= @seconds
      end
    end

    class IgnoreOlderThan
      def initialize(seconds)
        @seconds = seconds
      end

      def process?(remote_file)
        Time.now - remote_file.last_modified <= @seconds
      end
    end

    class AlreadyProcessed
      def initialize(sincedb)
        @sincedb = sincedb
      end

      def process?(remote_file)
        !@sincedb.processed?(remote_file)
      end
    end

    class ExcludePattern
      def initialize(pattern)
        @pattern = Regexp.new(pattern)
      end

      def process?(remote_file)
        remote_file.key !~ @pattern
      end
    end

    class ExcludeBackupedFiles < ExcludePattern
      def initialize(backup_prefix)
        super(/^#{backup_prefix}/)
      end
    end

    def initialize(logger, *policies)
      @logger = logger
      @policies = []
      add_policy(policies)
    end

    def add_policy(*policies)
      @policies = @policies.concat([policies].flatten)
    end

    def process?(remote_file)
      # TODO: log where we stop
      @policies.all? do |policy|
        if policy.process?(remote_file)
          true
        else
          @logger.debug("Skipping file because of policy", :remote_file => remote_file, :policy => policy.class)
          return false
        end
      end
    end

    def count
      @policies.count
    end
  end
end; end; end
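Note that stateless policies (SkipEndingDirectory, SkipEmptyFile) are passed as classes while stateful ones are instances; both respond to `process?`. A hypothetical composition, with `logger`, `sincedb`, `poller`, and `ingest` as made-up collaborators:

Validator = LogStash::Inputs::S3::ProcessingPolicyValidator

validator = Validator.new(
  logger,
  Validator::SkipEndingDirectory,
  Validator::SkipEmptyFile,
  Validator::AlreadyProcessed.new(sincedb),
  Validator::ExcludePattern.new("\\.tmp$")
)

# Only files that pass every policy get downloaded and decoded.
poller.run do |remote_file|
  next unless validator.process?(remote_file)
  ingest(remote_file) # placeholder for the actual processing step
end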
