Skip to content

Commit 66de32f

Browse files
authored
Merge branch 'main' into feature/fixCheckOldBlockIds
2 parents ab13739 + a42a27c commit 66de32f

File tree

5 files changed

+199
-4
lines changed

5 files changed

+199
-4
lines changed

compose/docker-compose-3blasters.yml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,15 @@ services:
147147
JAEGER_AGENT_HOST: localhost
148148
JAEGER_AGENT_PORT: 6831
149149
logLevel: "DEBUG"
150+
KAFKA_BLOCKS: blocks-teranode1
151+
KAFKA_BLOCKS_FINAL: blocks-final-teranode1
152+
KAFKA_INVALID_BLOCKS: invalid-blocks-teranode1
153+
KAFKA_INVALID_SUBTREES: invalid-subtrees-teranode1
154+
KAFKA_LEGACY_INV: legacy-inv-teranode1
155+
KAFKA_REJECTEDTX: rejectedtx-teranode1
156+
KAFKA_SUBTREES: subtrees-teranode1
157+
KAFKA_TXMETA: txmeta-teranode1
158+
KAFKA_VALIDATORTXS: validatortxs-teranode1
150159
volumes:
151160
- ../settings.conf:/app/settings.conf
152161
- ../settings_local.conf:/app/settings_local.conf
@@ -209,6 +218,15 @@ services:
209218
JAEGER_AGENT_HOST: localhost
210219
JAEGER_AGENT_PORT: 6831
211220
logLevel: "DEBUG"
221+
KAFKA_BLOCKS: blocks-teranode2
222+
KAFKA_BLOCKS_FINAL: blocks-final-teranode2
223+
KAFKA_INVALID_BLOCKS: invalid-blocks-teranode2
224+
KAFKA_INVALID_SUBTREES: invalid-subtrees-teranode2
225+
KAFKA_LEGACY_INV: legacy-inv-teranode2
226+
KAFKA_REJECTEDTX: rejectedtx-teranode2
227+
KAFKA_SUBTREES: subtrees-teranode2
228+
KAFKA_TXMETA: txmeta-teranode2
229+
KAFKA_VALIDATORTXS: validatortxs-teranode2
212230
volumes:
213231
- ../settings.conf:/app/settings.conf
214232
- ../settings_local.conf:/app/settings_local.conf
@@ -271,6 +289,15 @@ services:
271289
JAEGER_AGENT_HOST: localhost
272290
JAEGER_AGENT_PORT: 6831
273291
logLevel: "DEBUG"
292+
KAFKA_BLOCKS: blocks-teranode3
293+
KAFKA_BLOCKS_FINAL: blocks-final-teranode3
294+
KAFKA_INVALID_BLOCKS: invalid-blocks-teranode3
295+
KAFKA_INVALID_SUBTREES: invalid-subtrees-teranode3
296+
KAFKA_LEGACY_INV: legacy-inv-teranode3
297+
KAFKA_REJECTEDTX: rejectedtx-teranode3
298+
KAFKA_SUBTREES: subtrees-teranode3
299+
KAFKA_TXMETA: txmeta-teranode3
300+
KAFKA_VALIDATORTXS: validatortxs-teranode3
274301
volumes:
275302
- ../settings.conf:/app/settings.conf
276303
- ../settings_local.conf:/app/settings_local.conf
@@ -378,6 +405,8 @@ services:
378405
restart: no
379406

380407
coinbase2:
408+
profiles:
409+
- skip
381410
<<: *teranode-coinbase-base
382411
container_name: coinbase2
383412
depends_on:
@@ -430,6 +459,8 @@ services:
430459

431460
coinbase3:
432461
<<: *teranode-coinbase-base
462+
profiles:
463+
- skip
433464
container_name: coinbase3
434465
depends_on:
435466
- teranode3
@@ -488,6 +519,8 @@ services:
488519

489520
tx-blaster-2:
490521
<<: *teranode-coinbase-base
522+
profiles:
523+
- skip
491524
container_name: tx-blaster-2
492525
depends_on:
493526
- coinbase2
@@ -513,6 +546,8 @@ services:
513546
- ../data/txblaster2:/app/data
514547

515548
tx-blaster-3:
549+
profiles:
550+
- skip
516551
<<: *teranode-coinbase-base
517552
container_name: tx-blaster-3
518553
depends_on:

settings.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,11 @@ http_sign_response = true
624624

625625
http_timeout = 30000
626626

627+
# HTTP streaming timeout in milliseconds for large file downloads (subtree data, blocks during catchup)
628+
# This is longer than http_timeout to accommodate large subtree data files which can be 100+ MB
629+
# Default: 300000ms (5 minutes)
630+
http_streaming_timeout = 300000
631+
627632
# IPV6 Addresses
628633
# --------------
629634
# ipv6_addresses = ff02::1234

util/http.go

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"io"
77
"net/http"
8+
"strings"
89
"time"
910

1011
"github.com/bsv-blockchain/teranode/errors"
@@ -14,7 +15,12 @@ import (
1415
var (
1516
// httpRequestTimeout defines the default HTTP request timeout in milliseconds
1617
// when no deadline is set on the context.
17-
httpRequestTimeout, _ = gocore.Config().GetInt("http_timeout", 60)
18+
httpRequestTimeout, _ = gocore.Config().GetInt("http_timeout", 60000)
19+
20+
// httpStreamingTimeout defines the default HTTP streaming timeout in milliseconds
21+
// for operations that stream large responses. This is longer than httpRequestTimeout
22+
// to accommodate large block/subtree downloads during catchup.
23+
httpStreamingTimeout, _ = gocore.Config().GetInt("http_streaming_timeout", 300000) // 5 minutes default
1824
)
1925

2026
// DoHTTPRequest performs an HTTP GET or POST request and returns the response body as bytes.
@@ -57,17 +63,34 @@ func DoHTTPRequest(ctx context.Context, url string, requestBody ...[]byte) ([]by
5763
}
5864
}
5965

66+
// readCloserWithCancel wraps an io.ReadCloser and calls a cancel function when closed.
67+
type readCloserWithCancel struct {
68+
io.ReadCloser
69+
cancelFn context.CancelFunc
70+
}
71+
72+
func (r *readCloserWithCancel) Close() error {
73+
defer r.cancelFn()
74+
return r.ReadCloser.Close()
75+
}
76+
6077
// DoHTTPRequestBodyReader performs an HTTP request and returns the response body as a ReadCloser.
6178
// This is more memory-efficient for large responses as it streams the data.
6279
// Caller is responsible for closing the returned ReadCloser.
80+
// Applies a default timeout of 5 minutes (configurable via http_streaming_timeout) when no
81+
// deadline is set on the context. This timeout is longer than the standard HTTP timeout
82+
// to accommodate large file downloads during operations like P2P catchup.
6383
func DoHTTPRequestBodyReader(ctx context.Context, url string, requestBody ...[]byte) (io.ReadCloser, error) {
64-
bodyReaderCloser, cancelFn, err := doHTTPRequest(ctx, url, requestBody...)
84+
bodyReaderCloser, cancelFn, err := doHTTPRequestForStreaming(ctx, url, requestBody...)
6585
if err != nil {
6686
cancelFn()
6787
return nil, err
6888
}
6989

70-
return bodyReaderCloser, nil
90+
return &readCloserWithCancel{
91+
ReadCloser: bodyReaderCloser,
92+
cancelFn: cancelFn,
93+
}, nil
7194
}
7295

