Concurrent Programming with Actors
Concurrency is an integral part of the AmbientTalk programming language. Rather than relying on threads and locks to generate and manage concurrency, AmbientTalk embraces actors as a much more object-oriented approach to concurrency. Before diving into the details of concurrency in AmbientTalk, we briefly put the main differences between the actor model and the thread-based model into context.
Threads vs Actors
In traditional programming languages, the control flow of a concurrent program is divided over a number of threads. Each thread operates concurrently and control can switch from one thread to another non-deterministically. If two threads have access to the same data (objects), this non-determinism can cause erroneous behaviour (so-called race conditions). Therefore, thread-based programming languages introduce locks (in the form of monitors, semaphores, …) which enable the construction of so-called critical sections: pieces of program code that can be executed by at most one thread at a time.
The advantages of the thread-based model are that the model itself is easy to understand, it is efficiently implementable and it can be used to create very fine-grained synchronization (e.g. multiple readers/one writer). The disadvantages are that the resulting program behaviour is very hard to understand because of implicit context switches, interleaved acquisition/release of locks which may lead to deadlock, etc.
The original actor model is based on a purely functional programming language. Over the years, and with the widespread acceptance of the object-oriented programming paradigm, actors have been merged with stateful objects into so-called active object models.
Generally speaking, an active object is an object that encapsulates its own thread of control. An active object also has a message queue or mailbox from which it processes incoming messages. Each message is processed sequentially. An active object responds to an incoming message by invoking the method corresponding to the message. The method is executed by the active object's own thread. Because of this sequential processing of incoming messages, race conditions cannot occur on the internal state of an active object. Objects communicate with active objects by sending them messages asynchronously: the messages are enqueued in the receiver's message queue, rather than being invoked immediately.
AmbientTalk Actors and Far References
In AmbientTalk, concurrency is spawned by creating actors: each actor is an autonomous processor. AmbientTalk's actors are based on the vat model of the E programming language. In AmbientTalk, an actor consists of a message queue (to store incoming messages), a thread of control (to execute the incoming messages) and a number of regular objects that are said to be hosted by the actor.
When an actor is created, it hosts a single object which is said to be the actor's behaviour: it is the “public interface” to the actor. The object that created the new actor gets a reference to this behaviour object, such that it can start sending messages to the new actor. An actor can be created in AmbientTalk as follows:
>def a := actor: {
  def sayHello() {
    system.println("Hello World")
  };
};
>><far ref:behaviour of <actormirror:9501984>>
As you can see, actors are created similarly to objects. The actor: method, defined in the global lexical scope, takes a closure as its sole argument and uses that closure to initialize the behaviour of the new actor. The creator of the actor immediately receives a so-called far reference to this behaviour object, and from that moment on the creating actor and the created actor run in parallel, each capable of processing incoming messages autonomously.
So what exactly is a far reference to an object? The terminology stems from the E language: it is an object reference that refers to an object hosted by another actor. The main difference between regular object references and far references is that regular references allow direct, synchronous access to an object, while far references only allow asynchronous access to the referenced object. This is enforced by the kind of messages that these references can carry, as will be explained in the next section.
The figure below summarizes AmbientTalk's concurrency model where actors are represented as communicating event loops. The dotted lines represent the actor's event loop threads which perpetually take messages from their message queue and synchronously execute the corresponding methods on the actor's owned objects. Note that an event loop thread never escapes its actor boundary: when communication with an object in another actor is required, a message is sent asynchronously via a far reference to the object. For example, as shown below, when A sends a message to B, the message is enqueued in the message queue of B's actor which eventually processes it.
Asynchronous Message Sending
AmbientTalk, like E, syntactically distinguishes between synchronous method invocation and asynchronous message sending. The former is expressed as o.m(), while the latter is expressed as o<-m(). Regular object references can carry both kinds of invocations. Synchronous method invocation behaves as in any typical object-oriented language. When an asynchronous message is sent to a local object (“local” meaning “hosted by the same actor”), the message is enqueued in the actor's own message queue and the method invocation will be executed at a later point in time.
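The following sketch illustrates a local asynchronous send (the logger object is hypothetical, introduced here only for the example):

// a minimal sketch: an asynchronous send to a *local* object
def logger := object: {
  def log(text) { system.println("[log] " + text) }
};
logger<-log("started");               // enqueued in this actor's own message queue
system.println("this prints first");  // the current execution finishes first;
                                      // the queued log message runs only afterwards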
Far references, like the reference stored in the variable a above, only carry asynchronous message sends, and as such totally decouple objects hosted by different actors in time: objects can never be blocked waiting for an outstanding remote procedure call, they can only communicate by means of purely asynchronous message passing. This is a key property of AmbientTalk's concurrency model, and it is a crucial property in the context of distributed programming. Hence, given the example above, the method sayHello can only be invoked as follows given a far reference a:
>a<-sayHello();
>>nil
The above code is simple enough to understand: the sayHello message is asynchronously sent to the object pointed to by a by enqueueing it in the message queue of the actor that hosts the object referenced by a. The message send itself immediately returns nil: asynchronous sends do not return a value by default.
But what happens when the asynchronously invoked method has parameters that need to be passed? How does parameter passing work in the context of inter-actor message sending? The rules are simple enough:
- Objects and closures are always passed by far reference
- Native data types like numbers, text, tables, … are always passed by copy
Generally speaking, any object that encapsulates a lexical scope is passed by reference, because passing such an object by copy would entail passing the entire lexical scope by copy - a costly operation. Objects without a lexical scope, such as methods, can be copied without having to recursively copy any scope.
When an object is passed by reference, we mean that the formal parameter of a method will be bound to a far reference to the original object. When it is passed by copy, the formal parameter will be bound to a local copy of the object. For example, consider the following calculator actor:
>def calculator := actor: {
  def add(x,y,customer) {
    customer<-result(x+y)
  };
};
>><far ref:behaviour of <actormirror:14115383>>
The add method takes three parameters: two numbers to add, and a so-called customer object which is responsible for consuming the “return value” of the method. Here is how to invoke this method:
>calculator<-add(1, 2, object: {
  def result(sum) {
    system.println("sum = " + sum);
  };
});
>>nil
Because of the parameter passing rules described above, the add method will receive copies of the numbers 1 and 2, will add them synchronously, and will send the result asynchronously to the customer object, which was passed by reference, i.e. customer is bound to a far reference. Eventually, the actor that sent the add message will itself receive a result message, and when this message is processed by the anonymous consumer object, the result is printed:
sum = 3
Isolates
The parameter-passing semantics defined above rule out any possibility for an object to be passed by copy. The reason is that objects encapsulate a lexical scope, and passing an object by copy would require the entire lexical scope to be passed along as well.
To enable objects to be passed by copy between actors, a special type of objects is introduced. These objects are called isolates because they are isolated from their surrounding lexical scope. Continuing our previous example, imagine we want our calculator to work with complex numbers, which are typically objects that one would want to pass by copy. We can define complex numbers as isolate objects as follows:
>def complexNumber := isolate: {
  def re; // assume cartesian coordinates
  def im;
  def init(re,im) {
    self.re := re;
    self.im := im;
  };
  def +(other) {
    self.new(re+other.re, im+other.im);
  };
};
>><obj:{super,super:=,re,re:=,im,im:=,...}[Isolate]>
The isolate: primitive is actually syntactic sugar for the creation of an object that is automatically tagged with the /.at.types.Isolate type tag. Any object that is tagged with this type tag is treated as an isolate. If you are a Java programmer, you can best compare this behaviour to having to implement the java.io.Serializable interface to make a class's instances serializable.
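For illustration, here is a sketch of roughly what this desugaring amounts to (assuming the object:taggedAs: construct from the chapter on objects; the actual expansion performed by isolate: may differ in details):

// roughly equivalent to the isolate: form above (a sketch, not the exact expansion)
def complexNumber := object: {
  def re;
  def im;
  def init(re,im) { self.re := re; self.im := im };
  def +(other)    { self.new(re+other.re, im+other.im) };
} taggedAs: [/.at.types.Isolate];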
An isolate differs from a regular object as follows:
- it has no access to its surrounding lexical scope; this means that an isolate only has access to its local fields and methods. An isolate does have access to the global lexical scope of its actor.
- it is parameter-passed by-copy rather than by-reference in inter-actor message sends. The copy of the isolate received by the remote actor can only access that actor's global lexical scope, no longer the global scope of its original host.
- external method definitions on isolates are disallowed. The reason for this is that external method definitions implicitly carry a lexical scope (the scope of their definition). Hence, if an isolate with external methods had to be copied, those scopes would have to be copied as well. Following the rule that objects encapsulating a lexical scope are passed by reference, we chose to disallow external methods on isolates, as sketched below.
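The following hedged sketch illustrates this last rule (the point and isoPoint objects are hypothetical, and the exact error raised may differ):

def point := object: { def x := 1 };
def point.getX() { self.x };    // external method definition: allowed on a regular object

def isoPoint := isolate: { def x := 1 };
def isoPoint.getX() { self.x }; // disallowed: external definitions on isolates raise an error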
Returning to the calculator example, the calculator can now add complex numbers locally and send (a copy of) the resulting complex number back to the customer:
>calculator<-add(
  complexNumber.new(1,1),
  complexNumber.new(2,2),
  object: {
    def result(sum) {
      system.println("sum=("+sum.re+","+sum.im+")");
    };
  });
>>nil
sum=(3,3)
It is good practice to override the == method on isolates to compare isolates based on their semantic identity, rather than on their object identity. For example, equality for complex numbers should be defined as:
def ==(other) { (re == other.re).and: { im == other.im } }
On a related note, it is good practice to consider isolates as immutable objects, since modifying an isolate will only modify its local copy.
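The following sketch illustrates this point (printer is a hypothetical far reference to an object hosted by another actor):

// sketch: the remote actor only ever sees a snapshot of the isolate
def counter := isolate: { def n := 0 };
printer<-show(counter);  // the actor behind 'printer' receives a copy of 'counter'
counter.n := 1;          // modifies only the local isolate; the remote copy still has n = 0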
As already explained, an isolate has no access whatsoever to its encompassing scope. Hence, the following code results in an exception:
>def x := 1;
def adder := isolate: {
  def add(n) { x + n };
};
adder.add(3)
>>Undefined variable access: x
  origin: at adder.add(3)
However, sometimes it is useful to initialize an isolate with the values of lexically visible variables. To this end, AmbientTalk allows the programmer to specify which lexical variables should be copied into the isolate itself, such that the isolate has its own, local copy of the variable. Lexical variables that need to be copied like this are specified as formal parameters to the closure passed to the isolate: primitive, as follows:
>def x := 1;
def adder := isolate: { |x|
  def add(n) { x + n };
};
adder.add(3)
>>4
Although the variables to copy are listed like a closure's formal parameters (|x| in the above example), these implicitly copied free variables are private to the isolate (so they cannot be accessed from outside the isolate).
Futures
As you may have noticed previously, asynchronous message sends do not return any value (that is, they return nil). Quite often, the developer is required to work around this lack of return values by means of, e.g., explicit customer objects, as shown previously in the calculator example. This, however, leads to less expressive code that is more difficult to understand, where the control flow quickly becomes implicit.
The Concept
The most well-known language feature for reconciling return values with asynchronous message sends in concurrent and distributed languages (for example, in ABCL, the actor-based concurrent language) is the notion of a future. Futures are also commonly known by the name of promises (this is what they are called in the E language and in Argus). Wikipedia has an excellent article about promises, futures and their differences.
Futures are objects that represent return values that may not yet have been computed. Once the asynchronously invoked method has completed, the future is replaced with the actual return value, and objects that referred to the future transparently refer to the return value.
Using futures, it is possible to re-implement the previous example of requesting our calculator actor to add two numbers as follows:
def sum := calculator<-add(1,2);
Enabling futures
In AmbientTalk, futures are not native to the language. However, because of AmbientTalk's reflective infrastructure, it is possible to build futures on top of the language. The system library shipped with AmbientTalk contains exactly this: a reflective implementation that adds futures to the language kernel. This implementation can be found in the file at/lang/futures.at.
To enable futures, it suffices to import the futures module and to enable it, as follows:
import /.at.lang.futures;
enableFutures(true);
The first statement imports the futures module into the current lexical scope. This enables you as a developer to use some additional language constructs exported by the futures module, as will be explained later. The second statement enables the futures behaviour, causing any asynchronous message send to return a future rather than nil. If false is passed to the call to enableFutures, only messages annotated explicitly as @FutureMessage (or as @TwoWay) will return a future.
Working with Unresolved Futures
We have described a future as a placeholder for the return value of an asynchronous message send which is eventually resolved with the expected result. However, we have yet to describe what objects can do with unresolved futures, i.e. what can an object do with a future that does not know its value yet? In many multithreaded languages that introduce the future abstraction, the language dictates that accessing (performing an operation on) an unresolved future causes the active thread to block; the thread has to wait for the value of the future to be available before it can carry on. This makes futures a preferred means of synchronising two threads with more freedom than is possible using a simple remote procedure call.
Blocking a thread on a future can be a major source of deadlocks, like any form of blocking. In the actor paradigm, where communication between actors should remain strictly asynchronous, this behaviour is obviously unwanted. Furthermore, in a distributed programming context, it is highly undesirable to make one actor block on a future that may have to be resolved by an actor on another machine. It would make the application much more vulnerable to network latency and partial failure, resulting in unresponsive applications at best, and deadlocked applications at worst.
The solution proposed in the E language, and adopted in AmbientTalk, is to disallow direct access to a future by any object. Instead, objects may only send asynchronous messages to a future object. This enables the future to temporarily buffer such messages until its resolved value is known. The net effect of this solution is that futures actually can be “chained”, forming asynchronous pipelines of messages, as will be illustrated in the next section.
As an example of a pipeline of message sends, consider the following code:
def booleanFuture := remoteObject<-ask(something);
booleanFuture<-ifTrue: { ... } ifFalse: { ... }
In this example, the message ifTrue:ifFalse: is sent to a future which will be resolved with a boolean object (the answer to the ask method). When booleanFuture becomes resolved, it will forward the message to its resolved value. The above code shows how you can postpone the execution of code until a future becomes resolved. However, this particular synchronisation technique only works for (futures for) booleans. The following section describes a more general synchronisation technique to await the value of a future for any kind of value.
Working with Resolved Futures
When a future eventually becomes resolved with a value, any messages that were accumulated by the future are forwarded asynchronously to the actual return value, such that it appears as if the original object had sent the messages to the actual return value in the first place.
One message that deserves attention is ==. A word of warning though: equality on futures is defined as pointer equality, so a future will only be equal to itself. It does not compare the argument with the future's resolved value, even once the future has been resolved.
As explained above, it is always correct to use asynchronous message sends to communicate with a future. Sometimes, however, we may want to perform some operation on the return value other than message sending, for example, printing it to the screen. If you print the future directly, you get the following:
def sum := calculator<-add(1,2);
system.println(sum);
>> <unresolved future>
Because the future is not yet resolved, AmbientTalk simply prints the unresolved future itself. At a later point in time, printing the future again may result in the following:
>system.println(sum);
>> <resolved future:3>
This time, the future was printed after its return value had been computed. But what if we simply want to inform the user of the actual value of sum? In such cases, you need to register an observer with the future, which will be asynchronously notified when the actual value of the future has been computed.
In AmbientTalk, this observer takes the form of a closure which will be applied asynchronously, taking as its only argument the actual value of the future. Registering the observer can be easily done by means of the when:becomes: function, exported by the futures module:
def sumFuture := calculator<-add(1,2);
when: sumFuture becomes: { |sum|
  system.println("The sum is " + sum);
};
The first argument to when:becomes: is the future to observe. The second argument is a closure that takes the actual return value as a formal parameter. If there is a possibility that the asynchronously invoked method can raise an exception, this exception can be caught asynchronously by means of the when:becomes:catch: variant:
def sumFuture := calculator<-add(1,2);
when: sumFuture becomes: { |sum|
  system.println("The sum is " + sum);
} catch: { |exc|
  system.println("Exception: " + exc.message);
};
Or, you can specify a type tag to only catch specific exceptions:
def divFuture := calculator<-divide(a,b);
when: divFuture becomes: { |div|
  system.println("The division is " + div);
} catch: DivisionByZero using: { |exc|
  system.println("Cannot divide "+a+" by zero!");
};
The when:* functions are a very easy mechanism to synchronise on the value of a future without actually making an actor block: remember that all the when:becomes: function does is register the closure with the future. After that, the actor simply continues processing the statements following when:becomes:. Also, even if the future is already resolved at the time the closure observer is registered, the closure is guaranteed to be applied asynchronously. This guarantees that the code following a when:becomes: block is executed before the registered closure itself:
when: sumFuture becomes: { |sum|
  system.println("... and here later.");
};
system.print("Always here first");
>>Always here first... and here later.
Finally, it is useful to know that when:becomes: itself returns a future, which will be resolved with the value of applying the observer closure:
def fut := when: calculator<-add(1,2) becomes: { |sum|
  calculator<-add(sum,3)
};
When the future for ←add(1,2) becomes resolved with sum, the fut future will be resolved with the future for the ←add(sum,3) message. When that message finally returns yet another sum, that sum will become the value of fut. This example also illustrates how futures can be chained by means of the when:becomes: function, forming so-called asynchronous pipelines of messages.
Futures and Annotated Messages
As previously explained, there are two modes for enabling futures in AmbientTalk. Invoking enableFutures(true) makes asynchronous sends return a future by default; invoking enableFutures(false) makes them return nil by default. No matter how you enabled futures, you can always override the default setting by explicitly annotating the message send itself by means of two type tags exported by the futures module, as explained below.
When a message send is annotated with the @OneWay type tag, no future is ever attached to the message. This is primarily useful if you have enabled futures by default, but want to send a one-way message requiring no result. In this case, simply send the message as follows:
o<-m()@OneWay
When a message send is annotated with the @FutureMessage or @TwoWay type tag, a future is attached to the message, but only if futures have been enabled! This is primarily useful if you have enabled futures, but not by default, because you don't want to incur the overhead of future-type message sends on each of the messages sent. In cases where futures become useful, simply send the message as follows:
o<-m()@TwoWay
When a message send is annotated with the @Due type tag, the attached future is expected to be resolved before a specified deadline. As shown below, the annotation takes as parameter a timeout value (in milliseconds) relative to the time at which the message is sent. The future is automatically ruined with a TimeoutException if the timeout elapses before the return value is received. This is primarily useful to impose time-based delivery guarantees on asynchronous messages.
o<-m()@Due(timeout);
More information about the Due annotation can be found in the section about leased object references in the chapter on distributed programming.
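As a sketch (assuming TimeoutException is exported by the futures module as a type tag usable with when:becomes:catch:using:), a timed-out future can be handled as follows:

def answer := calculator<-add(1,2)@Due(10000); // give the calculator 10 seconds (10000 ms)
when: answer becomes: { |sum|
  system.println("sum = " + sum);
} catch: TimeoutException using: { |exc|
  system.println("the calculator did not answer in time");
};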
Finally, it is possible to first invoke enableFutures(false) and later enable futures by default anyway by invoking enableFutures(true). However, once futures have been enabled by default, they can no longer be “turned off” by default. The reason for this is that if two separate files load the futures module and one enables futures by default while the other does not, the net result is that futures are enabled by default, which keeps both applications working correctly. If futures could be disabled again, one module could unexpectedly break other objects that depend on futures.
Conditional Synchronisation with Futures
Futures are useful to synchronise on the return value of an asynchronous message send. However, objects hosted by different actors may often want to synchronise based on other events or conditions. In such cases, futures can be created and resolved explicitly. The interface to the programmer is about the same as that specified by the E language:
// to create an explicit future:
def [future, resolver] := makeFuture();
// to explicitly resolve a future:
resolver.resolve(val);
The makeFuture function exported by the futures module returns two values: a new, unresolved future, and an object called a resolver that can be used to resolve the future. As shown in the example, the future can be passed around as a regular object, and code can synchronise on the future by registering an observer on it, as shown previously. The resolver can be handed out separately to other parts of the program, which calculate the value for the future. This enables objects to “synchronise” on the future without being restricted to return values.
The resolver also defines a ruin(e) method which can be used to explicitly ruin a future, causing any attached when:becomes:catch: blocks to trigger their catch: clause.
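The following sketch pulls these pieces together (the exception passed to ruin is left abstract here):

import /.at.lang.futures;

def [done, resolver] := makeFuture();

// any party holding the future can register an observer on it...
when: done becomes: { |value|
  system.println("condition fulfilled with: " + value);
} catch: { |exc|
  system.println("condition failed: " + exc.message);
};

// ...while the party holding the resolver later decides the outcome:
resolver.resolve(42);
// or, to signal failure instead (triggering the catch: block above):
// resolver.ruin(someException); // 'someException' stands for any exception object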
As an example of such conditional synchronisation, consider the following example, first proposed by Henry Lieberman in his 1987 paper on “Object-oriented Programming in ACT-1”. A dating service allows lonely hearts to register their profiles and matches people based on those profiles. The dating service can be modelled as an object:
def makeDatingService() {
  def people := []; // a list of Questionnaire objects
  object: {
    def match(lonelyHeart) {
      // if an ideal mate is found in the list,
      //   return its name
      // otherwise
      //   create a future (a "promise") to answer
      //   the lonelyHeart later, when an ideal mate
      //   has registered with the dating service
    }
  }
}
Let us assume that a person is simply identified by a name and a sex:
def makePerson(nam, sx) {
  object: {
    def name := nam;
    def sex := sx;
  }
};
The dating service has a little database stored as a simple list. This list does not contain person objects but rather Questionnaire objects. The questionnaire contains the logic necessary to match people. For the sake of the example, we will assume that two people match if they are of the opposite sex. In addition, a questionnaire object can keep track of an “outstanding answer” (which we model as a first-class future) for a lonely heart that is still waiting for his or her perfect match.
def makeQuestionnaire(p) {
  def idealPersonResolver;
  object: {
    def person := p;
    def matches(otherQ) { otherQ.person.sex != p.sex };
    def wait() {
      def [future, resolver] := makeFuture();
      idealPersonResolver := resolver;
      future
    };
    def notify(name) { idealPersonResolver.resolve(name) };
  };
};
When a questionnaire is asked to wait(), it returns a future and stores the future's corresponding resolver in a hidden field. Later, when a matching person is found, the future can be explicitly resolved by invoking the questionnaire's notify method.
Armed with these abstractions, we can now fill in the missing logic of the match method of the dating service:
def makeDatingService() {
  def people := []; // a list of Questionnaire objects
  object: {
    def match(lonelyHeart) {
      def lonelyHeartQ := makeQuestionnaire(lonelyHeart);
      { |return|
        people.each: { |idealMateQ|
          // an ideal mate was found
          if: (idealMateQ.matches(lonelyHeartQ).and: { lonelyHeartQ.matches(idealMateQ) }) then: {
            idealMateQ.notify(lonelyHeart.name); // notify idealMate
            // remove the person from the database
            people := people.filter: { |q| q != idealMateQ };
            return(idealMateQ.person.name) // notify lonelyHeart
          }
        };
        // no ideal mate was found, store its questionnaire in the database
        people := people + [lonelyHeartQ];
        lonelyHeartQ.wait(); // return a future for the ideal person's name
      }.escape();
    }
  }
}
Below, we define an auxiliary function that illustrates how a lonely heart interacts with the dating service.
def d := makeDatingService();
def register(p) {
  when: d<-match(p)@FutureMessage becomes: { |name|
    system.println(p.name + " matched with " + name);
  };
};
The key to our conditional synchronisation is that the when:becomes: listener will only be invoked if and when a matching person has registered itself with the dating service. As long as this condition has not been fulfilled, the future will remain unresolved. In fact, the future returned by d←match(p) has been resolved with another future, the one returned by Questionnaire.wait(). However, recall that a future resolved with another unresolved future does not really count as being “resolved” and will only trigger its listeners once the future it depends on has been resolved.
The complete source code of the above example can be found in the file at/demo/DatingService.at in the distribution. The directory at/demo also contains an example solution to Dijkstra's famous “dining philosophers” problem, which makes use of a similar technique to achieve conditional synchronisation of the philosophers.
Actor Mirrors
An actor in AmbientTalk is primarily a host for regular objects. It is equipped with a message queue to receive asynchronous messages sent to one of its objects. The mirrors on these objects have corresponding meta-level operations such as send and receive that can be overridden to customise e.g. message sending on a per-object basis.
Some operations, such as creating and sending asynchronous messages, are useful to reify at the actor level. With such a reification, the programmer can override the way messages are sent by any object from within an actor. The mirror on an actor that reifies these operations can be acquired by invoking the function reflectOnActor(), defined at top level. Evaluating reflectOnActor() returns the mirror on the current actor. It defines operations such as createMessage and send which can be explicitly invoked, or even overridden, by the programmer.
Overriding the actor's metaobject protocol can be done by installing a new protocol object. This is done by invoking the becomeMirroredBy: method on the actor's mirror. The following code installs a new MOP that logs any messages sent by any object in the current actor:
def actor := reflectOnActor();
def oldmirror := actor.becomeMirroredBy: (extend: actor with: {
  def send(msg) {
    log(msg);
    super^send(msg);
  };
});
Notice that, in this example, the new metaobject protocol is an extension of the old protocol. This enables it to invoke its parent's behaviour simply by means of a super-send. Note also that the becomeMirroredBy: primitive returns the previously installed mirror. This is useful when an actor mirror should only be installed temporarily: the old mirror can later be re-installed.
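For instance, the logging mirror installed above could later be removed again by re-installing the previous mirror (a minimal sketch):

// restore the previously installed mirror once logging is no longer needed
def currentMirror := reflectOnActor();
currentMirror.becomeMirroredBy: oldmirror;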
A good example of the use of actor mirrors can be found in the at/lang/futures.at file in the system library. This file implements future-type message passing. It uses a custom actor mirror that overrides createMessage – which is invoked whenever an asynchronous message is created – to attach a future to the message.
Other methods that can be overridden are require and provide, which reify the export and discovery of objects, and createMirror, which is invoked by the reflect: primitive. By overriding this factory method, it becomes possible to easily customize the behaviour of mirrors defined on local objects.
Nesting Actors
In AmbientTalk, objects can be nested inside other objects and functions can be defined inside other functions. It makes sense to ask whether actors can also be nested. The answer is yes: actors, too, can be nested. However, this is not as straightforward as it seems. Consider the following example:
def outer := actor: {
  def x := 1;
  def get() { x };
  def set(v) { x := v };

  def inner := actor: {
    def get() { x };
    def set(v) { x := v };
  };
};
If both the outer and inner actors lexically see x, they could modify it concurrently, reintroducing race conditions on the internal state of an actor. Therefore, when defining an actor using a block of code, we disallow direct access to the enclosing lexical scope by the new actor. Actors behave similarly to isolates in this respect. The above example code will work, but the programmer has to keep in mind that the x variable accessed by inner is a copy of the x variable of outer. Hence, assignments to x by inner will not affect outer and vice versa.
Recall that isolates could be given access to their enclosing lexical scope either by specifying accessed variables as formal parameters to their initializing closure or by having the interpreter derive the lexically free variables automatically. If the programmer wants to make explicit the fact that x is copied, the example can also be rewritten as:
def outer := actor: {
  def x := 1;
  def get() { x };
  def set(v) { x := v };

  def inner := actor: { |x|
    def get() { x };
    def set(v) { x := v };
  };
};
It still makes sense to nest actors, but each actor will have its own local copy of lexically shared variables. Furthermore, the value bound to the copied variable is parameter-passed according to the inter-actor parameter passing rules: if x were bound to an object, then outer would have a normal reference to the object, while inner would receive a far reference to it.