Kombinieren Sie eine Liste von Observables und warten Sie, bis alle abgeschlossen sind

Lesezeit: 8 Minuten

Benutzer-Avatar
Craig Russel

TL;DR
Wie konvertiert man Task.whenAll(List<Task>) hinein RxJava?

Mein vorhandener Code verwendet Bolts, um eine Liste asynchroner Aufgaben zu erstellen, und wartet, bis alle diese Aufgaben abgeschlossen sind, bevor er andere Schritte ausführt. Im Wesentlichen baut es a auf List<Task> und gibt eine Single zurück Task die wann als erledigt markiert wird alle Aufgaben in der Liste abgeschlossen, gemäß der Beispiel auf der Bolts-Website.

Ich suche Ersatz Bolts mit RxJava und ich gehe von dieser Methode aus, eine Liste von asynchronen Aufgaben (Größe nicht im Voraus bekannt) aufzubauen und sie alle in eine einzige zu packen Observable ist möglich, aber ich weiß nicht wie.

Ich habe versucht zu schauen merge, zip, concat etc… kann aber nicht an der arbeiten List<Observable> die ich aufbauen würde, da sie alle darauf ausgerichtet zu sein scheinen, nur an zweien zu arbeiten Observables zu einem Zeitpunkt, wenn ich die Dokumentation richtig verstehe.

Ich versuche … zu lernen RxJava und ich bin noch sehr neu darin, also verzeihen Sie mir, wenn dies eine offensichtliche Frage ist oder irgendwo in den Dokumenten erklärt wird; Ich habe versucht zu suchen. Jede Hilfe wäre sehr willkommen.

Benutzer-Avatar
MeinDogTom

Sie können verwenden flatMap falls Sie eine dynamische Aufgabenzusammensetzung haben. Etwas wie das:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

Ein weiteres gutes Beispiel für parallele Ausführung

Hinweis: Ich kenne Ihre Anforderungen an die Fehlerbehandlung nicht wirklich. Zum Beispiel, was zu tun ist, wenn nur eine Aufgabe fehlschlägt. Ich denke, Sie sollten dieses Szenario überprüfen.

  • Dies sollte die akzeptierte Antwort sein, wenn man bedenkt, dass die Frage lautet “wenn alle Aufgaben in der Liste abgeschlossen sind”. zip benachrichtigt über die Erledigung, sobald eine der Aufgaben erledigt und somit nicht anwendbar ist.

    – Benutzer3707125

    3. Dezember 2016 um 17:41 Uhr


  • @MyDogTom: Können Sie die Antwort mit der Java7-Syntax-Version (nicht der Lambda-Version) aktualisieren?

    – Sanedroid

    4. Januar 2017 um 6:39 Uhr

  • @PoojaGaikwad Mit Lambda ist es besser lesbar. Ersetzen Sie einfach das erste Lambda durch new Func1<Observable<Boolean>, Observable<Boolean>>()... und zweite mit new Func1<List<Boolean>, Boolean>()

    – MyDogTom

    4. Januar 2017 um 8:40 Uhr

  • @soshial RxJava 2 ist das Schlimmste, was jemals mit RxJava passiert ist, ja

    – Egorikem

    15. Januar 2019 um 18:00 Uhr

Benutzer-Avatar
Malz

Es hört sich so an, als suchst du nach dem Zip-Operator.

Es gibt ein paar verschiedene Möglichkeiten, es zu verwenden, also schauen wir uns ein Beispiel an. Angenommen, wir haben ein paar einfache Observables verschiedener Typen:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

Der einfachste Weg, auf sie alle zu warten, ist ungefähr so:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

Beachten Sie, dass die Parameter in der Zip-Funktion konkrete Typen haben, die den Typen der gezippten Observablen entsprechen.

Das Zippen einer Liste von Observablen ist ebenfalls möglich, entweder direkt:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

…oder indem Sie die Liste in eine umbrechen Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

In beiden Fällen kann die Zip-Funktion jedoch nur eine einzige akzeptieren Object[] Parameter, da die Typen der Observablen in der Liste sowie ihre Anzahl nicht im Voraus bekannt sind. Dies bedeutet, dass die Zip-Funktion die Anzahl der Parameter überprüfen und entsprechend umwandeln müsste.

Unabhängig davon werden alle oben genannten Beispiele schließlich gedruckt 1 Blah true

BEARBEITEN: Stellen Sie bei der Verwendung von Zip sicher, dass die Observables gezippt werden, geben alle die gleiche Anzahl von Elementen aus. In den obigen Beispielen haben alle drei Observablen ein einzelnes Element emittiert. Wenn wir sie in etwa so ändern würden:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

Dann 1, Blah, True und 2, Hello, True wären die einzigen Elemente, die an die Zip-Funktion(en) übergeben werden. Der Artikel 3würde niemals gezippt werden, da die anderen Observables abgeschlossen sind.

  • Dies funktioniert nicht, wenn einer der Anrufe fehlschlägt. In diesem Fall gehen alle Anrufe verloren.

    – StarWind0

    14. November 2016 um 22:09 Uhr

  • @StarWind0 Sie können den Fehler durch Verwendung überspringen onErrorResumeNextBeispiel: Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())

    – vuhung3990

    22. März 2018 um 10:11 Uhr

  • Was ist, wenn ich 100 Observables habe?

    – Krzysztof Kubicki

    13. November 2019 um 12:13 Uhr

  • Um mit Fehlern umzugehen, was ist hier der beste Ansatz

    – Jagen

    19. Januar 2021 um 15:25 Uhr

