How Rama is tested: a primer on testing distributed systems

There are a number of properties that are non-negotiable in any software system, such as no data loss, no stalling, and timely recovery from faults. These properties are particularly difficult to achieve in a distributed system, as the vast number of permutations of timings, event orderings, and faults makes reasoning about a distributed system extremely challenging.

If we’ve learned one thing developing Rama, it’s this: a distributed system is only as robust as the quality and thoroughness of its tests. Moreover, there are certain testing techniques which must be used if you’re going to have any hope of building a robust system resilient to the crazy edge cases that inevitably happen in production. If a distributed system isn’t tested with these techniques, you shouldn’t trust it.

Rama, being such a general-purpose system capable of handling all the computation and storage needs for entire backends at scale, has a particularly large testing surface. First, there’s the core functionality of Rama to test: distributed logs (depots), ETL topologies (streaming and microbatching), indexes (PStates), and queries. Then there are the “module transition” operations for updating or scaling running Rama applications. Finally, there’s replication. Replication needs to be tested both when healthy, with all replicas keeping up, and when replicas fall behind and need to catch up. Compounding the testing surface is that all of these need to be tested under the conditions of any number of faults happening at any time, such as disk errors, nodes losing power, and network partitions.

Testing is largely a sampling problem. Each sample exercises the system at a particular state, with input data of some size and shape, at some amount of load, and with some set of faults at some frequency. A testing strategy needs to sample this input space in a representative way. In a highly concurrent distributed system, where there are so many ways that events can be randomly ordered across different threads, achieving a representative sample is difficult. And if something isn’t tested, it’s either broken or will be broken in the near future.

Another huge issue in testing is reproducibility. A test that’s hard to reproduce is very expensive, or possibly even impossible, to debug. Test failures in concurrent systems are frequently difficult to reproduce due to the inherent randomness of their execution. As you’ll see shortly, this is another major focus of our testing strategy for Rama.

To fully cover the testing surface of Rama, we use multiple strategies. The first line of defense is our unit tests, which we run twice every hour as well as with every commit. In addition to that, we do automated chaos testing 24/7 on multiple distributed clusters. The last major piece is load testing, where we run large clusters for long periods of time with large amounts of data. There’s a lot of detail to how we approach each of these, which I’ll cover in the subsequent sections.

The straightforward but wrong way to unit test

The straightforward way to unit test is to run the system much like it would operate in production, with many threads and lots of concurrency. In this context, instead of running the system across many processes and many nodes, you instead simulate the various processes comprising the system all within the same testing process. Intuitively, running a system as close as possible to how it operates in production seems like a good way to test. However, this approach has massive flaws that make it a poor basis for unit testing a distributed system.

In Rama we implemented this approach in a testing tool called “in-process cluster” (IPC). For basic things, like verifying how PStates are updated in a module, this approach works fine. Failures in tests like these are fairly easy to track down. However, when testing more complex scenarios like leadership switches during processing, random worker deaths, or far-horizon catchup, debugging can get extremely difficult.

When developing tests for scenarios like those using IPC, we frequently ran into situations where we would get a test failure that would only reproduce once in a hundred test runs. Sometimes it would be even rarer, like once in a thousand test runs. To debug the test, we would add logging and then run the test over and over until it reproduced. Unfortunately, in IPC the very act of adding logging changes the timings of execution and can make reproduction less likely. It may make reproduction so unlikely that it’s basically impossible to reproduce. So then you have to get very creative to find ways to get more information without changing the timing so much that it no longer reproduces. This cycle of adding logging and reproducing is very time-consuming. Sometimes a single bug would take us weeks to figure out.

The expense of debugging isn’t the worst issue of IPC though. The worst issue is how difficult it is to thoroughly explore the testing space. The vast majority of issues that we’ve debugged in Rama have had to do with ordering of events. Many bugs can be triggered by one particular thread getting randomly stalled for an unusual amount of time (e.g. from GC). Other bugs can come from rare orderings of events on a single thread. Exploring these scenarios in IPC requires using semaphores to try to coordinate the threads in question into these unusual permutations, or just hoping that random timings during execution sometimes explore these orderings. Exploring internal timeouts and the consequences of those requires even more creativity.

Fortunately, we found a better way.

Deterministic simulation

Inspired by what the FoundationDB team wrote about simulation testing, we invested heavily into implementing “deterministic simulation” for Rama. Deterministic simulation is counterintuitive, but it is by far the best way to unit test a distributed system.

The idea of deterministic simulation is to run the whole system on a single thread. What would normally be its own thread in production is instead an executor explicitly managed by a central controller. The controller runs in a loop choosing which executor gets to run an event each iteration. A random number generator is used to choose executors, and a test is fully reproducible by just using the same seed for the RNG. Tests are easily debugged since adding logging does not change the reproducibility of the test like it frequently does for IPC.
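To make the idea concrete, here’s a minimal sketch of what such a controller loop could look like. The names here (poll-event!, run-event!) are hypothetical, and this is not Rama’s actual implementation:

;; Hypothetical sketch of a deterministic simulation controller loop.
;; `executors` is a vector of simulated executors, each holding a queue of
;; pending events. poll-event! and run-event! are made-up helpers.
(defn run-simulation! [executors seed iterations]
  (let [rng (java.util.Random. seed)]
    (dotimes [_ iterations]
      (let [executor (nth executors (.nextInt rng (count executors)))]
        ;; run at most one pending event on the randomly chosen executor
        (when-let [event (poll-event! executor)]
          (run-event! executor event))))))

;; Running with the same seed always produces the same ordering of events:
;; (run-simulation! executors 42 100000)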

Deterministic simulation removes all concurrency from execution of Rama during tests. This seems like it would be a bad thing by making the unit test environment fundamentally different from production. However, since our experience is that the vast majority of issues have to do with event ordering and timing, the exact opposite is true. Deterministic simulation is incredible – almost magical – for diagnosing and debugging these issues. Deterministic simulation isn’t sufficient as a complete testing strategy, as you still need tests that exercise potential concurrency issues, but it is overwhelmingly better for most tests.

Implementing deterministic simulation required us to refactor the Rama implementation to be able to run in either “production mode” using many threads, or “simulation mode” using a single thread. We also needed to abstract the notion of time. In simulation, all entities interact with a time source that’s managed by the central controller of simulation. Usually, we advance time by a small amount per iteration, but sometimes we use other strategies. For example, tests specifically of timeouts usually disable automatic time advancement and advance time explicitly. They may advance time to one millisecond before the timeout, verify nothing happens, and then verify the timeout behavior is triggered when advancing by one additional millisecond.

Here’s pseudocode showing what a simulation test looks like:

(with-simulated-cluster [cluster]
  (let [module (get-module-definition-for-test)]
    (cluster-launch-module!
      cluster
      module
      {:tasks   4
       :threads 2
       :workers 2})

    (execute-until! cluster
      (= [RUNNING] (module-state cluster)))

    (execute-until! cluster
      (with-context (task-context cluster module 0)
        (some-predicate-using-task-local-data?)))

    (with-simulation-options {:time-strategy nil}
      (execute-until! cluster
        (another-predicate?)))

    (is (yet-another-predicate?))

    (advance-time! cluster 10000 :millis)

    (with-simulation-options {:time-strategy nil}
      (execute-until! cluster
        (different-predicate?)))
    ))

In a simulation test, nothing happens unless you explicitly tell it to execute. So in between any of those execute-until! calls, the simulation is frozen. While frozen, we can peer into the state of any of the executors in the simulation and assert on it. Those execute-until! calls run the simulation until the precise event that causes a condition to become true. We can then assert on the entire state of the system at that point, across what in production would be many threads across many processes. From the :time-strategy option and the usage of advance-time!, you can see how we’re able to explicitly control time in the simulation.

Besides reproducibility, deterministic simulation hugely improves our ability to explore the testing space of Rama. We are no longer beholden to random timings or require burdensome coordination with semaphores to explore unusual event orderings. We can suspend a particular executor to simulate a GC pause. We can use a non-uniform distribution to prefer certain executors over others. We can insert faults or other disturbances at extremely precise moments.

Each run of a simulated test is a sample of the testing space, so it’s critical to continually run all tests all the time. Each seed explores a different permutation of events. We run our full build twice an hour as well as with each commit, so we get at least 48 samples of each test every day.

Uncoordinated simulation tests

Many of our simulation tests check specific behaviors, like “when a microbatch fails, the work is retried from the previous commit”. A different category of simulation tests are what we call “uncoordinated simulation tests”.

In an uncoordinated simulation test, we create many entities that interact with a Rama cluster with one or more modules deployed on it. Some of those entities may be appending new data. Others may be querying PStates. Others may be performing module operations like module update or scaling. Then there may be some antagonistic entities disturbing the cluster, like creating random disk faults, killing workers at random, or suspending executors at random. Each entity has a rate at which they perform these actions, such as “one depot append every 100 milliseconds of simulated time”. Most importantly, the entities are not coordinated with one another and take actions independently. They’re registered as executors in the simulation and are given opportunity to run events when chosen by the central controller. In tests like these, we verify high-level properties like no data loss, no stalling, data being processed in the correct order, and all replicas eventually catching up to become part of the ISR.
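Sketched in the same pseudocode style as the earlier simulation test, an uncoordinated simulation test might look something like this. The entity-registration and invariant-checking functions here are hypothetical:

(with-simulated-cluster [cluster]
  ;; hypothetical helpers registering independent entities as simulation executors
  (simulate-entity! cluster {:action :depot-append       :every-millis 100})
  (simulate-entity! cluster {:action :pstate-query       :every-millis 250})
  (simulate-entity! cluster {:action :module-update      :every-millis 120000})
  (simulate-entity! cluster {:action :kill-random-worker :every-millis 60000})

  ;; run a large amount of simulated time, then check high-level invariants
  (execute-until! cluster (>= (simulated-time-millis cluster) 3600000))
  (is (no-data-loss? cluster))
  (is (nothing-stalled? cluster))
  (is (all-replicas-caught-up? cluster)))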

Uncoordinated simulation tests are particularly good at finding race conditions. The randomness and lack of coordination causes runs of the test to eventually explore all possible race conditions. And since the test is fully reproducible, we can easily track down the cause of any failures no matter how obscure the event ordering.

Chaos testing on real clusters

As mentioned, deterministic simulation is only part of our testing strategy. As powerful as it is, there are a few critical things it doesn’t cover:

  • True concurrency issues, like improper concurrent access to shared memory
  • Memory or other resource leaks
  • Running the cluster as separate processes on separate nodes

To cover these gaps, another critical piece is automated testing on a few distributed clusters that we run 24/7. We call these our “quality clusters”.

Each cluster has a dedicated process controlling it. This central process interacts with the cluster by appending new data, performing queries, and creating a variety of disturbances. The disturbances include random worker kills, full cluster restarts, network partitions, and suspending nodes to force them into far-horizon catchup. The central process performs high-level assertions about no data loss, cluster integrity, nothing being stalled, and nothing unexpected in any log file.

There’s tension between disturbing the cluster, which often involves killing/restarting processes, and also wanting to test long-lived processes to verify there are no memory leaks. So the central process chooses one task group and any workers containing replicas of that task group to be protected for the lifetime of the cluster. Any routines that cause process death never pick those workers. This enables the cluster to test long-lived processes as well as the full range of disturbances.

We have three quality clusters, called “nightly”, “disturbances-monthly”, and “module-operations”. The “nightly” cluster is replaced every day with the latest Rama commit and is meant to catch recent regressions quickly.

The “disturbances-monthly” cluster is replaced every month with the latest commit. By the end of the run, if all went well, the longest-lived process will have been running for a month. This cluster deploys many modules exercising the full feature-set of Rama, and it also runs a special module called “LoadModule” which creates heavy load on all the other modules. “disturbances-monthly” performs the full range of disturbances. It does not do any module operations like module update or scaling, however, since this cluster tests long-lived processes and those routines replace every process for a module.

The “module-operations” cluster runs the same set of modules as “disturbances-monthly” and is dedicated to exercising module update and scaling. It also performs disturbances during these module operations to verify their fault-tolerance. A module operation should always either succeed completely or abort. An abort can be due to there being too much chaos on the cluster, like such frequent worker kills that the operation can’t go through in a reasonable amount of time. “module-operations” verifies these operations never stall under any conditions and that there’s never any data loss.

Issues surfaced by these clusters are much harder to debug as they are frequently difficult to reproduce. If the cause of an issue isn’t obvious, our first line of attack is to try to reproduce the issue in a simulation test. If it reproduces in simulation, then it’s easy to debug from there. This has proven to be effective for most issues surfaced, and then that simulation test serves as a regression test moving forward.

Load testing

Another type of testing we do is load testing. The goal of load testing is to test bigger clusters with bigger individual partitions than we achieve in our month-long quality clusters. When everything is bigger, things like timeouts and stall detection are better exercised.

Load testing is expensive, requiring us to run hundreds of nodes for long periods of time. So we currently do load tests as one-offs (e.g. ahead of a release) rather than continuously. For example, leading up to the launch of our Twitter-scale Mastodon instance in August, we did thorough load testing for long periods of time.

We’re currently working on implementing incremental backups for Rama. Once that’s done, we’re planning to use that in our quality clusters to start off each new cluster with the data that accumulated on the last cluster. This will enable our quality testing to operate with large individual partitions, which will enable continuous testing of that one aspect of load testing.

Conclusion

We take testing very seriously at Red Planet Labs. We spend more than 50% of our time on testing, and we believe any software team building infrastructure must do the same to be able to deliver a robust tool.

We’re constantly working on improving our test coverage, like by incorporating new kinds of faults in our simulations and quality clusters. An idea we had recently is “long-running simulation tests”, where we run simulations for days or weeks at a time. This testing strategy is interesting: though any failure will be reproducible, a failure that happens after two weeks of running will still take two weeks to reproduce. But possibly these simulations will be able to uncover issues that only occur after extended runtimes, and being able to reproduce a potentially obscure issue is still very valuable.

Software cannot be understood purely in the abstract. It requires empirical evidence to know how it behaves in the strenuous conditions it will face in real-world deployments. A major reason it took us 10 years to build Rama was going through that process of testing, iterating, and testing some more until we were confident Rama was ready for production use.

Introducing Rama’s Clojure API: build end-to-end scalable backends in 100x less code

Today we’ve released Rama’s Clojure API, including detailed reference documentation and API docs. Information about how to add it as a dependency in your projects is on this page.

Rama is a new programming platform for building high-performance scalable backends, integrating and generalizing data ingestion, processing, indexing, and querying. It’s so general-purpose that it can build entire backends with extremely diverse computation and storage needs on its own, without any of the impedance mismatches that have plagued backend development for decades. One way to think of Rama is as a “programmable datastore on steroids”, where you mold your datastore to fit your application rather than the other way around. It can build interactive, consumer-facing applications just as easily as it can build complex analytics applications.

In August we revealed Rama for the first time by building and operating a Twitter-scale Mastodon instance that’s 100x less code than Twitter wrote to build the equivalent. We ran the instance with 100M bots posting 3,500 times per second at 403 average fanout. In the blog post, we explored the implementation of this instance in depth and how it makes use of Rama’s unique capabilities. The code for that instance is open source.

This post is a self-contained introduction to Rama and its Clojure API. The Clojure API is actually the native API to Rama, and the Java API we released in August is a thin wrapper around it. I’ll start the exploration with a brief overview of the concepts of Rama, and then I’ll dive into the ins and outs of using the Clojure API.

Overview of Rama

Every backend must balance what information gets precomputed versus what gets computed on the fly at query time. As a developer, you must tackle four things when building a backend: how to receive new data, what indexes to create with what structures, how to process new data, and how to query your indexes to serve application requests.

A typical architecture might look like this:

  • An API server receives new data
  • The API server writes that data, potentially in aggregate form, into one or more databases (e.g. Postgres, ElasticSearch, Cassandra, etc.)
  • The API server handles application requests by querying one or more databases

Sometimes a queue system is inserted between the API server and the backend, with a separate system deployed (e.g. custom workers, Kafka Streams, Storm) to process that queue and update the databases. There are tons of variations of what combinations of tools are used to construct a backend, and there’s further variation in what tools are used to deploy and monitor these systems.

There are many performance, scalability, and fault-tolerance issues to tackle when building systems this way. But the most insidious problem is complexity. Just the sheer number of tools you have to operate creates huge integration and deployment complexity. Plus, each tool you use is narrow and only able to handle certain use cases. With databases, you frequently have to twist your domain model to fit the database’s data model, creating major impedance mismatches at the core of your backend.

Rama can build entire backends end-to-end, integrating and generalizing every aspect of what it takes to do so. Applications built with Rama are scalable, high-performance, and fault-tolerant. Rama has four concepts, corresponding to “data ingestion”, “data processing”, “indexing”, and “querying”: depots, ETL topologies, PStates, and query topologies.

Rama gives you total flexibility in designing what gets precomputed in your backend versus what gets computed at query time. At a high-level, this programming model is event sourcing with materialized views. When programming Rama, you materialize as many views as needed in whatever shapes are most optimal to serve your application’s use cases.

Let’s start with how indexes work in Rama, as this is a key part of how Rama is so flexible and reduces complexity so much. Indexes are called “partitioned states”, which we usually refer to as “PStates”. Unlike a database, which has a strict “data model” (e.g. relational, key/value, document, column-oriented, graph, etc.), PStates are specified in terms of data structures. Each “data model” is really just a specific combination of data structures – key/value is a map, document is a map of maps, column-oriented is a map of sorted maps, and so on. PStates let you express all those data models using the simpler primitive of data structures, and they can express infinitely more “data models” by combining data structures in other ways.

PStates are durable, distributed, and replicated. That is, they’re not in-memory structures, and each partition can be much larger than the memory on a machine. Even nested data structures can be larger than memory, and you can still read and write to them extremely efficiently.

Data comes into Rama through “depots”. A “depot” is a durable, distributed, and replicated log of data. Depots are similar to Apache Kafka except integrated with the rest of Rama.

"ETL"s, extract-transform-load topologies, process incoming data from depots as it arrives to materialize PStates. Most of the time spent programming Rama is spent making ETLs. As you’ll see shortly, the ETL API is extremely expressive. It’s a Turing-complete dataflow-based API that seamlessly distributes computation.

The last concept in Rama is “query”. As will be no surprise to Clojure programmers hearing the description of PStates as arbitrary combinations of data structures, Specter’s paths are the core API for querying them. Rama has an internal fork of Specter that adds powerful reactive capabilities to paths. In addition to this, you can also make “query topologies”. These are predefined, on-demand, realtime, distributed queries that can query and aggregate data across any number of PStates and any number of the partitions of those PStates. These are the analogue of “predefined queries” in traditional databases, except programmed with the same dataflow API as used to program ETLs and far more capable.

All these concepts are packaged together into a Rama application as a “module”. A module is an arbitrary collection of depots, ETLs, PStates, and query topologies. Modules are launched onto a Rama cluster, and they can later be updated with new code or scaled up/down.

For examples of different ways in which these concepts are combined towards extremely different use cases, you can read about our Mastodon implementation or check out the self-contained, thoroughly commented examples in the rama-demo-gallery project.

Basic example

Let’s now dive into the Clojure API. You can follow along at the REPL by cloning the rama-clojure-starter project and running lein repl .

We’ll start with a basic word count application, and then we’ll build an auction application with timed listings, bids, and notifications of winners and losers. As we go, I’ll explain the various pieces of the Clojure API.

First, let’s require the namespaces needed:

(use 'com.rpl.rama)
(use 'com.rpl.rama.path)
(require '[com.rpl.rama.ops :as ops])
(require '[com.rpl.rama.aggs :as aggs])
(require '[com.rpl.rama.test :as rtest])
(require '[clojure.string :as str])

com.rpl.rama.path is Rama’s internal fork of Specter, and for the most part it’s API-equivalent to the open-source version.

Next, let’s define the word count module:

(defmodule WordCountModule [setup topologies]
  (declare-depot setup *sentences-depot :random)
  (let [s (stream-topology topologies "word-count")]
    (declare-pstate s $$word-counts {String Long})
    (<<sources s
      (source> *sentences-depot :> *sentence)
      (str/split (str/lower-case *sentence) #" " :> *words)
      (ops/explode *words :> *word)
      (|hash *word)
      (+compound $$word-counts {*word (aggs/+count)})
      )))

This is concise, but there’s a lot to unpack here. defmodule defines a module as a regular Clojure function that takes in parameters setup and topologies . setup is used to declare depots and dependencies to depots, PStates, and query topologies in other modules, while topologies is used to declare ETL and query topologies.

This module defines one depot called *sentences-depot . Symbols beginning with * are variables in Rama dataflow code, and the declare-depot macro lets you declare a depot’s name as a symbol – just as it will be referred to later in dataflow code.

The last argument to declare-depot specifies the depot’s partitioning scheme. In this case :random is used, which causes an appended sentence to go to a random depot partition. In cases where you want events for the same entity to go to the same depot partition, so that they’re processed in the order in which they were created, you would use the hash-by partitioner. The declare-depot documentation goes over all the ways you can define depot partitioners.
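For instance, a depot whose appends are partitioned by a field of the appended records could be declared like this (the depot name here is just for illustration; the same form appears in the auction example later in this post):

(declare-depot setup *user-events-depot (hash-by :user-id))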

Next, the module declares a stream topology with the name "word-count" . There are two types of ETL topologies in Rama, streaming and microbatching, with different performance characteristics between them. In this case, using a stream topology means the PState will be fully updated within 1-2 milliseconds from appending a sentence. See the documentation on streaming and microbatching for more details.

Next, the PState $$word-counts is declared with the schema {String Long} . This means each partition of the PState stores a map with String keys and Long values. Each PState can have a completely different schema, and if you declare a subschema with “subindexing”, that nested data structure can efficiently contain more elements than fit into memory. Here are some more examples of schemas:

  • {String {clojure.lang.Keyword Long}}
  • {String {String #{Integer}}}
  • {Long (set-schema String {:subindex? true})}
  • Long
  • {Long (map-schema Long (set-schema Long {:subindex? true}) {:subindex? true})}

As you can see, you can have subindexed structures within subindexed structures, and the top-level schema doesn’t have to be a map. The Long schema specifies that each partition of the PState is a simple Long value. A PState like that is useful for ID generation, for example.

Critically, all PStates are durable on disk and replicated incrementally. This makes them suitable for any use case for which databases are currently used.

The last part of the module defines the ETL logic to process depot records and perform PState updates. The <<sources macro defines the ETL logic using Rama’s dataflow API. The dataflow API is different than regular Clojure programming. Whereas a Clojure function is based on “call and response” – you call a function and get a single result back – dataflow is “call and emit”. That is, you call an operation and it emits values to downstream code. Operations can emit one time, many times, or even zero times. They can also emit multiple fields per emit or emit to independent output streams. Dataflow operations also don’t have to emit synchronously – they can emit asynchronously on a completely different partition on a different machine.
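As a loose analogy in plain Clojure, you can think of a dataflow operation as a function that receives an emit callback rather than returning a value. This is only for intuition and is not how Rama actually compiles dataflow code:

;; Plain-Clojure analogy for "call and emit": an operation receives an
;; emit callback and may invoke it zero, one, or many times.
(defn explode-like [coll emit!]
  (doseq [x coll]
    (emit! x)))

(explode-like ["hello" "world"] println)
;; prints:
;; hello
;; world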

For now, let’s focus on this particular example. We’ll look more at the dataflow API and the new programming paradigm it expresses in the next section.

Within a <<sources block, each call to source> subscribes the topology to a depot. Here, the topology subscribes to *sentences-depot and binds new sentences to the variable *sentence . Within dataflow code, symbols beginning with * , % , and $$ are interpreted as variables, while other symbols resolve as normal Clojure values. * variables are values, % variables are anonymous operations, and $$ variables are PStates.

Processing of sentences happens in parallel across all partitions of subscribed depots. The next line, (str/split (str/lower-case *sentence) #" " :> *words) , executes regular Clojure functions to compute the list of words in each sentence by using a regex to split on whitespace. The :> keyword distinguishes the input from the output, and in this case the output is bound to the variable *words . As you can see here, you can nest expressions just like you can with regular Clojure. This code is equivalent to:

(str/lower-case *sentence :> *lowercase)
(str/split *lowercase #" " :> *words)

The next line, (ops/explode *words :> *word) , calls the built-in explode operation. explode emits each element of the provided list individually. So if *words contained ["hello" "world"] , the explode call would emit two times. The subsequent code runs for each emit.

The (|hash *word) call is a partitioner. Partitioners work just like any other operation by receiving input and emitting. The difference is they may emit on a completely different thread on a completely different machine. This call says to move the computation according to the hash of the value in *word . This causes the same word to always get processed by the same partition of the module while evenly distributing different words across all partitions.

Partitioners are a great example of how seamless it is to write distributed code with Rama. Because they’re based on the same “call and emit” paradigm as all other operations, code that’s moving around the cluster like this can be read linearly. And because they’re no different from other operations, they compose with other dataflow code just like any operation. As you’ll see in the next section, you can also express conditionals and loops with the dataflow API. So you can trivially do things like a looping computation with partitioners in the body that hops around the cluster with each iteration of the loop.

The last line of the ETL updates each word’s count in the $$word-counts PState. This write is expressed with “compound aggregation”, which specifies the write in the shape of the data structure being written to. In this case it aggregates a map with the key *word and the value updated with the aggs/+count aggregator. Aggregators automatically take care of initializing non-existent values. So the first time the word “hello” is written to the PState, it knows to start the aggregation at 0 instead of nil . The naming convention for aggregators is to prefix them with + . It’s not necessary, but we find this helps with readability.
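As a rough analogy in plain Clojure, the aggregation above behaves like updating a map with (fnil inc 0) for each word. This is only an analogy for the initialization behavior, not how Rama executes the write:

;; Plain-Clojure analogy for the +count aggregation: missing keys start at 0.
(reduce (fn [counts word] (update counts word (fnil inc 0)))
        {}
        ["hello" "world" "hello"])
;; => {"hello" 2, "world" 1}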

Reads and writes to PStates in an ETL operate on the partition of the PState that is colocated with the ETL event. That is, the PStates don’t exist on separate processes or nodes. This is what we mean when we say Rama colocates computation and storage.

Running the word count module

Rama has a facility called InProcessCluster (“IPC”) that simulates a Rama cluster in process. It works just like a real cluster and is an ideal environment for experimentation and unit testing. Let’s run WordCountModule in this environment.

First, let’s create the cluster:

(def ipc (rtest/create-ipc))

Next, let’s launch the module on it. “Tasks” are Rama’s name for a module’s partitions and refer to the fact that partitions of a module perform both computation and storage. Here we run four tasks across two threads:

(rtest/launch-module! ipc WordCountModule {:tasks 4 :threads 2})

Next, let’s fetch clients to the depot and PState of the module:

(def sentences-depot (foreign-depot ipc (get-module-name WordCountModule) "*sentences-depot"))
(def word-counts (foreign-pstate ipc (get-module-name WordCountModule) "$$word-counts"))

The term “foreign” refers to Rama objects that live outside of modules. Now, let’s append some data to the depot:

(foreign-append! sentences-depot "Hello world")
(foreign-append! sentences-depot "hello hello goodbye")
(foreign-append! sentences-depot "Alice says hello")

By default, depot appends block until all colocated stream topologies have finished processing the record. So at this point, we know the $$word-counts PState has been updated. Let’s check the word counts for “hello” and “goodbye”:

(foreign-select-one (keypath "hello") word-counts) ; => 4
(foreign-select-one (keypath "goodbye") word-counts) ; => 1

Queries on PState clients use paths to express the query. These examples are extremely simple since we’re just fetching the values in a map, but you’ll see more complicated queries in the auctions example later in this post.

Lastly, we can shut down the InProcessCluster like this:

(close! ipc)

That’s all there is to it. The way in which depot and PState clients are fetched and used with IPC is exactly the same as you would interact with a real cluster.

While these examples used Rama’s blocking API, it’s also important to note there are non-blocking variants of all foreign methods that return CompletableFuture objects. These include foreign-append-async!, foreign-select-one-async, and others.
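For example, here’s a sketch of the non-blocking flow, assuming the async variants take the same arguments as the blocking calls above and that the returned futures complete at the point the blocking calls would have returned:

;; Non-blocking depot append; returns a CompletableFuture.
(def append-future (foreign-append-async! sentences-depot "hello again"))

;; CompletableFuture implements java.util.concurrent.Future, so deref works.
@append-future

;; Non-blocking PState query
@(foreign-select-one-async (keypath "hello") word-counts)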

Exploring the dataflow API

Before building the auction application, let’s briefly explore Rama’s dataflow API. As described, operations in this API are based on “call and emit” rather than the “call and response” you’re used to from Clojure and most other languages.

You can explore the dataflow API from the REPL outside the context of modules. The only parts of Rama not available here are partitioners, since they don’t make sense in a single-threaded context. Let’s start by printing “Hello, world!”:

(use 'com.rpl.rama)

(?<-
  (println "Hello, world!"))

This prints:

Hello, world!

The ?<- macro compiles and executes a block of dataflow code. So far, this is identical to how you would write it in regular Clojure.

Let’s define a custom operation that emits multiple times:

(deframaop foo [*a]
  (:> (inc *a))
  (:> (dec *a)))

(?<-
  (foo 5 :> *v)
  (println *v))

This prints:

6
4

deframaop defines a Rama operation, and when :> is used as an operation it emits to the :> output of the caller. This is also referred to as “invoking the continuation”. So when foo is called, the subsequent code is run for each emit.

Here’s an example of using an operation to filter data:

(deframaop my-filter [*v]
  (<<if *v
    (:>)))

(?<-
  (ops/range> 0 5 :> *v)
  (my-filter (even? *v))
  (println *v))

range> is like Clojure’s range except it emits each value individually rather than returning a sequence. This prints:

0
2
4

my-filter uses a conditional to only emit when the value is true and is equivalent to the built-in operation filter>. <<if is the most common way to write conditional dataflow logic. Here’s another example of using <<if :

(?<-
  (<<if (= 1 2)
    (println "true branch 1")
    (println "true branch 2")
   (else>)
    (println "else branch 1")
    (println "else branch 2")))

This prints:

else branch 1
else branch 2

<<if is built upon the more primitive if> , which is a Rama operation that emits to the :then> and :else> output streams. Using that primitive, the previous code can be expressed like this:

(?<-
  (if> (= 1 2) :then> <then> :else>)
  (println "else branch 1")
  (println "else branch 2")
  (hook> <then>)
  (println "true branch 1")
  (println "true branch 2"))

This example is demonstrating a few new concepts. First, operations can emit to other output streams besides :> , in this case emitting to :then> and :else> . Second, dataflow code can branch, and you can explicitly manipulate the graph of computation. Symbols surrounded with < and > are called “anchors”, and they label a point in dataflow code. By default, dataflow code attaches to the previous code, but if you use hook> then you can change where the subsequent code attaches.

An interesting thing about if> is that it’s not a special form, unlike in Clojure and other programming languages. You can actually pass it around like so:

(deframaop bar [%f *v]
  (%f *v :then>)
  (:>))

(?<-
  (bar if> true)
  (println "A")
  (bar if> false)
  (println "B"))

This prints:

A

“B” does not print since %f emits to the :else> branch in that case, which has no code attached to it. Variables beginning with % are anonymous operations that can be invoked.

The general term for an operation in Rama is “fragment”. A fragment can be either a ramaop or ramafn . A ramafn is an operation that emits exactly one time to :> , and that’s the last thing it does (like a Clojure function). You can define a ramafn with Rama dataflow like so:

(deframafn myfn [*a *b]
  (:> (+ *a *b 10)))

(myfn 1 2)

This prints:

13

As you can see, a ramafn can be invoked from regular Clojure code as well as from Rama code. If a ramafn definition doesn’t emit or emits multiple times, you’ll get a runtime error. A ramafn executes more efficiently than a ramaop when the Rama compiler knows a callsite is invoking a ramafn rather than a ramaop .

Lastly, let’s take a look at a dataflow loop:

(?<-
  (loop<- [*v 0 :> *i]
    (println "Loop iter")
    (<<if (< *v 5)
      (:> *v)
      (continue> (inc *v))))
  (println "Emitted:" *i))

This prints:

Loop iter
Emitted: 0
Loop iter
Emitted: 1
Loop iter
Emitted: 2
Loop iter
Emitted: 3
Loop iter
Emitted: 4
Loop iter

Dataflow loops are similar to Clojure loops, but they can emit multiple times. The order of the prints also indicates how execution works: when emitting from the loop, the continuation of the loop is invoked immediately. The continue> call doesn’t happen until the continuation finishes executing.

This is only a taste of Rama dataflow, and there’s a lot more to explore. Fragments, being a generalization of functions, are a very potent concept, and much of Rama’s implementation is written in this language. Fragments and dataflow are excellent abstractions for writing parallel, asynchronous, and reactive code, which is why they’re the basis of ETLs and query topologies. It’s also worth noting that Rama dataflow compiles to very efficient bytecode. For more information about the dataflow API, check out this page from the documentation.

Building an auction application

Let’s take a look at a slightly larger example that showcases more of what Rama can do. We’ll build an auction application with timed listings, bids, and notifications of winners and losers. This application will utilize multiple PStates and demonstrate the advantages of colocating computation and storage.

First, let’s do the necessary requires:

(use 'com.rpl.rama)
(use 'com.rpl.rama.path)
(require '[com.rpl.rama.ops :as ops])
(require '[com.rpl.rama.aggs :as aggs])
(require '[com.rpl.rama.test :as rtest])
(import 'com.rpl.rama.helpers.ModuleUniqueIdPState)
(import 'com.rpl.rama.helpers.TopologyScheduler)

This example will utilize two small utilities from the open-source rama-helpers project, ModuleUniqueIdPState and TopologyScheduler . Even though those are written with the Java API, they can be used seamlessly from the Clojure API.

Let’s build up the module step by step, starting with making and viewing listings. Then we’ll add bids and notifications afterward.

Here’s the data type we’ll use to represent a listing:

(defrecord Listing [user-id post expiration-time-millis])

Next, let’s define the depot to receive new listings:

(defmodule AuctionModule [setup topologies]
  (declare-depot setup *listing-depot (hash-by :user-id))

The depot partitioner (hash-by :user-id) controls on which partition processing begins in the ETL. Since the PState for listings will be partitioned by user IDs, setting the depot partitioner this way means no additional partitioning is needed in the ETL logic. This is simpler and more efficient than using a :random depot partitioner like was done in the word count example.

Next, let’s define the topology and needed PStates:

(let [s (stream-topology topologies "auction")
      idgen (ModuleUniqueIdPState. "$$id")]
  (declare-pstate s $$user-listings {Long ; user-id
                                      (map-schema Long ; listing-id
                                                  String ; post
                                                  {:subindex? true})})
  (.declarePState idgen s)

The $$user-listings PState stores every listing made by a user in a submap. The submap is marked as subindexed, which tells Rama to index its elements individually. This allows the submap to be read and written to efficiently even if it grows to huge size (like larger than memory). Since a user can have an arbitrary number of listings, subindexing that map is appropriate.

ModuleUniqueIdPState is a small utility from rama-helpers for generating unique 64-bit IDs. It works by declaring a PState tracking a counter on each task and combining that counter with the task ID when generating an ID. The .declarePState call declares the PState it uses.
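As a rough sketch of the idea (not the actual rama-helpers implementation), a task-unique ID could pack the task ID into the high bits of a long and a per-task counter into the low bits:

;; Rough sketch only: combine a task ID and a per-task counter into one long.
;; The real ModuleUniqueIdPState persists its counter in the declared "$$id" PState.
(defn sketch-unique-id [task-id counter]
  (bit-or (bit-shift-left (long task-id) 42)
          (long counter)))

(sketch-unique-id 3 17) ;; => 13194139533329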

Lastly, let’s define the ETL that maintains the $$user-listings PState:

(<<sources s
  (source> *listing-depot :> {:keys [*user-id *post] :as *listing})
  (java-macro! (.genId idgen "*listing-id"))
  (local-transform> [(keypath *user-id *listing-id) (termval *post)]
                    $$user-listings)
  )))

java-macro! allows dataflow code generated by the Java API to be used directly in the Clojure API. In this case .genId binds a new variable *listing-id with a newly generated ID. Then, the topology simply writes the listing into the PState under the correct keys.

The complete module definition looks like this:

(defrecord Listing [user-id post expiration-time-millis])

(defmodule AuctionModule [setup topologies]
  (declare-depot setup *listing-depot (hash-by :user-id))

  (let [s (stream-topology topologies "auction")
        idgen (ModuleUniqueIdPState. "$$id")]
    (declare-pstate s $$user-listings {Long ; user ID
                                        (map-schema Long ; listing ID
                                                    String ; post
                                                    {:subindex? true})})
    (.declarePState idgen s)

    (<<sources s
      (source> *listing-depot :> {:keys [*user-id *post] :as *listing})
      (java-macro! (.genId idgen "*listing-id"))
      (local-transform> [(keypath *user-id *listing-id) (termval *post)]
                        $$user-listings)
      )))

Let’s run a quick test to verify it works:

(with-open [ipc (rtest/create-ipc)]
  (rtest/launch-module! ipc AuctionModule {:tasks 4 :threads 2})
  (let [module-name (get-module-name AuctionModule)
        listing-depot (foreign-depot ipc module-name "*listing-depot")
        user-listings (foreign-pstate ipc module-name "$$user-listings")]
    (foreign-append! listing-depot (->Listing 1 "Listing 1" 0))
    (println "Listings:" (foreign-select [(keypath 1) ALL] user-listings))
    ))

All this test code does is add one listing and then print what was added to the $$user-listings PState. This prints:

Listings: [[0 Listing 1]]

Adding bids

Now, let’s add bids to the module. In the next section we’ll finish the module by adding expirations and notifications. Adding bids will require two new records:

(defrecord Bid [bidder-id user-id listing-id amount])
(defrecord ListingPointer [user-id listing-id])

The first record represents a new bid on a listing, and the second record will be used in one of the new PStates.

Next, let’s define the depots for the module:

(defmodule AuctionModule [setup topologies]
  (declare-depot setup *listing-depot (hash-by :user-id))
  (declare-depot setup *bid-depot (hash-by :user-id))

*listing-depot is the same as before, and *bid-depot will receive Bid objects.

Next, let’s define the topology and its PStates:

(let [s (stream-topology topologies "auction")
      idgen (ModuleUniqueIdPState. "$$id")]
  (declare-pstate s $$user-listings {Long ; user ID
                                      (map-schema Long ; listing ID
                                                  String ; post
                                                  {:subindex? true})})
  (declare-pstate s $$listing-bidders {Long ; listing ID
                                        (set-schema Long ; user ID
                                                    {:subindex? true})})
  (declare-pstate s $$listing-top-bid {Long ; listing ID
                                        (fixed-keys-schema {:user-id Long
                                                            :amount Long})})
  (declare-pstate s $$user-bids {Long (map-schema ListingPointer
                                                  Long ; amount
                                                  {:subindex? true})})
  (.declarePState idgen s)

There are three new PStates here. $$listing-bidders tracks everyone who has bid on a listing. Besides being a useful view on its own, it will also be used later for delivering notifications. It’s a map from listing ID to a set of user IDs. $$listing-top-bid tracks who is currently the top bidder for each listing. It’s a map from listing ID to information about the top bidder.

The final PState $$user-bids tracks each bid made by a user. To understand the need for the ListingPointer type, let’s explore how these PStates are partitioned in this implementation.

Listings have their own ID, but their PStates will be partitioned by the user ID that made the listing. This keeps the bidder / “top bid” information for a listing colocated with its information in the $$user-listings PState. You don’t have to design the PStates this way, but keeping all information for the same entity on the same partition is generally a good idea since it speeds up queries that want to look at multiple PStates at the same time. So in this design, to look up information about a listing you need both the listing ID and the owning user ID. This is why the $$user-bids PState tracks a ListingPointer rather than just a listing ID.
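For illustration, here’s how a client could resolve a ListingPointer fetched from $$user-bids back to the listing’s post, using the foreign PState API shown earlier (pointer here is an ordinary ListingPointer value):

;; The owning user ID is both the top-level key and the partitioning key of
;; $$user-listings, so no :pkey option is needed for this query.
(let [{:keys [user-id listing-id]} pointer]
  (foreign-select-one (keypath user-id listing-id) user-listings))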

Next, let’s begin defining the ETL logic:

(<<sources s
  (source> *listing-depot :> {:keys [*user-id *post] :as *listing})
  (java-macro! (.genId idgen "*listing-id"))
  (local-transform> [(keypath *user-id *listing-id) (termval *post)]
                    $$user-listings)

Adding bids doesn’t change anything about processing listings, so this part is exactly the same as the previous section. Next, let’s add the logic to process bids:

(source> *bid-depot :> {:keys [*bidder-id *user-id *listing-id *amount]})
(local-transform> [(keypath *listing-id) NONE-ELEM (termval *bidder-id)]
                  $$listing-bidders)
(local-transform> [(keypath *listing-id)
                   (selected? :amount (nil->val 0) (pred< *amount))
                   (termval {:user-id *bidder-id :amount *amount})]
                  $$listing-top-bid)
(|hash *bidder-id)
(->ListingPointer *user-id *listing-id :> *pointer)
(local-transform> [(keypath *bidder-id *pointer) (termval *amount)] $$user-bids)
)))

This ETL code updates all the bid-related PStates. First, it adds the bidder’s user ID to the $$listing-bidders PState. Then, it updates the $$listing-top-bid PState by checking whether the new bid is greater than the previous top bid. This logic is expressed as part of the path to update the PState.

A critical property of Rama used here is that only one event runs on a module task at a time. So while this logic is executing, nothing else is running on this task: other bid events, foreign PState queries, or other events. So it’s impossible for multiple bids to update the $$listing-top-bid PState at the same time. The colocation of computation and storage in Rama gives you the atomicity and transactional properties needed for this use case.

The last piece of this ETL records the user’s bid in the $$user-bids PState. This PState is partitioned by the bidder’s user ID, so a |hash partition is done first to relocate the computation to the correct task.

Here’s the complete module definition:

(defrecord Listing [user-id post expiration-time-millis])
(defrecord Bid [bidder-id user-id listing-id amount])
(defrecord ListingPointer [user-id listing-id])

(defmodule AuctionModule [setup topologies]
  (declare-depot setup *listing-depot (hash-by :user-id))
  (declare-depot setup *bid-depot (hash-by :user-id))

  (let [s (stream-topology topologies "auction")
        idgen (ModuleUniqueIdPState. "$$id")]
    (declare-pstate s $$user-listings {Long ; user ID
                                        (map-schema Long ; listing ID
                                                    String ; post
                                                    {:subindex? true})})
    (declare-pstate s $$listing-bidders {Long ; listing ID
                                          (set-schema Long ; user ID
                                                      {:subindex? true})})
    (declare-pstate s $$listing-top-bid {Long ; listing ID
                                          (fixed-keys-schema {:user-id Long
                                                              :amount Long})})
    (declare-pstate s $$user-bids {Long (map-schema ListingPointer
                                                    Long ; amount
                                                    {:subindex? true})})
    (.declarePState idgen s)

    (<<sources s
      (source> *listing-depot :> {:keys [*user-id *post] :as *listing})
      (java-macro! (.genId idgen "*listing-id"))
      (local-transform> [(keypath *user-id *listing-id) (termval *post)]
                        $$user-listings)

      (source> *bid-depot :> {:keys [*bidder-id *user-id *listing-id *amount]})
      (local-transform> [(keypath *listing-id) NONE-ELEM (termval *bidder-id)]
                        $$listing-bidders)
      (local-transform> [(keypath *listing-id)
                         (selected? :amount (nil->val 0) (pred< *amount))
                         (termval {:user-id *bidder-id :amount *amount})]
                        $$listing-top-bid)
      (|hash *bidder-id)
      (->ListingPointer *user-id *listing-id :> *pointer)
      (local-transform> [(keypath *bidder-id *pointer) (termval *amount)] $$user-bids)
      )))

Let’s run another quick test to verify this works:

(with-open [ipc (rtest/create-ipc)]
  (rtest/launch-module! ipc AuctionModule {:tasks 4 :threads 2})
  (let [module-name (get-module-name AuctionModule)
        listing-depot (foreign-depot ipc module-name "*listing-depot")
        bid-depot (foreign-depot ipc module-name "*bid-depot")
        user-listings (foreign-pstate ipc module-name "$$user-listings")
        listing-bidders (foreign-pstate ipc module-name "$$listing-bidders")
        listing-top-bid (foreign-pstate ipc module-name "$$listing-top-bid")
        user-bids (foreign-pstate ipc module-name "$$user-bids")

        larry-id 0
        hank-id 1
        artie-id 2
        beverly-id 3

        _ (foreign-append! listing-depot (->Listing larry-id "Listing 1" 0))
        larry1 (foreign-select-one [(keypath larry-id) LAST FIRST] user-listings)]
    (foreign-append! bid-depot (->Bid hank-id larry-id larry1 45))
    (foreign-append! bid-depot (->Bid artie-id larry-id larry1 50))
    (foreign-append! bid-depot (->Bid beverly-id larry-id larry1 48))

    (println "Listing bidders:" (foreign-select [(keypath larry1) ALL]
                                                listing-bidders
                                                {:pkey larry-id}))
    (println "Top bid:" (foreign-select-one (keypath larry1)
                                            listing-top-bid
                                            {:pkey larry-id}))
    (println "Hank's bids:" (foreign-select [(keypath hank-id) ALL] user-bids))
    ))

Running this prints:

Listing bidders: [1 2 3]
Top bid: {:user-id 2, :amount 50}
Hank's bids: [[#user.ListingPointer{:user-id 0, :listing-id 0} 45]]

Something new in this test code is the use of the :pkey option in the foreign select calls. A foreign PState query must determine which partition of the PState to query. It does this with a “partitioning key”. Without the :pkey option, it extracts the partitioning key from the first keypath in the query path. This is convenient since it’s very common for the partitioning key to be the same as the top-level key in the index. That’s not the case for these listing PStates, however. The :pkey option allows you to specify the partitioning key explicitly, which in this case is the user ID who owns the listing.

Adding listing expirations and notifications

Let’s finish the application by adding listing expirations and notifications. Let’s start by defining some helper functions needed by the implementation:

(defn owner-notification [listing-id winner-id amount]
  (if winner-id
    (str "Auction for listing " listing-id
          " finished with winner " winner-id
          " for the amount " amount)
    (str "Auction for listing " listing-id " finished with no winner")
    ))

(defn winner-notification [user-id listing-id amount]
  (str "You won the auction for listing " user-id "/" listing-id " for the amount " amount))

(defn loser-notification [user-id listing-id]
  (str "You lost the auction for listing " user-id "/" listing-id))

(defn sorted-set-last [^java.util.SortedSet set]
  (.last set))

We’ll also need one new record definition:

(defrecord ListingWithId [id listing])

A separate topology will handle notifications, and this record will be needed by that topology.

Next, let’s once again define the depots for the module:

(defmodule AuctionModule [setup topologies]
  (declare-depot setup *listing-depot (hash-by :user-id))
  (declare-depot setup *bid-depot (hash-by :user-id))
  (declare-depot setup *listing-with-id-depot :disallow)

There’s one new depot here called *listing-with-id-depot . When a Listing is assigned an ID, a ListingWithId object will be added to this depot. This allows the separate notifications topology to consume that data. You can efficiently have as many consumers of a depot as you want, whether in the same module or across multiple modules, so such a depot is useful for other use cases as well. For example, you could implement full-text search on listings by consuming *listing-with-id-depot .

The :disallow depot partitioner disallows records from being appended to this depot using foreign-append! . This depot will instead be appended to by the module as it generates listing IDs.

Next, let’s define the stream topology and its PStates:

(let [s (stream-topology topologies "auction")
      idgen (ModuleUniqueIdPState. "$$id")]
  (declare-pstate s $$user-listings {Long ; user ID
                                      (map-schema Long ; listing ID
                                                  String ; post
                                                  {:subindex? true})})
  (declare-pstate s $$listing-bidders {Long ; listing ID
                                        (set-schema Long ; user ID
                                                    {:subindex? true})})
  (declare-pstate s $$listing-top-bid {Long ; listing ID
                                        (fixed-keys-schema {:user-id Long
                                                            :amount Long})})
  (declare-pstate s $$user-bids {Long (map-schema ListingPointer
                                                  Long ; amount
                                                  {:subindex? true})})
  (.declarePState idgen s)

This is exactly the same as before. Next, let’s begin defining the ETL:

(<<sources s
  (source> *listing-depot :> {:keys [*user-id *post] :as *listing})
  (java-macro! (.genId idgen "*listing-id"))
  (local-transform> [(keypath *user-id *listing-id) (termval *post)]
                    $$user-listings)
  (depot-partition-append! *listing-with-id-depot
                           (->ListingWithId *listing-id *listing)
                           :append-ack)

This adds a call to depot-partition-append! to this portion of the ETL. Unlike foreign-append! , depot appends within topologies go directly to the depot partition colocated with the ETL event. This is consistent with how writes to PStates work. Because there’s no other partitioning here, *listing-with-id-depot will be partitioned exactly the same as *listing-depot .

Next, let’s see the ETL for processing bids:

(source> *bid-depot :> {:keys [*bidder-id *user-id *listing-id *amount]})
(local-select> (keypath *listing-id) $$finished-listings :> *finished?)
(filter> (not *finished?))
(local-transform> [(keypath *listing-id) NONE-ELEM (termval *bidder-id)]
                  $$listing-bidders)
(local-transform> [(keypath *listing-id)
                   (selected? :amount (nil->val 0) (pred< *amount))
                   (termval {:user-id *bidder-id :amount *amount})]
                  $$listing-top-bid)
(|hash *bidder-id)
(->ListingPointer *user-id *listing-id :> *pointer)
(local-transform> [(keypath *bidder-id *pointer) (termval *amount)] $$user-bids)
))

The only change here is the addition of the first two lines of the ETL, which checks whether the auction is still active. The $$finished-listings PState will be defined in the other topology, and it’s a map from listing ID to a boolean flag. If the auction is finished, then the bid is ignored and no PStates are updated.

Next, let’s take a look at the start of the definition of the new ETL for this module. This ETL handles expiring listings and notifications.

(let [mb (microbatch-topology topologies "expirations")      
      scheduler (TopologyScheduler. "$$scheduler")]
  (declare-pstate mb $$finished-listings {Long Boolean})
  (declare-pstate mb $$notifications {Long (vector-schema String {:subindex? true})})
  (.declarePStates scheduler mb)

This defines a microbatch topology. Whereas streaming processes data immediately, microbatching processes a small batch of data across all depot partitions at the same time. Each iteration of microbatching processes the data that accumulated since the last microbatch iteration. Since there’s no per-record overhead, microbatching has even higher throughput than streaming. However, because it’s batch-based, the processing latency of microbatching is a few hundred milliseconds as opposed to the one or two milliseconds of streaming. Microbatching also provides exactly-once processing semantics for updates to PStates, even if a machine explodes in the middle of processing and the microbatch is retried.

Because it’s part of the same module, this new microbatch ETL is colocated with the streaming ETL and all its PStates. They share the same resources and can read each other’s PStates directly.

There are two PStates for this topology. $$finished-listings , as described before, has a flag for each listing ID as to whether the auction is finished or not. $$notifications contains a list of notification strings for each user. Since the number of notifications a user can receive is unbounded, the list is subindexed.

This topology also makes use of another utility from rama-helpers called TopologyScheduler . This is another small utility that makes it easy to schedule future work in a topology. Because it is built upon Rama’s primitives of ETLs and PStates, it’s completely fault-tolerant and can sustain very high throughputs of scheduled events.

Here’s the start of the definition for this ETL:

(<<sources mb
  (source> *listing-with-id-depot :> %microbatch)
  (anchor> <root>)
  (%microbatch :> {*listing-id :id
                   {:keys [*user-id *expiration-time-millis]} :listing})
  (vector *user-id *listing-id :> *tuple)
  (java-macro! (.scheduleItem scheduler "*expiration-time-millis" "*tuple"))

The %microbatch variable emitted by a microbatch source represents the entire batch of data for this microbatch iteration. It’s an anonymous operation which, when invoked, emits all the data for this iteration on that partition. So if there are 500 individual ListingWithId objects per depot partition in the microbatch, %microbatch will emit 500 times on each partition.

To process a ListingWithId , this ETL simply uses the TopologyScheduler to schedule a tuple containing the user ID and listing ID for later execution at the specified time.

The last piece of the module handles expired listings:

(hook> <root>)
(java-macro!
  (.handleExpirations
    scheduler
    "*tuple"
    "*current-time-millis"
    (java-block<-
      (identity *tuple :> [*user-id *listing-id])
      (local-transform> [(keypath *listing-id) (termval true)]
                        $$finished-listings)
      (local-select> (keypath *listing-id)
                     $$listing-top-bid
                     :> {*winner-id :user-id *amount :amount})
      (local-transform> [(keypath *user-id)
                         AFTER-ELEM
                         (termval (owner-notification *listing-id *winner-id *amount))]
                        $$notifications)
      (loop<- [*next-id -1 :> *bidder-id]
        (yield-if-overtime)
        (local-select> [(keypath *listing-id)
                        (sorted-set-range-from *next-id {:max-amt 1000 :inclusive? false})]
                       $$listing-bidders
                       :> *users)
        (<<atomic
          (:> (ops/explode *users)))
        (<<if (= (count *users) 1000)
          (continue> (sorted-set-last *users))))
      (|hash *bidder-id)
      (<<if (= *bidder-id *winner-id)
        (winner-notification *user-id *listing-id *amount :> *text)
       (else>)
        (loser-notification *user-id *listing-id :> *text))
      (local-transform> [(keypath *bidder-id)
                         AFTER-ELEM
                         (termval *text)]
                        $$notifications)
      ))))))

.handleExpirations on TopologyScheduler inserts code that checks for expired items. Here, it’s attached to the root of the microbatch iteration, so it runs once per microbatch. For each expired item, it binds the expired item to *tuple and the time of the check to *current-time-millis , and then runs the provided block of code. The java-block<- macro defines a block of code for the Java API in Clojure.

First, the $$finished-listings PState is updated. Just like before, while this event is running no other events can run on this partition. So there are no race conditions with concurrent bids.

Next, it fetches the winning bidder. It then delivers a notification to the owner of the listing that the auction is over. Since listings are partitioned by their owner’s user ID, no partitioning is needed to deliver this notification.

Next, the ETL fetches all bidders from the $$listing-bidders PState. The loop that does this demonstrates an important aspect of Rama. As discussed already, nothing else can run on a task thread while an event is running. This property gives you great power: you can atomically read and write many PStates at once while avoiding potential race conditions. However, as an application developer you need to make sure not to hold the thread for too long, or else you’ll unfairly delay other events such as PState reads and other ETLs. Rama modules should be developed with cooperative multitasking in mind.

Since the number of bidders for a listing can be arbitrarily large, this code paginates through the PState, reading 1,000 bidders per iteration. Each bidder is emitted from the loop separately. Each iteration, the loop calls yield-if-overtime , which yields the thread to other events if too much time has passed (by default 5ms). Because of the power of Rama’s dataflow paradigm, you’re able to write the code linearly even though it’s performing asynchronous operations in the middle of processing.

To finish delivering notifications for each bidder, the dataflow code then uses (|hash *bidder-id) to switch to the task hosting notifications for that bidder. It then updates the $$notifications PState with the appropriate text.

Here’s the code for the complete module, including requires and helper functions:

(use 'com.rpl.rama)
(use 'com.rpl.rama.path)
(require '[com.rpl.rama.ops :as ops])
(require '[com.rpl.rama.aggs :as aggs])
(require '[com.rpl.rama.test :as rtest])
(import 'com.rpl.rama.helpers.ModuleUniqueIdPState)
(import 'com.rpl.rama.helpers.TopologyScheduler)

(defn owner-notification [listing-id winner-id amount]
  (if winner-id
    (str "Auction for listing " listing-id
          " finished with winner " winner-id
          " for the amount " amount)
    (str "Auction for listing " listing-id " finished with no winner")
    ))

(defn winner-notification [user-id listing-id amount]
  (str "You won the auction for listing " user-id "/" listing-id " for the amount " amount))

(defn loser-notification [user-id listing-id]
  (str "You lost the auction for listing " user-id "/" listing-id))

(defn sorted-set-last [^java.util.SortedSet set]
  (.last set))

(defrecord Listing [user-id post expiration-time-millis])
(defrecord Bid [bidder-id user-id listing-id amount])
(defrecord ListingPointer [user-id listing-id])
(defrecord ListingWithId [id listing])

(defmodule AuctionModule [setup topologies]
  (declare-depot setup *listing-depot (hash-by :user-id))
  (declare-depot setup *bid-depot (hash-by :user-id))
  (declare-depot setup *listing-with-id-depot :disallow)

  (let [s (stream-topology topologies "auction")
        idgen (ModuleUniqueIdPState. "$$id")]
    (declare-pstate s $$user-listings {Long ; user ID
                                        (map-schema Long ; listing ID
                                                    String ; post
                                                    {:subindex? true})})
    (declare-pstate s $$listing-bidders {Long ; listing ID
                                          (set-schema Long ; user ID
                                                      {:subindex? true})})
    (declare-pstate s $$listing-top-bid {Long ; listing ID
                                          (fixed-keys-schema {:user-id Long
                                                              :amount Long})})
    (declare-pstate s $$user-bids {Long (map-schema ListingPointer
                                                    Long ; amount
                                                    {:subindex? true})})
    (.declarePState idgen s)

    (<<sources s
      (source> *listing-depot :> {:keys [*user-id *post] :as *listing})
      (java-macro! (.genId idgen "*listing-id"))
      (local-transform> [(keypath *user-id *listing-id) (termval *post)]
                        $$user-listings)
      (depot-partition-append! *listing-with-id-depot
                               (->ListingWithId *listing-id *listing)
                               :append-ack)

      (source> *bid-depot :> {:keys [*bidder-id *user-id *listing-id *amount]})
      (local-select> (keypath *listing-id) $$finished-listings :> *finished?)
      (filter> (not *finished?))
      (local-transform> [(keypath *listing-id) NONE-ELEM (termval *bidder-id)]
                        $$listing-bidders)
      (local-transform> [(keypath *listing-id)
                         (selected? :amount (nil->val 0) (pred< *amount))
                         (termval {:user-id *bidder-id :amount *amount})]
                        $$listing-top-bid)
      (|hash *bidder-id)
      (->ListingPointer *user-id *listing-id :> *pointer)
      (local-transform> [(keypath *bidder-id *pointer) (termval *amount)] $$user-bids)
      ))
  (let [mb (microbatch-topology topologies "expirations")      
        scheduler (TopologyScheduler. "$$scheduler")]
    (declare-pstate mb $$finished-listings {Long Boolean})
    (declare-pstate mb $$notifications {Long (vector-schema String {:subindex? true})})
    (.declarePStates scheduler mb)

    (<<sources mb
      (source> *listing-with-id-depot :> %microbatch)
      (anchor> <root>)
      (%microbatch :> {*listing-id :id
                       {:keys [*user-id *expiration-time-millis]} :listing})
      (vector *user-id *listing-id :> *tuple)
      (java-macro! (.scheduleItem scheduler "*expiration-time-millis" "*tuple"))

      (hook> <root>)
      (java-macro!
        (.handleExpirations
          scheduler
          "*tuple"
          "*current-time-millis"
          (java-block<-
            (identity *tuple :> [*user-id *listing-id])
            (local-transform> [(keypath *listing-id) (termval true)]
                              $$finished-listings)
            (local-select> (keypath *listing-id)
                           $$listing-top-bid
                           :> {*winner-id :user-id *amount :amount})
            (local-transform> [(keypath *user-id)
                               AFTER-ELEM
                               (termval (owner-notification *listing-id *winner-id *amount))]
                              $$notifications)
            (loop<- [*next-id -1 :> *bidder-id]
              (yield-if-overtime)
              (local-select> [(keypath *listing-id)
                              (sorted-set-range-from *next-id {:max-amt 1000 :inclusive? false})]
                             $$listing-bidders
                             :> *users)
              (<<atomic
                (:> (ops/explode *users)))
              (<<if (= (count *users) 1000)
                (continue> (sorted-set-last *users))))
            (|hash *bidder-id)
            (<<if (= *bidder-id *winner-id)
              (winner-notification *user-id *listing-id *amount :> *text)
             (else>)
              (loser-notification *user-id *listing-id :> *text))
            (local-transform> [(keypath *bidder-id)
                               AFTER-ELEM
                               (termval *text)]
                              $$notifications)
            ))))))

Finally, let’s run another quick test to verify it works:

(defn expiration [seconds]
  (+ (System/currentTimeMillis) (* seconds 1000)))

(with-open [ipc (rtest/create-ipc)]
  (rtest/launch-module! ipc AuctionModule {:tasks 4 :threads 2})
  (let [module-name (get-module-name AuctionModule)
        listing-depot (foreign-depot ipc module-name "*listing-depot")
        bid-depot (foreign-depot ipc module-name "*bid-depot")
        user-bids (foreign-pstate ipc module-name "$$user-bids")
        user-listings (foreign-pstate ipc module-name "$$user-listings")
        listing-bidders (foreign-pstate ipc module-name "$$listing-bidders")
        listing-top-bid (foreign-pstate ipc module-name "$$listing-top-bid")
        notifications (foreign-pstate ipc module-name "$$notifications")

        larry-id 0
        hank-id 1
        artie-id 2
        beverly-id 3

        _ (foreign-append! listing-depot (->Listing larry-id "Listing 1" (expiration 5)))
        larry1 (foreign-select-one [(keypath larry-id) LAST FIRST] user-listings)]
    (foreign-append! bid-depot (->Bid hank-id larry-id larry1 45))
    (foreign-append! bid-depot (->Bid artie-id larry-id larry1 50))
    (foreign-append! bid-depot (->Bid beverly-id larry-id larry1 48))

    ;; wait slightly more than the expiration time for the listing to allow notifications
    ;; to be delivered
    (Thread/sleep 6000)
    (println "Larry:" (foreign-select [(keypath larry-id) ALL] notifications))
    (println "Hank:" (foreign-select [(keypath hank-id) ALL] notifications))
    (println "Artie:" (foreign-select [(keypath artie-id) ALL] notifications))
    (println "Beverly:" (foreign-select [(keypath beverly-id) ALL] notifications))
    ))

This prints:

Larry: [Auction for listing 0 finished with winner 2 for the amount 50]
Hank: [You lost the auction for listing 0/0]
Artie: [You won the auction for listing 0/0 for the amount 50]
Beverly: [You lost the auction for listing 0/0]

That’s all there is to it. In just 100 lines of code we’ve built a high-performance auction application that can scale to millions of listings/bids per second, is completely fault-tolerant, and is easy to evolve over time with new features. Since deployment and monitoring are built into Rama, this is production-ready. We didn’t implement account registration or profiles, but those are trivial to add.

Conclusion

Rama derives its power from being based on composable abstractions. PStates are a composable abstraction for storage, enabling any database’s data model (plus infinite more) to be expressed as the composition of data structures. Rama’s dataflow API is a composable abstraction for distributed computation, enabling you to seamlessly combine regular logic with partitioners, yields, and other asynchronous tasks.

Rama can handle all the computation and storage for a backend, but it’s also easy to integrate with existing architectures. Rama’s integration API allows you to use external databases, queues, monitoring systems, or other tools with your modules.

Besides the documentation, we’ve released other resources for learning Rama. rama-demo-gallery contains short, self-contained, thoroughly commented examples of using Rama to build a variety of use cases. But the best way to learn Rama is to try it out yourself using the publicly available build. The REPL is an invaluable environment for experimenting with Rama. If you have any questions, feel free to ask on the rama-user Google group or #rama channel on Clojurians.

Finally, if you’d like to use Rama in production to build new features, scale your existing systems, or simplify your infrastructure, you can apply to our private beta. We’re working closely with each private beta user to not only help them learn Rama, but also actively helping code, optimize, and test.

How we reduced the cost of building Twitter at Twitter-scale by 100x

I’m going to cover a lot of ground in this post, so here’s the TLDR:

  • We built a Twitter-scale Mastodon instance from scratch in only 10k lines of code. This is 100x less code than the ~1M lines Twitter wrote to build and scale their original consumer product, which is very similar to Mastodon. Our instance is located at https://mastodon.redplanetlabs.com and open for anyone to use. The instance has 100M bots posting 3,500 times per second at 403 average fanout to demonstrate its scale.
  • Our implementation is built on top of a new platform called Rama that we at Red Planet Labs have developed over the past 10 years. This is the first time we’re talking about Rama publicly. Rama unifies computation and storage into a coherent model capable of building end-to-end backends at any scale in 100x less code than otherwise. Rama integrates and generalizes data ingestion, processing, indexing, and querying. Rama is a generic platform for building application backends, not just for social networks, and is programmed with a pure Java API. I will be exploring Rama in this post through the example of our Mastodon implementation.
  • We spent nine person-months building our scalable Mastodon instance. Twitter spent ~200 person-years to build and scale their original consumer product, and Instagram spent ~25 person-years building Threads, a recently launched Twitter competitor. In their effort Instagram was able to leverage infrastructure already powering similar products.
  • Our scalable Mastodon implementation is also significantly less code than Mastodon’s official implementation, which cannot scale anywhere near Twitter-scale.
  • In one week we will release a version of Rama that anyone can download and use. This version simulates Rama clusters within a single process and can be used to explore the full Rama API and build complete prototypes. We will also release the Rama documentation at that time. (Instructions for downloading Rama are now available here and the documentation is available here.)
  • In two weeks we will fully open-source our Mastodon implementation (This is now open-source).
  • Red Planet Labs will be starting a private beta soon to give companies access to the full version of Rama. We will release more details on the private beta later, but companies can apply here in the meantime.

We recognize the way we’re introducing Rama is unusual. We felt that since the 100x cost reduction claim sounds so unbelievable, it wouldn’t do Rama justice to introduce it in the abstract. So we took it upon ourselves to directly demonstrate Rama’s 100x cost reduction by replicating a full application at scale in all its detail.

Our Mastodon instance

First off, we make no comment about whether Mastodon should be scalable. There are good reasons to limit the size of an individual Mastodon instance. It is our belief, however, that such decisions should be product decisions and not forced by technical limitations. What we are demonstrating with our scalable Mastodon instance is that building a complex application at scale doesn’t have to be a costly endeavor and can instead be easily built and managed by individuals or small teams. There’s no reason the tooling you use to most quickly build your prototype should be different from what you use to build your application at scale.

Our Mastodon instance is hosted at https://mastodon.redplanetlabs.com. We’ve implemented every feature of Mastodon from scratch, including:

  • Home timelines, account timelines, local timeline, federated timeline
  • Follow / unfollow
  • Post / delete statuses
  • Lists
  • Boosts / favorites / bookmarks
  • Personalized follow suggestions
  • Hashtag timelines
  • Featured hashtags
  • Notifications
  • Blocking / muting
  • Conversations / direct messages
  • Filters
  • View followers / following in order (paginated)
  • Polls
  • Trending hashtags and links
  • Search (status / people / hashtags)
  • Profiles
  • Image/video attachments
  • Scheduled statuses
  • ActivityPub API to integrate with other Mastodon instances

There’s huge variety between these features, and they require very different kinds of implementations for how computations are done and how indexes are structured. Of course, every single aspect of our Mastodon implementation is scalable.

To demonstrate the scale of our instance, we’re also running 100M bot accounts which continuously post statuses (Mastodon’s analogue of a “tweet”), replies, boosts (“retweet”), and favorites. 3,500 statuses are posted per second, the average number of followers for each post is 403, and the largest account has over 22M followers. As a comparison, Twitter serves 7,000 tweets per second at 700 average fanout (according to the numbers I could find). With the click of a button we can scale our instance up to handle that load or much larger – it would just cost us more money in server costs. We used the OpenAI API to generate 50,000 statuses for the bots to choose from at random.

Since our instance is just meant to demonstrate Rama and costs money to run, we’re not planning to keep it running for that long. So we don’t recommend using this instance for a primary Mastodon account.

One feature of Mastodon that needed tweaking because of our high rate of new statuses was global timelines, as it doesn’t make sense to flood the UI with thousands of new statuses per second. So for that feature we instead show a small sample of all the statuses on the platform.

The implementation of our instance looks like this:

The Mastodon backend is implemented as Rama modules (explained later on this page), which handle all the data processing, data indexing, and most of the product logic. On top of that is our implementation of the Mastodon API using Spring/Reactor. For the most part, the API implementation just handles HTTP requests with simple calls to the Rama modules and serves responses as JSON. We use Soapbox to serve the frontend since it’s built entirely on top of the Mastodon API.

S3 is used only for serving pictures and videos. Though we could serve those from Rama, static content like that is better served via a CDN. So we chose to use S3 to mimic that sort of architecture. All other storage is handled by the Rama modules.

Our implementation totals 10k lines of code, about half of which is the Rama modules and half of which is the API server. We will be fully open-sourcing the implementation in two weeks.

Our implementation is a big reduction in code compared to the official Mastodon implementation, which is built with Ruby on Rails. That codebase doesn’t always have a clear distinction between frontend and backend code, but just adding up the code for clearcut backend portions (models, workers, services, API controllers, ActivityPub) totals 18k lines of Ruby code. That doesn’t include any of the database schema definition code, configurations needed to run Postgres and Redis, or other controller code, so the true line count for the official Mastodon backend is higher than that. And unlike our Rama implementation, it can’t achieve anywhere near Twitter-scale.

This isn’t a criticism of the Mastodon codebase – building products with existing technologies is just plain expensive. The reason we’ve worked on Rama for so many years is to enable developers to build applications much faster, with much greater quality, and to never have to worry about scaling ever again.

Performance and scalability

Here’s a chart showing the scalability of the most intensive part of Mastodon, processing statuses:

As you can see, increasing the number of nodes increases the statuses/second that can be processed. Most importantly, the relationship is linear. Processing statuses is so intensive because of fanout – if you have 15M followers, each of your statuses has to be written to 15M timelines. Each status on our instance is written to an average of 403 timelines (plus additional work to handle replies, lists, and conversations).

Twitter operates at 7,000 tweets per second at 700 average fanout, which is equivalent to about 12,200 tweets/second at 403 average fanout (7,000 × 700 ÷ 403 ≈ 12,200). So you can see we tested our Mastodon instance well above Twitter-scale.

Here’s a chart showing the latency distribution for the time from a status being posted to it being available on follower timelines:

These numbers are a bit better than Twitter’s numbers. Because of how unbalanced the social graph is, getting performance this good and this reliable is not easy. For example, when someone with 20M followers posts a status, that creates a huge burst of load which could delay other statuses from fanning out. How we handled this is described more below.

Lastly, here’s a chart showing the latency distribution for fetching the data to render a user’s home timeline:

Rendering a home timeline requires a lot of data from the backend: a page of statuses to render that aren’t muted/filtered, stats on each status (number of replies, boosts, and favorites), as well as information on the accounts that posted each status (username, display name, profile pic). Getting all this done in an average of 87ms is extremely efficient and a result of Rama being such an integrated system.

Rama

The numbers I’ve shared here should be hard to believe: a Twitter-scale Mastodon implementation with extremely strong performance numbers in only 10k lines of code, which is less code than Mastodon’s current backend implementation and 100x less code than Twitter’s scalable implementation of a very similar product? How is it possible that we’ve reduced the cost of building scalable applications by multiple orders of magnitude?

You can begin to understand this by starting with a simple observation: you can describe Mastodon (or Twitter, Reddit, Slack, Gmail, Uber, etc.) in total detail in a matter of hours. It has profiles, follows, timelines, statuses, replies, boosts, hashtags, search, follow suggestions, and so on. It doesn’t take that long to describe all the actions you can take on Mastodon and what those actions do. So the real question you should be asking is: given that software is entirely abstraction and automation, why does it take so long to build something you can describe in hours?

At its core Rama is a coherent set of abstractions for expressing backends end-to-end. All the intricacies of an application backend can be expressed in code that’s much closer to how you describe the application at a high level. Rama’s abstractions allow you to sidestep the mountains of complexity that blow up the cost of existing applications so much. So not only is Rama inherently scalable and fault-tolerant, it’s also far less work to build a backend with Rama than any other technology.

Let’s now dive into Rama. We’ll start with a high-level overview of Rama’s concepts. Then we’ll look at how some of the most important parts of our Mastodon instance are implemented in terms of these concepts. Finally, we’ll look at some code from our Mastodon implementation.

Rama is programmed entirely with a Java API, and Rama’s programming model has four main concepts:

On the left are “depots”, which are distributed, durable, and replicated logs of data. All data coming into Rama comes in through depot appends. Depots are like Apache Kafka except integrated with the rest of Rama.

Next are "ETL"s, extract-transform-load topologies. These process incoming data from depots as it arrives and produce indexed stores called “partitioned states”. Rama offers two types of ETL, streaming and microbatching, which have different performance characteristics. Most of the time spent programming Rama is spent making ETLs. Rama exposes a Java dataflow API for coding topologies that is extremely expressive.

Next are “partitioned states”, which we usually call “PStates”. PStates are how data is indexed in Rama, and just like depots they’re partitioned across many nodes, durable, and replicated. PStates are one of the keys to how Rama is such a general-purpose system. Unlike existing databases, which have rigid indexing models (e.g. “key-value”, “relational”, “column-oriented”, “document”, “graph”, etc.), PStates have a flexible indexing model. In fact, they have an indexing model already familiar to every programmer: data structures. A PState is an arbitrary combination of data structures, and every PState you create can have a different combination. With the “subindexing” feature of PStates, nested data structures can efficiently contain hundreds of millions of elements. For example, a “map of maps” is equivalent to a “document database”, and a “map of subindexed sorted maps” is equivalent to a “column-oriented database”. Any combination of data structures and any amount of nesting is valid – e.g. you can have a “map of lists of subindexed maps of lists of subindexed sets”. I cannot emphasize enough how much interacting with indexes as regular data structures instead of magical “data models” liberates backend programming.
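To make this concrete, here’s a rough sketch of two of those data models expressed as PState schemas, using the Clojure API shown in the auction example earlier in this document (the names and fields are invented for illustration, and topology stands in for a stream or microbatch topology object):

;; a "document database" is just a map of maps
(declare-pstate topology $$profiles
  {Long (fixed-keys-schema {:name String
                            :bio  String})})

;; a "column-oriented database" is just a map of subindexed sorted maps
;; (subindexed maps are sorted by key)
(declare-pstate topology $$user-events
  {Long (map-schema Long   ; timestamp
                    String ; event payload
                    {:subindex? true})})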

The last concept in Rama is “query”. Queries in Rama take advantage of the data structure orientation of PStates with a “path-based” API that allows you to concisely fetch and aggregate data from a single partition. In addition to this, Rama has a feature called “query topologies” which can efficiently do real-time distributed querying and aggregation over an arbitrary collection of PStates. These are the analogue of “predefined queries” in traditional databases, except programmed via the same Java API as used to program ETLs and far more capable.

Individually, none of these concepts are new. I’m sure you’ve seen them all before. You may be tempted to dismiss Rama’s programming model as just a combination of event sourcing and materialized views. But what Rama does is integrate and generalize these concepts to such an extent that you can build entire backends end-to-end without any of the impedance mismatches or complexity that characterize and overwhelm existing systems.

All these concepts are implemented by Rama in a linearly scalable way. So if your application needs more resources, you can add them at the click of a button. Rama also achieves fault-tolerance by replicating all data and implementing automatic failover.

Here’s an example of how these concepts fit together. These are all the depots, ETLs, PStates, and query topologies for the portion of our Mastodon implementation handling profiles, statuses, and timelines:

This looks intimidating, but this part of the codebase only totals 1,100 lines of code. And it implements a ton of functionality, all scalably: statuses, timelines, boosts, conversations, favorites, bookmarks, mutes, account registration, profile edits, federation, and more. Notice that the PStates are a diverse collection of data structure combinations, and there are 33 of them here. Many of the ETLs produce multiple PStates and consume multiple depots. Making depots, ETLs, and PStates is inexpensive in Rama and can be done liberally.

What you’re seeing in this diagram is a total inversion of control compared to how applications are typically architected today. For example, consider the “fanout” ETL in this diagram which processes incoming statuses and sends them to follower timelines (you’ll see the code for this later in this post). There are a bunch of rules dictating which statuses go to which followers – boosts never go back to the original author, replies only go to followers who also follow the account being replied to, and so on. Traditionally, that’s accomplished with a “database layer” handling storage and a separate “application layer” implementing the product logic. The “application layer” does reads and writes to the “database layer”, and the two layers are deployed, scaled, and managed separately. But with Rama, the product logic exists inside the system doing the indexing. Computation and storage are colocated. Rama does everything a database does, but it also does so much more.

When building a backend with Rama, you begin with all the use cases you need to support. For example: fetch the number of followers of a user, fetch a page of a timeline, fetch ten follow suggestions, and so on. Then you determine what PState layouts (data structures) you need to support those queries. One PState could support ten of your queries, and another PState may support just one query.

Next you determine what your source data is, and then you make depots to receive that data. Source data usually corresponds to events happening in your application, like “Alice follows Bob”, “James posted the status ‘Hello world’”, or “Bob unfollows Charlie”. You can represent your data however you want, whether Java objects, frameworks like Thrift or Protocol Buffers, or even unstructured formats like JSON (however, we recommend using structured formats as much as possible).

The last step is writing the ETL topologies that convert source data from your depots into your PStates. When deployed, the ETLs run continuously keeping your PStates up to date. Rama’s ETL API, though just Java, is like a “distributed programming language” with the computational capabilities of any Turing-complete language along with facilities to easily control on which partition computation happens at any given point. You’ll see many examples of this API later in this post.

Clusters and modules

Rama is deployed onto a cluster of nodes. There’s a central daemon called the “Conductor” which coordinates deploys, updates, and scaling operations. Every node has a “Supervisor” daemon which manages the launch/teardown of user code.

Applications are deployed onto a Rama cluster as “modules”. A “module” contains an arbitrary combination of depots, ETLs, PStates, and query topologies. Unlike traditional architectures, where the corresponding analogues exist in separate processes and usually on separate nodes, in Rama these are colocated in the same set of processes. This colocation enables fantastic efficiency which has never been possible before. Modules can also consume data from depots and PStates in other modules just as easily as they can from their own. A module runs forever, continuously processing new data from depots, unless you choose to destroy it.

A module is deployed by giving the Conductor a .jar file with user code. Additionally, configuration is provided for the number of nodes to allocate to the module, replication parameters, as well as any other tuning parameters. A module is updated a similar way: a new .jar is provided with the new code, and the Conductor orchestrates an update sequence that launches new processes and transfers depots and PStates to the new module version.

Here’s what a Rama cluster could look like with two modules deployed, “SocialGraphModule” and “TimelineModule”:

For testing and development, Rama provides a class called InProcessCluster for simulating Rama clusters within a single process.

Mastodon on Rama

Let’s look at some of the key parts of how Mastodon is implemented on top of Rama. In this section we’ll focus on the design of Mastodon – what PStates are created, the flow of how data is processed, and how everything is organized. You’ll see how Rama’s capabilities enable some seriously novel ways to architect applications. In the next section we’ll look at some of the code from our implementation.

Following hashtags

Let’s start with an extremely simple part of the implementation, tracking followers for hashtags. The implementation for this totals 11 lines of code and supports the following queries:

  • Does user A follow hashtag H?
  • Who follows hashtag H (paginated)?
  • How many followers does hashtag H have?

Only a single PState is needed for this, called $$hashtagToFollowers (PState names in Rama always begin with $$ ). It is a map from a hashtag to a set of account IDs. Here’s a visualization of what this PState could look like across two partitions. Keep in mind that PStates are distributed across many partitions, with each partition of a PState being the specified data structure:

There are two events that change this PState: following a hashtag, and unfollowing a hashtag. In our implementation, these are represented by the types FollowHashtag and RemoveFollowHashtag .

A good way to visualize how data is processed to produce this PState is via a dataflow graph, as you can see how data moves from the start of processing (one or more depots) to the end results of processing (updates to PStates):

The logic here is trivial, which is why the implementation is only 11 lines of code. You don’t need to worry about things like setting up a database, establishing database connections, handling serialization/deserialization on each database read/write, writing deploys just to handle this one task, or any of the other tasks that pile up when building backend systems. Because Rama is so integrated and so comprehensive, a trivial feature like this has a correspondingly trivial implementation.
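Since the actual Mastodon modules are written with Rama’s Java API, here’s a hedged Clojure-API sketch of an ETL along these lines, in the style of the auction module earlier in this document; the module, depot, and record names are hypothetical, and the unfollow branch is elided:

(defrecord FollowHashtag [account-id hashtag])
(defrecord RemoveFollowHashtag [account-id hashtag])

(defmodule HashtagFollowsModule [setup topologies]
  ;; partitioning the depot by hashtag means the ETL already runs on the
  ;; partition that owns that hashtag's entry, so no explicit partitioner is needed
  (declare-depot setup *hashtag-follow-depot (hash-by :hashtag))
  (let [s (stream-topology topologies "hashtag-follows")]
    (declare-pstate s $$hashtagToFollowers {String (set-schema Long {:subindex? true})})
    (<<sources s
      (source> *hashtag-follow-depot :> {:keys [*account-id *hashtag] :as *data})
      ;; only the follow case is sketched; a RemoveFollowHashtag branch would
      ;; remove *account-id from the hashtag's set instead
      (filter> (instance? FollowHashtag *data))
      (local-transform> [(keypath *hashtag) NONE-ELEM (termval *account-id)]
                        $$hashtagToFollowers))))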

You may be wondering why the follow and unfollow events go onto the same depot instead of separate depots. Though you could implement it that way, it would be a mistake to do so. Data is processed off a depot partition in the order in which it was received, but there are no ordering guarantees across different depots. So if a user was spamming the “Follow” and “Unfollow” buttons on the UI and those events were appended to different depots, it’s possible a later unfollow could be processed before a prior follow. This would result in the PState ending up in the incorrect state according to the order by which the user made actions. By putting both events in the same depot, the data is processed in the same order in which it was created.

As a general rule, Rama guarantees local ordering. Data sent between two points are processed in the order in which they were sent. This is true for processing data off a depot, and it’s also true for intra-ETL processing when your processing jumps around to different partitions as part of computation.

This dataflow diagram is literally how you program with Rama, by specifying dataflow graphs in a pure Java API. As you’ll see below, the details of specifying computations like this involve variables, functions, filters, loops, branching, and merging. It also includes fine-grained control over which partitions computation is executing at any given point.

Social graph

Let’s now look at a slightly more involved part of the implementation, the social graph. The social graph totals 105 lines of code and supports the following queries which power various aspects of Mastodon:

  • Does user A follow user B?
  • How many followers does user A have?
  • How many accounts does user A follow?
  • Who are user A’s followers in the order in which they followed (paginated)?
  • Who does user A follow in the order in which they followed them (paginated)?
  • Who is currently requesting to follow user A in the order in which they requested (paginated)?
  • Does user A block user B?
  • Does user A mute user B?
  • Does user A wish to see boosts from user B?

Even though the implementation is so simple with Rama, it’s worth noting that Twitter had to write a custom database from scratch to build their scalable social graph.

There are four main PStates produced by our social graph implementation, named $$followerToFollowees , $$followeeToFollowers , $$accountIdToFollowRequests , and $$accountIdToSuppressions . The first three PStates have the same structure, which is a map from account ID to a linked set of account IDs plus additional information needed about the relationship. For example, for $$followeeToFollowers we track whether a follower wants to see boosts from that account in their home timeline (which is one of Mastodon’s features). This PState is also used to compute: whether an account already follows another account, the order in which accounts were followed (which is why a linked set is used rather than a regular set), and the number of followers for an account (which is just the size of the inner set, something Rama computes in fast constant time even if the set has millions of elements).

The $$accountIdToSuppressions PState tracks blocks and mutes for each account and has a different structure. It is a map from account ID to a map containing two keys: “blocked” and “muted”. The “blocked” key maps to the set of account IDs the top-level account ID has blocked. The “muted” key is similar but stores a map from account ID to the options for that mute (like expiration time). This PState is used wherever statuses are rendered (e.g. home timeline, notifications) to filter out statuses a user doesn’t want to see.
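As a rough sketch (again in the Clojure API, with s standing in for the topology object and the mute-options fields guessed for illustration), the shape of $$accountIdToSuppressions described above could be declared like this:

(declare-pstate s $$accountIdToSuppressions
  {Long (fixed-keys-schema
          {:blocked (set-schema Long {:subindex? true}) ; blocked account IDs
           :muted   (map-schema Long                    ; muted account ID
                                (fixed-keys-schema {:expiration-millis Long})
                                {:subindex? true})})})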

Here’s a visualization of what the $$followeeToFollowers PState could look like in a deployed module:

Here you can see that account “1” is followed by accounts “2”, “7”, and “5”, with “2” and “7” having boosts enabled and “5” having boosts disabled. Account “4” is only followed by account “1”, and “1” has boosts enabled for that relationship.

The social graph is constructed based on follow, unfollow, block, unblock, mute, and unmute events. In Mastodon’s design, blocking a user also unfollows in both directions (if currently followed). So the block and unblock events go on the same depot as the follow-related events, while the mute and unmute events go on a separate depot.

Here’s the dataflow graph showing how these PStates are computed based on the source data:

This is more involved than the hashtag follows dataflow graph since this ETL supports so many more features. Yet it’s still only 105 lines of code to implement. You can see how this ETL makes use of conditionals, branching, and merging in its implementation. Here’s a few notes on the logic within this ETL:

  • When a user follows another user, the $$followerToFollowees and $$followeeToFollowers PStates are updated. Unfollowing updates the same PStates but removes the relationship instead of adding it.
  • Blocking is handled specially by implicitly emitting additional unfollow events as part of processing (to unfollow in both directions), as well as tracking who blocks who in the $$accountIdToSuppressions PState.
  • A locked account (where each follower must be individually approved) receives FollowLockedAccount events, and unlocked accounts receive Follow events.
  • Attributes of a relationship (e.g. “show boosts”, “notify”) are updated via appending another Follow or FollowLockedAccount event to the depot. This is why the FollowLockedAccount checks if the follow relationship already exists so it can determine whether to update the follow relationship or add a new follow request.
  • A Follow event also removes the corresponding follow request if it exists. This is why an AcceptFollowRequest event just converts to a Follow event.

This ETL interacts with many PStates at the same time. Because of Rama’s integrated nature, these PStates are colocated with one another within the same processes that are executing the ETL logic. So whereas you always have to do a network operation to access most databases, PState operations are local, in-process operations with Rama ETLs. As you’ll see later, you utilize the network in an ETL via “partitioner” operations to get to the right partition of a module, but once you’re on a partition you can perform as many PState operations to as many colocated PStates as you need. This is not only extremely efficient but also liberating due to the total removal of impedance mismatches that characterizes interacting with databases.

Computing and rendering home timelines

Next let’s look at the core of Mastodon, computing and rendering home timelines. This powers the primary page of the Mastodon experience:

This use case is a great example of how to think about building data-intensive systems not just with Rama, but in general. For any backend feature you want to implement, you have to balance what gets precomputed versus what gets computed on the fly at query-time. The more you can precompute, the less work you’ll have to do at query-time and the lower latencies your users will experience. This basic structure looks like this:

This of course is the programming model of Rama you’ve already seen, and a big part of designing Rama applications is determining what computation goes in the ETL portion versus what goes in the query portion. Because both the ETL and query portions can be arbitrary distributed computations, and since PStates can be any structure you want, you have total flexibility when it comes to choosing what gets precomputed versus what gets computed on the fly.

It’s generally a good idea to work backwards from the needed queries to learn how to best structure things. In the case of querying for a page of a timeline, you need the following information:

  • Content for each status
  • Stats for each status (number of replies, boosts, and favorites)
  • Information about the account that posted each status (username, display name, profile pic)
  • Whether the author of each status is blocked or muted
  • For boosts, the username of the booster
  • ID to use to fetch the next page of statuses

Since statuses can be edited and profile information can be changed, almost all this information is dynamic and must be fetched for each render of a timeline page.

Let’s consider a typical way to go about this with non-Rama technologies, even scalable ones, which unfortunately is extremely inefficient:

  • Fetch the list of status IDs for a page of the timeline
  • In parallel, send database requests to fetch:
    • Content for each status
    • Stats for each status
    • Information for each author

For a page of 20 statuses, this could easily require over 100 database calls with a lot of overhead in the sheer amount of back and forth communication needed to fetch data from each database partition.

We handled this use case with Rama by making use of Rama’s facilities for colocation of PStates. The module is organized such that all information for a status and the account who posted it are colocated in the same process. So instead of needing separate requests for status content, status stats, and author information, only one request is needed per status.

Structuring the PStates

Here are all the depots, PStates, and query topologies in the module implementing timelines and profiles (repeated from above):

Let’s look at these PStates in closer detail, as the way they’re structured and partitioned is extremely interesting. Let’s start with $$accountIdToAccount . This is a map from account ID to profile information, including username, display name, and profile pic. Here’s a picture of what this PState could look like across four partitions:

This PState is partitioned by the account ID. When I say “partitioned by”, I mean how the Mastodon implementation chooses on which partition to store data for any particular account. The most common way to partition a PState is to hash a partitioning key and modulo the hash by the number of partitions. This deterministically chooses a partition for any particular partitioning key, while evenly spreading out data across all partitions (this “hash/mod” technique is used by most distributed databases). For this PState, the partitioning key is the same as the key in the map, the account ID (as you’ll see soon, it doesn’t have to be).
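The technique itself is simple enough to express in a couple of lines (illustrative only; the exact hash function Rama uses internally is an implementation detail):

(defn choose-partition [partitioning-key num-partitions]
  ;; Clojure's mod always returns a non-negative result for a positive divisor
  (mod (hash partitioning-key) num-partitions))

(choose-partition 12345 4) ; => a partition index between 0 and 3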

Next is $$accountIdToStatuses . This is a map from account ID to a map from status ID to a list of status content versions. A list of status content versions is stored to capture the edit history of a status, which Mastodon lets you view in its UI. You can visualize this PState like so:

Whenever a status is edited, a new version is prepended to its respective inner list. Since this PState and $$accountIdToAccount are in the same module, each partition is colocated on the same thread within the same process. You can visualize this colocation like so:

Because of this colocation, queries can look at the partitions of both PStates at the same time within the same event instead of needing two roundtrips. You can also have multiple threads per worker process, or multiple worker processes per node.
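Expressed as schemas with the Clojure API from the auction example (a sketch only; the real module uses the Java API, and String here stands in for the actual status content type), these two PStates look something like:

(declare-pstate s $$accountIdToAccount
  {Long (fixed-keys-schema {:username     String
                            :display-name String
                            :profile-pic  String})})

(declare-pstate s $$accountIdToStatuses
  {Long (map-schema Long                   ; status ID
                    (vector-schema String) ; edit history of status content, newest first
                    {:subindex? true})})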

Now let’s see how things get really interesting. $$statusIdToFavoriters is a map from a status ID to a linked set of account IDs. Here’s a visualization of this PState:

Similar to the social graph PStates, this PState is used to compute: the order in which accounts favorited a status (paginated), whether a particular account already favorited a status, and the number of favorites for a status.

What’s interesting is how this PState is partitioned. It is not partitioned by the status ID, which is the key of the map. It is instead partitioned by the account ID of the user who posted that status, which is not even in the map! This same partitioning scheme is used for all other PStates tracking status information, like $$statusIdToReplies , $$statusIdToBoosters , and $$statusIdToMuters . This means all information for a user and all information for that user’s statuses exist on the same partition, and performing a query to fetch all the information needed to render a status only needs to perform a single roundtrip.
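In dataflow code, this kind of partitioning is just an explicit partitioner before the write, exactly like the |hash calls in the auction module. A hypothetical Clojure-API fragment for recording a favorite (variable names invented for illustration) might look like:

;; hop to the partition owned by the status's author, then index by status ID
(|hash *author-id)
(local-transform> [(keypath *status-id) NONE-ELEM (termval *favoriter-id)]
                  $$statusIdToFavoriters)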

Here’s a visualization of how all the PStates described could exist in a module deployment:

Notice, for example, how status ID “3” for account ID “10” exists both as a subvalue within $$accountIdToStatuses and also as a top-level key for the status-specific states on the same partition.

This use case is a great example of the power of Rama’s integrated approach, achieving incredible efficiency and incredible simplicity. Each of these PStates exists in exactly the perfect shape and partitioning for the use cases it serves.

Home timelines

Next, let’s explore how home timelines are stored. Materializing home timelines is by far the most intensive part of the application, since statuses are fanned out to all followers. Since the average fanout on our instance is 403, there are over 400x more writes to home timelines than any of the other PStates involved in processing statuses.

PStates are durable, replicated, high-performance structures. They are easy to reason about and ideal for most use cases. In our initial implementation of Mastodon, we stored home timelines in a PState and it worked fine.

However, writing to the home timelines PState was clearly the overwhelming bottleneck in the application because of the sheer amount of data that had to be written to disk and replicated. We also realized that home timelines are unique in that they are somewhat redundant with other PStates. You can reconstruct a home timeline by looking at the statuses of everyone you follow. This can involve a few hundred PState queries across the cluster, so it isn’t cheap, but it’s also not terribly expensive.

As it turns out, what we ended up doing to optimize home timelines is very similar to how Twitter did it for their chronological timelines, and for the exact same reasons. In our revised implementation, we store home timelines in-memory and unreplicated. So instead of needing to persist each timeline write to disk and replicate it to followers, a timeline write is an extremely cheap write to an in-memory buffer. This optimization increased the number of statuses we could process per second by 15x.

Solely storing the timeline in-memory is not sufficient though, as it provides no fault-tolerance in the event of a power loss or other node failure. So a new leader for the partition would not have the timeline info since it’s unreplicated and not persisted to disk. To solve this, we reconstruct the timeline on read if it’s missing or incomplete by querying the recent statuses of all follows. This provides the same fault-tolerance as replication, but in a different way.

Implementing fault-tolerance this way is a tradeoff. For the benefit of massively reduced cost on timeline write, sometimes reads will be much more expensive due to the cost of reconstructing lost timelines. This tradeoff is overwhelmingly worth it because timeline writes are way, way more frequent than timeline reads and lost partitions are rare.

Whereas Twitter stores home timelines in a dedicated in-memory database, in Rama they’re stored in-memory in the same processes executing the ETL for timeline fanout. So instead of having to do network operations, serialization, and deserialization, the reads and writes to home timelines in our implementation are literally just in-memory operations on a hash map. This is dramatically simpler and more efficient than operating a separate in-memory database. The timelines themselves are stored like this:

public static class Timeline {
  public long[] buffer; // ring buffer of [author ID, status ID] pairs
  public int startIndex = 0; // index within buffer that contains oldest timeline element
  public int numElems = 0; // number of elements in this timeline
}

To minimize memory usage and GC pressure, we use a ring buffer and Java primitives to represent each home timeline. The buffer contains pairs of author ID and status ID. The author ID is stored along with the status ID since it is static information that will never change, and materializing it means that information doesn’t need to be looked up at query time. The home timeline stores the most recent 600 statuses, so the buffer size is 1,200 to accommodate each author ID and status ID pair. The size is fixed since storing full timelines would require a prohibitive amount of memory (the number of statuses times the average number of followers).

Each user utilizes about 10KB of memory to represent their home timeline (1,200 longs at 8 bytes each). For a Twitter-scale deployment of 500M users, that requires about 4.7TB of memory total around the cluster, which is easily achievable.

The in-memory home timelines and other PStates are put together to render a page of a timeline with the following logic:

  • First, query the in-memory home timeline to fetch a page of [author ID, statusID] pairs.
  • Next, invoke a query topology (a predefined distributed query) that takes in a list of [author ID, statusID] pairs and returns all information needed to render each status – status content, status stats, and author information. The query topology goes to all partitions containing requested status IDs in parallel and fetches all needed information with colocated PState queries.

Implementing fanout

So that’s the story on how timelines are rendered, but how are home timelines and these various PStates computed? If you look back at the performance numbers, you can see our Mastodon implementation has high throughput that scales linearly while also having great, consistent latencies for delivering statuses to follower timelines.

Let’s focus on how statuses are handled, particularly how timelines are materialized. Whenever someone posts a status, that status must be fanned out to all followers and appended to their home timelines.

The tricky part is dealing with bursty load arising from how unbalanced the social graph can get. In our Mastodon instance, for example, the average fanout is 403, but the most popular user has over 22M followers. 3,500 statuses are posted each second, meaning that every second the system usually needs to perform 1.4M timeline writes to keep up. But if a user with 20M followers posts a status, then the number of timeline writes blows up by 15x to about 21.4M. With a naive implementation this can significantly delay other statuses from reaching follower timelines. And since the latency from posting a status to it reaching follower timelines is one of the most important metrics for this product, that’s really bad!

This is essentially a problem of fairness. You don’t want very popular users to hog all the resources on the system whenever they post a status. The key to solving this issue is to limit the amount of resources a status from a popular user can use before allocating those resources to other statuses. The approach we take in our implementation is:

  • For each iteration of timeline processing, fan out each status to at most 64k followers.
  • If there are more followers left to deliver to for a status, add that status to a PState to continue fanout in the next iteration of processing.

With this approach a status from a user with 20M followers will take 312 iterations of processing to complete fanout (about 3 minutes), and fairness is achieved by giving all statuses equal access to resources at any given time. Since the vast majority of users have fewer than 64k followers, most users will see their statuses delivered in a single iteration of processing. Statuses from popular users take longer to deliver to all followers, but that is the tradeoff that has to be made given that the amount of resources is fixed at any given time.
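
To make the iteration mechanics concrete, here’s a plain-Java sketch of the idea (this is not Rama dataflow code; the names are illustrative, and the 64k constant comes from the description above):

import java.util.List;
import java.util.function.BiConsumer;

public class FanoutIterationSketch {
  static final int MAX_DELIVERIES_PER_ITERATION = 64_000;

  // Delivers one iteration's worth of timeline writes for a status. Returns the index
  // to resume from in the next iteration, or -1 if fanout for this status is complete.
  static int fanOutOnce(long statusId, List<Long> followerIds, int startIndex,
                        BiConsumer<Long, Long> deliverToTimeline) {
    int end = Math.min(startIndex + MAX_DELIVERIES_PER_ITERATION, followerIds.size());
    for (int i = startIndex; i < end; i++) {
      deliverToTimeline.accept(followerIds.get(i), statusId);
    }
    return end < followerIds.size() ? end : -1;
  }
}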

As a side note, Twitter also has special handling for users with lots of followers to address the exact same issue.

With the basic approach understood, let’s look specifically at how statuses are processed in our implementation to materialize timelines. Here’s a dataflow diagram showing the logic:

This dataflow diagram is a little different from the social graph one, as this ETL is implemented with microbatching while the social graph is implemented with streaming. Streaming processes data directly off of depots as it arrives, while microbatching processes data in small batches. “Start iteration” in this diagram signifies the beginning of a microbatch. The tradeoffs between streaming and microbatching are too much to cover in this post, but you’ll be able to learn more about that next week when we release the documentation for Rama.

In this diagram you can see all the steps involved in delivering statuses to followers. There are many product rules implemented here, such as: only fan out statuses with the correct visibility, only send replies to followers who also follow the account being replied to, respect “show boost” settings, and so on.

This dataflow diagram only shows the computation of home timelines, whereas in reality this ETL also handles lists, hashtag timelines, and conversations. Those all work similarly to home timelines and are just additional branches of dataflow computation with slightly differing logic.

Processing skew from unbalanced social graph

Fairness issues aren’t the only problem caused by an unbalanced social graph. Another problem is skew: a naive implementation that handles all fanout for a single user from a single partition will lead to some partitions of a module having a lot more overall work to do than others. Without balanced processing, throughput is lowered since some resources are idle while others are being overwhelmed.

In our Mastodon implementation, we put significant effort into balancing fanout computation regardless of how unbalanced a social graph gets. This is a deep topic, so we will explore this further in a subsequent blog post. The optimizations we did in this area increased throughput by about 15%.

Personalized follow suggestions

Let’s now look at another part of Mastodon which works completely differently from timelines: personalized follow suggestions. This powers this section of the Mastodon UI:

Everything about how data is processed and indexed for follow suggestions is different from what we looked at in the previous sections. Taken together, the implementations of timelines, the social graph, and follow suggestions demonstrate how expressive Rama is as a system. This generality is a result of the total arbitrariness with which you can write ETL computations, structure indexes, and compute queries.

Unlike the social graph and timelines, personalized follow suggestions could reasonably be specified in many different ways. We’ve chosen to determine follow suggestions like so:

  • Rank accounts to suggest based on who’s most followed by the accounts you already follow
  • Don’t suggest accounts you already follow
  • If this doesn’t produce enough suggestions (e.g. because you don’t follow many accounts yet), suggest the most followed accounts on the platform

We took a different approach than Mastodon for follow suggestions – the API docs describe follow suggestions as “Accounts that are promoted by staff, or that the user has had past positive interactions with, but is not yet following.” We chose our approach because it’s much more difficult to implement and thus a better demonstration of Rama. Our implementation of personalized follow suggestions totals 141 lines of code.

The main PState underlying follow suggestions is called $$whoToFollow , a map from account ID to a list of up to 80 account ID suggestions. The interesting part of the follow suggestions implementation is how this PState is computed and maintained.

Follow suggestions can’t be computed fully incrementally, at least not practically. That is, you can’t receive a new follow or unfollow event and incrementally update all the follow suggestions for affected accounts. Computing follow suggestions is a batch operation that needs to look at everyone an account follows, and everyone they follow, at the same time in a single computation.

With that in mind, there are a few pieces to our implementation. First, everyone’s follow suggestions are recomputed on a regular basis. The ETL for follow suggestions recomputes the suggestions for 1,280 accounts every 30 seconds. Since there are 100M accounts, this means each account has its suggestions updated every 27 days.

In addition to this, we have special handling for new users. When a new user signs up, you want to provide good follow suggestions as soon as possible in order to increase engagement and increase the chance they’ll continue to use the service. So you don’t want to wait 27 days to compute personalized suggestions for a new user. At the same time, you can’t produce good personalized suggestions until the user has followed at least a few accounts. So our implementation tracks milestones which trigger immediate recomputation of follow suggestions: when a user follows 10 accounts and when a user follows 100 accounts.

Structuring the PStates

Here are all the depots, PStates, and query topologies related to the follow suggestions implementation:

Notice that this ETL also consumes followAndBlockAccountDepot , which is the same depot consumed by the social graph ETL to produce the social graph PStates. Depots are sources of truth that can be consumed by as many topologies as you need.

Here’s what each PState for follow suggestions is used for:

  • $$whoToFollow : As described above, this stores a list of up to 80 suggestions for each user.
  • $$nextId : This keeps track of the next group of accounts for which to recompute follow suggestions. This stores a Long on each partition that points to a key within the colocated $$followerToFollowees PState partition.
  • $$followCounts : This is a map from account ID to the number of follow actions that account has taken. This is used to track when a user has passed 10 or 100 follows and trigger an immediate recompute of their follow suggestions.
  • $$forceRecomputeUsers : This is the set of users (a set per partition) that have recently passed the 10 or 100 follows milestone. Accounts chosen for follow suggestion recomputes come from this PState as well as where $$nextId is pointing in $$followerToFollowees .
  • $$topFollowedUsers : This is a global list of the top followed users on the platform. These are used to supplement a user’s follow suggestions when not enough are computed via the personalized method.

Notice how some of these PStates are not maps at the top-level, which may feel unusual given that pretty much every database that’s ever existed is map-based (with a “key” being the central concept to identify a record or row). But just as data structures other than maps are useful for everyday programming, data structures other than maps are useful for backend programming as well.

Computing follow suggestions

Let’s explore in more detail how follow suggestions are recomputed. Unlike the other ETLs we’ve described, this one initiates computations based on time and not on the receipt of data. Every 30 seconds it needs to recompute follow suggestions for a rotating subset of all users and for any users specified in the $$forceRecomputeUsers PState.

You can think of time as an infinite stream of events, with each event being an instant in time. Rama exposes a special depot for time called a “tick depot” which emits events according to a specified frequency.
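
Declaring one looks roughly like this (a minimal sketch with an illustrative name; the frequency is in milliseconds):

setup.declareTickDepot("*followSuggestionsTick", 30 * 1000); // emits an event every 30 seconds

A topology then subscribes to a tick depot just like any other depot.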

For follow suggestions, a tick depot is used to trigger processing every 30 seconds. The subsequent computation then uses the PStates described to recompute follow suggestions for a subset of users. The dataflow looks like this:

The key PState is $$followerToFollowees . The ETL selects a subset of the keys in that map for processing, and it stores the last key chosen in $$nextId . The next iteration will start from that key. When it gets to the end of the PState, it starts over again from the beginning.
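
Conceptually, the per-partition selection looks something like this plain-Java sketch (not actual Rama code), where the returned key plays the role of $$nextId :

import java.util.List;
import java.util.NavigableMap;

public class RotatingSelectionSketch {
  // Selects up to batchSize account IDs starting after the stored cursor. Returns the last
  // key chosen (the new cursor), or null to indicate the next iteration should start over.
  static Long selectNextBatch(NavigableMap<Long, ?> followerToFollowees, Long cursor,
                              int batchSize, List<Long> selected) {
    Long key = (cursor == null)
        ? (followerToFollowees.isEmpty() ? null : followerToFollowees.firstKey())
        : followerToFollowees.higherKey(cursor);
    while (key != null && selected.size() < batchSize) {
      selected.add(key);
      key = followerToFollowees.higherKey(key);
    }
    return selected.isEmpty() ? null : selected.get(selected.size() - 1);
  }
}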

The rest of the processing uses $$followerToFollowees to fetch follows, fetch the follows of those follows, and aggregate a list of candidates along with how many times each candidate is followed among that subset of users. After filtering out candidates the starting account already follows, the $$whoToFollow PState is updated.

Every step of this is done in parallel. So when a subset of users is selected from $$followerToFollowees , it’s actually selecting a subset of users from each partition of that PState.

In between recomputes, a user may have followed some of the users in their list of suggestions. This is handled with a query topology that filters a user’s follow suggestions to exclude users they already follow.

DevOps with Rama

Let’s briefly take a look at how we do DevOps with Rama: managing the deployment, monitoring, and operation of modules in production. Since the steps are the same for all modules, we’ll use as an example how we manage the module handling statuses, timelines, and profiles. The module is implemented in the class com.rpl.mastodon.modules.Core and is deployed to a Rama cluster like so:

rama deploy \
--action launch \
--jar target/mastodon.jar \
--module com.rpl.mastodon.modules.Core \
--tasks 128 \
--threads 32 \
--workers 16 \
--replicationFactor 3 \
--configOverrides overrides.yaml

This submits the module and its code to the cluster with the given parallelism, and Rama then launches worker processes around the cluster to run the module. The same cluster is shared by all modules. Once the workers finish starting up, they start reading from depots, executing ETLs, updating PStates, serving query requests, and so on.

The “replication factor” specifies how many nodes each depot and PState should replicate its data to. Replication happens completely behind the scenes and provides automatic failover in case of failures (e.g. hardware issues). Rama provides very strong guarantees with replication – data is not made visible for consumption from depots or PStates until it has been successfully replicated.

The referenced overrides.yaml file has only two lines and just registers with Rama how to serialize/deserialize the custom types used by our implementation (defined using Thrift, described more below).

After the module finishes launching, the Cluster UI for Rama starts displaying telemetry on the module:

Rama tracks and displays telemetry for the module as a whole, as well as specific telemetry for each topology, depot, and PState. The telemetry is extremely useful for understanding the performance of a module and when it needs to be scaled. Rama uses itself to implement telemetry – a built-in module collects telemetry data from all modules into a depot, processes that data with an ETL, and indexes the results into PStates arranged in a time-series structure.

When we want to update the module to add a feature (e.g. add a new PState) or fix a bug, we run a command like the following:

rama deploy \
--action update \
--jar target/mastodon.jar \
--module com.rpl.mastodon.modules.Core \
--configOverrides overrides.yaml

This launches a carefully coordinated, automated procedure to launch new worker processes and hand off responsibility for depots and PStates to the new version of the module. Clients of the module doing depot appends and PState queries don’t need to be updated and automatically transition themselves to the new module version.

Similarly, when we want to scale the module to have more resources, we run a command like the following:

rama scaleExecutors \
--module com.rpl.mastodon.modules.Core \
--workers 24

This launches a procedure similar to a module update to transition the module to the new version.

And that’s all there is to DevOps with Rama – it’s just a few commands at the terminal to manage everything. You don’t need to invest huge amounts of time writing piles of shell scripts to coordinate changes across dozens of systems. Since Rama is such a cohesive, integrated system it’s able to automate deployment entirely, and it’s able to provide deep and detailed runtime telemetry without needing to lift a finger.

Simple Rama code example

Let’s look at some code! Before diving into the Mastodon code, let’s look at a simple example of coding with Rama to gain a feeling for what it’s like.

I’m not going to explain every last detail in this code – the API is so rich that it’s too much to explain for this post. Instead, I’ll do my best to summarize what the code is doing. In one week we will be releasing all the documentation for Rama, and this includes a six part tutorial that gently introduces everything. We will also be releasing a build of Rama that anyone can download and use. This build will be able to simulate Rama clusters within a single process but will not be able to run distributed clusters. It has the full Rama API and can be used to experiment with Rama. Once we open-source our Mastodon implementation in two weeks, you’ll be able to run it within a single process using this build.

With that said, let’s look at a simple example. Here’s the complete definition for a “word count module”, which accepts sentences as input and produces a single PState containing the count of all words in those sentences:

public class WordCountModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
        setup.declareDepot("*sentenceDepot", Depot.random());

        StreamTopology wordCount = topologies.stream("wordCount");
        wordCount.pstate("$$wordCounts", PState.mapSchema(String.class, Long.class));

        wordCount.source("*sentenceDepot").out("*sentence")
                 .each((String sentence, OutputCollector collector) -> {
                     for(String word: sentence.split(" ")) {
                       collector.emit(word);
                     }
                 }, "*sentence").out("*word")
                 .hashPartition("*word")
                 .compoundAgg("$$wordCounts", CompoundAgg.map("*word", Agg.count()));
    }
}

This module has one ETL named wordCount , one depot named *sentenceDepot , and one PState named $$wordCounts . The ETL receives new sentences from the depot, tokenizes those sentences into words, and then updates the counts for those words in the PState. The PState partitions are updated within a few milliseconds of appending a sentence to the depot.

A module implements the interface RamaModule , which has a single method define . setup is used to declare depots and any dependencies on depots or PStates in other modules, and topologies is used to declare all ETL and query topologies.

The first line of define declares the depot. Depot names always begin with a * . Strings beginning with * are interpreted as variables in Rama code, and they can be passed around and used just like variables in any programming language. The second argument Depot.random() specifies the partitioning scheme of the depot. In this case the partitioning scheme causes appended sentences to go to a random partition of the depot. When local ordering is important, like for follow and unfollow events, the partitioning scheme would be set appropriately so events for the same entity go to the same partition.
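
For example, the social graph depot you’ll see later in this post is declared with a hash-partitioning scheme so that all follow and block events for a given account land on the same partition:

setup.declareDepot("*followAndBlockAccountDepot", Depot.hashBy(ExtractAccountId.class));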

The next line declares the ETL wordCount as a streaming topology.

After that is the declaration of the PState $$wordCounts . The PState is declared with a schema that specifies what it stores and how it stores it. In this case it’s just a simple map, but you can specify whatever structure you want here (e.g. a map of subindexed maps of lists of subindexed sets).

Lastly is the definition of the ETL. The line wordCount.source("*sentenceDepot").out("*sentence") subscribes the ETL to *sentenceDepot and binds any new sentences received to the variable *sentence .

The next line tokenizes each sentence into words. Java code is inserted with a lambda to split each sentence on whitespace and emit each word individually as the variable *word . Inserting arbitrary Java code into topologies like this is extremely common.

The next line .hashPartition("*word") relocates the dataflow to the partition of the module storing the counts for that word. The code before that line and after that line can execute on different machines, and Rama takes care of all the serialization and network transfer involved in moving the computation.

Finally, now that the computation is on the correct partition, the last line updates the count for the word in the PState. This PState update is specified in the form of an aggregation template – in this case it says it’s aggregating a map where the key is the word and the value is the count of all events seen for that word.

This is such a basic example that it doesn’t really do justice to the expressive power of Rama. However, it does demonstrate the general workflow of declaring modules, depots, PStates, and topologies. Some of the functionality not shown here includes: consuming depots/PStates from other modules, query topologies, microbatching, branching/merging, joins, loops, shadowing variables, conditionals, and decomposing code with macros.

Let’s now take a look at interacting with Rama modules as a client outside the cluster, similar to how you interact with a database using a database client. Here’s code that connects to a remote cluster, creates handles to the depot and PState of the module, appends some sentences, and then does some PState queries:

Map config = new HashMap();
config.put("conductor.host", "1.2.3.4");
RamaClusterManager manager = RamaClusterManager.open(config);
Depot depot = manager.clusterDepot("rama.examples.wordcount.WordCountModule", "*sentenceDepot");
depot.append("hello world");
depot.append("hello world again");
depot.append("say hello to the planet");
depot.append("red planet labs");

PState wc = manager.clusterPState("rama.examples.wordcount.WordCountModule", "$$wordCounts");
System.out.println("'hello' count: " + wc.selectOne(Path.key("hello")));
System.out.println("'world' count: " + wc.selectOne(Path.key("world")));
System.out.println("'planet' count: " + wc.selectOne(Path.key("planet")));
System.out.println("'red' count: " + wc.selectOne(Path.key("red")));

RamaClusterManager is used to connect to a cluster and retrieve handles to depots and PStates. Depots and PStates are identified by their module name (the class name of the module definition) and their name within the module. By default, depot appends block until all colocated streaming topologies have finished processing the appended data. This is why the PState queries can be executed immediately following the depot appends without further coordination.

The PState queries here fetch the values for the specified keys. PStates are queried using Rama’s “Path” API, and this example barely scratches the surface of what you can do with paths. They allow you to easily reach into a PState, regardless of its structure, and retrieve precisely what you need – whether one value, multiple values, or an aggregation of values. They can also be used for updating PStates within topologies. Mastering paths is one of the keys to mastering Rama development.

Let’s now take a look at how you would run WordCountModule in a unit test environment:

public void wordCountTest() throws Exception {
    try (InProcessCluster cluster = InProcessCluster.create()) {
        cluster.launchModule(new WordCountModule(), new LaunchConfig(4, 2));
        String moduleName = WordCountModule.class.getName();
        Depot depot = cluster.clusterDepot(moduleName, "*sentenceDepot");
        depot.append("hello world");
        depot.append("hello world again");
        depot.append("say hello to the planet");
        depot.append("red planet labs");

        PState wc = cluster.clusterPState(moduleName, "$$wordCounts");
        System.out.println("'hello' count: " + wc.selectOne(Path.key("hello")));
        System.out.println("'world' count: " + wc.selectOne(Path.key("world")));
        System.out.println("'planet' count: " + wc.selectOne(Path.key("planet")));
        System.out.println("'red' count: " + wc.selectOne(Path.key("red")));
    }
}

Running this code prints:

'hello' count: 3
'world' count: 2
'planet' count: 2
'red' count: 1

InProcessCluster simulates a Rama cluster completely in-process and is ideal for unit testing modules. Here you can see how InProcessCluster is used to launch the module and then fetch depots/PStates just like with RamaClusterManager . There’s no difference in the functionality available with InProcessCluster versus a real cluster, and you’ll be able to try out InProcessCluster next week when we release the non-production build of Rama.

Sample code from our Mastodon implementation

Now let’s look at some code from our Mastodon implementation. We’ll be looking at bigger code samples in this section utilizing a lot more of the Rama API, so even more than the last section I won’t be able to explain all the details of the code. I’ll summarize what the key parts are, and you’ll be able to learn all the details next week when we release the documentation.

Please don’t be too intimidated by this code. There are a lot of concepts and API methods at work here, and no one could possibly understand this code completely at first glance. This is especially true without the accompanying documentation. I’m showing this code because it ties together the high-level concepts I’ve discussed in this post by making them real instead of abstract.

Representing data

Let’s start by looking at an example of how data is defined. We chose to represent data using Thrift since it has a nice schema definition language and produces efficient serialization, but you can just as easily use plain Java objects, Protocol Buffers, or anything else you want. We use Thrift-defined objects in both depots and PStates. Here’s how statuses are defined:

typedef i64 AccountId
typedef i64 StatusId
typedef i64 Timestamp

enum StatusVisibility {
  Public = 1,
  Unlisted = 2,
  Private = 3,
  Direct = 4
}

enum AttachmentKind {
  Image = 1,
  Video = 2
}

struct StatusPointer {
  1: required AccountId authorId;
  2: required StatusId statusId;
  3: optional Timestamp timestamp;
  4: optional bool shouldExclude;
}

struct PollContent {
  1: list<string> choices;
  2: required Timestamp expiration;
  3: required bool multipleChoice;
}

struct Attachment {
  1: required AttachmentKind kind;
  2: required string extension;
  3: required string description;
}

struct AttachmentWithId {
  1: required string uuid;
  2: required Attachment attachment;
}

struct NormalStatusContent {
  1: required string text;
  2: required StatusVisibility visibility;
  3: optional PollContent pollContent;
  4: optional list<AttachmentWithId> attachments;
  5: optional string sensitiveWarning;
}

struct ReplyStatusContent {
  1: required string text;
  2: required StatusVisibility visibility;
  3: required StatusPointer parent;
  4: optional PollContent pollContent;
  5: optional list<AttachmentWithId> attachments;
  6: optional string sensitiveWarning;
}

struct BoostStatusContent {
  1: required StatusPointer boosted;
}

union StatusContent {
  1: NormalStatusContent normal;
  2: ReplyStatusContent reply;
  3: BoostStatusContent boost;
}

struct Status {
  1: required AccountId authorId;
  2: required StatusContent content;
  3: required Timestamp timestamp;
  4: optional string remoteUrl;
  5: optional string language;
}

Every type of status, including boosts, replies, and statuses with polls, is represented by this definition. Being able to represent your data using normal programming practices, as opposed to restrictive database environments where you can’t have nested definitions like this, goes a long way toward avoiding impedance mismatches and keeping code clean and comprehensible.
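
For instance, constructing a simple public status with the Thrift-generated Java classes looks roughly like this (a sketch assuming standard Thrift codegen conventions, with required-field constructors and static factory methods for the union):

long authorId = 42; // illustrative account ID
NormalStatusContent content = new NormalStatusContent("Just setting up my Mastodon", StatusVisibility.Public);
Status status = new Status(authorId, StatusContent.normal(content), System.currentTimeMillis());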

Following hashtags code

Next, let’s look at the entire definition of following hashtags (described earlier in this post). As a reminder, here’s the dataflow diagram for the following hashtags ETL:

Here’s the implementation, which is a direct translation of the diagram to code:

setup.declareDepot("*followHashtagDepot", Depot.hashBy(ExtractToken.class));

StreamTopology stream = topologies.stream("relationshipsStream");

stream.pstate("$$hashtagToFollowers", PState.mapSchema(String.class, PState.setSchema(Long.class).subindexed()));

stream.source("*followHashtagDepot", StreamSourceOptions.retryAllAfter()).out("*data")
      .subSource("*data",
         SubSource.create(FollowHashtag.class)
                  .macro(extractFields("*data", "*accountId", "*token"))
                  .localTransform("$$hashtagToFollowers", Path.key("*token").voidSetElem().termVal("*accountId")),
         SubSource.create(RemoveFollowHashtag.class)
                  .macro(extractFields("*data", "*accountId", "*token"))
                  .localTransform("$$hashtagToFollowers", Path.key("*token").setElem("*accountId").termVoid()));

This is extremely simple. subSource branches the dataflow graph based on the type of an object. In this code the object in *data can be one of two types, and there is a separate branch of dataflow for each type. When a FollowHashtag event is received, that account is added to the set of followers for that hashtag. When a RemoveFollowHashtag event is received, that account is removed from the set of followers for that hashtag. Because the nested sets are subindexed, they can efficiently contain hundreds of millions of elements or more.
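
As a usage sketch, a client could count the followers of a hashtag with a path like the following, using a RamaClusterManager as in the earlier word count example (the module name here is a hypothetical stand-in):

PState hashtagToFollowers = manager.clusterPState("com.rpl.mastodon.modules.Relationships", "$$hashtagToFollowers");
// Navigate to the nested subindexed set for the hashtag and view its size, rather than
// fetching the whole follower set back to the client.
Long numFollowers = hashtagToFollowers.selectOne(Path.key("rama").view(Ops.SIZE));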

extractFields is a helper function in the Mastodon implementation for extracting fields out of Thrift objects by name and binding them to corresponding Rama variables of the same name. So extractFields("*data", "*accountId", "*token") extracts the fields “accountId” and “token” from the Thrift object in *data and binds them to the variables *accountId and *token .

extractFields is implemented as a Rama macro, which is a utility for inserting a snippet of dataflow code into another section of dataflow code. It is a mechanism for code reuse that allows the composition of any dataflow elements: functions, filters, aggregation, partitioning, etc.

Unlike word count, this code uses paths instead of aggregators to define the writes to the PStates, which is the same API used to read from PStates. You’ll be able to learn more next week when we release Rama’s documentation about the differences between aggregators and paths and when to prefer one over the other.

Note that this code defines a parallel computation just like the word count example earlier. The code runs across many nodes to process data off each partition of the depot and update the PState. Any failures (e.g. a node dying) are handled transparently and Rama guarantees all depot data will be fully processed.

The partitioning is defined at the depot level ( Depot.hashBy(ExtractToken.class) ), so when the ETL begins processing a piece of data, the computation is already located on the partition of the module storing followers for that hashtag. So no further partitioning is needed in the ETL definition.
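
ExtractToken itself isn’t shown in this post, but it’s just a small function class. Here’s a sketch of what it might look like (assuming a RamaFunction1 interface analogous to the RamaFunction2 and RamaFunction5 interfaces used later in this post, and with simplified field access):

public class ExtractToken implements RamaFunction1<Object, String> {
  @Override
  public String invoke(Object data) {
    // Both FollowHashtag and RemoveFollowHashtag carry the hashtag in their "token" field.
    if (data instanceof FollowHashtag) return ((FollowHashtag) data).getToken();
    else return ((RemoveFollowHashtag) data).getToken();
  }
}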

Social graph code

Next, let’s look at the entire definition of the social graph as described earlier. Here was the dataflow diagram for the social graph ETL:

Like hashtag follows, the social graph implementation is also a direct translation of the diagram to code. Since the code for this is longer, let’s look at it section by section in the order in which it’s written. The first part is the declaration of the depots, topology, and PStates:

setup.declareDepot("*followAndBlockAccountDepot", Depot.hashBy(ExtractAccountId.class));
setup.declareDepot("*muteAccountDepot", Depot.hashBy(ExtractAccountId.class));

StreamTopology stream = topologies.stream("relationshipsStream");

KeyToLinkedEntitySetPStateGroup accountIdToFollowRequests = new KeyToLinkedEntitySetPStateGroup("$$accountIdToFollowRequests", Long.class, FollowLockedAccount.class)
    .entityIdFunction(Long.class, req -> ((FollowLockedAccount) req).requesterId)
    .descending();
accountIdToFollowRequests.declarePStates(stream);

KeyToLinkedEntitySetPStateGroup followerToFollowees = new KeyToLinkedEntitySetPStateGroup("$$followerToFollowees", Long.class, Follower.class)
    .entityIdFunction(Long.class, new ExtractAccountId())
    .descending();
KeyToLinkedEntitySetPStateGroup followeeToFollowers = new KeyToLinkedEntitySetPStateGroup("$$followeeToFollowers", Long.class, Follower.class)
    .entityIdFunction(Long.class, new ExtractAccountId())
    .descending();
followerToFollowees.declarePStates(stream);
followeeToFollowers.declarePStates(stream);

stream.pstate("$$accountIdToSuppressions",
              PState.mapSchema(Long.class, PState.fixedKeysSchema("muted", PState.mapSchema(Long.class, MuteAccountOptions.class).subindexed(),
                                                                  "blocked", PState.setSchema(Long.class).subindexed())));

Note that both hashtag follows and the social graph are part of the same stream topology. The social graph implementation consumes different depots than hashtag follows does, so the code is otherwise completely independent.

The $$followerToFollowees and $$followeeToFollowers PStates are defined with KeyToLinkedEntitySetPStateGroup , which defines the “map to linked set” data structure abstraction as the composition of multiple, more primitive PStates underneath the hood. Its implementation is only 68 lines of code.

The next part defines the root of processing where the branching occurs at the start of the dataflow diagram:

stream.source("*followAndBlockAccountDepot", StreamSourceOptions.retryAllAfter()).out("*initialData")
      .anchor("SocialGraphRoot")

As we build up the code for the social graph, let’s also take a look visually at how the dataflow diagram is filled out. This code starts off the dataflow diagram like this:

anchor defines a location in a dataflow graph that can later be hooked onto with hook . The next section defines the first branch of processing:

.each(Ops.IDENTITY, "*initialData").out("*data")
.anchor("Normal")

This branch passes all data through to the anchor “Normal”, which will later be merged with other branches as you can see in the dataflow diagram.

Rama provides the Ops class which has commonly used functions to use within dataflow code. This includes math operations, comparators, and other utilities. Here Ops.IDENTITY is used which emits its input unchanged.

The next section defines the branch handling implicit unfollow events for block events:

.hook("SocialGraphRoot")
.keepTrue(new Expr(Ops.IS_INSTANCE_OF, BlockAccount.class, "*initialData"))
.each((BlockAccount data, OutputCollector collector) -> {
    collector.emit(new RemoveFollowAccount(data.getAccountId(), data.getTargetId(), data.getTimestamp()));
    collector.emit(new RemoveFollowAccount(data.getTargetId(), data.getAccountId(), data.getTimestamp()));
    collector.emit(new RejectFollowRequest(data.getAccountId(), data.getTargetId()));
}, "*initialData").out("*data")
.anchor("ImplicitUnfollow")

This uses hook to create a branch off the root of processing for this depot that was defined in the previous code section. The keepTrue line continues processing on this branch only for block events. It then generates implicit events to unfollow in both directions and remove a follow request if it exists. Lastly, the “ImplicitUnfollow” anchor is declared which will later be used to merge this branch together with “Normal” and other branches.

The next section defines the branch handling accepting a follow request:

.hook("SocialGraphRoot")
.keepTrue(new Expr(Ops.IS_INSTANCE_OF, AcceptFollowRequest.class, "*initialData"))
.macro(extractFields("*initialData", "*accountId", "*requesterId"))
.localSelect("$$accountIdToFollowRequests", Path.key("*accountId").must("*requesterId")).out("*followRequestId")
.localSelect("$$accountIdToFollowRequestsById", Path.key("*accountId", "*followRequestId")).out("*followRequest")
.hashPartition("*requesterId")
.each((AcceptFollowRequest data, FollowLockedAccount req) -> {
    FollowAccount follow = new FollowAccount(data.getRequesterId(), data.getAccountId(), data.getTimestamp());
    if (req.isSetShowBoosts()) follow.setShowBoosts(req.showBoosts);
    if (req.isSetNotify()) follow.setNotify(req.notify);
    if (req.isSetLanguages()) follow.setLanguages(req.languages);
    return follow;
} , "*initialData", "*followRequest").out("*data")
.anchor("CompleteFollowRequest")

This code is structured just like the previous sections by hooking onto the root and then filtering for the data type of interest. Then, this code checks to see if that follow request still exists since a user could retract their follow request at the same time it was accepted. The must navigator stops this branch of computation if the follow request no longer exists.

After that, the code generates the implicit Follow event which will later perform the actual logic of updating the $$followerToFollowees and $$followeeToFollowers PStates.

The next section handles follows to a locked account. As a reminder, a locked account requires all followers to be manually approved.

.hook("SocialGraphRoot")
.keepTrue(new Expr(Ops.IS_INSTANCE_OF, FollowLockedAccount.class, "*initialData"))
.macro(extractFields("*initialData", "*accountId", "*requesterId"))
.localSelect("$$followeeToFollowers", Path.key("*accountId", "*requesterId")).out("*existingFollowerId")
.ifTrue(new Expr(Ops.IS_NOT_NULL, "*existingFollowerId"),
   Block.each((FollowLockedAccount data) -> {
          FollowAccount follow = new FollowAccount(data.requesterId, data.accountId, data.timestamp);
          if(data.isSetShowBoosts()) follow.setShowBoosts(data.isShowBoosts());
          if(data.isSetNotify()) follow.setNotify(data.isNotify());
          if(data.isSetLanguages()) follow.setLanguages(data.getLanguages());
          return follow;
        }, "*initialData").out("*data")
        .hashPartition("*requesterId")
        .anchor("UpdatePrivateFollow"),
   Block.macro(extractFields("*initialData", "*accountId", "*requesterId"))
        .macro(accountIdToFollowRequests.addToLinkedSet("*accountId", "*initialData")))

When a follow request is made in the UI to a locked account, a FollowLockedAccount event is appended to the depot. Otherwise, a FollowAccount event is appended.

Follow relationships contain additional information such as whether the follower wants to see boosts from the followee and whether they only want to see statuses in a certain language from the followee (another one of Mastodon’s features). Updating these settings is done via another FollowAccount or FollowLockedAccount event.

This code uses ifTrue to determine if the follower already follows the followee. ifTrue works just like if in any programming language, with a “then” block and an optional “else” block. If the follow relationship exists, it creates an implicit FollowAccount event to update the options on the relationship. The “UpdatePrivateFollow” anchor is used later to merge that branch just like the previous sections.

If the follow relationship does not already exist, then the PState tracking follow requests is updated.

The next section merges the prior branches together and begins processing for the rest of the events, whether they came directly off the depot or were created implicitly by one of the branches:

.unify("Normal", "ImplicitUnfollow", "CompleteFollowRequest", "UpdatePrivateFollow")
.subSource("*data",

unify merges the specified branches together so they share subsequent computation. Any variables that are in scope in all specified branches are in scope in the code following the unify call.

The subSource call dispatches subsequent code on the type of the object in *data . The following code defines the handling for FollowAccount events:

SubSource.create(FollowAccount.class)
         .macro(extractFields("*data", "*accountId", "*targetId", "*followerSharedInboxUrl", "*showBoosts", "*notify", "*languages"))
         .localSelect("$$followerToFollowees", Path.key("*accountId").view(Ops.SIZE)).out("*followeeCount")
         .keepTrue(new Expr(Ops.LESS_THAN, "*followeeCount", relationshipCountLimit))
         .localSelect("$$followerToFollowees", Path.key("*accountId", "*targetId")).out("*followeeId")
         .localSelect("$$followerToFolloweesById", Path.key("*accountId", "*followeeId")).out("*existingFollowee")
         .each(Relationships::makeFollower, "*targetId", "*showBoosts", "*languages", "*followerSharedInboxUrl", "*existingFollowee").out("*followee")
         .macro(followerToFollowees.addToLinkedSet("*accountId", "*followee"))
         .hashPartition("*targetId")
         .localSelect("$$followeeToFollowers", Path.key("*targetId", "*accountId")).out("*followerId")
         .localSelect("$$followeeToFollowersById", Path.key("*targetId", "*followerId")).out("*existingFollower")
         .each(Relationships::makeFollower, "*accountId", "*showBoosts", "*languages", "*followerSharedInboxUrl", "*existingFollower").out("*follower")
         .macro(followeeToFollowers.addToLinkedSet("*targetId", "*follower"))
         .macro(accountIdToFollowRequests.removeFromLinkedSetByEntityId("*targetId", "*accountId")),

This code adds the relationship to the $$followerToFollowees and $$followeeToFollowers PStates. If the relationship already exists, it updates the options on the relationship. It also removes any corresponding follow request from $$accountIdToFollowRequests if it exists.

relationshipCountLimit puts an upper limit on the number of follows someone can have and is set to a very conservative limit of 100,000. It exists to prevent abuse of the system.

The next section handles RemoveFollowAccount events:

SubSource.create(RemoveFollowAccount.class)
         .macro(extractFields("*data", "*accountId", "*targetId", "*followerSharedInboxUrl"))
         .hashPartition("*accountId")
         .macro(followerToFollowees.removeFromLinkedSetByEntityId("*accountId", "*targetId"))
         .hashPartition("*targetId")
         .macro(followeeToFollowers.removeFromLinkedSetByEntityId("*targetId", "*accountId"))
         .macro(accountIdToFollowRequests.removeFromLinkedSetByEntityId("*targetId", "*accountId")),

This code just removes the relationship from all relevant PStates.

Here is the next section:

SubSource.create(FollowLockedAccount.class),

SubSource.create(AcceptFollowRequest.class),

This handles events coming off the depot that were handled already in one of the top-level branches we already looked at. subSource requires every data type it sees to have a handler, so this code says to do nothing for those types.

The next section handles RejectFollowRequest :

SubSource.create(RejectFollowRequest.class)
         .macro(extractFields("*data", "*accountId", "*requesterId"))
         .macro(accountIdToFollowRequests.removeFromLinkedSetByEntityId("*accountId", "*requesterId")),

This just removes the follow request from the PState.

The next section handles BlockAccount :

SubSource.create(BlockAccount.class)
         .macro(extractFields("*data", "*accountId", "*targetId", "*timestamp"))
         .localSelect("$$accountIdToSuppressions", Path.key("*accountId", "blocked").view(Ops.SIZE)).out("*blockeeCount")
         .keepTrue(new Expr(Ops.LESS_THAN, "*blockeeCount", relationshipCountLimit))
         .localTransform("$$accountIdToSuppressions", Path.key("*accountId", "blocked").voidSetElem().termVal("*targetId")),

BlockAccount was handled in one of the initial branches to generate the implicit unfollows between the two accounts. Here, BlockAccount is handled again to record the block relationship in the $$accountIdToSuppressions PState.

The next section handles RemoveBlockAccount events:

SubSource.create(RemoveBlockAccount.class)
         .macro(extractFields("*data", "*accountId", "*targetId"))
         .localTransform("$$accountIdToSuppressions", Path.key("*accountId", "blocked").setElem("*targetId").termVoid()));

All this does is remove the block relationship from the $$accountIdToSuppressions PState.

That’s all the code for handling social graph updates from events on followAndBlockAccountDepot . The next section contains all the logic for handling events from muteAccountDepot :

stream.source("*muteAccountDepot", StreamSourceOptions.retryAllAfter()).out("*data")
      .subSource("*data",
         SubSource.create(MuteAccount.class)
                  .macro(extractFields("*data", "*accountId", "*targetId", "*options"))
                  .localSelect("$$accountIdToSuppressions", Path.key("*accountId", "muted").view(Ops.SIZE)).out("*muteeCount")
                  .keepTrue(new Expr(Ops.LESS_THAN, "*muteeCount", relationshipCountLimit))
                  .localTransform("$$accountIdToSuppressions", Path.key("*accountId", "muted", "*targetId").termVal("*options")),
         SubSource.create(RemoveMuteAccount.class)
                  .macro(extractFields("*data", "*accountId", "*targetId"))
                  .localTransform("$$accountIdToSuppressions", Path.key("*accountId", "muted", "*targetId").termVoid()));

This is really simple, as all it does is add the mute relationship for MuteAccount events and remove the relationship for RemoveMuteAccount events.

That’s the complete implementation of the social graph! As you can see, it’s expressed exactly as you saw in the dataflow diagram with branches, merges, and dispatching on the types of events.

It’s worth noting that dataflow code compiles to efficient bytecode when deployed, as efficient as regular Java code. So Rama variables like *accountId and *targetId become actual variables in the generated bytecode.

The code for this ETL is also a great example of why it’s so beneficial to interact with your data layer with an API in a general-purpose language instead of a custom language (like SQL). This code makes use of normal programming practices to factor out reusable functionality and to split logic into separate functions for readability. This code also demonstrates how easy it is to intermix logic written in Java with logic written in Rama’s dataflow API. Method references, lambdas, and macros are facilities for combining the two.

Let’s look at some of the Mastodon API implementation related to the social graph so you can see how you interact with a Rama cluster to serve the frontend. Here’s how a new unfollow event is added:

public CompletableFuture<Boolean> postRemoveFollowAccount(long followerId, long followeeId, String sharedInboxUrl) {
    RemoveFollowAccount removeFollowAccount = new RemoveFollowAccount(followerId, followeeId, System.currentTimeMillis());
    if (sharedInboxUrl != null) removeFollowAccount.setFollowerSharedInboxUrl(sharedInboxUrl);
    return followAndBlockAccountDepot.appendAsync(removeFollowAccount).thenApply(res -> true);
}

This uses the async API for depots to append unfollow data to followAndBlockAccountDepot . Rama’s async API is used almost exclusively in the Mastodon API implementation so as not to block any threads (which would be an inefficient use of resources).

Here’s how a follow event is handled, choosing the type of data to append depending on whether the account is locked:

public CompletableFuture<Boolean> postFollowAccount(long followerId, long followeeId, String sharedInboxUrl, PostFollow params) {
    return getAccountWithId(followeeId)
        .thenCompose((followee) -> {
            if (followee != null && followee.account != null && followee.account.locked) {
                FollowLockedAccount req = new FollowLockedAccount(followeeId, followerId, System.currentTimeMillis());
                if (params != null) {
                    if (params.reblogs != null) req.setShowBoosts(params.reblogs);
                    if (params.notify != null) req.setNotify(params.notify);
                    if (params.languages != null) req.setLanguages(params.languages);
                }
                return followAndBlockAccountDepot.appendAsync(req);
            } else {
                FollowAccount req = new FollowAccount(followerId, followeeId, System.currentTimeMillis());
                if (params != null) {
                    if (params.reblogs != null ) req.setShowBoosts(params.reblogs);
                    if (params.notify != null) req.setNotify(params.notify);
                    if (params.languages != null) req.setLanguages(params.languages);
                }
                if (sharedInboxUrl != null) req.setFollowerSharedInboxUrl(sharedInboxUrl);
                return followAndBlockAccountDepot.appendAsync(req);
            }
        }).thenApply(res -> true);
}

This first calls the helper function getAccountWithId , which uses a query topology to get all information about that account. If the account is locked, a FollowLockedAccount is appended with any options appropriately set. Otherwise, a FollowAccount event is appended.

The helper function getAccountWithId is implemented like this:

public CompletableFuture<AccountWithId> getAccountWithId(Long requestAccountIdMaybe, long accountId) {
    return getAccountsFromAccountIds.invokeAsync(requestAccountIdMaybe, Arrays.asList(accountId))
                                    .thenApply(accounts -> {
                                        if (accounts.size() == 0) return null;
                                        return accounts.get(0);
                                    });
}

public CompletableFuture<AccountWithId> getAccountWithId(long accountId) {
    return this.getAccountWithId(null, accountId);
}

The query topology can optionally be invoked with a “requesting account ID”, because in some circumstances an account should not be visible to another account (e.g. the requesting account is blocked by that user). In this case, it just needs to check whether the account is locked, so it passes null for the requesting account ID. The query topology client is fetched like so:

QueryTopologyClient<List> getAccountsFromAccountIds = cluster.clusterQuery("com.rpl.mastodon.modules.Core", "getAccountsFromAccountIds");

As you can see, a query topology client is fetched just like how you fetch a handle to a depot or PState. Invoking a query topology is like invoking a regular function – you pass it some arguments and you get a result back. Unlike a regular function, a query topology executes on a cluster across potentially many nodes. Here the result is received asynchronously, but you can also do a blocking call with invoke .
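
For example, the same lookup done as a blocking call looks like this (with accountId being the account to fetch):

List accounts = getAccountsFromAccountIds.invoke(null, Arrays.asList(accountId));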

Timeline fanout code

Lastly, let’s look at the code implementing timeline fanout, as described earlier. This implementation is only 51 lines of code. As a reminder, here’s the dataflow diagram for timeline fanout:

Once again, since this is a longer piece of code let’s look at it section by section. Let’s start with the subscription to the depot containing all the statuses:

fan.source("*statusWithIdDepot").out("*microbatch")
   .anchor("FanoutRoot")

Like in the social graph example, let’s see visually how the dataflow diagram gets filled out. This code starts off the dataflow diagram like this:

Unlike the previous examples, this topology is implemented with microbatching. Microbatching guarantees exactly-once processing semantics even in the case of failures. That is, even if there are node or network outages and computation needs to be retried, the resulting PState updates will be as if each depot record was processed exactly once.

The variable *microbatch represents a batch of data across all partitions of the depot. This code simply binds that variable and marks the root of computation with the label “FanoutRoot”. As you can see in the dataflow diagram, there are two branches off the root of processing.

The next section implements the first branch, which handles continuing fanout for statuses with too many followers from the previous iteration:

.allPartition()
.localSelect("$$statusIdToLocalFollowerFanouts", Path.all()).out("*keyAndVal")
.each(Ops.EXPAND, "*keyAndVal").out("*statusId", "*followerFanouts")
.localTransform("$$statusIdToLocalFollowerFanouts", Path.key("*statusId").termVoid())
.each(Ops.EXPLODE, "*followerFanouts").out("*followerFanout")
.macro(extractFields("*followerFanout", "*authorId", "*nextIndex", "*fanoutAction", "*status", "*task"))
.each(FanoutAction::getValue, "*fanoutAction").out("*fanoutActionValue")
.macro(extractFields("*status", "*content", "*language"))
.each((RamaFunction2<Long, Long, StatusPointer>) StatusPointer::new, "*authorId", "*statusId").out("*statusPointer")
.directPartition("$$partitionedFollowers", "*task")
.anchor("LocalFollowerFanoutContinue")

Whenever a status has too many followers for one iteration of fanout, it is added to the $$statusIdToLocalFollowerFanouts PState. This PState is a map from status ID to a list of FollowerFanout objects, each containing the information needed to continue fanout starting with the next unhandled follower. As you can see in this code, it reads everything from that PState using Path.all() and then deletes everything from that PState.

As you’ll see later, adding to $$statusIdToLocalFollowerFanouts is done on whatever partition followers were read for that status. So this code uses allPartition to access every partition of the PState. allPartition is a partitioner like hashPartition , except instead of the subsequent code executing on one partition, the subsequent code executes on all partitions. This allows the ETL to fetch all statuses that required continued fanout from the last iteration. You have to be careful when using allPartition , as it can create non-scalable topologies if used for every piece of data on a high-throughput depot. In this case allPartition is used just once per iteration, so it doesn’t affect the scalability of the topology.

At the end of this block of code is some handling related to $$partitionedFollowers . We didn’t mention this PState in the earlier discussion of fanout, but it’s an additional optimization to balance load for handling of users with large amounts of followers. In short, this is an additional view of the social graph where users with more than 1,000 followers have their followers spread among multiple partitions of this PState. This balances the load of processing for fanout by reducing variance among partitions. We will be publishing another blog post in the future exploring this optimization and others.

The next section begins the other branch of processing at the root of the dataflow diagram:

.hook("FanoutRoot")
.explodeMicrobatch("*microbatch").out("*data")

This creates the branch and reads all new statuses for this iteration from the microbatch. explodeMicrobatch here reads all data from the microbatch across all partitions and binds each piece of data to the variable *data . This operation emits across all partitions of the module.

The next section begins processing of new statuses:

.macro(extractFields("*data", "*statusId", "*status"))
.macro(extractFields("*status", "*authorId", "*content", "*language"))
.each(MastodonHelpers::getStatusVisibility, "*status").out("*visibility")
.keepTrue(new Expr(Ops.NOT_EQUAL, "*visibility", StatusVisibility.Direct))
.each(Ops.IDENTITY, -1L).out("*nextIndex")
.each(Ops.IDENTITY, FanoutAction.Add.getValue()).out("*fanoutActionValue")
.each((RamaFunction2<Long, Long, StatusPointer>) StatusPointer::new, "*authorId", "*statusId").out("*statusPointer")
.each(HomeTimelines::addTimelineItem, "*homeTimelines", "*authorId", "*statusPointer", new Expr(Ops.CURRENT_MICROBATCH_ID))

This code specifies to only perform fanout for statuses with visibility other than Direct , which is for direct messages and handled elsewhere. It then adds the status to the author’s own home timeline, which implements self-fanout.

The next section reads a batch of followers for the status:

.select("$$partitionedFollowersControl", Path.key("*authorId")).out("*tasks")
.each(Ops.EXPLODE_INDEXED, "*tasks").out("*i", "*task")
.ifTrue(new Expr(Ops.NOT_EQUAL, 0, "*i"), Block.directPartition("$$partitionedFollowers", "*task"))
.anchor("NormalFanout")

.unify("NormalFanout", "LocalFollowerFanoutContinue")
.macro(safeFetchMapLocalFollowers("$$partitionedFollowers", "*authorId", "*nextIndex", rangeQueryLimit, "*fetchedFollowers", "*nextFollowerId"))
.ifTrue(new Expr(Ops.IS_NOT_NULL, "*nextFollowerId"),
        Block.each((RamaFunction5<Long, Long, FanoutAction, Status, Integer, FollowerFanout>) FollowerFanout::new, "*authorId", "*nextFollowerId", new Expr(FanoutAction::findByValue, "*fanoutActionValue"), "*status", "*task").out("*followerFanout")
             .localTransform("$$statusIdToLocalFollowerFanouts", Path.key("*statusId").nullToList().afterElem().termVal("*followerFanout")))

As mentioned earlier, in a future post we’ll explore the $$partitionedFollowers optimization. As will be explored in that future post, the first section of this code determines from which tasks to read followers in parallel for the status’s author.

The unify call merges processing for statuses from both last iteration and new statuses from this iteration. safeFetchMapLocalFollowers is a small helper function reading up to rangeQueryLimit followers from this partition for that author ( rangeQueryLimit is a constant set to 1,000). Since followers are read in parallel across many partitions for users with more than 1,000 followers, and since we deployed this module with 64 partitions, this means up to 64k followers are read per status per iteration.

The ifTrue line writes to the $$statusIdToLocalFollowerFanouts PState to continue fanout next iteration if there are still more followers to handle. This is the same PState you saw used in the earlier section.

The next section handles follower-specified options on the types of statuses they wish to see from this author:

.each(Ops.EXPLODE, "*fetchedFollowers").out("*follower")
.each((Follower follower) -> follower.accountId, "*follower").out("*followerId")
.each((Follower follower) -> follower.sharedInboxUrl, "*follower").out("*followerSharedInboxUrl")
.each((Follower follower) -> follower.isShowBoosts(), "*follower").out("*showBoosts")
.each((Follower follower) -> follower.getLanguages(), "*follower").out("*languages")
.keepTrue(new Expr(Ops.IS_NULL, "*followerSharedInboxUrl")) // skip remote followers
.ifTrue(new Expr(Ops.IS_INSTANCE_OF, BoostStatusContent.class, "*content"),
        Block.macro(extractFields("*content", "*boostedAuthorId"))
             .keepTrue(new Expr(Ops.NOT_EQUAL, "*boostedAuthorId", "*followerId"))
             .keepTrue("*showBoosts"))
.keepTrue(new Expr((List<String> languages, String statusLanguage) -> languages == null || statusLanguage == null || languages.contains(statusLanguage), "*languages", "*language"))

This filters a follower out of fanout for this status if: it's a boost and they don't wish to see boosts from this author, it's a boost and they're the original author of the status, or they specified they only wish to see certain languages from this author and the status doesn't match.

Notice how all the information needed to do the filtering is on the Follower structure that was retrieved as part of fetching followers for fanout. No extra PState queries need to be done for this information, which is one of the reasons our implementation has such high throughput. The average amount of fanout per status on our instance is 403, so any work post-fanout (after the Ops.EXPLODE call, which emits once per follower in the *fetchedFollowers list) is multiplied by 403 compared to work pre-fanout. This is why we went out of our way to materialize as much information on the follow relationship as possible to minimize the work post-fanout.

The next section handles additional filtering required for replies:

.ifTrue(new Expr(Ops.IS_INSTANCE_OF, ReplyStatusContent.class, "*content"),
        Block.macro(extractFields("*content", "*parentAuthorId"))
             .hashPartition("*followerId")
             .macro(fetchBloomMacro("*followerId", "*rbloom"))
             .keepTrue(new Expr((RBloomFilter rbloom, Long accountId) -> rbloom.bloom.isPresent("" + accountId), "*rbloom", "*parentAuthorId"))
             .select("$$followerToFollowees", Path.key("*followerId").must("*parentAuthorId")))

Replies are delivered to a follower only if they also follow the account being replied to. This code queries $$followerToFollowees to perform that check, with the must navigator only emitting if the follow relationship exists.

Before the PState query, there’s a bloom filter check to minimize the amount of PState queries done here. This is another optimization that we didn’t mention in the earlier discussion of fanout, and we’ll discuss it more in a future post. In short, a bloom filter is materialized and cached in-memory on this module for each account with all follows for the account. If the bloom filter returns false, the follow relationship definitely does not exist and no PState query is necessary. If it returns true, the PState query is done to weed out false positives. The bloom filter reduces PState queries for replies by 99%.

The next section completes this ETL by writing to the home timelines of followers that have passed each of the preceding filters:

.hashPartition("*followerId")
.each(HomeTimelines::addTimelineItem, "*homeTimelines", "*followerId", "*statusPointer", new Expr(Ops.CURRENT_MICROBATCH_ID));

This simply routes to the appropriate partition hosting each follower's home timeline and then adds to it. That .hashPartition call is actually the most expensive part of this ETL because of the huge volume of messages that flow through it. With an average fanout of 403 on our instance and an incoming rate of 3,500 statuses per second, about 1.4M messages go across that partitioner every second.

When we open-source our Mastodon instance in two weeks, you’ll see that this ETL also handles hashtags, lists, conversations, and federation. We excluded those from this code example since they’re all pretty similar to home timeline fanout, with slightly different rules. They’re just additional branches of computation on this ETL.

That’s all we’ll show for now. As mentioned, in two weeks we’ll be open-sourcing our entire Mastodon implementation.

Conclusion

I’ve covered a lot in this post, but I’ve barely scratched the surface on Rama and our Mastodon implementation. For example, I didn’t mention “fine-grained reactivity”, a new capability provided by Rama that’s never existed before. It allows for true incremental reactivity from the backend up through the frontend. Among other things it will enable UI frameworks to be fully incremental instead of doing expensive diffs to find out what changed. We use reactivity in our Mastodon implementation to power much of Mastodon’s streaming API.

I also didn’t mention Rama’s integration API. Because of my description of Rama as being able to build an entire backend on its own, you may have the impression that Rama is an “all-or-nothing” tool. However, just because Rama can do so much doesn’t mean it has to be used to do everything. We’ve designed Rama to be able to seamlessly integrate with any other tool (e.g. databases, queues, monitoring systems, etc.). This allows Rama to be introduced gradually into any architecture.

To reiterate what's to come: in one week we will be releasing the full Rama documentation as well as a build of Rama that exposes the full API for use with InProcessCluster, and in two weeks we will be fully open-sourcing our Mastodon implementation (which can run on InProcessCluster). Additionally, we will be publishing more posts exploring Rama and our Mastodon implementation in greater depth.

You can keep track of developments with Rama by joining our newsletter or following us on Twitter at @redplanetlabs. We’ve also started the Google group rama-user, where you can discuss Rama or ask questions.

Lastly, Red Planet Labs will be starting a private beta in the coming months to give companies access to the full version of Rama. We plan to work closely with our private beta users to help them build new systems or reimplement existing systems at massively reduced cost. We will be releasing more details on the private beta later, but you can apply here in the meantime.

Discussion on Hacker News.

Tour of our 250k line Clojure codebase

At Red Planet Labs we’ve been quietly developing a new kind of developer tool for many years. Our tool reduces the cost of building large-scale end-to-end applications by multiple orders of magnitude, and Clojure is a big reason why we’ve been able to tackle such an ambitious project with a small team.

Our codebase consists of 250k lines of Clojure split evenly between source and test code. It’s one of the largest Clojure codebases in the world. In this post I’ll give a tour of how we organize our code so a project of this size can be understood amongst a team, the development and testing techniques we use that leverage the unique qualities of Clojure, and an overview of the key libraries we use.

Custom language within a language

One of the coolest parts of our codebase is the new general purpose language at its foundation. Though the semantics of the language are substantially different than Clojure, it’s defined entirely within Clojure using macros to express the differing behavior. It compiles directly to bytecode using the ASM library. The rest of our system is built using both this language and vanilla Clojure, interoperating seamlessly.

One of the striking capabilities our language has that vanilla Clojure does not is first-class continuations. The way in which our language expresses continuations makes it extremely good at async, parallel, and reactive programming. All of these are foundational to the large-scale distributed infrastructure we’re building.

That you can build an entirely new language with radically different semantics within Clojure demonstrates how powerful Clojure is. There’s a lot you get “for free” when building a language this way: lexing, parsing, datatypes, namespaces, immutable data structures, and the entire library ecosystem of Clojure and the JVM. Ultimately our new language is Clojure since it’s defined within Clojure, so it benefits from seamless interoperability with both Clojure and the JVM.

The vast majority of applications are not going to need to develop a full language like we have. But there are plenty of use cases where a focused DSL is appropriate, and we have examples of that too. The ability when using Clojure to customize how code itself is interpreted, via macros and meta-programming, is an incredibly powerful capability.

Type/schema checking

Central to any codebase is the data that is created, managed, and manipulated. We find it’s imperative to carefully and clearly document the data flying around the system. At the same time, type or schema annotations add overhead so it’s important to be thoughtful and not overdo it.

We use the Schema library for defining datatypes within our codebase. It’s easy to use and we like the flexibility to define schema constraints beyond just types: e.g. arbitrary predicates, enums, and unions. Our codebase contains about 600 type definitions, most of which are annotated using Schema.

Around Schema we have a helper called “defrecord+” which defines constructor functions which also perform validation (e.g. for type Foo it generates “->valid-Foo” and “map->valid-Foo”). These functions throw a descriptive exception if the schema check fails.
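To make the idea concrete, here's a minimal sketch of what those validating constructors amount to when written directly against the Schema library. The real defrecord+ macro generates all of this automatically; the Foo type, its fields, and FooSchema here are hypothetical examples, not types from our codebase:

(require '[schema.core :as s])

(defrecord Foo [id name])

(def FooSchema
  {:id   Long
   :name s/Str})

(defn ->valid-Foo [id name]
  ;; throws a descriptive exception if the schema check fails
  (s/validate FooSchema {:id id :name name})
  (->Foo id name))

(defn map->valid-Foo [m]
  (s/validate FooSchema m)
  (map->Foo m))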

There’s no static type checking in Clojure, and static type checks wouldn’t be able to check all the kinds of constraints we define using Schema anyway (e.g. the value of a number being within a certain range). We’ve found we only need to insert schema checking on either:

  • Construction of types, for which our auto-generated “valid” constructor functions remove all the ceremony. Detecting an error when creating a record is much better than when using it later on, as during creation you have the context needed to debug the problem.
  • A few strategic spots throughout the codebase where lots of different types flow.

We only occasionally annotate the types of function args and return values. We find instead that being consistent about how we name things is good enough for understanding the code. We do have about 500 assertions throughout our codebase, though these are generally about higher-level properties rather than simple type checks.

The approach we’ve taken for schema definition and enforcement is lightweight, comprehensive, and doesn’t get in our way. The lack of static typing in Clojure scares a lot of programmers who have never used Clojure, and all we can say is that with a little bit of thought in how you organize your code it’s not an issue at all. And doing things dynamically means we can enforce stronger constraints than possible with static type systems.

Multi-module repository setup

Our codebase exists in a single git repo with four modules to split up the implementation:

  • “core”, which contains the definition of our compiler and the corresponding abstractions for parallel programming
  • “distributed”, which implements those parallel programming abstractions as a distributed cluster
  • “rpl-specter”, an internal fork of Specter which adds a ton of functionality
  • “webui”, which implements the front end of our product

We use Leiningen and deps.edn for our build. The ability to specify local targets as dependencies in deps.edn files is key to our multi-module setup, and the basic organization of our source tree looks like:

project.clj
deps.edn
rpl-specter/project.clj
rpl-specter/deps.edn
core/project.clj
core/deps.edn
distributed/project.clj
distributed/deps.edn
webui/project.clj
webui/deps.edn

Here’s an excerpt from our deps.edn file for “distributed”:

{:deps {rpl/core {:local/root "../core"
                  :deps/manifest :deps}
        ...
        }
  ...
  }

This setup lets us develop within any one of the modules and automatically see any source changes in the other modules without having to make explicit Maven dependencies.

Loading the entire codebase for running tests or loading a REPL is pretty slow (largely from compilation of code using our custom language), so we use AOT compilation heavily to speed up development. Since we spend most of our time developing in “distributed”, we AOT compile “core” to speed things up.

Polymorphic data with Specter

Specter is a library we developed for supercharging our ability to work with data structures, especially nested and recursive data. Specter is based around the concept of “paths” into data structures, where a path can “navigate” to any number of values starting from the root of a data structure. The path can include traversals, views, and filters, and they’re deeply composable.

Our compiler compiles code into an abstract representation with a distinct record type for each kind of operation possible in our language. There are a variety of attributes every operation type must expose in a uniform way. For example, one of these attributes is “needed fields”, the fields in the closure of that operation that it requires to do its work. A typical way to express this polymorphic behavior would be to use an interface or protocol, like so:

(defprotocol NeededFields
  (needed-fields [this]))

The problem with this approach is it only covers querying. Some phases of our compiler must rewrite the fields throughout the abstract representation (e.g. uniquing vars to remove shadowing) and this protocol doesn't support that. A (set-needed-fields [this fields]) method could be added to this protocol, but that doesn't cleanly fit data types which have a fixed number of input fields. It also doesn't compose well for nested manipulation.

Instead, we use Specter’s “protocol paths” feature to organize the common attributes of our varying compiler types. Here’s an excerpt from our compiler:

(defprotocolpath NeededFields [])

(defrecord+ OperationInput
  [fields :- [(s/pred opvar?)]
   apply? :- Boolean
   ])

(defrecord+ Invoke
  [op    :- (s/cond-pre (s/pred opvar?) IFn RFn)
   input :- OperationInput])

(extend-protocolpath NeededFields Invoke
  (multi-path [:op opvar?] [:input :fields ALL]))

(defrecord+ VarAnnotation
  [var :- (s/pred opvar?)
   options :- {s/Keyword Object}])

(extend-protocolpath NeededFields VarAnnotation
  :var)

(defrecord+ Producer
  [producer :- (s/cond-pre (s/pred opvar?) PFn)])

(extend-protocolpath NeededFields Producer
  [:producer opvar?])

“Invoke”, for instance, is the type that represents calling another function. The :op field could be a static function or a var reference to a function in the closure. The other path navigates to all the fields used as arguments to the function invocation.

This structure is extremely flexible and allows for modifications to be expressed just as easily as queries by integrating directly with Specter. For instance, we can append a “-foo” suffix to all the needed fields in a sequence of operations like so:

(setval [ALL NeededFields NAME END] "-foo" ops)

If we want the unique set of fields used in a sequence of ops, the code is:

(set (select [ALL NeededFields] ops))

Protocol paths are a way to make the data itself polymorphic and able to integrate with the supercharged abilities of Specter. They greatly reduce the number of manipulation helper functions that would be required otherwise and make the codebase far more comprehensible.

Organizing complex subsystems with Component

The daemons comprising the distributed system we're building consist of dozens of subsystems that build on top of one another and depend on each other. The subsystems need to be started in a particular order, and in tests they must be torn down in a particular order. Additionally, within tests we need the ability to inject mocks for some subsystems or disable some subsystems altogether.

We use the Component library to organize our subsystems in a way that manages lifecycle and gives us the flexibility to inject alternate dependencies or disable subsystems. Internally, we built a “defrcomponent” helper to unify field and dependency declarations. For example, from our codebase:

(defrcomponent AdminUiWebserver
  {:init      [port]
   :deps      [metastore
               service-handler
               cluster-retriever]
   :generated [^org.eclipse.jetty.server.Server jetty-instance]}

  component/Lifecycle
  ...
  )

This automatically retrieves fields “metastore”, “service-handler”, and “cluster-retriever” from the system map it’s started in and makes them available in the closure of the component’s implementation. It expects one field “port” in the constructor of the component, and it generates another field “jetty-instance” on startup into its internal closure.

We also extended the component lifecycle paradigm with “start-async” and “stop-async” protocol methods. Some components do part of their initialization/teardown on other threads, and it was important for the rest of our system (especially deterministic simulation, described below) for those to be doable in a non-blocking way.
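As a rough sketch, an async extension of the lifecycle idea could look like the protocol below. The protocol and method argument names are assumptions for illustration, not necessarily our exact definitions:

(defprotocol LifecycleAsync
  (start-async [component done-fn]
    "Begin starting the component; invoke done-fn once startup has fully completed.")
  (stop-async [component done-fn]
    "Begin stopping the component; invoke done-fn once teardown has fully completed."))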

Our test infrastructure builds upon Component for doing dependency injection. For instance, from our test code:

(sc/with-simulated-cluster
  [{:ticker (rcomponent/noop-component)}
   {:keys [cluster-manager
           executor-service-factory
           metastore]
    :as   full-system}]
  ...
  )

That first map is a dependency injection map, and this code disables the “ticker” component. The “ticker” causes simulation tests to advance time occasionally, and since this test wants to control time explicitly it disables it. That dependency injection map can be used to override or disable any component in the system, providing the flexibility necessary for writing tests.

Using with-redefs for testing

Clojure provides the macro “with-redefs” that can redefine any function executed within the scope of that form, including on other threads. We have found this to be an invaluable feature for writing tests.

Sometimes we use with-redefs to mock specific behavior in the dependencies of what we’re testing so we can test that functionality in isolation. Other times we use it to inject failures to test fault-tolerance.
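For instance, a fault-injection test might look something like this sketch. The write! and process-batch! functions are hypothetical stand-ins defined inline so the example is self-contained; they are not functions from our codebase:

(require '[clojure.test :refer [deftest is]])

;; hypothetical source functions, stubbed here so the sketch is self-contained
(defn write! [batch] :ok)
(defn process-batch! [batch]
  (try (write! batch)
       (catch Exception _ (write! batch)))) ;; retry once on failure

(deftest write-failure-is-retried-test
  (let [attempts (atom 0)]
    (with-redefs [write! (fn [batch]
                           (if (= 1 (swap! attempts inc))
                             (throw (ex-info "simulated write failure" {}))
                             :ok))]
      ;; the first write fails; the retry should succeed
      (is (= :ok (process-batch! [1 2 3]))))))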

The most interesting usage of with-redefs in our codebase, and one of our most common, is using it alongside no-op functions we insert into our source code. These functions effectively provide a structured event log that can be dynamically tapped in an à la carte way depending on what a test is interested in.

Here’s one example (out of hundreds in our codebase) of how we use this pattern. One part of our system executes user-specified work in a distributed way and needs to: 1) retry the work if it fails, and 2) checkpoint its progress to a durable, replicated store after a threshold amount of work has succeeded. One of the tests for this injects a failure the first time work is attempted and then verifies the system retries the work.

The source function that executes the work is called “process-data!”, and here is an excerpt from that function:

(when (and success? retry?)
  (retry-succeeded)
  (inform-of-progress! manager))

“retry-succeeded” is a no-op function defined as (defn retry-succeeded []).

In a totally separate function called “checkpoint-state!”, the no-op function “durable-state-checkpointed” is called after it finishes replicating and writing to disk the progress information. In our test code, we have:

(deftest retry-user-work-simulated-integration-test
  (let [checkpoints     (volatile! 0)
        retry-successes (volatile! 0)]
    (with-redefs [manager/durable-state-checkpointed
                  (fn [] (vswap! checkpoints inc))

                  manager/retry-succeeded
                  (fn [] (vswap! retry-successes inc))]
      ...
      )))

Then in the body of the test, we check the correct internal events happen at the correct moments.

Best of all, since this à la carte event log approach is based on no-op functions, it adds basically no overhead when the code runs in production. We have found this approach to be an incredibly powerful testing technique that utilizes Clojure’s design in a unique way.

Macro usage

We have about 400 macros defined throughout our codebase, 70% of which are part of source code and 30% of which are for test code only. We have found the common advice for macros, like don't use a macro when you can use a function, to be wise guidance. That we have 400 macros doing things you can't do with regular functions demonstrates the extent to which we make abstractions that go far beyond what you can do with a typical language that doesn't have a powerful macro system.

About 100 of our macros are simple “with-” style macros which open a resource at the start and ensure the resource is cleaned up when the form exits. We use these macros for things like managing file lifecycles, managing log levels, scoping configurations, and managing complex system lifecycles.
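A trivial example of the shape these take (a sketch of the pattern, not one of our actual macros):

(defmacro with-temp-dir
  "Binds sym to a freshly created temp directory and deletes it when the body exits."
  [[sym] & body]
  `(let [~sym (java.nio.file.Files/createTempDirectory
                "scratch"
                (make-array java.nio.file.attribute.FileAttribute 0))]
     (try
       ~@body
       (finally
         (java.nio.file.Files/delete ~sym)))))

;; usage:
;; (with-temp-dir [dir]
;;   (println "working in" (str dir)))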

About 60 of our macros define abstractions of our custom language. In all of these the interpretation of the forms within is different than vanilla Clojure.

Many of our macros are utility macros, like “letlocals” which lets us more easily mix variable binding with side effects. We use it heavily in test code like so:

(letlocals
  (bind a (mk-a-thing))
  (do-something! a)
  (bind b (mk-another-thing))
  (is (= (foo b) (bar a))))

This code expands to:

(let [a (mk-a-thing)
      _ (do-something! a)
      b (mk-another-thing)]
  (is (= (foo b) (bar a))))
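A minimal sketch of how a macro with this behavior could be written (an illustration under the assumption that only the bind form needs special treatment, not our actual implementation):

(defmacro letlocals [& forms]
  (let [bindings (mapcat (fn [form]
                           (if (and (seq? form) (= 'bind (first form)))
                             [(second form) (nth form 2)]
                             ['_ form]))
                         (butlast forms))]
    `(let [~@bindings]
       ~(last forms))))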

The rest of the macros are a mix of internal abstractions, like a state machine DSL we built, and various idiosyncratic implementation details where the macro removes code duplication that can’t be removed otherwise.

Macros are a language feature that can be abused to produce terribly confusing code, or they can be leveraged to produce fantastically elegant code. Like anything else in software development, the result you end up with is determined by the skill of those using it. At Red Planet Labs we can’t imagine building software systems without macros in our toolbox.

Deterministic simulation

As we wrote about previously, we have the ability to write 100% reproducible distributed systems tests by running our whole system on a single thread and randomizing the order in which entities execute events starting from a random seed. Simulation is a major, codebase-spanning capability that heavily utilizes the aforementioned techniques of dependency injection and redefs. For example:

  • Any part of the system that in production would be a unique thread is coded in terms of executor services. To get an executor service for that particular part of the system, it requests one from an “executor service factory”. In production, this returns new threads. In simulation, however, we override that component to provide executor services from our single-threaded, globally managed source.
  • Much of our system relies on time (e.g. timeouts), so time is abstracted away from our implementation. Any part of the system that is interested in time consults a “time source” dependency. In production this is the system clock, but in simulation the component is overridden with a “simulated time source” that can be explicitly controlled within our simulation tests (a minimal sketch of this idea follows this list).
  • Promises are used quite a bit throughout the codebase to manage asynchronous, non-blocking behavior. Simulation uses with-redefs to layer additional functionality into promises that is useful for stepping through simulation.
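Here's a minimal sketch of the time-source idea. The protocol and names are assumptions for illustration, not our exact code:

(defprotocol TimeSource
  (current-millis [this]))

(def system-time-source
  (reify TimeSource
    (current-millis [_] (System/currentTimeMillis))))

(defn simulated-time-source
  "A controllable clock; tests advance it explicitly, e.g. (swap! clock + 5000)."
  [clock]
  (reify TimeSource
    (current-millis [_] @clock)))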

Front end

Our product provides a UI to let users see what they have running on a cluster, the current status of operations like scaling, and telemetry showing what’s going on in their applications.

The front end is a web-based single page app coded in ClojureScript. The ClojureScript ecosystem has many mature, well-designed libraries that make development efficient and fun.

Reviewing the libraries and their advantages could be a blog post in itself, but briefly: we use re-frame because its data-oriented state management and event handling models are easy to reason about and inspect. We use reitit for frontend routing; we like how its data-oriented design allows us to associate arbitrary data with each route, which in turn lets us do neat things like dispatch re-frame events on route changes. We use shadow-cljs to compile the project, in part because it dramatically simplifies the process of using JavaScript libraries and dealing with externs.

We use uPlot for displaying time-series data. Our API backend is served using a Jetty server, and we use Compojure to define backend routes.

Defining our front end in the same language as the rest of our codebase is a huge win, especially the ease of shuttling data back and forth between Clojure and ClojureScript. The immutable style emphasized by Clojure is just as beneficial in front-end code as back-end code, so being able to leverage that consistently benefits our productivity and the robustness of our product greatly.

Libraries

Here are many of the external libraries we use in our codebase, a mixture of Clojure, ClojureScript, Java, and Javascript libraries:

  • ASM: used for bytecode generation
  • Compojure: used for defining routes in web server
  • Component: used for defining subsystems with well-defined lifecycles
  • Jetty: used to serve data to our front end
  • Loom: used for representing graph data structures, especially within our compiler.
  • Netty: used for asynchronous network communication
  • Nippy: used for serialization
  • Potemkin: used for a few utilities, especially “import-namespace”, “import-vars”, and “def-map-type”
  • reitit: used for front-end routing
  • re-frame: used to build our web code
  • RocksDB: used for some durable indexing tasks
  • Schema: used for defining types with rich schemas
  • shadow-cljs: used for compiling front-end code
  • SnakeYAML: used for parsing YAML
  • Thrift: used to help power some of the CLI of our product
  • uPlot: used to display time series graphs in our front end

Conclusion

Clojure has been fantastic for developing our product. It’s enabled us to build powerful abstractions not possible in other languages, remove all ceremony whatsoever, and utilize powerful testing techniques. Plus we’ve had multiple members on our team start with no Clojure or functional programming experience and they were able to get up to speed quickly.

If you’re interested in working with us to help define the future of software development, we’re hiring! We work on hard problems pushing what’s possible with compilers, databases, and distributed systems. Our team is fully distributed and we’re open to hiring anywhere in the world.

Where we’re going, we don’t need threads: Simulating Distributed Systems

Testing distributed systems is hard.

You probably already knew that. If you’ve spent even a little time writing distributed systems, you know that the relevant algorithms and techniques are complex and nuanced. Distributed systems end up full of gotchas and corner cases. The tests for these systems are even worse! If you actually care that your application is correct, it’s exactly the hard-to-reproduce failures that are going to make you want to throw your computer into the garbage and run off to live in the woods. 

Before you start looking for habitable caves, we have good news: there’s a way out of this problem, and it’s called deterministic simulation.

It’s all about ordering

Let’s take a step back and ask ourselves why distributed systems are so hard to test. In the standard unit testing methodology, you identify blocks of behavior (functions, classes), carefully control their inputs across the range of acceptable values, and validate their outputs. This is easy when your behavior is fully synchronous, because the only inputs you need to control are the actual data. 

But in a distributed system — which really means “any system with concurrency greater than 1” — while you do control the “inputs” in terms of arguments, you typically do not control another key variable: ordering of events. “Event” is abstract here, but for the most part, we are interested in points of interaction between concurrent actors. (Think: two threads printing to standard out, or a user trying to update the same field on their profile from multiple browser tabs.) Depending on what order each concurrent actor reads or writes some piece of data, your system can give wildly different results.

When you run a test, it runs in some order. But when you run it again, it might run in a different order, or it might not, depending on a bunch of totally invisible parameters! (What’s the background cpu load on your machine? How full is the heap in the test process? Did the JIT compiler run yet?) It’s often the case that it’s easy to get a test passing consistently in your own environment, only to have it fail when it runs in a different environment like your CI system. It might not even fail every time! Now you’re stuck trying to reproduce that failure with more adverse development conditions, repeatedly running the same test to try and catch that rare failure. You can easily lose hours or days trying to reproduce and fix these kinds of problems.

The traditional approach in this situation is to put in ad-hoc measures that exclude unexpected orderings of events: you replace implementations with mocks, or inject “extra” locking, or jump through hoops to verify the system reaches a specific state before you run your test assertions. This can be brittle, or worse, end up compromising the validity of your tests, since you’re no longer testing the code that you actually run in production. But the real problem with this approach is that it only prevents the unexpected orderings you can imagine up front! Your system can still reach unanticipated states based on orderings that you didn’t even know were possible.

No matter how many tests you write of your distributed system, if you can’t figure out some way to systematically test ordering, there are going to be vast swaths of territory that are completely uncharted. The more complex the system, the more likely it is that some totally bonkers emergent bad behavior is lurking just out of sight. 

Deterministic Simulation

Deterministic simulation is all about changing ordering from an uncontrolled variable into a controlled one. With that control, you can vary execution order directly, just like any other test input. This is an incredibly powerful technique for getting to a whole new level of rigor and repeatability in distributed systems testing, making it much easier to find bugs related to ordering and to repeat those orderings once they’ve been detected. It’s a gigantic productivity lever for your team as you build your ever-more-complex application.

This probably sounds pretty pie in the sky, and it’s true that there is no “out of the box” simulation tooling that can be bolted on to any application. But it turns out that building a simulation capability that is well-suited to your specific environment and codebase is totally achievable. Folks like FoundationDB have talked publicly about their experience following this path, and their success was a major inspiration to our own efforts. 

We’ve built up a functionally complete deterministic simulation capability at Red Planet Labs over the last few months. It’s been a transformative time for us that has already dramatically reduced the cost of finding and fixing complex ordering bugs in our distributed system. Some problems that took us weeks to resolve in the past are now getting fixed in a single afternoon! We’ve also gained the ability to test conditions that would have been difficult or impossible to create in the past. We’ll be building on top of these capabilities for a long time to come.

For us, the formula ended up being no parallelism + quantized execution + deterministic behavior = deterministic simulation. We’ll go through each of those terms below. The rest of this post will be devoted to talking through some of the interesting philosophical and design pattern discoveries we’ve made along the way, which will hopefully provide somewhere for you to start should you decide to go down this road. 

No Parallelism

If you let concurrent actors in your simulated system actually execute in parallel, you really hamstring your ability to control execution order, since there are fundamentally no guarantees about the order in which their tasks execute. To give your simulation complete control, you must explicitly remove all parallelism. One way to achieve this is to run all concurrent actors on a single thread in the same process. 

Wait — you’re trying to test distributed systems. Isn’t parallelism a fundamental characteristic of distributed systems? How can you test anything interesting if you have no parallelism? While distributed systems are indeed both concurrent and parallel, the vast majority of the problems these systems experience are a result of logical concurrency mistakes rather than actual parallel resource contention.  

When you have two co-located concurrent actors running in parallel, they can both try to access a shared resource in the same exact instant. This is called contention. Problems caused by contention are often solved by using some kind of lock, which literally reduces the allowed parallelism to 1. However, even when there is no contention, your concurrent actors can still misbehave, based solely on how their accesses interleave with each other. These problems arise from flawed concurrency logic present at a higher level of the application, and are rarely caused — or fixed — by locking. 

But when your concurrent actors don’t share the same process or hardware, they can’t access shared resources in the first place — they have to use some kind of asynchronous message passing and wait for a reply. They cannot contend. That means all bugs found in the interaction between two independent concurrent actors must therefore be of the concurrency kind, which means they only depend on order. 

A simulation scheme with no parallelism can readily explore orderings in the concurrency space, but is incapable of exploring orderings in the contention space. This is a good trade off, though, because the bulk of the problems you’re going to experience in a distributed system are of the concurrent type. 

Contention problems are still problems that have to be tested and fixed. However, it’s common for distributed system implementations to minimize direct access to shared resources and emphasize use of message-passing style even when co-located, since it makes the entire system more consistent and easier to understand. With this approach, any implementation details that can cause contention get abstracted into the “framework” layer of your application, where they can be tested exhaustively to verify correctness without having to worry about the bigger picture app-level concurrency. When this pattern is used, the no-parallelism simulation approach makes no sacrifices in its ability to explore orderings.

Quantized Execution

If you want to be able to control “what happens next” in a granular fashion, you need to change the way concurrent work streams are expressed in your application. If you define your concurrent work streams naively — as threads, for instance — then in production you get all the isolation and independence attributes you want, but you can’t introspect concurrent work streams during simulation. What the simulation really needs is for concurrent work streams to be uniquely identified and to expose their individual work items.

We call the idea of chunked work in identifiable streams quantized execution. This sounds intense, but in practice, it just means that instead of using threads, you organize everything in terms of ExecutorServices and Runnables / Callables that are submitted to them. You tie it all together with a central “executor service factory” that allows you to effectively inject the execution mode into your runtime system just like any other dependency. In production, the executor service factory returns “real” independent ExecutorServices backed by their own threads, and they will run in a totally uncoordinated manner (that is, governed only by the concurrency structures you put in place). 

But when the system is under simulation, you inject a factory that returns special “facade” ExecutorServices. They capture submitted tasks into a queue, but don’t actually execute anything until they’re explicitly told to do so. The overall simulation controller uses this interface to make the decision about who gets to execute their next work item.

A meaningful cost of this approach is that your application can no longer be defined in terms of “naive” concurrency / parallelism constructs, since those constructs don’t work in simulations. For instance, if you try to do a blocking wait on a response from another actor, it will never complete, because you’re blocking the only thread in your system! Any part of your application that will be simulated must start to follow the cooperative multitasking model; chances are good that this will eventually spread to all parts of your application. 

Refactoring your system as a whole to work in these terms can be painful and expensive, especially while you’re getting used to it. But after you’ve gotten over the hump, you will have an equivalent system that’s better in a lot of ways. The biggest advantage of this approach is that you always run the same code, whether you’re in production or in simulation — there’s no need to vary the behavior of your components to specifically accommodate simulation. This minimizes the chances of your mocks diverging from their real-world counterparts, which is a major risk otherwise.

For Red Planet Labs, the shift to universal cooperative multitasking meant that our system as a whole is more reactive, and we don’t rely on blocking behavior except when interacting with 3rd-party libraries we don’t control. There are a lot of factors that go into it, but on the whole, our simulations tend to be meaningfully faster than their unsimulated counterparts, even when the unsimulated version gets to use multiple cpu cores! 

Deterministic Behavior

We’ve removed genuine parallelism’s unpredictability and made our actors’ work streams transparent with quantized execution. The final ingredient needed to make simulations into a productivity powerhouse is determinism.

Testing of distributed systems is often burdened with two divergent requirements: the ability to reproduce the same result consistently, and the ability to vary or explore possible orderings to produce novel failures. It turns out that it’s pretty easy to get both requirements out of simulations.

At every step of execution, our simulation must select an ExecutorService and let it complete one work item. There are a lot of ways the selection could be implemented, but the scheme that covers the broadest set of needs with the least complexity is using a seeded random number generator to choose an actor randomly. 

By making selection random, you avoid accidentally creating an ordering that is more orderly than what would happen naturally in the real world. If you run the same simulation many times, you can expect execution order to vary, iteratively exploring some fraction of all theoretically possible orderings. But by using a seeded random number generator as the source of that random selection, that variability instantly becomes repeatable. 

To rerun some “interesting” ordering, all you need is the random seed that generated that sequence of selections. When you specify that seed again directly, the properties of PRNGs kick in to give you the same sequence of selections. As long as all the other inputs to the simulation are also the same, then you will repeat the interesting ordering exactly.
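The repeatability property is easy to see directly with a toy illustration:

(defn selections [seed n actors]
  (let [^java.util.Random rng (java.util.Random. seed)]
    (vec (repeatedly n #(.nextInt rng actors)))))

(= (selections 42 10 5) (selections 42 10 5))
;; => true -- same seed, same sequence of "who runs next" decisions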

It’s worth really hammering on that last point: simulations are only repeatable if all the behavior in them is deterministic. Any source of non-determinism could thwart your efforts, and it’s easy to be sabotaged by very subtle mistakes. If you ever generate a UUID, use random numbers, implicitly rely on hashmap ordering, or refer to system time, your simulation can produce different results even with the same random seed. 

Non-deterministic simulations are very frustrating, since they are often hard to detect in the first place and tedious to diagnose and fix once detected. RPL expects to do more work on our tooling in this area over time; we’ll be sure to share any interesting takeaways.

Beyond Simulation

Even just the basic capabilities of simulation are incredibly impactful when it comes to testing a distributed system. But it turns out that there are all sorts of things you can do on top of simulation that make it even more useful. A system that is compatible with simulation has gone to lengths to expose the seams between many sub-components; you can exploit those seams to create conditions in tests that would otherwise be very difficult to replicate. For instance:

  • Preventing a particular actor from being selected for execution simulates a garbage collection pause
  • Stopping all the executors associated with a logical “process” simulates process death
  • Adding delay to the execution of network IO tasks simulates network latency

And many more! RPL has only scratched the surface in this area, but we expect to exploit this avenue a ton as we seek to test our system under ever more adverse conditions.

Clojure, Faster

Dynamic, functional programming languages like Clojure are sometimes considered “slower” than their statically-typed and/or OO counterparts, due to facilities like dynamic function dispatch, an orientation toward immutability, and so on. But this comparison is too general.

Clojure is readily optimizable in cases where utmost performance is key. Clojure embraces the “Get it working, then make it fast” approach to building up a software system—even large-scale systems with high performance demands¹.

This post is about micro-optimizing Clojure code: how can we improve our Clojure code performance on the JVM when we need it the most, when hot code paths and tight loops prevail—for example, in large-scale data processing systems.

General JVM performance, benchmarking, and tuning is a broad and deep skillset and there are many more considerations beyond Clojure. The focus here is Clojure-specific.

Clojure Performance Tools

Before we get to our specific list of Clojure optimizations, we should emphasize that running your own benchmark and profiling experiments is essential to producing specific top-level performance outcomes for your own code and domain.

You could follow all of the advice in this post, shower your code with type hints, remove laziness and sequences, etc, etc, and without a specific performance model for where your costs are coming from and how to measure improvement, it is possible or even likely to see no dip in your processing latencies or reduction in your cloud bill².

We recommend the following tools for performing your own analyses; these were all used in developing this post:

  • criterium A benchmarking tool that is a Clojure industry-standard at this point; most of the relative benchmarks in this post are generated from the REPL using criterium.
  • clj-async-profiler For minimal quick and dirty Clojure profiling. REPL-enabled.
  • no.disassemble For inspecting Clojure-compiled byte code from the REPL. (If you’re familiar with javap, the output is what you would see from decompiling classes using javap -c on the command-line, except you can see what a compiled Clojure function looks like.)
  • jmh (Java Microbenchmark Harness) A feature-rich benchmarking toolkit for the JVM. Its usage is heavily Java/annotation-based (so not REPL-friendly) but it has an expressive feature set, making it possible to run more specialized benchmarks such as ones that could deadlock (and many others; see here.) We find it essential in carrying out comprehensive benchmarking analyses and we'll demonstrate how to use it with Clojure a bit later on.

Avoiding Laziness

Clojure’s sequence abstraction and laziness are powerful programming facilities but we must shy away from these tools when memory and compute are in high demand.

Why? Lazy implementations require generating per-element state and this can be substantial overhead when compared to non-lazy alternatives like arrays and vectors whose elements each contribute just their own value’s size to memory.

Accordingly, there is computational overhead in laziness due to this per-element construction; compare lazy implementations of map and filter to transform values in a vector with a non-lazy (transducer-based) alternative:

(let [data (vec (range 100))]
    (report-result
      (quick-benchmark
        (into [] (map inc (filter even? data))) {}))
;; Execution time mean : 3.916001 µs

    (report-result
      (quick-benchmark
        (into [] (comp (filter even?) (map inc)) data) {})))
;; Execution time mean : 1.738479 µs

into has first-class transducer support; the same over-2x gains follow when using transducers + transduce in favor of laziness + reduce.
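For example, the same computation expressed with transduce versus lazy sequences (a quick sketch; both return the same result, though absolute timings will vary by machine):

(def data (vec (range 100)))

;; transducer pipeline, no intermediate lazy sequences:
(transduce (comp (filter even?) (map inc)) + 0 data)
;; => 2500

;; lazy-sequence equivalent:
(reduce + 0 (map inc (filter even? data)))
;; => 2500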

Avoid laziness where scale matters.

first and last

This one is particularly frustrating. We might expect first and last to have special consideration of their provided type. But this is not the case; they are implemented over Clojure's lazy sequence API without regard to type. Even with a vector, last will obtain the result using a full linear pass over the data. Both first and last seq the input unnecessarily, which adds computation cost.

Of course we have a way out—but the moral of the story is to stay well away from first and last when optimal performance is required. Here's a full sequence of benchmarks; notice the dramatic relative cost of first and last on vectors. Pick the latencies you like the best:

(let [v (into [] (range 1000))]
    (report-result
      (quick-benchmark
        (first v) {}))
;; Execution time mean : 53.459498 ns
    (report-result
      (quick-benchmark
        (nth v 0) {}))
;; Execution time mean : 8.477276 ns
    (report-result
      (quick-benchmark
        (.nth ^Indexed v 0) {}))
;; Execution time mean : 7.898334 ns
    (report-result
      (quick-benchmark
        (last v) {}))
;; Execution time mean : 13.215272 µs  (!!!)
    (report-result
      (quick-benchmark
        (nth v (-> v count dec)) {}))
;; Execution time mean : 93.989401 ns
    ;; The cost of the previous expression is primarily in the count; so we can improve it:
    (report-result
      (quick-benchmark
        (nth v (-> ^IPersistentVector v .length dec)) {}))
    )
;; Execution time mean : 7.155083 ns

Edit: Clojure’s peek will answer the last item of a vector optimally. The benchmark for peek matches precisely the mean execution time of nth + .length in the last benchmark above. [Credit]
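In other words, when you already have a vector, peek is the idiomatic constant-time way to get its last element:

(peek [1 2 3 4 5])
;; => 5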

equiv and equals

An occasional need in high-performing applications is a read-heavy, in-memory cache or value registry, commonly with some kind of composite/tuple for its map keys. It turns out that the choice of datatype (for both map and key) is… erm… key here.

Consider the following benchmarks for a simple map lookup using different combinations of map type (Clojure’s PersistentHashMap vs. Java’s HashMap) and composite key type.

Map type            Key type                   Lookup time
PersistentHashMap   [“a” “b”]                  167 ns
                    (ArrayList. [“a” “b”])     95 ns
                    “a:b”                      78 ns
                    (→CacheKey “a” “b”)        116 ns
HashMap             [“a” “b”]                  36 ns
                    (ArrayList. [“a” “b”])     53 ns
                    “a:b”                      33 ns
                    (→CacheKey “a” “b”)        152 ns

Benchmarks for (get map key) for different map and key types. CacheKey is a basic defrecord type; “a:b” is a simple String with the composite values concatenated, etc.

This is quite a lot of variation in performance: JDK’s HashMap performs significantly better (except when using a record key—we’ll look at that one later). What accounts for this?

Clojure defines equality (i.e., clojure.core/=) more generically than Java’s .equals and this definition is used by Clojure’s data structures for equality comparison, PersistentHashMap in this case:

(let [v ["a" "b"] v2 ["a" "b"]]
    (report-result
      (quick-benchmark
        (= v v2) {}))
;; Execution time mean : 91.347726 ns
    (report-result
      (quick-benchmark
        (.equals v v2) {})))
;; Execution time mean : 10.741904 ns

Significant performance difference for these composite (vector) keys.

Of course, Clojure’s = is more general than Java .equals, which is a stricter comparison in several cases; here is just one example:

(.equals [1 2] [(int 1) 2])
⇒ false
(= [1 2] [(int 1) 2])
⇒ true

So we need to be sure that the keys we use in our map are carefully constructed to always compare correctly with the stronger .equals semantics.
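For example, with a java.util.HashMap (which compares keys with .equals), a composite key that mixes boxed numeric types will silently miss, even though clojure.core/= would consider the keys equal:

(let [m (java.util.HashMap. {[1 2] :val})]
  [(get m [1 2])           ;; => :val
   (get m [(int 1) 2])])   ;; => nil, since .equals is false for the mixed-type key
;; => [:val nil]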

Lastly, using a defrecord (CacheKey) appears to offer the worst performance. Let’s use some lightweight profiling to understand what is going on here, by comparing records vs vectors as keys:

(let [k (→CacheKey "a" "b")
      m (HashMap. {(→CacheKey "a" "b") ::val})]
    (prof/profile
      (dotimes [i 100000000] (get m k)))) ;; Figure 1
  (let [k ["a" "b"]
        m (HashMap. {["a" "b"] ::val})]
    (prof/profile
      (dotimes [i 100000000] (get m k)))) ;; Figure 2
Figure 1. Map lookup using a defrecord as the key. This flamegraph shows relative time spent in execution of each function in the call stack — generated from the REPL by clj-async-profiler.
Figure 2. Map lookup using a vector tuple as the key.

From our profiling, the map equality used by the CacheKey record looks suspicious. We can confirm its performance is much worse than vector equals:

  (let [ka (→CacheKey "a" "b") kb (→CacheKey "a" "b")]
    (quick-benchmark
      (.equals ka kb) {}))
;; Execution time mean : 2.015044 µs
  (let [ka ["a" "b"] kb ["a" "b"]]
    (quick-benchmark
      (.equals ka kb) {}))
;; Execution time mean : 11.376671 ns

The performance overhead we've discussed here is minimal for map lookups of singular values like keywords; we are generally okay to stick to Clojure datatypes for data entities where these key types are idiomatic. Still, using records (defrecord) for data entities can have a significant, roughly 2x, positive impact on keyword lookups:

  (let [k (→CacheKey "a" "b")]
    (quick-benchmark
      (:a k) {}))
;; Execution time mean :  5.638519 ns
  (let [k {:a "a" :b "b"}]
    (quick-benchmark
      (:a k) {}))
;; Execution time mean : 10.495481 ns

To make a broader generalization to wrap up this section, the JDK ecosystem at large has battle-hardened collection and data structure offerings for high performance use cases, and Clojure's Java interop (with type hints!) gives us access to any of these that make sense for our code hot spots.

Dynamic Vars vs. ThreadLocal

Another occasional need is for a per-thread state mechanism. Clojure has dynamic vars and binding, but the Clojure implementation of reading a dynamic var is far less optimal than using a ThreadLocal directly:

(def ^:dynamic *dyna-state* nil)
(def ^ThreadLocal thr-local-state (ThreadLocal.))

(binding [*dyna-state* 100]
    (report-result
      (quick-benchmark
        (.get thr-local-state) {}))
;; Execution time mean : 6.269224 ns
    (report-result
      (quick-benchmark
        *dyna-state* {}))
;; Execution time mean : 42.911467 ns
    )

Even the resolution of var values can be optimized by use of ^:const metadata, which makes the var value inlined at compile-time:

(def ^:const ^Long const-val 99)
(def ^Long var-val 99)

(do
    (report-result
      (quick-benchmark
        (unchecked-multiply 1 const-val) {}))
;; Execution time mean : 2.712172 ns
    (report-result
      (quick-benchmark
        (unchecked-multiply 1 var-val) {}))
;; Execution time mean : 4.876835 ns
    )

Of course this means any redef of const-val here will go unnoticed by any previously compiled references to it.

Clojure Transients

For initializing immutable data types with many values, Clojure’s transients are fast:

(quick-benchmark
   (persistent! (reduce conj! (transient []) (range 100000))) {})
;; Execution time mean: 1.189867 ms

(quick-benchmark
   (reduce conj [] (range 100000)) {})
;; Execution time mean: 2.709462 ms

This benchmark is roughly the implementation of clojure.core/into, by the way, which uses transients for performance—so certainly use into when you can. Transients are well covered by the documentation, but I mention them here for completeness.

Dramatic Gains with Type Hinting

As a Clojure programmer, you will inevitably encounter type hinting but what may come as a surprise is how dramatically it can improve performance.

Clojure is adamantly dynamic, which means Clojure code can compile without needing to know (or even having access to) the types of values that will flow at runtime. In the following function, x and y could take on the value of an array, map, String, or many other types that might show up dynamically:

(defn same-count? [x y]
   (= (count x) (count y)))

Using criterium, we can baseline this code without having compile-time type information for x and y:

(quick-benchmark
   (same-count? "f" "g") {}))
;; Execution time mean : 71.037835 ns

Reading the source for clojure.core/count we find lots of “instance of” logic to pick and invoke the relevant behavior at runtime. This is all expensive and we can circumvent it entirely by committing to types at compile-time using type hints:

(defn same-count? [^String x ^String y]
   (= (.length x) (.length y)))
;; Execution time mean : 3ns

Note the whopping 20x improvement in mean execution time.

It should also be noted that if we omitted the ^String type hints from this last implementation of same-count? we would have created the worst of both worlds and our average execution time observed by criterium would be 40x worse than baseline (that is, 20x worse than the initial implementation based on count.)
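One way to guard against accidentally falling into that trap is to turn on reflection warnings while developing, so the compiler flags the unresolvable member access (the function name below is just an illustration):

(set! *warn-on-reflection* true)

(defn same-count-unhinted? [x y]
  (= (.length x) (.length y)))
;; Reflection warning ... reference to field length can't be resolved.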

Arrays

Type hinting Java .instanceMember accesses like above is common. But type hints are essential for performant invocation of Clojure’s array functions, as well.

For example, this:

(alength my-arr)

is 1000 times more expensive when compiled without type information. The following clocks in at 5.491867 µs mean execution time:

(let [my-arr (identity (long-array 5))]
   (report-result
      (quick-benchmark (alength my-arr) {})))

By removing the (identity) wrapper there (so that Clojure can infer the array type), or by type hinting like so:

(alength ^longs my-arr)

…the invocation will reduce to 4.12 ns. Huge.

As a final note, clean benchmarking requires us to be aware that no subtle implicit optimizations are happening where we don’t expect them. Just above we wrapped our array construction with identity to “hide” the type of the array from Clojure’s compiler.

no.disassemble can come in handy here and let us see if reflection is kicking in:

(require '[no.disassemble :as nd])
(nd/disassemble (fn [] (let [my-arr (identity (long-array 5))] (alength my-arr))))

...
38  invokestatic clojure.lang.Reflector.invokeStaticMethod(java.lang.Class, java.lang.String, java.lang.Object[]) : java.lang.Object [53]
...

Compare this to the disassembled bytecode when type-hinting or inference is used:

(nd/disassemble (fn [] (let [my-arr (long-array 5)] (alength my-arr))))

...
15  invokestatic clojure.lang.RT.alength(long[]) : int [29]
...

We can see clearly that Clojure RT/alength is directly invoked with the proper array type; avoiding reflection for the enormous performance gain.

The Cost of Destructuring

Clojure’s destructuring is elegant and generally performant, but we can get significant performance wins by avoiding it when it matters.

clojure.core/destructure gives us a peek at its mechanics:

(destructure '[[a b c] [1 2 3]])

[vec__3071 [1 2 3]
 a (clojure.core/nth vec__3071 0 nil)
 b (clojure.core/nth vec__3071 1 nil)
 c (clojure.core/nth vec__3071 2 nil)]

Destructuring sequential collections, as we can see, leverages nth, which evaluates a series of “reflective” conditionals (much like count, as we saw above) to determine the type of its argument in order to dispatch the correct behavior.
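
When a destructured sequential argument sits on a hot path, we can skip the sugar and commit to an interface directly. Here is a small illustrative sketch (sum3 and sum3-direct are hypothetical names):

;; Destructured version: expands into the nth calls shown above.
(defn sum3 [[a b c]]
  (+ a b c))

;; Hand-rolled version: hints the argument and calls .nth on the Indexed
;; interface directly, bypassing nth's type-dispatch logic.
(defn sum3-direct [^clojure.lang.Indexed v]
  (+ (.nth v 0) (.nth v 1) (.nth v 2)))

(sum3 [1 2 3])        ; => 6
(sum3-direct [1 2 3]) ; => 6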

How does nth’s performance compare with other facilities?

Arrays

Compare nth on an array vs using aget directly:

(let [v (long-array [1 2 3 4 5])]
  (report-result
    (quick-benchmark
      (nth v 4) {}))
  ;; Execution time mean : 114.954441 ns

  (report-result
    (quick-benchmark
      (aget v 4) {})))
  ;; Execution time mean : 5.319098 ns

The aget is roughly a 20x gain over the nth (and destructured) variants. (Again, note that the argument to aget should be type-inferred or type-hinted, or else we’ll find it performing the worst of any variant.)

Vectors

Can destructuring Clojure’s first-class vector types be improved upon?

Consider:

(let [v [1 2 3 4 5]]
  (report-result
    (quick-benchmark
      (nth v 2) {}))
  ;; Execution time mean : 5.9 ns

  (report-result
    (quick-benchmark
      (.nth ^clojure.lang.IPersistentVector v 2) {})))
  ;; Execution time mean : 3.8 ns

A modest, yet noticeable, 1.5x improvement.

Narrow results like this one are a perfect time to apply suspicion. Let’s attempt to cross-check our benchmark using another tool, JMH, if only to demonstrate how to use JMH to benchmark Clojure behavior:

Benchmarking Clojure with JMH

JMH requires you to write your benchmarks as Java methods annotated with @Benchmark. To invoke Clojure we will need to rely on Java-to-Clojure interop.

In order to analyze our .nth vs nth performance from JMH, we can compile (eval) Clojure function defs from Java:

    static IFn eval_ = Clojure.var("clojure.core", "eval");
    static IFn dot_nth_fn =
      (IFn) eval_.invoke(Clojure.read(
         "(fn [^clojure.lang.Indexed x i] (.nth x i :not-found))"));
    static IFn nth_fn =
      (IFn) eval_.invoke(Clojure.read("(fn [x i] (nth x i :not-found))"));

We will also need some data to invoke our function; we’ll use a static value in this case (though for more sophisticated benchmarks we could leverage declarative JMH params to analyze how performance changes with input):

static IPersistentVector cljVector =
    (IPersistentVector) eval_.invoke(Clojure.read("(vec (range 25))"));

And now we can declare our benchmark methods:

    @Benchmark
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.NANOSECONDS)
    public Object dot_nth_vector() {
        return dot_nth_fn.invoke(cljVector, 2);
    }

    @Benchmark
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.NANOSECONDS)
    public Object nth_vector() {
        return nth_fn.invoke(cljVector, 2);
    }

(Note: some of the boilerplate JMH setup has been omitted from these code snippets; the omitted boilerplate is well documented in the official JMH samples.)

Here are our results using Java HotSpot version 1.8.0_241:

Benchmark                           Mode  Cnt  Score   Error  Units
ClojureNthBenchmark.dot_nth_vector  avgt   10  4.380 ± 0.407  ns/op
ClojureNthBenchmark.nth_vector      avgt   10  6.034 ± 0.156  ns/op

It seems we’ve confirmed our 1.5x gain with JMH, almost to a tee. Still, skepticism should be the rule in benchmark-land. One advantage of JMH is that our benchmarks end up declaratively codified and can be readily recompiled and re-run with various configurations.

Without changing any code, we can recompile using Java 14 and see the disparity above presumably optimized away:

ClojureNthBenchmark.dot_nth_vector  avgt   10  3.506 ± 0.281  ns/op
ClojureNthBenchmark.nth_vector      avgt   10  3.301 ± 0.376  ns/op

Instead of spending the rest of this section running through permutations of JMH configuration, machine type, and JDK versions, we will leave this to the curious reader. The primary goal here is to stoke suspicion, especially when benchmarking results are narrow.

As a last note on being effective with JMH, we can tidy things up a bit by not eval-compiling functions from Java strings inside our Java code, as we did above. Instead, we can define namespaces of benchmark-able functions and load/compile them from our JMH Java class:

static final IFn require = Clojure.var("clojure.core", "require");
static {
   require.invoke(Clojure.read("com.acme.my-benchmarking-suite"));
}
IFn dot_nth_fn =
    (IFn) Clojure.var("com.acme.my-benchmarking-suite", "dot-nth");
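
For completeness, the Clojure side of this arrangement might look like the following sketch (the namespace and fn names simply mirror the hypothetical com.acme ones above):

(ns com.acme.my-benchmarking-suite)

;; Benchmark-able fns live in an ordinary namespace; the JMH class requires
;; this namespace once in a static initializer and then resolves the vars by
;; name via Clojure.var.
(defn dot-nth [^clojure.lang.Indexed x i]
  (.nth x i :not-found))

(defn plain-nth [x i]
  (nth x i :not-found))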

Short-Circuiting

Like several of Clojure’s dynamic functions, nth’s source code contains a series of logical branches that select the target behavior based on the provided argument’s type. The first of these checks is for Clojure’s Indexed interface; by making this the first dynamic type check in its conditional dispatch logic, nth short-circuits in the most common case: a Clojure collection datatype.

We should apply the same trick to our own code. When short-circuiting on conditional logic (say, when using cond), we benefit by placing the most commonly true conditions first. Here is a contrived example:

(import 'java.util.Date)

(let [x "2020"]
  (report-result
    (quick-benchmark
      (cond
        (instance? Long x) :long
        (instance? Integer x) :int
        (instance? Short x) :short
        (instance? Date x) :date
        (instance? String x) :string) {})))
;; Execution time mean : 6.4 ns

If String is the most commonly flowing type, we pay a big price here. By moving the String instance check to the top, we reduce the mean to 2.4 ns, a roughly 2.5x improvement.
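
Concretely, the reordered benchmark (same bindings and criterium helpers as above) looks like this, with the mean quoted from the measurement just mentioned:

(let [x "2020"]
  (report-result
    (quick-benchmark
      (cond
        (instance? String x) :string
        (instance? Long x) :long
        (instance? Integer x) :int
        (instance? Short x) :short
        (instance? Date x) :date) {})))
;; Execution time mean : 2.4 ns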

Final Notes

Keep in mind the famous Kent Beck-attributed aphorism “Make it work, then make it fast,” or the Donald Knuth-attributed “Premature optimization is the root of all evil.” Generally, Clojure makes for high developer productivity and solid performance out of the box; but for areas where optimal performance is a must, getting there from within Clojure is well within reach.


Footnotes

¹ In the end, though it is rarely necessary, one can always hand-roll Java code and invoke it from Clojure. There is always a path to full optimization when tuning the Clojure itself is not obvious; Clojure’s Java interop is solid, and both Lein and Maven are capable of simple polyglot build configurations.

² Some of the fodder for this post came from performance tuning my open-source project thurber; one overall outcome of that effort was a 2x reduction in auto-scaling costs for the game engine Beam pipelines (used as integration tests for that project) when run on the GCP Cloud Dataflow platform. But careful focus on optimizing the specific hot code paths was key to that top-level outcome.

Serializing and Deserializing Clojure Fns with Nippy

We use Nippy at Red Planet Labs whenever we need general purpose serialization / deserialization of Clojure objects. It’s easy to use, fast, and covers virtually everything you could ever want to serialize in a Clojure program.

One notable exception is Clojure fns. There are several places in the distributed computing application we’re developing where storing or transmitting a serialized fn is really the best option. It’s worth talking about what we mean when we say serialized fn. We aren’t looking to serialize and distribute code — we already have a good mechanism for that. (It’s called a jar.) Instead, what we want to do is transmit fn instances between processes as a way to capture state and intention. Clojure already provides the primitives needed to treat a fn declared in a lexical context as an object; we just want to extend that so it works across process boundaries.

To be clear, this isn’t exactly a deficiency in Nippy. There are a lot of good reasons why you should never do this! The semantics of serializing a fn instance at one point in time and deserializing it again later, possibly in another process or even on another machine, leaves a lot of opportunities for mistakes to be made. Suffice it to say that your use cases should make it easy to ensure that fn implementations — the actual code — are always available when and where you deserialize an instance, and that they remain consistent across time. In our case, these characteristics are relatively easy to guarantee, so we can use this strategy safely.

One of the best things about Nippy is how easy it is to extend via the extend-freeze and extend-thaw APIs. Using this extension mechanism, we added transparent support for serializing and deserializing fn instances. Once you require the com.rpl.nippy-serializable-fn namespace, nippy/freeze and nippy/thaw will handle fn instances, either at the top level or nested within other objects, without any additional syntax:

(ns example
  (:require [taoensso.nippy :as nippy]
            [com.rpl.nippy-serializable-fn]))

(defn my-fn [a b c]
  (+ a b c))

(def thawed-myfn
  (nippy/thaw (nippy/freeze my-fn)))
(thawed-myfn 1 2 3) ; 6

(let [x 10
      afn (fn [a b c] (+ a b c x))
      fn-bytes (nippy/freeze afn)
      thawed-fn (nippy/thaw fn-bytes)]
  (thawed-fn 1 2 3)) ; 16

And that’s it! In our experiments, performance of freezing and thawing fns was comparable to freezing and thawing Clojure Records with similar amounts of “stuff” in them. You can find this extension on the Red Planet Labs GitHub, or add it as a dependency via Clojars.

A look under the hood

From the programmer perspective, there are (broadly) two “kinds” of fns in Clojure: declared fns (via defn) and anonymous fns (via (fn [] …)). But from an implementation perspective, what we really end up caring about is two different factors: whether the fn has anything in its closure, and whether it has metadata associated. At the highest level, to serialize a fn, we need to capture the “type” of it, anything in the closure, and any associated metadata. If we have all those things serialized in the byte stream, then we can reconstruct that fn instance at a later point in time.

Let’s take a quick side track into how Clojure actually implements fns. At compile time, Clojure generates a Java class for each fn. The most obvious part of this class is the invoke method, but there’s also a constructor, and possibly some fields (more on these later). When you interact with a fn instance in regular Clojure code, you are interacting with an instance of this generated Java class. When you invoke a Clojure fn instance, you are literally calling the invoke method with the provided arguments.

When a fn instance has nothing in its closure, then it’s really “static” in the Java sense of the term. Invocation relies only on arguments to the invoke method, not on any additional context. Whether you’re dealing with a declared fn or an anonymous fn, all you need to serialize in order to capture the essence of this kind of fn is the generated Java class name. It can be deserialized just by reading the name out of the byte stream and instantiating the fn by class name. We go a little farther here and make sure that when you deserialize a declared fn, you actually get the singleton instance stored in the named global var.
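
A rough REPL sketch of that idea (illustrative only, not the extension's actual code): a fn with an empty closure can be rebuilt from nothing but its class name.

(defn my-fn [a b c]
  (+ a b c))

;; The generated class has a zero-arg constructor when there is no closure,
;; so a fresh instance can be built from the class name alone.
(let [cls   (Class/forName (.getName (class my-fn)))
      fresh (.newInstance cls)] ; Class.newInstance is deprecated, but fine for a sketch
  (fresh 1 2 3))
;; => 6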

What about when you have a fn with a lexical closure? This is where the generated constructor and fields come into play. The Clojure compiler determines the referenced local vars at compile time, and then adds a constructor argument and a corresponding protected field for each one. When the resulting fn is instantiated at runtime, the constructor is called with the values of those locals, capturing them for future invocation. The invoke method implementation refers to both the invoking arguments and the constructor-captured fields, providing all the context needed to complete the computation.
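
Those compiler-generated closure fields are visible from the REPL. Here is a small sketch (the exact field names are compiler-generated, so treat the output as approximate):

(let [x 10
      afn (fn [a] (+ a x))]
  ;; The compiler added a field to afn's generated class holding the
  ;; closed-over value of x; read it back reflectively.
  (mapv (fn [^java.lang.reflect.Field fld]
          (.setAccessible fld true)
          [(.getName fld) (.get fld afn)])
        (.getDeclaredFields (class afn))))
;; => [["x" 10]]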

To serialize fns with a closure, we need to serialize the type along with all the values stored in the protected fields. Luckily, detecting fields used for storing closure values is easy and reliable, so all we need to do is get ahold of those values and recursively serialize them. The tricky part is that, naively, the only way to do this is via Java Reflection, which is a big performance no-no. The good news is that the JVM has continued to improve in this department, and now offers a much more sophisticated and high performance API for doing “reflective” operations called MethodHandles. A full discussion of MethodHandles is outside the scope of this post, but the key detail is that a MethodHandle has virtually identical performance to regular method invocation or direct field access, but can be constructed from Java Reflection primitives. We use this combination of APIs to dynamically analyze a fn instance’s closure fields and then generate an efficient serializer function, so the Java Reflection costs are only paid once per unique fn type encountered.
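
Here is a rough sketch of that combination (illustrative only, not the extension's actual implementation): use plain reflection once to discover a closure field, then build a MethodHandle getter that can be cached and reused cheaply for every later instance of the same fn class.

(import '(java.lang.invoke MethodHandles))

(let [x      42
      afn    (fn [a] (+ a x))
      ;; Reflection, paid once per fn class: find the single closure field
      ;; in this tiny example and make it accessible.
      fld    (doto (first (.getDeclaredFields (class afn)))
               (.setAccessible true))
      ;; A MethodHandle getter; invoking it later costs roughly the same
      ;; as a direct field read.
      getter (.unreflectGetter (MethodHandles/lookup) fld)]
  (.invokeWithArguments getter ^java.util.List [afn]))
;; => 42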

The other top-level case we mentioned above is fns with metadata. This is something most Clojure programmers encounter primarily through protocol fns. The implementation of these fns is different yet again — all fns with metadata are an instance of a special wrapper class “clojure.lang.AFunction$1”, which does nothing except hold a metadata map alongside the actual fn instance, and dispatches invocations down to the actual fn. When we try to freeze a fn like this, all we have to do is serialize the metadata and the unwrapped fn in sequence using Nippy and we’re done!
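
A quick REPL illustration of that wrapper behavior (the variable names are just for this example):

(def plain-fn (fn [x] (inc x)))
(def wrapped-fn (with-meta plain-fn {:purpose "example"}))

(class plain-fn)   ; the compiler-generated fn class
(class wrapped-fn) ; the metadata-holding wrapper class described above
(meta wrapped-fn)  ; => {:purpose "example"}
(wrapped-fn 1)     ; => 2, the invocation passes through to the wrapped fn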