Skitter.Workflow (Skitter v0.7.1)
Workflow type definition and utilities.
A workflow defines a data processing pipeline. It is defined as a set of nodes connected through various links. A node contains a data processing operation and its distribution strategy. A workflow stores these nodes and links between them, along with additional meta-information about the workflow.
To enable the reuse of workflows, workflows may define in- and out ports. Such workflows may then be embedded inside another workflow. Note that a workflow is always flattened using flatten/1 before it is deployed.
This module defines the workflow type along with some utilities to work with this type. It is not recommended to define a workflow manually. Instead, the use of Skitter.DSL.Workflow.workflow/2 is preferred.
Summary
Types
args(): Operation initialization arguments
destination(): Link destination.
links(): Collection of outgoing links.
name(): Instance name
operation_node(): Operation embedded inside a workflow.
t(): Internal workflow representation.
workflow_node(): Workflow embedded inside a workflow.
Types
@type args() :: any()
Operation initialization arguments
This type stores the arguments passed to the operation in the workflow definition.
@type destination() :: {name(), Skitter.Operation.port_name()} | Skitter.Operation.port_name()
Link destination.
This type stores the destination of a link. A link can point to an in port of a node or to an out port of the workflow. In the first case, the name of the node and its in port are stored; in the second, only the name of the out port is stored.
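As an illustration, the two shapes of destination() can be told apart with pattern matching. The sketch below is a standalone, hypothetical helper (DestinationSketch.describe/1 is not part of Skitter); it only assumes that a destination is either a {name, port} tuple or a bare port atom, as described above.

```elixir
defmodule DestinationSketch do
  # Hypothetical helper, not part of Skitter: classify a destination() value.

  # {name, port}: the link targets an in port of the node called `name`.
  def describe({name, port}) when is_atom(name) and is_atom(port) do
    "in port #{inspect(port)} of node #{inspect(name)}"
  end

  # A bare port name: the link targets an out port of the enclosing workflow.
  def describe(port) when is_atom(port) do
    "out port #{inspect(port)} of the workflow"
  end
end

DestinationSketch.describe({:node3, :left})
# => "in port :left of node :node3"
DestinationSketch.describe(:bar)
# => "out port :bar of the workflow"
```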
@type links() :: [{Skitter.Operation.port_name(), [destination()]}]
Collection of outgoing links.
Links are stored as a keyword list. Each key in this list represents an out port, while the value of this key is a list which references the destinations of this out port.
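Because links() is an ordinary keyword list, the standard Keyword and comprehension tools apply to it. The hypothetical helper below (not part of Skitter) expands such a list into flat {out_port, destination} pairs; the example value mixes a node destination and a bare workflow out port.

```elixir
defmodule LinksSketch do
  # Hypothetical helper, not part of Skitter: expand a links() keyword list
  # into flat {out_port, destination} pairs, preserving the list order.
  def pairs(links) do
    for {out_port, destinations} <- links,
        destination <- destinations,
        do: {out_port, destination}
  end
end

links = [p: [{:node3, :left}, :bar], q: []]
LinksSketch.pairs(links)
# => [{:p, {:node3, :left}}, {:p, :bar}]
Keyword.get(links, :p)
# => [{:node3, :left}, :bar]
```

An out port without destinations (such as `q` here) simply contributes no pairs.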
@type name() :: atom()
Instance name
A name is used to refer to a node embedded inside a workflow.
@type operation_node() :: %Skitter.Workflow.Node.Operation{ args: args(), links: links(), operation: Skitter.Operation.t(), strategy: Skitter.Strategy.t() }
Operation embedded inside a workflow.
An operation in a workflow is stored along with its strategy, its initialization arguments (which are passed to Skitter.Strategy.Operation.deploy/2) and the outgoing links of each of its out ports.
Workflows can override the strategy of an operation; therefore, the strategy specified here may not be the same as the strategy returned by Skitter.Operation.strategy/1.
@type t() :: %Skitter.Workflow{ in: links(), nodes: %{required(name()) => operation_node() | workflow_node()}, out: [Skitter.Operation.port_name()] }
Internal workflow representation.
A workflow stores its nodes in a map, where each name refers to a single node, which is either a t:operation_node/0 or a t:workflow_node/0. Besides this, the in- and out ports of the workflow are stored. The outgoing links of the in ports of a workflow are stored along with the in ports.
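To make the representation concrete, the following sketch mirrors the fields of t() with a plain map (the real type is the %Skitter.Workflow{} struct) and derives the in ports, out ports and node names of a workflow. The module and its functions are hypothetical; they only rely on the layout described above: in ports are the keys of the `in` keyword list, while out ports are listed in `out`.

```elixir
defmodule PortsSketch do
  # Hypothetical helpers, not part of Skitter. `wf` is a plain map with the
  # same fields as %Skitter.Workflow{}: :in, :out and :nodes.

  # In ports are the keys of the `in` keyword list; the values are links.
  def in_ports(wf), do: wf |> Map.get(:in, []) |> Keyword.keys()

  # Out ports are stored as a plain list of port names.
  def out_ports(wf), do: Map.get(wf, :out, [])

  # Every node is stored under its name in the `nodes` map.
  def node_names(wf), do: wf |> Map.get(:nodes, %{}) |> Map.keys() |> Enum.sort()
end

inner = %{
  in: [foo: [node1: :p, node2: :p]],
  out: [:bar],
  nodes: %{node1: %{}, node2: %{}, node3: %{}}
}

PortsSketch.in_ports(inner)   # => [:foo]
PortsSketch.out_ports(inner)  # => [:bar]
PortsSketch.node_names(inner) # => [:node1, :node2, :node3]
```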
Workflow embedded inside a workflow.
A workflow nested inside a workflow is stored along with the outgoing links of its out ports.
Functions
Recursively inline any nested workflow of a workflow.
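This function inlines every workflow embedded in the provided workflow. As the example below shows, each node of an embedded workflow is renamed by prefixing it with the name of the embedding node (separated by #), and the links into and out of the embedded workflow are redirected to the inlined nodes.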
Examples
iex> defoperation Simple, in: p, out: p do
...> end
iex> defoperation Join, in: [left, right], out: p do
...> end
iex> inner = %Workflow{
...>   in: [foo: [node1: :p, node2: :p]],
...>   out: [:bar],
...>   nodes: %{
...>     node1: %Node.Operation{operation: Simple, links: [p: [node3: :left]]},
...>     node2: %Node.Operation{operation: Simple, links: [p: [node3: :right]]},
...>     node3: %Node.Operation{operation: Join, links: [p: [:bar]]}
...>   }
...> }
iex> parent = %Workflow{
...>   nodes: %{
...>     node_pre: %Node.Operation{operation: Simple, links: [p: [nested1: :foo, nested2: :foo]]},
...>     nested1: %Node.Workflow{workflow: inner, links: [bar: [node_post: :left]]},
...>     nested2: %Node.Workflow{workflow: inner, links: [bar: [node_post: :right]]},
...>     node_post: %Node.Operation{operation: Join}
...>   }
...> }
iex> flatten(parent)
%Workflow{
  nodes: %{
    node_pre: %Node.Operation{operation: Skitter.WorkflowTest.Simple, links: [p: ["nested1#node1": :p, "nested1#node2": :p, "nested2#node1": :p, "nested2#node2": :p]]},
    "nested1#node1": %Node.Operation{operation: Skitter.WorkflowTest.Simple, links: [p: ["nested1#node3": :left]]},
    "nested1#node2": %Node.Operation{operation: Skitter.WorkflowTest.Simple, links: [p: ["nested1#node3": :right]]},
    "nested1#node3": %Node.Operation{operation: Skitter.WorkflowTest.Join, links: [p: [node_post: :left]]},
    "nested2#node1": %Node.Operation{operation: Skitter.WorkflowTest.Simple, links: [p: ["nested2#node3": :left]]},
    "nested2#node2": %Node.Operation{operation: Skitter.WorkflowTest.Simple, links: [p: ["nested2#node3": :right]]},
    "nested2#node3": %Node.Operation{operation: Skitter.WorkflowTest.Join, links: [p: [node_post: :right]]},
    node_post: %Node.Operation{operation: Skitter.WorkflowTest.Join}
  }
}
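The inlining performed by flatten/1 can be sketched on plain maps. This is a simplified, hypothetical model rather than Skitter's implementation: nodes are maps with a :type field (:operation or :workflow) instead of the Node.Operation and Node.Workflow structs, and strategies and arguments are ignored. Following the naming visible in the example above, an inlined node is renamed to "outer#inner"; links into a nested in port are redirected to the nested nodes it feeds, and links leaving a nested out port are redirected to the destinations of the enclosing node.

```elixir
defmodule FlattenSketch do
  # Hypothetical model, not Skitter's implementation. A workflow is a map with
  # :in (keyword list of links), :out (list of ports) and :nodes (name => node).
  # A node is %{type: :operation, links: ...} or
  # %{type: :workflow, workflow: nested_workflow, links: ...}.

  def flatten(wf) do
    # Flatten nested workflows first, so each one only contains operations.
    nodes =
      Map.new(wf.nodes, fn
        {name, %{type: :workflow, workflow: inner} = node} ->
          {name, %{node | workflow: flatten(inner)}}

        other ->
          other
      end)

    wf = %{wf | nodes: nodes}

    inlined =
      Enum.flat_map(nodes, fn
        {name, %{type: :workflow, workflow: inner, links: outer_links}} ->
          # Rename each nested node to "outer#inner" and rewrite its links.
          Enum.map(inner.nodes, fn {inner_name, node} ->
            links =
              Enum.map(node.links, fn {out, dests} ->
                {out, Enum.flat_map(dests, &inner_dest(&1, name, outer_links, wf))}
              end)

            {prefix(name, inner_name), %{node | links: links}}
          end)

        {name, node} ->
          [{name, %{node | links: resolve_links(node.links, wf)}}]
      end)

    wf
    |> Map.put(:in, resolve_links(Map.get(wf, :in, []), wf))
    |> Map.put(:nodes, Map.new(inlined))
  end

  # A destination inside nested node `outer`: either a sibling nested node or
  # an out port of the nested workflow, which continues via the outer links.
  defp inner_dest({inner_name, port}, outer, _outer_links, _wf),
    do: [{prefix(outer, inner_name), port}]

  defp inner_dest(out_port, _outer, outer_links, wf),
    do: outer_links |> Keyword.get(out_port, []) |> Enum.flat_map(&resolve(&1, wf))

  defp resolve_links(links, wf) do
    Enum.map(links, fn {out, dests} -> {out, Enum.flat_map(dests, &resolve(&1, wf))} end)
  end

  # Redirect a destination that targets an in port of a nested workflow to the
  # (already flattened) nodes connected to that in port.
  defp resolve({name, port} = dest, wf) do
    case Map.get(wf.nodes, name) do
      %{type: :workflow, workflow: inner, links: outer_links} ->
        inner
        |> Map.get(:in, [])
        |> Keyword.get(port, [])
        |> Enum.flat_map(&inner_dest(&1, name, outer_links, wf))

      _operation ->
        [dest]
    end
  end

  # A bare port is an out port of this workflow and is kept as-is.
  defp resolve(out_port, _wf), do: [out_port]

  defp prefix(outer, inner), do: String.to_atom("#{outer}##{inner}")
end
```

Flattening nested workflows before inlining them means each inlining step only has to deal with operation nodes, which keeps the redirection logic to a single level.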
@spec verify(t()) :: :ok | [destination()]
Verify if the links in a workflow are valid.
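This function verifies whether every link in the workflow has a valid source and destination. That is, each link should depart from an existing port of the workflow or of one of its nodes and arrive at one. When every link is valid, :ok is returned; otherwise, the invalid links are returned as a list of {source, destination} tuples. Note that this function does not traverse nested workflows.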
Examples
iex> defoperation Example, in: p, out: p do
...> end
iex> verify(%Workflow{nodes: %{
...> foo: %Node.Operation{operation: Example, links: [p: [bar: :p]]},
...> bar: %Node.Operation{operation: Example},
...> }})
:ok
iex> verify(%Workflow{nodes: %{
...> foo: %Node.Operation{operation: Example, links: [p: [baz: :p]]},
...> bar: %Node.Operation{operation: Example},
...> }})
[{{:foo, :p}, {:baz, :p}}]
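The check described above can be sketched on plain maps. In this hypothetical model (not Skitter's implementation), each node records its own port names in :in_ports and :out_ports, which stand in for the ports Skitter obtains from the operation or nested workflow. As in the example above, invalid links are reported as {source, destination} tuples, and nested workflows are not traversed.

```elixir
defmodule VerifySketch do
  # Hypothetical model, not Skitter's implementation. A workflow is a map with
  # :in (keyword list of links), :out (list of ports) and :nodes, where each
  # node is %{in_ports: [...], out_ports: [...], links: [...]}.
  def verify(wf) do
    # Links leaving the workflow's own in ports use a bare port as source.
    in_links = for {port, dests} <- Map.get(wf, :in, []), dest <- dests, do: {port, dest}

    # Links leaving a node use {node_name, out_port} as source.
    node_links =
      for {name, info} <- wf.nodes, {out, dests} <- info.links, dest <- dests,
          do: {{name, out}, dest}

    case Enum.reject(in_links ++ node_links, &valid?(&1, wf)) do
      [] -> :ok
      invalid -> invalid
    end
  end

  defp valid?({src, dst}, wf), do: valid_source?(src, wf) and valid_destination?(dst, wf)

  # A node source must name one of that node's out ports.
  defp valid_source?({name, port}, wf), do: port in port_list(wf, name, :out_ports)
  # Bare sources are keys of the workflow's `in` list, so they always exist.
  defp valid_source?(_in_port, _wf), do: true

  # A node destination must name an existing node and one of its in ports.
  defp valid_destination?({name, port}, wf), do: port in port_list(wf, name, :in_ports)
  # A bare destination must be one of the workflow's out ports.
  defp valid_destination?(out_port, wf), do: out_port in Map.get(wf, :out, [])

  # Unknown nodes have no ports, so any link touching them is invalid.
  defp port_list(wf, name, key) do
    case Map.get(wf.nodes, name) do
      nil -> []
      info -> Map.get(info, key, [])
    end
  end
end

example = %{in_ports: [:p], out_ports: [:p], links: []}
VerifySketch.verify(%{nodes: %{foo: %{example | links: [p: [bar: :p]]}, bar: example}})
# => :ok
VerifySketch.verify(%{nodes: %{foo: %{example | links: [p: [baz: :p]]}, bar: example}})
# => [{{:foo, :p}, {:baz, :p}}]
```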
Verify if the links in a workflow are valid using verify/1.
This function uses verify/1 to check whether every link in a workflow has a valid source and destination. If this is not the case, it raises a Skitter.DefinitionError. When the workflow is valid, the workflow itself is returned.
Examples
iex> defoperation Example, in: p, out: p do
...> end
iex> verify!(%Workflow{nodes: %{
...> foo: %Node.Operation{operation: Example, links: [p: [bar: :p]]},
...> bar: %Node.Operation{operation: Example},
...> }})
%Workflow{nodes: %{
  foo: %Node.Operation{operation: Skitter.WorkflowTest.Example, links: [p: [bar: :p]]},
  bar: %Node.Operation{operation: Skitter.WorkflowTest.Example},
}}
iex> verify!(%Workflow{nodes: %{
...> foo: %Node.Operation{operation: Example, links: [p: [baz: :p]]},
...> bar: %Node.Operation{operation: Example},
...> }})
** (Skitter.DefinitionError) Invalid link: {:foo, :p} ~> {:baz, :p}
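The relationship between verify!/1 and verify/1 can be sketched as a thin wrapper. This is a hypothetical model, not Skitter's code: SketchDefinitionError stands in for Skitter.DefinitionError, and the verification function is passed in as an argument so the block runs without Skitter. Reporting only the first invalid link is an assumption based on the single-link message in the example above.

```elixir
defmodule SketchDefinitionError do
  # Stand-in for Skitter.DefinitionError in this sketch.
  defexception [:message]
end

defmodule VerifyBangSketch do
  # `verify_fun` plays the role of verify/1: it returns :ok or a list of
  # {source, destination} tuples describing invalid links.
  def verify!(wf, verify_fun) do
    case verify_fun.(wf) do
      :ok ->
        wf

      [{src, dst} | _rest] ->
        # Report the first invalid link, formatted like the example above.
        raise SketchDefinitionError,
          message: "Invalid link: #{inspect(src)} ~> #{inspect(dst)}"
    end
  end
end
```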