Skip to content

Commit 40badff

Browse files
jszwedkomichaelshobbs
authored andcommitted
Add optional TCP framing to syslog adapter
Enable users to specify "octet-counted" frames as described in RFC6587 (Syslog over TCP) 3.4.1 and RFC5424 (Syslog over TLS). This prefixes each message with the length of the message to allow consumers to easily determine where the message ends (rather than traditional LF framing). This also enables multiline Syslog messages without escaping. This keeps the default as LF framing for backwards compatibility though octet-counted framing is preferred when it is known the downstream consumer can handle it.
1 parent 4bfb0d6 commit 40badff

File tree

2 files changed

+95
-0
lines changed

2 files changed

+95
-0
lines changed

adapters/syslog/syslog.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package syslog
22

33
import (
44
"bytes"
5+
"crypto/tls"
56
"errors"
67
"fmt"
78
"io/ioutil"
@@ -24,9 +25,17 @@ const defaultRetryCount = 10
2425
var (
2526
hostname string
2627
retryCount uint
28+
tcpFraming TCPFraming
2729
econnResetErrStr string
2830
)
2931

32+
type TCPFraming string
33+
34+
const (
35+
TraditionalTCPFraming TCPFraming = "traditional" // LF framing
36+
OctetCountedTCPFraming = "octet-counted" // https://tools.ietf.org/html/rfc6587#section-3.4.1
37+
)
38+
3039
func init() {
3140
hostname, _ = os.Hostname()
3241
econnResetErrStr = fmt.Sprintf("write: %s", syscall.ECONNRESET.Error())
@@ -89,6 +98,17 @@ func NewSyslogAdapter(route *router.Route) (router.LogAdapter, error) {
8998
structuredData = fmt.Sprintf("[%s]", structuredData)
9099
}
91100

101+
if isTCPConnecion(conn) {
102+
switch s := cfg.GetEnvDefault("SYSLOG_TCP_FRAMING", "traditional"); s {
103+
case "traditional":
104+
tcpFraming = TraditionalTCPFraming
105+
case "octet-counted":
106+
tcpFraming = OctetCountedTCPFraming
107+
default:
108+
return nil, fmt.Errorf("unknown SYSLOG_TCP_FRAMING value: %s", s)
109+
}
110+
}
111+
92112
var tmplStr string
93113
switch format {
94114
case "rfc5424":
@@ -137,6 +157,19 @@ func (a *Adapter) Stream(logstream chan *router.Message) {
137157
log.Println("syslog:", err)
138158
return
139159
}
160+
161+
if isTCPConnecion(a.conn) {
162+
switch tcpFraming {
163+
case OctetCountedTCPFraming:
164+
buf = append([]byte(fmt.Sprintf("%d ", len(buf))), buf...)
165+
case TraditionalTCPFraming:
166+
// leave as-is
167+
default:
168+
// should never get here, validated above
169+
panic("unknown framing format: " + tcpFraming)
170+
}
171+
}
172+
140173
if _, err = a.conn.Write(buf); err != nil {
141174
log.Println("syslog:", err)
142175
switch a.conn.(type) {
@@ -226,6 +259,17 @@ func retryExp(fun func() error, tries uint) error {
226259
}
227260
}
228261

262+
func isTCPConnecion(conn net.Conn) bool {
263+
switch conn.(type) {
264+
case *net.TCPConn:
265+
return true
266+
case *tls.Conn:
267+
return true
268+
default:
269+
return false
270+
}
271+
}
272+
229273
// Message extends router.Message for the syslog standard
230274
type Message struct {
231275
*router.Message

adapters/syslog/syslog_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package syslog
22

33
import (
44
"bufio"
5+
"fmt"
56
"io"
67
"io/ioutil"
78
"log"
@@ -40,6 +41,56 @@ var (
4041
badHostnameContent = "hostname\r\n"
4142
)
4243

44+
func TestSyslogOctetFraming(t *testing.T) {
45+
os.Setenv("SYSLOG_TCP_FRAMING", "octet-counted")
46+
defer os.Unsetenv("SYSLOG_TCP_FRAMING")
47+
48+
done := make(chan string)
49+
addr, sock, srvWG := startServer("tcp", "", done)
50+
defer srvWG.Wait()
51+
defer os.Remove(addr)
52+
defer sock.Close()
53+
54+
route := &router.Route{Adapter: "syslog+tcp", Address: addr}
55+
adapter, err := NewSyslogAdapter(route)
56+
if err != nil {
57+
t.Fatal(err)
58+
}
59+
defer adapter.(*Adapter).conn.Close()
60+
61+
stream := make(chan *router.Message)
62+
go adapter.Stream(stream)
63+
64+
count := 1
65+
messages := make(chan string, count)
66+
go sendLogstream(stream, messages, adapter, count)
67+
68+
timeout := time.After(6 * time.Second)
69+
msgnum := 1
70+
select {
71+
case msg := <-done:
72+
sizeStr := ""
73+
_, err := fmt.Sscan(msg, &sizeStr)
74+
if err != nil {
75+
t.Fatal("unable to scan size from message: ", err)
76+
}
77+
78+
size, err := strconv.ParseInt(sizeStr, 10, 32)
79+
if err != nil {
80+
t.Fatal("unable to scan size from message: ", err)
81+
}
82+
83+
expectedOctetFrame := len(sizeStr) + 1 + int(size)
84+
if len(msg) != expectedOctetFrame {
85+
t.Errorf("expected octet frame to be %d. got %d instead for message %s", expectedOctetFrame, size, msg)
86+
}
87+
return
88+
case <-timeout:
89+
t.Fatal("timeout after", msgnum, "messages")
90+
return
91+
}
92+
}
93+
4394
func TestSyslogRetryCount(t *testing.T) {
4495
newRetryCount := uint(20)
4596
os.Setenv("RETRY_COUNT", strconv.Itoa(int(newRetryCount)))

0 commit comments

Comments
 (0)