7396
func doHTTPRequest(ctx context.Context, url string, requestBody ...[]byte) (io.ReadCloser, context.CancelFunc, error) {
@@ -79,6 +102,25 @@ func doHTTPRequest(ctx context.Context, url string, requestBody ...[]byte) (io.R
79102
ctx, cancelFn = context.WithTimeout(ctx, time.Duration(httpRequestTimeout)*time.Millisecond)
80103
}
81104

105+
return executeHTTPRequest(ctx, cancelFn, url, requestBody...)
106+
}
107+
108+
// doHTTPRequestForStreaming performs an HTTP request with a longer timeout suitable for streaming.
109+
// Applies httpStreamingTimeout (default 5 minutes) when no deadline exists on the context.
110+
func doHTTPRequestForStreaming(ctx context.Context, url string, requestBody ...[]byte) (io.ReadCloser, context.CancelFunc, error) {
111+
cancelFn := func() {
112+
// noop
113+
}
114+
115+
if _, ok := ctx.Deadline(); !ok {
116+
ctx, cancelFn = context.WithTimeout(ctx, time.Duration(httpStreamingTimeout)*time.Millisecond)
117+
}
118+
119+
return executeHTTPRequest(ctx, cancelFn, url, requestBody...)
120+
}
121+
122+
// executeHTTPRequest performs the actual HTTP request with the given context.
123+
func executeHTTPRequest(ctx context.Context, cancelFn context.CancelFunc, url string, requestBody ...[]byte) (io.ReadCloser, context.CancelFunc, error) {
82124
httpClient := http.DefaultClient
83125

84126
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
@@ -125,7 +167,8 @@ func doHTTPRequest(ctx context.Context, url string, requestBody ...[]byte) (io.R
125167
return nil, cancelFn, errFn("http request [%s] returned status code [%d]", url, resp.StatusCode)
126168
}
127169

128-
isHTML := resp.Header.Get("content-type") == "text/html"
170+
ct := strings.ToLower(resp.Header.Get("content-type"))
171+
isHTML := strings.HasPrefix(ct, "text/html")
129172
if isHTML {
130173
return nil, cancelFn, errors.NewServiceError("http request [%s] returned HTML - assume bad URL", url)
131174
}

util/http_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,110 @@ func TestDoHTTPRequestBodyReaderServerError(t *testing.T) {
281281
assert.Contains(t, err.Error(), "500")
282282
}
283283

284+
func TestDoHTTPRequestBodyReaderNoTimeoutOnSlowStream(t *testing.T) {
285+
// This test verifies that DoHTTPRequestBodyReader successfully completes
286+
// for quick responses, proving the streaming timeout is long enough
287+
responseData := []byte("streaming response data")
288+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
289+
w.Header().Set("Content-Type", "application/octet-stream")
290+
w.WriteHeader(http.StatusOK)
291+
_, err := w.Write(responseData)
292+
require.NoError(t, err)
293+
}))
294+
defer server.Close()
295+
296+
// Context without timeout - function should apply streaming timeout (5 min default)
297+
ctx := context.Background()
298+
reader, err := DoHTTPRequestBodyReader(ctx, server.URL)
299+
require.NoError(t, err)
300+
require.NotNil(t, reader)
301+
defer func(reader io.ReadCloser) {
302+
_ = reader.Close()
303+
}(reader)
304+
305+
body, err := io.ReadAll(reader)
306+
require.NoError(t, err)
307+
assert.Equal(t, responseData, body)
308+
}
309+
310+
func TestDoHTTPRequestBodyReaderWithShortTimeout(t *testing.T) {
311+
// This test uses a custom short streaming timeout to actually verify timeout behavior
312+
// Save and restore the original timeout
313+
originalTimeout := httpStreamingTimeout
314+
httpStreamingTimeout = 500 // 500ms for testing - enough for connection but not for slow read
315+
defer func() {
316+
httpStreamingTimeout = originalTimeout
317+
}()
318+
319+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
320+
w.Header().Set("Content-Type", "application/octet-stream")
321+
w.WriteHeader(http.StatusOK)
322+
// Write some data immediately
323+
flusher, ok := w.(http.Flusher)
324+
require.True(t, ok, "Expected http.ResponseWriter to be an http.Flusher")
325+
_, _ = w.Write([]byte("starting..."))
326+
flusher.Flush()
327+
328+
// Sleep longer than timeout - this should cause the read to fail
329+
time.Sleep(1 * time.Second)
330+
_, _ = w.Write([]byte("this should not be received"))
331+
}))
332+
defer server.Close()
333+
334+
ctx := context.Background()
335+
reader, err := DoHTTPRequestBodyReader(ctx, server.URL)
336+
require.NoError(t, err)
337+
require.NotNil(t, reader)
338+
defer func(reader io.ReadCloser) {
339+
_ = reader.Close()
340+
}(reader)
341+
342+
// Reading should fail due to timeout while reading the body
343+
_, err = io.ReadAll(reader)
344+
// Should get an error because the server is too slow
345+
require.Error(t, err, "Should get timeout error on slow stream")
346+
}
347+
348+
func TestDoHTTPRequestBodyReaderRespectsExistingDeadline(t *testing.T) {
349+
// This test verifies that DoHTTPRequestBodyReader respects an existing context deadline
350+
// We do this by setting a very short deadline and ensuring it times out quickly
351+
// (rather than waiting for the 5-minute streaming timeout)
352+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
353+
w.Header().Set("Content-Type", "application/octet-stream")
354+
w.WriteHeader(http.StatusOK)
355+
// Write initial data
356+
_, _ = w.Write([]byte("start"))
357+
if flusher, ok := w.(http.Flusher); ok {
358+
flusher.Flush()
359+
}
360+
// Sleep for 2 seconds - longer than our 100ms deadline
361+
time.Sleep(2 * time.Second)
362+
_, _ = w.Write([]byte("should timeout before this"))
363+
}))
364+
defer server.Close()
365+
366+
// Context with very short 100ms timeout (much shorter than 5-minute streaming default)
367+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
368+
defer cancel()
369+
370+
start := time.Now()
371+
reader, err := DoHTTPRequestBodyReader(ctx, server.URL)
372+
require.NoError(t, err)
373+
require.NotNil(t, reader)
374+
defer func(reader io.ReadCloser) {
375+
_ = reader.Close()
376+
}(reader)
377+
378+
// This should timeout due to our 100ms deadline, not the 5-minute streaming timeout
379+
_, err = io.ReadAll(reader)
380+
elapsed := time.Since(start)
381+
382+
// Should fail with timeout
383+
require.Error(t, err, "Should timeout with custom deadline")
384+
// Should have timed out quickly (within 1 second), not after 5 minutes
385+
assert.Less(t, elapsed, 2*time.Second, "Should respect short custom deadline, not wait for streaming timeout")
386+
}
387+
284388
func TestDoHTTPRequestEmptyRequestBody(t *testing.T) {
285389
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
286390
// Empty slice still triggers POST because len(requestBody) > 0

util/kafka/kafka_producer_async.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,16 @@ import (
2121
"github.com/bsv-blockchain/teranode/util"
2222
inmemorykafka "github.com/bsv-blockchain/teranode/util/kafka/in_memory_kafka"
2323
"github.com/bsv-blockchain/teranode/util/retry"
24+
"github.com/rcrowley/go-metrics"
2425
)
2526

27+
// init disables go-metrics globally to prevent memory leak from exponential decay sample heap.
28+
// This must be set before any Sarama clients are created.
29+
// See: https://github.com/IBM/sarama/issues/1321
30+
func init() {
31+
metrics.UseNilMetrics = true
32+
}
33+
2634
// KafkaAsyncProducerI defines the interface for asynchronous Kafka producer operations.
2735
type KafkaAsyncProducerI interface {
2836
// Start begins the async producer operation with the given message channel

0 commit comments

Comments
 (0)