@@ -11,7 +11,7 @@ import {
1111} from "../chunk" ;
1212import { InternalError , ManifestError , RangeError , ServerError } from "../errors" ;
1313import { SHA256_PREFIX_LEN , getSHA256 , hexToDigest } from "../user" ;
14- import { readableToBlob , readerToBlob , wrap } from "../utils" ;
14+ import { errorString , readableToBlob , readerToBlob , wrap } from "../utils" ;
1515import { BlobUnknownError , ManifestUnknownError } from "../v2-errors" ;
1616import {
1717 CheckLayerResponse ,
@@ -29,6 +29,7 @@ import {
2929} from "./registry" ;
3030import { GarbageCollectionMode , GarbageCollector } from "./garbage-collector" ;
3131import { ManifestSchema , manifestSchema } from "../manifest" ;
32+ import { DigestInvalid , RegistryResponseJSON } from "../v2-responses" ;
3233
3334export type Chunk =
3435 | {
@@ -101,6 +102,10 @@ export async function encodeState(state: State, env: Env): Promise<{ jwt: string
101102 return { jwt : jwtSignature , hash : await getSHA256 ( jwtSignature , "" ) } ;
102103}
103104
105+ export const referenceHeader = "X-Serverless-Registry-Reference" ;
106+ export const digestHeaderInReference = "X-Serverless-Registry-Digest" ;
107+ export const registryUploadKey = "X-Serverless-Registry-Upload" ;
108+
104109export async function getUploadState (
105110 name : string ,
106111 uploadId : string ,
@@ -127,6 +132,15 @@ export async function getUploadState(
127132 return { state : stateObject , stateStr : stateStr , hash : stateStrHash } ;
128133}
129134
135+ export function isReference ( r2Object : R2Object ) : false | string {
136+ if ( r2Object . customMetadata === undefined ) return false ;
137+ const value = r2Object . customMetadata [ referenceHeader ] ;
138+ if ( value !== undefined ) {
139+ return value ;
140+ }
141+ return false ;
142+ }
143+
130144export class R2Registry implements Registry {
131145 private gc : GarbageCollector ;
132146
@@ -196,6 +210,11 @@ export class R2Registry implements Registry {
196210 // name format is:
197211 // <path>/<'blobs' | 'manifests'>/<name>
198212 const parts = object . key . split ( "/" ) ;
213+ // maybe an upload.
214+ if ( parts . length === 1 ) {
215+ return ;
216+ }
217+
199218 const repository = parts . slice ( 0 , parts . length - 2 ) . join ( "/" ) ;
200219 if ( parts [ parts . length - 2 ] === "blobs" ) return ;
201220
@@ -389,15 +408,39 @@ export class R2Registry implements Registry {
389408 } ;
390409 }
391410
411+ const key = isReference ( res ) ;
412+ let [ digest , size ] = [ "" , 0 ] ;
413+ if ( key ) {
414+ const [ res , err ] = await wrap ( this . env . REGISTRY . head ( key ) ) ;
415+ if ( err ) {
416+ return wrapError ( "layerExists" , err ) ;
417+ }
418+
419+ if ( ! res ) {
420+ return { exists : false } ;
421+ }
422+
423+ if ( ! res . customMetadata ) throw new Error ( "unreachable" ) ;
424+ if ( ! res . customMetadata [ digestHeaderInReference ] ) throw new Error ( "unreachable" ) ;
425+ const possibleDigest = res . customMetadata [ digestHeaderInReference ] ;
426+ if ( ! possibleDigest ) throw new Error ( "unreachable, no digest" ) ;
427+
428+ digest = possibleDigest ;
429+ size = res . size ;
430+ } else {
431+ digest = hexToDigest ( res . checksums . sha256 ! ) ;
432+ size = res . size ;
433+ }
434+
392435 return {
393- digest : hexToDigest ( res . checksums . sha256 ! ) ,
394- size : res . size ,
436+ digest,
437+ size,
395438 exists : true ,
396439 } ;
397440 }
398441
399442 async getLayer ( name : string , digest : string ) : Promise < RegistryError | GetLayerResponse > {
400- const [ res , err ] = await wrap ( this . env . REGISTRY . get ( `${ name } /blobs/${ digest } ` ) ) ;
443+ let [ res , err ] = await wrap ( this . env . REGISTRY . get ( `${ name } /blobs/${ digest } ` ) ) ;
401444 if ( err ) {
402445 return wrapError ( "getLayer" , err ) ;
403446 }
@@ -408,9 +451,24 @@ export class R2Registry implements Registry {
408451 } ;
409452 }
410453
454+ const id = isReference ( res ) ;
455+ if ( id ) {
456+ [ res , err ] = await wrap ( this . env . REGISTRY . get ( id ) ) ;
457+ if ( err ) {
458+ return wrapError ( "getLayer" , err ) ;
459+ }
460+
461+ if ( ! res ) {
462+ // not a 500, because garbage collection deletes the underlying layer first
463+ return {
464+ response : new Response ( JSON . stringify ( BlobUnknownError ) , { status : 404 } ) ,
465+ } ;
466+ }
467+ }
468+
411469 return {
412470 stream : res . body ! ,
413- digest : hexToDigest ( res . checksums . sha256 ! ) ,
471+ digest,
414472 size : res . size ,
415473 } ;
416474 }
@@ -419,7 +477,9 @@ export class R2Registry implements Registry {
419477 // Generate a unique ID for this upload
420478 const uuid = crypto . randomUUID ( ) ;
421479
422- const upload = await this . env . REGISTRY . createMultipartUpload ( uuid ) ;
480+ const upload = await this . env . REGISTRY . createMultipartUpload ( uuid , {
481+ customMetadata : { [ registryUploadKey ] : "true" } ,
482+ } ) ;
423483 const state = {
424484 uploadId : upload . uploadId ,
425485 parts : [ ] ,
@@ -691,12 +751,55 @@ export class R2Registry implements Registry {
691751 // TODO: Handle one last buffer here
692752 await upload . complete ( state . parts ) ;
693753 const obj = await this . env . REGISTRY . get ( uuid ) ;
694- const put = this . env . REGISTRY . put ( `${ namespace } /blobs/${ expectedSha } ` , obj ! . body , {
695- sha256 : ( expectedSha as string ) . slice ( SHA256_PREFIX_LEN ) ,
696- } ) ;
754+ if ( obj === null ) {
755+ console . error ( "unreachable, obj is null when we just created upload" ) ;
756+ return {
757+ response : new InternalError ( ) ,
758+ } ;
759+ }
697760
698- await put ;
699- await this . env . REGISTRY . delete ( uuid ) ;
761+ const target = `${ namespace } /blobs/${ expectedSha } ` ;
762+ const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000 ;
763+ // If layer surpasses the maximum size of an R2 upload, we need to calculate the digest
764+ // stream and create a reference from the blobs path to the
765+ // upload path. In R2, moving objects mean copying the stream, which
766+ // doesn't really work if it's above 5GB due to R2 limits.
767+ if ( obj . size >= MAXIMUM_SIZE_R2_OBJECT ) {
768+ const compressionStream = new crypto . DigestStream ( "SHA-256" ) ;
769+ obj . body . pipeTo ( compressionStream ) ;
770+ const digest = hexToDigest ( await compressionStream . digest ) ;
771+ if ( digest !== expectedSha ) {
772+ return { response : new RegistryResponseJSON ( JSON . stringify ( DigestInvalid ( expectedSha , digest ) ) ) } ;
773+ }
774+
775+ const [ , err ] = await wrap (
776+ this . env . REGISTRY . put ( target , uuid , {
777+ customMetadata : {
778+ [ referenceHeader ] : uuid ,
779+ [ digestHeaderInReference ] : digest ,
780+ } ,
781+ } ) ,
782+ ) ;
783+ if ( err !== null ) {
784+ console . error ( "error uploading reference blob" , errorString ( err ) ) ;
785+ await this . env . REGISTRY . delete ( uuid ) ;
786+ return {
787+ response : new InternalError ( ) ,
788+ } ;
789+ }
790+ } else {
791+ const put = this . env . REGISTRY . put ( target , obj ! . body , {
792+ sha256 : ( expectedSha as string ) . slice ( SHA256_PREFIX_LEN ) ,
793+ } ) ;
794+ const [ , err ] = await wrap ( put ) ;
795+ await this . env . REGISTRY . delete ( uuid ) ;
796+ if ( err !== null ) {
797+ console . error ( "error uploading blob" , errorString ( err ) ) ;
798+ return {
799+ response : new InternalError ( ) ,
800+ } ;
801+ }
802+ }
700803 }
701804
702805 await this . env . REGISTRY . delete ( getRegistryUploadsPath ( state ) ) ;
0 commit comments