Distributed transactions and scalability issues in large-scale distributed systems

Distributed transactions are the main evil of scalability

It is very hard to scale distributed transactions to an extremely high level, and they reduce throughput. Unlike a transaction on a local database, a distributed transaction alters data on multiple nodes. It can be a database plus a JMS broker, or just a set of different databases. As an example, let’s recall the classical two-phase commit (2PC from here on), a type of atomic commitment protocol, in a back-end service with a high volume of transactions. This protocol provides ACID-like properties for global transaction processing. I won’t go into detail about how it works under the hood; I’ll just tell you that C (Consistency) from ACID is the main evil of scalability in distributed systems. It puts a great burden on the system because of its complex coordination algorithm. Overall throughput can drop by a factor of several. Locks in all of the data sources are held for the duration of 2PC, and the longer locks are held, the higher the risk of contention. The 2PC coordinator is also a Single Point of Failure, which is unacceptable for critical systems. For systems that have a reasonably high volume of messages, or sensitive SLAs, it’s worth giving up strong consistency for throughput.

But how can I live without distributed transactions to achieve higher scalability?

Algorithms such as 2PC aim for “Exactly Once” processing, whereas we will use the “At least Once” technique. The difference is that the developer has to cope with duplicates in the application code. Most queueing technologies provide acknowledgements that a message has been accepted (handling is a separate matter). Databases use local transactions. We can deal with downstream failures without coordination. Read on!

Idempotence and fault tolerance

From math, idempotence is as simple as this:

 f(f(x)) = f(x)

That is, applying a function again to its own result changes nothing: the result stays the same no matter how many times the function is applied. In the distributed world, idempotence implies that an operation can be invoked repeatedly without changing the result. Why do I need it? Because we have to handle duplicate requests somehow in case of a system failure. Let’s make it clear with an example. A client application sends a financial transaction to a server (there might be a load-balanced cluster of them, or just one) and waits for an acknowledgment. For some reason, at this particular time:

  • A server goes down or
  • Client goes down or
  • Network failure happens

In all 3 cases, the client app doesn’t get an acknowledgment message (reply) from the server about the transaction status. Of course, the client should then retry the transaction. The server must ensure that this financial transaction is delivered “At least Once” but takes effect only once. This is where idempotence comes to the rescue. The server must remember state: that a transaction with this Id has already been processed, together with the saved acknowledgement message. On every request it checks whether such a record exists and, if it does, replies with the saved acknowledgement instead of processing the transaction again. We don’t have expensive distributed transactions anymore; “At least Once” is a more relaxed and scalable approach. That is, instead of locking resources everywhere, we can assume that messages will arrive at least once.
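To make the retry scenario above concrete, here is a minimal sketch in Java. The class and method names (IdempotentServer, handle, appliedCount) are made up for illustration, and the in-memory map is an assumption: a real server would keep the processed ids and acknowledgements in durable storage, updated in the same local transaction as the business change.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: remembers the acknowledgement per transaction id.
class IdempotentServer {
    // transaction id -> saved acknowledgement (in-memory only, an assumption)
    private final Map<String, String> acks = new ConcurrentHashMap<>();
    // counts how many times the business operation was really applied
    private final AtomicInteger applied = new AtomicInteger();

    String handle(String txId, long amount) {
        // computeIfAbsent runs the function at most once per id,
        // so a retried request only gets the saved acknowledgement back
        return acks.computeIfAbsent(txId, id -> {
            applied.incrementAndGet();
            return "ACK:" + id + ":" + amount;
        });
    }

    int appliedCount() {
        return applied.get();
    }

    public static void main(String[] args) {
        IdempotentServer server = new IdempotentServer();
        System.out.println(server.handle("tx-1", 100)); // first delivery
        System.out.println(server.handle("tx-1", 100)); // client retry
        System.out.println(server.appliedCount());
    }
}
```

The client is free to retry as often as it likes; the transaction id is what turns “At least Once” delivery into a single effect.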

Optimistic locking

Even though this technique is quite old, it goes well with idempotence. If two people try to change the same entity at the same time, we don’t lock database records; rather, we use the concept of versioning and, optionally, uniqueness. The idea is to store a version with each entity record in the database and, before saving, to make sure the record hasn’t been changed in the meantime. A simple example is a self-service kiosk where people check in before boarding at the airport. They can select a vacant seat from the seat map.


Each seat has version = 1. When multiple people make their choice in parallel, the system simply checks before proceeding whether the seat version has changed. If it has, the user is notified that the seat was taken while she was thinking. This is a very simple example where the version can be either 1 or 2. A more difficult situation arises in order-management systems, where an order might have many versions, but that doesn’t change how optimistic locking works. The idea again yields a great trade-off in terms of speed, because we don’t use a locking mechanism.
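The seat example can be sketched in a few lines of Java. This is only an in-memory illustration under my own assumptions (the Seat class and its methods are hypothetical); with a relational database the same check is usually expressed as an update that succeeds only if the stored version still equals the version that was read.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a versioned seat record.
class Seat {
    private final AtomicInteger version = new AtomicInteger(1);
    private volatile String passenger;

    // What the kiosk reads when it shows the seat map.
    int readVersion() {
        return version.get();
    }

    // Succeeds only if nobody changed the seat since expectedVersion was read.
    boolean reserve(String who, int expectedVersion) {
        if (version.compareAndSet(expectedVersion, expectedVersion + 1)) {
            passenger = who;
            return true;
        }
        return false; // version moved on: the seat was taken in the meantime
    }

    String passenger() {
        return passenger;
    }

    public static void main(String[] args) {
        Seat seat = new Seat();
        int seenByAlice = seat.readVersion();
        int seenByBob = seat.readVersion();
        System.out.println(seat.reserve("alice", seenByAlice));
        System.out.println(seat.reserve("bob", seenByBob));
    }
}
```

No lock is held while the passengers are thinking; the conflict is detected only at the moment of saving.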

Local atomic transactions and unique constraints

Local atomic transactions are usually restricted to a single store. Local transactions are primarily needed to apply a set of operations atomically to a single resource (e.g. a relational database) and to ensure the correct ordering of operations within a transaction. In some cases we can do away with transactions, particularly if we don’t care about the order of operations within a transaction. In that case we can process operations asynchronously, which again leads to better throughput. Sometimes a model that requires ordering can be redesigned so that its operations become asynchronous.

Putting it all together

In order to achieve greater throughput a system should follow these principles:

  1. You can retry the operation if there is a failure downstream, relying on idempotence.
  2. Avoid transactions and use optimistic locking where possible; it’s much cheaper.
  3. Local transactions based on a single-phase commit for each resource are more scalable than distributed ones and increase overall application availability.
  4. Assume that messages may be reordered.

Wrapping up

Large-scale stores such as Google’s Bigtable don’t support traditional multi-row ACID transactions, because these carry a heavy overhead on a highly distributed data storage model. I was lucky to use all of the above techniques in my own applications, involving mission-critical financial transactions, and I must say that a few years ago not many people knew about them, but now I hear about them more and more often. Oh yeah, I almost forgot! I urge you to read this great article written by Pat Helland (see the references below), which has even more use-cases. I bumped into it during my research. And remember, you can live without distributed transactions if you implement idempotence and handle downstream failures correctly.


1. Your Coffee Shop Doesn’t Use Two-Phase Commit by Gregor Hohpe.

2. Idempotence Is Not a Medical Condition by Pat Helland.

3. 2PC or not 2PC, Wherefore Art Thou XA? by Dan Pritchett.

4. Life beyond Distributed Transactions: an Apostate’s Opinion by Pat Helland.


Service Discovery in distributed systems

After quite a while I’ve decided to continue writing my blog. There are always lots of ideas to share with you but as usual a lack of time.


Service Discovery is an architectural pattern and a key component of most distributed systems and service oriented architectures.

In simple terms it is a central gateway for all client applications that want to use other services. A client application doesn’t have to know where a particular service is located (usually IP:port); moreover, a service can be moved or redeployed to arbitrary boxes for any reason. There’s no need to change connection details, update configs or whatever: Service Discovery will take care of that. You as a client app just need to ask it for access to other services. The pattern pays off especially when there are hundreds of client applications and/or dozens of services.

Most of the work in this article was done by trial and error, as well as some research. I also urge you to read a great survey on Jason Wilder’s blog about Open Source Service Discovery frameworks.

In this particular section I won’t mention CAP properties, as they can vary depending on the implementation. But CAP is important. If you want to know more about it, I recommend this article devoted to the CAP theorem.

High level components

Service Discovery is the process of finding a suitable Service and its location for a given task requested by a service consumer. Let’s break Service Discovery down into 3 main components:

Service Requestor

Any consuming application that wants to use some service, that is, a client application, a consumer, a user, etc.

Service Provider

In the most basic scenario there is a Service Provider that finds the requested service and talks to the Service Requestor.

Service Registry

The Service Registry stores information about all services, such as IP:port, access rights and so forth. It can be either static or dynamic.

This model comes directly from SOA but can be adapted to your needs. The conversation among the 3 entities above might differ depending on the implementation. As a quick example of such segregation, refer to Web Service Discovery.

Discovery options

1. Static service registry

This is a very basic and easy option. Its big drawback is that it is static: every time a service moves to another box, the static repo has to be updated manually. We won’t go further here, as it is quite obvious and offers little flexibility.

2. Announcement

This pattern implies that as soon as a service is up, it registers itself in the Service Registry; such a notification is sometimes called an Announcement. This simple approach greatly simplifies maintenance of the service registry. It’s also worth mentioning that in the distributed world we always care about the No Single Point Of Failure principle, so we run at least 2 redundant nodes for a service. The Service Registry is usually implemented as a separate, independent, self-sufficient component. For simplicity the registry can be a replicated memCached, ehCache or another storage, not necessarily a cache.

A few words about ZooKeeper as a solution for Service Discovery.

A counterexample is ZooKeeper, which doesn’t follow the Shared Nothing architectural pattern. The Service Registry is coupled with the Service Provider. In ZooKeeper’s world the coordination nodes in an ensemble are NOT independent of each other, because they share the Service Registry and there is write contention due to the ordering of messages. This leads to ZooKeeper’s poor write scalability, caused by synchronous replication among nodes. However, most applications involved in configuration management, including Service Discovery, require frequent reads rather than frequent writes, so this does not block overall scalability.

Let’s pick up where we left off –  announcement.

The process of announcement is depicted below:


3. Multicasting

Using the IGMP protocol, a Service Provider subscribes to a particular multicast group, and all services send a multicast message to this group. The downside of this approach is that IGMP is optional for IPv4 and not all hardware supports it.

Health checking

Once a service is registered in the Service Registry, the Service Provider needs to know whether it’s alive and healthy (that is, 100% of the service is available).

Option 1 – on reconnect: As soon as a Service Requestor understands that it has lost the connection to the service it worked with, it goes back to the Service Provider to get a new one. Each time a Service Requestor accesses the Service Provider, the latter does health-checking (let’s say it is almost the same as heartbeats, plus probably some other stuff) to make sure that the service is alive, updates the Service Registry and returns a response to the client containing information about a healthy service.

Option 2 – heartbeating: The Service Provider receives heartbeats from registered services at regular intervals. Once a heartbeat is lost, the Service Provider removes the corresponding record from the Service Registry. This is a good option for getting more real-time information, though it is more difficult to implement. This is how ZooKeeper and other distributed systems work.
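Announcement plus heartbeating (option 2) can be sketched roughly as follows. Everything here is hypothetical: the HeartbeatRegistry class, its method names and the timeout handling are my own assumptions, and the current time is passed in explicitly just to keep the sketch easy to test. A real registry would also be replicated.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory registry that drops services whose heartbeat is lost.
class HeartbeatRegistry {
    private final Map<String, String> address = new ConcurrentHashMap<>();
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    HeartbeatRegistry(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Announcement: the service registers itself as soon as it is up.
    void announce(String service, String hostPort, long now) {
        address.put(service, hostPort);
        lastSeen.put(service, now);
    }

    // Heartbeat: only refreshes services that are still registered.
    void heartbeat(String service, long now) {
        lastSeen.computeIfPresent(service, (name, previous) -> now);
    }

    // Removes every service that has been silent for longer than the timeout.
    void evictExpired(long now) {
        lastSeen.entrySet().removeIf(e -> now - e.getValue() > timeoutMillis);
        address.keySet().retainAll(lastSeen.keySet());
    }

    Optional<String> lookup(String service) {
        return Optional.ofNullable(address.get(service));
    }
}
```

In a running system evictExpired would be called periodically by a scheduler, and lookup is what a Service Requestor ends up calling through the Service Provider.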


In both options, if some service goes down, clients go back to the Service Provider. Which strategy to use is up to the developer; option 1 is more complex but the better one in most scenarios.

Load Balancing

Load balancing can be easily implemented inside the Service Provider, as it becomes a central gateway through which all Service Requestors access other services. For the sake of simplicity, a very simple variation is round-robin. I won’t go into details, as it is a different topic and you can find plenty about it on the internet.
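For completeness, a round-robin picker fits in a dozen lines. This is a sketch under my own assumptions (the RoundRobin class and the instance addresses are hypothetical), not the way any particular product does it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selection over the instances of one service.
class RoundRobin {
    private final List<String> instances;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobin(List<String> instances) {
        this.instances = new ArrayList<>(instances); // defensive copy
    }

    String pick() {
        // floorMod keeps the index valid even after the counter overflows
        int index = Math.floorMod(next.getAndIncrement(), instances.size());
        return instances.get(index);
    }
}
```

Each call to pick() returns the next instance in the list and wraps around at the end, so requests are spread evenly as long as all instances are healthy.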


Again as with load balancing you might want to apply tokens with expiration or cryptographic keys.

Cooperation of all components

On a very high level the communication of components can happen as in the following picture:


We have defined what Service Discovery is, broken it down into 3 components and described a few basic approaches.

HighLoad Conference 2013

Today I was lucky to attend the annual developers’ conference HighLoad++ 2013. This is the 7th time. Some conference sessions are translated into English and available here: http://www.highload.co/freeonline/


Listeners heard many buzzwords like horizontal scalability, replication, sharding and NoSQL. Although I missed the first day of the conference, what I really liked was not just the new trends in technology but the speakers’ invaluable experience from real projects, with pros and cons, especially:

1. “MySQL versus something else” by Mark Callaghan [Facebook] on storage efficiency, a framework for analysis and benchmarking, and database algorithms: https://www.facebook.com/MySQLatFacebook


2. “Cassandra vs In-Memory Data Grid in eCommerce” by Alexander Soloviev  [Grid Dynamics] was very informative on deep analysis and benchmarking against different cases and their pros and cons.

3. “Query Optimizer in MariaDB: now w/o indices” by Sergey Golubchik [Monty Program Ab]

MinHash algorithm or how to quickly find similarities among 2 documents

MinHash is a technique from Locality Sensitive Hashing that allows finding similarities between 2 sets. It is a buzzword frequently met in the Data Mining and Data Science fields of CS. What is surprising is that this method was invented in 1997 and used in the AltaVista web-search engine back in the 90s to find similarities among web documents. It can also be used for:

  • Find duplicates
  • Near duplicate image detection
  • Near neighbor search

Basically the algorithm can be applied to anything that can be represented by numbers.

Let’s start with a bit of math from theory of probability and statistics.

For two sets A and B define:

 J(A,B) = {{|A \cap B|}\over{|A \cup B|}}.

This is the so-called Jaccard coefficient.

Where: J ∈ [0..1]

J = 0 – if A ∩ B = ∅, that is, the 2 sets are disjoint, meaning there are no similarities

J = 1 – if A ∩ B = A = B, that is, the 2 sets are identical.

A, B are more similar when their Jaccard coefficient is closer to 1.
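To get a feeling for the numbers, here is the exact Jaccard coefficient computed the straightforward way in Java. The class name Jaccard and the convention of returning 1.0 for two empty sets are my own assumptions for this sketch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Exact Jaccard coefficient, computed directly from the definition.
class Jaccard {
    static <T> double jaccard(Set<T> a, Set<T> b) {
        if (a.isEmpty() && b.isEmpty()) {
            return 1.0; // assumption: two empty sets are treated as identical
        }
        Set<T> intersection = new HashSet<>(a);
        intersection.retainAll(b);
        Set<T> union = new HashSet<>(a);
        union.addAll(b);
        return (double) intersection.size() / union.size();
    }

    public static void main(String[] args) {
        Set<String> a = new HashSet<>(Arrays.asList("a", "b", "c"));
        Set<String> b = new HashSet<>(Arrays.asList("b", "c", "d"));
        // |A ∩ B| = 2 and |A ∪ B| = 4
        System.out.println(jaccard(a, b));
    }
}
```

This direct computation has to materialize both the intersection and the union, which is exactly the cost that MinHash avoids for large sets.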

This simple formula becomes cumbersome when the sets are quite large, e.g. 2 web documents of more than 1MB each. Ouch, that’s too much. 1MB of text data is 1,048,576 characters, provided that 1 ASCII char = 8 bits (for Unicode encodings a character can of course take more).

Now that we understand a bit of theory, let’s try to apply hashing to the Jaccard coefficient. Whenever I hear about hashing, randomized algorithms are never far away.

Ok, let’s move on. The main idea is that similar objects hash to the same bucket. This follows from the fact that the probability of collision is higher for similar objects.

Here we give an example for 2 sets A and B but the algorithm can be applied to any number of sets.

1. Essentially, we need to construct a set of k independent hash functions <h1,h2,h3,…hk> randomly, where k = O(1/ε²), ε > 0, such that the expected error of the estimate is at most ε. For example, 1/0.05² = 400 hashes would be required to estimate J(A,B) with an expected error less than or equal to 0.05. So k can be varied to trade accuracy for speed.

2. Next, for each of the sets A and B and for every hash function h_i, we initialize the value h_i^{min} to infinity.

3. For each element s in both sets A and B we compute the element’s hash h_i(s):

 if h_i(s) < h_i^{min} then h_i^{min} = h_i(s).

Eventually we should have the values h_1^{min}, …, h_k^{min} for both sets A and B.

4. For every hash function the probability P(h_i^{min}(A) = h_i^{min}(B)) = |A ∩ B| / |A ∪ B|, and that is the actual Jaccard coefficient! So if the 2 sets A and B are similar, this probability is high.

5. We use the h_i^{min} statistics to estimate how similar these 2 sets are. The general formula is: Similarity = number of identical h_i^{min} values / k

In the real world this requires a more thorough consideration of different parameters, hash functions etc. However, to demonstrate the algorithm I wrote some simple Java code:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

public class LSHMinHash<T> {

    private final int[] hashes;
    private final int numOfHashes;
    private final int numOfSets;
    private final Set<T> setA;
    private final Set<T> setB;
    private final Map<T, boolean[]> matrix;
    private final int[][] minHashes;

    public LSHMinHash(double e, Set<T> setA, Set<T> setB) {
        // k = 1 / e^2 hash functions for an expected error of at most e
        this.numOfHashes = (int) Math.ceil(1 / (e * e));
        this.numOfSets = 2;
        this.setA = setA;
        this.setB = setB;
        matrix = buildSetMatrix(setA, setB);
        minHashes = initMinHashes(numOfSets, numOfHashes);
        hashes = computeHashes(numOfHashes);
    }

    // Maps every element of A ∪ B to its membership flags { inA, inB }.
    private Map<T, boolean[]> buildSetMatrix(Set<T> setA, Set<T> setB) {
        Map<T, boolean[]> matrix = new HashMap<T, boolean[]>();

        for (T element : setA) {
            matrix.put(element, new boolean[] { true, false });
        }

        for (T element : setB) {
            if (matrix.containsKey(element)) {
                matrix.put(element, new boolean[] { true, true });
            } else {
                matrix.put(element, new boolean[] { false, true });
            }
        }
        return matrix;
    }

    // Step 2: every min-hash starts at "infinity".
    private int[][] initMinHashes(int numOfSets, int numOfHashes) {
        int[][] minHashes = new int[numOfSets][numOfHashes];

        for (int i = 0; i < numOfSets; i++) {
            for (int j = 0; j < numOfHashes; j++) {
                minHashes[i][j] = Integer.MAX_VALUE;
            }
        }
        return minHashes;
    }

    // Step 1: one random seed per hash function (fixed Random seed for repeatability).
    private int[] computeHashes(int numOfHashes) {
        int[] hashes = new int[numOfHashes];
        Random r = new Random(31);

        for (int i = 0; i < numOfHashes; i++) {
            int a = r.nextInt();
            int b = r.nextInt();
            int c = r.nextInt();
            hashes[i] = a * (a * b * c >> 4) + a * a * b * c + c;
        }
        return hashes;
    }

    // Step 3: keep the smallest hash value of the set for every hash function.
    private void computeMinHashForSet(Set<T> set, int setIndex) {
        for (T element : matrix.keySet()) {
            if (!set.contains(element)) {
                continue;
            }
            for (int i = 0; i < numOfHashes; i++) {
                // i-th hash of the element: XOR with the i-th seed, then scramble.
                // A simple stand-in, not a true min-wise independent family.
                int hashValue = (element.hashCode() ^ hashes[i]) * 0x9E3779B1;
                if (hashValue < minHashes[setIndex][i]) {
                    minHashes[setIndex][i] = hashValue;
                }
            }
        }
    }

    // Step 5: share of hash functions on which both sets agree.
    private double computeMinHash(int[][] minHashes, int numOfHashes) {
        int identicalMinHashes = 0;
        for (int i = 0; i < numOfHashes; i++) {
            if (minHashes[0][i] == minHashes[1][i]) {
                identicalMinHashes++;
            }
        }
        return (1.0 * identicalMinHashes) / numOfHashes;
    }

    public double findSimilarities() {
        computeMinHashForSet(setA, 0);
        computeMinHashForSet(setB, 1);
        return computeMinHash(minHashes, numOfHashes);
    }

    public static void main(String[] args) {
        Set<String> setA = new HashSet<String>();
        setA.add("IS ");

        Set<String> setB = new HashSet<String>();
        setB.add("IS ");

        double errorFactor = 0.001;

        LSHMinHash<String> minHash = new LSHMinHash<String>(errorFactor, setA, setB);
        System.out.println(minHash.findSimilarities());
    }
}

Google Code Jam. Qualification Round 2013. Problem D – Treasure. Solved.


This was the problem I wasn’t able to optimise within the given time constraint in April. Later I killed half a day to complete it, thinking of different algorithms from graph theory such as the Eulerian path and Chinese postman problems. I was surprised at the time that this problem was put into the qualification round, as it is really challenging. In order to solve it, the graph problem should be refined and modeled properly. When modeling a graph I used to always map only one type of entity to vertices. As a counter-example, a Bipartite graph has vertices which are divided into two disjoint sets U and V such that every edge connects a vertex in U to one in V. The solution to this problem has nothing to do with Bipartite graph approaches, except that there are 2 types of vertices.

The solution

1. Check that for every key type the number of keys available (given initially plus found inside chests) is at least the number of chests that need this type. This is simple arithmetic. Otherwise there is no solution, as we don’t have enough keys to open all chests.

2. All vertices in a graph G should be reachable from vertex v0.

Construct a directed graph G=(V,E), where K is a set of all keys and C is a set of all chests:

  • V = K ∪ C ∪ {v0}.
  • v0 is the starting vertex, corresponding to the root. Its outgoing edges lead to the key(s) given to us initially.
  • E = {set of all directed edges connecting K and C}. (i,j) ∈ E if either {i=chest, j=key} or {i=key, j=chest}

In simple words, we have a directed graph where each vertex is either a chest or a key, and directed edges form the connections between them. We need to check that there’s a directed path from vertex v0 to every other vertex ∈ V, otherwise we cannot open all chests and a solution is impossible. Note that we need to check directed paths from v0 to all key vertices. This is standard single-source reachability and can be done with a Depth-First-Search algorithm:

void dfs(Graph g, int v) {
    marked[v] = true;  // v is reachable from v0
    for (int w : g.adj(v)) {
        if (!marked[w]) dfs(g, w);
    }
}

Technically, if there are unmarked vertices after the search, the problem is unsolvable. Time complexity is O(E+V). This check cuts lots of branches, saving us time.

Now let’s demonstrate the above on a real example. Our treasure trove consists of four chests, and you began with exactly one key of type 1.

See the table with input data:

Chest Number  |  Key Type To Open Chest  |  Key Types Inside
1             |  1                       |  None
2             |  1                       |  1, 3
3             |  2                       |  None
4             |  3                       |  2

Let’s draw a graph where v0 is the starting point of traversal (i.e. the initial key of type = 1)


As depicted, there are solvable configurations such as <2,1,4,3> and <2,4,3,1>. All key vertices are reachable from v0. The proof of this condition can be done by induction and is left to the reader.
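The two pruning checks can be tried out on the sample input from the table. The sketch below is not my contest code: the class TreasureChecks and the array encoding (chestKeyType[i] is the key type that opens chest i+1, keysInside[i] lists the key types found in it) are assumptions made for illustration, and reachability is checked over key types instead of an explicit Graph object.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of checks 1 and 2 on the sample input.
class TreasureChecks {
    // Check 1: for every key type, keys available >= chests that need it.
    static boolean enoughKeys(int[] startKeys, int[] chestKeyType, int[][] keysInside) {
        Map<Integer, Integer> balance = new HashMap<>();
        for (int k : startKeys) balance.merge(k, 1, Integer::sum);
        for (int[] inside : keysInside) {
            for (int k : inside) balance.merge(k, 1, Integer::sum);
        }
        for (int needed : chestKeyType) balance.merge(needed, -1, Integer::sum);
        for (int v : balance.values()) {
            if (v < 0) return false;
        }
        return true;
    }

    // Check 2: every chest can be reached starting from the initial keys.
    static boolean allChestsReachable(int[] startKeys, int[] chestKeyType, int[][] keysInside) {
        Set<Integer> seenTypes = new HashSet<>();
        Deque<Integer> stack = new ArrayDeque<>();
        for (int k : startKeys) {
            if (seenTypes.add(k)) stack.push(k);
        }
        boolean[] reached = new boolean[chestKeyType.length];
        while (!stack.isEmpty()) {
            int type = stack.pop();
            for (int c = 0; c < chestKeyType.length; c++) {
                if (!reached[c] && chestKeyType[c] == type) {
                    reached[c] = true;
                    for (int k : keysInside[c]) {
                        if (seenTypes.add(k)) stack.push(k);
                    }
                }
            }
        }
        for (boolean r : reached) {
            if (!r) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        int[] startKeys = {1};
        int[] chestKeyType = {1, 1, 2, 3};
        int[][] keysInside = {{}, {1, 3}, {}, {2}};
        System.out.println(enoughKeys(startKeys, chestKeyType, keysInside));
        System.out.println(allChestsReachable(startKeys, chestKeyType, keysInside));
    }
}
```

Both checks are necessary conditions only; they are cheap filters that run before the actual search for the lexicographically smallest order.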

3. Find a “lexicographically smallest” solution. This last condition added real hardness to the problem. If there are multiple solutions, we have to find the “lexicographically smallest” way to open the boxes. Each vertex is always traversed from lowest to highest. This can be achieved by constructing the graph with adjacency lists that hold the numbers of keys and chests in increasing order, and then iterating over them with a slightly optimized Depth-First-Search. The minimal configuration starts from v0. If we are initially given more than one key, we just have to traverse from all of them in increasing order. On each step we update the number of keys in our possession. If we run out of keys, we cut the branch and backtrack, picking up where we left off with the next configuration. E.g. opening chest No. 1 first leads to a deadlock (this is a trivial case, as it happens on step No. 1). Such cases are not on the critical path, and we need to backtrack only to the previous calling vertex.

Conclusion: There are far fewer configurations to check than with straightforward brute force. Many optimization problems like this one turn up in algorithmic contests. Algorithms and data structures alone are not sufficient. To meet the given time constraint you need the right strategy to cut unsolvable branches. That’s it!

How automatic sharding works or consistent hashing under the hood


Here we’re going to talk primarily about consistent hashing. This technique involves such concepts as adaptive load balancing, routing and partitioning in distributed computing. There are many articles on the internet about this technique (refer to the list of references for some of them), but I haven’t found information about how and where to keep the ring of hashes, so I’ve decided to describe some options with their pros and cons. To make this post clearer for a wider audience, I will first give a brief introduction to what this is all about and cover the ring storage strategies at the end. So if you’re already familiar with the algorithm, you may want to skip the main part and move on to the last chapter for the pros and cons of the described approaches.

Distributed cache and straightforward uniform load balancing

Key-value stores are extremely fast for single-key lookups. A very popular kind is a distributed hash table (DHT), kept in a fully decentralized manner and thus particularly suited to unstable networks where nodes can leave or join at any moment. Note, however, that a DHT is not suitable for range queries; I will probably write a separate post about the special data structures responsible for that. Now let’s consider a classic case: you have a cluster of cache servers across which you load-balance a huge data set uniformly. To determine on which node a pair <key, value> should be kept, we use a simple hash mapping:

Cache machine = hash(o) mod n where: n – number of machines in a cluster and o is an object to put/lookup.

What happens when the number of machines changes at runtime? You might add or remove machines in a cluster for scalability reasons, because of a failure or whatever. The change triggers moving almost all objects to new locations due to rehashing. Almost every key-value pair gets reallocated across the cluster. When one machine is added to n, you’ll end up moving a fraction n/(n+1) of your data to new machines. Indeed, this fact undermines all of the advantages of distributed hash tables. We need to avoid this messy remapping somehow. This is where consistent hashing comes in.
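The n/(n+1) figure is easy to check with a few lines of Java. In this sketch (the class ModuloRemap is hypothetical) the integer key itself stands in for hash(o), which is an assumption that keeps the arithmetic exact.

```java
// Counts how many keys change machine when a cluster grows from n to n + 1 nodes.
class ModuloRemap {
    static double movedFraction(int n, int keys) {
        int moved = 0;
        for (int key = 0; key < keys; key++) {
            // machine before the change vs. machine after the change
            if (key % n != key % (n + 1)) {
                moved++;
            }
        }
        return (double) moved / keys;
    }

    public static void main(String[] args) {
        // 4 machines grow to 5; with 20 keys, 16 of them move: 4/5 of the data
        System.out.println(movedFraction(4, 20));
    }
}
```

Only the keys 0 to 3 stay where they were; everything else has to be shipped to another machine.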

Consistent hashing

The main idea is to hash both data ids and cache machines to a numeric range using the same hash function. E.g. in Java the primitive type int has a range of values from -2^31 to 2^31-1. Assume the interval is [0, 2^31-1] for simplicity (Java primitives cannot be unsigned). Now let’s join the starting and ending points of this interval together to create a ring, so that the values wrap around. We do not have 2^31-1 available servers; the large size of the ring is merely intended to avoid collisions. A good choice of hash function is e.g. MD5 or SHA-1. As a machine’s number we can take its IP address and apply the hash function to it. By taking the first 4 bytes of the result and clearing the sign bit we can map it to our ring [0, 2^31-1].


Both the nodes and the keys are mapped to the same range on the ring. Ok, now we need to understand how to identify on this ring which data ids belong to which server’s IP. It’s really simple: we just move clockwise starting from zero (the starting point on the ring), following the main rule of consistent hashing: if IP-n1 and IP-n2 are 2 adjacent nodes on the ring, all data ids on the ring between them belong to IP-n1. That’s it.

As depicted: { Id1, Id2, Id3 } belong to IP-3; { Id4 } belongs to IP-1; IP-2 holds no ids.

Conclusion: Using consistent hashing we do not need to rehash the whole data set. Instead, the new server takes its place at a position on the ring determined by its hash value, and part of the objects stored on its successor must be moved. The reorganization is local, as all the other nodes remain unaffected. If you add a machine to the cluster, only the data that needs to live on that machine is moved there; all the other data stays where it is. Because the hash function remains unaffected, the scheme maintains its consistency over the successive evolutions of the network configuration. Like naive hashing, consistent hashing spreads the distributed dictionary almost evenly across the cluster. One point to mention is what happens when a node goes down due to some disaster. In this case consistent hashing alone doesn’t meet our reliability requirements, because data is lost. Therefore there should definitely be replication and high availability, which is feasible but out of scope of this introduction. You may want to look at the references at the end of this article to find out more.

Problems with pure consistent hashing

In a nutshell, the basic consistent hashing has the following problems:

  • There is a huge amount of data to be rehashed.
  • A node picking a range of keys results in one node potentially carrying a larger keyspace than others, therefore still creating disbalance.
  • Leaving/Joining a ring leads to disbalance of data.
  • A more powerful machine needs to process more data than others.
  • The fraction of data to be moved is less predictable and much higher.

Virtual nodes solve these issues.

Virtual nodes come to the rescue

Virtual nodes minimize changes to a node’s assigned range by assigning a number of smaller ranges to a single node. In other words, the amount of data to be moved from one physical node to others is minimized. Let’s split a real node into a number of virtual nodes. The idea is to build equally sized subintervals (partitions) for each real server on the ring by dividing the hash space into P evenly sized partitions and assigning P/N partitions to each of the N hosts. When a node joins, it is uniformly assigned partitions from all real servers; when a node leaves, its partitions are given back uniformly to the remaining ones. The number of virtual nodes is picked once while building the ring and never changes over the lifetime of the cluster. This ensures that each node takes an equal share of the full data set, that is P/N partitions, and thus our data is now distributed more uniformly. It also means that the number of virtual nodes must be much higher than the number of real ones.


Here’s a pretty simple Java implementation of a consistent hashing ring with virtual nodes.

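import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;

// Not thread-safe: the TreeMap and the shared MessageDigest need external synchronization.
public class Ring<T> {

	private final SortedMap<Long, T> ring = new TreeMap<Long, T>();
	private final MD5Hash hash = new MD5Hash();
	private final int vnodeCount;

	public Ring(int vnodeCount, Collection<T> pNodes) {
		this.vnodeCount = vnodeCount;

		for (T pNode : pNodes) {
			addNode(pNode);
		}
	}

	// Places vnodeCount virtual nodes of the physical node on the ring.
	public void addNode(T pNode) {
		for (int i = 0; i < vnodeCount; i++) {
			ring.put(hash.hash(pNode.toString() + i), pNode);
		}
	}

	// Removes all virtual nodes of the physical node from the ring.
	public void removeNode(T pNode) {
		for (int i = 0; i < vnodeCount; i++) {
			ring.remove(hash.hash(pNode.toString() + i));
		}
	}

	// Assumes the ring contains at least one node.
	public T getNodeByObjectId(String objectId) {

		long hashValue = hash.hash(objectId);

		if (!ring.containsKey(hashValue)) {
			// first virtual node clockwise from the key, wrapping around at the end
			SortedMap<Long, T> tailMap = ring.tailMap(hashValue);
			hashValue = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey();
		}
		return ring.get(hashValue);
	}

	private static class MD5Hash {
		private final MessageDigest instance;

		MD5Hash() {
			try {
				instance = MessageDigest.getInstance("MD5");
			} catch (NoSuchAlgorithmException e) {
				throw new IllegalStateException("MD5 is not available", e);
			}
		}

		// Uses the first 4 bytes of the MD5 digest of the key as the ring position.
		long hash(String key) {
			byte[] digest = instance.digest(key.getBytes(StandardCharsets.UTF_8));

			long h = 0;
			for (int i = 0; i < 4; i++) {
				h <<= 8;
				h |= ((int) digest[i]) & 0xFF;
			}
			return h;
		}
	}
}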


Strategies to keep a data structure of the ring and their pros and cons

There are a few options for where to keep the ring’s data structure:

  • Central point of coordination: A dedicated machine keeps the ring and works as a central load-balancer which routes requests to the appropriate nodes.
  1. Pros: Very simple implementation. This would be a good fit for a system that is not dynamic and has a small number of nodes and/or a small amount of data.
  2. Cons: The big drawbacks of this approach are scalability and reliability. Stable distributed systems don’t have a single point of failure.
  • No central point of coordination – full duplication: Each node keeps a full copy of the ring. Applicable for stable networks. This option is used e.g. in Amazon Dynamo.
  1. Pros: Queries are routed in one hop directly to the appropriate cache server.
  2. Cons: A server joining or leaving the ring requires notifying and updating all cache servers in the ring.
  • No central point of coordination – partial duplication: Each node keeps a partial copy of the ring. This option is a direct implementation of the CHORD algorithm. In terms of DHT, each cache machine has a predecessor and a successor, and when receiving a query it checks whether it has the key or not. If there’s no such key on that machine, a mapping function is used to determine which of its neighbors (successor and predecessor) has the least distance to that key. Then it forwards the query to the neighbor that has the least distance. The process continues until the current cache machine finds the key and sends it back.
  1. Pros: For highly dynamic changes the previous option is not a fit, due to the heavy overhead of gossiping among nodes. Thus this option is the choice in that case.
  2. Cons: No direct routing of messages. The complexity of routing a message to the destination node in a ring is O(lg N).

Current trends in consistent hashing

There is a huge boom nowadays of new products that implement this technique. Some of them are: Dynamo, Riak, Cassandra, MemCached, Voldemort, CouchDB, Oracle Coherence, Trackerless Bit-Torrent networks, Web-caching frameworks, Content distribution networks.


Annual conference Java One 2013


Java One 2013, Moscow

Today is the first day of the annual conference “Java One 2013” at Crocus Expo, Moscow, Russia. Sponsors are as follows: Deutsche Bank, Luxoft, odnoklassniki.ru and several other companies.

Traditionally the conference lasts for 2 days, with 94 sessions and 65 speakers from around the world. Some of them spoke the previous year as well. On the whole the venue and organization are better this year, and the plan has really interesting sessions on modern buzzwords like Scala, cloud and distributed caches.