ThreadPoolExecutor-Block, wenn seine Warteschlange voll ist?

Lesezeit: 9 Minuten

Benutzeravatar von ghempton
Gempton

Ich versuche, viele Aufgaben mit einem ThreadPoolExecutor auszuführen. Nachfolgend ein hypothetisches Beispiel:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

Das Problem ist, dass ich schnell eine bekomme java.util.concurrent.RejectedExecutionException da die Anzahl der Aufgaben die Größe der Arbeitswarteschlange übersteigt. Das gewünschte Verhalten, nach dem ich suche, besteht jedoch darin, den Hauptthread zu blockieren, bis Platz in der Warteschlange ist. Was ist der beste Weg, dies zu erreichen?

  • Schauen Sie sich diese Frage an: stackoverflow.com/questions/2001086/…

    – Kiril

    10. August 2010 um 4:33 Uhr

  • Diese Antwort auf eine andere Frage schlägt vor, einen Benutzernamen zu verwenden BlockingQueue Unterklasse, die blockiert offer() durch Delegierung an put(). Ich denke, das funktioniert mehr oder weniger genauso wie das RejectedExecutionHandler was ruft getQueue().put().

    – Robert Tupelo-Schneck

    29. März 2014 um 20:50 Uhr

  • Das direkte Einreihen in die Warteschlange wäre falsch, wie in dieser Antwort stackoverflow.com/a/3518588/585903 erläutert

    – Sumit Jain

    9. April 2015 um 13:42 Uhr

  • @SumitJain Lesen Sie diese Antwort sorgfältiger. Nur einer der drei in dieser Antwort erhobenen Einwände gilt für den in @Robert Tupelo-Schnecks Kommentar vorgeschlagenen Ansatz. Durch Aufruf put() Von innerhalb der Warteschlange selbst greifen Sie nicht über auf die Warteschlange zu getQueue() (Einwand Nr. 3) und das Objekt, das Sie platzieren, ist gegebenenfalls bereits ordnungsgemäß verpackt (Einwand Nr. 2). Es besteht immer noch die Gefahr eines Deadlocks, wenn alle Ihre Threads sterben, bevor das Element aus der Warteschlange kommt, aber die meisten Leute, die nach dieser speziellen Lösung suchen, könnten dieses Risiko eingehen.

    – Tim

    27. April 2017 um 21:22 Uhr

  • Mögliches Duplikat von How to make ThreadPoolExecutor’s submit() method block if it is gesättigted?

    – Rogerpack

    5. Oktober 2018 um 16:40 Uhr

Benutzeravatar von Darren Gilroy
Darren Gilroy

Unter einigen sehr engen Umständen können Sie einen java.util.concurrent.RejectedExecutionHandler implementieren, der das tut, was Sie brauchen.

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

Jetzt. Das ist eine sehr schlechte idee aus den folgenden Gründen

  • Es ist anfällig für Deadlocks, da alle Threads im Pool sterben können, bevor das Ding, das Sie in die Warteschlange gestellt haben, sichtbar ist. Mildern Sie dies, indem Sie eine angemessene Keep-Alive-Zeit festlegen.
  • Die Aufgabe wird nicht so verpackt, wie es Ihr Executor erwarten könnte. Viele Executor-Implementierungen verpacken ihre Aufgaben vor der Ausführung in eine Art Tracking-Objekt. Schauen Sie sich die Quelle von Ihnen an.
  • Das Hinzufügen über getQueue() wird von der API dringend empfohlen und kann irgendwann verboten werden.

Eine fast immer bessere Strategie ist die Installation von ThreadPoolExecutor.CallerRunsPolicy, die Ihre App drosselt, indem die Aufgabe in dem Thread ausgeführt wird, der execute() aufruft.

Manchmal ist eine Sperrstrategie mit all ihren Risiken jedoch wirklich das, was Sie wollen. Unter diesen Bedingungen würde ich sagen

  • Sie haben nur einen Thread, der execute() aufruft
  • Sie müssen (oder wollen) eine sehr kleine Warteschlangenlänge haben
  • Sie müssen unbedingt die Anzahl der Threads begrenzen, die diese Arbeit ausführen (normalerweise aus externen Gründen), und eine Caller-Runs-Strategie würde das brechen.
  • Ihre Aufgaben haben eine unvorhersehbare Größe, sodass Caller-Runs zu einer Hungersnot führen könnten, wenn der Pool vorübergehend mit 4 kurzen Aufgaben beschäftigt war und Ihre Ausführung, die einen Thread aufruft, bei einer großen hängen bleibt.

