@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
1717use std:: collections:: btree_map:: Entry ;
1818
1919use itertools:: Itertools ;
20- use quickwit_proto:: indexing:: CpuCapacity ;
20+ use quickwit_proto:: indexing:: { CpuCapacity , PIPELINE_FULL_CAPACITY } ;
2121
2222use super :: scheduling_logic_model:: * ;
2323use crate :: indexing_scheduler:: scheduling:: inflate_node_capacities_if_necessary;
@@ -356,8 +356,11 @@ fn place_unassigned_shards_single_source(
356356) -> Result < ( ) , NotEnoughCapacity > {
357357 let mut num_shards = source. num_shards ;
358358 // To ensure that merges can keep up, try not to assign more than 3
359- // shards per indexer for a source (except if there aren't enough nodes)
360- let limit_num_shards_per_indexer_per_source = 3 . max ( num_shards. div_ceil ( num_indexers as u32 ) ) ;
359+ // pipelines per indexer for a source (except if there aren't enough nodes)
360+ let target_limit_num_shards_per_indexer_per_source =
361+ 3 * PIPELINE_FULL_CAPACITY . cpu_millis ( ) / source. load_per_shard . get ( ) ;
362+ let limit_num_shards_per_indexer_per_source = target_limit_num_shards_per_indexer_per_source
363+ . max ( num_shards. div_ceil ( num_indexers as u32 ) ) ;
361364 while num_shards > 0 {
362365 let Some ( ( indexer_ord, available_capacity) ) = indexer_with_capacities. next ( ) else {
363366 return Err ( NotEnoughCapacity ) ;
@@ -608,6 +611,21 @@ mod tests {
608611 problem. add_source ( 4 , NonZeroU32 :: new ( 1_000 ) . unwrap ( ) ) ;
609612 problem. add_source ( 4 , NonZeroU32 :: new ( 1_000 ) . unwrap ( ) ) ;
610613 problem. inc_affinity ( 0 , 1 ) ;
614+ problem. inc_affinity ( 1 , 0 ) ;
615+ let mut solution = problem. new_solution ( ) ;
616+ place_unassigned_shards_with_affinity ( & problem, & mut solution) ;
617+ assert_eq ! ( solution. indexer_assignments[ 0 ] . num_shards( 1 ) , 4 ) ;
618+ assert_eq ! ( solution. indexer_assignments[ 1 ] . num_shards( 0 ) , 4 ) ;
619+ }
620+
621+ #[ test]
622+ fn test_limit_placement_to_three_pipelines ( ) {
623+ let mut problem =
624+ SchedulingProblem :: with_indexer_cpu_capacities ( vec ! [ mcpu( 16_000 ) , mcpu( 16_000 ) ] ) ;
625+ let pipeline_full_capacity = NonZeroU32 :: new ( PIPELINE_FULL_CAPACITY . cpu_millis ( ) ) . unwrap ( ) ;
626+ problem. add_source ( 4 , pipeline_full_capacity) ;
627+ problem. add_source ( 4 , pipeline_full_capacity) ;
628+ problem. inc_affinity ( 0 , 1 ) ;
611629 problem. inc_affinity ( 0 , 1 ) ;
612630 problem. inc_affinity ( 0 , 0 ) ;
613631 problem. inc_affinity ( 1 , 0 ) ;
0 commit comments