Monthly Archives: July 2012

Clojure’s Concurrency: Futures and Agents in Harmony

I’ve previously written on the wonders of Clojure’s agents, giving the programmer a wonderfully easy way of writing asynchronous code with very little effort.

Here’s a slightly more complex example for those wanting more context.

Combining Futures and Agents
We’ll use this (deliberately poor) inefficient find-primes in a (future) to allow for asynchronous processing in a seperate thread — which will also write to our agent — and continue with other tasks as necessary.

Futures are Clojure’s way of syphoning off some calculation in the background so you can retrieve it later on. It’s fork/join, but a hell of a lot simpler.

To define a future, all we have to do is assign it to some variable:
(def f (future (some-complex-long-running-function)))

and when we’re ready to get the value from the future, we just dereference it:
user=> @f
1

If the future is still processing when you dereference, it will block. This isn’t the same as agents which won’t block, but pass you the current value, unless you use (await).

Our inefficient (find-primes) function is a prime (HAH) candidate for asynchronous execution: knowing that it’s slow means we can let it run in the background while we favour other, more pressing tasks in our main thread.

(def f (future (find-primes 60000)))

So f‘s our reference to a new future that is happily running in the background. Calculating all primes up to 60,000 will take a while with the poor (find-primes) implementation (around 14 seconds). Let’s do the user a favour and present the results in a GUI. Here’s the full function:
(defn show-primes [i]
"Find all primes up to i inclusive and present them in a GUI"
(let [fr (JFrame. "Prime Numbers")
lbl (JLabel. (str "Here are all the prime numbers for " i ":"))
ta (JTextArea.)
sp (JScrollPane. ta)
pane (.getContentPane fr)]
(def f (future (find-primes i))) ; asynchronously find the primes while we set up the GUI
(.setPreferredSize lbl (Dimension. 410 20))
(.setPreferredSize sp (Dimension. 410 190))
(.setLineWrap ta true)
(.setSize fr 410 210)
(.add pane lbl BorderLayout/PAGE_START)
(.add pane sp BorderLayout/CENTER)
(.pack fr)
(.setVisible fr true)
(dotimes [n (count @f)] (.setText ta (str (.getText ta) (@f n) "\n")))))

As you can see at the future line, we let Clojure asynchronously execute the prime generation function while we set the GUI, then we add to the text area by derefencing the future — which will block if its work isn’t complete — and finally present the results.

It’s lovely: with one function we can fork calculation and get on with something else, retrieving the results at a later stage. Simple, easy.

00 (agent) – Clojure’s Concurrent Agents in action

or Clojure’s concurrency agents – a simple example
Clojure has some fantastic concurrency idioms. Agents are but one example of what’s available to the JVM lisper — alongside refs, atoms and vars — that make concurrent programming very easy.

I thought I’d articulate my understanding to help embed the knowledge but to also provide a simple example for newcomers coming to Clojure.

Agents
Clojure’s agent constructs are its way of providing asynchronous read/writes that guarantee visibility across threads but give you the option of not “caring” when those events occur. Put simply, agents are a wrapper for values that can be written/read by multiple “clients” with no guarantee of order. An example would be a fork/join solution: you divide the problem into “bitesize” parts that can then be combined at a common barrier point for combination and utilisation elsewhere.

Here’s how you define an agent:
(def a (agent 0)) ; define an agent with an initial value of 0

To read an agent’s value, we just dereference it:
(println @a) ; or (deref a)

To write, you send a function to apply to the value in the agent, like so:
(send a inc) ; apply inc to a

which results in increment of a’s value, to 1.

Clojure’s documentation calls the “inc” in the (send) call above the “action-fn”:
user=> (doc send)
-------------------------
clojure.core/send
([a f & args])
Dispatch an action to an agent. Returns the agent immediately.
Subsequently, in a thread from a thread pool, the state of the agent
will be set to the value of:

(apply action-fn state-of-agent args)

Executing asynchronously with agents
As stated, agents allow you to execute something asynchronously (in a thread pool managed by Clojure) whose execution order you don’t require control over. Every time you (send) to an agent you can effectively assume the function to apply will be executed as part of a queue (and such queueing is done so synchronously, though the execution is not).

To demonstrate this asynchronous nature, we can extend our previous example to utilise the (await) function.

Let’s define a new agent, b:
(def b (agent 10000))

whose initial value is 10000.

Now, let’s have a function that will A) utilise b‘s current value to do something and B) also change its state.

Using Thread/sleep and the value of b we can satisfy A above and simply use (inc) to change b‘s state:
(defn bond [] (send b #(do (Thread/sleep %1) (inc %1))))

So what do we have?

On paper, if we call bond, we’d wait for @b milliseconds and then increment b by one. Lovely and simple. Defining the logic above in a function (bond) allows us to easily demonstrate the nature of agents and the (send) function: that (send) will queue the action-fn (the function we give to apply to the agent) and return immediately, and that the action-fn will execute in a seperate thread and be applied to the agent.

Let’s prove that (bond) is doing what we expect by calling it:
user=> @b ; prove b's value is as-initialised
10000
user=> (bond) ; invoke bond which will (send) to the agent
#<Agent@351c2555: 10000>
user=> @b ; derefence (access the value of) b to prove it hasn't changed
10000

No change thus far…Lest we wait 10 seconds…
user=> @b
10001

Aha! The agent’s value has changed. (bond) has accomplished his mission!

If we wanted to wait until the action-fn had finished — e.g. you’re implementing a fork/join solution — we can simply use (await):
user=> (def b (agent 10000))
#'user/b
user=> (defn bond [] (send b #(do (Thread/sleep %1) (inc %1))))
#'user/bond
user=> (bond)
#<Agent@4382d44b: 10000>
user=> (await b)
; waiting...
nil
user=> @b
10001