Wednesday 17 October 2012

Compact Off-Heap Structures/Tuples In Java

In my last post I detailed the implications of the access patterns your code takes to main memory.  Since then I've had a lot of questions about what can be done in Java to enable more predictable memory layout.  There are patterns that can be applied using array backed structures which I will discuss in another post.   This post will explore how to simulate a feature sorely missing in Java - arrays of structures similar to what C has to offer.

Structures are very useful, both on the stack and the heap.  To my knowledge it is not possible to simulate this feature on the Java stack.  Not being able to do this on the stack is such as shame because it greatly limits the performance of some parallel algorithms, however that is a rant for another day.

In Java, all user defined types have to exist on the heap.  The Java heap is managed by the garbage collector in the general case, however there is more to the wider heap in a Java process.  With the introduction of direct ByteBuffer, memory can be allocated which is not tracked by the garbage collector because it can be available to native code for tasks like avoiding the copying of data to and from the kernel for IO.  So one method of managing structures is to fake them within a ByteBuffer as a reasonable approach.  This can allow compact data representations, but has performance and size limitations.  For example, it is not possible to have a ByteBuffer greater than 2GB, and all access is bounds checked which impacts performance.  An alternative exists using Unsafe that is both faster and and not size constrained like ByteBuffer.

The approach I'm about to detail is not traditional Java.  If your problem space is dealing with big data, or extreme performance, then there are benefits to be had.  If your data sets are small, and performance is not an issue, then run away now to avoid getting sucked into the dark arts of native memory management.

The benefits of the approach I'm about to detail are:
  1. Significantly improved performance 
  2. More compact data representation
  3. Ability to work with very large data sets while avoiding nasty GC pauses[1]
With all choices there are consequences.  By taking the approach detailed below you take responsibility for some of the memory managment yourself.  Getting it wrong can lead to memory leaks, or worse, you can crash the JVM!  Proceed with caution...

Suitable Example - Trade Data

A common challenge faced in finance applications is capturing and working with very large volumes of order and trade data.  For the example I will create a large table of in-memory trade data that can have analysis queries run against it.  This table will be built using 2 contrasting approaches.  Firstly, I'll take the traditional Java approach of creating a large array and reference individual Trade objects.  Secondly, I keep the usage code identical but replace the large array and Trade objects with an off-heap array of structures that can be manipulated via a Flyweight pattern.

If for the traditional Java approach I used some other data structure, such as a Map or Tree, then the memory footprint would be even greater and the performance lower.

Traditional Java Approach
public class TestJavaMemoryLayout
{
    private static final int NUM_RECORDS = 50 * 1000 * 1000;

    private static JavaMemoryTrade[] trades;

    public static void main(final String[] args)
    {
        for (int i = 0; i < 5; i++)
        {
            System.gc();
            perfRun(i);
        }
    }

    private static void perfRun(final int runNum)
    {
        long start = System.currentTimeMillis();

        init();

        System.out.format("Memory %,d total, %,d free\n",
                          Runtime.getRuntime().totalMemory(),
                          Runtime.getRuntime().freeMemory());

        long buyCost = 0;
        long sellCost = 0;

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            final JavaMemoryTrade trade = get(i);

            if (trade.getSide() == 'B')
            {
                buyCost += (trade.getPrice() * trade.getQuantity());
            }
            else
            {
                sellCost += (trade.getPrice() * trade.getQuantity());
            }
        }