Also wie gesagt. Es wird selten benötigt und kann gefährlich sein, aber los geht’s.

Viel Glück.

  • Eine sehr durchdachte Antwort. Ich habe ein kleines Problem mit Ihrer Bedingung, dass > “Sie eine sehr kleine Warteschlangenlänge haben müssen (oder wollen).” Sie können möglicherweise nicht vorhersagen, wie viele Aufgaben ein bestimmter Job in die Warteschlange stellen wird. Vielleicht führen Sie einen täglichen Job aus, der Daten aus irgendeiner Datenbank verarbeitet, und am Montag müssen 500 Datensätze verarbeitet werden, aber am Dienstag sind es 50.000. Sie müssen eine Obergrenze für Ihre Warteschlange festlegen, damit Sie Ihren Haufen nicht sprengen, wenn ein großer Job durchkommt. In diesem Fall schadet es nicht, auf den Abschluss einiger Aufgaben zu warten, bevor Sie weitere in die Warteschlange stellen.

    – schlaksig

    15. Januar 2015 um 14:43 Uhr


  • „Es ist anfällig für Deadlocks, da alle Threads im Pool sterben können, bevor das Ding, das Sie in die Warteschlange gestellt haben, sichtbar ist. Mildern Sie dies, indem Sie eine angemessene Keep-Alive-Zeit festlegen.“ Kann ein Deadlock nicht vollständig vermieden werden, indem die minimale Poolgröße auf einen Wert größer als Null gesetzt wird? Jeder andere Grund ist, dass Java keine eingebaute Unterstützung für das Blockieren von Puts in Executor-Warteschlangen hat. Was interessant ist, weil es eine ziemlich vernünftige Strategie zu sein scheint. Ich frage mich, was die Begründung ist.

    – Tim Pote

    30. September 2015 um 12:05 Uhr

  • Vielleicht ist eine weitere Bedingung für eine Blockierungsstrategie, wenn die Ausführungsreihenfolge wichtig ist. CallerRunsPolicy bedeutet, dass die abgelehnte Aufgabe wahrscheinlich vor anderen ausstehenden Elementen im Executor ausgeführt wird.

    – Adrian Bäcker

    10. März 2018 um 19:24 Uhr


  • @TimPote Die aktuelle Implementierung von execute() ab Java 8 kümmert sich auch um diese Bedingung. Wenn eine Aufgabe erfolgreich in die Warteschlange gestellt werden kann, * müssen wir noch überprüfen, ob wir einen Thread hätten hinzufügen sollen * (weil vorhandene Threads seit der letzten Überprüfung gestorben sind) oder ob * der Pool seit dem Eintritt in diese Methode heruntergefahren wurde. Also * überprüfen wir den Status erneut und setzen bei Bedarf das Einreihen in die Warteschlange zurück, wenn * gestoppt wurde, oder starten einen neuen Thread, wenn es keine gibt. Darren, sehen Sie bei diesem Ansatz auch Probleme mit Java 8?

    – verrückter Oberst10

    26. Mai 2020 um 22:04 Uhr


Was Sie tun müssen, ist, Ihren ThreadPoolExecutor in Executor zu verpacken, der die Anzahl der gleichzeitig ausgeführten Operationen darin explizit begrenzt:

 private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        delegate.execute(wrapped);

    }
}

Sie können concurrentTasksLimit an die threadPoolSize + queueSize Ihres Delegate Executor anpassen und es wird Ihr Problem so ziemlich lösen

  • Schön und glatt. Vielen Dank!

    – Bastian Voigt

    3. März 2020 um 15:37 Uhr

