Feel the Cloud in your hands

Distributed systems are becoming increasingly popular and necessary in industrial IT. Google, Microsoft, Facebook, Amazon and the other big players are all concentrating on distributed storage, distributed systems and distributed analytics. Cloud computing starts with distributed systems. NIST defines cloud computing as follows: “Cloud computing is a model for enabling convenient, on-demand network access to a shared pool of configurable computing resources (e.g., networks, servers, storage, applications, and services) that can be rapidly provisioned and released with minimal management effort or service provider interaction.”

SWIM is a protocol for collecting group membership information about a large set of machines (referred to as peers in this article) running in widely distributed locations. Every peer is eventually informed when a new peer joins or an existing peer fails. If this protocol scales with the number of peers, large systems can be built on top of it. This protocol has been implemented at KTH with the help of SICS.

Implementing and Evaluating SWIM

The goal of this project is to implement and evaluate a decentralized membership service that provides information about all the participating processes. A membership service can easily be implemented using a heartbeat mechanism, where each process periodically sends a ping message to all the other processes in the system. A failure is detected when a process does not respond to the ping message. However, this solution does not scale well as the number of processes grows, because heartbeat-based mechanisms require O(n^2) messages in each round and soon flood the system. To get rid of the ping flooding we implement SWIM.
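
To make the scaling argument concrete, here is a small sketch that counts the pings sent per round. The function names are made up for this illustration, and the SWIM count assumes the failure-free case where each peer sends exactly one direct ping per round (indirect pings add a small constant factor per peer, not a factor of n).

```python
def heartbeat_pings_per_round(n: int) -> int:
    """All-to-all heartbeat: each of the n processes pings the other n - 1."""
    return n * (n - 1)


def swim_pings_per_round(n: int) -> int:
    """SWIM-style probing (assumed failure-free): one direct ping per process."""
    return n


if __name__ == "__main__":
    for n in (10, 100, 1000):
        print(n, heartbeat_pings_per_round(n), swim_pings_per_round(n))
```

For 100 processes this gives 9900 heartbeat pings per round against 100 SWIM pings, and the gap widens quadratically as the system grows.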

The project is split into two phases. In the first phase we implement SWIM using Kompics so that it scales to at least hundreds of peers in simulation. In phase two we modify the system to make it work on the open internet, which is covered in another post. A link to the project on GitHub is provided at the end of this article.

Now let’s look at the SWIM protocol in more detail. SWIM (Scalable Weakly-consistent Infection-style Group Membership Protocol) was developed mainly at Cornell University. We chose the SWIM protocol for the following reasons.

Failure detection with random ping: The failure detector does not flood the network with heartbeat messages. Instead, each peer picks one random peer at a time and probes it with a ping. If the target fails to reply to the ping, it is pinged indirectly through k other peers. This ensures eventual failure detection.

Information dissemination: Information about a failed (or joined) peer is not broadcast to all peers; instead it is gossiped among the members. This reduces the message load on the network.

Information is piggybacked: Nodes are pinged in a gossip manner, and node-join, node-fail and node-leave information is piggybacked on the ping (or ack) messages. No additional gossip packets need to be sent, which reduces traffic even further.

Time-bounded failure detection: Instead of choosing ping targets purely at random, peers are pinged in a round-robin fashion. Every peer is then probed once per pass over the member list, which gives time-bounded failure detection.
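
Round-robin target selection can be sketched as below. This is an illustration only, not the project's Kompics code: the class name `RoundRobinTargets` is hypothetical, and reshuffling the list after each full pass is an assumption borrowed from the SWIM paper rather than something stated in this post.

```python
import random


class RoundRobinTargets:
    """Hands out probe targets so every member is picked once per pass."""

    def __init__(self, members, rng=None):
        self._rng = rng or random.Random()
        self._order = list(members)
        self._rng.shuffle(self._order)  # random starting order
        self._next = 0

    def next_target(self):
        """Return the next member to probe, or None if there are no members."""
        if not self._order:
            return None
        if self._next >= len(self._order):
            # A full pass is done: reshuffle (assumption) and start over.
            self._rng.shuffle(self._order)
            self._next = 0
        target = self._order[self._next]
        self._next += 1
        return target
```

With n members, any given peer is probed at least once in every window of 2n - 1 consecutive picks, whereas purely random selection gives no such upper bound.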

The solution works as follows. A node Mi starts up with some bootstrap nodes (a random subset of the live nodes). Mi then starts the process by probing a single random node Mj among the bootstrap nodes with a ping. If Mj does not respond to the ping, Mi pings it indirectly through some k nodes, say M1, M2, M3, …, Mk.

If Mi still receives no response from Mj, directly or indirectly, it treats Mj as suspected and, after a certain period, as a failed node. In both the suspected and the failed state Mi disseminates this information in two separate messages (Mj-suspected and Mj-failed). When other nodes receive such a message they mark Mj as suspected or failed accordingly. Meanwhile, if Mj itself receives a message of this kind, it immediately disseminates a message (i.e. Mj-live) announcing that it is alive. All these state-change messages (Mj-suspected, Mj-failed, Mj-live) are piggybacked on the normal ping (or ack) messages. Thus the failure of Mj is detected and the news of the failure is disseminated among the other nodes.

Figure A: SWIM failure detection mechanism.
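
One probe round, as described above, can be sketched as follows. The function `probe` and its `ping` and `ping_req` callbacks are hypothetical stand-ins for the real network calls; each callback is assumed to return True when an ack arrives before its timeout.

```python
import random


def probe(target, members, ping, ping_req, k=2, rng=None):
    """Return "alive" or "suspected" for `target` after one probe round.

    ping(target) -> bool           : direct ping, True if acked (assumed API)
    ping_req(helper, target) -> bool: ask `helper` to ping `target` for us
    """
    rng = rng or random.Random()
    if ping(target):
        return "alive"
    # Direct ping failed: pick up to k other members for an indirect ping.
    helpers = [m for m in members if m != target]
    chosen = rng.sample(helpers, min(k, len(helpers)))
    # A real implementation would send the ping-requests in parallel;
    # here they are tried one after another for simplicity.
    if any(ping_req(helper, target) for helper in chosen):
        return "alive"
    return "suspected"
```

The caller would then start the suspicion timeout for a "suspected" result and piggyback the state change on its next ping or ack.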

The implementation is done in a couple of phases and sub-phases (sections). Phase 1, in which SWIM is implemented and evaluated, is described here; we will talk about Phase 2 later.

Phase 1 – SWIM implementation

Bootstrap mechanism

The bootstrap mechanism provides a small subset of the existing peers in the system. A new peer injects information about itself by pinging a random peer from those provided by the bootstrap server. A basic ping-pong mechanism is implemented: a peer selects a random peer from its list of alive nodes and sends it a ping message, and a peer that receives a ping message responds with a pong message. We checked through the log messages that the ping mechanism works.

Membership discovery

In this section we have implemented the piggybacking mechanism to disseminate information about new peers in the system. In the ping-message and pong-message the peers piggyback information about the changes (new/dead/suspected peers) in the system. Upon receiving the messages (ping or pong) the new information is merged with the local state.

Each peer maintains three different lists locally:

  • a list for joined peers,
  • a list for deleted peers and
  • a list for suspected peers.

When a peer joins the system it pings its neighbor, piggybacking its own information as a joined peer. Upon receiving the ping message the neighbor merges the newly joined peer into its local joined-peer list and responds with a pong message that piggybacks all its local information (joined peers, deleted peers and suspected peers). The neighbor also adds the new peer to a separate FIFO list of newly joined peers, along with any other newly joined peers. This FIFO list is then piggybacked on the neighbor's own ping messages and disseminated to other peers. In this way the information about a newly joined peer spreads through the system. When the system converges, all the nodes have complete and correct information about which peers have joined or failed.
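
The merge step can be sketched roughly as follows. The `LocalState` class and its field names are invented for this illustration and are not taken from the project. The sketch also assumes that a peer already recorded as deleted is not re-added by a stale join, which a full implementation would decide using incarnation numbers.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class LocalState:
    joined: set = field(default_factory=set)
    deleted: set = field(default_factory=set)
    suspected: set = field(default_factory=set)
    new_joins: deque = field(default_factory=deque)  # FIFO of fresh joins

    def merge(self, joined=(), deleted=(), suspected=()):
        """Merge piggybacked information from a ping or pong into local state."""
        for peer in joined:
            if peer not in self.joined and peer not in self.deleted:
                self.joined.add(peer)
                self.new_joins.append(peer)  # pass it on with our next pings
        for peer in deleted:
            self.deleted.add(peer)
            self.joined.discard(peer)
            self.suspected.discard(peer)
            if peer in self.new_joins:
                self.new_joins.remove(peer)
        for peer in suspected:
            if peer not in self.deleted:
                self.suspected.add(peer)

    def ping_payload(self):
        """Newly joined peers to piggyback on an outgoing ping."""
        return list(self.new_joins)

    def pong_payload(self):
        """Full local view to piggyback on an outgoing pong."""
        return {
            "joined": sorted(self.joined),
            "deleted": sorted(self.deleted),
            "suspected": sorted(self.suspected),
        }
```

A receiving peer would call `merge(**payload)` on the dictionary returned by `pong_payload()`, or `merge(joined=payload)` on the list returned by `ping_payload()`.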

Failure detection

A peer that fails to respond to ping messages is not declared dead right away; it is first declared suspected. The information about suspected peers is gossiped around. Suspected peers are treated just like alive peers until they are confirmed to have failed. If the suspected state is not contested, the peer is declared dead after a predefined timeout. The suspected state of a peer is overridden when:

  • The peer itself finds out that other peers have started to suspect it; in that case it can inject Alive messages into the system.
  • Another peer successfully pings the suspected peer and declares that it is no longer suspected.
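
The suspicion timeout can be sketched as a small bookkeeping class. `SuspicionTimer` is a hypothetical name, and time is passed in explicitly (as it would be in a simulation) rather than read from a clock.

```python
class SuspicionTimer:
    """Tracks suspected peers and confirms them dead after `timeout`."""

    def __init__(self, timeout):
        self.timeout = timeout
        self._suspected_at = {}  # peer -> time the suspicion started
        self.dead = set()

    def suspect(self, peer, now):
        """Start suspecting `peer`; repeated suspicions keep the first time."""
        if peer not in self.dead:
            self._suspected_at.setdefault(peer, now)

    def refute(self, peer):
        """The suspicion was overridden (Alive message or successful ping)."""
        self._suspected_at.pop(peer, None)

    def tick(self, now):
        """Declare dead every peer whose suspicion has timed out."""
        expired = [p for p, t in self._suspected_at.items()
                   if now - t >= self.timeout]
        for peer in expired:
            del self._suspected_at[peer]
            self.dead.add(peer)
        return expired
```

Calling `tick` on every protocol period is enough to turn uncontested suspicions into confirmed failures.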

The state of a suspected peer can be contested. For example, suppose peer A suspects peer X and spreads a suspect-X message to its neighbors, while another peer B is trying to recover peer X by spreading alive-X to its neighbors. A common neighbor C of A and B will then hold both a suspect-X and an alive-X message at the same time and cannot tell which of them is the latest. To resolve this confusion we have implemented a message version mechanism, in which peer X sets and increments a version number whenever it sends its alive-X message. This mechanism is discussed in detail in the SWIM paper, where the version is called the incarnation number.
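
Peer C's decision can be sketched with the override rules given in the SWIM paper: a higher incarnation number wins, a suspicion beats an alive message of the same incarnation, and a confirmed failure beats everything. The function name `overrides` and the tuple encoding are assumptions made for this example; the project's own implementation may differ.

```python
def overrides(update, current):
    """Return True if `update` should replace `current`.

    Both arguments are (status, incarnation) tuples, where status is
    one of "alive", "suspect" or "dead".
    """
    new_status, new_inc = update
    cur_status, cur_inc = current
    if cur_status == "dead":
        return False            # a confirmed failure is final
    if new_status == "dead":
        return True             # confirmation overrides alive and suspect
    if new_inc != cur_inc:
        return new_inc > cur_inc
    # Same incarnation: only suspect-over-alive wins.
    return new_status == "suspect" and cur_status == "alive"
```

So if C holds suspect-X with incarnation 3 and receives alive-X with incarnation 4, it accepts the alive message; an alive-X with incarnation 3 would be ignored.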

To probe a peer X, a peer Y first pings it directly. If X fails to pong back, Y picks K random neighbors and uses them to ping X indirectly. We have implemented both random and round-robin probe target selection in this project and observed that the system behaves differently depending on which one is used. In our observation, random probe target selection performs better for the convergence of the membership list at each node, whereas round-robin probe target selection is better for converged failure detection at all nodes.

The failure detection test is implemented in the scenario file SwimScenarioP1T2.java, where we start 100 nodes and then kill 5 of them. The system converges successfully and the information about the dead nodes is disseminated among the alive nodes. Dead nodes are suspected for a while before they are declared dead.

Failure detection with the suspicion mechanism is implemented in the scenario file SwimScenarioP1T3.java, where we start 7 nodes and then disconnect node 28. Once 28 is disconnected, all other nodes start to suspect it, and 28 itself starts to suspect its neighbors. Within a short period of time (before it is declared dead by the other nodes) it is reconnected. After reconnecting, 28 disseminates a LIVE message with a higher incarnation number and thus becomes an alive node to all other nodes again.

Failure detection with K-indirect ping is implemented in the scenario file SwimScenarioP1T5.java, which uses 3 nodes (node ids 10, 16 and 20), where 16 has only one link to 10. The link between 16 and 10 is then broken. At this point 16 starts suspecting 10 and sends a ping-request for 10 to 20. Thus 16 detects 10 through an indirect ping via 20. The number of indirect pings is currently set to PING_REQ_RANDOM_K = 2 in the SWIM component, but it can be tuned.

We ran multiple experiments with varying numbers of failures in networks of different sizes. We did not set any limit on information exchange for these experiments. The scenario file used for these experiments is SwimScenarioP1T6.java. We declared a variable named total_nodes to bootstrap networks of different sizes. For each network size (where network size > number of failures), we killed 1, 5, 10, 20 and 30 peers. We measure the convergence time, i.e. the time at which all alive peers have detected the failures of all killed nodes. The result is reported in Figure 1. It shows that convergence time does not depend on the number of failures but increases with network size. This is due to the latency of the spread of infection through the group: a larger group results in larger latency. Our result also agrees with the evaluation presented in [1].

Figure 1: Failure Detection Convergence Time for varying amount of failures and network sizes
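
The convergence-time metric can be written down as a short helper. This is an illustrative sketch, not the measurement code used for the figure: the function name and the shape of the `detected_at` mapping (keyed by (observer, killed node) and holding the detection time) are assumptions.

```python
def convergence_time(detected_at, alive, killed):
    """Time at which every alive peer has detected every killed node.

    detected_at maps (observer, killed_node) -> detection time.
    Returns None if some alive peer has not yet detected some failure.
    """
    latest = 0.0
    for observer in alive:
        for victim in killed:
            t = detected_at.get((observer, victim))
            if t is None:
                return None  # not converged yet
            latest = max(latest, t)
    return latest
```

The system has converged exactly when this function stops returning None, and the value is the slowest single detection in the whole group.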

In the next experiment we set the information exchange limit to (1,2,3, .. lnn,lnn). The scenario file used is SwimScenarioP1T7.java. For a network size of 100, we killed 5, 10 and 20 nodes. For each number of failures, we measured the convergence time for different limits on information exchange. Convergence time here means the time required for all alive nodes to detect all failures in the system. Figure 2 shows the result. When the information exchange limit is set to the lowest value, 1, convergence takes the longest, because infection-style dissemination with such limited information exchange has the highest latency. However, increasing the limit of information exchange by 1 reduces the convergence time almost by half. As the amount of information piggybacked on the ping-pong messages increases, the convergence time decreases, for all numbers of failures.

Figure 2: Failure Detection Convergence Time for limiting information exchange and network size of 100.
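
A limit on information exchange can be sketched as a buffer that hands out at most `limit` updates per message. The class `PiggybackBuffer` is hypothetical, and preferring the updates that have been sent the fewest times is an assumption taken from the SWIM paper; this post does not say how the project chooses which updates to send.

```python
class PiggybackBuffer:
    """Selects at most `limit` membership updates for each outgoing message."""

    def __init__(self, limit):
        self.limit = limit
        self._sent = {}  # update -> number of times it was piggybacked

    def add(self, update):
        """Register a new update; known updates keep their send count."""
        self._sent.setdefault(update, 0)

    def next_payload(self):
        """Pick the least-sent updates first (assumed policy)."""
        chosen = sorted(self._sent, key=self._sent.get)[: self.limit]
        for update in chosen:
            self._sent[update] += 1
        return chosen
```

With a limit of 1 and three pending updates, a peer needs three messages before each update has been sent even once, which is consistent with the slow convergence observed at the lowest limit.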

The whole project is developed with the Kompics framework, a component model framework for developing distributed systems. SWIM is implemented as the SWIM component in the SwimCopm.java file. The component architecture of the project is shown in Figure 3.

Figure 3


  1. SWIM: Scalable Weakly-consistent Infection-style Group Membership Protocol
  2. Kompics
  3. Project
