Skitter.Operation behaviour (Skitter v0.7.1)
Operation type definition and utilities.
An operation is a reusable data processing step that can be embedded in a workflow. It is defined as the combination of a set of callbacks, which implement the logic of the operation, and some metadata, which define how the operation is embedded in a workflow and how it is distributed over the cluster at runtime.
A Skitter operation is defined as an Elixir module which implements the Skitter.Operation behaviour. This behaviour defines various Elixir callbacks which are used to track operation information such as the defined callbacks. Instead of implementing this behaviour by hand, it is recommended to use Skitter.DSL.Operation.defoperation/3, which automatically generates the appropriate callbacks.
This module defines the operation type and behaviour along with utilities to handle operations.
Callbacks
An operation defines various callbacks: functions which implement the data processing logic of an operation. These callbacks need to have the ability to modify state and emit data when they are called. Callbacks are implemented as elixir functions with a few properties:
- Callbacks accept state/0 and config/0 as their first and second arguments.
- All other arguments provided to the callback must be wrapped in a Skitter.Token.t/0.
- Callbacks return a result/0 struct, which wraps the result of the callback call along with the updated state and emitted data.
Besides this, callbacks track additional information about whether they access or modify state and which data they emit. This information is stored inside the behaviour callbacks defined in this module.
Examples
Since operations need to be defined in a module, the example code shown in this module's documentation assumes the following module is defined:
defmodule OperationModule do
  @behaviour Skitter.Operation
  alias Skitter.Operation.Callback.{Info, Result}

  def _sk_operation_info(:strategy), do: Strategy
  def _sk_operation_info(:in_ports), do: [:input]
  def _sk_operation_info(:out_ports), do: [:output]

  def _sk_callbacks, do: MapSet.new(example: 1)

  def _sk_callback_info(:example, 1) do
    %Info{read?: true, write?: false, emit?: true}
  end

  def example(state, config, arg) do
    result = state * config
    %Result{state: state, emit: [arg: arg], result: result}
  end
end
Summary
Types
args/0: Arguments passed to a callback when it is called.
config/0: Configuration passed to the callback when it is called.
emit/0: Output emitted by a callback.
info/0: Additional callback information. Can be retrieved with callback_info/3.
port_index/0: Input/output interface of Skitter operations.
port_name/0: Input/output interface of Skitter operations.
result/0: Values returned by a callback when it is called.
state/0: State passed to the callback when it is called.
t/0: An operation is defined as a module.
Callbacks
_sk_callback_info/2: Return the callback information of callback name, arity.
_sk_callbacks/0: Return the names and arities of all the callbacks defined in this module.
_sk_operation_info/1: Returns the meta-information of the operation.
Functions
arity/1: Obtain the arity of operation.
call/5: Call callback callback_name with state, config and arguments.
call_if_exists/5: Call callback_name defined by operation if it exists.
callback_exists?/3: Check if operation defines a callback with name and arity.
callback_info/3: Obtain the callback information for callback name, arity in operation.
callbacks/1: Obtain the set of all callbacks defined in operation.
in_port_to_index/2: Get the index of an in port.
in_ports/1: Obtain the list of in ports of operation.
index_to_in_port/2: Convert an index of an in port to a port name.
index_to_out_port/2: Convert an index of an out port to a port name.
initial_state/2: Obtain the initial state for operation.
operation?/1: Check if a given value refers to an operation module.
out_port_to_index/2: Get the index of an out port.
out_ports/1: Obtain the list of out ports of operation.
sink?/1: Check if an operation is a sink.
source?/1: Check if an operation is a source.
strategy/1: Obtain the default strategy of operation.
Types
@type args() :: [Skitter.Token.t() | any()]
Arguments passed to a callback when it is called.
The arguments are provided as tokens or plain values wrapped in a list.
@type config() :: any()
Configuration passed to the callback when it is called.
The configuration represents the immutable state of an operation. It is explicitly separated from the mutable state/0 to enable strategies to explicitly differentiate between handling mutable and immutable data.
@type emit() :: [{port_name(), Enumerable.t()}]
Output emitted by a callback.
Emitted data is returned as a keyword list where the output for each out port is specified. When no data is emitted on a port, the port should be omitted from the list. The data emitted by a callback for a port should be wrapped in an Enumerable.t/0. Each element in this enumerable will be sent to downstream nodes separately.
The data emitted for a port (i.e. the data inside the enumerable) may either be a "plain" Elixir value or a Skitter token. The Skitter runtime will automatically wrap plain values in a Skitter token when they are sent to downstream operations.
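As a rough illustration of the emit/0 shape (not of how the Skitter runtime actually delivers data), the sketch below flattens an emit keyword list into the individual {port, value} pairs that would each be sent downstream. EmitSketch and fan_out/1 are hypothetical names; only the Elixir standard library is used.

```elixir
defmodule EmitSketch do
  # Hypothetical helper: expand an emit keyword list into one {port, value}
  # pair per element, mirroring the rule that every element of a port's
  # enumerable is sent downstream separately.
  def fan_out(emit) do
    for {port, data} <- emit, value <- data, do: {port, value}
  end
end

# Ports that emit nothing are simply left out of the keyword list.
# Any Enumerable works as the per-port container (lists, MapSets, ranges).
emit = [output: [1, 2], errors: MapSet.new([:timeout])]
EmitSketch.fan_out(emit)
# => [output: 1, output: 2, errors: :timeout]
```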
@type info() :: %Skitter.Operation.Callback.Info{ emit?: boolean(), read?: boolean(), write?: boolean() }
Additional callback information. Can be retrieved with callback_info/3.
The following information is stored:
- :read?: Boolean which indicates if the callback reads the operation state.
- :write?: Boolean which indicates if the callback updates the operation state.
- :emit?: Boolean which indicates if the callback emits data.
@type port_index() :: non_neg_integer()
Input/output interface of Skitter operations.
The ports of an operation determine its external interface. A port can be referred to by its name, which is stored as an atom. Inside strategies, it may be useful to use the index of a port to remain agnostic to its name. in_port_to_index/2 and out_port_to_index/2 can be used to convert a port to its index.
@type port_name() :: atom()
Input/output interface of Skitter operations.
The ports of an operation determine its external interface. A port can be referred to by its name, which is stored as an atom. Inside strategies, it may be useful to use the index of a port to remain agnostic to its name. in_port_to_index/2 and out_port_to_index/2 can be used to convert a port to its index.
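The conversion between port names and indices can be sketched with plain lists, assuming a port's index is its position in the list returned by _sk_operation_info(:in_ports) or _sk_operation_info(:out_ports), which is consistent with the in_port_to_index/2 and index_to_in_port/2 examples further down. PortIndexSketch and its functions are hypothetical, not part of Skitter.

```elixir
defmodule PortIndexSketch do
  # Position of `name` in `ports`, or nil when the port is not present.
  def port_to_index(ports, name), do: Enum.find_index(ports, &(&1 == name))

  # Name of the port at position `index`, or nil when the index is out of range.
  def index_to_port(ports, index) when is_integer(index) and index >= 0 do
    Enum.at(ports, index)
  end
end

ports = [:input, :control]
PortIndexSketch.port_to_index(ports, :control) # => 1
PortIndexSketch.index_to_port(ports, 0)        # => :input
PortIndexSketch.index_to_port(ports, 5)        # => nil
```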
Values returned by a callback when it is called.
The following information is stored:
- :result: The actual result of the callback, i.e. the final value returned in its body.
- :state: The (possibly modified) state after calling the callback.
- :emit: The output emitted by the callback.
@type state() :: any()
State passed to the callback when it is called.
The state represents the mutable state of an operation. It is explicitly separated from the immutable config/0 to enable strategies to explicitly differentiate between handling mutable and immutable data.
@type t() :: module()
An operation is defined as a module.
This module should implement the Skitter.Operation behaviour.
Callbacks
Return the callback information of callback name, arity.
Return the names and arities of all the callbacks defined in this module.
@callback _sk_operation_info(:in_ports) :: [port_name()]
@callback _sk_operation_info(:out_ports) :: [port_name()]
@callback _sk_operation_info(:strategy) :: Skitter.Strategy.t() | nil
Returns the meta-information of the operation.
The following information is stored:
- :in_ports: A list of port names which represents the in ports through which the operation receives incoming data.
- :out_ports: A list of port names which represents the out ports the operation can use to emit data.
- :strategy: The Skitter.Strategy of the operation. nil may be provided instead, in which case a strategy must be provided when the operation is embedded in a workflow.
Functions
Obtain the arity of operation.
The arity is defined as the number of in ports the operation defines.
Examples
iex> arity(OperationModule)
1
Call callback callback_name with state, config and arguments.
The provided arguments are automatically wrapped in a Skitter.Token.t/0 using Skitter.Token.wrap/1.
Examples
iex> call(OperationModule, :example, 10, 2, [:foo])
%Skitter.Operation.Callback.Result{state: 10, result: 20, emit: [arg: %Token{value: :foo}]}
iex> call(OperationModule, :example, 10, 2, [%Token{value: :foo, meta: %{bar: :baz}}])
%Skitter.Operation.Callback.Result{state: 10, result: 20, emit: [arg: %Token{value: :foo, meta: %{bar: :baz}}]}
Call callback_name defined by operation if it exists.
If operation defines a callback {name, length(args)}, it is called (using call/5) with state, config and args. Otherwise, a result with an empty emit list, nil as result and nil as state is returned.
Examples
iex> call_if_exists(OperationModule, :example, 10, 2, [:foo])
%Skitter.Operation.Callback.Result{state: 10, result: 20, emit: [arg: %Token{value: :foo}]}
iex> call_if_exists(OperationModule, :example, 10, 2, [:foo, :bar])
%Skitter.Operation.Callback.Result{state: nil, result: nil, emit: []}
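The fallback behaviour of call_if_exists/5 can be sketched without Skitter itself. Everything below is hypothetical: Result is a stand-in struct rather than Skitter.Operation.Callback.Result, Op mimics OperationModule, and, unlike call/5, the sketch does not wrap arguments in tokens.

```elixir
defmodule CallIfExistsSketch do
  # Hypothetical stand-in for Skitter.Operation.Callback.Result.
  defmodule Result do
    defstruct state: nil, result: nil, emit: []
  end

  # Hypothetical operation listing its callbacks as {name, arity} pairs, where
  # arity counts only the arguments after state and config.
  defmodule Op do
    def _sk_callbacks, do: MapSet.new(example: 1)

    def example(state, config, arg) do
      %CallIfExistsSketch.Result{state: state, result: state * config, emit: [arg: arg]}
    end
  end

  # Call `name` only when {name, length(args)} is a known callback; otherwise
  # return an empty result (nil state, nil result, no emitted data).
  def call_if_exists(operation, name, state, config, args) do
    if MapSet.member?(operation._sk_callbacks(), {name, length(args)}) do
      apply(operation, name, [state, config | args])
    else
      %Result{}
    end
  end
end

alias CallIfExistsSketch, as: Sketch
Sketch.call_if_exists(Sketch.Op, :example, 10, 2, [:foo]).result      # => 20
Sketch.call_if_exists(Sketch.Op, :example, 10, 2, [:foo, :bar]).emit  # => []
```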
Check if operation defines a callback with name and arity.
Examples
iex> callback_exists?(OperationModule, :example, 1)
true
iex> callback_exists?(OperationModule, :example, 2)
false
Obtain the callback information for callback name, arity in operation.
Examples
iex> callback_info(OperationModule, :example, 1)
%Info{read?: true, write?: false, emit?: true}
Obtain the set of all callbacks defined in operation.
Examples
iex> callbacks(OperationModule)
MapSet.new([example: 1])
@spec in_port_to_index(t(), port_name()) :: port_index() | nil
Get the index of an in port.
Examples
iex> in_port_to_index(OperationModule, :input)
0
iex> in_port_to_index(OperationModule, :other)
nil
Obtain the list of in ports of operation.
Examples
iex> in_ports(OperationModule)
[:input]
@spec index_to_in_port(t(), port_index()) :: port_name() | nil
Convert an index of an in port to a port name.
Examples
iex> index_to_in_port(OperationModule, 0)
:input
iex> index_to_in_port(OperationModule, 1)
nil
@spec index_to_out_port(t(), port_index()) :: port_name() | nil
Convert an index of an out port to a port name.
Examples
iex> index_to_out_port(OperationModule, 0)
:output
iex> index_to_out_port(OperationModule, 1)
nil
Obtain the initial state for operation.
Some operations handle stateful logic. Often, these operations have the notion of an "initial state": the state the operation has before it has received any data records.
By convention, this initial state is defined as a callback named initial_state. If this callback exists, this function calls it with an empty argument list and the provided configuration (or nil, if no configuration is passed) and returns its result. If the initial_state callback is not defined, nil is returned.
Examples
iex> initial_state(OperationModule)
nil
If InitialStateModule is defined as follows:
defmodule InitialStateModule do
  @behaviour Skitter.Operation
  alias Skitter.Operation.Callback.{Info, Result}

  def _sk_operation_info(:strategy), do: Strategy
  def _sk_operation_info(:in_ports), do: []
  def _sk_operation_info(:out_ports), do: []

  def _sk_callbacks, do: MapSet.new(initial_state: 0)

  def _sk_callback_info(:initial_state, 0) do
    %Info{read?: false, write?: false, emit?: false}
  end

  def initial_state(_, :foo), do: %Result{result: :bar}
  def initial_state(_, _), do: %Result{result: 42}
end
iex> initial_state(InitialStateModule)
42
iex> initial_state(InitialStateModule, :foo)
:bar
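The initial_state convention can be approximated on top of the same kind of callback lookup. This is a hypothetical sketch: it assumes nil is passed as the state, and it uses plain maps with a :result key instead of Skitter's Result struct. InitialStateSketch, Counter and Stateless are made-up names.

```elixir
defmodule InitialStateSketch do
  # Hypothetical operation following the initial_state convention.
  defmodule Counter do
    def _sk_callbacks, do: MapSet.new(initial_state: 0)

    def initial_state(_state, nil), do: %{result: 0}
    def initial_state(_state, start), do: %{result: start}
  end

  # Hypothetical operation without an initial_state callback.
  defmodule Stateless do
    def _sk_callbacks, do: MapSet.new()
  end

  # Call initial_state with nil as state (an assumption), the given config and
  # no further arguments; return nil when the callback is not defined.
  def initial_state(operation, config \\ nil) do
    if MapSet.member?(operation._sk_callbacks(), {:initial_state, 0}) do
      operation.initial_state(nil, config).result
    else
      nil
    end
  end
end

InitialStateSketch.initial_state(InitialStateSketch.Counter)     # => 0
InitialStateSketch.initial_state(InitialStateSketch.Counter, 5)  # => 5
InitialStateSketch.initial_state(InitialStateSketch.Stateless)   # => nil
```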
Check if a given value refers to an operation module.
Examples
iex> operation?(5)
false
iex> operation?(String)
false
iex> operation?(OperationModule)
true
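One plausible way to implement such a check, assumed here rather than taken from Skitter's source, is to test whether the value is a loadable module exporting the _sk_operation_info/1 behaviour callback. OperationCheckSketch and TinyOperation are hypothetical.

```elixir
defmodule OperationCheckSketch do
  # Hypothetical check: a value counts as an operation if it is a loadable
  # module that exports _sk_operation_info/1.
  def operation?(value) when is_atom(value) do
    Code.ensure_loaded?(value) and function_exported?(value, :_sk_operation_info, 1)
  end

  # Non-atoms (numbers, strings, ...) can never name a module.
  def operation?(_value), do: false
end

# Minimal hypothetical operation module used to exercise the check.
defmodule TinyOperation do
  def _sk_operation_info(:in_ports), do: [:input]
  def _sk_operation_info(:out_ports), do: []
  def _sk_operation_info(:strategy), do: nil
end

OperationCheckSketch.operation?(TinyOperation) # => true
OperationCheckSketch.operation?(String)        # => false
OperationCheckSketch.operation?(5)             # => false
```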
@spec out_port_to_index(t(), port_name()) :: port_index() | nil
Get the index of an out port.
Examples
iex> out_port_to_index(OperationModule, :output)
0
iex> out_port_to_index(OperationModule, :other)
nil
Obtain the list of out ports of operation.
Examples
iex> out_ports(OperationModule)
[:output]
Check if an operation is a sink.
An operation is a sink if it does not have any out ports.
Examples
iex> sink?(OperationModule)
false
Check if an operation is a source.
An operation is a source if it does not have any in ports.
Examples
iex> source?(OperationModule)
false
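arity/1, source?/1 and sink?/1 can all be derived from the port lists returned by _sk_operation_info/1. The sketch below shows this under that assumption, using a made-up source operation Reader; ShapeSketch and its functions are hypothetical and operate directly on the behaviour callbacks.

```elixir
defmodule ShapeSketch do
  # Hypothetical source operation: no in ports, one out port.
  defmodule Reader do
    def _sk_operation_info(:in_ports), do: []
    def _sk_operation_info(:out_ports), do: [:line]
    def _sk_operation_info(:strategy), do: nil
  end

  # The arity is the number of in ports.
  def arity(op), do: length(op._sk_operation_info(:in_ports))

  # A source has no in ports; a sink has no out ports.
  def source?(op), do: op._sk_operation_info(:in_ports) == []
  def sink?(op), do: op._sk_operation_info(:out_ports) == []
end

ShapeSketch.arity(ShapeSketch.Reader)   # => 0
ShapeSketch.source?(ShapeSketch.Reader) # => true
ShapeSketch.sink?(ShapeSketch.Reader)   # => false
```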
@spec strategy(t()) :: Skitter.Strategy.t() | nil
Obtain the default strategy of operation.
Examples
iex> strategy(OperationModule)
Strategy