This is part of a series of posts exploring programming with Rama, spanning interactive consumer apps, high-scale analytics, background processing, recommendation engines, and much more. This tutorial is self-contained, but for broader information about Rama and how it reduces the cost of building backends so much (up to 100x for large-scale backends), see our website.
Like all Rama applications, the example in this post requires very little code. It’s easily scalable to millions of reads/writes per second, ACID compliant, high-performance, and fault-tolerant thanks to Rama incrementally replicating all state. Deploying, updating, and scaling this application are all one-line CLI commands. No other infrastructure besides Rama is needed. Comprehensive monitoring of all aspects of runtime operation is built in.
In this post, I’ll explore implementing a “mute” feature on a social network, where users can hide content from any number of other users. Code will be shown in both Clojure and Java, totaling 60 lines for Clojure and 80 lines for Java. You can download and play with the Clojure implementation in this repository or the Java implementation in this repository.
There are two ways to approach implementing a feature like this. The first approach is to delete content from muted users out of the underlying feeds. However, this approach has two drawbacks. First, it doesn’t work when multiple users can view the same content, since a poster may be muted by one viewer but not another. Second, deleting the underlying content means it won’t reappear if the poster is later unmuted, which is undesirable.
Instead, I’ll show how to implement this by filtering the content on read. This enables the same content to be viewed with different mute settings by multiple viewers, and it makes unmuting work as desired. The filtering must be implemented efficiently so as not to unacceptably increase the latency of fetching content for the frontend. Doing many roundtrips to the backend for a page of content would create too much latency, so this implementation will require just a single roundtrip per page of content.
Backend storage
Indexed datastores in Rama, called PStates (“partitioned state”), are much more powerful and flexible than databases. Whereas databases have fixed data models, PStates can represent an infinite variety of data models because they’re built by composing a simpler primitive: data structures. PStates are distributed, durable, high-performance, and incrementally replicated. Each PState is fine-tuned to what the application needs, and an application makes as many PStates as needed. In this case, we’ll need one PState to store content and another PState to track which users are muted by each user.
Different applications may store content differently – perhaps each user has their own feed like Twitter, perhaps there’s a global feed with content for everyone, or other variations. Implementing muting by filtering on read isn’t any different in any of these cases, so for this example each user will have their own feed of content. The PState for each user’s feed will be defined like this:
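Here is the declaration in the Java API followed by the Clojure API (these same lines appear again in the full module definition later in this post):

topology.pstate("$$posts",
                PState.mapSchema(Long.class,
                                 PState.listSchema(Map.class).subindexed()));

(declare-pstate topology $$posts
                {Long (vector-schema Post {:subindex? true})})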
This declares the PState as a map of lists, with the keys being 64-bit user IDs and the list values being post data. The name of a PState always begins with $$. For each user, it tracks their feed as a list of content. The inner list is declared as “subindexed”, which instructs Rama to store the elements individually on disk rather than reading and writing the whole list as one value. Subindexing enables nested data structures to have billions of elements and still be read and written extremely quickly. This PState can support many queries in less than one millisecond: get the number of posts on a user’s feed, get a single post at a particular index, or get all posts between two indices.
The second PState tracks all the users muted by each user:
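Again in the Java API and then the Clojure API (also excerpted from the full module definition below):

topology.pstate("$$mutes",
                PState.mapSchema(Long.class,
                                 PState.setSchema(Long.class).subindexed()));

(declare-pstate topology $$mutes
                {Long (set-schema Long {:subindex? true})})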
The top-level key is a 64-bit user ID, and each set value is a muted user ID. The inner set is subindexed since it can be of arbitrary size. Like the last PState, this PState supports many queries in less than a millisecond: get the number of mutes for a user, check whether a particular user is muted for a user, or get a range of muted users from the inner set.
Let’s now review the broader concepts of Rama in order to understand how these PStates will be materialized and used.
Rama concepts
A Rama application is called a “module”. In a module you define all the storage and implement all the logic needed for your backend. All Rama modules are event sourced, so all data enters through a distributed log in the module called a “depot”. Most of the work in implementing a module is coding “ETL topologies” which consume data from one or more depots to materialize any number of PStates. Modules look like this at a conceptual level:

Modules can have any number of depots, topologies, and PStates, and clients interact with a module by appending new data to a depot or querying PStates. Although event sourcing traditionally means that processing is completely asynchronous to the client doing the append, with Rama that’s optional. Because Rama is an integrated system, clients can specify that their appends should only return after all downstream processing and PState updates have completed.
A module deployed to a Rama cluster runs across any number of worker processes across any number of nodes, and a module is scaled by adding more workers. A module is broken up into “tasks” like so:

A “task” is a partition of a module. The number of tasks for a module is specified on deploy. A task contains one partition of every depot and PState for the module as well as a thread and event queue for running events on that task. A running event has access to all depot and PState partitions on that task. Each worker process has a subset of all the tasks for the module.
Coding a topology involves reading and writing to PStates, running business logic, and switching between tasks as necessary.
Materializing the PStates
Let’s first implement the part of the module that materializes these PStates. Then, we’ll implement the query that will dynamically filter content based on a user’s mutes.
The first step to coding the module is defining the depots:
public class ContentModerationModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*post-depot", Depot.hashBy("to-user-id"));
    setup.declareDepot("*mute-depot", Depot.hashBy("user-id"));
  }
}
(defmodule ContentModerationModule
  [setup topologies]
  (declare-depot setup *post-depot (hash-by :to-user-id))
  (declare-depot setup *mute-depot (hash-by :user-id))
  )
This declares a Rama module called “ContentModerationModule” with two depots. The first depot is called *post-depot and will receive all “post” data. Objects appended to a depot can be any type. The second argument to the depot declaration is called the “depot partitioner” – more on that later. The second depot is called *mute-depot and will receive mute and unmute events.
Part of designing Rama modules is determining how many depots to have and how to split different types of data across depots. In this case, post objects go into one depot while both mute and unmute events go in the same depot. Data should go into the same depot when there’s an ordering relationship between them, and they should otherwise go into separate depots so consumers don’t need to filter out data they’re not interested in. There’s an ordering relationship between mutes and unmutes – imagine a user were spamming the mute/unmute buttons over and over causing tons of events to be appended. If they were sent to separate depots, they could be processed in a different order than they were appended, producing the wrong result. Putting them in the same depot ensures they’re processed in the exact order they were appended. On the other hand, there’s no ordering relationship between posts and mutes/unmutes, so they can go in separate depots.
To keep the example simple, the data appended to the depots will be defrecord objects for the Clojure version and HashMap objects for the Java version. To have a tighter schema on depot records you could instead use Thrift, Protocol Buffers, or a language-native tool for defining the types. Here are the functions that will be used to create depot data:
public static Map createPost(long fromUserId, long toUserId, String content) {
  Map ret = new HashMap();
  ret.put("from-user-id", fromUserId);
  ret.put("to-user-id", toUserId);
  ret.put("content", content);
  return ret;
}

public static Map createMute(long userId, long mutedUserId) {
  Map ret = new HashMap();
  ret.put("type", "mute");
  ret.put("user-id", userId);
  ret.put("muted-user-id", mutedUserId);
  return ret;
}

public static Map createUnmute(long userId, long unmutedUserId) {
  Map ret = new HashMap();
  ret.put("type", "unmute");
  ret.put("user-id", userId);
  ret.put("unmuted-user-id", unmutedUserId);
  return ret;
}
(defrecord Post [from-user-id to-user-id content])
(defrecord Mute [user-id muted-user-id])
(defrecord Unmute [user-id unmuted-user-id])
Next, let’s begin defining the topology to consume data from the depots and materialize the PStates. Here’s the declaration of the topology with the PStates:
public class ContentModerationModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*post-depot", Depot.hashBy("to-user-id"));
    setup.declareDepot("*mute-depot", Depot.hashBy("user-id"));

    StreamTopology topology = topologies.stream("core");
    topology.pstate("$$posts",
                    PState.mapSchema(Long.class,
                                     PState.listSchema(Map.class).subindexed()));
    topology.pstate("$$mutes",
                    PState.mapSchema(Long.class,
                                     PState.setSchema(Long.class).subindexed()));
  }
}
(defmodule ContentModerationModule
  [setup topologies]
  (declare-depot setup *post-depot (hash-by :to-user-id))
  (declare-depot setup *mute-depot (hash-by :user-id))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate topology $$posts
                    {Long (vector-schema Post {:subindex? true})})
    (declare-pstate topology $$mutes
                    {Long (set-schema Long {:subindex? true})})
    ))
This defines a stream topology called “core”. Rama has two kinds of topologies, stream and microbatch, which have different properties. In short, streaming is best for interactive applications that need single-digit millisecond update latency, while microbatching has update latency of a few hundred milliseconds and is best for everything else. Streaming is used here because an interactive application like this should have immediate feedback on posts, mutes, and unmutes being fully processed.
Notice that the PStates are defined as part of the topology. Unlike databases, PStates are not global mutable state. A PState is owned by a topology, and only the owning topology can write to it. Writing state in global variables is a horrible thing to do, and databases are just global variables by a different name.
Since PStates can only be written to by their owning topology, they’re much easier to reason about. Everything about them can be understood by just looking at the topology implementation, all of which exists in the same program and is deployed together. Additionally, the extra step of appending to a depot before processing the record to materialize the PState does not lower performance, as we’ve shown in benchmarks. Rama being an integrated system strips away much of the overhead that traditionally exists.
Let’s now add the code to materialize the $$posts PState:
public class ContentModerationModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*post-depot", Depot.hashBy("to-user-id"));
    setup.declareDepot("*mute-depot", Depot.hashBy("user-id"));

    StreamTopology topology = topologies.stream("core");
    topology.pstate("$$posts",
                    PState.mapSchema(Long.class,
                                     PState.listSchema(Map.class).subindexed()));
    topology.pstate("$$mutes",
                    PState.mapSchema(Long.class,
                                     PState.setSchema(Long.class).subindexed()));

    topology.source("*post-depot").out("*post")
            .each(Ops.GET, "*post", "to-user-id").out("*to-user-id")
            .localTransform("$$posts", Path.key("*to-user-id")
                                           .afterElem()
                                           .termVal("*post"));
  }
}
(defmodule ContentModerationModule
  [setup topologies]
  (declare-depot setup *post-depot (hash-by :to-user-id))
  (declare-depot setup *mute-depot (hash-by :user-id))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate topology $$posts
                    {Long (vector-schema Post {:subindex? true})})
    (declare-pstate topology $$mutes
                    {Long (set-schema Long {:subindex? true})})
    (<<sources topology
      (source> *post-depot :> {:keys [*to-user-id] :as *post})
      (local-transform> [(keypath *to-user-id) AFTER-ELEM (termval *post)]
                        $$posts))))
The code to materialize that PState is only a few lines, but there’s a lot to unpack here. The business logic is implemented with dataflow. Rama’s dataflow API is exceptionally expressive, able to intermix arbitrary business logic with loops, conditionals, and moving computation between tasks. This post is not going to explore all the details of dataflow as there’s simply too much to cover. Full tutorials for Rama dataflow can be found on our website for the Java API and for the Clojure API.
Let’s go over each line of this code. The first step is subscribing to the depot:
topology.source("*post-depot").out("*post")
        .each(Ops.GET, "*post", "to-user-id").out("*to-user-id")
(<<sources topology
  (source> *post-depot :> {:keys [*to-user-id] :as *post})
This subscribes the topology to the depot *post-depot and starts a reactive computation on it. Operations in dataflow do not return values. Instead, they emit values that are bound to new variables. In the Clojure API, the inputs and outputs to an operation are separated by the :> keyword. In the Java API, output variables are bound with the .out method.

Whenever data is appended to that depot, the data is emitted into the topology. The Java version binds the emit into the variable *post and then gets the field “to-user-id” from the map into the variable *to-user-id, while the Clojure version captures the emit as the variable *post and also destructures the “to-user-id” field into the variable *to-user-id. All variables in Rama code begin with a *. The subsequent code runs for every single emit.
Remember that last argument to the depot declaration called the “depot partitioner”? That’s relevant here. Here’s that image of the physical layout of a module again:

The depot partitioner determines on which task the append happens and thereby on which task computation begins for subscribed topologies. In this case, the depot partitioner says to hash by the “to-user-id” field of the appended data. The target task is computed by taking the hash and modding it by the total number of tasks. This means data with the same ID always go to the same task, while different IDs are evenly spread across all tasks.
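Conceptually (this is just an illustration, not Rama’s API – Rama does this routing internally), the task selection works something like this:

;; Illustrative sketch only, assuming a hypothetical 32-task module.
;; Data hashed by the same ID always lands on the same task.
(mod (hash 123) 32)   ;; => the target task for user ID 123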
Rama gives a ton of control over how computation and storage are partitioned, and in this case we’re partitioning by the hash of “to-user-id” since that’s how we want the PState to be partitioned. This allows us to easily locate the task storing posts for any particular user.
The next line completes this part of the topology by writing the post into the PState:
.localTransform("$$posts", Path.key("*to-user-id")
                               .afterElem()
                               .termVal("*post"));
(local-transform> [(keypath *to-user-id) AFTER-ELEM (termval *post)]
                  $$posts)
The PState is updated with the “local transform” operation. The transform takes as input the PState $$posts and a “path” specifying what to change about the PState. When a PState is referenced in dataflow code, it always references the partition of the PState that’s located on the task on which the event is currently running.

Paths are a deep topic, and the full documentation for them can be found here. A path is a sequence of “navigators” that specify how to hop through a data structure to target values of interest. A path can target any number of values, and they’re used for both transforms and queries. In this case, the path navigates by the key *to-user-id to the list of posts for that user. The next navigator, called AFTER-ELEM in Clojure and afterElem() in Java, navigates to the “void” element after the end of the list. Setting that “void” element to a value with the “term val” navigator causes that value to be appended to the list.
Let’s now add the code to materialize the $$mutes PState:
public class ContentModerationModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*post-depot", Depot.hashBy("to-user-id"));
    setup.declareDepot("*mute-depot", Depot.hashBy("user-id"));

    StreamTopology topology = topologies.stream("core");
    topology.pstate("$$posts",
                    PState.mapSchema(Long.class,
                                     PState.listSchema(Map.class).subindexed()));
    topology.pstate("$$mutes",
                    PState.mapSchema(Long.class,
                                     PState.setSchema(Long.class).subindexed()));

    topology.source("*post-depot").out("*post")
            .each(Ops.GET, "*post", "to-user-id").out("*to-user-id")
            .localTransform("$$posts", Path.key("*to-user-id")
                                           .afterElem()
                                           .termVal("*post"));

    topology.source("*mute-depot").out("*data")
            .each(Ops.GET, "*data", "type").out("*type")
            .each(Ops.GET, "*data", "user-id").out("*user-id")
            .ifTrue(new Expr(Ops.EQUAL, "*type", "mute"),
                    Block.each(Ops.GET, "*data", "muted-user-id").out("*muted-user-id")
                         .localTransform("$$mutes", Path.key("*user-id")
                                                        .voidSetElem()
                                                        .termVal("*muted-user-id")),
                    Block.each(Ops.GET, "*data", "unmuted-user-id").out("*unmuted-user-id")
                         .localTransform("$$mutes", Path.key("*user-id")
                                                        .setElem("*unmuted-user-id")
                                                        .termVoid()));
  }
}
(defmodule ContentModerationModule
  [setup topologies]
  (declare-depot setup *post-depot (hash-by :to-user-id))
  (declare-depot setup *mute-depot (hash-by :user-id))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate topology $$posts
                    {Long (vector-schema Post {:subindex? true})})
    (declare-pstate topology $$mutes
                    {Long (set-schema Long {:subindex? true})})
    (<<sources topology
      (source> *post-depot :> {:keys [*to-user-id] :as *post})
      (local-transform> [(keypath *to-user-id) AFTER-ELEM (termval *post)]
                        $$posts)

      (source> *mute-depot :> *data)
      (<<subsource *data
        (case> Mute :> {:keys [*user-id *muted-user-id]})
        (local-transform> [(keypath *user-id) NONE-ELEM (termval *muted-user-id)]
                          $$mutes)

        (case> Unmute :> {:keys [*user-id *unmuted-user-id]})
        (local-transform> [(keypath *user-id) (set-elem *unmuted-user-id) NONE>]
                          $$mutes)
        ))))
Once again, let’s go through the new code line by line:
topology.source("*mute-depot").out("*data")
(source> *mute-depot :> *data)
This adds a second depot subscription, this time to *mute-depot, to the same topology. Topologies can subscribe to any number of depots, and each source subscription is processed independently. This code binds emits from that depot to the variable *data.
This depot emits “mute” and “unmute” events. Mutes should add to the PState, while unmutes should remove from the PState. The subsequent code adds a conditional on the type of *data:
.each(Ops.GET, "*data", "type").out("*type")
.each(Ops.GET, "*data", "user-id").out("*user-id")
.ifTrue(new Expr(Ops.EQUAL, "*type", "mute"),
(<<subsource *data
The Java and Clojure versions implement this a little differently since the Clojure version is using first-class types while the Java version is using plain maps with a “type” key inside.
The Clojure version uses <<subsource, which is a convenient way to dispatch code on the type of an object. You could also use <<if or <<cond for this, but the <<subsource version is a little more concise.

The Java version fetches the “type” and “user-id” fields from the map into the variables *type and *user-id. *user-id will be used a little later. It then uses .ifTrue to check if *type is equal to “mute”. The “then” branch of that conditional will handle mutes, and the “else” branch will handle unmutes.
The next bit of code handles mutes:
Block.each(Ops.GET, "*data", "muted-user-id").out("*muted-user-id")
     .localTransform("$$mutes", Path.key("*user-id")
                                    .voidSetElem()
                                    .termVal("*muted-user-id")),
(case> Mute :> {:keys [*user-id *muted-user-id]})
(local-transform> [(keypath *user-id) NONE-ELEM (termval *muted-user-id)]
                  $$mutes)
In the Clojure version, case> begins a handler for a particular type given to the surrounding <<subsource form. This case> handles Mute objects. case> allows the object given to <<subsource to be further destructured in its emit. Here it destructures *user-id and *muted-user-id from *data.

The Java version fetches the “muted-user-id” field from the map into the variable *muted-user-id.

Then, the “local transform” operation is invoked to update the $$mutes PState. The path used is similar to the path from the earlier “local transform”, but the inner data structure being updated is a set instead of a list. The path first navigates to the inner set by the key *user-id. The next navigator, called NONE-ELEM in Clojure and voidSetElem in Java, navigates to the “void” element of the set. Setting that “void” element to a value causes that value to be added to the set.
Adding to a set is idempotent, so if the frontend appends multiple mutes for the same user in a row, the mute for that user will only exist once in the set.
The last part of the topology handles unmutes:
Block.each(Ops.GET, "*data", "unmuted-user-id").out("*unmuted-user-id")
     .localTransform("$$mutes", Path.key("*user-id")
                                    .setElem("*unmuted-user-id")
                                    .termVoid())
(case> Unmute :> {:keys [*user-id *unmuted-user-id]})
(local-transform> [(keypath *user-id) (set-elem *unmuted-user-id) NONE>]
                  $$mutes)
This is the inverse of the previous case. The Clojure version declares a case> to handle Unmute objects and destructures *user-id and *unmuted-user-id. The Java version is the “else” branch of the .ifTrue and starts by fetching “unmuted-user-id” from the map into the variable *unmuted-user-id.

Then, a “local transform” is done to remove the user from the set of mutes. The path first navigates to the inner set by the key *user-id. The next navigator, called set-elem in Clojure and setElem in Java, navigates to the specified element of the set if it exists. If the element doesn’t exist, the path navigates nowhere and the transform is a no-op. The last navigator removes the element from the set. Removing elements with paths is done in Clojure with NONE> and in Java with .termVoid(). This navigator sets the element to “void”, which causes it to be removed from its containing data structure.
That completes the materialization of $$posts and $$mutes. Now we’re ready to move on to implementing an efficient query to fetch a page from $$posts with muted content filtered out.
Implementing the query
At a high-level, the query needs to fetch some number of posts for a user starting from an index. For example, “fetch 10 posts for user ID 123 starting from index 75”. Because any number of posts could be filtered by muting, the fetched posts are not necessarily consecutive and many more posts may need to be read from the PState to find the desired number of posts from non-muted users. If you’re looking for 10 posts, you may need to read 11 posts, 20 posts, 100 posts, or more depending on how many posts get filtered. Since the total number of posts that needs to be read cannot be known in advance, a loop of some sort is needed to continue scanning until the desired number of posts is found.
With that in mind, here’s the algorithm to implement the query “fetch 10 posts for user ID 123 starting from index 75”:
1. Set N=10, START_OFFSET=75
2. Read posts for user ID 123 from PState $$posts from indices START_OFFSET to START_OFFSET + N
3. For each post, check if the poster is muted by user ID 123 in the $$mutes PState
4. Set NUM_FETCHED_POSTS equal to the number of posts remaining after filtering
5. If NUM_FETCHED_POSTS is less than N, go back to step 2 with N set to (N – NUM_FETCHED_POSTS) and START_OFFSET set to START_OFFSET + N
This algorithm iteratively fetches content from the $$posts PState until the desired number of posts is found.
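To make the loop concrete, here’s a rough sketch of the same algorithm in plain Clojure. This is not the Rama implementation (that comes next) – it pretends the posts are an in-memory vector and that muted? is an in-memory predicate, ignoring partitioning entirely:

;; Plain-Clojure sketch of the paging algorithm (not Rama code).
;; posts is a vector of post maps; muted? tells whether a poster is
;; muted by the viewing user.
(defn fetch-page [posts muted? start-offset n]
  (loop [offset start-offset
         acc    []]
    (let [end     (min (count posts) (+ offset (- n (count acc))))
          fetched (remove #(muted? (:from-user-id %))
                          (subvec posts offset end))
          acc     (into acc fetched)]
      (cond
        (= end (count posts)) {:posts acc :next-offset nil}
        (= (count acc) n)     {:posts acc :next-offset end}
        :else                 (recur end acc)))))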
In the case of a user having a ton of mutes set as well as a lot of posts, it’s possible this algorithm could take a long time to scan through the $$posts PState to find the desired number of posts. If this is a concern, the algorithm could be enhanced to have an upper limit on the number of times it loops. This implementation won’t do that, but it’s easy to add if needed.
This will be implemented via a predefined query in the module called a “query topology”. Query topologies are programmed with the exact same dataflow API we already used to implement the ETL topology. All the behavior of this algorithm – looping, checking posts to see if they should be filtered, and aggregating results – will be done within the module. This means the client doing the query only has to make one request to the backend. If the client implemented this algorithm client-side, it would have to make many requests, adding a lot of overhead for all the roundtrips. Implementing it as a query topology eliminates all that overhead, which minimizes the latency of the query.
Query topologies are like functions in Java or Clojure. They take in any number of input arguments and return one value as the result. As such, query topologies can be decomposed into multiple query topologies just like you can decompose a function’s implementation into multiple functions. This query will be implemented with two query topologies. The first will fetch a page of posts and filter them according to mute settings. The second will use the first query topology and loop until the desired number of posts have been found.
Implementing the first query topology
Let’s start with the first query topology. Here it is defined in the module:
public class ContentModerationModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    setup.declareDepot("*post-depot", Depot.hashBy("to-user-id"));
    setup.declareDepot("*mute-depot", Depot.hashBy("user-id"));

    StreamTopology topology = topologies.stream("core");
    topology.pstate("$$posts",
                    PState.mapSchema(Long.class,
                                     PState.listSchema(Map.class).subindexed()));
    topology.pstate("$$mutes",
                    PState.mapSchema(Long.class,
                                     PState.setSchema(Long.class).subindexed()));

    topology.source("*post-depot").out("*post")
            .each(Ops.GET, "*post", "to-user-id").out("*to-user-id")
            .localTransform("$$posts", Path.key("*to-user-id")
                                           .afterElem()
                                           .termVal("*post"));

    topology.source("*mute-depot").out("*data")
            .each(Ops.GET, "*data", "type").out("*type")
            .each(Ops.GET, "*data", "user-id").out("*user-id")
            .ifTrue(new Expr(Ops.EQUAL, "*type", "mute"),
                    Block.each(Ops.GET, "*data", "muted-user-id").out("*muted-user-id")
                         .localTransform("$$mutes", Path.key("*user-id")
                                                        .voidSetElem()
                                                        .termVal("*muted-user-id")),
                    Block.each(Ops.GET, "*data", "unmuted-user-id").out("*unmuted-user-id")
                         .localTransform("$$mutes", Path.key("*user-id")
                                                        .setElem("*unmuted-user-id")
                                                        .termVoid()));

    topologies.query("get-posts-helper", "*user-id", "*start-offset", "*end-offset").out("*posts")
              .hashPartition("*user-id")
              .localSelect("$$posts", Path.key("*user-id")
                                          .sublist("*start-offset", "*end-offset")
                                          .all()).out("*post")
              .each(Ops.GET, "*post", "from-user-id").out("*from-user-id")
              .localSelect("$$mutes", Path.key("*user-id")
                                          .view(Ops.CONTAINS, "*from-user-id")).out("*muted?")
              .keepTrue(new Expr(Ops.NOT, "*muted?"))
              .originPartition()
              .agg(Agg.list("*post")).out("*posts");
  }
}
(defmodule ContentModerationModule
  [setup topologies]
  (declare-depot setup *post-depot (hash-by :to-user-id))
  (declare-depot setup *mute-depot (hash-by :user-id))
  (let [topology (stream-topology topologies "core")]
    (declare-pstate topology $$posts
                    {Long (vector-schema Post {:subindex? true})})
    (declare-pstate topology $$mutes
                    {Long (set-schema Long {:subindex? true})})
    (<<sources topology
      (source> *post-depot :> {:keys [*to-user-id] :as *post})
      (local-transform> [(keypath *to-user-id) AFTER-ELEM (termval *post)]
                        $$posts)

      (source> *mute-depot :> *data)
      (<<subsource *data
        (case> Mute :> {:keys [*user-id *muted-user-id]})
        (local-transform> [(keypath *user-id) NONE-ELEM (termval *muted-user-id)]
                          $$mutes)

        (case> Unmute :> {:keys [*user-id *unmuted-user-id]})
        (local-transform> [(keypath *user-id) (set-elem *unmuted-user-id) NONE>]
                          $$mutes)
        )))

  (<<query-topology topologies "get-posts-helper"
    [*user-id *start-offset *end-offset :> *posts]
    (|hash *user-id)
    (local-select> [(keypath *user-id) (srange *start-offset *end-offset) ALL]
                   $$posts
                   :> {:keys [*from-user-id] :as *post})
    (local-select> [(keypath *user-id) (view contains? *from-user-id)]
                   $$mutes
                   :> *muted?)
    (filter> (not *muted?))
    (|origin)
    (aggs/+vec-agg *post :> *posts))
  )
Once again, let’s go through the topology definition line by line to understand how it works:
topologies.query("get-posts-helper", "*user-id", "*start-offset", "*end-offset").out("*posts")
(<<query-topology topologies "get-posts-helper"
  [*user-id *start-offset *end-offset :> *posts]
This declares a query topology named “get-posts-helper” that takes in input arguments *user-id, *start-offset, and *end-offset. It declares the return variable *posts, which will be bound by the end of the topology execution.
The next line gets the query to the task of the module containing the data for that user ID:
.hashPartition("*user-id")
(|hash *user-id)
The line does a “hash partition” by the value of *user-id. Partitioners relocate subsequent code to potentially a new task, and a hash partitioner works exactly like the aforementioned depot partitioner. The details of relocating computation, like serializing and deserializing any variables referenced after the partitioner, are handled automatically. The code is linear without any callback functions even though partitioners could be jumping around to different tasks on different nodes.
When the first operation of a query topology is a partitioner, query topology clients are optimized to go directly to that task. You’ll see an example of invoking a query topology in the next section.
The next bit of code fetches all posts in the requested range:
.localSelect("$$posts", Path.key("*user-id")
                            .sublist("*start-offset", "*end-offset")
                            .all()).out("*post")
.each(Ops.GET, "*post", "from-user-id").out("*from-user-id")
(local-select> [(keypath *user-id) (srange *start-offset *end-offset) ALL]
               $$posts
               :> {:keys [*from-user-id] :as *post})
A “local select” does a query on the partition of the PState that’s on the same task on which the event is running. This path first navigates to the list of posts for the key *user-id. It then navigates to the “sublist” between *start-offset and *end-offset. At this point the path is navigated to a single list of values. The last navigator navigates to each individual element of that sublist, causing the “local select” to emit one time for each post in that range.
What’s happening here is very different from “regular” programming, where you call a function which returns one value as the result. In dataflow programming, operations can emit any number of times. You can think of the output of an operation as being the input arguments to the rest of the topology. When an operation emits, it invokes the “rest of the topology” with those arguments (the “rest of the topology” is also known as the “continuation”). This is a powerful generalization of the concept of a function, which as you’re about to see enables very elegant code.
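As a rough analogy in plain Clojure (not Rama code), you can picture an emitting operation as a function that invokes its continuation once per emit:

;; Conceptual analogy only: an "operation" that emits once per element,
;; invoking the rest of the topology as a continuation.
(defn emit-each [coll rest-of-topology]
  (doseq [x coll]
    (rest-of-topology x)))

(emit-each [:post1 :post2 :post3]
           (fn [post] (println "rest of topology sees" post)))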
The code after the “local select” is now operating on a single post at a time, and you’ll see how the results are aggregated together at the end. The output of the “local select” is bound to the variable *post. The Clojure version destructures the field from-user-id from that object into the variable *from-user-id, and the Java version fetches the field “from-user-id” from the map into the variable *from-user-id.
The next line checks to see if that user is muted:
.localSelect("$$mutes", Path.key("*user-id")
                            .view(Ops.CONTAINS, "*from-user-id")).out("*muted?")
(local-select> [(keypath *user-id) (view contains? *from-user-id)]
               $$mutes
               :> *muted?)
This code does a “local select” on the $$mutes PState. Since the $$mutes PState stores data with the same partitioning scheme as the $$posts PState – by the hash of the top-level key – the code does not need to switch to a different task with a partitioner before querying.

The query emits a single boolean into the variable *muted? indicating whether the user who made the post is muted or not. The path first navigates to the inner set by the key *user-id. The next navigator, view, runs a function on the navigated value. The function can be any Java or Clojure function, and it’s given as arguments the navigated value plus any additional arguments given to view. In this case it just checks if the posting user ID is contained in the set.
The next line filters out posts from muted users:
.keepTrue(new Expr(Ops.NOT, "*muted?"))
(filter> (not *muted?))
This operation, filter> in Clojure and .keepTrue in Java, emits exactly one time if its input is true and otherwise doesn’t emit. Just like how the “local select” call differs from regular functions by emitting many times, this differs from regular functions by potentially not emitting at all. No values are captured for the output since this operation doesn’t emit any values when it emits, but you can still think of its emit as invoking the “rest of the topology”.

In this case, the code only continues for users that aren’t muted.
The next line is:
.originPartition()
(|origin)
The “origin partitioner” relocates computation to the task where the query began execution. Every query topology must use the origin partitioner exactly once and as the last partitioner used.
The last line of the query topology aggregates the remaining posts together:
.agg(Agg.list("*post")).out("*posts");
(aggs/+vec-agg *post :> *posts)
Up until here, the query topology has been just like the ETL topology: each line processed the emits from the preceding line and itself emitted some number of times. Aggregators in dataflow are different in that they collect all emits that happened during the execution of the query topology and combine them into a single value.
Query topologies are “batch blocks”, which have expanded dataflow capabilities including aggregation. Batch blocks can also do inner joins, outer joins, subqueries, and everything else possible with relational languages. You can find the full documentation for batch blocks here, with Clojure-specific documentation here.
This particular aggregator combines all the remaining posts into a single list bound to the variable *posts. *posts is the same variable name as declared earlier for the output of the query topology.
This completes the first query topology.
Implementing the second query topology
Let’s now implement the second query topology, which uses the first query topology inside a loop to find the desired number of posts. Here’s the full definition:
topologies.query("get-posts", "*user-id", "*from-offset", "*limit").out("*ret")
          .hashPartition("*user-id")
          .each(() -> new ArrayList()).out("*posts")
          .loopWithVars(LoopVars.var("*query-offset", "*from-offset"),
            Block.localSelect("$$posts", Path.key("*user-id").view(Ops.SIZE)).out("*num-posts")
                 .each(Ops.MINUS, "*limit", new Expr(Ops.SIZE, "*posts")).out("*fetch-amount")
                 .each(Ops.MIN, "*num-posts",
                       new Expr(Ops.PLUS, "*query-offset", "*fetch-amount")).out("*end-offset")
                 .invokeQuery("get-posts-helper", "*user-id", "*query-offset", "*end-offset")
                   .out("*fetched-posts")
                 .each((List posts, List fetchedPosts) -> posts.addAll(fetchedPosts),
                       "*posts", "*fetched-posts")
                 .cond(Case.create(new Expr(Ops.EQUAL, "*end-offset", "*num-posts"))
                           .emitLoop(null),
                       Case.create(new Expr(Ops.EQUAL, new Expr(Ops.SIZE, "*posts"), "*limit"))
                           .emitLoop("*end-offset"),
                       Case.create(true)
                           .continueLoop("*end-offset"))
          ).out("*next-offset")
          .originPartition()
          .each((List posts, Integer nextOffset) -> {
            Map ret = new HashMap();
            ret.put("posts", posts);
            ret.put("next-offset", nextOffset);
            return ret;
          }, "*posts", "*next-offset").out("*ret");
(<<query-topology topologies "get-posts"
  [*user-id *from-offset *limit :> *ret]
  (|hash *user-id)
  (loop<- [*query-offset *from-offset
           *posts []
           :> *posts *next-offset]
    (local-select> [(keypath *user-id) (view count)] $$posts :> *num-posts)
    (- *limit (count *posts) :> *fetch-amount)
    (min *num-posts (+ *query-offset *fetch-amount) :> *end-offset)
    (invoke-query "get-posts-helper" *user-id *query-offset *end-offset
                  :> *fetched-posts)
    (reduce conj *posts *fetched-posts :> *new-posts)
    (<<cond
      (case> (= *end-offset *num-posts))
      (:> *new-posts nil)

      (case> (= (count *new-posts) *limit))
      (:> *new-posts *end-offset)

      (default>)
      (continue> *end-offset *new-posts)
      ))
  (|origin)
  (hash-map :posts *posts :next-offset *next-offset :> *ret))
Once again, let’s go through this line by line:
topologies.query("get-posts", "*user-id", "*from-offset", "*limit").out("*ret")
(<<query-topology topologies "get-posts"
  [*user-id *from-offset *limit :> *ret]
This declares the query topology “get-posts” with input arguments *user-id, *from-offset, and *limit. It declares the return variable *ret, which will be bound by the end of the topology execution.
Just like the first query topology, the next line gets the query to the task of the module containing the data for that user ID:
.hashPartition("*user-id")
(|hash *user-id)
The next bit of code begins the loop that will run for as many iterations as necessary to find the desired number of posts for non-muted users:
.each(() -> new ArrayList()).out("*posts")
.loopWithVars(LoopVars.var("*query-offset", "*from-offset"),
(loop<- [*query-offset *from-offset
         *posts []
         :> *posts *next-offset]
Loops in dataflow first declare “loop variables” which are in scope for the body of the loop and are set to new values each time the loop is recurred. The Clojure version declares two loop variables, *query-offset and *posts, which are initialized to *from-offset and [] respectively. The Java version will collect posts into a mutable ArrayList, so it initializes that ArrayList into the variable *posts before the loop. The Java version’s only loop var is *query-offset, which is initialized to *from-offset.
A loop can emit any number of times (with :> in Clojure and .emitLoop in Java), and each emit runs the code after the loop with the emitted values. The Clojure version binds the emitted values from this loop as *posts and *next-offset in the binding vector. The Java version specifies emitted loop values at the end of the loop body. Again, since the Java version is accumulating posts into a mutable value, its loop will only emit the one value *next-offset.
This query topology will return the fetched posts as well as this *next-offset value. In order for the frontend to paginate to the next page of posts, it needs to know where the prior query topology left off in its traversal. Returning *next-offset here lets the frontend know exactly where to begin for the next page. Additionally, this query topology will return “null” for *next-offset if it reached the end of the list of posts, which indicates to the frontend that it reached the last page.
The first query topology needs to know the range of offsets to fetch from the subindexed list of posts. The next line starts to prepare for that by querying for the total number of posts in the PState for this user:
.localSelect("$$posts", Path.key("*user-id").view(Ops.SIZE)).out("*num-posts")
(local-select> [(keypath *user-id) (view count)] $$posts :> *num-posts)
This “local select” call is similar to the earlier one, using view to run a function on the nested list to get the total number of posts and bind that to *num-posts. Fetching the size of a subindexed list is a fast sub-millisecond query even if the list has billions of elements.
The next two lines determine the last offset that will be fetched:
.each(Ops.MINUS, "*limit", new Expr(Ops.SIZE, "*posts")).out("*fetch-amount")
.each(Ops.MIN, "*num-posts",
      new Expr(Ops.PLUS, "*query-offset", "*fetch-amount")).out("*end-offset")
(- *limit (count *posts) :> *fetch-amount)
(min *num-posts (+ *query-offset *fetch-amount) :> *end-offset)
The number of posts remaining to fetch is simply the number of desired posts, *limit, minus the number of posts fetched so far. This is bound to the variable *fetch-amount. The last offset to query is the minimum of the last offset in the subindexed list and *fetch-amount offsets after the query offset. The result of that is bound to *end-offset.
The next line invokes the first query topology to get the posts between those offsets from non-muted users:
.invokeQuery("get-posts-helper", "*user-id", "*query-offset", "*end-offset")
  .out("*fetched-posts")
(invoke-query "get-posts-helper" *user-id *query-offset *end-offset
              :> *fetched-posts)
Invoking another query topology is just like invoking a function. It’s given input arguments and then emits a single value. In this case it emits a list of posts. Query topologies can also invoke themselves, or query topologies can be mutually recursive. This is discussed in this section of the docs.
The next line adds the fetched posts to the list of accumulated posts so far:
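.each((List posts, List fetchedPosts) -> posts.addAll(fetchedPosts),
      "*posts", "*fetched-posts")

(reduce conj *posts *fetched-posts :> *new-posts)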
The Clojure version appends all the fetched posts onto *posts to produce the new variable *new-posts. The Java version uses a lambda to add the fetched posts into the ArrayList with its addAll method. The dataflow variables *posts and *fetched-posts are given to the lambda and become its arguments. This code shows how arbitrary Java code can be injected into dataflow, and you can also do so with method references.
The code now needs to determine what to do depending on the current state. There are two cases to check: “has it reached the end of the list of posts?”, and “has it found the desired number of posts?”. If the answer to either of those is true, traversal is finished and it should finish the query topology execution. Otherwise, it should continue traversing.
You could implement this with nested “if” conditionals (<<if in the Clojure API and .ifTrue in the Java API), but Rama provides a more elegant way to express code that needs to check more than one condition through its “cond” operation. The next code begins this form of conditional like so:
.cond(
(<<cond
This operation is given a series of “cases” to check. The first case that evaluates to true will have its body run and then no other cases will be checked.
The first case is whether it has reached the end of the list of posts:
Case.create(new Expr(Ops.EQUAL, "*end-offset", "*num-posts"))
    .emitLoop(null),
(case> (= *end-offset *num-posts))
(:> *new-posts nil)
It’s reached the end when the *end-offset computed earlier is the same as the total number of posts. The subsequent code is the body of the “case”, which can be any amount of arbitrary dataflow code. In this case, there’s only one line which says to emit from the loop. As explained earlier, the loop emits the “next offset” which will be included in the return value so the frontend knows where to begin in order to query the next page of content. The Clojure version emits the full accumulation of posts along with nil for the “next offset”, which indicates it reached the end of the list of posts. The Java version just emits null for the “next offset”.
The next case checks if the desired number of posts from non-muted users was found:
Case.create(new Expr(Ops.EQUAL, new Expr(Ops.SIZE, "*posts"), "*limit"))
    .emitLoop("*end-offset"),
(case> (= (count *new-posts) *limit))
(:> *new-posts *end-offset)
The desired number of posts has been found if the size of the list of accumulated posts is equal to the *limit argument for the query topology. The body of this case emits from the loop. The Clojure version emits the full accumulation of posts along with *end-offset for the “next offset”. The Java version just emits *end-offset for the “next offset”.
Finally, the last case invokes another iteration of the loop:
Case.create(true)
    .continueLoop("*end-offset"))).out("*next-offset")
(default>)
(continue> *end-offset *new-posts)
Using default> in the Clojure API is the same as saying (case> true). The body of this case invokes the next iteration of the loop with continue> in the Clojure API and continueLoop in the Java API. This runs the loop from the start with new values for the loop vars.

The .out in the Java version binds the output of the loop into the variable *next-offset, which in the Clojure version was specified in the binding vector at the beginning of the loop.
The next line, which is after the loop, invokes the required “origin partitioner”:
.originPartition()
(|origin)
Finally, the end of the query topology packages together the posts and “next offset” into a single map for the return value:
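.each((List posts, Integer nextOffset) -> {
  Map ret = new HashMap();
  ret.put("posts", posts);
  ret.put("next-offset", nextOffset);
  return ret;
}, "*posts", "*next-offset").out("*ret");

(hash-map :posts *posts :next-offset *next-offset :> *ret)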
This map is bound to the variable *ret, which is the same as the output variable for the query topology specified in the beginning.
This completes the definition of the query to fetch a page of content for a user. Because it executes entirely within the module, the query is extremely efficient: the client only has to make a single request. All the overhead of making network requests and serializing/deserializing values back and forth is completely avoided.
Interacting with the module
Let’s now take a look at how you would interact with the module to do appends and queries, such as from a web server. Here’s an example of how you would get clients to the depots and query topology:
Map config = new HashMap();
config.put("conductor.host", "1.2.3.4");
RamaClusterManager manager = RamaClusterManager.open(config);
Depot postDepot = manager.clusterDepot("nlb.ContentModerationModule", "*post-depot");
Depot muteDepot = manager.clusterDepot("nlb.ContentModerationModule", "*mute-depot");
QueryTopologyClient<Map> getPosts =
  manager.clusterQuery("nlb.ContentModerationModule", "get-posts");
(def manager (open-cluster-manager {"conductor.host" "1.2.3.4"}))

(def post-depot
  (foreign-depot manager
                 "nlb.content-moderation/ContentModerationModule"
                 "*post-depot"))

(def mute-depot
  (foreign-depot manager
                 "nlb.content-moderation/ContentModerationModule"
                 "*mute-depot"))

(def get-posts
  (foreign-query manager
                 "nlb.content-moderation/ContentModerationModule"
                 "get-posts"))
A “cluster manager” connects to a Rama cluster by specifying the location of its “Conductor” node. The “Conductor” node is the central node of a Rama cluster, which you can read more about here. From the cluster manager, you can retrieve clients to any depots, PStates, or query topologies for any module. The objects are identified by the module name and their name within the module.
Here’s an example of appending a new post to the depot using the function defined earlier to create the data:
postDepot.append(createPost(100, 101, "Hello!"));
(foreign-append! post-depot (->Post 100 101 "Hello!"))
This adds a post from user 100 to user 101 with the content “Hello!”. Appending mutes and unmutes to the mute depot works exactly the same way, just with different data to a different depot.
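For example, muting user 100 on behalf of user 101 would look something like this, using the data-creation functions defined earlier and the mute depot client from above:

muteDepot.append(createMute(101, 100));

(foreign-append! mute-depot (->Mute 101 100))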
Here’s an example of invoking the query topology to get a page of content:
getPosts.invoke(101L, 0, 10);
(foreign-invoke-query get-posts 101 0 10)
This looks no different than invoking any other function, but it’s actually executing remotely on a cluster. It fetches 10 posts starting from offset 0 for user 101. This returns a hash map containing the list of posts and the start offset to use for the next page, just as defined in the query topology.
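As a sketch of how a frontend might paginate with this (using the Clojure client from above; render-page! is a hypothetical rendering function), the returned next offset is fed back in as the start offset of the following query until it comes back nil:

;; Hypothetical pagination sketch: keep passing :next-offset back in
;; as the start offset until it's nil, meaning the last page was reached.
(loop [offset 0]
  (let [{:keys [posts next-offset]} (foreign-invoke-query get-posts 101 offset 10)]
    (render-page! posts)  ;; hypothetical frontend function
    (when next-offset
      (recur next-offset))))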
Summary
There’s a lot to learn with Rama, but you can see from this example application how much you can accomplish with very little code. Building efficient content moderation like this is no small feat, but with Rama it only took 60 lines of code in Clojure and 80 lines of code in Java. There’s no additional work needed for deployment, updating, and scaling since that’s all built into Rama. For an experienced Rama programmer, a project like this takes only a few hours to fully develop, test, and have ready for deployment.
This example particularly demonstrates the power of colocating computation and storage. The ability for a query topology to do a looping computation with arbitrary logic over multiple colocated PStates is both powerful and extremely performant.
As mentioned earlier, there’s a Github project for the Clojure version and for the Java version containing all the code in this post. Those projects also have tests showing how to unit test modules in Rama’s “in-process cluster” environment.
You can get in touch with us at consult@redplanetlabs.com to schedule a free consultation to talk about your application and/or pair program on it. Rama is free for production clusters for up to two nodes and can be downloaded at this page.
