Kombinieren Sie eine Liste von Observables und warten Sie, bis alle abgeschlossen sind
Lesezeit: 8 Minuten
Craig Russel
TL;DR
Wie konvertiert man Task.whenAll(List<Task>) hinein RxJava?
Mein vorhandener Code verwendet Bolts, um eine Liste asynchroner Aufgaben zu erstellen, und wartet, bis alle diese Aufgaben abgeschlossen sind, bevor er andere Schritte ausführt. Im Wesentlichen baut es a auf List<Task> und gibt eine Single zurück Task die wann als erledigt markiert wird alle Aufgaben in der Liste abgeschlossen, gemäß der Beispiel auf der Bolts-Website.
Ich suche Ersatz Bolts mit RxJava und ich gehe von dieser Methode aus, eine Liste von asynchronen Aufgaben (Größe nicht im Voraus bekannt) aufzubauen und sie alle in eine einzige zu packen Observable ist möglich, aber ich weiß nicht wie.
Ich habe versucht zu schauen merge, zip, concat etc… kann aber nicht an der arbeiten List<Observable> die ich aufbauen würde, da sie alle darauf ausgerichtet zu sein scheinen, nur an zweien zu arbeiten Observables zu einem Zeitpunkt, wenn ich die Dokumentation richtig verstehe.
Ich versuche … zu lernen RxJava und ich bin noch sehr neu darin, also verzeihen Sie mir, wenn dies eine offensichtliche Frage ist oder irgendwo in den Dokumenten erklärt wird; Ich habe versucht zu suchen. Jede Hilfe wäre sehr willkommen.
MeinDogTom
Sie können verwenden flatMap falls Sie eine dynamische Aufgabenzusammensetzung haben. Etwas wie das:
public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
return Observable.from(tasks)
//execute in parallel
.flatMap(task -> task.observeOn(Schedulers.computation()))
//wait, until all task are executed
//be aware, all your observable should emit onComplete event
//otherwise you will wait forever
.toList()
//could implement more intelligent logic. eg. check that everything is successful
.map(results -> true);
}
Hinweis: Ich kenne Ihre Anforderungen an die Fehlerbehandlung nicht wirklich. Zum Beispiel, was zu tun ist, wenn nur eine Aufgabe fehlschlägt. Ich denke, Sie sollten dieses Szenario überprüfen.
Dies sollte die akzeptierte Antwort sein, wenn man bedenkt, dass die Frage lautet “wenn alle Aufgaben in der Liste abgeschlossen sind”. zip benachrichtigt über die Erledigung, sobald eine der Aufgaben erledigt und somit nicht anwendbar ist.
– Benutzer3707125
3. Dezember 2016 um 17:41 Uhr
@MyDogTom: Können Sie die Antwort mit der Java7-Syntax-Version (nicht der Lambda-Version) aktualisieren?
– Sanedroid
4. Januar 2017 um 6:39 Uhr
@PoojaGaikwad Mit Lambda ist es besser lesbar. Ersetzen Sie einfach das erste Lambda durch new Func1<Observable<Boolean>, Observable<Boolean>>()... und zweite mit new Func1<List<Boolean>, Boolean>()
– MyDogTom
4. Januar 2017 um 8:40 Uhr
@soshial RxJava 2 ist das Schlimmste, was jemals mit RxJava passiert ist, ja
– Egorikem
15. Januar 2019 um 18:00 Uhr
Malz
Es hört sich so an, als suchst du nach dem Zip-Operator.
Es gibt ein paar verschiedene Möglichkeiten, es zu verwenden, also schauen wir uns ein Beispiel an. Angenommen, wir haben ein paar einfache Observables verschiedener Typen:
In beiden Fällen kann die Zip-Funktion jedoch nur eine einzige akzeptieren Object[] Parameter, da die Typen der Observablen in der Liste sowie ihre Anzahl nicht im Voraus bekannt sind. Dies bedeutet, dass die Zip-Funktion die Anzahl der Parameter überprüfen und entsprechend umwandeln müsste.
Unabhängig davon werden alle oben genannten Beispiele schließlich gedruckt 1 Blah true
BEARBEITEN: Stellen Sie bei der Verwendung von Zip sicher, dass die Observables gezippt werden, geben alle die gleiche Anzahl von Elementen aus. In den obigen Beispielen haben alle drei Observablen ein einzelnes Element emittiert. Wenn wir sie in etwa so ändern würden:
Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items
Dann 1, Blah, True und 2, Hello, True wären die einzigen Elemente, die an die Zip-Funktion(en) übergeben werden. Der Artikel 3würde niemals gezippt werden, da die anderen Observables abgeschlossen sind.
Dies funktioniert nicht, wenn einer der Anrufe fehlschlägt. In diesem Fall gehen alle Anrufe verloren.
– StarWind0
14. November 2016 um 22:09 Uhr
@StarWind0 Sie können den Fehler durch Verwendung überspringen onErrorResumeNextBeispiel: Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
– vuhung3990
22. März 2018 um 10:11 Uhr
Was ist, wenn ich 100 Observables habe?
– Krzysztof Kubicki
13. November 2019 um 12:13 Uhr
Um mit Fehlern umzugehen, was ist hier der beste Ansatz
– Jagen
19. Januar 2021 um 15:25 Uhr
eis
Von den vorgeschlagenen Vorschlägen kombiniert zip() tatsächlich beobachtbare Ergebnisse miteinander, die möglicherweise gewünscht sind oder nicht, aber in der Frage nicht gestellt wurden. In der Frage war alles, was gewünscht wurde, die Ausführung jeder der Operationen, entweder einzeln oder parallel (was nicht angegeben wurde, aber das verknüpfte Bolts-Beispiel betraf die parallele Ausführung). Außerdem wird zip() sofort abgeschlossen, wenn eines der Observables abgeschlossen ist, sodass es gegen die Anforderungen verstößt.
Für die parallele Ausführung von Observables ist flatMap() in der anderen Antwort in Ordnung, aber verschmelzen() wäre direkter. Beachten Sie, dass das Zusammenführen bei einem Fehler eines der Observables beendet wird. Wenn Sie das Beenden lieber verschieben, bis alle Observables beendet sind, sollten Sie sich ansehen mergeDelayError().
concat(java.lang.Iterable> sequences) Flacht ein Iterable von Observables nacheinander zu einem Observable ab, ohne sie zu verschachteln
was nach dem klingt, wonach Sie suchen, wenn Sie keine parallele Ausführung wünschen.
Auch wenn Sie nur an der Ausführung Ihrer Aufgabe interessiert sind, nicht an der Rückgabe von Werten, sollten Sie sich wahrscheinlich damit befassen Abschließbar Anstatt von Beobachtbar.
TLDR: Für die Einzelausführung von Aufgaben und das Oncompletion-Ereignis, wenn sie abgeschlossen sind, ist Completable.concat() meiner Meinung nach am besten geeignet. Für die parallele Ausführung klingt Completable.merge() oder Completable.mergeDelayError() wie die Lösung. Ersteres stoppt sofort bei jedem Fehler auf jeder ergänzbaren, letzteres führt sie alle aus, selbst wenn einer von ihnen einen Fehler hat, und meldet erst dann den Fehler.
Ich schreibe einige Berechnungscodes in Kotlin mit JavaRx Observables und RxKotlin. Ich möchte eine Liste der zu vervollständigenden Observables beobachten und mich in der Zwischenzeit über den Fortschritt und das neueste Ergebnis auf dem Laufenden halten. Am Ende liefert es das beste Berechnungsergebnis. Eine zusätzliche Anforderung war, Observables parallel auszuführen, um alle meine CPU-Kerne zu nutzen. Am Ende bin ich bei dieser Lösung gelandet:
@Volatile var results: MutableList<CalculationResult> = mutableListOf()
fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {
return Observable.create { subscriber ->
Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
}).subscribeBy(
onNext = {
results.add(it)
subscriber.onNext(Pair("A calculation is ready", it))
},
onComplete = {
subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results))
subscriber.onComplete()
},
onError = {
subscriber.onError(it)
}
)
}
}
Anton Makow
Ich hatte ein ähnliches Problem, ich musste Suchbegriffe aus dem Rest-Call abrufen und gleichzeitig gespeicherte Vorschläge aus einer RecentSearchProvider.AUTHORITY integrieren und zu einer einheitlichen Liste kombinieren. Ich habe versucht, die @MyDogTom-Lösung zu verwenden, leider gibt es in RxJava keine Observable.from. Nach einiger Recherche habe ich eine Lösung gefunden, die für mich funktioniert hat.
Aus dem Array der Observables habe ich ein Observable erstellt, das je nach Anfrage Listen mit Vorschlägen und Ergebnissen aus dem Internet enthält. Danach gehen Sie diese Aufgaben einfach mit flatMapIterable durch und führen sie mit flatmap aus, platzieren die Ergebnisse in einem Array, das später in eine Recycling-Ansicht abgerufen werden kann.
12902200cookie-checkKombinieren Sie eine Liste von Observables und warten Sie, bis alle abgeschlossen sindyes