Where we’re going, we don’t need threads: Simulating Distributed Systems

Testing distributed systems is hard.

You probably already knew that. If you’ve spent even a little time writing distributed systems, you know that the relevant algorithms and techniques are complex and nuanced. Distributed systems end up full of gotchas and corner cases. The tests for these systems are even worse! If you actually care that your application is correct, it’s exactly the hard-to-reproduce failures that are going to make you want to throw your computer into the garbage and run off to live in the woods. 

Before you start looking for habitable caves, we have good news: there’s a way out of this problem, and it’s called deterministic simulation.

It’s all about ordering

Let’s take a step back and ask ourselves why distributed systems are so hard to test. In the standard unit testing methodology, you identify blocks of behavior (functions, classes), carefully control their inputs across the range of acceptable values, and validate their outputs. This is easy when your behavior is fully synchronous, because the only inputs you need to control are the actual data. 

But in a distributed system — which really means “any system with concurrency greater than 1” — while you do control the “inputs” in terms of arguments, you typically do not control another key variable: ordering of events. “Event” is abstract here, but for the most part, we are interested in points of interaction between concurrent actors. (Think: two threads printing to standard out, or a user trying to update the same field on their profile from multiple browser tabs.) Depending on what order each concurrent actor reads or writes some piece of data, your system can give wildly different results.
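Here is the ordering problem in miniature, as a sketch in Clojure (the language used later in this post). It’s a toy rather than a real concurrency test: two hypothetical actors each read a shared counter and then write back the value plus one, and the interleaving of those steps is passed in explicitly so the outcomes can be compared.

```clojure
(defn run-interleaving
  "Runs two actors, :a and :b, that each read a shared counter and then write
  back the value they read plus one. `ordering` lists whose next step runs at
  each point in time; each actor must appear exactly twice. Returns the final
  counter value."
  [ordering]
  (loop [state {:counter 0 :regs {} :pc {:a 0 :b 0}}
         [actor & more] ordering]
    (if (nil? actor)
      (:counter state)
      (let [state' (case (get-in state [:pc actor])
                     ;; step 0: read the shared counter into a private register
                     0 (assoc-in state [:regs actor] (:counter state))
                     ;; step 1: write register + 1 back to the shared counter
                     1 (assoc state :counter (inc (get-in state [:regs actor]))))]
        (recur (update-in state' [:pc actor] inc) more)))))

(run-interleaving [:a :a :b :b]) ;=> 2, one actor finishes before the other starts
(run-interleaving [:a :b :a :b]) ;=> 1, both read 0, so one increment is lost
```

Same code, same inputs, different answers: only the order of events changed.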

When you run a test, it runs in some order. But when you run it again, it might run in a different order, or it might not, depending on a bunch of totally invisible parameters! (What’s the background CPU load on your machine? How full is the heap in the test process? Did the JIT compiler run yet?) It’s often easy to get a test passing consistently in your own environment, only to have it fail when it runs in a different environment like your CI system. It might not even fail every time! Now you’re stuck trying to reproduce that failure under less convenient conditions, running the same test over and over to catch a rare failure. You can easily lose hours or days trying to reproduce and fix these kinds of problems.

The traditional approach in this situation is to put in ad-hoc measures that exclude unexpected orderings of events: you replace implementations with mocks, or inject “extra” locking, or jump through hoops to verify the system reaches a specific state before you run your test assertions. This can be brittle, or worse, end up compromising the validity of your tests, since you’re no longer testing the code that you actually run in production. But the real problem with this approach is that it only prevents the unexpected orderings you can imagine up front! Your system can still reach unanticipated states based on orderings that you didn’t even know were possible.

No matter how many tests you write of your distributed system, if you can’t figure out some way to systematically test ordering, there are going to be vast swaths of territory that are completely uncharted. The more complex the system, the more likely it is that some totally bonkers emergent bad behavior is lurking just out of sight. 

Deterministic Simulation

Deterministic simulation is all about changing ordering from an uncontrolled variable into a controlled one. With that control, you can vary execution order directly, just like any other test input. This is an incredibly powerful technique for getting to a whole new level of rigor and repeatability in distributed systems testing, making it much easier to find bugs related to ordering and to repeat those orderings once they’ve been detected. It’s a gigantic productivity lever for your team as you build your ever-more-complex application.

This probably sounds pretty pie in the sky, and it’s true that there is no “out of the box” simulation tooling that can be bolted on to any application. But it turns out that building a simulation capability that is well-suited to your specific environment and codebase is totally achievable. Folks like FoundationDB have talked publicly about their experience following this path, and their success was a major inspiration to our own efforts. 

We’ve built up a functionally complete deterministic simulation capability at Red Planet Labs over the last few months. It’s been a transformative time for us that has already dramatically reduced the cost of finding and fixing complex ordering bugs in our distributed system. Some problems that took us weeks to resolve in the past are now getting fixed in a single afternoon! We’ve also gained the ability to test conditions that would have been difficult or impossible to create in the past. We’ll be building on top of these capabilities for a long time to come.

For us, the formula ended up being no parallelism + quantized execution + deterministic behavior = deterministic simulation. We’ll go through each of those terms below. The rest of this post will be devoted to talking through some of the interesting philosophical and design pattern discoveries we’ve made along the way, which will hopefully provide somewhere for you to start should you decide to go down this road. 

No Parallelism

If you let concurrent actors in your simulated system actually execute in parallel, you really hamstring your ability to control execution order, since there are fundamentally no guarantees about the order in which their tasks execute. To give your simulation complete control, you must explicitly remove all parallelism. One way to achieve this is to run all concurrent actors on a single thread in the same process. 

Wait — you’re trying to test distributed systems. Isn’t parallelism a fundamental characteristic of distributed systems? How can you test anything interesting if you have no parallelism? While distributed systems are indeed both concurrent and parallel, the vast majority of the problems these systems experience are a result of logical concurrency mistakes rather than actual parallel resource contention.  

When you have two co-located concurrent actors running in parallel, they can both try to access a shared resource in the same exact instant. This is called contention. Problems caused by contention are often solved by using some kind of lock, which literally reduces the allowed parallelism to 1. However, even when there is no contention, your concurrent actors can still misbehave, based solely on how their accesses interleave with each other. These problems arise from flawed concurrency logic present at a higher level of the application, and are rarely caused — or fixed — by locking. 

But when your concurrent actors don’t share the same process or hardware, they can’t access shared resources in the first place; they have to use some kind of asynchronous message passing and wait for a reply. They cannot contend. So any bug found in the interaction between two independent concurrent actors must be of the concurrency kind, which means it depends only on ordering.

A simulation scheme with no parallelism can readily explore orderings in the concurrency space, but is incapable of exploring orderings in the contention space. This is a good trade-off, though, because the bulk of the problems you’re going to experience in a distributed system are of the concurrency type.

Contention problems are still problems that have to be tested and fixed. However, it’s common for distributed system implementations to minimize direct access to shared resources and emphasize use of message-passing style even when co-located, since it makes the entire system more consistent and easier to understand. With this approach, any implementation details that can cause contention get abstracted into the “framework” layer of your application, where they can be tested exhaustively to verify correctness without having to worry about the bigger picture app-level concurrency. When this pattern is used, the no-parallelism simulation approach makes no sacrifices in its ability to explore orderings.

Quantized Execution

If you want to be able to control “what happens next” in a granular fashion, you need to change the way concurrent work streams are expressed in your application. If you define your concurrent work streams naively — as threads, for instance — then in production you get all the isolation and independence attributes you want, but you can’t introspect concurrent work streams during simulation. What the simulation really needs is for concurrent work streams to be uniquely identified and to expose their individual work items.

We call the idea of chunked work in identifiable streams quantized execution. This sounds intense, but in practice, it just means that instead of using threads, you organize everything in terms of ExecutorServices and Runnables / Callables that are submitted to them. You tie it all together with a central “executor service factory” that allows you to effectively inject the execution mode into your runtime system just like any other dependency. In production, the executor service factory returns “real” independent ExecutorServices backed by their own threads, and they will run in a totally uncoordinated manner (that is, governed only by the concurrency structures you put in place). 
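A minimal sketch of the factory idea in Clojure, assuming a hypothetical ExecutorServiceFactory protocol and a toy counter component; these names are illustrative, not RPL’s actual API. The component only ever submits Callables to whatever executor it’s handed, so swapping the production factory for a simulation one requires no change to the component.

```clojure
(import '[java.util.concurrent Callable ExecutorService Executors])

;; Hypothetical protocol: components ask an injected factory for executors
;; instead of creating threads, so the execution mode is just a dependency.
(defprotocol ExecutorServiceFactory
  (make-executor [factory actor-id]
    "Returns the ExecutorService that runs work for the actor named actor-id."))

;; Production mode: each actor gets a real executor backed by its own thread.
(def production-factory
  (reify ExecutorServiceFactory
    (make-executor [_ _actor-id]
      (Executors/newSingleThreadExecutor))))

(defn start-counter-actor
  "A toy component. It never touches threads directly; it only submits work
  items to whatever executor the factory hands it."
  [factory]
  (let [^ExecutorService ex (make-executor factory :counter)
        state (atom 0)]
    {:executor ex
     :increment! (fn [] (.submit ex ^Callable (fn [] (swap! state inc))))}))

(let [{:keys [executor increment!]} (start-counter-actor production-factory)]
  (try
    @(increment!) ;=> 1, computed on the actor's own thread
    (finally (.shutdown ^ExecutorService executor))))
```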

But when the system is under simulation, you inject a factory that returns special “facade” ExecutorServices. They capture submitted tasks into a queue, but don’t actually execute anything until they’re explicitly told to do so. The overall simulation controller uses this interface to make the decision about who gets to execute their next work item.
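One way to sketch such a facade, under the assumption that extending java.util.concurrent.AbstractExecutorService is acceptable: that class implements submit, invokeAll and friends on top of a single execute method, so capturing tasks in execute captures everything. The facade-executor and run-one! names are hypothetical.

```clojure
(import '[java.util ArrayDeque]
        '[java.util.concurrent AbstractExecutorService Callable ExecutorService])

(defn facade-executor
  "Returns {:executor, :queue}. The executor only captures submitted tasks;
  nothing runs until run-one! is called on it."
  []
  (let [queue (ArrayDeque.)
        shut-down? (atom false)]
    {:queue queue
     :executor
     (proxy [AbstractExecutorService] []
       ;; submit/invokeAll/etc. all funnel into execute, which just enqueues
       (execute [task] (.add queue task))
       (shutdown [] (reset! shut-down? true))
       (shutdownNow [] (reset! shut-down? true)
         (let [pending (vec queue)] (.clear queue) pending))
       (isShutdown [] @shut-down?)
       (isTerminated [] (and @shut-down? (.isEmpty queue)))
       (awaitTermination [_timeout _unit] (.isEmpty queue)))}))

(defn run-one!
  "Lets the facade run exactly one captured work item, if there is one.
  Returns true if something ran."
  [{:keys [^ArrayDeque queue]}]
  (if-let [^Runnable task (.poll queue)]
    (do (.run task) true)
    false))

(let [facade (facade-executor)
      ^ExecutorService ex (:executor facade)
      fut (.submit ex ^Callable (fn [] 42))]
  [(.isDone fut)     ; => false, the task was only captured
   (run-one! facade) ; => true
   (.get fut)])      ; => 42
```

Because submit wraps the Callable in a FutureTask before handing it to execute, callers still get an ordinary Future; it simply isn’t completed until the simulation controller decides to run that item.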

A meaningful cost of this approach is that your application can no longer be defined in terms of “naive” concurrency / parallelism constructs, since those constructs don’t work in simulations. For instance, if you try to do a blocking wait on a response from another actor, it will never complete, because you’re blocking the only thread in your system! Any part of your application that will be simulated must start to follow the cooperative multitasking model; chances are good that this will eventually spread to all parts of your application. 
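The cooperative alternative to a blocking wait is to pass a continuation. In this sketch, plain queues of zero-arg fns stand in for facade executors and every name is hypothetical: the client asks the server for a result and supplies a callback, which the server schedules back onto the client’s queue instead of the client blocking on a reply.

```clojure
(import '[java.util ArrayDeque])

;; Each actor's pending work: a queue of zero-arg fns. Everything below runs
;; on a single thread.
(def mailboxes {:client (ArrayDeque.) :server (ArrayDeque.)})

(defn send! [actor task]
  (.add ^ArrayDeque (get mailboxes actor) task))

(defn request-square
  "Asks the server to square n. Instead of blocking for the answer, the caller
  passes on-reply, which the server schedules back on the client's mailbox."
  [n on-reply]
  (send! :server
         (fn []
           (let [result (* n n)]
             (send! :client (fn [] (on-reply result)))))))

(defn run-until-idle!
  "Runs one pending task at a time, from any actor, until nothing is left."
  []
  (loop []
    (when-let [q (some #(when-not (.isEmpty ^ArrayDeque %) %) (vals mailboxes))]
      ((.poll ^ArrayDeque q))
      (recur))))

(def answer (promise))
(send! :client (fn [] (request-square 7 #(deliver answer %))))
(run-until-idle!)
@answer ;=> 49

;; A blocking version, where the client task derefs a promise that only the
;; server task can deliver, would hang: with one thread, the server task
;; can't run until the client task returns.
```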

Refactoring your system as a whole to work in these terms can be painful and expensive, especially while you’re getting used to it. But after you’ve gotten over the hump, you will have an equivalent system that’s better in a lot of ways. The biggest advantage of this approach is that you always run the same code, whether you’re in production or in simulation — there’s no need to vary the behavior of your components to specifically accommodate simulation. This minimizes the chances of your mocks diverging from their real-world counterparts, which is a major risk otherwise.

For Red Planet Labs, the shift to universal cooperative multitasking meant that our system as a whole is more reactive, and we don’t rely on blocking behavior except when interacting with 3rd-party libraries we don’t control. There are a lot of factors that go into it, but on the whole, our simulations tend to be meaningfully faster than their unsimulated counterparts, even when the unsimulated version gets to use multiple CPU cores!

Deterministic Behavior

We’ve removed genuine parallelism’s unpredictability and made our actors’ work streams transparent with quantized execution. The final ingredient needed to make simulations into a productivity powerhouse is determinism.

Testing of distributed systems is often burdened with two divergent requirements: the ability to reproduce the same result consistently, and the ability to vary or explore possible orderings to produce novel failures. It turns out that it’s pretty easy to get both requirements out of simulations.

At every step of execution, our simulation must select an ExecutorService and let it complete one work item. There are a lot of ways the selection could be implemented, but the scheme that covers the broadest set of needs with the least complexity is using a seeded random number generator to choose an actor randomly. 
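A sketch of that selection loop, assuming each actor’s pending work is an ArrayDeque of zero-arg fns standing in for its facade executor. Candidates are sorted before the random pick so the choice depends only on the seed, never on map iteration order. The step!, run-simulation! and make-queues names are hypothetical.

```clojure
(import '[java.util ArrayDeque Random])

(defn step!
  "Picks one actor that has pending work, using rng, and runs exactly one of
  its work items. Returns the chosen actor id, or nil if all queues are empty."
  [^Random rng queues]
  ;; Sorting makes the candidate list independent of map iteration order.
  (let [ready (vec (for [[id ^ArrayDeque q] (sort-by key queues)
                         :when (not (.isEmpty q))]
                     id))]
    (when (seq ready)
      (let [id (nth ready (.nextInt rng (count ready)))]
        ((.poll ^ArrayDeque (get queues id)))
        id))))

(defn run-simulation!
  "Steps until every queue is drained; returns the order in which actors ran."
  [seed queues]
  (let [rng (Random. (long seed))]
    (loop [trace []]
      (if-let [id (step! rng queues)]
        (recur (conj trace id))
        trace))))

(defn make-queues
  "Hypothetical setup: n no-op work items for each actor id."
  [actor-ids n]
  (into {} (for [id actor-ids]
             [id (ArrayDeque. ^java.util.Collection (repeat n (fn [] nil)))])))

(run-simulation! 42 (make-queues [:a :b :c] 3))
```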

By making selection random, you avoid accidentally creating an ordering that is more orderly than what would happen naturally in the real world. If you run the same simulation many times, you can expect execution order to vary, iteratively exploring some fraction of all theoretically possible orderings. But by using a seeded random number generator as the source of that random selection, that variability instantly becomes repeatable. 

To rerun some “interesting” ordering, all you need is the random seed that generated that sequence of selections. When you specify that seed again directly, the properties of PRNGs kick in to give you the same sequence of selections. As long as all the other inputs to the simulation are also the same, then you will repeat the interesting ordering exactly.
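The reproducibility half comes almost for free. java.util.Random’s algorithm is specified in its Java documentation, so a given seed yields the same sequence on any JVM. The hypothetical selections function below shows the picks a scheduler would make, assuming for simplicity that every actor is always ready.

```clojure
(import '[java.util Random])

(defn selections
  "The actor indices a seeded scheduler would pick over n-steps steps,
  assuming (for simplicity) that all n-actors are always ready."
  [seed n-actors n-steps]
  (let [rng (Random. (long seed))]
    (vec (repeatedly n-steps #(.nextInt rng (int n-actors))))))

;; Same seed, same sequence of picks, on any run and any JVM.
(= (selections 1234 5 20) (selections 1234 5 20)) ;=> true
```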

It’s worth really hammering on that last point: simulations are only repeatable if all the behavior in them is deterministic. Any source of non-determinism could thwart your efforts, and it’s easy to be sabotaged by very subtle mistakes. If you ever generate a UUID, use random numbers, implicitly rely on hashmap ordering, or refer to system time, your simulation can produce different results even with the same random seed. 
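One way to close off those sources of non-determinism is to route them through an injected environment. This sketch assumes a hypothetical map of service fns: under simulation, UUIDs and random numbers come from the simulation seed and time comes from a virtual clock, while a production environment binds the same keys to UUID/randomUUID and System/currentTimeMillis.

```clojure
(import '[java.util Random UUID])

(defn make-sim-env
  "Hypothetical simulation environment: every 'random' value derives from one
  seed, and time only moves when the simulation advances it."
  [seed]
  (let [rng (Random. (long seed))
        clock (atom 0)]
    ;; Not RFC 4122 v4 UUIDs (version bits are not set), but unique enough
    ;; for a test and fully determined by the seed.
    {:random-uuid (fn [] (UUID. (.nextLong rng) (.nextLong rng)))
     :rand-int (fn [n] (.nextInt rng (int n)))
     :now-millis (fn [] @clock)
     :advance-clock! (fn [ms] (swap! clock + ms))}))

(defn production-env []
  {:random-uuid (fn [] (UUID/randomUUID))
   :rand-int (fn [n] (rand-int n))
   :now-millis (fn [] (System/currentTimeMillis))})

;; Components only ever call ((:random-uuid env)), ((:now-millis env)), etc.
;; Where iteration order affects behavior, iterate over (sort (keys m))
;; rather than relying on a hash map's order.
```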

Non-deterministic simulations are very frustrating, since the non-determinism is often hard to detect in the first place and tedious to diagnose and fix once detected. We expect to do more work on our tooling in this area over time; we’ll be sure to share any interesting takeaways.

Beyond Simulation

Even just the basic capabilities of simulation are incredibly impactful when it comes to testing a distributed system. But it turns out that there are all sorts of things you can do on top of simulation that make it even more useful. A system that is compatible with simulation has gone to lengths to expose the seams between many sub-components; you can exploit those seams to create conditions in tests that would otherwise be very difficult to replicate. For instance:

  • Preventing a particular actor from being selected for execution simulates a garbage collection pause
  • Stopping all the executors associated with a logical “process” simulates process death
  • Adding delay to the execution of network IO tasks simulates network latency
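Each of those faults amounts to a rule in the scheduler. In this sketch the whole simulated world is an immutable map (a hypothetical data model): paused actors are never selected, actors whose process is dead are never selected, and a task with a :ready-at time stays invisible until the virtual clock reaches it, which stands in for network latency.

```clojure
(import '[java.util Random])

(defn eligible-actors
  "Actors the scheduler may pick right now, in a stable (sorted) order."
  [{:keys [now queues paused dead process-of]}]
  (->> (sort (keys queues))
       (remove paused)                                  ; simulated GC pause
       (remove #(contains? dead (process-of %)))        ; simulated process death
       (filter #(when-let [task (first (get queues %))] ; simulated latency: a task
                  (<= (:ready-at task 0) now)))))       ; is invisible until ready-at

(defn step
  "Runs the head task of one eligible actor chosen by rng. If nothing is
  eligible, advances the virtual clock by one tick instead."
  [world ^Random rng]
  (let [eligible (vec (eligible-actors world))]
    (if (empty? eligible)
      (update world :now inc)
      (let [actor (nth eligible (.nextInt rng (count eligible)))
            task (first (get-in world [:queues actor]))]
        (-> world
            (update-in [:queues actor] subvec 1)
            (update :log conj [actor (:label task)]))))))

(def world
  {:now 0 :log [] :paused #{:b} :dead #{} :process-of {:a :p1 :b :p2}
   :queues {:a [{:label :a1} {:label :a2 :ready-at 5}] ; a2 is "in flight" until t=5
            :b [{:label :b1}]}})                        ; :b is paused

(let [rng (Random. 7)]
  (:log (reduce (fn [w _] (step w rng)) world (range 10))))
;=> [[:a :a1] [:a :a2]]
```

Killing a process is then just (update world :dead conj :p1).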

And many more! RPL has only scratched the surface in this area, but we expect to exploit this avenue a ton as we seek to test our system under ever more adverse conditions.

Serializing and Deserializing Clojure Fns with Nippy

We use Nippy at Red Planet Labs whenever we need general purpose serialization / deserialization of Clojure objects. It’s easy to use, fast, and covers virtually everything you could ever want to serialize in a Clojure program.

One notable exception is Clojure fns. There are several places in the distributed computing application we’re developing where storing or transmitting a serialized fn is really the best option. It’s worth talking about what we mean when we say serialized fn. We aren’t looking to serialize and distribute code — we already have a good mechanism for that. (It’s called a jar.) Instead, what we want to do is transmit fn instances between processes as a way to capture state and intention. Clojure already provides the primitives needed to treat a fn declared in a lexical context as an object; we just want to extend that so it works across process boundaries.

To be clear, this isn’t exactly a deficiency in Nippy. There are a lot of good reasons why you should never do this! Serializing a fn instance at one point in time and deserializing it again later, possibly in another process or even on another machine, leaves a lot of opportunities for mistakes. Suffice it to say that your use cases should make it easy to ensure that fn implementations (the actual code) are always available when and where you deserialize an instance, and that they remain consistent across time. In our case, these characteristics are relatively easy to guarantee, so we can use this strategy safely.

One of the best things about Nippy is how easy it is to extend via the extend-freeze and extend-thaw APIs. Using this extension mechanism, we added transparent support for serializing and deserializing fn instances. Once you require the com.rpl.nippy-serializable-fn namespace, nippy/freeze and nippy/thaw will handle fn instances, either at the top level or nested within other objects, without any additional syntax:

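(ns example
  (:require [taoensso.nippy :as nippy]
            ;; loading this namespace registers the fn freeze/thaw extensions
            [com.rpl.nippy-serializable-fn]))

(defn my-fn [a b c]
  (+ a b c))

;; A declared fn round-trips back to a callable fn.
(def thawed-myfn
  (nippy/thaw (nippy/freeze my-fn)))
(thawed-myfn 1 2 3) ; 6

;; An anonymous fn round-trips together with its closure (x = 10).
(let [x 10
      afn (fn [a b c] (+ a b c x))
      fn-bytes (nippy/freeze afn)
      thawed-fn (nippy/thaw fn-bytes)]
  (thawed-fn 1 2 3) ; 16
  )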

And that’s it! In our experiments, performance of freezing and thawing fns was comparable to freezing and thawing Clojure Records with similar amounts of “stuff” in them. You can find this extension on the Red Planet Labs GitHub, or add it as a dependency via Clojars.
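For comparison, this is roughly what Nippy’s extension macros look like for an ordinary custom type. It’s a sketch assuming a hypothetical Point type and a made-up :example/point type id, and it needs Nippy on the classpath; the fn support described here is built on the same extension mechanism but is considerably more involved.

```clojure
(ns example.point
  (:require [taoensso.nippy :as nippy]))

(deftype Point [^long x ^long y])

;; Teach Nippy how to write a Point. The keyword is a custom type id that must
;; be unique and must match the one used for thawing.
(nippy/extend-freeze Point :example/point
  [p out]
  (.writeLong ^java.io.DataOutput out (.-x ^Point p))
  (.writeLong ^java.io.DataOutput out (.-y ^Point p)))

;; ...and how to read one back, in the same field order.
(nippy/extend-thaw :example/point
  [in]
  (Point. (.readLong ^java.io.DataInput in) (.readLong ^java.io.DataInput in)))

(let [^Point p (nippy/thaw (nippy/freeze (Point. 3 4)))]
  [(.-x p) (.-y p)]) ;=> [3 4]
```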

A look under the hood

From the programmer perspective, there are (broadly) two “kinds” of fns in Clojure: declared fns (via defn) and anonymous fns (via (fn [] …)). But from an implementation perspective, what we really care about are two different factors: whether the fn has anything in its closure, and whether it has associated metadata. At the highest level, to serialize a fn, we need to capture its “type”, anything in its closure, and any associated metadata. If we have all those things serialized in the byte stream, then we can reconstruct that fn instance at a later point in time.

Let’s take a quick side track into how Clojure actually implements fns. At compile time, Clojure generates a Java class for each fn. The most obvious part of this class is the invoke method, but there’s also a constructor, and possibly some fields (more on these later). When you interact with a fn instance in regular Clojure code, you are interacting with an instance of this generated Java class. When you invoke a Clojure fn instance, you are literally calling the invoke method with the provided arguments.
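You can see this from the REPL. In this sketch, the my-fn from the earlier example is an instance of a generated class named after its namespace and var (with Clojure’s usual munging of - to _), and invoking it is an ordinary call to that class’s invoke method.

```clojure
(ns example)

(defn my-fn [a b c]
  (+ a b c))

;; The fn is an instance of a class the compiler generated for it.
(.getName (class my-fn))                 ;=> "example$my_fn"
(instance? clojure.lang.AFunction my-fn) ;=> true

;; Invoking the fn is a plain call to the generated invoke method.
(.invoke ^clojure.lang.IFn my-fn 1 2 3)  ;=> 6
```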

When a fn instance has nothing in its closure, then it’s really “static” in the Java sense of the term. Invocation relies only on arguments to the invoke method, not on any additional context. Whether you’re dealing with a declared fn or an anonymous fn, all you need to serialize in order to capture the essence of this kind of fn is the generated Java class name. It can be deserialized just by reading the name out of the byte stream and instantiating the fn by class name. We go a little further here and make sure that when you deserialize a declared fn, you actually get the singleton instance stored in the named global var.
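A minimal sketch of the empty-closure case, with hypothetical names throughout (it is not the library’s code). Freezing records only the class name; thawing first looks for a var in the fn’s namespace that holds an instance of that class (the declared-fn case) and otherwise instantiates the class through its no-arg constructor. The namespace lookup only reverses the - to _ munging, which is a simplification.

```clojure
(ns example
  (:require [clojure.string :as str]))

(defn my-fn [a b c]
  (+ a b c))

(defn freeze-static-fn
  "Hypothetical: a fn with an empty closure is fully described by the name of
  its generated class."
  [f]
  (.getName (class f)))

(defn- var-holding
  "Looks in the fn's namespace for a var whose value is an instance of the
  named class. Simplification: un-munges only '_' back to '-' in the ns name."
  [^String class-name]
  (let [ns-str (str/replace (subs class-name 0 (.indexOf class-name "$")) "_" "-")]
    (when-let [n (find-ns (symbol ns-str))]
      (some (fn [v] (when (and (bound? v) (= class-name (some-> @v class .getName))) v))
            (vals (ns-interns n))))))

(defn thaw-static-fn
  [^String class-name]
  (if-let [v (var-holding class-name)]
    @v ; declared fn: hand back the singleton already stored in its var
    ;; anonymous fn with no closure: the generated class has a public no-arg constructor
    (let [^Class klass (clojure.lang.RT/classForName class-name)]
      (.newInstance (.getConstructor klass (make-array Class 0)) (object-array 0)))))

(identical? my-fn (thaw-static-fn (freeze-static-fn my-fn))) ;=> true
```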

What about when you have a fn with a lexical closure? This is where the generated constructor and fields come into play. The Clojure compiler determines the referenced locals at compile time, and then adds a constructor argument and a corresponding protected field for each one. When the resulting fn is instantiated at runtime, the constructor is called with the values of those locals, capturing them for future invocation. The fn’s invoke method implementation refers to both the invoking arguments and the captured constructor arguments, providing all the context needed to complete the computation.
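Those generated fields and the constructor are visible through plain Java Reflection. This sketch, with hypothetical helper names, lists a closure’s fields, reads their values, and builds a new instance of the same class with a different captured value. It assumes the single public constructor takes the closure values in the same order the fields are declared.

```clojure
(import '[java.lang.reflect Constructor Field Modifier])

(defn closure-fields
  "The instance fields the compiler generated to hold closed-over locals."
  [f]
  (->> (.getDeclaredFields (class f))
       (remove (fn [^Field fld] (Modifier/isStatic (.getModifiers fld))))
       vec))

(defn read-closure-reflectively
  "Reads the captured values with plain Java Reflection."
  [f]
  (mapv (fn [^Field fld] (.setAccessible fld true) (.get fld f))
        (closure-fields f)))

(defn rebuild
  "Creates a new instance of f's class from closure values. Assumes exactly one
  public constructor whose parameters line up with the values."
  [f values]
  (let [^Constructor ctor (first (.getConstructors (class f)))]
    (.newInstance ctor (object-array values))))

(def make-adder (fn [x] (fn [a b c] (+ a b c x))))

(let [add-10 (make-adder 10)]
  [(mapv #(.getName ^Field %) (closure-fields add-10)) ; => ["x"]
   (read-closure-reflectively add-10)                  ; => [10]
   ((rebuild add-10 [100]) 1 2 3)])                    ; => 106
```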

To serialize fns with a closure, we need to serialize the type along with all the values stored in the protected fields. Luckily, detecting fields used for storing closure values is easy and reliable, so all we need to do is get ahold of those values and recursively serialize them. The tricky part is that, naively, the only way to do this is via Java Reflection, which is a big performance no-no. The good news is that the JVM has continued to improve in this department, and now offers a much more sophisticated and high-performance API for doing “reflective” operations called MethodHandles. A full discussion of MethodHandles is outside the scope of this post, but the key detail is that a MethodHandle can perform nearly as well as regular method invocation or direct field access (especially when the JIT can treat the handle as a constant), yet it can be constructed from Java Reflection primitives. We use this combination of APIs to dynamically analyze a fn instance’s closure fields and then generate an efficient serializer function, so the Java Reflection costs are only paid once per unique fn type encountered.
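A sketch of the pay-reflection-once structure, using MethodHandles.Lookup’s unreflectGetter and a per-class cache; the cache and helper names are hypothetical. One caveat: from Clojure it’s simplest to call handles through invokeWithArguments, which is the generic and slower entry point, so this illustrates the caching shape rather than the full performance benefit.

```clojure
(import '[java.lang.invoke MethodHandle MethodHandles]
        '[java.lang.reflect Field Modifier])

;; Hypothetical cache: fn class -> vector of getter MethodHandles.
(def ^:private getter-cache (atom {}))

(defn- build-getters
  "The one-time reflective step for a fn class: find its closure fields and
  turn each into a MethodHandle that reads that field."
  [^Class klass]
  (let [lookup (MethodHandles/lookup)]
    (->> (.getDeclaredFields klass)
         (remove (fn [^Field fld] (Modifier/isStatic (.getModifiers fld))))
         (mapv (fn [^Field fld]
                 (.setAccessible fld true)
                 (.unreflectGetter lookup fld))))))

(defn closure-values
  "Reads a fn's closure values through cached getters. Reflection only happens
  the first time a given fn class is seen."
  [f]
  (let [klass (class f)
        getters (or (get @getter-cache klass)
                    (let [built (build-getters klass)]
                      (swap! getter-cache assoc klass built)
                      built))]
    ;; invokeWithArguments is the generic, boxing entry point into a handle.
    (mapv (fn [^MethodHandle mh] (.invokeWithArguments mh (object-array [f])))
          getters)))
```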

The other top-level case we mentioned above is fns with metadata. This is something most Clojure programmers encounter primarily through protocol fns. The implementation of these fns is different yet again: all fns with metadata are instances of a special wrapper class “clojure.lang.AFunction$1”, which does nothing except hold a metadata map alongside the actual fn instance and dispatch invocations down to the actual fn. When we try to freeze a fn like this, all we have to do is serialize the metadata and the unwrapped fn in sequence using Nippy and we’re done!
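A sketch of unwrapping such a fn. It leans on an implementation detail that isn’t part of any public API: clojure.lang.AFunction$1 is an anonymous inner class, and as javac compiles it, the wrapped fn lives in the synthetic outer-instance field this$0. The unwrap-meta-fn and rewrap names are hypothetical.

```clojure
(import '[java.lang.reflect Field])

(defn unwrap-meta-fn
  "Splits a fn produced by with-meta into [metadata inner-fn]. Assumes the
  wrapper is clojure.lang.AFunction$1 and that the wrapped fn sits in javac's
  synthetic outer-instance field this$0."
  [f]
  (let [^Field fld (.getDeclaredField (class f) "this$0")]
    (.setAccessible fld true)
    [(meta f) (.get fld f)]))

(defn rewrap
  "The thaw side: re-attach the metadata to the (separately thawed) inner fn."
  [[m inner]]
  (with-meta inner m))

(let [plain (fn [x] (inc x))
      wrapped (with-meta plain {:doc "increments"})]
  [(.getName (class wrapped))                           ; => "clojure.lang.AFunction$1"
   (identical? plain (second (unwrap-meta-fn wrapped))) ; => true
   ((rewrap (unwrap-meta-fn wrapped)) 41)])             ; => 42
```

Freezing would then write the metadata map and the inner fn with Nippy, and thawing would hand both back to rewrap.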