RxFusion Language
RxFusion is a dataflow language: an RxFusion application is essentially the definition of a directed graph that describes how data flows from the inputs, through various operations, to the outputs.
RxFusion provides a library of input, output and operator components, and a simple, clean syntax for connecting them into a dataflow graph.
RxFusion is an embedded domain-specific language (DSL): it has a domain-specific syntax, but that syntax is 100% compatible with a general-purpose host language. The advantage of an embedded DSL is that it does not require any special development tools, so developers can keep using all their favorite tools to write programs.
RxFusion is available in two flavors: RxFusion C++ (typically used for MCUs) and RxFusion Javascript (typically used for more advanced MCUs or CPUs). The concepts and operators are the same whether you are using C++ or Javascript, and there are only a few minor syntax differences due to differences between C++ and Javascript themselves.
RxFusion C++
An RxFusion C++ application consists of three parts: a preamble, input/output declarations, and the app function.
Preamble
The preamble consists of the include statements required by the application. At a minimum, this includes the RxFusion header files but may also require other includes depending on the target platform.
For the Arduino MKR1000, we might have the following, which includes additional headers for WiFi and the real-time clock:
#include <WiFi101.h>
#include <RTCZero.h>
#include <RxFusion.h>
Input/output declarations
Inputs and outputs are declared as global variables of one of the RxFusion-provided types. See the Classes documentation for a full list of the available input/output types.
Here's an example of the input/output declarations from an Arduino application:
AnalogIn<int> sensor(A1);
WiFiClient client;
MqttPub broker(client,"broker.hivemq.com",1883,"vtcc8/rxfusion/test","vtcc8");
sensor is defined as an input of type AnalogIn<int> on pin A1. Some input/output types, like this one, are templated because they support multiple value types; for example, the AnalogIn type used here can produce values of any integer type. Many of the types also have required or optional configuration parameters. In this case, the pin is a required parameter of the AnalogIn type.
broker is an example of an output. The MqttPub class publishes data to an MQTT broker and has several configuration parameters. Inputs/outputs that communicate with other devices over a network typically take a transport object as a parameter, like the client object in this example. The transport classes are specific to the target platform; here, the WiFiClient class is part of the Arduino platform and provides transport over a WiFi network.
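The transport-as-parameter design can be sketched in plain C++. Everything in this sketch is hypothetical: Transport, MemoryTransport and TopicPublisher are illustrative names, not RxFusion or Arduino classes. The output holds a reference to an abstract transport, so the same output class works over any link that implements send(), much as MqttPub is handed a WiFiClient in the example above.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical transport interface: anything that can deliver a payload somewhere.
struct Transport {
    virtual ~Transport() = default;
    virtual void send(const std::string& payload) = 0;
};

// Hypothetical in-memory transport, standing in for a real network connection.
struct MemoryTransport : Transport {
    std::vector<std::string> sent;
    void send(const std::string& payload) override { sent.push_back(payload); }
};

// Hypothetical output: receives its transport by reference plus its own
// configuration (here just a topic string), similar in shape to MqttPub.
class TopicPublisher {
public:
    TopicPublisher(Transport& transport, std::string topic)
        : transport_(transport), topic_(std::move(topic)) {}

    // Called with each value that reaches this output.
    void push(const std::string& value) { transport_.send(topic_ + " " + value); }

private:
    Transport& transport_;  // not owned; must outlive the publisher
    std::string topic_;
};
```

Replacing MemoryTransport with a network-backed implementation would not require any change to TopicPublisher, which is the point of passing the transport in.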
App function
The app function primarily consists of dataflow statements, but may also include any necessary platform-specific initialization. Here's an example from the same Arduino application referenced above:
void app() {
ConnectWifi(SSID, PASS);
sensor >> AverageOver<int>(1000) >> Format<int>("{\"sensor\":$1}") >> broker;
}
The ConnectWifi call is required to initialize the WiFi connection, and the sensor >> ... >> broker line is an example of a dataflow statement.
Dataflow statements
The core of the application is the dataflow statements in the app function. They declare how data should be filtered, transformed, aggregated and combined to produce output streams from the input streams.
Basic linear dataflow
Many applications consist of one or more simple statements that take an input, transform or aggregate the data and send it to some output. Let's start by dissecting the simple dataflow statement from the example above:
sensor >> AverageOver<int>(1000) >> Format<int>("{\"sensor\":$1}") >> broker;
Its meaning is easy to grasp informally: the input stream from sensor flows through an average operator and a format operator, and is finally sent to an MQTT broker through the broker output.
Together, the operators and the >> combinator are used to manipulate input streams to produce output streams. Many operators are available, including higher-order operators like Filter and Map. See the Operators documentation for the full list.
C++ is strongly typed, and the operators are templated to accept the expected input and/or output types. For example, the AverageOver operator produces an output stream of the same type as its input, so it has a single type parameter that specifies both the input and output types. In this case, sensor produces a stream of type int, so that is given as the type parameter to the AverageOver operator.
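To make the type-parameter discussion concrete, here is a minimal C++17 sketch of an operator shaped like AverageOver<T>(period): one type parameter, with output of the same type as the input. The class name TimeAverage is hypothetical, and the timing rule is an assumption: a window is closed by the first sample whose timestamp reaches the end of the window, rather than by a timer as the real operator may use. This sketch averages integer samples only.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <type_traits>

// Hypothetical time-windowed average; T is both the input and the output type.
template <typename T>
class TimeAverage {
    static_assert(std::is_integral<T>::value, "this sketch averages integer samples only");

public:
    explicit TimeAverage(std::uint32_t periodMs) : period_(periodMs) { assert(periodMs > 0); }

    // Feed one timestamped sample; returns an average when a window closes.
    std::optional<T> push(std::uint32_t nowMs, T value) {
        std::optional<T> out;
        if (count_ == 0) {
            start_ = nowMs;  // the first sample opens the first window
        } else if (nowMs - start_ >= period_) {
            out = static_cast<T>(sum_ / count_);  // close the window and report its average
            sum_ = 0;
            count_ = 0;
            start_ = nowMs;  // this sample opens the next window
        }
        sum_ += value;
        ++count_;
        return out;
    }

private:
    std::uint32_t period_;
    std::uint32_t start_ = 0;
    long long sum_ = 0;    // wide accumulator to reduce overflow risk for int inputs
    long long count_ = 0;
};
```

With a 1000 ms period and samples every 250 ms, the sample at 1000 ms closes the first window and returns the average of the four earlier samples.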
The Format operator accepts input of any type and has a fixed output type, xstring, so it has a single type parameter that specifies the input type. The AverageOver<int> operator produces values of type int, so that is given as the input type of the Format operator. The Operators documentation indicates which template parameters each operator requires.
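A rough idea of what Format does with its template string can be shown with an eager substitution helper. formatValue is a hypothetical name; unlike the real operator, which produces a lazy xstring, this sketch builds a std::string immediately, and it only handles the $1 placeholder used in the example.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical eager stand-in for Format<T>: replaces every "$1" in the
// template with the textual form of the value.
template <typename T>
std::string formatValue(const std::string& tmpl, const T& value) {
    std::ostringstream text;
    text << value;
    const std::string replacement = text.str();

    std::string out;
    std::size_t pos = 0;
    while (true) {
        std::size_t hit = tmpl.find("$1", pos);
        if (hit == std::string::npos) {
            out.append(tmpl, pos, std::string::npos);  // copy the tail
            break;
        }
        out.append(tmpl, pos, hit - pos);  // copy text before the placeholder
        out += replacement;
        pos = hit + 2;  // skip past "$1"
    }
    return out;
}
```

Applied to the template from the example and the value 42, this yields {"sensor":42}.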
The >> combinator simply connects the output stream of its left operand to the input stream of its right operand. Other combinators are described below.
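The connecting role of >> comes from C++ operator overloading, which is also what lets RxFusion live inside ordinary C++ as an embedded DSL. The sketch below is a hypothetical, much-simplified model, not RxFusion's implementation: Op and Collect are made-up types, and each operator maps exactly one input to one output, whereas real operators such as AverageOver emit fewer values than they receive.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical operator stage: maps one int to another.
struct Op {
    std::function<int(int)> fn;
};

// Hypothetical output: collects whatever reaches it.
struct Collect {
    std::vector<int>* into;
};

// Operator >> operator: the left operator's output feeds the right operator's input.
Op operator>>(Op left, Op right) {
    return Op{[left, right](int v) { return right.fn(left.fn(v)); }};
}

// Operator >> output: returns an entry point that pushes values through to the output.
std::function<void(int)> operator>>(Op left, Collect out) {
    return [left, out](int v) { out.into->push_back(left.fn(v)); };
}
```

Because >> is left-associative, a >> b >> out groups as (a >> b) >> out: the two operators are composed first, and the result is then attached to the output.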
Forking streams
Sometimes you may want to fork a stream into two different operators. This is easily done by creating a temporary variable to represent the stream to fork and then using it in two dataflow statements. Here's an example, assuming sensor is some input and console and post are outputs:
auto data = sensor >> Poll<int>(250);
data >> console;
data >> AverageOver<int>(1000) >> post;
This represents a dataflow graph like:
                 /-> console
sensor -> Poll ->
                 \-> AverageOver -> post
The application polls the sensor every 250 ms and sends the raw value to the console, as well as to an operator that outputs an average every 1000 ms to a web server via the post output.
The sensor >> Poll<int>(250) expression represents a sub-graph and has an ugly templated type, but fortunately we can avoid dealing with it by using the C++ auto keyword.
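Conceptually, a forked stream is a node with more than one subscriber. The hypothetical Fork class below models this in C++: each value pushed into it is delivered to every attached subscriber, in attachment order. It is a model of the idea only; RxFusion's delivery order between branches is not specified here.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stream node with several downstream subscribers.
template <typename T>
class Fork {
public:
    // Attach one more downstream branch.
    void subscribe(std::function<void(const T&)> next) { subscribers_.push_back(std::move(next)); }

    // Deliver a value to every branch.
    void push(const T& value) {
        for (auto& next : subscribers_) next(value);
    }

private:
    std::vector<std::function<void(const T&)>> subscribers_;
};
```

One branch can forward every raw value while another aggregates the same values, mirroring data >> console and data >> AverageOver<int>(1000) >> post.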
Merging streams
If we can fork streams then we should of course be able to merge them as well. RxFusion has three different combinators for merging streams:
& (zip vector) collects values from the input streams until it has one from each, then outputs a vector of those values. Vectors hold a fixed number of elements of the same data type, so the input streams must have the same type.
^ (zip tuple) collects values from the input streams until it has one from each, then outputs a tuple of those values. Tuples can hold elements of different types, so the input streams do not need to have the same type.
+ (union) simply outputs any value received from any of its input streams on a single output stream.
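The zip behaviour can be sketched for two inputs. Zip2 is a hypothetical name, and the buffering policy is an assumption: this version queues values from each side and emits a pair whenever both queues are non-empty. It also illustrates why the inputs should have the same sampling rate, since a faster input makes its queue grow without bound. The real combinator may buffer differently.

```cpp
#include <array>
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical two-input zip producing a fixed-size array, like & producing a Vec.
template <typename T>
class Zip2 {
public:
    using Pair = std::array<T, 2>;

    explicit Zip2(std::function<void(const Pair&)> out) : out_(std::move(out)) {}

    void pushLeft(const T& v) { left_.push_back(v); drain(); }
    void pushRight(const T& v) { right_.push_back(v); drain(); }

private:
    // Emit pairs while both sides have a pending value.
    void drain() {
        while (!left_.empty() && !right_.empty()) {
            Pair pair{{left_.front(), right_.front()}};
            left_.pop_front();
            right_.pop_front();
            out_(pair);
        }
    }

    std::deque<T> left_;
    std::deque<T> right_;
    std::function<void(const Pair&)> out_;
};
```

If the left input delivers 1 and 2 before the right input delivers 10, only the pair (1, 10) is emitted; 2 waits until the next right-hand value arrives.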
To use a zip combinator, your input streams should have the same sampling rate. Here's an example using &:
auto data = sensor >> Poll<int>(250);
((data >> Drop<int>(7)) & (data >> WinAverage<int,8>())) >> post;
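The parentheses around the & expression matter because, in C++, the bitwise & operator has lower precedence than >>, so a & b >> post groups as a & (b >> post). The hypothetical Expr type below records how an expression is grouped, which makes the difference visible.

```cpp
#include <cassert>
#include <string>

// Hypothetical expression node that records the grouping chosen by the compiler.
struct Expr {
    std::string text;
};

Expr operator>>(const Expr& a, const Expr& b) { return {"(" + a.text + " >> " + b.text + ")"}; }
Expr operator&(const Expr& a, const Expr& b) { return {"(" + a.text + " & " + b.text + ")"}; }
```

Without the parentheses, post would be grouped with the WinAverage branch instead of with the zipped stream.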
This example uses the WinAverage operator, which outputs a rolling average of the last 8 values. Unlike AverageOver, it produces one average per input value, so its stream can be combined with the raw stream using zip. However, we need to drop the first 7 values from the raw input stream because WinAverage does not start producing output until it has received its first 8 values.
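The alignment argument can be checked with small stand-ins for the two branches. RollingAverage and DropFirst are hypothetical C++17 classes that only mimic the described behaviour of WinAverage<int,8> and Drop<int>(7): the rolling average stays silent until it holds 8 values, and the drop discards the first 7 values, so from the 8th input onward both branches emit on exactly the same inputs.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <type_traits>

// Hypothetical rolling average over the last N values (integer samples only).
template <typename T, std::size_t N>
class RollingAverage {
    static_assert(std::is_integral<T>::value, "integer samples only in this sketch");
    static_assert(N > 0, "window must hold at least one value");

public:
    std::optional<T> push(T v) {
        window_.push_back(v);
        sum_ += v;
        if (window_.size() > N) {  // slide: forget the oldest value
            sum_ -= window_.front();
            window_.pop_front();
        }
        if (window_.size() < N) return std::nullopt;  // not enough values yet
        return static_cast<T>(sum_ / static_cast<long long>(N));
    }

private:
    std::deque<T> window_;
    long long sum_ = 0;
};

// Hypothetical drop: discards the first n values and forwards the rest.
template <typename T>
class DropFirst {
public:
    explicit DropFirst(std::size_t n) : remaining_(n) {}

    std::optional<T> push(T v) {
        if (remaining_ > 0) {
            --remaining_;
            return std::nullopt;
        }
        return v;
    }

private:
    std::size_t remaining_;
};
```

Feeding the values 1 to 10 into both stand-ins yields three outputs on each side, at the 8th, 9th and 10th inputs, so a zip pairs them one-to-one.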
A dataflow graph of this example looks like:
                 /-------> Drop \
sensor -> Poll ->                & -> post
                 \-> WinAverage /
Higher-order operators
There are a number of higher-order operators that take functions as arguments, such as Filter, Map and Scan. These are easier to use thanks to the lambda expressions (anonymous functions) introduced in C++11.
Here is a Map example building on the example in the previous section:
auto data = sensor >> Poll<int>(250);
auto both = (data >> Drop<int>(7)) & (data >> WinAverage<int,8>());
both >> Map<Vec<int,2>,int>([](Vec<int,2>& v)->int {
return v[0] - v[1];
}) >> post;
The Map operator takes input values of any type and produces output values of any type, so it has a template parameter for each. The & (zip vector) combinator produces a Vec<int,2>, so that is the input type; we are going to output the difference of its two elements, so the output type is int.
The argument is a function that takes an input value and computes an output value. In C++11, an anonymous function begins with a capture list (here the empty []), followed by the formal parameters, ->, the return type, and then the function body.
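A minimal model of a Map-style operator is a class that stores a std::function. MapOp and makeDifference are hypothetical names; the lambda has the same shape as the one in the example, with std::array<int, 2> standing in for Vec<int,2> and the parameter taken by const reference.

```cpp
#include <array>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical Map<In, Out>: applies a function to every input value.
template <typename In, typename Out>
class MapOp {
public:
    explicit MapOp(std::function<Out(const In&)> fn) : fn_(std::move(fn)) {}

    Out push(const In& value) const { return fn_(value); }

private:
    std::function<Out(const In&)> fn_;
};

// Same shape as the Map in the example: difference of the two zipped values.
inline MapOp<std::array<int, 2>, int> makeDifference() {
    return MapOp<std::array<int, 2>, int>([](const std::array<int, 2>& v) -> int {
        return v[0] - v[1];
    });
}
```

A non-empty capture list lets the lambda use local variables; for example, [offset](const int& v) { return v + offset; } copies offset into the function object when the lambda is created.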
Value types
Besides the scalar types of C++, the values of a stream can be one of the following RxFusion types:
Vec<T,size> - a vector of size elements of type T. The elements of a vector are accessed using the [] index operator with a zero-based index.
Tuple2<T1,T2>, Tuple3<T1,T2,T3>, Tuple4<T1,T2,T3,T4> - tuples of 2 to 4 elements of types T1, T2, etc. The elements of a tuple are the named fields _1, _2, etc.
xstring - a string value. This is an opaque, lazy string type produced by the Format operator.
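To show how such values are read in code, here are hypothetical stand-ins with the same access patterns: a fixed-size vector indexed with [] from zero, and a two-element tuple with fields _1 and _2. VecSketch and Tuple2Sketch are illustrative only; RxFusion's own Vec and Tuple2 definitions may differ.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical fixed-size vector: Size elements of one type, zero-based [] access.
template <typename T, std::size_t Size>
struct VecSketch {
    T items[Size];
    T& operator[](std::size_t i) { return items[i]; }
    const T& operator[](std::size_t i) const { return items[i]; }
};

// Hypothetical two-element tuple: elements may differ in type, accessed by field name.
template <typename T1, typename T2>
struct Tuple2Sketch {
    T1 _1;
    T2 _2;
};
```

Inside a Map lambda over a stream of tuples, for example, the two elements would be read as t._1 and t._2.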
RxFusion Javascript
An RxFusion Javascript application is a Node.js application with three parts: a preamble, input/output declarations, and one or more dataflow statements.
Preamble
The preamble consists of the require statements the application needs. At a minimum, this includes the RxFusion npm module. There is a generic npm module, rxfusion, as well as platform-specific modules that include input/output types specific to a particular platform.
The UDOO Neo has a platform-specific RxFusion module, so on the Neo the preamble might be:
var rx = require('rxfusion-udoo');
The name of the variable you use in the require statement is important when it comes time to write dataflow statements. The documentation assumes it is rx, as shown here.
Input/output declarations
Inputs and outputs are declared as global variables of one of the RxFusion-provided types using the Javascript new operator. See the Classes documentation for a full list of the available input/output types.
Here's an example of the input/output declarations from a UDOO Neo application:
var mcu = new Mcu();
var browser = new SocketIO(io, 'hist');
Inputs and outputs are always created with the new operator. The Neo is a dual-core chip, and the Mcu input type is a Neo-specific input that receives data from the MCU core. The SocketIO output type sends data to a WebSocket using the socket.io module.
Dataflow statements
The core of the application is the dataflow statements. They declare how data should be filtered, transformed, aggregated and combined to produce output streams from the input streams.
rx() magic
RxFusion uses operator overloading to define combinators that are used to build dataflow statements. Unfortunately, Javascript does not support overloaded operators! Enter rx() ...
When you require the rxfusion module, it returns a function that you assign to a variable, typically rx. This special function returns a function that builds dataflow statements and enables overloading of the >>, &, ^, and + operators.
Every dataflow statement must be wrapped in rx()(<statement>).
The rx() part returns a statement builder function, which is applied to your dataflow statement to interpret the overloaded operators. The builder function can interpret only one kind of combinator at a time, so if you mix combinators in a single statement, you also need to wrap the sub-expressions, as in:
rx()(data & rx()(mcu >> Drop(7)) & rx()(mcu >> WinAverage(8)));
Here we are zipping together three streams with &. Two of them are sub-expressions built with >>, so those are wrapped in rx()(...).
Basic linear dataflow
Many applications consist of one or more simple statements that take an input, transform or aggregate the data and send it to some output. Let's start by dissecting a simple dataflow statement:
rx()(mcu >> AverageOf(5) >> Window(16,true) >> browser);
Apart from the rx() part described above, its meaning is easy to grasp informally: the input stream from the mcu input flows through an average operator and a window operator, and is finally sent to the browser through the browser output.
Forking streams
Sometimes you may want to fork a stream into two different operators. This can be easily done by creating a temporary variable to represent the stream to fork and then using it in two dataflow statements.
var averaged = rx()(mcu >> AverageOf(5));
rx()(averaged >> console);
rx()(averaged >> Window(16,true) >> browser);
Notice that each statement is wrapped in rx()(...). This example represents a dataflow graph like:
                   /-> console
mcu -> AverageOf ->
                   \-> Window -> browser
This application averages values coming from the mcu input and sends the averages directly to the console, but also to a sliding window operator, which sends the last 16 values to the browser.
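The sliding-window idea is language-neutral, so it is sketched here in C++ like the other examples. SlidingWindow is a hypothetical class that keeps the most recent N values and returns a snapshot after every new value. Two choices are assumptions rather than documented RxFusion behaviour: it emits partial windows while it is filling up, and it does not model Window's second argument (true).

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical sliding window over the most recent `size` values.
template <typename T>
class SlidingWindow {
public:
    explicit SlidingWindow(std::size_t size) : size_(size) { assert(size > 0); }

    // Add a value and return the current window contents, oldest first.
    std::vector<T> push(const T& v) {
        values_.push_back(v);
        if (values_.size() > size_) values_.pop_front();
        return std::vector<T>(values_.begin(), values_.end());
    }

private:
    std::size_t size_;
    std::deque<T> values_;
};
```

With a window of 3, pushing 1, 2, 3 and then 4 returns the snapshot 2, 3, 4.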
Merging streams
If we can fork streams then we should of course be able to merge them as well. RxFusion has three different combinators for merging streams:
& and ^ (zip) collect values from the input streams until they have one from each, then output an array of those values. There is no difference between & and ^ in the Javascript version.
+ (union) simply outputs any value received from any of its input streams on a single output stream.
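The union combinator is the simplest of the three, since it never waits for or pairs values. The hypothetical UnionMerge class below, written in C++ like the other sketches, forwards every value from either input to a single output in arrival order.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical union of two input streams into one output stream.
template <typename T>
class UnionMerge {
public:
    explicit UnionMerge(std::function<void(const T&)> out) : out_(std::move(out)) {}

    // Either input forwards immediately; nothing is buffered or paired.
    void pushFromA(const T& v) { out_(v); }
    void pushFromB(const T& v) { out_(v); }

private:
    std::function<void(const T&)> out_;
};
```

Unlike the zip sketch, the output sees 1, 100, 2 if the inputs deliver 1 (from A), 100 (from B) and 2 (from A) in that order.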
To use a zip combinator, your input streams should have the same sampling rate. Here's an example using &:
var raw = rx()(mcu >> Drop(7));
var avg = rx()(mcu >> WinAverage(8));
rx()(rx()(raw & avg) >> console);
Notice that sub-expressions also need to be wrapped in rx()(...).
This example uses the WinAverage operator, which outputs a rolling average of the last 8 values. Unlike AverageOver, it produces one average per input value, so its stream can be combined with the raw stream using zip. However, we need to drop the first 7 values from the raw input stream because WinAverage does not start producing output until it has received its first 8 values.
A dataflow graph of this example looks like:
      /-------> Drop \
mcu ->                & -> console
      \-> WinAverage /
Higher-order operators
There are a number of higher-order operators that take functions as arguments, such as Filter, Map and Scan. These are easy to use thanks to ES6 arrow functions (=>).
Here is a Map example building on the example in the previous section:
var both = rx()(rx()(mcu >> Drop(7)) & rx()(mcu >> WinAverage(8)));
rx()(both >> Map(v => v[0] - v[1]) >> browser);
The argument of Map is a function that takes input values and computes output values. In ES6 these are written succinctly with the so-called fat arrow, =>, which takes the parameter list on the left and the return expression on the right.