Clojure’s Concurrency: easy atoms

Clojure’s atoms let one store a value and update it atomically. When one uses an atom, Clojure manages the mutation of the value and guarantees atomicity, which is very useful in a highly concurrent application. Atoms are much like Java’s Atomic* classes, but somewhat more powerful.

This is a brief introduction.

Atomic Values
To define an atom, one simply invokes the atom function with an initial value (just like agents; see also agents and futures):
(def a (atom 0)) ; create an atom and let me refer to it by the identifier a

Now a is our reference to an atom which is managed by Clojure.

To get the value held in a, we simply dereference it:
user=> @a ; or (deref a)
0

Applying state changes to atoms can be achieved in two ways.

swap!
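Calling swap! on an atom and passing a function will synchronously and atomically compare-and-set the atom’s value: the function is applied to the current value and the result is stored, retrying if another thread changed the value in the meantime. If we wanted to increase the value held in a we’d do the following: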
(swap! a inc)

At which point a would now hold 1:
user=> @a
1

Note: most of the state-mutation functions in Clojure’s concurrency features return the new/current state of the target. For atoms, calling swap! will return the new value should it succeed.
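To see why the atomic compare-and-set matters, here is a minimal sketch in which several threads increment the same atom. The names counter and workers are made up for this example, and futures are used only as a convenient way to get extra threads. Because swap! may call the function more than once when it has to retry, the function should be free of side effects.

```clojure
;; Hypothetical names for this sketch: counter, workers.
(def counter (atom 0))

;; Start 10 futures; each one increments the atom 100 times.
;; doall forces the lazy sequence so every future starts straight away.
(def workers
  (doall (for [_ (range 10)]
           (future (dotimes [_ 100] (swap! counter inc))))))

;; Dereferencing a future blocks until it has finished.
(doseq [w workers] @w)

@counter ; 1000, no increment is lost

;; swap! also accepts extra arguments, passed after the current value.
(swap! counter + 5) ; same as (+ 1000 5), returns 1005
```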

The compare-and-set nature is very useful because atoms have another powerful element: validators. When defining an atom, you can optionally pass the :validator key and a function. This function is called with each proposed new value before it is set; if it returns false or throws an exception, the change is rejected.

Let’s redefine a with our new validator:
(def a (atom 0 :validator even?))

This basically says “let a reference an atom whose initial value is 0 and call the function even? before setting any new values”.

If we then called swap! with the inc function on an initial value of 0, the new value would be 1, which is not even, so we’d expect it to fail:
user=> (swap! a inc)
IllegalStateException Invalid reference state clojure.lang.ARef.validate (ARef.java:33)

This is awesome: we get atomicity from using atoms without having code dotted around the place performing prerequisite checks on new values, all while supporting high concurrency.
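A rejected update leaves the atom untouched, which a short sketch can confirm. The names evens and outcome are hypothetical, chosen so as not to clash with a above.

```clojure
;; Hypothetical names for this sketch: evens, outcome.
(def evens (atom 0 :validator even?))

;; A rejected update throws and leaves the atom unchanged.
(def outcome
  (try
    (swap! evens inc)              ; would produce 1, which is odd
    (catch IllegalStateException _
      :rejected)))

outcome ; :rejected
@evens  ; still 0

(swap! evens + 2) ; 2 is even, so this succeeds and returns 2
```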

reset!
Another way of mutating an atom is by using the reset! function to assign the atom a new value:
(reset! a 4)

If an atom has a validator assigned, the validator will still run when calling reset!:
user=> (reset! a 1)
IllegalStateException Invalid reference state clojure.lang.ARef.validate (ARef.java:33)
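Unlike swap!, reset! ignores the current value altogether. For comparison, the sketch below also shows compare-and-set!, a core function this post does not otherwise use, which only writes when the current value is the one you expect. The atom name state and its keyword values are made up for illustration.

```clojure
;; Hypothetical atom for this sketch.
(def state (atom :idle))

(reset! state :running)                  ; returns :running, whatever was there before

;; compare-and-set! takes the expected old value and the new value.
(compare-and-set! state :running :done)   ; true, the value is now :done
(compare-and-set! state :running :failed) ; false, the value was no longer :running

@state ; :done
```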

00 (agent) – Clojure’s Concurrent Agents in action

or Clojure’s concurrency agents – a simple example
Clojure has some fantastic concurrency idioms. Agents are but one example of what’s available to the JVM lisper — alongside refs, atoms and vars — that make concurrent programming very easy.

I thought I’d articulate my understanding, both to help embed the knowledge and to provide a simple example for newcomers to Clojure.

Agents
Clojure’s agent constructs are its way of providing asynchronous writes that guarantee visibility across threads but give you the option of not “caring” when those writes occur. Put simply, agents are a wrapper for values that can be read and written by multiple “clients”: reads return the current value immediately, while writes are queued and applied one at a time, with no guaranteed ordering between writes sent from different threads. An example would be a fork/join solution: you divide the problem into “bitesize” parts whose results are then combined at a common barrier point and used elsewhere.

Here’s how you define an agent:
(def a (agent 0)) ; define an agent with an initial value of 0

To read an agent’s value, we just dereference it:
(println @a) ; or (deref a)

To write, you send a function to apply to the value in the agent, like so:
(send a inc) ; apply inc to a

which, once the action has run, increments a’s value to 1.

Clojure’s documentation calls the “inc” in the (send) call above the “action-fn”:
user=> (doc send)
-------------------------
clojure.core/send
([a f & args])
Dispatch an action to an agent. Returns the agent immediately.
Subsequently, in a thread from a thread pool, the state of the agent
will be set to the value of:

(apply action-fn state-of-agent args)
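The args in that docstring mean that (send) can pass extra arguments to the action-fn, much like swap! does for atoms. A minimal sketch, using a hypothetical agent named total and the (await) function that is covered further down:

```clojure
;; Hypothetical agent for this sketch.
(def total (agent 0))

(send total inc)   ; queues (inc state)
(send total + 10)  ; queues (+ state 10)

(await total)      ; block until both queued actions have run
@total             ; 11
```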

Executing asynchronously with agents
As stated, agents allow you to execute something asynchronously (in a thread pool managed by Clojure) when you don’t need control over exactly when it runs. Every time you (send) to an agent, the function is placed on that agent’s queue; the queueing happens synchronously, on the calling thread, but the execution does not.
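The queue is easiest to see when every action comes from the same thread, because an agent runs actions sent by a single thread in the order they were sent. A small sketch, with a hypothetical agent named log:

```clojure
;; Hypothetical agent holding a vector.
(def log (agent []))

;; All five sends happen on the calling thread, one after another.
(doseq [n (range 5)]
  (send log conj n))

(await log) ; wait for the queue to drain
@log        ; [0 1 2 3 4]
```

Actions sent from different threads are still applied one at a time, but may interleave in any order. Note also that (send) runs actions on a fixed-size thread pool; for actions that may block (sleeping, I/O), Clojure offers send-off as well.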

To demonstrate this asynchronous nature, we can extend our previous example to utilise the (await) function.

Let’s define a new agent, b:
(def b (agent 10000))

whose initial value is 10000.

Now, let’s have a function that will A) utilise b’s current value to do something and B) also change its state.

Using Thread/sleep and the value of b we can satisfy A above and simply use (inc) to change b’s state:
(defn bond [] (send b #(do (Thread/sleep %1) (inc %1))))

So what do we have?

On paper, if we call bond, the action waits for @b milliseconds and then increments b by one. Lovely and simple. Defining the logic above in a function (bond) allows us to easily demonstrate the nature of agents and the (send) function: (send) queues the action-fn (the function we give to apply to the agent) and returns immediately, and the action-fn executes in a separate thread and is applied to the agent.

Let’s prove that (bond) is doing what we expect by calling it:
user=> @b ; prove b's value is as-initialised
10000
user=> (bond) ; invoke bond which will (send) to the agent
#<Agent@351c2555: 10000>
user=> @b ; dereference (access the value of) b to prove it hasn't changed
10000

No change thus far… unless we wait 10 seconds…
user=> @b
10001

Aha! The agent’s value has changed. (bond) has accomplished his mission!

If we want to block until the action-fn has finished (for example, when implementing a fork/join solution), we can simply use (await):
user=> (def b (agent 10000))
#'user/b
user=> (defn bond [] (send b #(do (Thread/sleep %1) (inc %1))))
#'user/bond
user=> (bond)
#<Agent@4382d44b: 10000>
user=> (await b)
; waiting...
nil
user=> @b
10001
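The fork/join idea mentioned earlier could be sketched along the same lines. This is only an illustration under assumptions of my own: the names chunks and workers and the choice of summing 1 to 100 in four parts are made up for the example.

```clojure
;; Hypothetical fork/join sketch: sum 1..100 in four chunks of 25.
(def chunks (partition-all 25 (range 1 101)))

;; Fork: one agent per chunk, each asked to add up its own numbers.
;; send returns the agent, so workers ends up as a vector of agents.
(def workers
  (mapv (fn [chunk] (send (agent 0) #(reduce + % chunk)))
        chunks))

;; Join: await accepts any number of agents and blocks until all are done.
(apply await workers)

(reduce + (map deref workers)) ; 5050
```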