Operations
Skitter operations represent a reusable data processing step. An operation is responsible for processing any data it receives, potentially emitting new data in the process. Operations receive inputs on their input ports and emit output on their output ports. An operation is always combined with a distribution strategy, which determines how the operation is distributed.
Operations are defined through the use of Skitter.DSL.Operation.defoperation/3. As a first example, consider the following operation, which adds 1 to each data record it receives.
defoperation Increment, in: value, out: incremented, strategy: Skitter.BIS.ImmutableLocal do
  # Called for every data record that arrives on the `value` port.
  defcb react(value) do
    value + 1 ~> incremented
  end
end
This definition consists of meta-information about the operation and a callback definition. The meta-information specifies that the operation accepts values from a single input stream, value, that it produces a single output stream, incremented, and that its default strategy is Skitter.BIS.ImmutableLocal.
The Increment operation defines a single callback, react, which is called when a new value arrives on the value stream. The callback is called with the received value, adds 1 to it, and emits the result to the incremented output stream through the use of the ~> operator.
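To make the behaviour of react concrete without a running Skitter runtime, the following plain-Elixir sketch models the callback as a pure function. Instead of emitting with ~>, it returns what would be emitted as a keyword list of port/value pairs. The module name IncrementSketch and this return shape are hypothetical; this is not how Skitter implements callbacks.

```elixir
defmodule IncrementSketch do
  # Hypothetical model of Increment's react callback: rather than emitting
  # with ~>, it returns the emitted data as [out_port: value] pairs.
  def react(value), do: [incremented: value + 1]
end

# Feeding three records through the model, one at a time.
Enum.flat_map([1, 2, 3], &IncrementSketch.react/1)
```

Each incoming record produces exactly one value on the incremented port, which matches the stateless nature of the operation.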
(Default) strategies
At runtime, an operation is always paired with a distribution strategy, which determines how it behaves. This pairing can happen in two places:
- An operation can specify a default distribution strategy which is used when the operation is not paired with another distribution strategy in a workflow.
- When an operation is embedded in a workflow as a node, a strategy can be specified through the use of with:. This strategy always takes precedence over a default distribution strategy.
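The precedence rule above can be summarised in a few lines. The sketch below assumes, purely for illustration, that a node's options are a keyword list in which :with holds the strategy; the module and function names are hypothetical and do not correspond to Skitter's workflow API. Raising when neither strategy is present is a choice of this sketch, reflecting that an operation must always be paired with a strategy at runtime.

```elixir
defmodule StrategyPrecedenceSketch do
  # Hypothetical helper: returns the strategy a node would run with.
  # A strategy given through `with:` wins; otherwise the operation's
  # default strategy is used. With neither, there is nothing to run with.
  def strategy_for(node_opts, default_strategy) do
    case Keyword.get(node_opts, :with, default_strategy) do
      nil -> raise ArgumentError, "operation has no distribution strategy"
      strategy -> strategy
    end
  end
end
```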
A distribution strategy specifies which callbacks an operation should implement. For instance, Skitter.BIS.ImmutableLocal specifies that an operation must define a react callback and that this callback will be called for every incoming data element. Because an operation's callbacks are written against the interface of a particular strategy, it is recommended to always specify a default distribution strategy when defining an operation.
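As an analogy only, the relationship between a strategy and the callbacks it requires resembles an Elixir behaviour: the behaviour lists the required callbacks and a module implements them. The sketch below uses Elixir's built-in @callback and @behaviour attributes; the module names are hypothetical, and this does not claim that Skitter strategies are implemented as Elixir behaviours.

```elixir
defmodule ReactContractSketch do
  # Analogy: the "interface" a strategy like ImmutableLocal expects,
  # expressed as a single required callback.
  @callback react(value :: term()) :: term()
end

defmodule IncrementImpl do
  @behaviour ReactContractSketch

  # The compiler checks this against the behaviour's callback list.
  @impl true
  def react(value), do: value + 1
end
```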
Selecting a strategy
The appropriate strategy to use for an operation is determined by the properties of the operation. For instance, if an operation is entirely stateless, Skitter.BIS.ImmutableLocal is an appropriate choice. If an operation needs to manage some state which is partitioned over several keys, Skitter.BIS.KeyedState may be the right choice. If none of the built-in strategies of Skitter are appropriate for the operation you are writing, it is possible to define your own, as we discuss in the next section.
Steps to write an operation
To define an operation, follow these steps:
- Figure out the properties of the operation.
- Based on these properties, select an appropriate distribution strategy.
- Read the documentation of the distribution strategy and determine which callbacks your operation needs to implement.
- If you need to write your own distribution strategy, determine the interface between the strategy and operation.
- Write the operation definition by defining the appropriate callbacks in Skitter.DSL.Operation.defoperation/3.
Dealing with state
Often, operations need to manage some form of state. As an example, let's write a Count operation which counts the number of times it sees each value:
defoperation Count, in: value, out: seen, strategy: Skitter.BIS.KeyedState do
  initial_state 0

  # Every distinct value gets its own counter.
  defcb key(value), do: value

  defcb react(value) do
    state() <~ state() + 1
    {value, state()} ~> seen
  end
end
The definition of this operation is similar to the definition of Increment. However, since this operation needs to manage state, it uses the Skitter.BIS.KeyedState strategy. This strategy separates the incoming data elements based on a key and maintains a separate state for each key. When a new data element arrives, the react callback is called with the received value, and the state associated with that element's key is made available as the callback's state. Inside react, this state can be read through state() and updated through state() <~ followed by the new value.
The operation also defines its initial state by defining a callback named initial_state, which accepts no arguments. As a syntactic convenience, writing initial_state followed by a value, as in initial_state 0 in Count, defines this callback for you.
Finally, the Skitter.BIS.KeyedState strategy requires that operations define a key callback, which obtains a key for an incoming data record. In the case of Count, the incoming data record itself is used as the key.
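The keyed-state behaviour described above can be simulated in plain Elixir: keep a map from key to state, look up the state for each incoming value's key (falling back to the initial state), update it, and record what react would emit. CountSketch and run/1 are hypothetical names; the real strategy distributes these per-key states across workers, which this single-process sketch does not attempt.

```elixir
defmodule CountSketch do
  # Hypothetical single-process simulation of Count under a keyed-state
  # strategy: one counter per key, where the key is the value itself.
  @initial_state 0

  def key(value), do: value

  # Processes values in arrival order; returns every emitted {value, count}.
  def run(values) do
    {emitted, _states} =
      Enum.map_reduce(values, %{}, fn value, states ->
        k = key(value)
        # Equivalent of `state() <~ state() + 1` for this key.
        new_state = Map.get(states, k, @initial_state) + 1
        {{value, new_state}, Map.put(states, k, new_state)}
      end)

    emitted
  end
end

CountSketch.run([:a, :b, :a])
```

Because :a and :b map to different keys, their counters evolve independently: the second :a sees a count of 2 while :b is still at 1.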
Complex state
Sometimes, operations maintain a complex state, defined as a struct with several fields. The operation DSL defines state_struct/1, <~ and ~f() to facilitate this:
defoperation Average, in: value, out: average, strategy: Skitter.BIS.KeyedState do
  state_struct total: 0, count: 0

  defcb react(val) do
    # ~f{field} reads a field of the state struct; field <~ ... updates it.
    count <~ ~f{count} + 1
    total <~ ~f{total} + val
    ~f{total} / ~f{count} ~> average
  end
end
See Skitter.DSL.Operation.state_struct/1 for more information.
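The field updates in Average correspond to ordinary struct updates in Elixir. The following sketch simulates the state of a single key as a plain struct; AverageSketch is a hypothetical name, and the sketch ignores key partitioning entirely. Note that Elixir's / always returns a float, so the emitted averages are floats even for integer input.

```elixir
defmodule AverageSketch do
  # Hypothetical model of Average's state for one key: a two-field struct.
  defstruct total: 0, count: 0

  # Mirrors react: bump count, add to total, then emit total / count.
  def react(val, %__MODULE__{} = state) do
    state = %{state | count: state.count + 1, total: state.total + val}
    {state.total / state.count, state}
  end

  # Runs a sequence of values through react, collecting the emitted averages.
  def run(values) do
    {averages, _state} = Enum.map_reduce(values, %__MODULE__{}, &react/2)
    averages
  end
end

AverageSketch.run([2, 4, 6])
```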
Other features
Emitting values
The ~> operator is used to publish data to an out port inside a callback. The ~>> operator can be used to emit several values at once. For instance, [1, 2, 3] ~>> port will emit 1, 2 and 3 to port, one after the other.
As a special case, a (potentially infinite) Elixir stream can be passed to this
operator. Every value in this stream will then be emitted to the output stream.
If the stream is infinite, the operation will not respond to new data elements,
as it is busy emitting the values in the stream.
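The following plain-Elixir sketch models ~>> as walking an enumerable in order and producing one emission per element. EmitManySketch and emit_all/2 are hypothetical and only illustrate the ordering semantics. Because the model consumes the enumerable eagerly, passing an infinite stream would never return, which mirrors why an operation emitting an infinite stream stops responding to new data.

```elixir
defmodule EmitManySketch do
  # Hypothetical model of `enumerable ~>> port`: each element becomes
  # one {port, value} emission, in the order the enumerable yields them.
  def emit_all(enumerable, port) do
    Enum.map(enumerable, fn value -> {port, value} end)
  end
end

EmitManySketch.emit_all([1, 2, 3], :port)

# A bounded prefix of an infinite stream; without Stream.take/2 this would never finish.
1 |> Stream.iterate(&(&1 + 1)) |> Stream.take(2) |> EmitManySketch.emit_all(:out)
```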
Source operations
Sources are defined as operations in Skitter. To define a source, select a distribution strategy which defines a source (such as Skitter.BIS.StreamSource) and implement the callbacks defined in its interface.
Configuration data
Often, an operation is created with an immutable state, which remains unmodified throughout the lifetime of the operation. In Skitter, this immutable state may be stored in the so-called configuration data, which can be accessed inside a callback through config(). Typically, strategies enable operations to implement a conf callback, which defines the value of this configuration data.
As an example, the Skitter.BIO.Map operation stores the function to execute inside its configuration data:
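defoperation Skitter.BIO.Map, in: _, out: _, strategy: Skitter.BIS.ImmutableLocal do
  # The function passed when the operation is created becomes its configuration.
  defcb conf(func), do: func
  # Apply the stored function to each incoming value and emit the result.
  defcb react(arg), do: config().(arg) ~> _
end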
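In plain Elixir, the split between conf and react amounts to computing a value once and passing it alongside every incoming record. The sketch below makes that explicit by giving react the configuration as an extra argument; MapSketch and this two-argument react are hypothetical and are not the Skitter callback signatures.

```elixir
defmodule MapSketch do
  # Hypothetical model: conf/1 produces the configuration once...
  def conf(func) when is_function(func, 1), do: func

  # ...and react/2 receives it with each incoming value.
  def react(config, arg), do: config.(arg)
end

config = MapSketch.conf(&String.upcase/1)
Enum.map(["a", "b"], fn arg -> MapSketch.react(config, arg) end)
```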
Accessing token information
A value received on an operation's input stream is wrapped inside a so-called token, which contains additional information about the received data, such as the port it was received on. If the strategy does not modify this token, this information can be accessed inside a callback. For instance, port_of/1 can be used to obtain the port a data record was received on.
Operations can also add information to such a token through the use of extend_meta/3, inherit_meta/2, or the functions defined in Skitter.Token. Afterwards, this data can be obtained through the use of meta_of/1.
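To illustrate the idea of a token carrying a value plus metadata, the following sketch defines a small hypothetical struct with a value, the port it arrived on, and a metadata map. TokenSketch, new/2, received_on/1, put_meta/3 and get_meta/1 are invented for this example and only play roles similar to port_of/1, extend_meta/3 and meta_of/1; the real Skitter.Token structure and function signatures may differ, so consult its documentation.

```elixir
defmodule TokenSketch do
  # Hypothetical token shape, for illustration only.
  defstruct [:value, :port, meta: %{}]

  def new(value, port), do: %__MODULE__{value: value, port: port}

  # Role similar to port_of/1: which port the data arrived on.
  def received_on(%__MODULE__{port: port}), do: port

  # Role similar to adding metadata: returns a new token with one extra entry.
  def put_meta(%__MODULE__{meta: meta} = token, key, value) do
    %{token | meta: Map.put(meta, key, value)}
  end

  # Role similar to meta_of/1: all metadata attached so far.
  def get_meta(%__MODULE__{meta: meta}), do: meta
end
```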