Node.js-Streams vs. Observables

Lesezeit: 5 Minuten

Benutzer-Avatar
urish

Nach dem Lernen BeobachtbareIch finde sie ziemlich ähnlich Node.js-Streams. Beide haben einen Mechanismus, um den Verbraucher zu benachrichtigen, wenn neue Daten eintreffen, ein Fehler auftritt oder keine Daten mehr vorhanden sind (EOF).

Ich würde gerne etwas über die konzeptionellen / funktionalen Unterschiede zwischen den beiden erfahren. Vielen Dank!

  • @BenjaminGruenbaum Ich frage mich, warum du das mit rxjs und Bacon getaggt hast? OP scheint sich auf die Observables von ecmascript-harmony zu beziehen

    – Bergi

    2. Juni 2015 um 3:49 Uhr

  • @Bergi Vorkenntnisse über OP und die Frage. Grundsätzlich.

    – Benjamin Grünbaum

    2. Juni 2015 um 7:02 Uhr

  • Lol Glückwunsch zu den Upvotes, aber ich habe keine Ahnung, warum diese Frage nicht geschlossen wurde. Wie ist das eine echte Frage / angemessen für SO.

    – Alexander Mills

    2. Februar 2017 um 6:40 Uhr


  • @AlexanderMills wieso ist das keine passende Frage für SO? Dies ist keine „Welches ist Ihr Favorit“-Frage; Es fragt nach den Unterschieden zwischen zwei häufig verwendeten reaktiven Mustern in JS/Node.

    – Michael Martin-Smucker

    11. April 2017 um 22:52 Uhr


Benutzer-Avatar
m4ktub

Beide Beobachtbare und node.js Ströme ermöglichen es Ihnen, das gleiche zugrunde liegende Problem zu lösen: eine Folge von Werten asynchron zu verarbeiten. Der Hauptunterschied zwischen den beiden, glaube ich, hängt mit dem Kontext zusammen, der ihr Erscheinen motivierte. Dieser Kontext spiegelt sich in der Terminologie und der API wider.

Auf der Beobachtbare Seite haben Sie eine Erweiterung zu EcmaScript, die das reaktive Programmiermodell einführt. Es versucht, die Lücke zwischen Wertgenerierung und Asynchronität mit den minimalistischen und komponierbaren Konzepten von zu schließen Observer und Observable.

Auf node.js und Ströme Seite wollten Sie eine Schnittstelle zur asynchronen und performanten Verarbeitung von Netzwerkstreams und lokalen Dateien schaffen. Die Terminologie leitet sich aus diesem anfänglichen Kontext ab und Sie erhalten pipe, chunk, encoding, flush, Duplex, Bufferusw. Durch einen pragmatischen Ansatz, der explizite Unterstützung für bestimmte Anwendungsfälle bietet, verlieren Sie etwas die Fähigkeit, Dinge zu komponieren, da dies nicht so einheitlich ist. Sie verwenden zum Beispiel push auf einen Readable streamen und write auf einen Writable Obwohl Sie konzeptionell dasselbe tun: einen Wert veröffentlichen.

In der Praxis also, wenn Sie sich die Konzepte ansehen und die Option verwenden { objectMode: true }Sie können übereinstimmen Observable mit dem Readable streamen und Observer mit dem Writable Strom. Sie können sogar einige einfache Adapter zwischen den beiden Modellen erstellen.

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var Observable = function(subscriber) {
    this.subscribe = subscriber;
}

var Subscription = function(unsubscribe) {
    this.unsubscribe = unsubscribe;
}

Observable.fromReadable = function(readable) {
    return new Observable(function(observer) {
        function nop() {};

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        return new Subscription(function() {
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        });
    });
}

var Observer = function(handlers) {
    function nop() {};

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;
}

Observer.fromWritable = function(writable, shouldEnd, throwFn) {
    return new Observer({
        next: writable.write.bind(writable), 
        return: shouldEnd ? writable.end.bind(writable) : function() {}, 
        throw: throwFn
    });
}

Sie haben vielleicht bemerkt, dass ich einige Namen geändert und die einfacheren Konzepte von verwendet habe Observer und Subscriptionhier eingeführt, um die Überlastung der Verantwortlichkeiten zu vermeiden Beobachtbare in Generator. Grundsätzlich die Subscription ermöglicht es Ihnen, sich vom Abonnement abzumelden Observable. Wie auch immer, mit dem obigen Code können Sie a pipe.

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));

Im Vergleich zu process.stdin.pipe(process.stdout), was Sie haben, ist eine Möglichkeit zum Kombinieren, Filtern und Transformieren von Streams, die auch für jede andere Datensequenz funktioniert. Sie können es mit erreichen Readable, Transformund Writable Streams, aber die API bevorzugt Unterklassen statt Verkettung Readables und Anwenden von Funktionen. Auf der Observable model. Beispielsweise entspricht das Transformieren von Werten dem Anwenden einer Transformer-Funktion auf den Stream. Es ist kein neuer Untertyp von erforderlich Transform.

Observable.just = function(/*... arguments*/) {
    var values = arguments;
    return new Observable(function(observer) {
        [].forEach.call(values, function(value) {
            observer.next(value);
        });
        observer.return();
        return new Subscription(function() {});
    });
};

Observable.prototype.transform = function(transformer) {
    var source = this;
    return new Observable(function(observer) {
        return source.subscribe({
            next: function(v) {
                observer.next(transformer(v));
            },
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        });
    });
};

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
  .subscribe(Observer.fromWritable(process.stdout))

Der Abschluss? Es ist einfach, das reaktive Modell und die einzuführen Observable Konzept überall. Es ist schwieriger, eine ganze Bibliothek um dieses Konzept herum zu implementieren. All diese kleinen Funktionen müssen konsistent zusammenarbeiten. Immerhin die ReaktivX Projekt ist immer noch am Laufen. Aber wenn Sie den Dateiinhalt wirklich an den Client senden, sich mit der Codierung befassen und ihn komprimieren müssen, dann ist die Unterstützung in NodeJS vorhanden, und es funktioniert ziemlich gut.

  • Ich bin mir bei dieser ganzen “Erweiterung von Ecmascript” wirklich nicht sicher. RxJS ist nur eine Bibliothek, genauso wie RxJava usw. Schließlich gibt es in ES7 oder ES8 möglicherweise einige Schlüsselwörter in ES/JS, die sich auf Observables beziehen, aber sie sind sicherlich nicht Teil der Sprache und schon gar nicht, als Sie die Frage beantwortet haben im Jahr 2015.

    – Alexander Mills

    31. Dezember 2016 um 9:36 Uhr

  • Unterstützt die RX-Implementierung verlustfreien Gegendruck? Wenn beispielsweise nodejs den Stream im angehaltenen Modus liest, können wir die verwenden read() Methode zum Lesen aus dem Stream bei Bedarf. Und die drain event kann signalisieren, dass der beschreibbare Stream mehr Daten empfangen kann.

    – Buggy

    3. Mai 2019 um 7:30 Uhr

1142270cookie-checkNode.js-Streams vs. Observables

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy