Wie kann man eine Funktion alle x Sekunden wiederholt ausführen?

Lesezeit: 11 Minuten

Benutzeravatar von davidmytton
davidmytton

Ich möchte eine Funktion in Python alle 60 Sekunden für immer wiederholen (genau wie eine NStimer in Objective C oder setTimeout in JS). Dieser Code wird als Daemon ausgeführt und ist praktisch so, als ob das Python-Skript jede Minute mit einem Cron aufgerufen wird, ohne dass dies vom Benutzer eingerichtet werden muss.

In dieser Frage nach einem in Python implementierten Cron scheint die Lösung effektiv gerecht zu sein schlafen() für x Sekunden. Ich brauche solche erweiterten Funktionen nicht, also würde vielleicht so etwas funktionieren

while True:
    # Code executed here
    time.sleep(60)

Gibt es absehbare Probleme mit diesem Code?

  • Ein pedantischer Punkt, aber möglicherweise kritisch, Ihr Code über dem Code wird nicht alle 60 Sekunden ausgeführt, sondern zwischen den Ausführungen liegt eine Lücke von 60 Sekunden. Es passiert nur alle 60 Sekunden, wenn Ihr ausgeführter Code überhaupt keine Zeit in Anspruch nimmt.

    – Simon

    23. Januar 2009 um 21:12 Uhr

  • Auch time.sleep(60) können sowohl früher als auch später zurückkehren

    – jfs

    19. März 2014 um 7:25 Uhr

  • Ich frage mich immer noch: Gibt es absehbare Probleme mit diesem Code?

    – dwitvliet

    27. Januar 2015 um 18:39 Uhr

  • Das “vorhersehbare Problem” ist, dass Sie nicht 60 Iterationen pro Stunde erwarten können, indem Sie nur time.sleep(60) verwenden. Wenn Sie also ein Element pro Iteration anhängen und eine Liste mit festgelegter Länge führen, stellt der Durchschnitt dieser Liste keinen konsistenten “Zeitraum” dar. Daher können Funktionen wie “gleitender Durchschnitt” auf zu alte Datenpunkte verweisen, was Ihre Anzeige verfälscht.

    – Litepräsenz

    21. Februar 2017 um 14:28 Uhr


  • @Banana Ja, Sie können mit Problemen rechnen, die dadurch verursacht werden, dass Ihr Skript nicht GENAU alle 60 Sekunden ausgeführt wird. Zum Beispiel. Ich fing an, so etwas zu tun, um Videostreams aufzuteilen und hochzuladen, und am Ende bekam ich Strems 5-10 ~ Sekunden länger, weil die Medienwarteschlange puffert, während ich Daten innerhalb der Schleife verarbeite. Das hängt von Ihren Daten ab. Wenn die Funktion eine Art einfacher Wachhund ist, der Sie zum Beispiel warnt, wenn Ihre Festplatte voll ist, sollten Sie damit überhaupt keine Probleme haben. Wenn Sie die Warnmeldungen eines Kernkraftwerks überprüfen, können Sie mit einer Stadt enden völlig gesprengt x

    – DGoiko

    31. Oktober 2018 um 11:46 Uhr


Benutzeravatar von nosklo
nosklo

Wenn Ihr Programm noch keine Ereignisschleife hat, verwenden Sie die geplant -Modul, das einen Allzweck-Ereignisplaner implementiert.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    sc.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

