This is part of a series of posts exploring programming with Rama, ranging from interactive consumer apps, high-scale analytics, background processing, recommendation engines, and much more. This tutorial is self-contained, but for broader information about Rama and how it reduces the cost of building backends so much (up to 100x for large-scale backends), see our website.
Like all Rama applications, the example in this post requires very little code. It’s easily scalable to millions of reads/writes per second, ACID compliant, high performance, and fault-tolerant from how Rama incrementally replicates all state. Deploying, updating, and scaling this application are all one-line CLI commands. No other infrastructure besides Rama is needed. Comprehensive monitoring on all aspects of runtime operation is built-in.
In this post, I’ll explore implementing timed notifications in a Rama backend. More generally, I’ll be showing how to schedule future work in a fault-tolerant way.
The example will store a “feed” per user and accept events that specify when in the future to add an item to a user’s feed. Code will be shown in both Clojure and Java, with the total code being only 25 lines for each implementation. You can download and play with the Clojure implementation in this repository or the Java implementation in this repository.
Backend storage
Indexed datastores in Rama, called PStates (“partitioned state”), are much more powerful and flexible than databases. Whereas databases have fixed data models, PStates can represent infinite data models due to being based on the composition of the simpler primitive of data structures. PStates are distributed, durable, high-performance, and incrementally replicated. Each PState is fine-tuned to what the application needs, and an application makes as many PStates as needed. For this application, we’ll make two PStates: one to track the feed for each user, and one to manage scheduled work in the future.
Here’s the PState definition to track each user’s feed:
This declares the PState as a map of lists, with the key being a username string and the inner lists containing the list of string items for that user’s feed. The inner list is declared as “subindexed”, which instructs Rama to store the elements individually on disk rather than the whole list read and written as one value. Subindexing enables nested data structures to have billions of elements and still be read and written to extremely quickly. This PState can support many queries in less than one millisecond: get the number of items in a feed, get a single item at a particular index, or get all items between two indices.
Here’s the definition of the PState to track scheduled work:
1 2 | TopologyScheduler scheduler = new TopologyScheduler("$$scheduled"); scheduler.declarePStates(topology); |
1 2 | (let [scheduler (TopologyScheduler. "$$scheduled")] (.declarePStates scheduler topology)) |
This uses a higher-level abstraction called TopologyScheduler from the open-source rama-helpers project.
TopologyScheduler
is a small helper that handles the storage and logic for scheduling future work, designed to be used as part of any module. It keeps state in a PState which is declared into a topology with the
declarePStates
method. You’ll soon see its helper methods for injecting code into a topology to schedule future work and handling scheduled work which is ready to execute. Because
TopologyScheduler
is built upon Rama’s primitives of PStates and topologies, it’s scalable and fault-tolerant.
declarePStates
takes as an argument the name of the PState for this
TopologyScheduler
instance. Later on when methods are called on
TopologyScheduler
to inject code, it will automatically reference that PState when doing reads and writes.
TopologyScheduler
stores in that PState an ordered queue based on the timestamp for which each piece of future work is scheduled.
Let’s now review the broader concepts of Rama in order to understand how these PStates will be utilized.
Rama concepts
A Rama application is called a “module”. In a module you define all the storage and implement all the logic needed for your backend. All Rama modules are event sourced, so all data enters through a distributed log in the module called a “depot”. Most of the work in implementing a module is coding “ETL topologies” which consume data from one or more depots to materialize any number of PStates. Modules look like this at a conceptual level:

Modules can have any number of depots, topologies, and PStates, and clients interact with a module by appending new data to a depot or querying PStates. Although event sourcing traditionally means that processing is completely asynchronous to the client doing the append, with Rama that’s optional. By being an integrated system Rama clients can specify that their appends should only return after all downstream processing and PState updates have completed.
A module deployed to a Rama cluster runs across any number of worker processes across any number of nodes, and a module is scaled by adding more workers. A module is broken up into “tasks” like so:

