gevent for the Working Python Developer

Geschrieben von der Gevent-Community

gevent ist eine Bibliothek für Nebenläufigkeit, basierend auf libev. Sie liefert eine klare API für eine Vielfalt von nebenläufigkeits- und netzwerk-basierten Aufgaben.

Einführung

Die Struktur dieses Tutorials nimmt an, dass der Leser ein gewisses Level an Wissen über Python-Programmierung hat, sonst jedoch nicht viel. Es wird nicht erwartet, dass er etwas über Nebenläufigkeit weiss. Das Ziel ist es, dem Leser die Werkzeuge in die Hand zu geben, um mit gevent zu arbeiten, ihm zu helfen, seine vorhandenen Probleme mit Nebenläufigkeit zu bezwingen und es ihm ermöglichen, noch heute asynchrone Applikationen zu schreiben.

Mitwirkende

In chronologischer Reihenfolge der Mitarbeit: Stephen Diehl Jérémy Bethmont sww Bruno Bigras David Ripton Travis Cline Boris Feld youngsterxyf Eddie Hebert Alexis Metaireau Daniel Velkov Veit Heller

Ausserdem ein Dankeschön an Denis Bilenko dafür, dass er gevent geschrieben und uns bei der Erstellung dieses Tutorials beraten hat.

Dies ist ein kollaboratives Dokument, veröffentlicht unter der MIT-Lizenz. Du hast etwas hinzuzufügen? Siehst einen Tippfehler? Forke es und erstelle einen Pull Request auf Github. Jede Mitarbeit ist willkommen.

(Anmerkung des Übersetzers: Falls Tippfehler in der Übersetzung auftauchen solleten, kannst du sie hier melden.)

Diese Seite ist auch in Japanisch und Italienisch verfügbar.

Core

Greenlets

Die primäre Struktur, die in gevent verwendet wird, ist das Greenlet, eine leichtgewichtige Koroutine, die Python als C-Erweiterungs-Modul zur Verfügung gestellt wird. Greenlets laufen allesamt innerhalb des OS-Prozesses des Hauptprogrammes, werden aber kooperativ verwaltet.

Only one greenlet is ever running at any given time. (Nur ein Greenlet läuft zu jeder gegebenen Zeit.)

Dies unterscheidet sich von allen echten Parallelitäts-Konstrukten, die von den multiprocessing- oder `threading-Bibliotheken implementiert werden; diese verwenden Spin-Prozesse und POSIX-Threads, welche vom Betriebssystem verwaltet werden und echt parallel ablaufen.

Synchrone & Asynchrone Ausführung

Die Kernidee von Nebenläufigkeit ist, dass eine grössere Aufgabe in eine Ansammlung von Sub-Aufgaben unterteilt werden kann, welche simultan ausgeführt werden sollen oder asynchron anstatt nacheinander oder synchron. Ein Übergang zwischen zwei Sub-Aufgaben nennt man Kontext-Wechsel(Context Switch).

Ein Kontext-Wechsel in gevent wird durch Yielding umgesetzt. In diesem Beispiel haben wir zwei Kontexte, die sich gegenseitig "yielden", indem sie ``gevent.sleep(0)```aufrufen.


import gevent

def foo():
    print('Expliziter Kontext bei foo')
    gevent.sleep(0)
    print('Expliziter Kontextwechsel zu foo')

def bar():
    print('Expliziter Kontext bei bar')
    gevent.sleep(0)
    print('Expliziter Kontextwechsel zu bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])


Expliziter Kontext bei foo
Expliziter Kontext bei bar
Expliziter Kontextwechsel zu foo
Expliziter Kontextwechsel zu bar

Es ist erhellend, den Kontrollfluss des Programms zu visualisieren oder mit einem Debugger hindurchzugeben, um die Kontext-Wechsel in Echtzeit zu betrachten.

(Anmerkung des Übersetzers: An dieser Stelle will ich mich entschuldigen, die Gif nicht übersetzt zu haben, aber ich glaube, dass es dem Verständnis nicht abträglich ist.)

Greenlet Control Flow

Die wirkliche Stärke von gevent ist zu sehen, wenn wir es für Netzwerk- und IO-lastige Funktionen benutzen, welche kooperativ verwaltet werden können. Gevent kümmert sich um all die Details, die nötig sind, damit deine Netzwerkbibliotheken immer implizit ihre Greenlet-Kontexte liefern, wenn dies möglich ist. Ich kann nicht genug betonen, was für ein mächtiges Idiom das ist. Aber vielleicht illustriert ein Beispiel das.

In diesem Fall ist die select()-Funktion normalerweise ein blockierender Aufruf, der verscheidene Dateideskriptoren abfragt.


import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'bei %1.1f Sekunden' % (time.time() - start)

def gr1():
    # Wartet eine Sekunde lang, aber wir wollen nicht herumlungern
    print('Beginn der Wartezeit: %s' % tic())
    select.select([], [], [], 2)
    print('Ende der Wartezeit: %s' % tic())

def gr2():
    # Wartet eine Sekunde lang, aber wir wollen nicht herumlungern
    print('Beginn der Wartezeit: %s' % tic())
    select.select([], [], [], 2)
    print('Ende der Wartezeit: %s' % tic())

def gr3():
    print("Hey, lass uns irgendwas tun, solange die Greenlets warten, %s" % tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])


Beginn der Wartezeit: bei 0.0 Sekunden
Beginn der Wartezeit: bei 0.0 Sekunden
Hey, lass uns irgendwas tun, solange die Greenlets warten, bei 0.0 Sekunden
Ende der Wartezeit: bei 2.0 Sekunden
Ende der Wartezeit: bei 2.0 Sekunden

