Reactive/dataflow programming in Python, part 2

In the previous part of this series, we introduced Lusmu, our minimal reactive/dataflow programming framework in Python. We clarified dataflow terminology and some of our design choices using simple usage examples.

In this part, we create a stripped-down implementation of the framework to highlight some of its key characteristics:

  • Synchronous Activation
  • Pull Data (invalidate / lazy-revalidate)
  • Doubly Linked Adjacency List as the graph data structure

Also, before we start: thanks to readers for their feedback on the first part, and thanks to the audience at PyCon Finland 2014 for the questions and discussion about our Lusmu presentation!

Simplification: no arcs

Many dataflow systems connect the input and output ports of nodes explicitly with arc objects:

digraph {
    graph [dpi=72];
    rankdir = LR;
    graph [style=rounded; fontname="Helvetica-Bold"];
    subgraph cluster_node1 {
        label="node";
        output_port1 [shape=rect, label="output port"]; }
    subgraph cluster_node2 {
        label="node";
        output_port2 [shape=rect, label="output port"]; }
    subgraph cluster_node3 {
        label="node";
        input_port1 [shape=rect, label="input port 1"];
        input_port2 [shape=rect, label="input port 2"];}
    node [label="", shape=none; width=0]; edge [label="arc"];
    output_port1 -> input_port1;
    output_port2 -> input_port2;
}

Figure 1. Dataflow diagram with output ports, input ports and arcs

In asynchronous systems, arcs buffer data between nodes. Synchronous systems like Lusmu, on the other hand, can be simplified by leaving out the concept of arcs entirely: we just store the current output port value in a data attribute inside each node and link nodes together with an inputs attribute:

digraph {
    graph [dpi=72];
    rankdir = RL;
    compound = true;
    graph [style=rounded; fontname="Helvetica-Bold"];
    subgraph cluster_input_node {
        label="a = Node()";
        node [shape=rect];
        data1 [label="a.data"]; }
    subgraph cluster_op_node {
        label="b = Node()";
        node [shape=rect];
        inputs [label="b.inputs = set([a])"];
        data2 [label="b.data"]; }
    inputs -> data1 [lhead=cluster_input_node];
}

Figure 2. Node objects with .data and .inputs attributes

Note

It’s important to remember that due to the lack of buffering, it is the user’s responsibility to drive the graph in a “feed data → fire nodes → read results” loop. If multiple rounds of data are fed without reading results in between, the earlier rounds are simply overwritten and never contribute to the results.

A side effect of this design decision is that we need a “magic constant” value which denotes that the node doesn’t have any data yet:

>>> class NoData:
...     def __repr__(self):
...         return 'NO_DATA'
...
>>> NO_DATA = NoData()
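Since NO_DATA is a unique sentinel object compared by identity, it can never be confused with a legitimate value, not even falsy ones like 0 or None. A quick sketch of the identity checks (repeating the definition above so it runs on its own):

```python
class NoData:
    """Sentinel type whose single instance marks an empty output port."""
    def __repr__(self):
        return 'NO_DATA'

NO_DATA = NoData()

# Identity checks tell the sentinel apart from any real value,
# including falsy ones that would fool a plain truthiness test:
value = 0
print(value is NO_DATA)  # False
print(None is NO_DATA)   # False
```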

Node classes

To represent source and operation nodes, we need two different node classes. It’s convenient to use a common base class which provides node objects with:

  • a name (for debugging and visualization purposes)
  • the data attribute for keeping the current value of the output port (initially empty)
  • a way to signal its observer nodes (stored in the observers attribute) whenever the value of the output port changes

We’ll also define a set_data() setter method, which

  • is always used to set the output port value
  • invalidates its observers whenever a new value is set

The invalidation is the first trick needed for the “invalidate/lazy revalidate” scheme (a.k.a. Pull Data).

>>> class BaseNode:
...     def __init__(self, name):
...         self.name = name
...         self.data = NO_DATA
...         self.observers = set()
...
...     def set_data(self, data):
...         self.data = data
...         print('SET  {} = {}'.format(self.name, data))
...         for node in self.observers:
...             node.set_data(NO_DATA)
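To see the invalidation cascade in isolation, we can wire two bare BaseNode objects together by hand. The sketch below repeats the definitions above so it runs on its own; the node names a and b are just for illustration:

```python
class NoData:
    def __repr__(self):
        return 'NO_DATA'

NO_DATA = NoData()

class BaseNode:
    def __init__(self, name):
        self.name = name
        self.data = NO_DATA
        self.observers = set()

    def set_data(self, data):
        self.data = data
        print('SET  {} = {}'.format(self.name, data))
        for node in self.observers:
            node.set_data(NO_DATA)

a = BaseNode('a')
b = BaseNode('b')
a.observers.add(b)  # b observes a
b.data = 99         # pretend b holds a stale cached value

a.set_data(42)      # invalidates b as a side effect:
# SET  a = 42
# SET  b = NO_DATA
print(b.data)       # NO_DATA
```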

Source nodes

Source nodes only extend the base class with the get_data() getter which just returns the current output port value fed from outside:

>>> class SrcNode(BaseNode):
...     def get_data(self):
...         return self.data

Operation nodes

Operation nodes extend the base node functionality by:

  • storing their operation (a simple Python callable) in the op attribute
  • receiving data from other nodes which are specified in the inputs attribute
  • being automatically connected back to their input nodes as observers

Each entry in a node’s inputs list is always mirrored by a link in the observers set of the corresponding input node. Thus, the nodes form a directed graph stored as a doubly linked adjacency list.

If the output port of an operation node already holds a value, the get_data() getter simply returns that cached value without doing any work. A missing value, on the other hand, is calculated by requesting values from each input node and firing the operation function with those values. Note that all required inputs are evaluated recursively.

The caching of output port values is the second trick which makes the magic of lazy evaluation happen.

>>> class OpNode(BaseNode):
...     def __init__(self, name, op, inputs):
...         super().__init__(name)
...         self.op = op
...         self.inputs = inputs
...         for node in inputs:
...             node.observers.add(self)
...
...     def get_data(self):
...         if self.data is NO_DATA:  # fire the node
...             inputs_data = [node.get_data() for node in self.inputs]
...             print('FIRE {}({})'.format(self.name, ', '.join(str(d) for d in inputs_data)))
...             new_data = self.op(*inputs_data)
...             self.set_data(new_data)
...         return self.data
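A tiny two-node graph shows the caching in action. The sketch repeats all definitions so it runs on its own, with the debug prints left out for brevity; the doubled operation and the calls list are our own additions for counting firings:

```python
class NoData:
    def __repr__(self):
        return 'NO_DATA'

NO_DATA = NoData()

class BaseNode:
    def __init__(self, name):
        self.name = name
        self.data = NO_DATA
        self.observers = set()

    def set_data(self, data):
        self.data = data
        for node in self.observers:
            node.set_data(NO_DATA)

class SrcNode(BaseNode):
    def get_data(self):
        return self.data

class OpNode(BaseNode):
    def __init__(self, name, op, inputs):
        super().__init__(name)
        self.op = op
        self.inputs = inputs
        for node in inputs:
            node.observers.add(self)

    def get_data(self):
        if self.data is NO_DATA:  # fire the node
            inputs_data = [node.get_data() for node in self.inputs]
            self.set_data(self.op(*inputs_data))
        return self.data

calls = []

def doubled(x):
    calls.append(x)  # record every firing
    return 2 * x

src = SrcNode('src')
dbl = OpNode('dbl', op=doubled, inputs=[src])

src.set_data(21)
print(dbl.get_data())  # 42: the node fires
print(dbl.get_data())  # 42: cached, the operation is not run again
print(len(calls))      # 1
src.set_data(10)       # invalidates dbl
print(dbl.get_data())  # 20: invalidation forces a re-fire
print(len(calls))      # 2
```

Between the two consecutive get_data() calls nothing fires: the cached value in dbl.data is returned directly. Only feeding new source data, which invalidates the observer, causes the operation to run again.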

Creating and running a graph

We now have everything we need to construct our previous example. Let’s just define our operations (see part 1 for details):

>>> def difference(a, b):
...     """Calculate the difference between numbers using a simple function"""
...     return b - a
...
>>> class IsInRange:
...     """Check that number is in range using a callable object"""
...     def __init__(self, low, high):
...         """Save the low and high limits"""
...         self.low = low
...         self.high = high
...
...     def __call__(self, data):
...         """Perform the actual check against saved limits"""
...         return (self.low <= data) & (data <= self.high)
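Both operations work on plain Python numbers just as well as on NumPy arrays, so they are easy to try out on their own (the definitions above are repeated here to make the sketch self-contained):

```python
def difference(a, b):
    """Calculate the difference between numbers"""
    return b - a

class IsInRange:
    """Check that a number is in range using a callable object"""
    def __init__(self, low, high):
        self.low = low
        self.high = high

    def __call__(self, data):
        return (self.low <= data) & (data <= self.high)

print(difference(22.0, 23.5))  # 1.5
check = IsInRange(-2, 2)
print(check(1.5))  # True
print(check(2.1))  # False
```

The & operator instead of `and` is what keeps IsInRange compatible with NumPy arrays: & broadcasts element-wise, while `and` would raise an error on arrays with more than one element.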

To construct the graph, just instantiate nodes and define correct operations and input nodes for them:

>>> import numpy as np
>>> speed_over_ground = SrcNode('speed_over_ground')
>>> speed_through_water = SrcNode('speed_through_water')
>>> current = OpNode('current',
...                  op=difference,
...                  inputs=[speed_over_ground, speed_through_water])
>>> ok_current = OpNode('ok_current',
...                     op=IsInRange(-2, 2),
...                     inputs=[current])

Here’s an illustration of the Python objects and attributes in memory after creating this graph:

digraph {
    graph [dpi=72, ranksep=0.5, nodesep=0.3];
    rankdir = LR;
    compound = true;
    graph [style=rounded; fontname="Helvetica-Bold"];
    subgraph cluster_sog {
        label="speed_over_ground = SrcNode()";
        sog_observers [shape=rect, label="observers"];
        speed_over_ground [shape=rect, label="data"];
        speed_over_ground -> sog_observers [style=invis]; }
    subgraph cluster_stw {
        label="speed_through_water = SrcNode()";
        stw_observers [shape=rect, label="observers"];
        speed_through_water [shape=rect, label="data"];
        speed_through_water -> stw_observers [style=invis]; }
    subgraph cluster_difference {
        label="current = OpNode()";
        op [shape=rect];
        node [shape=rect]; edge [label=""];
        difference_o [label="data"];
        inputs;
        # inputs -> op:a [style=invis];
        # inputs -> op:b [style=invis];
        # op:o -> difference_o [style=invis];
        difference_observers [shape=rect, label="observers"];
        # difference_o -> difference_observers [style=invis];
    }
    subgraph cluster_ok {
        label="ok_current = OpNode()";
        node [shape=rect]; edge [label=""];
        ok_op [label="op"];
        ok_o [label="data"];
        ok_inputs [label="inputs"];
        ok_inputs -> ok_op [style=invis];
        ok_op -> ok_o [style=invis];
    }
    op_difference [shape=record; label="{{<a>|<b>}|def difference()|<o>}"; fontname="Helvetica-Bold"];
    op_isinrange [shape=record; label="{<a>|class IsInRange|<o>}"; fontname="Helvetica-Bold"];
    op -> op_difference;
    ok_op -> op_isinrange;
    node [label="", shape=none; width=0];
    sog_observers -> inputs [lhead=cluster_difference];
    stw_observers -> inputs [lhead=cluster_difference];
    difference_observers -> ok_inputs [lhead=cluster_ok];
    inputs -> sog_observers [lhead=cluster_sog];
    inputs -> stw_observers [lhead=cluster_stw];
    ok_inputs -> difference_observers [lhead=cluster_difference];
}

Figure 3. Objects and attributes of the example graph

Finally, let’s test the graph with some data. Here the print statements help us follow how data flows through the graph and how observer nodes are invalidated:

>>> speed_over_ground.set_data(np.array([22.0, 21.7, 21.9]))
SET  speed_over_ground = [ 22.   21.7  21.9]
SET  current = NO_DATA
SET  ok_current = NO_DATA

>>> speed_through_water.set_data(np.array([23.5, 23.8, 23.3]))
SET  speed_through_water = [ 23.5  23.8  23.3]
SET  current = NO_DATA
SET  ok_current = NO_DATA

>>> ok_current.get_data()
FIRE current([ 22.   21.7  21.9], [ 23.5  23.8  23.3])
SET  current = [ 1.5  2.1  1.4]
SET  ok_current = NO_DATA
FIRE ok_current([ 1.5  2.1  1.4])
SET  ok_current = [ True False  True]
array([ True, False,  True], dtype=bool)

>>> current.get_data()
array([ 1.5,  2.1,  1.4])

Note

Looking carefully at the debug output, you’ll notice an obvious shortcoming of this simplified implementation: All observer nodes are invalidated recursively even if they already have no value in the output port. This is handled properly in the real implementation of Lusmu.
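One way to cut the redundant cascade short is to stop propagating as soon as a node is already empty. The guard below is our own illustration of the idea, not Lusmu’s actual code:

```python
class NoData:
    def __repr__(self):
        return 'NO_DATA'

NO_DATA = NoData()

class BaseNode:
    def __init__(self, name):
        self.name = name
        self.data = NO_DATA
        self.observers = set()

    def set_data(self, data):
        if data is NO_DATA and self.data is NO_DATA:
            return  # already invalidated: stop the cascade here
        self.data = data
        print('SET  {} = {}'.format(self.name, data))
        for node in self.observers:
            node.set_data(NO_DATA)

# a -> b -> c chain; b and c start out empty
a, b, c = BaseNode('a'), BaseNode('b'), BaseNode('c')
a.observers.add(b)
b.observers.add(c)

a.set_data(1)  # prints only 'SET  a = 1': b is already empty
a.set_data(2)  # prints only 'SET  a = 2': the cascade stops at b
```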

Summary

We’ve created a stripped-down dataflow framework prototype in 29 lines of effective Python code. The prototype illustrates how lazy evaluation works in Lusmu, how the graph is built in memory, and what restrictions the lack of buffering places on using the graph.

In the next part, we’ll look at some useful features for scientific data processing.