Operators
Many stream processing frameworks (e.g. Spark, Flink, …) use an operator-based
programming model where applications are expressed by chaining calls to
well-known operations such as map, join, reduce, and others. In Skitter,
however, applications are expressed by combining operations defined elsewhere.
Nevertheless, the well-known operations mentioned above can easily be implemented in Skitter. Skitter provides the following operations out of the box:
- Skitter.BIO.Filter
- Skitter.BIO.FlatMap
- Skitter.BIO.KeyedReduce
- Skitter.BIO.Map
- Skitter.BIO.MessageSource
- Skitter.BIO.Print
- Skitter.BIO.Send
- Skitter.BIO.StreamSource
- Skitter.BIO.TCPSource
These operations can be used in a workflow definition:
workflow do
  node(Skitter.BIO.StreamSource, args: ~w(Hello Skitter Hello World!))
  ~> node(Skitter.BIO.FlatMap, args: &String.split/1)
  ~> node(Skitter.BIO.KeyedReduce, args: {
    fn word -> word end,
    fn word, ctr -> {ctr + 1, {word, ctr + 1}} end,
    0
  })
  ~> node(Skitter.BIO.Print)
end
However, it quickly becomes tedious to write applications like this. Therefore,
the Skitter.BIO module defines several macros which provide shorthands for
using these built-in operations. The Skitter workflow DSL imports these
shorthands implicitly. Thus, the application above can be written as follows:
workflow do
  stream_source(~w(Hello Skitter Hello World!))
  ~> flat_map(&String.split/1)
  ~> keyed_reduce(fn word -> word end, fn word, ctr -> {ctr + 1, {word, ctr + 1}} end, 0)
  ~> print()
end
In Skitter, these macros are called operators.
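To make the data flow of the word-count workflow above more concrete, the following is a minimal plain-Elixir sketch of what the flat_map and keyed_reduce steps compute. It does not use Skitter at all: the KeyedReduceSketch module and its run/4 function are hypothetical, and the sketch assumes that the reduce function returns a {new_state, emitted_value} tuple and that a separate state is kept per key, starting from the initial value. This is how the arguments in the example read, but it is an assumption about Skitter.BIO.KeyedReduce, not a description of its implementation.

```elixir
defmodule KeyedReduceSketch do
  # Hypothetical helper, not part of Skitter. It only mimics the assumed
  # semantics of a keyed reduce on an in-memory list.
  def run(values, key_fn, reduce_fn, initial) do
    {emitted, _states} =
      Enum.map_reduce(values, %{}, fn value, states ->
        key = key_fn.(value)
        # Each key has its own state, which starts at `initial`.
        state = Map.get(states, key, initial)
        # Assumption: the reduce function returns {new_state, emitted_value}.
        {new_state, emit} = reduce_fn.(value, state)
        {emit, Map.put(states, key, new_state)}
      end)

    emitted
  end
end

# Stand-in for stream_source followed by flat_map.
words = Enum.flat_map(~w(Hello Skitter Hello World!), &String.split/1)

counts =
  KeyedReduceSketch.run(
    words,
    fn word -> word end,
    fn word, ctr -> {ctr + 1, {word, ctr + 1}} end,
    0
  )

IO.inspect(counts)
# → [{"Hello", 1}, {"Skitter", 1}, {"Hello", 2}, {"World!", 1}]
```

Under these assumptions, every incoming word produces one output tuple containing the word and its running count, which is what the final print step of the workflow would receive.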
Defining Custom Operators
It is possible to define your own operators which can then be imported into the workflow DSL.
For instance, Skitter.BIO.map/2 is defined as follows:
defmacro map(func, opts \\ []) do
  opts = [args: func] ++ opts
  quote(do: node(Skitter.BIO.Map, unquote(opts)))
end
This macro accepts two arguments: the function which will be mapped over the
incoming data and any other options (i.e. the as: and with: options
accepted by Skitter.DSL.Workflow.node/2). Based on these, it creates a call
to Skitter.DSL.Workflow.node/2, with the function argument merged into the
node's option list. This call to node must be quoted, as it will be inserted
into the workflow DSL.
Thus, an operator definition must do the following:
- It must be defined as a macro.
- It must return a quoted call to Skitter.DSL.Workflow.node/2.
- It must accept optional options, which are passed as the second argument to
  Skitter.DSL.Workflow.node/2.
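To see what a macro following these rules produces, its expansion can be inspected in plain Elixir without loading Skitter. The sketch below copies the shape of the map macro into a hypothetical OperatorSketch module and expands one call with Macro.expand_once/2. The ExpansionDemo module and the as: :upcase option value are made up for illustration. The resulting node call is only printed and never evaluated, so neither node nor Skitter.BIO.Map needs to exist for the sketch to run.

```elixir
defmodule OperatorSketch do
  # Hypothetical module with the same shape as the map macro shown earlier.
  defmacro map(func, opts \\ []) do
    # `func` and `opts` are quoted expressions here, so the option list is
    # assembled at compile time and spliced into the quoted node call.
    opts = [args: func] ++ opts
    quote(do: node(Skitter.BIO.Map, unquote(opts)))
  end
end

defmodule ExpansionDemo do
  # The macro module must be required before its macros can be expanded.
  require OperatorSketch

  def expansion do
    # Quote a call to the operator instead of invoking it, expand it a single
    # step, and turn the resulting syntax tree back into source code.
    ast = quote(do: OperatorSketch.map(&String.upcase/1, as: :upcase))

    ast
    |> Macro.expand_once(__ENV__)
    |> Macro.to_string()
  end
end

IO.puts(ExpansionDemo.expansion())
```

The printed expansion should be a call to node with Skitter.BIO.Map as its first argument and an option list in which the mapped function appears under args:, followed by the caller's own options. This is the quoted call that the workflow DSL receives in place of the operator.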
Operators defined this way can then be imported into the workflow DSL using
import/2.
To wrap up, let's show a complete example of defining and using a custom
operator. We will define an operator which uses Skitter.BIO.FlatMap and embed
it in the workflow shown above.
First, we define the operator in a module:
defmodule MyCustomOperators do
  defmacro my_flat_map(func, opts \\ []) do
    quote(do: node(Skitter.BIO.FlatMap, unquote([args: func] ++ opts)))
  end
end
Next, we define a workflow which imports this module, after which the operator can be used:
workflow do
  import MyCustomOperators
  stream_source(~w(Hello Skitter Hello World!))
  ~> my_flat_map(&String.split/1)
  ~> keyed_reduce(fn word -> word end, fn word, ctr -> {ctr + 1, {word, ctr + 1}} end, 0)
  ~> print()
end