Wenn Sie bereits eine Ereignisschleifenbibliothek wie z asyncio, trio, tkinter, PyQt5, gobject, kivyund viele andere – planen Sie die Aufgabe stattdessen einfach mit den Methoden Ihrer vorhandenen Ereignisschleifenbibliothek.

  • Das Modul sched dient zum Planen von Funktionen, die nach einiger Zeit ausgeführt werden. Wie verwenden Sie es, um einen Funktionsaufruf alle x Sekunden zu wiederholen, ohne time.sleep() zu verwenden?

    – Baishampayan Ghose

    23. Januar 2009 um 21:13 Uhr

  • Dann apscheduler bei packages.python.org/APScheduler sollte an dieser Stelle ebenfalls Erwähnung finden.

    –Daniel F

    27. Januar 2013 um 20:06 Uhr

  • Hinweis: Diese Version kann driften. Du könntest benutzen enterabs() um es zu vermeiden. Hier ist eine nicht driftende Version zum Vergleich.

    – jfs

    28. Oktober 2014 um 16:45 Uhr

  • @JavaSa: weil “Mach dein Zeug” ist nicht augenblicklich und Fehler aus time.sleep können sich hier ansammeln. “alle X Sekunden ausführen” und “wiederholt mit einer Verzögerung von ~X Sekunden ausführen” sind nicht dasselbe. Siehe auch diesen Kommentar

    – jfs

    25. Januar 2017 um 15:42 Uhr


  • Du könntest umziehen s.enter(...) zum Start der Funktion, um die Drift zu reduzieren. Außerdem, was ist der Sinn von sc?

    – Solomon Ucko

    18. Dezember 2019 um 1:19 Uhr

Benutzeravatar von Dave Rove
David Rove

Verknüpfen Sie Ihre Zeitschleife wie folgt mit der Systemuhr:

import time
starttime = time.time()
while True:
    print("tick")
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))

  • +1. deine und die twisted answer sind die einzigen Antworten, die alle eine Funktion ausführen x Sekunden. Der Rest führt die Funktion mit einer Verzögerung von aus x Sekunden nach jedem Anruf.

    – jfs

    18. September 2014 um 7:09 Uhr

  • Wenn Sie hier Code hinzufügen würden, der länger als eine Sekunde dauerte … Es würde das Timing aussetzen und anfangen, hinterherzuhinken … Die akzeptierte Antwort in diesem Fall ist richtig … Jeder kann einen einfachen Druckbefehl wiederholen und lasse es jede Sekunde ohne Verzögerung laufen …

    – Wütend 84

    1. Januar 2016 um 22:48 Uhr

  • ich bevorzuge from time import time, sleep wegen der existentiellen Implikationen 😉

    – Werden

    16. Februar 2016 um 4:40 Uhr

  • Funktioniert fantastisch. Es besteht keine Notwendigkeit, Ihre abzuziehen starttime wenn Sie damit beginnen, es mit einer bestimmten Zeit zu synchronisieren: time.sleep(60 - time.time() % 60) hat bei mir gut funktioniert. Ich habe es als verwendet time.sleep(1200 - time.time() % 1200) und es gibt mir Protokolle auf dem :00 :20 :40genau wie ich wollte.

    – TemporalWolf

    14. Juli 2016 um 19:22 Uhr


  • @AntonSchigur, um Abweichungen nach mehreren Iterationen zu vermeiden. Je nachdem kann eine einzelne Iteration etwas früher oder später beginnen sleep(), timer() Genauigkeit und wie lange es dauert, den Schleifenkörper auszuführen, aber im Durchschnitt treten Iterationen immer an den Intervallgrenzen auf (auch wenn einige übersprungen werden): while keep_doing_it(): sleep(interval - timer() % interval). Vergleichen Sie es mit nur while keep_doing_it(): sleep(interval) wobei sich nach mehreren Iterationen Fehler ansammeln können.

    – jfs

    1. August 2016 um 20:51 Uhr

Wenn Sie eine nicht blockierende Möglichkeit wünschen, Ihre Funktion regelmäßig auszuführen, würde ich anstelle einer blockierenden Endlosschleife einen Thread-Timer verwenden. Auf diese Weise kann Ihr Code weiter ausgeführt und andere Aufgaben ausgeführt werden, und Ihre Funktion wird trotzdem alle n Sekunden aufgerufen. Ich verwende diese Technik häufig zum Drucken von Fortschrittsinformationen bei langen, CPU-/Festplatten-/Netzwerk-intensiven Aufgaben.

