1515 */
1616package org .springframework .data .redis .cache ;
1717
18+ import java .nio .ByteBuffer ;
1819import java .nio .charset .StandardCharsets ;
1920import java .time .Duration ;
21+ import java .util .concurrent .CompletableFuture ;
2022import java .util .concurrent .TimeUnit ;
21- import java .util .function .Consumer ;
23+ import java .util .function .BiFunction ;
2224import java .util .function .Function ;
25+ import java .util .function .Supplier ;
2326
2427import org .springframework .dao .PessimisticLockingFailureException ;
28+ import org .springframework .data .redis .connection .ReactiveRedisConnection ;
29+ import org .springframework .data .redis .connection .ReactiveRedisConnectionFactory ;
2530import org .springframework .data .redis .connection .RedisConnection ;
2631import org .springframework .data .redis .connection .RedisConnectionFactory ;
2732import org .springframework .data .redis .connection .RedisStringCommands .SetOption ;
2833import org .springframework .data .redis .core .types .Expiration ;
34+ import org .springframework .data .redis .util .ByteUtils ;
2935import org .springframework .lang .Nullable ;
3036import org .springframework .util .Assert ;
3137
38+ import reactor .core .publisher .Flux ;
39+ import reactor .core .publisher .Mono ;
40+
3241/**
3342 * {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
3443 * and {@literal cluster} environments, and uses a given {@link RedisConnectionFactory} to obtain the actual
@@ -114,8 +123,8 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
114123 Assert .notNull (key , "Key must not be null" );
115124
116125 byte [] result = shouldExpireWithin (ttl )
117- ? execute (name , connection -> connection .stringCommands ().getEx (key , Expiration .from (ttl )))
118- : execute (name , connection -> connection .stringCommands ().get (key ));
126+ ? execute (name , connection -> connection .stringCommands ().getEx (key , Expiration .from (ttl )))
127+ : execute (name , connection -> connection .stringCommands ().get (key ));
119128
120129 statistics .incGets (name );
121130
@@ -128,6 +137,81 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
128137 return result ;
129138 }
130139
140+ @ Override
141+ public boolean isRetrieveSupported () {
142+ return isReactive ();
143+ }
144+
145+ @ Override
146+ public CompletableFuture <byte []> retrieve (String name , byte [] key , @ Nullable Duration ttl ) {
147+
148+ Assert .notNull (name , "Name must not be null" );
149+ Assert .notNull (key , "Key must not be null" );
150+
151+ CompletableFuture <byte []> result = nonBlockingRetrieveFunction (name ).apply (key , ttl );
152+
153+ result = result .thenApply (cachedValue -> {
154+
155+ statistics .incGets (name );
156+
157+ if (cachedValue != null ) {
158+ statistics .incHits (name );
159+ } else {
160+ statistics .incMisses (name );
161+ }
162+
163+ return cachedValue ;
164+ });
165+
166+ return result ;
167+ }
168+
169+ private BiFunction <byte [], Duration , CompletableFuture <byte []>> nonBlockingRetrieveFunction (String cacheName ) {
170+ return isReactive () ? reactiveRetrieveFunction (cacheName ) : asyncRetrieveFunction (cacheName );
171+ }
172+
173+ // TODO: Possibly remove if we rely on the default Cache.retrieve(..) behavior
174+ // after assessing RedisCacheWriter.isRetrieveSupported().
175+ // Function applied for Cache.retrieve(key) when a non-reactive Redis driver is used, such as Jedis.
176+ private BiFunction <byte [], Duration , CompletableFuture <byte []>> asyncRetrieveFunction (String cacheName ) {
177+
178+ return (key , ttl ) -> {
179+
180+ Supplier <byte []> getKey = () -> execute (cacheName , connection -> connection .stringCommands ().get (key ));
181+
182+ Supplier <byte []> getKeyWithExpiration = () -> execute (cacheName , connection ->
183+ connection .stringCommands ().getEx (key , Expiration .from (ttl )));
184+
185+ return shouldExpireWithin (ttl )
186+ ? CompletableFuture .supplyAsync (getKeyWithExpiration )
187+ : CompletableFuture .supplyAsync (getKey );
188+
189+ };
190+ }
191+
192+ // Function applied for Cache.retrieve(key) when a reactive Redis driver is used, such as Lettuce.
193+ private BiFunction <byte [], Duration , CompletableFuture <byte []>> reactiveRetrieveFunction (String cacheName ) {
194+
195+ return (key , ttl ) -> {
196+
197+ ByteBuffer wrappedKey = ByteBuffer .wrap (key );
198+
199+ Flux <?> cacheLockCheckFlux = Flux .interval (Duration .ZERO , this .sleepTime ).takeUntil (count ->
200+ executeLockFree (connection -> !doCheckLock (cacheName , connection )));
201+
202+ Mono <ByteBuffer > getMono = shouldExpireWithin (ttl )
203+ ? executeReactively (connection -> connection .stringCommands ().getEx (wrappedKey , Expiration .from (ttl )))
204+ : executeReactively (connection -> connection .stringCommands ().get (wrappedKey ));
205+
206+ Mono <ByteBuffer > result = cacheLockCheckFlux .then (getMono );
207+
208+ @ SuppressWarnings ("all" )
209+ Mono <byte []> byteArrayResult = result .map (DefaultRedisCacheWriter ::nullSafeGetBytes );
210+
211+ return byteArrayResult .toFuture ();
212+ };
213+ }
214+
131215 @ Override
132216 public void put (String name , byte [] key , byte [] value , @ Nullable Duration ttl ) {
133217
@@ -282,32 +366,42 @@ private Long doUnlock(String name, RedisConnection connection) {
282366 return connection .keyCommands ().del (createCacheLockKey (name ));
283367 }
284368
285- boolean doCheckLock (String name , RedisConnection connection ) {
286- return isTrue (connection .keyCommands ().exists (createCacheLockKey (name )));
287- }
369+ private <T > T execute (String name , Function <RedisConnection , T > callback ) {
288370
289- /**
290- * @return {@literal true} if {@link RedisCacheWriter} uses locks.
291- */
292- private boolean isLockingCacheWriter () {
293- return !sleepTime .isZero () && !sleepTime .isNegative ();
371+ try (RedisConnection connection = this .connectionFactory .getConnection ()) {
372+ checkAndPotentiallyWaitUntilUnlocked (name , connection );
373+ return callback .apply (connection );
374+ }
294375 }
295376
296- private <T > T execute ( String name , Function <RedisConnection , T > callback ) {
377+ private <T > T executeLockFree ( Function <RedisConnection , T > callback ) {
297378
298- try (RedisConnection connection = connectionFactory .getConnection ()) {
299- checkAndPotentiallyWaitUntilUnlocked (name , connection );
379+ try (RedisConnection connection = this .connectionFactory .getConnection ()) {
300380 return callback .apply (connection );
301381 }
302382 }
303383
304- private void executeLockFree ( Consumer < RedisConnection > callback ) {
384+ private < T > T executeReactively ( Function < ReactiveRedisConnection , T > callback ) {
305385
306- try (RedisConnection connection = connectionFactory .getConnection ()) {
307- callback .accept (connection );
386+ ReactiveRedisConnection connection = getReactiveRedisConnectionFactory ().getReactiveConnection ();
387+
388+ try {
389+ return callback .apply (connection );
390+ }
391+ finally {
392+ connection .closeLater ();
308393 }
309394 }
310395
396+ /**
397+ * Determines whether this {@link RedisCacheWriter} uses locks during caching operations.
398+ *
399+ * @return {@literal true} if {@link RedisCacheWriter} uses locks.
400+ */
401+ private boolean isLockingCacheWriter () {
402+ return !this .sleepTime .isZero () && !this .sleepTime .isNegative ();
403+ }
404+
311405 private void checkAndPotentiallyWaitUntilUnlocked (String name , RedisConnection connection ) {
312406
313407 if (!isLockingCacheWriter ()) {
@@ -318,29 +412,46 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
318412
319413 try {
320414 while (doCheckLock (name , connection )) {
321- Thread .sleep (sleepTime .toMillis ());
415+ Thread .sleep (this . sleepTime .toMillis ());
322416 }
323417 } catch (InterruptedException cause ) {
324418
325- // Re-interrupt current thread, to allow other participants to react.
419+ // Re-interrupt current Thread to allow other participants to react.
326420 Thread .currentThread ().interrupt ();
327421
328422 String message = String .format ("Interrupted while waiting to unlock cache %s" , name );
329423
330424 throw new PessimisticLockingFailureException (message , cause );
331425 } finally {
332- statistics .incLockTime (name , System .nanoTime () - lockWaitTimeNs );
426+ this . statistics .incLockTime (name , System .nanoTime () - lockWaitTimeNs );
333427 }
334428 }
335429
430+ boolean doCheckLock (String name , RedisConnection connection ) {
431+ return isTrue (connection .keyCommands ().exists (createCacheLockKey (name )));
432+ }
433+
434+ private boolean isReactive () {
435+ return this .connectionFactory instanceof ReactiveRedisConnectionFactory ;
436+ }
437+
438+ private ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory () {
439+ return (ReactiveRedisConnectionFactory ) this .connectionFactory ;
440+ }
441+
336442 private static byte [] createCacheLockKey (String name ) {
337443 return (name + "~lock" ).getBytes (StandardCharsets .UTF_8 );
338444 }
339445
340- private boolean isTrue (@ Nullable Boolean value ) {
446+ private static boolean isTrue (@ Nullable Boolean value ) {
341447 return Boolean .TRUE .equals (value );
342448 }
343449
450+ @ Nullable
451+ private static byte [] nullSafeGetBytes (@ Nullable ByteBuffer value ) {
452+ return value != null ? ByteUtils .getBytes (value ) : null ;
453+ }
454+
344455 private static boolean shouldExpireWithin (@ Nullable Duration ttl ) {
345456 return ttl != null && !ttl .isZero () && !ttl .isNegative ();
346457 }
0 commit comments