Skip to content

Commit 619c7ca

Browse files
committed
feat(transport): add resumable transport for remote resources
1 parent 8ce189d commit 619c7ca

File tree

5 files changed

+603
-0
lines changed

5 files changed

+603
-0
lines changed

cmd/crane/cmd/pull.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func NewCmdPull(options *[]crane.Option) *cobra.Command {
3232
var (
3333
cachePath, format string
3434
annotateRef bool
35+
resumable bool
3536
)
3637

3738
cmd := &cobra.Command{
@@ -49,6 +50,10 @@ func NewCmdPull(options *[]crane.Option) *cobra.Command {
4950
return fmt.Errorf("parsing reference %q: %w", src, err)
5051
}
5152

53+
if resumable {
54+
o.Remote = append(o.Remote, remote.WithResumable())
55+
}
56+
5257
rmt, err := remote.Get(ref, o.Remote...)
5358
if err != nil {
5459
return err
@@ -133,6 +138,7 @@ func NewCmdPull(options *[]crane.Option) *cobra.Command {
133138
cmd.Flags().StringVarP(&cachePath, "cache_path", "c", "", "Path to cache image layers")
134139
cmd.Flags().StringVar(&format, "format", "tarball", fmt.Sprintf("Format in which to save images (%q, %q, or %q)", "tarball", "legacy", "oci"))
135140
cmd.Flags().BoolVar(&annotateRef, "annotate-ref", false, "Preserves image reference used to pull as an annotation when used with --format=oci")
141+
cmd.Flags().BoolVar(&resumable, "resumable", false, "Enable resumable transport for pulling images")
136142

137143
return cmd
138144
}

pkg/v1/remote/image_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ package remote
1717
import (
1818
"bytes"
1919
"context"
20+
"crypto/sha256"
21+
"encoding/hex"
2022
"encoding/json"
2123
"fmt"
2224
"io"
@@ -747,3 +749,45 @@ func TestData(t *testing.T) {
747749
t.Fatal(err)
748750
}
749751
}
752+
753+
func TestImageResumable(t *testing.T) {
754+
ref, err := name.ParseReference("ghcr.io/labring/fastgpt:v4.9.0")
755+
if err != nil {
756+
t.Fatal(err)
757+
}
758+
759+
image, err := Image(ref, WithResumable())
760+
if err != nil {
761+
t.Fatal(err)
762+
}
763+
764+
layers, err := image.Layers()
765+
if err != nil {
766+
t.Fatal(err)
767+
}
768+
769+
for _, layer := range layers {
770+
digest, err := layer.Digest()
771+
if err != nil {
772+
t.Fatal(err)
773+
}
774+
775+
rc, err := layer.Compressed()
776+
if err != nil {
777+
t.Fatal(err)
778+
}
779+
780+
hash := sha256.New()
781+
_, err = io.Copy(hash, rc)
782+
rc.Close()
783+
if err != nil {
784+
t.Fatal(err)
785+
}
786+
787+
if digest.Hex == hex.EncodeToString(hash.Sum(nil)) {
788+
t.Logf("digest matches: %s", digest)
789+
} else {
790+
t.Errorf("digest mismatch: %s != %s", digest, hex.EncodeToString(hash.Sum(nil)))
791+
}
792+
}
793+
}

pkg/v1/remote/options.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type options struct {
4545
retryBackoff Backoff
4646
retryPredicate retry.Predicate
4747
retryStatusCodes []int
48+
resumable bool
4849

4950
// Only these options can overwrite Reuse()d options.
5051
platform v1.Platform
@@ -170,6 +171,11 @@ func makeOptions(opts ...Option) (*options, error) {
170171

171172
// Wrap the transport in something that can retry network flakes.
172173
o.transport = transport.NewRetry(o.transport, transport.WithRetryBackoff(o.retryBackoff), transport.WithRetryPredicate(predicate), transport.WithRetryStatusCodes(o.retryStatusCodes...))
174+
175+
if o.resumable {
176+
o.transport = transport.NewResumable(o.transport)
177+
}
178+
173179
// Wrap this last to prevent transport.New from double-wrapping.
174180
if o.userAgent != "" {
175181
o.transport = transport.NewUserAgent(o.transport, o.userAgent)
@@ -192,6 +198,13 @@ func WithTransport(t http.RoundTripper) Option {
192198
}
193199
}
194200

201+
func WithResumable() Option {
202+
return func(o *options) error {
203+
o.resumable = true
204+
return nil
205+
}
206+
}
207+
195208
// WithAuth is a functional option for overriding the default authenticator
196209
// for remote operations.
197210
// It is an error to use both WithAuth and WithAuthFromKeychain in the same Option set.
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
package transport
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"regexp"
9+
"strconv"
10+
"strings"
11+
"sync/atomic"
12+
13+
"github.com/google/go-containerregistry/pkg/logs"
14+
)
15+
16+
// NewResumable creates a http.RoundTripper that resumes http GET from error,
17+
// and the inner should be wrapped with retry transport, otherwise, the
18+
// transport will abort if resume() returns error.
19+
func NewResumable(inner http.RoundTripper) http.RoundTripper {
20+
return &resumableTransport{inner: inner}
21+
}
22+
23+
// contentRangeRe matches a Content-Range header value of the form
// "bytes <start>-<end>/<size>", where <size> is either a decimal number or
// "*". The three capture groups are start, end and size, in that order.
var contentRangeRe = regexp.MustCompile(`^bytes (\d+)-(\d+)/(\d+|\*)$`)
26+
27+
// resumableTransport is the http.RoundTripper returned by NewResumable.
type resumableTransport struct {
	// inner performs the actual HTTP exchanges, both for the initial request
	// and for every follow-up Range request issued while resuming.
	inner http.RoundTripper
}
30+
31+
func (rt *resumableTransport) RoundTrip(in *http.Request) (*http.Response, error) {
32+
if in.Method != http.MethodGet {
33+
return rt.inner.RoundTrip(in)
34+
}
35+
36+
req := in.Clone(in.Context())
37+
req.Header.Set("Range", "bytes=0-")
38+
resp, err := rt.inner.RoundTrip(req)
39+
if err != nil {
40+
return resp, err
41+
}
42+
43+
switch resp.StatusCode {
44+
case http.StatusPartialContent:
45+
case http.StatusRequestedRangeNotSatisfiable:
46+
// fallback to previous behavior
47+
resp.Body.Close()
48+
return rt.inner.RoundTrip(in)
49+
default:
50+
return resp, nil
51+
}
52+
53+
var contentLength int64
54+
if _, _, contentLength, err = parseContentRange(resp.Header.Get("Content-Range")); err != nil || contentLength <= 0 {
55+
// fallback to previous behavior
56+
resp.Body.Close()
57+
return rt.inner.RoundTrip(in)
58+
}
59+
60+
// modify response status to 200, ensure caller error checking works
61+
resp.StatusCode = http.StatusOK
62+
resp.Status = "200 OK"
63+
resp.ContentLength = contentLength
64+
resp.Body = &resumableBody{
65+
rc: resp.Body,
66+
inner: rt.inner,
67+
req: req,
68+
total: contentLength,
69+
transferred: 0,
70+
}
71+
72+
return resp, nil
73+
}
74+
75+
// resumableBody is the response body installed by resumableTransport. It
// reads from the current underlying body and, when a read fails, replaces
// that body with the one from a follow-up Range request.
type resumableBody struct {
	// rc is the body currently being read; resume swaps it for a new one.
	rc io.ReadCloser

	// inner is the transport used to send resume requests.
	inner http.RoundTripper
	// req is the request that resume clones, overriding its Range header.
	req *http.Request

	// transferred counts the bytes handed to the caller so far.
	transferred int64
	// total is the full entity size taken from the Content-Range header.
	total int64

	// closed is set to 1 by Close and is accessed with sync/atomic.
	closed uint32
}
86+
87+
func (rb *resumableBody) Read(p []byte) (n int, err error) {
88+
if atomic.LoadUint32(&rb.closed) == 1 {
89+
// response body already closed
90+
return 0, http.ErrBodyReadAfterClose
91+
} else if rb.total >= 0 && rb.transferred >= rb.total {
92+
return 0, io.EOF
93+
}
94+
95+
resume:
96+
if n, err = rb.rc.Read(p); n > 0 {
97+
rb.transferred += int64(n)
98+
}
99+
100+
if err == nil {
101+
return
102+
}
103+
104+
if errors.Is(err, io.EOF) && rb.total >= 0 && rb.transferred == rb.total {
105+
return
106+
}
107+
108+
if err = rb.resume(err); err == nil {
109+
if n == 0 {
110+
// zero bytes read, try reading again with new response.Body
111+
goto resume
112+
}
113+
114+
// already read some bytes from previous response.Body, returns and waits for next Read operation
115+
}
116+
117+
return n, err
118+
}
119+
120+
func (rb *resumableBody) Close() (err error) {
121+
if !atomic.CompareAndSwapUint32(&rb.closed, 0, 1) {
122+
return nil
123+
}
124+
125+
return rb.rc.Close()
126+
}
127+
128+
func (rb *resumableBody) resume(reason error) error {
129+
if reason != nil {
130+
logs.Debug.Printf("Resume http transporting from error: %v", reason)
131+
}
132+
133+
ctx := rb.req.Context()
134+
select {
135+
case <-ctx.Done():
136+
// context already done, stop resuming from error
137+
return ctx.Err()
138+
default:
139+
}
140+
141+
req := rb.req.Clone(ctx)
142+
req.Header.Set("Range", "bytes="+strconv.FormatInt(rb.transferred, 10)+"-")
143+
resp, err := rb.inner.RoundTrip(req)
144+
if err != nil {
145+
return err
146+
}
147+
148+
if err = rb.validate(resp); err != nil {
149+
resp.Body.Close()
150+
return err
151+
}
152+
153+
if atomic.LoadUint32(&rb.closed) == 1 {
154+
resp.Body.Close()
155+
return http.ErrBodyReadAfterClose
156+
}
157+
158+
rb.rc.Close()
159+
rb.rc = resp.Body
160+
161+
return nil
162+
}
163+
164+
func (rb *resumableBody) validate(resp *http.Response) (err error) {
165+
var start, total int64
166+
switch resp.StatusCode {
167+
case http.StatusPartialContent:
168+
if start, _, total, err = parseContentRange(resp.Header.Get("Content-Range")); err != nil {
169+
return err
170+
}
171+
172+
if total > rb.total {
173+
rb.total = total
174+
}
175+
176+
if start == rb.transferred {
177+
break
178+
} else if start < rb.transferred {
179+
if _, err := io.CopyN(io.Discard, resp.Body, rb.transferred-start); err != nil {
180+
return fmt.Errorf("discard overlapped data failed, %v", err)
181+
}
182+
} else {
183+
return fmt.Errorf("unexpected resume start %d, wanted: %d", start, rb.transferred)
184+
}
185+
case http.StatusOK:
186+
if rb.transferred > 0 {
187+
if _, err = io.CopyN(io.Discard, resp.Body, rb.transferred); err != nil {
188+
return err
189+
}
190+
}
191+
case http.StatusRequestedRangeNotSatisfiable:
192+
if contentRange := resp.Header.Get("Content-Range"); contentRange != "" && strings.HasPrefix(contentRange, "bytes */") {
193+
if total, err = strconv.ParseInt(strings.TrimPrefix(contentRange, "bytes */"), 10, 64); err == nil && total >= 0 && rb.transferred >= total {
194+
return io.EOF
195+
}
196+
}
197+
198+
fallthrough
199+
default:
200+
return fmt.Errorf("unexpected status code %d", resp.StatusCode)
201+
}
202+
203+
return nil
204+
}
205+
206+
func parseContentRange(contentRange string) (start, end, size int64, err error) {
207+
if contentRange == "" {
208+
return -1, -1, -1, errors.New("unexpected empty content range")
209+
}
210+
211+
matches := contentRangeRe.FindStringSubmatch(contentRange)
212+
if len(matches) != 4 {
213+
return -1, -1, -1, fmt.Errorf("invalid content range: %s", contentRange)
214+
}
215+
216+
if start, err = strconv.ParseInt(matches[1], 10, 64); err != nil {
217+
return -1, -1, -1, fmt.Errorf("unexpected start from content range '%s', %v", contentRange, err)
218+
}
219+
220+
if end, err = strconv.ParseInt(matches[2], 10, 64); err != nil {
221+
return -1, -1, -1, fmt.Errorf("unexpected end from content range '%s', %v", contentRange, err)
222+
}
223+
224+
if start > end {
225+
return -1, -1, -1, fmt.Errorf("invalid content range: %s", contentRange)
226+
}
227+
228+
if matches[3] == "*" {
229+
size = -1
230+
} else {
231+
size, err = strconv.ParseInt(matches[3], 10, 64)
232+
if err != nil {
233+
return -1, -1, -1, fmt.Errorf("unexpected total from content range '%s', %v", contentRange, err)
234+
}
235+
236+
if end >= size {
237+
return -1, -1, -1, fmt.Errorf("invalid content range: %s", contentRange)
238+
}
239+
}
240+
241+
return
242+
}

0 commit comments

Comments
 (0)