Hier ist der Code, den ich in einer ähnlichen Frage gepostet habe, mit start()- und stop()-Steuerelement:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Verwendungszweck:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Merkmale:

  • Nur Standardbibliothek, keine externen Abhängigkeiten
  • start() und stop() können sicher mehrmals aufgerufen werden, auch wenn der Timer bereits gestartet/gestoppt wurde
  • Die aufzurufende Funktion kann Positionsargumente und benannte Argumente haben
  • Du kannst ändern interval jederzeit, es wird nach dem nächsten Lauf wirksam. Das gleiche für args, kwargs und sogar function!

  • Diese Lösung scheint mit der Zeit zu driften; Ich brauchte eine Version, die darauf abzielt, die Funktion alle n Sekunden ohne Drift aufzurufen. Ich werde ein Update in einer separaten Frage veröffentlichen.

    – Äraul

    5. Dezember 2016 um 0:21 Uhr

  • Im def _run(self) Ich versuche zu verstehen, warum Sie anrufen self.start() Vor self.function(). Können Sie das näher erläutern? Ich würde denken, indem ich anrufe start() Erste self.is_running würde es immer sein False dann würden wir immer einen neuen Thread aufspinnen.

    – Reiches Episkop

    21. Dezember 2016 um 20:08 Uhr

  • Ich glaube, ich bin der Sache auf den Grund gegangen. Die Lösung von @MestreLion führt alle eine Funktion aus x Sekunden (also t=0, t=1x, t=2x, t=3x, …) wo bei dem Originalposter Beispielcode eine Funktion mit ausführt x zweite Pause dazwischen. Außerdem hat diese Lösung meines Erachtens einen Fehler, wenn interval ist kürzer als die benötigte Zeit function ausführen. In diesem Fall, self._timer wird in überschrieben start Funktion.

    – Reiches Episkop

    21. Dezember 2016 um 23:09 Uhr

  • Ja, @RichieEpiscopo, der Aufruf an .function() nach .start() soll die Funktion bei t=0 ausführen. Und ich glaube nicht, dass es ein Problem sein wird, wenn function dauert länger als intervalaber ja, der Code könnte einige Rennbedingungen enthalten.

    – MestreLion

    6. Februar 2017 um 15:12 Uhr

  • @eraoul: Ja, diese Lösung driftet, obwohl es je nach System einige hundert oder sogar ein paar tausend Läufe dauert, bevor sie eine einzige Sekunde driftet. Wenn eine solche Drift für Sie relevant ist, empfehle ich dringend die Verwendung eines richtigen System Planer wie z cron

    – MestreLion

    24. April 2019 um 16:06 Uhr

Benutzeravatar von Aaron Maenpaa
Aaron Maenpäa

Vielleicht möchten Sie darüber nachdenken Verdrehte Dies ist eine Python-Netzwerkbibliothek, die die implementiert Reaktormuster.

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

Während “while True: sleep(60)” wahrscheinlich funktionieren wird, implementiert Twisted wahrscheinlich bereits viele der Funktionen, die Sie irgendwann benötigen werden (Dämonisierung, Protokollierung oder Ausnahmebehandlung, wie von Bobince hervorgehoben) und wird wahrscheinlich eine robustere Lösung sein

Benutzeravatar von eraoul
Äraul

Hier ist eine Aktualisierung des Codes von MestreLion, die ein Driften im Laufe der Zeit vermeidet.

Die RepeatedTimer-Klasse ruft hier die angegebene Funktion alle “Intervall”-Sekunden auf, wie vom OP angefordert; Der Zeitplan hängt nicht davon ab, wie lange die Ausführung der Funktion dauert. Ich mag diese Lösung, da sie keine externen Bibliotheksabhängigkeiten hat; das ist nur reines Python.

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

