2020import org .neo4j .driver .Query ;
2121import org .neo4j .driver .Record ;
2222import org .neo4j .driver .Value ;
23- import org .neo4j .driver .reactive .RxQueryRunner ;
24- import org .neo4j .driver .reactive .RxResult ;
25- import org .neo4j .driver .reactive .RxSession ;
23+ import org .neo4j .driver .reactive .ReactiveQueryRunner ;
24+ import org .neo4j .driver .reactive .ReactiveResult ;
25+ import org .neo4j .driver .reactive .ReactiveSession ;
2626import org .neo4j .driver .summary .ResultSummary ;
2727import org .neo4j .driver .types .TypeSystem ;
2828import org .springframework .core .convert .ConversionService ;
3636import org .springframework .util .Assert ;
3737import org .springframework .util .StringUtils ;
3838
39+ import reactor .adapter .JdkFlowAdapter ;
3940import reactor .core .publisher .Flux ;
4041import reactor .core .publisher .Mono ;
4142import reactor .util .function .Tuple2 ;
4748import java .util .Map ;
4849import java .util .Optional ;
4950import java .util .Set ;
51+ import java .util .concurrent .Flow .Publisher ;
5052import java .util .concurrent .locks .ReentrantReadWriteLock ;
5153import java .util .function .BiConsumer ;
5254import java .util .function .BiFunction ;
@@ -86,42 +88,42 @@ final class DefaultReactiveNeo4jClient implements ReactiveNeo4jClient {
8688 }
8789
8890 @ Override
89- public Mono <RxQueryRunner > getQueryRunner (Mono <DatabaseSelection > databaseSelection , Mono <UserSelection > userSelection ) {
91+ public Mono <ReactiveQueryRunner > getQueryRunner (Mono <DatabaseSelection > databaseSelection , Mono <UserSelection > userSelection ) {
9092
9193 return databaseSelection .zipWith (userSelection )
9294 .flatMap (targetDatabaseAndUser ->
93- ReactiveNeo4jTransactionManager .retrieveReactiveTransaction (driver , targetDatabaseAndUser .getT1 (), targetDatabaseAndUser .getT2 ())
94- .map (RxQueryRunner .class ::cast )
95- .zipWith (Mono .just (Collections .<Bookmark >emptySet ()))
96- .switchIfEmpty (Mono .fromSupplier (() -> {
97- ReentrantReadWriteLock .ReadLock lock = bookmarksLock .readLock ();
98- try {
99- lock .lock ();
100- Set <Bookmark > lastBookmarks = new HashSet <>(bookmarks );
101- return Tuples .of (driver .rxSession (Neo4jTransactionUtils .sessionConfig (false , lastBookmarks , targetDatabaseAndUser .getT1 (), targetDatabaseAndUser .getT2 ())), lastBookmarks );
102- } finally {
103- lock .unlock ();
104- }
105- })))
106- .map (t -> new DelegatingQueryRunner (t .getT1 (), t .getT2 (), (usedBookmarks , newBookmark ) -> {
95+ ReactiveNeo4jTransactionManager .retrieveReactiveTransaction (driver , targetDatabaseAndUser .getT1 (), targetDatabaseAndUser .getT2 ())
96+ .map (ReactiveQueryRunner .class ::cast )
97+ .zipWith (Mono .just (Collections .<Bookmark >emptySet ()))
98+ .switchIfEmpty (Mono .fromSupplier (() -> {
99+ ReentrantReadWriteLock .ReadLock lock = bookmarksLock .readLock ();
100+ try {
101+ lock .lock ();
102+ Set <Bookmark > lastBookmarks = new HashSet <>(bookmarks );
103+ return Tuples .of (driver .reactiveSession (Neo4jTransactionUtils .sessionConfig (false , lastBookmarks , targetDatabaseAndUser .getT1 (), targetDatabaseAndUser .getT2 ())), lastBookmarks );
104+ } finally {
105+ lock .unlock ();
106+ }
107+ })))
108+ .map (t -> new DelegatingQueryRunner (t .getT1 (), t .getT2 (), (usedBookmarks , newBookmarks ) -> {
107109 ReentrantReadWriteLock .WriteLock lock = bookmarksLock .writeLock ();
108110 try {
109111 lock .lock ();
110112 bookmarks .removeAll (usedBookmarks );
111- bookmarks .add ( newBookmark );
113+ bookmarks .addAll ( newBookmarks );
112114 } finally {
113115 lock .unlock ();
114116 }
115117 }));
116118 }
117119
118- private static class DelegatingQueryRunner implements RxQueryRunner {
120+ private static class DelegatingQueryRunner implements ReactiveQueryRunner {
119121
120- private final RxQueryRunner delegate ;
122+ private final ReactiveQueryRunner delegate ;
121123 private final Collection <Bookmark > usedBookmarks ;
122- private final BiConsumer <Collection <Bookmark >, Bookmark > newBookmarkConsumer ;
124+ private final BiConsumer <Collection <Bookmark >, Collection < Bookmark > > newBookmarkConsumer ;
123125
124- private DelegatingQueryRunner (RxQueryRunner delegate , Collection <Bookmark > lastBookmarks , BiConsumer <Collection <Bookmark >, Bookmark > newBookmarkConsumer ) {
126+ private DelegatingQueryRunner (ReactiveQueryRunner delegate , Collection <Bookmark > lastBookmarks , BiConsumer <Collection <Bookmark >, Collection < Bookmark > > newBookmarkConsumer ) {
125127 this .delegate = delegate ;
126128 this .usedBookmarks = lastBookmarks ;
127129 this .newBookmarkConsumer = newBookmarkConsumer ;
@@ -131,47 +133,46 @@ Mono<Void> close() {
131133
132134 // We're only going to close sessions we have acquired inside the client, not something that
133135 // has been retrieved from the tx manager.
134- if (this .delegate instanceof RxSession ) {
135- RxSession session = (RxSession ) this .delegate ;
136- return Mono .fromDirect (session .close ()).then ().doOnSuccess (signal ->
137- this .newBookmarkConsumer .accept (usedBookmarks , session .lastBookmark ()));
136+ if (this .delegate instanceof ReactiveSession session ) {
137+ return JdkFlowAdapter .flowPublisherToFlux (session .close ()).then ().doOnSuccess (signal ->
138+ this .newBookmarkConsumer .accept (usedBookmarks , session .lastBookmarks ()));
138139 }
139140
140141 return Mono .empty ();
141142 }
142143
143144 @ Override
144- public RxResult run (String query , Value parameters ) {
145+ public Publisher < ReactiveResult > run (String query , Value parameters ) {
145146 return delegate .run (query , parameters );
146147 }
147148
148149 @ Override
149- public RxResult run (String query , Map <String , Object > parameters ) {
150+ public Publisher < ReactiveResult > run (String query , Map <String , Object > parameters ) {
150151 return delegate .run (query , parameters );
151152 }
152153
153154 @ Override
154- public RxResult run (String query , Record parameters ) {
155+ public Publisher < ReactiveResult > run (String query , Record parameters ) {
155156 return delegate .run (query , parameters );
156157 }
157158
158159 @ Override
159- public RxResult run (String query ) {
160+ public Publisher < ReactiveResult > run (String query ) {
160161 return delegate .run (query );
161162 }
162163
163164 @ Override
164- public RxResult run (Query query ) {
165+ public Publisher < ReactiveResult > run (Query query ) {
165166 return delegate .run (query );
166167 }
167168 }
168169
169- <T > Mono <T > doInQueryRunnerForMono (Mono <DatabaseSelection > databaseSelection , Mono <UserSelection > userSelection , Function <RxQueryRunner , Mono <T >> func ) {
170+ <T > Mono <T > doInQueryRunnerForMono (Mono <DatabaseSelection > databaseSelection , Mono <UserSelection > userSelection , Function <ReactiveQueryRunner , Mono <T >> func ) {
170171
171172 return Mono .usingWhen (getQueryRunner (databaseSelection , userSelection ), func , runner -> ((DelegatingQueryRunner ) runner ).close ());
172173 }
173174
174- <T > Flux <T > doInStatementRunnerForFlux (Mono <DatabaseSelection > databaseSelection , Mono <UserSelection > userSelection , Function <RxQueryRunner , Flux <T >> func ) {
175+ <T > Flux <T > doInStatementRunnerForFlux (Mono <DatabaseSelection > databaseSelection , Mono <UserSelection > userSelection , Function <ReactiveQueryRunner , Flux <T >> func ) {
175176
176177 return Flux .usingWhen (getQueryRunner (databaseSelection , userSelection ), func , runner -> ((DelegatingQueryRunner ) runner ).close ());
177178 }
@@ -187,7 +188,7 @@ public UnboundRunnableSpec query(Supplier<String> cypherSupplier) {
187188 }
188189
189190 @ Override
190- public <T > OngoingDelegation <T > delegateTo (Function <RxQueryRunner , Mono <T >> callback ) {
191+ public <T > OngoingDelegation <T > delegateTo (Function <ReactiveQueryRunner , Mono <T >> callback ) {
191192 return new DefaultRunnableDelegation <>(callback );
192193 }
193194
@@ -415,27 +416,27 @@ Mono<Tuple2<String, Map<String, Object>>> prepareStatement() {
415416 return Mono .fromSupplier (cypherSupplier ).zipWith (Mono .just (parameters .get ()));
416417 }
417418
418- Flux <T > executeWith (Tuple2 <String , Map <String , Object >> t , RxQueryRunner runner ) {
419+ Flux <T > executeWith (Tuple2 <String , Map <String , Object >> t , ReactiveQueryRunner runner ) {
419420
420- return Flux .usingWhen (Flux . just (runner .run (t .getT1 (), t .getT2 ())),
421- result -> Flux . from (result .records ()).mapNotNull (r -> mappingFunction .apply (typeSystem , r )),
422- result -> Flux . from (result .consume ()).doOnNext (ResultSummaries ::process ));
421+ return Flux .usingWhen (JdkFlowAdapter . flowPublisherToFlux (runner .run (t .getT1 (), t .getT2 ())),
422+ result -> JdkFlowAdapter . flowPublisherToFlux (result .records ()).mapNotNull (r -> mappingFunction .apply (typeSystem , r )),
423+ result -> JdkFlowAdapter . flowPublisherToFlux (result .consume ()).doOnNext (ResultSummaries ::process ));
423424 }
424425
425426 @ Override
426427 public Mono <T > one () {
427428
428429 return doInQueryRunnerForMono (databaseSelection , userSelection ,
429- (runner ) -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).singleOrEmpty ())
430- .onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
430+ (runner ) -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).singleOrEmpty ()
431+ .onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException ) );
431432 }
432433
433434 @ Override
434435 public Mono <T > first () {
435436
436437 return doInQueryRunnerForMono (databaseSelection , userSelection ,
437438 runner -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).next ())
438- .onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
439+ .onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
439440 }
440441
441442 @ Override
@@ -448,10 +449,10 @@ public Flux<T> all() {
448449
449450 Mono <ResultSummary > run () {
450451
451- return doInQueryRunnerForMono (databaseSelection , userSelection , runner -> prepareStatement (). flatMap ( t -> {
452- RxResult rxResult = runner .run (t .getT1 (), t .getT2 ());
453- return Flux . from (rxResult . records ()). then ( Mono . from ( rxResult .consume ()).map (ResultSummaries ::process ));
454- })) .onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
452+ return doInQueryRunnerForMono (databaseSelection , userSelection , runner -> prepareStatement ()
453+ . flatMap ( t -> JdkFlowAdapter . flowPublisherToFlux ( runner .run (t .getT1 (), t .getT2 ())). single ())
454+ . flatMap (rxResult -> JdkFlowAdapter . flowPublisherToFlux ( rxResult .consume ()).single (). map (ResultSummaries ::process )))
455+ .onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
455456 }
456457 }
457458
@@ -469,12 +470,12 @@ private RuntimeException potentiallyConvertRuntimeException(RuntimeException ex)
469470
470471 class DefaultRunnableDelegation <T > implements RunnableDelegation <T >, OngoingDelegation <T > {
471472
472- private final Function <RxQueryRunner , Mono <T >> callback ;
473+ private final Function <ReactiveQueryRunner , Mono <T >> callback ;
473474
474475 private Mono <DatabaseSelection > databaseSelection ;
475- private Mono <UserSelection > userSelection ;
476+ private final Mono <UserSelection > userSelection ;
476477
477- DefaultRunnableDelegation (Function <RxQueryRunner , Mono <T >> callback ) {
478+ DefaultRunnableDelegation (Function <ReactiveQueryRunner , Mono <T >> callback ) {
478479 this .callback = callback ;
479480 this .databaseSelection = resolveTargetDatabaseName (null );
480481 this .userSelection = resolveUser (null );
0 commit comments