@@ -11,7 +11,7 @@ import {
1111} from "../chunk" ;
1212import { InternalError , ManifestError , RangeError , ServerError } from "../errors" ;
1313import { SHA256_PREFIX_LEN , getSHA256 , hexToDigest } from "../user" ;
14- import { readableToBlob , readerToBlob , wrap } from "../utils" ;
14+ import { errorString , readableToBlob , readerToBlob , wrap } from "../utils" ;
1515import { BlobUnknownError , ManifestUnknownError } from "../v2-errors" ;
1616import {
1717 CheckLayerResponse ,
@@ -29,6 +29,7 @@ import {
2929} from "./registry" ;
3030import { GarbageCollectionMode , GarbageCollector } from "./garbage-collector" ;
3131import { ManifestSchema , manifestSchema } from "../manifest" ;
32+ import { DigestInvalid , RegistryResponseJSON } from "../v2-responses" ;
3233
3334export type Chunk =
3435 | {
@@ -101,6 +102,10 @@ export async function encodeState(state: State, env: Env): Promise<{ jwt: string
101102 return { jwt : jwtSignature , hash : await getSHA256 ( jwtSignature , "" ) } ;
102103}
103104
105+ export const referenceHeader = "X-Serverless-Registry-Reference" ;
106+ export const digestHeaderInReference = "X-Serverless-Registry-Digest" ;
107+ export const registryUploadKey = "X-Serverless-Registry-Upload" ;
108+
104109export async function getUploadState (
105110 name : string ,
106111 uploadId : string ,
@@ -127,6 +132,15 @@ export async function getUploadState(
127132 return { state : stateObject , stateStr : stateStr , hash : stateStrHash } ;
128133}
129134
135+ export function isReference ( r2Object : R2Object ) : false | string {
136+ if ( r2Object . customMetadata === undefined ) return false ;
137+ const value = r2Object . customMetadata [ referenceHeader ] ;
138+ if ( value !== undefined ) {
139+ return value ;
140+ }
141+ return false ;
142+ }
143+
130144export class R2Registry implements Registry {
131145 private gc : GarbageCollector ;
132146
@@ -196,12 +210,18 @@ export class R2Registry implements Registry {
196210 // name format is:
197211 // <path>/<'blobs' | 'manifests'>/<name>
198212 const parts = object . key . split ( "/" ) ;
213+ // maybe an upload.
214+ if ( parts . length === 1 ) {
215+ return ;
216+ }
217+
199218 const repository = parts . slice ( 0 , parts . length - 2 ) . join ( "/" ) ;
200219 if ( parts [ parts . length - 2 ] === "blobs" ) return ;
201220
202221 if ( repository in repositories ) return ;
203222 totalRecords ++ ;
204223 repositories [ repository ] = { } ;
224+ console . log ( "Pushing" , object . key , object . customMetadata ) ;
205225 repositoriesOrder . push ( repository ) ;
206226 } ;
207227
@@ -389,15 +409,39 @@ export class R2Registry implements Registry {
389409 } ;
390410 }
391411
412+ const key = isReference ( res ) ;
413+ let [ digest , size ] = [ "" , 0 ] ;
414+ if ( key ) {
415+ const [ res , err ] = await wrap ( this . env . REGISTRY . head ( key ) ) ;
416+ if ( err ) {
417+ return wrapError ( "layerExists" , err ) ;
418+ }
419+
420+ if ( ! res ) {
421+ return { exists : false } ;
422+ }
423+
424+ if ( ! res . customMetadata ) throw new Error ( "unreachable" ) ;
425+ if ( ! res . customMetadata [ digestHeaderInReference ] ) throw new Error ( "unreachable" ) ;
426+ const possibleDigest = res . customMetadata [ digestHeaderInReference ] ;
427+ if ( ! possibleDigest ) throw new Error ( "unreachable, no digest" ) ;
428+
429+ digest = possibleDigest ;
430+ size = res . size ;
431+ } else {
432+ digest = hexToDigest ( res . checksums . sha256 ! ) ;
433+ size = res . size ;
434+ }
435+
392436 return {
393- digest : hexToDigest ( res . checksums . sha256 ! ) ,
394- size : res . size ,
437+ digest,
438+ size,
395439 exists : true ,
396440 } ;
397441 }
398442
399443 async getLayer ( name : string , digest : string ) : Promise < RegistryError | GetLayerResponse > {
400- const [ res , err ] = await wrap ( this . env . REGISTRY . get ( `${ name } /blobs/${ digest } ` ) ) ;
444+ let [ res , err ] = await wrap ( this . env . REGISTRY . get ( `${ name } /blobs/${ digest } ` ) ) ;
401445 if ( err ) {
402446 return wrapError ( "getLayer" , err ) ;
403447 }
@@ -408,9 +452,24 @@ export class R2Registry implements Registry {
408452 } ;
409453 }
410454
455+ const id = isReference ( res ) ;
456+ if ( id ) {
457+ [ res , err ] = await wrap ( this . env . REGISTRY . get ( id ) ) ;
458+ if ( err ) {
459+ return wrapError ( "getLayer" , err ) ;
460+ }
461+
462+ if ( ! res ) {
463+ // not a 500, because garbage collection deletes the underlying layer first
464+ return {
465+ response : new Response ( JSON . stringify ( BlobUnknownError ) , { status : 404 } ) ,
466+ } ;
467+ }
468+ }
469+
411470 return {
412471 stream : res . body ! ,
413- digest : hexToDigest ( res . checksums . sha256 ! ) ,
472+ digest,
414473 size : res . size ,
415474 } ;
416475 }
@@ -419,7 +478,9 @@ export class R2Registry implements Registry {
419478 // Generate a unique ID for this upload
420479 const uuid = crypto . randomUUID ( ) ;
421480
422- const upload = await this . env . REGISTRY . createMultipartUpload ( uuid ) ;
481+ const upload = await this . env . REGISTRY . createMultipartUpload ( uuid , {
482+ customMetadata : { [ registryUploadKey ] : "true" } ,
483+ } ) ;
423484 const state = {
424485 uploadId : upload . uploadId ,
425486 parts : [ ] ,
@@ -691,12 +752,55 @@ export class R2Registry implements Registry {
691752 // TODO: Handle one last buffer here
692753 await upload . complete ( state . parts ) ;
693754 const obj = await this . env . REGISTRY . get ( uuid ) ;
694- const put = this . env . REGISTRY . put ( `${ namespace } /blobs/${ expectedSha } ` , obj ! . body , {
695- sha256 : ( expectedSha as string ) . slice ( SHA256_PREFIX_LEN ) ,
696- } ) ;
755+ if ( obj === null ) {
756+ console . error ( "unreachable, obj is null when we just created upload" ) ;
757+ return {
758+ response : new InternalError ( ) ,
759+ } ;
760+ }
697761
698- await put ;
699- await this . env . REGISTRY . delete ( uuid ) ;
762+ const target = `${ namespace } /blobs/${ expectedSha } ` ;
763+ const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000 ;
764+ // If layer surpasses the maximum size of an R2 upload, we need to calculate the digest
765+ // stream and create a reference from the blobs path to the
766+ // upload path. In R2, moving objects mean copying the stream, which
767+ // doesn't really work if it's above 5GB due to R2 limits.
768+ if ( obj . size >= MAXIMUM_SIZE_R2_OBJECT ) {
769+ const compressionStream = new crypto . DigestStream ( "SHA-256" ) ;
770+ obj . body . pipeTo ( compressionStream ) ;
771+ const digest = hexToDigest ( await compressionStream . digest ) ;
772+ if ( digest !== expectedSha ) {
773+ return { response : new RegistryResponseJSON ( JSON . stringify ( DigestInvalid ( expectedSha , digest ) ) ) } ;
774+ }
775+
776+ const [ , err ] = await wrap (
777+ this . env . REGISTRY . put ( target , uuid , {
778+ customMetadata : {
779+ [ referenceHeader ] : uuid ,
780+ [ digestHeaderInReference ] : digest ,
781+ } ,
782+ } ) ,
783+ ) ;
784+ if ( err !== null ) {
785+ console . error ( "error uploading reference blob" , errorString ( err ) ) ;
786+ await this . env . REGISTRY . delete ( uuid ) ;
787+ return {
788+ response : new InternalError ( ) ,
789+ } ;
790+ }
791+ } else {
792+ const put = this . env . REGISTRY . put ( target , obj ! . body , {
793+ sha256 : ( expectedSha as string ) . slice ( SHA256_PREFIX_LEN ) ,
794+ } ) ;
795+ const [ , err ] = await wrap ( put ) ;
796+ await this . env . REGISTRY . delete ( uuid ) ;
797+ if ( err !== null ) {
798+ console . error ( "error uploading blob" , errorString ( err ) ) ;
799+ return {
800+ response : new InternalError ( ) ,
801+ } ;
802+ }
803+ }
700804 }
701805
702806 await this . env . REGISTRY . delete ( getRegistryUploadsPath ( state ) ) ;
0 commit comments