Beispielnutzung (kopiert aus der Antwort von MestreLion):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

  • Ich stimme zu, dass dies das Beste ist – keine Pakete von Drittanbietern, und ich habe getestet, dass es im Laufe der Zeit nicht driftet

    – Elendurwen

    13. Juli 2021 um 18:46 Uhr

import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

Wenn Sie dies tun möchten, ohne Ihren verbleibenden Code zu blockieren, können Sie dies verwenden, um ihn in einem eigenen Thread ausführen zu lassen:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

Diese Lösung kombiniert mehrere Funktionen, die selten in anderen Lösungen kombiniert zu finden sind:

  • Ausnahmebehandlung: Ausnahmen werden auf dieser Ebene so weit wie möglich ordnungsgemäß behandelt, dh sie werden zu Debugging-Zwecken protokolliert, ohne unser Programm abzubrechen.
  • Keine Verkettung: Die übliche kettenartige Implementierung (zur Planung des nächsten Ereignisses), die Sie in vielen Antworten finden, ist insofern spröde, als dass, wenn innerhalb des Planungsmechanismus etwas schief geht (threading.Timer oder was auch immer), dies wird die Kette beenden. Es finden dann keine weiteren Ausführungen statt, selbst wenn die Ursache des Problems bereits behoben ist. Eine einfache Schleife und wartet mit einer einfachen sleep() ist im Vergleich deutlich robuster.
  • Kein Abdriften: Meine Lösung verfolgt genau die Zeiten, zu denen sie ausgeführt werden soll. Es gibt keine Drift in Abhängigkeit von der Ausführungszeit (wie bei vielen anderen Lösungen).
  • Überspringen: Meine Lösung überspringt Aufgaben, wenn eine Ausführung zu lange gedauert hat (z. B. mache X alle fünf Sekunden, aber X hat 6 Sekunden gedauert). Dies ist das Standard-Cron-Verhalten (und das aus gutem Grund). Viele andere Lösungen führen die Aufgabe dann einfach ohne Verzögerung mehrfach hintereinander aus. Für die meisten Fälle (zB Aufräumarbeiten) ist dies nicht erwünscht. Wenn es ist gewünscht, einfach verwenden next_time += delay stattdessen.

  • Ich stimme zu, dass dies das Beste ist – keine Pakete von Drittanbietern, und ich habe getestet, dass es im Laufe der Zeit nicht driftet

    – Elendurwen

    13. Juli 2021 um 18:46 Uhr

Benutzeravatar von Adrian P
Adrian P

Der einfachere Weg ist meiner Meinung nach:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

Auf diese Weise wird Ihr Code ausgeführt, dann wartet er 60 Sekunden, dann wird er erneut ausgeführt, wartet, führt aus, etc … Keine Notwendigkeit, die Dinge zu komplizieren: D

  • Eigentlich ist dies nicht die Antwort: time sleep() kann nur zum Warten von X Sekunden nach jeder Ausführung verwendet werden. Wenn Ihre Funktion beispielsweise 0,5 Sekunden zur Ausführung benötigt und Sie time.sleep(1) verwenden, bedeutet dies, dass Ihre Funktion alle 1,5 Sekunden ausgeführt wird, nicht 1. Sie sollten andere Module und/oder Threads verwenden, um sicherzustellen, dass etwas für Y-mal funktioniert in jeder X Sekunde.

    – kommradHomer

    18. September 2013 um 10:09 Uhr

  • @kommradHomer: Die Antwort von Dave Rove zeigt, dass Sie kann verwenden time.sleep() Führen Sie alle X Sekunden etwas aus

    – jfs

    21. September 2014 um 4:56 Uhr

  • Meiner Meinung nach sollte der Code anrufen time.sleep() in while True Schleife wie: def executeSomething(): print('10 sec left') ; while True: executeSomething(); time.sleep(10)

    – Leonard Lepadatu

    17. November 2017 um 13:09 Uhr

1436500cookie-checkWie kann man eine Funktion alle x Sekunden wiederholt ausführen?

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy