block()/blockFirst()/blockLast() blockieren Fehler beim Aufrufen von bodyToMono NACH Austausch()

Lesezeit: 6 Minuten

Benutzer-Avatar
DaithiG

Ich versuche, Webflux zu verwenden, um eine generierte Datei an einen anderen Ort zu streamen, aber wenn bei der Generierung der Datei ein Fehler aufgetreten ist, gibt die API Erfolg zurück, aber mit einem DTO, das die Fehler beim Generieren der Datei anstelle der Datei selbst beschreibt. Dies verwendet eine sehr alte und schlecht gestaltete API, also entschuldigen Sie bitte die Verwendung von Post und das API-Design.

Die Antwort vom API-Aufruf (exchange()) ist eine ClientResponse. Von hier aus kann ich entweder mit bodyToMono in eine ByteArrayResource konvertieren, die in eine Datei gestreamt werden kann, oder, wenn beim Erstellen der Datei ein Fehler auftritt, kann ich auch mit bodyToMono in das DTO konvertieren. Ich kann jedoch anscheinend weder noch noch je nach Inhalt des Headers von ClientResponse tun.

Zur Laufzeit bekomme ich eine IllegalStateException verursacht durch

block()/blockFirst()/blockLast() blockieren, was in Thread Reactor-http-client-epoll-12 nicht unterstützt wird

Ich denke, mein Problem ist, dass ich block() nicht zweimal in derselben Funktionskette aufrufen kann.

Mein Codeschnipsel sieht so aus:

webClient.post()
        .uri(uriBuilder -> uriBuilder.path("/file/")
                                      .queryParams(params).build())
        .exchange()
        .doOnSuccess(cr -> {
                if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
                    NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                    createErrorFile(dto);
                }
                else {
                    ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                    createSpreadsheet(bAr);
                }
            }
        )
        .block();

Grundsätzlich möchte ich die ClientResponse basierend auf dem im Header definierten MediaType unterschiedlich verarbeiten.

Ist das möglich?

  • Blockiere nicht, subscribe. Es sollte kein Grund für einen Anruf bestehen block. Wenn Sie WebFlux verwenden, tun Sie dies, weil Sie eine reaktive Pipeline aufbauen möchten, wenn Sie anrufen block das tust du nicht. Anders ausgedrückt, wenn Sie anrufen blockverwenden Sie einfach eine einfache alte RestTemplate – Ihr Code sieht sowieso sehr prozedural und nebeneffektiv aus, also wird es nicht magisch reaktiv, wenn Sie ihn in Reactor einbauen.

    – Boris die Spinne

    20. Juli 2018 um 20:37 Uhr


  • Zur Verdeutlichung versuchen wir, eine Datei mithilfe von WebClient von einer Web-API auf die Festplatte zu streamen. Die Antwort könnte beides sein 200 OK Anwendung/json bei Fehlern bzw 200 OK Inhaltsdisposition wenn keine Fehler. Wie würden wir das mit WebClient machen, ohne die Datei vollständig in den Speicher zu laden?

    – Dukethrash

    20. Juli 2018 um 21:43 Uhr


Zuerst ein paar Dinge, die Ihnen helfen, das Code-Snippet zu verstehen, das diesen Anwendungsfall löst.

  1. Sie sollten niemals eine blockierende Methode innerhalb einer Methode aufrufen, die einen reaktiven Typ zurückgibt; Sie blockieren einen der wenigen Threads Ihrer Anwendung und es ist sehr schlecht für die Anwendung
  2. Jedenfalls ab Reaktor 3.2, Das Blockieren innerhalb einer reaktiven Pipeline löst einen Fehler aus
  3. Berufung subscribe, wie in den Kommentaren vorgeschlagen, ist auch keine gute Idee. Es ist mehr oder weniger so, als würde man diesen Job als Aufgabe in einem separaten Thread starten. Sie erhalten einen Rückruf, wenn es fertig ist (die subscribe Methoden können Lambdas gegeben werden), aber Sie entkoppeln tatsächlich Ihre aktuelle Pipeline von dieser Aufgabe. In diesem Fall könnte die Client-HTTP-Antwort geschlossen und Ressourcen bereinigt werden, bevor Sie die Möglichkeit haben, den vollständigen Antworttext zu lesen, um ihn in eine Datei zu schreiben
  4. Wenn Sie nicht die gesamte Antwort im Speicher puffern möchten, bietet Spring dies an DataBuffer (Denken Sie an ByteBuffer-Instanzen, die gepoolt werden können).
  5. Sie können block aufrufen, wenn die Methode, die Sie implementieren, selbst blockiert (returning void zum Beispiel), zum Beispiel in einem Testfall.

Hier ist ein Code-Snippet, mit dem Sie dies tun könnten:

Mono<Void> fileWritten = WebClient.create().post()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .exchange()
        .flatMap(response -> {
            if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
                Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                return createErrorFile(dto);
            }
            else {
                Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                return createSpreadsheet(body);
            }
        });
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation

Wie Sie sehen können, blockieren wir nirgendwo und Methoden, die sich mit I/O befassen, kehren zurück Mono<Void>was das reaktive Äquivalent von a ist done(error) Rückruf, der signalisiert, wenn etwas erledigt ist und ein Fehler aufgetreten ist.

Da bin ich mir nicht sicher was die createErrorFile Methode tun sollte, habe ich ein Beispiel für bereitgestellt createSpreadsheet das schreibt nur die Body-Bytes in eine Datei. Beachten Sie, dass wir Datenpuffer freigeben müssen, sobald wir fertig sind, da Datenpuffer möglicherweise recycelt/gepoolt werden.

private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
    try {
        Path file = //...
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
    } catch (IOException exc) {
        return Mono.error(exc);
    }
}

Mit dieser Implementierung enthält Ihre Anwendung einige DataBuffer Instanzen im Speicher zu einem bestimmten Zeitpunkt (die reaktiven Operatoren rufen Werte aus Leistungsgründen vorab ab) und schreiben Bytes, sobald sie auf reaktive Weise kommen.

  • Was ist mit bodyToMono (Resource.class)? Sollte das nicht schon für die Pufferung sorgen?

    – Dukethrash

    24. Juli 2018 um 20:21 Uhr

  • Der ResourceDecoder unterstützt nur In-Memory-Varianten wie z InputStreamResource.class oder ByteArrayResource.class. Weil du einen Kurs gibst bodyToMono und keine Instanz, Sie können es nicht wirklich bitten, in eine vorhandene Datei zu schreiben.

    – Brian Clozel

    24. Juli 2018 um 20:30 Uhr

  • Dann lädt bodyToMono(ByteArrayResource.class) also auch die gesamte Datei in den Speicher und streamt die Bytes nicht in das Dateisystem? Würde nicht ExchangeFilterFunction besser geeignet sein, als eine separate Methode zu erstellen, z createSpreadsheet?

    – Dukethrash

    26. Juli 2018 um 2:19 Uhr


  • Ich verstehe nicht, wie Sie erwarten, dass diese Austauschfunktion funktioniert. Bitte stellen Sie eine neue Frage, die zeigt, was Sie im Sinn haben

    – Brian Clozel

    26. Juli 2018 um 2:53 Uhr

  • Diese Lösung hält den Dateihandler unter Linux geöffnet. Die Datei wird als 0 KB geschrieben und hat Tonnen von untergeordneten Prozessen, die die Datei handhaben. Ich konnte keine Möglichkeit finden, die Datei wie mit channel.close() zu schließen. Irgendwann sagte das Öffnen der Excel-Datei in Windows, dass sie bereits geöffnet war.

    – Dukethrash

    10. August 2018 um 3:02 Uhr

Benutzer-Avatar
Adelin

[UPDATE 2021/10/19]

toProcessor() ist jetzt veraltet.

Erwägen Sie die Verwendung

myMono.toFuture().get();

Wie in der Antwort mit den meisten Stimmen angegeben, sollte man niemals blockieren. In meinem Fall ist dies die einzige Option, da wir eine reaktive Bibliothek innerhalb eines zwingenden Codes verwenden. Die Sperrung kann durch erfolgen Einwickeln des Mono in einen Prozessor:

myMono.toProcessor().block()

  • Dies ist eine veraltete Methode. Die Dokumentation fordert uns auf, Share zu verwenden

    – Anisch

    5. Januar 2021 um 10:13 Uhr

  • myMono.toFuture().get(); Dauert ewig und die Anwendung führt nie die nächste Codezeile aus. Ich habe es auf Post versucht, mit mehreren funktionierenden URLs. Irgendeine Idee?

    – MA1

    17. Januar um 14:26 Uhr

Verwenden Sie zum Ausführen von Clientanforderungen außerhalb des Serveranforderungspools myWebClientMono.share().block();

  • Was macht share() genau?

    – JOSEPH Blessingh

    6. April 2021 um 7:44 Uhr

  • Es wird einen Blockierungsaufruf außerhalb des Pools des aktuellen Workers ausführen.

    – Anton Seredkin

    15. April 2021 um 16:11 Uhr

  • Das hat bei mir leider nicht funktioniert, . Ich verwende Springboot 2.4.5. Irgendeine Idee?

    – MA1

    16. Januar um 4:49 Uhr

RestResultMessage message= createWebClient()
                .get()
                .uri(uri)
                .exchange()
                .map(clientResponse -> {
                    //delegation
                    ClientResponseWrapper wrapper = new 
                                 ClientResponseWrapper(clientResponse);
                    return Mono.just(wrapper);
                })
                .block() //wait until request is not done
                .map(result -> {  
                    //convert to any data
                    if (!result.statusCode().isError()){
                       //extract the result from request
                        return create(RestResultMessage.Result.success, result.bodyToMono(String.class).block());}
                    } else {
                        return create(RestResultMessage.Result.error, result.statusCode().name());
                    }
                })
                .block();

1018310cookie-checkblock()/blockFirst()/blockLast() blockieren Fehler beim Aufrufen von bodyToMono NACH Austausch()

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy