Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit 0a11731

Browse files
committed
Fixes http2 instrumentation.
Since #226 we lost automatic http log and metrics. This PR re-introduce it by fixing the faulty middleware (log). Fixes #231
1 parent 6a67c7b commit 0a11731

File tree

2 files changed

+263
-13
lines changed

2 files changed

+263
-13
lines changed

pkg/phlare/modules.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ func (f *Phlare) initServer() (services.Service, error) {
216216
prometheus.MustRegister(version.NewCollector("phlare"))
217217
DisableSignalHandling(&f.Cfg.Server)
218218
f.Cfg.Server.Registerer = prometheus.WrapRegistererWithPrefix("phlare_", f.reg)
219-
// TODO(cyril) figure why this is locking the bidi stream see https://github.com/grafana/phlare/issues/231
219+
// Not all default middleware works with http2 so we'll add then manually.
220+
// see https://github.com/grafana/phlare/issues/231
220221
f.Cfg.Server.DoNotAddDefaultHTTPMiddleware = true
221222

222223
serv, err := server.New(f.Cfg.Server)
@@ -237,22 +238,19 @@ func (f *Phlare) initServer() (services.Service, error) {
237238
return svs
238239
}
239240

240-
// sounds like logging is the problem. see https://github.com/grafana/phlare/issues/231
241+
httpMetric, err := util.NewHTTPMetricMiddleware(f.Server.HTTP, f.Cfg.Server.MetricsNamespace, f.Cfg.Server.Registerer)
242+
if err != nil {
243+
return nil, err
244+
}
241245
defaultHTTPMiddleware := []middleware.Interface{
242246
middleware.Tracer{
243247
RouteMatcher: f.Server.HTTP,
244248
},
245-
// middleware.Log{
246-
// Log: f.Server.Log,
247-
// LogRequestAtInfoLevel: f.Cfg.Server.LogRequestAtInfoLevel,
248-
// },
249-
// middleware.Instrument{
250-
// RouteMatcher: router,
251-
// Duration: requestDuration,
252-
// RequestBodySize: receivedMessageSize,
253-
// ResponseBodySize: sentMessageSize,
254-
// InflightRequests: inflightRequests,
255-
// },
249+
util.Log{
250+
Log: f.Server.Log,
251+
LogRequestAtInfoLevel: f.Cfg.Server.LogRequestAtInfoLevel,
252+
},
253+
httpMetric,
256254
}
257255
f.Server.HTTPServer.Handler = middleware.Merge(defaultHTTPMiddleware...).Wrap(f.Server.HTTP)
258256

pkg/util/http.go

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,27 @@
11
package util
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/tls"
7+
"errors"
8+
"io"
69
"net"
710
"net/http"
11+
"strings"
812
"time"
913

14+
"github.com/felixge/httpsnoop"
15+
"github.com/gorilla/mux"
16+
"github.com/grafana/dskit/multierror"
1017
"github.com/opentracing-contrib/go-stdlib/nethttp"
1118
"github.com/opentracing/opentracing-go"
19+
"github.com/prometheus/client_golang/prometheus"
20+
"github.com/weaveworks/common/instrument"
21+
"github.com/weaveworks/common/logging"
22+
"github.com/weaveworks/common/middleware"
23+
"github.com/weaveworks/common/tracing"
24+
"github.com/weaveworks/common/user"
1225
"golang.org/x/net/http2"
1326
"gopkg.in/yaml.v3"
1427
)
@@ -63,3 +76,242 @@ func WriteYAMLResponse(w http.ResponseWriter, v interface{}) {
6376
// Also this isn't internal error, but error communicating with client.
6477
_, _ = w.Write(data)
6578
}
79+
80+
const (
81+
maxResponseBodyInLogs = 4096 // At most 4k bytes from response bodies in our logs.
82+
)
83+
84+
// Log middleware logs http requests
85+
type Log struct {
86+
Log logging.Interface
87+
LogRequestHeaders bool // LogRequestHeaders true -> dump http headers at debug log level
88+
LogRequestAtInfoLevel bool // LogRequestAtInfoLevel true -> log requests at info log level
89+
SourceIPs *middleware.SourceIPExtractor
90+
}
91+
92+
// logWithRequest information from the request and context as fields.
93+
func (l Log) logWithRequest(r *http.Request) logging.Interface {
94+
localLog := l.Log
95+
traceID, ok := tracing.ExtractTraceID(r.Context())
96+
if ok {
97+
localLog = localLog.WithField("traceID", traceID)
98+
}
99+
100+
if l.SourceIPs != nil {
101+
ips := l.SourceIPs.Get(r)
102+
if ips != "" {
103+
localLog = localLog.WithField("sourceIPs", ips)
104+
}
105+
}
106+
107+
return user.LogWith(r.Context(), localLog)
108+
}
109+
110+
// Wrap implements Middleware
111+
func (l Log) Wrap(next http.Handler) http.Handler {
112+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
113+
begin := time.Now()
114+
uri := r.RequestURI // capture the URI before running next, as it may get rewritten
115+
requestLog := l.logWithRequest(r)
116+
// Log headers before running 'next' in case other interceptors change the data.
117+
headers, err := dumpRequest(r)
118+
if err != nil {
119+
headers = nil
120+
requestLog.Errorf("Could not dump request headers: %v", err)
121+
}
122+
var (
123+
httpErr multierror.MultiError
124+
httpCode int = http.StatusOK
125+
headerWritten bool
126+
buf bytes.Buffer
127+
bodyLeft int = maxResponseBodyInLogs
128+
)
129+
130+
wrapped := httpsnoop.Wrap(w, httpsnoop.Hooks{
131+
WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
132+
return func(code int) {
133+
next(code)
134+
if !headerWritten {
135+
httpCode = code
136+
headerWritten = true
137+
}
138+
}
139+
},
140+
141+
Write: func(next httpsnoop.WriteFunc) httpsnoop.WriteFunc {
142+
return func(p []byte) (int, error) {
143+
n, err := next(p)
144+
headerWritten = true
145+
httpErr.Add(err)
146+
if httpCode >= 500 && httpCode < 600 {
147+
bodyLeft = captureResponseBody(p, bodyLeft, &buf)
148+
}
149+
return n, err
150+
}
151+
},
152+
153+
ReadFrom: func(next httpsnoop.ReadFromFunc) httpsnoop.ReadFromFunc {
154+
return func(src io.Reader) (int64, error) {
155+
n, err := next(src)
156+
headerWritten = true
157+
httpErr.Add(err)
158+
return n, err
159+
}
160+
},
161+
})
162+
next.ServeHTTP(wrapped, r)
163+
164+
statusCode, writeErr := httpCode, httpErr.Err()
165+
166+
if writeErr != nil {
167+
if errors.Is(writeErr, context.Canceled) {
168+
if l.LogRequestAtInfoLevel {
169+
requestLog.Infof("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
170+
} else {
171+
requestLog.Debugf("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
172+
}
173+
} else {
174+
requestLog.Warnf("%s %s %s, error: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
175+
}
176+
177+
return
178+
}
179+
if 100 <= statusCode && statusCode < 500 || statusCode == http.StatusBadGateway || statusCode == http.StatusServiceUnavailable {
180+
if l.LogRequestAtInfoLevel {
181+
requestLog.Infof("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
182+
} else {
183+
requestLog.Debugf("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
184+
}
185+
if l.LogRequestHeaders && headers != nil {
186+
if l.LogRequestAtInfoLevel {
187+
requestLog.Infof("ws: %v; %s", IsWSHandshakeRequest(r), string(headers))
188+
} else {
189+
requestLog.Debugf("ws: %v; %s", IsWSHandshakeRequest(r), string(headers))
190+
}
191+
}
192+
} else {
193+
requestLog.Warnf("%s %s (%d) %s Response: %q ws: %v; %s",
194+
r.Method, uri, statusCode, time.Since(begin), buf.Bytes(), IsWSHandshakeRequest(r), headers)
195+
}
196+
})
197+
}
198+
199+
func captureResponseBody(data []byte, bodyBytesLeft int, buf *bytes.Buffer) int {
200+
if bodyBytesLeft <= 0 {
201+
return 0
202+
}
203+
if len(data) > bodyBytesLeft {
204+
buf.Write(data[:bodyBytesLeft])
205+
io.WriteString(buf, "...")
206+
return 0
207+
} else {
208+
buf.Write(data)
209+
return bodyBytesLeft - len(data)
210+
}
211+
}
212+
213+
func dumpRequest(req *http.Request) ([]byte, error) {
214+
var b bytes.Buffer
215+
216+
// Exclude some headers for security, or just that we don't need them when debugging
217+
err := req.Header.WriteSubset(&b, map[string]bool{
218+
"Cookie": true,
219+
"X-Csrf-Token": true,
220+
"Authorization": true,
221+
})
222+
if err != nil {
223+
return nil, err
224+
}
225+
226+
ret := bytes.Replace(b.Bytes(), []byte("\r\n"), []byte("; "), -1)
227+
return ret, nil
228+
}
229+
230+
// IsWSHandshakeRequest returns true if the given request is a websocket handshake request.
231+
func IsWSHandshakeRequest(req *http.Request) bool {
232+
if strings.ToLower(req.Header.Get("Upgrade")) == "websocket" {
233+
// Connection header values can be of form "foo, bar, ..."
234+
parts := strings.Split(strings.ToLower(req.Header.Get("Connection")), ",")
235+
for _, part := range parts {
236+
if strings.TrimSpace(part) == "upgrade" {
237+
return true
238+
}
239+
}
240+
}
241+
return false
242+
}
243+
244+
// NewHTTPMetricMiddleware creates a new middleware that automatically instruments HTTP requests from the given router.
245+
func NewHTTPMetricMiddleware(mux *mux.Router, namespace string, reg prometheus.Registerer) (middleware.Interface, error) {
246+
// Prometheus histograms for requests.
247+
requestDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
248+
Namespace: namespace,
249+
Name: "request_duration_seconds",
250+
Help: "Time (in seconds) spent serving HTTP requests.",
251+
Buckets: instrument.DefBuckets,
252+
}, []string{"method", "route", "status_code", "ws"})
253+
err := reg.Register(requestDuration)
254+
if err != nil {
255+
already, ok := err.(prometheus.AlreadyRegisteredError)
256+
if ok {
257+
requestDuration = already.ExistingCollector.(*prometheus.HistogramVec)
258+
} else {
259+
return nil, err
260+
}
261+
}
262+
263+
receivedMessageSize := prometheus.NewHistogramVec(prometheus.HistogramOpts{
264+
Namespace: namespace,
265+
Name: "request_message_bytes",
266+
Help: "Size (in bytes) of messages received in the request.",
267+
Buckets: middleware.BodySizeBuckets,
268+
}, []string{"method", "route"})
269+
err = reg.Register(receivedMessageSize)
270+
if err != nil {
271+
already, ok := err.(prometheus.AlreadyRegisteredError)
272+
if ok {
273+
receivedMessageSize = already.ExistingCollector.(*prometheus.HistogramVec)
274+
} else {
275+
return nil, err
276+
}
277+
}
278+
279+
sentMessageSize := prometheus.NewHistogramVec(prometheus.HistogramOpts{
280+
Namespace: namespace,
281+
Name: "response_message_bytes",
282+
Help: "Size (in bytes) of messages sent in response.",
283+
Buckets: middleware.BodySizeBuckets,
284+
}, []string{"method", "route"})
285+
286+
err = reg.Register(sentMessageSize)
287+
if err != nil {
288+
already, ok := err.(prometheus.AlreadyRegisteredError)
289+
if ok {
290+
sentMessageSize = already.ExistingCollector.(*prometheus.HistogramVec)
291+
} else {
292+
return nil, err
293+
}
294+
}
295+
296+
inflightRequests := prometheus.NewGaugeVec(prometheus.GaugeOpts{
297+
Namespace: namespace,
298+
Name: "inflight_requests",
299+
Help: "Current number of inflight requests.",
300+
}, []string{"method", "route"})
301+
err = reg.Register(inflightRequests)
302+
if err != nil {
303+
already, ok := err.(prometheus.AlreadyRegisteredError)
304+
if ok {
305+
inflightRequests = already.ExistingCollector.(*prometheus.GaugeVec)
306+
} else {
307+
return nil, err
308+
}
309+
}
310+
return middleware.Instrument{
311+
RouteMatcher: mux,
312+
Duration: requestDuration,
313+
RequestBodySize: receivedMessageSize,
314+
ResponseBodySize: sentMessageSize,
315+
InflightRequests: inflightRequests,
316+
}, nil
317+
}

0 commit comments

Comments
 (0)