Tag Archives: threads

Clojure’s Concurrency: easy atoms

Clojure’s atoms let one transactionally and atomically store values. When one uses an atom, Clojure manages the mutation of the value and guarantees atomicity. Such a feature is very useful in a highly concurrent application. Much like Java’s Atomic* classes, but somewhat more powerful.

This is a brief introduction.

Atomic Values
To define an atom, one simply invokes the atom function value with an initial value (just like agents; see also agents and futures):
(def a (atom 0)) ; create an atom and let me refer to it by the identifier a

Now a is our reference to an atom which is managed by Clojure.

To get the value held in a, we simply dereference it:
user=> @a ; or (deref a)

Applying state changes to atoms can be achieved in two ways.

Calling swap! on an atom and passing a function will synchronously and atomically compare-and-set the atom’s value. If we wanted to increase the value held in a we’d do the following:
(swap! a inc)

At which point a would now hold 1:
user=> @a

Note: most of the state-mutation functions in Clojure’s concurrency features return the new/current state of the target. For atoms, calling swap! will return the new value should it succeed.

The compare-and-set nature is very useful because agents have another powerful element: validators. When defining an agent, you can optionally pass the :validator key and a function. This function will be used for the compare-and-set “phase”.

Let’s redefine a with our new validator:
(def a (atom 0 :validator even?))

This basically says “let a reference an atom whose initial value is 0 and call the function even? before setting any new values”.

If we then called swap! with the inc function on an initial value of 0, we’d expect it to fail:
user=> (swap! a inc)
IllegalStateException Invalid reference state clojure.lang.ARef.validate (ARef.java:33)

which is awesome, as we can get the atomicity from using atoms but not have code dotted around the place performing pre-requisite checks on new values and all supporting high concurrency.

Another way of mutating an atom is by using the reset! function to assign the atom a new value:
(reset! a 4)

If an atom has a validator assigned, this will still execute when calling reset!:
user=> (reset! a 1)
IllegalStateException Invalid reference state clojure.lang.ARef.validate (ARef.java:33)

Clojure’s Concurrency: Futures and Agents in Harmony

I’ve previously written on the wonders of Clojure’s agents, giving the programmer a wonderfully easy way of writing asynchronous code with very little effort.

Here’s a slightly more complex example for those wanting more context.

Combining Futures and Agents
We’ll use this (deliberately poor) inefficient find-primes in a (future) to allow for asynchronous processing in a seperate thread — which will also write to our agent — and continue with other tasks as necessary.

Futures are Clojure’s way of syphoning off some calculation in the background so you can retrieve it later on. It’s fork/join, but a hell of a lot simpler.

To define a future, all we have to do is assign it to some variable:
(def f (future (some-complex-long-running-function)))

and when we’re ready to get the value from the future, we just dereference it:
user=> @f

If the future is still processing when you dereference, it will block. This isn’t the same as agents which won’t block, but pass you the current value, unless you use (await).

Our inefficient (find-primes) function is a prime (HAH) candidate for asynchronous execution: knowing that it’s slow means we can let it run in the background while we favour other, more pressing tasks in our main thread.

(def f (future (find-primes 60000)))

So f‘s our reference to a new future that is happily running in the background. Calculating all primes up to 60,000 will take a while with the poor (find-primes) implementation (around 14 seconds). Let’s do the user a favour and present the results in a GUI. Here’s the full function:
(defn show-primes [i]
"Find all primes up to i inclusive and present them in a GUI"
(let [fr (JFrame. "Prime Numbers")
lbl (JLabel. (str "Here are all the prime numbers for " i ":"))
ta (JTextArea.)
sp (JScrollPane. ta)
pane (.getContentPane fr)]
(def f (future (find-primes i))) ; asynchronously find the primes while we set up the GUI
(.setPreferredSize lbl (Dimension. 410 20))
(.setPreferredSize sp (Dimension. 410 190))
(.setLineWrap ta true)
(.setSize fr 410 210)
(.add pane lbl BorderLayout/PAGE_START)
(.add pane sp BorderLayout/CENTER)
(.pack fr)
(.setVisible fr true)
(dotimes [n (count @f)] (.setText ta (str (.getText ta) (@f n) "\n")))))

As you can see at the future line, we let Clojure asynchronously execute the prime generation function while we set the GUI, then we add to the text area by derefencing the future — which will block if its work isn’t complete — and finally present the results.

It’s lovely: with one function we can fork calculation and get on with something else, retrieving the results at a later stage. Simple, easy.

00 (agent) – Clojure’s Concurrent Agents in action

or Clojure’s concurrency agents – a simple example
Clojure has some fantastic concurrency idioms. Agents are but one example of what’s available to the JVM lisper — alongside refs, atoms and vars — that make concurrent programming very easy.

I thought I’d articulate my understanding to help embed the knowledge but to also provide a simple example for newcomers coming to Clojure.

Clojure’s agent constructs are its way of providing asynchronous read/writes that guarantee visibility across threads but give you the option of not “caring” when those events occur. Put simply, agents are a wrapper for values that can be written/read by multiple “clients” with no guarantee of order. An example would be a fork/join solution: you divide the problem into “bitesize” parts that can then be combined at a common barrier point for combination and utilisation elsewhere.

Here’s how you define an agent:
(def a (agent 0)) ; define an agent with an initial value of 0

To read an agent’s value, we just dereference it:
(println @a) ; or (deref a)

To write, you send a function to apply to the value in the agent, like so:
(send a inc) ; apply inc to a

which results in increment of a’s value, to 1.

Clojure’s documentation calls the “inc” in the (send) call above the “action-fn”:
user=> (doc send)
([a f & args])
Dispatch an action to an agent. Returns the agent immediately.
Subsequently, in a thread from a thread pool, the state of the agent
will be set to the value of:

(apply action-fn state-of-agent args)

Executing asynchronously with agents
As stated, agents allow you to execute something asynchronously (in a thread pool managed by Clojure) whose execution order you don’t require control over. Every time you (send) to an agent you can effectively assume the function to apply will be executed as part of a queue (and such queueing is done so synchronously, though the execution is not).

To demonstrate this asynchronous nature, we can extend our previous example to utilise the (await) function.

Let’s define a new agent, b:
(def b (agent 10000))

whose initial value is 10000.

Now, let’s have a function that will A) utilise b‘s current value to do something and B) also change its state.

Using Thread/sleep and the value of b we can satisfy A above and simply use (inc) to change b‘s state:
(defn bond [] (send b #(do (Thread/sleep %1) (inc %1))))

So what do we have?

On paper, if we call bond, we’d wait for @b milliseconds and then increment b by one. Lovely and simple. Defining the logic above in a function (bond) allows us to easily demonstrate the nature of agents and the (send) function: that (send) will queue the action-fn (the function we give to apply to the agent) and return immediately, and that the action-fn will execute in a seperate thread and be applied to the agent.

Let’s prove that (bond) is doing what we expect by calling it:
user=> @b ; prove b's value is as-initialised
user=> (bond) ; invoke bond which will (send) to the agent
#<Agent@351c2555: 10000>
user=> @b ; derefence (access the value of) b to prove it hasn't changed

No change thus far…Lest we wait 10 seconds…
user=> @b

Aha! The agent’s value has changed. (bond) has accomplished his mission!

If we wanted to wait until the action-fn had finished — e.g. you’re implementing a fork/join solution — we can simply use (await):
user=> (def b (agent 10000))
user=> (defn bond [] (send b #(do (Thread/sleep %1) (inc %1))))
user=> (bond)
#<Agent@4382d44b: 10000>
user=> (await b)
; waiting...
user=> @b