        long duration = System.currentTimeMillis() - start;
        System.out.println(runNum + " - duration " + duration + "ms");
        System.out.println("buyCost = " + buyCost + " sellCost = " + sellCost);
    }

    private static JavaMemoryTrade get(final int index)
    {
        return trades[index];
    }

    public static void init()
    {
        trades = new JavaMemoryTrade[NUM_RECORDS];

        final byte[] londonStockExchange = {'X', 'L', 'O', 'N'};
        final int venueCode = pack(londonStockExchange);

        final byte[] billiton = {'B', 'H', 'P'};
        final int instrumentCode = pack( billiton);

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            JavaMemoryTrade trade = new JavaMemoryTrade();
            trades[i] = trade;

            trade.setTradeId(i);
            trade.setClientId(1);
            trade.setVenueCode(venueCode);
            trade.setInstrumentCode(instrumentCode);

            trade.setPrice(i);
            trade.setQuantity(i);

            trade.setSide((i & 1) == 0 ? 'B' : 'S');
        }
    }

    private static int pack(final byte[] value)
    {
        int result = 0;
        switch (value.length)
        {
            case 4:
                result = (value[3]);
            case 3:
                result |= ((int)value[2] << 8);
            case 2:
                result |= ((int)value[1] << 16);
            case 1:
                result |= ((int)value[0] << 24);
                break;

            default:
                throw new IllegalArgumentException("Invalid array size");
        }

        return result;
    }

    private static class JavaMemoryTrade
    {
        private long tradeId;
        private long clientId;
        private int venueCode;
        private int instrumentCode;
        private long price;
        private long quantity;
        private char side;

        public long getTradeId()
        {
            return tradeId;
        }

        public void setTradeId(final long tradeId)
        {
            this.tradeId = tradeId;
        }

        public long getClientId()
        {
            return clientId;
        }

        public void setClientId(final long clientId)
        {
            this.clientId = clientId;
        }

        public int getVenueCode()
        {
            return venueCode;
        }

        public void setVenueCode(final int venueCode)
        {
            this.venueCode = venueCode;
        }

        public int getInstrumentCode()
        {
            return instrumentCode;
        }

        public void setInstrumentCode(final int instrumentCode)
        {
            this.instrumentCode = instrumentCode;
        }

        public long getPrice()
        {
            return price;
        }

        public void setPrice(final long price)
        {
            this.price = price;
        }

        public long getQuantity()
        {
            return quantity;
        }

        public void setQuantity(final long quantity)
        {
            this.quantity = quantity;
        }

        public char getSide()
        {
            return side;
        }

        public void setSide(final char side)
        {
            this.side = side;
        }
    }
}
Compact Off-Heap Structures
import sun.misc.Unsafe;

import java.lang.reflect.Field;

public class TestDirectMemoryLayout
{
    private static final Unsafe unsafe;
    static
    {
        try
        {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            unsafe = (Unsafe)field.get(null);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    private static final int NUM_RECORDS = 50 * 1000 * 1000;

    private static long address;
    private static final DirectMemoryTrade flyweight = new DirectMemoryTrade();

    public static void main(final String[] args)
    {
        for (int i = 0; i < 5; i++)
        {
            System.gc();
            perfRun(i);
        }
    }

    private static void perfRun(final int runNum)
    {
        long start = System.currentTimeMillis();

        init();

        System.out.format("Memory %,d total, %,d free\n",
                          Runtime.getRuntime().totalMemory(),
                          Runtime.getRuntime().freeMemory());

        long buyCost = 0;
        long sellCost = 0;

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            final DirectMemoryTrade trade = get(i);

            if (trade.getSide() == 'B')
            {
                buyCost += (trade.getPrice() * trade.getQuantity());
            }
            else
            {
                sellCost += (trade.getPrice() * trade.getQuantity());
            }
        }

        long duration = System.currentTimeMillis() - start;
        System.out.println(runNum + " - duration " + duration + "ms");
        System.out.println("buyCost = " + buyCost + " sellCost = " + sellCost);

        destroy();
    }

    private static DirectMemoryTrade get(final int index)
    {
        final long offset = address + (index * DirectMemoryTrade.getObjectSize());
        flyweight.setObjectOffset(offset);
        return flyweight;
    }

    public static void init()
    {
        final long requiredHeap = NUM_RECORDS * DirectMemoryTrade.getObjectSize();
        address = unsafe.allocateMemory(requiredHeap);

        final byte[] londonStockExchange = {'X', 'L', 'O', 'N'};
        final int venueCode = pack(londonStockExchange);

        final byte[] billiton = {'B', 'H', 'P'};
        final int instrumentCode = pack( billiton);

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            DirectMemoryTrade trade = get(i);

            trade.setTradeId(i);
            trade.setClientId(1);
            trade.setVenueCode(venueCode);
            trade.setInstrumentCode(instrumentCode);

            trade.setPrice(i);
            trade.setQuantity(i);

            trade.setSide((i & 1) == 0 ? 'B' : 'S');
        }
    }

    private static void destroy()
    {
        unsafe.freeMemory(address);
    }

    private static int pack(final byte[] value)
    {
        int result = 0;
        switch (value.length)
        {
            case 4:
                result |= (value[3]);
            case 3:
                result |= ((int)value[2] << 8);
            case 2:
                result |= ((int)value[1] << 16);
            case 1:
                result |= ((int)value[0] << 24);
                break;

            default:
                throw new IllegalArgumentException("Invalid array size");
        }

        return result;
    }

    private static class DirectMemoryTrade
    {
        private static long offset = 0;

        private static final long tradeIdOffset = offset += 0;
        private static final long clientIdOffset = offset += 8;
        private static final long venueCodeOffset = offset += 8;
        private static final long instrumentCodeOffset = offset += 4;
        private static final long priceOffset = offset += 4;
        private static final long quantityOffset = offset += 8;
        private static final long sideOffset = offset += 8;

        private static final long objectSize = offset += 2;

        private long objectOffset;

        public static long getObjectSize()
        {
            return objectSize;
        }

        void setObjectOffset(final long objectOffset)
        {
            this.objectOffset = objectOffset;
        }

        public long getTradeId()
        {
            return unsafe.getLong(objectOffset + tradeIdOffset);
        }

        public void setTradeId(final long tradeId)
        {
            unsafe.putLong(objectOffset + tradeIdOffset, tradeId);
        }

        public long getClientId()
        {
            return unsafe.getLong(objectOffset + clientIdOffset);
        }

        public void setClientId(final long clientId)
        {
            unsafe.putLong(objectOffset + clientIdOffset, clientId);
        }

        public int getVenueCode()
        {
            return unsafe.getInt(objectOffset + venueCodeOffset);
        }

        public void setVenueCode(final int venueCode)
        {
            unsafe.putInt(objectOffset + venueCodeOffset, venueCode);
        }

        public int getInstrumentCode()
        {
            return unsafe.getInt(objectOffset + instrumentCodeOffset);
        }

        public void setInstrumentCode(final int instrumentCode)
        {
            unsafe.putInt(objectOffset + instrumentCodeOffset, instrumentCode);
        }

        public long getPrice()
        {
            return unsafe.getLong(objectOffset + priceOffset);
        }

        public void setPrice(final long price)
        {
            unsafe.putLong(objectOffset + priceOffset, price);
        }

        public long getQuantity()
        {
            return unsafe.getLong(objectOffset + quantityOffset);
        }

        public void setQuantity(final long quantity)
        {
            unsafe.putLong(objectOffset + quantityOffset, quantity);
        }

        public char getSide()
        {
            return unsafe.getChar(objectOffset + sideOffset);
        }

        public void setSide(final char side)
        {
            unsafe.putChar(objectOffset + sideOffset, side);
        }
    }
}
Results
Intel i7-860 @ 2.8GHz, 8GB RAM DDR3 1333MHz, 
Windows 7 64-bit, Java 1.7.0_07
=============================================
java -server -Xms4g -Xmx4g TestJavaMemoryLayout
Memory 4,116,054,016 total, 1,108,901,104 free
0 - duration 19334ms
Memory 4,116,054,016 total, 1,109,964,752 free
1 - duration 14295ms
Memory 4,116,054,016 total, 1,108,455,504 free
2 - duration 14272ms
Memory 3,817,799,680 total, 815,308,600 free
3 - duration 28358ms
Memory 3,817,799,680 total, 810,552,816 free
4 - duration 32487ms

java -server TestDirectMemoryLayout
Memory 128,647,168 total, 126,391,384 free
0 - duration 983ms
Memory 128,647,168 total, 126,992,160 free
1 - duration 958ms
Memory 128,647,168 total, 127,663,408 free
2 - duration 873ms
Memory 128,647,168 total, 127,663,408 free
3 - duration 886ms
Memory 128,647,168 total, 127,663,408 free
4 - duration 884ms

Intel i7-2760QM @ 2.40GHz, 8GB RAM DDR3 1600MHz, 
Linux 3.4.11 kernel 64-bit, Java 1.7.0_07
=================================================
java -server -Xms4g -Xmx4g TestJavaMemoryLayout
Memory 4,116,054,016 total, 1,108,912,960 free
0 - duration 12262ms
Memory 4,116,054,016 total, 1,109,962,832 free
1 - duration 9822ms
Memory 4,116,054,016 total, 1,108,458,720 free
2 - duration 10239ms
Memory 3,817,799,680 total, 815,307,640 free
3 - duration 21558ms
Memory 3,817,799,680 total, 810,551,856 free
4 - duration 23074ms

java -server TestDirectMemoryLayout 
Memory 123,994,112 total, 121,818,528 free
0 - duration 634ms
Memory 123,994,112 total, 122,455,944 free
1 - duration 619ms
Memory 123,994,112 total, 123,103,320 free
2 - duration 546ms
Memory 123,994,112 total, 123,103,320 free
3 - duration 547ms
Memory 123,994,112 total, 123,103,320 free
4 - duration 534ms
Analysis

Let's compare the results to the 3 benefits promised above.

1.  Significantly improved performance

The evidence here is pretty clear cut.  Using the off-heap structures approach is more than an order of magnitude faster.  At the most extreme, look at the 5th run on a Sandy Bridge processor, we have 43.2 times difference in duration to complete the task.  It is also a nice illustration of how well Sandy Bridge does with predictable access patterns to data.  Not only is the performance significantly better it is also more consistent.  As the heap becomes fragmented, and thus access patterns become more random, the performance degrades as can be seen in the later runs with standard Java approach.

2.  More compact data representation

For our off-heap representation each object requires 42-bytes.  To store 50 million of these, as in the example, we require 2,100,000,000 bytes.  The memory required by the JVM heap is:

   memory required = total memory - free memory - base JVM needs 

     2,883,248,712 = 3,817,799,680 - 810,551,856 - 123,999,112

This implies the JVM needs ~40% more memory to represent the same data.  The reason for this overhead is the array of references to the Java objects plus the object headers.  In a previous post I discussed object layout in Java.

When working with very large data sets this overhead can become a significant limiting factor.

3.  Ability to work with very large data sets while avoiding nasty GC pauses

The sample code above forces a GC cycle before each run and can improve the consistency of the results in some cases.  Feel free to remove the call to System.gc() and observe the implications for yourself.  If you run the tests adding the following command line arguments then the garbage collector will output in painful detail what happened.

-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCApplicationStoppedTime -XX:+PrintSafepointStatistics

From analysing the output I can see the application underwent a total of 29 GC cycles.  Pause times are listed below by extracting the lines from the output indicating when the application threads are stopped.
With System.gc() before each run
================================
Total time for which application threads were stopped: 0.0085280 seconds
Total time for which application threads were stopped: 0.7280530 seconds
Total time for which application threads were stopped: 8.1703460 seconds
Total time for which application threads were stopped: 5.6112210 seconds
Total time for which application threads were stopped: 1.2531370 seconds
Total time for which application threads were stopped: 7.6392250 seconds
Total time for which application threads were stopped: 5.7847050 seconds
Total time for which application threads were stopped: 1.3070470 seconds
Total time for which application threads were stopped: 8.2520880 seconds
Total time for which application threads were stopped: 6.0949910 seconds
Total time for which application threads were stopped: 1.3988480 seconds
Total time for which application threads were stopped: 8.1793240 seconds
Total time for which application threads were stopped: 6.4138720 seconds
Total time for which application threads were stopped: 4.4991670 seconds
Total time for which application threads were stopped: 4.5612290 seconds
Total time for which application threads were stopped: 0.3598490 seconds
Total time for which application threads were stopped: 0.7111000 seconds
Total time for which application threads were stopped: 1.4426750 seconds
Total time for which application threads were stopped: 1.5931500 seconds
Total time for which application threads were stopped: 10.9484920 seconds
Total time for which application threads were stopped: 7.0707230 seconds

Without System.gc() before each run
===================================
Test run times
0 - duration 12120ms
1 - duration 9439ms
2 - duration 9844ms
3 - duration 20933ms
4 - duration 23041ms

Total time for which application threads were stopped: 0.0170860 seconds
Total time for which application threads were stopped: 0.7915350 seconds
Total time for which application threads were stopped: 10.7153320 seconds
Total time for which application threads were stopped: 5.6234650 seconds
Total time for which application threads were stopped: 1.2689950 seconds
Total time for which application threads were stopped: 7.6238170 seconds
Total time for which application threads were stopped: 6.0114540 seconds
Total time for which application threads were stopped: 1.2990070 seconds
Total time for which application threads were stopped: 7.9918480 seconds
Total time for which application threads were stopped: 5.9997920 seconds
Total time for which application threads were stopped: 1.3430040 seconds
Total time for which application threads were stopped: 8.0759940 seconds
Total time for which application threads were stopped: 6.3980610 seconds
Total time for which application threads were stopped: 4.5572100 seconds
Total time for which application threads were stopped: 4.6193830 seconds
Total time for which application threads were stopped: 0.3877930 seconds
Total time for which application threads were stopped: 0.7429270 seconds
Total time for which application threads were stopped: 1.5248070 seconds
Total time for which application threads were stopped: 1.5312130 seconds
Total time for which application threads were stopped: 10.9120250 seconds
Total time for which application threads were stopped: 7.3528590 seconds
It can been seen from the output that a significant proportion of the time is spent in the garbage collector.  When your threads are stopped your application is not responsive.  These tests have been done with default GC settings.  It is possible to tune the GC for better results but this can be a highly skilled and significant effort.  The only JVM I know that copes well by not imposing long pause times, even under high-throughput conditions, is the Azul concurrent compacting collector.

When profiling this application, I can see that the majority of the time is spent allocating the objects and promoting them to the old generation because they do not fit in the young generation.  The initialisation costs can be removed from the timing but that is not realistic.  If the traditional Java approach is taken the state needs to be built up before the query can take place.  The end user of an application has to wait for the state to be built up and the query executed.

This test is really quite trivial.  Imagine working with similar data sets but at the 100 GB scale.

Note: When the garbage collector compacts a region, then objects that were next to each other can be moved far apart.  This can result in TLB and other cache misses.

Side Note On Serialization

A huge benefit of using off-heap structures in this manner is how they can be very easily serialised to network, or storage, by a simple memory copy as I have shown in the previous post.  This way we can completely bypass intermediate buffer and object allocation.

Conclusion

If you are willing to do some C style programming for large datasets it is possible to control the memory layout in Java by going off-heap.  If you do, the benefits in performance, compactness, and avoiding GC issues are significant.  However this is an approach that should not be used for all applications.  Its benefits are only noticable for very large datasets, or the extremes of performance in throughput and/or latency. 

I hope the Java community can collectively realise the importance of supporting structures both on the heap and the stack.  John Rose has done some excellent work in this area defining how tuples could be added to the JVM.  His talk on Arrays 2.0 from the JVM Language Summit this year is really worth a watch.  John discusses options for arrays of structures, and structures of arrays, in his talk.  If the tuples, as proposed by John, were available then the test described here could have comparable performance and be a more pleasant programming style.  The whole array of structures could be allocated in a single action thus bypassing the copy of individual objects across generations, and it would be stored in a compact contiguous fashion.  This would remove the significant GC issues for this class of problem.

Lately, I was comparing standard data structures between Java and .Net.  In some cases I observed a 6-10X performance advantage to .Net for things like maps and dictionaries when .Net used native structure support.  Let's get this into Java as soon as possible!

It is also pretty obvious from the results that if we are to use Java for real-time analysis on big data, then our standard garbage collectors need to significantly improve and support true concurrent operations.

[1] - To my knowledge the only JVM that deals well with very large heaps is Azul Zing