Skitter is a domain-specific language for building scalable distributed stream processing applications with custom distribution strategies.

Modern distributed stream processing engines offer developers limited control over how the various operations (map, reduce, join, …) in their applications are distributed over a cluster. We believe that the performance of a distributed stream processing application can be improved by giving developers full control over its distribution logic. Skitter is a distributed stream processing language that enables the creation of custom distribution strategies, which let developers determine how operations are distributed over a cluster.

Skitter is developed as a DSL in Elixir, an extensible language built on top of the Erlang VM. It is available on GitHub and was developed in the context of my PhD.

Customisable

Skitter enables the creation of custom distribution strategies. These strategies determine how a data processing application is distributed over multiple machines, allowing the creation of distribution logic tailored towards the properties of the application.

Composable

Skitter applications are built by combining reusable data processing operations into a workflow. Each of these operations may be distributed by a different distribution strategy, enabling multiple distribution strategies to be used in a single application.

Extendable

Skitter provides a trait-like system that makes it possible to extend existing distribution strategies. New strategies can thus be built from previously defined ones, and minor modifications to existing strategies become straightforward.

Skitter by Example

Skitter applications are written in terms of three abstractions: operations, workflows and strategies. Operations define the data processing logic of an application; they are composed into workflows, which define data processing pipelines. Each operation is paired with a distribution strategy, which determines how the operation is distributed over the available machines at runtime. Below, we provide a bird's-eye overview of these concepts based on a few simple examples.

Operations

The data processing logic of a Skitter application is expressed in operations, which are written in Skitter's operation definition language. While writing an operation, developers do not have to reason about distribution; they only specify how the operation reacts to incoming data. Operations can be created from scratch or by writing a wrapper around existing software.

defoperation FahrenheitToCelcius, in: fahrenheit, out: celcius do
  defcb react(fahrenheit) do
    ((fahrenheit - 32) * (5 / 9)) ~> celcius
  end
end

This operation defines a single callback, react, which converts its argument fahrenheit to degrees Celsius. The resulting value is emitted to the celcius port. At runtime, the strategy of this operation calls react, after which the emitted value is sent to the operations downstream of FahrenheitToCelcius.
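
As a rough illustration of what react computes, the callback can be mirrored by an ordinary Elixir function. This is a plain-Elixir sketch, not Skitter's API: the module name FahrenheitToCelsiusSketch is hypothetical, and representing the ~> emission as a keyword list keyed by output port is an assumption made so that the function can be called without the Skitter runtime.

```elixir
defmodule FahrenheitToCelsiusSketch do
  # Hypothetical stand-in for the react callback. Instead of emitting with ~>,
  # it returns the emitted values as a keyword list keyed by output port.
  def react(fahrenheit) do
    [celcius: (fahrenheit - 32) * (5 / 9)]
  end
end

# Freezing point of water: 32 °F is emitted as 0.0 on the celcius port.
IO.inspect(FahrenheitToCelsiusSketch.react(32)) # prints [celcius: 0.0]
```

Because the callback contains no distribution logic, the same conversion runs unchanged no matter which strategy ends up calling it.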

defoperation Average, in: value, out: current do
  state_struct total: 0, count: 0

  defcb react(value) do
    total <~ ~f{total} + value
    count <~ ~f{count} + 1
    ~f{total} / ~f{count} ~> current
  end
end

Operations can maintain state. This operation tracks the average of all the values it receives at runtime. It does this by counting the number of values it receives (in the count field) and by summing these values (in the total field). When Average receives a new value, it updates both fields, calculates the current average and emits this value to its successors.
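
To make the state handling concrete, the sketch below threads the state of Average through a stream of values by hand. It is plain Elixir rather than Skitter: the module AverageSketch is hypothetical, and modelling the <~ updates as building a new map and the ~> emission as a return value are assumptions made for illustration.

```elixir
defmodule AverageSketch do
  # Initial state, mirroring `state_struct total: 0, count: 0`.
  def initial_state, do: %{total: 0, count: 0}

  # One invocation of react: update both fields, then emit the new average.
  # Returns {emitted_value, new_state}, the shape Enum.map_reduce/3 expects.
  def react(value, %{total: total, count: count}) do
    state = %{total: total + value, count: count + 1}
    {state.total / state.count, state}
  end
end

# Feed a stream of values through the operation, carrying the state along.
{averages, _final_state} =
  Enum.map_reduce([10, 20, 30, 40], AverageSketch.initial_state(), &AverageSketch.react/2)

IO.inspect(averages) # prints [10.0, 15.0, 20.0, 25.0]
```

Each call returns the emitted average together with the updated state, which is exactly what the next call needs; after the four values above, the state holds a total of 100 and a count of 4.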

Workflows

Operations can be composed into workflows. Workflows are written in a simple textual language in which operations are linked to other operations or to other, nested, workflows. The example shown here defines a data processing pipeline that, for participants in a running competition, provides crowdsourced videos recorded by nearby spectators. Its graphical representation is shown below.

workflow do
  node(TCPSource, args: {"localhost", 4555}) ~> node(Parser) ~> joiner.participants
  node(TCPSource, args: {"localhost", 4556}) ~> node(Parser) ~> joiner.spectators

  node(Join, as: joiner, args: [windows_size: 5])
  ~> node(Rate)
  ~> node(Select)
  ~> node(Publish)
end
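
Conceptually, a workflow is a directed graph in which data flows from one node to an in-port of another, as in joiner.participants. The sketch below encodes the example workflow as plain Elixir data to show that structure. The node names, the :in placeholder port for single-input operations and the WorkflowSketch module are all hypothetical; this is not how Skitter represents workflows internally.

```elixir
defmodule WorkflowSketch do
  # Hypothetical plain-data encoding of the example workflow: each edge goes
  # from a node to a {node, in_port} pair. The :in port stands in for the
  # single input of operations that only have one.
  def edges do
    [
      {:participants_source, {:participants_parser, :in}},
      {:participants_parser, {:joiner, :participants}},
      {:spectators_source, {:spectators_parser, :in}},
      {:spectators_parser, {:joiner, :spectators}},
      {:joiner, {:rate, :in}},
      {:rate, {:select, :in}},
      {:select, {:publish, :in}}
    ]
  end

  # Nodes feeding a given node, together with the in-port they are linked to.
  # Edges whose target does not match the pinned node are skipped by `for`.
  def predecessors(node) do
    for {from, {^node, port}} <- edges(), do: {from, port}
  end

  # Nodes without incoming edges: the entry points of the pipeline.
  def sources do
    targets = MapSet.new(edges(), fn {_from, {to, _port}} -> to end)

    edges()
    |> Enum.map(fn {from, _to} -> from end)
    |> Enum.uniq()
    |> Enum.reject(&MapSet.member?(targets, &1))
  end
end

# Returns [{:participants_parser, :participants}, {:spectators_parser, :spectators}]
IO.inspect(WorkflowSketch.predecessors(:joiner))
```

Tagging each edge with the receiving in-port is what allows the join to tell participant data apart from spectator data.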

Strategies

A unique feature introduced by Skitter is the notion of a distribution strategy. Every Skitter operation must be paired with a strategy; this can be done in the operation definition or when the operation is used in a workflow. Like operations and workflows, strategies are created with a DSL. A strategy is defined by implementing several hooks defined by the Skitter runtime system.

defstrategy ImmutableLocal do
  defhook deploy(args) do
    Nodes.on_all_worker_cores(fn ->
      create_worker(
        fn -> call_component(:init, [args]).state end,
        :worker,
        :local
      )
    end)
    |> Map.new()
  end

  defhook deliver(msg, _) do
    send(Enum.random(deployment()[Nodes.self()]), msg)
  end

  defhook process(msg, state, :worker) do
    result = call_component(:react, state, nil, [msg])
    emit(result.emit)
    state
  end
end

This strategy handles operations with an immutable state. It creates a worker for every core on every available machine in the cluster; any data emitted by a predecessor of the operation is sent to a worker on the local machine, which calls the react callback of the operation.

The workers of this strategy are created in the deploy hook, which is called by the runtime system when an operation needs to be deployed over the cluster. Any data returned by this hook is stored inside the deployment, which is automatically available in all other hooks. The deliver hook is invoked by the runtime when a predecessor of the operation emits data. This strategy selects a random worker on the current node and forwards the data to this worker. Finally, the process hook is called when a worker receives data. In this case, the worker executes the react callback of the operation and specifies that the emitted data should be forwarded to all the successors of this operation.
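
The interplay of the three hooks can be simulated with ordinary Elixir processes. The sketch below is a single-VM approximation under several assumptions: the simulated "nodes" are just atoms, every worker is a local process, emitted data goes to a single downstream pid, and the module and function names are hypothetical rather than Skitter's runtime API.

```elixir
defmodule ImmutableLocalSketch do
  # deploy: spawn one worker per (simulated) core on every node. The returned
  # map from node to worker pids plays the role of the deployment.
  def deploy(nodes, cores_per_node, state, react, downstream) do
    Map.new(nodes, fn node ->
      workers =
        for _core <- 1..cores_per_node do
          spawn(fn -> process_loop(state, react, downstream) end)
        end

      {node, workers}
    end)
  end

  # deliver: forward incoming data to a random worker on the current node.
  def deliver(deployment, current_node, msg) do
    deployment |> Map.fetch!(current_node) |> Enum.random() |> send(msg)
  end

  # process: each worker calls react with its state, which is never updated
  # (the operation is immutable), and emits the result downstream.
  defp process_loop(state, react, downstream) do
    receive do
      msg ->
        send(downstream, {:emit, self(), react.(msg, state)})
        process_loop(state, react, downstream)
    end
  end
end

# Two simulated nodes with two cores each; react adds the state to the input.
deployment =
  ImmutableLocalSketch.deploy([:node_a, :node_b], 2, 10, fn msg, offset -> msg + offset end, self())

ImmutableLocalSketch.deliver(deployment, :node_a, 5)

receive do
  # prints {true, 15}: a node_a worker handled the data and emitted 5 + 10
  {:emit, worker, result} -> IO.inspect({worker in deployment.node_a, result})
after
  1_000 -> IO.puts("no data emitted")
end
```

Since the state never changes, any worker can handle any message, which is why the sketch is free to pick a worker at random; a stateful operation would need a more careful routing decision.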

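defstrategy ShuffledImmutable, extends: ImmutableLocal do
  # Overrides the deliver hook of ImmutableLocal; deploy and process are inherited.
  defhook deliver(msg, _) do
    # Pick a random worker machine, then a random worker on that machine.
    node = Nodes.workers() |> Enum.random()
    send(Enum.random(deployment()[node]), msg)
  end
end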

Strategies can be created based on other strategies. In the example shown here, a new strategy is created based on the ImmutableLocal strategy shown in the previous example. Unlike ImmutableLocal, this strategy does not attempt to keep work local; instead, it selects a random machine in the cluster to process the received data.
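
One way to picture strategy extension is as a map of hooks in which the extending strategy replaces only the hooks it defines. The sketch below models extends: with Map.merge/2; this is an assumption for illustration, not a description of how Skitter implements its trait-like system. The module name, the worker atoms and the reduced hook signatures are hypothetical, and the delivery hook returns the chosen worker instead of sending data to it so that the choice can be inspected.

```elixir
defmodule StrategyExtensionSketch do
  # Hypothetical model: a strategy is a map from hook name to function. Hooks
  # defined by the extending strategy replace inherited ones; others are kept.
  def extend(base, overrides), do: Map.merge(base, overrides)

  def immutable_local do
    %{
      # Choose a random worker on the node that received the data.
      deliver: fn deployment, current_node, _msg ->
        deployment |> Map.fetch!(current_node) |> Enum.random()
      end,
      # Call react with the (immutable) state.
      process: fn react, msg, state -> react.(msg, state) end
    }
  end

  def shuffled_immutable do
    extend(immutable_local(), %{
      # Choose a random node first, then a random worker on that node.
      deliver: fn deployment, _current_node, _msg ->
        node = deployment |> Map.keys() |> Enum.random()
        deployment |> Map.fetch!(node) |> Enum.random()
      end
    })
  end
end

deployment = %{node_a: [:worker_a1, :worker_a2], node_b: [:worker_b1, :worker_b2]}
shuffled = StrategyExtensionSketch.shuffled_immutable()

# The overridden hook may pick a worker on any node...
IO.inspect(shuffled.deliver.(deployment, :node_a, :some_data))
# ...while the inherited process hook is unchanged.
IO.inspect(shuffled.process.(fn msg, state -> msg + state end, 5, 10)) # prints 15
```

The extending strategy only states what differs from its parent, which keeps small variations on an existing strategy short.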

Getting Started

We have developed Skitter as a DSL in Elixir, and it is available on GitHub. Detailed instructions on getting a Skitter project up and running can be found in the documentation. Instructions for getting started with older versions of Skitter, as described in specific papers, can be found by following the documentation link beneath each paper in the "Publications" section below.

Publications

  • Skitter: A DSL for Distributed Reactive Workflows
    International Workshop on Reactive and Event-Based Languages and Systems (REBLS), November 2018
    This paper discusses the initial version of Skitter, as presented at REBLS 2018 and the SPLASH 2018 Poster Session. This version of Skitter does not allow developers to specify custom distribution strategies. Instead, an effect system was used to declaratively specify the properties of an operation (called "components" at the time). This information was used by the runtime to select an appropriate distribution strategy. The need to support additional effects motivated the creation of the extendable distribution strategies present in Skitter today.
    Paper Poster Slides Bibtex Documentation for this version of Skitter