@@ -931,22 +931,38 @@ class NamespaceFS {
931931 async read_object_md ( params , object_sdk ) {
932932 const fs_context = this . prepare_fs_context ( object_sdk ) ;
933933 let file_path ;
934+ let stat ;
935+ let isDir ;
936+ let retries = ( this . _is_versioning_enabled ( ) || this . _is_versioning_suspended ( ) ) ? config . NSFS_RENAME_RETRIES : 0 ;
937+ const is_gpfs = native_fs_utils . _is_gpfs ( fs_context ) ;
934938 try {
935- file_path = await this . _find_version_path ( fs_context , params , true ) ;
936- await this . _check_path_in_bucket_boundaries ( fs_context , file_path ) ;
937- await this . _load_bucket ( params , fs_context ) ;
938- let stat = await nb_native ( ) . fs . stat ( fs_context , file_path ) ;
939-
940- const isDir = native_fs_utils . isDirectory ( stat ) ;
941- if ( isDir ) {
942- if ( ! stat . xattr ?. [ XATTR_DIR_CONTENT ] || ! params . key . endsWith ( '/' ) ) {
943- throw error_utils . new_error_code ( 'ENOENT' , 'NoSuchKey' ) ;
939+ for ( ; ; ) {
940+ try {
941+ file_path = await this . _find_version_path ( fs_context , params , true ) ;
942+ await this . _check_path_in_bucket_boundaries ( fs_context , file_path ) ;
943+ await this . _load_bucket ( params , fs_context ) ;
944+ stat = await nb_native ( ) . fs . stat ( fs_context , file_path ) ;
945+ isDir = native_fs_utils . isDirectory ( stat ) ;
946+ if ( isDir ) {
947+ if ( ! stat . xattr ?. [ XATTR_DIR_CONTENT ] || ! params . key . endsWith ( '/' ) ) {
948+ throw error_utils . new_error_code ( 'ENOENT' , 'NoSuchKey' ) ;
944949 } else if ( stat . xattr ?. [ XATTR_DIR_CONTENT ] !== '0' ) {
945950 // find dir object content file path and return its stat + xattr of its parent directory
946951 const dir_content_path = await this . _find_version_path ( fs_context , params ) ;
947952 const dir_content_path_stat = await nb_native ( ) . fs . stat ( fs_context , dir_content_path ) ;
948953 const xattr = stat . xattr ;
949954 stat = { ...dir_content_path_stat , xattr } ;
955+ }
956+ }
957+ if ( this . _is_mismatch_version_id ( stat , params . version_id ) ) {
958+ dbg . warn ( 'NamespaceFS.read_object_md mismatch version_id' , file_path , params . version_id , this . _get_version_id_by_xattr ( stat ) ) ;
959+ throw error_utils . new_error_code ( 'MISMATCH_VERSION' , 'file version does not match the version we asked for' ) ;
960+ }
961+ break ;
962+ } catch ( err ) {
963+ dbg . warn ( `NamespaceFS.read_object_md: retrying retries=${ retries } file_path=${ file_path } ` , err ) ;
964+ retries -= 1 ;
965+ if ( retries <= 0 || ! native_fs_utils . should_retry_link_unlink ( is_gpfs , err ) ) throw err ;
950966 }
951967 }
952968 this . _throw_if_delete_marker ( stat , params ) ;
@@ -959,42 +975,70 @@ class NamespaceFS {
959975 }
960976 }
961977
978+ async _is_empty_directory_content ( file_path , fs_context , params ) {
979+ const is_dir_content = this . _is_directory_content ( file_path , params . key ) ;
980+ if ( is_dir_content ) {
981+ try {
982+ const md_path = this . _get_file_md_path ( params ) ;
983+ const dir_stat = await nb_native ( ) . fs . stat ( fs_context , md_path ) ;
984+ if ( dir_stat && dir_stat . xattr [ XATTR_DIR_CONTENT ] === '0' ) return true ;
985+ } catch ( err ) {
986+ //failed to get object
987+ new NoobaaEvent ( NoobaaEvent . OBJECT_GET_FAILED ) . create_event ( params . key ,
988+ { bucket_path : this . bucket_path , object_name : params . key } , err ) ;
989+ dbg . log0 ( 'NamespaceFS: read_object_stream couldnt find dir content xattr' , err ) ;
990+ }
991+ }
992+ return false ;
993+ }
994+
962995 // eslint-disable-next-line max-statements
963996 async read_object_stream ( params , object_sdk , res ) {
964- let file ;
965997 let buffer_pool_cleanup = null ;
966998 const fs_context = this . prepare_fs_context ( object_sdk ) ;
967999 let file_path ;
1000+ let file ;
9681001 try {
9691002 await this . _load_bucket ( params , fs_context ) ;
1003+ let retries = ( this . _is_versioning_enabled ( ) || this . _is_versioning_suspended ( ) ) ? config . NSFS_RENAME_RETRIES : 0 ;
1004+ const is_gpfs = native_fs_utils . _is_gpfs ( fs_context ) ;
1005+ let stat ;
1006+ for ( ; ; ) {
1007+ try {
9701008 file_path = await this . _find_version_path ( fs_context , params ) ;
9711009 await this . _check_path_in_bucket_boundaries ( fs_context , file_path ) ;
9721010
9731011 // NOTE: don't move this code after the open
9741012 // this can lead to ENOENT failures due to file not exists when content size is 0
9751013 // if entry is a directory object and its content size = 0 - return empty response
976- const is_dir_content = this . _is_directory_content ( file_path , params . key ) ;
977- if ( is_dir_content ) {
978- try {
979- const md_path = this . _get_file_md_path ( params ) ;
980- const dir_stat = await nb_native ( ) . fs . stat ( fs_context , md_path ) ;
981- if ( dir_stat && dir_stat . xattr [ XATTR_DIR_CONTENT ] === '0' ) return null ;
982- } catch ( err ) {
983- //failed to get object
984- new NoobaaEvent ( NoobaaEvent . OBJECT_GET_FAILED ) . create_event ( params . key ,
985- { bucket_path : this . bucket_path , object_name : params . key } , err ) ;
986- dbg . log0 ( 'NamespaceFS: read_object_stream couldnt find dir content xattr' , err ) ;
987- }
988- }
1014+ if ( await this . _is_empty_directory_content ( file_path , fs_context , params ) ) return null ;
9891015
9901016 file = await nb_native ( ) . fs . open (
9911017 fs_context ,
9921018 file_path ,
9931019 config . NSFS_OPEN_READ_MODE ,
9941020 native_fs_utils . get_umasked_mode ( config . BASE_MODE_FILE ) ,
9951021 ) ;
996-
997- const stat = await file . stat ( fs_context ) ;
1022+ stat = await file . stat ( fs_context ) ;
1023+ if ( this . _is_mismatch_version_id ( stat , params . version_id ) ) {
1024+ dbg . warn ( 'NamespaceFS.read_object_stream mismatch version_id' , params . version_id , this . _get_version_id_by_xattr ( stat ) ) ;
1025+ throw error_utils . new_error_code ( 'MISMATCH_VERSION' , 'file version does not match the version we asked for' ) ;
1026+ }
1027+ break ;
1028+ } catch ( err ) {
1029+ dbg . warn ( `NamespaceFS.read_object_stream: retrying retries=${ retries } file_path=${ file_path } ` , err ) ;
1030+ if ( file ) {
1031+ await file . close ( fs_context ) ;
1032+ file = null ;
1033+ }
1034+ retries -= 1 ;
1035+ if ( retries <= 0 || ! native_fs_utils . should_retry_link_unlink ( is_gpfs , err ) ) {
1036+ new NoobaaEvent ( NoobaaEvent . OBJECT_GET_FAILED ) . create_event ( params . key ,
1037+ { bucket_path : this . bucket_path , object_name : params . key } , err ) ;
1038+ throw err ;
1039+ }
1040+ }
1041+ }
9981042 this . _throw_if_delete_marker ( stat , params ) ;
9991043 // await this._fail_if_archived_or_sparse_file(fs_context, file_path, stat);
10001044
@@ -2755,11 +2799,15 @@ class NamespaceFS {
27552799 }
27562800 }
27572801
2802+ _is_mismatch_version_id ( stat , version_id ) {
2803+ return version_id && ! this . _is_versioning_disabled ( ) && this . _get_version_id_by_xattr ( stat ) !== version_id ;
2804+ }
2805+
27582806 /**
2759- * _delete_single_object_versioned does the following -
2807+ * _delete_single_object_versioned does the following -
27602808 * if the deleted version is the latest - try to delete it from the latest version location
27612809 * if the deleted version is in .versions/ - unlink the version
2762- * we call check_version_moved() in case of concurrent puts, the version might move to .versions/
2810+ * we call check_version_moved() in case of concurrent puts, the version might move to .versions/
27632811 * if the version moved we will retry
27642812 * @param {nb.NativeFSContext } fs_context
27652813 * @param {string } key
@@ -2964,8 +3012,8 @@ class NamespaceFS {
29643012 const bucket_tmp_dir_path = this . get_bucket_tmpdir_full_path ( ) ;
29653013 if ( this . _is_versioning_enabled ( ) || suspended_and_latest_is_not_null ) {
29663014 await native_fs_utils . _make_path_dirs ( versioned_path , fs_context ) ;
2967- await native_fs_utils . safe_move ( fs_context , latest_ver_path , versioned_path , latest_ver_info ,
2968- gpfs_options ?. delete_version , bucket_tmp_dir_path ) ;
3015+ await native_fs_utils . safe_move_posix ( fs_context , latest_ver_path , versioned_path , latest_ver_info ,
3016+ bucket_tmp_dir_path ) ;
29693017 if ( suspended_and_latest_is_not_null ) {
29703018 // remove a version (or delete marker) with null version ID from .versions/ (if exists)
29713019 await this . _delete_null_version_from_versions_directory ( params . key , fs_context ) ;
@@ -3136,7 +3184,7 @@ class NamespaceFS {
31363184 dst_file = await native_fs_utils . open_file ( fs_context , this . bucket_path , dst_path , 'r' ) ;
31373185 }
31383186 return {
3139- move_to_versions : { src_file : dst_file , dir_file, dst_file : versioned_file } ,
3187+ move_to_versions : { src_file : dst_file , dir_file, should_override : false } ,
31403188 move_to_dst : { src_file, dst_file, dir_file, versioned_file }
31413189 } ;
31423190 } catch ( err ) {
0 commit comments