Reactive/dataflow programming in Python, part 2
In the previous part of this series, we introduced Lusmu, our minimal reactive/dataflow programming framework in Python. We clarified dataflow terminology and some of our design choices using simple usage examples.
In this part, we create a stripped-down implementation of Lusmu to highlight some of its key characteristics:
- Synchronous Activation
- Pull Data (invalidate / lazy-revalidate)
- Doubly Linked Adjacency List as the graph data structure
Before we start, thanks to our readers for their feedback on the first part, and to the audience at PyCon Finland 2014 for the questions and discussion about our Lusmu presentation!
Simplification: no arcs
Many dataflow systems connect the input and output ports of nodes explicitly with arc objects:
In asynchronous systems, arcs buffer data between nodes. Synchronous systems like Lusmu, in contrast, can be simplified by dropping the concept of arcs altogether. Each node simply stores the current value of its output port in a data attribute and links to its input nodes through an inputs attribute:
Note
Because there is no buffering, it is the user’s responsibility to drive the graph in a “feed data → fire nodes → read results” loop. If several rounds of data are fed without reading the results in between, each round overwrites the previous one, and only the last round affects the results.
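To make the difference concrete, here is a toy sketch in plain Python. The `Arc` and `Slot` classes are hypothetical and not part of Lusmu. `Arc` buffers values in a FIFO queue, roughly the way an arc in an asynchronous system might. `Slot` keeps only a single current value, like the data attribute of a Lusmu node.

```python
from collections import deque


class Arc:
    """Hypothetical buffered arc: every pushed value waits until it is consumed."""

    def __init__(self):
        self.buffer = deque()

    def push(self, value):
        self.buffer.append(value)

    def pop(self):
        return self.buffer.popleft()


class Slot:
    """Hypothetical arc-less storage: a single current value, like a node's data attribute."""

    def __init__(self):
        self.data = None

    def push(self, value):
        self.data = value  # overwrites whatever was stored before

    def pop(self):
        return self.data


arc, slot = Arc(), Slot()
for value in [1, 2, 3]:  # three rounds fed without reading results in between
    arc.push(value)
    slot.push(value)

print([arc.pop() for _ in range(3)])  # [1, 2, 3]: the buffer keeps every round
print(slot.pop())                     # 3: only the last round survives
```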
A side effect of this design decision is that we need a “magic constant” value which denotes that the node doesn’t have any data yet:
>>> class NoData:
...     def __repr__(self):
...         return 'NO_DATA'
...
>>> NO_DATA = NoData()
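A dedicated sentinel object works better than None here, because None may well be a legitimate data value. Testing for the sentinel with `is` compares object identity only, so it never invokes the value’s own `==`, which NumPy arrays evaluate elementwise. The `has_data()` helper below is a hypothetical illustration and is not part of Lusmu:

```python
import numpy as np


class NoData:
    def __repr__(self):
        return 'NO_DATA'


NO_DATA = NoData()


def has_data(value):
    """Hypothetical helper: True unless value is the NO_DATA sentinel."""
    # `is` compares identity, so arrays are never compared elementwise here.
    return value is not NO_DATA


print(has_data(NO_DATA))               # False
print(has_data(None))                  # True: None remains usable as real data
print(has_data(np.array([1.0, 2.0])))  # True
print(repr(NO_DATA))                   # NO_DATA
```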
Node classes
To represent source and operation nodes, we need two different node classes. It’s convenient to use a common base class which provides node objects with:
- a name (for debugging and visualization purposes)
- the data attribute for keeping the current value of the output port (initially empty)
- a way to signal its observer nodes (stored in the observers attribute) whenever the value of the output port changes
We’ll also define a set_data() setter method, which
- is always used to set the output port value
- invalidates observers whenever the value changes
The invalidation is the first trick needed for the “invalidate/lazy revalidate” scheme (a.k.a. Pull Data).
>>> class BaseNode:
...     def __init__(self, name):
...         self.name = name
...         self.data = NO_DATA
...         self.observers = set()
...
...     def set_data(self, data):
...         self.data = data
...         print('SET {} = {}'.format(self.name, data))
...         for node in self.observers:
...             node.set_data(NO_DATA)
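As a quick check of the invalidation cascade, the sketch below repeats the two definitions above and wires a hypothetical chain of three nodes `a → b → c` by hand. Operation nodes normally register themselves as observers, but they have not been defined yet. The strings 'stale b' and 'stale c' stand in for previously computed values.

```python
class NoData:
    def __repr__(self):
        return 'NO_DATA'


NO_DATA = NoData()


class BaseNode:
    def __init__(self, name):
        self.name = name
        self.data = NO_DATA
        self.observers = set()

    def set_data(self, data):
        self.data = data
        print('SET {} = {}'.format(self.name, data))
        for node in self.observers:
            node.set_data(NO_DATA)  # recursively invalidate everything downstream


# Hypothetical chain a -> b -> c, wired manually.
a, b, c = BaseNode('a'), BaseNode('b'), BaseNode('c')
a.observers.add(b)
b.observers.add(c)

b.data = 'stale b'  # pretend b and c hold earlier results
c.data = 'stale c'
a.set_data(42)
# SET a = 42
# SET b = NO_DATA
# SET c = NO_DATA
```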
Source nodes
Source nodes extend the base class only with a get_data() getter, which simply returns the current output port value fed in from outside:
>>> class SrcNode(BaseNode):
...     def get_data(self):
...         return self.data
Operation nodes
Operation nodes extend the base node functionality by:
- storing their operation (a simple Python callable) in the op attribute
- receiving data from other nodes which are specified in the inputs attribute
- being automatically connected back to their input nodes as observers
Each entry in the inputs list of nodes is always mirrored by a link in the observers set of the input node. Thus, the nodes form a directed graph using a doubly linked adjacency list.
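The doubly linked structure implies an invariant that can be checked mechanically. The sketch below uses a hypothetical, stripped-down `Node` class that keeps only the graph links, together with a hypothetical `links_are_mirrored()` helper. Neither is part of Lusmu.

```python
class Node:
    """Hypothetical stand-in for BaseNode/OpNode that keeps only the graph links."""

    def __init__(self, name, inputs=()):
        self.name = name
        self.inputs = list(inputs)
        self.observers = set()
        for node in self.inputs:
            node.observers.add(self)  # mirror each input link in the reverse direction


def links_are_mirrored(nodes):
    """Hypothetical check: every input link has a matching observer link, and vice versa."""
    for node in nodes:
        if any(node not in inp.observers for inp in node.inputs):
            return False
        if any(node not in obs.inputs for obs in node.observers):
            return False
    return True


speed_over_ground = Node('speed_over_ground')
speed_through_water = Node('speed_through_water')
current = Node('current', inputs=[speed_over_ground, speed_through_water])
ok_current = Node('ok_current', inputs=[current])
nodes = [speed_over_ground, speed_through_water, current, ok_current]

print(links_are_mirrored(nodes))                            # True
print([n.name for n in current.inputs])                     # upstream links
print(sorted(n.name for n in speed_over_ground.observers))  # downstream links: ['current']
```

Because both directions are stored, the graph can be walked upstream through inputs (for evaluation) and downstream through observers (for invalidation) without searching.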
If an operation node already has a value in its output port, its get_data() getter is a no-op and simply returns the existing value. If the value is missing, the node calculates it by requesting the values of all its input nodes and firing the operation function with those values. Note that all required inputs are evaluated recursively.
The caching of output port values is the second trick which makes the magic of lazy evaluation happen.
>>> class OpNode(BaseNode):
...     def __init__(self, name, op, inputs):
...         super().__init__(name)
...         self.op = op
...         self.inputs = inputs
...         for node in inputs:
...             node.observers.add(self)
...
...     def get_data(self):
...         if self.data is NO_DATA:  # fire the node
...             inputs_data = [node.get_data() for node in self.inputs]
...             print('FIRE {}({})'.format(self.name, ', '.join(str(d) for d in inputs_data)))
...             new_data = self.op(*inputs_data)
...             self.set_data(new_data)
...         return self.data
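To see the caching at work, the following self-contained sketch repeats the node classes above without the debug prints. It counts how often a hypothetical `double` operation actually runs. Reading the same node twice fires the operation only once, and feeding a new input value invalidates the cached result.

```python
class NoData:
    def __repr__(self):
        return 'NO_DATA'


NO_DATA = NoData()


class BaseNode:
    def __init__(self, name):
        self.name = name
        self.data = NO_DATA
        self.observers = set()

    def set_data(self, data):
        self.data = data
        for node in self.observers:  # invalidate downstream nodes
            node.set_data(NO_DATA)


class SrcNode(BaseNode):
    def get_data(self):
        return self.data


class OpNode(BaseNode):
    def __init__(self, name, op, inputs):
        super().__init__(name)
        self.op = op
        self.inputs = inputs
        for node in inputs:
            node.observers.add(self)

    def get_data(self):
        if self.data is NO_DATA:  # fire only when there is no cached value
            inputs_data = [node.get_data() for node in self.inputs]
            self.set_data(self.op(*inputs_data))
        return self.data


calls = []


def double(x):
    """Hypothetical operation that records each time it runs."""
    calls.append(x)
    return 2 * x


src = SrcNode('src')
doubled = OpNode('doubled', op=double, inputs=[src])

src.set_data(5)
print(doubled.get_data(), doubled.get_data())  # 10 10
print(len(calls))                               # 1: the second read hit the cache
src.set_data(7)                                 # invalidates `doubled`
print(doubled.get_data(), len(calls))           # 14 2
```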
Creating and running a graph
We now have everything we need to construct the example from part 1. Let’s first define the operations (see part 1 for details):
>>> def difference(a, b):
...     """Calculate the difference between numbers using a simple function"""
...     return b - a
...
>>> class IsInRange:
...     """Check that number is in range using a callable object"""
...     def __init__(self, low, high):
...         """Save the low and high limits"""
...         self.low = low
...         self.high = high
...
...     def __call__(self, data):
...         """Perform the actual check against saved limits"""
...         return (self.low <= data) & (data <= self.high)
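Note that `IsInRange` combines the two comparisons with `&` rather than `and`. With NumPy arrays, each comparison yields a boolean array. `and` would try to reduce that array to a single truth value, which raises a ValueError for arrays with more than one element. `&` works elementwise on arrays and still acts as a logical AND on plain Python booleans. The sketch below repeats the class and tries both kinds of input:

```python
import numpy as np


class IsInRange:
    """Check that number is in range using a callable object"""

    def __init__(self, low, high):
        self.low = low
        self.high = high

    def __call__(self, data):
        # & combines boolean arrays elementwise (and plain bools as usual)
        return (self.low <= data) & (data <= self.high)


check = IsInRange(-2, 2)
print(check(np.array([1.5, 2.1, -3.0])))  # [ True False False]
print(check(0.5))                          # True
```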
To construct the graph, instantiate the nodes and give each operation node its operation and input nodes:
>>> import numpy as np
>>> speed_over_ground = SrcNode('speed_over_ground')
>>> speed_through_water = SrcNode('speed_through_water')
>>> current = OpNode('current',
...                  op=difference,
...                  inputs=[speed_over_ground, speed_through_water])
>>> ok_current = OpNode('ok_current',
...                     op=IsInRange(-2, 2),
...                     inputs=[current])
Here’s an illustration of the Python objects and attributes in memory after creating this graph:
Finally, let’s test the graph with some data. Here the print statements help us follow how data flows through the graph and how observer nodes are invalidated:
>>> speed_over_ground.set_data(np.array([22.0, 21.7, 21.9]))
SET speed_over_ground = [ 22. 21.7 21.9]
SET current = NO_DATA
SET ok_current = NO_DATA
>>> speed_through_water.set_data(np.array([23.5, 23.8, 23.3]))
SET speed_through_water = [ 23.5 23.8 23.3]
SET current = NO_DATA
SET ok_current = NO_DATA
>>> ok_current.get_data()
FIRE current([ 22. 21.7 21.9], [ 23.5 23.8 23.3])
SET current = [ 1.5 2.1 1.4]
SET ok_current = NO_DATA
FIRE ok_current([ 1.5 2.1 1.4])
SET ok_current = [ True False True]
array([ True, False, True], dtype=bool)
>>> current.get_data()
array([ 1.5, 2.1, 1.4])
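Because nothing is buffered, processing several rounds of data means repeating the feed/fire/read cycle once per round. The sketch below repeats the classes and operations from this post without the debug prints. It uses plain floats instead of NumPy arrays. The `rounds` data and the driving loop are illustrative assumptions, not a Lusmu API.

```python
class NoData:
    def __repr__(self):
        return 'NO_DATA'


NO_DATA = NoData()


class BaseNode:
    def __init__(self, name):
        self.name = name
        self.data = NO_DATA
        self.observers = set()

    def set_data(self, data):
        self.data = data
        for node in self.observers:
            node.set_data(NO_DATA)


class SrcNode(BaseNode):
    def get_data(self):
        return self.data


class OpNode(BaseNode):
    def __init__(self, name, op, inputs):
        super().__init__(name)
        self.op = op
        self.inputs = inputs
        for node in inputs:
            node.observers.add(self)

    def get_data(self):
        if self.data is NO_DATA:
            self.set_data(self.op(*[node.get_data() for node in self.inputs]))
        return self.data


def difference(a, b):
    return b - a


class IsInRange:
    def __init__(self, low, high):
        self.low = low
        self.high = high

    def __call__(self, data):
        return (self.low <= data) & (data <= self.high)


speed_over_ground = SrcNode('speed_over_ground')
speed_through_water = SrcNode('speed_through_water')
current = OpNode('current', op=difference,
                 inputs=[speed_over_ground, speed_through_water])
ok_current = OpNode('ok_current', op=IsInRange(-2, 2), inputs=[current])

# Hypothetical input rounds: (speed over ground, speed through water)
rounds = [(22.0, 23.5), (21.7, 23.8), (21.9, 23.3)]
results = []
for sog, stw in rounds:
    speed_over_ground.set_data(sog)        # feed data
    speed_through_water.set_data(stw)
    results.append(ok_current.get_data())  # fire nodes and read the result
print(results)  # [True, False, True]
```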
Note
Looking carefully at the debug output, you’ll notice an obvious shortcoming of this simplified implementation: all observer nodes are invalidated recursively, even those that already have no value in their output port (see the redundant SET ... = NO_DATA lines). The real implementation of Lusmu handles this properly.
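The sketch below shows one possible fix. It is not necessarily how Lusmu does it: the invalidation cascade simply stops at observers that are already invalidated. This relies on an assumption that holds in this prototype as long as operations only fire on real input data. If a node has no data, none of its observers hold data either, so there is nothing further downstream to invalidate. Only the loop in `set_data()` differs from the classes above. Scalar inputs keep the printout short.

```python
class NoData:
    def __repr__(self):
        return 'NO_DATA'


NO_DATA = NoData()


class BaseNode:
    def __init__(self, name):
        self.name = name
        self.data = NO_DATA
        self.observers = set()

    def set_data(self, data):
        self.data = data
        print('SET {} = {}'.format(self.name, data))
        for node in self.observers:
            # Changed: skip observers that are already invalidated, since
            # (by the assumption above) nothing downstream of them holds data.
            if node.data is not NO_DATA:
                node.set_data(NO_DATA)


class SrcNode(BaseNode):
    def get_data(self):
        return self.data


class OpNode(BaseNode):
    def __init__(self, name, op, inputs):
        super().__init__(name)
        self.op = op
        self.inputs = inputs
        for node in inputs:
            node.observers.add(self)

    def get_data(self):
        if self.data is NO_DATA:  # fire the node
            inputs_data = [node.get_data() for node in self.inputs]
            print('FIRE {}({})'.format(self.name, ', '.join(str(d) for d in inputs_data)))
            self.set_data(self.op(*inputs_data))
        return self.data


def difference(a, b):
    return b - a


class IsInRange:
    def __init__(self, low, high):
        self.low = low
        self.high = high

    def __call__(self, data):
        return (self.low <= data) & (data <= self.high)


speed_over_ground = SrcNode('speed_over_ground')
speed_through_water = SrcNode('speed_through_water')
current = OpNode('current', op=difference,
                 inputs=[speed_over_ground, speed_through_water])
ok_current = OpNode('ok_current', op=IsInRange(-2, 2), inputs=[current])

speed_over_ground.set_data(22.0)    # prints only: SET speed_over_ground = 22.0
speed_through_water.set_data(23.5)  # prints only: SET speed_through_water = 23.5
ok_current.get_data()
# FIRE current(22.0, 23.5)
# SET current = 1.5
# FIRE ok_current(1.5)
# SET ok_current = True
speed_over_ground.set_data(22.5)
# SET speed_over_ground = 22.5
# SET current = NO_DATA
# SET ok_current = NO_DATA
```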
Summary
We’ve created a stripped-down dataflow framework prototype in 29 lines of effective Python code. The prototype illustrates how lazy evaluation works in Lusmu, how the graph is represented in memory, and what restrictions the lack of buffering imposes on how the graph is used.
In the next part, we’ll look at some useful features for scientific data processing.