diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index 2584a725e79..c7ea403b7f2 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -35,7 +35,8 @@ has_for_source_in_mnesia/1, has_for_source_in_khepri/1, match_source_and_destination_in_khepri_tx/2, - clear_in_khepri/0 + clear_in_khepri/0, + khepri_ret_to_deletions/2 ]). -export([ @@ -201,6 +202,14 @@ create_in_khepri(#binding{source = SrcName, case ChecksFun(Src, Dst) of ok -> RoutePath = khepri_route_path(Binding), + DstPath = case DstName of + #resource{kind = queue} -> + rabbit_db_queue:khepri_queue_path(DstName); + #resource{kind = exchange} -> + rabbit_db_exchange:khepri_exchange_path(DstName) + end, + KeepWhile = #{DstPath => #if_node_exists{}}, + PutOptions = #{keep_while => KeepWhile}, MaybeSerial = rabbit_exchange:serialise_events(Src), Serial = rabbit_khepri:transaction( fun() -> @@ -210,11 +219,17 @@ create_in_khepri(#binding{source = SrcName, true -> already_exists; false -> - ok = khepri_tx:put(RoutePath, sets:add_element(Binding, Set)), + ok = khepri_tx:put( + RoutePath, + sets:add_element(Binding, Set), + PutOptions), serial_in_khepri(MaybeSerial, Src) end; _ -> - ok = khepri_tx:put(RoutePath, sets:add_element(Binding, sets:new([{version, 2}]))), + ok = khepri_tx:put( + RoutePath, + sets:add_element(Binding, sets:new([{version, 2}])), + PutOptions), serial_in_khepri(MaybeSerial, Src) end end, rw), @@ -906,6 +921,7 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na Name, ?KHEPRI_WILDCARD_STAR), %% RoutingKey {ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern), + % logger:alert("BindingsMap = ~p", [BindingsMap]), Bindings = maps:fold( fun(Path, Props, Acc) -> case {Path, Props} of @@ -920,6 +936,38 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4, lists:keysort(#binding.source, Bindings), OnlyDurable). +khepri_ret_to_deletions(Deleted, OnlyDurable) -> + Bindings0 = maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_ROUTE_PATH( + _VHost, _SrcName, _Kind, _Name, _RoutingKey), + #{data := Set}} -> + sets:to_list(Set) ++ Acc; + {_, _} -> + Acc + end + end, [], Deleted), + Bindings1 = lists:keysort(#binding.source, Bindings0), + rabbit_binding:group_bindings_fold( + fun(XName, Bindings, Deletions, _OnlyDurable) -> + ExchangePath = rabbit_db_exchange:khepri_exchange_path(XName), + case Deleted of + #{ExchangePath := #{data := X}} -> + rabbit_binding:add_deletion( + XName, X, deleted, Bindings, Deletions); + _ -> + case rabbit_db_exchange:get(XName) of + {ok, X} -> + rabbit_binding:add_deletion( + XName, X, not_deleted, Bindings, Deletions); + _ -> + Deletions + end + end + end, + Bindings1, OnlyDurable). + %% ------------------------------------------------------------------- %% delete_transient_for_destination_in_mnesia(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index 548ab78f696..981c47bc372 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -414,7 +414,24 @@ create_or_get_in_khepri(#exchange{name = XName} = X) -> Path0, [#if_any{conditions = [#if_node_exists{exists = false}, #if_has_payload{has_payload = false}]}]), - case rabbit_khepri:put(Path1, X) of + Options = case X of + #exchange{name = #resource{virtual_host = VHost, + name = Name}, + auto_delete = true} -> + Path = rabbit_db_binding:khepri_route_path( + VHost, + Name, + _Kind = ?KHEPRI_WILDCARD_STAR, + _DstName = ?KHEPRI_WILDCARD_STAR, + _RoutingKey = ?KHEPRI_WILDCARD_STAR), + KeepWhile = #{Path => #if_all{conditions = + [#if_node_exists{}, + #if_has_data{}]}}, + #{keep_while => KeepWhile}; + _ -> + #{} + end, + case rabbit_khepri:put(Path1, X, Options) of ok -> {new, X}; {error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} -> diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 2fe9fbccf2f..cd625dcfe5c 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -411,28 +411,46 @@ delete_in_khepri(QueueName) -> delete_in_khepri(QueueName, false). delete_in_khepri(QueueName, OnlyDurable) -> - rabbit_khepri:transaction( - fun () -> - Path = khepri_queue_path(QueueName), - UsesUniformWriteRet = try - khepri_tx:does_api_comply_with(uniform_write_ret) - catch - error:undef -> - false - end, - case khepri_tx_adv:delete(Path) of - {ok, #{Path := #{data := _}}} when UsesUniformWriteRet -> - %% we want to execute some things, as decided by rabbit_exchange, - %% after the transaction. - rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); - {ok, #{data := _}} when not UsesUniformWriteRet -> - %% we want to execute some things, as decided by rabbit_exchange, - %% after the transaction. - rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); - {ok, _} -> - ok - end - end, rw). + Path = khepri_queue_path(QueueName), + FeatureFlag = true, + case FeatureFlag of + true -> + case khepri_adv:delete(Path) of + {ok, #{Path := #{data := _}} = Deleted} -> + %% we want to execute some things, as decided by + %% rabbit_exchange, after the transaction. + rabbit_db_binding:khepri_ret_to_deletions( + Deleted, OnlyDurable); + {ok, _} -> + ok; + {error, _} = Error -> + Error + end; + false -> + UsesUniformWriteRet = try + khepri_tx:does_api_comply_with(uniform_write_ret) + catch + error:undef -> + false + end, + rabbit_khepri:transaction( + fun () -> + Ret1 = khepri_tx_adv:delete(Path), + % logger:alert("Deleted queue ret = ~p", [Ret1]), + case Ret1 of + {ok, #{Path := #{data := _}}} when UsesUniformWriteRet -> + %% we want to execute some things, as decided by rabbit_exchange, + %% after the transaction. + rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); + {ok, #{data := _}} when not UsesUniformWriteRet -> + %% we want to execute some things, as decided by rabbit_exchange, + %% after the transaction. + rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); + {ok, _} -> + ok + end + end, rw) + end. %% ------------------------------------------------------------------- %% internal_delete(). diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 61891fc7b63..9384550b734 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -46,7 +46,8 @@ dep_credentials_obfuscation = hex 3.5.0 dep_cuttlefish = hex 3.6.0 dep_gen_batch_server = hex 0.8.8 dep_jose = hex 1.11.10 -dep_khepri = hex 0.17.2 +#dep_khepri = hex 0.17.2 +dep_khepri = git https://github.com/rabbitmq/khepri.git skip-keep_while-on-children-when-parent-created dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 dep_osiris = git https://github.com/rabbitmq/osiris v1.10.1