Ein weiteres etwas synthetisches Beispiel definiert eine task-Funktion, die nichtdeterministisch ist(d.h. es wird nicht garantiert, dass ihre Rückgabe immer das selbe Ergebnis bei gleicher Eingabe ist). In diesem Fall ist der Nebeneffekt dieser Funktion, dass die Ausführung der Aufgabe für eine zufällige Anzahl an Sekunden pausiert wird.


import gevent
import random

def task(pid):
    """
    Eine nichtdeterministische Aufgabe
    """
    gevent.sleep(random.randint(0,2)*0.001)
    print('Aufgabe %s beendet' % pid)

def synchronous():
    for i in range(1,10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in xrange(10)]
    gevent.joinall(threads)

print('Synchron:')
synchronous()

print('Asynchron:')
asynchronous()


Synchron:
Aufgabe 1 beendet
Aufgabe 2 beendet
Aufgabe 3 beendet
Aufgabe 4 beendet
Aufgabe 5 beendet
Aufgabe 6 beendet
Aufgabe 7 beendet
Aufgabe 8 beendet
Aufgabe 9 beendet
Asynchron:
Aufgabe 5 beendet
Aufgabe 0 beendet
Aufgabe 1 beendet
Aufgabe 6 beendet
Aufgabe 7 beendet
Aufgabe 8 beendet
Aufgabe 9 beendet
Aufgabe 2 beendet
Aufgabe 3 beendet
Aufgabe 4 beendet

In der synchronen Funktion laufen alle Aufrufe von task() sequentiell ab, was ein blockierendes (d.h. die Ausführung des Hauptprogrammes pausierendes) Programm erzeugt. Es "wartet" auf die Ausführung jedes Aufrufes.

Die wichitgen Teile des Programms sind die Aufrufe von gevent.spawn, welche die angegebene Funktion in einem Greenlet-Thread verpackt. Die Liste initialisierter Greenlets werden im Array threads gespeichert, welcher an die Funktion gevent.joinallweitergereicht wird, welche das laufende Programm blockiert, um alle Greenlets auszuführen. Die Ausführung wird nur weitergeführt, wenn alle Greenlets terminieren.