A “task” is a partition of a module. The number of tasks for a module is specified on deploy. A task contains one partition of every depot and PState for the module as well as a thread and event queue for running events on that task. A running event has access to all depot and PState partitions on that task. Each worker process has a subset of all the tasks for the module.
Coding a topology involves reading and writing to PStates, running business logic, and switching between tasks as necessary.
Implementing the module
Let’s start implementing the module for timed notifications. The first step to coding the module is defining the depots:
1 2 3 4 5 6 7 | public class TimedNotificationsModule implements RamaModule { @Override public void define(Setup setup, Topologies topologies) { setup.declareDepot("*scheduled-post-depot", Depot.hashBy("id")); setup.declareTickDepot("*tick", 1000); } } |
1 2 3 4 5 | (defmodule TimedNotificationsModule [setup topologies] (declare-depot setup *scheduled-post-depot (hash-by :id)) (declare-tick-depot setup *tick 1000) ) |
This declares a Rama module called “TimedNotificationsModule” with two depots. The first depot is called
*scheduled-post-depot
and will receive all scheduled post information. Objects appended to a depot can be any type. The second argument of declaring the depot is called the “depot partitioner” – more on that later.
To keep the example simple, the data appended to the depot will be
defrecord
objects for the Clojure version and
HashMap
objects for the Java version. To have a tighter schema on depot records you could instead use Thrift, Protocol Buffers, or a language-native tool for defining the types. Here’s the function that will be used to create depot data:
The second depot
*tick
will be used for checking if any scheduled posts are ready to be appended to a user’s feed and then triggering the appropriate code to do so. Unlike the first depot, this is declared as a “tick depot”. Whereas a normal depot emits whenever new data is appended to it, a tick depot emits when the configured amount of time has passed. Subscribing to a tick depot from a topology is no different than subscribing to a regular depot. This particular tick depot is configured to emit once every 1000 milliseconds, meaning a scheduled post will be delivered within one second of its scheduled time.
Next, let’s begin defining the topology to consume data from the depot and materialize the PStates. Here’s the declaration of the topology with the PStates:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | public class TimedNotificationsModule implements RamaModule { @Override public void define(Setup setup, Topologies topologies) { setup.declareDepot("*scheduled-post-depot", Depot.hashBy("id")); setup.declareTickDepot("*tick", 1000); StreamTopology topology = topologies.stream("core"); TopologyScheduler scheduler = new TopologyScheduler("$$scheduled"); topology.pstate("$$feeds", PState.mapSchema(String.class, PState.listSchema(String.class).subindexed())); scheduler.declarePStates(topology); } } |
1 2 3 4 5 6 7 8 9 10 11 12 | (defmodule TimedNotificationsModule [setup topologies] (declare-depot setup *scheduled-post-depot (hash-by :id)) (declare-tick-depot setup *tick 1000) (let [topology (stream-topology topologies "core") scheduler (TopologyScheduler. "$$scheduled")] (declare-pstate topology $$feeds {String (vector-schema String {:subindex? true})}) (.declarePStates scheduler topology) )) |
This defines a stream topology called “core”. Rama has two kinds of topologies, stream and microbatch, which have different properties. In short, streaming is best for interactive applications that need single-digit millisecond update latency, while microbatching has update latency of a few hundred milliseconds and is best for everything else. Streaming is used here so a user gets immediate feedback that their post has been scheduled.
Notice that the PStates are defined as part of the topology. Unlike databases, PStates are not global mutable state. A PState is owned by a topology, and only the owning topology can write to it. Writing state in global variables is a horrible thing to do, and databases are just global variables by a different name.
Since a PState can only be written to by its owning topology, they’re much easier to reason about. Everything about them can be understood by just looking at the topology implementation, all of which exists in the same program and is deployed together. Additionally, the extra step of appending to a depot before processing the record to materialize the PState does not lower performance, as we’ve shown in benchmarks. Rama being an integrated system strips away much of the overhead which traditionally exists.
Let’s now add the code that consumes
*scheduled-post-depot
to get posts durably scheduled for the future:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | public class TimedNotificationsModule implements RamaModule { @Override public void define(Setup setup, Topologies topologies) { setup.declareDepot("*scheduled-post-depot", Depot.hashBy("id")); setup.declareTickDepot("*tick", 1000); StreamTopology topology = topologies.stream("core"); TopologyScheduler scheduler = new TopologyScheduler("$$scheduled"); topology.pstate("$$feeds", PState.mapSchema(String.class, PState.listSchema(String.class).subindexed())); scheduler.declarePStates(topology); topology.source("*scheduled-post-depot").out("*scheduled-post") .each(Ops.GET, "*scheduled-post", "time-millis").out("*time-millis") .macro(scheduler.scheduleItem("*time-millis", "*scheduled-post")); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | (defmodule TimedNotificationsModule [setup topologies] (declare-depot setup *scheduled-post-depot (hash-by :id)) (declare-tick-depot setup *tick 1000) (let [topology (stream-topology topologies "core") scheduler (TopologyScheduler. "$$scheduled")] (declare-pstate topology $$feeds {String (vector-schema String {:subindex? true})}) (.declarePStates scheduler topology) (<<sources topology (source> *scheduled-post-depot :> {:keys [*time-millis] :as *scheduled-post}) (java-macro! (.scheduleItem scheduler "*time-millis" "*scheduled-post")) ))) |
This part of the topology is only three lines, but there’s a lot to unpack here. The business logic is implemented with dataflow. Rama’s dataflow API is exceptionally expressive, able to intermix arbitrary business logic with loops, conditionals, and moving computation between tasks. This post is not going to explore all the details of dataflow as there’s simply too much to cover. Full tutorials for Rama dataflow can be found on our website for the Java API and for the Clojure API.
Let’s go over each line of this topology implementation. The first step is subscribing to the depot:
1 2 | topology.source("*scheduled-post-depot").out("*scheduled-post") .each(Ops.GET, "*scheduled-post", "time-millis").out("*time-millis") |
1 2 | (<<sources topology (source> *scheduled-post-depot :> {:keys [*time-millis] :as *scheduled-post}) |
This subscribes the topology to the depot
*scheduled-post-depot
and starts a reactive computation on it. Operations in dataflow do not return values. Instead, they emit values that are bound to new variables. In the Clojure API, the input and outputs to an operation are separated by the
:>
keyword. In the Java API, output variables are bound with the
.out
method.
Whenever data is appended to that depot, the data is emitted into the topology. The Java versions binds the emit into the variable
*scheduled-post
and then gets the field “time-millis” from the map into the variable
*time-millis
, while the Clojure version captures the emit as the variable
*scheduled-post
and also destructures a field into the variable
*time-millis
. All variables in Rama code begin with a
*
. The subsequent code runs for every single emit.
Remember that last argument to the depot declaration called the “depot partitioner”? That’s relevant here. Here’s that image of the physical layout of a module again:

The depot partitioner determines on which task the append happens and thereby on which task computation begins for subscribed topologies. In this case, the depot partitioner says to hash by the “id” field of the appended data. The target task is computed by taking the hash and modding it by the total number of tasks. This means data with the same ID always go to the same task, while different IDs are evenly spread across all tasks.
Rama gives a ton of control over how computation and storage are partitioned, and in this case we’re partitioning by the hash of the user ID since that’s how we ultimately want the
$$feeds
PState to be partitioned. This allows us to easily locate the task storing data for any particular user.
The final line schedules the post for the future:
1 | .macro(scheduler.scheduleItem("*time-millis", "*scheduled-post")); |
1 | (java-macro! (.scheduleItem scheduler "*time-millis" "*scheduled-post")) |
This uses the
scheduleItem
method from
TopologyScheduler
to write the scheduled post into the PState managed by
TopologyScheduler
.
scheduleItem
returns a block of code that is inserted into the topology using Rama’s “macro” facility. “Macros” are a feature from Rama’s Java API for decomposing bits of dataflow code and later mixing them in to any other dataflow code. The definition of
scheduleItem
from the
TopologyScheduler
implementation is as follows:
1 2 3 4 5 6 7 8 9 | public Block.Impl scheduleItem(Object timestampMillis, Object item) { String uuidVar = Helpers.genVar("scheduledUUID"); String tupleVar = Helpers.genVar("scheduleTuple"); String longVar = Helpers.genVar("timestampLong"); return Block.each(() -> UUID.randomUUID().toString()).out(uuidVar) .each((Number n) -> n.longValue(), timestampMillis).out(longVar) .each(Ops.TUPLE, new Expr(TopologyScheduler::padTimeStr, longVar), uuidVar).out(tupleVar) .localTransform(_pstateVar, Path.key(tupleVar).termVal(item)); } |
Here you can see it writes to its managed PState using
localTransform
after doing a bit of preparatory logic. Any intermediate variables are given unique names using
genVar
so they don’t inadvertently shadow any variables that are already in scope in the dataflow code where this code is being inserted. You can read more about macros in this section. Note that the Clojure API has additional facilities for decomposition that the Java API does not have, such as deframaop.
Understanding the implementation of this macro isn’t important – what matters is it handles the work of scheduling an item for the future at a particular timestamp.
scheduleItem
takes in a timestamp and an item, and that item will be given to the callback that’s later invoked when the scheduled time is reached. The state for scheduling the item is stored on the PState on this task, and the callback will be invoked on this same task when it’s ready.
Since
TopologyScheduler
is built with the Java API, the Clojure API has the
java-macro!
operation so that it can use utilities built around the Java API. This is also why the variables are specified as strings instead of symbols in this line of Clojure code, since the Java API represents variables as strings beginning with
*
.
Let’s now take a look at handling callbacks from
TopologyScheduler
, which completes the module implementation:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | public class TimedNotificationsModule implements RamaModule { @Override public void define(Setup setup, Topologies topologies) { setup.declareDepot("*scheduled-post-depot", Depot.hashBy("id")); setup.declareTickDepot("*tick", 1000); StreamTopology topology = topologies.stream("core"); TopologyScheduler scheduler = new TopologyScheduler("$$scheduled"); topology.pstate("$$feeds", PState.mapSchema(String.class, PState.listSchema(String.class).subindexed())); scheduler.declarePStates(topology); topology.source("*scheduled-post-depot").out("*scheduled-post") .each(Ops.GET, "*scheduled-post", "time-millis").out("*time-millis") .macro(scheduler.scheduleItem("*time-millis", "*scheduled-post")); topology.source("*tick") .macro(scheduler.handleExpirations( "*scheduled-post", "*current-time-millis", Block.each(Ops.GET, "*scheduled-post", "id").out("*id") .each(Ops.GET, "*scheduled-post", "post").out("*post") .localTransform("$$feeds", Path.key("*id").afterElem().termVal("*post")))); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | (defmodule TimedNotificationsModule [setup topologies] (declare-depot setup *scheduled-post-depot (hash-by :id)) (declare-tick-depot setup *tick 1000) (let [topology (stream-topology topologies "core") scheduler (TopologyScheduler. "$$scheduled")] (declare-pstate topology $$feeds {String (vector-schema String {:subindex? true})}) (.declarePStates scheduler topology) (<<sources topology (source> *scheduled-post-depot :> {:keys [*time-millis] :as *scheduled-post}) (java-macro! (.scheduleItem scheduler "*time-millis" "*scheduled-post")) (source> *tick) (java-macro! (.handleExpirations scheduler "*scheduled-post" "*current-time-millis" (java-block<- (identity *scheduled-post :> {:keys [*id *post]}) (local-transform> [(keypath *id) AFTER-ELEM (termval *post)] $$feeds) ))) ))) |
The code added was:
1 2 3 4 5 6 7 8 | topology.source("*tick") .macro(scheduler.handleExpirations( "*scheduled-post", "*current-time-millis", Block.each(Ops.GET, "*scheduled-post", "id").out("*id") .each(Ops.GET, "*scheduled-post", "post").out("*post") .localTransform("$$feeds", Path.key("*id").afterElem().termVal("*post")))); |
1 2 3 4 5 6 7 8 9 10 | (source> *tick) (java-macro! (.handleExpirations scheduler "*scheduled-post" "*current-time-millis" (java-block<- (identity *scheduled-post :> {:keys [*id *post]}) (local-transform> [(keypath *id) AFTER-ELEM (termval *post)] $$feeds) ))) |
This first adds a subscription to the tick depot, which emits at the configured 1000 millisecond frequency. The emit isn’t captured into a variable like the previous depot subscription since all this code cares about is the frequency at which the code runs.
Tick depots are global and emit only on task 0, so the subsequent code runs on just one task each time it emits. The next line handles expired items using the
handleExpirations
method from
TopologyScheduler
and inserting the code into the topology with a macro.
handleExpirations
does the following:
- Goes to all tasks using the “all partitioner”.
- Fetches expired items from the PState based on the current time
- Loops over expired items and runs the code in the provided callback
handleExpirations
takes in three arguments as input. The first specifies a variable to bind the expired item. This is the same item as was passed to
scheduleItem
before, and the variable will be in scope for the callback code. The next argument specifies a variable to bind the current time, which isn’t needed for this particular use case. The last argument is the callback code that specifies what to do with an expired item. It’s an arbitrary block of dataflow code. Since the method expects a Java API block of code, the Clojure API uses
java-block<-
to convert Clojure dataflow code to a Java API block.
Let’s go through each line of this callback code:
1 2 | Block.each(Ops.GET, "*scheduled-post", "id").out("*id") .each(Ops.GET, "*scheduled-post", "post").out("*post") |
1 | (identity *scheduled-post :> {:keys [*id *post]}) |
First, the “id” and “post” fields from the scheduled post are extracted into the variables
*id
and
*post
. The Clojure API destructures them, while the Java API uses the
Ops.GET
function to fetch them from the map.
The final line adds the post to the feed for the user:
1 2 | .localTransform("$$feeds", Path.key("*id").afterElem().termVal("*post")))); |
1 | (local-transform> [(keypath *id) AFTER-ELEM (termval *post)] $$feeds) |
The PState is updated with the “local transform” operation. The transform takes in as input the PState
$$feeds
and a “path” specifying what to change about the PState. When a PState is referenced in dataflow code, it always references the partition of the PState that’s located on the task on which the event is currently running.
Paths are a deep topic, and the full documentation for them can be found here. A path is a sequence of “navigators” that specify how to hop through a data structure to target values of interest. A path can target any number of values, and they’re used for both transforms and queries. In this case, the path navigates by the key
*id
to the list of posts for that user. The next navigator, called
AFTER-ELEM
in Clojure and
afterElem()
in Java, navigates to the “void” element after the end of the list. Setting that “void” element to a value with the “term val” navigator causes that value to be appended to that list.
This code writes to the correct partition of
$$feeds
because the callback code is run on the same task where the post was scheduled. That was on the correct task because of the depot partitioner, as discussed before.
TopologyScheduler performance
TopologyScheduler
adds very little overhead to processing. Scheduling an item of future work is a fast PState write, and checking for expired items is also very fast due to how it’s indexed by
TopologyScheduler
. The total overhead from
TopologyScheduler
is one small unit of work to schedule it, and then one small unit of work to emit it to the callback code later. This means
TopologyScheduler
can handle very large throughputs of scheduled items.
Summary
There’s a lot to learn with Rama, but you can see from this example application how much you can accomplish with very little code. Timed notifications like this probably aren’t going to be a module of its own, but part of the implementation of a larger module with more scope. For example, in our Twitter-scale Mastodon implementation, the techniques shown in this post are used for scheduled posts and poll expirations.
As mentioned earlier, there’s a Github project for the Clojure version and for the Java version containing all the code in this post. Those projects also have tests showing how to unit test modules in Rama’s “in-process cluster” environment.
You can get in touch with us at consult@redplanetlabs.com to schedule a free consultation to talk about your application and/or pair program on it. Rama is free for production clusters for up to two nodes and can be downloaded at this page.
