@@ -26,7 +26,7 @@ use rdkafka::{
2626 producer:: { FutureProducer , FutureRecord } ,
2727 statistics:: Statistics ,
2828 util:: Timeout ,
29- TopicPartitionList ,
29+ TopicPartitionList , topic_partition_list :: TopicPartitionListElem , groups :: GroupList , metadata :: Metadata ,
3030} ;
3131
3232use futures:: { future, stream:: StreamExt } ;
@@ -342,6 +342,7 @@ pub struct PrometheusConfig {
342342#[ derive( Debug , Serialize , Deserialize , Clone ) ]
343343pub struct ObserverConfig {
344344 client : String ,
345+ group_id : Option < String > ,
345346 topics : Vec < String > ,
346347 show_progress_interval_secs : Option < u64 > ,
347348 fetch_timeout_secs : Option < u64 > ,
@@ -362,7 +363,7 @@ pub struct Observer {
362363
363364 last_status_time : Instant ,
364365 last_update_time : Instant ,
365- last_results : HashMap < String , i64 > ,
366+ last_results : HashMap < String , ( i64 , i64 ) > ,
366367
367368 metrics : Arc < Mutex < metrics:: ObserverMetrics > > ,
368369}
@@ -449,8 +450,7 @@ impl Observer {
449450 let difference = Instant :: now ( ) . duration_since ( self . last_update_time ) ;
450451
451452 if self . get_fetch_interval ( ) . as_secs ( ) > 0 && difference > self . get_fetch_interval ( ) {
452- let results = self . update_current_status ( ) ;
453- self . last_results = results;
453+ self . last_results = self . update_current_status ( ) ;
454454 self . last_update_time = Instant :: now ( ) ;
455455 }
456456 }
@@ -461,8 +461,13 @@ impl Observer {
461461
462462 if self . get_show_progress_interval ( ) . as_secs ( ) > 0 && difference > self . get_show_progress_interval ( ) {
463463
464- for ( name, count) in self . last_results . iter ( ) {
465- info ! ( "\" {:}\" : topic \" {:}\" has {:} message(s)" , self . name, name, count) ;
464+ for ( name, ( count, remaining) ) in self . last_results . iter ( ) {
465+ if let Some ( group_id) = self . group_id . clone ( ) {
466+ info ! ( "\" {:}\" : topic=\" {:}\" group=\" {:}\" messages={:} remaining={:}" , self . name, name, group_id, count, remaining) ;
467+
468+ } else {
469+ info ! ( "\" {:}\" : topic \" {:}\" messages={:} remaining={:}" , self . name, name, count, remaining) ;
470+ }
466471 }
467472 self . last_status_time = Instant :: now ( ) ;
468473 }
@@ -489,9 +494,9 @@ impl Observer {
489494 )
490495 }
491496
492- pub fn update_current_status ( & self ) -> HashMap < String , i64 > {
497+ pub fn update_current_status ( & self ) -> HashMap < String , ( i64 , i64 ) > {
493498
494- let mut results: HashMap < String , i64 > = HashMap :: new ( ) ;
499+ let mut results: HashMap < String , ( i64 , i64 ) > = HashMap :: new ( ) ;
495500
496501 let topic: Option < & str > =
497502 if self . topics . len ( ) == 1 {
@@ -500,15 +505,48 @@ impl Observer {
500505 None
501506 } ;
502507
503- let metadata = self
508+ let metadata: Metadata = self
504509 . client
505510 . fetch_metadata ( topic, self . get_fetch_timeout ( ) )
506511 . expect ( "Failed to fetch metadata" ) ;
507512
513+ // let groups: GroupList = self.client.fetch_group_list(None, Duration::from_secs(20)).unwrap();
514+
515+ // for item in groups.groups() {
516+ // debug!("Group name: {:}", item.name());
517+
518+ // }
519+
508520 for topic in metadata. topics ( ) . iter ( ) {
509521 if self . topics_set . contains ( topic. name ( ) ) || self . topics_set . len ( ) == 0 {
522+
523+
524+ let tp_map: HashMap < ( String , i32 ) , rdkafka:: Offset > = topic. partitions ( )
525+ . iter ( )
526+ . map ( |partition_md|{
527+ ( ( topic. name ( ) . to_string ( ) , partition_md. id ( ) ) , rdkafka:: Offset :: Stored )
528+ } ) . collect ( ) ;
529+
530+ let tpl = TopicPartitionList :: from_topic_map ( & tp_map) ;
531+ let commited_offsets = self . client . committed_offsets ( tpl, self . get_fetch_timeout ( ) ) . unwrap_or ( TopicPartitionList :: new ( ) ) ;
532+
533+
510534 let mut topic_message_count = 0 ;
535+ let mut topic_remaining_count = 0 ;
511536 for partition in topic. partitions ( ) {
537+
538+ let tpl_item: Option < TopicPartitionListElem > = commited_offsets. find_partition ( topic. name ( ) , partition. id ( ) ) ;
539+
540+ let mut current_partition_offset = 0 ;
541+ if let Some ( value) = tpl_item {
542+ match value. offset ( ) {
543+ rdkafka:: Offset :: Offset ( offset) => {
544+ current_partition_offset = offset;
545+ } ,
546+ _ => { }
547+ }
548+ }
549+
512550 let ( low, high) = self
513551 . client
514552 . fetch_watermarks (
@@ -520,6 +558,9 @@ impl Observer {
520558 let partition_message_count = high - low;
521559 topic_message_count += partition_message_count;
522560
561+ let partition_remaining_count = high - current_partition_offset;
562+ topic_remaining_count += partition_remaining_count;
563+
523564 let labels = [ topic. name ( ) , & partition. id ( ) . to_string ( ) ] ;
524565
525566 match self . metrics . lock ( ) {
@@ -530,6 +571,13 @@ impl Observer {
530571 guard. partition_end_offset . with_label_values ( & labels) . set ( high) ;
531572
532573 guard. number_of_records_for_partition . with_label_values ( & labels) . set ( partition_message_count) ;
574+
575+ if let Some ( group_id) = self . group_id . clone ( ) {
576+ let labels = [ topic. name ( ) , & partition. id ( ) . to_string ( ) , & group_id] ;
577+ guard. commited_offset . with_label_values ( & labels) . set ( current_partition_offset) ;
578+ guard. remaining_by_partition . with_label_values ( & labels) . set ( partition_remaining_count) ;
579+
580+ }
533581 } ,
534582 Err ( _poisoned) => {
535583 error ! ( "Can't acquire metrics lock for topic={:} and partition={:}" , topic. name( ) , & partition. id( ) ) ;
@@ -540,7 +588,7 @@ impl Observer {
540588
541589 }
542590
543- results. insert ( topic. name ( ) . to_string ( ) , topic_message_count) ;
591+ results. insert ( topic. name ( ) . to_string ( ) , ( topic_message_count, topic_remaining_count ) ) ;
544592
545593 match self . metrics . lock ( ) {
546594 Ok ( guard) => {
@@ -550,6 +598,10 @@ impl Observer {
550598 guard. number_of_records_total . with_label_values ( & [ topic. name ( ) ] ) . set ( topic_message_count) ;
551599 guard. last_fetch_ts . with_label_values ( & [ topic. name ( ) ] ) . set ( since_the_epoch. as_secs_f64 ( ) ) ;
552600
601+ if let Some ( group_id) = self . group_id . clone ( ) {
602+ guard. remaining_for_topic . with_label_values ( & [ topic. name ( ) , & group_id] ) . set ( topic_remaining_count) ;
603+ }
604+
553605 } ,
554606 Err ( _) => {
555607 error ! ( "Can't acquire metrics lock for topic={:}" , topic. name( ) )
@@ -720,7 +772,7 @@ impl Pipeline {
720772 }
721773
722774 } ,
723- Err ( poisoned ) => {
775+ Err ( _poisoned ) => {
724776 error ! ( "Can't acquire metrics for {:} {:} [ {:} ] -> {:} [ {:} ]" ,
725777 self . name,
726778 self . upstream_client_name,
@@ -1005,11 +1057,12 @@ impl Config {
10051057 . iter ( )
10061058 . enumerate ( )
10071059 . map ( |( i, x) | {
1008- let client: ClientConfig = self . create_client_config ( & x. client , None ) ;
1060+
1061+ let client: ClientConfig = self . create_client_config ( & x. client , ( x. group_id ) . as_deref ( ) ) ;
10091062
10101063 Observer {
10111064 client : client. create ( ) . expect ( "Can't create consumer" ) ,
1012- group_id : None ,
1065+ group_id : x . group_id . clone ( ) ,
10131066 topics : x. topics . clone ( ) ,
10141067 topics_set : x. topics . iter ( ) . cloned ( ) . collect ( ) ,
10151068 show_progress_interval_secs : x. show_progress_interval_secs . clone ( ) ,
0 commit comments