Benutzer-Avatar
eis

Von den vorgeschlagenen Vorschlägen kombiniert zip() tatsächlich beobachtbare Ergebnisse miteinander, die möglicherweise gewünscht sind oder nicht, aber in der Frage nicht gestellt wurden. In der Frage war alles, was gewünscht wurde, die Ausführung jeder der Operationen, entweder einzeln oder parallel (was nicht angegeben wurde, aber das verknüpfte Bolts-Beispiel betraf die parallele Ausführung). Außerdem wird zip() sofort abgeschlossen, wenn eines der Observables abgeschlossen ist, sodass es gegen die Anforderungen verstößt.

Für die parallele Ausführung von Observables ist flatMap() in der anderen Antwort in Ordnung, aber verschmelzen() wäre direkter. Beachten Sie, dass das Zusammenführen bei einem Fehler eines der Observables beendet wird. Wenn Sie das Beenden lieber verschieben, bis alle Observables beendet sind, sollten Sie sich ansehen mergeDelayError().

Für eins nach dem anderen, denke ich Statische Methode Observable.concat() sollte benutzt werden. Sein Javadoc besagt Folgendes:

concat(java.lang.Iterable> sequences) Flacht ein Iterable von Observables nacheinander zu einem Observable ab, ohne sie zu verschachteln

was nach dem klingt, wonach Sie suchen, wenn Sie keine parallele Ausführung wünschen.

Auch wenn Sie nur an der Ausführung Ihrer Aufgabe interessiert sind, nicht an der Rückgabe von Werten, sollten Sie sich wahrscheinlich damit befassen Abschließbar Anstatt von Beobachtbar.

TLDR: Für die Einzelausführung von Aufgaben und das Oncompletion-Ereignis, wenn sie abgeschlossen sind, ist Completable.concat() meiner Meinung nach am besten geeignet. Für die parallele Ausführung klingt Completable.merge() oder Completable.mergeDelayError() wie die Lösung. Ersteres stoppt sofort bei jedem Fehler auf jeder ergänzbaren, letzteres führt sie alle aus, selbst wenn einer von ihnen einen Fehler hat, und meldet erst dann den Fehler.

Benutzer-Avatar
Kevin ABRIOUX

Mit Kotlin

Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->

})

Es ist wichtig, den Typ für die Argumente der Funktion festzulegen, da sonst Kompilierungsfehler auftreten

Der letzte Argumenttyp ändert sich mit der Anzahl der Argumente: BiFunction für 2 Function3 für 3 Function4 für 4 …

Sie haben wahrscheinlich auf die geschaut zip Operator, der mit 2 Observables arbeitet.

Es gibt auch die statische Methode Observable.zip. Es hat eine Form, die für Sie nützlich sein sollte:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

Sie können die überprüfen javadoc für mehr.

Benutzer-Avatar
Sjörs

Ich schreibe einige Berechnungscodes in Kotlin mit JavaRx Observables und RxKotlin. Ich möchte eine Liste der zu vervollständigenden Observables beobachten und mich in der Zwischenzeit über den Fortschritt und das neueste Ergebnis auf dem Laufenden halten. Am Ende liefert es das beste Berechnungsergebnis. Eine zusätzliche Anforderung war, Observables parallel auszuführen, um alle meine CPU-Kerne zu nutzen. Am Ende bin ich bei dieser Lösung gelandet:

@Volatile var results: MutableList<CalculationResult> = mutableListOf()

fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {

    return Observable.create { subscriber ->
        Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
            doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
        }).subscribeBy(
            onNext = {
                results.add(it)
                subscriber.onNext(Pair("A calculation is ready", it))

            },
            onComplete = {
                subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) 
                subscriber.onComplete()
            },
            onError = {
                subscriber.onError(it)
            }
        )
    }
}

Benutzer-Avatar
Anton Makow

Ich hatte ein ähnliches Problem, ich musste Suchbegriffe aus dem Rest-Call abrufen und gleichzeitig gespeicherte Vorschläge aus einer RecentSearchProvider.AUTHORITY integrieren und zu einer einheitlichen Liste kombinieren. Ich habe versucht, die @MyDogTom-Lösung zu verwenden, leider gibt es in RxJava keine Observable.from. Nach einiger Recherche habe ich eine Lösung gefunden, die für mich funktioniert hat.

 fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>
{
    val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
    fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
    fetchedItems.add(getSearchResults(query).toObservable())

    return Observable.fromArray(fetchedItems)
        .flatMapIterable { data->data }
        .flatMap {task -> task.observeOn(Schedulers.io())}
        .toList()
        .map { ArrayList(it) }
}

Aus dem Array der Observables habe ich ein Observable erstellt, das je nach Anfrage Listen mit Vorschlägen und Ergebnissen aus dem Internet enthält. Danach gehen Sie diese Aufgaben einfach mit flatMapIterable durch und führen sie mit flatmap aus, platzieren die Ergebnisse in einem Array, das später in eine Recycling-Ansicht abgerufen werden kann.

1290220cookie-checkKombinieren Sie eine Liste von Observables und warten Sie, bis alle abgeschlossen sind

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy