This is the workshop I presented at Rubyfuza 2015. I had some requests to put it online, so here it is. It has exercises so you should be able to work through it.
Please let me know if you find errors.
Requirements
a computer with a network interface, and preferably at least one other host on the same subnet.
Ruby >= 2.1 installed (2.0 will probably work too. 1.9 doesn’t)
pry (or irb, but not recommended), preferably with a working readline/editline.
some ruby experience
Introduction
Dataflow concurrency is a simple but powerful idea. It rests on single-assignment variables (sometimes called immutable values) with the addition of blocking semantics, in other words any thread attempting to use an unbound variable will wait until another thread assigns a value to it.
Dataflow variables fall in the same category as I-Vars, Futures and Promises.
We’ll look at threads (covering some little-known things about the Thread class). Then we’ll see how to use delegators to make dataflow pleasant to use. Then we will implement a single-value, single-assignment data store with blocking semantics in Ruby (these are sometimes called I-Vars or Promises). This will require a mutex and a condition variable, which will also be explained.
Loosely, concurrency means more than one thing happening at the same time. Like in real life. Everybody runs around doing their own thing, and fights erupt whenever we have to share. I know, because I have two sons and only one big tub of lego.
My understanding of dataflow comes primarily from “Concepts, Techniques, and Models of Computer Programming” (CTM) by Van Roy and Haridi. Very worthwhile book, I do recommend it. There’s also an edx course.
When you see EXERCISE, it would be best if you try to figure out the answer for yourself, although it’s a line or two further down anyway. Be brave! Give it a try!
Threads Introduction
Any discussion of threads in Ruby has to mention the GVL/GIL. MRI has it, jruby and rubinius do not. Effectively it means that concurrency in MRI is limited in a particular way. I’ll defer the full explanation until mutexes later on.
The first thing you need to know about threads is that every running program is/has at least one.
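Something like this (a minimal sketch):

```ruby
Thread.current                 # the thread these lines are running in
Thread.main                    # the first thread the process started with
Thread.current == Thread.main  # => true when typed directly into pry
```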
EXERCISE: start up pry, type that in.
A Thread instance is a wrapper around an OS resource / entity. This will show you all live threads.
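```ruby
Thread.list # => [#<Thread:0x... run>] - just the main thread, so far
```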
Thread instances outlive their related OS resource. You’ll see why later on.
Threads
So, coding time. Pretend that you’re stranded somewhere on a distant server within a heavily fortified private network, with no root access to install new packages. You have only ruby (but you can’t install gems), and a minimal set of system tools. There is another server – you have to talk to it. However, its ip address is from dhcp, and there’s no dns on this network. DHCP reassigns the ip address fairly frequently. You need to use ping to find the missing server. Most of the ip addresses on this private network are not in use. Sometimes the server is down for an unspecified period.
If you haven’t met ping, in bash say ping localhost and see what happens. Ctrl-C to make it stop. ping -c1 localhost will only send and wait for one packet.
In ruby, backticks or %x{} will execute a system command, returning a string containing stdout from that process. You can do string interpolation inside of those. The return code from the command will be in $? until you execute another command.
This is how to find your ip and subnet in ruby.
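One possible version (a sketch that assumes a Linux host where ip addr is available, and a /24 subnet, so that ‘the subnet’ is just the first three octets):

```ruby
output = %x{ip addr}

# every IPv4 address on this host
addrs = output.scan(/inet (\d+\.\d+\.\d+\.\d+)/).flatten

# ignore loopback, take the first real address
ip = addrs.find { |a| !a.start_with?('127.') }

# assuming a /24, the subnet is just the first three octets
subnet = ip[/\d+\.\d+\.\d+/]

ip     # => "192.168.0.23" or similar
subnet # => "192.168.0"
```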
EXERCISE: Find the addresses on your subnet that respond to pings and put them in an array called results. There is actually a problem you’re going to run into once you get the code working. If you know what the problem is already, just write the code anyway. Ctrl-C will be your friend.
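One possible answer (a sketch):

```ruby
results = []
(1..254).each do |i|
  ip = "#{subnet}.#{i}"
  %x{ping -c1 #{ip}}            # $? holds the exit status of the last command
  results << ip if $?.success?  # ping exits 0 when it got a reply
end
```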
BONUS CHALLENGE: This is purely optional – if you feel like stretching your regexes, results should also contain the response time. As a float.
How long does that take? About 254 * 4 = 1016 seconds, more or less 17 minutes. Which is a long time to find a handful of ip addresses. Effectively, it’s doing this:
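In effect (a sketch):

```ruby
# each unanswered ping waits its full timeout before the next one starts
sleep(4); sleep(4); sleep(4) # ...and so on, 254 times over
```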
So what happens when you do this:
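Something along these lines (a sketch):

```ruby
# join waits for a thread to finish - it's explained properly below
4.times.map { Thread.new { sleep 4 } }.each { |thread| thread.join }
```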
It takes about 4 seconds. How long does it take to boil an egg? 3 minutes. How long does it take to boil 4 eggs? Not 12 minutes.
EXERCISE: Now do the ping code with threads.
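One possible answer (a sketch):

```ruby
results = []
(1..254).each do |i|
  Thread.new do
    ip = "#{subnet}.#{i}"
    %x{ping -c1 #{ip}}           # $? is thread-local, so this is safe per-thread
    results << ip if $?.success?
  end
end
```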
Nondeterminism
You’ll notice that the ip addresses don’t come back in order. Why is that?
ANSWERS:
1) Wait time for reply vs non-reply. The ips that are claimed by a host will reply to the ping quickly and will therefore be at the beginning of results. ip addresses that are not claimed will be subject to a wait of about 4 seconds until ping decides it won’t get a reply.
2) Nondeterminism. CTM explains nondeterminism as something that happens which is outside of the control of the programmer. In this case, the thread scheduler is making decisions that the programmer (that’s you and me) has no control over. We also have no control over how long it takes for a server to answer a ping. Note that these ips may be in order anyway – this would be an example of multithreaded code being correct by accident.
Nondeterminism applies even in a really simple case like:
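For instance (a sketch – which line prints first?):

```ruby
Thread.new { puts 'from the thread' }
puts 'from the main thread'
```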
We don’t know. It’s nondeterministic. Try it out a few times in pry. It won’t necessarily be different every time.
Nondeterminism is mostly harmless in this case because it’s easy to restore the lost information (the ordering). But there’s a case where it’s a big problem. First the deterministic program:
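A sketch:

```ruby
a = nil
a = 1 unless a # afterwards, a is always 1
```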
And now the nondeterminism from threads:
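A sketch:

```ruby
Thread.new { a = 1 unless a }
Thread.new { a = 2 unless a } # both threads can test a before either one sets it
```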
This is called a race condition. Which is badly named (because it actually originates in logic circuit design). It’s more like an accident at an intersection because none of the cars was willing to wait. Well, I spose you could see that as a race…
QUESTIONS:
1) Why is a nil when you execute this in pry?
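A sketch, typed as one line:

```ruby
a = nil; Thread.new { a = 1 unless a }; Thread.new { a = 2 unless a }; a
```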
2) Why is results nil, or mostly empty when you execute the following?
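A sketch, typed as one line:

```ruby
results = []; (1..254).each { |i| Thread.new { %x{ping -c1 #{subnet}.#{i}}; results << "#{subnet}.#{i}" if $?.success? } }; results
```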
ANSWERS:
1) the threads testing and setting a have not yet executed
2) threads shelling out to ping have not yet put values in results.
Synchronisation
So the existence of values in results depends on when you ask for them. And a partial set of results is not really useful, right? This is one of those basic logic situations: you know when you’ve found something, but how do you know when you haven’t found it? You have to search everything. In this case we’re lucky because ‘everything’ is a relatively small array. So how do we ensure that we have searched everything, before concluding that the server is down?
Think about fetching your kid from school (or being fetched when you were a kid). What happens when class finishes a few minutes early, or you arrive a few minutes late, or a few minutes early? Or your child is negotiating a play date with friends? Or there was a traffic jam on the way to school? How do we handle those situations? We wait. Hopefully we’re not waiting in different places…
The traditional way is Thread#join, and I’m going to show you that first, even though it’s a bit clunky.
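A sketch:

```ruby
t = Thread.new { sleep 4; puts 'thread finished' }
t.join # the main thread waits here, about 4 seconds
puts 'main thread continues'
```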
In other words, we’ve brought determinism back, by waiting. The one thread waits for the other thread to join it. The two control flows join together back into one (the one calling .join).
EXERCISE: do that in the ping exercise so that by the time we ask for the results, they’re guaranteed to all be there. Be careful: if you bring determinism back too early you’ll be waiting 17 minutes for your results. Remember that join is a method that you call on the thread instance that you want to wait for. One possible answer (a sketch) follows.
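```ruby
results = []
threads = (1..254).map do |i|
  Thread.new do
    %x{ping -c1 #{subnet}.#{i}}
    results << "#{subnet}.#{i}" if $?.success?
  end
end
threads.each { |t| t.join } # joining inside the first loop would serialise the pings
results
```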
Exceptions in threads
An important question to ask at this point is: what happens when an exception is thrown in the thread block?
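A sketch:

```ruby
Thread.new { raise 'Boom' }
```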
Nothing spectacularly continues to happen.
But with this:
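```ruby
Thread.new { raise 'Boom' }.join # RuntimeError: Boom shows up here, in the joining thread
```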
the exception shows up in the thread calling join (after the thread being waited for has raised the exception, of course).
EXERCISE: Now do a deliberate typo and get exceptions for using pong instead of ping.
ANSWER: Really?
In other words, exceptions do not come out of threads unless you wait for them. This makes sense – in which thread, and when, would the exception be raised? It would be extremely weird if at some nondeterministic time your main thread stopped because of an exception thrown by another thread. So unless you really don’t care what happens to a thread, somewhere you have to wait for it to finish. (Or you have to set abort_on_exception)
So I said I was showing you the traditional way. Well, the exception showing up when you call join is already somewhat non-traditional. The nice way of doing synchronisation uses the ruby idea that everything is an expression, even a thread. You use Thread#value:
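```ruby
Thread.new { 1 + 1 }.value # => 2, waiting for the thread to finish if necessary
```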
Notice: 1) we don’t need a variable anymore. 2) Why bother with Thread#join then? join returns the thread instance, and takes an optional timeout parameter.
EXERCISE: Go over the ping code and use .value to get rid of unnecessary variables.
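One possible answer (a sketch):

```ruby
results = (1..254).map do |i|
  Thread.new do
    %x{ping -c1 #{subnet}.#{i}}
    "#{subnet}.#{i}" if $?.success?
  end
end.map(&:value).compact # value waits; compact drops the nils from non-replies
```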
.value and .join are the simplest forms of synchronisation.
Synchronisation is also badly named (maybe also from circuit design where you have a clock signal?) In programming it makes more sense to think of synchronisation as waiting for something. And you can see how this applies to asynchronous operations – no thread waits. However the logical implication of asynchronous operations is that something has to busy-wait, or do a polling loop, or have some other mechanism to get the value later.
Congratulations. You’ve written a correct program, using threads. All we’ve done so far is learn how to wait. Waiting is also called blocking, or suspending. Here’s a first description of some other ways to wait. Just read over them for now and don’t worry if you feel you don’t understand things fully:
Other ways to synchronise
mutex (mutual exclusion) forces other threads to wait. The Mutex class unsurprisingly has a method called synchronize.
condition variable is a way to avoid busy-waiting. It allows a thread to go to sleep and be woken up by a signal. It’s always used in conjunction with a mutex.
monitor is a re-entrant mutex that’s shared across multiple methods, often in an object, eg the synchronized keyword in Java. There is a Monitor class in ruby, and it’s useful for ADT-style objects, ie stacks, lists, things like that where you have multiple ways of accessing shared state that must remain consistent, ie not have race conditions. But it’s quite coarse-grained, because it basically locks the whole object.
Order of execution
We all know that a program is essentially a sequence of statements. What I’d like to talk about now is that threads allow parts of a program to execute in a different order than the sequence of statements in the code. In fact we saw this already with race conditions, but I want to show you the useful case.
Think about the ping example: first you need to get your ip and subnet, and then you can spawn 254 threads to find possible servers in that subnet. In this case you have 2 dependencies – the ip and the subnet.
What happens if you have more dependencies? Then you have a dependency tree (a dependency sapling at this point, because it’s so small). Imagine you have a broken bash shell and you have to find the full path of the ping command, as well as the ip and subnet. The ip address and the ping path have no dependencies on one another, so they could execute in separate threads. In sequential code it doesn’t matter which order you do them in. So in concurrent code you might as well just do them at the same time.
From this point of view, code is a linear representation of the tree of dependencies for an expression.
EXERCISE: find the full path of the ping command in a separate thread from the ip address and the concurrent pings. ENV['PATH'] is :-separated.
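One possible answer (a sketch, again assuming ip addr and a /24 subnet):

```ruby
ping_path = Thread.new do
  ENV['PATH'].split(':')
    .map { |dir| File.join(dir, 'ping') }
    .find { |path| File.executable?(path) }
end

subnet = Thread.new do
  addrs = %x{ip addr}.scan(/inet (\d+\.\d+\.\d+\.\d+)/).flatten
  ip = addrs.find { |a| !a.start_with?('127.') }
  ip[/\d+\.\d+\.\d+/]
end

# .value synchronises: the pings can only start once both dependencies are ready
ping, net = ping_path.value, subnet.value

results = (1..254).map do |i|
  Thread.new do
    %x{#{ping} -c1 #{net}.#{i}}
    "#{net}.#{i}" if $?.success?
  end
end.map(&:value).compact
```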
BONUS EXERCISE: search the path concurrently. Is it faster than the equivalent single-threaded code? Why?
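One possible answer (a sketch, as one line):

```ruby
ping_path = ENV['PATH'].split(':').map { |dir| Thread.new { path = File.join(dir, 'ping'); path if File.executable?(path) } }.map(&:value).compact.first
```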
Obviously in the code, you have to have path-search before subnet, or vice-versa. But when those blocks execute, that can be concurrent. So can you see how threads decouple execution order from statement order? There’s a bit of wriggle room (or maybe a lot) because of nondeterminism. But synchronisation enforces the ordering of the dependencies of an expression. In single-threaded code, this ordering is already implicit (dependencies always come before the statement which needs them) and we usually don’t think about it.
EXERCISE: Draw yourself a picture of the dependency tree of the ping code. Then turn it upside down a few times and contemplate the dataflow ;–)
Termination and Deadlock
Whenever you change your assumptions, there will be consequences. So we need to look at some of the consequences of using threads to decouple order of execution from sequence of statements. We’ve seen one of the classic problems with threading – race conditions, which are a form of nondeterminism. We’re going to briefly look at the other, which is deadlock. But let’s start with termination.
We’ve all done this involuntarily at least once:
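```ruby
loop { }
```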
Fondly known as an “endless loop”.
Will this next one terminate? Execute it in pry and look at top and sort by CPU usage (and then kill the thread or exit pry)
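A sketch:

```ruby
Thread.new { loop { } } # pry comes straight back, but top shows a busy core
```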
Will this one ever give back a value?
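```ruby
Thread.new { loop { } }.value
```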
No – the thread never terminates, and so things waiting for the thread to end will wait forever. So it’s kinda ½ a deadlock.
The easiest way to explain full deadlock is with Queue, which is actually really useful in several other ways too – you can think of it as ruby’s sortof-equivalent of channels in Go.
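A sketch:

```ruby
require 'thread' # where Queue lives, though pry has usually loaded it already

q = Queue.new
Thread.new { sleep 2; q.push 42 }
q.pop # waits about 2 seconds, then => 42
q.push 43
q.pop # => 43, immediately
```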
So that’s how a queue works. q.pop will return a value from the queue if there is one, otherwise it will wait for a value to be pushed. That will hopefully be fairly intuitive for you by now.
But back to deadlock:
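A sketch:

```ruby
q1 = Queue.new
q2 = Queue.new
Thread.new { q2.push(q1.pop) } # waits for q1 before it can feed q2
q1.push(q2.pop)                # waits for q2, which is waiting for us: mutual waiting
```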
Lucky for us, ruby is smart enough to detect this is possibly a deadlock. But lucky for me, ruby also lets me ignore that warning, so I’ll do it again:
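One way to do that (a sketch – the detector only fires when every live thread is blocked forever, so a thread that wakes up periodically keeps it quiet):

```ruby
Thread.new { loop { sleep 1 } }; q1.push(q2.pop)
```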
Will that ever terminate? Why?
Deadlock occurs whenever you have mutual waiting. Which generalises to a cycle, hence the dining philosophers problem. Such cycles can sometimes be hard to figure out, like when one of the waiting places is not in your code, or when the waiting places are in completely different parts of the codebase.
We’ve seen that the ping code can be non-terminating (when you don’t use -c1). Is there a way to make the ping code deadlock? Why?
You remember I was talking about dependency trees in expressions?
A useful property of a dependency tree is that a tree (by definition in graph theory) has no cycles. So tree-structured concurrency means that deadlock because of mutual waiting cannot happen. Of course it’s still possible for one of the subexpressions in that tree to call some external code which waits forever (eg leaving off -c1 from ping), or other code which deadlocks because of access to shared values. So we don’t magically get deadlock breaking or deadlock avoidance. But it does make a whole class of deadlock-creating situations disappear. And it greatly reduces contention for locks.
Threads conclusion
We’re mostly done with threads, so here’s a summary:
programs can do (or wait for in the GVL/GIL case) more than one thing at the same time. This is concurrency.
concurrency introduces nondeterminism, because threads allow the order of execution to be decoupled from the sequence of statements. Uncontrolled nondeterminism can result in race conditions.
Since we can’t rely on the sequence of statements to know when results are ready (which is what we do in single-threaded code), we need synchronisation (ie waiting) to enforce the order of dependencies.
cyclical mutual waiting is deadlock, which is also nondeterministic. It does not happen every time you run the code. Which is why it’s hard to debug.
Effectively, Thread#value makes a thread behave as an explicit future. It doesn’t have a value now, but it will, in the future. Why explicit future? Because you have to ask for it, by calling .value. Calling .value will wait until there is a value to return. So futures are: 1) a way of synchronising concurrent operations, and 2) a way of expressing the idea of a thing that does not have a value yet. And not only does a thread instance behave as a future, it also behaves as a single-valued future. It can only ever have one value. So it’s immutable.
Dataflow
So I said earlier that I would cover threads first so that I could explain dataflow. Well, we’ve actually already seen quite a bit of dataflow. Dataflow is effectively blocking semantics for things that don’t have a value yet. It’s called dataflow because as those things change state from unvalued to valued, the data flows through the dependency tree. So dataflow is a weak form of state (that’s straight out of CTM).
We’ve seen how threads decouple the order of execution from the sequence of statements in the code. Dataflow allows the order of execution to be changed without affecting the result of a calculation. It’s deterministic. You can think about the code as if it were sequential, but use threads wherever you want, to make it concurrent. Well, as long as there aren’t other race conditions (ie non-atomic shared writable state). Which is all over the place in your average ruby program. So you have to be careful.
Delegators
But the dataflow we’ve used so far is quite far from that ideal, so let’s bring it a bit closer. The ping code, again. That .value everywhere? Ewww. Say for example you have a complex calculation of some kind (in other words a large dependency tree, like an income tax calculation, or an invoice including discounts and VAT and partial payments and forex), and you’d like to retrofit it with dataflow concurrency without having to go around and add .value all over the place. We’d like to just pass in something that will let it continue operating as before, except concurrently. In short, how do we get rid of .value?
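The stdlib’s delegate library is the tool here. A quick sketch of how a delegator behaves, using SimpleDelegator:

```ruby
require 'delegate'

dog = SimpleDelegator.new('woof')
dog.upcase     # => "WOOF" - method calls are forwarded to the wrapped object
dog.length     # => 4
dog.__getobj__ # => "woof", the wrapped object itself
```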
EXERCISE: Create a class called Waiter, which inherits from Delegator. You have to implement the __getobj__ and __setobj__ methods. __setobj__ should raise an exception (Why?).
BONUS EXERCISE: Waiter#initialize can optionally take a block which gets passed to a new thread instance, or it can take an existing thread instance.
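One possible implementation (a sketch; the inspect and pretty_print overrides are there so that pry can display a Waiter without waiting for its value):

```ruby
require 'delegate'

class Waiter < Delegator
  # BONUS: accept either an existing thread, or a block for a new thread
  def initialize(thread = nil, &blk)
    @thread = thread || Thread.new(&blk)
  end

  # Delegator sends every forwarded method call to this object.
  # Thread#value waits, so every method call waits for the value.
  def __getobj__
    @thread.value
  end

  # Single-assignment: the value can only come from the thread,
  # so reassignment must be an error.
  def __setobj__(obj)
    raise "Waiter is single-assignment"
  end

  # Don't make the console wait just to display us.
  def inspect
    @thread.alive? ? "#<Waiter waiting...>" : "#<Waiter #{__getobj__.inspect}>"
  end

  def pretty_print(pp)
    pp.text inspect
  end
end
```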
What happens if you comment out inspect and pretty_print?
EXERCISE: now redo the full ping example with Waiter.
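One possible answer (a sketch):

```ruby
subnet = Waiter.new do
  addrs = %x{ip addr}.scan(/inet (\d+\.\d+\.\d+\.\d+)/).flatten
  ip = addrs.find { |a| !a.start_with?('127.') }
  ip[/\d+\.\d+\.\d+/]
end

ping = Waiter.new do
  ENV['PATH'].split(':')
    .map { |dir| File.join(dir, 'ping') }
    .find { |path| File.executable?(path) }
end

results = Waiter.new do
  (1..254).map do |i|
    # interpolating ping and subnet waits for their values - no .value in sight
    Thread.new { %x{#{ping} -c1 #{subnet}.#{i}}; "#{subnet}.#{i}" if $?.success? }
  end.map(&:value).compact
end
```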
QUESTION: What happens when you replace all Waiter.new do with begin? Things should work exactly the same (obviously without concurrency).
And now we have implicit (because we don’t have to call .value) immutable futures with blocking semantics. That is, they behave just like ordinary old variables or objects, except that any method call will wait until there is a value inside the delegator. Syntactically, delegators are as close as we can get to proper dataflow variables in ruby. However, Ruby works against the idea because it lets us reassign the variable referencing the delegator at any time. So you have to be careful, or refactor to use method calls on frozen objects instead of local variables.
Unbound Variables
The other place that ruby works against dataflow is the idea of unbound or unassigned variables. What’s the difference between declaring a variable, and assigning it?
You can declare a variable without assignment/binding in (amongst others) Java, Scala, C, C++, and JavaScript.
And ruby? Local variables? No. Instance variables and globals? Kindof – they’re automatically initialised to nil.
So we don’t have unbound variables in ruby. But as we’ve just seen, threads behave a little bit like a thing that does not yet have a value, and we can make it behave almost exactly the same as an ordinary variable/object by putting a Waiter delegate around it.
Futures / Promises
However, the one thing you can’t do with our thread + delegator approach is declare one of these in one place, and assign it in another. You need to know, at the time you create the thread and its delegator, how to calculate the value. This is not always the case. A not-so-good example is: how would you handle a subnet with 65536 ips? Or 16777216 ips (the 10.x.x.x subnet)? Too many threads. So you need to be able to declare the variables first (to keep the ordering) and assign them later.
So we need something that we can wrap in a delegator, and that wrapped thing must have blocking semantics with single-assignment.
These things, where you can actually set the value instead of it being implicit as part of the creation the way it is with Thread and Waiter, are sometimes called promises (as opposed to futures), but from what I can see there’s a lot of overlap in the terminology.
SingleAssign implementation
So essentially we need to implement a single-assignment value store with blocking semantics, to take the place of the Thread inside the delegator. That means:
it can be read by as many threads as necessary, as many times as we like. Can Waiter/Thread do this? Yes.
reading threads will block/wait/suspend until it has a value. Can Waiter/Thread do this? Yes.
the value can be set exactly once. Subsequent attempts should raise an exception. Waiter/Thread cannot do this.
you can set an exception to be raised when the delegator is accessed, like with Thread#value.
A minimal implementation of this needs a value and a value= method. But it must be completely threadsafe.
Do you remember from the different ways of waiting, what we can use here? We have to use a mutex and a condition variable. A condition variable requires a mutex, so let’s do that first.
Mutex
A mutex (mutual exclusion) is a bit like a cubicle in a public toilet or an airplane. Only one person gets to use the cubicle at one time. Only one thread gets to enter (or occupy) the mutex at any one time. While there is a thread in the mutex, all other threads must wait until the occupying thread exits the mutex. A mutex is also called a lock, although sometimes lock is the name of an operation on a mutex. The GIL/GVL is a mutex around the entire C implementation of MRI, with some unlocks for waiting on IO and other things so that those don’t block threads which can perform useful work.
Mutexes can be used to prevent race conditions. This is the one from earlier:
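```ruby
a = nil; Thread.new { a = 1 unless a }; Thread.new { a = 2 unless a }
```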
EXERCISE: use a mutex to prevent the race condition.
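One possible answer (a sketch):

```ruby
a = nil
m = Mutex.new
Thread.new { m.synchronize { a = 1 unless a } } # only one thread at a time
Thread.new { m.synchronize { a = 2 unless a } } # can test-and-set a
```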
So a mutex groups a sequence of statements together and makes them atomic; in other words, nobody else can use the cubicle while you’re busy. It’s still nondeterministic because we still don’t know what value a will have, but at least now it’s atomic. So actually, instead of a ‘race condition’, calling it an ‘atomic failure’ would be more accurate. That way, when someone asks why it’s taking so long, you can say you’re cleaning up the fallout from your atomic failure.
In the context of a SingleAssign store, mutex lets us ensure that the value is only set once. Now, for threads wanting the value when it’s not available yet, we need a way to put threads to sleep and wake them up again. For this we need a ConditionVariable.
Condition Variable
Condition variable is also badly named. It’s easier to think of it as a fifo list of threads (I’m deliberately not saying queue, because most implementations don’t use an actual queue, they just behave as if they do). And the condition part is actually outside the actual ConditionVariable instance. So now it’s badly named twice. It should have been called a WaitList. “Unfortunately sah, we’re full, but we’ll notify you if we have a cancellation.”
EXERCISE: I can’t come up with a good exercise here, so take a look at the documentation for ConditionVariable, and think about which methods should be used where.
This is a simple implementation that satisfies the requirements above. It’s less efficient than it could be, but it’s deterministic and it’s atomic, and it only allows one assignment to value.
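One possible implementation (a sketch):

```ruby
require 'thread' # for ConditionVariable on older rubies

class SingleAssign
  def initialize
    @cubicle  = Mutex.new              # protects @value and @assigned
    @waitlist = ConditionVariable.new  # threads sleeping until a value arrives
    @assigned = false
  end

  def value
    @cubicle.synchronize do
      # wait releases the mutex while sleeping, and re-acquires it on wakeup.
      # The until loop guards against spurious wakeups.
      @waitlist.wait(@cubicle) until @assigned
      raise @value if @value.is_a?(Exception)
      @value
    end
  end

  def value=(obj)
    @cubicle.synchronize do
      raise "SingleAssign can only be assigned once" if @assigned
      @value = obj
      @assigned = true
      @waitlist.broadcast # wake up all the waiting threads
    end
  end
end
```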
It’s somewhat inefficient because once the value has been set, the call to synchronize and the check for is_a?(Exception) are no longer necessary. Fortunately, because of the single-assignment assumption, it’s fairly easy to optimise if necessary. Interestingly, because of the single-assignment assumption, we can mostly ignore the ‘condition’ part of ConditionVariable (and some of its other complexities).
QUESTIONS:
1) Can @waitlist and @cubicle be lazy-assigned? Why?
2) How many threads could concurrently access instance variables inside initialize?
3) What options are there for making #value more efficient?
ANSWERS:
1) @cubicle no, because you would get a race condition assigning it. @waitlist, well, you could. But it probably wouldn’t be worthwhile.
2) initialize is always single-threaded – for your code. The GC thread(s) would have access, but presumably that’s threadsafe.
3) Options I can think of: different modules that implement value and value= (ie the State pattern).
EXERCISE: A Delegator for this is almost the same as Waiter, but obviously you’ll need to implement __setobj__ as well.
Waitron
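A sketch:

```ruby
class Waitron < Delegator
  def initialize;      @store = SingleAssign.new end
  def __getobj__;      @store.value              end
  def __setobj__(obj); @store.value = obj        end
end
```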
And now (at least in ruby) the ultimate in messing with order of execution. Obviously this won’t work:
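```ruby
a = Waitron.new
a + 1 # blocks forever: nothing else will ever give a a value
```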
But this will:
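A sketch:

```ruby
a = Waitron.new
b = Waitron.new

Thread.new { b.__setobj__(a + 1) }        # uses a before a has a value, so it waits
printer = Thread.new { puts "b is #{b}" } # uses b before b has a value, so it waits

a.__setobj__(1) # now the data flows: a becomes 1, b becomes 2, and "b is 2" prints
printer.join
```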
Conclusion
Concurrency is when several things are happening at the same time. Threads decouple the order of execution from the sequence of statements. Synchronisation is waiting for various events.
Concurrency always brings the overhead of thread creation and synchronisation, so where there is nothing to be gained by doing things at the same time, the concurrent version will execute more slowly than its single-threaded equivalent. Your performance improvement from concurrency needs to more than offset that overhead.
Dataflow variables are things which can be initially unbound, and then they can be assigned only once. Threads that want their value will have to wait, until the value is assigned. Obviously this only makes sense in a threaded environment. Strictly speaking, dataflow variables are reassignable provided that the new assignment is compatible with the existing assignment. This only really makes sense in languages that use unification (Prolog etc) as opposed to languages that use assignment (as ruby does).
Dataflow is when you have a tree of dependencies (which is anyway just an expression), and the values themselves are the synchronisation points. There is no deadlock because a tree has no cycles. Similarly, there is very little contention for mutexes.
We can emulate dataflow variables in ruby using a delegator, which can contain a thread or a single-assignment store with blocking semantics.
Further reading
Ruby core and standard libraries:
- Thread
- Mutex and ConditionVariable
- Queue
- Delegator (in the delegate library)
Gems:
- Celluloid
- Go-like concurrency, in Ruby
- concurrent-ruby
- Dataflow concurrency for Ruby (inspired by the Oz language)
Posts and articles:
- Dataflow Erlang Style Thread Safety in Ruby
- Self Schizophrenia with delegators
- How many threads is too many
Books:
- Working with Ruby Threads
- CTM section 4.9 Advanced Topics