Wie bekomme ich den Rückgabewert einer an multiprocessing.Process übergebenen Funktion?

Lesezeit: 10 Minuten

Benutzeravatar von Louis Thibault
Louis Thibaut

Im folgenden Beispielcode möchte ich den Rückgabewert der Funktion erhalten worker. Wie kann ich das anstellen? Wo wird dieser Wert gespeichert?

Beispielcode:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

Ausgabe:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

Ich kann das relevante Attribut anscheinend nicht in den gespeicherten Objekten finden jobs.

Benutzeravatar von vartec
vartec

Verwenden freigegebene Variable kommunizieren. Zum Beispiel so:

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

  • Ich würde empfehlen, eine zu verwenden multiprocessing.Queueeher als ein Manager hier. Verwendung einer Manager erfordert das Spawnen eines völlig neuen Prozesses, was übertrieben ist, wenn a Queue würdest du.

    – dano

    19. April 2015 um 0:54 Uhr

  • @dano: Ich frage mich, wenn wir das Objekt Queue() verwenden, können wir die Reihenfolge nicht sicher sein, wenn jeder Prozess den Wert zurückgibt. Ich meine, wenn wir die Reihenfolge im Ergebnis brauchen, um die nächste Arbeit zu erledigen. Wie können wir sicher sein, wo genau welche Ausgabe von welchem ​​Prozess kommt?

    – Catbuilds

    29. September 2016 um 11:08 Uhr


  • @Catbuilts Sie könnten von jedem Prozess ein Tupel zurückgeben, wobei ein Wert der tatsächliche Rückgabewert ist, der Ihnen wichtig ist, und der andere eine eindeutige Kennung aus dem Prozess ist. Aber ich frage mich auch, warum Sie wissen müssen, welcher Prozess welchen Wert zurückgibt. Wenn das das ist, was Sie tatsächlich über den Prozess wissen müssen, oder müssen Sie Ihre Liste der Eingaben und die Liste der Ausgaben korrelieren? In diesem Fall würde ich die Verwendung empfehlen multiprocessing.Pool.map um Ihre Liste mit Arbeitselementen zu verarbeiten.

    – dano

    1. Dezember 2016 um 14:43 Uhr

  • Einschränkungen für Funktionen mit nur einem einzigen Argument : sollte nutzen args=(my_function_argument, ). Beachten Sie das , Komma hier! Andernfalls beschwert sich Python über “fehlende Positionsargumente”. Ich habe 10 Minuten gebraucht, um es herauszufinden. Überprüfen Sie auch die manuelle Nutzung (unter dem Abschnitt “Prozessklasse”).

    – yuqli

    29. April 2019 um 15:17 Uhr


  • @vartec Ein Nachteil der Verwendung eines multipriocessing.Manager()-Wörterbuchs besteht darin, dass es das zurückgegebene Objekt pickelt (serialisiert), sodass es einen Engpass gibt, der von der Pickle-Bibliothek mit einer maximalen Größe von 2 GiB für das zurückzugebende Objekt angegeben wird. Gibt es eine andere Möglichkeit, dies zu vermeiden, um die Serialisierung des zurückgegebenen Objekts zu vermeiden?

    – hirschme

    13. November 2019 um 21:46 Uhr

Marks Benutzeravatar
Markieren

Ich denke, der von @sega_sai vorgeschlagene Ansatz ist der bessere. Aber es braucht wirklich ein Codebeispiel, also hier gehts:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Was die Rückgabewerte druckt:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Wenn Sie sich auskennen map (das in Python 2 eingebaute) sollte dies nicht zu schwierig sein. Ansonsten schau mal sega_Sais Link.

