Copyright Red Hat 2014 - 2020

This document is licensed under the "Creative Commons Attribution-ShareAlike (CC-BY-SA) 3.0" license.

This is the jgroups-raft manual. It provides information about

  • Design and architecture

  • Configuration and use

of jgroups-raft.

Bela Ban, Kreuzlingen Switzerland 2015

1. Overview

The jgroups-raft project is an implementation of Raft in JGroups.

It provides a consensus based system where leader election and changes are committed by consensus (majority agreement). A fixed number of nodes form a cluster and each node is a state machine. A leader is elected by consensus and all changes happen through the leader which replicates them to all nodes, which add them to their persistent log.

Because Raft guarantees that there’s only ever one leader at any time, and changes are identified uniquely, all state machines receive the same ordered stream of updates and thus have the exact same state.

Raft favors consistency over availability; in terms of the Cap theorem, jgroups-raft is a CP system. This means jgroups-raft is highly consistent, and the data replicated to nodes will never diverge, even in the face of network partitions (split brains).

In case of a network partition, in a cluster of N nodes, at least N/2+1 nodes have to be running for the system to be available.

If for example, in a 5 node cluster, 2 nodes go down, then the system can still commit changes and elect leaders as 3 is still the majority. However, if another node goes down, the system becomes unavailable and client requests will be rejected. (Depending on configuration, there may still be some limited form of read-only availability.)

By implementing jgroups-raft in JGroups, the following benefits can be had:

  • Transports already available: UDP, TCP

    • Contains thread pools, priority delivery (OOB), batching etc

  • Variety of discovery protocols

  • Encryption, authentication, compression

  • Fragmentation, reliability over UDP

  • Multicasting for larger clusters

  • Failure detection

  • Sync/async cluster RPCs

The code required to be written for a full Raft implementation is smaller than if it had been implemented outside of JGroups.

The feature set of jgroups-raft includes

  • Leader election and append entries functionality by consensus

  • Persistent log (using LevelDB)

  • Dynamic addition and removal of cluster nodes

  • Cluster wide atomic counters

  • Replicated hash maps (replicated state machines)

1.1. Architecture

The architecture of jgroups-raft is shown below.

Architecture of jgroups-raft,align=left,valign=top,width=
Figure 1. The architecture of jgroups-raft

The components that make up jgroups-raft are

  • A JGroups protocol stack with jgroups-raft specific protocols added:

    • NO_DUPES: makes sure that a jgroups-raft node does not appear in a view more than once

    • ELECTION: handles leader election

    • RAFT: implements the Raft algorithm, ie. appending entries to the persistent log, committing them, syncing new members etc

    • REDIRECT (not shown): redirects requests to the leader

    • CLIENT: accepts client requests over a socket, executes them and sends the results back to the clients

  • Channel: this is a regular JGroups JChannel or ForkChannel

  • RaftHandle: the main class for users of jgroups-raft to interact with

  • StateMachine: an implementation of StateMachine. This is typically a replicated state machine. jgroups-raft ships with a number of building blocks implementing StateMachine such as CounterService or ReplicatedStateMachine.

The figure above shows one node in a cluster, but the other nodes have the same setup except that every node is required to have a different raft_id (defined in RAFT). This is a string which defines one cluster member; all members need to have different raft_ids (more on this later).

2. Using jgroups-raft

2.1. Cluster members and identity

Each cluster member has an address (a UUID) assigned by JGroups and a raft_id which needs to be assigned by the user. The latter is a string (e.g. "A") which needs to be unique in the entire cluster. In other words, the raft_id is the identity of a member for the sake of jgroups-raft.

A Raft cluster has a fixed size, so that a majority can be computed for leader election and appending of entries. The members allowed into the cluster is defined in RAFT.members, e.g.

<raft.RAFT members="A,B,C" raft_id="${raft_id:undefined}"/>

This defines a cluster of 3 members: "A", "B" and "C" (whose majority is 2).

These are the raft_id attributes of the 3 members, so attribute raft_id in the example above needs to be one of them. If we don’t start this member with the system property -Draft_id=X (where X needs to be "A", "B", or "C"), then the member will start up as "undefined" is not a member of {"A", "B", "C"}.

Note Note that while RAFT ensures that non-members cannot join a cluster, the NO_DUPES protocol makes sure that no duplicate member can join. Example: if we have RAFT.members="A,B,C" and actual members "A" and "B" joined, then a join attempt by a member with duplicate name "B" will be rejected and that member won’t be able to join.

Attribute raft_id is also used to define the location of the persistent log; unless log_name is defined in RAFT, the location is computed as <temp_dir>/<raft_id>.log, e.g. /tmp/A.log.

Note that members can be added and removed dynamically (without taking the entire cluster down, changing the configuration and restarting it), see Adding and removing members dynamically.

2.2. RaftHandle

As shown in The architecture of jgroups-raft, RaftHandle is the main class users will be dealing with. It provides methods to change state (append entries) in the replicated state machines, and a state machine can be registered with it. The state machine will be initialized at startup and updated by jgroups-raft whenever there is a change.

A successful change is committed to the persistent logs of all cluster members and applied to their state machines, so all state machines have exactly the same state.

2.2.1. Creation

An instance of RaftHandle is associated with exactly one JGroups channel, and can be created as follows:

JChannel ch=new JChannel("/home/bela/raft.xml"); 1
RaftHandle handle=new RaftHandle(ch, this);      2
ch.connect("raft-cluster");                      3
1 A new JGroups channel is created (see the JGroups manual for details on the JGroups API)
2 A RaftHandle instance is created over the channel (which must be non-null). The second argument is an implementation of StateMachine. If null, no changes will be applied. The state machine can be set with stateMachine(StateMachine sm).
3 The channel is connected which causes the member to join the cluster

2.2.2. Making changes

The setX() methods can be used to make changes:

byte[] set(byte[] buf, int offset, int length) throws Exception; 1
byte[] set(byte[] buf, int offset, int length, long timeout, TimeUnit unit) throws Exception; 2
CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length); 3
1 Synchronous change; the caller will be blocked until the change has been forwarded to the leader, which sends it to all cluster members which apply it to their persistent logs and ack the change back to the leader. Once the leader gets a majority of acks, it commits the change to its own log, applies it to its state machine and returns the response to the caller. The state machines thus only contain committed changes.
2 Same as above, except that this call is bounded with a timeout. If it elapses before a majority of acks have been received, a TimeoutException will be thrown.
3 Asynchronous change; this method returns immediately with a CompletableFuture which can be used to retrieve the result later, or to provide some code that’s executed as soon as the result is available (e.g. whenComplete()).

The contents of the request and response buffers is application specific.

For example, if we implemented a replicated hash map, then a request could be a put(key,value). The put() would have to be serialized into the buffer, as well as the key and the value.

When committing the change, every state machine needs to de-serialize the buffer into a put(key,value) and apply it to its state (see Implementing a StateMachine). If there is a return value to the put() call, e.g. the previous value associated with key, then it will be serialized into a buffer and returned as result of one of the setX() calls.

2.2.3. Implementing a StateMachine

StateMachine is an interface and is defined as follows:

public interface StateMachine {
    byte[] apply(byte[] data, int offset, int length) throws Exception;  1
    void   readContentFrom(DataInput in) throws Exception;               2
    void   writeContentTo(DataOutput out) throws Exception;              3
}
1 This method is called whenever a log entry is committed. The buffer’s contents are application specific (e.g this could be a serialized put(key,value) as discussed above. If there is a return value of applying the change to the state machine, it needs to be serialized so that it can be returned to the caller (e.g. a client).
2 This method is called when RAFT needs to initialize a state machine from a snapshot (a dump of a state machine’s contents to an external stream (e.g. a file)). The writeContentTo() method below wrote the contents to a file before, in an application specific format, and this method now needs to read the contents back into the state machine.
3 This method is the opposite of readContentFrom() and writes the contents of this state machine to a stream (e.g. a file).

2.2.4. Snapshotting and state transfer

All cluster members maintain a persistent log and append all changes as log entries to the end of the log. To prevent logs from growing indefinitely, a snapshot of the state machine can be made and the log truncated. This is done (programatically) with method snapshot(), or declaratively (see below).

This method calls StateMachine.writeContentTo() to dump the state of the state machine into a snapshot file and then truncates the log. New members who don’t have a log yet are initialized by sending them the snapshot first. After that, they will catch up via the regular Raft mechanism.

Logs can be snapshot automatically by setting RAFT.max_log_size to the max number of bytes that a log is allowed to grow to until a snapshot is taken.

2.2.5. Miscellaneous methods

Other methods in RaftHandle include:

leader()

Returns the address of the current Raft leader, or null if there is no leader (e.g. in case there was no majority to elect a leader)

isLeader()

Whether or not the current member is the leader

addRoleListener(RAFT.RoleChange listener)

Allows to register a RoleChange listener which is notified when the current member changes its role (Leader, Follower, Candidate)

currentTerm()

Returns the current term (see Raft for details)

lastApplied()

Returns the index of the last log entry that was appended to the log

commitIndex()

Returns the index of the last log entry that was committed

raft()

Returns a reference to the RAFT protocol in the current member’s stack. Provided for experts who need to access RAFT directly.

raftId(String id)

Used to set the raft_id programmatically (note that this can also be done by setting raft_id in RAFT in the XML configuration. For example, the following code sets raft_id from the command line:

protected void start(String raft_id) throws Exception {
    JChannel ch=new JChannel("raft.xml").name(raft_id);   2
    RaftHandle handle=new RaftHandle(ch, this).raftId(raft_id); 3
    ch.connect("raft-cluster");  4
}

public static void main(String[] args) throws Exception {
    new bla().start(args[0]);  1
}
1 The raft_id can for example be passed to the program as an argument
2 The channel is created and its logical name set to be the same as raft_id. This is not necessary, but convenient.
3 Now raft_id can be set via RaftHandle.raftId(String id).

2.3. Configuration

The configuration of a member is either done declaratively via an XML config file or programmatically. Refer to the JGroups documentation for details.

A sample XML configuration file is shown below (edited for brevity):

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
    <UDP
         mcast_addr="228.5.5.5"
         mcast_port="${jgroups.udp.mcast_port:45588}"/>
    <PING />
    <MERGE3 />
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <pbcast.NAKACK2 xmit_interval="500"
                    discard_delivered_msgs="true"/>
    <UNICAST3 xmit_interval="500"
              max_msg_batch_size="500"/>
    <pbcast.STABLE desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <raft.NO_DUPES/>                                                         1
    <pbcast.GMS print_local_addr="true" join_timeout="2000"
                view_bundling="true"/>
    <UFC max_credits="2M" min_threshold="0.4"/>
    <MFC max_credits="2M" min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <raft.ELECTION election_min_interval="100" election_max_interval="500"/> 2
    <raft.RAFT members="A,B,C,D" raft_id="${raft_id:undefined}"/>            3
    <raft.REDIRECT/>                                                         4
    <raft.CLIENT bind_addr="0.0.0.0" />                                      5
</config>
1 NO_DUPES: checks that joining a new member doesn’t lead to duplicate raft_ids in the membership. Rejects the JOIN if it would. Must be placed somewhere below GMS
2 ELECTION: this protocol implements leader election, as defined in Raft. It is independent from RAFT and could (and may, in the future) be replaced with a different election protocol. Attributes election_min_interval and election_max_interval define the range from which jgroups-raft picks a random election timeout.
3 RAFT: the main protocol implementing log appending and committing, handling state machine updates, snapshotting etc. Attribute members defines the (fixed) membership (may still be redfined by addServer/removeServer log entries when initializing a member from the persistent log). Attribute raft_id defines the ID of the current member (needs to be an element of members, as discussed earlier).
4 REDIRECT is used to redirect requests to the current Raft leader, or to throw an exception if no member is leader
5 CLIENT listens on a socket (port 1965 by default) for client requests, executes them and sends the result back to the clients. Currently, addServer and removeServer has been implemented.

This is a regular JGroups XML configuration, except that jgroups-raft added a few additional protocols.

2.4. Adding and removing members dynamically

The RAFT protocol provides methods addServer(String raft_id) and removeServer(String raft_id) to add and remove servers from the static membership (defined by RAFT.members). Only one server at a time can be added and removed, and adding or removing a server needs a majority ack to be committed.

Both methods are exposed via JMX, so jconsole could be used. However, jgroups-raft also provides a script (client.sh) to do this in a more convenient way. The script uses Client to connect to a member’s CLIENT protocol running at localhost:1965 (can be changed). The request is then forwarded to the current leader.

The steps to add a member are as follows (say we have RAFT.members="A,B,C" and want to add "D"):

  • Call bin/client.sh -add D

    • If needed, -port PORT or -bind_addr ADDR can be given, e.g. if we need to reach a member running on a different host

  • Once A (the leader) processed addServer("D"), everybody’s RAFT.members is "A","B","C","D"

  • At this point, the XML configuration files should be updated so that RAFT.members="A,B,C,D"

  • If not, members will read the correct membership when getting initialized by their logs

  • A new member D can now be started (its XML config needs to have the correct members attribute !)

3. Building blocks

Similar to JGroups' building blocks, jgroups-raft also has building blocks, which provide additional functionality on top of a RaftHandle. They are typically given a JChannel, create a RaftHandle and register themselves as StateMachine with the handle. Building blocks offer a different interface to the users, e.g. a replicated hashmap with puts and gets, or a distributed counter or lock.

3.1. ReplicatedStateMachine

ReplicatedStateMachine is a key-value store replicating its contents to all cluster members. Contrary to the JGroups equivalent (ReplicatedHashMap), changes are replicated by consensus and logged to a persistent log.

While the JGroups version is allowed to make progress during network partitions, and users need to merge possibly diverging state from different partitions after a partition heals, ReplicatedStateMachine will allow progress only in the majority partition, so no state merging needs to be done after a partition heals.

Not having to merge state is certainly simpler, but comes at the expense of availability: if N/2+1 members leave or split into different partitions, ReplicatedStateMachine will be unavailable (all requests will time out).

However, the advantage is that the members' states will never diverge.

ReplicatedStateMachine requires a JChannel in its constructor and has put(), get() and remove() methods. The code below shows how to create an instance of ReplicatedStateMachine and add an element to it:

protected void start(String raft_id) throws Exception {
    JChannel ch=new JChannel("raft.xml").name(raft_id);
    ReplicatedStateMachine<String,String> rsm=new ReplicatedStateMachine<>(ch);
    rsm.raftId(raft_id);
    ch.connect("rsm-cluster");
    rsm.put("name", "Bela");
}

There’s a demo ReplicatedStateMachineDemo which can be used to interactively use ReplicatedStateMachine.

3.2. CounterService

CounterService provides a replicated counter which can get be set, get and compare-and-set, implementing JGroups' Counter interface:

public interface Counter {
    public long    get();
    public void    set(long new_value);
    public boolean compareAndSet(long expect, long update);
    public long    incrementAndGet();
    public long    decrementAndGet();
    public long    addAndGet(long delta);
}

A Counter implementation is created via the CounterService building block (edited):

public class CounterService implements StateMachine {
    public CounterService(Channel ch);
    public long           replTimeout();
    public CounterService replTimeout(long timeout);
    public boolean        allowDirtyReads();
    public CounterService allowDirtyReads(boolean flag);
    public CounterService raftId(String id);

    /**
     * Returns an existing counter, or creates a new one if none exists
     * @param name Name of the counter, different counters have to have different names
     * @param initial_value The initial value of a new counter if there is no existing counter.
     * Ignored if the counter already exists
     * @return The counter implementation
     */
    public Counter getOrCreateCounter(String name, long initial_value) throws Exception;


    /**
     * Deletes a counter instance (on the coordinator)
     * @param name The name of the counter. No-op if the counter doesn't exist
     */
    public void deleteCounter(String name) throws Exception;
}

CounterService is mainly used to get an existing or create a new Counter implementation (getOrCreateCounter()), or to delete an existing counter (deleteCounter()).

To create an instance of CounterService, a JChannel has to be passed to the constructor. The sample code below shows how to use this:

protected void start(String raft_id) throws Exception {
    JChannel ch=new JChannel("raft.xml").name(raft_id);
    CounterService cs=new CounterService(ch);               1
    ch.connect("counter-cluster");
    Counter counter=cs.getOrCreateCounter("mycounter", 1);  2
    counter.incrementAndGet();                              3
    counter.compareAndSet(2, 5);                            4
    long current_value=counter.get();                       5
}
1 First a CounterService is created and given a reference to a channel
2 Once the member has joined the cluster, we create a counter named "mycounter" with an initial value of 1
3 The counter is then incremented to 2
4 Now a compare-and-set operation sets the counter to 5 if it was 2
5 The last operation fetches the current value of "mycounter"

Any member in the cluster can change the same counter and all operations are ordered by the Raft leader, which causes the replicated counters to have exactly the same value in all members.

Comparing this to the JGroups equivalent, a jgroups-raft counter never diverges in different members, again at the expense of availability. In the JGroups version, counters are always available, but may diverge, e.g. in a split brain scenario, and have to be reconciled by the application after the split brain is resolved.

There’s a demo CounterServiceDemo which can be used to interactively manipulate replicated counters.

3.2.1. Reads and consensus

Currently (as of jgroups-raft version 0.2), reading a counter is by default dirty, meaning that a read may return a stale value.

This can be changed by calling counter_service.allowDirtyReads(false).

However, this inserts a dummy read log entry which returns the value of counter when committed. Since this dummy entry is ordered correctly wrt writes in the log, it will always return correct values.

The cost is that reads take up space in the persistent logs and that we need consensus (majority) for reads. In the next release of jgroups-raft, the mechanism for client reads as suggested in the Raft paper will be implemented. See Issue 18 for details.

Note Non-dirty reads has not yet been implemented in ReplicatedStateMachine.

3.3. Singleton service

A singleton service is a service which is supposed to run only once in an entire cluster. Typically, in JGroups, a singleton service is started on the first member of a cluster. For example, if we have {A,B,C,D,E}, the singleton service (or services) would be running on A.

If we have a partition, such that the cluster falls apart into {A,B,C} and {D,E}, then an additional singleton would be started on D, as D became coordinator and doesn’t know {A,B,C} didn’t leave, but were partitioned away instead.

When the partition ends, if D is not coordinator anymore, it would stop its singleton services.

If multiple singletons (as provided by JGroups, e.g. during a network split) cannot be tolerated by the application, and the application has a requirement that at most one singleton service can be running (better none than two), jgroups-raft can be used.

The mechanism to implement singleton services in jgroups-raft is leader election: it is guaranteed that at most one leader exists in a given cluster at the same time. This is exactly what we need for singletons. The code below shows how to do this:

JChannel ch=null;
RaftHandle handle=new RaftHandle(ch, this); 1
handle.addRoleListener(role -> {            2
    if(role == Role.Leader)                 3
        // start singleton services
    else
        // stop singleton services
});
1 A RaftHandle is created over a channel
2 A RAFT.RoleChange callback is registered with the handle. Alternatively, addRoleListener() could be called directly on an instance of RAFT retrieved from the protocol stack associated with the given channel
3 When we become the Raft leader, the singleton services can be started, when not, they should be stopped (if running)

4. List of protocols

This chapter describes the most frequently used protocols, and their configuration.

Meanwhile, we recommend that users should copy one of the predefined configurations (shipped with jgroups-raft), e.g. raft.xml, and make only minimal changes to it.

4.1. NO_DUPES

This protocol prevents duplicate members from joining the cluster. The protocol needs to be located somewhere below GMS.

NO_DUPES catches JOIN requests from a joiner to the JGroups coordinator and checks if the joiner’s raft_id is already contained in the current membership, and rejects the JOIN if this is the case.

For example, if we have current members {A,B} and another member with raft_id "B" joins, then the joiner would get the following exception when trying to join the cluster:

-------------------------------------------------------------------
GMS: address=B, cluster=cntrs, physical address=127.0.0.1:64733
-------------------------------------------------------------------
Exception in thread "main" java.lang.Exception: connecting to channel "cntrs" failed
	at org.jgroups.JChannel._connect(JChannel.java:570)
	at org.jgroups.JChannel.connect(JChannel.java:294)
	at org.jgroups.JChannel.connect(JChannel.java:279)
	at org.jgroups.raft.demos.CounterServiceDemo.start(CounterServiceDemo.java:32)
	at org.jgroups.raft.demos.CounterServiceDemo.main(CounterServiceDemo.java:163)
Caused by: java.lang.SecurityException: join of B rejected as it would create a view with duplicate members (current view: [B|1] (2) [B, A])
	at org.jgroups.protocols.pbcast.ClientGmsImpl.isJoinResponseValid(ClientGmsImpl.java:187)
	at org.jgroups.protocols.pbcast.ClientGmsImpl.installViewIfValidJoinRsp(ClientGmsImpl.java:153)
	at org.jgroups.protocols.pbcast.ClientGmsImpl.joinInternal(ClientGmsImpl.java:111)
	at org.jgroups.protocols.pbcast.ClientGmsImpl.join(ClientGmsImpl.java:41)
	at org.jgroups.protocols.pbcast.GMS.down(GMS.java:1087)
	at org.jgroups.protocols.FlowControl.down(FlowControl.java:353)
	at org.jgroups.protocols.FlowControl.down(FlowControl.java:353)
	at org.jgroups.protocols.FRAG2.down(FRAG2.java:136)
	at org.jgroups.protocols.RSVP.down(RSVP.java:153)
	at org.jgroups.protocols.pbcast.STATE_TRANSFER.down(STATE_TRANSFER.java:202)
	at org.jgroups.protocols.raft.ELECTION.down(ELECTION.java:112)
	at org.jgroups.protocols.raft.RAFT.down(RAFT.java:442)
	at org.jgroups.protocols.raft.REDIRECT.down(REDIRECT.java:103)
	at org.jgroups.stack.ProtocolStack.down(ProtocolStack.java:1038)
	at org.jgroups.JChannel.down(JChannel.java:791)
	at org.jgroups.JChannel._connect(JChannel.java:564)
	... 4 more
[mac] /Users/bela/jgroups-raft$

The error message is SecurityException: join of B rejected as it would create a view with duplicate members (current view: [B|1] (2) [B, A]), which shows that view {B,A} already contains a member with raft_id B, and so the JOIN request of the new member is rejected.

4.2. ELECTION

ELECTION is the protocol which performs leader election, as defined by Raft. Its attributes define the election timeout and the heartbeat interval (see Raft for details).

Table 1. ELECTION
Name Description

election_max_interval

Max election interval (ms). The actual election interval is computed as a random value in range [election_min_interval..election_max_interval]

election_min_interval

Min election interval (ms)

heartbeat_interval

Interval (in ms) at which a leader sends out heartbeats

4.3. RAFT

RAFT is the main protocol in jgroups-raft; it implements log appending and committing, snapshotting and log compaction, syncing of new members and so on.

Table 2. RAFT
Name Description

log_args

Arguments to the log impl, e.g. k1=v1,k2=v2. These will be passed to init()

log_class

The fully qualified name of the class implementing Log

log_name

The name of the log. The logical name of the channel (if defined) is used by default. Note that logs for different processes on the same host need to be different

max_log_size

Max number of bytes a log can have until a snapshot is created

members

List of members (logical names); majority is computed from it

raft_id

The identifier of this node. Needs to be unique and an element of members. Must not be null

resend_interval

Interval (ms) at which AppendEntries messages are resent to members which haven’t received them yet

snapshot_name

The name of the snapshot. By default, <log_name>.snapshot will be used

4.4. REDIRECT

The REDIRECT protocol needs to be somewhere above RAFT. It keeps track of the current Raft leader and redirects requests to the right leader. If there is no leader, e.g. because there’s no majority to elect one, an exception will be thrown.

4.5. CLIENT

CLIENT listens on a socket for client requests. When a request is received, it is sent down where it will be forwarded (by REDIRECT) to the current leader which executes the request. The responses is then sent back to the client.

Table 3. CLIENT
Name Description

bind_addr

The bind address which should be used by the server socket. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL, NON_LOOPBACK, match-interface, match-host, match-address

idle_time

Number of ms a thread can be idle before being removed from the thread pool

max_threads

Max number of threads in the thread pool

min_threads

The min threads in the thread pool

port

Port to listen for client requests