Ich habe die Dokumentation von RXJava überprüft und festgestellt, dass Concat- und Merge-Operatoren dasselbe zu tun scheinen. Ich habe sicherheitshalber einen Test geschrieben.
@Test
public void testContact() {
Observable.concat(Observable.just("Hello"),
Observable.just("reactive"),
Observable.just("world"))
.subscribe(System.out::println);
}
@Test
public void testMerge() {
Observable.merge(Observable.just("Hello"),
Observable.just("reactive"),
Observable.just("world"))
.subscribe(System.out::println);
}
Die Dokumentation sagt
Der Merge-Operator ist ebenfalls ähnlich. Es kombiniert die Emissionen von zwei oder mehr Observables, kann sie aber verschachteln, während Concat niemals die Emissionen von mehreren Observablen verschachtelt.
Aber ich verstehe immer noch nicht ganz, wenn ich diesen Test tausendmal durchführe, ist das Ergebnis der Zusammenführung immer dasselbe. Da der Auftrag nicht erteilt wurde, erwartete ich manchmal “reaktives” “Welt” “Hallo” zum Beispiel.
Es ist wie in der von Ihnen zitierten Dokumentation beschrieben – Merge kann die Ausgaben verschachteln, während concat zuerst auf das Ende früherer Streams wartet, bevor spätere Streams verarbeitet werden. In Ihrem Fall macht es bei statischen Streams mit einem Element keinen wirklichen Unterschied (aber theoretisch könnte Merge Wörter in zufälliger Reihenfolge ausgeben und dennoch gemäß Spezifikation gültig sein). Wenn Sie den Unterschied sehen möchten, versuchen Sie Folgendes (Sie müssen danach etwas Schlaf hinzufügen, um ein vorzeitiges Verlassen zu vermeiden):
Concat wird niemals mit dem Drucken von B beginnen, da Stream A niemals beendet wird.
s/stream/beobachtbar/g 😉
Die Dokumentation gibt schöne Grafiken, um den Unterschied zu zeigen. Sie müssen sich daran erinnern, dass Zusammenführen nein ergibt Garantie Elemente einzeln zu verschachteln, ist nur eines von möglichen Beispielen.
Konkat
Verschmelzen
Eine kurze Anmerkung ist, dass, wenn die Quellen dann synchron sind merge = concat
– Dave Moten
12. August 2016 um 3:39 Uhr
Guter Punkt!, das erklärt, warum mein Beispiel immer in der gleichen Reihenfolge abläuft. Das war der Teil, der mich verwirrte. Mit dem Beispiel von Dave, da Intervall (async) ist, ist es reproduzierbar
– paul
12. August 2016 um 7:59 Uhr
@ ArturBiesiadowski Es macht Ihnen etwas aus, hinzuzufügen zip auch in deiner Antwort?
– Jared Burrows
9. Oktober 2016 um 23:51 Uhr
@DaveMotenby zum Beispiel, wenn meine eine Quelle im Netzwerk und die zweite eine Datenbank ist, also sind beide richtig asynchron
– Pratham Kesarkar
28. Juli 2017 um 7:06 Uhr
@AbhishekKumar Synchron bedeutet, dass eine Quelle die Kontrolle nicht aufgibt, bis sie abgeschlossen ist.
– Julian A.
16. März 2021 um 23:41 Uhr
Konkat
Concat gibt die Emissionen von zwei oder mehr Observables aus, ohne sie zu verschachteln. Es behält die Reihenfolge der Observables bei, während es die Items aussendet. Das bedeutet, dass alle Items des ersten Observable emittiert werden und dann alle Items des zweiten Observable und so weiter.
Lassen Sie es uns anhand eines Beispiels klar verstehen.
final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};
final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);
Observable.concat(observableFirst, observableSecond)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Da wir den Concat-Operator verwenden, behält er die Reihenfolge bei und gibt die Werte als A1, A2, A3, A4, B1, B2, B3 aus.
Verschmelzen
Merge kombiniert mehrere Observables zu einem, indem ihre Emissionen zusammengeführt werden. Es wird die Reihenfolge nicht beibehalten, während die Artikel ausgegeben werden.
Lassen Sie es uns anhand eines Beispiels klar verstehen.
final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};
final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);
Observable.merge(observableFirst, observableSecond)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Da wir den Merge-Operator verwenden, behält er die Reihenfolge nicht bei und kann die Werte in beliebiger Reihenfolge ausgeben, z A1, B1, A2, A3, B2, B3, A4 oder A1, A2, B1, B2, A3, A4, B3 oder kann alles sein.
So sollten wir je nach Anwendungsfall die Concat- und Merge-Operatoren in RxJava verwenden.
Thrakisch
Wenn beide Quellen synchron sind, wie OP angegeben hat, geben sie die genaue Reihenfolge von beiden zurück verschmelzen und konkat.
Aber wenn Quellen nicht synchron sind, z. B. zwei Quellen wie eine Datenbank und ein Socket, z. B. Push-Daten mit zufälligen oder unbestimmten Verzögerungen, ist das Ergebnis oder die Reihenfolge der Daten unterschiedlich verschmelzen und konkat.
fun <T> Observable<T>.getAsyncItems(): Observable<T> {
return concatMap {
val delay = Random().nextInt(10)
Observable.just(it)
.delay(delay.toLong(), TimeUnit.MILLISECONDS)
}
}
Diese Erweiterungsfunktion oben fügt jeder Emission zufällig eine Verzögerung hinzu, aber sie werden immer in der gleichen Reihenfolge emittiert. Wenn die Daten A, B, C, D und E sind, dann ist die Ausgabe immer A, B, C, D und E.
Jetzt vergleichen wir wie merge und concat Verhalten, wenn Quellen asynchron sind.
val source1 =
Observable.just("A", "B", "C", "D", "E").getAsyncItems()
val source2 =
Observable.just(1, 2, 3, 4, 5).getAsyncItems()
Observable.merge(source1, source2).subscribe {print("$it-")}
Druckt sowas A-B-1-C-2-D-3-4-E-5- oder A-1-2-3-B-C-D-4-E-5-
es hängt von der zufälligen Verzögerung ab.