Beachten Sie, wie wenig Code benötigt wird. (Beachten Sie auch, wie Prozesse wiederverwendet werden).

  • Irgendwelche Ideen, warum mein getpid() alle den gleichen Wert zurückgeben? Ich verwende Python3

    – zelusp

    29. Oktober 2016 um 17:39 Uhr

  • Ich bin mir nicht sicher, wie Pool Aufgaben über Arbeiter verteilt. Vielleicht können sie alle beim selben Arbeiter landen, wenn sie wirklich schnell sind? Kommt es regelmäßig vor? Auch wenn Sie eine Verzögerung hinzufügen?

    – Markieren

    31. Oktober 2016 um 15:30 Uhr

  • Ich dachte auch, es wäre eine Sache der Geschwindigkeit, aber wenn ich füttere pool.map ein Bereich von 1.000.000 mit mehr als 10 Prozessen sehe ich höchstens zwei verschiedene PIDs.

    – zelusp

    31. Oktober 2016 um 19:00 Uhr

  • Dann bin ich mir nicht sicher. Ich denke, es wäre interessant, dafür eine separate Frage zu eröffnen.

    – Markieren

    1. November 2016 um 11:27 Uhr

  • Wenn die Dinge, die Sie senden möchten, eine andere Funktion für jeden Prozess verwenden pool.apply_async: docs.python.org/3/library/…

    – Kyle

    5. Juni 2019 um 20:28 Uhr

Benutzeravatar von Matthew Moisen
Matthäus Moisen

Für alle anderen, die suchen, wie man einen Wert von a erhält Process verwenden Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

Beachten Sie, dass in Windows oder Jupyter Notebook mit multithreading Sie müssen dies als Datei speichern und die Datei ausführen. Wenn Sie dies in einer Eingabeaufforderung tun, wird ein Fehler wie dieser angezeigt:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>

  • Wenn ich etwas in meinem Worker-Prozess in eine Warteschlange stelle, wird mein Join nie erreicht. Irgendeine Idee, wie das kommen könnte?

    – Laurens Koppenol

    6. Oktober 2016 um 12:30 Uhr

  • @LaurensKoppenol meinst du damit, dass dein Hauptcode dauerhaft bei p.join() hängt und nie fortgesetzt wird? Hat Ihr Prozess eine Endlosschleife?

    – Matthäus Moisen

    6. Oktober 2016 um 17:44 Uhr

  • Ja, es hängt dort unendlich. Meine Arbeiter sind alle fertig (Schleife innerhalb der Arbeiterfunktion endet, danach wird die Druckanweisung für alle Arbeiter gedruckt). Der Join tut nichts. Wenn ich die entferne Queue von meiner funktion lässt es mich das passieren join()

    – Laurens Koppenol

    10. Oktober 2016 um 8:11 Uhr

  • @LaurensKoppenol Rufst du vielleicht nicht an queue.put(ret) vor dem Anruf p.start() ? In diesem Fall bleibt der Worker-Thread hängen queue.get() bis in alle Ewigkeit. Sie können dies replizieren, indem Sie mein obiges Snippet kopieren, während Sie es auskommentieren queue.put(ret).

    – Matthäus Moisen

    16. August 2017 um 2:47 Uhr

  • @Bendemann Jemand hat die Antwort bearbeitet und durch Platzieren der falsch gemacht queue.get vor der Warteschlange.join. Ich habe es jetzt durch Platzieren behoben queue.get nach p.join. Bitte versuche es erneut.

    – Matthäus Moisen

    28. Juli 2020 um 16:58 Uhr

Benutzeravatar von sudo
sudo

Aus irgendeinem Grund konnte ich kein allgemeines Beispiel dafür finden, wie man das mit macht Queue überall (selbst die Doc-Beispiele von Python erzeugen nicht mehrere Prozesse), also habe ich nach etwa 10 Versuchen Folgendes zum Laufen gebracht:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue ist eine blockierende, Thread-sichere Warteschlange, die Sie verwenden können, um die Rückgabewerte der untergeordneten Prozesse zu speichern. Sie müssen also die Warteschlange an jeden Prozess weitergeben. Etwas weniger Offensichtliches ist hier, dass Sie es tun müssen get() aus der Warteschlange vor Ihnen join das Processsonst füllt sich die Warteschlange und blockiert alles.

