Python Generators - From Iterators to Cooperative Multitasking - 2
In this third issue we move on to uncover how generators can be the foundation of a cooperative multitasking system, and show some code that implements it. Before facing this topic we will talk briefly about another interesting use of generators, namely chains of generator expressions.
Chaining generator expressions
At PyCon 2008 David M. Beazley, author of “Python Essential Reference”, gave a very interesting talk about the use of generators in system administration, that is, in a field where more or less complex bash scripts are usually involved, and in particular where long pipe sequences are used.
David starts from the observation that generators, since they produce one element at a time, are chainable: a generator expression can wrap another generator expression, and so on. He then shows how to write, in a very compact and reusable way, components that act as “filters” on a data set, following the Unix philosophy of building tools that “do one thing and do it well” and chaining them afterwards to get the needed behaviour.
The slides of this presentation are freely downloadable, so I suggest that the interested reader take a look at them: Generator Tricks for Systems Programmers.
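To give an idea of what such a chain looks like, here is a minimal sketch (not taken from the slides). The log lines and their format are invented for illustration; in a real script they would come from a file object. Each stage is a generator expression that wraps the previous one, and no work is done until the last stage is consumed.

```python
# Hypothetical log lines in the form "METHOD PATH STATUS SIZE".
log_lines = [
    "GET /index.html 200 1043",
    "GET /missing.html 404 0",
    "POST /form 200 512",
    "GET /broken 500 0",
]

# Each stage wraps the previous one, like the stages of a shell pipeline.
fields = (line.split() for line in log_lines)
ok_only = (f for f in fields if f[2] == "200")
sizes = (int(f[3]) for f in ok_only)

# Nothing has been computed so far: sum() pulls the items through the
# whole chain one at a time.
total = sum(sizes)
print(total)
```

Each stage can be replaced or reused independently, which is the "filter" behaviour described above.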
Microthread: cooperative multitasking
Disclaimer: the concepts and code presented here have been heavily influenced by the Kamaelia project. You can find it here.
Let us move forward and see how (Python) generators allow us to easily build applications based on the concept of cooperative multitasking. I assume the reader is familiar with the concepts of preemption and thread-based multitasking and is aware of the pros and cons of such solutions.
Cooperative multitasking allows an application to hold control of the CPU for an arbitrary amount of time; the scheduler waits for the application to voluntarily release the resource. This is a major break with the modern approach to multitasking, where the scheduler is in charge of stopping and resuming applications without any prior agreement with them.
Since applications now stop only on their own initiative, every issue related to shared data protection, atomicity and synchronization is greatly simplified, if not removed. Applications need, however, a mechanism to stop running, save their internal state and later resume from the same point.
Generators, through the yield statement, implement exactly this behaviour, so they can be used to create a system based on cooperative multitasking, where processes are called microthreads to highlight that they are a lightweight form of thread.
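The "stop, save the state, resume" mechanism can be seen in isolation with a small sketch. The `counter` function is a hypothetical example, and the code uses the Python 3 built-in `next(g)` where the rest of this article uses the Python 2 method `g.next()`.

```python
def counter():
    # `total` is a local variable: it survives between resumptions
    # because the generator's frame is frozen at each yield.
    total = 0
    while True:
        total += 1
        yield total


# Two independent generators, each with its own saved state.
a = counter()
b = counter()

# Interleave them by hand: each next() resumes one generator from the
# point where it was frozen.
trace = [next(a), next(a), next(b), next(a)]
print(trace)
```

Advancing `b` does not disturb `a`: each generator resumes with its own value of `total`.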
Let’s look at a simple implementation of such a system. First of all we need a MicroThread object, i.e. an object that can run simultaneously with other similar objects, but in a cooperative way.

    class MicroThread(object):
        def main(self):
            while 1:
                yield 1
An instance of this object exposes a main() method that, when called, returns a generator. At each call of its next() method the generator simply returns 1 and freezes its execution at the yield statement, still inside the infinite while loop.
The object can be tested directly

    >>> mt = MicroThread()
    >>> g = mt.main()
    >>> g
    <generator object main at 0xb74331e4>
    >>> g.next()
    1
To make the object easier to inherit from and extend we can refactor it a little

    class MicroThread(object):
        def step(self):
            pass

        def create(self):
            pass

        def main(self):
            self.create()
            yield 1
            while 1:
                self.step()
                yield 1
Such changes let us inherit from the class and extend it simply by overriding the create() and step() methods; the first is called as soon as the generator returned by main() starts running, acting as a delayed initializer, while the second is executed at each subsequent call of next(), just before the code is frozen by yield. Pay attention: since create() is called inside the generator function, you have to call next() once to run it after the generator has been created. So the standard workflow with this object is

    # Instantiate the object
    mt = MicroThread()
    # Create the generator
    g = mt.main()
    # Initialize it: the first next() runs create()
    g.next()
    # Loop over it: each further next() runs step()
    g.next()
    g.next()
    ...
Since main() is a generator function it must act like any generator and signal its exhaustion by raising a StopIteration exception. The overridden step() method, thus, may raise this exception at any point (even at several different points) to terminate the microthread.
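A note for readers on recent interpreters: since Python 3.7 (PEP 479), a StopIteration that escapes from code running inside a generator body is turned into a RuntimeError, so the termination technique just described works as written only on Python 2. The following is a sketch of one possible adaptation, not the original design: main() catches the exception raised by step() and returns, which ends the generator cleanly. `CountdownThread` and its `limit` parameter are hypothetical names used only for this example.

```python
class MicroThread(object):
    def create(self):
        pass

    def step(self):
        pass

    def main(self):
        self.create()
        yield 1
        while True:
            try:
                self.step()
            except StopIteration:
                # Returning ends the generator; the caller of next()
                # then receives a regular StopIteration.
                return
            yield 1


class CountdownThread(MicroThread):
    """Hypothetical microthread that terminates after `limit` steps."""

    def __init__(self, limit):
        self.limit = limit
        self.steps_done = 0

    def step(self):
        if self.steps_done == self.limit:
            raise StopIteration
        self.steps_done += 1


g = CountdownThread(2).main()
next(g)  # runs create()
next(g)  # first step
next(g)  # second step
try:
    next(g)  # step() raises StopIteration, main() returns
    finished = False
except StopIteration:
    finished = True
print(finished)
```

From the point of view of the code that drives the generator nothing changes: exhaustion is still signalled by StopIteration.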
Now we need a scheduler, i.e. the system component that manages running tasks. While in a true multitasking system the scheduler is a big and complex component, in a cooperative environment it can be rather simple: its job is to execute each task and wait until it gives control back. Between one task and the next the scheduler can perform other functions, but its basic workflow is very straightforward. Obviously the scheduler has to handle the StopIteration exception possibly raised by a task, removing that task from the list of running microthreads.
The core of the scheduler will be something like the following:

    for thread in active_microthreads:
        try:
            thread.next()
            scheduled_microthreads.append(thread)
        except StopIteration:
            pass

    active_microthreads = scheduled_microthreads
    scheduled_microthreads = []
This snippet implements the behaviour described above. We have two lists, active_microthreads with all the tasks that shall be executed in the current loop and scheduled_microthreads with all the tasks that are going to be executed in the next loop. At each loop of the scheduler all microthreads in active_microthreads are executed, that is, each is granted one call of its next() method. After this the thread is scheduled again, i.e. it is appended to the scheduled_microthreads list. If the thread raises the StopIteration exception during its execution it is simply not scheduled again. When the active_microthreads list is exhausted the loop ends and the scheduled_microthreads list becomes the new active_microthreads list; after this the loop starts again.
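The two-list mechanism can be traced with plain generators that finish at different times. This is only a sketch under some assumptions: the `worker` function and the `log` list are hypothetical helpers, the code uses the Python 3 built-in `next(thread)` instead of `thread.next()`, and, unlike the scheduler built in this article, the outer loop stops when no microthread is left so that the example terminates.

```python
def worker(name, steps, log):
    # Hypothetical task: records its progress, then freezes.
    for i in range(steps):
        log.append((name, i))
        yield 1


log = []
active_microthreads = [
    worker("a", 1, log),
    worker("b", 3, log),
    worker("c", 2, log),
]
scheduled_microthreads = []

# Assumption for this sketch: stop when every task has terminated.
while active_microthreads:
    for thread in active_microthreads:
        try:
            next(thread)
            scheduled_microthreads.append(thread)
        except StopIteration:
            # The task is exhausted: do not schedule it again.
            pass
    active_microthreads = scheduled_microthreads
    scheduled_microthreads = []

print(log)
```

The recorded order shows the round-robin behaviour: all three tasks run in the first round, then "a" drops out, then "c", and "b" runs alone in the end.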
So the first implementation of the scheduler is the following:

    class Scheduler(object):
        def __init__(self):
            self.active_microthreads = []
            self.scheduled_microthreads = []

        def add_microthread(self, mthread):
            g = mthread.main()
            g.next()
            self.active_microthreads.append(g)

        def run(self):
            while 1:
                for thread in self.active_microthreads:
                    try:
                        thread.next()
                        self.scheduled_microthreads.append(thread)
                    except StopIteration:
                        pass

                self.active_microthreads = self.scheduled_microthreads
                self.scheduled_microthreads = []
The __init__() method initializes the two internal lists we talked about above. The add_microthread() method allows us to add a microthread to the scheduler: it calls main() on the microthread to obtain its generator, then calls next() once on the generator to initialize it, and finally appends it to the list of active tasks.
The scheduler logic is then implemented in the run() method, which executes the core code shown earlier in an infinite while loop.
We can test the microthreads and the scheduler with this simple code

    import mthread
    import scheduler
    import time

    class TestMicroThread(mthread.MicroThread):
        def __init__(self, number):
            self.num = number

        def step(self):
            print "Number:", self.num
            time.sleep(1)

    mt1 = TestMicroThread(1)
    mt2 = TestMicroThread(2)
    mt3 = TestMicroThread(3)

    ms = scheduler.Scheduler()
    ms.add_microthread(mt1)
    ms.add_microthread(mt2)
    ms.add_microthread(mt3)
    ms.run()
TestMicroThread is a microthread whose step() method has been reimplemented to print a number and wait 1 second. Three microthreads are instantiated and added to the scheduler, and the run() method of the scheduler is executed. Not surprisingly the result is the following

    $ python test_scheduler.py
    Number: 1
    Number: 2
    Number: 3
    Number: 1
    Number: 2
    Number: 3
    [...]
The three microthreads are executed in a round-robin fashion, as expected from a cooperative multitasking system.
Note: while all microthreads shown in this article just execute yield 1 to freeze the code, yield can return any object, just like the return statement does, and this could be exploited to enhance the communication between microthreads and scheduler.
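As an illustration of this idea, here is a sketch in which the yielded value is a request to the scheduler: the number of rounds the microthread wants to skip before being resumed. This protocol, the `talker` function and the `run` function are all assumptions made for this example; they are not part of the scheduler developed in this article. The code uses the Python 3 built-in `next(thread)`.

```python
def talker(name, pause, count, log):
    # Hypothetical task: records its name, then asks the scheduler
    # to skip `pause` rounds before resuming it.
    for _ in range(count):
        log.append(name)
        yield pause


def run(threads):
    # Each entry is (rounds still to skip, generator).
    waiting = [(0, t) for t in threads]
    while waiting:
        next_round = []
        for skip, thread in waiting:
            if skip > 0:
                # Not its turn yet: just decrease the counter.
                next_round.append((skip - 1, thread))
                continue
            try:
                skip_request = next(thread)
            except StopIteration:
                continue  # terminated, not scheduled again
            next_round.append((skip_request, thread))
        waiting = next_round


log = []
run([talker("fast", 0, 3, log), talker("slow", 1, 2, log)])
print(log)
```

The "slow" task runs every other round simply because of the value it yields; the scheduler needs no knowledge of the task itself.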
The scheduler could however be more flexible; specifically, it could be converted into a microthread itself. The scheduler, when executed, will return a generator, and each call of its next() method will run one of its microthreads. After this the scheduler will freeze and give control back.

    class MicroScheduler(object):
        def __init__(self):
            self.active_microthreads = []
            self.scheduled_microthreads = []

        def add_microthread(self, mthread):
            g = mthread.main()
            g.next()
            self.active_microthreads.append(g)

        def main(self):
            yield 1
            while 1:
                if len(self.active_microthreads) == 0:
                    yield 1

                for thread in self.active_microthreads:
                    try:
                        thread.next()
                        self.scheduled_microthreads.append(thread)
                    except StopIteration:
                        pass
                    yield 1

                self.active_microthreads = self.scheduled_microthreads
                self.scheduled_microthreads = []
It is sufficient to rename run() to main(), to match our arbitrary microthread interface, and add some yield statements. The first yield, at the beginning of main(), terminates the creation part: this scheduler has no create() method, but if present it should be called here. The second yield is executed if the scheduler contains no microthreads, since it has nothing to do. The third yield is executed after each microthread has been run, inside the loop over the active microthreads.
These little changes allow the scheduler to be run inside another scheduler, thus enabling us to create a hierarchy and easily build complex systems. At the same time the scheduler can still be used on its own, simply by iterating over its generator in a for loop, which calls the next() method repeatedly.

    import mthread
    import mscheduler
    import time

    class TestMicroThread(mthread.MicroThread):
        def __init__(self, number):
            self.num = number

        def step(self):
            print "Number:", self.num
            time.sleep(1)

    mt1 = TestMicroThread(1)
    mt2 = TestMicroThread(2)
    mt3 = TestMicroThread(3)

    ms = mscheduler.MicroScheduler()
    ms.add_microthread(mt1)
    ms.add_microthread(mt2)
    ms.add_microthread(mt3)

    for i in ms.main():
        pass
This example is obviously very simple. However, it shows how easy it is to build components of a cooperative system and let them live together in an execution space. If the scheduler is driven in a different way, for example from inside another generator, new microthreads can also be added while it is running. This makes it possible to instantiate system components on the fly to handle specific needs, such as incoming service requests.
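The hierarchy of schedulers can be tried out with the following sketch, a Python 3 transcription of the classes above (`next(g)` instead of `g.next()`). The `Recorder` class and the `log` list are hypothetical helpers, and `itertools.islice` is used only to stop the otherwise infinite loop after a few steps.

```python
from itertools import islice


class MicroThread(object):
    def create(self):
        pass

    def step(self):
        pass

    def main(self):
        self.create()
        yield 1
        while True:
            self.step()
            yield 1


class MicroScheduler(object):
    def __init__(self):
        self.active_microthreads = []
        self.scheduled_microthreads = []

    def add_microthread(self, mthread):
        g = mthread.main()
        next(g)
        self.active_microthreads.append(g)

    def main(self):
        yield 1
        while True:
            if len(self.active_microthreads) == 0:
                yield 1
            for thread in self.active_microthreads:
                try:
                    next(thread)
                    self.scheduled_microthreads.append(thread)
                except StopIteration:
                    pass
                yield 1
            self.active_microthreads = self.scheduled_microthreads
            self.scheduled_microthreads = []


class Recorder(MicroThread):
    """Hypothetical microthread that records its name at each step."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def step(self):
        self.log.append(self.name)


log = []
inner = MicroScheduler()
inner.add_microthread(Recorder("inner-1", log))
inner.add_microthread(Recorder("inner-2", log))

outer = MicroScheduler()
outer.add_microthread(Recorder("outer-1", log))
outer.add_microthread(inner)  # a scheduler is added like any microthread

# Drive the outer scheduler for a limited number of steps.
for _ in islice(outer.main(), 7):
    pass
print(log)
```

Note that the inner scheduler counts as a single microthread for the outer one, so each time it is resumed it runs only one of its own microthreads: tasks deeper in the hierarchy get a smaller share of the execution time.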
Obviously this sort of multitasking cannot provide interactive execution like that of our desktop OS or of a Web server, where human users must receive immediate feedback on their actions. But for systems where tasks must simply be executed simultaneously without timing requirements, cooperative multitasking is a valuable solution, due to its simplicity.
Another interesting scenario is that of a real multitasking system (ruled by threaded code or by the OS itself) where each task is made of small cooperating components. This way putting multiple functionalities inside a single component becomes a breeze; the code of each functionality could also be split into several plugins and loaded on demand.
A package that implements cooperative multitasking with generator-based microthreads is Kamaelia, and this article has been heavily inspired by it. Other solutions you may find interesting are greenlet, presently the most used Python microthread library that runs on the standard unmodified Python interpreter, and Stackless Python, a fork of Python that natively implements microthreads.
2014-02-17: Riccardo spotted an error in the example code after mthread.py and submitted the correct version. Thanks!