Skip to content

Commit 6a3d71e

Browse files
mchaynesZachtimusPrime
authored andcommitted
Add an io.Writer to splunk (#7)
* Add an io.Writer to splunk This allows us to use splunk as an output source in a much more flexible way in particular, it allows us to be non blocking for uploading to splunk Fixes: #6 * Add Writer to README.md Fixes #6 * Add configuration details Fixes #6
1 parent 2e2b5d4 commit 6a3d71e

File tree

4 files changed

+256
-0
lines changed

4 files changed

+256
-0
lines changed

README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,34 @@ func main() {
8484
}
8585

8686
```
87+
88+
## Splunk Writer ##
89+
To support logging libraries, and other output, we've added an asynchronous Writer. It supports retries, and different intervals for flushing messages & max log messages in its buffer
90+
91+
The easiest way to get access to the writer with an existing client is to do:
92+
93+
```go
94+
writer := splunkClient.Writer()
95+
```
96+
97+
This will give you an io.Writer you can use to direct output to splunk. However, since the io.Writer() is asynchronous, it will never return an error from its Write() function. To access errors generated from the Client,
98+
Instantiate your Writer this way:
99+
100+
```go
101+
splunk.Writer{
102+
Client: splunkClient
103+
}
104+
```
105+
Since the type will now be splunk.Writer(), you can access the `Errors()` function, which returns a channel of errors. You can then spin up a goroutine to listen on this channel and report errors, or you can handle however you like.
106+
107+
Optionally, you can add more configuration to the writer.
108+
109+
```go
110+
splunk.Writer {
111+
Client: splunkClient,
112+
FlushInterval: 10 *time.Second, // How often we'll flush our buffer
113+
FlushThreshold: 25, // Max messages we'll keep in our buffer, regardless of FlushInterval
114+
MaxRetries: 2, // Number of times we'll retry a failed send
115+
}
116+
```
117+

splunk/splunk.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"crypto/tls"
66
"encoding/json"
77
"errors"
8+
"io"
89
"net/http"
910
"os"
1011
"time"
@@ -136,6 +137,13 @@ func (c *Client) LogEvents(events []*Event) error {
136137
return c.doRequest(buf)
137138
}
138139

140+
//Writer is a convience method for creating an io.Writer from a Writer with default values
141+
func (c *Client) Writer() io.Writer {
142+
return &Writer{
143+
Client: c,
144+
}
145+
}
146+
139147
// Client.doRequest is used internally to POST the bytes of events to the Splunk server.
140148
func (c *Client) doRequest(b *bytes.Buffer) error {
141149
// make new request

splunk/writer.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package splunk
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
const (
9+
bufferSize = 100
10+
defaultInterval = 2 * time.Second
11+
defaultThreshold = 10
12+
defaultRetries = 2
13+
)
14+
15+
// Writer is a threadsafe, aysnchronous splunk writer.
16+
// It implements io.Writer for usage in logging libraries, or whatever you want to send to splunk :)
17+
// Writer.Client's configuration determines what source, sourcetype & index will be used for events
18+
// Example for logrus:
19+
// splunkWriter := &splunk.Writer {Client: client}
20+
// logrus.SetOutput(io.MultiWriter(os.Stdout, splunkWriter))
21+
type Writer struct {
22+
Client *Client
23+
// How often the write buffer should be flushed to splunk
24+
FlushInterval time.Duration
25+
// How many Write()'s before buffer should be flushed to splunk
26+
FlushThreshold int
27+
// Max number of retries we should do when we flush the buffer
28+
MaxRetries int
29+
dataChan chan *message
30+
errors chan error
31+
once sync.Once
32+
}
33+
34+
// Associates some bytes with the time they were written
35+
// Helpful if we have long flush intervals to more precisely record the time at which
36+
// a message was written
37+
type message struct {
38+
data []byte
39+
writtenAt time.Time
40+
}
41+
42+
// Writer asynchronously writes to splunk in batches
43+
func (w *Writer) Write(b []byte) (int, error) {
44+
// only initialize once. Keep all of our buffering in one thread
45+
w.once.Do(func() {
46+
// synchronously set up dataChan
47+
w.dataChan = make(chan *message, bufferSize)
48+
// Spin up single goroutine to listen to our writes
49+
w.errors = make(chan error, bufferSize)
50+
go w.listen()
51+
})
52+
// Send the data to the channel
53+
w.dataChan <- &message{
54+
data: b,
55+
writtenAt: time.Now(),
56+
}
57+
// We don't know if we've hit any errors yet, so just say we're good
58+
return len(b), nil
59+
}
60+
61+
// Errors returns a buffered channel of errors. Might be filled over time, might not
62+
// Useful if you want to record any errors hit when sending data to splunk
63+
func (w *Writer) Errors() <-chan error {
64+
return w.errors
65+
}
66+
67+
// listen for messages
68+
func (w *Writer) listen() {
69+
if w.FlushInterval <= 0 {
70+
w.FlushInterval = defaultInterval
71+
}
72+
if w.FlushThreshold == 0 {
73+
w.FlushThreshold = defaultThreshold
74+
}
75+
ticker := time.NewTicker(w.FlushInterval)
76+
buffer := make([]*message, 0)
77+
//Define function so we can flush in several places
78+
flush := func() {
79+
// Go send the data to splunk
80+
go w.send(buffer, w.MaxRetries)
81+
// Make a new array since the old one is getting used by the splunk client now
82+
buffer = make([]*message, 0)
83+
}
84+
for {
85+
select {
86+
case <-ticker.C:
87+
if len(buffer) > 0 {
88+
flush()
89+
}
90+
case d := <-w.dataChan:
91+
buffer = append(buffer, d)
92+
if len(buffer) > w.FlushThreshold {
93+
flush()
94+
}
95+
}
96+
}
97+
}
98+
99+
// send sends data to splunk, retrying upon failure
100+
func (w *Writer) send(messages []*message, retries int) {
101+
// Create events from our data so we can send them to splunk
102+
events := make([]*Event, len(messages))
103+
for i, m := range messages {
104+
// Use the configuration of the Client for the event
105+
events[i] = w.Client.NewEventWithTime(m.writtenAt.Unix(), m.data, w.Client.Source, w.Client.SourceType, w.Client.Index)
106+
}
107+
// Send the events to splunk
108+
err := w.Client.LogEvents(events)
109+
// If we had any failures, retry as many times as they requested
110+
if err != nil {
111+
for i := 0; i < retries; i++ {
112+
// retry
113+
err = w.Client.LogEvents(events)
114+
if err == nil {
115+
return
116+
}
117+
}
118+
// if we've exhausted our max retries, let someone know via Errors()
119+
// might not have retried if retries == 0
120+
select {
121+
case w.errors <- err:
122+
// Don't block in case no one is listening or our errors channel is full
123+
default:
124+
}
125+
}
126+
}

splunk/writer_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package splunk
2+
3+
import (
4+
"fmt"
5+
"io/ioutil"
6+
"net/http"
7+
"net/http/httptest"
8+
"strings"
9+
"sync"
10+
"testing"
11+
"time"
12+
)
13+
14+
func TestWriter_Write(t *testing.T) {
15+
numWrites := 1000
16+
numMessages := 0
17+
lock := sync.Mutex{}
18+
notify := make(chan bool, numWrites)
19+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
20+
b, _ := ioutil.ReadAll(r.Body)
21+
split := strings.Split(string(b), "\n")
22+
num := 0
23+
// Since we batch our logs up before we send them:
24+
// Increment our messages counter by one for each JSON object we got in this response
25+
// We don't know how many responses we'll get, we only care about the number of messages
26+
for _, line := range split {
27+
if strings.HasPrefix(line, "{") {
28+
num++
29+
notify <- true
30+
}
31+
}
32+
lock.Lock()
33+
numMessages = numMessages + num
34+
lock.Unlock()
35+
}))
36+
37+
// Create a writer that's flushing constantly. We want this test to run
38+
// quickly
39+
writer := Writer{
40+
Client: NewClient(server.Client(), server.URL, "", "", "", ""),
41+
FlushInterval: 1 * time.Millisecond,
42+
}
43+
// Send a bunch of messages in separate goroutines to make sure we're properly
44+
// testing Writer's concurrency promise
45+
for i := 0; i < numWrites; i++ {
46+
go writer.Write([]byte(fmt.Sprintf("%d", i)))
47+
}
48+
// To notify our test we've collected everything we need.
49+
doneChan := make(chan bool)
50+
go func() {
51+
for i := 0; i < numWrites; i++ {
52+
// Do nothing, just loop through to the next one
53+
<-notify
54+
}
55+
doneChan <- true
56+
}()
57+
select {
58+
case <-doneChan:
59+
// Do nothing, we're good
60+
case <-time.After(1 * time.Second):
61+
t.Errorf("Timed out waiting for messages")
62+
}
63+
// We may have received more than numWrites amount of messages, check that case
64+
if numMessages != numWrites {
65+
t.Errorf("Didn't get the right number of messages, expected %d, got %d", numWrites, numMessages)
66+
}
67+
}
68+
69+
func TestWriter_Errors(t *testing.T) {
70+
numMessages := 1000
71+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
72+
w.WriteHeader(http.StatusBadRequest)
73+
fmt.Fprintln(w, "bad request")
74+
}))
75+
writer := Writer{
76+
Client: NewClient(server.Client(), server.URL, "", "", "", ""),
77+
// Will flush after the last message is sent
78+
FlushThreshold: numMessages - 1,
79+
// Don't let the flush interval cause raciness
80+
FlushInterval: 5 * time.Minute,
81+
}
82+
for i := 0; i < numMessages; i++ {
83+
_, _ = writer.Write([]byte("some data"))
84+
}
85+
select {
86+
case <-writer.Errors():
87+
// good to go, got our error
88+
case <-time.After(1 * time.Second):
89+
t.Errorf("Timed out waiting for error, should have gotten 1 error")
90+
}
91+
}

0 commit comments

Comments
 (0)