Python Multiprocessing.Pool Lazy Iteration

Lesezeit: 8 Minuten

Ich frage mich, wie die Multiprocessing.Pool-Klasse von Python mit map, imap und map_async funktioniert. Mein besonderes Problem besteht darin, dass ich einen Iterator abbilden möchte, der speicherintensive Objekte erstellt, und nicht möchte, dass alle diese Objekte gleichzeitig im Speicher generiert werden. Ich wollte sehen, ob die verschiedenen map()-Funktionen meinen Iterator auswringen oder die next()-Funktion nur dann intelligent aufrufen, wenn untergeordnete Prozesse langsam voranschreiten, also habe ich einige Tests als solche gehackt:

def g():
  for el in xrange(100):
    print el
    yield el

def f(x):
  time.sleep(1)
  return x*x

if __name__ == '__main__':
  pool = Pool(processes=4)              # start 4 worker processes
  go = g()
  g2 = pool.imap(f, go)
  g2.next()

Und so weiter mit map, imap und map_async. Dies ist jedoch das eklatanteste Beispiel, da ein einfacher Aufruf von next() auf g2 alle meine Elemente von meinem Generator g() ausgibt, während ich erwarten würde, dass es, wenn imap dies ‘faul’ tun würde, nur go.next aufruft () einmal, und drucke daher nur ‘1’ aus.

Kann jemand aufklären, was passiert, und ob es eine Möglichkeit gibt, den Prozesspool „faul“ den Iterator nach Bedarf auswerten zu lassen?

Danke,

Gabe

  • Nach dem Entfernen der time.sleep anrufen und a . hinzufügen print os.getpid(), x in f das Verhalten sieht noch seltsamer aus, manchmal werden nur 2 oder 3 verschiedene PIDs ausgegeben und führen immer eine andere Anzahl von Iterationen durch… Übrigens, welche Python-Version verwenden Sie?

    – Terseus

    15. März ’11 um 23:48

  • Python 2.6.6 (r266:84292, 26. Dezember 2010, 22:31:48) Standard-Debian-Installation.

    – Gabe

    15. März ’11 um 23:54

Python MultiprocessingPool Lazy Iteration
unutbu

Schauen wir uns zuerst das Ende des Programms an.

Das Multiprocessing-Modul verwendet atexit anrufen multiprocessing.util._exit_function wenn Ihr Programm endet.

Wenn Sie entfernen g2.next(), Ihr Programm endet schnell.

Der _exit_function ruft schließlich an Pool._terminate_pool. Der Hauptthread ändert den Status von pool._task_handler._state von RUN zu TERMINATE. Inzwischen die pool._task_handler Faden schlingt sich ein Pool._handle_tasks und springt aus, wenn die Bedingung erreicht ist

            if thread._state:
                debug('task handler found thread._state != RUN')
                break

(Siehe /usr/lib/python2.6/multiprocessing/pool.py)

Dies hindert den Task-Handler daran, Ihren Generator vollständig zu verbrauchen. g(). Wenn du reinschaust Pool._handle_tasks du wirst sehen

        for i, task in enumerate(taskseq):
            ...
            try:
                put(task)
            except IOError:
                debug('could not put task on queue')
                break

Dies ist der Code, der Ihren Generator verbraucht. (taskseq ist nicht gerade dein Generator, aber als taskseq verbraucht wird, auch Ihr Generator.)

Im Gegensatz dazu, wenn Sie anrufen g2.next() der Hauptthread ruft auf IMapIterator.next, und wartet, wenn es erreicht self._cond.wait(timeout).

Dass der Hauptthread wartet, anstatt anzurufen _exit_function ermöglicht es dem Task-Handler-Thread, normal zu laufen, was bedeutet, dass der Generator vollständig verbraucht wird puts Aufgaben in der workerS’ inqueue in dem Pool._handle_tasks Funktion.

Die Quintessenz ist, dass alles Pool Kartenfunktionen verbrauchen das gesamte Iterable, das sie angegeben hat. Wenn Sie den Generator in Stücken verbrauchen möchten, können Sie stattdessen Folgendes tun:

import multiprocessing as mp
import itertools
import time


def g():
    for el in xrange(50):
        print el
        yield el


