====== Distributed Programming ======
Building on the actor-based concurrency model explained in the [[actors|previous chapter]], this chapter discusses the distribution provisions of AmbientTalk. For actors to communicate across the boundaries of a single device, actors need to be capable of discovering one another's presence and need to be resilient to intermittent disconnections of their communication partners.
These requirements correspond to the cornerstones of the Ambient-Oriented Programming paradigm. The seamless integration of language support for handling partial failures and performing service discovery hinges on AmbientTalk's concurrency model based on actors and far references. This chapter explores the discovery mechanisms used to create far references that span different devices, and illustrates how such //remote// far references deal with partial failures in mobile ad hoc networks.
Before delving into these topics, the next section shows how to activate the network facilities of AmbientTalk.
===== Going Online =====
AmbientTalk provides a unique native object, named ''network'', that responds to two methods controlling the network access of an AmbientTalk virtual machine. More specifically, ''network.online()'' and ''network.offline()'' make a virtual machine go online and offline, respectively.
When the virtual machine goes online, the built-in service discovery mechanism is able to broadcast the presence of locally discoverable objects to all virtual machines in the environment, as well as acquaint local objects with discoverable objects on other devices. The precise details of AmbientTalk's discovery support will be further explained in the following section.
Taking a virtual machine offline, on the other hand, immediately severs all connections with other virtual machines and thus induces a partial failure for all references that cross the boundaries of a single virtual machine. This is a deliberate design choice made to facilitate the simulation of transient disconnections. Note that such disconnections do not render far references unusable, as we shall explain [[distribution#partial_failure_handling|below]].
Be aware that network access is shut down by default.
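As a minimal sketch of the two methods just described, a program that wants to take part in discovery first goes online and can later simulate a transient disconnection by going offline and online again. Only ''network.online()'' and ''network.offline()'' are taken from the text above; the comments describe the expected effect rather than a guaranteed trace.

<code>
// Network access is off by default, so enable it explicitly.
network.online();

// ... export objects and discover remote services here ...

// Simulate a transient disconnection: all connections to other
// virtual machines are severed, but far references stay usable.
network.offline();

// Going online again allows the connections to be re-established.
network.online();
</code>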
===== Exporting and Discovering Objects =====
AmbientTalk provides language support to make some objects available to other objects residing in remote actors by means of the ''export:as:'' construct. The ''export:as:'' construct takes as argument an object that is made remotely accessible and a service type under which the object can be discovered. For example, one exports a printing service as follows:
<code>
deftype Printer;
def service := object: {
  def print(aDoc) {
    system.println("printing " + aDoc);
  }
};
export: service as: Printer;
</code>
When an object is exported by its actor, it becomes discoverable by other actors by means of the service type. Internally, this means that the object is placed in the export table of its actor. As shown in the example, a service type is represented by a type tag. This means that service types are not associated with a set of methods; rather, they denote an abstract publication topic under which objects are exported. Being a type tag, a service type can be a subtype of one or more other service types. For example, an object could offer a color printing service by exporting the following type tag:
<code>
deftype ColorPrinter <: Printer;
</code>
The ''export:as:'' construct returns a publication object that responds to a ''cancel'' method, which can be used to cancel the publication, i.e. to unexport the object.
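The sketch below shows how the publication object could be kept and cancelled later. The variable name ''publication'' is chosen for illustration only; ''export:as:'' and ''cancel'' are the constructs described above.

<code>
deftype Printer;
def service := object: {
  def print(aDoc) {
    system.println("printing " + aDoc);
  }
};

// Keep the publication object returned by export:as: ...
def publication := export: service as: Printer;

// ... so that the service can be unexported later on.
publication.cancel();
</code>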
==== Discovering objects ====
AmbientTalk has a built-in peer-to-peer discovery lookup mechanism based on a publish-subscribe scheme that was designed to be able to discover objects in mobile ad hoc network interactions where no centralized lookup infrastructure may be available.
As previously explained, objects broadcast to the network the service types they offer using the export statement. AmbientTalk also provides language constructs to install an observer whose block of code will be triggered when a remote object of a certain service type becomes available in the network. For example, one can discover a proximate buddy of an instant messenger application by means of the ''when:discovered:'' construct as follows:
<code>
when: InstantMessenger discovered: { |messenger|
  when: (messenger<-getName()) becomes: { |name|
    buddyList.put(name, messenger);
    system.println("Added buddy: " + name);
  };
};
</code>
The code block to execute when the service type becomes available is parameterized with the actual remote reference to the discovered service object. In the example above, ''messenger'' is a remote reference to the remote object exporting the ''InstantMessenger'' service type. Imagine the interaction between the instant messenger applications of two persons, e.g. //Bart// and //Lisa//, both executing the above code. When //Bart's// instant messenger and //Lisa's// instant messenger come into one another's communication range, //Bart// will discover //Lisa// and //Lisa// will discover //Bart//, since both are exporting and also searching for the ''InstantMessenger'' service type. Once discovered, both will exchange their names and store them in their ''buddyList''.
We are using a future to get the return value of the ''getName'' asynchronous message invocation. For further details about futures and the ''when:becomes:'' language construct, we refer the reader to the previous chapter on the [[actors|concurrency model of AmbientTalk]].
A ''when:discovered:'' observer is triggered only once, the first time an object of the ''InstantMessenger'' service type becomes available in the network. In order to discover all other instant messenger buddies available in the network, the ''whenever:discovered:'' construct should be used. Like ''when:discovered:'', ''whenever:discovered:'' takes a service type and a block of code that is triggered when a remote object of that type becomes available in the network. However, the block of code passed to ''whenever:discovered:'' can be fired multiple times, once for each exported object that is discovered.
Like ''export:as:'', both constructs return a subscription object that responds to a ''cancel'' method, which can be used to cancel the subscription so that the block of code is no longer invoked. Note that objects exported by an actor do not trigger the actor's own ''when:discovered:'' or ''whenever:discovered:'' observers.
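As a sketch of how the instant messenger could track every buddy rather than only the first one, the example below reuses ''buddyList'' and ''getName'' from the earlier snippet. The variable name ''subscription'' and the moment at which it is cancelled are assumptions made for illustration.

<code>
def subscription := whenever: InstantMessenger discovered: { |messenger|
  // Fired once for every InstantMessenger object that is discovered.
  when: (messenger<-getName()) becomes: { |name|
    buddyList.put(name, messenger);
    system.println("Added buddy: " + name);
  };
};

// Later, e.g. when the application shuts down: stop discovering buddies.
subscription.cancel();
</code>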
===== Dealing with Transient Failures =====
Let us consider again the example instant messenger application described in the previous section to further explain the semantics of AmbientTalk's remote object references, which we call //remote far references//, and how they deal with transient disconnections.
When an object discovers a service type, the ''when:discovered:'' observers are triggered, receiving as parameter a far reference to the discovered remote object. As explained in previous sections, far references operate asynchronously. When a client object sends a message via a remote far reference, the message is buffered in the far reference and the client does not wait for the message to be delivered. This is crucial in distributed computing in order to prevent race conditions. The parameter passing semantics for messages sent to remote objects work similarly to the inter-actor message sending semantics:
- Objects are always passed //by far reference//, except for isolate objects which are passed by copy.
- Native data types are always passed by copy.
When a remote far reference receives a message, it flushes the message to the remote object, provided that it is connected. If the remote far reference is disconnected, messages are buffered in the reference so that they can be transmitted once the reference becomes reconnected at a later point in time, when the network connection is restored.
Therefore, a remote far reference allows its clients to abstract from the actual network connection state. However, it is often useful for an application to be informed when a connection to a remote object is lost or restored. To this end, AmbientTalk offers language constructs to install observers on a far reference, which are triggered whenever the reference becomes disconnected or reconnected. For example, the instant messenger application can notify the user whenever a buddy moves in or out of communication range as follows:
<code>
when: InstantMessenger discovered: { |messenger|
  ...
  whenever: messenger disconnected: {
    system.println("Buddy offline: " + name);
  };
  whenever: messenger reconnected: {
    system.println("Buddy online: " + name);
  };
};
</code>
This code illustrates how the instant messenger application gets notified when a buddy goes online or offline. In the above code, ''messenger'' is a remote reference to a discovered remote buddy. Note that by installing observers that trigger upon disconnection, developers can clean up certain resources when a remote reference becomes disconnected. However, when an instant messenger disconnects, the remote object referred to by ''messenger'' remains exported. This implies that a disconnected remote reference remains valid (it can reconnect), but also that the remote object remains referred to by a disconnected remote reference. This reference thus prevents the remote object from being garbage collected. In fact, ''messenger'' never becomes garbage unless its subscription is explicitly cancelled. Other types of objects, however, are only relevant within the context of an interaction and should eventually become candidates for garbage collection. A later section details how AmbientTalk deals with distributed memory management.
In order to cope with partial failures, AmbientTalk also allows developers to retract all currently unsent messages from the outbox of a remote far reference by means of the ''retract:'' language construct. This is especially useful in the context of distribution, since it gives developers explicit control over the messages that were buffered but not sent while the remote far reference was disconnected. Any undelivered messages accumulated by the remote reference can then be, for example, forwarded to another remote object or simply cancelled.
The ''retract'' language construct takes as argument the far reference of which to retract buffered outgoing messages. One can store the unsent messages upon disconnection of a service type ''Service'' as follows:
<code>
when: Service discovered: { |reference|
  when: reference disconnected: {
    // messages is assumed to be a variable defined in an enclosing scope
    messages := retract: reference;
  }
}
</code>
The ''retract:'' call returns a table containing copies of all messages that were sent to this far reference, but not yet transmitted by the far reference to the remote object it points to. Note that this has the side effect that the returned messages will no longer be sent automatically; the programmer is thus responsible for explicitly resending all messages that were retracted but still need to be sent.
The function ''when:disconnected:'' behaves like the previously discussed function ''whenever:disconnected:'', except it triggers the associated closure at most once, not upon //every// disconnect.
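The following sketch shows one possible way to act on retracted messages, assuming a second far reference named ''backup'' (a hypothetical name, not part of the example above) that points to an alternative service object. It relies on AmbientTalk's ''<+'' operator for sending a first-class message and on the ''each:'' method of tables; treat both as assumptions to verify against the language reference.

<code>
when: Service discovered: { |reference|
  when: reference disconnected: {
    // Take the unsent messages out of the far reference.
    def unsent := retract: reference;
    // Forward each retracted message to the (hypothetical) backup service.
    unsent.each: { |msg| backup <+ msg };
  };
};
</code>

Since retracted messages are no longer sent automatically, dropping the table instead of forwarding it amounts to cancelling those messages.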
===== Dealing with Permanent Failures =====
As explained in the previous section, remote far references have been designed to be resilient to intermittent disconnections by default. This behaviour is desirable because many partial failures in mobile ad hoc networks can be expected to result from transient network partitions. However, not all network partitions are transient. For example, a remote device may have crashed, or may have moved out of wireless communication range without ever returning. Such permanent failures should also be dealt with by means of compensating actions, e.g. application-level failure handling code.
To deal with permanent failures, AmbientTalk uses the concept of leasing. A lease denotes the right to access a resource for a specific duration that is negotiated by the owner of a resource and a resource claimant (called the lease grantor and lease holder, respectively) when the access is first requested. At the discretion of the lease grantor a lease can be renewed, prolonging access to the resource. In AmbientTalk, we represent leases as a special kind of remote far references, which we call //leased object references//.
==== Leased Object References ====
A leased object reference is a remote far reference that grants access to a remote object for a limited period of time. When the time period has elapsed, access to the remote object is terminated and the leased reference is said to //expire//. Like remote far references, a leased reference shields client objects from the actual network connection state. Client objects can send a message to the remote object even if the leased reference is disconnected at that time. Messages are accumulated in order to be transmitted when the reference becomes reconnected. When the leased reference expires, the accumulated messages are discarded, since an expired leased reference behaves as a //permanently// disconnected remote far reference. The figure below shows a diagram of the different states of a leased reference.
{{ :at:tutorial:leasedref-state.png |:at:tutorial:leasedref-state.png?160x30}}
==== Working with leased object references ====
The code snippet below illustrates a leased far reference in the context of an online shopping application. In the example, a client object can ask a server to start a shopping session by sending it the ''openSession'' message. In response to this message, the server returns a new session object which implements methods that allow a client to place items in its shopping cart or to check out. The returned session gets leased by means of the ''lease:for:'' construct.
<code>
def openSession() {
  def shoppingCart := Cart.new(); // to store purchased items
  def session := object: {
    def addItemToCart(anItem) { ... }
    def checkOutCart() { ... }
  };
  def leasedSession := lease: minutes(5) for: session;
  leasedSession; // return lease on the session to the client
};
</code>
The ''lease:for:'' construct takes as parameters a time interval (in milliseconds) and the remote object to which it grants access, and returns a leased reference that remains valid for the indicated time interval (5 minutes in the example). The construct also alters the parameter-passing semantics of the object to which it grants access: the object is passed by lease rather than by far reference, which is the default.
We assume the use of futures to get the return value of the ''openSession'' asynchronous message invocation. For further details about futures, we refer the reader to the previous chapter on the [[actors|concurrency model of AmbientTalk]].
At client side, a customer can ask a server to open a shopping session as follows:
<code>
def mySession := server<-openSession()@FutureMessage;
...
mySession<-addItemToCart(selectedItem);
</code>
The future attached to the ''openSession'' message will be resolved with a leased reference to a session object which remains valid for the next 5 minutes. From that moment on, the client can use the leased reference as if it were the session object itself until it expires. As with far references, client objects can only send messages to remote objects asynchronously. The lifetime of a leased reference can be explicitly controlled by renewing or revoking it before it expires, as shown below:
<code>
renew: mySession for: minutes(5);
revoke: mySession;
</code>
The ''renew:for:'' construct requests a prolongation of the specified leased reference with a new time interval, which can be different from the initial one. The ''revoke:'' construct cancels the given leased reference. Cancelling a lease is in a sense analogous to a natural expiration of the lease, but it requires communication between the client and server side of the leased reference.
When no renewal is performed, either because a network partition outlasts the lease time period or because the reference is not being used, the leased reference expires when its lease time elapses. Both client and service objects can schedule clean-up actions with the leased reference upon expiration by means of the ''when:expired:'' construct as follows:
<code>
when: mySession expired: {
  ... // free up resources used by this session e.g. the cart
}
</code>
The construct takes as parameters a leased reference and a block of code that is asynchronously triggered upon the lease expiration. This allows client and service objects to treat a failure as permanent (i.e. to detect when the reference is permanently broken) and to perform application-level failure handling. At server side, this has important benefits for memory management. Once all leased references to a service object have expired, the object becomes subject to garbage collection once it is no longer locally referenced.
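On the server side, the same observer could be installed at the moment the lease is created. The sketch below extends the ''openSession'' method shown earlier; the message printed on expiration is an illustrative placeholder for real clean-up code, and it assumes that ''when:expired:'' accepts the reference returned by ''lease:for:'', as the text above suggests for service objects.

<code>
def openSession() {
  def shoppingCart := Cart.new(); // to store purchased items
  def session := object: {
    def addItemToCart(anItem) { ... }
    def checkOutCart() { ... }
  };
  def leasedSession := lease: minutes(5) for: session;
  // Server-side clean-up, triggered when the lease expires.
  when: leasedSession expired: {
    system.println("session expired, discarding its cart");
  };
  leasedSession; // return lease on the session to the client
};
</code>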
==== Leasing patterns ====
As is the case in other leasing mechanisms, determining the proper lease renewal period is not straightforward and may even depend on system parameters such as the number of clients. In AmbientTalk, we incorporate two leasing variants on leased object references which transparently adapt their lease period under certain circumstances.
The first variant is a //renew-on-call// leased reference, which automatically prolongs the lease upon each method call received by the remote object. In other words, as long as the client uses the remote object, the leased reference is transparently renewed by the interpreter. In the shopping application example, once a client establishes a shopping session with the server, the session should remain active as long as the shopping process is active, i.e. as long as ''addItemToCart'' or ''checkOutCart'' messages are received by the session object. A renew-on-call leased reference can be used for the session object to model that kind of collaboration as follows:
<code>
def openSession() {
  def shoppingCart := Cart.new(); // to store purchased items
  def session := object: {
    def addItemToCart(anItem) { ... }
    def checkOutCart() { ... }
  };
  def leasedSession := renewOnCallLease: minutes(5) for: session;
  leasedSession; // return lease on the session to the client
};
</code>
Similar to ''lease:for:'', this construct takes as parameters a time interval (in milliseconds) and the remote object to which it grants access. It returns a leased reference that is initially valid for 5 minutes, but that is automatically renewed each time the remote object receives a message. By default, the renewal time applied on every call is the initial time interval specified at creation.
The second variant is a //single-call// leased reference which automatically revokes the lease upon performing a successful method call on the remote object. The ''singleCallLease:for:'' construct allows developers to create single-call leased references as follows:
<code>
def myObject := object: {
  ...
};
def leasedObject := singleCallLease: minutes(5) for: myObject;
</code>
Similar to the other two constructs, ''singleCallLease:for:'' takes as parameters a time interval (in milliseconds) and the remote object to which it grants access, and returns a leased reference that remains valid for only a single call. In other words, the leased reference expires after the remote object receives a single message. However, if no message has been received within the specified time interval, the leased reference also expires.
==== Integrating leasing with future-type message passing ====
Single-call leases are useful for objects adhering to a single-call pattern, such as callback objects. Callback objects are often used in asynchronous message passing schemes in order for remote objects to be able to return values. These callback objects are typically accessed only once by a remote object, which passes them the computed return value. In AmbientTalk, futures serve as implicit callbacks.
We have integrated leasing into futures by parameter-passing a future attached to an asynchronous message via a single-call lease, which expires either due to a timeout or upon reception of the computed return value. The timeout for the implicit single-call lease on a future can be set by annotating the asynchronous message with a ''@Due'' annotation as follows:
<code>
def sessionFuture := server<-openSession()@Due(minutes(10));
when: sessionFuture becomes: { |session|
  // open session with server
} catch: TimeoutException using: { |e|
  // unable to open a session, do some clean-up if necessary
}
</code>
If the future is resolved, the ''session'' variable stores a leased object reference to the remote session object. A ''TimeoutException'' is raised when the future’s lease expires, i.e. when the computed return value is not received before the lease time elapses.
Note that specifying a ''catch:'' block for the ''TimeoutException'' is equivalent to installing a ''when:expired:'' observer on the future’s (server-side) lease.
==== Importing leased object references ====
Similar to futures, leased object references have been built reflectively on top of AmbientTalk. The system library shipped with AmbientTalk contains the reflective implementation that adds leased references to the language kernel. This implementation can be found in the files at/lang/leasedrefs.at and /at/lang/leasedrefstrait.at.
To use the language constructs for leased references, you should import the ''leasedrefs'' module as follows:
<code>
import /.at.lang.leasedrefs;
</code>
The ''leasedrefs'' module exports support primitives to manipulate time intervals (i.e. ''minutes'', ''seconds'', ''millisecs'') so that you do not need to explicitly import the timer module. Remember to exclude those methods from the ''leasedrefs'' import statement if some other module has already imported them, e.g. if futures are enabled.
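As a sketch of such an exclusion, the import below leaves out the three time primitives named above. It assumes that the ''exclude'' clause of AmbientTalk's ''import'' statement accepts a comma-separated list of names; check the language reference for the exact syntax.

<code>
// Import the leasing constructs, but not the time primitives that
// another module (e.g. the futures module) has already imported.
import /.at.lang.leasedrefs exclude minutes, seconds, millisecs;
</code>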
More information pertaining to the API of the leased references language module can be found in the [[appendix|appendix]].
===== Taking Offline Remote Objects =====
AmbientTalk's distributed memory management scheme is based on [[http://en.wikipedia.org/wiki/Reference_counting#Variants_of_reference_counting|reference listing]] and network objects. As in these techniques, remote far references are implemented by means of a proxy on the client side, which encapsulates a wire representation of the exported object and whose methods are stubs that transform local message sends into distributed message sends. Messages sent to the server include method invocation information, as well as the wire representation of the receiver. Each actor maintains an export table which maps wire representations onto local references to exported objects. This table is used by the interpreter to resolve remote far references into local object references. Objects in the export table are considered root objects for the [[http://en.wikipedia.org/wiki/Garbage_collection_(computer_science)|local garbage collector]]. When an object is removed from the export table, it is no longer accessible remotely and, importantly, it no longer belongs to the set of root objects. As such, it becomes subject to garbage collection once it is no longer locally referenced.
As previously explained, in order to deal with volatile connections, remote far references tolerate network disconnections by default. To enforce this, objects pointed to by remote far references are not automatically taken offline when all their remote far references become disconnected, as traditionally happens in the above-mentioned techniques. Rather, the object is kept in the export table, since disconnected remote far references remain valid and still refer to the server object while disconnected. This implies that a remote object is never reclaimed automatically. AmbientTalk therefore provides the ''takeOffline:'' primitive, which takes a remote object offline so that it can eventually be garbage collected locally.
<code>
takeOffline: object
</code>
The primitive takes as parameter an object, which is removed from the export table of the actor where the code is executed. When the object is removed from the export table, all remote far references to the object become invalid and the object no longer belongs to the set of root objects. As such, it can eventually be reclaimed by Java's local garbage collector once it is no longer locally referenced. Although the actual reclamation of an unexported object may happen at a later point in time, any attempt to access it via a remote far reference results in an ObjectOffline exception, notifying the client object that the object was taken offline and that its remote far references are therefore invalid.
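A minimal sketch of the primitive in use, reusing the ''Printer'' service from the beginning of this chapter. The point in time at which the object is taken offline is an assumption made for illustration.

<code>
deftype Printer;
def service := object: {
  def print(aDoc) {
    system.println("printing " + aDoc);
  }
};
export: service as: Printer;

// Later, once the service should no longer be remotely accessible:
// remove it from this actor's export table, which invalidates all
// remote far references to it.
takeOffline: service;
</code>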
[[distribution#dealing_with_permanent_failures|Leased object references]] make use of the ''takeOffline:'' primitive to terminate access to a remote object once the lease time elapses. Note that the ''takeOffline:'' primitive can be considered the equivalent of the so-called [[http://en.wikipedia.org/wiki/Manual_memory_management|''delete'' operation]] provided by some sequential languages without built-in local garbage collection. Rather than using this primitive for garbage collection purposes, regular AmbientTalk developers are encouraged to use the high-level language constructs for leasing explained [[distribution#dealing_with_permanent_failures|in the previous section]], which help them deal with both transient and permanent disconnections and properly reclaim their remote objects.
==== Working with objects taken offline ====
On the client side, taking an object offline results in a permanent disconnection of the remote far references pointing to it. In other words, even when a network connection is available, unexporting an object renders its remote far references permanently disconnected. This implies that clients have to deal explicitly with unexported objects.
Clients can get notified when an object is taken offline by means of ''when:disconnected:'' observers. Disconnected observers get triggered since taking an object offline is considered a logical disconnection between two devices. However, once an object has been taken offline and removed from the export table, it cannot go online anymore. As a result, the disconnected and reconnected observers will no longer be triggered for that remote far reference.
Additionally, AmbientTalk provides dedicated ''takenOffline'' observers, which get triggered only once when the object is taken offline. They are installed similarly to ''when:disconnected:'' observers as follows:
<code>
when: messenger takenOffline: {
  system.println("Buddy offline: " + name);
  // clean certain resources associated to the buddy
};
</code>
The construct takes as parameters a far reference and a block of code that is executed when the virtual machine is notified of the taken-offline event. ''when:takenOffline:'' observers can be considered a special kind of ''when:disconnected:'' observers which are triggered only in case of a "logical" disconnection, but not in case of a network disconnection.
Disconnection, reconnection and takenOffline observers can also be installed on inter-actor far references. Such inter-actor far references are local to a virtual machine, so network failures cannot occur. In that case, the disconnected and takenOffline observers are triggered when the object pointed to by the far reference is taken offline. Reconnected observers are never triggered.
==== Distributed unit testing and takeOffline ====
As previously mentioned, the ''takeOffline'' primitive notifies connected virtual machines when taking an object offline. Since AmbientTalk 2.12, the primitive not only notifies remote virtual machines, but it also notifies the local virtual machine where the ''takeOffline'' was executed so that if any inner actor has local far references to that object, it will also get notified. In other words, if an object is taken offline, all actors holding (local or remote) far references to it get notified.
These semantics are useful for unit testing purposes. The [[appendix#unit_testing_framework|unit testing framework]] shipped with AmbientTalk has support for asynchronous test invocations, which can be used to perform concurrent or distributed unit tests. In fact, distributed unit tests are simulated by means of concurrency, i.e. by using different actors in a unit test which make use of the network facilities. However, distributed unit tests cannot simulate network disconnections by means of ''network.offline'', since that construct works at the level of a virtual machine. In other words, ''network.offline'' simulates a network disconnection of a complete virtual machine, whereas those tests need to simulate the disconnection of a particular actor inside the virtual machine in which they are running.
By means of the ''takeOffline:'' primitive, inter-actor access to an object can be cut (regardless of whether the accessing actor is located within the same virtual machine or in a remote one), enabling the simulation of the disconnection of an actor inside a virtual machine.