Aktualisieren für diejenigen, die objektorientiert sind (getestet in Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

Dieses Beispiel zeigt, wie eine Liste von verwendet wird Multiprocessing.Pipe Instanzen, um Strings von einer beliebigen Anzahl von Prozessen zurückzugeben:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Ausgabe:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Diese Lösung verbraucht weniger Ressourcen als a multiprocessing.Queue was nutzt

  • ein Rohr
  • mindestens ein Schloss
  • ein Puffer
  • ein Faden

oder ein multiprocessing.SimpleQueue was nutzt

  • ein Rohr
  • mindestens ein Schloss

Es ist sehr aufschlussreich, sich die Quelle für jeden dieser Typen anzusehen.

  • Was wäre der beste Weg, dies zu tun, ohne die Pipes zu einer globalen Variablen zu machen?

    – Nickpick

    25. Oktober 2016 um 13:15 Uhr

  • Ich habe alle globalen Daten und den Code in eine Hauptfunktion gesteckt und es funktioniert genauso. Beantwortet das deine Frage?

    Benutzer3657941

    25. Oktober 2016 um 13:43 Uhr

  • muss die Pipe immer gelesen werden, bevor ihr ein neuer Wert hinzugefügt (gesendet) werden kann?

    – Nickpick

    25. Oktober 2016 um 14:56 Uhr

  • Diese Antwort verursacht einen Deadlock, wenn das zurückgegebene Objekt groß ist. Anstatt zuerst proc.join() auszuführen, würde ich zuerst versuchen, den Rückgabewert zu recv() und dann den Join auszuführen.

    – L. Pes

    12. Februar 2020 um 20:13 Uhr

  • Ich bin da bei @L.Pes. Könnte betriebssystemspezifisch sein, aber ich habe dieses Beispiel an meinen Anwendungsfall angepasst, und Worker, die versuchen, send_end.send(result) für ein großes Ergebnis zu senden, würden auf unbestimmte Zeit hängen bleiben. Der Beitritt nach Erhalt hat es behoben. Ich gebe gerne ein Beispiel, wenn N = 2 für Sie zu anekdotisch ist.

    – Vlad

    22. April 2020 um 2:10 Uhr


Benutzeravatar von Divyanshu Srivastava
Divyanshu Srivastava

Es scheint, dass Sie die verwenden sollten Multiprocessing.Pool Klasse statt und verwenden Sie die Methoden .apply() .apply_async(), map()

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

  • Was wäre der beste Weg, dies zu tun, ohne die Pipes zu einer globalen Variablen zu machen?

    – Nickpick

    25. Oktober 2016 um 13:15 Uhr

  • Ich habe alle globalen Daten und den Code in eine Hauptfunktion gesteckt und es funktioniert genauso. Beantwortet das deine Frage?

    Benutzer3657941

    25. Oktober 2016 um 13:43 Uhr

  • muss die Pipe immer gelesen werden, bevor ihr ein neuer Wert hinzugefügt (gesendet) werden kann?

    – Nickpick

    25. Oktober 2016 um 14:56 Uhr

  • Diese Antwort verursacht einen Deadlock, wenn das zurückgegebene Objekt groß ist. Anstatt zuerst proc.join() auszuführen, würde ich zuerst versuchen, den Rückgabewert zu recv() und dann den Join auszuführen.

    – L. Pes

    12. Februar 2020 um 20:13 Uhr

  • Ich bin da bei @L.Pes. Könnte betriebssystemspezifisch sein, aber ich habe dieses Beispiel an meinen Anwendungsfall angepasst, und Worker, die versuchen, send_end.send(result) für ein großes Ergebnis zu senden, würden auf unbestimmte Zeit hängen bleiben. Der Beitritt nach Erhalt hat es behoben. Ich gebe gerne ein Beispiel, wenn N = 2 für Sie zu anekdotisch ist.

    – Vlad

    22. April 2020 um 2:10 Uhr


Du kannst den … benutzen exit eingebaut, um den Beendigungscode eines Prozesses festzulegen. Es ist erhältlich bei der exitcode Attribut des Prozesses:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Ausgabe:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

  • Seien Sie gewarnt, dass dieser Ansatz verwirrend werden könnte. Prozesse sollten im Allgemeinen mit dem Exit-Code 0 beendet werden, wenn sie ohne Fehler abgeschlossen wurden. Wenn Sie etwas haben, das Ihre Systemprozess-Exit-Codes überwacht, werden diese möglicherweise als Fehler gemeldet.

    – Eisenrad

    23. Mai 2017 um 21:50 Uhr

  • Perfekt, wenn Sie im Fehlerfall nur eine Ausnahme im übergeordneten Prozess auslösen möchten.

    – crizCraig

    19. Juli 2018 um 17:45 Uhr


1436450cookie-checkWie bekomme ich den Rückgabewert einer an multiprocessing.Process übergebenen Funktion?

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy