Thursday 22 September 2011

Single Writer Principle

When trying to build a highly scalable system the single biggest limitation on scalability is having multiple writers contend for any item of data or resource.  Sure, algorithms can be bad, but let’s assume they have reasonable Big O complexity so we can focus on the scalability limitations of the system's design.

I keep seeing people just accept having multiple writers as the norm.  There is a lot of research in computer science for managing this contention that boils down to 2 basic approaches.  One is to provide mutual exclusion to the contended resource while the mutation takes place; the other is to take an optimistic strategy and swap in the changes if the underlying resource has not changed while you created the new copy. 

Mutual Exclusion

Mutual exclusion is the means by which only one writer can have access to a protected resource at a time, and is usually implemented with a locking strategy.  Locking strategies require an arbitrator, usually the operating system kernel, to get involved when the contention occurs to decide who gains access and in what order.  This can be a very expensive process, often requiring many more CPU cycles than the business logic of the actual transaction would use.  Those waiting to enter the critical section, in advance of performing the mutation, must queue, and this queuing effect (Little's Law) causes latency to become unpredictable and ultimately restricts throughput.
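
As a rough illustration of mutual exclusion, here is a minimal sketch of a counter protected by a lock.  The class and method names (LockedCounter, runWriters) are hypothetical and chosen only for this example; it uses java.util.concurrent.locks.ReentrantLock from the standard library.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example: a counter whose mutation is protected by a lock.
final class LockedCounter
{
    private final ReentrantLock lock = new ReentrantLock();
    private long value = 0L;

    // Critical section: only one thread at a time can be between lock() and unlock().
    long increment()
    {
        lock.lock();
        try
        {
            return ++value;
        }
        finally
        {
            lock.unlock();
        }
    }

    // Starts numThreads writers, each incrementing perThread times, and returns the total.
    static long runWriters(final int numThreads, final int perThread)
    {
        final LockedCounter counter = new LockedCounter();
        final Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < numThreads; i++)
        {
            threads[i] = new Thread(() ->
            {
                for (int j = 0; j < perThread; j++)
                {
                    counter.increment();
                }
            });
            threads[i].start();
        }

        for (final Thread t : threads)
        {
            join(t);
        }

        // All writers have been joined, so this read sees every increment.
        return counter.value;
    }

    private static void join(final Thread t)
    {
        try
        {
            t.join();
        }
        catch (final InterruptedException ex)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(final String[] args)
    {
        System.out.println("counter = " + runWriters(4, 100000));
    }
}
```

Every writer that arrives while the lock is held has to wait its turn, which is the queuing effect described above.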

Optimistic Concurrency Control

Optimistic strategies involve taking a copy of the data, modifying it, then copying back the changes if data has not mutated in the meantime.  If a change has happened in the meantime you repeat the process until successful.  This repeating of the process increases with contention and therefore causes a queuing effect just like with mutual exclusion.  If you work with a source code control system, such as Subversion or CVS, then you are using this algorithm every day.  Optimistic strategies can work with data but do not work so well with resources such as hardware because you cannot take a copy of the hardware!  The ability to perform the changes atomically to data is made possible by CAS instructions offered by the hardware.
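
The copy, modify, and swap cycle can be sketched with an AtomicReference holding an immutable snapshot.  This is only an illustration under assumed names (OptimisticAccount, Balance, deposit are all hypothetical); compareAndSet succeeds only if no other writer has replaced the snapshot in the meantime, otherwise the loop retries.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical example of an optimistic update: copy, modify, swap, retry on conflict.
final class OptimisticAccount
{
    // Immutable snapshot of the state; names are illustrative only.
    static final class Balance
    {
        final long amount;
        final long version;

        Balance(final long amount, final long version)
        {
            this.amount = amount;
            this.version = version;
        }
    }

    private final AtomicReference<Balance> current =
        new AtomicReference<>(new Balance(0L, 0L));

    void deposit(final long delta)
    {
        while (true)
        {
            final Balance snapshot = current.get();                       // take a copy
            final Balance updated =
                new Balance(snapshot.amount + delta, snapshot.version + 1); // modify the copy
            if (current.compareAndSet(snapshot, updated))                 // swap if unchanged
            {
                return;
            }
            // Another writer changed the state in the meantime, so go round again.
        }
    }

    Balance snapshot()
    {
        return current.get();
    }

    // Runs numThreads writers that each deposit 1 perThread times.
    static Balance runWriters(final int numThreads, final int perThread)
    {
        final OptimisticAccount account = new OptimisticAccount();
        final Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < numThreads; i++)
        {
            threads[i] = new Thread(() ->
            {
                for (int j = 0; j < perThread; j++)
                {
                    account.deposit(1L);
                }
            });
            threads[i].start();
        }

        for (final Thread t : threads)
        {
            try
            {
                t.join();
            }
            catch (final InterruptedException ex)
            {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }

        return account.snapshot();
    }

    public static void main(final String[] args)
    {
        final Balance result = runWriters(4, 100000);
        System.out.println("amount = " + result.amount + ", version = " + result.version);
    }
}
```

The more writers there are, the more often the swap fails and the work of building the new copy is thrown away.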

Most locking strategies are composed from optimistic strategies for changing the lock state or mutual exclusion primitive.

Managing Contention vs. Doing Real Work

CPUs can typically process one or more instructions per cycle.  For example, modern Intel CPU cores each have 6 execution units that can be doing a combination of arithmetic, branch logic, word manipulation and memory loads/stores in parallel.  If while doing work the CPU core incurs a cache miss, and has to go to main memory, it will stall for hundreds of cycles until the result of that memory request returns.  To try and improve things the CPU will make some speculative guesses as to what a memory request will return to continue processing.  If a second miss occurs the CPU will no longer speculate and simply wait for the memory request to return because it cannot typically keep the state for speculative execution beyond 2 cache misses.  Managing cache misses is the single largest limitation to scaling the performance of our current generation of CPUs.

Now what does this have to do with managing contention?  Well if two or more threads are using locks to provide mutual exclusion, at best they will be going to the L3 cache, or over a socket interconnect, to access the shared state of the lock using CAS operations.  These lock/CAS instructions cost tens of cycles in the best case when un-contended, plus they cause out-of-order execution for the CPU to be suspended and load/store buffers to be flushed.  At worst, collisions occur and the kernel will need to get involved and put one or more of the threads to sleep until the lock is released.  This rescheduling of the blocked thread will result in cache pollution.  The situation can be even worse when the thread is re-scheduled on another core with a cold cache resulting in many cache misses.

For highly contended data it is very easy to get into a situation whereby the system spends significantly more time managing contention than doing real work.  The table below gives an idea of basic costs for managing contention when the program state is very small and easy to reload from the L2/L3 cache, never mind main memory. 

Method                            Time (ms)
One Thread                              300
One Thread with Memory Barrier        4,700
One Thread with CAS                   5,700
Two Threads with CAS                 18,000
One Thread with Lock                 10,000
Two Threads with Lock               118,000

This table illustrates the costs of incrementing a 64-bit counter 500 million times using a variety of techniques on a 2.4GHz Westmere processor.  I can hear people coming back with “but this is a trivial example and real-world applications are not that contended”.  This is true, but remember real-world applications have way more state, and what do you think happens to all that state which is warm in cache when the context switch occurs?  By measuring the basic cost of contention it is possible to extrapolate the scalability limits of a system which has contention points.  As multi-core becomes ever more significant another approach is required.  My last post illustrates the micro level effects of CAS operations on modern CPUs, whereby Sandybridge can be worse for CAS and locks.

Single Writer Designs

Now, what if you could design a system whereby any item of data, or resource, is only mutated by a single writer/thread?  It is actually easier than you think in my experience.  It is OK if multiple threads, or other execution contexts, read the same data.  CPUs can broadcast read only copies of data to other cores via the cache coherency sub-system.  This has a cost but it scales very well.

If you have a system that can honour this single writer principle then each execution context can spend all its time and resources processing the logic for its purpose, and not be wasting cycles and resources on dealing with the contention problem.  You can also scale up without limitation until the hardware is saturated.  There is also a really nice benefit when working on architectures such as x86/x64, where the hardware memory model preserves the order of load/store memory operations: memory barriers are not required if you adhere strictly to the single writer principle.  On x86/x64 "loads can be re-ordered with older stores" according to the memory model so memory barriers are required when multiple threads mutate the same data across cores.  The single writer principle avoids this issue because it never has to deal with writing the latest version of a data item that may have been written by another thread and is currently in the store buffer of another core.
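
A minimal sketch of a single writer counter follows.  It assumes one owning thread does all the mutation and publishes the value with AtomicLong.lazySet(), an ordered store that needs no lock and no CAS, while any number of other threads read it.  The class name and the run() harness are hypothetical and exist only for this example.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical example: one thread owns all mutation, any thread may read.
final class SingleWriterCounter
{
    private final AtomicLong published = new AtomicLong(0L);
    private long local = 0L; // touched only by the owning writer thread

    // Must only ever be called from the single owning thread.
    void increment()
    {
        local++;
        published.lazySet(local); // ordered store: no lock and no CAS retry loop
    }

    // Safe to call from any thread.
    long get()
    {
        return published.get();
    }

    // Returns {final value, number of times the reader saw the value go backwards}.
    static long[] run(final long increments)
    {
        final SingleWriterCounter counter = new SingleWriterCounter();
        final long[] result = new long[2];

        final Thread writer = new Thread(() ->
        {
            for (long i = 0; i < increments; i++)
            {
                counter.increment();
            }
        });

        final Thread reader = new Thread(() ->
        {
            long last = 0L;
            long violations = 0L;
            while (last < increments)
            {
                final long seen = counter.get();
                if (seen < last)
                {
                    violations++;
                }
                last = seen;
            }
            result[1] = violations;
        });

        writer.start();
        reader.start();
        join(writer);
        join(reader);

        result[0] = counter.get();
        return result;
    }

    private static void join(final Thread t)
    {
        try
        {
            t.join();
        }
        catch (final InterruptedException ex)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(final String[] args)
    {
        final long[] result = run(1000000L);
        System.out.println("final = " + result[0] + ", violations = " + result[1]);
    }
}
```

The design only holds if the ownership rule is respected: a second thread calling increment() would reintroduce exactly the lost-update problem that locks and CAS exist to solve.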

So how can we drive towards single writer designs?  I’ve found it is a very natural thing.  Consider how humans, or any other autonomous creatures of nature, operate with their model of the world.  We all have our own model of the world contained in our own heads, i.e. we have a copy of the world state for our own use.  We mutate the state in our heads based on inputs (events/messages) we receive via our senses.  As we process these inputs and apply them to our model we may take action that produces outputs, which others can take as their own inputs.  None of us reach directly into each other’s heads and mess with the neurons.  If we did this it would be a serious breach of encapsulation!  Originally, Object Oriented (OO) design was all about message passing, and somehow along the way we bastardised the message passing to be method calls and even allowed direct field manipulation – Yuk!  Whose bright idea was it to allow public access to fields of an object?  You deserve your own special hell.

At university I studied transputers and interesting languages like Occam.  I thought very elegant designs appeared by having the nodes collaborate via message passing rather than mutating shared state.  I’m sure some of this has inspired the Disruptor.  My experience with the Disruptor has shown that it is possible to build systems with one or more orders of magnitude better throughput than locking or contended state based approaches.  It also gives much more predictable latency that stays constant until the hardware is saturated rather than the traditional J-curve latency profile.

It is interesting to see the emergence of numerous approaches that lend themselves to single writer solutions such as Node.js, Erlang, Actor patterns, and SEDA to name a few.  Unfortunately most use queue based implementations underneath, which breaks the single writer principle, whereas the Disruptor strives to separate the concerns so that the single writer principle can be preserved for the common cases.
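
One way a message passing structure can keep to the principle is to restrict it to one producer and one consumer, so that each sequence counter has exactly one writer.  The following is a minimal sketch of that idea and is not the Disruptor's implementation; the class name, the EMPTY sentinel and the transfer() harness are hypothetical and exist only for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-producer, single-consumer ring buffer of long values.
final class SpscRingBuffer
{
    static final long EMPTY = Long.MIN_VALUE; // sentinel: this value cannot be sent

    private final long[] slots;
    private final int mask;
    private final AtomicLong tail = new AtomicLong(0L); // written only by the producer
    private final AtomicLong head = new AtomicLong(0L); // written only by the consumer

    SpscRingBuffer(final int capacity)
    {
        if (Integer.bitCount(capacity) != 1)
        {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new long[capacity];
        mask = capacity - 1;
    }

    // Producer thread only.  Returns false if the buffer is full.
    boolean offer(final long value)
    {
        final long t = tail.get();
        if (t - head.get() == slots.length)
        {
            return false;
        }
        slots[(int)(t & mask)] = value;
        tail.lazySet(t + 1); // publish the slot to the consumer
        return true;
    }

    // Consumer thread only.  Returns EMPTY if nothing is available.
    long poll()
    {
        final long h = head.get();
        if (h == tail.get())
        {
            return EMPTY;
        }
        final long value = slots[(int)(h & mask)];
        head.lazySet(h + 1); // release the slot back to the producer
        return value;
    }

    // Sends 1..count from a producer thread and sums them on the calling thread.
    static long transfer(final int count)
    {
        final SpscRingBuffer buffer = new SpscRingBuffer(64);
        final Thread producer = new Thread(() ->
        {
            for (long i = 1; i <= count; i++)
            {
                while (!buffer.offer(i))
                {
                    Thread.yield();
                }
            }
        });
        producer.start();

        long sum = 0L;
        long expected = 1L;
        while (expected <= count)
        {
            final long value = buffer.poll();
            if (value == EMPTY)
            {
                Thread.yield();
                continue;
            }
            if (value != expected)
            {
                throw new IllegalStateException("out of order: " + value);
            }
            sum += value;
            expected++;
        }
        return sum;
    }

    public static void main(final String[] args)
    {
        System.out.println("sum = " + transfer(1000000));
    }
}
```

Both threads read both counters, but neither counter is ever written by more than one thread, so no CAS or lock is needed.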

Now I’m not saying locks and optimistic strategies are bad and should not be used.  They are excellent for many problems.  For example, bootstrapping a concurrent system or making major state changes in configuration or reference data.  However if the main flow of transactions acts on contended data, and locks or optimistic strategies have to be employed, then the scalability is fundamentally limited.

The Principle at Scale

This principle works at all levels of scale.  Mandelbrot got this so right.  CPU cores are just nodes of execution and the cache system provides message passing for communication.  The same patterns apply if the processing node is a server and the communication system is a local network.  If a service, in SOA architecture parlance, is the only service that can write to its data store it can be made to scale and perform much better.  Let’s say that underlying data is stored in a database and other services can go directly to that data, without sending a message to the service that owns the data, then the data is contended and requires the database to manage the contention and coherence of that data.  This prevents the service from caching copies of the data for faster response to the clients and restricts how the data can be sharded.  Encapsulation has just been broken at a more macro level when multiple different services write to the same data store.

Summary

If a system is decomposed into components that keep their own relevant state model, without a central shared model, and all communication is achieved via message passing then you have a system without contention naturally.  This type of system obeys the single writer principle if the message passing sub-system is not implemented as queues.  If you cannot move straight to a model like this, but are finding scalability issues related to contention, then start by asking the question, “How do I change this code to preserve the Single Writer Principle and thus avoid the contention?”

The Single Writer Principle is that for any item of data, or resource, that item of data should be owned by a single execution context for all mutations.

Sunday 11 September 2011

Adventures with AtomicLong

Sequencing events between threads is a common operation for many multi-threaded algorithms.  These sequences could be used for assigning identity to orders, trades, transactions, messages, events, etc.  Within the Disruptor we use a monotonic sequence for all events which is implemented as AtomicLong incrementAndGet for the multi-threaded publishing scenario.

While working on the latest version of the Disruptor I made some changes which I was convinced would improve performance, however the results surprised me.  I had removed some potentially megamorphic method calls and the performance got worse rather than better.  After a lot of investigation, I discovered that the megamorphic method calls were hiding a performance issue with the latest Intel Sandybridge processors.  With the megamorphic calls out of the way, the contention on the atomic sequence generation increased exposing the issue.  I've also observed this performance issue with other Java concurrent structures such as ArrayBlockingQueue.

I’ve been running various benchmarks on Sandybridge and have so far been impressed with performance improvements over Nehalem, especially for memory intensive applications due to the changes in its front-end.  However with this sequencing benchmark, I discovered that Sandybridge has taken a major step backward in performance with regard to atomic instructions.

Atomic instructions enable read-modify-write actions to be combined into an atomic operation.  A good example is incrementing a counter.  To complete the increment operation a thread must read the current value, increment it, and then write back the results.  In a multi-threaded environment these distinct operations could interleave with other threads doing the same with corrupt results as a consequence.  The normal way to avoid this interleaving is to take out a lock for mutual exclusion while performing the steps.  Locks are very expensive and often require kernel arbitration between threads.  Modern CPUs provide a number of atomic instructions which allow operations such as atomically incrementing a counter, or the ability to conditionally set a pointer reference if the value is still as expected.  These operations are commonly referred to as CAS (Compare And Swap) instructions.  A good way to think of these CAS instructions is like optimistic locks, similar to what you experience when using a version control system like Subversion or CVS.  You try to make a change and if the version is what you expect then you succeed, otherwise the action aborts.
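
The read-modify-write retry described above can be sketched by hand using AtomicLong.compareAndSet().  This is an illustration rather than the JDK's own source, and the class name and run() harness are hypothetical.  It counts how many attempts failed so the effect of contention can be observed.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical example: an increment built from a compare-and-swap retry loop.
final class CasIncrement
{
    // Increments the counter and returns how many attempts failed before succeeding.
    static int increment(final AtomicLong counter)
    {
        int failures = 0;
        while (true)
        {
            final long expected = counter.get();                 // read
            if (counter.compareAndSet(expected, expected + 1L))  // write back if unchanged
            {
                return failures;
            }
            failures++; // another thread got there first, so retry
        }
    }

    // Returns {final counter value, total failed attempts across all threads}.
    static long[] run(final int numThreads, final int perThread)
    {
        final AtomicLong counter = new AtomicLong(0L);
        final AtomicLong failedAttempts = new AtomicLong(0L);
        final Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < numThreads; i++)
        {
            threads[i] = new Thread(() ->
            {
                long failures = 0L;
                for (int j = 0; j < perThread; j++)
                {
                    failures += increment(counter);
                }
                failedAttempts.addAndGet(failures);
            });
            threads[i].start();
        }

        for (final Thread t : threads)
        {
            try
            {
                t.join();
            }
            catch (final InterruptedException ex)
            {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }

        return new long[] {counter.get(), failedAttempts.get()};
    }

    public static void main(final String[] args)
    {
        final long[] result = run(4, 1000000);
        System.out.println("counter = " + result[0] + ", failed attempts = " + result[1]);
    }
}
```

The final count is always exact.  The number of failed attempts varies from run to run and from machine to machine, and with a single thread it is always zero.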

On x86/x64 these instructions are known as “lock” instructions.  The "lock" name comes from how a processor, after setting its lock signal, would lock the front-side/memory bus (FSB) for serialising memory access while the three steps of the operation took place atomically.  On more recent processors the lock instruction is simply implemented by getting an exclusive lock on the cache-line for modification.

These instructions are the basic building blocks used for implementing higher-level locks and semaphores.  This is, as will be explained shortly, why I've seen performance issues on Sandybridge for ArrayBlockingQueue in some of the Disruptor comparative performance tests.

Back to my benchmark.  The test was spending significantly more time in AtomicLong.incrementAndGet() than I had previously observed.  Initially, I suspected an issue with JDK 1.6.0_27 which I had just installed.  I ran the following test with various JVMs, including 1.7.0, and kept getting the same results.  I then booted different operating systems (Ubuntu, Fedora, Windows 7 - all 64-bit), again the same results.  This led me to write an isolated test which I ran on Nehalem (2.8GHz Core i7 860) and Sandybridge (2.2GHz Core i7-2720QM).

import java.util.concurrent.atomic.AtomicLong;

public final class TestAtomicIncrement
    implements Runnable
{
    public static final long COUNT = 500L * 1000L * 1000L;
    public static final AtomicLong counter = new AtomicLong(0L);

    public static void main(final String[] args) throws Exception
    {
        final int numThreads = Integer.parseInt(args[0]);
        final long start = System.nanoTime();
        runTest(numThreads);
        System.out.println("duration = " + (System.nanoTime() - start));
        System.out.println("counter = " + counter);
    }

    private static void runTest(final int numThreads)
        throws InterruptedException
    {
        Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < threads.length; i++)
        {
            threads[i] = new Thread(new TestAtomicIncrement());
        }

        for (Thread t : threads)
        {
            t.start();
        }

        for (Thread t : threads)
        {
            t.join();
        }
    }

    public void run()
    {
        long i = 0L;
        while (i < COUNT)
        {
            i = counter.incrementAndGet();
        }
    }
}

Figure 1.

After running this test on 4 different Sandybridge processors with a range of clock speeds, I concluded that using LOCK CMPXCHG, under contention with increasing numbers of threads, is much less scalable than the previous Nehalem generation of processors.  Figure 1. above charts the results in nanoseconds duration to complete 500 million increments of a counter with increasing thread count.  Less is better.

I confirmed the JVM was generating the correct instructions for the CAS loop by getting Hotspot to print the assembler it generated.  I also confirmed that Hotspot generated identical assembler instructions for both Nehalem and Sandybridge.

I then decided to investigate further and write the following C++ program to test the relevant lock instructions to compare Nehalem and Sandybridge.  I know from using “objdump -d” on the binary that the GNU Atomic Builtins generate the lock instructions for ADD, XADD, and CMPXCHG, for the respectively named functions below.

#include <time.h>
#include <pthread.h>
#include <stdlib.h>
#include <iostream>

typedef unsigned long long uint64;
const uint64 COUNT = 500LL * 1000LL * 1000LL;
volatile uint64 counter = 0;

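// LOCK ADD: the result of the add is discarded.  Each thread performs its
// share of the COUNT increments.
void* run_add(void* numThreads)
{
    uint64 value = (COUNT / *((int*)numThreads)) + 1;
    while (--value != 0)
    {
        __sync_add_and_fetch(&counter, 1);
    }

    return NULL;
}

// LOCK XADD: the result of the add is used to decide when to stop.
void* run_xadd(void*)
{
    uint64 value = counter;
    while (value < COUNT)
    {
        value = __sync_add_and_fetch(&counter, 1);
    }

    return NULL;
}

// LOCK CMPXCHG: read the counter, then retry the swap until it succeeds.
void* run_cas(void*)
{
    uint64 value = 0;
    while (value < COUNT)
    {
        do
        {
            value = counter;
        }
        while (!__sync_bool_compare_and_swap(&counter, value, value + 1));
    }

    return NULL;
}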

int main (int argc, char* argv[])
{
    const int NUM_THREADS = atoi(argv[1]);

    pthread_t threads[NUM_THREADS];
    void* status;
    timespec ts_start;
    timespec ts_finish;
    clock_gettime(CLOCK_MONOTONIC, &ts_start);

    for (int i = 0; i < NUM_THREADS; i++)
    {
        pthread_create(&threads[i], NULL, run_add, (void*)&NUM_THREADS);
    }

    for (int i = 0; i < NUM_THREADS; i++)
    {
        pthread_join(threads[i], &status);
    }

    clock_gettime(CLOCK_MONOTONIC, &ts_finish);

    uint64 start = (ts_start.tv_sec * 1000000000LL) + ts_start.tv_nsec;
    uint64 finish = (ts_finish.tv_sec * 1000000000LL) + ts_finish.tv_nsec;
    uint64 duration = finish - start;

    std::cout << "threads = "  << NUM_THREADS << std::endl;
    std::cout << "duration = " << duration << std::endl;
    std::cout << "counter = "  << counter << std::endl;

    return 0;
}

Figure 2.

It is clear from Figure 2. that Nehalem performs nearly an order of magnitude better for atomic operations as contention increases with threads.  I found LOCK ADD and LOCK XADD to be similar so I've only charted XADD for clarity.  The CAS operations for C++ and Java are comparable.

It is also very interesting how XADD greatly outperforms CAS and gives a nice scalable profile.  For 3 threads and above, XADD does not degrade further and simply performs at the rate at which the processor can keep the caches coherent.  Nehalem and Sandybridge level out respectively at ~100m and ~20m XADD operations per second for 3+ concurrent threads, whereas CAS continues to degrade with increasing thread count because of contention.  Naturally, performance degrades when QPI links are involved for a multi-socket scenario.  Oracle have now accepted that not supporting XADD is a bug and will hopefully fix it soon for the JVM. 

As to the performance I’ve observed with Sandybridge, it would be great if others could confirm my findings so we can all feed back to Intel and have this addressed.  I've not been able to get my hands on a server class system with Sandybridge.  I can confirm that for the "tick" to Westmere, the performance is similar to Nehalem and not an issue.  The "tock" to Sandybridge seems to introduce the issue.

Update: After discussions with Intel I wrote the following blog entry.

Friday 2 September 2011

Modelling Is Everything

I’m often asked, “What is the best way to learn about building high-performance systems?” There are many perfectly valid answers to this question but there is one thing that stands out for me above everything else, and that is modelling. Modelling what you need to implement is the most important and effective step in the process. I’d go further and say this principle applies to any development and the rest is just typing :-)

Domain Driven Design (DDD) advocates modelling the domain and expressing this model in code as fundamental to the successful delivery and ongoing maintenance of software. I wholeheartedly agree with this. How often do we see code that is an approximation of the problem domain? Code that exhibits behaviour which approximates to what is required via inappropriate abstractions and mappings which just about cope. Those mappings between what is in the code and the real domain are only contained in the developers’ heads and this is just not good enough.

When high performance is required, code for parts of the system often has to model what is happening with the CPU, memory, storage sub-systems, or network sub-systems. When we have imperfect abstractions on top of these domains, performance can be very adversely affected. The goal of my “Mechanical Sympathy” blog is to peek at what is under the hood so we can improve our abstractions.

What is a Model?

A model does not need to be the result of a 3-year exercise producing UML. It can be, and often is best as, people communicating via various means including speech, drawings, illustrations, metaphors, analogies, etc, to build a mental model for shared understanding. If an accurate and distilled understanding can be reached then this model can be turned into code with great results.

Infrastructure Domain Models

If developers writing a concurrent framework do not have a good model of how a typical cache sub-system works, i.e. it uses message passing to exchange cache lines, then the framework is unlikely to perform well or be correct. If their code drives the cache sub-system with mechanical sympathy and understanding, it is less likely to have bugs and more likely to perform well.

It is much easier to predict performance from a sound model when coming from an understanding of the infrastructure for the underlying platform and its published abilities. For example, if you know how many packets per second a network sub-system can handle, and the size of its transfer unit, then it is easy to extrapolate expected bandwidth. With this model based understanding we can test our code for expectations with confidence.
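
As a worked example of that extrapolation, expected bandwidth is packets per second multiplied by the bytes carried per packet.  The figures below (1,000,000 packets per second and 1,500 bytes per packet) are hypothetical numbers chosen for the arithmetic, not measurements, and the class and method names are made up for this sketch.

```java
// Hypothetical example: extrapolating bandwidth from a packet rate and a transfer unit.
final class BandwidthModel
{
    // Expected bits per second for a given packet rate and bytes carried per packet.
    static long expectedBitsPerSecond(final long packetsPerSecond, final int bytesPerPacket)
    {
        return packetsPerSecond * bytesPerPacket * 8L;
    }

    public static void main(final String[] args)
    {
        // Assumed figures for illustration only.
        final long packetsPerSecond = 1000000L;
        final int bytesPerPacket = 1500;

        final long bits = expectedBitsPerSecond(packetsPerSecond, bytesPerPacket);
        System.out.println("expected bandwidth = " + (bits / 1000000000.0) + " Gbit/s");
    }
}
```

With these assumed figures the model predicts 12 Gbit/s, which gives a concrete expectation to test the code against.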

I’ve fixed many performance issues whereby a framework treated a storage sub-system as stream-based when it is really a block-based model. If you update part of a file on disk, the block to be updated must be read, the changes applied, and the results written back. Now if you know the system is block based and the boundaries of the blocks, you can write whole blocks back without incurring the read, modify, write back cycle replacing these actions with a single write. This applies even when appending to a file as the last block is likely to have been partially written previously.
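
A minimal sketch of writing whole blocks follows, assuming a 4096 byte block size and zero padding for the unused tail of the last block.  The class name, BLOCK_SIZE constant and helper methods are hypothetical; the real block size depends on the file system and device, and whether the read is actually avoided also depends on the operating system's caching.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical example: always write complete, block-aligned regions of a file.
final class BlockAlignedWriter
{
    static final int BLOCK_SIZE = 4096; // assumed block size for illustration

    // Number of whole blocks needed to hold the given number of bytes.
    static long blocksFor(final long length)
    {
        return (length + BLOCK_SIZE - 1) / BLOCK_SIZE;
    }

    // Writes data as whole zero-padded blocks starting at blockIndex; returns bytes written.
    static long writeBlocks(final FileChannel channel, final long blockIndex, final byte[] data)
        throws IOException
    {
        final int paddedLength = (int)(blocksFor(data.length) * BLOCK_SIZE);
        final ByteBuffer buffer = ByteBuffer.allocate(paddedLength); // zero filled
        buffer.put(data);
        buffer.clear(); // position = 0, limit = capacity, so the padding is written too

        long position = blockIndex * BLOCK_SIZE;
        while (buffer.hasRemaining())
        {
            position += channel.write(buffer, position);
        }
        return paddedLength;
    }

    // Writes 5000 bytes to a temporary file and returns the resulting file size.
    static long demo()
    {
        try
        {
            final Path file = Files.createTempFile("blocks", ".dat");
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE))
            {
                writeBlocks(channel, 0L, new byte[5000]);
                return Files.size(file);
            }
            finally
            {
                Files.deleteIfExists(file);
            }
        }
        catch (final IOException ex)
        {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(final String[] args)
    {
        System.out.println("file size = " + demo());
    }
}
```

Padding trades some disk space for never having to touch a partially written block again.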

Business Domain Models

The same thinking should be applied to the models we construct for the business domain. If a business process is modelled accurately, then the software will not surprise its end users. When we draw up a model it is important to describe the relationships for cardinality and the characteristics by which they will be traversed. This understanding will guide the selection of data structures to those best suited for implementing the relationships. I often see people use a list for a relationship which is mostly searched by key; for this case a map could be more appropriate. Are the entities at the other end of a relationship ordered? A tree or skiplist implementation may be a better option.
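
To make the choice concrete, here is a small sketch in which one relationship is searched by key and another is traversed in order.  The Order entity, its fields and the OrderBook class are hypothetical, invented for this example; it uses HashMap for the keyed lookup and TreeMap for the ordered traversal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical example: pick the data structure that matches how a relationship is traversed.
final class OrderBook
{
    // Illustrative entity only.
    static final class Order
    {
        final long id;
        final long price;
        final long quantity;

        Order(final long id, final long price, final long quantity)
        {
            this.id = id;
            this.price = price;
            this.quantity = quantity;
        }
    }

    // Searched by key, so a map rather than a list.
    private final Map<Long, Order> ordersById = new HashMap<>();

    // Traversed in price order, so a tree.
    private final TreeMap<Long, Long> quantityByPrice = new TreeMap<>();

    void add(final Order order)
    {
        ordersById.put(order.id, order);
        quantityByPrice.merge(order.price, order.quantity, Long::sum);
    }

    // Direct lookup rather than a scan of every order; returns null if unknown.
    Order findById(final long id)
    {
        return ordersById.get(id);
    }

    // Lowest price present; throws NoSuchElementException if the book is empty.
    long lowestPrice()
    {
        return quantityByPrice.firstKey();
    }

    long quantityAt(final long price)
    {
        return quantityByPrice.getOrDefault(price, 0L);
    }

    public static void main(final String[] args)
    {
        final OrderBook book = new OrderBook();
        book.add(new Order(1L, 105L, 10L));
        book.add(new Order(2L, 101L, 20L));
        book.add(new Order(3L, 101L, 5L));

        System.out.println("order 2 price = " + book.findById(2L).price);
        System.out.println("lowest price = " + book.lowestPrice());
        System.out.println("quantity at lowest = " + book.quantityAt(book.lowestPrice()));
    }
}
```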

Identity

Identity of entities in a model is so important. All models have to be entered in some way, and this normally starts with an entity from which to walk. That entity could be “Customer” by customer ID but could equally be “DiskBlock” by filename and offset in an infrastructure domain. The identity of each entity in the system needs to be clear so the model can be accessed efficiently. If for each interaction with a model we waste precious cycles trying to find our entity as a starting point, then other optimisations can become almost irrelevant. Make identity explicit in your model and, if necessary, index entities by their identity so you can efficiently enter the model for each interaction.
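
Taking the “DiskBlock” by filename and offset case, a minimal sketch of making identity explicit and indexing by it might look like the following.  The BlockId and BlockIndex classes are hypothetical names for this illustration; the important parts are that equals() and hashCode() are defined purely in terms of the identity, and that entry to the model is a single map lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical example: explicit identity for an entity, used as the index key.
final class BlockIndex
{
    // Identity of a disk block: the file it belongs to and its offset in that file.
    static final class BlockId
    {
        final String filename;
        final long offset;

        BlockId(final String filename, final long offset)
        {
            this.filename = Objects.requireNonNull(filename);
            this.offset = offset;
        }

        @Override
        public boolean equals(final Object other)
        {
            if (this == other)
            {
                return true;
            }
            if (!(other instanceof BlockId))
            {
                return false;
            }
            final BlockId that = (BlockId)other;
            return offset == that.offset && filename.equals(that.filename);
        }

        @Override
        public int hashCode()
        {
            return Objects.hash(filename, offset);
        }
    }

    private final Map<BlockId, byte[]> blocks = new HashMap<>();

    void put(final String filename, final long offset, final byte[] data)
    {
        blocks.put(new BlockId(filename, offset), data);
    }

    // Enter the model directly by identity; returns null if the block is not indexed.
    byte[] find(final String filename, final long offset)
    {
        return blocks.get(new BlockId(filename, offset));
    }

    public static void main(final String[] args)
    {
        final BlockIndex index = new BlockIndex();
        index.put("journal.dat", 4096L, new byte[] {1, 2, 3});

        System.out.println("found = " + (index.find("journal.dat", 4096L) != null));
        System.out.println("missing = " + (index.find("journal.dat", 8192L) == null));
    }
}
```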

Refine as we learn

It is also important to keep refining a model as we learn. If the model grows as a series of extensions without refining and distilling, then we end up with a spaghetti mess that is very difficult to manage when trying to achieve predictable performance. Never mind how difficult it is to maintain and support. Every day we learn new things. Reflect this in the model and keep it up to date.

Implement no more, but also no less, than what is needed!

The fastest code is code that does just what is needed and no more. Perform the instructions to complete the task and no more. Really fast code is normally not a weird mess of bit-shifting and compiler tricks. It is best to start with something clean and elegant. Then measure to see if you are within performance targets. So often this will be sufficient. Sometimes performance will be a surprise. You then need to apply science to test and measure before jumping to conclusions. A profiler will often tell you where the time is being taken. Once the basic modelling mistakes and assumptions have been corrected, it usually takes just a little mechanical sympathy to reach the performance goal. Unused code is waste. Try not to create it. If you happen to create some, then remove it from your codebase as soon as you notice it.

Conclusion

When cross-functional requirements, such as performance and availability, are critical to success, I’ve found the most important thing is to get the model correct for the domain at all levels. That is, take the principles of DDD and make sure your code is an appropriate reflection of each domain. Be that the domain of business applications, or the domain of interactions with infrastructure, I’ve found modelling is everything.