It is often said that “Learning x will make you a better
programmer”, for various values of x.
In my experience the secret to making that true lies not so much in the choice of a given x, but rather in the ability to apply ideas from x to y, and in getting exposure to a wide range of such ideas.
This post aims to show a fully worked example of this process: we will look at a problem involving distributed transactions in a very high-level functional language, and solve it with a technique used to rearchitect object-oriented code in the presence of manual memory management. Let’s dive in.
Intro: distributed transactions in Unison
My day job involves designing and implementing distributed systems for Unison Cloud, the next-gen cloud platform we’re building on top of the Unison language.
A unique characteristic of our cloud is the power to manipulate persistent and transactional storage as if it were an in-memory data structure, via a set of abilities: Unison’s take on algebraic effects, which can express custom control flow abstractions as ordinary straight-line code.
The model is fairly simple: you write programs in the Transaction ability that operate on one or more typed key-value tables, and call transact to delimit a transactional boundary, which will require the Storage ability.
type Table k v = Table Text

ability Transaction where
  write.tx : Table k v -> k -> v ->{Transaction} ()
  tryRead.tx : Table k v -> k ->{Transaction} Optional v
  delete.tx : Table k v -> k ->{Transaction} ()
  -- like tryRead.tx, but fails on key not found
  read.tx : Table k v -> k ->{Transaction, Exception} v

transact : Database -> '{Transaction, Exception, Random} a ->{Exception, Storage} a
As a quick syntax primer, a -> b ->{g} c is a function from a and b to c that performs effects defined by the g ability, '{g} a is syntactic sugar for the type of the thunk () ->{g} a, and lower case letters in type signatures indicate generic type parameters. Function calls are just whitespace (f a b), do ... is syntactic sugar for thunks _ -> ..., and |> creates function pipelines.
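To make the sugar concrete, here are a couple of throwaway definitions (hypothetical, just for this primer):

double: Nat -> Nat
double n = n * 2

-- `|>` pipes a value through a function: `5 |> double` is the same as `double 5`
ten: Nat
ten = 5 |> double

-- `do` builds a thunk: delayedTen has type 'Nat, i.e. () -> Nat
delayedTen: 'Nat
delayedTen = do 5 |> double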
Here’s the canonical bank transfer example, where Bob sends 10 pounds (represented as pennies) to Alice:
type UserId = UserId Text

-- populated with data elsewhere
accounts: Table UserId Nat
accounts = Table "accounts"

transfer: Database ->{Exception, Storage} ()
transfer db =
  bob = UserId "Bob"
  alice = UserId "Alice"
  transact db do
    from = read.tx accounts bob
    to = read.tx accounts alice
    amount = 10 * 100
    if from >= amount
    then
      write.tx accounts bob (from - amount)
      write.tx accounts alice (to + amount)
    else
      Exception.raiseGeneric "insufficient balance" (bob, from)

-- no infra needed to run code on cloud!
Cloud.run do
  db = Database.default()
  -- `submit` accepts thunks with the Storage ability
  Cloud.submit Environment.default() do
    transfer db
The code snippet above runs transfer on Unison Cloud, where the data is persisted on our distributed storage, and the implementation of transact guarantees that the transaction executes atomically.
Data structures
Transactions on typed key-value tables are a flexible building block, and we can build data structures like we do for in-memory data.
Let’s build a Counter:
type Counter = Counter (Table () Nat)
Counter.named: Text -> Counter
Counter.named name = Counter (Table name)
The idea is that we can represent the state of the counter in a table with a single key of the unit type (). Just like any other data structure, we can write functions that operate on our Counter, using the Transaction ability to retain atomicity:
Counter.getAndIncrement: Counter ->{Transaction} Nat
Counter.getAndIncrement counter =
  (Counter state) = counter
  n = tryRead.tx state () |> Optional.getOrElse 0
  write.tx state () (n + 1)
  n
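As an aside, a read-only accessor would follow the same pattern (a hypothetical sketch, not part of the original API):

-- read the current value without incrementing; a missing key counts as zero
Counter.get: Counter ->{Transaction} Nat
Counter.get counter =
  (Counter state) = counter
  tryRead.tx state () |> Optional.getOrElse 0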
We can use our Counter to build an append-only log, similar to a Kafka partition, by pairing a Counter with a Table Nat a. The Counter will keep track of the current size of the log so we know where to append next, and the table will host the elements, indexed by their offset.
type Log a = Log Counter (Table Nat a)
Log.named: Text -> Log a
Log.named name = Log (Counter.named (name ++ "-size")) (Table name)
Reading an element from the log is straightforward:
Log.at : Nat -> Log a ->{Transaction} Optional a
Log.at n log =
  (Log _ elems) = log
  tryRead.tx elems n
Appending to the log is slightly more involved but still pretty easy:
Log.append: a -> Log a ->{Transaction} ()
Log.append v log =
  (Log size elems) = log
  n = getAndIncrement size
  write.tx elems n v
Note that size and elems cannot go out of sync, since we’re still in the Transaction ability.
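While we’re here, a size query falls out naturally too (again a hypothetical sketch, reusing the Counter.get helper from above):

-- the Counter's current value is exactly the number of elements in the log
Log.size: Log a ->{Transaction} Nat
Log.size log =
  (Log size _) = log
  Counter.get size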
A call to Log.append executes atomically when we call transact:
type Track = Track Text

playlist: Log Track
playlist = Log.named "my-playlist"

transact myDb do
  playlist |> append (Track "Obstacles")
and we can append multiple elements atomically as well:
transact myDb do
  playlist |> append (Track "Sea above, sky below")
  playlist |> append (Track "Featherweight")
Let’s conclude this section with a couple of notes on semantics that will be useful later.
The first is that the Table constructor, and by extension Counter.named and Log.named, don’t actually create anything in storage there and then. A Table is just a logical handle; all the action happens during transact, which takes the actual Database we will modify. This is guaranteed by the types too: Counter.named and Log.named don’t advertise any abilities, which means that they have no side-effects.
The second is about the execution model of transactions, to help us reason about performance. The implementation of transact uses Optimistic Concurrency Control, which means that the first read of each key goes to storage, whilst writes (and reads of keys that have been written to) are buffered in memory. When the transaction completes, transact will try to atomically commit all the writes to storage at once. If there is a conflict, it retries the whole transaction, which it can do because the Unison type system guarantees that the thunk passed to transact doesn’t perform any other effects that wouldn’t be safe to retry arbitrarily (like an HTTP call).
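To make the buffering concrete, here’s a small sketch (hypothetical code, reusing the Counter from earlier). The first getAndIncrement reads the counter’s key from storage and buffers a write; the second one is served entirely from that buffer, and transact then commits (or retries) both increments atomically:

bumpTwice: Database -> Counter ->{Exception, Storage} Nat
bumpTwice db counter =
  transact db do
    -- first call: one storage read, the write is buffered in memory
    _ = Counter.getAndIncrement counter
    -- second call: reads the buffered value, no extra storage roundtrip
    Counter.getAndIncrement counter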
Case study: stream storage
We can now look at the problem this post will be centred around, which is a simplified version of something I’ve encountered while developing volturno, a distributed stream processing framework (think Kafka + Flink).
Imagine we need to design the storage layer for keyed data streams. For our toy version, we can go with something like this:
type Key
type Event
streams: Table Key (Log Event)
where we basically store a log of events for each key (the real code uses sharding, but we will keep the simpler version above in this post).
We want to implement a function to publish a batch of events to our streams Table:
publish: Database -> [(Key, Event)] ->{Remote} ()
Remote is the ambient ability in Unison Cloud. It’s very powerful, but we won’t cover it here for space reasons; just know that we can call toRemote to embed other cloud abilities in it, like Storage, Exception or Random.
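As a tiny sketch of what that embedding looks like (hypothetical code, reusing transfer from earlier):

-- toRemote lets {Storage, Exception} code run inside a {Remote} computation
transferRemote: Database ->{Remote} ()
transferRemote db = toRemote do transfer db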
Ok, let’s plan the implementation out. We need to group the batch of events by key, which we can do with:
List.groupMap : (a -> k) -> (a -> v) -> [a] -> Map k (List.Nonempty v)
We then want to upload batches of events for different keys in parallel; let’s use Remote.parMap:
Remote.parMap : (a ->{Remote} t) -> [a] ->{Remote} [t]
At this point we will be dealing with batches of events for a single key that have to be appended sequentially, so we can have a transaction that fetches the relevant Log and iterates through the keyed batch to append the events.
Here’s the full code. Note the use of the cases keyword to pattern match on the (key, events) tuple in a lambda:
publish: Database -> [(Key, Event)] ->{Remote} ()
publish db messages =
  messages
    |> List.groupMap at1 at2
    |> Map.toList
    |> Remote.parMap cases (key, events) ->
         toRemote do publishKey db key (toList events)
    |> ignore

publishKey: Database -> Key -> [Event] ->{Storage, Exception} ()
publishKey db key events =
  transact db do
    log = read.tx streams key
    events |> foreach (event -> log |> append event)
That looks pretty good, but unfortunately publishKey has a bug. We fetch the relevant log with read.tx streams key, which will fail if the log isn’t there, but nothing guarantees the log will be there. The logs are per key, so we cannot create them all in advance, as we don’t know all the keys in advance. Instead, we will create each log on demand if we cannot find it in storage when we want to write some events to it. We will use randomName: '{Random} Text to generate a name for our log:
publishKey: Database -> Key -> [Event] ->{Storage, Exception} ()
publishKey db key events =
  transact db do
    log = match tryRead.tx streams key with
      Some log -> log
      None ->
        log = Log.named randomName()
        write.tx streams key log
        log
    events |> foreach (event -> log |> append event)
Wrestling with optimisation
We’re dealing with a performance-sensitive system, so we have to be conscious about optimising our code properly. To start with, we’re publishing all the events for a given key in a single transaction. Transactions actually have a size limit, so this isn’t wise. On the other hand, we don’t want to publish each event in its own transaction and give up batching entirely, so we’ll compromise by sending events in batches of 25, using chunk: Nat -> [a] -> [[a]] for help:
publishKey: Database -> Key -> [Event] ->{Storage, Exception} ()
publishKey db key events =
  events
    |> chunk 25
    |> foreach (chunk ->
         transact db do
           log = match tryRead.tx streams key with
             Some log -> log
             None ->
               log = Log.named randomName()
               write.tx streams key log
               log
           chunk |> foreach (event -> log |> append event)
       )
Ok, but now note how log is read from storage multiple times (once per chunk), even though we know that after the first chunk it will certainly have been created (by us) if it didn’t exist.
Well, we can move the code that checks or creates the log to a separate transaction at the start:
publishKey: Database -> Key -> [Event] ->{Storage, Exception} ()
publishKey db key events =
  log = transact db do
    match tryRead.tx streams key with
      Some log -> log
      None ->
        log = Log.named randomName()
        write.tx streams key log
        log
  events
    |> chunk 25
    |> foreach (chunk ->
         transact db do
           chunk |> foreach (event -> log |> append event)
       )
Remember that the above is still correct: even if we get the log and something else modifies it straight after, each call to Log.append checks the size of the log transactionally before appending.
This version of the code does avoid reading the log on each chunk, but it’s still not optimal: if the log doesn’t exist and we do need to create it, we spend an extra transaction just to create it, instead of creating it in the same transaction that also adds the first chunk. In other words, we’re “wasting” one transactional roundtrip that could carry some messages instead.
The optimal behaviour requires trickier code: we want to get (and potentially create) the log on the first chunk, and then carry it across the remaining chunks, using a fold:
publishKey: Database -> Key -> [Event] ->{Storage, Exception} ()
publishKey db key events =
  events
    |> chunk 25
    |> foldLeft_ None (log chunk ->
         transact db do
           log' = match log with
             Some log -> log
             None -> match tryRead.tx streams key with
               Some log -> log
               None ->
                 log = Log.named randomName()
                 write.tx streams key log
                 log
           chunk |> foreach (event -> log' |> append event)
           Some log'
       )
    |> ignore
Ok, this behaves as we want it to… but it’s starting to get pretty gnarly. It’s not terrible in this short snippet, but the real code dealt with additional concerns such as error handling, and this log creation logic was really tipping the scale and making it hard to understand.
Now, it’s reasonable at this point to want to introduce some more named helpers to clean it up, but that’s not as great an idea as it sounds in this type of system: named helpers might preserve (or even clarify) the intent of the code, but they obscure the access patterns to the data, which is important information for systems code to convey. A couple of named helpers (mapChunked, getLog) can make the very first version we had look quite harmless, for example:
publishKey: Database -> Key -> [Event] ->{Storage, Exception} ()
publishKey db key events =
  events |> mapChunked (chunk ->
    transact db do
      log = getLog key
      chunk |> foreach (event -> log |> append event)
  )
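To see just how much those helpers hide, here is roughly what they would look like (hypothetical definitions, not taken from the real codebase): getLog buries a storage read, and potentially a write, behind an innocent-looking name, and mapChunked hides the chunk size that determines how many transactions we run.

mapChunked: ([a] ->{g} ()) -> [a] ->{g} ()
mapChunked f as = as |> chunk 25 |> foreach f

getLog: Key ->{Transaction, Random} Log Event
getLog key = match tryRead.tx streams key with
  Some log -> log
  None ->
    log = Log.named randomName()
    write.tx streams key log
    log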
On top of that, there’s another instinct that’s even more pernicious: a subtle bias towards making the behaviour of the system worse in order to have prettier code. Not every optimisation is worth its complexity, of course, but pretty code is first and foremost a tool to achieve the desired behaviour, not the other way around. This risk is particularly prominent in a functional programming language, which is typically geared towards elegant code.
So, is this the best we can do? Turns out it’s not, but to see how, we need to go looking for ideas in some unexpected places…
OOP and Data Oriented Design
Unison is decidedly not an object-oriented language: it has no classes, no methods, no inheritance, not even interfaces. And yet, the design we’ve seen embodies some of the ideas that have made OO popular: it’s based on data types that mirror their logical domain, keep their internals encapsulated, and expose their behaviour through an explicit API.
In fact, here’s how our data model could be written in a generic-looking OO language:
class Counter(state: Int) {
  def getAndIncrement(): Int {
    result = state
    state = state + 1
    return result
  }
}

class Log[A](counter: Counter, elems: Array[A]) {
  def at(n: Int): A {
    return elems[n]
  }

  def append(a: A) {
    n = counter.getAndIncrement()
    elems[n] = a
  }
}

class Key(..)
class Event(..)
class Streams(streams: Map[Key, Log[Event]])
There is a lot to like about this model: the behaviour of each component is easily understood just via its public api, and richer behaviour is achieved by combining these small components together.
Furthermore, the common FP criticism about the dangers of mutation wouldn’t translate to our original code: transactions guarantee serializability, which is very easy to reason about.
There is however another angle for critiquing this model, using the perspective of Data Oriented Design. The ideas behind Data Oriented Design come largely from videogame development, in a context where optimising memory access matters a lot, and you’re managing memory manually.
So let’s apply this lens to the code snippet above. We’re going to assume a runtime representation similar to Java, Scala or Ruby, except without a GC.
The main thing to note is how many pointers are involved at runtime here: Streams has a pointer to a Map, which has pointers to various instances of Log, which have pointers to instances of Counter, and so on.
This is problematic for two reasons:
- From a performance point of view, accessing a piece of data involves several roundtrips to memory as we hop from pointer to pointer. Also, creating data requires a bunch of tiny individual heap allocations.
- From a simplicity point of view, manual memory management is error-prone, as each pointer has to have its memory deallocated individually, and in a specific order. E.g. freeing our Streams class involves iterating over the Map to free each Log, and freeing each Log involves freeing the Counter and iterating over the Array to free each A, etc.
Data Oriented Design on the other hand would forego nested pointers in favour of structuring data as flat arrays indexed by integer IDs. We could then access it efficiently by reading whole chunks of memory, and allocate it and deallocate it in bulk, without worrying about the lifetimes of its individual pieces.
But beyond specific technical strategies, Data Oriented Design advocates for a very different way to approach data modelling: we should not strive for code to reflect the logical domain we’re working in, but rather frame programs as data transformations, and then identify the simplest, most efficient way for the machine to perform the desired transformation.
Now, it’s easy to dismiss all this as supremely irrelevant to us: we do have a GC, we enjoy it very much thank you, and we’re in a far higher level language anyway where such minutiae ought not to matter.
But let’s zoom out a bit: it is true that in a higher level language there’s less emphasis on counting every single memory access, but in our transactional code the pointer hopping mapped to reads from storage, and we did care about minimising those. We also didn’t have to deal with manual deallocation of memory, but the chief complication in publishKey could indeed be framed as a problem with lifetimes, specifically about having to create these Log instances at the right time.
So let’s try to apply Data Oriented Design to our problem and see if it bears any fruit.
Data Oriented Design in action
Let’s look back at the most optimised version of our code:
publishKey: Database -> Key -> [Event] ->{Storage, Exception} ()
publishKey db key events =
  events
    |> chunk 25
    |> foldLeft_ None (log chunk ->
         transact db do
           log' = match log with
             Some log -> log
             None -> match tryRead.tx streams key with
               Some log -> log
               None ->
                 log = Log.named randomName()
                 write.tx streams key log
                 log
           chunk |> foreach (event -> log' |> append event)
           Some log'
       )
    |> ignore
Remember that the complexity in this code is to ensure that each call to publishKey fetches the log once, if it exists, and creates it in the same roundtrip that carries the first chunk of events, if it doesn’t.
Also note that append involves another read from storage to fetch the latest size of the log: that call has to happen once per chunk, so that we don’t overwrite any events if a concurrent call to publishKey has published some events in between two of our chunks. It might seem like this storage read happens once per message rather than once per chunk, but subsequent calls to append in a single transaction will read the log size from memory, since previous appends have buffered a write to it (this ensures that transactions see a consistent snapshot).
Ok, now let’s apply Data Oriented Design instead: we will forget about our existing abstractions in favour of looking at the essential data transformations that define our problem.
What data do we need to read a message in a keyed stream? Such a message would be identified by its key, and by its index in the stream, so we have a mapping:
(Key, Nat) ---> Event
That’s not enough information to write new events though, we also have to keep track of the size of each stream, to know which index to write at next. There is one stream per key, so we need a mapping:
Key ---> Nat
We can easily represent these mappings as Tables:
streams: Table (Key, Nat) Event
streamSizes: Table Key Nat
Writing a chunk of events is now straightforward: in a single transaction we read the size of the stream for a given key, compute the range of indexes the new events will have, and write the new size to the streamSizes table and the events to the streams table. We do that for every chunk in our input batch.
Here’s the new and improved code:
streams: Table (Key, Nat) Event
streams = Table "streams"

streamSizes: Table Key Nat
streamSizes = Table "stream-sizes"

publishKey: Database -> Key -> [Event] ->{Storage, Exception} ()
publishKey db key events =
  events
    |> chunk 25
    |> foreach (chunk ->
         transact db do
           start = tryRead.tx streamSizes key |> Optional.getOrElse 0
           write.tx streamSizes key (start + size chunk)
           chunk
             |> indexed
             |> foreach cases (event, n) -> write.tx streams (key, start + n) event
       )
That’s a lot less convoluted!
But not just that: it’s also more performant than the most optimised version we had previously. Both versions read the log size on each chunk, but the previous version would also have to read, and potentially write, the Log object itself on the first chunk of each call; that’s just gone here. And note how much easier it is to reason about the access patterns to storage in the first place: previously we’d have to look at publishKey, Log.append and Counter.getAndIncrement, whereas now we just have to look at publishKey.
But hold on, you might say: how can the log management just disappear? Have we lost anything in the transition to this new design? Well, in retrospect we can see that the previous design was more powerful than we actually need. This representation:
streams: Table Key (Log Event)
can also represent dynamic logs, for example a log that expires every 15 minutes and gets replaced by a new one.
We don’t need this power here, but are forced to pay for it regardless, both in the complexity of dealing with log creation, and in the performance hit of having each call to publishKey read the Log object, even though its identity will never change, only its contents will.
Focusing on essential data transformations helped us home in on exactly what we need, and delivered code that is both simpler and faster in this scenario.
It’s also interesting how Data Oriented Design changed our code in much the same way it does when applied to an in-memory OO codebase: we no longer have self-contained objects like Log and Counter, connected by nested pointers (dynamic table names), with pointer hopping (streams -> Log -> Counter). Instead, data is laid out in flat tables with static indexes.
Conclusion
You shouldn’t walk away from this post thinking that there is a universal winner between the two approaches we discussed today. Instead, you should look critically at the tradeoffs.
The OO design is more composable: it gives us nice building blocks that we can package in a library and reuse in interesting ways to easily assemble novel behaviour. Conversely, if we’re willing to give up flexibility, Data Oriented Design can result in simpler code, with an easier path to optimal performance.
There is something deeper here, regarding the nature of abstraction itself: abstraction generalises solutions to similar problems by forgetting about their differences.
For example, we can generalise several scenarios where data needs to be linearised into the concept of a Log, which leaves space in our brain to think about other parts of the problem and therefore raises the ceiling of what we’re able to accomplish. However, the optimal solution for a given subproblem relies almost by definition on the aspects that make it unique in a class of similar problems, and those are more easily discovered by tearing abstraction down. In our case, foregoing the Log abstraction showed how we don’t have to worry about log lifetimes at all.
I would argue that a greater appreciation of these tradeoffs is as big a payoff from our journey as the code improvement that we got.
In fact, beyond any specific technique, it is the compounding of this type of understanding that, in my experience, actually makes you a better programmer.
Appendix: 3 practical points for exploiting far-out ideas
- A lot of interesting ideas are found in somewhat niche communities: functional programming, game development, formal methods, futuristic HCI, etc. Be curious.
- At first glance, many bad ideas make perfect sense, and many good ideas sound like utter nonsense. Don’t take anything at face value, but don’t close your mind too early either.
- Unfortunately, cool ideas are often presented as universal truths that ought to apply to every field of software engineering. Instead, they are typically borne out of a specific context. Strive to really understand it, and you will find ways to apply those ideas in interesting, unexpected places that share just a tiny bit of that context. Equally importantly, you will know when you can safely dismiss them as well.