1+ import { createSHA256 , IHasher , loadWasm } from "@taylorzane/hash-wasm" ;
12import { Env } from "../.." ;
23import jwt from "@tsndr/cloudflare-worker-jwt" ;
34import {
@@ -10,6 +11,8 @@ import {
1011 split ,
1112} from "../chunk" ;
1213import { InternalError , ManifestError , RangeError , ServerError } from "../errors" ;
14+ import { Buffer } from "node:buffer" ;
15+
1316import { SHA256_PREFIX_LEN , getSHA256 , hexToDigest } from "../user" ;
1417import { errorString , readableToBlob , readerToBlob , wrap } from "../utils" ;
1518import { BlobUnknownError , ManifestUnknownError } from "../v2-errors" ;
@@ -30,6 +33,38 @@ import {
3033import { GarbageCollectionMode , GarbageCollector } from "./garbage-collector" ;
3134import { ManifestSchema , manifestSchema } from "../manifest" ;
3235import { DigestInvalid , RegistryResponseJSON } from "../v2-responses" ;
36+ // @ts-expect-error: No declaration file for module
37+ import sha256Wasm from "@taylorzane/hash-wasm/wasm/sha256.wasm" ;
38+
39+ export async function hash ( readableStream : ReadableStream | null , state ?: Uint8Array ) : Promise < IHasher > {
40+ loadWasm ( { sha256 : sha256Wasm } ) ;
41+ let hasher = await createSHA256 ( ) ;
42+ if ( state !== undefined ) {
43+ hasher . load ( state ) ;
44+ } else {
45+ hasher = hasher . init ( ) ;
46+ }
47+
48+ const reader = readableStream ?. getReader ( { mode : "byob" } ) ;
49+ while ( reader !== undefined ) {
50+ // Read limit 5MB so we don't buffer that much memory at a time (Workers runtime is kinda weird with constraints with tee() if the other stream side is very slow)
51+ const array = new Uint8Array ( 1024 * 1024 * 5 ) ;
52+ const value = await reader . read ( array ) ;
53+ if ( value . done ) break ;
54+ hasher . update ( value . value ) ;
55+ }
56+
57+ return hasher ;
58+ }
59+
60+ export function hashStateToUint8Array ( hashState : string ) : Uint8Array {
61+ const buffer = Buffer . from ( hashState , "base64" ) ;
62+ return new Uint8Array ( buffer ) ;
63+ }
64+
65+ export function intoBase64FromUint8Array ( array : Uint8Array ) : string {
66+ return Buffer . from ( array ) . toString ( "base64" ) ;
67+ }
3368
3469export type Chunk =
3570 | {
@@ -66,6 +101,7 @@ export type State = {
66101 registryUploadId : string ;
67102 byteRange : number ;
68103 name : string ;
104+ hashState ?: string ;
69105} ;
70106
71107export function getRegistryUploadsPath ( state : { registryUploadId : string ; name : string } ) : string {
@@ -686,12 +722,48 @@ export class R2Registry implements Registry {
686722 } ;
687723 }
688724
689- const res = await appendStreamKnownLength ( stream , length ) ;
725+ let hasherPromise : Promise < IHasher > | undefined ;
726+ if (
727+ length <= MAXIMUM_CHUNK &&
728+ // if starting, or already started.
729+ ( state . parts . length === 0 || ( state . parts . length > 0 && state . hashState !== undefined ) )
730+ ) {
731+ const [ s1 , s2 ] = stream . tee ( ) ;
732+ stream = s1 ;
733+ let bytes : undefined | Uint8Array ;
734+ if ( state . hashState !== undefined ) {
735+ bytes = hashStateToUint8Array ( state . hashState ) ;
736+ }
737+
738+ hasherPromise = hash ( s2 , bytes ) ;
739+ } else {
740+ state . hashState = undefined ;
741+ }
742+
743+ const [ res , hasherResponse ] = await Promise . allSettled ( [ appendStreamKnownLength ( stream , length ) , hasherPromise ] ) ;
690744 state . byteRange += length ;
691- if ( res instanceof RangeError )
745+ if ( res . status === "rejected" ) {
746+ return {
747+ response : new InternalError ( ) ,
748+ } ;
749+ }
750+
751+ if ( res . value instanceof RangeError ) {
692752 return {
693- response : res ,
753+ response : res . value ,
694754 } ;
755+ }
756+
757+ if ( hasherPromise !== undefined && hasherResponse !== undefined ) {
758+ if ( hasherResponse . status === "rejected" ) {
759+ throw hasherResponse . reason ;
760+ }
761+
762+ if ( hasherResponse . value === undefined ) throw new Error ( "unreachable" ) ;
763+
764+ const value = hasherResponse . value . save ( ) ;
765+ state . hashState = intoBase64FromUint8Array ( value ) ;
766+ }
695767
696768 const hashedJwtState = await encodeState ( state , env ) ;
697769 return {
@@ -758,16 +830,24 @@ export class R2Registry implements Registry {
758830 } ;
759831 }
760832
761- const target = `${ namespace } /blobs/${ expectedSha } ` ;
762833 const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000 ;
834+ if ( obj . size >= MAXIMUM_SIZE_R2_OBJECT && state . hashState === undefined ) {
835+ console . error ( `The maximum size of an R2 object is 5gb, multipart uploads don't
836+ have an sha256 option. Please try to use a push tool that chunks the layers if your layer is above 5gb` ) ;
837+ return {
838+ response : new InternalError ( ) ,
839+ } ;
840+ }
841+
842+ const target = `${ namespace } /blobs/${ expectedSha } ` ;
763843 // If layer surpasses the maximum size of an R2 upload, we need to calculate the digest
764844 // stream and create a reference from the blobs path to the
765- // upload path. In R2, moving objects mean copying the stream, which
766- // doesn't really work if it's above 5GB due to R2 limits.
767- if ( obj . size >= MAXIMUM_SIZE_R2_OBJECT ) {
768- const compressionStream = new crypto . DigestStream ( "SHA-256" ) ;
769- obj . body . pipeTo ( compressionStream ) ;
770- const digest = hexToDigest ( await compressionStream . digest ) ;
845+ // upload path. That's why we need hash-wasm, as it allows you to store the state.
846+ if ( state . hashState !== undefined ) {
847+ const stateEncoded = hashStateToUint8Array ( state . hashState ) ;
848+ const hasher = await hash ( null , stateEncoded ) ;
849+ const digest = hasher . digest ( "hex" ) ;
850+
771851 if ( digest !== expectedSha ) {
772852 return { response : new RegistryResponseJSON ( JSON . stringify ( DigestInvalid ( expectedSha , digest ) ) ) } ;
773853 }
0 commit comments