Der wichtige Umstand hier ist, dass die Reihenfolge, in der der Code ausgeführt wird, im asynchronen Fall mehr oder weniger zufällig ist und dass die ganze Ausführungszeit im asynchronen Fall sehr viel geringer ist als im synchronen Fall. Tatsächlich ist die maximale Zeit, die der synchrone Code braucht, um einen task`` auszuführen 0.002 Sekunden, was bedeutet, dass die Gesamtzeit sich auf etwa 0.02 Sekunden beläuft. Im asynchronen Fall beläuft sich die maximale Gesamtzeit auf ungefähr 0.002 Sekunden, da keintask```die Ausführung der anderen blockiert.

Bei einem etwas üblicheren Fall, dem asynchronen Abrufen von Daten von einem Server, wird sich die Laufzeit von fetch() in verschiedenen Abfragen unterscheiden, in Abhängigkeit von der Last des Servers zur Zeit der Abfrage.


import gevent.monkey
gevent.monkey.patch_socket()

import gevent
import urllib2
import simplejson as json

def fetch(pid):
    response = urllib2.urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    datetime = json_result['datetime']

    print('Prozess %s: %s' % (pid, datetime))
    return json_result['datetime']

def synchronous():
    for i in range(1,10):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1,10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)

print('Synchron:')
synchronous()

print('Asynchron:')
asynchronous()

Determinismus

Wie bereits erwähnt sind Greenlets deterministisch. Gibt man ihnen den selben Input und konfiguriert man sie gleich, produzieren sie immer die selbe Ausgabe. Im Folgenenden werden wir zum Beispiel eine Aufgabe auf einen Multiprozessor-Pool aufteilen und die Resultate mit denen eines gevent-Pools vergleichen.


import time

def echo(i):
    time.sleep(0.001)
    return i

# Nichtdeterministischer Prozess Pool

from multiprocessing.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)

# Deterministischer Gevent Pool

from gevent.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)

False
True

Obwohl gevent normalerweise deterministisch ist, können Quellen von Nichtdeterminismus sich in ein Programm einschleichen, wenn man beginnt, mit der Aussenwelt, zum Beispiel Sockets und Dateien, zu kommunizieren. Daher können Green Threads, obwohl sie eine Form der "deterministischen Nebenläufigkeit" sind, trotzdem einige der gleichen Probleme haben, die POSIX Threads und Prozesse durchmachen.

Das immerwährende Problem mit Nebenläufigkeit ist bekannt als die Wettlaufsituation(Race Condition). Einfach gesagt passiert eine Wettlaufsituation, wenn zwei nebenläufige Threads/Prozesse von einer geteilten Resource abhängen, sie jedoch auch zu modifizieren versuchen. Dies resultiert in Resourcen, deren Werte abhängig von Zeit und Ablauf der Ausführung werden. Dies ist ein Problem und im Allgemeinen sollte man sehr stark versuchen, Wettlaufsituationen zu vermeiden, da sie global nicht-deterministisches Programmverhalten zur Folge haben.

Der beste Ansatz hierfür ist es, jeglichen globalen Zustand zu jedem Zeitpunkt zu vermeiden. Globaler Zustand und Seiten-Effekte zur Import-Zeit werden dich immer wieder belästigen!

Greenlets kreieren

gevent bietet einige Wrapper um die Initialisierung von Greenlets an. Einige der meist verwendeten Muster sind:


import gevent
from gevent import Greenlet

def foo(message, n):
    """
    Jeder Thread bekommt die Argumente message und n
    bei seiner Initialisierung
    """
    gevent.sleep(n)
    print(message)

# Initialisiert eine neue Greenlet-Instanz, die die Funktion
# foo ausführt
thread1 = Greenlet.spawn(foo, "Hallo", 1)

# Wrapper, um ein neues Greenlet mit der Funktion foo
# zu erstellen und auszuführen, mit den übergebenen Argumenten
thread2 = gevent.spawn(foo, "Ich lebe!", 2)

# Lambda-Ausdruck
thread3 = gevent.spawn(lambda x: (x+1), 2)

threads = [thread1, thread2, thread3]

# Blockiert, bis alle Threads fertig sind.
gevent.joinall(threads)


Hallo
Ich lebe!

Zusätzlich zur Verwendung der Greenlet-Klasse kann man auch eine Subklasse dieser erstellen und die _run Methode überschreiben.


import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hallo!", 3)
g.start()
g.join()


Hallo!

Greenlet-Zustände

Wie jeder andere Code-Teil können Greenlets auf verschiedene Wege fehlschlagen. Ein Greenlet mag fehlschlagen, weil es eine Exception wirft, um das Program zu beenden oder weil es zu viele System-Resourcen benötigt.

Der interne Zustand eines Greenlets ist normalerweise ein von der Zeit abhängiger Parameter. Es gibt einige Flags in Greenlets, die es ermöglichen, den Zustand des Threads zu beobachten:


import gevent

def win():
    return 'Gewonnen!'

def fail():
    raise Exception('Du bist ein Verlierer im Verlieren.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started) # True
print(loser.started)  # True

# Exceptions die im Greenlet geworfen wurden bleiben im Greenlet.
try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('Hierhin kommen wir nie')

print(winner.value) # 'Gewonnen!'
print(loser.value)  # None

print(winner.ready()) # True
print(loser.ready())  # True

print(winner.successful()) # True
print(loser.successful())  # False

# Die Exception die in fail geworfen wurde wird nicht ausserhalb
# des Greenlets propagiert. Ein stacktrace wird nach stdout geschrieben,
# aber der Stack des Parents wird nicht aufgerollt.

print(loser.exception)

# Es ist jedoch möglich die Exception auch ausserhalb wieder
# zu werden
# raise loser.exception
# oder mit
# loser.get()


True
True
Gewonnen!
None
True
True
True
False
Du bist ein Verlierer im Verlieren.

Programmende

Greenlets, die nicht beenden, wenn das Hauptprogramm ein SIGQUIT erhält, können die Programmausführung länger als erwartet weiterführen. Diese werden zu sogenannten "Zombie-Prozessen", die ausserhalb des Python-Interpreters beendet werden müssen.

Ein häufig verwendetes Muster ist es, auf SIGQUIT-Signale in Richtung des Hauptprogramms zu hören und gevent.shutdown vor dem Beenden des Programms aufzurufen.

import gevent
import signal

def run_forever():
    gevent.sleep(1000)

if __name__ == '__main__':
    gevent.signal(signal.SIGQUIT, gevent.kill)
    thread = gevent.spawn(run_forever)
    thread.join()

Timeouts

Timeouts sind eine Einschränkung der Laufzeit eines Code-Blocks oder Greenlets.


import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()
except Timeout:
    print('Konnte nicht beendet werden')


Sie können auch mit einem Context Manager in einem with-Ausdruck verwendet werden.

import gevent
from gevent import Timeout

time_to_wait = 5 # Sekunden

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

Zusätzlich liefert gevent auch Timeout-Argumente für eine Vielzahl von Greenlet- und Datenstrukturen-basierten Aufrufen. Zum Beispiel:


import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)

try:
    thread1.join(timeout=timer)
except Timeout:
    print('Timeout in Thread 1')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)
except Timeout:
    print('Timeout in Thread 2')

# --

try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Timeout in Thread 3')


Timeout in Thread 1
Timeout in Thread 2
Timeout in Thread 3

Monkeypatching

Leider kommen wir nun zu den dunklen Ecken von Gevent. Ich habe es vermieden, Monkeypatching bis jetzt zu erwähnen, um die mächtigen Koroutinen zu betonen, aber die Zeit ist gekommen die dunklen Künste des Monkeypatching zu erklären. Wir haben oben bereits das Kommando monkey.patch_socket() ausgeführt. Dies ist ein reines Seiteneffekt-Kommando um die Socket-Programme der Standard-Bibliothek zu modifizieren.

import socket
print(socket.socket)

print("Nach dem monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)

import select
print(select.select)
monkey.patch_select()
print("Nach dem monkey patch")
print(select.select)

class 'socket.socket'
Nach dem monkey patch
class 'gevent.socket.socket'

built-in function select
Nach dem monkey patch
function select at 0x1924de8

Pythons Runtime erlaubt es, dass die meisten Objekte zur Laufzeit modifiziert werden, auch Module, Klassen und sogar Funktionen. Das ist normalerweise eine unglaublich schlechte Idee, da es zu einem "impliziten Seiteneffekt" führt, der meistens extrem schwer zu debuggen ist, falls Probleme auftreten; nichtsdestotroz können Monkey Patches in extremen Situationen benutzt werden, in denen eine Bibliothek das fundamentale Verhalten von Python selbst verändern muss. In diesem Fall ist gevent in der Lage, die meisten blockierenden System Calls in der Standardbibliothek so zu patchen, dass sie stattdessen kooperativ arbeiten, einschliesslich der Module in socket, ssl, threading und select.

Zum Beispiel nutzt die Redis-Anbindung an Python reguläre TCP-Sockets, um mit der redis-server-Instanz zu kommunizieren. Nur durch den Aufruf von gevent.monkey.patch_all() können wir die Redis-Anbindung dazu bringen, Anfragen kooperativ zu behandeln und mit dem Rest unseres gevent-Überbaus zu interagieren.

Dies lässt und Bibliotheken integrieren, die normalerweise nicht mit gevent arbeiten würden, ohne jemals eine einzige Zeile Code schreiben zu müssen. Obwohl Monkey Patching immer noch schlecht ist, ist es in diesem Fall ein "nützliches Übel".

Daten-Strukturen

Events

Events sind eine Form der asynchronen Kommunikation zwischen Greenlets.

import gevent
from gevent.event import Event

'''
Illustriert den Nutzen von Events
'''


evt = Event()

def setter():
    '''Nach 3 Sekunden werden alle Threads die auf den Wert von evt warten
       aufgeweckt'''
    print('A: Hey, warte auf mich, ich muss etwas besorgen')
    gevent.sleep(3)
    print("Ok, ich bin fertig")
    evt.set()


def waiter():
    '''Nach 3 Sekundenwird der get-Aufruf entblockt'''
    print("Ich werde auf dich warten")
    evt.wait()  # blockierend
    print("Wird ja auch Zeit")

def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter)
    ])

if __name__ == '__main__': main()


Eine Erweiterun des Event-Objekts ist ein AsyncResult, welches es erlaubt, zusammen mit dem Weckruf einen Wert zu versenden. Dies wird manchmal future oder deferred genannt, da es eine Referenz auf einen zukünftigen Wert hält, der zu einem frei wählbaren Zeitpunkt gesetzt werden kann.

import gevent
from gevent.event import AsyncResult
a = AsyncResult()

def setter():
    """
    Setze das Ergebnis von a nach 3 Sekunden.
    """
    gevent.sleep(3)
    a.set('Hallo!')

def waiter():
    """
    Nach 3 Sekunden wird der get call entblockt, nachdem der Setter
    einen Wert in das AsyncResult schreibt.
    """
    print(a.get())

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])


Queues

Queues sind geoordnete Datensets, die die üblichen put/get Operationen unterstützt, aber auf eine solche Weise implementiert sind, dass sie sicher zwischen Greenlets manipuliert werden können.

Zum Beispiel wird bei simultanem Zugriff zweier Greenlets auf ein Item der Queue nicht zweimal das selbe Item herausgenommen.


import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Arbeiter %s hat Task %s bekommen' % (n, task))
        gevent.sleep(0)

    print('Ende!')

def boss():
    for i in xrange(1,25):
        tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])


Arbeiter steve hat Task 1 bekommen
Arbeiter john hat Task 2 bekommen
Arbeiter nancy hat Task 3 bekommen
Arbeiter steve hat Task 4 bekommen
Arbeiter john hat Task 5 bekommen
Arbeiter nancy hat Task 6 bekommen
Arbeiter steve hat Task 7 bekommen
Arbeiter john hat Task 8 bekommen
Arbeiter nancy hat Task 9 bekommen
Arbeiter steve hat Task 10 bekommen
Arbeiter john hat Task 11 bekommen
Arbeiter nancy hat Task 12 bekommen
Arbeiter steve hat Task 13 bekommen
Arbeiter john hat Task 14 bekommen
Arbeiter nancy hat Task 15 bekommen
Arbeiter steve hat Task 16 bekommen
Arbeiter john hat Task 17 bekommen
Arbeiter nancy hat Task 18 bekommen
Arbeiter steve hat Task 19 bekommen
Arbeiter john hat Task 20 bekommen
Arbeiter nancy hat Task 21 bekommen
Arbeiter steve hat Task 22 bekommen
Arbeiter john hat Task 23 bekommen
Arbeiter nancy hat Task 24 bekommen
Ende!
Ende!
Ende!

Queues können auch put und get-Operationen blockieren, falls es nötig wird.

Jede der put und get-Operationen hat einen nicht-blockierenden Gegensata, put_nowait und get_nowait, welche anstatt zu blockieren entweder gevent.queue.Empty oder gevent.queue.Full zurückgeben, falls die Operation nicht möglich ist.

In diesem beispiel läuft der Boss zur gleichen Zeit wie die Arbeiter und auf der Queue liegt eine Restriktion, die verhindert, dass darauf mehr als drei Elemente liegen. Diese Restriktion bedeutet, dass die put Operation blockiert bis kein Platz mehr in der Queue ist. Umgekehrt blockiert die get-Operation, falls keine Elemente mehr in der Queue sind und nimmt ausserdem ein Timeout-Argument, das es erlaubt, dass die Queue mit der Exception gevent.queue.Empty beendet wird, falls innerhalb der Zeitspanne des Timeouts keine Arbeit mehr gefunden wird.


import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)

def worker(name):
    try:
        while True:
            task = tasks.get(timeout=1) # decrements queue size by 1
            print('Arbeiter %s hat Task %s bekommen' % (name, task))
            gevent.sleep(0)
    except Empty:
        print('Ende!')

def boss():
    """
    Der Boss wartet mit dem Austeilen der Arbeit, bis ein individueller
    Arbeiter frei ist, da die Maximalgrösse der Task-Queue 3 ist.
    """

    for i in xrange(1,10):
        tasks.put(i)
    print('Alle Arbeit in Iteration 1 ausgeteilt')

    for i in xrange(10,20):
        tasks.put(i)
    print('Alle Arbeit in Iteration 2 ausgeteilt')

gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])


Arbeiter steve hat Task 1 bekommen
Arbeiter john hat Task 2 bekommen
Arbeiter bob hat Task 3 bekommen
Arbeiter steve hat Task 4 bekommen
Arbeiter john hat Task 5 bekommen
Arbeiter bob hat Task 6 bekommen
Alle Arbeit in Iteration 1 ausgeteilt
Arbeiter steve hat Task 7 bekommen
Arbeiter john hat Task 8 bekommen
Arbeiter bob hat Task 9 bekommen
Arbeiter steve hat Task 10 bekommen
Arbeiter john hat Task 11 bekommen
Arbeiter bob hat Task 12 bekommen
Arbeiter steve hat Task 13 bekommen
Arbeiter john hat Task 14 bekommen
Arbeiter bob hat Task 15 bekommen
Arbeiter steve hat Task 16 bekommen
Arbeiter john hat Task 17 bekommen
Arbeiter bob hat Task 18 bekommen
Alle Arbeit in Iteration 2 ausgeteilt
Arbeiter steve hat Task 19 bekommen
Ende!
Ende!
Ende!

Groups und Pools

Eine Group ist eine Sammlung laufender Greenlets, die zusammen als Gruppe verwaltet und geleitet werden. Es dient ausserdem als paralleler Dispatcher, der die Python multiprocessing-Bibliothek spiegelt.


import gevent
from gevent.pool import Group

def talk(msg):
    for i in xrange(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()


bar
bar
bar
foo
foo
foo
fizz
fizz
fizz

Das ist sehr nützlich, um Gruppen asynchroner Aufgaben zu verwalten.

Wie oben erwähnt, hat Group auch eine API, um Jobs an gruppierte Greenlets auszuliefern und deren Resultate wieder einzusammeln; all dies auf mehreren verschiedenen Wegen.


import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
    print('Ausmass der Gruppe: %s' % len(group))
    print('Hallo von Greenlet %s' % id(getcurrent()))

group.map(hello_from, xrange(3))


def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n

print('Geordnet')

ogroup = Group()
for i in ogroup.imap(intensive, xrange(3)):
    print(i)

print('Ungeordnet')

igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
    print(i)


Ausmass der Gruppe: 3
Hallo von Greenlet 4472170608
Ausmass der Gruppe: 3
Hallo von Greenlet 4472170448
Ausmass der Gruppe: 3
Hallo von Greenlet 4472991824
Geordnet
('task', 0)
('task', 1)
('task', 2)
Ungeordnet
('task', 2)
('task', 1)
('task', 0)

Ein Pool ist eine Struktur, die dafür entwickelt wurde, eine dynamische Anzahl von Greenlets, die in ihrer Nebenläufigkeit limitiert werden müssen, zu handhaben. Dies ist oftmals gewünscht, wenn viele Netzwerk- oder IO-basierte Aufgaben parallel bearbeitet werden sollen.


import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Ausmass des Pools %s' % len(pool))

pool.map(hello_from, xrange(3))


Ausmass des Pools 2
Ausmass des Pools 2
Ausmass des Pools 1

Oftmals wird beim Bau eines gevent-basierten Services der gesamte Service um eine Pool-Struktur herum gebaut. Ein Beispiel könnte eine Klasse sein, die auf verschiedenen Sockets arbeitet.

from gevent.pool import Pool

class SocketPool(object):

    def __init__(self):
        self.pool = Pool(1000)
        self.pool.start()

    def listen(self, socket):
        while True:
            socket.recv()

    def add_handler(self, socket):
        if self.pool.full():
            raise Exception("Pool hat Maximum erreicht")
        else:
            self.pool.spawn(self.listen, socket)

    def shutdown(self):
        self.pool.kill()


Locks und Semaphoren

Eine Semaphore ist eine Low-Level Synchronisations-Primitive, die es Greenlets ermöglicht, gleichzeitige Zugriffe und Ausführung zu koordinieren und zu limitieren. Eine Semaphore hat zwei Methoden, acquire und release. Die Maximalanzahl an gleichzeitigen acquires nennt sich die Grenze der Semaphore. Falls die Grenze der Semaphore 0 erreicht(alle freien Plätze besetzt sind), blockiert sie bis ein Greenlet seinen Platz wieder freigibt.


from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker1(n):
    sem.acquire()
    print('Arbeiter %i akquiriert Semaphore' % n)
    sleep(0)
    sem.release()
    print('Arbeiter %i git Semaphore frei' % n)

def worker2(n):
    with sem:
        print('Arbeiter %i akquiriert Semaphore' % n)
        sleep(0)
    print('Arbeiter %i gibt Semaphore frei' % n)

pool = Pool()
pool.map(worker1, xrange(0,2))
pool.map(worker2, xrange(3,6))


Arbeiter 0 akquiriert Semaphore
Arbeiter 1 akquiriert Semaphore
Arbeiter 0 git Semaphore frei
Arbeiter 1 git Semaphore frei
Arbeiter 3 akquiriert Semaphore
Arbeiter 4 akquiriert Semaphore
Arbeiter 3 gibt Semaphore frei
Arbeiter 4 gibt Semaphore frei
Arbeiter 5 akquiriert Semaphore
Arbeiter 5 gibt Semaphore frei

Eine Semaphore mit Grenze 1 wird Lock genannt. Dieses lässt die exklusive Ausführung eines einzelnen Greenlets zu. Sie werden oft benutzt, um sicherzustellen, dass Resourcen nur von einer Stelle im Kontext des Programms gleichzeitig benutzt werden.

Thread Locals

Gevent erlaubt es auch, den Greenlets Daten zur Verfügung zu stellen, die lokal für dessen Kontext verfügbar ist. Intern ist dies durch ein globales Nachlagen eines privaten Namespaces implementiert, wobei der Wert von getcurrent() des Greenlets der Schlüssel ist.


import gevent
from gevent.local import local

stash = local()

def f1():
    stash.x = 1
    print(stash.x)

def f2():
    stash.y = 2
    print(stash.y)

    try:
        stash.x
    except AttributeError:
        print("x ist nicht lokal in f2")

g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])


1
2
x ist nicht lokal in f2

Viele Web-Frameworks, die gevent benutzen, speichern HTTP-Session-Objekte in gevent Thread Locals. Zum Beispiel können wir unter Benutzung der Werkzeug-Bibliothek und dessen Proxy-Objekt Request-Objekte im Stile des Flask-Frameworks nachbauen.

from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager

from gevent.wsgi import WSGIServer

_requests = local()
request = LocalProxy(lambda: _requests.request)

@contextmanager
def sessionmanager(environ):
    _requests.request = Request(environ)
    yield
    _requests.request = None

def logic():
    return "Hallo " + request.remote_addr

def application(environ, start_response):
    status = '200 OK'

    with sessionmanager(environ):
        body = logic()

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

WSGIServer(('', 8000), application).serve_forever()



Flasks System ist ein bisschen ausgeklügelter als dieses Beispiel, aber die Idee, Thread Locals als lokalen Session-Speicher zu verwenden ist trotzdem der gleiche.

Subprocess

Seit gevent 1.0 ist gevent.subprocess - eine gepatchte Version von Pythons subprocess-Modul - in gevent verfügbar. Es unterstützt kooperatives Warten auf Subprozesse.


import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print("cron")
        gevent.sleep(0.2)

g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())

cron
cron
cron
cron
cron
Linux

Einige Benutzer wollen gevent und multiprocessing zusammen verwenden. Eine der offensichtlichen Herausforderungen hierbei ist, dass Interprozesskommunikation in multiprocessing nicht standardmässig kooperativ ist. Da multiprozessing.Connection-basierte Objekte (wie Pipe) ihre ihnen zugrundeliegenen Dateideskriptoren offenlegen, können gevent.socket.wait_read und wait_write benutzt werden, um kooperativ auf ready-to-read/ready-to-write-Events zu warten, bevor tatsächlich gelesen/geschrieben wird:


import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write

# To Process
a, b = Pipe()

# From Process
c, d = Pipe()

def relay():
    for i in xrange(10):
        msg = b.recv()
        c.send(msg + " in " + str(i))

def put_msg():
    for i in xrange(10):
        wait_write(a.fileno())
        a.send('hi')

def get_msg():
    for i in xrange(10):
        wait_read(d.fileno())
        print(d.recv())

if __name__ == '__main__':
    proc = Process(target=relay)
    proc.start()

    g1 = gevent.spawn(get_msg)
    g2 = gevent.spawn(put_msg)
    gevent.joinall([g1, g2], timeout=1)

Man sollte sich jedoch klarmachen, dass die Kombination von multiprocessing und gevent einige betriebssystemspezifische Fallstricke mit sich bringt, unter anderem:

  • Nach dem Forken ist gevents Zustand im Child schlecht gestellt. Ein Nebeneffekt ist, dass Greenlets, die vor dem Erstellen von multiprocessing.Process hervorgebracht werden in beiden Prozessen laufen, Parent und Child.
  • a.send() in put_msg oben könnte den aufrufenden Thread immer noch nicht kooperativ blockieren: ein ready-to-write-Event versichert nur, dass ein Byte geschrieben werden kann. Der zugrundeliegende Puffer könnte voll sein, bevor der versuchte Write beendet wurde.
  • Der Ansatz, der auf wait_write()/wait_read() basiert, so wie es oben verwendet wurde, funktioniert nicht auf Windows(IOError: 3 is not a socket (files are not supported)), da Windows Pipes nicht auf Events hin beobachten kann.

Das Python-Paket gipc überwindet diese Herausforderungen für den Anwender auf eine grössenteils transparente Weise, sowohl auf POSIX- als auch auf Windows-Systemen. Es liefert gevent-kompatible multiprocessing.Process-basierte Kindprozesse und gevent-kooperative Interprozesskommunikation basierend auf Pipes.

Actors

Das Aktoren-Modell ist ein High-Level-Nebenläufigkeitsmodell, das von der Sprache Erlang popularisiert wurde. Kurz gesagt ist die Hauptidee, dass man über eine Ansammlung unabhängiger Actors verfügt, die eine Inbox besitzen, aus welcher sie Nachrichten von anderen Actors bekommen. Die Hauptschleife im Actor iteriert durch seine Nachrichten und agiert auf Basis seines gewünschten Verhaltens.

Gevent hat keinen primitiven Actor-Typen, jedoch können wir sehr einfach einen definieren, indem wir eine Queue innerhalb einer Greenlet-Subklasse verwenden.

import gevent
from gevent.queue import Queue

class Actor(gevent.Greenlet):

    def __init__(self):
        self.inbox = Queue()
        Greenlet.__init__(self)

    def receive(self, message):
        """
        Dies muss in jeder Subklasse anders definiert werden.
        """
        raise NotImplemented()

    def _run(self):
        self.running = True

        while self.running:
            message = self.inbox.get()
            self.receive(message)


Ein Anwendungsfall:

import gevent
from gevent.queue import Queue
from gevent import Greenlet

class Pinger(Actor):
    def receive(self, message):
        print(message)
        pong.inbox.put('ping')
        gevent.sleep(0)

class Ponger(Actor):
    def receive(self, message):
        print(message)
        ping.inbox.put('pong')
        gevent.sleep(0)

ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')
gevent.joinall([ping, pong])

Anwendungen aus der echten Welt

Gevent ZeroMQ

ZeroMQ wird von dessen Autoren als "eine Socket-Bibliothek, die als ein Nebenläufigkeitsframework agiert" beschrieben. Es stellt eine sehr mächtige Messaging-Schicht bereit, die es erlaubt, nebenläufige und verteilte Anwendungen zu erstellen.

ZeroMQ bietet eine Vielfalt an Socket-Primitiven an, die simpelste davon ist ein Request-Response-Socket-Paar. Ein Socket hat zwei interessante Methoden namens send and recv, beide davon normale blockierende Operationen. Dies jedoch wird durch eine geniale Bibliothek von Travis Cline in Ordnung gebracht, welche gevent.socket benutzt, um ZeroMQ-Sockets auf nicht-blockierende Weise abzufragen. Um dies auszunutzen, muss lediglich die Python-Anbindung an ZeroMQ via pip install pyzmq installiert werden.

(Anmerkung des Übersetzers: Diese Bibliothek wurde inzwischen in den Kern des PyZeroMQ-Projekts eingebettet. Installationshinweise für gevent-zeromq sind daher hinfällig)


# Anmerkung: Zuvor muss ``pip install pyzmq`` ausgeführt werden
import gevent
import zmq.green as zmq

# Globaler Kontext
context = zmq.Context()

def server():
    server_socket = context.socket(zmq.REQ)
    server_socket.bind("tcp://127.0.0.1:5000")

    for request in range(1,10):
        server_socket.send("Hallo")
        print('Zum Server gewechselt, um %s zu bearbeiten' % request)
        # IHier passiert ein impliziter Kontextwechsel
        server_socket.recv()

def client():
    client_socket = context.socket(zmq.REP)
    client_socket.connect("tcp://127.0.0.1:5000")

    for request in range(1,10):

        client_socket.recv()
        print('Zum Client gewechselt, um %s zu bearbeiten' % request)
        # Implicit context switch occurs here
        client_socket.send("Welt")

publisher = gevent.spawn(server)
client    = gevent.spawn(client)

gevent.joinall([publisher, client])


Zum Server gewechselt, um 1 zu bearbeiten
Zum Client gewechselt, um 1 zu bearbeiten
Zum Server gewechselt, um 2 zu bearbeiten
Zum Client gewechselt, um 2 zu bearbeiten
Zum Server gewechselt, um 3 zu bearbeiten
Zum Client gewechselt, um 3 zu bearbeiten
Zum Server gewechselt, um 4 zu bearbeiten
Zum Client gewechselt, um 4 zu bearbeiten
Zum Server gewechselt, um 5 zu bearbeiten
Zum Client gewechselt, um 5 zu bearbeiten
Zum Server gewechselt, um 6 zu bearbeiten
Zum Client gewechselt, um 6 zu bearbeiten
Zum Server gewechselt, um 7 zu bearbeiten
Zum Client gewechselt, um 7 zu bearbeiten
Zum Server gewechselt, um 8 zu bearbeiten
Zum Client gewechselt, um 8 zu bearbeiten
Zum Server gewechselt, um 9 zu bearbeiten
Zum Client gewechselt, um 9 zu bearbeiten

Ein simpler Server


# Unter Unix: Zugang mit ``$ nc 127.0.0.1 5000``
# Unter Window: Zugang mit ``$ telnet 127.0.0.1 5000``

from gevent.server import StreamServer

def handle(socket, address):
    socket.send("Hallo von telnet!\n")
    for i in range(5):
        socket.send(str(i) + '\n')
    socket.close()

server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()

WSGI-Server

Gevent liefert zwei WSGI-Server mit, um Inhalte über HTTP anzubieten, fortan nennen wir sie wsgi und pywsgi:

  • gevent.wsgi.WSGIServer
  • gevent.pywsgi.WSGIServer

In früheren Versionen von gevent(vor 1.0.x) benutzte gevent libevent anstelle von libev. Libevent schloss einen HTTP-Server mit ein, welcher von gevents wsgi-Server benutzt wurde.

In gevent 1.0.x gibt es keinen HTTP-Server mehr. Stattdessen ist gevent.wsgi nun ein Alias für den reinen Python-Server in gevent.pywsgi.

Streaming-Server

Diese Sektion ist unter gevent 1.0.x nicht anwendbar.

Für diejenigen Benutzer, die HTTP-Streaming-Services kennen: die Kernidee ist, dass wir im Header nicht die Länge des Inhalts spezifizieren. Anstelle dessen halten wir die Verbindung offen und schicken Teile durch die Pipe, während wir jedem Teil eine hexadezimalen Zahl voranstellen, die die Länge des Teils repräsentiert. Der Stream wird geschlossen, sobald ein Teil von der Grösse 0 gesendet wird.

HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked

8
<p>Hallo

9
Welt!</p>

0

Die obige HTTP-Verbindung kann nicht in wsgi kreiert werden, da Streaming nicht unterstützt wird. Anstelle dessen würde es gepuffert.

from gevent.wsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK'
    body = '<p>Hallo Welt!</p>'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

WSGIServer(('', 8000), application).serve_forever()


Unter Verwendung von pywsgi können wir jedoch unseren Handler als Generator schreiben und das Ergebnis Teil für Teil versenden.

from gevent.pywsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    yield "<p>Hallo"
    yield "Welt!</p>"

WSGIServer(('', 8000), application).serve_forever()


Trotzdem ist die Performance von Gevent-Servern phänomenal im Vergleich zu anderen Python-Servern. Libev ist eine sehr gut untersuchte Technologie und Server, die sie benutzen, sind bekannt dafür, dass sie gut skalieren.

Als Massstab kann der geneigte Leser zum Beispiel das Apache Benchmark ab verwenden oder diesen Vergleich von Python-WSGI-Servern für einen Vergleich mit anderen Servern konsultieren.

$ ab -n 10000 -c 100 http://127.0.0.1:8000/

Langes Polling

import gevent
from gevent.queue import Queue, Empty
from gevent.pywsgi import WSGIServer
import simplejson as json

data_source = Queue()

def producer():
    while True:
        data_source.put_nowait('Hallo Welt')
        gevent.sleep(1)

def ajax_endpoint(environ, start_response):
    status = '200 OK'
    headers = [
        ('Content-Type', 'application/json')
    ]

    start_response(status, headers)

    while True:
        try:
            datum = data_source.get(timeout=5)
            yield json.dumps(datum) + '\n'
        except Empty:
            pass


gevent.spawn(producer)

WSGIServer(('', 8000), ajax_endpoint).serve_forever()


Websockets

Dies ist ein Beispiel für Websockets, welches gevent-websocket benötigt.

# Simpler gevent-websocket server
import json
import random

from gevent import pywsgi, sleep
from geventwebsocket.handler import WebSocketHandler

class WebSocketApp(object):
    '''Sendet Zufallsdaten an den Websocket'''

    def __call__(self, environ, start_response):
        ws = environ['wsgi.websocket']
        x = 0
        while True:
            data = json.dumps({'x': x, 'y': random.randint(1, 5)})
            ws.send(data)
            x += 1
            sleep(0.5)

server = pywsgi.WSGIServer(("", 10000), WebSocketApp(),
    handler_class=WebSocketHandler)
server.serve_forever()

Die dazugehörige HTML-Seite sieht wie folgt aus::

<html>
    <head>
        <title>Minimale Websocket-Anwendung</title>
        <script type="text/javascript" src="jquery.min.js"></script>
        <script type="text/javascript">
        $(function() {
            // Öffnet eine Verbindung zu unserem Server
            var ws = new WebSocket("ws://localhost:10000/");

            // Was tun wir, wenn wir eine Nachricht erhalten?
            ws.onmessage = function(evt) {
                $("#placeholder").append('<p>' + evt.data + '</p>')
            }
            // Wir aktualisieren unser conn_status-Feld mit dem Verbindungs-Status
            ws.onopen = function(evt) {
                $('#conn_status').html('<b>Verbunden</b>');
            }
            ws.onerror = function(evt) {
                $('#conn_status').html('<b>Fehler</b>');
            }
            ws.onclose = function(evt) {
                $('#conn_status').html('<b>Geschlossen</b>');
            }
        });
    </script>
    </head>
    <body>
        <h1>WebSocket-Beispiel</h1>
        <div id="conn_status">Nicht verbunden</div>
        <div id="placeholder" style="width:600px;height:300px;"></div>
    </body>
</html>

Chat-Server

Das letzte Beispiel sei ein Echtzeit-Chat-Raum. Dieses Beispiel benötigt Flask ( aber nicht notwendigerweise, verwendbar wären auch Django, Pyramid, usw.). Die entsprechenden Javascript und HTML-Dateien sind hier zu finden.

# Mikro-gevent-Chatraum.
# ----------------------

from flask import Flask, render_template, request

from gevent import queue
from gevent.pywsgi import WSGIServer

import simplejson as json

app = Flask(__name__)
app.debug = True

rooms = {
    'topic1': Room(),
    'topic2': Room(),
}

users = {}

class Room(object):

    def __init__(self):
        self.users = set()
        self.messages = []

    def backlog(self, size=25):
        return self.messages[-size:]

    def subscribe(self, user):
        self.users.add(user)

    def add(self, message):
        for user in self.users:
            print(user)
            user.queue.put_nowait(message)
        self.messages.append(message)

class User(object):

    def __init__(self):
        self.queue = queue.Queue()

@app.route('/')
def choose_name():
    return render_template('choose.html')

@app.route('/<uid>')
def main(uid):
    return render_template('main.html',
        uid=uid,
        rooms=rooms.keys()
    )

@app.route('/<room>/<uid>')
def join(room, uid):
    user = users.get(uid, None)

    if not user:
        users[uid] = user = User()

    active_room = rooms[room]
    active_room.subscribe(user)
    print('subscribe %s %s' % (active_room, user))

    messages = active_room.backlog()

    return render_template('room.html',
        room=room, uid=uid, messages=messages)

@app.route("/put/<room>/<uid>", methods=["POST"])
def put(room, uid):
    user = users[uid]
    room = rooms[room]

    message = request.form['message']
    room.add(':'.join([uid, message]))

    return ''

@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):
    try:
        msg = users[uid].queue.get(timeout=10)
    except queue.Empty:
        msg = []
    return json.dumps(msg)

if __name__ == "__main__":
    http = WSGIServer(('', 5000), app)
    http.serve_forever()