Next-level backends with Rama: storing and traversing graphs in 60 LOC

This is the first of a series of posts exploring programming with Rama, ranging from interactive consumer apps to high-scale analytics, background processing, recommendation engines, and much more. This tutorial is self-contained, but for broader information about Rama and how it reduces the cost of building backends so much (up to 100x for large-scale backends), see our website.

Like all Rama applications, the example in this post requires very little code. It's easily scalable to millions of reads/writes per second, ACID compliant, high performance, and fault-tolerant, since Rama incrementally replicates all state. Deploying, updating, and scaling this application are all one-line CLI commands. No other infrastructure besides Rama is needed. Comprehensive monitoring of all aspects of runtime operation is built-in.

In this post, I’ll explore building the backend for storing a graph and implementing fast queries that traverse the graph. Code will be shown in both Clojure and Java, with the total code being about 60 lines for each implementation. You can download and play with the Clojure implementation in this repository or the Java implementation in this repository.

There are endless use cases that utilize graphs, each one with particularities on exactly how the graph should be represented. To keep the post focused, I’ll demonstrate graphs with Rama with the use case of storing and querying family trees. It should be easy to see how the techniques in this post can be tweaked for any graph use case.

Family trees are technically directed acyclic graphs (except when someone is their own grandfather) where every node has two parents and any number of children.

Indexed datastores in Rama, called PStates (“partitioned state”), are much more powerful and flexible than databases. Whereas databases have fixed data models, PStates can represent infinite data models due to being based on the composition of the simpler primitive of data structures. PStates are distributed, durable, high-performance, and incrementally replicated. Each PState is fine-tuned to what the application needs, and an application makes as many PStates as needed. In this case, we only need one PState to represent the graph. Here’s how the PState is declared in the application we’ll write:

Java:
topology.pstate(
  "$$family-tree",
  PState.mapSchema(UUID.class,
                   PState.fixedKeysSchema(
                     "parent1", UUID.class,
                     "parent2", UUID.class,
                     "name", String.class,
                     "children", PState.setSchema(UUID.class)
                     )));
Clojure:
(declare-pstate
  topology
  $$family-tree
  {UUID (fixed-keys-schema
          {:parent1 UUID
           :parent2 UUID
           :name String
           :children #{UUID}})})

This declares the PState as a map of maps, where the inner map has a pre-determined set of keys each with their own schema. People are identified by a UUID, and each person has fields for their two parents, name, and children. Children are represented as a set of IDs.
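
To make the shape concrete, here's a rough plain-Java sketch of what one entry in this PState looks like logically. This is illustrative only: the UUIDs and name are made up, and Rama manages the actual storage and replication itself.

import java.util.*;

// Illustrative: the logical shape of one $$family-tree entry, modeled with
// plain Java collections.
Map<UUID, Map<String, Object>> familyTree = new HashMap<>();

Map<String, Object> person = new HashMap<>();
person.put("parent1", UUID.randomUUID());
person.put("parent2", UUID.randomUUID());
person.put("name", "Ada");
person.put("children", new HashSet<UUID>());

familyTree.put(UUID.randomUUID(), person);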

Note that we're not using a generic graph schema for this use case, where every node would have any number of outgoing edges and any number of incoming edges. That's how you would have to store your data if you were using a graph database (due to its fixed data model). By tuning our PState to exactly what's needed by the application, we're able to trivially enforce that each person has exactly two parents and specify a tight schema for what's allowed in the other fields. By representing the children as a set instead of a list, we're also able to enforce that a child doesn't appear twice for the same parent. A graph database allowing multiple edges between nodes would not enforce this.

The queries we’ll implement on family trees will be:

  • Who are all the ancestors of a person ID within N generations?
  • How many direct descendants does a person have in each successive generation?

Rama concepts

A Rama application is called a "module". In a module you define all the storage and implement all the logic needed for your backend. All Rama modules are event sourced, so all data enters through a distributed log in the module called a "depot". Most of the work in implementing a module is coding "ETL topologies" which consume data from one or more depots to materialize any number of PStates. Conceptually, data flows from depots, through ETL topologies, and into PStates.

Modules can have any number of depots, topologies, and PStates, and clients interact with a module by appending new data to a depot or querying PStates. Although event sourcing traditionally means that processing is completely asynchronous to the client doing the append, with Rama that's optional. Because Rama is an integrated system, clients can specify that their appends should only return after all downstream processing and PState updates have completed.

A module deployed to a Rama cluster runs across any number of worker processes on any number of nodes, and a module is scaled by adding more workers. Internally, a module is broken up into "tasks".

A “task” is a partition of a module. The number of tasks for a module is specified on deploy. A task contains one partition of every depot and PState for the module as well as a thread and event queue for running events on that task. A running event has access to all depot and PState partitions on that task. Each worker process has a subset of all the tasks for the module.

Coding a topology involves reading and writing to PStates, running business logic, and switching between tasks as necessary.

Materializing the PState

Let’s start implementing the family tree application by defining the code to materialize the PState, and then we’ll implement the queries. The first step to coding the module is defining the depot:

Java:
public class FamilyTreeModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*people-depot", Depot.hashBy("id"));
  }
}
Clojure:
(defmodule FamilyTreeModule
  [setup topologies]
  (declare-depot setup *people-depot (hash-by :id))
  )

This declares a Rama module called "FamilyTreeModule" with a depot called *people-depot, which will receive all new person information. Objects appended to a depot can be any type. The second argument to the depot declaration is called the "depot partitioner" – more on that later.

To keep the example simple, the data appended to the depot will be defrecord objects for the Clojure version and HashMap objects for the Java version. To have a tighter schema on depot records you could instead use Thrift, Protocol Buffers, or a language-native tool for defining the types. Here’s the function that will be used to create depot data:

Java:
public static Map createPerson(UUID id, UUID parent1, UUID parent2, String name) {
  Map ret = new HashMap();
  ret.put("id", id);
  ret.put("parent1", parent1);
  ret.put("parent2", parent2);
  ret.put("name", name);
  return ret;
}
Clojure:
(defrecord Person [id parent1 parent2 name])
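
For context, appending a person to the depot from a client process would look roughly like this, assuming a connected RamaClusterManager like the one shown later in this post. This is a sketch, not code from the repository, and parent1Id and parent2Id are placeholder UUIDs.

// Sketch: appending a new person from a client. Assumes "manager" is a
// connected RamaClusterManager; parent1Id and parent2Id are placeholder UUIDs.
Depot peopleDepot = manager.clusterDepot("nlb.FamilyTreeModule", "*people-depot");
UUID newId = UUID.randomUUID();
peopleDepot.append(createPerson(newId, parent1Id, parent2Id, "Ada"));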

Next, let’s begin defining the topology to consume data from the depot and materialize the PState. Here’s the declaration of the topology with the PState:

Java:
public class FamilyTreeModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*people-depot", Depot.hashBy("id"));
    StreamTopology topology = topologies.stream("core");
    topology.pstate(
      "$$family-tree",
      PState.mapSchema(UUID.class,
                       PState.fixedKeysSchema(
                         "parent1", UUID.class,
                         "parent2", UUID.class,
                         "name", String.class,
                         "children", PState.setSchema(UUID.class)
                         )));
  }
}
Clojure:
(defmodule FamilyTreeModule
  [setup topologies]
  (declare-depot setup *people-depot (hash-by :id))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate
      topology
      $$family-tree
      {UUID (fixed-keys-schema
              {:parent1 UUID
               :parent2 UUID
               :name String
               :children #{UUID}})})
    ))

This defines a stream topology called “core”. Rama has two kinds of topologies, stream and microbatch, which have different properties. In short, streaming is best for interactive applications that need single-digit millisecond update latency, while microbatching has update latency of a few hundred milliseconds and is best for everything else. Streaming is used here under the assumption that a family tree application would want immediate feedback on a new person being added to the system.

Notice that the PState is defined as part of the topology. Unlike databases, PStates are not global mutable state. A PState is owned by a topology, and only the owning topology can write to it. Writing state in global variables is a horrible thing to do, and databases are just global variables by a different name.

Since a PState can only be written to by its owning topology, it's much easier to reason about. Everything about it can be understood by just looking at the topology implementation, all of which exists in the same program and is deployed together. Additionally, the extra step of appending to a depot before processing the record to materialize the PState does not lower performance, as we've shown in benchmarks. Rama being an integrated system strips away much of the overhead that traditionally exists.

Let’s now add the code to materialize the PState:

Java:
public class FamilyTreeModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*people-depot", Depot.hashBy("id"));
    StreamTopology topology = topologies.stream("core");
    topology.pstate(
      "$$family-tree",
      PState.mapSchema(UUID.class,
                       PState.fixedKeysSchema(
                         "parent1", UUID.class,
                         "parent2", UUID.class,
                         "name", String.class,
                         "children", PState.setSchema(UUID.class)
                         )));
    topology.source("*people-depot").out("*person")
            .each(Ops.GET, "*person", "id").out("*id")
            .each(Ops.GET, "*person", "parent1").out("*parent1")
            .each(Ops.GET, "*person", "parent2").out("*parent2")
            .each(Ops.GET, "*person", "name").out("*name")
            .localTransform(
              "$$family-tree",
              Path.key("*id")
                  .multiPath(Path.key("parent1").termVal("*parent1"),
                             Path.key("parent2").termVal("*parent2"),
                             Path.key("name").termVal("*name")))
            .each(Ops.TUPLE, "*parent1", "*parent2").out("*parents")
            .each(Ops.EXPLODE, "*parents").out("*parent")
            .hashPartition("*parent")
            .localTransform(
              "$$family-tree",
              Path.key("*parent", "children").voidSetElem().termVal("*id"));
  }
}
Clojure:
(defmodule FamilyTreeModule
  [setup topologies]
  (declare-depot setup *people-depot (hash-by :id))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate
      topology
      $$family-tree
      {UUID (fixed-keys-schema
              {:parent1 UUID
               :parent2 UUID
               :name String
               :children #{UUID}})})
   (<<sources topology
     (source> *people-depot :> {:keys [*id *parent1 *parent2] :as *person})
     (local-transform>
       [(keypath *id) (termval (dissoc *person :id))]
       $$family-tree)
     (ops/explode [*parent1 *parent2] :> *parent)
     (|hash *parent)
     (local-transform>
       [(keypath *parent) :children NONE-ELEM (termval *id)]
       $$family-tree)
     )))

The code to implement the topology is only a few lines, but there’s a lot to unpack here. At a high level, this creates the new node for the person with its attributes filled in, and then it updates each parent to list the new person as a child.

The business logic is implemented with dataflow. Rama’s dataflow API is exceptionally expressive, able to intermix arbitrary business logic with loops, conditionals, and moving computation between tasks. This post is not going to explore all the details of dataflow as there’s simply too much to cover. Full tutorials for Rama dataflow can be found on our website for the Java API and for the Clojure API.

Let’s go over this topology implementation line by line, and I’ll explain the details of what’s happening as we go. The first step is subscribing to the depot:

Java:
topology.source("*people-depot").out("*person")
Clojure:
(<<sources topology
  (source> *people-depot :> {:keys [*id *parent1 *parent2] :as *person})

This subscribes the topology to the depot *people-depot and starts a reactive computation on it. Operations in dataflow do not return values. Instead, they emit values that are bound to new variables. In the Clojure API, the input and outputs to an operation are separated by the :> keyword. In the Java API, output variables are bound with the .out method.

Whenever data is appended to that depot, the data is emitted into the topology into the variable *person. All variables in Rama code begin with a *. The subsequent code runs for every single emit.

Remember that last argument to the depot declaration called the "depot partitioner"? That's relevant here, given the physical layout of a module described earlier: each task holds one partition of every depot and PState.

The depot partitioner determines on which task the append happens, and thereby on which task computation begins for subscribed topologies. In this case, the depot partitioner says to hash by the "id" field of the appended data. The target task is computed by taking the hash and modding it by the total number of tasks. This means data with the same ID always goes to the same task, while different IDs are evenly spread across all tasks.
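
Conceptually, the task selection works something like this (a sketch of the idea, not Rama's actual hashing code; id and numTasks are stand-ins):

// Conceptual sketch of hash partitioning: the same ID always maps to the
// same task, and distinct IDs spread evenly across tasks.
int targetTask = Math.floorMod(id.hashCode(), numTasks);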

Rama gives a ton of control over how computation and storage are partitioned, and in this case we’re partitioning by the hash of the ID since that’s how we want the PState to be partitioned. This allows us to easily locate the PState partition storing data for any particular ID.

The next bit of code updates the PState for the new person with all attributes filled in:

Java:
.each(Ops.GET, "*person", "id").out("*id")
.each(Ops.GET, "*person", "parent1").out("*parent1")
.each(Ops.GET, "*person", "parent2").out("*parent2")
.each(Ops.GET, "*person", "name").out("*name")
.localTransform(
  "$$family-tree",
  Path.key("*id")
      .multiPath(Path.key("parent1").termVal("*parent1"),
                 Path.key("parent2").termVal("*parent2"),
                 Path.key("name").termVal("*name")))
Clojure:
(local-transform>
  [(keypath *id) (termval (dissoc *person :id))]
  $$family-tree)

In the Clojure version, the ID for the person was destructured from the object into the variable *id on the source> line. In the Java version, the Ops.GET function is run on the *person map to fetch all the fields into variables of the same name.

The PState is updated with the “local transform” operation. The transform takes in as input the PState $$family-tree and a “path” specifying what to change about the PState. When a PState is referenced in dataflow code, it always references the partition of the PState that’s located on the task on which the event is currently running.

Paths are a deep topic, and the full documentation for them can be found here. A path is a sequence of “navigators” that specifies how to hop through a data structure to target values of interest. A path can target any number of values, and they’re used for both transforms and queries.

In the Clojure version, the path is very basic and navigates to exactly one value. It first navigates by the key in the variable *id which hops to the inner map for that ID. The “term val” navigator then says to replace whatever’s there with the provided value, which is just the person object from the depot with the “id” field removed.

In the Java version, the fields of the inner map are set by the path individually. It first navigates to the inner map by the key *id. Then multiPath does three separate navigations from that point, one for each field, setting each to the corresponding value from the *person map.
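
In plain-data terms, that transform does roughly the following to the map of maps, reusing the illustrative familyTree map from earlier (the variables stand in for *id, *parent1, *parent2, and *name; Rama applies the path durably on the PState partition itself):

// Roughly equivalent plain-Java update for the localTransform above.
Map<String, Object> node = familyTree.computeIfAbsent(id, k -> new HashMap<>());
node.put("parent1", parent1);
node.put("parent2", parent2);
node.put("name", name);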

The next part of the topology relocates computation to each parent in order to update their “children” field:

Java:
.each(Ops.TUPLE, "*parent1", "*parent2").out("*parents")
.each(Ops.EXPLODE, "*parents").out("*parent")
.hashPartition("*parent")
Clojure:
(ops/explode [*parent1 *parent2] :> *parent)
(|hash *parent)

First, the two parents are put into a single list. This is done in the Clojure version by putting *parent1 and *parent2 into a vector, and in the Java version with the call to Ops.TUPLE.

Then the “explode” operation is called on that list, which emits one time for each element of the list. In this case it emits twice, once for each parent, and the subsequent code runs for each parent. Putting the two parents into one list and then exploding it is an easy way to share the code for updating the two parents.

What's happening here is very different from "regular" programming, where you call a function and it returns one value as the result. In dataflow programming, operations can emit any number of times. You can think of the output of an operation as being the input arguments to the rest of the topology. When an operation emits, it invokes the "rest of the topology" with those arguments (the "rest of the topology" is also known as the "continuation"). This is a powerful generalization of the concept of a function which, as you're about to see, enables very elegant code.
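
One way to picture an emit in ordinary Java is as an operation that receives the "rest of the topology" as a continuation and may invoke it any number of times. This is a conceptual sketch, not Rama's API:

import java.util.List;
import java.util.function.Consumer;

// Conceptual sketch: "explode" invokes the continuation once per element,
// the way a dataflow operation invokes the rest of the topology per emit.
static <T> void explode(List<T> items, Consumer<T> restOfTopology) {
  for (T item : items) {
    restOfTopology.accept(item); // one "emit" per element
  }
}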

The final line in this code does a "hash partition" by the value of *parent. Partitioners relocate subsequent computation, potentially to a different task, and a hash partitioner works exactly like the aforementioned depot partitioner. The details of relocating computation, like serializing and deserializing any variables referenced after the partitioner, are handled automatically. The code reads linearly, without any callback functions, even though partitioners may be jumping around to different tasks on different nodes.

The way the code is structured also causes the children for the two parents to be updated in parallel, which lowers the latency of the topology.

The final part of the topology updates the “children” field for each parent:

Java:
.localTransform(
  "$$family-tree",
  Path.key("*parent", "children").voidSetElem().termVal("*id"));
Clojure:
(local-transform>
  [(keypath *parent) :children NONE-ELEM (termval *id)]
  $$family-tree)

This is just like the previous transform, except it adds one element to the children set instead of replacing the entire inner map. It navigates first to the inner map for the *parent key and then to the “children” field within that map. The next navigator, called NONE-ELEM in Clojure and voidSetElem in Java, navigates to the “void” element of the set. Setting that “void” element to a value causes that value to be added to that set.
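
In plain-data terms, this transform amounts to the following, again using the illustrative map from earlier and assuming the "children" set was already initialized (parent and id stand in for *parent and *id):

// Roughly equivalent plain-Java update: add the new person's ID to the
// parent's "children" set.
Set<UUID> children = (Set<UUID>) familyTree.get(parent).get("children");
children.add(id);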

That completes everything involved in materializing that PState. In just 20 lines of code we’ve implemented the equivalent of a graph database, except tailored to match our use case exactly.

Rama’s dataflow API is as expressive as a full programming language with the additional power of making it easy to distribute computation. What you’ve seen in this section is just a small taste of what it can do.

Querying the PState directly

The module already supports many queries just through its PState, which can be queried directly. Here’s an example of how you would get a client to the PState, such as in your web server:

Java:
Map config = new HashMap();
config.put("conductor.host", "1.2.3.4");
RamaClusterManager manager = RamaClusterManager.open(config);
PState familyTreePState = manager.clusterPState("nlb.FamilyTreeModule", "$$family-tree");
Clojure:
(def manager (open-cluster-manager {"conductor.host" "1.2.3.4"}))
(def family-tree-pstate (foreign-pstate manager "nlb.family-tree/FamilyTreeModule" "$$family-tree"))

A “cluster manager” connects to a Rama cluster by specifying the location of its “Conductor” node. The “Conductor” node is the central node of a Rama cluster, which you can read more about here. From the cluster manager, you can retrieve clients to any depots or PStates for any module.

Let’s look at a few examples of querying the $$family-tree PState using this client. This query fetches the name for the person with the UUID “aa7d7c5c-0679-4b01-b40b-cb3c20065549”:

Java:
String name = familyTreePState.selectOne(Path.key(UUID.fromString("aa7d7c5c-0679-4b01-b40b-cb3c20065549"),
                                                  "name"));
Clojure:
(def name (foreign-select-one [(keypath (UUID/fromString "aa7d7c5c-0679-4b01-b40b-cb3c20065549"))
                               :name]
                              family-tree-pstate))

As mentioned before, querying uses the exact same path API as used for transforms. This path just navigates to the “name” field for that person in the PState.

Here’s a query to fetch the number of children for the person:

Java:
int numChildren = familyTreePState.selectOne(Path.key(UUID.fromString("aa7d7c5c-0679-4b01-b40b-cb3c20065549"),
                                                      "children")
                                                 .view(Ops.SIZE));
Clojure:
(def num-children (foreign-select-one [(keypath (UUID/fromString "aa7d7c5c-0679-4b01-b40b-cb3c20065549"))
                                       :children
                                       (view count)]
                                      family-tree-pstate))

This path navigates to the "children" field for that person and then counts its elements. Paths execute completely server-side, so the only information sent from the client to the task is the path, and the only information sent back is the count. Any function can be provided to the "view" navigator, giving a ton of power and flexibility to PState queries. These functions are just regular Java or Clojure functions and don't need any special registration.
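
PState clients can also fetch every value a path navigates to in one round trip. As a sketch, assuming the client's select method is the multi-value counterpart of selectOne, fetching both parents at once would look like:

// Sketch: fetch both parent IDs in a single round trip, assuming "select"
// returns a list of every value the path navigates to.
List<UUID> parents =
  familyTreePState.select(
    Path.key(UUID.fromString("aa7d7c5c-0679-4b01-b40b-cb3c20065549"))
        .multiPath(Path.key("parent1"), Path.key("parent2")));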

You could do traversal queries through PState clients, but that would require many roundtrips to navigate from parent to children or vice-versa. More importantly, the ability to parallelize those traversals client-side is limited. Instead, the next section will show how to easily implement traversal queries as on-demand distributed computations with a mechanism called “query topologies”.

Implementing the first graph traversal query

Here again are the traversal queries we wish to implement:

  • Who are all the ancestors of a person within N generations?
  • How many direct descendants does a person have in each successive generation?

These will be implemented with predefined queries in the module called “query topologies”. Query topologies are programmed with the exact same dataflow API as used before to implement the ETL topology.

Here’s the module with the query topology added to get all ancestors of a person within N generations:

Java:
public class FamilyTreeModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*people-depot", Depot.hashBy("id"));
    StreamTopology topology = topologies.stream("core");
    topology.pstate(
      "$$family-tree",
      PState.mapSchema(UUID.class,
                       PState.fixedKeysSchema(
                         "parent1", UUID.class,
                         "parent2", UUID.class,
                         "name", String.class,
                         "children", PState.setSchema(UUID.class)
                         )));
    topology.source("*people-depot").out("*person")
            .each(Ops.GET, "*person", "id").out("*id")
            .each(Ops.GET, "*person", "parent1").out("*parent1")
            .each(Ops.GET, "*person", "parent2").out("*parent2")
            .each(Ops.GET, "*person", "name").out("*name")
            .localTransform(
              "$$family-tree",
              Path.key("*id")
                  .multiPath(Path.key("parent1").termVal("*parent1"),
                             Path.key("parent2").termVal("*parent2"),
                             Path.key("name").termVal("*name")))
            .each(Ops.TUPLE, "*parent1", "*parent2").out("*parents")
            .each(Ops.EXPLODE, "*parents").out("*parent")
            .hashPartition("*parent")
            .localTransform(
              "$$family-tree",
              Path.key("*parent", "children").voidSetElem().termVal("*id"));

    topologies.query("ancestors", "*start-id", "*num-generations").out("*ancestors")
              .loopWithVars(LoopVars.var("*id", "*start-id")
                                    .var("*generation", 0),
                Block.keepTrue(new Expr(Ops.LESS_THAN_OR_EQUAL, "*generation", "*num-generations"))
                     .hashPartition("*id")
                     .localSelect("$$family-tree",
                                  Path.key("*id")
                                      .multiPath(Path.key("parent1"),
                                                 Path.key("parent2"))
                                      .filterPred(Ops.IS_NOT_NULL)).out("*parent")
                     .emitLoop("*parent")
                     .continueLoop("*parent", new Expr(Ops.INC, "*generation"))).out("*ancestor")
              .originPartition()
              .agg(Agg.set("*ancestor")).out("*ancestors");
  }
}
Clojure:
(defmodule FamilyTreeModule
  [setup topologies]
  (declare-depot setup *people-depot (hash-by :id))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate
      topology
      $$family-tree
      {UUID (fixed-keys-schema
              {:parent1 UUID
               :parent2 UUID
               :name String
               :children #{UUID}})})
   (<<sources topology
     (source> *people-depot :> {:keys [*id *parent1 *parent2] :as *person})
     (local-transform>
       [(keypath *id) (termval (dissoc *person :id))]
       $$family-tree)
     (ops/explode [*parent1 *parent2] :> *parent)
     (|hash *parent)
     (local-transform>
       [(keypath *parent) :children NONE-ELEM (termval *id)]
       $$family-tree)
     ))
  (<<query-topology topologies "ancestors"
    [*start-id *num-generations :> *ancestors]
    (loop<- [*id *start-id
             *generation 0
             :> *ancestor]
      (filter> (<= *generation *num-generations))
      (|hash *id)
      (local-select> [(keypath *id) (multi-path :parent1 :parent2) some?]
        $$family-tree
        :> *parent)
      (:> *parent)
      (continue> *parent (inc *generation)))
    (|origin)
    (aggs/+set-agg *ancestor :> *ancestors))
  )

The implementation iteratively looks at the parents of a node, keeping a counter of how many generations back it has traversed so far. All ancestors are aggregated into a set for the return value. Let's go through it line by line, starting with the declaration of the topology:

Java:
topologies.query("ancestors", "*start-id", "*num-generations").out("*ancestors")
Clojure:
(<<query-topology topologies "ancestors"
  [*start-id *num-generations :> *ancestors]

This declares a query topology named "ancestors" that takes in the input arguments *start-id and *num-generations. It declares the return variable *ancestors, which will be bound by the end of the topology execution.

The next line starts traversal of the family tree graph by initiating a loop:

Java:
.loopWithVars(LoopVars.var("*id", "*start-id")
                      .var("*generation", 0),
Clojure:
(loop<- [*id *start-id
         *generation 0
         :> *ancestor]

Loops in dataflow first declare "loop variables" which are in scope for the body of the loop and are set to new values each time the loop recurs. Here the two variables *id and *generation are declared, and they are initialized to *start-id and 0 for the first iteration of the loop. Loops can emit any number of times (with :> in Clojure and .emitLoop in Java), and each emit runs the code after the loop with the emitted values. The Clojure version binds the emitted value from this loop as *ancestor in the binding vector, while in the Java version that's specified after the loop body.

The next line of code checks whether the query has traversed too far:

Java:
Block.keepTrue(new Expr(Ops.LESS_THAN_OR_EQUAL, "*generation", "*num-generations"))
Clojure:
(filter> (<= *generation *num-generations))

This operation, filter> in Clojure and .keepTrue in Java, emits exactly one time if its input is true and otherwise doesn't emit. Just like how the "explode" call differs from regular functions by emitting many times, this differs from regular functions by potentially not emitting at all. No output variables are bound since this operation emits no values, but you can still think of its emit as invoking the "rest of the topology".

The *generation variable keeps track of how many generations have been traversed, and this code stops execution of the loop iteration if *generation is greater than the *num-generations parameter from the invocation of the query.

The next lines fetch the two parents for the node:

Java:
.hashPartition("*id")
.localSelect("$$family-tree",
             Path.key("*id")
                 .multiPath(Path.key("parent1"),
                            Path.key("parent2"))
                 .filterPred(Ops.IS_NOT_NULL)).out("*parent")
Clojure:
(|hash *id)
(local-select> [(keypath *id) (multi-path :parent1 :parent2) some?]
  $$family-tree
  :> *parent)

The hash partition, just like in the ETL topology, moves the computation to the task containing the information for that ID in the $$family-tree PState. The local select call selects both parents and emits them. The "multi path" navigator causes this local select to emit twice, once for each parent. The filter at the end of the path prevents emitting parents that are null, i.e. nodes that have no parents specified.

The next line emits the found ancestor from the loop:

Java:
.emitLoop("*parent")
Clojure:
(:> *parent)

As mentioned before, this runs the code following the loop with that value bound to the variable *ancestor.

The next line invokes another iteration of the loop with that node:

Java:
.continueLoop("*parent", new Expr(Ops.INC, "*generation"))
Clojure:
(continue> *parent (inc *generation))

This runs the loop from the start, setting the value of *id to *parent and *generation to one more than it was before.

Something very different from loops in languages like Java or Clojure is happening here. The loop is being continued multiple times in one iteration, once for each parent. Along with the hash partitioner, this causes the loop to recur an ever-increasing number of times in parallel across the cluster, until iterations reach the generation limit and filter themselves out. This is a very elegant way to express a parallel traversal.
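
A single-process analogue may help build intuition. The loop behaves like the following recursive traversal, except Rama runs the recursive calls in parallel across tasks. This is a conceptual sketch against a hypothetical in-memory graph, not Rama code:

// Conceptual single-process analogue of the "ancestors" loop. The graph map
// is a hypothetical stand-in for the $$family-tree PState.
static void traverse(Map<UUID, Map<String, Object>> graph, UUID id,
                     int generation, int numGenerations, Set<UUID> ancestors) {
  if (generation > numGenerations) return;             // filter> / keepTrue
  Map<String, Object> node = graph.get(id);
  for (String field : List.of("parent1", "parent2")) { // multi-path
    UUID parent = (UUID) node.get(field);
    if (parent == null) continue;                      // filter out null parents
    ancestors.add(parent);                             // emit -> set aggregation
    traverse(graph, parent, generation + 1, numGenerations, ancestors); // continue>
  }
}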

The next line is declared after the loop and runs for each emit from the loop:

Java:
.originPartition()
Clojure:
(|origin)

The “origin partitioner” relocates computation to the task where the query began execution. It gets all the emitted ancestors onto the same task so they can be aggregated and returned.

The last line of the query topology aggregates the ancestors together:

Java:
.agg(Agg.set("*ancestor")).out("*ancestors");
Clojure:
(aggs/+set-agg *ancestor :> *ancestors)

Up until here, the query topology has worked just like the ETL topology: each operation ran for every emit of the preceding operation and emitted some number of times itself. Aggregators in dataflow are different in that they collect all emits that happened in the execution of the query topology and combine them into a single value.

Query topologies are “batch blocks”, which have expanded dataflow capabilities including aggregation. Batch blocks can also do inner joins, outer joins, subqueries, and everything else possible with relational languages. You can find the full documentation for batch blocks here, with Clojure-specific documentation here.

For this query, you may be wondering about the case of the same person being reached through multiple ancestry paths. The code as written will traverse that person multiple times, emitting them and all their ancestors every single time they're reached. This doesn't change the results, since the set aggregation eliminates duplicates, but it can be very wasteful. Fortunately, this can be optimized easily by adding just three more lines to the query topology:

Java:
topologies.query("ancestors", "*start-id", "*num-generations").out("*ancestors")
          .loopWithVars(LoopVars.var("*id", "*start-id")
                                .var("*generation", 0),
            Block.keepTrue(new Expr(Ops.LESS_THAN_OR_EQUAL, "*generation", "*num-generations"))
                 .hashPartition("*id")
                 .localSelect("$$ancestors$$", Path.view(Ops.CONTAINS, "*id")).out("*traversed?")
                 .keepTrue(new Expr(Ops.NOT, "*traversed?"))
                 .localTransform("$$ancestors$$", Path.voidSetElem().termVal("*id"))
                 .localSelect("$$family-tree",
                              Path.key("*id")
                                  .multiPath(Path.key("parent1"),
                                             Path.key("parent2"))
                                  .filterPred(Ops.IS_NOT_NULL)).out("*parent")
                 .emitLoop("*parent")
                 .continueLoop("*parent", new Expr(Ops.INC, "*generation"))).out("*ancestor")
          .originPartition()
          .agg(Agg.set("*ancestor")).out("*ancestors");
Clojure:
(<<query-topology topologies "ancestors"
  [*start-id *num-generations :> *ancestors]
  (loop<- [*id *start-id
           *generation 0
           :> *ancestor]
    (filter> (<= *generation *num-generations))
    (|hash *id)
    (local-select> (view contains? *id) $$ancestors$$ :> *traversed?)
    (filter> (not *traversed?))
    (local-transform> [NONE-ELEM (termval *id)] $$ancestors$$)
    (local-select> [(keypath *id) (multi-path :parent1 :parent2) some?]
      $$family-tree
      :> *parent)
    (:> *parent)
    (continue> *parent (inc *generation)))
  (|origin)
  (aggs/+set-agg *ancestor :> *ancestors))

The three lines added were:

Java:
.localSelect("$$ancestors$$", Path.view(Ops.CONTAINS, "*id")).out("*traversed?")
.keepTrue(new Expr(Ops.NOT, "*traversed?"))
.localTransform("$$ancestors$$", Path.voidSetElem().termVal("*id"))
Clojure:
(local-select> (view contains? *id) $$ancestors$$ :> *traversed?)
(filter> (not *traversed?))
(local-transform> [NONE-ELEM (termval *id)] $$ancestors$$)

Every query topology invocation has a temporary, in-memory PState it can use, named after the query topology surrounded by $$. In this case, that PState is called $$ancestors$$. This code uses that temporary PState to record, in a set on each task, which nodes it has traversed, and skips traversal of nodes it has already seen. Using the temporary PState like this is common in graph queries.
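
In terms of the single-process sketch from earlier, this is the classic "visited set" guard, except kept per task rather than globally:

// The dedup in single-process terms: in the module, $$ancestors$$ plays the
// role of this visited set, one per task.
static void traverse(Map<UUID, Map<String, Object>> graph, UUID id,
                     int generation, int numGenerations,
                     Set<UUID> ancestors, Set<UUID> visited) {
  if (generation > numGenerations) return;
  if (!visited.add(id)) return; // already traversed; prune this branch
  // ... remainder identical to the earlier sketch
}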

Let’s take a look at how to invoke this query topology from outside a module, like from a web server. First, you retrieve a client for the query topology just like how we retrieved a PState client earlier:

Java:
Map config = new HashMap();
config.put("conductor.host", "1.2.3.4");
RamaClusterManager manager = RamaClusterManager.open(config);
QueryTopologyClient<Set> ancestorsQuery = manager.clusterQuery("nlb.FamilyTreeModule", "ancestors");
Clojure:
(def manager (open-cluster-manager {"conductor.host" "1.2.3.4"}))
(def ancestors-query (foreign-query manager "nlb.family-tree/FamilyTreeModule" "ancestors"))

Then, the query topology can be invoked just like any other function. It takes in input and returns the result:

Java:
Set ancestors = ancestorsQuery.invoke(UUID.fromString("aa7d7c5c-0679-4b01-b40b-cb3c20065549"), 3);
Clojure:
(def ancestors (foreign-invoke-query ancestors-query (UUID/fromString "aa7d7c5c-0679-4b01-b40b-cb3c20065549") 3))

It looks like any other function call, but it’s actually executing as a distributed query across the Rama cluster where the module is deployed.

Implementing the second graph traversal query

The second traversal query computes the number of direct descendants a person has per successive generation. It returns a map from generation number to the number of descendants in that generation, e.g. 3 children, 7 grandchildren, 22 great-grandchildren, and so on.

The implementation is similar to the last one:

Java:
topologies.query("descendants-count", "*start-id", "*num-generations").out("*result")
          .loopWithVars(LoopVars.var("*id", "*start-id")
                                .var("*generation", 0),
            Block.keepTrue(new Expr(Ops.LESS_THAN, "*generation", "*num-generations"))
                 .hashPartition("*id")
                 .localSelect("$$family-tree", Path.key("*id", "children")).out("*children")
                 .emitLoop("*generation", new Expr(Ops.SIZE, "*children"))
                 .each(Ops.EXPLODE, "*children").out("*c")
                 .continueLoop("*c", new Expr(Ops.INC, "*generation"))).out("*gen", "*count")
          .originPartition()
          .compoundAgg(CompoundAgg.map("*gen", Agg.sum("*count"))).out("*result");
Clojure:
(<<query-topology topologies "descendants-count"
  [*start-id *num-generations :> *result]
  (loop<- [*id *start-id
           *generation 0 :> *gen *count]
    (filter> (< *generation *num-generations))
    (|hash *id)
    (local-select> [(keypath *id) :children] $$family-tree :> *children)
    (:> *generation (count *children))
    (ops/explode *children :> *c)
    (continue> *c (inc *generation)))
  (|origin)
  (+compound {*gen (aggs/+sum *count)} :> *result))

The query topology takes in the person ID and the number of generations to count, and it returns the map from generation number to count. Since the implementation is similar to the last query topology, I’ll just point out the differences.

The body of the loop fetches the number of children for a node and continues traversal for each child:

Java:
.localSelect("$$family-tree", Path.key("*id", "children")).out("*children")
.emitLoop("*generation", new Expr(Ops.SIZE, "*children"))
.each(Ops.EXPLODE, "*children").out("*c")
.continueLoop("*c", new Expr(Ops.INC, "*generation"))
Clojure:
(local-select> [(keypath *id) :children] $$family-tree :> *children)
(:> *generation (count *children))
(ops/explode *children :> *c)
(continue> *c (inc *generation)))

The emit callsite emits two values: the generation number of that person and how many children that person has. Those counts will later be summed together to get the total number of descendants for each generation. You can see that the loop binds two variables for its output, corresponding to this callsite emitting two values per emit.

The way those counts are aggregated uses a slightly different mechanism than the last query topology:

Java:
.compoundAgg(CompoundAgg.map("*gen", Agg.sum("*count"))).out("*result");
Clojure:
(+compound {*gen (aggs/+sum *count)} :> *result)

This uses “compound aggregation”, which allows one or more aggregations to be composed into a data structure. This compound aggregation produces a map keyed by *gen with the values being the sum of the counts for that generation. This produces exactly what’s desired for the result of the query topology. You can read more about aggregation and compound aggregation on this page.
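
In plain Java, that compound aggregation amounts to folding each emitted (generation, count) pair into a map with summed values. This is a conceptual sketch of the idea; "emits" is a hypothetical list of pairs, while Rama performs the fold itself after gathering the emits on the origin task:

// Conceptual equivalent of CompoundAgg.map("*gen", Agg.sum("*count")):
// fold each emitted (gen, count) pair into a map, summing counts per key.
Map<Integer, Long> result = new HashMap<>();
for (long[] emit : emits) { // each emit is a hypothetical {gen, count} pair
  result.merge((int) emit[0], emit[1], Long::sum);
}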

Summary

There's a lot to learn with Rama, but you can see from this example application how much you can accomplish with very little code. Building the equivalent of a graph database with queries tailored to a particular use case is no small feat, but with Rama it only took 60 lines of code. There's no additional work needed for deployment, updating, and scaling, since that's all built in to Rama. For an experienced Rama programmer, a project like this takes only a few hours to fully develop, test, and have ready for deployment.

Rama being an event sourced system instills some extremely useful properties in applications that you don't get without event sourcing. Depots provide an audit log of every change that's ever happened to the application, making it possible to go back and answer questions about the application's history. They also enable PStates to be recomputed in the future, which could save the company if a bad bug were deployed that corrupted vast portions of a PState. The fault tolerance you get from event sourcing is night and day compared to the alternative.

As mentioned earlier, there's a GitHub project for the Clojure version and one for the Java version containing all the code in this post. Those projects also have tests showing how to unit test modules in Rama's "in-process cluster" environment.

You can get in touch with us at consult@redplanetlabs.com to schedule a free consultation to talk about your application and/or pair program on it. Rama is free for production clusters for up to two nodes and can be downloaded at this page.
