Skitter is a domain-specific language (DSL) for building scalable distributed stream processing applications with custom distribution strategies.

Modern distributed stream processing engines offer developers limited control over how the various operators (map, reduce, join, …) in their applications are distributed over a cluster. We believe the performance of a distributed stream application can be improved by offering developers full control over the distribution logic of their applications. Skitter is a distributed stream processing language that enables the creation of custom distribution strategies, allowing developers to determine how operators are distributed over a cluster.

Skitter is developed as a DSL in Elixir, an extensible language built on top of the Erlang VM. It is available on GitHub and was developed in the context of my PhD.

Customisable

Skitter enables the creation of custom distribution strategies. These strategies determine how a data processing application is distributed over multiple machines, allowing the creation of distribution logic tailored towards the properties of the application.

Composable

Skitter applications are built by combining reusable data processing components into a workflow. Each of these components may be distributed by a different distribution strategy, enabling multiple distribution strategies to be used in a single application.

Extendable

Skitter provides a trait-like system for extending existing distribution strategies. This makes it possible to build new strategies on top of previously defined ones, and it also facilitates minor modifications to existing strategies.

Skitter by Example

Skitter applications are written in terms of three abstractions: components, workflows and strategies. Components define the data processing logic of an application; they are composed into workflows, which define data processing pipelines. Each component is paired with a distribution strategy, which determines how the component is distributed over the available machines at runtime. Below, we provide a bird's-eye overview of these concepts based on a few simple examples.

Components

The data processing logic of Skitter applications is expressed in reactive components, which are written in Skitter's component definition language. While writing a component, a developer does not have to reason about distribution; they only have to specify how the component reacts to incoming data. Reactive components can be created from scratch or by writing a wrapper around existing software.

defcomponent FahrenheitToCelcius, in: fahrenheit, out: celcius do
  defcb react(fahrenheit) do
    ((fahrenheit - 32) * (5 / 9)) ~> celcius
  end
end

This component defines a single callback, react, which converts its argument, fahrenheit, to degrees Celsius. The resulting value is emitted to the celcius port. At runtime, the strategy of this component calls react, after which the emitted value is sent to the components downstream of FahrenheitToCelcius.
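To make the data flow of react concrete outside of Skitter, the conversion can be mimicked with an ordinary Elixir function. This is only a sketch: the module name `FahrenheitToCelciusSketch` is hypothetical, and emitting to a port is modelled as returning a `{port, value}` tuple rather than using Skitter's `~>` operator.

```elixir
defmodule FahrenheitToCelciusSketch do
  # Hypothetical plain-Elixir stand-in for the react callback.
  # Instead of emitting with `~>`, it returns {port, value}.
  def react(fahrenheit) do
    # `/` always performs float division in Elixir, so the result is a float.
    {:celcius, (fahrenheit - 32) * (5 / 9)}
  end
end

# The tuple tells the caller which out port the value was "emitted" to.
{:celcius, celsius} = FahrenheitToCelciusSketch.react(212)
```

In Skitter itself, it is the strategy that invokes react and routes whatever was emitted; here the caller plays that role by pattern matching on the returned tuple.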

defcomponent Average, in: value, out: current do
  state_struct total: 0, count: 0

  defcb react(value) do
    total <~ ~f{total} + value
    count <~ ~f{count} + 1
    ~f{total} / ~f{count} ~> current
  end
end

Components can maintain state. This component tracks the average of all the values it receives at runtime. It does this by counting the number of values it receives (in the count field) and the sum of all these values (in the total field). When Average receives a new value, it updates its fields, calculates the current average and emits this value to its successors.
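The state handling of Average can be approximated with a pure function that takes the old state and a value and returns the new state together with the emitted output. The sketch below is hypothetical: `AverageSketch` is not part of Skitter, the state is a plain map mirroring `state_struct total: 0, count: 0`, and the `<~` and `~>` operators are replaced by an explicit return value.

```elixir
defmodule AverageSketch do
  # Hypothetical plain-Elixir stand-in for the Average component.
  # The state is a map mirroring `state_struct total: 0, count: 0`.
  def initial_state, do: %{total: 0, count: 0}

  # Mirrors the react callback: update both fields, then emit the running
  # average. Returns {new_state, {:current, average}} instead of using
  # Skitter's `<~` and `~>` operators.
  def react(%{total: total, count: count}, value) do
    state = %{total: total + value, count: count + 1}
    {state, {:current, state.total / state.count}}
  end
end

# Feed a few values through the sketch, threading the state from call to
# call and collecting every emitted average.
{emitted, final_state} =
  Enum.map_reduce([2, 4, 9], AverageSketch.initial_state(), fn value, state ->
    {new_state, out} = AverageSketch.react(state, value)
    {out, new_state}
  end)
```

Here `emitted` holds one `{:current, average}` tuple per input and `final_state` holds the accumulated total and count. Threading the state explicitly makes visible what a strategy must take care of for stateful components: every call to react has to see the state produced by the previous call.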

Workflows

Reactive components can be composed into reactive workflows. These workflows are built in a simple textual language in which components are linked to other components or to other, nested, workflows. The example shown here defines a data processing pipeline that provides crowdsourced videos of participants in a running competition, recorded by nearby spectators. Its graphical representation is shown below.

workflow do
  node(TCPSource, args: {"localhost", 4555}) ~> node(Parser) ~> joiner.participants
  node(TCPSource, args: {"localhost", 4556}) ~> node(Parser) ~> joiner.spectators

  node(Join, as: joiner, args: [windows_size: 5])
  ~> node(Rate)
  ~> node(Select)
  ~> node(Publish)
end
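The workflow above can be read as a directed graph in which the two sources feed different input ports (participants and spectators) of the same joiner node. The plain-Elixir sketch below encodes that graph as a list of edges. The node names such as `:source_participants` and `:parser_spectators` and the generic `:in`/`:out` ports are hypothetical labels, since the example does not show the ports of TCPSource, Parser, Rate, Select or Publish; this is not how Skitter represents workflows internally.

```elixir
defmodule WorkflowSketch do
  # Hypothetical adjacency-list model of the running-competition workflow.
  # Each edge goes from {node, out_port} to {node, in_port}; port names
  # other than joiner's :participants and :spectators are placeholders.
  def edges do
    [
      {{:source_participants, :out}, {:parser_participants, :in}},
      {{:parser_participants, :out}, {:joiner, :participants}},
      {{:source_spectators, :out}, {:parser_spectators, :in}},
      {{:parser_spectators, :out}, {:joiner, :spectators}},
      {{:joiner, :out}, {:rate, :in}},
      {{:rate, :out}, {:select, :in}},
      {{:select, :out}, {:publish, :in}}
    ]
  end

  # All {node, in_port} destinations that receive data emitted by `node`.
  # Edges whose source does not match the pinned node are filtered out.
  def successors(node) do
    for {{^node, _out}, dest} <- edges(), do: dest
  end

  # All in ports of `node` that have an incoming edge.
  def in_ports(node) do
    for {_src, {^node, port}} <- edges(), do: port
  end
end
```

For instance, `WorkflowSketch.in_ports(:joiner)` lists both input ports of the join, while `WorkflowSketch.successors(:publish)` is empty because Publish is the sink of the pipeline.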

Strategies

A unique feature introduced by Skitter is the notion of a distribution strategy. Every Skitter component must be paired with a strategy; this can be done in the component definition or when the component is used in a workflow. Like components and workflows, strategies are created through a DSL. A strategy is defined by implementing several hooks defined by the Skitter runtime system.

defstrategy ImmutableLocal do
  defhook deploy(args) do
    Nodes.on_all_worker_cores(fn ->
      create_worker(
        fn -> call_component(:init, [args]).state end,
        :worker,
        :local
      )
    end)
    |> Map.new()
  end

  defhook deliver(msg, _) do
    send(Enum.random(deployment()[Nodes.self()]), msg)
  end

  defhook process(msg, state, :worker) do
    result = call_component(:react, state, [msg])
    emit(result.emit)
    state
  end
end

This strategy handles components with an immutable state. It does this by creating a worker for every worker core on every machine in the cluster; any data emitted by a predecessor of the component is sent to a worker on the local machine, which calls the react callback of the component.

The workers of this strategy are created in the deploy hook, which is called by the runtime system when a component needs to be deployed over the cluster. Any data returned by this hook is stored inside the deployment, which is automatically available in all other hooks. The deliver hook is invoked by the runtime when a predecessor of the component emits data. This strategy selects a worker on the current node and forwards the data to this worker. Finally, the process hook is called when a worker receives data. In this case, the worker executes the react callback of the component and specifies that the emitted data should be forwarded to all the successors of this component.
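The interplay between the three hooks can be approximated in plain Elixir. The sketch below is a hypothetical model, not Skitter code: workers are `{node, core}` tuples rather than processes, the deployment is an ordinary map passed explicitly instead of being read through `deployment()`, and the component's react logic is passed in as a function.

```elixir
defmodule ImmutableLocalSketch do
  # Hypothetical plain-Elixir model of the ImmutableLocal hooks. Workers are
  # represented as {node, core} tuples instead of Skitter worker processes.

  # deploy: one worker per core on every node; the returned map plays the
  # role of the deployment that Skitter makes available in the other hooks.
  def deploy(nodes, cores_per_node) do
    Map.new(nodes, fn node ->
      {node, for(core <- 1..cores_per_node, do: {node, core})}
    end)
  end

  # deliver: pick a random worker on the local node, so data never leaves
  # the machine on which it was emitted.
  def deliver(deployment, local_node) do
    deployment |> Map.fetch!(local_node) |> Enum.random()
  end

  # process: run react on the (immutable) state and return the state
  # unchanged, together with whatever react emitted.
  def process(msg, state, react) do
    emitted = react.(state, msg)
    {emitted, state}
  end
end

deployment = ImmutableLocalSketch.deploy([:node_a, :node_b], 4)
# Always a worker on :node_a, e.g. {:node_a, 3}.
{:node_a, _core} = ImmutableLocalSketch.deliver(deployment, :node_a)
```

Because the state never changes, any worker on the local node can handle any message, which is what allows deliver to choose one at random.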

defstrategy ShuffledImmutable, extends: ImmutableLocal do
  # Overrides the deliver hook inherited from ImmutableLocal
  defhook deliver(msg, _) do
    node = Nodes.workers() |> Enum.random()
    send(Enum.random(deployment()[node]), msg)
  end
end

Strategies can be created based on other strategies. In the example shown here, a new strategy is created based on the ImmutableLocal strategy from the previous example. Unlike ImmutableLocal, this strategy does not attempt to keep work local; instead, it selects a random machine in the cluster to process the received data.
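Skitter's extension mechanism is described here only as trait-like, so as a rough analogy (not Skitter's actual implementation), a strategy can be modelled as a map from hook names to functions. Extending a strategy then amounts to `Map.merge/2` with the overriding hooks: hooks that are not overridden are inherited unchanged. The module `StrategyExtensionSketch` and its functions are hypothetical.

```elixir
defmodule StrategyExtensionSketch do
  # Hypothetical model of strategy extension: a strategy is a map from hook
  # names to functions, and extending a strategy merges in the overrides.
  # This only illustrates the idea; it is not Skitter's trait mechanism.

  def immutable_local do
    %{
      # deliver: random worker on the local node
      deliver: fn deployment, local_node ->
        deployment |> Map.fetch!(local_node) |> Enum.random()
      end,
      # process: run react, keep the immutable state
      process: fn msg, state, react -> {react.(state, msg), state} end
    }
  end

  # Hooks in `overrides` replace those of `base`; the others are inherited.
  def extend(base, overrides), do: Map.merge(base, overrides)

  def shuffled_immutable do
    extend(immutable_local(), %{
      # deliver: random node first, then a random worker on that node;
      # the local node is ignored.
      deliver: fn deployment, _local_node ->
        node = deployment |> Map.keys() |> Enum.random()
        deployment |> Map.fetch!(node) |> Enum.random()
      end
    })
  end
end
```

Only deliver differs between the two maps; the process hook of the shuffled strategy is the one it inherited, mirroring how ShuffledImmutable reuses the deploy and process hooks of ImmutableLocal.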

Getting Started

Skitter is developed as a DSL in Elixir and is available on GitHub. Detailed instructions for getting a Skitter project up and running can be found in the documentation. Instructions for getting started with the older versions of Skitter described in specific papers can be found by following the documentation link beneath each paper in the "Publications" section below.

Publications

  • Skitter: A DSL for Distributed Reactive Workflows
    International Workshop on Reactive and Event-Based Languages and Systems (REBLS), November 2018
    This paper discusses the initial version of Skitter, as presented at REBLS 2018 and the SPLASH 2018 Poster Session. This version of Skitter did not allow developers to specify custom distribution strategies. Instead, an effect system was used to declaratively specify the properties of a component, and the runtime used this information to select an appropriate distribution strategy. The need to support additional effects motivated the creation of the extendable distribution strategies present in Skitter today.
    Paper | Poster | Slides | BibTeX | Documentation for this version of Skitter