Wie bekomme ich den Rückgabewert einer an multiprocessing.Process übergebenen Funktion?
Lesezeit: 10 Minuten
Louis Thibaut
Im folgenden Beispielcode möchte ich den Rückgabewert der Funktion erhalten worker. Wie kann ich das anstellen? Wo wird dieser Wert gespeichert?
Beispielcode:
import multiprocessing
def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print jobs
import multiprocessing
def worker(procnum, return_dict):
"""worker function"""
print(str(procnum) + " represent!")
return_dict[procnum] = procnum
if __name__ == "__main__":
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print(return_dict.values())
Ich würde empfehlen, eine zu verwenden multiprocessing.Queueeher als ein Manager hier. Verwendung einer Manager erfordert das Spawnen eines völlig neuen Prozesses, was übertrieben ist, wenn a Queue würdest du.
– dano
19. April 2015 um 0:54 Uhr
@dano: Ich frage mich, wenn wir das Objekt Queue() verwenden, können wir die Reihenfolge nicht sicher sein, wenn jeder Prozess den Wert zurückgibt. Ich meine, wenn wir die Reihenfolge im Ergebnis brauchen, um die nächste Arbeit zu erledigen. Wie können wir sicher sein, wo genau welche Ausgabe von welchem Prozess kommt?
– Catbuilds
29. September 2016 um 11:08 Uhr
@Catbuilts Sie könnten von jedem Prozess ein Tupel zurückgeben, wobei ein Wert der tatsächliche Rückgabewert ist, der Ihnen wichtig ist, und der andere eine eindeutige Kennung aus dem Prozess ist. Aber ich frage mich auch, warum Sie wissen müssen, welcher Prozess welchen Wert zurückgibt. Wenn das das ist, was Sie tatsächlich über den Prozess wissen müssen, oder müssen Sie Ihre Liste der Eingaben und die Liste der Ausgaben korrelieren? In diesem Fall würde ich die Verwendung empfehlen multiprocessing.Pool.map um Ihre Liste mit Arbeitselementen zu verarbeiten.
– dano
1. Dezember 2016 um 14:43 Uhr
Einschränkungen für Funktionen mit nur einem einzigen Argument : sollte nutzen args=(my_function_argument, ). Beachten Sie das , Komma hier! Andernfalls beschwert sich Python über “fehlende Positionsargumente”. Ich habe 10 Minuten gebraucht, um es herauszufinden. Überprüfen Sie auch die manuelle Nutzung (unter dem Abschnitt “Prozessklasse”).
– yuqli
29. April 2019 um 15:17 Uhr
@vartec Ein Nachteil der Verwendung eines multipriocessing.Manager()-Wörterbuchs besteht darin, dass es das zurückgegebene Objekt pickelt (serialisiert), sodass es einen Engpass gibt, der von der Pickle-Bibliothek mit einer maximalen Größe von 2 GiB für das zurückzugebende Objekt angegeben wird. Gibt es eine andere Möglichkeit, dies zu vermeiden, um die Serialisierung des zurückgegebenen Objekts zu vermeiden?
– hirschme
13. November 2019 um 21:46 Uhr
Markieren
Ich denke, der von @sega_sai vorgeschlagene Ansatz ist der bessere. Aber es braucht wirklich ein Codebeispiel, also hier gehts:
import multiprocessing
from os import getpid
def worker(procnum):
print('I am number %d in process %d' % (procnum, getpid()))
return getpid()
if __name__ == '__main__':
pool = multiprocessing.Pool(processes = 3)
print(pool.map(worker, range(5)))
Was die Rückgabewerte druckt:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
Wenn Sie sich auskennen map (das in Python 2 eingebaute) sollte dies nicht zu schwierig sein. Ansonsten schau mal sega_Sais Link.
Beachten Sie, wie wenig Code benötigt wird. (Beachten Sie auch, wie Prozesse wiederverwendet werden).
Irgendwelche Ideen, warum mein getpid() alle den gleichen Wert zurückgeben? Ich verwende Python3
– zelusp
29. Oktober 2016 um 17:39 Uhr
Ich bin mir nicht sicher, wie Pool Aufgaben über Arbeiter verteilt. Vielleicht können sie alle beim selben Arbeiter landen, wenn sie wirklich schnell sind? Kommt es regelmäßig vor? Auch wenn Sie eine Verzögerung hinzufügen?
– Markieren
31. Oktober 2016 um 15:30 Uhr
Ich dachte auch, es wäre eine Sache der Geschwindigkeit, aber wenn ich füttere pool.map ein Bereich von 1.000.000 mit mehr als 10 Prozessen sehe ich höchstens zwei verschiedene PIDs.
– zelusp
31. Oktober 2016 um 19:00 Uhr
Dann bin ich mir nicht sicher. Ich denke, es wäre interessant, dafür eine separate Frage zu eröffnen.
– Markieren
1. November 2016 um 11:27 Uhr
Wenn die Dinge, die Sie senden möchten, eine andere Funktion für jeden Prozess verwenden pool.apply_async: docs.python.org/3/library/…
– Kyle
5. Juni 2019 um 20:28 Uhr
Matthäus Moisen
Für alle anderen, die suchen, wie man einen Wert von a erhält Process verwenden Queue:
import multiprocessing
ret = {'foo': False}
def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)
if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join()
print(queue.get()) # Prints {"foo": True}
Beachten Sie, dass in Windows oder Jupyter Notebook mit multithreading Sie müssen dies als Datei speichern und die Datei ausführen. Wenn Sie dies in einer Eingabeaufforderung tun, wird ein Fehler wie dieser angezeigt:
AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
Wenn ich etwas in meinem Worker-Prozess in eine Warteschlange stelle, wird mein Join nie erreicht. Irgendeine Idee, wie das kommen könnte?
– Laurens Koppenol
6. Oktober 2016 um 12:30 Uhr
@LaurensKoppenol meinst du damit, dass dein Hauptcode dauerhaft bei p.join() hängt und nie fortgesetzt wird? Hat Ihr Prozess eine Endlosschleife?
– Matthäus Moisen
6. Oktober 2016 um 17:44 Uhr
Ja, es hängt dort unendlich. Meine Arbeiter sind alle fertig (Schleife innerhalb der Arbeiterfunktion endet, danach wird die Druckanweisung für alle Arbeiter gedruckt). Der Join tut nichts. Wenn ich die entferne Queue von meiner funktion lässt es mich das passieren join()
– Laurens Koppenol
10. Oktober 2016 um 8:11 Uhr
@LaurensKoppenol Rufst du vielleicht nicht an queue.put(ret) vor dem Anruf p.start() ? In diesem Fall bleibt der Worker-Thread hängen queue.get() bis in alle Ewigkeit. Sie können dies replizieren, indem Sie mein obiges Snippet kopieren, während Sie es auskommentieren queue.put(ret).
– Matthäus Moisen
16. August 2017 um 2:47 Uhr
@Bendemann Jemand hat die Antwort bearbeitet und durch Platzieren der falsch gemacht queue.get vor der Warteschlange.join. Ich habe es jetzt durch Platzieren behoben queue.get nach p.join. Bitte versuche es erneut.
– Matthäus Moisen
28. Juli 2020 um 16:58 Uhr
sudo
Aus irgendeinem Grund konnte ich kein allgemeines Beispiel dafür finden, wie man das mit macht Queue überall (selbst die Doc-Beispiele von Python erzeugen nicht mehrere Prozesse), also habe ich nach etwa 10 Versuchen Folgendes zum Laufen gebracht:
def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)
def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets
Queue ist eine blockierende, Thread-sichere Warteschlange, die Sie verwenden können, um die Rückgabewerte der untergeordneten Prozesse zu speichern. Sie müssen also die Warteschlange an jeden Prozess weitergeben. Etwas weniger Offensichtliches ist hier, dass Sie es tun müssen get() aus der Warteschlange vor Ihnen join das Processsonst füllt sich die Warteschlange und blockiert alles.
Aktualisieren für diejenigen, die objektorientiert sind (getestet in Python 3.4):
from multiprocessing import Process, Queue
class Multiprocessor():
def __init__(self):
self.processes = []
self.queue = Queue()
@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)
def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()
def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets
# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)
Dieses Beispiel zeigt, wie eine Liste von verwendet wird Multiprocessing.Pipe Instanzen, um Strings von einer beliebigen Anzahl von Prozessen zurückzugeben:
import multiprocessing
def worker(procnum, send_end):
'''worker function'''
result = str(procnum) + ' represent!'
print result
send_end.send(result)
def main():
jobs = []
pipe_list = []
for i in range(5):
recv_end, send_end = multiprocessing.Pipe(False)
p = multiprocessing.Process(target=worker, args=(i, send_end))
jobs.append(p)
pipe_list.append(recv_end)
p.start()
for proc in jobs:
proc.join()
result_list = [x.recv() for x in pipe_list]
print result_list
if __name__ == '__main__':
main()
Es ist sehr aufschlussreich, sich die Quelle für jeden dieser Typen anzusehen.
Was wäre der beste Weg, dies zu tun, ohne die Pipes zu einer globalen Variablen zu machen?
– Nickpick
25. Oktober 2016 um 13:15 Uhr
Ich habe alle globalen Daten und den Code in eine Hauptfunktion gesteckt und es funktioniert genauso. Beantwortet das deine Frage?
– Benutzer3657941
25. Oktober 2016 um 13:43 Uhr
muss die Pipe immer gelesen werden, bevor ihr ein neuer Wert hinzugefügt (gesendet) werden kann?
– Nickpick
25. Oktober 2016 um 14:56 Uhr
Diese Antwort verursacht einen Deadlock, wenn das zurückgegebene Objekt groß ist. Anstatt zuerst proc.join() auszuführen, würde ich zuerst versuchen, den Rückgabewert zu recv() und dann den Join auszuführen.
– L. Pes
12. Februar 2020 um 20:13 Uhr
Ich bin da bei @L.Pes. Könnte betriebssystemspezifisch sein, aber ich habe dieses Beispiel an meinen Anwendungsfall angepasst, und Worker, die versuchen, send_end.send(result) für ein großes Ergebnis zu senden, würden auf unbestimmte Zeit hängen bleiben. Der Beitritt nach Erhalt hat es behoben. Ich gebe gerne ein Beispiel, wenn N = 2 für Sie zu anekdotisch ist.
– Vlad
22. April 2020 um 2:10 Uhr
Divyanshu Srivastava
Es scheint, dass Sie die verwenden sollten Multiprocessing.Pool Klasse statt und verwenden Sie die Methoden .apply() .apply_async(), map()
Was wäre der beste Weg, dies zu tun, ohne die Pipes zu einer globalen Variablen zu machen?
– Nickpick
25. Oktober 2016 um 13:15 Uhr
Ich habe alle globalen Daten und den Code in eine Hauptfunktion gesteckt und es funktioniert genauso. Beantwortet das deine Frage?
– Benutzer3657941
25. Oktober 2016 um 13:43 Uhr
muss die Pipe immer gelesen werden, bevor ihr ein neuer Wert hinzugefügt (gesendet) werden kann?
– Nickpick
25. Oktober 2016 um 14:56 Uhr
Diese Antwort verursacht einen Deadlock, wenn das zurückgegebene Objekt groß ist. Anstatt zuerst proc.join() auszuführen, würde ich zuerst versuchen, den Rückgabewert zu recv() und dann den Join auszuführen.
– L. Pes
12. Februar 2020 um 20:13 Uhr
Ich bin da bei @L.Pes. Könnte betriebssystemspezifisch sein, aber ich habe dieses Beispiel an meinen Anwendungsfall angepasst, und Worker, die versuchen, send_end.send(result) für ein großes Ergebnis zu senden, würden auf unbestimmte Zeit hängen bleiben. Der Beitritt nach Erhalt hat es behoben. Ich gebe gerne ein Beispiel, wenn N = 2 für Sie zu anekdotisch ist.
– Vlad
22. April 2020 um 2:10 Uhr
Du kannst den … benutzen exit eingebaut, um den Beendigungscode eines Prozesses festzulegen. Es ist erhältlich bei der exitcode Attribut des Prozesses:
import multiprocessing
def worker(procnum):
print str(procnum) + ' represent!'
exit(procnum)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
result = []
for proc in jobs:
proc.join()
result.append(proc.exitcode)
print result
Seien Sie gewarnt, dass dieser Ansatz verwirrend werden könnte. Prozesse sollten im Allgemeinen mit dem Exit-Code 0 beendet werden, wenn sie ohne Fehler abgeschlossen wurden. Wenn Sie etwas haben, das Ihre Systemprozess-Exit-Codes überwacht, werden diese möglicherweise als Fehler gemeldet.
– Eisenrad
23. Mai 2017 um 21:50 Uhr
Perfekt, wenn Sie im Fehlerfall nur eine Ausnahme im übergeordneten Prozess auslösen möchten.
– crizCraig
19. Juli 2018 um 17:45 Uhr
14364500cookie-checkWie bekomme ich den Rückgabewert einer an multiprocessing.Process übergebenen Funktion?yes