Following a journal

At LMAX Exchange, we generate a huge amount of market data. At the moment, we’re putting together some new applications that need to access this data. This post will briefly explain one facet of this work, and, in particular, a couple of interesting bugs we had to fix along the way.

Two important requirements conspired to make this trickier than usual:

  • these applications must not be able to communicate back pressure to the trading exchange.

  • these applications should, in normal operation, have access to data that is within a few seconds of real time.

The existing market data store

We already have a reliable on-disk journal of all of our market data that is written to in a timely fashion. We’ve also put in some equipment to allow us to join the stream of events the journal experiences in real time.

To cope with the "no back pressure" requirement, we may be disconnected from that stream at any point (if the stream sender detects any sign of not keeping up it will terminate our session).

Unfortunately, this market data stream is stateful, so if we get disconnected, we need to rejoin at the point we last saw a message. The source application can only keep a small buffer of the stream in memory, so we’re going to have to replay from disk until we’re back in range of the live buffer, and then rejoin it.

Journal following peril

The journal writer serializes market data events using the same equipment we use to stream them over the network – it just pops them in a file instead. Once a file reaches a certain size, we move to a new file and start a job in the background to compress the newly-completed file.

Our replay solution looks a bit like this:

   10 receive replay request for timestamp
   20 locate files after timestamp
   30 replay those files
   40 attempt to rejoin live
   50 if unsuccessful, goto 20, but with timestamp = last replayed timestamp

We put this together. All our tests pass. We try it in a real environment and quickly encounter an EOFException.

We had, in our tests, conveniently made the gzipping of journal files synchronous. Unfortunately this was not true in reality, so it was perfectly possible to see a bundle of files like the following:

   journal-1.raw.gz
   journal-2.raw.gz
   journal-3.raw.gz <-- gzip in progress, partial file
   journal-3.raw <-- left in place until gzip complete
   journal-4.raw <-- currently live file

We considered, for a while, trying to be clever and working out which gzips were in progress. We played with some code that did that and it got big; we’ll try something else. How about gzipping to a file extension that replay would not consider, and then performing an atomic-looking rename only once the file is complete? Simple, and, we thought, effective.
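
In sketch form, the idea looks something like this (file names are illustrative and gzip here is a hypothetical helper, not the real tool’s code):

// Compress to an extension the replay file-listing ignores, then atomically
// rename once the compressed output is complete.
Path raw = Paths.get("journal-3.raw");
Path inProgress = Paths.get("journal-3.raw.gz.inprogress"); // replay never considers this
Path complete = Paths.get("journal-3.raw.gz");

gzip(raw, inProgress);                                            // hypothetical helper writes the compressed bytes
Files.move(inProgress, complete, StandardCopyOption.ATOMIC_MOVE); // the "atomic looking" rename
Files.delete(raw);                                                // only now remove the uncompressed original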

We triumphantly deployed this fix, and then immediately ran into a slightly knottier issue – one of our pieces of safety equipment detected a gap in sequence numbers during replay.

Sequence gaps

We stuck in some extra debugging information to help us out, and it only confused us further. The sequence gaps were within a file, not between files, and when we examined that file afterwards we found no sequence gaps in it. Usefully though, we did discover that we only had this issue when we were looking at the currently active file.

We had once again run into an issue that our single threaded tests could tell us nothing about. We looked deeper into the workings of the journaller and found our answer.

The inner workings of the journaller

Like most code on the hot path at work, the journaller has been put together thoughtfully. It deals with market data events in small batches (usually between one and twenty messages), and at the end of each batch, those events are guaranteed to be on disk.

Internally, it manages this by maintaining a 4KB buffer, and writing the entirety of that buffer to a memory mapped file at the end of every batch. It also calls ‘sync’ at the end of each batch to ensure the bytes have reached the disk. When the block is full (or an event appears that will not fit in the remaining space), the buffer is emptied and a new block is begun.

Now, the journal reader was originally built to cope with completed journal files, and so as soon as it sees any part of a block that looks empty (messages are written length prefixed, so any 0 length message is treated as end of block), it skips to the next block.
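
In sketch form – this isn’t the real reader, and MessageHandler is a made-up interface – the block handling looks something like:

// Read length-prefixed messages out of a single block. A zero length means the
// rest of the block looks empty, so the caller skips ahead to the next block.
void readBlock(final ByteBuffer block, final MessageHandler handler)
{
    while (block.remaining() >= Integer.BYTES)
    {
        final int length = block.getInt();
        if (length == 0)
        {
            return; // looks like the end of this block's data
        }
        final ByteBuffer message = block.slice();
        message.limit(length);
        handler.onMessage(message);
        block.position(block.position() + length);
    }
}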

So, what’s our bug then? It’s the following sequence of events:

Journal writer thread               | Journal reader thread
writes message to block 1           | 
writes message to block 1           |
writes message to block 1           |
syncs block 1 to filesystem         |
writes message to block 1 (*)       | starts reading block 1
writes message to block 1 (*)       | reads all of block 1 into its own buffer
syncs block 1 to filesystem         | processing block 1 
block 1 ends                        | ...
writes message to block 2           | ...
syncs block 2 to filesystem         | ...
...                                 | reads all of block 2 into its own buffer

In that sequence, the reader misses the events annotated with (*) – it reads the block before those messages have been sync’d to the disk, sees what looks like an empty block remainder there, and by the time it is ready to read the next block, some messages are there for it to read, so it continues, and immediately discovers it has a sequence gap.

A few quick and unsafe fixes suggest themselves – can we detect if we’re reading the last block of a file? Can we read the block again until some condition is met? We thought about these and eventually discarded them. Instead, we introduced an extra piece of information to the journal reader – that of the maximal sequence that is currently available for read, defined as the last sequence written to the last complete block.

As soon as the journal reader reaches this sequence it assumes it is ‘near-live’ and attempts to rejoin the in-memory stream. Given the single threaded nature of the journal writer, informing the reader what is ‘safe’ is just a matter of updating a volatile long – nice and cheap.
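
A sketch of that handoff (the names are ours, not the production code’s):

class JournalProgress
{
    // written by the (single) journal writer thread, read by the reader thread
    private volatile long maxSafeSequence = -1;

    // writer: called once a block is complete and synced to disk
    void blockCompleted(final long lastSequenceInBlock)
    {
        maxSafeSequence = lastSequenceInBlock;
    }

    // reader: once replay reaches this sequence, attempt to rejoin the live stream
    long maxSafeSequence()
    {
        return maxSafeSequence;
    }
}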

We shipped this rather less triumphantly and discovered, to our immense relief, that we were now free of obvious bugs. Now we can get back to actually working with this data!

Conclusion

All of these bugs fell into the gaps where we had explicitly decided "that case is too tricky to put tests around, because it involves interactions across multiple racing threads, and tests like that invariably end up being intermittent and of low value".

Now, that’s not a bad plan, assuming you already trust the equipment that’s managing the inter-thread communication. We didn’t think too much about this in our case because we picked a hidden mechanism for communicating – the on-disk filesystem. While it can be sensible to let a database handle matters of concurrency for you, allowing a custom binary reader/writer pairing to do so via a bundle of files is perhaps less of a good idea.

With hindsight we’d probably have refactored a tiny working example of the reading/writing code out of the application and run it with the same threading model. We could then have tested this exploratively in situ, and those tests might, if they were sound enough, have made it into the build somehow (they’d become ‘checks’ if you believe James Marcus Bach’s creed).

Executive Summary

At some point, we must choose to stop testing. Once the decision is made, any ‘untested’ behaviour has to be protected and documented in some other way, or it is very easily broken, or forgotten; not least by the original developers, who may be unpleasantly surprised when that untested behaviour turns out to be a bunch of gnarly bugs.

Round trip fuzz tests

Some cute round trip test tricks

In the last post, we looked at layering our deserialization code to keep things simple. This time, we’ll enjoy the delightful testing benefits this effort yields.

Round trip tests

We can do a round trip test whenever we pair an interface with some IO in the following fashion:

interface Service
{
    void onThing(Thing thing);
}

void bind(final InputStream input, final Service service)
{
    // This will deserialize method calls from the input and invoke
    // them on the 'real' implementation that's been passed in

    // We'll call this the receiver
}

Service bindToRemote(final OutputStream output)
{
    // This will create an implementation that serializes the calls it receives
    // onto the output stream. Presumably, a bound input sits somewhere at the other end.

    // We'll call this the transmitter
}

This approach works, relatively easily, for any interface that only passes information from caller to callee – i.e. tell-don’t-ask like you mean it, also known as a messaging contract. The symmetry of the arrangement is its power; whatever we call on the transmitter should (eventually) be called on the receiver.

N.B Inevitably at some point, I’m going to stop referring to method calls and start talking instead about messages. For me, method calls on messaging contracts === messages.

Let’s look at some concrete examples based on the two serialization problems we looked at last time.

Back to JsonElement et al

Reminder: our wire protocol has to take function calls on the following interface:

public interface AgentToServerApplicationProtocol
{
    void reportAgentPhase(AgentPhase agentPhase);

    void reportAgentStatus(AgentStatus status);

    void reportCommandResult(long correlationId, LocalCommandResult<JsonElement> result);
}

…and serialize them over a websocket connection.

Here are (some of) the classes we’ll be using. The ones under test are AgentToServerProxy and AgentToServerProtocolReceiver. As a pair, they provide a transparent interface to transport function calls between one physical host and another.

public interface Sender
{
    void send(final Consumer<OutputStream> sendee);
}

public class AgentToServerProxy implements AgentToServerApplicationProtocol
{
    private final Sender sender;

    public AgentToServerProxy(final Sender sender)
    {
        this.sender = sender;
    }
    // ... each method is json serialized, and we call
    // ... the sender appropriately to do so, like this:
    @Override
    public void reportAgentPhase(final AgentPhase agentPhase)
    {
        final String jsonMessage = toJson("reportAgentPhase", agentPhase);
        sendJson(jsonMessage);
    }

    private void sendJson(final String json)
    {
        sender.send(stream ->
        {
            try
            {
                stream.write(json.getBytes());
            }
            catch (final IOException ioe)
            {
                throw new UncheckedIOException(ioe);
            }
        });
    }
}

public class AgentToServerProtocolReceiver
{
    private final AgentToServerApplicationProtocol target;

    public AgentToServerProtocolReceiver(final AgentToServerApplicationProtocol target)
    {
        this.target = target;
    }

    void dispatch(final InputStream stream)
    {
        // ... parses the json off the stream, and invokes the appropriate method on target...
    }
}

…and here’s our test code


private final AgentToServerApplicationProtocol application = 
    mockery.mock(AgentToServerApplicationProtocol.class);

private final AgentToServerProtocolReceiver agentToServerProtocolReceiver = 
    new AgentToServerProtocolReceiver(application);

private final AgentToServerProtocolSender sender = new AgentToServerProtocolSender(sendee ->
{
    final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
    sendee.accept(baos);
    final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
    agentToServerProtocolReceiver.dispatch(bais);
});

@Test
public void roundTripReportAgentPhase()
{
    mockery.checking(new Expectations()
    {{
        oneOf(application).reportAgentPhase(AgentPhase.FAILED);
    }});

    sender.reportAgentPhase(AgentPhase.FAILED);
}

Here, the ‘message’ of reportAgentPhase is serialized and deserialized in the same test. We assert that the method is appropriately called on the mocked target. N.B All of our method parameters need to override equals() correctly for this to work. Bring me data classes, java 10!

What’s particularly nice about this construction is that generalizing the test to multiple ‘messages’ is delightfully simple. We can write a method like the following:

private void assertApplicationRoundTrip(final Consumer<AgentToServerApplicationProtocol> consumer)
{
    mockery.checking(new Expectations()
    {{
        consumer.accept(oneOf(application));
    }});
    consumer.accept(sender);
}

and all our tests now just look like

@Test
public void roundTripReportAgentPhase()
{
    assertApplicationRoundTrip(application -> application.reportAgentPhase(AgentPhase.FAILED));
}

We can even go further than this. In our particular scenario here, we know that our InputStream always contains only a single message. In more challenging serialization environments, we could be called with a fragment, or a stream containing multiple RPCs. Can we test for those cases as well?

Example 2: Reducto

Yes we can. Never ask a rhetorical question that you do not know the answer to.

In our first example, we live in the happy world of websockets, where fragmentation is a long forgotten nightmare, and batching is something that happens to baked goods.

We don’t have the same fortune in reducto’s RPC layer. We kindly ask netty to connect agent and server via TCP, and it very effectively does so. We then rudely shove arbitrarily large blobs of binary at those connections (a single rpc could span several packets).

This means that we might not have as much symmetry as we thought:

This is what the transmitter might do

    receive method invocation at index 1 with args a and b
    send 1 + serialize(a) + serialize(b) to the connection
    receive method invocation at index 1 with args c and d
    send 1 + serialize(c) + serialize(d) to the connection
    receive method invocation at index 2 with no args
    send 2 to the connection

Now, depending on the size of a, b, c and d, the receiver might get:

    1. a single packet containing all of these invocations
    2. a packet containing the index 1, and the first few bytes of a
       a packet containing the middle few bytes of a
       ...and so on...
       a packet containing the end of d, and finally, 2, to indicate the last call
    3. ...
    4. Loss. Deep, deep, loss.

We still want to guarantee that the real receiver’s inner implementation gets those same messages in the same order, despite whatever batching and fragmentation occurs on the wire.

This doesn’t just need a round trip test. It needs a round trip fuzz test.

A round trip test with fuzzy batching

We can use much of the same equipment as we did previously – we invoke methods on our transmitter, and, at the end, verify that our receiver has had those same messages invoked on it, in the same order (although, given the nature of TCP, the ordering is perhaps less important).

The only difference is that we are going to reach into the passthrough byte streams, and chunk them arbitrarily (randomly, in fact).

We’ll look at the server to agent communication in reducto, because the interface is smaller.

import java.nio.ByteBuffer;
import java.time.ZonedDateTime;
import java.util.Optional;

public interface ServerToAgent {
    void installDefinitions(String className);

    void populateBucket(
        String cacheName,
        long currentBucketStart,
        long currentBucketEnd);

    void iterate(
        String cacheName,
        long iterationKey,
        ZonedDateTime from,
        ZonedDateTime toExclusive,
        String installingClass,
        String definitionName,
        Optional<ByteBuffer> wireFilterArgs);

    void defineCache(
        String cacheName,
        String cacheComponentFactoryClass);
}

We’ll also need the following interfaces:

public interface Channel
{
    void write(ByteBuf buffer);

    ByteBuf alloc(int messageLength);
}

interface FlushableChannel extends Channel
{
    void flush();
}

interface Invocation<T>
{
    void run(final T t);
}

…and then our implementation looks as follows:

    static <T> void runTest(
        final Mockery mockery,
        final Class<T> target,
        final Function<T, Consumer<ByteBuf>> invokerFactory,
        final Function<Channel, T> stubCreator,
        final List<Invocation<T>> possibleInvocations
    )
    {
        final Random random = new Random(System.currentTimeMillis());
        final T mock = mockery.mock(target, "thorough serialization target");
        final Consumer<ByteBuf> invoker = invokerFactory.apply(mock);

        runWith(mockery, stubCreator, possibleInvocations, random, mock, new RandomFragmentChannel(random, invoker));
    }


    private static <T> void runWith(
        final Mockery mockery,
        final Function<Channel, T> stubCreator,
        final List<Invocation<T>> possibleInvocations,
        final Random random,
        final T mock,
        final FlushableChannel channel)
    {
        final T stub = stubCreator.apply(channel);
        final List<Invocation<T>> invocations = generateList(
            random, r -> possibleInvocations.get(r.nextInt(possibleInvocations.size())));

        for (Invocation<T> invocation : invocations)
        {
            mockery.checking(
                new Expectations()
                {{
                    invocation.run(oneOf(mock));
                }}
            );
            invocation.run(stub);
        }

        channel.flush();
    }

Test data flow

The generic type T represents the interface we are attempting to call remotely. The stubCreator is misnamed – it actually creates the transmitter or proxy side. The invokerFactory creates the receiving end: it wraps a given instance of T, parsing messages from the ByteBufs passed in.

We foolishly seed our Random with the current time. The magic is mostly in RandomFragmentChannel so we’d best have a good look at it.

private static class RandomFragmentChannel implements Channel, FlushableChannel
{
    private final ByteBufAllocator allocator;
    private final Random random;
    private final Consumer<ByteBuf> invoker;

    RandomFragmentChannel(Random random, Consumer<ByteBuf> invoker)
    {
        this.random = random;
        this.invoker = invoker;
        this.allocator = new UnpooledByteBufAllocator(false, true);
    }

    @Override
    public void write(ByteBuf buffer)
    {
        int delivered = 0;
        int toDeliver = buffer.readableBytes();
        while (delivered < toDeliver)
        {
            int remaining = toDeliver - delivered;
            int bufSize = 1 + random.nextInt(remaining);
            ByteBuf actual = allocator.buffer(bufSize);
            buffer.readBytes(actual);
            invoker.accept(actual);
            delivered += bufSize;
        }
    }

    @Override
    public ByteBuf alloc(int messageLength)
    {
        return allocator.buffer(messageLength);
    }

    @Override
    public void flush()
    {
    }
}

More test data flow

Looking at this closely, we can see that this class is quite well named. It only fragments; it never batches. Each buffer passed to it is delivered to the invoker within the same call; it may just end up being torn into several pieces before that happens.

We could go further, and equip our fragmenter with a buffer of its own, so it can both fragment and batch randomly. I…am not entirely sure why I did not do that at the time. Instead what seems to have happened is this: I added a differently behaving channel, and ran the fuzz test with both, as follows:

static <T> void runTest(
        final Mockery mockery,
        final Class<T> target,
        final Function<T, Consumer<ByteBuf>> invokerFactory,
        final Function<Channel, T> stubCreator,
        final List<Invocation<T>> possibleInvocations
    )
{
    final Random random = new Random(System.currentTimeMillis());
    final T mock = mockery.mock(target, "thorough serialization target");
    final Consumer<ByteBuf> invoker = invokerFactory.apply(mock);

    runWith(mockery, stubCreator, possibleInvocations, random, mock, new RandomFragmentChannel(random, invoker));
    
    runWith(mockery, stubCreator, possibleInvocations, random, mock, new OneGiantMessageChannel(invoker));
    // ^^^^^^^ this was mysteriously omitted earlier
}

private static class OneGiantMessageChannel implements Channel, FlushableChannel
{
    private final Consumer<ByteBuf> invoker;
    private final UnpooledByteBufAllocator allocator;
    private ByteBuf buffer;

    OneGiantMessageChannel(Consumer<ByteBuf> invoker)
    {
        this.invoker = invoker;
        this.allocator = new UnpooledByteBufAllocator(false, true);
        this.buffer = this.allocator.buffer(128);
    }

    @Override
    public void write(final ByteBuf inboundBuffer)
    {
        if (this.buffer.writableBytes() < inboundBuffer.readableBytes())
        {
            final ByteBuf newBuf = allocator.buffer(this.buffer.capacity() * 2);

            this.buffer.readBytes(newBuf, this.buffer.readableBytes());

            this.buffer = newBuf;
            write(inboundBuffer);
        }
        else
        {
            inboundBuffer.readBytes(this.buffer, inboundBuffer.readableBytes());
        }
    }

    @Override
    public ByteBuf alloc(int messageLength)
    {
        return allocator.buffer(messageLength);
    }

    @Override
    public void flush()
    {
        invoker.accept(buffer);
    }
}

An aside: I particularly like class names that start with One, because it immediately reminds me of old Radio 1 soundbytes where they were trying to advertise "One Big Sunday"; a universally awful music event, with a really good marketing team. The soundbyte mostly consisted of someone saying "One Big Sunday" in a rhythmically pleasing way. I can hear the class name "One Giant Message Channel" in that same voice.

Anyway. At some point, I should probably go and write the one true RandomFragmentingBatchingChannel implementation, and simplify this. Or there might be value in keeping all three.
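
For what it’s worth, a sketch of that one true channel might look like the following. This isn’t code from the project – just a guess at the shape, reusing the same netty ByteBuf equipment as the channels above:

private static class RandomFragmentingBatchingChannel implements Channel, FlushableChannel
{
    private final Random random;
    private final Consumer<ByteBuf> invoker;
    private final UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false, true);
    private final ByteBuf pending;

    RandomFragmentingBatchingChannel(Random random, Consumer<ByteBuf> invoker)
    {
        this.random = random;
        this.invoker = invoker;
        this.pending = allocator.buffer(128);
    }

    @Override
    public void write(ByteBuf buffer)
    {
        // Batch: accumulate everything written, and only sometimes deliver a
        // random-sized prefix of what we have so far.
        pending.writeBytes(buffer);
        if (random.nextBoolean())
        {
            deliverInRandomFragments(random.nextInt(pending.readableBytes() + 1));
        }
    }

    @Override
    public ByteBuf alloc(int messageLength)
    {
        return allocator.buffer(messageLength);
    }

    @Override
    public void flush()
    {
        // Deliver whatever is left, again torn into random-sized fragments.
        deliverInRandomFragments(pending.readableBytes());
    }

    private void deliverInRandomFragments(int toDeliver)
    {
        while (toDeliver > 0)
        {
            int fragmentSize = 1 + random.nextInt(toDeliver);
            ByteBuf fragment = allocator.buffer(fragmentSize);
            pending.readBytes(fragment);
            invoker.accept(fragment);
            toDeliver -= fragmentSize;
        }
        pending.discardReadBytes();
    }
}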

Are we done? Not quite. We could do even better.

Even more randomness

Up until now, we’ve just been given a list of possible Invocations and then boldly claimed that, since they pass through serialization unharmed despite batching or fragmentation, everything must be OK.

What does the call site of our tests look like? Have we covered the full space of Invocations?

private static final List<Invocation<ServerToAgent>> ALL_POSSIBLE_INVOCATIONS =
        Arrays.asList(
            agent -> agent.defineCache("foo", "bar"),
            agent -> agent.installDefinitions("foo"),
            agent -> agent.iterate(
                "foo",
                252252L,
                ZonedDateTime.ofInstant(Instant.ofEpochMilli(645646L), ZoneId.of("UTC")),
                ZonedDateTime.ofInstant(Instant.ofEpochMilli(54964797289L), ZoneId.of("UTC")),
                "bar",
                "baz",
                Optional.empty()),
            agent -> agent.populateBucket("foo", 23298L, 2352L)
        );

We have not. We could tweak the definition of runTest to take Function<Random, List<Invocation<T>>> instead of List<Invocation<T>> for even more fuzzing power.

    static <T> void runTest(
        final Mockery mockery,
        final Class<T> target,
        final Function<T, Consumer<ByteBuf>> invokerFactory,
        final Function<Channel, T> stubCreator,
        final Function<Random, List<Invocation<T>>> possibleInvocations
    )
    {
        final Random random = new Random(System.currentTimeMillis());
        final T mock = mockery.mock(target, "thorough serialization target");
        final Consumer<ByteBuf> invoker = invokerFactory.apply(mock);

        runWith(mockery, stubCreator, possibleInvocations.apply(random), random, mock, new RandomFragmentChannel(random, invoker));
    }

That should find plenty of bugs, assuming we are capable of writing a sufficiently expressive function to generate those invocations.
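
For instance, a generator for the ServerToAgent interface might look like the sketch below (not from the original code). One wrinkle deserves a comment: each Invocation is run twice – once against the mock to record the expectation, once against the stub – so every argument must be computed eagerly, outside the lambda, or the two runs would see different values.

private static List<Invocation<ServerToAgent>> randomInvocations(final Random random)
{
    final List<Invocation<ServerToAgent>> invocations = new ArrayList<>();
    final int count = 1 + random.nextInt(50);
    for (int i = 0; i < count; i++)
    {
        // arguments are fixed here, then captured by the lambda
        final String cacheName = randomString(random);
        switch (random.nextInt(3))
        {
            case 0:
                final String componentFactoryClass = randomString(random);
                invocations.add(agent -> agent.defineCache(cacheName, componentFactoryClass));
                break;
            case 1:
                invocations.add(agent -> agent.installDefinitions(cacheName));
                break;
            default:
                final long bucketStart = random.nextLong();
                final long bucketEnd = bucketStart + random.nextInt(10_000);
                invocations.add(agent -> agent.populateBucket(cacheName, bucketStart, bucketEnd));
                break;
        }
    }
    return invocations;
}

private static String randomString(final Random random)
{
    final char[] chars = new char[1 + random.nextInt(16)];
    for (int i = 0; i < chars.length; i++)
    {
        chars[i] = (char) ('a' + random.nextInt(26));
    }
    return new String(chars);
}

A method reference to a function like this slots straight into the tweaked runTest above.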

Conclusion

We’ve put together some fuzz tests for two serialization layers.

We’ve managed to avoid talking about the other name for this technique: Property Based Testing. What we’ve tried to do is prove the ‘symmetric rpc delivery’ property about both of our serialization libraries using a fuzz mechanism.

We could have picked one of the QuickChecks and hypothesises (hypotheses?) of this world to help us out. Why? Well, they have features that our tiny framework here does not have; notably:

  1. They are capable of taking a randomly generated input that causes failure, and ‘shrinking’ it to a minimal failing example.
  2. They can much more easily tune precisely how many runs each particular test should get
  3. The choice of seed (and, later, its exposure to facilitate debugging) is more sensibly handled.

These frameworks might make the job of writing (and maintaining) these sorts of tests simpler; we mention them here mostly for completeness – we didn’t end up needing their power in either of our examples. I suspect that moving to randomly generated Invocations in the second case would be a good point to switch, however.

Finally, some food for thought. Up until now, I would have offered the following statement about the relationship between testability and software:

High quality software easily admits testing.

I should probably reduce that to

All the high quality software I have seen has foundational commitment to testability.

but let’s go with the slightly bolder version, so we can ask the following:

Would it be even better to say:

Higher quality software easily admits property based tests

?

You might not need to make your serialization layer generic

…indeed, your life might get simpler if you don’t.

This post will talk through two examples where clever serialization would have been an option, but stupid alternatives actually turned out to be preferable.

Example 1: An LMAX deployment tool

We tried to make a piece of one of our deployment tools a little better, and in doing so we broke it in a very interesting way.

Breaking it

Please be aware, the following code snippet is in groovy, not java.


@Override
void accept(String serviceName, List<TopicEndPoint> topics)
{
    // FIXME: this is highly suspicious. Why are we collecting as TEP when it is TEP already?
    List<TopicEndPoint> topicDetailsList = topics.collect { it as TopicEndPoint }

This code is in the part of this tool that checks that all of our reliable messaging components agree on sequence numbers. We couldn’t find any reason for that particular use of groovy‘s as operator, so we replaced it with the following:

List<TopicEndPoint> topicDetailsList = topics == null ? Collections.emptyList() : topics;

That made all of our tests pass, so we felt safe to ship. One groovy idiom fewer (we’re trying to migrate all our groovy back to java; we’re not good groovy programmers, and we like our type safety enforced rather than optional).

Some time later

An email arrives from a colleague.

Hey folks,

I reverted your change to TopicSequenceMismatchFinder.groovy.

It bails out with an exception when we try to check the system is in sync in staging.

It's quite an odd exception:

"Cannot cast groovy.json.internal.LazyMap to TopicEndPoint"

Better luck next time!

Back to the drawing board

Oh well, we thought, at least the as is explicable now – it probably has the smarts to turn LazyMap into TopicEndPoint, assuming the structure of the decoded json looks right. We were somewhat peeved that List<LazyMap> had been allowed to masquerade as List<TopicEndPoint> all the way down to here, though, so we set off in search of precisely whereabouts this had been allowed to happen.

Those of you with a keen sense of inference will have guessed where we ended up – in a serialization layer.

How this deployment tool talks

We have a set of servers on which we run the various applications that make up an exchange instance. Each of those servers has a deployment agent process running on it to allow coordination of actions across that environment. Those agents all talk to the same deployment server which provides a single point to perform maintenance on that environment.

At the point where we check the system is in sync, the server sends each connected agent a command: reportTopicDetails. The agents respond asynchronously. When the server has received a response from every agent, it collects the results together and calls into our TopicSequenceMismatchFinder class to see if everything is OK.

server agent communication

Server -> Agent -> Server communication in a bit more detail

So, the server sends commands to the agent, and the agent (eventually) responds with the results. We wrap up the asynchronicity in a class named RemoteAgent which allows a part of the server to pretend that these RPCs are synchronous.

The trouble starts when we deserialize that response from the agent. A part of its response is generic; the full format roughly looks like this class.

public class LocalCommandResult<T>
{
    private static final Gson GSON = new Gson();

    private long correlationId;
    private boolean success;
    private Map<String, T> returnValue;
    private String details = "";
    private String hostName;
}

At the point where a response arrives from an agent, we look up the command that is in flight and ‘complete’ it with the deserialized response.

@Override
public void reportCommandResult(final long correlationId, LocalCommandResult result)
{
    this.lastUpdated = System.currentTimeMillis();
    final RemoteCommandResult status = inFlightCommandResultByCorrelationId.remove(correlationId);
    if (status != null)
    {
        status.complete(result);
    }
}

The keener-eyed reader will already have noticed the lack of generics on that LocalCommandResult. Indeed this is the problem – the T varies depending on the command being handled by the agent.

The deserialization layer is highly clever in some ways; it’s been built to manage arbitrary RPCs from agent to server, not just command responses. This makes it quite difficult to instruct it what type to use to deserialize any given LocalCommandResult‘s inner details into; so it cheats, and leaves it as LazyMap, forcing that as cast later on. It’s also written in groovy, which we want to migrate away from.

We took a look at the different RPCs we sent from agent to server and discovered there were precisely four of them. Not nearly enough to require something as clever as the existing groovy! We quickly knocked together a pure java implementation. We also made reportCommandResult subtly different:

void reportCommandResult(long correlationId, LocalCommandResult<JsonElement> result)
                                                               ^^^^^^^^^^^^^

If it feels odd that our serialization layer calls us with an only partially deserialized result, this post is for you. Yes, we’re now done with the motivation part of this example, and we’re heading into the actual content.

Why partial deserialization?

Well, in this case, it’s a question of who knows what. We could, just before we fire off the command to the agent, inform the serialization layer what type to deserialize that command’s eventual result as.

This would mean storing two things by the correlation id in different layers. The serialization layer would hold a map of correlation id to expected return type, and RemoteAgent a map of correlation id to something that looks a bit like CompletableFuture<LocalCommandResult<...>>.

It feels uncontroversial to prefer the system that has to maintain less state. If we inform RemoteCommandResult what type it will be completed with (via TypeToken or similar), it can finish parsing the response at the point it is completed. It even has the option to transform a parse error into a failing result and propagate it upstream.

private final java.lang.reflect.Type type;

void complete(LocalCommandResult<JsonElement> result) {
    futureResult.set(parseTypedResult(result));
}

private LocalCommandResult parseTypedResult(LocalCommandResult<JsonElement> result)
{
    if (result.getReturnValue() == null)
    {
        return new LocalCommandResult<>(result.getCorrelationId(), result.getSuccess(), null, result.getDetails(), result.getHostName());
    }
    else
    {
        final Map<String, Object> actualResults = 
                result.getReturnValue().entrySet()
                        .stream()
                        .collect(
                            Collectors.toMap(
                               Map.Entry::getKey,
                               entry -> GSON.fromJson(entry.getValue(), type)));
        return new LocalCommandResult<>(result.getCorrelationId(), result.getSuccess(), actualResults, result.getDetails(), result.getHostName());
    }
}

What we’ve done here is choose a generic enough class for serialization that the java generic piece can be deferred to a different point in our program.

An option we ignored

At the time, we did not consider changing the sending side to make it include type information. That might have made this example moot, assuming that all of those types are addressable by the serialization layer.

Example 2: Reducto

Another example of generics-via-generic-object

In our first example, our universal type was JsonElement. We could have gone lower and used String. Or lower still and gone for ByteBuffer or byte[]. Fortunately, in the weeks before our problems with LazyMap I’d done just this in a side project, reducto.

Reducto is a very simple map reduce like project that focusses on data that is indexed by time (or, to be precise, indexed by a single, long, column). So every piece of data in reducto is (timestamp, something) where the type of something is user defined.

Items get grouped together in buckets that span a particular time period. Those buckets are then distributed across an arbitrary number of agent processes, and you can do the usual Amdahl sympathetic map reduce pattern across those agents in parallel.

For an end user, a reduction needs the following pieces:

public final class ReductionDefinition<T, U, F>
{
    public final Supplier<U> initialSupplier;
    public final BiConsumer<U, T> reduceOne;
    public final BiConsumer<U, U> reduceMany;
    public final Serializer<U> serializer;
    public final FilterDefinition<F, T> filterDefinition;

    public ReductionDefinition(
        Supplier<U> initialSupplier,
        BiConsumer<U, T> reduceOne,
        BiConsumer<U, U> reduceMany,
        Serializer<U> serializer,
        FilterDefinition<F, T> filterDefinition)
    {
        this.initialSupplier = initialSupplier;
        this.reduceOne = reduceOne;
        this.reduceMany = reduceMany;
        this.serializer = serializer;
        this.filterDefinition = filterDefinition;
    }
}

N.B I can hear the screaming already. Where are the functions? Why are those things all consumers? We’ll talk about this another time, I promise…

Now, this means that, at some point, reducto is going to have to send U values from agent to server, in order to assemble the completely reduced result.

Here’s the shape of the RPC between the agent and the server to do just that, with one type removed.

interface AgentToServer
{
    void bucketComplete(
        String agentId,
        String cacheName,
        long iterationKey,
        long currentBucketKey,
        ??? result);
}

Now, what should the type of result be? Does it help to know that, at the network layer, the agent is talking to the server over a TCP socket, using a custom binary protocol?

The loveliest answer would probably be Object, with the actual object magically being the correct U for whatever reduction is (con?)currently in progress.

We could certainly make this the case, but what would that mean for the design of the server?

Perhaps a better question would be: who knows how to deserialize that type from a binary blob? The reduction definition. How do we know which reduction definition this particular ‘message’ applies to? Well, we need to look at the cacheName and the iteration key. The parameters are coming from inside the RPC we’re trying to deserialize. To make this work, serialization would have to look like this:

    final String agentId = readString(stream);
    final String cacheName = readString(stream);
    final int iterationKey = readInt(stream);
    final int bucketKey = readInt(stream);
    final Deserializer deserializer = server.findDeserializer(cacheName, iterationKey);
    final Object o = deserializer.readObject(stream);
    server.bucketComplete(
        agentId,
        cacheName,
        iterationKey,
        bucketKey,
        o);     

That’s average for two reasons – we’ve broken the nice encapsulation the server previously exercised over serialization knowledge. Just as bad – what are we going to do first in the bucketComplete call? We’re going to look up this iteration using the same two keys again.

We could completely mix the deserialization and server layers to fix this, but that feels even worse.

We could, again, put some type information inside the stream, but that still would need some sort of lookup mechanism.

If you aren’t convinced these alternatives are worse, imagine writing tests for the serialization layer for each until you do. It shouldn’t take long.

What’s the answer? Surely you can guess!

    void bucketComplete(
        String agentId,
        String cacheName,
        long iterationKey,
        long currentBucketKey,
        ByteBuffer result);

Once again, we defer parsing that result until later; and again, at the point where we do that, the options for error handling are better. We keep reducto‘s own RPC layer blissfully unaware of whatever terrifying somethings end users might dream of. The TimeCacheServer interface remains a messaging contract. Tests are gloriously simple to write.
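
To make ‘later’ concrete, here’s a sketch – not reducto’s actual code; the IterationState name and the serializer’s readObject method are both assumptions – showing that the decode happens at the place that already holds the right ReductionDefinition:

class IterationState<T, U, F>
{
    private final ReductionDefinition<T, U, F> definition;
    private final U accumulator;

    IterationState(final ReductionDefinition<T, U, F> definition)
    {
        this.definition = definition;
        this.accumulator = definition.initialSupplier.get();
    }

    void onBucketComplete(final ByteBuffer result)
    {
        // the reduction definition knows how to turn the raw bytes back into a U
        final U bucketValue = definition.serializer.readObject(result); // method name assumed
        definition.reduceMany.accept(accumulator, bucketValue);
    }
}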

The only regrettable outcome is the lack of deployable generics voodoo; we’ll have to find somewhere else to score those points.

Conclusion

This envelope/layering trick turns up everywhere in the world of serialization. Most TCP or UDP packet handling code will throw up an immediate example – we don’t usually pass deserialization equipment to a socket – it passes us bytes now and again, assuming we will know what they are.

We shouldn’t be overly concerned if, at some point downstream, we pass some of those unparsed bytes (or even a parsed intermediate format) on to someone else, who might know better what to do with them.

Next time

Most of these decisions are driven from trying to write tests, and suddenly finding that it is hard to do so. Now we’ve got our encapsulation in the right place, what testing benefits will we receive?

Monitoring without polling

We have recently added an extra (optional) call back to the disruptor library. This post will walk through one of our motivations for doing this: monitoring.

Before we start – what are we monitoring, and why?

At LMAX, the vast majority of our applications look a bit like this:

The traditional LMAX architecture diagram

Here, events are received from the network on a single thread, and placed in an input (or application) ring buffer. Another thread (we’ll call it the application thread) processes these messages, and may emplace events in several output (or ‘publisher’) ring buffers. Each of those ring buffers has another thread dedicated to sending those events back out to the network.

Now, the vast majority of the time, everything just works – all our applications happily process the input they’ve been given, and the output they produce is handled capably by the publisher threads. Sometimes, however, things go wrong.

Two (eventually) catastrophic failure modes: hosed, and wedged

There are probably more agreed terms from queueing theory that we could adhere to, but these are more fun. A hosed system is processing data as fast as it can, but the input rate exceeds its maximum throughput, so its input ring buffer gradually fills up. A wedged system has encountered some sort of problem to cause its throughput to be hugely reduced, or to stop altogether.

Attempt 1: Monitor ringbuffer depth

We can scan the sequence numbers within the ringbuffer and see how far apart our writer and reader threads are.

public final class RingBufferDepthMonitor
{
    private final String ringBufferName;
    private final Cursored writeSequence;
    private final Sequence[] readSequences;
    private final int numberOfReadSequences;

    public RingBufferDepthMonitor(
            final String ringBufferName,
            final Cursored writeSequence,
            final Sequence... readSequences)
    {
        this.ringBufferName = ringBufferName;
        this.writeSequence = writeSequence;
        this.readSequences = readSequences;
        numberOfReadSequences = readSequences.length;
    }

    public long getRingBufferDepth()
    {
        long minimumReadSequence = Long.MAX_VALUE;
        for (int i = 0; i < numberOfReadSequences; i++)
        {
            minimumReadSequence = Math.min(minimumReadSequence, readSequences[i].get());
        }

        return Math.max(0, writeSequence.getCursor() - minimumReadSequence);
    }

    String getRingBufferName()
    {
        return ringBufferName;
    }
}

This is a naive but remarkably effective approach. It can spot most hosing/wedging issues, so long as the interval at which we poll is sufficiently small, and the ringbuffers we are looking at are of reasonable size compared to the input rate.

Note, however, that it is somewhat error prone – we can’t read every sequence atomically (this code also works in some applications where there are multiple processing threads), so we’ll either underestimate or overestimate the depth, depending on the order in which we perform the reads. The choice made in this implementation is left as an exercise to the reader.

It isn’t, from a diagnostic perspective, the most efficient thing we could do. How do we know whether we are hosed or wedged? Well, we need to look at how the read and write sequences progress, as well as the depth. We could have created a synthetic ‘depth’ metric derived from those numbers (as we can certainly arrange for them to be monitored near simultaneously) rather than calculating it directly in our applications. We’ve only recently gained the ability to do this (and it’s limited to our alerting tool) though, so this code persists.

The problem with polling

No, not for elections. For monitoring. Perhaps we should say sampling rather than polling, to disambiguate. Anyway. The problem is that sampling will only ever spot problems that have a duration in the same order as the sampling interval.

A picture of pathological (and, frankly, unrealistic) sampling conditions

We can repeatedly tighten the interval for our ringbuffer monitoring, but this feels wasteful. Is there a better way? Let’s have a look at how our processing threads take data out of our ring buffer.

BatchEventProcessor

Typically, an interaction with a ringbuffer might look a bit like this:

1. What's my sequence? n
2. Is item (n+1) ready?
   a.) yes -> process item (n+1), update my sequence to (n+1)
   b.) no -> wait a bit, then go back to the beginning of 2.

In reality however, this is not necessarily the most efficient way to proceed. If we consider the case where entries (n + i) for i in [0, 10] are already ready for processing, we’ll query each of those items for readiness (which must incur some cost, as readiness depends on the action of another thread).

We could instead do something like the following:

1. What's my sequence? n
2. What is the maximum available sequence now? n + i
   a.) If i > 0, process items [n + 1, n + i]
   b.) Otherwise, wait a bit, then go back to the beginning of 2

In the case where there’s only a single writer (a condition we eagerly seek at LMAX), obtaining the maximum available sequence is just a volatile read, so we’ve gone from i volatile reads, to 1, regardless of the size of i.

Let’s have a look at the actual code that does this in the disruptor library.

final long availableSequence = sequenceBarrier.waitFor(nextSequence);

while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

sequence.set(availableSequence);

This looks a bit like the pseudo code. The only major difference is that the ‘waiting a bit’ is hidden within sequenceBarrier.waitFor. This is a deliberately polymorphic piece of code to allow the wait behaviour at that point to be configurable (you can spin, sleep, park, etc).

Does this help us in our enquiries? A little bit.

Batch size as a proxy for wedged/hosedness

Remember: we’re trying to detect issues that have a duration smaller than the polling interval without having to waste resources polling all the time. One extra fact to introduce – typically our polling interval is around a second, and in the places we really care about, any wedging/hosing event lasting milliseconds is of interest to us.

In these conditions, we can use i, or batch size, to let us know when something interesting is happening at a fine-grained level. Why? Well, how did we get to a point where so many messages were available? Either:

a.) The writer thread received a burst of messages from the network, or
b.) The processing thread spent more time processing than usual

Here, case a is ‘hosed’ and b is ‘wedged’, to a certain degree. In reality, of course, both of these things could be true.

Theoretically, we can now spot smaller hosing and wedging incidents. Recording and monitoring them turns out to be the trickier part.

Batch size tracking

Unfortunately, the nature of batching makes it difficult to discover the size of batches experienced by a processing thread from, uh, anywhere other than the processing thread. In general, that thread has already got a lot of important, money earning work to do, and ideally should not have to perform any monitoring at all.

We could push every batch size from that thread to another, via another buffer, but that would penalize a well performing system that’s seeing a large number of very small batches. We could publish batches only over a configured size, but that sounds fiddly. What we’d really like is a histogram of batch sizes across a particular interval.

What we currently have is a single datapoint from that histogram: the maximum batch size. We publish this (roughly) every second. We will avoid diving into why it is ‘rough’ in this post, though.

private long batchSize;

@Override
public void onEvent(final T event, final long sequence, final boolean endOfBatch) throws Exception
{
    eventHandler.onEvent(event, sequence, endOfBatch);
    batchSize++;

    if (endOfBatch)
    {
        batchSizeListener.accept(batchSize);
        batchSize = 0;
    }
}

Here, our batch size listener has the smarts to maintain a maximum, and appropriately communicate with a monitoring thread to achieve our goal of publishing a max batch size per second.
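
A sketch of such a listener (not the production class) might be:

final class MaxBatchSizeListener implements LongConsumer
{
    private final AtomicLong maxBatchSize = new AtomicLong();

    @Override
    public void accept(final long batchSize)
    {
        // processing thread: called at the end of every batch
        maxBatchSize.accumulateAndGet(batchSize, Math::max);
    }

    // monitoring thread: called roughly once a second
    long harvestAndReset()
    {
        return maxBatchSize.getAndSet(0);
    }
}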

In the latest disruptor, we can avoid having to write this code and simply implement BatchStartAware in any event handler that cares.
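
Roughly, that looks like the following – BatchStartAware is the disruptor interface, while the handler and event type here are made up:

final class MonitoredEventHandler implements EventHandler<MyEvent>, BatchStartAware
{
    private final LongConsumer batchSizeListener;

    MonitoredEventHandler(final LongConsumer batchSizeListener)
    {
        this.batchSizeListener = batchSizeListener;
    }

    @Override
    public void onBatchStart(final long batchSize)
    {
        // invoked once per batch, with the number of events about to be processed
        batchSizeListener.accept(batchSize);
    }

    @Override
    public void onEvent(final MyEvent event, final long sequence, final boolean endOfBatch)
    {
        // ... the actual, money earning work ...
    }
}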

What’s left?

When we see a large maximum batch size in a particular interval – is that a little wedge event, or a little hose event? Are there any other species of problem that could confound us, too?

Well, at this point, we start to run into issues that are somewhat out of our control. Our networking input threads are reading from circular buffers too, but if they behave in a batched fashion, they don’t make that behaviour clear in the shape of their callback. So, rather than ruling in a burst in input, we’ll perhaps have to rule out a problem in processing.

It’s nanotime, er, time

We’re reaching the point where what we actually want is a lightweight profiler. This is a topic for another post, because we very quickly start having to understand what’s in those ring buffer entries.

To sum up

If you want to avoid polling for depth, look at your writer and reader’s execution patterns to see if there’s anything you could exploit instead. If you’re using disruptor, why not try out BatchStartAware?

Looking for heap distress

Recently at LMAX we’ve had a couple of services suffer from memory leaks.

In both cases, we noticed the problem much later than we’d like. One stricken application started to apply backpressure on (really rather important) upstream components. Another caused an account related action to be intermittently unavailable.

What we’d like to be able to do is detect when any of our java services is in acute heap distress as soon as it enters that state. We already have monitoring around causes of backpressure; we want to spot our issue a little earlier on in the causality graph (that’s two ridiculous names you’ve invented just in one paragraph. Enough! – Ed).

uh oh

Handily, each service already publishes its heap usage and gc times to our home-grown monitoring app. Even better, our alerting application already streams live events from that monitor into some alerting rules, so we have some existing scaffolding to build on top of.

We reworked the way the existing streaming sensors in our alerting app worked until we had something like the following:


void addStreamingSensor(
    final String keyPattern,
    final Consumer<MonitoringEvent> consumer,
    final FaultSensor faultSensor);

interface FaultSensor {
    Collection<Fault> sense();
}

class MonitoringEvent {
    public final long epochMillis;
    public final String key;
    public final double value;

    public MonitoringEvent(long epochMillis, String key, double value) {
        this.epochMillis = epochMillis;
        this.key = key;
        this.value = value;
    }
}

class Fault {
    public final String description;
    public final String url;

    public Fault(final String description, final String url) {
        this.description = description;
        this.url = url;
    }
}

This construction will set things up so that monitoring events matching our key pattern are delivered to our consumer.

Side note: we ended up with the following signature, mostly because using this little known java feature always makes me smile:

    <T extends Consumer<MonitoringEvent> & FaultSensor> void addStreamingSensor(final String keyPattern, final T t);

Occasionally, a different thread will call sense to see if any faults are currently present.

In our case, the keys we have available to us (per application, per host) are ‘heap.used’, ‘heap.max’ (both in bytes) and ‘gc.pause’. Can we reliably detect heap distress from such a stream of events?

Bad ideas we considered

Percentage of wall clock time spent in GC

We could work out what percentage of time an application is spending in GC, and alert if it goes above a threshold.

This isn’t too bad an idea. Unfortunately, that metric is published at most once per second per application, and it lists the maximum pause time seen in that second, not the sum. We could fix this, but we’d have to wait for a software release to get feedback.

Persistent memory usage above a threshold percentage

One thing we see in both of our failing applications is that they see consistent heap usage somewhere near to their max value. We could keep track of how many observations we see above a particular usage threshold, and once that observation count reaches a large enough number, alert.

This would detect errors like the ones that we would see, but it would also detect slow growing heaps in well behaved applications. Alert code that produces more false alarms than real ones is, from our experience, worse than no alerting at all.

    @Test
    public void slowlyRisingUsageIsNotAFault()
    {
        useHealthyHeapThresholdOf(90, Percent);

        reportHeapMax("hostOne", "serviceOne", 100.0);
        reportHeapUsage("hostOne", "serviceOne", 65.0);
        reportHeapUsage("hostOne", "serviceOne", 75.0);
        reportHeapUsage("hostOne", "serviceOne", 85.0);
        reportHeapUsage("hostOne", "serviceOne", 95.0);

        assertFaultFree();
    }

Alerting if an inferred GC reclaims less than a certain percentage of memory

We can keep hold of the last seen usage, and when a new value arrives, check if it is smaller. If it is, we can infer that a GC has occurred. If the amount of memory freed by that GC is too small, we could alert.

Closer still, but can still generate some false positives. A collection can be triggered even when percentage heap usage is low; at that point it might be impossible to reclaim the required amount.

    @Test
    public void punyAmountsFreedByGcAreNotFaultsIfEnoughHeapIsStillFree()
    {
        useHealthyHeapThresholdOf(90, Percent);

        reportHeapMax("hostOne", "serviceOne", 100.0);

        reportHeapUsage("hostOne", "serviceOne", 85.0);
        reportHeapUsage("hostOne", "serviceOne", 86.0);
        reportHeapUsage("hostOne", "serviceOne", 87.0);
        reportHeapUsage("hostOne", "serviceOne", 85.0);

        assertFaultFree();
    }

Our eventual solution

Alert if, after an inferred GC, more than x% of the heap is still in use.

Still not perfect, but just about good enough. Why? Well, we see multiple ‘heap.used’ metrics published across a GC cycle, some of which may still be above the warning usage threshold. Those intermediate values could cause false positives.

In practice, we found that we call ‘currentFaults’ infrequently enough for this not to be a problem. If it did become one, it would be the work of a couple of minutes to create a wrapper for our detector that only reported faults that had been present for multiple seconds.
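
Stripped of the per-host/per-service bookkeeping (and the timestamp handling we get to below), a sketch of the core rule – with names of our own invention, not the production detector’s – looks like this:

    final class HeapDistressDetector
    {
        private final double healthyThresholdFraction; // e.g. 0.9 for the 90% threshold above
        private double heapMax = Double.NaN;
        private double lastUsage = Double.NaN;
        private boolean faulty;

        HeapDistressDetector(final double healthyThresholdFraction)
        {
            this.healthyThresholdFraction = healthyThresholdFraction;
        }

        void onHeapMax(final double max)
        {
            this.heapMax = max;
        }

        void onHeapUsage(final double usage)
        {
            // a drop in usage is our inferred GC
            final boolean inferredGc = !Double.isNaN(lastUsage) && usage < lastUsage;
            if (inferredGc && usage > healthyThresholdFraction * heapMax)
            {
                faulty = true;  // a collection happened, yet we're still above the threshold
            }
            else if (usage <= healthyThresholdFraction * heapMax)
            {
                faulty = false; // usage has dropped back into healthy territory
            }
            lastUsage = usage;
        }

        boolean isFaulty()
        {
            return faulty;
        }
    }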

    @Test
    public void afterAnInferredGcFaultIsReportedUntilHeapDropsBelowThreshold()
    {
        useHealthyHeapThresholdOf(90, Percent);

        reportHeapMax("hostOne", "serviceOne", 100.0);
        reportHeapUsage("hostOne", "serviceOne", 85.0);
        reportHeapUsage("hostOne", "serviceOne", 95.0);
        reportHeapUsage("hostOne", "serviceOne", 93.0);

        assertFaulty("hostOne", "serviceOne");

        reportHeapUsage("hostOne", "serviceOne", 90.1);

        assertFaulty("hostOne", "serviceOne");

        reportHeapUsage("hostOne", "serviceOne", 89.0);

        assertFaultFree();
    }

Bugs we totally missed

Some applications run on multiple hosts in the same environment

This one was a bit silly.

We run our applications in six or seven different ‘environments’ that we consider important enough to monitor. Each environment might have application ‘x’ deployed on hosts ‘h1’ and ‘h2’. We managed to not test this at all in our spike implementation and immediately generated large numbers of false positives. Whoops.

    @Test
    public void servicesOnDifferentHostsDoNotInterfereWithEachOther()
    {
        useHealthyHeapThresholdOf(90, Percent);

        reportHeapMax("hostOne", "serviceOne", 100.0);
        reportHeapMax("hostTwo", "serviceOne", 100.0);

        reportHeapUsage("hostOne", "serviceOne", 85.0);
        reportHeapUsage("hostOne", "serviceOne", 95.0);
        reportHeapUsage("hostTwo", "serviceOne", 93.0);

        assertFaultFree();
    }

More mysterious false positives

Having now written enough tests to convince ourselves that we weren’t totally inept, we launched the alerting app locally. We turned the warning threshold down to 10% to see if it spotted anything, and immediately saw some alerts. Clicking each alert (the ‘url’ feature of a fault is used to allow a user to see the data upon which the decision to alert was based) led us to the following chart.

no sign of any GC activity

The data underlying the chart looks like this:

epoch millis,heap usage (bytes)
1494497832745,1.477746688E9
1494497833564,1.479913472E9
1494497833745,1.479913472E9
1494497834564,1.48424704E9
1494497834745,1.48424704E9
1494497835564,1.488580608E9
1494497835745,1.488580608E9
1494497836564,1.492914176E9
1494497836745,1.492914176E9
1494497837564,1.49508096E9
1494497837745,1.49508096E9
1494497838564,1.49508096E9
1494497838745,1.49508096E9
1494497839564,1.497247744E9
1494497839745,1.497247744E9
1494497840564,1.497247744E9

This was a bit odd. We added some judicious println at the point where we raised the alert, and found we had received the following:

Raising alert: heap dropped from 1.49508096E9 to 1.492914176E9

At this point, a light went on in my head, and we went back to log not only the values we were seeing, but also the timestamps of the monitoring events.

Raising alert: heap dropped from 1.49508096E9 (at 1494497837564) to 1.492914176E9 (at 1494497836745)

Aha. We appear to have gone back in time. How is this happening, though?

Well, we deliberately use UDP to transmit metrics from our applications to our monitoring. We’re OK with occasionally losing some metrics; we’re definitely not happy if being unable to send them prevents our app from actually running. This is a potentially controversial choice; let’s agree not to discuss it here :-).

When metrics arrive at the monitoring app, it has two jobs. One, to squirrel the data into a time series database on disk. Two, to transmit the data to appropriately subscribed websocket clients.

The chart and raw data above are both served, post hoc, via the on-disk store. The websocket feed is not. The UDP packets of monitoring data were arriving out of order, and the websocket feed was faithfully forwarding those metrics to our alerting in the order they arrived.

We could probably fix that with some judiciously applied buffering, but given that each event we see is timestamped, we can simply drop events that are timestamped earlier than the last value we saw for a given service/host pair.

    @Test
    public void doNotReportFaultsBasedOnOutOfOrderData()
    {
        useHealthyHeapThresholdOf(90, Percent);

        reportHeapMax("hostOne", "serviceOne", 100.0);
        
        reportHeapUsage("hostOne", "serviceOne", 85.0);
        currentTime += 10;
        reportHeapUsage("hostOne", "serviceOne", 95.0);
        currentTime -= 5;
        reportHeapUsage("hostOne", "serviceOne", 93.0);

        assertFaultFree();
    }
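
A minimal sketch of that guard, with hypothetical names – events older than the latest timestamp already seen for a host/service pair are simply dropped:

    import java.util.HashMap;
    import java.util.Map;

    class MonotonicTimestampGuard
    {
        private final Map<String, Long> latestSeen = new HashMap<>();

        // Returns true if the event should be processed, false if it arrived out of order.
        boolean accept(String host, String service, long epochMillis)
        {
            String key = host + "/" + service;
            Long latest = latestSeen.get(key);
            if (latest != null && epochMillis < latest)
            {
                return false; // older than something we've already seen; drop rather than alert
            }
            latestSeen.put(key, epochMillis);
            return true;
        }
    }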

Moral(s) of the story

Even the simplest thing needs more thinking about than you might normally expect! We got things wrong a lot, but it didn’t matter, because:

  • we had a very testable interface, so we could improve quickly
  • we could run the whole app locally to tell us what tests we missed

A performance mystery (part two)

In part one, we discovered that our multicast receipt thread was being stalled by page faults.

In part two, we’ll dig down into the causes of those page faults, and with some help from our friends at Informatica, get to the bottom of things.

Systems Cavalry Arrives

Let’s focus on just the first stack trace to begin with.

We can have a bit of a guess at what’s going on just by looking at the symbol names. Let’s flip the stack so we can go from out to in:

  1. Java_com_latencybusters_lbm_LBMContext_lbmContextProcessEvents [/opt/lbm/lib/liblbmj.so.5.0.0]

This is the function that the LBM thread repeatedly calls. This is presumably a java wrapper for the native library’s equivalent.

  2. lbm_context_process_events [/opt/lbm/lib/liblbm.so.7.0.1]

The native version of process events

  3. lbm_timer_expire [/opt/lbm/lib/liblbm.so.7.0.1]

Somewhere in LBM, a timer has expired

  4. lbm_rate_ctlr_handle_timer [/opt/lbm/lib/liblbm.so.7.0.1]

Somewhere in LBM‘s rate controller, we’re handling timer expiry?

  5. _dl_runtime_resolve [/lib64/ld-2.12.so]

Which means we need to resolve a symbol?

  6. _dl_fixup [/lib64/ld-2.12.so]

Which needs fixing up?

This already feels odd. Why are we resolving a symbol more than fifteen minutes after we started our application? Is there code in LBM that is only triggered for the first time at this point in the performance run?

What we want to know is what happens at

 0x7efeba91b7a3 : lbm_rate_ctlr_handle_timer+0x9e3/0xbe0 [/opt/lbm/lib/liblbm.so.7.0.1]

Let’s see if we can work that out.

objdump

The stack trace tells us where to look for a symbol, so we invoke

# objdump -D /opt/lbm/lib/liblbm.so.7.0.1 > ~/lbm-symbols.out
# less ~/lbm-symbols.out
# /lbm_rate_ctlr_handle_timer
000000000015ddc0 <lbm_rate_ctlr_handle_timer>:
  15ddc0:       41 57                   push   %r15
  15ddc2:       41 56                   push   %r14

The stack tells us we are +0x9e3 from the start of that function, which should be at 15ddc0 + 9e3 = 15e7a3

  15e799:       e8 62 af f4 ff          callq  a9700 <pthread_mutex_unlock@plt>
  15e79e:       e8 0d 26 f4 ff          callq  a0db0 <sched_yield@plt>
  15e7a3:       48 8b 7c 24 18          mov    0x18(%rsp),%rdi
  15e7a8:       e8 13 a2 f4 ff          callq  a89c0 <pthread_mutex_lock@plt>
  15e7ad:       e9 ab fc ff ff          jmpq   15e45d <lbm_rate_ctlr_handle_timer+0x69d>

We guess that the mov instruction can’t really be requiring symbol resolution; the address in the stack trace is a return address, so it points at the instruction just after the call that was in flight. It seems far more likely that the instruction immediately before, callq a0db0 <sched_yield@plt>, is responsible.

We theorise that LBM is calling sched_yield, and for some reason, it doesn’t already know where sched_yield is. ld is enlisted to find it for us, and that takes a while; perhaps longer than usual because of the memory pressure the system is under.

We’ll take a brief detour to explain how our application is supposed to work out where functions like sched_yield are, then expand our theory based on that.

ELF

No, not Emerson, Lake and Farmer. And yes, before you ask, yes, DWARF and ELF are from the same people.

ELF is a file format used by binaries (and binary libraries).

# file /opt/lbm/lib/liblbm.so.7.0.1
/opt/lbm/lib/liblbm.so.7.0.1: ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, not stripped

The @plt we see in the output of objdump expands to procedure linkage table. This is a specific section of the ELF format which lists the symbols that were not available at link time; i.e. when this library was being built at Informatica. This is sensible – we definitely want to have LBM call the currently running OS’s implementation of sched_yield rather than having the build machine’s version statically linked into it.

What happens when we call sched_yield for the first time in liblbm.so.7.0.1, though? Well, it depends who our program interpreter is. We don’t expect liblbm.so.7.0.1 to have an ELF interpreter entry, as it’s a library rather than a binary; our java binary should, though.

# readelf -l /opt/zvm/bin/java

Elf file type is EXEC (Executable file)
Entry point 0x400540
There are 8 program headers, starting at offset 64

Program Headers:
  Type           Offset             VirtAddr           PhysAddr
                 FileSiz            MemSiz              Flags  Align
  PHDR           0x0000000000000040 0x0000000000400040 0x0000000000400040
                 0x00000000000001c0 0x00000000000001c0  R E    8
  INTERP         0x0000000000000200 0x0000000000400200 0x0000000000400200
                 0x000000000000001c 0x000000000000001c  R      1
      [Requesting program interpreter: /lib64/ld-linux-x86-64.so.2]

So, when we load the native LBM library during JNI initialisation, the LBM library .sos will be interpreted by /lib64/ld-linux-x86-64.so.2.

By default, this interpreter lazily initialises references in the plt. Just as malloced memory might only be assigned a physical memory page when we attempt to write to it, the runtime will only resolve (and then cache) a symbol location in the plt at first invocation time.

With this knowledge, we are down to three possibilities.

  1. This is the very first time LBM has invoked sched_yield (possible, but unlikely)
  2. Our dynamic linker is misbehaving (astronomically unlikely)
  3. Our understanding of how this is supposed to work is incorrect (likely)

There’s only one thing for it: it’s time to write a tiny C program.

# cat call_sched_yield.c
#include <sched.h>

int main(int argc, char* argv[]) {
  sched_yield();
  sched_yield();

  return 0;
}

We compile this program:

gcc call_sched_yield.c

Examine its ELF interpreter:

# readelf -l a.out

Elf file type is EXEC (Executable file)
Entry point 0x4003e0
There are 8 program headers, starting at offset 64

Program Headers:
  Type           Offset             VirtAddr           PhysAddr
                 FileSiz            MemSiz              Flags  Align
  PHDR           0x0000000000000040 0x0000000000400040 0x0000000000400040
                 0x00000000000001c0 0x00000000000001c0  R E    8
  INTERP         0x0000000000000200 0x0000000000400200 0x0000000000400200
                 0x000000000000001c 0x000000000000001c  R      1
      [Requesting program interpreter: /lib64/ld-linux-x86-64.so.2]

and then run it with some special ld environment variables to see if our understanding is correct.

# LD_DEBUG=all ./a.out > ld_debug_one.out 2>&1
# LD_BIND_NOW=y LD_DEBUG=all ./a.out > ld_debug_two.out 2>&1

LD_DEBUG is an instruction to ld-linux to print debugging information about the work the linker is doing. LD_BIND_NOW is an instruction to ld-linux to override the lazy initialisation behaviour and load all symbols in the plt at start time.

# tail -n 15 ld_debug_one.out
     74235: 
     74235: initialize program: ./a.out
     74235: 
     74235: 
     74235: transferring control: ./a.out
     74235: 
     74235: symbol=sched_yield;  lookup in file=./a.out [0]
     74235: symbol=sched_yield;  lookup in file=/lib64/libc.so.6 [0]
     74235: binding file ./a.out [0] to /lib64/libc.so.6 [0]: normal symbol `sched_yield' [GLIBC_2.2.5]
     74235: 
     74235: calling fini: ./a.out [0]
     74235: 
     74235: 
     74235: calling fini: /lib64/libc.so.6 [0]
     74235: 
     
# tail -n 11 ld_debug_two.out
     76976: initialize program: ./a.out
     76976: 
     76976: 
     76976: transferring control: ./a.out
     76976: 
     76976: 
     76976: calling fini: ./a.out [0]
     76976: 
     76976: 
     76976: calling fini: /lib64/libc.so.6 [0]
     76976: 

‘transferring control’ is the point at which our program’s main is entered. We can see that without LD_BIND_NOW, the first call to sched_yield triggers a lookup. That lookup succeeds and then nothing else happens until the program terminates.

With LD_BIND_NOW=y, however, we see no ld behaviour at all during main.

Our small test program has the same interpreter as java, and behaves exactly as we expect. We should start to consider case 1 (LBM calling sched_yield for the first time) as a more serious contender.

How can we test this? Well, let’s set LD_BIND_NOW=y for the market data service and see what happens.

Result!

In this chart, each block represents a run through the performance environment. The calming blue blocks are NAK free, the yellow blocks saw LBM recovery, and the red blocks experienced a NAK. Guess where we set LD_BIND_NOW=y. 1

Five days of NAKs 

Conclusions

  1. Record all the things. The number of tables in our perftest schema has grown to seventeen. Only three of those contain the latency distributions we originally set out to measure; much of this investigation was made easier by the data in the others.

  2. Abstractions are great. No way do I want to have to think about this stuff all the time.

  3. We will sometimes, unfortunately, have to look behind the curtain and figure out what is going on. Linux has phenomenal tooling to help do this, even in cases where you don’t have the source code (although I think we’d have really struggled if the LBM libs were stripped).

  4. systemtap really shines in problems of this ilk, where you need to capture a lot of detail about a single event.

Epilogue

We still have a few things to explain. It’s still weird that this is the first time LBM calls sched_yield. What’s triggering that behaviour?

We got an excellent response from Informatica explaining how this might happen:

 
Looking through the traces and the UM source code, the
sched_yield() in the lbm_rate_ctlr_handle_timer() function will only
occur under very rare circumstances.

The context has to hit the LBTRM or LBTRU rate limit.  On wakeup, the
lbm_rate_ctlr_handle_timer() function attempts to "wakeup" all of the
blocked transports.  The wakeup can fail due to an internal lock being
held.  In that particular case, the lbm_rate_ctlr_handle_timer() will
call sched_yield().

One other thing that the market data service does is publish orderbook snapshots to a market data viewer application. It does this once per second. It turns out that sending all the state of all the orderbooks in our performance environment can occasionally trigger LBM‘s rate limiting.

Blocked Send Count
N.B. That key records how many times a send call was blocked in a reporting interval – so we block one or two writes most seconds.

What we didn’t know was that hitting a rate limit on a publisher could potentially cause jitter on the receive path. Informatica have plans to amend this in future versions, and we can work around it by bringing rate limiting inside our own code.
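
As a sketch of what ‘bringing rate limiting inside our own code’ might look like – not our actual implementation, and the send callback stands in for whatever publish call is ultimately made – we can pace the snapshot sends ourselves so they never exceed the budget we’ve given LBM:

    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;

    class PacedPublisher<T>
    {
        private final Consumer<T> send;             // wraps the real publish call
        private final long minNanosBetweenSends;

        PacedPublisher(Consumer<T> send, int maxMessagesPerSecond)
        {
            this.send = send;
            this.minNanosBetweenSends = TimeUnit.SECONDS.toNanos(1) / maxMessagesPerSecond;
        }

        void publish(List<T> snapshots) throws InterruptedException
        {
            long next = System.nanoTime();
            for (T snapshot : snapshots)
            {
                long wait = next - System.nanoTime();
                if (wait > 0)
                {
                    TimeUnit.NANOSECONDS.sleep(wait); // pace ourselves instead of tripping LBM's limiter
                }
                send.accept(snapshot);
                next += minNanosBetweenSends;
            }
        }
    }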

This also explains some other anomalies we discovered when we analysed when NAKs happened:

  1. We only ever had one market data service problem per run. This matches exactly with the lazy initialisation of the plt.
  2. The growth in the depth of the OrderBookEvents socket buffer always began within 20ms of a second boundary. This matches with the trigger for republishing orderbook snapshots in the market data service (the start of a second), and the rate limiting interval we give LBM (20ms).

Unanswered questions

We didn’t look into the other stack traces captured by stap. We can reasonably assume they had the same cause, as they disappear with the adoption of LD_BIND_NOW.

We didn’t look too deeply at why symbol resolution is causing a pagefault that proves so difficult to service. Other processes are surely calling sched_yield frequently, so how does looking it up take so long? We might get to this in a future post.

And finally…

We should point out that there was a great deal more trial and error involved in this investigation than this post really communicates.

Much of the monitoring information we used exists only because we were stuck, had a theory about what the problem could be, then added monitoring to confirm or disprove that theory. It took us a lot of those cycles (and a fair few annoyances like perf‘s timestamp behaviour) over a number of months before we had enough information to dive in with system tap.


  1. This picture was taken a little after we originally turned LD_BIND_NOW on. The first runs that were free of market data service NAKs showed us that another important application in the environment had been suffering packet drops since its host was last rebooted. This was adding more red to the NAK display on the dashboard, but we’d missed the issue because we had grown so used to the red. We fixed the packet loss, disabled LD_BIND_NOW to show how bad things were, then enabled it again to generate the unbroken sequence of calm blue blocks afterward.

A performance mystery (part one)

We recently fixed a long standing performance issue at LMAX.

The path we followed to fixing it was sufficiently windy to merit a couple of posts.

In this first post we’ll define our issue and then attempt to figure out its cause.

Problem Identified

At LMAX we have an application named the market data service. Its multicast receipt thread is not keeping up – it occasionally stalls for around 150ms. This causes data loss; the service has to request retransmission (it NAKs) from upstream. That degrades system latency, which we want to avoid.

Two quick definitions:

LBM: the technology we use to talk multicast (this is an Informatica product). Under the hood, it uses epoll_wait.

OrderBookEvents: the name of the high volume multicast topic we’re not keeping up with.

Ruling out the obvious suspects

Here are a few things we ruled out early on:

  1. A burst of traffic

We took packet captures and the arrival rate was constant. We took captures on other subscribers to the same data and the arrival rate was constant there, too.

  2. Preemption of multicast receipt thread

We’ve talked about reducing this sort of jitter here and here.

The code and configuration we use to do multicast receipt is shared across several applications. We’d expect to see this issue everywhere if we’d managed to screw this up, and we don’t. 1

  3. LMAX code being slow to respond to data passed up from LBM

The LBM thread calls us back with data from that topic. Helpfully it also records a statistic that captures the maximum callback time for a given interval. It is not big enough to explain the issue.

Worried that the sampling rate of that metric might be hiding an issue, we added our own monitoring to time each individual callback. This also found no evidence of a problem large enough to be responsible.

  4. Getting blocked in epoll_wait for longer than the interval passed to it.

We configured our LBM thread to ‘spin’: we pass it a parameter so that its calls to epoll_wait specify a timeout of 0, which tells epoll_wait to return immediately if no events are present. The problem remained.

Let’s zoom out a little.

What’s different about the Market Data Service?

Or, better: why isn’t this happening to any of the other OrderBookEvents subscribers? We have several running in this environment and they behave flawlessly.

Well, it does a lot more disk IO. The other subscribers tend to be gateways. This is LMAX speak for "translates internal LMAX multicast traffic into different format client sockets". One example might be a FIX market data application.

The market data service is special; it journals the OrderBookEvents topic to disk, creating a system of record. It maintains another disk based format to support market surveillance. If that weren’t enough, it also manages the recording of various csv files that end up being part of our market data API. We didn’t do a tremendous job of keeping this application simple!

We might think that the root cause is slow disk access, but this cannot be the case. The writes to disk occur either on or downstream of our business logic thread, not the LBM thread. The metrics we capture on the buffer between LBM and those threads show us that we’re easily keeping up.

We’ve ruled out preemption via isolation. We’ve ruled out a bursty network. We’ve ruled out the effect of slow disk writes.

What other shared system resources are there? Are they able to make the market data service behave so much worse than our other applications?

The Linux Memory Subsystem

The programming interface for much of the market data service might well be files, but at runtime, Linux may cache many of those files in memory. A brief google for "Linux Page Cache" will find a variety of introductions to this topic; we won’t go into it here.

A picture might speed us up a little bit here. Here’s a chart of how much system memory is free across a performance run, in kilobytes.

Eating Memory Under Pressure

We can see that around fifteen minutes into the run, we’ve eaten up all but 2GB of the available RAM. The shape of the graph changes into a sawtooth. We’re still eating memory, but the OS periodically frees some by evicting the least recently used files from memory.

If this is our issue, we’d guess that all of our NAKs should be occurring after the system enters that sawtooth area. Indeed, a brief look at the timings in the performance test results database shows this to be the case.

How could this be causing problems on the LBM thread, though? It shouldn’t be touching the file system, so whether a file is cached or not should not impact it. Perhaps it attempts to allocate memory at a really inopportune moment and the page cache has to evict some files back to the disk in order to supply it.

In the parlance of the memory subsystem this is a major page fault. Let’s see if we can capture the page faults occurring on the LBM thread and correlate them with an OrderBookEvents NAK.

Capturing page faults

We want to see every page fault the LBM thread encounters that takes more than x milliseconds. There are a few ways to do this; let’s pick one.

  1. perf

This is, apparently, a one liner.

perf record -e major-faults -ag

This records all major faults system wide – we could add more flags to make it process specific.

A couple of things are a bit harder to do at this point:

  • how do we work out how long it takes to service a fault?
  • how do we correlate the fault timing with our NAKs?

The second is the real kicker. perf, by default, uses the system uptime as the ‘time’ of an event it records. perf record‘s man page claims that -T also samples epoch timestamps, and that perf report -D can view them. As far as I could work out, the timestamps are buried somewhere in the binary part shown to you in the report.

The pain of having to parse the perf format and rework the timestamps was enough to put me off.

  2. ftrace

While this has sensible timestamps, how do we measure the duration of a page fault, rather than just when it occurs?

  3. systemtap

This is what I ended up using.

Not least because googling "systemtap trace page faults" takes you straight to a working example.

Systemtap

Let’s not go into any detail about how systemtap works. Suffice to say it allows you to execute a section of code at particular points in existing binaries, even those in kernel space.

Here’s the script we found earlier:

#! /usr/bin/env stap

global fault_entry_time, fault_address, fault_access
global time_offset

probe begin { time_offset = gettimeofday_us() }

probe vm.pagefault {
  t = gettimeofday_us()
  fault_entry_time[tid()] = t
  fault_address[tid()] = address
  fault_access[tid()] = write_access ? "w" : "r"
}

probe vm.pagefault.return {
  t=gettimeofday_us()
  if (!(tid() in fault_entry_time)) next
  e = t - fault_entry_time[tid()]
  if (vm_fault_contains(fault_type,VM_FAULT_MINOR)) {
    ftype="minor"
  } else if (vm_fault_contains(fault_type,VM_FAULT_MAJOR)) {
    ftype="major"
  } else {
    next #only want to deal with minor and major page faults
  }

  printf("%d:%d:%id:%s:%s:%d\n",
    t - time_offset, tid(), fault_address[tid()], fault_access[tid()], ftype, e)
  #free up memory
  delete fault_entry_time[tid()]
  delete fault_address[tid()]
  delete fault_access[tid()]
}

This script adds two code sections (‘probes’ in systemtap lingo) at the beginning and end of something called vm.pagefault. The interpretation of the probe code is left as an exercise to the reader.

Let’s try to run that script; there’s a small chance it might immediately tell us the answer.

# stap pfaults.stp 
WARNING: cannot find module kernel debuginfo: No DWARF information found [man warning::debuginfo]
semantic error: while resolving probe point: identifier 'kernel' at /usr/share/systemtap/tapset/linux/memory.stp:68:8
        source:                      kernel.function("__handle_mm_fault@mm/memory.c").call
                                     ^

semantic error: no match (similar functions: __handle_mm_fault, handle_mm_fault, handle_pte_fault, drm_vm_fault, shm_fault)
semantic error: while resolving probe point: identifier 'kernel' at :67:22
        source: probe vm.pagefault = kernel.function("handle_mm_fault@mm/memory.c").call !,
                                     ^

semantic error: no match (similar functions: handle_mm_fault, __handle_mm_fault, handle_pte_fault, drm_vm_fault, shm_fault)
semantic error: while resolving probe point: identifier 'vm' at pfaults.stp:8:7
        source: probe vm.pagefault {
                      ^

semantic error: no match
Pass 2: analysis failed.  [man error::pass2]
Number of similar error messages suppressed: 3.
Number of similar warning messages suppressed: 3.
Rerun with -v to see them.

If only! This is disheartening, but we’re getting quite used to things that don’t work as we expect by this stage, so let’s look at the error output properly for clues.

Systemtap is telling us two things, both of which are bad news:

WARNING: cannot find module kernel debuginfo: No DWARF information found [man warning::debuginfo]

"I can’t find debug information for the kernel you’re running :-("

semantic error: while resolving probe point: identifier 'kernel' at /usr/share/systemtap/tapset/linux/memory.stp:68:8
        source:                      kernel.function("__handle_mm_fault@mm/memory.c").call

"I can’t find the place you asked me to probe"

…and one thing which is very, very helpful indeed.

semantic error: no match (similar functions: __handle_mm_fault, handle_mm_fault, handle_pte_fault, drm_vm_fault, shm_fault)

"Did you mean one of these?"

It looks like systemtap is trying to tell us that it doesn’t know what __handle_mm_fault@mm/memory.c is, but it has heard of __handle_mm_fault. Perhaps vm.pagefault might be some sort of alias.

# man stap
STAP(1)

NAME
       stap - systemtap script translator/driver

SYNOPSIS
       stap [ OPTIONS ] FILENAME [ ARGUMENTS ]
       stap [ OPTIONS ] - [ ARGUMENTS ]
       stap [ OPTIONS ] -e SCRIPT [ ARGUMENTS ]
       stap [ OPTIONS ] -l PROBE [ ARGUMENTS ]
       stap [ OPTIONS ] -L PROBE [ ARGUMENTS ]
       stap [ OPTIONS ] --dump-probe-types
       stap [ OPTIONS ] --dump-probe-aliases
       stap [ OPTIONS ] --dump-functions

--dump-probe-aliases, you say?

# stap --dump-probe-aliases | grep pagefault
vm.pagefault = kernel.function("handle_mm_fault@mm/memory.c").call!, kernel.function("__handle_mm_fault@mm/memory.c").call
vm.pagefault.return = kernel.function("handle_mm_fault@mm/memory.c").return!, kernel.function("__handle_mm_fault@mm/memory.c").return

Perhaps we can avoid the alias altogether and just bind probes to kernel.function("handle_mm_fault") and kernel.function("__handle_mm_fault")?

We make the following change:

-probe vm.pagefault {
+probe kernel.function("handle_mm_fault").call!, kernel.function("__handle_mm_fault").call {

-probe vm.pagefault.return {
+probe kernel.function("handle_mm_fault").return!, kernel.function("__handle_mm_fault").return {

…and now we get the following output:

[root@ldperf-pf2-mmmd01 perf]# stap pfaults-dealiased.stp 
WARNING: never-assigned local variable 'address' (similar: fault_address, t, fault_access, write_access, time_offset): identifier 'address' at pfaults-dealiased.stp:11:26
 source:   fault_address[tid()] = address
                                  ^
WARNING: never-assigned local variable 'write_access' (similar: fault_access, address, t, fault_address, time_offset): identifier 'write_access' at :12:25
 source:   fault_access[tid()] = write_access ? "w" : "r"
                                 ^
WARNING: never-assigned local variable 'fault_type' (similar: ftype, e, fault_access, t, fault_address): identifier 'fault_type' at :19:25
 source:   if (vm_fault_contains(fault_type,VM_FAULT_MINOR)) {

I confess, I am still confused as to what this means. I also don’t really know what those variables are meant to refer to in that script; if I look at the kernel source that defines __handle_mm_fault for our kernel version, there are no parameters named write_access or fault_type, nor are any variables declared with those names in the function body. Perhaps that example script works in an older kernel.

I assumed this meant that I would probably be getting untrustworthy output. With the addition of the first error we got (WARNING: cannot find module kernel debuginfo: No DWARF information found [man warning::debuginfo]), I went back to google with some slightly different keywords, and eventually discovered DWARFless probing.

I rewrote the script using that approach, and then realised that I really didn’t care about the address or the write_access. All I needed was the type of fault, and that was just as easily accessible in the return value of __handle_mm_fault.

I ended up with a script that looks like this

# Trace page faults for a process (and any of its threads?)
#
# Usage: stap process_page_faults.stp -x <pid>

global fault_entry_time

probe begin {
  printf("systemtap has started\n")
}

probe kernel.function("handle_mm_fault").call!, kernel.function("__handle_mm_fault").call {
  pid = pid()
  if (pid == target())
  {
    id = tid()
    t = gettimeofday_us()
    fault_entry_time[id] = t
  }
}

probe kernel.function("handle_mm_fault").return!, kernel.function("__handle_mm_fault").return {
  pid = pid()
  if (pid == target()) {
    id = tid()
    t = gettimeofday_us()
    if (!(id in fault_entry_time)) next
    start_time = fault_entry_time[id]
    e = t - start_time
    if (e > 1000)
    {
      printf("%d:%d:%d:%d:%d:%d\n",
             start_time, t, id, pid, e, returnval())
    }
    #free up memory
    delete fault_entry_time[id]
  }
}

I also wrote a few wrapper scripts to detect the presence of the market data service and appropriately start and stop the systemtap script at the right time.

We start to get the following output:

server not there; waiting a bit
2016-10-14 12:32:52.331562 : 200 OK
mds lbm thread not there yet, waiting a bit
2016-10-14 12:33:07.343710 : 200 OK
2016-10-14 12:33:07.344100 : found lbm tid: 133881
2016-10-14 12:33:07.429192 : found mds pid: 133210
2016-10-14 12:33:07.429236 : ['python', 'kill_stap_on_no_mds.py']
2016-10-14 12:33:07.452395 : ['stap', '-x', '133210', 'process_page_faults.stp']

1475682078014325:1475682078038965:133881:133210:24640:1028 
1475682078039052:1475682078079234:133881:133210:40182:1028
1475682078079293:1475682078119595:133881:133210:40302:1028
1475682078119649:1475682078135669:133881:133210:16020:1028
1475682078135731:1475682078157507:133881:133210:21776:1028
1475682078157571:1475682078163527:133881:133210:5956:1028

That’s 147 milliseconds of wall clock time spent attempting to service a major page fault on the LBM thread. (The fifth column is the time spent in microseconds).

If we take that first microsecond timestamp and poke it into epochconverter.com, we get

1475682078014325
Assuming that this timestamp is in microseconds (1/1,000,000 second):
GMT: Wed, 05 Oct 2016 15:41:18 GMT

Splitting that timestamp into seconds (1475682078) and microseconds (014325), we see that we’re only 14 milliseconds into 15:41:18.
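
For the curious, a quick way to sanity-check that arithmetic from Java:

    import java.time.Instant;

    class TimestampCheck
    {
        public static void main(String[] args)
        {
            long micros = 1475682078014325L;
            // Split into whole seconds and the microsecond remainder, converted to nanoseconds.
            Instant instant = Instant.ofEpochSecond(micros / 1_000_000, (micros % 1_000_000) * 1_000);
            System.out.println(instant); // 2016-10-05T15:41:18.014325Z
        }
    }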

Let’s take a look at the OrderBookEvents UDP socket depth at that time:

Buffer full correlates
This is excellent news. Now, can we work out what is triggering the fault?

Well, outside of application start/stop time, these are the only faults of significant duration the LBM thread sees. This means we can add further debugging information without worrying too much about data volume, so let’s print a userspace stack trace for each recorded fault using print_ubacktrace.

We go through several cycles fixing warnings of the form

WARNING: missing unwind data for module, rerun with 'stap -d some_module.so'

until we end up with a slightly longer command line

['stap', '-x', '133210', '-d', '/lib64/ld-2.12.so', '-d', '/opt/lbm/lib/liblbm.so.7.0.1', '-d', '/opt/lbm/lib/liblbmj.so.5.0.0', 'process_page_faults.stp']

and stack traces that look a bit like this (only the first is included for space sanity purposes):

 0x30e0e0dfe0 : _dl_fixup+0x2e/0x1c0 [/lib64/ld-2.12.so]
 0x30e0e148f5 : _dl_runtime_resolve+0x35/0x70 [/lib64/ld-2.12.so]
 0x7efeba91b7a3 : lbm_rate_ctlr_handle_timer+0x9e3/0xbe0 [/opt/lbm/lib/liblbm.so.7.0.1]
 0x7efeba879c82 : lbm_timer_expire+0x372/0x490 [/opt/lbm/lib/liblbm.so.7.0.1]
 0x7efeba895eb5 : lbm_context_process_events+0x2d5/0x7f0 [/opt/lbm/lib/liblbm.so.7.0.1]
 0x7efebae54ca1 : Java_com_latencybusters_lbm_LBMContext_lbmContextProcessEvents+0x41/0x80 [/opt/lbm/lib/liblbmj.so.5.0.0]
 0x31d292ad

At this point, I am solidly out of my depth – the symbols in ld-2.12.so are opaque to me. I don’t have the source for LBM to work out what’s going on in those frames, either. I enlist help from our systems team, and send an email to Informatica with some questions. We’ll pick up from there in the next post.


  1. We’ve gone as far as to write monitoring that detects when more than a single thread runs on an isolated core. This has caught bugs in several applications where we’d forgotten to correctly pin/affinitise a new thread, but it didn’t find anything wrong here.

Variance expression in functional programming

This time we’re going to explore how functional programs express variance. We’ll consider the same case as we did for TDA – getting a stored value from a Map, and storing a new value in a Map.

Retrieval

lookup :: Ord k => k -> Map k a -> Maybe a

In Haskell, the variance is expressed in the return type.

data Maybe a = Nothing
             | Just a

We might express this as follows:

variance ⊂ return type

Storage

We want to put a new value into the map.

Once again, the entropy of this function is two; see the previous variance post for further discussion.

Here’s the comprehensive version:

insertLookupWithKey :: Ord k => (k -> a -> a -> a) -> k -> a -> Map k a -> (Maybe a, Map k a)

This function actually allows the caller to do more than our original requirement, so a little explanation of that type signature is perhaps necessary.

(k -> a -> a -> a)

The caller passes in this function, which takes the key, the new value and the old value and produces the value that will be stored (most implementations will simply return the new value, rather than performing a calculation, mind).

Data.Map will only call this function if a value is already present – this is reflected in the last part of the type signature:

k -> a -> Map k a -> (Maybe a, Map k a)

This is the part we’re going to focus on. Here we pass the key, new value, and the map to operate on. Note that the return type contains both the possibility of an old value (the presence of which tells us our value transforming function was called) and the new map.

Diversion one: encapsulation in FP

It’s starting to look as though functional programs eschew encapsulation in favour of type signature totality (definitely a thing). This is not true.

  • What if Maybe didn’t export its two constructors?

I fear the answer for some may be “huh”? So, a brief explanation of how this might work.

Haskell makes it relatively simple to obscure your implementation details. At the moment, we know that:

data Maybe a = Just a
             | Nothing

In fact though, we only get to know this because the original author allowed those constructors out to play. It would be just as valid to export two functions that did the same but obscure the constructors, like so:

module Maybe (Maybe,just,nothing) where

data Maybe a = Just a
             | Nothing

just :: a -> Maybe a
just = Just
nothing :: Maybe a
nothing = Nothing

instance Functor Maybe where
  fmap f (Just a) = Just $ f a
  fmap _ Nothing  = Nothing

Now, at the point of invocation, we can’t pattern match any more. We have to use the fact that Maybe is an instance of Functor in order to do anything with the value inside; there’s no way at all for us to get it out.

Example: reversing the value, if it exists.

fmap reverse $ lookup "key" map

So, it looks like the core of the FP world (Functors and Monads) is very interested in not letting us execute a function on a value directly: you pass the function you would like executed to the enclosing context, and it can choose (based on the value inside) how to execute it.

In particular in Haskell, syntactic sugar (do,<-, etc) is provided to make it appear to the eye that this is not the case. That’s only for humans, though; in reality the value never escapes.

Uh, so what now?

We used the term “return type” on the assumption that it is the last thing we see in the type signature of a function. This is either almost true or almost completely false.

Tony Morris tells us that all Haskell functions take one argument, and, once you’ve read that link, I hope you’ll agree he is right.

This means that for a function which appears to take n arguments there are in fact n - 1 return types. Or, perhaps, they have one return type, and it’s everything we see after the first ->.

We’ll stick with:

variance ⊂ return type

Totalitarianism, dogma and consequences

That’s enough pretentious subheadings -Ed

Just as we distilled OOP into a very dogmatic TDA, we’ve taken a very dogmatic view of what FP is. Effects are delayed until the last possible moment, and mutable state is excised.

Oddly, unlike in the TDA case, our chosen implementation language, Haskell, is very keen on enforcing this dogma. Attempting to perform calculations or effects outside of a function’s type signature is* a compile error.

From here on out, we’ll assume dogmatic FP === FP and that non dogmatic usages of FP languages are charlatanic. This isn’t necessarily true, but it will save us a few words here and there.

(*almost always is. There is always Debug.trace)

Diversion 2: A mirror image

Cast your mind back to our purified TDA map insert. What would it look like if we translate it to Haskell?

put :: (Ord k) => Map k a -> k -> a -> (a -> IO()) -> IO() -> IO()

You can think of a -> IO() as being a bit like Consumer and IO() as being like Runnable. Haskell’s lazy predisposition allows us to pass around putStrLn "nothing here" as an expression without executing it.

Let’s rename that, and fiddle with argument order:

tda_insert :: (Ord k) => k -> a -> Map k a -> (a -> IO()) -> IO() -> IO()

Let’s also consider the core of the FP signature:

fp_insert :: (Ord k) => k -> a -> Map k a -> (Maybe a, Map k a)

Now, recall our stricter, non pattern matching Maybe implementation from earlier. This told us that Maybe is actually a context for executing functions that take an a. It also provides the following:

maybe :: b -> (a -> b) -> Maybe a -> b

If we just plug b = IO() into that, we get:

maybe :: IO() -> (a -> IO()) -> Maybe a -> IO()

and with some further shuffling, it becomes this:

maybe_ish :: Maybe a -> IO() -> (a -> IO()) -> IO()

Now, if we elide Map k a from fp_insert (thus making the state change implicit)

fp_insert_ish :: (Ord k) => k -> a -> Map k a -> Maybe a

…and now, we almost have

tda_insert = maybe_ish . fp_insert_ish

This is very interesting! It feels like we’ve come across a little piece of symmetry between the two approaches.

One way to think of this, perhaps, is that our TDA insert has inlined Maybe. It acts both as an associative container and as a context for function execution. The only difference (mind out though, it’s a big one) is that fp_insert is far stricter about what those functions are allowed to do.

We perhaps shouldn’t be surprised at this; what we’ve really found is an artifact of tight encapsulation, which we introduced by definition in both cases.

Why compose isn’t quite working

fp_insert_ish, as far as . is concerned, has type

fp_insert_ish :: (Ord k) => k -> (a -> Map k a -> Maybe a)

and maybe_ish is really wanting a Maybe a to start with. One can bind enough of fp_insert_ish‘s parameters to get this to work; we’ll not spend the time doing that here.

Having found some symmetry (admittedly with a little bit of hand waving), let’s consider a less friendly asymmetry.

Interactions between FP and OOP

A lot of hot air has been generated about the exciting possibility of combining the powers of OOP and FP approaches, as if they are some sort of transformer robots that get an attack bonus, amazing music and lens flares whenever they touch.

It’s not particularly clear whether this is true. We won’t try to answer this question here at all. All we can do here is look at TDA‘s interaction with FP.

  • Can FP code call TDA code?

No

The TDA approach deliberately obscures some variance in its type signatures. You’d have to do some serious gymnastics to express the same thing in Haskell, and, once the core is infected with mutability, the IO() virus spreads inexorably outwards.

  • Can TDA code call FP code?

Just about.

One can start to preach the church of return type in the core of an application and gently push it outwards (TDA style callers can update a mutable reference to the newly returned FP core, then send out the (also returned) events).

This will cause some dissonance, but, and this is the very important point, only at changeover points. Purity, unlike mutability, isn’t upwardly mobile.

Diversion 3: Not today…

Whether this has any bearing on OOP / FP interweaving is left temporarily as an exercise to the reader.

Serendipitously, Erik Meijer just wrote something excellent on the wider question of hybrid OOP/FP approaches.

Conclusion

We found a tiny Maybe hidden in our TDA code; a symmetry due to the encapsulation shared between our two approaches.

One wonders whether similar exercises, TDAing the living daylights out of other data structures, would uncover any other fundamental types.

We also found one asymmetry between TDA and FP; implicit mutation simply isn’t permitted in FP, so we can nest the approaches in one direction only.

Extremely satisfying summary

Once we start to mutate, anyone above must mutate with us.

So: Pick the point of no return very carefully.

Next time

A selection from:

  • Do our truths about TDA tell us anything about OOP?
  • Does our TDA world bring us any new problems?
  • Does our TDA world bring us any benefits?
  • Will we ever get to talking about the Actor model?

Post script

There may be a small hiatus over the next couple of months while I develop related material into a workshop for 2014’s SPA Conference ( This session is mine ).

Variance expression in tell don’t ask programs

This time we’re going to explore how tell don’t ask (TDA) programs express variance, and hopefully come across an interesting corollary of our TDA definition.

We’ll consider two ubiquitous operations: getting a value from a map, and storing a value into one. In both cases we’ll discuss the standard Java implementation before creating a speculative TDA version.

Example one: value retrieval

We want to look up the value associated with a particular key.

This operation has entropy two. Either the map contains the value or it does not.

java.util.Map

V get(Object key);

Here the returned value is either:

  • null to signify absence
  • the present value

Amusingly nothing prevents us from storing a null value other than some very hopeful javadoc.

Clearly this interface violates our TDA definition.

  • Should we be perturbed by this?

If such a core library concept is against us, perhaps we’ve chosen a poor language for TDA implementation; Java, after all, is frequently mocked for not being a tremendous OO language.

What about a lauded OO language, like Smalltalk? Well, Smalltalk’s dictionary class still provides value access via at:, which generates an error if a value isn’t present.

Even the more robust accessor

at: key ifAbsent: action

still returns the present value, rather than forwarding a message to it.

Let’s persist with our TDA approach and see what happens.

TDA like we mean it

void forward(final K key, final Consumer<V> onPresent, final Runnable onAbsent);

Now we obey our TDA definition and end up passing in two more objects.

There’s that number two again! It sounds like we’ve expressed the variance in function parameters – we’ve given the map two functions that it could call depending on whether a value is present or not, and we trust it to call only one of them.
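
To make that concrete, here’s a minimal sketch of how such a map might be implemented – TdaMap is a made-up name, and it leans on a plain java.util.HashMap (assuming no null values) underneath:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    class TdaMap<K, V>
    {
        private final Map<K, V> delegate = new HashMap<>();

        void forward(final K key, final Consumer<V> onPresent, final Runnable onAbsent)
        {
            final V value = delegate.get(key);
            if (value != null)
            {
                onPresent.accept(value); // the map, not the caller, decides which path runs
            }
            else
            {
                onAbsent.run();
            }
        }
    }

A caller then tells the map what to do in each case – map.forward(key, this::display, this::reportMissing) – rather than asking it what it contains.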

Perhaps we could express the TDA variance as follows:

variance ⊂ downstream call graph

Example two: Storage

Now we want to put a new value into the map.

Once again, the entropy of this function sounds like it might be two:

  • There could be a value already present for that key; we overwrite it
  • There is no existing value and we just add the new key value pair.

In both cases the state of the map changes.

Java standard

V put(K key, V value)

Once again, not quite tell don’t ask compliant.

Side note: If one looks at the javadoc on java.util.Map, one notices that with the addition of exceptions and the possibilities of null the entropy is now far higher than two.

TDA like we mean it

void put(K key, V value, Consumer<V> oldValueExisted, Runnable noExistingValue);

Here again we have two extra parameters.
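
Again for illustration, a minimal sketch of an implementation (made-up class name, plain HashMap underneath, nulls ignored for brevity):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    class TdaMutableMap<K, V>
    {
        private final Map<K, V> delegate = new HashMap<>();

        void put(K key, V value, Consumer<V> oldValueExisted, Runnable noExistingValue)
        {
            V old = delegate.put(key, value);
            if (old != null)
            {
                oldValueExisted.accept(old); // tell the caller which case occurred...
            }
            else
            {
                noExistingValue.run();       // ...rather than returning the old value for inspection
            }
        }
    }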

  • Does this new definition express the complete variance?

N.B. We could argue that oldValueExisted and noExistingValue don’t have access to the new key and value. Nothing in the TDA definition prevents us from binding this information into the implementations at construction time, though.

What’s really missing is the variance in the map’s state. This is left implicit – the caller still has a reference to the (now mutated) map, and the old version is gone, forever!

What if we wanted to use an immutable map in this style? A naive first attempt might look something like this:

Map<K,V> put(K key, V value, Consumer<V> onOldValue, Runnable noExistingValue);

The map decides which function to call next, but must also provide a new map to pass back to the original caller. This violates our TDA definition, however, and if either of the two callbacks wants to play this game, we’re suddenly in trouble (they could be longer-lived objects than a simple closure – how do we update their callers with their new values?).

It’s hard to see how we can scale this approach.

Conjecture: TDA mutation expression

What a pretentious subheading.

void put(K key, V value, Consumer<V> onOldValue, 
         Runnable noExistingValue, Consumer<Map<K,V>> onNewMap);

Now we’re fully tell don’t ask compliant, but what are we going to do with that new map?

Well, temporarily ignoring TDA, a caller could pass itself as onNewMap. It could then tell its callers that it has changed, and so on, up the stack.

  • Can we do that and keep to the TDA definition?

We need the traditional extra layer of indirection.

Messages outgoing from our object can no longer be delivered directly, but will need to be captured/dispatched by some piece of architecture that knows where messages should be routed to and delivers them appropriately.

What’s the tapping noise I hear? Ye gods, we’ve just dug up the foundations of the actor model.

So, we have two choices:

  • Keep direct dispatch, but leave state change implicit
  • Abandon direct dispatch and build towards an actor like model.

Very important scoping note

In order to achieve TDA certification with the actor model, we’ll actually need to abandon synchronous dispatch (and qualify the TDA definition slightly).

Precisely why this is necessary is left (for now) as an exercise for the reader. We’ll spend the rest of this post discussing simple, direct dispatch TDA, and leave discussion of the actor model for another day.

The Consequences Of Implicit Mutation

This really ought to be the name of the next Blotted Science album.

We have roughly agreed that in TDA world:

variance ⊂ downstream call graph + (implicitly) mutable object state

This is perhaps more elegantly expressed as:

variance ⊂ downstream call graph for current method invocation +
           downstream call graph of future method invocations

That sounds quite worrying, no? We could see the variance of a call at the point of any future invocation. That’s a helluva lot of variance!

That is true, but our insistence on TDA at least allows each object to know precisely what messages it might get passed (and thus the possible states it can get into).

This fits with my experience in this area; as soon as we disregard TDA, understanding (and changing) code becomes extremely difficult.

On the up side

We do gain some immutability elsewhere. A large chunk of our object graph is often fixed at start time (spot the final fields of a given object).

For example, anyone using our TDA map could hold on to it for the entire program runtime, even though its contents (and thus behaviour) will evolve over time. The TDA map feels more like a subscription registry.

Conclusion

Caveat implementor!

TDA as we have defined it leads inexorably to mutable objects.

The only way to be sane (citation needed) in a mutable world is to encapsulate decisions made about state to the places where the state is. In a way, TDA is both the cause and solution of all of our problems.

Coming up

We are yet to discover the limits of TDA, and we haven’t really explored the effects of TDA at anything other than the unit level.

If TDA is such a good idea:

  • Why don’t library classes (like Map) obey our definition?
  • What effect does their lawlessness have on code that wants to be good?

We’ll try to make some headway on this soon.

In addition, we also need to cover:

  1. Variance expression and encapsulation in functional programs (FP).

    This should lead to a nice lemma on nesting FP in TDA (and vice versa). We’ll probably go here next, as a brief diversion (not least because I just extracted most of it from this post).

  2. The actor model