Skitter.Workflow (Skitter v0.7.1)

Workflow type definition and utilities.

A workflow defines a data processing pipeline. It is defined as a set of nodes connected through various links. A node contains a data processing operation and its distribution strategy. A workflow stores these nodes and links between them, along with additional meta-information about the workflow.

To enable the reuse of workflows, a workflow may define in- and out ports. A workflow that does so may be embedded inside another workflow. Note that a workflow is always flattened using flatten/1 before it is deployed.

This module defines the workflow type along with some utilities to work with this type. Defining a workflow manually is not recommended; use Skitter.DSL.Workflow.workflow/2 instead.

Summary

Types

args()

Operation initialization arguments

destination()

Link destination.

links()

Collection of outgoing links.

name()

Instance name

operation_node()

Operation embedded inside a workflow.

t()

Internal workflow representation.

workflow_node()

Workflow embedded inside a workflow.

Functions

flatten/1

Recursively inline any nested workflow of a workflow.

verify/1

Verify if the links in a workflow are valid.

verify!/1

Verify if the links in a workflow are valid using verify/1.

Types

@type args() :: any()

Operation initialization arguments

This type stores the arguments passed to the operation in the workflow definition.

@type destination() :: {name(), Skitter.Operation.port_name()} | Skitter.Operation.port_name()

Link destination.

This type stores the destination of a link. A link can point to an in port of a node or to an out port of the workflow. In the first case, the name of the node and its in port are stored; in the second, only the name of the out port is stored.

@type links() :: [{Skitter.Operation.port_name(), [destination()]}]

Collection of outgoing links.

Links are stored as a keyword list. Each key in this list represents an out port, while the value of this key is a list which references the destinations of this out port.
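
To make the shape of this keyword list concrete, the sketch below reads the destinations of a single out port and separates node destinations from workflow out ports. It uses plain Elixir data only; the LinksSketch module and its destinations/2 function are hypothetical helpers for illustration and are not part of Skitter.

```elixir
defmodule LinksSketch do
  # Hypothetical helper, not part of Skitter.
  # Returns {node_destinations, workflow_out_ports} for one out port.
  def destinations(links, out_port) do
    links
    |> Keyword.get(out_port, [])
    # Node destinations are {node_name, in_port} tuples; a bare atom
    # names an out port of the enclosing workflow.
    |> Enum.split_with(&is_tuple/1)
  end
end

# Out port :p is linked to the :left in port of :node3 and to the
# workflow out port :bar.
links = [p: [{:node3, :left}, :bar]]

{to_nodes, to_workflow} = LinksSketch.destinations(links, :p)
IO.inspect(to_nodes)
IO.inspect(to_workflow)
```

An out port without an entry in the list simply yields two empty lists, which is why Keyword.get/3 is called with [] as its default.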

@type name() :: atom()

Instance name

A name is used to refer to a node embedded inside a workflow.

@type operation_node() :: %Skitter.Workflow.Node.Operation{
  args: args(),
  links: links(),
  operation: Skitter.Operation.t(),
  strategy: Skitter.Strategy.t()
}

Operation embedded inside a workflow.

An operation in a workflow is stored along with its strategy, initialization arguments (which are passed to Skitter.Strategy.Operation.deploy/2) and the outgoing links of each of its out ports.

Workflows can override the strategy of an operation. The strategy specified here may therefore differ from the strategy returned by Skitter.Operation.strategy/1.

@type t() :: %Skitter.Workflow{
  in: links(),
  nodes: %{required(name()) => operation_node() | workflow_node()},
  out: [Skitter.Operation.port_name()]
}

Internal workflow representation.

A workflow is stored as a map, where each name refers to a single node, which is either an operation_node() or a workflow_node(). Besides this, the in- and out ports of the workflow are stored. The outgoing links of the in ports of a workflow are stored along with the in ports.

@type workflow_node() :: %Skitter.Workflow.Node.Workflow{
  links: links(),
  workflow: t()
}

Workflow embedded inside a workflow.

A workflow nested inside a workflow is stored along with the outgoing links of its out ports.

Functions

@spec flatten(t()) :: t()

Recursively inline any nested workflow of a workflow.

This function ensures any workflow embedded in the provided workflow is inlined into the provided workflow.

Examples

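In the example below, the parent workflow embeds the inner workflow twice. After flattening, every nested node appears directly in the parent under a name of the form "<nested name>#<node name>", and the links into and out of the nested workflows point directly at the inlined nodes: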

iex> defoperation Simple, in: p, out: p do
...> end
iex> defoperation Join, in: [left, right], out: p do
...> end
iex> inner = %Workflow{
...>   in: [foo: [node1: :p, node2: :p]],
...>   out: [:bar],
...>   nodes: %{
...>     node1: %Node.Operation{operation: Simple, links: [p: [node3: :left]]},
...>     node2: %Node.Operation{operation: Simple, links: [p: [node3: :right]]},
...>     node3: %Node.Operation{operation: Join, links: [p: [:bar]]}
...> }}
iex> parent = %Workflow{
...>   nodes: %{
...>     node_pre: %Node.Operation{operation: Simple, links: [p: [nested1: :foo, nested2: :foo]]},
...>     nested1: %Node.Workflow{workflow: inner, links: [bar: [node_post: :left]]},
...>     nested2: %Node.Workflow{workflow: inner, links: [bar: [node_post: :right]]},
...>     node_post: %Node.Operation{operation: Join}
...> }}
iex> flatten(parent)
%Workflow{
  nodes: %{
    node_pre: %Node.Operation{operation: Skitter.WorkflowTest.Simple, links: [p: ["nested1#node1": :p, "nested1#node2": :p, "nested2#node1": :p, "nested2#node2": :p]]},
    "nested1#node1": %Node.Operation{operation: Skitter.WorkflowTest.Simple, links: [p: ["nested1#node3": :left]]},
    "nested1#node2": %Node.Operation{operation: Skitter.WorkflowTest.Simple, links: [p: ["nested1#node3": :right]]},
    "nested1#node3": %Node.Operation{operation: Skitter.WorkflowTest.Join, links: [p: [node_post: :left]]},
    "nested2#node1": %Node.Operation{operation: Skitter.WorkflowTest.Simple, links: [p: ["nested2#node3": :left]]},
    "nested2#node2": %Node.Operation{operation: Skitter.WorkflowTest.Simple, links: [p: ["nested2#node3": :right]]},
    "nested2#node3": %Node.Operation{operation: Skitter.WorkflowTest.Join, links: [p: [node_post: :right]]},
    node_post: %Node.Operation{operation: Skitter.WorkflowTest.Join}
  }
}
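
The renaming visible in the output above can be sketched in isolation. The FlattenNamingSketch module below is a hypothetical illustration of the "<nested name>#<node name>" pattern only; it is not Skitter's implementation and does not rewire links across the boundary of a nested workflow.

```elixir
defmodule FlattenNamingSketch do
  # Hypothetical helpers, not part of Skitter.

  # Builds the name of an inlined node, e.g. :"nested1#node1".
  def inlined_name(nested, node), do: :"#{nested}##{node}"

  # Renames the target of a link that stays inside the nested workflow.
  def prefix_destination(nested, {node, port}) do
    {inlined_name(nested, node), port}
  end

  # Renames every node of a nested workflow's node map.
  def prefix_nodes(nested, nodes) do
    Map.new(nodes, fn {name, node} -> {inlined_name(nested, name), node} end)
  end
end

IO.inspect(FlattenNamingSketch.inlined_name(:nested1, :node1))
IO.inspect(FlattenNamingSketch.prefix_destination(:nested1, {:node3, :left}))
```

Prefixing with the name of the embedding node keeps the inlined names unique when the same workflow is embedded more than once, as with nested1 and nested2 above.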

@spec verify(t()) :: :ok | [destination()]

Verify if the links in a workflow are valid.

This function verifies if every link in the workflow has a valid source and destination. That is, the link should depart from an existing workflow or operation port and arrive at one. Note that this function does not traverse nested workflows.
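
The check described here can be sketched on plain maps. The VerifySketch module below is a simplified, hypothetical stand-in: it does not use Skitter structs, each node lists its own in and out ports directly, and the links attached to the in ports of the workflow are not checked. It reports invalid links as {source, destination} pairs, the shape shown in the examples that follow.

```elixir
defmodule VerifySketch do
  # Hypothetical sketch, not Skitter's implementation.
  # A node is a map with :in, :out and :links; a workflow is a map
  # with :nodes and :out.
  def verify(%{nodes: nodes, out: workflow_out}) do
    invalid =
      for {name, node} <- nodes,
          {out_port, destinations} <- node.links,
          destination <- destinations,
          # A link is valid when it departs from an existing out port
          # and arrives at an existing port.
          not (out_port in node.out and
                 valid_destination?(destination, nodes, workflow_out)) do
        {{name, out_port}, destination}
      end

    if invalid == [], do: :ok, else: invalid
  end

  # Destination inside the workflow: the node and its in port must exist.
  defp valid_destination?({name, in_port}, nodes, _workflow_out) do
    case nodes do
      %{^name => %{in: in_ports}} -> in_port in in_ports
      _ -> false
    end
  end

  # Bare port name: must be an out port of the workflow itself.
  defp valid_destination?(out_port, _nodes, workflow_out) do
    out_port in workflow_out
  end
end

example = %{in: [:p], out: [:p], links: []}
valid = %{out: [], nodes: %{foo: %{example | links: [p: [bar: :p]]}, bar: example}}
broken = %{out: [], nodes: %{foo: %{example | links: [p: [baz: :p]]}, bar: example}}

IO.inspect(VerifySketch.verify(valid))
IO.inspect(VerifySketch.verify(broken))
```

Like the function documented here, the sketch only looks at one level: a nested workflow would have to be verified separately or flattened first.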

Examples

iex> defoperation Example, in: p, out: p do
...> end
iex> verify(%Workflow{nodes: %{
...>   foo: %Node.Operation{operation: Example, links: [p: [bar: :p]]},
...>   bar: %Node.Operation{operation: Example},
...> }})
:ok
iex> verify(%Workflow{nodes: %{
...>   foo: %Node.Operation{operation: Example, links: [p: [baz: :p]]},
...>   bar: %Node.Operation{operation: Example},
...> }})
[{{:foo, :p}, {:baz, :p}}]

@spec verify!(t()) :: t() | no_return()

Verify if the links in a workflow are valid using verify/1.

This function uses verify/1 to verify if every link in a workflow has a valid source and destination. If this is not the case, it raises a Skitter.DefinitionError. When the workflow is valid, the workflow itself is returned.
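
The relationship between a verify-style function and its raising counterpart can be sketched as a thin wrapper. Everything in the block below is hypothetical: BangSketch, unwrap!/2 and InvalidLinkError are illustrative names, the exception stands in for Skitter.DefinitionError, and reporting only the first invalid link is an assumption.

```elixir
defmodule BangSketch do
  # Hypothetical stand-in for Skitter.DefinitionError.
  defmodule InvalidLinkError do
    defexception [:message]
  end

  # Takes a workflow and the result of a verify/1-style check.
  # A valid workflow is returned unchanged, so the call can sit in a pipeline.
  def unwrap!(workflow, :ok), do: workflow

  # Assumption: only the first invalid link is reported.
  def unwrap!(_workflow, [{source, destination} | _rest]) do
    raise InvalidLinkError,
      message: "Invalid link: #{inspect(source)} ~> #{inspect(destination)}"
  end
end

IO.inspect(BangSketch.unwrap!(%{nodes: %{}}, :ok))
```

Returning the workflow rather than :ok is what makes the raising variant convenient to chain with other workflow transformations.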

Examples

iex> defoperation Example, in: p, out: p do
...> end
iex> verify!(%Workflow{nodes: %{
...>   foo: %Node.Operation{operation: Example, links: [p: [bar: :p]]},
...>   bar: %Node.Operation{operation: Example},
...> }})
%Workflow{nodes: %{
  foo: %Node.Operation{operation: Skitter.WorkflowTest.Example, links: [p: [bar: :p]]},
  bar: %Node.Operation{operation: Skitter.WorkflowTest.Example},
}}
iex> verify!(%Workflow{nodes: %{
...>   foo: %Node.Operation{operation: Example, links: [p: [baz: :p]]},
...>   bar: %Node.Operation{operation: Example},
...> }})
** (Skitter.DefinitionError) Invalid link: {:foo, :p} ~> {:baz, :p}