Sie könnten eine verwenden semaphore um zu verhindern, dass Threads in den Pool gelangen.

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Einige Fallstricke:

  • Verwenden Sie dieses Muster nur mit einem festen Thread-Pool. Es ist unwahrscheinlich, dass die Warteschlange oft voll ist, daher werden keine neuen Threads erstellt. Weitere Informationen finden Sie in den Java-Dokumenten zu ThreadPoolExecutor: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html Es gibt einen Weg, dies zu umgehen, der jedoch außerhalb des Bereichs dieser Antwort liegt.
  • Die Warteschlangengröße sollte höher sein als die Anzahl der Kernthreads. Wenn wir die Warteschlangengröße 3 machen würden, würde am Ende Folgendes passieren:

    • T0: Alle drei Threads arbeiten, die Warteschlange ist leer, es sind keine Genehmigungen verfügbar.
    • T1: Thread 1 beendet, gibt eine Genehmigung frei.
    • T2: Thread 1 fragt die Warteschlange nach neuer Arbeit ab, findet keine und wartet.
    • T3: Haupt-Thread reicht Arbeit in den Pool ein, Thread 1 beginnt mit der Arbeit.

    Das obige Beispiel wird übersetzt, um den Hauptthread zu threaden Blockierung Thread 1. Es mag wie ein kleiner Zeitraum erscheinen, aber jetzt multiplizieren Sie die Häufigkeit mit Tagen und Monaten. Plötzlich summieren sich kurze Zeiträume zu einer großen Zeitverschwendung.

  • Thread 1 ist bereits zum Zeitpunkt T2 blockiert, wenn er feststellt, dass die Warteschlange leer ist. Ich bin mir nicht sicher, ob ich Ihren Punkt verstanden habe, dass der Hauptthread diesen Thread blockiert.

    – asg

    11. November 2018 um 21:54 Uhr

  • @asgs “Thread 1 ist bereits zum Zeitpunkt T2 blockiert, wenn er feststellt, dass die Warteschlange leer ist.” Richtig, und da der Haupt-Thread dafür verantwortlich ist, Arbeit in die Warteschlange zu stellen, können Sie daraus schließen, dass der Haupt-Thread Thread 1 blockiert.

    – devkaoru

    22. November 2018 um 17:56 Uhr


Dies ist, was ich am Ende getan habe:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}

Eine ziemlich einfache Option ist es, Ihre zu wickeln BlockingQueue mit einer Implementierung, die aufruft put(..) Wenn offer(..) aufgerufen wird:

public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {

(..)

  public boolean offer(E o) {
        try {
            delegate.put(o);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
  }

(.. implement all other methods simply by delegating ..)

}

Dies funktioniert, weil standardmäßig put(..) wartet, bis Kapazität in der Warteschlange vorhanden ist, wenn diese voll ist, sehen:

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

Kein Fangen von RejectedExecutionException oder komplizierte Verriegelung notwendig.

Benutzeravatar von MoeHoss
MöHoss

Hier ist mein Code-Snippet in diesem Fall:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
        catch ( RejectedExecutionException e ) {
            // Thread pool is full
            try {
                // Block until one of the threads finishes its job and exits.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait();
                }
            }
            catch ( InterruptedException ignored ) {
                // return immediately
                break;
            }
        }
    } while ( !accepted );
}

threadPool ist eine lokale Instanz von java.util.concurrent.ExecutorService, die bereits initialisiert wurde.

Benutzeravatar von TinkerTank
TinkerTank

Ich habe dieses Problem mit einem benutzerdefinierten RejectedExecutionHandler gelöst, der den aufrufenden Thread einfach für eine Weile blockiert und dann versucht, die Aufgabe erneut zu senden:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

Diese Klasse kann einfach im Thread-Pool-Executor als RejectedExecutionHandler wie jede andere Klasse verwendet werden. In diesem Beispiel:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())

Der einzige Nachteil, den ich sehe, ist, dass der aufrufende Thread möglicherweise etwas länger als unbedingt erforderlich gesperrt wird (bis zu 250 ms). Verringern Sie für viele kurz laufende Aufgaben die Wartezeit vielleicht auf etwa 10 ms. Da dieser Executor effektiv rekursiv aufgerufen wird, können sehr lange Wartezeiten (Stunden) darauf, dass ein Thread verfügbar wird, zu einem Stapelüberlauf führen.

Trotzdem gefällt mir persönlich diese Methode. Es ist kompakt, leicht verständlich und funktioniert gut. Übersehe ich etwas Wichtiges?

1433330cookie-checkThreadPoolExecutor-Block, wenn seine Warteschlange voll ist?

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy