Spring WebFlux and reading from a database

Spring 5 introduces a reactive programming style for REST APIs with WebFlux. I'm fairly new to it myself, and I was wondering whether wrapping synchronous database calls in a Flux or Mono makes sense performance-wise. If so, is this the way to do it?

    @RestController
    public class HomeController {

        private MeasurementRepository repository;

        public HomeController(MeasurementRepository repository) {
            this.repository = repository;
        }

        @GetMapping(value = "/v1/measurements")
        public Flux<Measurement> getMeasurements() {
            return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L)));
        }
    }

Is there something like an asynchronous CrudRepository? I could not find one.

+20
spring reactive-programming
4 answers

One option is to use alternative SQL clients that are fully non-blocking. Some examples include https://github.com/mauricio/postgresql-async and https://github.com/finagle/roc . Of course, none of these drivers is officially supported by database vendors. In addition, their feature set is much less attractive than that of mature JDBC-based abstractions such as Hibernate or jOOQ.

An alternative idea came to me from the Scala world. The idea is to dispatch blocking calls to an isolated thread pool so that blocking and non-blocking calls are not mixed together. This lets us control the overall number of threads and lets the CPU serve non-blocking tasks in the main execution context, with some potential optimizations. Assuming we have a JDBC-based implementation that really does block, such as Spring Data JPA, we can make its execution asynchronous by dispatching it to the dedicated thread pool.

    @RestController
    public class HomeController {

        private final MeasurementRepository repository;
        private final Scheduler scheduler;

        public HomeController(MeasurementRepository repository,
                              @Qualifier("jdbcScheduler") Scheduler scheduler) {
            this.repository = repository;
            this.scheduler = scheduler;
        }

        @GetMapping(value = "/v1/measurements")
        public Flux<Measurement> getMeasurements() {
            // fromCallable defers the blocking query until subscription;
            // subscribeOn (rather than publishOn) makes the query itself run on the JDBC scheduler.
            return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L)))
                    .flatMapMany(Flux::fromIterable)
                    .subscribeOn(scheduler);
        }
    }

Our scheduler for JDBC should be configured with a dedicated thread pool whose size equals the number of connections in the connection pool.

    @Configuration
    public class SchedulerConfiguration {

        private final Integer connectionPoolSize;

        public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) {
            this.connectionPoolSize = connectionPoolSize;
        }

        @Bean
        public Scheduler jdbcScheduler() {
            return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
        }
    }
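For readers who want to try the isolated-pool idea without any Spring or Reactor dependency, here is a minimal JDK-only sketch. It swaps Reactor's Scheduler for a plain ExecutorService and CompletableFuture, so it is an analogy rather than the answer's actual setup. The names IsolatedPoolSketch, findMeasurementsBlocking, CONNECTION_POOL_SIZE and the "jdbc-worker" thread name are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class IsolatedPoolSketch {
    // Hypothetical value; in the answer it comes from the datasource pool size.
    static final int CONNECTION_POOL_SIZE = 2;

    // Hypothetical stand-in for a blocking JDBC/JPA repository call.
    static List<String> findMeasurementsBlocking() {
        try {
            Thread.sleep(20); // simulate a blocking database round trip
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Arrays.asList("m1", "m2");
    }

    // Runs the blocking call on the dedicated pool and returns the name of
    // the thread that executed it, showing it was not the caller's thread.
    static String runOnDedicatedPool() {
        ExecutorService jdbcPool = Executors.newFixedThreadPool(
                CONNECTION_POOL_SIZE, r -> new Thread(r, "jdbc-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> {
                        findMeasurementsBlocking();
                        return Thread.currentThread().getName();
                    }, jdbcPool)
                    .join();
        } finally {
            jdbcPool.shutdown(); // let the JVM exit once the work is done
        }
    }
}
```

Because the pool is fixed at the connection pool size, at most that many blocking calls run at once; further calls queue in the executor instead of tying up the caller's thread.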

However, this approach comes with difficulties. The main one is transaction management. In JDBC, a transaction is bound to a single java.sql.Connection. To perform several operations in one transaction, they have to share that connection. If we want to do some computation between them, we have to hold on to the connection in the meantime. This is not very efficient, because a connection from a limited pool sits idle while the computation runs.
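The cost of holding a connection across a computation can be illustrated with a toy model. This is only a sketch under loud assumptions: the connection pool is modeled as a java.util.concurrent.Semaphore with two permits, no real JDBC is involved, and the class name ConnectionHoldSketch is hypothetical.

```java
import java.util.concurrent.Semaphore;

class ConnectionHoldSketch {
    // Returns {connections free during the computation, connections free after commit}.
    static int[] demo() {
        Semaphore connections = new Semaphore(2); // hypothetical pool of 2 connections
        connections.acquireUninterruptibly();     // begin transaction: borrow one connection
        int freeDuringCompute;
        try {
            // The first statement of the transaction would run here.
            // The computation between statements still holds the connection:
            freeDuringCompute = connections.availablePermits();
            // The second statement would run here, on the same connection.
        } finally {
            connections.release(); // commit or roll back, then return the connection
        }
        return new int[] { freeDuringCompute, connections.availablePermits() };
    }
}
```

In this model only one of the two connections is available to other requests for the whole duration of the transaction, including the time spent computing.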

This idea of an asynchronous wrapper around JDBC is not new and has already been implemented in the Scala library Slick 3. Finally, non-blocking JDBC may appear on the Java roadmap. As announced at JavaOne in September 2016, it is possible that we will see it in Java 10.

+23

Spring Data supports reactive repository interfaces for MongoDB and Cassandra.

Spring Data MongoDB Reactive Interface

Spring Data MongoDB provides reactive repository support with Project Reactor and RxJava 1 reactive types. The reactive API supports conversion between reactive types.

    public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {

        Flux<Person> findByLastname(String lastname);

        @Query("{ 'firstname': ?0, 'lastname': ?1}")
        Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);

        // Accept parameter inside a reactive type for deferred execution
        Flux<Person> findByLastname(Mono<String> lastname);

        Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);

        @InfiniteStream // Use a tailable cursor
        Flux<Person> findWithTailableCursorBy();
    }

    public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {

        Observable<Person> findByLastname(String lastname);

        @Query("{ 'firstname': ?0, 'lastname': ?1}")
        Single<Person> findByFirstnameAndLastname(String firstname, String lastname);

        // Accept parameter inside a reactive type for deferred execution
        Observable<Person> findByLastname(Single<String> lastname);

        Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);

        @InfiniteStream // Use a tailable cursor
        Observable<Person> findWithTailableCursorBy();
    }

+4

Based on this blog, you should rewrite your snippet as follows:

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        return Flux.defer(() -> Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))))
                .subscribeOn(Schedulers.elastic());
    }

+1

Obtaining a Flux or a Mono does not necessarily mean that it will run in a dedicated thread. Instead, most operators continue to work in the thread on which the previous operator executed. Unless specified otherwise, the topmost operator (the source) itself runs on the thread in which the subscribe() call was made.

If you have blocking persistence APIs (JPA, JDBC) or blocking networking APIs to use, Spring MVC is the best choice, at least for common architectures. It is technically feasible with both Reactor and RxJava to perform blocking calls on a separate thread, but you would not be making the most of a non-blocking web stack.

So ... How do I wrap a synchronous, blocking call?

Use a Callable to defer execution. And you should use Schedulers.elastic, because it creates a dedicated thread to wait for the blocking resource without tying up any other resource.

  • Schedulers.immediate(): the current thread.
  • Schedulers.single(): a single, reusable thread.
  • Schedulers.newSingle(): a dedicated thread per call.
  • Schedulers.elastic(): an elastic thread pool. It creates new worker pools as needed and reuses idle ones. This is a good choice for blocking I/O, for example.
  • Schedulers.parallel(): a fixed pool of workers tuned for parallel work.

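    public Mono save() {
        // The Callable defers the blocking call until subscription;
        // subscribeOn moves it onto the elastic scheduler.
        return Mono.fromCallable(() -> blockingRepository.save())
                .subscribeOn(Schedulers.elastic());
    }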
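The two ideas in this answer, deferring work with a Callable and running it on an elastic pool, can be checked with plain JDK classes. This is a rough analogy, not Reactor itself: Executors.newCachedThreadPool() stands in for Schedulers.elastic(), and the names DeferredBlockingCall and blockingSave are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class DeferredBlockingCall {
    // Returns {number of calls before submission, number of calls after completion}.
    static int[] demo() {
        AtomicInteger calls = new AtomicInteger();

        // Declaring the Callable runs nothing; its body is deferred.
        Callable<Integer> blockingSave = () -> calls.incrementAndGet();
        int before = calls.get();

        // A cached pool grows on demand and reuses idle threads (assumed analog of an elastic scheduler).
        ExecutorService elasticLike = Executors.newCachedThreadPool();
        try {
            elasticLike.submit(blockingSave).get(); // the body runs only now, on a pool thread
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            elasticLike.shutdown();
        }
        return new int[] { before, calls.get() };
    }
}
```

The counter stays at zero until the Callable is handed to the pool, which mirrors how Mono.fromCallable does nothing until something subscribes.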

0