Rama on Clojure’s terms, and the magic of continuation-passing style

Rama is a platform with huge applicability, able to express all the computation and storage for a backend at any scale. Just like the UNIX philosophy of composing simple programs to do more complex tasks, Rama is based on simple building blocks that compose for any backend use case.

At the heart of Rama is its dataflow language, a Clojure library that’s also a full-fledged language. Rama’s dataflow language is based on continuation-passing style (CPS). Rama provides a clean and elegant way to express entire programs in CPS while producing bytecode that’s just as efficient as Clojure. In this post I’ll explore how Rama works in comparison to equivalent Clojure code written in a CPS style. You’ll see how CPS through Rama greatly generalizes the basic concept of a function, how that enables new ways of writing code in general, and how that is particularly liberating for writing parallel and asynchronous code.

You can follow along with the code in this post by cloning rama-demo-gallery and opening a REPL with lein repl. Run the following to set up your REPL:

(use 'com.rpl.rama)
(require '[com.rpl.rama.ops :as ops])

Basic example

Let’s start by defining the equivalent of Clojure’s identity function in Rama:

(deframaop identity-rama [*v]
  (:> *v))

Here we define a “Rama operation” called identity-rama that accepts one argument named *v. Variables in Rama code are symbols beginning with *. A “Rama operation” can do everything a regular Clojure function can – conditionals, loops, defining anonymous operations with lexical closures, declaring locals, etc. – plus it can do much more.

In this case, the body of identity-rama “emits” the value of *v to its caller using :>. This is equivalent to the following Clojure code:

(defn identity-rama-clj [v cont]
  (cont v))

In Rama code, the continuation is implicit and is invoked by calling :> like a function. A Rama operation does not return a value to its caller. It emits values to its continuation. This is a critical distinction, as part of what makes Rama operations more general than functions is how they can emit multiple times, not emit at all, or emit asynchronously.

Note that Rama does not compile to Clojure code like this. It compiles straight to bytecode, which is necessary to achieve high performance.

Now suppose you want to call identity-rama with the value “Hello world!” and print the result. In Rama you would write this like so:

(?<-
  (identity-rama "Hello world!" :> *str)
  (println "Emitted:" *str))

?<- is called the “execution operator” and just dynamically executes some Rama code. It’s not used in production and is just for playing at the REPL like this. Here the string “Hello world!” is passed as input to our identity-rama operation. The :> *str part binds the output of the operation to the variable *str. The :> keyword distinguishes the input from the output and is called the “default output stream” (you’ll see soon how you can have more than one output stream). The variable *str is then passed to println.

Here’s the equivalent Clojure code in CPS:

(identity-rama-clj
  "Hello world!"
  (fn [str]
    (println "Emitted:" str)))

So far, you can see from this example how Rama makes things more concise by eliminating nested callback functions from the code. Here’s a slightly more complicated example to show how unreadable CPS gets when done manually:

(?<-
  (+ 1 2 :> *a)
  (* *a 10 :> *b)
  (println *a *b))

In Clojure with CPS versions of + and *, this looks like:

(defn add [v1 v2 cont]
  (cont (+ v1 v2)))

(defn multiply [v1 v2 cont]
  (cont (* v1 v2)))

(add 1 2
  (fn [a]
    (multiply a 10
      (fn [b]
        (println a b)))))

In addition to how unreadable all the nesting makes this, it’s also extremely inefficient. Every single “emit” uses up another stack frame, so it would seem that entire programs compiled this way would quickly overflow the stack. If this were all Rama was doing, that would indeed be the case. You’ll see later some important optimizations Rama makes so that dataflow code is just as efficient as idiomatic code in any other language. The continuation being implicit in Rama, rather than explicit as in the Clojure CPS examples, gives Rama critical flexibility to make those optimizations.

Emitting zero or multiple times

As mentioned, you don’t have to call the continuation exactly one time. You can call it multiple times, or you can call it zero times. You can also call it asynchronously, on a different thread, or even on a different machine. This is where the expressive power of dataflow starts to show itself.

Here’s an example of a deframaop that emits multiple times along with some code that uses it:

(deframaop emit-many-times []
  (:> 1)
  (:> 3)
  (:> 2)
  (:> 5))

(?<-
  (emit-many-times :> *v)
  (println "Emitted:" *v))

This is equivalent to this CPS Clojure function:

(defn emit-many-times-clj [cont]
  (cont 1)
  (cont 3)
  (cont 2)
  (cont 5))

(emit-many-times-clj
  (fn [v]
    (println "Emitted:" v)))

Let’s now take a look at another deframaop that does filtering:

(deframaop my-filter> [*v]
  (<<if *v
    (:>)))

We’ll look more at conditionals later on in this post. my-filter> is the same as the built-in operation filter> and is equivalent to:

(defn my-filter>-clj [v cont]
  (if v
    (cont)))

As you can see here, my-filter> emits zero values to its continuation. Emits can be done with any number of values, and in the next section you’ll see examples of emitting multiple values.

You could combine my-filter> with emit-many-times to write code like this:

(?<-
  (emit-many-times :> *v)
  (my-filter> (odd? *v))
  (println *v))

You can nest expressions in Rama code just like you can in Clojure code, and the above is the same as writing:

(?<-
  (emit-many-times :> *v)
  (odd? *v :> *is-odd?)
  (my-filter> *is-odd?)
  (println *v))

This is equivalent to:

(defn odd?-cont [v cont]
  (cont (odd? v)))

(emit-many-times-clj
  (fn [v]
    (odd?-cont v
      (fn [is-odd?]
        (my-filter>-clj is-odd?
          (fn []
            (println v)))))))

Running any of these prints:

1
3
5

This code is kind of like doing a filter on a sequence followed by a doseq, except no sequences are materialized. It also reads kind of like a WHERE clause in SQL, in that the filter is expressed solely on the value in question with an arbitrary predicate, and computation only continues with values that match.
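For comparison, a rough sequence-based Clojure version of the same thing looks like this, except it materializes an intermediate sequence along the way:

(doseq [v (filter odd? [1 3 2 5])]
  (println v))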

Operations can also emit a dynamic number of times. For example, Rama has an operation in its standard library called explode that’s equivalent to this CPS Clojure function:

(defn explode-clj [aseq cont]
  (doseq [e aseq]
    (cont e)))

You could use explode to print every element of a sequence like this:

(?<-
  (ops/explode [1 2 3 4] :> *v)
  (println "Val:" *v))

This is the same as:

(explode-clj [1 2 3 4]
  (fn [e] (println "Val:" e)))

Emitting multiple values in one emit

Besides being able to emit multiple times, Rama operations can emit multiple values per emit. Here’s an example:

(deframaop emit-many [*v]
  (:> (inc *v) (dec *v))
  (:> (* *v 2) (/ *v 2)))

(?<-
  (emit-many 9 :> *v1 *v2)
  (println "Result:" *v1 *v2))

This is equivalent to the Clojure CPS code (for brevity, without doing inc, dec, *, or / in CPS):

(defn emit-many-clj [v cont]
  (cont (inc v) (dec v))
  (cont (* v 2) (/ v 2)))

(emit-many-clj 9
  (fn [v1 v2]
    (println "Result:" v1 v2)))

Something important here is that the caller needs to know how many fields are expected to be given to the continuation. In Rama that’s specified by the number of variables bound to the :> output stream, and in the Clojure version that’s specified by the arity of the passed continuation function. In both cases, you’ll get a runtime error if you bind the incorrect number of continuation outputs. Whereas with a Clojure function you only have to know what arities are valid for inputs, with Rama operations you also must know the arity of the output.
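For example, in the Clojure CPS version, passing a continuation with the wrong arity fails as soon as the continuation is invoked:

(emit-many-clj 9
  (fn [v1]          ; emit-many-clj passes two values per emit,
    (println v1)))  ; so this 1-arg continuation throws an ArityException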

Anonymous operations

Just like how you can declare anonymous functions in Clojure and pass them around as values, you can do the same in Rama with Rama operations. Like anonymous Clojure functions, anonymous Rama operations capture their lexical scope. Here’s a basic example of this:

(deframaop adder [*v1]
  (<<ramaop %ret [*v2]
    (:> (+ *v1 *v2)))
  (:> %ret))

(?<-
  (adder 10 :> %f)
  (%f 3 :> *val)
  (println "Result:" *val))

<<ramaop defines an anonymous Rama operation with the given name, arguments, and body. Vars for anonymous operations are prefixed with %. There’s no difference in functionality between an anonymous Rama op and a top-level one. The above code is equivalent to this Clojure CPS code:

(defn adder-clj [v1 cont]
  (let [ret (fn [v2 cont-inner]
              (cont-inner (+ v1 v2)))]
    (cont ret)))

(adder-clj 10
  (fn [f]
    (f 3
      (fn [val]
        (println "Result:" val)))))

As alluded to before, Rama operations can be passed around as values. Here’s an example of passing around top-level Rama operations, anonymous Rama operations, and regular Clojure functions as values in Rama:

(deframaop times2 [*v]
  (:> (* *v 2)))

(deframaop foo [%f1 %f2 %f3]
  (%f3 (%f2 (%f1 2)) :> *res)
  (:> *res))

(?<-
  (adder 20 :> %f1)
  (foo %f1 inc times2 :> *res)
  (println "Result:" *res))

This prints “Result: 46”: (%f1 2) produces 22, inc makes that 23, and times2 doubles it to 46.
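For comparison, here’s a rough Clojure CPS analogue of this example, reusing adder-clj from before and keeping inc as a plain (non-CPS) call, just like the Rama version does (times2-clj and foo-clj here are just sketches):

(defn times2-clj [v cont]
  (cont (* v 2)))

(defn foo-clj [f1 f2 f3 cont]
  (f1 2
    (fn [a]
      (f3 (f2 a)
        (fn [res]
          (cont res))))))

(adder-clj 20
  (fn [f1]
    (foo-clj f1 inc times2-clj
      (fn [res]
        (println "Result:" res)))))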

Emitting asynchronously

Rama operations being able to emit asynchronously is what makes Rama so good for writing parallel and asynchronous code. To demonstrate this, I’ll briefly introduce you to Rama’s cluster programming environment which implements the underlying infrastructure powering the parallel programming primitives you’re about to see. A “Rama module” is what you deploy to a Rama cluster, and it uses dataflow to define all the data ingestion, processing, and indexing for a backend. A module is launched with a configurable number of partitions called “tasks”, and these tasks run across the cluster in processes launched for the module. Dataflow code runs across all tasks in parallel and defines how to react to incoming data.

For a basic example of distributed programming with dataflow, here’s code doing a bank transfer from *from-user-id to *to-user-id in the amount of *amt dollars. This code is a stripped down version of our open-source atomic bank transfer example, and the code for reading/writing that information to durable storage is mocked out to focus on the distributed programming aspects. $$funds here refers to a durable index, similar to a database.

(|hash *from-user-id)
(user-current-funds $$funds *from-user-id :> *funds)
(filter> (>= *funds *amt))
(deduct-funds-from-user! $$funds *from-user-id *amt)
(|hash *to-user-id)
(add-funds-to-user! $$funds *to-user-id *amt)

|hash is called a “partitioner”, and it relocates computation to a different thread/node. The only difference from the other Rama operations you’ve seen is that it emits to its continuation asynchronously and potentially on a different thread/node. |hash computes the target task by modding the hash of its argument by the total number of tasks in the module. Hashing ensures the same argument always goes to the same task, while different arguments get evenly distributed across all tasks.

Computation and storage are colocated in Rama. By using partitioners to control where code is executing, you’re able to control to which partitions of durable storage you read and write. This lets you control in a fine-grained way how data is partitioned across durable storage.

What makes partitioners powerful is that they’re just like any other Rama operation, and that uniformity enables composition. You can use partitioners just like any other code, such as within conditionals, loops, or helper operations. Code reads linearly without any callback functions even though you’re jumping around the cluster with impunity.

Rama’s implementation of partitioners is similar to this Clojure CPS version:

(defn |hash-clj [k cont]
  (let [task-id (mod (hash k) (num-tasks-in-module))]
    (send-to-task! task-id cont)))

Internally, every task of a module has a queue that runs events in the order in which they arrive. The event sent in this case is the continuation, which when called continues computation where it left off. This is no different than if the emit was done synchronously.
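A minimal sketch of that per-task setup in plain Clojure might look like the following, where each task gets a single-threaded executor so events run one at a time in arrival order. Here task-executors, num-tasks-in-module, and send-to-task! are hypothetical stand-ins for illustration, not Rama’s actual internals:

(import '[java.util.concurrent ExecutorService Executors])

(def task-executors
  ;; one single-threaded executor per task; four tasks in this sketch
  (vec (repeatedly 4 #(Executors/newSingleThreadExecutor))))

(defn num-tasks-in-module []
  (count task-executors))

(defn send-to-task! [task-id event-fn]
  ;; events run in arrival order on that task's single thread
  (.submit ^ExecutorService (task-executors task-id) ^Runnable event-fn))

With these mocks, the |hash-clj above would continue the rest of the computation on the chosen task’s thread within a single process.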

The Rama operation definition is similar, though since we haven’t exposed manipulating continuations in Rama’s public API, the following code is only representative of what the definition looks like internally:

(deframaop |hash-pseudo [*k]
  (mod (hash *k) (num-tasks-in-module) :> *task-id)
  (<<continuation %cont []
    (:>))
  (send-to-task! *task-id %cont))

<<continuation defines an anonymous Rama operation just like any other, with the difference being that it emits to the caller of its parent rather than its own caller. This is just like the Clojure CPS version: when cont is eventually invoked on the other thread/node, it invokes the code following the call of |hash-clj .

Rama takes care of efficiently serializing the continuation, including any information in its closure. The Rama compiler analyzes what vars are used after every invoke of an operation, and it uses that information to only include in the closure vars that are referenced in downstream code. This minimizes the amount of information sent across the wire. This compiler analysis isn’t specific to partitioners, as it’s used for closure construction for all anonymous operations.

Partitioners don’t have to emit just one time at one location. Sometimes, for example, you want to run code like this:

(|all)
(fetch-information-from-storage $$p :> *v)
(|global)
(aggregate-information *v :> *result)

Code like this is typical for queries that fetch and aggregate information stored across all partitions. |all partitions to all tasks in parallel, and |global always goes to the same task. |all is defined approximately like this:

(deframaop |all-pseudo []
  (<<continuation %cont []
    (:>))
  (ops/explode (range 0 (num-tasks-in-module)) :> *task-id)
  (send-to-task! *task-id %cont))

And |global is defined approximately like this:

(deframaop |global-pseudo []
  (<<continuation %cont []
    (:>))
  (send-to-task! 0 %cont))

CPS and the ability to emit asynchronously unify general-purpose programming with distributed programming by enabling parallel code to be expressed no differently than any other logic. Partitioners enable Rama code to precisely control not just what is executing, but where.

Emitting to multiple output streams

Emitting zero or multiple times, emitting multiple values, and emitting asynchronously are three ways Rama operations are more general than functions. Another is that Rama operations can emit to output streams besides :>.

Here’s an example Rama operation doing this:

(deframaop emit-multiple-streams []
  (:a> 1)
  (:> 2)
  (:> 3)
  (:a> 4)
  (:b> 5 6))

This emits to three streams: :>, :a>, and :b>. Let’s take a look at an equivalent Clojure function in CPS. Instead of passing in one continuation function, we’ll now pass in a map from output stream to continuation function:

(defn emit-multiple-streams-clj [cont-map]
  ((:a> cont-map) 1)
  ((:> cont-map) 2)
  ((:> cont-map) 3)
  ((:a> cont-map) 4)
  ((:b> cont-map) 5 6))

This isn’t totally accurate, as Rama does not require a caller to provide a continuation for each output stream. If there’s no continuation, then emitting to that output stream is a no-op. So the Clojure code that matches what Rama does is this:

(defn emit-multiple-streams-clj [cont-map]
  (if-let [cont (:a> cont-map)]
    (cont 1))
  (if-let [cont (:> cont-map)]
    (cont 2))
  (if-let [cont (:> cont-map)]
    (cont 3))
  (if-let [cont (:a> cont-map)]
    (cont 4))
  (if-let [cont (:b> cont-map)]
    (cont 5 6)))

Invoking a Rama operation that emits to multiple output streams is a little bit different, as each output stream is a different code path. Here’s an example:

(?<-
  (emit-multiple-streams
    :a> <emitted-a> *v
    :b> <b> *v1 *v2
    :> *v)
  (println "Default:" *v)
  (<<branch <emitted-a>
    (println "A:" *v))
  (<<branch <b>
    (println "B:" *v1 *v2)))

Rama code produces an “abstract syntax graph” (ASG), whereas Clojure (and most other languages) produce an “abstract syntax tree” (AST). <emitted-a> and <b> are called “anchors” and label parts of the ASG. Those anchors are used by <<branch to specify where that code should attach. You can visualize this code as a graph in which the println on *v hangs off the default output of emit-multiple-streams, while the <<branch bodies hang off the <emitted-a> and <b> anchors.

Rama has other ways to specify how code should be attached. In this particular case, since each stream has only one line of code attached to it, the above code can be written more concisely as:

(?<-
  (emit-multiple-streams
    :a> *v :>> (println "A:" *v)
    :b> *v1 *v2 :>> (println "B:" *v1 *v2)
    :> *v)
  (println "Default:" *v))

:>> is called an inline hook and automatically handles setting up anchors and branching.

The above can be written with Clojure CPS like so:

(emit-multiple-streams-clj
  {:a> (fn [v] (println "A:" v))
   :b> (fn [v1 v2] (println "B:" v1 v2))
   :> (fn [v] (println "Default:" v))
  })

All of these print:

A: 1
Default: 2
Default: 3
A: 4
B: 5 6

As mentioned, you can also call emit-multiple-streams without providing continuations for every output stream. For example:

(?<-
  (emit-multiple-streams
    :b> *v1 *v2 :>> (println "B:" *v1 *v2)
    :> *v)
  (println "Default:" *v))

This is equivalent to:

(emit-multiple-streams-clj
  {:b> (fn [v1 v2] (println "B:" v1 v2))
   :> (fn [v] (println "Default:" v))
  })

Both of these print:

Default: 2
Default: 3
B: 5 6

Each continuation and the code attached to it is fully executed before the subsequent line of emit-multiple-streams is run. The behavior of Rama is exactly the same as the Clojure CPS version in this respect.

Rama provides an operation called if> which is the basic primitive for specifying conditional behavior. if> takes in a value and emits to :then> or :else> depending on the truthiness of that value. The operation <<if mentioned before is a Rama macro (called a “segmacro”) implemented using if>. Here’s an example of using if>:

(?<-
  (if> (= 1 2)
    :then> :>> (println "True")
    :else> :>> (println "False")))

This prints “False”. Unlike if in Clojure (as well as the equivalent in pretty much every other programming language), if> is not a special form in Rama. It doesn’t have to be, since it’s no different than any other Rama operation. So it can be passed around as a value like any other operation, like so:

(deframaop exec-if-like-op [%f *v]
  (%f *v
    :then> :>> (:> "True branch")
    :else>)
  (:> "False branch"))

(?<-
  (exec-if-like-op if> true :> *res)
  (println "Result:" *res))

This prints “Result: True branch”.

So far, I’ve never found a reason to pass if> around dynamically like this. What this demonstrates is how Rama’s richer language primitives provide greater uniformity and fewer special cases.

Also important to note is that if> produces exactly the same bytecode as Clojure’s if when invoked directly (not as an anonymous operation). Rama accomplishes this with an “intrinsic” implementation for if> in its compiler. This doesn’t change anything about semantics and is purely an optimization.

Unification

Rama’s “unification” facility enables separate branches of computation to be merged together. It’s another way to share code that fits naturally into dataflow. Here’s an example:

(deframaop foo [*v]
  (:a> (inc *v))
  (:b> (dec *v)))

(?<-
  (foo 10
    :a> <a> *v
    :b> <b> *v)
  (unify> <a> <b>)
  (println "Emit:" *v))

You can visualize this code as a graph in which the <a> and <b> branches coming out of foo merge back into the shared println after the unify>.

unify> is a compile-time directive on how to construct the abstract syntax graph. When either :a> or :b> emits, computation continues with the shared code after the unify>. unify> can merge any number of branches, not just two.

Since the code after a unify> is shared among all its parent branches, there are rules regarding what vars are in scope after a unify> . Only vars defined in all parent branches are in scope. So if you try to reference a var after a unify> that doesn’t exist in all of the parent branches, you’ll get a compile-time error.
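For instance, a hypothetical variation of the example above where each output stream binds a differently named var would fail to compile if code after the unify> referenced a var bound on only one branch:

(?<-
  (foo 10
    :a> <a> *x
    :b> <b> *y)
  (unify> <a> <b>)
  (println *x))   ; compile-time error: *x is only bound on the <a> branch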

The original unification example can be written in Clojure CPS like so:

(defn foo [v cont-map]
  (if-let [cont (:a> cont-map)]
    (cont (inc v)))
  (if-let [cont (:b> cont-map)]
    (cont (dec v))))

(let [shared (fn [v] (println "Emit:" v))]
  (foo 10
    {:a> shared
     :b> shared}))

As you can see, the only way to share code across the two branches is to factor out a helper function defined before the code that executes first. The dataflow version reads much nicer since the code is ordered the same way it executes.

Loops

Dataflow loops are similar to Clojure loops, but like Rama operations they can emit any number of times. Here’s an example of a dataflow loop:

(?<-
  (loop<- [*a 10 :> *v]
    (<<if (>= *a 0)
      (:> *a)
      (continue> (dec *a))))
  (println "Emitted:" *v))

Like a Clojure loop, a dataflow loop has bindings along with initial values. Here the variable *a is initialized to 10. The bindings vector also binds emits from this loop to output variables which will be in scope after the loop – *v in this case. A loop is recurred with continue>, and emits are done with :> just like emitting from a Rama operation.

This is equivalent to the following Clojure code:

(let [cont (fn [v] (println "Emitted:" v))]
  (loop [a 10]
    (when (>= a 0)
      (cont a)
      (recur (dec a)))))

Loops compose with everything else in dataflow, including partitioners. This makes it trivial to do distributed loops that hop around the cluster. Such loops are common with graph algorithms where you may be traversing from node to node fetching connections from each partition. For example:

(loop<- [*node-id *start-node-id :> *ancestor-node]
  (|hash *node-id)
  (fetch-node-parent $$parents *node-id :> *parent-id)
  (<<if *parent-id
    (:> *parent-id)
    (continue> *parent-id)
    ))

Like before, the details of storage are mocked out to focus on the computation aspects. Here $$parents represents a datastore mapping each node to its parent. This code fetches all ancestors of a node ID by traversing the graph across the cluster in a loop.
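Written manually in Clojure CPS, with the |hash-clj sketch from earlier and a hypothetical CPS helper fetch-node-parent-clj that reads the local $$parents partition, the same traversal looks roughly like this:

(defn ancestors-clj [start-node-id cont]
  (letfn [(step [node-id]
            (|hash-clj node-id
              (fn []
                (fetch-node-parent-clj node-id
                  (fn [parent-id]
                    (when parent-id
                      (cont parent-id)        ; emit this ancestor
                      (step parent-id)))))))] ; keep traversing upward
    (step start-node-id)))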

Optimizations in dataflow compiler

As mentioned earlier, passing a continuation on every invocation is inefficient and would quickly overflow the stack if done throughout a program. Rama has a number of optimizations to make the bytecode it produces just as efficient as Clojure. I’ll focus on one particularly important optimization.

When a Rama operation emits exactly one time, synchronously, and as the last thing it does, then it’s like a function. Rama provides two ways of defining operations: deframaop, as already shown, and deframafn. deframafn is just like deframaop except its implementation must synchronously emit exactly one time to the :> stream as the last thing it does.

For every invoke, Rama determines whether it’s executing a deframaop or a deframafn. If it’s a deframafn, it invokes it just like functions are invoked in Clojure, unrolling the stack frame with the return value. For example, consider this Rama code:

(deframafn double-value [*v]
  (:> (* 2 *v)))

(deframaop bar []
  (:> (ops/explode [1 2 3 4])))

(?<-
  (bar :> *v)
  (double-value *v :> *v2)
  (println *v2))

The Rama execution will be very similar to this Clojure CPS:

(defn bar-clj [cont]
  (explode-clj [1 2 3 4]
    (fn [v]
      (cont v))))

(bar-clj
  (fn [v]
    (let [v2 (double-value v)]
      (println v2))))

The calls to double-value and println do not pass a continuation and instead do optimized invokes as functions.

It’s also worth noting that since a deframafn works just like a Clojure function, it can be invoked directly from Clojure like any Clojure function.
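For example, the double-value operation defined above can be called directly at the REPL:

(double-value 21)
;; => 42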

deframafn only has restrictions on emits to the :> output stream – it can emit to all other streams any number of times and/or asynchronously. Internally, we refer to operations like that as “semi-functions”. Rama determines whether a deframafn is a semi-function or a regular function by statically analyzing whether emits are done to any output stream besides :>. If so, invokes of that operation will unroll the stack and pass continuations for the other output streams.

Finally, the ramafn> annotation can be used to tell Rama an anonymous operation is a ramafn .

The ramafn optimization is critical because the majority of code is still best written with function semantics. So most of a codebase will compile to stack-efficient invokes. Emitting multiple times, zero times, or asynchronously is powerful but less common.

The general term we use to refer to an object which is either a ramafn or ramaop is “fragment”. A ramafn is a fragment that has restrictions on the :> stream, while a ramaop has no restrictions.

Conclusion

Dataflow turns CPS into a full-fledged programming paradigm that’s elegant and efficient. This paradigm isn’t just for backend programming, like data processing, indexing, and querying. It’s a general purpose paradigm that we’ve used for building a huge amount of Rama itself. Emitting zero times, multiple times, asynchronously, or to multiple output streams are major generalizations of functions that open up huge new avenues to explore in the craft of programming. One of the joys of working on Rama has been the opportunity to explore and develop new techniques utilizing this new programming paradigm.

There’s a lot I didn’t cover in this post, like “segmacros” (macros that produce dataflow code) and “batch blocks” (a more declarative form of dataflow with functionality equivalent to relational languages, like joins and aggregation). These additional capabilities are documented on this page.

Rama as a cluster platform adds durable storage into the mix, using dataflow to process distributed logs and produce indexes of any shape. It generalizes the ideas of event sourcing and materialized views into a unified system, providing strong fault-tolerance and ACID semantics. Dataflow is one of the keys to how it’s such a generally applicable platform, as it gives tight control over what, where, and how code executes.