def f(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)              # start 4 worker processes
    go = g()
    result = []
    N = 11
    while True:
        g2 = pool.map(f, itertools.islice(go, N))
        if g2:
            result.extend(g2)
            time.sleep(1)
        else:
            break
    print(result)

  • 3

    Tolle Antwort, ich habe am Ende einen Thread-Pool neu implementiert, der in der Zwischenzeit Element für Element verbraucht, aber Ihre islice-Lösung wäre für mich viel weniger Arbeit gewesen, na ja :-). Ich habe versucht, mich in pool.py umzusehen und festgestellt, dass die Funktionen map/imap/map_async tatsächlich den Iterator sofort auffressen. Es ist mir jedoch nicht klar, ob das wirklich notwendig ist, insbesondere im Fall von Standard-Pool.map()?

    – Gabe

    17. März ’11 um 11:33

  • 1

    @Gabe: Um den Iterator just-in-time zu verbrauchen, müsste meiner Meinung nach ein zusätzlicher Signalmechanismus codiert werden Pool dem Task-Handler sagen, wann er put weitere Aufgaben im inqueue. Vielleicht ist es möglich, aber derzeit nicht vorhanden in Pool, und kann den Prozess auch etwas verlangsamen.

    – unutbu

    17. März ’11 um 13:58


  • Tatsächlich bestand meine Lösung darin, eine Aufgabenwarteschlange der Größe N*size_of_pool zu erstellen und mit N herumzuspielen, bis es so aussah, als würde die Warteschlange einen guten Puffer behalten. Dies ist natürlich aufgabenabhängig, daher kann ich verstehen, dass sich der Autor des Pool-Codes damit nicht auseinandersetzen wollte. Vielen Dank für Ihre Antwort!

    – Gabe

    18. März ’11 um 14:29

  • Was ist, wenn der Generator so ist, dass Sie die Anzahl der Elemente (in diesem Fall 100) nicht kennen?

    – Vince

    12. Februar ’13 um 9:56

  • 1

    @Vince: Du könntest das ändern for-loop zu einem while-loop, und brechen, wenn das Ergebnis von pool.map ist leer. Ich habe den Beitrag bearbeitet, um zu zeigen, was ich meine.

    – unutbu

    12. Feb. ’13 um 11:05

Ich hatte auch dieses Problem und war enttäuscht zu erfahren, dass die Karte alle ihre Elemente verbraucht. Ich habe eine Funktion codiert, die den Iterator träge mit dem Datentyp Queue in Multiprocessing verbraucht. Dies ähnelt dem, was @unutbu in einem Kommentar zu seiner Antwort beschreibt, leidet jedoch darunter, dass es keinen Rückrufmechanismus zum erneuten Laden der Warteschlange gibt. Der Queue-Datentyp stellt stattdessen einen Timeout-Parameter bereit, und ich habe 100 Millisekunden mit guter Wirkung verwendet.

from multiprocessing import Process, Queue, cpu_count
from Queue import Full as QueueFull
from Queue import Empty as QueueEmpty

def worker(recvq, sendq):
    for func, args in iter(recvq.get, None):
        result = func(*args)
        sendq.put(result)

def pool_imap_unordered(function, iterable, procs=cpu_count()):
    # Create queues for sending/receiving items from iterable.

    sendq = Queue(procs)
    recvq = Queue()

    # Start worker processes.

    for rpt in xrange(procs):
        Process(target=worker, args=(sendq, recvq)).start()

    # Iterate iterable and communicate with worker processes.

    send_len = 0
    recv_len = 0
    itr = iter(iterable)

    try:
        value = itr.next()
        while True:
            try:
                sendq.put((function, value), True, 0.1)
                send_len += 1
                value = itr.next()
            except QueueFull:
                while True:
                    try:
                        result = recvq.get(False)
                        recv_len += 1
                        yield result
                    except QueueEmpty:
                        break
    except StopIteration:
        pass

    # Collect all remaining results.

    while recv_len < send_len:
        result = recvq.get()
        recv_len += 1
        yield result

    # Terminate worker processes.

    for rpt in xrange(procs):
        sendq.put(None)

