What is the correct way to manage transactions in RxJava Services?

I recently started experimenting with RxJava and came across a presentation by a Netflix engineer who suggested moving our business APIs to an Observable API, for example:

    public interface VideoService {
        Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic);
        Observable<VideoBasicInfo> getVideoBasicInfo(Integer videoId);
        Observable<VideoRating> getVideoRating(Integer videoId);
    }

However, I could not find anything that explains how to manage transactions in these services. At first, I simply annotated my service implementation with @Transactional:

    @Service
    @Transactional
    public class VideoServiceImpl implements VideoService {

        @Autowired
        private VideoBasicInfoRepository basicInfoRepo;

        @Autowired
        private VideoRatingRepository ratingRepo;

        @Override
        public Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic) {
            return Observable.create(s -> {
                s.onNext(basicInfoRepo.save(videoBasic));
            });
        }

        // remaining VideoService methods omitted
    }

We want all the code inside the Observable.create lambda ( s -> { /* this code */ } ) to execute in a transaction. However, what actually happens is:

  • The createVideoBasicInfo() call itself runs inside a transaction, but all it does is return a cold observable.
  • save() runs later, on subscription, as its own atomic transaction.
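The ordering described in the two points above can be reproduced without Spring or RxJava. The following is only a minimal sketch using the JDK: `inTransaction` is a hypothetical stand-in for what the @Transactional proxy does around a method call, and a `Supplier` stands in for the cold observable. None of these names come from the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ColdObservableTxDemo {
    // Event log so the ordering of transaction and work can be inspected.
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for the proxy: begin, run the method, commit.
    static <T> T inTransaction(Supplier<T> body) {
        log.add("begin");
        try {
            T result = body.get();
            log.add("commit");
            return result;
        } catch (RuntimeException e) {
            log.add("rollback");
            throw e;
        }
    }

    // Stand-in for the service method: it only builds the deferred work.
    static Supplier<String> createVideoBasicInfo(String title) {
        return () -> {
            log.add("save " + title);
            return title;
        };
    }

    public static List<String> run() {
        log.clear();
        // The "proxy" wraps the method call, which merely returns the deferred work.
        Supplier<String> cold = inTransaction(() -> createVideoBasicInfo("video"));
        // The "subscription" happens later, after the transaction has ended.
        cold.get();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [begin, commit, save video]
    }
}
```

The commit is logged before the save, which mirrors the behavior in the question: the transaction opened by the proxy is already closed by the time the lambda runs.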

Obviously, this makes sense, since the Spring proxy is applied to the methods of the service implementation. I have been thinking about how to get the behavior I actually expect, for example by starting a programmatic transaction:

    return Observable.create(s -> {
        VideoBasicInfo savedBasic = transactionTemplate.execute(status -> {
            VideoBasicInfo basicInfo = basicInfoRepo.save(videoBasic);
            return basicInfo;
        });
        s.onNext(savedBasic);
    });

Is this the recommended way to manage transactions when working with reactive APIs?
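To show what moving the transaction inside the deferred work changes, here is a JDK-only sketch. It assumes a hypothetical `execute` helper that plays the role of `transactionTemplate.execute(...)`, with a `Supplier` again standing in for the cold observable. It is not Spring's implementation, and in real RxJava code the exception would normally be reported through onError rather than caught by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class DeferredTxDemo {
    // Event log so the ordering of transaction and work can be inspected.
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for TransactionTemplate.execute(...).
    static <T> T execute(Supplier<T> body) {
        log.add("begin");
        try {
            T result = body.get();
            log.add("commit");
            return result;
        } catch (RuntimeException e) {
            log.add("rollback");
            throw e;
        }
    }

    // The transaction is opened inside the deferred work, not around this method.
    static Supplier<String> createVideoBasicInfo(String title) {
        return () -> execute(() -> {
            if (title.isEmpty()) {
                throw new IllegalArgumentException("empty title");
            }
            log.add("save " + title);
            return title;
        });
    }

    public static List<String> run(String title) {
        log.clear();
        Supplier<String> cold = createVideoBasicInfo(title); // nothing runs yet
        try {
            cold.get(); // the "subscription" triggers transaction and save together
        } catch (IllegalArgumentException e) {
            log.add("error reported");
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run("video")); // [begin, save video, commit]
        System.out.println(run(""));      // [begin, rollback, error reported]
    }
}
```

Here the save sits between begin and commit, and a failure inside the deferred work rolls back the same transaction that was opened for it.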

spring reactive-programming rx-java rx-android netflix
2 answers

The methods of Spring Data's JpaRepository are already @Transactional, so if you only use one repository, you don't need to do anything special:

 public interface PersonRepository extends JpaRepository<Person, Integer> { } 

    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
    public class PersonRepositoryTest {

        private PersonRepository personRepository;

        @Autowired
        public void setPersonRepository(PersonRepository personRepository) {
            this.personRepository = personRepository;
        }

        @Test
        public void testReactiveSavePerson() {
            Person person = new Person("Jane", "Doe");
            assertNull(person.getId()); // null before save

            // save person
            Observable.create(s -> {
                s.onNext(personRepository.save(person));
            }).subscribe();

            // fetch from DB
            Person fetchedPerson = personRepository.findOne(person.getId());

            // should not be null
            assertNotNull(fetchedPerson);

            // should equal
            assertEquals(person.getId(), fetchedPerson.getId());
            assertEquals(person.getFirstName(), fetchedPerson.getFirstName());
        }
    }

If you need to combine several repositories into one transaction, you can use something like the class below:

    @Component
    public class ObservableTxFactory {

        public final <T> Observable<T> create(Observable.OnSubscribe<T> f) {
            return new ObservableTx<>(this, f);
        }

        // Runs the subscription logic inside a transaction.
        @Transactional
        public void call(Observable.OnSubscribe onSubscribe, Subscriber subscriber) {
            onSubscribe.call(subscriber);
        }

        private static class ObservableTx<T> extends Observable<T> {

            public ObservableTx(ObservableTxFactory observableTxFactory, OnSubscribe<T> f) {
                super(new OnSubscribeDecorator<>(observableTxFactory, f));
            }
        }

        // Delegates each subscription to the factory's transactional call().
        private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> {

            private final ObservableTxFactory observableTxFactory;
            private final Observable.OnSubscribe<T> onSubscribe;

            OnSubscribeDecorator(final ObservableTxFactory observableTxFactory,
                                 final Observable.OnSubscribe<T> s) {
                this.onSubscribe = s;
                this.observableTxFactory = observableTxFactory;
            }

            @Override
            public void call(Subscriber<? super T> subscriber) {
                observableTxFactory.call(onSubscribe, subscriber);
            }
        }
    }

You must also define a factory bean:

    @Bean
    ObservableTxFactory observableTxFactory() {
        return new ObservableTxFactory();
    }

Service:

    @Service
    public class PersonService {

        @Autowired
        PersonRepository personRepository;

        @Autowired
        ObservableTxFactory observableTxFactory;

        public Observable<Person> createPerson(String firstName, String lastName) {
            return observableTxFactory.create(s -> {
                Person p = new Person(firstName, lastName);
                s.onNext(personRepository.save(p));
            });
        }
    }

Test:

    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
    public class PersonServiceTest {

        @Autowired
        PersonRepository personRepository;

        @Autowired
        ObservableTxFactory observableTxFactory;

        @Test
        public void testPersonService() {
            final PersonService service = new PersonService();
            service.personRepository = personRepository;
            service.observableTxFactory = observableTxFactory;

            final Observable<Person> personObservable = service.createPerson("John", "Doe");
            personObservable.subscribe();

            // fetch from DB
            final Person fetchedPerson =
                    StreamSupport.stream(personRepository.findAll().spliterator(), false)
                            .filter(p -> p.getFirstName().equals("John")
                                    && p.getLastName().equals("Doe"))
                            .findFirst()
                            .get();

            // should not be null
            assertNotNull(fetchedPerson);
        }
    }

[Image: screenshot showing the proxy]


I would like to add to John Scattergood's excellent answer. I typically use Observable.fromCallable(), so I was looking for a way to do this without implementing Observable.OnSubscribe myself. I adapted his technique so that you can pass a Callable instead.

Factory Class:

    @Component
    public class ObservableTxFactory {

        public final <T> Observable.OnSubscribe<T> createFromCallable(Callable<? extends T> resultFactory) {
            return new OnSubscribeDecorator<>(this, resultFactory);
        }

        // Invokes the Callable inside a transaction and emits its single result.
        @SuppressWarnings("unchecked")
        @Transactional
        public <T> void call(Callable<? extends T> resultFactory, Subscriber subscriber) {
            final SingleDelayedProducer<T> singleDelayedProducer = new SingleDelayedProducer<>(subscriber);
            subscriber.setProducer(singleDelayedProducer);

            try {
                singleDelayedProducer.setValue(resultFactory.call());
            } catch (Throwable t) {
                Exceptions.throwOrReport(t, subscriber);
            }
        }

        // Delegates each subscription to the factory's transactional call().
        private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> {

            private final ObservableTxFactory observableTxFactory;
            private final Callable<? extends T> resultFactory;

            OnSubscribeDecorator(final ObservableTxFactory observableTxFactory,
                                 Callable<? extends T> resultFactory) {
                this.resultFactory = resultFactory;
                this.observableTxFactory = observableTxFactory;
            }

            @Override
            public void call(Subscriber<? super T> subscriber) {
                observableTxFactory.call(resultFactory, subscriber);
            }
        }
    }

Original code:

 Observable.fromCallable(() -> fooRepository.findOne(fooID)); 

New code:

 Observable.create(observableTxFactory.createFromCallable(() -> fooRepository.findOne(fooID))); 

Make sure the method you annotate with @Transactional is public; otherwise Spring AOP will not be able to advise it.
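The underlying reason is that proxy-based AOP can only act on calls that pass through the proxy. The sketch below illustrates that general principle with a plain JDK dynamic proxy. It is an assumption-laden stand-in, not Spring's mechanism (Spring typically uses a generated subclass for classes without interfaces), and `Repo`, `RepoImpl`, and `transactional` are hypothetical names.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class ProxyAdviceDemo {
    // Event log so it is visible which calls were wrapped by the proxy.
    static final List<String> log = new ArrayList<>();

    interface Repo {
        void save(String name);
        void saveTwice(String name);
    }

    static class RepoImpl implements Repo {
        public void save(String name) {
            log.add("save " + name);
        }

        // Self-invocation: these calls go to `this`, not through the proxy.
        public void saveTwice(String name) {
            save(name);
            save(name);
        }
    }

    // Hypothetical "transactional" proxy: begin/commit around every proxied call.
    static Repo transactional(Repo target) {
        return (Repo) Proxy.newProxyInstance(
                Repo.class.getClassLoader(),
                new Class<?>[] {Repo.class},
                (proxy, method, args) -> {
                    log.add("begin");
                    try {
                        Object result = method.invoke(target, args);
                        log.add("commit");
                        return result;
                    } catch (InvocationTargetException e) {
                        log.add("rollback");
                        throw e.getCause();
                    }
                });
    }

    public static List<String> run() {
        log.clear();
        Repo repo = transactional(new RepoImpl());
        repo.saveTwice("a");
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [begin, save a, save a, commit]
    }
}
```

Only the outer call is wrapped, because the two inner save() calls never reach the proxy. This is also why both factory classes above pass the factory instance into the decorator and invoke call() on it rather than calling the wrapped logic directly.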
