diff --git a/README.md b/README.md index 82f7531..1fedbb2 100644 --- a/README.md +++ b/README.md @@ -115,3 +115,5 @@ Programming is not a required skill. Whatever you've seen about open source and It is more important to the community that you are able to contribute. For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file. + +Arbitrary change to trigger a build (2) diff --git a/lib/logstash/inputs/s3.rb b/lib/logstash/inputs/s3.rb index 36a1f65..3331a02 100644 --- a/lib/logstash/inputs/s3.rb +++ b/lib/logstash/inputs/s3.rb @@ -1,14 +1,12 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" -require "logstash/plugin_mixins/aws_config" require "time" require "date" require "tmpdir" require "stud/interval" require "stud/temporary" -require "aws-sdk" -require "logstash/inputs/s3/patch" +require "aws-sdk-s3" require "logstash/plugin_mixins/ecs_compatibility_support" require 'java' @@ -27,9 +25,16 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base java_import java.util.zip.GZIPInputStream java_import java.util.zip.ZipException - include LogStash::PluginMixins::AwsConfig::V2 include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) + require "logstash/inputs/s3/poller" + require "logstash/inputs/s3/processor" + require "logstash/inputs/s3/processor_manager" + require "logstash/inputs/s3/processing_policy_validator" + require "logstash/inputs/s3/event_processor" + require "logstash/inputs/s3/sincedb" + require "logstash/inputs/s3/post_processor" + config_name "s3" default :codec, "plain" @@ -37,6 +42,14 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base # The name of the S3 bucket. config :bucket, :validate => :string, :required => true + # The AWS region name for the bucket. For most S3 buckets this is us-east-1 + # unless otherwise configured. + config :region, :validate => :string, :default => 'us-east-1' + + config :access_key_id, :validate => :string, :default => nil + + config :secret_access_key, :validate => :string, :default => nil + # If specified, the prefix of filenames in the bucket must match (not a regexp) config :prefix, :validate => :string, :default => nil @@ -88,25 +101,79 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base # default to an expression that matches *.gz and *.gzip file extensions config :gzip_pattern, :validate => :string, :default => "\.gz(ip)?$" - CUTOFF_SECOND = 3 - - def initialize(*params) + # When the S3 input discovers a file that was last modified less than ignore_newer + # it will ignore it, causing it to be processed on the next attempt. This helps + # prevent the input from getting stuck on files that are actively being written to. + config :ignore_newer, :validate => :number, :default => 3 + + # When the S3 input discovers a file that was last modified + # before the specified timespan in seconds, the file is ignored. + # After it's discovery, if an ignored file is modified it is no + # longer ignored and any new data is read. The default is 24 hours. + config :ignore_older, :validate => :number, :default => 24 * 60 * 60 + + # Use the object key as the SinceDB key, rather than the last_modified date. + # If this is set to true, objects will be fetched from S3 using start_after + # so that filtering of old objects can happen on the server side, which + # should dramatically speed up the initial listing of a bucket with many + # objects. 
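# (This relies on the ListObjectsV2 start_after parameter: S3 returns only keys that sort
# lexicographically after the supplied key, so already-processed keys are filtered out
# server side instead of being listed and then skipped on the client.)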
If set to true, you can use the sincedb_start_value parameter to + # start at a manually specified key. + config :use_start_after, :validate => :boolean, :default => false + + # Used in concert with use_start_after, this allows you to specify the + # object key to start at. This is useful if you want to start processing + # from a specific key rather than the last key that + # was processed. Note that leaving this value the same across multiple restarts + # will cause the input to reprocess all objects that have been processed before. + # Can also be used without use_start_after to start at a specific last_modified + # date. (Format: 2023-10-27 15:00:12 UTC). Once the value has been set, the + # pipeline shuts down to prevent accidentally leaving this value set and surprising + # people upon restart later on. + config :sincedb_start_value, :validate => :string, :default => nil + + # How many threads to use for processing. You may want to tweak this for whatever + # gives you the best performance for your particular environment. + config :processors_count, :validate => :number, :default => 20 + + # The maximum number of objects to fetch from S3 per polling cycle when use_start_after + # is enabled. The default is 1000. + config :batch_size, :validate => :number, :default => 1000 + + # Clear the sincedb database at startup and exit. + config :purge_sincedb, :validate => :boolean, :default => false + + # Expire SinceDB entries that are more than sincedb_expire_secs older than the newest entry. + # This keeps the database from getting too large and slowing down processing. To avoid + # duplicate log entries when not using use_start_after, set this to a value larger than + # the oldest expected age of any file in the bucket. + config :sincedb_expire_secs, :validate => :number, :default => 120 + + # Number of times to retry processing a downloaded file when a broken pipe error occurs. + config :broken_pipe_retries, :validate => :number, :default => 10 + + public + def initialize(options = {}) super - @cloudfront_fields_key = ecs_select[disabled: 'cloudfront_fields', v1: '[@metadata][s3][cloudfront][fields]'] - @cloudfront_version_key = ecs_select[disabled: 'cloudfront_version', v1: '[@metadata][s3][cloudfront][version]'] + + if @purge_sincedb + @logger.info("Purging the sincedb and exiting", :sincedb_path => @sincedb_path) + ::File.unlink(@sincedb_path) rescue nil + return + end + + @sincedb = SinceDB.new( + @sincedb_path, + @ignore_older, + @logger, + { :sincedb_expire_secs => @sincedb_expire_secs } + ) end def register require "fileutils" require "digest/md5" - require "aws-sdk-resources" @logger.info("Registering", :bucket => @bucket, :region => @region) - s3 = get_s3object - - @s3bucket = s3.bucket(@bucket) - unless @backup_to_bucket.nil?
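# Illustrative sketch only (not part of the patch): constructing the input from Ruby with
# the new options, much as a spec would. "my-bucket" is a placeholder; when access_key_id
# and secret_access_key are omitted, the aws-sdk default credential chain is used.
input = LogStash::Inputs::S3.new(
  "bucket"              => "my-bucket",
  "region"              => "us-east-1",
  "use_start_after"     => true,
  "processors_count"    => 20,
  "sincedb_expire_secs" => 120
)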
@backup_bucket = s3.bucket(@backup_to_bucket) begin @@ -128,342 +195,117 @@ def register end def run(queue) - @current_thread = Thread.current - Stud.interval(@interval) do - process_files(queue) - stop unless @watch_for_new_files - end - end # def run - - def list_new_files - objects = [] - found = false - current_time = Time.now - sincedb_time = sincedb.read - begin - @s3bucket.objects(:prefix => @prefix).each do |log| - found = true - @logger.debug('Found key', :key => log.key) - if ignore_filename?(log.key) - @logger.debug('Ignoring', :key => log.key) - elsif log.content_length <= 0 - @logger.debug('Object Zero Length', :key => log.key) - elsif log.last_modified <= sincedb_time - @logger.debug('Object Not Modified', :key => log.key) - elsif log.last_modified > (current_time - CUTOFF_SECOND).utc # file modified within last two seconds will be processed in next cycle - @logger.debug('Object Modified After Cutoff Time', :key => log.key) - elsif (log.storage_class == 'GLACIER' || log.storage_class == 'DEEP_ARCHIVE') && !file_restored?(log.object) - @logger.debug('Object Archived to Glacier', :key => log.key) - else - objects << log - @logger.debug("Added to objects[]", :key => log.key, :length => objects.length) - end - end - @logger.info('No files found in bucket', :prefix => prefix) unless found - rescue Aws::Errors::ServiceError => e - @logger.error("Unable to list objects in bucket", :exception => e.class, :message => e.message, :backtrace => e.backtrace, :prefix => prefix) - end - objects.sort_by { |log| log.last_modified } - end # def fetch_new_files + return if @purge_sincedb - def backup_to_bucket(object) - unless @backup_to_bucket.nil? - backup_key = "#{@backup_add_prefix}#{object.key}" - @backup_bucket.object(backup_key).copy_from(:copy_source => "#{object.bucket_name}/#{object.key}") - if @delete - object.delete() - end + if @sincedb_start_value && !@sincedb_start_value.empty? + reseed_sincedb + return end - end - def backup_to_dir(filename) - unless @backup_to_dir.nil? - FileUtils.cp(filename, @backup_to_dir) + @poller = Poller.new( + bucket_source, + @sincedb, + @logger, + { + :polling_interval => @interval, + :use_start_after => @use_start_after, + :batch_size => @batch_size, + :gzip_pattern => @gzip_pattern + } + ) + + validator = ProcessingPolicyValidator.new(@logger, *processing_policies) + + # Each processor is run into his own thread. + processor = Processor.new( + validator, + EventProcessor.new(self, @codec, queue, @include_object_properties, @logger), + @logger, + post_processors + ) + + @manager = ProcessorManager.new(@logger, { :processor => processor, + :processors_count => @processors_count, + :broken_pipe_retries => @broken_pipe_retries }) + @manager.start + + # The poller get all the new files from the S3 buckets, + # all the actual work is done in a processor which will handle the following + # tasks: + # - Downloading + # - Uncompressing + # - Reading (with metadata extraction for cloudfront) + # - enqueue + # - Backup strategy + # - Book keeping + @poller.run do |remote_file| + remote_file.download_to_path = @temporary_directory + @manager.enqueue_work(remote_file) end end - def process_files(queue) - objects = list_new_files - - objects.each do |log| - if stop? - break - else - process_log(queue, log) - end - end - end # def process_files - def stop - # @current_thread is initialized in the `#run` method, - # this variable is needed because the `#stop` is a called in another thread - # than the `#run` method and requiring us to call stop! 
with a explicit thread. - Stud.stop!(@current_thread) + # Gracefully stop the polling of new S3 documents + # the manager will stop consuming events from the queue, but will block until + # all the processors thread are done with their work this may take some time if we are downloading large + # files. + @poller.stop unless @poller.nil? + @manager.stop unless @manager.nil? + @sincedb.close # Force a fsync of the database end private - # Read the content of the local file - # - # @param [Queue] Where to push the event - # @param [String] Which file to read from - # @param [S3Object] Source s3 object - # @return [Boolean] True if the file was completely read, false otherwise. - def process_local_log(queue, filename, object) - @logger.debug('Processing file', :filename => filename) - metadata = {} - # Currently codecs operates on bytes instead of stream. - # So all IO stuff: decompression, reading need to be done in the actual - # input and send as bytes to the codecs. - read_file(filename) do |line| - if stop? - @logger.warn("Logstash S3 input, stop reading in the middle of the file, we will read it again when logstash is started") - return false - end - - @codec.decode(line) do |event| - # We are making an assumption concerning cloudfront - # log format, the user will use the plain or the line codec - # and the message key will represent the actual line content. - # If the event is only metadata the event will be drop. - # This was the behavior of the pre 1.5 plugin. - # - # The line need to go through the codecs to replace - # unknown bytes in the log stream before doing a regexp match or - # you will get a `Error: invalid byte sequence in UTF-8' - if event_is_metadata?(event) - @logger.debug('Event is metadata, updating the current cloudfront metadata', :event => event) - update_metadata(metadata, event) - else - push_decoded_event(queue, metadata, object, event) - end - end - end - # #ensure any stateful codecs (such as multi-line ) are flushed to the queue - @codec.flush do |event| - push_decoded_event(queue, metadata, object, event) - end - - return true - end # def process_local_log - - def push_decoded_event(queue, metadata, object, event) - decorate(event) + def reseed_sincedb + start_object = bucket_source.objects(:prefix => @sincedb_start_value).first - if @include_object_properties - event.set("[@metadata][s3]", object.data.to_h) - else - event.set("[@metadata][s3]", {}) + if start_object + @logger.info("Reseeding sincedb and shutting down", :value => @sincedb_start_value) + ::File.unlink(@sincedb_path) rescue nil + @sincedb.reseed(start_object) + return end - event.set("[@metadata][s3][key]", object.key) - event.set(@cloudfront_version_key, metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil? - event.set(@cloudfront_fields_key, metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil? 
- - queue << event - end - - def event_is_metadata?(event) - return false unless event.get("message").class == String - line = event.get("message") - version_metadata?(line) || fields_metadata?(line) - end - - def version_metadata?(line) - line.start_with?('#Version: ') + raise "Could not find sincedb_start_value object [sincedb_start_value=#{@sincedb_start_value}]" end - def fields_metadata?(line) - line.start_with?('#Fields: ') + def processing_policies + [ + ProcessingPolicyValidator::SkipEndingDirectory, + ProcessingPolicyValidator::SkipEmptyFile, + ProcessingPolicyValidator::IgnoreNewerThan.new(@ignore_newer), + ProcessingPolicyValidator::IgnoreOlderThan.new(@ignore_older), + @exclude_pattern ? ProcessingPolicyValidator::ExcludePattern.new(@exclude_pattern) : nil, + @backup_prefix ? ProcessingPolicyValidator::ExcludeBackupedFiles.new(@backup_prefix) : nil, + ProcessingPolicyValidator::AlreadyProcessed.new(@sincedb), + ].compact end - def update_metadata(metadata, event) - line = event.get('message').strip - - if version_metadata?(line) - metadata[:cloudfront_version] = line.split(/#Version: (.+)/).last - end - - if fields_metadata?(line) - metadata[:cloudfront_fields] = line.split(/#Fields: (.+)/).last - end + # PostProcessors are only run when everything went fine + # in the processing of the file. + def post_processors + [ + @backup_bucket ? PostProcessor::BackupToBucket.new(backup_to_bucket, backup_add_prefix) : nil, + @backup_dir ? PostProcessor::BackupLocally.new(backup_to_dir) : nil, + @delete ? PostProcessor::DeleteFromSourceBucket.new : nil, + PostProcessor::UpdateSinceDB.new(@sincedb) # The last step is to make sure we save our file progress + ].compact end - def read_file(filename, &block) - if gzip?(filename) - read_gzip_file(filename, block) - else - read_plain_file(filename, block) - end - rescue => e - # skip any broken file - @logger.error("Failed to read file, processing skipped", :exception => e.class, :message => e.message, :filename => filename) + def bucket_source + Aws::S3::Bucket.new(:name => @bucket, :client => client) end - def read_plain_file(filename, block) - File.open(filename, 'rb') do |file| - file.each(&block) - end + def client + opts = { :region => @region } + opts[:credentials] = credentials_options if @access_key_id && @secret_access_key + Aws::S3::Client.new(opts) end - def read_gzip_file(filename, block) - file_stream = FileInputStream.new(filename) - gzip_stream = GZIPInputStream.new(file_stream) - decoder = InputStreamReader.new(gzip_stream, "UTF-8") - buffered = BufferedReader.new(decoder) - - while (line = buffered.readLine()) - block.call(line) - end - ensure - buffered.close unless buffered.nil? - decoder.close unless decoder.nil? - gzip_stream.close unless gzip_stream.nil? - file_stream.close unless file_stream.nil? - end - - def gzip?(filename) - Regexp.new(@gzip_pattern).match(filename) - end - - def sincedb - @sincedb ||= if @sincedb_path.nil? - @logger.info("Using default generated file for the sincedb", :filename => sincedb_file) - SinceDB::File.new(sincedb_file) - else - @logger.info("Using the provided sincedb_path", :sincedb_path => @sincedb_path) - SinceDB::File.new(@sincedb_path) - end - end - - def sincedb_file - digest = Digest::MD5.hexdigest("#{@bucket}+#{@prefix}") - dir = File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "s3") - FileUtils::mkdir_p(dir) - path = File.join(dir, "sincedb_#{digest}") - - # Migrate old default sincedb path to new one. 
- if ENV["HOME"] - # This is the old file path including the old digest mechanism. - # It remains as a way to automatically upgrade users with the old default ($HOME) - # to the new default (path.data) - old = File.join(ENV["HOME"], ".sincedb_" + Digest::MD5.hexdigest("#{@bucket}+#{@prefix}")) - if File.exist?(old) - logger.info("Migrating old sincedb in $HOME to {path.data}") - FileUtils.mv(old, path) - end - end - - path - end - - def ignore_filename?(filename) - if @prefix == filename - return true - elsif filename.end_with?("/") - return true - elsif (@backup_add_prefix && @backup_to_bucket == @bucket && filename =~ /^#{backup_add_prefix}/) - return true - elsif @exclude_pattern.nil? - return false - elsif filename =~ Regexp.new(@exclude_pattern) - return true - else - return false - end - end - - def process_log(queue, log) - @logger.debug("Processing", :bucket => @bucket, :key => log.key) - object = @s3bucket.object(log.key) - - filename = File.join(temporary_directory, File.basename(log.key)) - if download_remote_file(object, filename) - if process_local_log(queue, filename, object) - if object.last_modified == log.last_modified - backup_to_bucket(object) - backup_to_dir(filename) - delete_file_from_bucket(object) - FileUtils.remove_entry_secure(filename, true) - sincedb.write(log.last_modified) - else - @logger.info("#{log.key} is updated at #{object.last_modified} and will process in the next cycle") - end - end - else - FileUtils.remove_entry_secure(filename, true) - end - end - - # Stream the remove file to the local disk - # - # @param [S3Object] Reference to the remove S3 objec to download - # @param [String] The Temporary filename to stream to. - # @return [Boolean] True if the file was completely downloaded - def download_remote_file(remote_object, local_filename) - completed = false - @logger.debug("Downloading remote file", :remote_key => remote_object.key, :local_filename => local_filename) - File.open(local_filename, 'wb') do |s3file| - return completed if stop? - begin - remote_object.get(:response_target => s3file) - completed = true - rescue Aws::Errors::ServiceError => e - @logger.warn("Unable to download remote file", :exception => e.class, :message => e.message, :remote_key => remote_object.key) - end - end - completed - end - - def delete_file_from_bucket(object) - if @delete and @backup_to_bucket.nil? - object.delete() - end - end - - def get_s3object - s3 = Aws::S3::Resource.new(aws_options_hash || {}) - end - - def file_restored?(object) - begin - restore = object.data.restore - if restore && restore.match(/ongoing-request\s?=\s?["']false["']/) - if restore = restore.match(/expiry-date\s?=\s?["'](.*?)["']/) - expiry_date = DateTime.parse(restore[1]) - return true if DateTime.now < expiry_date # restored - else - @logger.debug("No expiry-date header for restore request: #{object.data.restore}") - return nil # no expiry-date found for ongoing request - end - end - rescue => e - @logger.debug("Could not determine Glacier restore status", :exception => e.class, :message => e.message) - end - return false - end - - module SinceDB - class File - def initialize(file) - @sincedb_path = file - end - - # @return [Time] - def read - if ::File.exists?(@sincedb_path) - content = ::File.read(@sincedb_path).chomp.strip - # If the file was created but we didn't have the time to write to it - return content.empty? ? Time.new(0) : Time.parse(content) - else - return Time.new(0) - end - end - - def write(since = nil) - since = Time.now if since.nil? 
- ::File.open(@sincedb_path, 'w') { |file| file.write(since.to_s) } - end - end + # TODO: verify all the use cases from the mixin + def credentials_options + Aws::Credentials.new(@access_key_id, + @secret_access_key, + @session_token) end end # class LogStash::Inputs::S3 diff --git a/lib/logstash/inputs/s3/event_processor.rb b/lib/logstash/inputs/s3/event_processor.rb new file mode 100644 index 0000000..7616cd3 --- /dev/null +++ b/lib/logstash/inputs/s3/event_processor.rb @@ -0,0 +1,81 @@ +# encoding: utf-8 +require "logstash/inputs/base" + +module LogStash module Inputs class S3 < LogStash::Inputs::Base + # Take the raw event from the files and apply the codec + # and the metadata. + class EventProcessor + def initialize(logstash_inputs_s3, codec, queue, include_object_properties, logger) + @queue = queue + @codec = codec + @logstash_inputs_s3 = logstash_inputs_s3 + @include_object_properties = include_object_properties + @logger = logger + end + + def process(line, metadata, remote_file_data) + @codec.decode(line) do |event| + # We are making an assumption concerning cloudfront + # log format, the user will use the plain or the line codec + # and the message key will represent the actual line content. + # If the event is only metadata the event will be drop. + # This was the behavior of the pre 1.5 plugin. + # + # The line need to go through the codecs to replace + # unknown bytes in the log stream before doing a regexp match or + # you will get a `Error: invalid byte sequence in UTF-8' + if event_is_metadata?(event) + @logger.debug('Event is metadata, updating the current cloudfront metadata', :event => event) + return update_metadata(metadata, event) + end + + @logger.debug('Event is not metadata, pushing to queue', :event => event, :metadata => metadata) + push_decoded_event(@queue, metadata, remote_file_data, event) + end + end + + private + + def push_decoded_event(queue, metadata, remote_file_data, event) + @logstash_inputs_s3.send(:decorate, event) + + if @include_object_properties + event.set("[@metadata][s3]", remote_file_data.to_h) + else + event.set("[@metadata][s3]", {}) + end + + # event.set("[@metadata][s3][key]", remote_file.key) # key should already be in remote_file_data.to_h + event.set(@cloudfront_version_key, metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil? + event.set(@cloudfront_fields_key, metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil? 
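      # For example, a CloudFront access log whose header lines are
      #   #Version: 1.0
      #   #Fields: date time x-edge-location sc-bytes c-ip cs-method ...
      # ends up with metadata[:cloudfront_version] == "1.0" and metadata[:cloudfront_fields] ==
      # "date time x-edge-location sc-bytes c-ip cs-method ...", which the two event.set calls
      # above copy onto each decoded data event.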
+ + queue << event + end + + def event_is_metadata?(event) + return false unless event.get("message").class == String + line = event.get("message") + version_metadata?(line) || fields_metadata?(line) + end + + def version_metadata?(line) + line.start_with?('#Version: ') + end + + def fields_metadata?(line) + line.start_with?('#Fields: ') + end + + def update_metadata(metadata, event) + line = event.get('message').strip + + if version_metadata?(line) + metadata[:cloudfront_version] = line.split(/#Version: (.+)/).last + end + + if fields_metadata?(line) + metadata[:cloudfront_fields] = line.split(/#Fields: (.+)/).last + end + end + end +end end end diff --git a/lib/logstash/inputs/s3/patch.rb b/lib/logstash/inputs/s3/patch.rb deleted file mode 100644 index 9121914..0000000 --- a/lib/logstash/inputs/s3/patch.rb +++ /dev/null @@ -1,20 +0,0 @@ -# This is patch related to the autoloading and ruby -# -# The fix exist in jruby 9k but not in the current jruby, not sure when or it will be backported -# https://github.com/jruby/jruby/issues/3645 -# -# AWS is doing tricky name discovery in the module to generate the correct error class and -# this strategy is bogus in jruby and `eager_autoload` don't fix this issue. -# -# This will be a short lived patch since AWS is removing the need. -# see: https://github.com/aws/aws-sdk-ruby/issues/1301#issuecomment-261115960 -old_stderr = $stderr - -$stderr = StringIO.new -begin - module Aws - const_set(:S3, Aws::S3) - end -ensure - $stderr = old_stderr -end diff --git a/lib/logstash/inputs/s3/poller.rb b/lib/logstash/inputs/s3/poller.rb new file mode 100644 index 0000000..90516af --- /dev/null +++ b/lib/logstash/inputs/s3/poller.rb @@ -0,0 +1,121 @@ +# encoding: utf-8 +require "logstash/inputs/base" + +require "logstash/inputs/s3/remote_file" +require "stud/interval" + +module LogStash module Inputs class S3 < LogStash::Inputs::Base + class Poller + DEFAULT_OPTIONS = { + :polling_interval => 1, + :use_start_after => false, + :batch_size => 1000, + :buckets_options => {}, + :gzip_pattern => "\.gz(ip)?$" + } + + def initialize(bucket, sincedb, logger, options = {}) + @bucket = bucket + @sincedb = sincedb + @logger = logger + @stopped = false + + @options = DEFAULT_OPTIONS.merge(options) + @last_key_fetched = nil if @options[:use_start_after] + end + + def run(&block) + Stud.interval(options[:polling_interval]) do + Stud.stop! if stop? + + if options[:use_start_after] + retrieve_objects_using_use_start_after(&block) + else + retrieve_objects(&block) + end + end + end + + def stop + @stopped = true + end + + private + attr_reader :options + + def retrieve_objects(&block) + @logger.debug("Retrieving objects from S3", :options => options) + + remote_objects.each do |object| + return if stop? + + block.call(RemoteFile.new(object, @logger, @options[:gzip_pattern])) + end + end + + def retrieve_objects_using_use_start_after(&block) + @logger.debug("Retrieving objects from S3 using use_start_after", :options => options) + + last_mtime_fetched = nil + + remote_objects.limit(options[:batch_size]).each do |object| + return if stop? + + block.call(RemoteFile.new(object, @logger, @options[:gzip_pattern])) + + next unless options[:use_start_after] + + if @last_key_fetched && ( + (@last_key_fetched <=> object.key) != + (last_mtime_fetched <=> object.last_modified) + ) + @logger.warn("S3 object listing is not consistent. 
Results may be incomplete or out of order", + :previous_object_key => @last_key_fetched, + :previous_object_mtime => last_mtime_fetched, + :current_object_key => object.key, + :current_object_last_modified => object.last_modified) + end + + @last_key_fetched = object.key + last_mtime_fetched = object.last_modified + @logger.debug("Setting last_key_fetched", :last_key_fetched => @last_key_fetched) + end + end + + def remote_objects + @logger.info("Instantiating S3 object collection", + :bucket_listing_options => bucket_listing_options, + :polling_interval => options[:polling_interval]) + objects = @bucket.objects(bucket_listing_options) + @logger.info("S3 object collection instantiated", :objects_count => objects.count) + objects + end + + def bucket_listing_options + output = {} + + if options[:use_start_after] + if @last_key_fetched + @logger.debug("Setting start_after to last_key_fetched", + :last_key_fetched => @last_key_fetched) + output[:start_after] = @last_key_fetched + elsif (oldest_key = @sincedb.oldest_key) + @logger.debug("Setting start_after to SinceDB.oldest_key", :oldest_key => oldest_key) + output[:start_after] = oldest_key + else + @logger.debug("use_start_after is enabled but no previous key was found in the " + + "sincedb and @last_key_fetched is nil. Starting from the beginning" + + " of the bucket.") + end + else + @logger.debug("use_start_after is disabled, relying on last_modified to filter seen objects") + end + + output.merge(options[:buckets_options]) + end + + def stop? + @stopped + end + end +end;end;end diff --git a/lib/logstash/inputs/s3/post_processor.rb b/lib/logstash/inputs/s3/post_processor.rb new file mode 100644 index 0000000..8cc3eca --- /dev/null +++ b/lib/logstash/inputs/s3/post_processor.rb @@ -0,0 +1,71 @@ +# encoding: utf-8 +require "logstash/inputs/base" + +require "fileutils" + +module LogStash module Inputs class S3 < LogStash::Inputs::Base + class PostProcessor + class UpdateSinceDB + def initialize(sincedb) + @sincedb = sincedb + end + + def process(remote_file) + @sincedb.completed(remote_file) + end + end + + class BackupLocally + def initialize(backup_to_dir) + @backup_dir = backup_to_dir + end + + def process(remote_file) + destination = File.join(@backup_dir, remote_file.key) + + if File.exist?(destination) + destination = File.join(@backup_dir, "#{remote_file.key}_#{remote_file.version}") + end + + case remote_file.file + when StringIO + File.open(destination) { |f| f.write(remote_file.file.read) } + when File + FileUtils.cp(remote_file.file.path, destination) + end + end + end + + class BackupToBucket + attr_reader :backup_bucket, :backup_prefix + + def initialize(backup_bucket, backup_prefix = nil) + @backup_bucket = backup_bucket + @backup_prefix = backup_prefix + end + + def process(remote_file) + remote_file.remote_object.copy_to(destination(remote_file)) + end + + def destination(remote_file) + "#{@backup_bucket}/#{rename(remote_file.key)}" + end + + def rename(key) + backup_prefix.nil? ? 
key : "#{backup_prefix}#{key}" + end + end + + class MoveToBucket < BackupToBucket + def process(remote_file) + remote_file.remote_object.move_to(destination(remote_file)) + end + end + + class DeleteFromSourceBucket + def process(remote_file) + remote_file.remote_object.delete + end + end +end end end end diff --git a/lib/logstash/inputs/s3/processing_policy_validator.rb b/lib/logstash/inputs/s3/processing_policy_validator.rb new file mode 100644 index 0000000..9d7c728 --- /dev/null +++ b/lib/logstash/inputs/s3/processing_policy_validator.rb @@ -0,0 +1,92 @@ +# encoding: utf-8 +require "logstash/inputs/base" + +module LogStash module Inputs class S3 < LogStash::Inputs::Base + class ProcessingPolicyValidator + class SkipEndingDirectory + ENDING_DIRECTORY_STRING = "/" + + def self.process?(remote_file) + !remote_file.key.end_with?(ENDING_DIRECTORY_STRING) + end + end + + class SkipEmptyFile + def self.process?(remote_file) + remote_file.content_length > 0 + end + end + + class IgnoreNewerThan + def initialize(seconds) + @seconds = seconds + end + + def process?(remote_file) + Time.now - remote_file.last_modified >= @seconds + end + end + + class IgnoreOlderThan + def initialize(seconds) + @seconds = seconds + end + + def process?(remote_file) + Time.now - remote_file.last_modified <= @seconds + end + end + + class AlreadyProcessed + def initialize(sincedb) + @sincedb = sincedb + end + + def process?(remote_file) + !@sincedb.processed?(remote_file) + end + end + + class ExcludePattern + def initialize(pattern) + @pattern = Regexp.new(pattern) + end + + def process?(remote_file) + remote_file.key !~ @pattern + end + end + + class ExcludeBackupedFiles < ExcludePattern + def initialize(backup_prefix) + super(/^#{backup_prefix}/) + end + end + + def initialize(logger, *policies) + @logger = logger + @policies = [] + add_policy(policies) + end + + def add_policy(*policies) + @policies = @policies.concat([policies].flatten) + end + + def process?(remote_file) + # TODO log were we stop + @policies.all? do |policy| + if !policy.process?(remote_file) + @logger.debug("Skipping file because of policy", :remote_file => remote_file, :policy => policy.class) + return false + end + + true + end + end + + def count + @policies.count + end + end +end; end; end diff --git a/lib/logstash/inputs/s3/processor.rb b/lib/logstash/inputs/s3/processor.rb new file mode 100644 index 0000000..61a0b33 --- /dev/null +++ b/lib/logstash/inputs/s3/processor.rb @@ -0,0 +1,47 @@ +# encoding: utf-8 +require "logstash/inputs/base" + +require "aws-sdk-s3" + +module LogStash module Inputs class S3 < LogStash::Inputs::Base + # The processor represent a workers thread + class Processor + def initialize(validator, event_processor, logger, post_processors = []) + @validator = validator + @event_processor = event_processor + @logger = logger + @post_processors = post_processors + end + + def handle(remote_file) + @logger.debug("Validating remote file to see if we should download it", :remote_file => remote_file) + return if !validator.process?(remote_file) + @logger.debug("Remote file passed validation. Downloading data.", :remote_file => remote_file) + + remote_file.download! + + @logger.info("File downloaded. 
Emitting events.", :remote_file => remote_file) + remote_file.each_line do |line| + emit_event(line, remote_file.metadata, remote_file.data) + end + post_process(remote_file) + remote_file.cleanup + end + + private + attr_reader :event_processor, :post_processors, :validator + + def emit_event(line, metadata, object) + @event_processor.process(line, metadata, object) + end + + def post_process(remote_file) + @logger.debug("Post processing remote file", :remote_file => remote_file) + + @post_processors.each do |processor| + @logger.debug("Running post processor", :processor => processor.class) + processor.process(remote_file) + end + end + end +end; end; end diff --git a/lib/logstash/inputs/s3/processor_manager.rb b/lib/logstash/inputs/s3/processor_manager.rb new file mode 100644 index 0000000..9e08bf0 --- /dev/null +++ b/lib/logstash/inputs/s3/processor_manager.rb @@ -0,0 +1,116 @@ +# encoding: utf-8 +require "logstash/inputs/base" + +require "logstash/inputs/s3/processor" +require "logstash/util" +require "thread" +require "concurrent" + +module LogStash module Inputs class S3 < LogStash::Inputs::Base + # This class Manage the processing threads and share the same processor instance + # The event processor and the post processors need to be threadsafe + class ProcessorManager + java_import java.util.concurrent.SynchronousQueue + java_import java.util.concurrent.TimeUnit + + DEFAULT_OPTIONS = { + :processors_count => 5, + :broken_pipe_retries => 10 + } + + TIMEOUT_MS = 150 # milliseconds, use for the SynchronousQueue + + attr_reader :processors_count + + def initialize(logger, options = {}) + @logger = logger + options = DEFAULT_OPTIONS.merge(options) + @processor = options[:processor] + @broken_pipe_retries = options[:broken_pipe_retries] + @processors_count = options[:processors_count] + + @available_processors = [] + + @work_queue = java.util.concurrent.SynchronousQueue.new + + @stopped = Concurrent::AtomicBoolean.new(false) + end + + def enqueue_work(remote_file) + @logger.debug("Enqueuing work", :remote_file => remote_file) + + # block the main thread until we are able to enqueue the workers + # but allow a gracefull shutdown. + success = false + + while !success && !stop? + success = @work_queue.offer(remote_file, TIMEOUT_MS, TimeUnit::MILLISECONDS) + end + end + + def start + @logger.debug("Starting processors", :processors_count => processors_count) + processors_count.times do |worker_id| + @available_processors << Thread.new do + start_processor(worker_id) + end + end + end + + def stop + @logger.debug("Stopping processors") + @stopped.make_true + @available_processors.join + end + + def start_processor(worker_id) + @logger.debug("Starting processor", :worker_id => worker_id) + loop do + break if stop? + + # This can be useful for debugging but it is extremely verbose + # @logger.debug("Waiting for new work", :worker_id => worker_id) + if remote_file = @work_queue.poll(TIMEOUT_MS, TimeUnit::MILLISECONDS) + @logger.debug("New work received", :worker_id => worker_id, :remote_file => remote_file) + LogStash::Util.set_thread_name("[S3 Input Processor - #{worker_id}/#{processors_count}] Working on: #{remote_file.bucket_name}/#{remote_file.key} size: #{remote_file.content_length}") + + tries = 0 + begin + @processor.handle(remote_file) + rescue IOError => e + @logger.error( + "IOError when processing remote file. 
Skipping for now (But not adding to SinceDB).", + :remote_file => remote_file, + :exception => e + ) + rescue Errno::EPIPE => e + @logger.error( + "Broken pipe when processing remote file", + :remote_file => remote_file, + :exception => e + ) + + raise e if (tries += 1) == @broken_pipe_retries + + sleep 1 + retry + rescue Aws::S3::Errors::NoSuchKey + @logger.debug( + "File not found on S3 (probably already handled by another worker)", + :remote_file => remote_file, + :worker_id => worker_id + ) + # This means the file on S3 was removed during our current operation; + # we cannot do anything about it, and the file should not show up on the next polling + end + end + LogStash::Util.set_thread_name("[S3 Input Processor - #{worker_id}/#{processors_count}] Waiting for work") + end + end + + private + def stop? + @stopped.value + end + end +end; end; end; diff --git a/lib/logstash/inputs/s3/remote_file.rb b/lib/logstash/inputs/s3/remote_file.rb new file mode 100644 index 0000000..a1ed1ed --- /dev/null +++ b/lib/logstash/inputs/s3/remote_file.rb @@ -0,0 +1,96 @@ +# encoding: utf-8 +require "logstash/inputs/base" + +require "logstash/inputs/s3/stream_downloader" +require "forwardable" + +module LogStash module Inputs class S3 < LogStash::Inputs::Base + class RemoteFile + class NoKeepAlive + def self.notify! + end + + def self.complete! + end + end + + FILE_MODE = "w+b" + + extend Forwardable + + attr_reader :remote_object, :metadata, :file + attr_accessor :download_to_path + + def_delegators :@remote_object, :key, :content_length, :last_modified, :etag, :bucket_name, :data + + def initialize(object, logger, gzip_pattern, keep_alive = NoKeepAlive) + @remote_object = object + @logger = logger + @gzip_pattern = gzip_pattern + @keep_alive = keep_alive + + @downloaded = false + @download_to_path = Dir.tmpdir + end + + def download! + @file = StreamDownloader.fetcher(self, @logger).fetch + @downloaded = true + end + + def download_to + # Lazy create FD + @download_to ||= begin + full_local_path = ::File.join(download_to_path, key) + local_dirname = ::File.dirname(full_local_path) + FileUtils.mkdir_p(local_dirname) + @logger.debug("Opening file for download", :remote_object => remote_object, :local_path => full_local_path) + ::File.open(full_local_path, FILE_MODE) + end + end + + def each_line(&block) + # extract_metadata_from_file + # seek for cloudfront metadata + @file.each_line do |line| + block.call(line, metadata) + @keep_alive.notify! + end + + @keep_alive.complete! + end + + def download_finished? + @downloaded + end + + def metadata + { + "s3" => { + "key" => key, + "bucket_name" => bucket_name, + "last_modified" => last_modified + } + } + end + + def cleanup + if @download_to + @download_to.close unless @download_to.closed? + ::File.delete(@download_to.path) if ::File.exist?(@download_to.path) + end + end + + def compressed_gzip? + # Usually I would use the content_type to retrieve this information, + # but this requires another call to S3 for each download, which isn't really optimal. + # So we will use the filename to do a best guess at the content type.
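      # With the default gzip_pattern ("\.gz(ip)?$"), keys such as "logs/app.log.gz" or
      # "logs/app.log.gzip" are treated as gzip while "logs/app.log" is read as plain text;
      # note that only the extension of the key is matched, not the full key.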
+ ::File.extname(remote_object.key).downcase =~ Regexp.new(@gzip_pattern) + end + + def inspect + "RemoteFile,##{object_id}: remote_object: #{remote_object.key}" + end + alias_method :to_s, :inspect + end +end;end;end diff --git a/lib/logstash/inputs/s3/sincedb.rb b/lib/logstash/inputs/s3/sincedb.rb new file mode 100644 index 0000000..3cbfa45 --- /dev/null +++ b/lib/logstash/inputs/s3/sincedb.rb @@ -0,0 +1,182 @@ +# encoding: utf-8 +require "logstash/inputs/base" + +require "logstash/util" +require "logstash/json" +require "thread_safe" +require "concurrent" +require "time" + +module LogStash module Inputs class S3 < LogStash::Inputs::Base + class SinceDB + SinceDBKey = Struct.new(:key, :etag, :bucket_name) do + def ==(other) + other.key == key && other.etag == etag + end + + def self.create_from_remote(remote_file) + SinceDBKey.new(remote_file.key, remote_file.etag, remote_file.bucket_name) + end + + def to_hash + [ + key, + etag, + bucket_name + ] + end + + # TODO CHECK IF WE NEED #HASH + end + + class SinceDBValue + attr_reader :last_modified, :recorded_at + + def initialize(last_modified, recorded_at = Time.now) + begin + Time.now - last_modified + rescue TypeError => e + raise e, "last_modified must be a Time object, got #{last_modified.class} (#{last_modified.inspect})" + end + + @last_modified = last_modified + @recorded_at = recorded_at + end + + def to_hash + [recorded_at] + end + + def older?(age) + Time.now - last_modified >= age + end + end + + DEFAULT_OPTIONS = { + :sincedb_expire_secs => 120, + :bookkeeping_enabled => true # RSpec stubbing doesn't like background tasks + } + + def initialize(file, ignore_older, logger, options = {}) + @file = file + @ignore_older = ignore_older + @logger = logger + @options = DEFAULT_OPTIONS.merge(options) + + @db = ThreadSafe::Hash.new + load_database + + @need_sync = Concurrent::AtomicBoolean.new(false) + @stopped = Concurrent::AtomicBoolean.new(true) + + start_bookkeeping if options[:bookkeeping_enabled] + end + + def close + @stopped.make_true + clean_old_keys + serialize + end + + def completed(remote_file) + @db[SinceDBKey.create_from_remote(remote_file)] = SinceDBValue.new(remote_file.last_modified) + request_sync + end + + def oldest_key + return if @db.empty? + @db.min_by { |_, value| value.last_modified }.first.key + end + + def processed?(remote_file) + @db.include?(SinceDBKey.create_from_remote(remote_file)) + end + + def reseed(remote_file) + @db.clear + completed(remote_file) + end + + private + attr_reader :options + + def start_bookkeeping + @stopped.make_false + + Thread.new do + LogStash::Util.set_thread_name("S3 input, sincedb periodic fsync") + Stud.interval(1) { puts 'Running bookkeeper'; periodic_sync } + end + end + + def stop? + @stopped.true? + end + + def load_database + if !::File.exists?(@file) + @logger.debug("Not loading sincedb since none exists at specified location", :location => @file) + return + end + + @logger.debug("Loading sincedb", :location => @file) + + ::File.open(@file).each_line do |line| + data = JSON.parse(line) + @db[SinceDBKey.new(*data["key"])] = SinceDBValue.new(Time.parse(*data["value"])) + end + + @logger.debug("Loaded sincedb", :location => @file, :entries => @db.size) + end + + def newest_entry + return if @db.empty? 
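      # (newest_entry backs clean_old_keys below: entries whose last_modified is more than
      # sincedb_expire_secs older than this newest entry are expired from the database.)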
+ @db.max_by { |_, value| value.last_modified }.last + end + + def serialize + @logger.debug("Writing sincedb", :location => @file) + @db.each do |sincedbkey, sincedbvalue| + ::File.open(@file, "a") do |f| + f.puts(LogStash::Json.dump({ "key" => sincedbkey.to_hash, + "value" => sincedbvalue.to_hash })) + end + end + end + + def clean_old_keys + @logger.debug("Cleaning sincedb keys older than #{@ignore_older} seconds") + @db.each do |sincedbkey, sincedbvalue| + @db.delete(sincedbkey) if sincedbvalue.older?(@ignore_older) + end + + return unless @db.size > 1 + + newest_last_modified = newest_entry.last_modified + @logger.debug( + "Cleaning sincedb keys older than newest_last_modified - SINCEDB_EXPIRE_SECS" + + " seconds (#{newest_last_modified} - #{options[:sincedb_expire_secs]} = " + + "#{newest_last_modified - options[:sincedb_expire_secs]})") + @db.delete_if do |_sincedbkey, sincedbvalue| + newest_last_modified - sincedbvalue.last_modified > options[:sincedb_expire_secs] + end + end + + def periodic_sync + clean_old_keys + + if need_sync? + serialize + @need_sync.make_false + end + end + + def need_sync? + @need_sync.value + end + + def request_sync + @need_sync.make_true + end + end +end end end diff --git a/lib/logstash/inputs/s3/stream_downloader.rb b/lib/logstash/inputs/s3/stream_downloader.rb new file mode 100644 index 0000000..1aed28a --- /dev/null +++ b/lib/logstash/inputs/s3/stream_downloader.rb @@ -0,0 +1,57 @@ +# encoding: utf-8 +require "logstash/inputs/base" +require "stud/temporary" + +module LogStash module Inputs class S3 < LogStash::Inputs::Base + class StreamDownloader + def initialize(logger, remote_object, writer = StringIO.new) + @logger = logger + @writer = writer + @remote_object = remote_object + end + + def fetch + @logger.debug("Downloading remote file", :remote_object_key => @remote_object.key) + @remote_object.get({ :response_target => @writer }) + # @writer.rewind + @writer + end + + def self.fetcher(remote_file, logger) + if remote_file.compressed_gzip? + return CompressedStreamDownloader.new(logger, remote_file.remote_object, remote_file.download_to) + end + + StreamDownloader.new(logger, remote_file.remote_object, remote_file.download_to) + end + end + + class CompressedStreamDownloader < StreamDownloader + def fetch + compressed_file_io_object = super + @logger.debug("Decompressing gzip file", :remote_object_key => @remote_object.key) + decompress_io_object(compressed_file_io_object) + end + + private + + def decompress_io_object(io_object) + # Shelling out is necessary here until logstash-oss is using JRuby 9.4 which includes + # the Zlib::GzipReader.zcat method + output = '' + IO.popen('zcat', 'r+') do |zcat| + writer_thread = Thread.new do + while chunk = io_object.read(65536) + zcat.write(chunk) + end + zcat.close_write + end + + output = zcat.read + writer_thread.join + end + + output + end + end +end;end;end diff --git a/logstash-input-s3.gemspec b/logstash-input-s3.gemspec index 24a4ccd..2513658 100644 --- a/logstash-input-s3.gemspec +++ b/logstash-input-s3.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-s3' - s.version = '3.8.4' + s.version = '3.9.0-inv1' s.licenses = ['Apache-2.0'] s.summary = "Streams events from files in a S3 bucket" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" @@ -21,11 +21,13 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 2.1.12", "<= 2.99" - s.add_runtime_dependency 'logstash-mixin-aws', '>= 5.1.0' + s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.2' s.add_runtime_dependency 'stud', '~> 0.0.18' - # s.add_runtime_dependency 'aws-sdk-resources', '>= 2.0.33' + s.add_runtime_dependency 'aws-sdk-s3', '~> 1' s.add_development_dependency 'logstash-devutils' s.add_development_dependency "logstash-codec-json" s.add_development_dependency "logstash-codec-multiline" - s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.2' + s.add_development_dependency "rspec-wait" + s.add_development_dependency "concurrent-ruby" + s.add_development_dependency "flores" end diff --git a/notes.md b/notes.md new file mode 100644 index 0000000..0826f4e --- /dev/null +++ b/notes.md @@ -0,0 +1 @@ +# TO many process` method we need to refactor them into better name? SinceDB diff --git a/spec/inputs/s3/event_processor_spec.rb b/spec/inputs/s3/event_processor_spec.rb new file mode 100644 index 0000000..efcfb04 --- /dev/null +++ b/spec/inputs/s3/event_processor_spec.rb @@ -0,0 +1,41 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/inputs/s3/event_processor" +require "logstash/codecs/json" +require "logstash/json" +require "thread" + +describe LogStash::Inputs::S3::EventProcessor do + let(:logstash_inputs_s3) { double("logstash-inputs-s3") } + let(:include_object_properties) { true } + let(:logger) { double("Logger").as_null_object } + let(:metadata) { { "s3" => { "bucket_name" => "bucket-land" } } } + let(:encoded_line) { LogStash::Json.dump({ "message" => "Hello World" }) } + let(:codec) { LogStash::Codecs::JSON.new } + let(:queue) { Queue.new } + let(:remote_file_data) { { "bucket_name" => "bucket-land" } } + + before do + allow(logstash_inputs_s3).to receive(:decorate) + described_class.new(logstash_inputs_s3, codec, queue, include_object_properties, logger) + .process(encoded_line, metadata, remote_file_data) + end + + subject { queue.pop } + + it "queue should have things in it" do + expect(queue).not_to be_empty + end + + it "Event object should not be nil" do + expect(subject).not_to be_nil + end + + it "uses the codec and insert the event to the queue" do + expect(subject.get("message")).to eq("Hello World") + end + + it "add metadata to the event" do + expect(subject.get("[@metadata][s3][bucket_name]")).to eq("bucket-land") + end +end diff --git a/spec/inputs/s3/poller_spec.rb b/spec/inputs/s3/poller_spec.rb new file mode 100644 index 0000000..c534830 --- /dev/null +++ b/spec/inputs/s3/poller_spec.rb @@ -0,0 +1,89 @@ +# encoding: utf-8 +# +require "logstash/inputs/s3" +require "aws-sdk-s3" +require "ostruct" +require "rspec/wait" + +describe LogStash::Inputs::S3::Poller do + let(:sincedb) { double("sincedb").as_null_object } + let(:logger) { double("logger").as_null_object } + let(:bucket_name) { "my-stuff" } + let(:bucket) { Aws::S3::Bucket.new(:stub_responses => true, :name => bucket_name) } + let(:remote_objects) { double("remote_objects") } + let(:objects) { [OpenStruct.new({:key => "myobject", :last_modified => Time.now-60, :body => "Nooo" })] } + + before :each do + allow(bucket).to receive(:objects).with(anything).and_return(remote_objects) + allow(remote_objects).to receive(:limit).with(anything) do |num| + expect(num).to be_a(Integer) + expect(num).to be > 0 + objects + end + 
end + + subject { described_class.new(bucket, sincedb, logger) } + + it "lists the files from the remote host" do + retrieved_objects = [] + + subject.run do |object| + retrieved_objects << object + subject.stop if objects.size == retrieved_objects.size + end + + expect(retrieved_objects.collect(&:key)).to eq(objects.collect(&:key)) + end + + it "can be stopped" do + t = Thread.new { subject.run {} } + expect(["run", "sleep"]).to include(t.status) + subject.stop + wait_for { t.status }.to eq(false) + end + + context 'use_start_after is true' do + let(:options) { {:use_start_after => true} } + subject { described_class.new(bucket, sincedb, logger, options) } + + context 'there are files to fetch' do + it "sets the last_key_fetched" do + retrieved_objects = [] + + subject.run do |object| + retrieved_objects << object + subject.stop if objects.size == retrieved_objects.size + end + + expect(subject.instance_variable_get(:@last_key_fetched)).to eq(objects.first.key) + end + end + + context 'there are no files to fetch' do + let(:objects) { [] } + + it "does not set the last_key_fetched" do + subject.run {} + expect(subject.instance_variable_get(:@last_key_fetched)).to be_nil + end + + context 'and there is an oldest_key in the sincedb' do + let(:oldest_key) { "oldest-object-key" } + + it 'calls @bucekt.objects with the appropriate listing options' do + allow(sincedb).to receive(:oldest_key).and_return(oldest_key) + subject.run {} + expect(bucket).to have_received(:objects).with({:start_after => oldest_key}) + end + end + + context 'and there is no oldest_key in the sincedb' do + it 'calls @bucket.objects with no listing options' do + allow(sincedb).to receive(:oldest_key).and_return(nil) + subject.run {} + expect(bucket).to have_received(:objects).with({}) + end + end + end + end +end diff --git a/spec/inputs/s3/post_processor_spec.rb b/spec/inputs/s3/post_processor_spec.rb new file mode 100644 index 0000000..f2d92bf --- /dev/null +++ b/spec/inputs/s3/post_processor_spec.rb @@ -0,0 +1,42 @@ +# encoding: utf-8 +require "logstash/inputs/s3/post_processor" +require "logstash/inputs/s3/remote_file" +require "logstash/inputs/s3/sincedb" +require "stud/temporary" + +describe LogStash::Inputs::S3::PostProcessor do + let(:logger) { double("logger").as_null_object } + let(:gzip_pattern) { "*.gz" } + let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object, logger, gzip_pattern) } + let(:s3_object) { double("s3_object", + :key => "hola", + :bucket_name => "mon-bucket", + :content_length => 20, + :etag => "123", + :last_modified => Time.now-60) } + + describe LogStash::Inputs::S3::PostProcessor::UpdateSinceDB do + let(:ignore_older) { 3600 } + let(:sincedb_path) { Stud::Temporary.file.path } + let(:logger) { double("logger").as_null_object } + + before do + # Avoid starting the bookkeeping thread since it will keep running after the test + allow_any_instance_of(LogStash::Inputs::S3::SinceDB).to receive(:start_bookkeeping) + end + + let(:sincedb) { LogStash::Inputs::S3::SinceDB.new(sincedb_path, ignore_older, logger) } + + subject { described_class.new(sincedb) } + + after :each do + File.delete(sincedb_path) + end + + it "mark the remote file as completed" do + subject.process(remote_file) + expect(sincedb.processed?(remote_file)).to be_truthy + end + end +end + diff --git a/spec/inputs/s3/proccessing_policy_validator_spec.rb b/spec/inputs/s3/proccessing_policy_validator_spec.rb new file mode 100644 index 0000000..1cd42fa --- /dev/null +++ b/spec/inputs/s3/proccessing_policy_validator_spec.rb 
@@ -0,0 +1,211 @@ +# encoding: utf-8 +require "logstash/inputs/s3/processing_policy_validator" +require "logstash/inputs/s3/remote_file" +require "logstash/inputs/s3/sincedb" +require "stud/temporary" + +describe LogStash::Inputs::S3::ProcessingPolicyValidator do + let(:logger) { double("logger").as_null_object } + let(:gzip_pattern) { "*.gz" } + let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object, logger, gzip_pattern) } + let(:s3_object) { double("s3_object", :key => "hola", :content_length => 20, :last_modified => Time.now-60) } + + let(:validator_1) { LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEmptyFile } + let(:validator_2) { LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEndingDirectory } + + context "#initialize" do + subject { described_class } + + it "accepts multiples validator" do + expect(subject.new(logger, validator_1, validator_2).count).to eq(2) + end + + it "accepts one validator" do + expect(subject.new(logger, validator_1).count).to eq(1) + end + end + + context "#add_policy" do + subject { described_class.new(logger, validator_1) } + + it "allows to add more validators" do + expect(subject.count).to eq(1) + subject.add_policy(validator_2) + expect(subject.count).to eq(2) + end + + it "adds the validator at the end of the chain" do + subject.add_policy(validator_2) + + expect(validator_1).to receive(:process?).ordered.and_return(true) + expect(validator_2).to receive(:process?).ordered.and_return(true) + + subject.process?(remote_file) + end + end + + context "#process?" do + subject { described_class.new(logger, validator_1, validator_2) } + + it "execute the validator in declarations order" do + expect(validator_1).to receive(:process?).ordered.and_return(true) + expect(validator_2).to receive(:process?).ordered.and_return(true) + + subject.process?(remote_file) + end + + context "When all the validator pass" do + it "accepts to process the file" do + expect(subject.process?(remote_file)).to be_truthy + end + end + + context "When one validator fails" do + let(:s3_object) { double("s3_object", :key => "hola/", :content_length => 20, :last_modified => Time.now-60) } + + it "doesnt accept to process" do + expect(subject.process?(remote_file)).to be_falsey + end + end + end + + describe LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEndingDirectory do + subject { described_class } + + context "when the key is a directory" do + let(:s3_object) { double("remote_file", :key => "hola/") } + + it "doesnt accept to process" do + expect(subject.process?(remote_file)).to be_falsey + end + end + + context "when the key is not a directory" do + let(:s3_object) { double("remote_file", :key => "hola") } + + it "accepts to process" do + expect(subject.process?(remote_file)).to be_truthy + end + end + end + + describe LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEmptyFile do + subject { described_class } + + context "When the file is empty" do + let(:s3_object) { double("remote_file", :key => "hola", :content_length => 0) } + + it "doesnt accept to process" do + expect(subject.process?(remote_file)).to be_falsey + end + end + + context "When the file has contents" do + let(:s3_object) { double("remote_file", :key => "hola", :content_length => 100) } + + it "accepts to process" do + expect(subject.process?(remote_file)).to be_truthy + end + end + end + + describe LogStash::Inputs::S3::ProcessingPolicyValidator::IgnoreOlderThan do + let(:older_than) { 3600 } + + subject { described_class.new(older_than) } + + context "when the file is older 
than the threshold" do + let(:s3_object) { double("remote_file", :key => "hola", :content_length => 100, :last_modified => Time.now - older_than) } + + it "doesnt accept to process" do + expect(subject.process?(remote_file)).to be_falsey + end + end + + context "when the file is newer than the threshold" do + let(:s3_object) { double("remote_file", :key => "hola", :content_length => 100, :last_modified => Time.now) } + + it "accepts to process" do + expect(subject.process?(remote_file)).to be_truthy + end + end + end + + describe LogStash::Inputs::S3::ProcessingPolicyValidator::AlreadyProcessed do + let(:older_than) { 3600 } + let(:s3_object) { double("remote_file", :etag => "1234", :bucket_name => "mon-bucket", :key => "hola", :content_length => 100, :last_modified => Time.now) } + let(:sincedb_path) { Stud::Temporary.file.path } + let(:logger) { double("logger").as_null_object } + + before do + # Avoid starting the bookkeeping thread since it will keep running after the test + allow_any_instance_of(LogStash::Inputs::S3::SinceDB).to receive(:start_bookkeeping) + end + + let(:sincedb) { LogStash::Inputs::S3::SinceDB.new(sincedb_path, older_than, logger) } + + subject { described_class.new(sincedb) } + + context "when we have processed the file in the past" do + before do + sincedb.completed(remote_file) + end + + it "doesnt accept to process" do + expect(subject.process?(remote_file)).to be_falsey + end + end + + context "when we never processed the file" do + it "accepts to process" do + expect(subject.process?(remote_file)).to be_truthy + end + end + end + + describe LogStash::Inputs::S3::ProcessingPolicyValidator::ExcludePattern do + subject { described_class.new(exclude_pattern) } + + let(:s3_object) { double("remote_file", :key => "bonjourlafamille" ) } + + context "When the pattern is valid" do + context "When the file is match the pattern" do + let(:exclude_pattern) { "^bonjour" } + + it "doesnt accept to process" do + expect(subject.process?(remote_file)).to be_falsey + end + end + + context "When the file doesnt match the pattern" do + let(:exclude_pattern) { "^notmatch" } + + it "accepts to process" do + expect(subject.process?(remote_file)).to be_truthy + end + end + end + end + + describe LogStash::Inputs::S3::ProcessingPolicyValidator::ExcludeBackupedFiles do + subject { described_class.new(backup_prefix) } + + let(:s3_object) { double("remote_file", :key => "bonjourlafamille" ) } + + context "When the file start with the backup prefix" do + let(:backup_prefix) { "bonjour" } + + it "doesnt accept to process" do + expect(subject.process?(remote_file)).to be_falsey + end + end + + context "when the file doesnt start with the backup prefix" do + let(:backup_prefix) { "Aholabonjour" } + + it "accepts to process" do + expect(subject.process?(remote_file)).to be_truthy + end + end + end +end diff --git a/spec/inputs/s3/processor_spec.rb b/spec/inputs/s3/processor_spec.rb new file mode 100644 index 0000000..43408dd --- /dev/null +++ b/spec/inputs/s3/processor_spec.rb @@ -0,0 +1,68 @@ +# encoding: utf-8 +require "logstash/inputs/s3/processor" +require "logstash/inputs/s3/remote_file" +require "logstash/inputs/s3/processing_policy_validator" +require "logstash/inputs/s3/event_processor" + +describe LogStash::Inputs::S3::Processor do + let(:event_processor) { spy("LogStash::Inputs::S3::EventProcessor") } + let(:post_processor_1) { spy("LogStash::Inputs::S3::PostProcessor") } + let(:post_processor_2) { spy("LogStash::Inputs::S3::PostProcessor") } + let(:post_processors) { 
[post_processor_1, post_processor_2] }
+  let(:logger) { double("logger").as_null_object }
+
+  let(:validator) {
+    LogStash::Inputs::S3::ProcessingPolicyValidator.new(
+      logger,
+      LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEmptyFile
+    )
+  }
+
+  let(:gzip_pattern) { "*.gz" }
+  let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object, logger, gzip_pattern) }
+  let(:s3_object) { double("s3_object",
+                           :data => { "bucket_name" => "mon-bucket" },
+                           :key => "hola",
+                           :bucket_name => "mon-bucket",
+                           :content_length => 20,
+                           :last_modified => Time.now-60) }
+
+  subject { described_class.new(validator, event_processor, logger, post_processors) }
+
+  context "when handling a remote file" do
+    context "when the file is not valid to process (because content_length = 0)" do
+      let(:s3_object) { double("s3_object",
+                               :data => { "bucket_name" => "mon-bucket" },
+                               :key => "hola",
+                               :content_length => 0,
+                               :last_modified => Time.now-60) }
+
+      it "does not download the file" do
+        expect(remote_file).not_to receive(:download!)
+        subject.handle(remote_file)
+      end
+    end
+
+    context "when the file is valid to process" do
+      let(:content) { "bonjour la famille" }
+      let(:metadata) { { "s3" => { "key" => "hola", "bucket_name" => "mon-bucket" }} }
+
+      before do
+        expect(remote_file).to receive(:download!).and_return(true)
+        expect(remote_file).to receive(:each_line).and_yield(content)
+      end
+
+      it "sends the file content to the event processor" do
+        subject.handle(remote_file)
+        expect(event_processor).to have_received(:process).with(content, { "s3" => hash_including(metadata["s3"])}, s3_object.data)
+      end
+
+      it "sends the file to all post processors" do
+        subject.handle(remote_file)
+        expect(post_processor_1).to have_received(:process).with(remote_file)
+        expect(post_processor_2).to have_received(:process).with(remote_file)
+      end
+    end
+  end
+end
+
diff --git a/spec/inputs/s3/remote_file_spec.rb b/spec/inputs/s3/remote_file_spec.rb
new file mode 100644
index 0000000..f4dcd03
--- /dev/null
+++ b/spec/inputs/s3/remote_file_spec.rb
@@ -0,0 +1,34 @@
+# encoding: utf-8
+require "logstash/inputs/s3/remote_file"
+
+describe LogStash::Inputs::S3::RemoteFile do
+  let(:logger) { double("logger").as_null_object }
+  let(:gzip_pattern) { "\.gz(ip)?$" }
+
+  subject { described_class.new(s3_object, logger, gzip_pattern) }
+
+  context "#compressed_gzip?" do
+    context "when the remote object key ends in .gz" do
+      let(:s3_object) { double("s3_object",
+                               :content_type => "application/gzip",
+                               :key => "hola.gz",
+                               :content_length => 20,
+                               :last_modified => Time.now-60) }
+
+      it "returns true" do
+        expect(subject.compressed_gzip?).to be_truthy
+      end
+    end
+
+    context "when the remote object key ends in something else" do
+      let(:s3_object) { double("s3_object",
+                               :content_type => "text/plain",
+                               :key => "hola",
+                               :content_length => 20,
+                               :last_modified => Time.now-60) }
+      it "returns false" do
+        expect(subject.compressed_gzip?).to be_falsey
+      end
+    end
+  end
+end
diff --git a/spec/inputs/s3/sincedb_spec.rb b/spec/inputs/s3/sincedb_spec.rb
new file mode 100644
index 0000000..80ee57d
--- /dev/null
+++ b/spec/inputs/s3/sincedb_spec.rb
@@ -0,0 +1,113 @@
+require "logstash/inputs/s3/sincedb"
+
+describe LogStash::Inputs::S3::SinceDB do
+  let(:sincedb_path) { Stud::Temporary.file.path }
+  let(:ignore_older) { 86400 }
+  let(:logger) { double("logger").as_null_object }
+  let(:key) { LogStash::Inputs::S3::SinceDB::SinceDBKey.new("file.txt", "etag123", "bucket") }
+  let(:value) { LogStash::Inputs::S3::SinceDB::SinceDBValue.new(Time.now, Time.now) }
+  let(:options) { { sincedb_expire_secs: 120, bookkeeping_enabled: false } }
+  let(:sincedb_args) { [sincedb_path, ignore_older, logger, options] }
+
+  subject { described_class.new(*sincedb_args) }
+
+  describe "#initialize" do
+    it "initializes the sincedb with default options" do
+      expect(subject.instance_variable_get(:@options)).to eq(options)
+      expect(subject.instance_variable_get(:@db)).to be_a(ThreadSafe::Hash)
+      expect(subject.instance_variable_get(:@need_sync)).to be_a(Concurrent::AtomicBoolean)
+      expect(subject.instance_variable_get(:@stopped)).to be_a(Concurrent::AtomicBoolean)
+    end
+  end
+
+  describe "#close" do
+    it "cleans old keys and serializes the sincedb" do
+      expect(subject.instance_variable_get(:@stopped)).to receive(:make_true)
+      expect(subject).to receive(:clean_old_keys)
+      expect(subject).to receive(:serialize)
+      subject.close
+    end
+
+    context 'when the db contains a value that is less than sincedb_expire_secs older than the newest value' do
+      let(:old_value) { LogStash::Inputs::S3::SinceDB::SinceDBValue.new(Time.now - 119, Time.now - 119) }
+      let(:new_value) { LogStash::Inputs::S3::SinceDB::SinceDBValue.new(Time.now, Time.now) }
+      let(:new_key) { LogStash::Inputs::S3::SinceDB::SinceDBKey.new("file2.txt", "etag123", "bucket") }
+
+      it 'preserves both entries in the db' do
+        subject.instance_variable_get(:@db)[key] = old_value
+        subject.instance_variable_get(:@db)[new_key] = new_value
+        subject.close
+        expect(subject.instance_variable_get(:@db)).to include(key)
+        expect(subject.instance_variable_get(:@db)).to include(new_key)
+      end
+    end
+
+    context 'when the db contains a value that is more than sincedb_expire_secs older than the newest value' do
+      let(:old_value) { LogStash::Inputs::S3::SinceDB::SinceDBValue.new(Time.now - 121, Time.now - 121) }
+      let(:new_value) { LogStash::Inputs::S3::SinceDB::SinceDBValue.new(Time.now, Time.now) }
+      let(:new_key) { LogStash::Inputs::S3::SinceDB::SinceDBKey.new("file2.txt", "etag123", "bucket") }
+
+      it "cleans only the old entries from the db" do
+        subject.instance_variable_get(:@db)[key] = old_value
+        subject.instance_variable_get(:@db)[new_key] = new_value
+        subject.close
+        expect(subject.instance_variable_get(:@db)).to include(new_key)
+        expect(subject.instance_variable_get(:@db)).not_to include(key)
+      end
+    end
+  end
+
+  describe "#completed" do
+    let(:remote_file) {
double('remote_file') } + + before do + allow(remote_file).to receive(:etag).and_return("etag123") + allow(remote_file).to receive(:key).and_return("file.txt") + allow(remote_file).to receive(:bucket_name).and_return("bucket") + allow(remote_file).to receive(:last_modified).and_return(Time.now) + end + + it "requests sync" do + expect(subject).to receive(:request_sync) + subject.completed(remote_file) + end + end + + describe "#oldest_key" do + it "returns the oldest key in the sincedb" do + subject.instance_variable_get(:@db)[key] = value + oldest_key = subject.oldest_key + expect(oldest_key).to eq(key.key) + end + end + + describe "#processed?" do + let(:remote_file) { double('remote_file') } + + before do + allow(remote_file).to receive(:etag).and_return("etag123") + allow(remote_file).to receive(:key).and_return("file.txt") + allow(remote_file).to receive(:bucket_name).and_return("bucket") + end + + it "returns true if the sincedb is not empty" do + subject.instance_variable_get(:@db)[key] = value + expect(subject.processed?(remote_file)).to be true + end + + it "returns false if the sincedb is empty" do + expect(subject.processed?(remote_file)).to be false + end + end + + describe "#reseed" do + let(:remote_file) { "remote_file.txt" } + + it "calls completed with the remote file" do + expect(subject).to receive(:completed).with(remote_file) + subject.reseed(remote_file) + end + end + + # Add more tests for other methods as needed +end diff --git a/spec/inputs/s3/stream_downloader_spec.rb b/spec/inputs/s3/stream_downloader_spec.rb new file mode 100644 index 0000000..92d8ec0 --- /dev/null +++ b/spec/inputs/s3/stream_downloader_spec.rb @@ -0,0 +1,5 @@ +# encoding: utf-8 +require "logstash/inputs/s3/stream_downloader" + +## S3Object +## KeepAlive(SQSMessage) diff --git a/spec/inputs/s3_spec.rb b/spec/inputs/s3_spec.rb deleted file mode 100644 index b34b682..0000000 --- a/spec/inputs/s3_spec.rb +++ /dev/null @@ -1,612 +0,0 @@ -# encoding: utf-8 -require "logstash/devutils/rspec/spec_helper" -require "logstash/devutils/rspec/shared_examples" -require "logstash/inputs/s3" -require "logstash/codecs/multiline" -require "logstash/errors" -require "aws-sdk-resources" -require_relative "../support/helpers" -require "stud/temporary" -require "aws-sdk" -require "fileutils" -require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper' - -describe LogStash::Inputs::S3 do - let(:temporary_directory) { Stud::Temporary.pathname } - let(:sincedb_path) { Stud::Temporary.pathname } - let(:day) { 3600 * 24 } - let(:creds) { Aws::Credentials.new('1234', 'secret') } - let(:config) { - { - "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "logstash-test", - "temporary_directory" => temporary_directory, - "sincedb_path" => File.join(sincedb_path, ".sincedb") - } - } - let(:cutoff) { LogStash::Inputs::S3::CUTOFF_SECOND } - - - before do - FileUtils.mkdir_p(sincedb_path) - Aws.config[:stub_responses] = true - Thread.abort_on_exception = true - end - - context "when interrupting the plugin" do - let(:config) { super().merge({ "interval" => 5 }) } - let(:s3_obj) { double(:key => "awesome-key", :last_modified => Time.now.round, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) } - - before do - expect_any_instance_of(LogStash::Inputs::S3).to receive(:list_new_files).and_return(TestInfiniteS3Object.new(s3_obj)) - end - - it_behaves_like "an interruptible input plugin" do - let(:allowed_lag) { 16 } if 
LOGSTASH_VERSION.split('.').first.to_i <= 6 - end - end - - describe "#register" do - subject { LogStash::Inputs::S3.new(config) } - - context "with temporary directory" do - let(:temporary_directory) { Stud::Temporary.pathname } - - it "creates the direct when it doesn't exist" do - expect { subject.register }.to change { Dir.exist?(temporary_directory) }.from(false).to(true) - end - end - end - - describe '#get_s3object' do - subject { LogStash::Inputs::S3.new(settings) } - - context 'with modern access key options' do - let(:settings) { - { - "access_key_id" => "1234", - "secret_access_key" => "secret", - "proxy_uri" => "http://example.com", - "bucket" => "logstash-test", - } - } - - it 'should instantiate AWS::S3 clients with a proxy set' do - expect(Aws::S3::Resource).to receive(:new).with({ - :credentials => kind_of(Aws::Credentials), - :http_proxy => 'http://example.com', - :region => subject.region - }) - - subject.send(:get_s3object) - end - end - - describe "additional_settings" do - context "supported settings" do - let(:settings) { - { - "additional_settings" => { "force_path_style" => 'true', "ssl_verify_peer" => 'false', "profile" => 'logstash' }, - "bucket" => "logstash-test", - } - } - - it 'should instantiate AWS::S3 clients with force_path_style set' do - expect(Aws::S3::Resource).to receive(:new).with({ - :region => subject.region, - :force_path_style => true, :ssl_verify_peer => false, :profile => 'logstash' - }).and_call_original - - subject.send(:get_s3object) - end - end - - context 'when an unknown setting is given' do - let(:settings) { - { - "additional_settings" => { "this_setting_doesnt_exist" => true }, - "bucket" => "logstash-test", - } - } - - it 'should raise an error' do - expect { subject.send(:get_s3object) }.to raise_error(ArgumentError) - end - end - end - end - - describe "#list_new_files" do - before { allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects_list } } - - let!(:present_object_after_cutoff) {double(:key => 'this-should-not-be-present', :last_modified => Time.now, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) } - let!(:present_object) {double(:key => 'this-should-be-present', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) } - let!(:archived_object) {double(:key => 'this-should-be-archived', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => nil)) ) } - let!(:deep_archived_object) {double(:key => 'this-should-be-archived', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => nil)) ) } - let!(:restored_object) {double(:key => 'this-should-be-restored-from-archive', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => 'ongoing-request="false", expiry-date="Thu, 01 Jan 2099 00:00:00 GMT"')) ) } - let!(:deep_restored_object) {double(:key => 'this-should-be-restored-from-deep-archive', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'DEEP_ARCHIVE', :object => double(:data => double(:restore => 'ongoing-request="false", expiry-date="Thu, 01 Jan 2099 00:00:00 GMT"')) ) } - let(:objects_list) { - [ - double(:key => 'exclude-this-file-1', :last_modified => Time.now - 2 * day, :content_length => 100, 
:storage_class => 'STANDARD'), - double(:key => 'exclude/logstash', :last_modified => Time.now - 2 * day, :content_length => 50, :storage_class => 'STANDARD'), - archived_object, - restored_object, - deep_restored_object, - present_object, - present_object_after_cutoff - ] - } - - it 'should allow user to exclude files from the s3 bucket' do - plugin = LogStash::Inputs::S3.new(config.merge({ "exclude_pattern" => "^exclude" })) - plugin.register - - files = plugin.list_new_files.map { |item| item.key } - expect(files).to include(present_object.key) - expect(files).to include(restored_object.key) - expect(files).to include(deep_restored_object.key) - expect(files).to_not include('exclude-this-file-1') # matches exclude pattern - expect(files).to_not include('exclude/logstash') # matches exclude pattern - expect(files).to_not include(archived_object.key) # archived - expect(files).to_not include(deep_archived_object.key) # archived - expect(files).to_not include(present_object_after_cutoff.key) # after cutoff - expect(files.size).to eq(3) - end - - it 'should support not providing a exclude pattern' do - plugin = LogStash::Inputs::S3.new(config) - plugin.register - - files = plugin.list_new_files.map { |item| item.key } - expect(files).to include(present_object.key) - expect(files).to include(restored_object.key) - expect(files).to include(deep_restored_object.key) - expect(files).to include('exclude-this-file-1') # no exclude pattern given - expect(files).to include('exclude/logstash') # no exclude pattern given - expect(files).to_not include(archived_object.key) # archived - expect(files).to_not include(deep_archived_object.key) # archived - expect(files).to_not include(present_object_after_cutoff.key) # after cutoff - expect(files.size).to eq(5) - end - - context 'when all files are excluded from a bucket' do - let(:objects_list) { - [ - double(:key => 'exclude-this-file-1', :last_modified => Time.now - 2 * day, :content_length => 100, :storage_class => 'STANDARD'), - double(:key => 'exclude/logstash', :last_modified => Time.now - 2 * day, :content_length => 50, :storage_class => 'STANDARD'), - ] - } - - it 'should not log that no files were found in the bucket' do - plugin = LogStash::Inputs::S3.new(config.merge({ "exclude_pattern" => "^exclude" })) - plugin.register - allow(plugin.logger).to receive(:debug).with(anything, anything) - - expect(plugin.logger).not_to receive(:info).with(/No files found/, anything) - expect(plugin.logger).to receive(:debug).with(/Ignoring/, anything) - expect(plugin.list_new_files).to be_empty - end - end - - context 'with an empty bucket' do - let(:objects_list) { [] } - - it 'should log that no files were found in the bucket' do - plugin = LogStash::Inputs::S3.new(config) - plugin.register - allow(plugin.logger).to receive(:info).with(/Using the provided sincedb_path/, anything) - expect(plugin.logger).to receive(:info).with(/No files found/, anything) - expect(plugin.list_new_files).to be_empty - end - end - - context "If the bucket is the same as the backup bucket" do - it 'should ignore files from the bucket if they match the backup prefix' do - objects_list = [ - double(:key => 'mybackup-log-1', :last_modified => Time.now, :content_length => 5, :storage_class => 'STANDARD'), - present_object - ] - - allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects_list } - - plugin = LogStash::Inputs::S3.new(config.merge({ 'backup_add_prefix' => 'mybackup', - 'backup_to_bucket' => config['bucket']})) - plugin.register - - files = 
plugin.list_new_files.map { |item| item.key } - expect(files).to include(present_object.key) - expect(files).to_not include('mybackup-log-1') # matches backup prefix - expect(files.size).to eq(1) - end - end - - it 'should ignore files older than X' do - plugin = LogStash::Inputs::S3.new(config.merge({ 'backup_add_prefix' => 'exclude-this-file'})) - - - allow_any_instance_of(LogStash::Inputs::S3::SinceDB::File).to receive(:read).and_return(Time.now - day) - plugin.register - - files = plugin.list_new_files.map { |item| item.key } - expect(files).to include(present_object.key) - expect(files).to include(restored_object.key) - expect(files).to include(deep_restored_object.key) - expect(files).to_not include('exclude-this-file-1') # too old - expect(files).to_not include('exclude/logstash') # too old - expect(files).to_not include(archived_object.key) # archived - expect(files).to_not include(deep_archived_object.key) # archived - expect(files).to_not include(present_object_after_cutoff.key) # after cutoff - expect(files.size).to eq(3) - end - - it 'should ignore file if the file match the prefix' do - prefix = 'mysource/' - - objects_list = [ - double(:key => prefix, :last_modified => Time.now, :content_length => 5, :storage_class => 'STANDARD'), - present_object - ] - - allow_any_instance_of(Aws::S3::Bucket).to receive(:objects).with(:prefix => prefix) { objects_list } - - plugin = LogStash::Inputs::S3.new(config.merge({ 'prefix' => prefix })) - plugin.register - expect(plugin.list_new_files.map { |item| item.key }).to eq([present_object.key]) - end - - it 'should sort return object sorted by last_modification date with older first' do - objects = [ - double(:key => 'YESTERDAY', :last_modified => Time.now - day, :content_length => 5, :storage_class => 'STANDARD'), - double(:key => 'TODAY', :last_modified => Time.now, :content_length => 5, :storage_class => 'STANDARD'), - double(:key => 'TODAY_BEFORE_CUTOFF', :last_modified => Time.now - cutoff, :content_length => 5, :storage_class => 'STANDARD'), - double(:key => 'TWO_DAYS_AGO', :last_modified => Time.now - 2 * day, :content_length => 5, :storage_class => 'STANDARD') - ] - - allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects } - - - plugin = LogStash::Inputs::S3.new(config) - plugin.register - expect(plugin.list_new_files.map { |item| item.key }).to eq(['TWO_DAYS_AGO', 'YESTERDAY', 'TODAY_BEFORE_CUTOFF']) - end - - describe "when doing backup on the s3" do - it 'should copy to another s3 bucket when keeping the original file' do - plugin = LogStash::Inputs::S3.new(config.merge({ "backup_to_bucket" => "mybackup"})) - plugin.register - - s3object = Aws::S3::Object.new('mybucket', 'testkey') - expect_any_instance_of(Aws::S3::Object).to receive(:copy_from).with(:copy_source => "mybucket/testkey") - expect(s3object).to_not receive(:delete) - - plugin.backup_to_bucket(s3object) - end - - it 'should copy to another s3 bucket when deleting the original file' do - plugin = LogStash::Inputs::S3.new(config.merge({ "backup_to_bucket" => "mybackup", "delete" => true })) - plugin.register - - s3object = Aws::S3::Object.new('mybucket', 'testkey') - expect_any_instance_of(Aws::S3::Object).to receive(:copy_from).with(:copy_source => "mybucket/testkey") - expect(s3object).to receive(:delete) - - plugin.backup_to_bucket(s3object) - end - - it 'should add the specified prefix to the backup file' do - plugin = LogStash::Inputs::S3.new(config.merge({ "backup_to_bucket" => "mybackup", - "backup_add_prefix" => 'backup-' })) - plugin.register - - 
s3object = Aws::S3::Object.new('mybucket', 'testkey') - expect_any_instance_of(Aws::S3::Object).to receive(:copy_from).with(:copy_source => "mybucket/testkey") - expect(s3object).to_not receive(:delete) - - plugin.backup_to_bucket(s3object) - end - end - - it 'should support doing local backup of files' do - Stud::Temporary.directory do |backup_dir| - Stud::Temporary.file do |source_file| - backup_file = File.join(backup_dir.to_s, Pathname.new(source_file.path).basename.to_s) - - plugin = LogStash::Inputs::S3.new(config.merge({ "backup_to_dir" => backup_dir })) - - plugin.backup_to_dir(source_file) - - expect(File.exists?(backup_file)).to eq(true) - end - end - end - end - - shared_examples "generated events" do - let(:events_to_process) { 2 } - - it 'should process events' do - events = fetch_events(config) - expect(events.size).to eq(events_to_process) - expect(events[0].get("[@metadata][s3][key]")).to eql log.key - expect(events[1].get("[@metadata][s3][key]")).to eql log.key - end - - it "deletes the temporary file" do - events = fetch_events(config) - expect(Dir.glob(File.join(temporary_directory, "*")).size).to eq(0) - end - end - - context 'while communicating with s3' do - let(:config) { - { - "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "logstash-test", - "codec" => "json", - } - } - %w(AccessDenied NotFound).each do |error| - context "while listing bucket contents, #{error} is returned" do - before do - Aws.config[:s3] = { - stub_responses: { - list_objects: error - } - } - end - - it 'should not crash the plugin' do - events = fetch_events(config) - expect(events.size).to eq(0) - end - end - end - - %w(AccessDenied NoSuchKey).each do |error| - context "when retrieving an object, #{error} is returned" do - let(:objects) { [log] } - let(:log) { double(:key => 'uncompressed.log', :last_modified => Time.now - 2 * day, :content_length => 5, :storage_class => 'STANDARD') } - - let(:config) { - { - "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "logstash-test", - "codec" => "json", - } - } - before do - Aws.config[:s3] = { - stub_responses: { - get_object: error - } - } - allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects } - end - - it 'should not crash the plugin' do - events = fetch_events(config) - expect(events.size).to eq(0) - end - end - end - end - - context 'when working with logs' do - let(:objects) { [log] } - let(:log) { double(:key => 'uncompressed.log', :last_modified => Time.now - 2 * day, :content_length => 5, :data => { "etag" => 'c2c966251da0bc3229d12c2642ba50a4' }, :storage_class => 'STANDARD') } - let(:data) { File.read(log_file) } - - before do - Aws.config[:s3] = { - stub_responses: { - get_object: { body: data } - } - } - allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects } - allow_any_instance_of(Aws::S3::Bucket).to receive(:object).with(log.key) { log } - expect(log).to receive(:get).with(instance_of(Hash)) do |arg| - File.open(arg[:response_target], 'wb') { |s3file| s3file.write(data) } - end - end - - context "when event doesn't have a `message` field" do - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'json.log') } - let(:config) { - { - "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "logstash-test", - "codec" => "json", - } - } - - include_examples "generated events" - end - - context "when event does have a `message` field" do - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 
'json_with_message.log') } - let(:config) { - { - "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "logstash-test", - "codec" => "json", - } - } - - include_examples "generated events" - end - - context "multiple compressed streams" do - let(:log) { double(:key => 'log.gz', :last_modified => Time.now - 2 * day, :content_length => 5, :storage_class => 'STANDARD') } - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'multiple_compressed_streams.gz') } - - include_examples "generated events" do - let(:events_to_process) { 16 } - end - end - - context 'compressed' do - let(:log) { double(:key => 'log.gz', :last_modified => Time.now - 2 * day, :content_length => 5, :storage_class => 'STANDARD') } - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'compressed.log.gz') } - - include_examples "generated events" - end - - context 'compressed with gzip extension and using default gzip_pattern option' do - let(:log) { double(:key => 'log.gz', :last_modified => Time.now - 2 * day, :content_length => 5, :storage_class => 'STANDARD') } - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'compressed.log.gzip') } - - include_examples "generated events" - end - - context 'compressed with gzip extension and using custom gzip_pattern option' do - let(:config) { super().merge({ "gzip_pattern" => "gee.zip$" }) } - let(:log) { double(:key => 'log.gee.zip', :last_modified => Time.now - 2 * day, :content_length => 5, :storage_class => 'STANDARD') } - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'compressed.log.gee.zip') } - include_examples "generated events" - end - - context 'plain text' do - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'uncompressed.log') } - - include_examples "generated events" - end - - context 'multi-line' do - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'multiline.log') } - let(:config) { - { - "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "logstash-test", - "codec" => LogStash::Codecs::Multiline.new( {"pattern" => "__SEPARATOR__", "negate" => "true", "what" => "previous"}) - } - } - - include_examples "generated events" - end - - context 'encoded' do - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'invalid_utf8.gbk.log') } - - include_examples "generated events" - end - - context 'cloudfront' do - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'cloudfront.log') } - - describe "metadata", :ecs_compatibility_support, :aggregate_failures do - ecs_compatibility_matrix(:disabled, :v1) do |ecs_select| - before(:each) do - allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility) - end - - it 'should extract metadata from cloudfront log' do - events = fetch_events(config) - - events.each do |event| - expect(event.get ecs_select[disabled: "cloudfront_fields", v1: "[@metadata][s3][cloudfront][fields]"] ).to eq('date time x-edge-location c-ip x-event sc-bytes x-cf-status x-cf-client-id cs-uri-stem cs-uri-query c-referrer x-page-url​ c-user-agent x-sname x-sname-query x-file-ext x-sid') - expect(event.get ecs_select[disabled: "cloudfront_version", v1: "[@metadata][s3][cloudfront][version]"] ).to eq('1.0') - end - end - end - end - - include_examples "generated events" - end - - context 'when include_object_properties is set to true' do - let(:config) { super().merge({ "include_object_properties" => true }) } - let(:log_file) { 
File.join(File.dirname(__FILE__), '..', 'fixtures', 'uncompressed.log') } - - it 'should extract object properties onto [@metadata][s3]' do - events = fetch_events(config) - events.each do |event| - expect(event.get('[@metadata][s3]')).to include(log.data) - end - end - - include_examples "generated events" - end - - context 'when include_object_properties is set to false' do - let(:config) { super().merge({ "include_object_properties" => false }) } - let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'uncompressed.log') } - - it 'should NOT extract object properties onto [@metadata][s3]' do - events = fetch_events(config) - events.each do |event| - expect(event.get('[@metadata][s3]')).to_not include(log.data) - end - end - - include_examples "generated events" - end - end - - describe "data loss" do - let(:s3_plugin) { LogStash::Inputs::S3.new(config) } - let(:queue) { [] } - - before do - s3_plugin.register - end - - context 'events come after cutoff time' do - it 'should be processed in next cycle' do - s3_objects = [ - double(:key => 'TWO_DAYS_AGO', :last_modified => Time.now.round - 2 * day, :content_length => 5, :storage_class => 'STANDARD'), - double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'), - double(:key => 'TODAY_BEFORE_CUTOFF', :last_modified => Time.now.round - cutoff, :content_length => 5, :storage_class => 'STANDARD'), - double(:key => 'TODAY', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD'), - double(:key => 'TODAY', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD') - ] - size = s3_objects.length - - allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { s3_objects } - allow_any_instance_of(Aws::S3::Bucket).to receive(:object).and_return(*s3_objects) - expect(s3_plugin).to receive(:process_log).at_least(size).and_call_original - expect(s3_plugin).to receive(:stop?).and_return(false).at_least(size) - expect(s3_plugin).to receive(:download_remote_file).and_return(true).at_least(size) - expect(s3_plugin).to receive(:process_local_log).and_return(true).at_least(size) - - # first iteration - s3_plugin.process_files(queue) - - # second iteration - sleep(cutoff + 1) - s3_plugin.process_files(queue) - end - end - - context 's3 object updated after getting summary' do - it 'should not update sincedb' do - s3_summary = [ - double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'), - double(:key => 'TODAY', :last_modified => Time.now.round - (cutoff * 10), :content_length => 5, :storage_class => 'STANDARD') - ] - - s3_objects = [ - double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'), - double(:key => 'TODAY_UPDATED', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD') - ] - - size = s3_objects.length - - allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { s3_summary } - allow_any_instance_of(Aws::S3::Bucket).to receive(:object).and_return(*s3_objects) - expect(s3_plugin).to receive(:process_log).at_least(size).and_call_original - expect(s3_plugin).to receive(:stop?).and_return(false).at_least(size) - expect(s3_plugin).to receive(:download_remote_file).and_return(true).at_least(size) - expect(s3_plugin).to receive(:process_local_log).and_return(true).at_least(size) - - s3_plugin.process_files(queue) - expect(s3_plugin.send(:sincedb).read).to 
eq(s3_summary[0].last_modified) - end - end - end -end diff --git a/spec/inputs/sincedb_spec.rb b/spec/inputs/sincedb_spec.rb deleted file mode 100644 index eb19cc5..0000000 --- a/spec/inputs/sincedb_spec.rb +++ /dev/null @@ -1,17 +0,0 @@ -# encoding: utf-8 -require "logstash/devutils/rspec/spec_helper" -require "logstash/inputs/s3" -require "stud/temporary" -require "fileutils" - -describe LogStash::Inputs::S3::SinceDB::File do - let(:file) { Stud::Temporary.file.path } - subject { LogStash::Inputs::S3::SinceDB::File.new(file) } - before do - FileUtils.touch(file) - end - - it "doesnt raise an exception if the file is empty" do - expect { subject.read }.not_to raise_error - end -end diff --git a/spec/integration/retrieve_logs_spec.rb b/spec/integration/retrieve_logs_spec.rb new file mode 100644 index 0000000..df29e68 --- /dev/null +++ b/spec/integration/retrieve_logs_spec.rb @@ -0,0 +1,99 @@ +# encoding: utf-8 +require "logstash/inputs/s3" +require "logstash/inputs/s3/sincedb" +require_relative "../support/matcher_helpers" +require_relative "../support/s3_input_test_helper" +require "stud/temporary" +require "thread" + +# Retrieve the credentials from the environment +# and clear them to make sure the credentials are taken where they should +# be. +ACCESS_KEY_ID = ENV.delete("AWS_ACCESS_KEY_ID") +SECRET_ACCESS_KEY = ENV.delete("AWS_SECRET_ACCESS_KEY") +BUCKET_SOURCE = ENV.fetch("AWS_LOGSTASH_TEST_BUCKET", "logstash-input-s3-test") +BACKUP_BUCKET = "ls-ph-test" +REGION = ENV.fetch("AWS_LOGSTASH_REGION", "us-east-1") + +describe "Retrieve logs from S3", :tags => :integration do + let(:queue) { Queue.new } + let(:logger) { instance_double('LogStash::Logging::Logger') } + let(:sincedb_args) { [ + plugin_config["sincedb_path"], + 86400, + logger, + { :sincedb_expire_secs => 120 } + ] } + let(:stub_since_db) { instance_double('LogStash::Inputs::S3::SinceDB') } + + before do + skip "AWS credentials not found" unless ACCESS_KEY_ID && SECRET_ACCESS_KEY + + # Stub this out so that we can avoid starting the bookkeeper thread which doesn't die + allow(LogStash::Inputs::S3::SinceDB).to receive(:new).with(*sincedb_args).and_return(stub_since_db) + allow(stub_since_db).to receive(:close).and_return(true) + @plugin = LogStash::Inputs::S3.new(plugin_config) + end + + # let(:plugin) { LogStash::Inputs::S3.new(plugin_config) } + + let(:plugin_config) do + { + "bucket" => bucket_source, + "interval" => 1, + "sincedb_path" => Stud::Temporary.file.path + } + end + + context "when credentials are defined in the config as `access_key_id` `secret_access_key`" do + let(:access_key_id) { ACCESS_KEY_ID } + let(:secret_access_key) { SECRET_ACCESS_KEY } + let(:bucket_source) { BUCKET_SOURCE } + let(:bucket_backup) { BACKUP_BUCKET } + let(:region) { REGION } + + let(:plugin_config) do + super().merge({ + "access_key_id" => access_key_id, + "secret_access_key" => secret_access_key, + "region" => region, + }) + end + + let(:s3_client) do + credentials = Aws::Credentials.new(access_key_id, secret_access_key) + Aws::S3::Client.new(:region => region, :credentials => credentials) + end + + let(:s3_bucket) { Aws::S3::Bucket.new(bucket_source, :client => s3_client) } + let(:s3_input_test_helper) { S3InputTestHelper.new(s3_bucket) } + + before :each do + @thread_abort_on_exception = Thread.abort_on_exception + Thread.abort_on_exception = true + + @plugin.register + + s3_input_test_helper.setup + + @plugin_thread = Thread.new do + @plugin.run(queue) + end + end + + after :each do + @plugin.stop if @plugin + 
Thread.abort_on_exception = @thread_abort_on_exception + end + + it "correctly generate the content" do + sleep(50) + expect(queue.size).to eq(s3_input_test_helper.content.size) + # expect(queue).to include_content_of(s3_input_test_helper.content) + end + + xit "update the local database" + xit "it rename files with a prefix" + xit "it move files to a bucket on complete" + end +end diff --git a/spec/integration/s3_spec.rb b/spec/integration/s3_spec.rb deleted file mode 100644 index 98e0d25..0000000 --- a/spec/integration/s3_spec.rb +++ /dev/null @@ -1,67 +0,0 @@ -require "logstash/devutils/rspec/spec_helper" -require "logstash/inputs/s3" -require "aws-sdk" -require "fileutils" -require_relative "../support/helpers" - -describe LogStash::Inputs::S3, :integration => true, :s3 => true do - before do - Thread.abort_on_exception = true - - upload_file('../fixtures/uncompressed.log' , "#{prefix}uncompressed_1.log") - upload_file('../fixtures/compressed.log.gz', "#{prefix}compressed_1.log.gz") - sleep(LogStash::Inputs::S3::CUTOFF_SECOND + 1) - end - - after do - delete_remote_files(prefix) - FileUtils.rm_rf(temporary_directory) - delete_remote_files(backup_prefix) - end - - let(:temporary_directory) { Stud::Temporary.directory } - let(:prefix) { 'logstash-s3-input-prefix/' } - - let(:minimal_settings) { { "access_key_id" => ENV['AWS_ACCESS_KEY_ID'], - "secret_access_key" => ENV['AWS_SECRET_ACCESS_KEY'], - "bucket" => ENV['AWS_LOGSTASH_TEST_BUCKET'], - "region" => ENV["AWS_REGION"] || "us-east-1", - "prefix" => prefix, - "temporary_directory" => temporary_directory } } - let(:backup_prefix) { "backup/" } - let(:backup_bucket) { "logstash-s3-input-backup" } - - it "support prefix to scope the remote files" do - events = fetch_events(minimal_settings) - expect(events.size).to eq(4) - end - - - it "add a prefix to the file" do - fetch_events(minimal_settings.merge({ "backup_to_bucket" => ENV["AWS_LOGSTASH_TEST_BUCKET"], - "backup_add_prefix" => backup_prefix })) - expect(list_remote_files(backup_prefix).size).to eq(2) - end - - it "allow you to backup to a local directory" do - Stud::Temporary.directory do |backup_dir| - fetch_events(minimal_settings.merge({ "backup_to_dir" => backup_dir })) - expect(Dir.glob(File.join(backup_dir, "*")).size).to eq(2) - end - end - - context "remote backup" do - before do - create_bucket(backup_bucket) - end - - it "another bucket" do - fetch_events(minimal_settings.merge({ "backup_to_bucket" => backup_bucket})) - expect(list_remote_files("", backup_bucket).size).to eq(2) - end - - after do - delete_bucket(backup_bucket) - end - end -end diff --git a/spec/support/matcher_helpers.rb b/spec/support/matcher_helpers.rb new file mode 100644 index 0000000..b0b223e --- /dev/null +++ b/spec/support/matcher_helpers.rb @@ -0,0 +1,9 @@ +# encoding: utf-8 +require 'rspec/expectations' + +RSpec::Matchers.define :include_content_of do |expected| + match do |actual| + return false if actual.size != expected.size + actual.all? 
{ |item| expected.include?(item) }
+  end
+end
diff --git a/spec/support/s3_input_test_helper.rb b/spec/support/s3_input_test_helper.rb
new file mode 100644
index 0000000..5ead603
--- /dev/null
+++ b/spec/support/s3_input_test_helper.rb
@@ -0,0 +1,139 @@
+# encoding: utf-8
+require "flores/random"
+require "zlib"
+require "securerandom"
+require "logstash/event"
+
+class S3InputTestHelper
+  class PlainTextFile
+    RANGE_NUMBER_OF_EVENTS = 10..200
+    RANGE_LABEL_TEXT = 15..150
+
+    NL = "\n"
+
+    attr_reader :extension
+    def initialize
+      @extension = "log"
+      @ignored = false
+    end
+
+    def content
+      StringIO.new(file_content)
+    end
+
+    def filename
+      @filename ||= [basename, extension].join(".")
+    end
+
+    def ignored?
+      @ignored
+    end
+
+    def events
+      @events ||= generate_events
+    end
+
+    protected
+    def file_content
+      events.join(NL)
+    end
+
+    private
+    def basename
+      "#{klass_name}-#{SecureRandom.uuid}"
+    end
+
+    def klass_name
+      self.class.to_s.split("::").last
+    end
+
+    def generate_events
+      number_of_events = Flores::Random.integer(RANGE_NUMBER_OF_EVENTS)
+      label = Flores::Random.text(RANGE_LABEL_TEXT)
+
+      events = []
+
+      number_of_events.times do |identifier|
+        events << generate_event(identifier, label)
+      end
+
+      events
+    end
+
+    def generate_event(id, label)
+      "#{label} - #{id}"
+    end
+  end
+
+  class GzipFile < PlainTextFile
+    def initialize
+      super
+      @extension = "log.gz"
+    end
+
+    def content
+      compressed = StringIO.new
+      gz = Zlib::GzipWriter.new(compressed)
+      gz.write(file_content)
+      gz.close
+      compressed.string
+    end
+  end
+
+  class NoopFile < PlainTextFile
+    def initialize
+      super
+      @extension = "bz2"
+      @ignored = true
+    end
+  end
+
+  class ZeroFile < PlainTextFile
+    def initialize
+      super
+      @extension = "log"
+      @ignored = true
+    end
+
+    def events
+      []
+    end
+  end
+
+  RANGE_NUMBER_OF_FILES = 10..40
+
+  def initialize(bucket)
+    @bucket = bucket
+    @files = []
+  end
+
+  def setup
+    cleanup # make sure the bucket is in a clean state
+    generate_files
+    upload_files
+  end
+
+  def cleanup
+    @bucket.objects.each { |key| key.delete }
+  end
+
+  def content
+    @files.collect do |file|
+      file.events.collect { |event_data| LogStash::Event.new({ "message" => event_data }) }
+    end.flatten
+  end
+
+  def upload_files
+    @files.collect do |file|
+      Concurrent::Future.execute { @bucket.put_object({ :key => file.filename, :body => file.content }) }
+    end.all? { |upload| !upload.value.nil? }
+  end
+
+  def generate_files
+    [PlainTextFile, GzipFile, NoopFile, ZeroFile].each do |klass|
+      Flores::Random.integer(RANGE_NUMBER_OF_FILES).times do
+        @files << klass.new
+      end
+    end
+  end
+end