@@ -2990,82 +2990,169 @@ FetchResult Executor::fetchChunks(
   std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
   std::vector<std::vector<int64_t>> all_num_rows;
   std::vector<std::vector<uint64_t>> all_frag_offsets;
-  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
-    std::vector<const int8_t*> frag_col_buffers(
-        plan_state_->global_to_local_col_ids_.size());
-    for (const auto& col_id : col_global_ids) {
-      if (interrupted_.load()) {
-        throw QueryExecutionError(ERR_INTERRUPTED);
-      }
-      CHECK(col_id);
-      if (col_id->isVirtual()) {
-        continue;
-      }
-      const auto fragments_it = all_tables_fragments.find(col_id->getTableRef());
-      CHECK(fragments_it != all_tables_fragments.end());
-      const auto fragments = fragments_it->second;
-      auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
-      CHECK(it != plan_state_->global_to_local_col_ids_.end());
-      CHECK_LT(static_cast<size_t>(it->second),
-               plan_state_->global_to_local_col_ids_.size());
-      const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
-      if (!fragments->size()) {
-        return {};
-      }
-      auto memory_level_for_column = memory_level;
-      if (plan_state_->columns_to_fetch_.find(*col_id) ==
-          plan_state_->columns_to_fetch_.end()) {
-        memory_level_for_column = Data_Namespace::CPU_LEVEL;
-      }
-      if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
-        // determine if we need special treatment to linearlize multi-frag table
-        // i.e., a column that is classified as varlen type, i.e., array
-        // for now, we can support more types in this way
-        if (needLinearizeAllFragments(
-                *col_id, ra_exe_unit, selected_fragments, memory_level)) {
-          bool for_lazy_fetch = false;
-          if (plan_state_->columns_to_not_fetch_.find(*col_id) !=
-              plan_state_->columns_to_not_fetch_.end()) {
-            for_lazy_fetch = true;
-            VLOG(2) << "Try to linearize lazy fetch column (col_id: "
-                    << col_id->getColId() << ")";
+  if (memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL) {
+    std::mutex all_frag;
+    tbb::task_arena limitedArena(16);
+    limitedArena.execute([&]() {
+      tbb::parallel_for_each(
+          frag_ids_crossjoin.begin(),
+          frag_ids_crossjoin.end(),
+          [&](const std::vector<size_t>& selected_frag_ids) {
+            // for (const auto& selected_frag_ids : frag_ids_crossjoin) {
+            std::vector<const int8_t*> frag_col_buffers(
+                plan_state_->global_to_local_col_ids_.size());
+            for (const auto& col_id : col_global_ids) {
+              if (interrupted_.load()) {
+                throw QueryExecutionError(ERR_INTERRUPTED);
+              }
+              CHECK(col_id);
+              if (col_id->isVirtual()) {
+                continue;
+              }
+              const auto fragments_it = all_tables_fragments.find(col_id->getTableRef());
+              CHECK(fragments_it != all_tables_fragments.end());
+              const auto fragments = fragments_it->second;
+              auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
+              CHECK(it != plan_state_->global_to_local_col_ids_.end());
+              CHECK_LT(static_cast<size_t>(it->second),
+                       plan_state_->global_to_local_col_ids_.size());
+              const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
+              if (!fragments->size()) {
+                continue;
+              }
+              auto memory_level_for_column = memory_level;
+              if (plan_state_->columns_to_fetch_.find(*col_id) ==
+                  plan_state_->columns_to_fetch_.end()) {
+                memory_level_for_column = Data_Namespace::CPU_LEVEL;
+              }
+              if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
+                // determine if we need special treatment to linearlize multi-frag table
+                // i.e., a column that is classified as varlen type, i.e., array
+                // for now, we can support more types in this way
+                if (needLinearizeAllFragments(
+                        *col_id, ra_exe_unit, selected_fragments, memory_level)) {
+                  bool for_lazy_fetch = false;
+                  if (plan_state_->columns_to_not_fetch_.find(*col_id) !=
+                      plan_state_->columns_to_not_fetch_.end()) {
+                    for_lazy_fetch = true;
+                    VLOG(2) << "Try to linearize lazy fetch column (col_id: "
+                            << col_id->getColId() << ")";
+                  }
+                  frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
+                      col_id->getColInfo(),
+                      all_tables_fragments,
+                      chunks,
+                      chunk_iterators,
+                      for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
+                      for_lazy_fetch ? 0 : device_id,
+                      device_allocator,
+                      thread_idx);
+                } else {
+                  frag_col_buffers[it->second] =
+                      column_fetcher.getAllTableColumnFragments(col_id->getColInfo(),
+                                                                all_tables_fragments,
+                                                                memory_level_for_column,
+                                                                device_id,
+                                                                device_allocator,
+                                                                thread_idx);
+                }
+              } else {
+                frag_col_buffers[it->second] =
+                    column_fetcher.getOneTableColumnFragment(col_id->getColInfo(),
+                                                             frag_id,
+                                                             all_tables_fragments,
+                                                             chunks,
+                                                             chunk_iterators,
+                                                             memory_level_for_column,
+                                                             device_id,
+                                                             device_allocator);
+              }
+            }
+            all_frag.lock();
+            all_frag_col_buffers.push_back(frag_col_buffers);
+            all_frag.unlock();
+          });
+    });
+  } else {
+    for (const auto& selected_frag_ids : frag_ids_crossjoin) {
+      std::vector<const int8_t*> frag_col_buffers(
+          plan_state_->global_to_local_col_ids_.size());
+      for (const auto& col_id : col_global_ids) {
+        if (interrupted_.load()) {
+          throw QueryExecutionError(ERR_INTERRUPTED);
+        }
+        CHECK(col_id);
+        if (col_id->isVirtual()) {
+          continue;
+        }
+        const auto fragments_it = all_tables_fragments.find(col_id->getTableRef());
+        CHECK(fragments_it != all_tables_fragments.end());
+        const auto fragments = fragments_it->second;
+        auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
+        CHECK(it != plan_state_->global_to_local_col_ids_.end());
+        CHECK_LT(static_cast<size_t>(it->second),
+                 plan_state_->global_to_local_col_ids_.size());
+        const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
+        if (!fragments->size()) {
+          return {};
+        }
+        auto memory_level_for_column = memory_level;
+        if (plan_state_->columns_to_fetch_.find(*col_id) ==
+            plan_state_->columns_to_fetch_.end()) {
+          memory_level_for_column = Data_Namespace::CPU_LEVEL;
+        }
+        if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
+          // determine if we need special treatment to linearlize multi-frag table
+          // i.e., a column that is classified as varlen type, i.e., array
+          // for now, we can support more types in this way
+          if (needLinearizeAllFragments(
+                  *col_id, ra_exe_unit, selected_fragments, memory_level)) {
+            bool for_lazy_fetch = false;
+            if (plan_state_->columns_to_not_fetch_.find(*col_id) !=
+                plan_state_->columns_to_not_fetch_.end()) {
+              for_lazy_fetch = true;
+              VLOG(2) << "Try to linearize lazy fetch column (col_id: "
+                      << col_id->getColId() << ")";
+            }
+            frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
+                col_id->getColInfo(),
+                all_tables_fragments,
+                chunks,
+                chunk_iterators,
+                for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
+                for_lazy_fetch ? 0 : device_id,
+                device_allocator,
+                thread_idx);
+          } else {
+            frag_col_buffers[it->second] =
+                column_fetcher.getAllTableColumnFragments(col_id->getColInfo(),
+                                                          all_tables_fragments,
+                                                          memory_level_for_column,
+                                                          device_id,
+                                                          device_allocator,
+                                                          thread_idx);
+          }
+        } else {
+          auto timer1 = DEBUG_TIMER("getOneTableColumnFragment");
+          frag_col_buffers[it->second] =
+              column_fetcher.getOneTableColumnFragment(col_id->getColInfo(),
+                                                       frag_id,
+                                                       all_tables_fragments,
+                                                       chunks,
+                                                       chunk_iterators,
+                                                       memory_level_for_column,
+                                                       device_id,
+                                                       device_allocator);
+          timer1.stop();
+        }
+      }
+      all_frag_col_buffers.push_back(frag_col_buffers);
           }
-          frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
-              col_id->getColInfo(),
-              all_tables_fragments,
-              chunks,
-              chunk_iterators,
-              for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
-              for_lazy_fetch ? 0 : device_id,
-              device_allocator,
-              thread_idx);
-        } else {
-          frag_col_buffers[it->second] =
-              column_fetcher.getAllTableColumnFragments(col_id->getColInfo(),
-                                                        all_tables_fragments,
-                                                        memory_level_for_column,
-                                                        device_id,
-                                                        device_allocator,
-                                                        thread_idx);
-        }
-      } else {
-        auto timer1 = DEBUG_TIMER("getOneTableColumnFragment");
-        frag_col_buffers[it->second] =
-            column_fetcher.getOneTableColumnFragment(col_id->getColInfo(),
-                                                     frag_id,
-                                                     all_tables_fragments,
-                                                     chunks,
-                                                     chunk_iterators,
-                                                     memory_level_for_column,
-                                                     device_id,
-                                                     device_allocator);
-        timer1.stop();
-      }
-    }
-    all_frag_col_buffers.push_back(frag_col_buffers);
   }
   std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
       ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
+  CHECK_EQ(all_num_rows.size(), all_frag_col_buffers.size());
+  CHECK_EQ(all_frag_offsets.size(), all_frag_col_buffers.size());
   return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
 }
 
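For reference, below is a minimal, self-contained sketch of the concurrency pattern the new GPU_LEVEL branch relies on: a tbb::task_arena to cap the number of worker threads, tbb::parallel_for_each over the fragment cross join, and a std::mutex guarding the shared output vector. The names fetch_all, fetch_one, FragIds, and Buffers are illustrative stand-ins, not part of the Executor API.

```cpp
#include <tbb/parallel_for_each.h>
#include <tbb/task_arena.h>

#include <cstdint>
#include <mutex>
#include <vector>

using FragIds = std::vector<size_t>;
using Buffers = std::vector<const int8_t*>;

// Stand-in for the per-fragment column fetch done in the lambda above.
Buffers fetch_one(const FragIds& frag_ids) {
  return Buffers(frag_ids.size(), nullptr);
}

std::vector<Buffers> fetch_all(const std::vector<FragIds>& frag_ids_crossjoin) {
  std::vector<Buffers> all_frag_col_buffers;
  std::mutex result_mutex;
  tbb::task_arena arena(16);  // mirrors limitedArena(16): at most 16 worker threads
  arena.execute([&] {
    tbb::parallel_for_each(
        frag_ids_crossjoin.begin(),
        frag_ids_crossjoin.end(),
        [&](const FragIds& selected_frag_ids) {
          Buffers frag_col_buffers = fetch_one(selected_frag_ids);
          // Append under the lock; results land in completion order,
          // not in the input order of frag_ids_crossjoin.
          std::lock_guard<std::mutex> lock(result_mutex);
          all_frag_col_buffers.push_back(std::move(frag_col_buffers));
        });
  });
  return all_frag_col_buffers;
}
```

Note that the mutex makes the concurrent push_back safe but does not preserve input order, whereas the serial else branch keeps the original ordering of frag_ids_crossjoin; the added CHECK_EQ calls only verify that the result sizes agree.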