Diese Lösung hat den Vorteil, dass Anfragen an Pool.map nicht gestapelt werden. Ein einzelner Arbeiter kann andere nicht daran hindern, Fortschritte zu machen. YMMV. Beachten Sie, dass Sie möglicherweise ein anderes Objekt verwenden möchten, um den Arbeitern die Beendigung zu signalisieren. Im Beispiel habe ich None verwendet.

Getestet auf “Python 2.7 (r27:82525, 4. Juli 2010, 09:01:59) [MSC v.1500 32 bit (Intel)] auf win32”

  • Ich habe Python 3.3 überprüft und weder imap Noch imap_unordered verbraucht nicht alle Argumente, bevor die zugeordnete Funktion gestartet wird, obwohl map tut.

    – vy32

    15. Februar ’14 um 0:16

  • +1 Das ist fast das, was ich brauche, aber leider brauche ich geordnete Ergebnisse.

    – letmaik

    6. März ’14 um 20:37 Uhr


  • Anstatt Get/Put-Timeouts für Ein-/Aus-Warteschlangen abzustimmen, stelle ich normalerweise 1) eine feste Größe für beide Warteschlangen ein und 2) lasse get/put blockieren, wenn die Warteschlange leer/voll ist. Auf diese Weise müssen keine Timeouts angepasst werden. Es besteht nur die Notwendigkeit, die Anzahl der Elemente zu überprüfen, die in die Eingangswarteschlange und aus der Ausgangswarteschlange gehen. Die richtige Reihenfolge ist dann: 1) Arbeiter starten; 2) Warteschlangenkollektor starten; 3) Iterieren Sie über die Eingabe und füllen Sie die In-Warteschlange.

    – Chronos

    24. März ’14 um 14:33


  • @neo, es ist auch möglich, geordnete Ergebnisse zu erhalten. Der Weg, dies zu erreichen, besteht darin, 4 Warteschlangen mit begrenzter Größe zu haben [In (data for workers), Out (final, properly ordered results), Serial (keeps track of items in processing), Sort (intermediate Queue)] und 2 Arten von Arbeitern – eine einzige sorter() und N tatsächliche Arbeiter. Die Idee ist: 1) habe a serial Zahlengenerator; 2) senden Sie jeden Datensatz als In.put( (serial, data) ), zusammen mit Serial.put(serial); 3) Arbeiter tun Sort.put((serial, result)); 4) sorter() Ruft Elemente von Sortieren ab, sortiert sie nach serial, und setzt in Out.

    – Chronos

    24. März ’14 um 15:15

  • @neo, hier ist ein ungetestetes Beispiel für sorter(): bitbucket.org/qmentis/bioinformatics-scripts/src/… Vergessen zu erwähnen, dass alle Warteschlangen dieselbe Größenbeschränkung haben müssen und dass alle Warteschlangenelemente ungefähr die gleiche Verarbeitungszeit benötigen, damit dieses Schema funktioniert (sonst ist die sorter()fängt der interne Puffer von zu viele Ergebnisse an).

    – Chronos

    24. März ’14 um 15:21

Was Sie wollen, wird im . umgesetzt NuMap Paket, von der Website:

NuMap ist ein paralleler (thread- oder prozessbasierter, lokaler oder entfernter), gepufferter Multitask-, itertools.imap- oder multiprocessing.Pool.imap-Funktionsersatz. Wie imap wertet es eine Funktion für Elemente einer Sequenz oder iterierbar aus, und zwar träge. Faulheit kann über die Argumente „stride“ und „buffer“ angepasst werden.

In diesem Beispiel (siehe Code, bitte) 2 Arbeiter.

Pool-Arbeit wie erwartet: Wenn der Arbeiter frei ist, wird die nächste Iteration durchgeführt.

Dieser Code als Code im Thema, außer einer Sache: Argumentgröße = 64 k.

64 k – Standard-Socket-Puffergröße.

import itertools
from multiprocessing import Pool
from time import sleep


def f( x ):
    print( "f()" )
    sleep( 3 )
    return x


def get_reader():
    for x in range( 10 ):
        print( "readed: ", x )
        value = " " * 1024 * 64 # 64k
        yield value


if __name__ == '__main__':

    p = Pool( processes=2 )

    data = p.imap( f, get_reader() )

    p.close()
    p.join()

.

178810cookie-checkPython Multiprocessing.Pool Lazy Iteration

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy