State Transfer with Merkle Trees
If performance requirements demand it, computations can be shifted from the cluster to cluster clients. Doing this typically requires some shared state between the cluster and its clients, but there is no out-of-the-box solution for replicating this data efficiently. Merkle tree-supported state transfer can be a useful technique for efficiently replicating slowly changing data from the cluster to read-only cluster clients. It works especially well with DirectBuffer-hosted data, such as an off-heap repository.
Avoid this approach, and use the replicated state machine approach instead, in the following scenarios:
- if the underlying buffer has frequent updates that are typically very small and sparse - such as a few bytes getting changed every few thousand bytes. This would result in a large volume of page transfers to mutate small pieces of data.
- if the underlying dataset cannot be represented as a binary buffer, for example because it is held as a list of POJOs. In that case, state transfer is still possible by sending over the current state of the POJOs, but Merkle trees would either offer little benefit or require extensive serialization.
- if the system needs near-immediate updates. The Merkle tree approach is best suited to occasional replication, or to replication following a number of batch updates.
To improve a clustered system's throughput, we may wish to move some computation from the cluster to the edges. This typically requires reference data, such as instruments and permissions, that either does not change or changes slowly. Replicating this data efficiently can be difficult, especially when working with off-heap data structures. Aeron Cluster itself runs a replicated state machine using the Raft protocol; however, Raft does not include a mechanism for replicating datasets (or byte buffers) from the cluster to cluster clients.
There are two techniques available to replicate data across machines:
- state transfer: this can be a blunt tool, and is typically used for replicating memory across machines. With state transfer, the process simply transfers the current state of the world, without caring how the data was created or modified. Raft uses this approach to transfer snapshots from the cluster leader to new cluster followers on startup.
- replicated state machines: this is a fine-grained approach, in which the exact set of operations performed is replicated across machines and replayed in the exact same order. The operations must be applied using deterministic logic. Raft uses this approach to apply changes to the cluster state once all snapshots are in place at follower nodes.
To replicate a DirectBuffer from the cluster to cluster clients, we could use either state transfer or a replicated state machine. Each approach has its own pros and cons.
- Replicated state machines are the better solution for transactional data, or any other data that must be consistent across nodes within a short amount of time. The primary issue with replicating to cluster clients via replicated state machines is late-join scenarios: all log entries would need to be replayed, in order, through a deterministic process on top of a starting state. If you're running off Aeron Archive for the transaction logs, you'd need to coordinate the replay and join onto an archive of the operations performed.
- State transfer is best suited to slowly changing data, such as reference data that can safely be updated within seconds of a batch update. The state transfer approach allows a client to join at any time, and by using Merkle trees, as in the Viewstamped Replication paper, the set of pages that actually differ is always computed, so only those pages are transferred. No snapshot is required.
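The late-join trade-off can be illustrated with a toy Java sketch. The class, record, and method names are hypothetical, and a plain `byte[]` stands in for a DirectBuffer. A late joiner using the replicated state machine must replay the full ordered log, so its work grows with the number of operations ever performed. A late joiner using state transfer only copies the current bytes, so its work grows with the size of the state.

```java
import java.util.Arrays;
import java.util.List;

/** Toy contrast of the two replication techniques over a small byte array (illustrative only). */
final class ReplicationContrast {
    /** A deterministic operation: write one byte at an offset. */
    record WriteByte(int offset, byte value) { }

    /** Replicated state machine: a late joiner replays every log entry, in order, from the initial state. */
    static byte[] replay(int length, List<WriteByte> log) {
        byte[] state = new byte[length];
        for (WriteByte op : log) {
            state[op.offset()] = op.value();
        }
        return state;
    }

    /** State transfer: a late joiner copies the current bytes, regardless of how they were produced. */
    static byte[] transfer(byte[] current) {
        return Arrays.copyOf(current, current.length);
    }
}
```

Both paths end with identical bytes. The difference is what the late joiner needs access to: the complete ordered log, or only the current state.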
Repositories and the Merkle tree
Repositories built atop byte buffers, such as those found in Eider, would need some modifications to run efficiently with Merkle tree-based synchronization. Most importantly, it helps to batch updates to the underlying buffer. For example, within an Aeron Cluster, you would accept a command, and then 0..m updates would be applied to the underlying buffers. When recomputing the Merkle tree, it helps to track which pages have been modified and to recompute only the nodes that depend on them: the modified leaves and their ancestors up to the root.
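The following Java class is a sketch of one way to recompute only the required nodes. It tracks dirty pages and, on rebuild, rehashes only those pages and their ancestors. It makes several assumptions: a `java.nio.ByteBuffer` stands in for an Agrona DirectBuffer, SHA-256 is the hash function, and the leaf count is padded to a power of two. The class and method names are hypothetical and are not part of Eider.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.BitSet;

/**
 * Hypothetical page-level Merkle tree over a ByteBuffer (standing in for a DirectBuffer).
 * Nodes use heap layout: node i has children 2i+1 and 2i+2, and the leaves (one per page,
 * padded to a power of two) occupy indices leafCount-1 .. 2*leafCount-2.
 */
final class PageMerkleTree {
    private final int pageSize;
    private final int leafCount;
    private final byte[][] nodes;
    private final BitSet dirtyPages = new BitSet();

    PageMerkleTree(ByteBuffer buffer, int pageSize) {
        this.pageSize = pageSize;
        int pages = Math.max(1, (buffer.capacity() + pageSize - 1) / pageSize);
        int leaves = 1;
        while (leaves < pages) {
            leaves <<= 1;
        }
        this.leafCount = leaves;
        this.nodes = new byte[2 * leaves - 1][];
        dirtyPages.set(0, leaves); // every leaf, including padding leaves, needs an initial hash
        rebuild(buffer);
    }

    /** Records that bytes [offset, offset + length) changed, e.g. while applying one command. */
    void markDirty(int offset, int length) {
        if (length > 0) {
            dirtyPages.set(offset / pageSize, (offset + length - 1) / pageSize + 1);
        }
    }

    /** Rehashes the dirty pages, then only the ancestors of those pages. */
    void rebuild(ByteBuffer buffer) {
        BitSet ancestors = new BitSet(nodes.length);
        for (int p = dirtyPages.nextSetBit(0); p >= 0; p = dirtyPages.nextSetBit(p + 1)) {
            int node = leafCount - 1 + p;
            nodes[node] = hashPage(buffer, p);
            while (node > 0) {
                node = (node - 1) / 2;
                ancestors.set(node);
            }
        }
        // A parent always has a lower index than its children, so visiting from high
        // to low indices recomputes each child before its parent.
        for (int n = ancestors.previousSetBit(nodes.length - 1); n >= 0; n = ancestors.previousSetBit(n - 1)) {
            nodes[n] = sha256(nodes[2 * n + 1], nodes[2 * n + 2]);
        }
        dirtyPages.clear();
    }

    byte[] root() {
        return nodes[0].clone();
    }

    private byte[] hashPage(ByteBuffer buffer, int page) {
        int start = Math.min(page * pageSize, buffer.capacity());
        int end = Math.min(start + pageSize, buffer.capacity());
        byte[] bytes = new byte[end - start];
        ByteBuffer view = buffer.duplicate(); // leaves the caller's position untouched
        view.position(start);
        view.get(bytes);
        return sha256(bytes);
    }

    private static byte[] sha256(byte[]... parts) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (byte[] part : parts) {
                digest.update(part);
            }
            return digest.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }
}
```

A clustered service would call `markDirty` for each write made while handling a command, and call `rebuild` once per batch (or when a state transfer starts). This way, the cost of rehashing tracks the number of modified pages rather than the buffer size.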
State Transfer Process
The state transfer process is triggered either by a cluster timer or by some other external input, and operates in four phases:
- The server process in Aeron Cluster prepares for a new state transfer and initiates the client state transfer by sending the current server Merkle tree to all clients.
- Clients compare the cluster's current state with their own, and request all changed pages.
- The server responds to each client with the requested pages.
- Clients apply the pages to their local direct buffers, and the process completes.
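To make the four phases concrete, here is an in-process Java sketch. The server and client are plain byte arrays, and the Merkle tree is flattened to its leaf level (one hash per page), so the top-down tree descent is omitted. The sketch assumes both buffers have the same fixed length. `FourPhaseDemo` and its methods are hypothetical, and no Aeron messaging is involved.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory walk-through of the four phases, flattened to per-page hashes. */
final class FourPhaseDemo {
    private final int pageSize;

    FourPhaseDemo(int pageSize) {
        this.pageSize = pageSize;
    }

    /** Phase 1: the server publishes one hash per page (the leaf level of its Merkle tree). */
    List<byte[]> pageHashes(byte[] buffer) {
        List<byte[]> hashes = new ArrayList<>();
        for (int offset = 0; offset < buffer.length; offset += pageSize) {
            hashes.add(sha256(Arrays.copyOfRange(buffer, offset, Math.min(offset + pageSize, buffer.length))));
        }
        return hashes;
    }

    /** Phase 2: the client lists the pages whose hashes differ from its own. */
    List<Integer> changedPages(List<byte[]> serverHashes, byte[] clientBuffer) {
        List<byte[]> local = pageHashes(clientBuffer);
        List<Integer> changed = new ArrayList<>();
        for (int page = 0; page < serverHashes.size(); page++) {
            if (!Arrays.equals(serverHashes.get(page), local.get(page))) {
                changed.add(page);
            }
        }
        return changed;
    }

    /** Phase 3: the server reads the bytes of one requested page. */
    byte[] readPage(byte[] serverBuffer, int page) {
        int from = page * pageSize;
        return Arrays.copyOfRange(serverBuffer, from, Math.min(from + pageSize, serverBuffer.length));
    }

    /** Phase 4: the client writes the received pages into a copy of its buffer. */
    byte[] applyPages(byte[] clientBuffer, List<Integer> pages, byte[] serverBuffer) {
        byte[] shadow = clientBuffer.clone();
        for (int page : pages) {
            byte[] data = readPage(serverBuffer, page);
            System.arraycopy(data, 0, shadow, page * pageSize, data.length);
        }
        return shadow;
    }

    private static byte[] sha256(byte[] data) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(data);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }
}
```

With 4-byte pages, a server buffer of `AAAABBBBCCCCDDDD`, and a client buffer of `AAAAXBBBCCCCDDDD`, only page 1 is requested and transferred.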
Phase 1 of the process operates as follows:
- The server process within Aeron Cluster prepares a state transfer session for one or more repositories.
- The server process holds a state transfer term per repository. When a new state transfer starts, each repository's term is incremented. The term is used to ensure that clients and the server process in Aeron Cluster are working on the same state transfer task.
- Shadow copies of the current Merkle tree and the DirectBuffer of each replicated repository are created. These copies are the source for the state transfer term, allowing the Eider Repository to continue accepting changes while the state transfer is in progress. This ensures that all clients end up with the same buffer as was seen at the start of the state transfer term.
- The server process can then send the Merkle tree to the required Aeron Cluster clients.
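A minimal Java sketch of the phase 1 bookkeeping follows. It keeps a term counter per repository and takes shadow copies of the buffer and Merkle tree nodes when a transfer begins. It assumes the repository is identified by an `int`, the buffer is a `byte[]`, and the tree is held as an array of node hashes. `StateTransferServer` and `TransferSnapshot` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical server-side bookkeeping for phase 1 of a state transfer. */
final class StateTransferServer {
    /** Immutable source data for one repository's state transfer term. */
    record TransferSnapshot(int repositoryId, long term, byte[] buffer, byte[][] merkleNodes) { }

    private final Map<Integer, Long> termByRepository = new HashMap<>();
    private final Map<Integer, TransferSnapshot> activeSnapshots = new HashMap<>();

    /**
     * Increments the repository's term and takes shadow copies of its buffer and Merkle tree,
     * so the live repository can keep accepting commands while clients catch up.
     */
    TransferSnapshot beginTransfer(int repositoryId, byte[] liveBuffer, byte[][] liveMerkleNodes) {
        long term = termByRepository.merge(repositoryId, 1L, Long::sum);
        byte[][] nodesCopy = new byte[liveMerkleNodes.length][];
        for (int i = 0; i < liveMerkleNodes.length; i++) {
            nodesCopy[i] = liveMerkleNodes[i].clone();
        }
        TransferSnapshot snapshot = new TransferSnapshot(repositoryId, term, liveBuffer.clone(), nodesCopy);
        activeSnapshots.put(repositoryId, snapshot);
        return snapshot; // the caller encodes the term and merkleNodes and sends them to clients
    }

    long currentTerm(int repositoryId) {
        return termByRepository.getOrDefault(repositoryId, 0L);
    }

    TransferSnapshot activeSnapshot(int repositoryId) {
        return activeSnapshots.get(repositoryId);
    }
}
```

Cloning the whole buffer is the simplest way to get a stable source for the term. Its cost is proportional to the buffer size, which is acceptable for the slowly changing data this technique targets.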
Aeron Cluster clients that want to receive state transfer messages will need to be wired up appropriately. For those that are, phase 2 operates as follows:
- The client receives the message and reads the byte buffer parameters, the used page count, and the state transfer term.
- The client then compares the client-side Eider Repository's Merkle tree against the one sent by the server. It compares node hashes from the root downward, only stepping deeper into the tree where a hash differs. This lets it quickly locate the pages that differ and note all the pages that have changed.
- The client sends a message to the Aeron Cluster with the state transfer term and all the pages that need to be transferred.
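The top-down comparison can be sketched as a depth-first walk that skips any subtree whose hashes already match. The sketch assumes both trees share the same heap-style array layout and leaf count, for instance because both sides use the same page size and buffer capacity. `MerkleDiff` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class MerkleDiff {
    /**
     * Returns, in ascending order, the pages whose leaf hashes differ between the two trees.
     * Node i has children 2i+1 and 2i+2; leaves occupy indices leafCount-1 .. 2*leafCount-2.
     * Pages at or beyond usedPageCount (padding leaves) are never requested.
     */
    static List<Integer> changedPages(byte[][] server, byte[][] client, int leafCount, int usedPageCount) {
        List<Integer> changed = new ArrayList<>();
        Deque<Integer> stack = new ArrayDeque<>();
        stack.push(0); // start at the root
        while (!stack.isEmpty()) {
            int node = stack.pop();
            if (Arrays.equals(server[node], client[node])) {
                continue; // identical hashes: nothing below this node needs transferring
            }
            if (node >= leafCount - 1) {
                int page = node - (leafCount - 1);
                if (page < usedPageCount) {
                    changed.add(page);
                }
            } else {
                stack.push(2 * node + 2); // right child, visited second
                stack.push(2 * node + 1); // left child, visited first
            }
        }
        return changed;
    }
}
```

With n pages and k changed pages, this visits on the order of k log n nodes rather than all n leaves. That is why the comparison stays cheap when a batch touches only a few pages.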
In phase 3, the server process hosted in Aeron Cluster completes its portion of the state transfer for a given session (i.e. Aeron Cluster client):
- It first checks that the inbound page request matches the current state transfer term. If it does not, the server rejects the request by sending the current state transfer term data and buffer over; the client then redoes the comparison and requests any changed pages as before, but with the new state transfer term.
- If the state transfer term matches, the server reads the list of requested pages from the client request and sends each page to the remote client, along with the state transfer term and page number.
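The phase 3 handling can be sketched in Java as follows. `ClientSession` is a hypothetical callback interface standing in for whatever encodes messages and publishes them to a cluster client. The term, page size, and shadow copies are assumed to come from the phase 1 snapshot.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical phase 3 handler serving pages from one term's shadow copy. */
final class PageRequestHandler {
    /** Hypothetical outbound channel to one cluster client session. */
    interface ClientSession {
        void sendMerkleTree(long term, byte[][] merkleNodes);

        void sendPage(long term, int pageNumber, byte[] pageBytes);
    }

    private final int pageSize;
    private final long currentTerm;
    private final byte[] shadowBuffer;
    private final byte[][] shadowNodes;

    PageRequestHandler(int pageSize, long currentTerm, byte[] shadowBuffer, byte[][] shadowNodes) {
        this.pageSize = pageSize;
        this.currentTerm = currentTerm;
        this.shadowBuffer = shadowBuffer;
        this.shadowNodes = shadowNodes;
    }

    void onPageRequest(ClientSession session, long requestTerm, List<Integer> pages) {
        if (requestTerm != currentTerm) {
            // Stale term: reject by resending the current term's tree; the client redoes its diff.
            session.sendMerkleTree(currentTerm, shadowNodes);
            return;
        }
        for (int page : pages) {
            if (page < 0 || page * pageSize >= shadowBuffer.length) {
                continue; // ignore pages outside the buffer
            }
            int from = page * pageSize;
            int to = Math.min(from + pageSize, shadowBuffer.length);
            session.sendPage(currentTerm, page, Arrays.copyOfRange(shadowBuffer, from, to));
        }
    }
}
```

The handler keeps no per-client state beyond what arrives in the request. This is in keeping with the server doing as little work as possible, while timeouts and retries stay with the client.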
Finally, in phase 4, the state transfer then finishes on the client side:
- After receiving the first state transfer page from the server for the state transfer term it requested, the client process makes a shadow copy of the replicated DirectBuffer.
- For each state transfer page, the client applies the change to the shadow buffer.
- Once every page has been replicated (as checked against the list produced in phase 2), the client computes the new Merkle tree and verifies it against the server's Merkle tree for the same state transfer term. Only the root hash needs to be compared to know the contents are identical.
- If everything matches as expected, the client replaces the underlying buffer in the Eider Repository, the Eider Repository rebuilds indexes, and the state transfer is complete.
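A Java sketch of the client side of phase 4 follows. It assumes the server's root was computed the same way as `merkleRoot` below: SHA-256 per page, padded to a power of two, then pairwise hashing. It also assumes the buffer is a `byte[]`. `ClientTransferSession` is a hypothetical name. Discarding the shadow on a root mismatch is an assumed policy, since the text does not specify one.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.BitSet;

/** Hypothetical client-side state for one state transfer term (phase 4). */
final class ClientTransferSession {
    private final long term;
    private final int pageSize;
    private final byte[] expectedRoot;
    private final BitSet outstanding = new BitSet();
    private byte[] liveBuffer;
    private byte[] shadow; // created lazily when the first page for this term arrives

    ClientTransferSession(long term, int pageSize, byte[] expectedRoot, byte[] liveBuffer, int[] requestedPages) {
        this.term = term;
        this.pageSize = pageSize;
        this.expectedRoot = expectedRoot;
        this.liveBuffer = liveBuffer;
        for (int page : requestedPages) {
            outstanding.set(page);
        }
    }

    /** Applies one page; returns true once all pages are in and the root hash matched. */
    boolean onPage(long pageTerm, int page, byte[] bytes) {
        if (pageTerm != term || page < 0 || !outstanding.get(page)) {
            return false; // stale term, or a page that was not requested or was already applied
        }
        if (shadow == null) {
            shadow = liveBuffer.clone();
        }
        System.arraycopy(bytes, 0, shadow, page * pageSize, bytes.length);
        outstanding.clear(page);
        if (!outstanding.isEmpty()) {
            return false;
        }
        if (!Arrays.equals(merkleRoot(shadow, pageSize), expectedRoot)) {
            shadow = null; // mismatch (assumed policy): discard and let the client's retry logic restart
            return false;
        }
        liveBuffer = shadow; // a real repository would swap its buffer and rebuild indexes here
        shadow = null;
        return true;
    }

    byte[] buffer() {
        return liveBuffer;
    }

    /** Full rebuild: SHA-256 per page (padded to a power of two), then pairwise up to the root. */
    static byte[] merkleRoot(byte[] buffer, int pageSize) {
        int pages = Math.max(1, (buffer.length + pageSize - 1) / pageSize);
        int leafCount = 1;
        while (leafCount < pages) {
            leafCount <<= 1;
        }
        byte[][] level = new byte[leafCount][];
        for (int p = 0; p < leafCount; p++) {
            int from = Math.min(p * pageSize, buffer.length);
            int to = Math.min(from + pageSize, buffer.length);
            level[p] = sha256(Arrays.copyOfRange(buffer, from, to));
        }
        while (level.length > 1) {
            byte[][] parents = new byte[level.length / 2][];
            for (int i = 0; i < parents.length; i++) {
                parents[i] = sha256(level[2 * i], level[2 * i + 1]);
            }
            level = parents;
        }
        return level[0];
    }

    private static byte[] sha256(byte[]... parts) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (byte[] part : parts) {
                digest.update(part);
            }
            return digest.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }
}
```

Because pages are written into a clone, the live buffer stays untouched and readable until the root hash has been verified. It is then replaced in a single reference swap.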
Note that some aspects were simplified in the above description, notably timeout handling. Timeouts are handled on the client side: once the client has responded with a request to perform state transfer, it is responsible for starting a timer to monitor progress, and this timer can be used for retries as needed. The server process in Aeron Cluster deliberately uses as few resources as possible.
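The client-side timer could be as simple as a deadline checked from the client's duty cycle. This is a minimal Java sketch that assumes the caller passes in a nanosecond clock reading, such as `System.nanoTime()`. `TransferRetryTimer`, its actions, and the attempt limit are hypothetical. What a retry actually does is left to the caller; one assumed option is re-sending the page request for the outstanding pages.

```java
/** Hypothetical client-side progress timer for one state transfer. */
final class TransferRetryTimer {
    enum Action { NONE, RETRY, GIVE_UP }

    private final long timeoutNs;
    private final int maxAttempts;
    private long deadlineNs;
    private int attempts;
    private boolean active;

    TransferRetryTimer(long timeoutNs, int maxAttempts) {
        this.timeoutNs = timeoutNs;
        this.maxAttempts = maxAttempts;
    }

    /** Called when the client sends its page request. */
    void start(long nowNs) {
        attempts = 1;
        deadlineNs = nowNs + timeoutNs;
        active = true;
    }

    /** Called whenever a page for the current term arrives, pushing the deadline out. */
    void onProgress(long nowNs) {
        if (active) {
            deadlineNs = nowNs + timeoutNs;
        }
    }

    /** Called once the transfer completes or is superseded by a new term. */
    void stop() {
        active = false;
    }

    /** Polled from the client's duty cycle. */
    Action poll(long nowNs) {
        if (!active || nowNs - deadlineNs < 0) {
            return Action.NONE; // comparing by subtraction tolerates nanoTime wrap-around
        }
        if (attempts >= maxAttempts) {
            active = false;
            return Action.GIVE_UP;
        }
        attempts++;
        deadlineNs = nowNs + timeoutNs;
        return Action.RETRY;
    }
}
```

Keeping the timer on the client means the server never has to track which clients are mid-transfer.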
- Updated 5 December 2020 with a link to the Viewstamped Replication Revisited paper.
- Updated 8 December 2020 with clarifications.
- Updated 13 December 2020 with a link to the Merkle tree note.