La concurrence et l'asynchronicité sont inhérentes à la programmation mobile.
La gestion de la concurrence via une programmation de style impératif, ce que la programmation sur Android implique généralement, peut être la cause de nombreux problèmes. Utilisation de la programmation réactive avec RxJava , vous pouvez éviter les problèmes potentiels de concurrence d'accès en fournissant une solution plus propre et moins sujette aux erreurs.
En plus de simplifier les tâches asynchrones simultanées, RxJava offre également la possibilité d'effectuer des opérations de style fonctionnel qui transforment, combinent et regroupent les émissions d'un observable jusqu'à ce que nous obtenions le résultat souhaité.
En combinant le paradigme réactif de RxJava et les opérations de style fonctionnel, nous pouvons modéliser un large éventail de constructions de concurrence de manière réactive, même dans le monde non réactif d'Android. Dans cet article, vous apprendrez comment vous pouvez faire exactement cela. Vous apprendrez également à adopter progressivement RxJava dans un projet existant.
Si vous êtes nouveau sur RxJava, je vous recommande de lire l'article Ici qui parle de certains des principes fondamentaux de RxJava.
L'un des défis de l'ajout de RxJava comme l'une des bibliothèques de votre projet est que cela change fondamentalement la façon dont vous raisonnez sur votre code.
RxJava vous oblige à considérer les données comme étant poussées plutôt que tirées. Bien que le concept lui-même soit simple, changer une base de code complète basée sur un paradigme pull peut être un peu intimidant. Bien que la cohérence soit toujours idéale, vous n'aurez peut-être pas toujours le privilège d'effectuer cette transition dans toute votre base de code en une seule fois, donc une approche plus incrémentielle peut être nécessaire.
Considérez le code suivant:
/** * @return a list of users with blogs */ public List getUsersWithBlogs() { final List allUsers = UserCache.getAllUsers(); final List usersWithBlogs = new ArrayList(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }
Cette fonction obtient une liste de User
objets du cache, filtre chacun d’entre eux selon que l’utilisateur possède ou non un blog, les trie par nom d’utilisateur et les renvoie finalement à l’appelant. En regardant cet extrait, nous remarquons que beaucoup de ces opérations peuvent tirer parti des opérateurs RxJava; Par exemple, filter()
et sorted()
.
La réécriture de cet extrait nous donne alors:
/** * @return a list of users with blogs */ public Observable getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
La première ligne de la fonction convertit le List
renvoyé par UserCache.getAllUsers()
à un Observable
via fromIterable()
. C'est la première étape pour rendre notre code réactif. Maintenant que nous opérons sur un Observable
, cela nous permet d'effectuer n'importe quel Observable
opérateur dans la boîte à outils RxJava - filter()
et sorted()
dans ce cas.
modèle de document de conception de logiciel simple
Il y a quelques autres points à noter à propos de ce changement.
Premièrement, la signature de la méthode n'est plus la même. Ce n’est peut-être pas un problème si cet appel de méthode n’est utilisé qu’à quelques endroits et qu’il est facile de propager les modifications vers d’autres zones de la pile; cependant, si cela casse les clients utilisant cette méthode, cela pose problème et la signature de la méthode doit être rétablie.
Deuxièmement, RxJava est conçu avec la paresse à l'esprit. Autrement dit, aucune opération longue ne doit être effectuée lorsqu'il n'y a aucun abonné au Observable
. Avec cette modification, cette hypothèse n'est plus vraie puisque UserCache.getAllUsers()
est invoqué avant même qu'il n'y ait d'abonnés.
Pour résoudre le premier problème de notre modification, nous pouvons utiliser n'importe lequel des opérateurs de blocage disponibles pour un Observable
comme blockingFirst()
et blockingNext()
. Essentiellement, ces deux opérateurs se bloqueront jusqu'à ce qu'un élément soit émis en aval: blockingFirst()
renverra le premier élément émis et terminera, alors que blockingNext()
renverra un Iterable
qui vous permet d'effectuer une boucle for-each sur les données sous-jacentes (chaque itération à travers la boucle bloquera).
Un effet secondaire de l’utilisation d’une opération de blocage dont il est important d’être conscient, cependant, est que les exceptions sont levées sur le thread appelant au lieu d’être transmises à un observateur onError()
méthode.
En utilisant un opérateur de blocage pour changer la signature de la méthode en un List
, notre extrait ressemblerait maintenant à ceci:
/** * @return a list of users with blogs */ public List getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }
Avant d'appeler un opérateur de blocage (c'est-à-dire blockingGet()
), nous devons d'abord chaîner l'opérateur d'agrégation toList()
afin que le flux soit modifié à partir d'un Observable
à un Single
(a Single
est un type spécial de Observable
qui n'émet qu'une seule valeur dans onSuccess()
, ou une erreur via onError()
).
Ensuite, nous pouvons appeler l'opérateur de blocage blockingGet()
qui déballe le Single
et renvoie un List
.
Bien que RxJava supporte cela, autant que possible cela devrait être évité car il ne s'agit pas de programmation réactive idiomatique. Cependant, lorsque cela est absolument nécessaire, les opérateurs de blocage sont un bon moyen de sortir du monde réactif.
Comme mentionné précédemment, RxJava a été conçu avec la paresse à l'esprit. Autrement dit, les opérations de longue durée doivent être retardées le plus longtemps possible (c'est-à-dire jusqu'à ce qu'un abonnement soit appelé sur un Observable
). Pour rendre notre solution paresseuse, nous utilisons le defer()
opérateur.
defer()
prend un ObservableSource
usine qui crée un Observable
pour chaque nouvel observateur qui s'abonne. Dans notre cas, nous voulons retourner Observable.fromIterable(UserCache.getAllUser())
chaque fois qu'un observateur souscrit.
/** * @return a list of users with blogs */ public Observable getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Maintenant que l'opération de longue durée est encapsulée dans un defer()
, nous avons un contrôle total sur le thread dans lequel elle doit s'exécuter simplement en spécifiant le Scheduler
approprié | dans subscribeOn()
. Avec ce changement, notre code est entièrement réactif et l'abonnement ne devrait avoir lieu qu'au moment où les données sont nécessaires.
/** * @return a list of users with blogs */ public Observable getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }
Un autre opérateur très utile pour différer le calcul est le fromCallable()
méthode. Contrairement à defer()
, qui attend un Observable
à renvoyer dans la fonction lambda et à son tour «aplatit» le Observable
, fromCallable()
appellera le lambda et retournera la valeur en aval.
/** * @return a list of users with blogs */ public Observable getUsersWithBlogs() { final Observable usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Unique en utilisant fromCallable()
sur une liste renverrait maintenant un Observable
, nous devons aplatir cette liste en utilisant flatMap()
.
D'après les exemples précédents, nous avons vu que nous pouvons envelopper n'importe quel objet dans un Observable
et sauter entre les états non réactif et réactif en utilisant des opérations de blocage et defer()
/ fromCallable()
. En utilisant ces constructions, nous pouvons commencer à convertir des zones d'une application Android pour qu'elles soient réactives.
Un bon endroit pour penser initialement à l'utilisation de RxJava est chaque fois que vous avez un processus qui prend un certain temps à exécuter, comme les appels réseau (consultez post précédent par exemple), lit et écrit sur le disque, etc. L'exemple suivant illustre une fonction simple qui va écrire du texte dans le système de fichiers:
/** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
Lors de l'appel de cette fonction, nous devons nous assurer qu'elle est effectuée sur un thread séparé car cette opération est bloquante. Imposer une telle restriction à l'appelant complique les choses pour le développeur, ce qui augmente la probabilité de bogues et peut potentiellement ralentir le développement.
L'ajout d'un commentaire à la fonction permettra bien sûr d'éviter les erreurs de l'appelant, mais c'est encore loin d'être à toute épreuve.
En utilisant RxJava, cependant, nous pouvons facilement encapsuler ceci dans un Observable
et spécifiez le Scheduler
qu'il devrait fonctionner. De cette façon, l'appelant n'a pas du tout besoin de se préoccuper d'appeler la fonction dans un thread séparé; la fonction s'en chargera elle-même.
/** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }
En utilisant fromCallable()
, l'écriture du texte dans le fichier est différée jusqu'à la date d'abonnement.
Comme les exceptions sont des objets de première classe dans RxJava, un autre avantage de notre modification est que nous n'avons plus besoin d'encapsuler l'opération dans un bloc try / catch. L'exception sera simplement propagée en aval plutôt que d'être avalée. Cela permet à l'appelant de gérer l'exception qu'il / elle juge appropriée (par exemple, afficher une erreur à l'utilisateur en fonction de l'exception qui a été levée, etc.).
Une autre optimisation que nous pouvons effectuer est de renvoyer a Completable
plutôt qu'un Observable
. A Completable
est essentiellement un type spécial de Observable
- similaire à un Single
- cela indique simplement si un calcul a réussi, via onComplete()
, ou a échoué, via onError()
. Retour d'un Completable
semble avoir plus de sens dans ce cas car il semble idiot de renvoyer un seul vrai dans un Observable
courant.
/** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }
Pour terminer l'opération, nous utilisons le fromAction()
opération d'un Completable
puisque la valeur de retour ne nous intéresse plus. Si nécessaire, comme un Observable
, un Completable
prend également en charge fromCallable()
et defer()
les fonctions.
Jusqu'à présent, tous les exemples que nous avons examinés émettent soit une valeur (c'est-à-dire, peuvent être modélisés comme un Single
), soit nous indiquent si une opération a réussi ou échoué (c'est-à-dire, peut être modélisée comme un Completable
).
Cependant, comment pouvons-nous convertir les zones de notre application qui reçoivent des mises à jour ou des événements continus (tels que des mises à jour de position, des événements de clic, des événements de capteur, etc.)?
Nous allons examiner deux façons de faire cela, en utilisant create()
et en utilisant Subjects
.
create()
nous permet d'invoquer explicitement onNext()
d'un observateur | onComplete()
| onError()
méthode lorsque nous recevons des mises à jour de notre source de données. Pour utiliser create()
, on passe un ObservableOnSubscribe
qui reçoit un ObservableEmitter
chaque fois qu'un observateur souscrit. En utilisant l'émetteur reçu, nous pouvons ensuite effectuer tous les appels de configuration nécessaires pour commencer à recevoir les mises à jour, puis appeler le Emitter
un événement.
Dans le cas de mises à jour de localisation, nous pouvons nous inscrire pour recevoir des mises à jour à cet endroit et émettre des mises à jour de localisation telles que reçues.
public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }
La fonction à l'intérieur du create()
call demande des mises à jour de position et transmet un rappel qui est appelé lorsque la position de l'appareil change. Comme nous pouvons le voir ici, nous remplaçons essentiellement l'interface de style de rappel et émettons à la place l'emplacement reçu dans le flux Observable créé (à des fins éducatives, j'ai ignoré certains des détails lors de la construction d'une demande de localisation, si vous souhaitez explorer plus profondément dans les détails, vous pouvez le lire Ici ).
Une autre chose à noter à propos de create()
est-ce que chaque fois que subscribe()
est appelé, un nouvel émetteur est fourni. En d'autres termes, create()
Retour un rhume Observable
. Cela signifie que, dans la fonction ci-dessus, nous demanderions potentiellement des mises à jour de localisation plusieurs fois, ce qui n'est pas ce que nous voulons.
Pour contourner ce problème, nous souhaitons modifier la fonction pour renvoyer un hot Observable
à l'aide de Subjects
.
A Subject
étend un Observable
et implémente Observer
en même temps. Ceci est particulièrement utile lorsque nous voulons émettre ou diffuser le même événement à plusieurs abonnés en même temps. En ce qui concerne l'implémentation, nous voudrions exposer le Subject
en tant que Observable
aux clients, tout en le conservant comme un Subject
pour le fournisseur.
public class LocationManager { private Subject locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable observeLocation() { return locationSubject; } }
Dans cette nouvelle implémentation, le sous-type PublishSubject
est utilisé qui émet des événements à mesure qu'ils arrivent à partir du moment de l'abonnement. En conséquence, si un abonnement est effectué à un moment où des mises à jour de localisation ont déjà été émises, les émissions passées ne seront pas reçues par l'observateur, seulement les suivantes. Si ce comportement n'est pas souhaité, il existe deux autres Subject
sous-types dans la boîte à outils RxJava qui peuvent être utilisé .
De plus, nous avons également créé un connect()
fonction qui lance la demande de réception des mises à jour de localisation. Le observeLocation()
peut toujours faire le connect()
appel, mais nous l'avons refactorisé hors de la fonction pour plus de clarté / simplicité.
Nous avons examiné un certain nombre de mécanismes et de techniques:
defer()
et ses variantes pour retarder l'exécution d'un calcul jusqu'à la souscriptionObservables
généré via create()
Observables
en utilisant Subjects
Espérons que les exemples fournis dans cet article ont inspiré quelques idées concernant différents domaines de votre application qui peuvent être convertis pour être réactifs. Nous avons couvert beaucoup de choses et si vous avez des questions, des suggestions ou si quelque chose n'est pas clair, n'hésitez pas à laisser un commentaire ci-dessous!
comment concevoir un site Web réactif
Si vous souhaitez en savoir plus sur RxJava, je travaille sur un livre détaillé qui explique comment visualiser les problèmes de manière réactive à l'aide d'exemples Android. Si vous souhaitez recevoir des mises à jour à ce sujet, veuillez vous abonner Ici .