RxAndroidBle: keeping a persistent connection + write / notification handling

I am creating an Android application that has special requirements for Bluetooth Low Energy.

I need to write to a write-only characteristic and receive the responses on a separate notification characteristic, and I need to do this in many, many activities. Is there an Rx way to send a request on the first characteristic, wait for the answer on the second, and then move on to the next request?

Also, to share my instance of RxAndroidBle, I thought about creating some kind of BleManager singleton that exposes Observables, so I can easily subscribe to them in my Presenter. I just want to avoid copying the connection logic into each activity and (ideally) keep one persistent connection. That way I could expose only the connectionObservable and subscribe to it, so I can easily send write requests and receive notifications, but I feel there is a better way to do this.

This is what I have now:

    @Singleton
    public class BleManager {

        private PublishSubject<Void> disconnectTriggerSubject = PublishSubject.create();
        private Observable<RxBleConnection> connectionObservable;
        private boolean isConnected;

        private final UUID CTRL_FROM_BRIDGE_UUID = UUID.fromString("someUUID");
        private final UUID BLE_WRITE_CHARACTERISTIC_UUID = UUID.fromString("someOtherUUID");

        private final RxBleClient bleClient;
        private String mMacAddress;
        private final Context context;
        private RxBleDevice bleDevice;

        @Inject
        public BleManager(Context context, RxBleClient client) {
            Timber.d("Constructing BleManager and injecting members");
            this.context = context;
            this.bleClient = client;
        }

        public void setMacAddress(String mMacAddress) {
            this.mMacAddress = mMacAddress;

            // Set the associated device on MacAddress change
            bleDevice = bleClient.getBleDevice(this.mMacAddress);
        }

        public String getMacAddress() {
            return mMacAddress;
        }

        public RxBleDevice getBleDevice() {
            Preconditions.checkNotNull(mMacAddress);
            return bleClient.getBleDevice(mMacAddress);
        }

        public Observable<RxBleScanResult> getScanSubscription() {
            Preconditions.checkNotNull(context);
            Preconditions.checkNotNull(bleClient);

            return bleClient.scanBleDevices().distinct();
        }

        public Observable<RxBleConnection> getConnectionSubscription() {
            Preconditions.checkNotNull(context);
            Preconditions.checkNotNull(bleDevice);

            if (connectionObservable == null) {
                connectionObservable = bleDevice.establishConnection(context, false)
                        .takeUntil(disconnectTriggerSubject)
                        .observeOn(AndroidSchedulers.mainThread())
                        .doOnUnsubscribe(this::clearSubscription)
                        .compose(new ConnectionSharingAdapter());
            }

            return connectionObservable;
        }

        public Observable<byte[]> setupListeners() {
            return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.setupNotification(CTRL_FROM_BRIDGE_UUID))
                    .doOnNext(notificationObservable -> Timber.d("Notification Setup"))
                    .flatMap(notificationObservable -> notificationObservable)
                    .observeOn(AndroidSchedulers.mainThread());
        }

        private void triggerDisconnect() {
            disconnectTriggerSubject.onNext(null);
        }

        public Observable<byte[]> writeBytes(byte[] bytes) {
            return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(
                    BLE_WRITE_CHARACTERISTIC_UUID,
                    bytes)).observeOn(AndroidSchedulers.mainThread());
        }

        private boolean isConnected() {
            return bleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.CONNECTED;
        }

        /**
         * Will update the UI with the current state of the Ble Connection
         */
        private void registerConnectionStateChange() {
            bleDevice.observeConnectionStateChanges().observeOn(AndroidSchedulers.mainThread()).subscribe(connectionState -> {
                isConnected = connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED);
            });
        }

        private void clearSubscription() {
            connectionObservable = null;
        }
    }
android rx-java rx-android bluetooth-lowenergy rxandroidble
1 answer

I have thought a bit about your use case. By sharing the same connection you introduce state into your application, which requires some state handling, so it is impossible (or at least I don't know how) to be purely reactive.

I have focused on establishing the connection and performing a serialized write-then-notification exchange with the BLE device.

    private PublishSubject<Pair<byte[], Integer>> inputSubject = PublishSubject.create();

    private PublishSubject<Pair<byte[], Integer>> outputSubject = PublishSubject.create();

    private Subscription connectionSubscription;

    private volatile int uniqueId = 0; // used to identify the transmission we're interested in in case more than one will be started at the same time

    public void connect() {
        Observable<RxBleConnection> connectionObservable = // your establishing of the connection whether it will be through scan or RxBleDevice.establishConnection()
        final UUID notificationUuid = // your notification characteristic UUID
        final UUID writeUuid = // your write-only characteristic UUID

        connectionSubscription = connectionObservable
                .flatMap(
                        rxBleConnection -> rxBleConnection.setupNotification(notificationUuid), // subscribing for notifications
                        (rxBleConnection, notificationObservable) -> // connection is established and notification prepared
                                inputSubject // waiting for the data-packet to transmit
                                        .onBackpressureBuffer()
                                        .flatMap(bytesAndFilter -> {
                                                    return Observable.combineLatest( // subscribe at the same time to
                                                            notificationObservable.take(1), // getting the next notification bytes
                                                            rxBleConnection.writeCharacteristic(writeUuid, bytesAndFilter.first), // transmitting the data bytes to the BLE device
                                                            (responseBytes, writtenBytes) -> responseBytes // interested only in the response bytes
                                                    )
                                                            .doOnNext(responseBytes -> outputSubject.onNext(new Pair<>(responseBytes, bytesAndFilter.second))); // pass the bytes to the receiver with the identifier
                                                },
                                                1 // serializing communication as only one Observable will be processed at the same time
                                        )
                )
                .flatMap(observable -> observable)
                .subscribe(
                        response -> { /* ignored here - used only as side effect with outputSubject */ },
                        throwable -> outputSubject.onError(throwable)
                );
    }

    public void disconnect() {
        if (connectionSubscription != null && !connectionSubscription.isUnsubscribed()) {
            connectionSubscription.unsubscribe();
            connectionSubscription = null;
        }
    }

    public Observable<byte[]> writeData(byte[] data) {
        return Observable.defer(() -> {
                    final int uniqueId = this.uniqueId++; // creating new uniqueId for identifying the response
                    inputSubject.onNext(new Pair<>(data, uniqueId)); // passing the data with the id to be processed by the connection flow in connect()
                    return outputSubject
                            .filter(responseIdPair -> responseIdPair.second == uniqueId)
                            .first()
                            .map(responseIdPair -> responseIdPair.first);
                }
        );
    }
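One detail in the code above deserves a note: `++` on a `volatile int` is a read-modify-write and is not atomic, so two threads calling writeData() at the same moment could in principle obtain the same id. Below is a minimal sketch of one possible alternative using the JDK's `AtomicInteger`. The class name `TransmissionIds` and the method `nextId()` are hypothetical and not part of the answer or of RxAndroidBle.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: hands out ids that stay unique under concurrent callers.
final class TransmissionIds {
    private final AtomicInteger next = new AtomicInteger(0);

    // getAndIncrement() performs the read and the increment as one atomic step.
    int nextId() {
        return next.getAndIncrement();
    }
}
```

Under that assumption, writeData() would obtain its identifier from a call such as `ids.nextId()` instead of `this.uniqueId++`.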

I think this approach is good because the whole flow is described in one place and is therefore easier to understand. The stateful part of the communication (writing a request and waiting for the response) is serialized, and the connection can be kept alive until disconnect() is called.
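The idea of serializing each "write, then wait for the response" exchange can be illustrated without RxJava or a BLE device. The following is only a sketch in plain Java and not the answer's implementation. `Transport` is a hypothetical stand-in for the write characteristic plus notification pair, `SerializedExchange` and `exchange()` are made-up names, and `CompletableFuture` takes the place of `Observable` so the example is self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for "write the request, then block until the notification arrives".
@FunctionalInterface
interface Transport {
    byte[] writeAndAwaitNotification(byte[] request) throws Exception;
}

final class SerializedExchange implements AutoCloseable {
    // A single worker thread means at most one exchange is in flight at any time,
    // which plays the role of flatMap(..., 1) in the Rx version.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Transport transport;

    SerializedExchange(Transport transport) {
        this.transport = transport;
    }

    // Queues the request; the returned future completes with the matching response.
    CompletableFuture<byte[]> exchange(byte[] request) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        worker.execute(() -> {
            try {
                result.complete(transport.writeAndAwaitNotification(request));
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    // Stops accepting new requests; already queued ones still run.
    @Override
    public void close() {
        worker.shutdown();
    }
}
```

Because every caller gets its own future in this sketch, no identifier is needed to match a response to its request. The Rx version needs the id because all responses travel through one shared outputSubject.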

The downside is that the transmission relies on the side effects of a different flow, and that calling writeData() before the connection is established and the notifications are set up returns an Observable that never completes. It should not be a problem to add handling for this scenario with a state check, though.
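One way such a state check could look is sketched below, again in plain Java so it runs without Android. Everything here is an assumption made for illustration: `ReadyGuard`, `markReady()` and `markDisconnected()` are hypothetical names, the successful branch only echoes the data as a placeholder for the real transmission, and `CompletableFuture` stands in for the returned `Observable`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard: fail fast instead of returning a result that never completes.
final class ReadyGuard {
    private final AtomicBoolean ready = new AtomicBoolean(false);

    // To be called once the connection is established and notifications are set up.
    void markReady() {
        ready.set(true);
    }

    // To be called from disconnect() or when the connection is lost.
    void markDisconnected() {
        ready.set(false);
    }

    CompletableFuture<byte[]> writeData(byte[] data) {
        if (!ready.get()) {
            CompletableFuture<byte[]> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("Connection or notifications not ready"));
            return failed;
        }
        // Placeholder for the real transmission: echoes a copy of the input.
        return CompletableFuture.completedFuture(data.clone());
    }
}
```

In the Rx version, the equivalent would be to return `Observable.error(...)` from writeData() while the flag is not set.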

Best wishes
