
Commit 0df3b6d

handle no @timestamp and fix generateId
review changes
1 parent 381582b commit 0df3b6d

2 files changed: 92 additions & 37 deletions

lib/logstash/outputs/mongodb.rb

Lines changed: 39 additions & 23 deletions
@@ -46,19 +46,22 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
   # Mutex used to synchronize access to 'documents'
   @@mutex = Mutex.new
 
-  public
   def register
+    if @bulk_size > 1000
+      raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'"
+    end
+
     Mongo::Logger.logger = @logger
     conn = Mongo::Client.new(@uri)
     @db = conn.use(@database)
 
-    if @bulk_size > 1000
-      raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'"
-    end
+    @closed = Concurrent::AtomicBoolean.new(false)
     @documents = {}
-    Thread.new do
-      loop do
-        sleep @bulk_interval
+
+    @bulk_thread = Thread.new(@bulk_interval) do |bulk_interval|
+      while @closed.false? do
+        sleep(bulk_interval)
+
         @@mutex.synchronize do
           @documents.each do |collection, values|
             if values.length > 0
@@ -69,23 +72,31 @@ def register
         end
       end
     end
-  end # def register
+  end
 
   def receive(event)
     begin
       # Our timestamp object now has a to_bson method, using it here
       # {}.merge(other) so we don't taint the event hash innards
       document = {}.merge(event.to_hash)
+
       if !@isodate
-        # not using timestamp.to_bson
-        document["@timestamp"] = event.timestamp.to_json
+        timestamp = event.timestamp
+        if timestamp
+          # not using timestamp.to_bson
+          document["@timestamp"] = timestamp.to_json
+        else
+          @logger.warn("Cannot set MongoDB document `@timestamp` field because it does not exist in the event", :event => event)
+        end
       end
+
       if @generateId
-        document["_id"] = BSON::ObjectId.new(nil, event.timestamp)
+        document["_id"] = BSON::ObjectId.new
       end
+
       if @bulk
+        collection = event.sprintf(@collection)
         @@mutex.synchronize do
-          collection = event.sprintf(@collection)
           if(!@documents[collection])
             @documents[collection] = []
           end
@@ -99,20 +110,25 @@ def receive(event)
       else
        @db[event.sprintf(@collection)].insert_one(document)
       end
-
     rescue => e
-      @logger.warn("Failed to send event to MongoDB", :event => event, :exception => e,
-        :backtrace => e.backtrace)
       if e.message =~ /^E11000/
-        # On a duplicate key error, skip the insert.
-        # We could check if the duplicate key err is the _id key
-        # and generate a new primary key.
-        # If the duplicate key error is on another field, we have no way
-        # to fix the issue.
+        # On a duplicate key error, skip the insert.
+        # We could check if the duplicate key err is the _id key
+        # and generate a new primary key.
+        # If the duplicate key error is on another field, we have no way
+        # to fix the issue.
+        @logger.warn("Skipping insert because of a duplicate key error", :event => event, :exception => e)
       else
-        sleep @retry_delay
+        @logger.warn("Failed to send event to MongoDB, retrying in #{@retry_delay.to_s} seconds", :event => event, :exception => e)
+        sleep(@retry_delay)
         retry
       end
     end
-  end # def receive
-end # class LogStash::Outputs::Mongodb
+  end
+
+  def close
+    @closed.make_true
+    @bulk_thread.wakeup
+    @bulk_thread.join
+  end
+end
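
For reference, the effect of the two behavioural changes in receive (skipping `@timestamp` when the event has none, and generating `_id` with a plain `BSON::ObjectId.new`) can be shown with a minimal standalone sketch. This is not the plugin itself: `build_document`, `event_hash`, and `timestamp` are illustrative stand-ins for `event.to_hash` and `event.timestamp`, and only the `bson` gem is assumed.

  # Sketch of the document-building behaviour the commit introduces.
  require 'bson'

  def build_document(event_hash, timestamp, isodate: false, generate_id: false)
    document = {}.merge(event_hash)   # copy so the original hash is not mutated

    unless isodate
      if timestamp
        document["@timestamp"] = timestamp.to_s   # the plugin serializes with timestamp.to_json
      else
        # before this commit, a nil timestamp would blow up; now it is only skipped
        warn "event has no @timestamp, leaving the field out"
      end
    end

    # generateId now builds a fresh ObjectId instead of BSON::ObjectId.new(nil, timestamp)
    document["_id"] = BSON::ObjectId.new if generate_id
    document
  end

  puts build_document({ "message" => "foo" }, nil, generate_id: true)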

spec/outputs/mongodb_spec.rb

Lines changed: 53 additions & 14 deletions
@@ -8,23 +8,21 @@
   let(:database) { 'logstash' }
   let(:collection) { 'logs' }
 
-  let(:config) do
-    { "uri" => uri, "database" => database, "collection" => collection }
-  end
+  let(:config) {{
+    "uri" => uri,
+    "database" => database,
+    "collection" => collection
+  }}
 
-  it "should register" do
+  it "should register and close" do
     plugin = LogStash::Plugin.lookup("output", "mongodb").new(config)
     expect {plugin.register}.to_not raise_error
+    plugin.close
   end
 
-  describe "#send" do
-
+  describe "receive" do
     subject! { LogStash::Outputs::Mongodb.new(config) }
 
-    let(:properties) { { "message" => "This is a message!",
-                         "uuid" => SecureRandom.uuid,
-                         "number" => BigDecimal.new("4321.1234"),
-                         "utf8" => "żółć"} }
     let(:event) { LogStash::Event.new(properties) }
     let(:connection) { double("connection") }
     let(:client) { double("client") }
@@ -38,10 +36,51 @@
       subject.register
     end
 
-    it "should send the event to the database" do
-      expect(collection).to receive(:insert_one)
-      subject.receive(event)
+    after(:each) do
+      subject.close
+    end
+
+    describe "#send" do
+      let(:properties) {{
+        "message" => "This is a message!",
+        "uuid" => SecureRandom.uuid,
+        "number" => BigDecimal.new("4321.1234"),
+        "utf8" => "żółć"
+      }}
+
+      it "should send the event to the database" do
+        expect(collection).to receive(:insert_one)
+        subject.receive(event)
+      end
     end
-  end
 
+    describe "no event @timestamp" do
+      let(:properties) { { "message" => "foo" } }
+
+      it "should not contain a @timestamp field in the mongo document" do
+        expect(event).to receive(:timestamp).and_return(nil)
+        expect(event).to receive(:to_hash).and_return(properties)
+        expect(collection).to receive(:insert_one).with(properties)
+        subject.receive(event)
+      end
+    end
+
+    describe "generateId" do
+      let(:properties) { { "message" => "foo" } }
+      let(:config) {{
+        "uri" => uri,
+        "database" => database,
+        "collection" => collection,
+        "generateId" => true
+      }}
+
+      it "should contain a BSON::ObjectId as _id" do
+        expect(BSON::ObjectId).to receive(:new).and_return("BSON::ObjectId")
+        expect(event).to receive(:timestamp).and_return(nil)
+        expect(event).to receive(:to_hash).and_return(properties)
+        expect(collection).to receive(:insert_one).with(properties.merge("_id" => "BSON::ObjectId"))
+        subject.receive(event)
+      end
+    end
+  end
 end
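
The specs now call close after each example, which exercises the new shutdown path added in register and close: flag the flush loop as closed, wake the sleeping thread, and join it. The pattern boils down to the sketch below, assuming the concurrent-ruby gem (which Logstash bundles); `closed`, `flusher`, and the timings are illustrative, not the plugin's own names.

  # Sketch of the flush-thread shutdown sequence exercised by close.
  require 'concurrent'

  closed  = Concurrent::AtomicBoolean.new(false)

  flusher = Thread.new(1) do |interval|    # 1s stands in for @bulk_interval
    while closed.false?
      sleep(interval)                      # Thread#wakeup cuts this short
      puts "flushing buffered documents"   # the plugin drains @documents here
    end
  end

  sleep(2.5)          # let it flush a couple of times
  closed.make_true    # close: tell the loop to stop,
  flusher.wakeup      # interrupt the sleep it is currently in,
  flusher.join        # and wait for it to exit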
