Concat VS Merge-Operator

Lesezeit: 5 Minuten

Benutzer-Avatar
paul

Ich habe die Dokumentation von RXJava überprüft und festgestellt, dass Concat- und Merge-Operatoren dasselbe zu tun scheinen. Ich habe sicherheitshalber einen Test geschrieben.

@Test
public void testContact() {

    Observable.concat(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
              .subscribe(System.out::println);
}

@Test
public void testMerge() {

    Observable.merge(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
            .subscribe(System.out::println);
}

Die Dokumentation sagt

Der Merge-Operator ist ebenfalls ähnlich. Es kombiniert die Emissionen von zwei oder mehr Observables, kann sie aber verschachteln, während Concat niemals die Emissionen von mehreren Observablen verschachtelt.

Aber ich verstehe immer noch nicht ganz, wenn ich diesen Test tausendmal durchführe, ist das Ergebnis der Zusammenführung immer dasselbe. Da der Auftrag nicht erteilt wurde, erwartete ich manchmal “reaktives” “Welt” “Hallo” zum Beispiel.

Der Code ist hier https://github.com/politrons/reactive

Benutzer-Avatar
Artur Biesiadowski

Es ist wie in der von Ihnen zitierten Dokumentation beschrieben – Merge kann die Ausgaben verschachteln, während concat zuerst auf das Ende früherer Streams wartet, bevor spätere Streams verarbeitet werden. In Ihrem Fall macht es bei statischen Streams mit einem Element keinen wirklichen Unterschied (aber theoretisch könnte Merge Wörter in zufälliger Reihenfolge ausgeben und dennoch gemäß Spezifikation gültig sein). Wenn Sie den Unterschied sehen möchten, versuchen Sie Folgendes (Sie müssen danach etwas Schlaf hinzufügen, um ein vorzeitiges Verlassen zu vermeiden):

    Observable.merge(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 B0 A1 B1 B2 A2 B3 A3 B4 A4

gegen

    Observable.concat(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 A1 A2 A3 A4 A5 A6 A7 A8

Concat wird niemals mit dem Drucken von B beginnen, da Stream A niemals beendet wird.

s/stream/beobachtbar/g 😉

Die Dokumentation gibt schöne Grafiken, um den Unterschied zu zeigen. Sie müssen sich daran erinnern, dass Zusammenführen nein ergibt Garantie Elemente einzeln zu verschachteln, ist nur eines von möglichen Beispielen.

Konkat

Concat-Operator
Verschmelzen

Operator zusammenführen

  • Eine kurze Anmerkung ist, dass, wenn die Quellen dann synchron sind merge = concat

    – Dave Moten

    12. August 2016 um 3:39 Uhr

  • Guter Punkt!, das erklärt, warum mein Beispiel immer in der gleichen Reihenfolge abläuft. Das war der Teil, der mich verwirrte. Mit dem Beispiel von Dave, da Intervall (async) ist, ist es reproduzierbar

    – paul

    12. August 2016 um 7:59 Uhr

  • @ ArturBiesiadowski Es macht Ihnen etwas aus, hinzuzufügen zip auch in deiner Antwort?

    – Jared Burrows

    9. Oktober 2016 um 23:51 Uhr


  • @DaveMotenby zum Beispiel, wenn meine eine Quelle im Netzwerk und die zweite eine Datenbank ist, also sind beide richtig asynchron

    – Pratham Kesarkar

    28. Juli 2017 um 7:06 Uhr

  • @AbhishekKumar Synchron bedeutet, dass eine Quelle die Kontrolle nicht aufgibt, bis sie abgeschlossen ist.

    – Julian A.

    16. März 2021 um 23:41 Uhr

Konkat

Concat gibt die Emissionen von zwei oder mehr Observables aus, ohne sie zu verschachteln. Es behält die Reihenfolge der Observables bei, während es die Items aussendet. Das bedeutet, dass alle Items des ersten Observable emittiert werden und dann alle Items des zweiten Observable und so weiter.

Concat-Operator

Lassen Sie es uns anhand eines Beispiels klar verstehen.

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.concat(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Da wir den Concat-Operator verwenden, behält er die Reihenfolge bei und gibt die Werte als A1, A2, A3, A4, B1, B2, B3 aus.

Verschmelzen

Merge kombiniert mehrere Observables zu einem, indem ihre Emissionen zusammengeführt werden. Es wird die Reihenfolge nicht beibehalten, während die Artikel ausgegeben werden.

Zusammenführungsoperator

Lassen Sie es uns anhand eines Beispiels klar verstehen.

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.merge(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Da wir den Merge-Operator verwenden, behält er die Reihenfolge nicht bei und kann die Werte in beliebiger Reihenfolge ausgeben, z A1, B1, A2, A3, B2, B3, A4 oder A1, A2, B1, B2, A3, A4, B3 oder kann alles sein.

So sollten wir je nach Anwendungsfall die Concat- und Merge-Operatoren in RxJava verwenden.

Benutzer-Avatar
Thrakisch

Wenn beide Quellen synchron sind, wie OP angegeben hat, geben sie die genaue Reihenfolge von beiden zurück verschmelzen und konkat.

Aber wenn Quellen nicht synchron sind, z. B. zwei Quellen wie eine Datenbank und ein Socket, z. B. Push-Daten mit zufälligen oder unbestimmten Verzögerungen, ist das Ergebnis oder die Reihenfolge der Daten unterschiedlich verschmelzen und konkat.

fun <T> Observable<T>.getAsyncItems(): Observable<T> {

    return concatMap {

        val delay = Random().nextInt(10)

        Observable.just(it)
                .delay(delay.toLong(), TimeUnit.MILLISECONDS)
    }
}

Diese Erweiterungsfunktion oben fügt jeder Emission zufällig eine Verzögerung hinzu, aber sie werden immer in der gleichen Reihenfolge emittiert. Wenn die Daten A, B, C, D und E sind, dann ist die Ausgabe immer A, B, C, D und E.

Jetzt vergleichen wir wie merge und concat Verhalten, wenn Quellen asynchron sind.

val source1 =
        Observable.just("A", "B", "C", "D", "E").getAsyncItems()
val source2 =
        Observable.just(1, 2, 3, 4, 5).getAsyncItems()

Observable.merge(source1, source2).subscribe {print("$it-")}

Druckt sowas A-B-1-C-2-D-3-4-E-5- oder A-1-2-3-B-C-D-4-E-5-
es hängt von der zufälligen Verzögerung ab.

Observable.concat(source1, source2).subscribe {print("$it-")}

Drucke A-B-C-D-E-1-2-3-4-5- egal wie oft er läuft.

1034670cookie-checkConcat VS Merge-Operator

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy