
Dataflow concurrency in Ruby

This is the workshop I presented at Rubyfuza 2015. I had some requests to put it online, so here it is. It has exercises so you should be able to work through it.

Please let me know if you find errors.

Requirements

  • computer with network interface, and preferably at least one other host on the same subnet.

  • Ruby >= 2.1 installed (2.0 will probably work too. 1.9 doesn’t)

  • pry (or irb, but not recommended), preferably with a working readline/editline.

  • some ruby experience

Introduction

Dataflow concurrency is a simple but powerful idea. It rests on single-assignment variables (sometimes called immutable values) with the addition of blocking semantics: in other words, any thread attempting to use an unbound variable will wait until another thread assigns a value to it.

Dataflow variables fall in the same category as I-Vars, Futures and Promises.

We’ll look at threads (covering some little-known things about the Thread class). Then we’ll see how to use delegators to make dataflow pleasant to use. Then we will implement a single-value, single-assignment data store with blocking semantics in Ruby (these are sometimes called I-Var or Promise). This will require a mutex and a condition variable which will also be explained.

Loosely, concurrency means more than one thing happening at the same time. Like in real life. Everybody runs around doing their own thing, and fights erupt whenever we have to share. I know, because I have two sons and only one big tub of lego.

My understanding of dataflow comes primarily from “Concepts, Techniques, and Models of Computer Programming” by Van Roy and Haridi. Very worthwhile book, I do recommend it. There’s also an edX course.

When you see EXERCISE, it would be best if you try to figure out the answer for yourself, although it’s a line or two further down anyway. Be brave! Give it a try!

Threads Introduction

Any discussion of threads in Ruby has to mention the GVL/GIL. MRI has it, jruby and rubinius do not. Effectively it means that concurrency in MRI is limited in a particular way. I’ll defer the full explanation until mutexes later on.

The first thing you need to know about threads is that every running program is/has at least one.

Thread.main
Thread.current
Thread.main == Thread.current

EXERCISE: start up pry, type that in.

A Thread instance is a wrapper around an OS resource / entity. This will show you all live threads.

Thread.list

Thread instances outlive their related OS resource. You’ll see why later on.
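A quick sketch of that: once the block finishes, the OS thread is gone, but the Ruby instance survives with its status and its value.

```ruby
t = Thread.new { :done }
t.join        # wait for the OS thread to finish
t.alive?      # => false - the OS resource is gone
t.status      # => false - terminated normally
t.value       # => :done - but the instance still holds the value
```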

Threads

So, coding time. Pretend that you’re stranded somewhere on a distant server within a heavily fortified private network, with no root access to install new packages. You have only ruby (but you can’t install gems), and a minimal set of system tools. There is another server – you have to talk to it. However, its ip address is from dhcp, and there’s no dns on this network. DHCP reassigns the ip address fairly frequently. You need to use ping to find the missing server. Most of the ip addresses on this private network are not in use. Sometimes the server is down for an unspecified period.

If you haven’t met ping, in bash say ping localhost and see what happens. Ctrl-C to make it stop.

ping -c1 localhost will only send and wait for one packet.

In ruby, `` or %x{} will execute a system command, returning a string containing stdout from that process. You can do string interpolation inside of those. The return code from the command will be in $? until you execute another command.
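For example (assuming a shell with echo available):

```ruby
name = 'world'
out = `echo hello #{name}`   # backticks run a shell command; interpolation works
out                          # => "hello world\n"
$?.success?                  # => true
$?.exitstatus                # => 0, until you run another command
```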

This is how to find your ip and subnet in ruby.

require 'socket'
require 'ipaddr'

# Your own ip address(es) and netmask - works on linux but will need
# different interface names on OSX. The workaround is just to use ifconfig to
# find your ip and netmask and create the IPAddr instance manually.

ips = Socket.getifaddrs. \
  select{|ifa| ifa.addr.afamily == Socket::AF_INET && ifa.name != 'lo'}. \
  map{|ifaddr| [ifaddr.addr, ifaddr.netmask].map &:ip_address}

your_ip, your_netmask = ips.first

addrs = IPAddr.new("#{your_ip}/#{your_netmask}").to_range

EXERCISE: Find the addresses on your subnet that respond to pings and put them in an array called results. There is actually a problem you’re going to run into once you get the code working. If you know what the problem is already, just write the code anyway. Ctrl-C will be your friend.

ANSWER
results = []
addrs.each{|addr| results << [`ping -c1 #{addr}`, $?, addr]}

BONUS CHALLENGE: This is purely optional – if you feel like stretching your regexes. results should also contain the response time. As a float.

How long does that take? About 254 * 4 = 1016 seconds, more or less 16 minutes. Which is a long time to find a handful of ip addresses. Effectively, it’s doing this:

256.times{ sleep 4; puts :done }

So what happens when you do this:

256.times{ Thread.new{ sleep 4; puts :done } }

It takes about 4 seconds. How long does it take to boil an egg? 3 minutes. How long does it take to boil 4 eggs? Not 12 minutes.

EXERCISE: Now do the ping code with threads.

ANSWER
results = []; addrs.each{|addr| Thread.new{results << [`ping -c1 #{addr}`, $?, addr]} }

Nondeterminism

You’ll notice that the ip addresses don’t come back in order. Why is that?

ANSWERS:

1) Wait time for reply vs non-reply. The ips that are claimed by a host will reply to the ping quickly and will therefore be at the beginning of results. ip addresses that are not claimed will be subject to a wait of about 4 seconds until ping decides it won’t get a reply.

2) Nondeterminism. CTM explains nondeterminism as something that happens which is outside of the control of the programmer. In this case, the thread scheduler is making decisions that the programmer (that’s you and me) has no control over. We also have no control over how long it takes for a server to answer a ping. Note that these ips may be in order anyway – this would be an example of multithreaded code being correct by accident.

Nondeterminism applies even in a really simple case like:

ary = []; 4.times{|i| Thread.new{ ary << i }}; ary
# what is the order of the elements of ary here?

We don’t know. It’s nondeterministic. Try it out a few times in pry. It won’t necessarily be different every time.

Nondeterminism is mostly harmless in this case because it’s easy to restore the lost information (the ordering). But there’s a case where it’s a big problem. First the deterministic program:

a = nil; 4.times{|i| a ||= (puts "setting a to #{i}"; i) }
a # will always be 0, and "setting a to" will be printed only once.

And now the nondeterminism from threads:

a = nil; 4.times{|i| Thread.new{ a ||= (puts "setting a to #{i}"; i) }}
a # Don't know the value, and will output "setting a to" several times.

This is called a race condition. Which is badly named (the term comes from logic circuit design). It’s more like an accident at an intersection because none of the cars was willing to wait. Well, I suppose you could see that as a race…

QUESTIONS:

1) Why is a nil when you execute this in pry?

a = nil; 4.times{|i| Thread.new{ a ||= (puts "setting a to #{i}"; i) }}; a

2) Why is results nil, or mostly empty when you execute the following?

results = []; addrs.each{|addr| Thread.new{results << [`ping -c1 #{addr}`, $?, addr]} }; results

ANSWERS:

1) the threads testing and setting a have not yet executed

2) threads shelling out to ping have not yet put values in results.

Synchronisation

So the existence of values in results depends on when you ask for them. And a partial set of results is not really useful, right? This is one of those basic logic situations: you know when you’ve found something, but how do you know when you haven’t found it? You have to search everything. In this case we’re lucky because ‘everything’ is a relatively small array. So how do we ensure that we have searched everything, before concluding that the server is down?

Think about fetching your kid from school (or being fetched when you were a kid). What happens when class finishes a few minutes early, or you arrive a few minutes late, or a few minutes early? Or your child is negotiating a play date with friends? Or there was a traffic jam on the way to school? How do we handle those situations? We wait. Hopefully we’re not waiting in different places…

The traditional way is Thread#join, and I’m going to show you that first, even though it’s a bit clunky.

a = nil
Thread.new{sleep 3 + rand; a = 1 }.join
a # will always be 1

In other words, we’ve brought determinism back, by waiting. The one thread waits for the other thread to join it. The two control flows join together back into one (the one calling .join).

EXERCISE do that in the ping exercise so that by the time we ask for the results, they’re guaranteed to all be there. Be careful, if you bring determinism back too early you’ll be waiting 16 minutes for your results. Remember that join is a method that you call on the thread instance that you want to wait for.
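ANSWER (one possibility, sketched here with sleep standing in for the ping call so it runs anywhere; with the real code you’d map over addrs instead):

```ruby
results = []
threads = 4.times.map do |i|                     # with the real code: addrs.map{|addr| ...}
  Thread.new { sleep rand * 0.2; results << i }  # stand-in for `ping -c1 #{addr}`
end
threads.each(&:join)   # wait here, after ALL the threads have been spawned
results.length         # => 4, every result guaranteed present
```

Calling join inside the first map would spawn one thread, wait for it, then spawn the next, which puts you right back at 16 minutes.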

Exceptions in threads

An important question to ask at this point is: what happens when an exception is thrown in the thread block?

Thread.new{raise 'really?'}

Nothing spectacularly continues to happen.

But with this:

Thread.new{raise 'really?'}.join

the exception shows up in the thread calling join (after the thread being waited for has raised the exception, of course).

EXERCISE: Now do a deliberate typo and get exceptions for using pong instead of ping.

ANSWER: Really?

In other words, exceptions do not come out of threads unless you wait for them. This makes sense – in which thread, and when, would the exception be raised? It would be extremely weird if at some nondeterministic time your main thread stopped because of an exception thrown by another thread. So unless you really don’t care what happens to a thread, somewhere you have to wait for it to finish. (Or you have to set abort_on_exception)
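You can see where the exception went by poking at the dead thread (a small sketch):

```ruby
t = Thread.new { raise 'really?' }
begin
  t.join               # the exception surfaces here, in the waiting thread
rescue RuntimeError => e
  e.message            # => "really?"
end
t.status               # => nil : died from an exception (false would mean a clean exit)
```

Thread#status distinguishes the two kinds of death: nil for an exception, false for normal termination.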

So I said I was showing you the traditional way. Well, the exception showing up when you call join is already somewhat non-traditional. The nice way of doing synchronisation uses the ruby idea that everything is an expression, even a thread. You use Thread#value:

Thread.new{sleep 3 + rand; 1}.value

Notice 1) we don’t need a variable anymore. 2) Why bother with Thread#join then? Because it returns the thread instance, and it takes an optional timeout parameter.
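The timeout behaviour, sketched: join returns the thread itself on success, and nil if the timeout expires first.

```ruby
t = Thread.new { sleep 0.5; :slow }
t.join(0.1)   # => nil - timed out, the thread is still running
t.join        # waits; returns the thread instance itself
t.value       # => :slow
```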

EXERCISE: Go over the ping code and use .value to get rid of unnecessary variables.

ANSWER
addrs.map{|addr| Thread.new{ [`ping -c1 #{addr}`, $?, addr] } }.map( &:value )

.value and .join are the simplest form of synchronisation.

Synchronisation is also badly named (maybe also from circuit design where you have a clock signal?) In programming it makes more sense to think of synchronisation as waiting for something. And you can see how this applies to asynchronous operations – no thread waits. However the logical implication of asynchronous operations is that something has to busy-wait, or do a polling loop, or have some other mechanism to get the value later.

Congratulations. You’ve written a correct program, using threads. All we’ve done so far is learn how to wait. Waiting is also called blocking, or suspending. Here’s a first description of some other ways to wait. Just read over them for now and don’t worry if you feel you don’t understand things fully:

Other ways to synchronise

  • mutex (mutual exclusion) forces other threads to wait. The Mutex class unsurprisingly has a method called synchronize.

  • condition variable is a way to avoid busy-waiting. It allows a thread to go to sleep and be woken up by a signal. It’s always used in conjunction with a mutex.

  • monitor is a re-entrant mutex that’s shared across multiple methods often in an object. eg the synchronize keyword in Java. There is a Monitor class in ruby, and it’s useful for ADT style objects, ie stacks, lists, things like that where you have multiple ways of accessing shared state, that must remain consistent, ie not have race conditions. But, it’s quite coarse-grained because it basically locks the whole object.
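A minimal sketch of the monitor style using ruby’s MonitorMixin (Counter is a made-up example class, not from the workshop):

```ruby
require 'monitor'

class Counter
  include MonitorMixin

  def initialize
    super        # MonitorMixin needs its own initialisation
    @count = 0
  end

  def increment
    synchronize { @count += 1 }   # the whole read-modify-write is atomic
  end

  def count
    synchronize { @count }
  end
end

counter = Counter.new
10.times.map { Thread.new { 1000.times { counter.increment } } }.each(&:join)
counter.count  # => 10000, every increment counted
```

Note how the one lock is shared across every method that touches the state, which is exactly the coarse-grained locking mentioned above.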

Order of execution

We all know that a program is essentially a sequence of statements. What I’d like to talk about now is that threads allow parts of a program to execute in a different order than the sequence of statements in the code. In fact we saw this already with race conditions, but I want to show you the useful case.

Think about the ping example: first you need to get your ip and subnet, and then you can spawn 256 threads to find possible servers in that subnet. In this case you have 2 dependencies – the ip and the subnet.

What if you had more dependencies? Then you have a dependency tree (a dependency sapling at this point, because it’s so small). Imagine you have a broken bash shell and you have to find the full path of the ping command, and also find the ip and subnet. The ip address and the ping path have no dependencies on one another, so they could execute in separate threads. In sequential code it doesn’t matter which order you do them in. So in concurrent code you might as well just do them at the same time.

From this point of view, code is a linear representation of the tree of dependencies for an expression.

EXERCISE: find the full path of the ping command in a separate thread from the ip address and the concurrent pings. ENV['PATH'] is :-separated.

ANSWER
require 'socket'
require 'ipaddr'
require 'pathname'

addrs = Thread.new do
  # Your own ip address and netmask. Same as above. Unless you had to tweak it.
  ips = Socket.getifaddrs.select{|ifa| ifa.addr.afamily == Socket::AF_INET && ifa.name != 'lo'}.map{|ifaddr| [ifaddr.addr, ifaddr.netmask].map &:ip_address}
  your_ip, your_netmask = ips.first
  IPAddr.new("#{your_ip}/#{your_netmask}").to_range
end

ping_cmd = Thread.new do
  ENV['PATH'].split(':').map do |dir|
    ping_path = Pathname.new(dir) + 'ping'
    ping_path if ping_path.executable?
  end.compact.first
end

addrs.value. \
  map{|addr| Thread.new do [`#{ping_cmd.value} -c1 #{addr}`, $?, addr] end }. \
  map( &:value )

BONUS EXERCISE: search the path concurrently. Is it faster than the equivalent single-threaded code? Why?

ENV['PATH'].split(':').map{|dir| Thread.new{ dir + '/ping' if File.exist?(dir + '/ping') }}.map(&:value).compact.first

Obviously in the code, you have to have path-search before subnet or vice-versa. But when those blocks execute, that can be concurrent. So can you see how threads decouple execution order from statement order? There’s a bit of wriggle room (or maybe a lot) because of nondeterminism. But synchronisation enforces the ordering of the dependencies of an expression. In single-thread code, this ordering is already implicit (dependencies always come before the statement which needs them) and we usually don’t think about it.

EXERCISE: Draw yourself a picture of the dependency tree of the ping code. Then turn it upside down a few times and contemplate the dataflow ;–)

Termination and Deadlock

Whenever you change your assumptions, there will be consequences. So we need to look at some of the consequences of using threads to decouple order of execution from sequence of statements. We’ve seen one of the classic problems with threading – race conditions which are a form of nondeterminism. We’re going to briefly look at the other, which is deadlock. But lets start with termination.

We’ve all done this involuntarily at least once:

while true; end

Fondly known as an “endless loop”.

Will this next one terminate? Execute it in pry and look at top and sort by CPU usage (and then kill the thread or exit pry)

Thread.new{while true; end}

Will this one ever give back a value?

Thread.new{while true; end; :done}.value

No – the thread never terminates, and so things waiting for the thread to end will wait forever. So it’s kinda ½ a deadlock.

The easiest way to explain full deadlock is with Queue, which is actually really useful in several other ways too – you can think of it as ruby’s sortof-equivalent of channels in Go.

require 'thread'
q = Queue.new
Thread.new{sleep 3; q << :done}
q.pop
# returns :done in approx 3 seconds

So that’s how a queue works. q.pop will return a value from the queue if there is one, otherwise it will wait for a value to be pushed. That will hopefully be fairly intuitive for you by now.
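Queue#pop also takes a non_block flag, if you’d rather have an exception than a wait:

```ruby
require 'thread'

q = Queue.new
q << 1
q << 2
q.pop          # => 1, fifo order
q.pop          # => 2
begin
  q.pop(true)  # non_block = true: raise instead of waiting
rescue ThreadError
  :empty       # nothing there, and we chose not to wait
end
```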

But back to deadlock:

q1 = Queue.new
q2 = Queue.new
Thread.new{ q2 << q1.pop }
q1 << q2.pop

Lucky for us, ruby is smart enough to detect this is possibly a deadlock. But lucky for me, ruby also lets me ignore that warning, so I’ll do it again:

q1 << q2.pop

Will that ever terminate? Why?

Deadlock occurs whenever you have mutual waiting. Which generalises to a cycle, hence the dining philosophers problem. Such cycles can sometimes be hard to figure out, like when one of the waiting places is not in your code, or when the waiting places are in completely different parts of the codebase.

We’ve seen that the ping code can be non-terminating (when you don’t use -c1). Is there a way to make the ping code deadlock? Why?

You remember I was talking about dependency trees in expressions?

A useful property of a dependency tree is that a tree (by definition in graph theory) has no cycles. So tree-structured concurrency means that deadlock because of mutual waiting cannot happen. Of course it’s still possible for one of the subexpressions in that tree to call some external code which waits forever (eg leaving off -c1 from ping), or other code which deadlocks because of access to shared values. So we don’t magically get deadlock breaking or deadlock avoidance. But it does make a whole class of deadlock-creating situations disappear. And it greatly reduces contention for locks.

Threads conclusion

We’re mostly done with threads, so here’s a summary:

  • programs can do (or wait for in the GVL/GIL case) more than one thing at the same time. This is concurrency.

  • concurrency introduces nondeterminism, because threads allow the order of execution to be decoupled from the sequence of statements. Uncontrolled nondeterminism can result in race conditions.

  • Since we can’t rely on the sequence of statements to know when results are ready (which is what we do in single-threaded code), we need synchronisation (ie waiting) to enforce the order of dependencies.

  • cyclical mutual waiting is deadlock, which is also nondeterministic. It does not happen every time you run the code. Which is why it’s hard to debug.

  • Effectively, Thread#value makes a thread behave as an explicit future. It doesn’t have a value now, but it will, in the future. Why explicit future? Because you have to ask for it, by calling .value. Calling .value will wait, until there is a value to return. So futures are: 1) a way of synchronising concurrent operations, and 2) a way of expressing the idea of a thing that does not have a value yet. And not only does a thread instance behave as a future, it also behaves as a single-valued future. It can only ever have one value. So it’s immutable.

Dataflow

So I said earlier that I would cover threads first so that I could explain dataflow. Well, we’ve actually already seen quite a bit of dataflow. Dataflow is effectively blocking semantics for things that don’t have a value yet. It’s called dataflow because as those things change state from unvalued to valued, the data flows through the dependency tree. So dataflow is a weak form of state (that’s straight out of CTM).

We’ve seen how threads decouple the order of execution from the sequence of statements in the code. Dataflow allows the order of execution to be changed without affecting the result of a calculation. It’s deterministic. You can think about the code as if it were sequential, but use threads wherever you want, to make it concurrent. Well, as long as there aren’t other race conditions (ie non-atomic shared writable state). Which is all over the place in your average ruby program. So you have to be careful.

Delegators

But the dataflow we’ve used so far is quite far from that ideal, so let’s bring it a bit closer. The ping code, again. That .value everywhere? Ewww. Say for example you have a complex calculation of some kind (in other words a large dependency tree like an income tax calculation, or an invoice including discounts and VAT and partial payments and forex), and you’d like to retrofit it with dataflow concurrency without having to go around and add .value all over the place. We’d like to just pass in something that will let it continue operating as before, except concurrently. In short, how do we get rid of .value?

require 'delegate'
d = SimpleDelegator.new 7
d + 9
10 - d
puts "d: #{d}"
# etc

EXERCISE: Create a class called Waiter, which inherits from Delegator. You have to implement the __getobj__ and __setobj__ methods. __setobj__ should raise an exception (Why?).

BONUS EXERCISE: Waiter#initialize can optionally take a block which gets passed to a new thread instance, or it can take an existing thread instance.

ANSWER
require 'delegate'

class Waiter < Delegator
  def initialize( thread = nil, &blk )
    # check before Thread.new, which would raise its own error given no block
    raise ArgumentError, "you must provide either a thread or a block" unless thread || blk
    @thread = thread || Thread.new(&blk)
  end

  def __getobj__
    @thread.value
  end

  def __setobj__( obj )
    raise ThreadError, "Not possible to set a thread's value"
  end

  def __assigned?
    @thread.status == false || @thread.status == nil
  end

  def inspect
    value_representation =
    if __assigned?
      __getobj__.inspect rescue $!.inspect
    else
      :unset
    end

    "#<Waiter:0x#{'%x' % __id__} #{value_representation}>"
  end

  # have to handle this otherwise pry (and maybe irb) will block
  def pretty_print(pp)
    if __assigned?
      super
    else
      pp.text inspect
    end
  end

  # for awesome_print
  def ai; inspect end
end

What happens if you comment out inspect and pretty_print?

EXERCISE: now redo the full ping example with Waiter.

ANSWER
require 'socket'
require 'ipaddr'
require 'pathname'
require_relative 'waiter' # assuming your file is called waiter.rb and contains Waiter

addrs = Waiter.new do
  # Your own ip address and netmask
  ips = Socket.getifaddrs.select{|ifa| ifa.addr.afamily == Socket::AF_INET && ifa.name != 'lo'}.map{|ifaddr| [ifaddr.addr, ifaddr.netmask].map &:ip_address}
  your_ip, your_netmask = ips.first
  IPAddr.new("#{your_ip}/#{your_netmask}").to_range
end

ping_cmd = Waiter.new do
  ENV['PATH'].split(':').map do |dir|
    ping_path = Pathname.new(dir) + 'ping'
    ping_path if ping_path.executable?
  end.compact.first
end

addrs.map{|addr| Waiter.new do [`#{ping_cmd} -c1 #{addr}`, $?, addr] end }

QUESTION: What happens when you replace all Waiter.new do with begin? Things should work exactly the same (obviously without concurrency).

And now we have implicit (because we don’t have to call .value) immutable futures with blocking semantics; that is, they behave just like ordinary variables or objects, except that any method call will wait until there is a value inside the delegator. Syntactically, delegators are as close as we can get to proper dataflow variables in ruby. However Ruby works against the idea because it lets us reassign the variable referencing the delegator at any time. So you have to be careful, or refactor to use method calls on frozen objects instead of local variables.

Unbound Variables

The other place that ruby works against dataflow is the idea of unbound or unassigned variables. What’s the difference between declaring a variable, and assigning it?

You can declare a variable without assignment/binding in (amongst others) Java, Scala, C, C++, JavaScript.

And ruby? Local variables? No. Instance variables and globals? Kindof – they’re automatically initialised to nil.
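You can check these claims in pry (with a small parser quirk thrown in: mentioning a local variable in code that never runs still declares it, as nil):

```ruby
defined?(zzz)      # => nil - never mentioned, truly undeclared
@thing             # => nil - instance variables spring into existence as nil
zzz = 1 if false   # never executes, but the parser has now seen zzz
defined?(zzz)      # => "local-variable"
zzz                # => nil
```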

So we don’t have unbound variables in ruby. But as we’ve just seen, threads behave a little bit like a thing that does not yet have a value, and we can make it behave almost exactly the same as an ordinary variable/object by putting a Waiter delegate around it.

Futures / Promises

However, the one thing you can’t do with our thread + delegator approach is declare one of these in one place, and assign it in another. You need to know, at the time you create the thread and its delegator, how to calculate the value. This is not always the case. A not-so-good example is: how would you handle a subnet with 65536 ips? Or 16777216 ips (the 10.x.x.x subnet)? Too many threads. So you need to be able to declare the variables first (to keep the ordering) and assign them later.

So we need something that we can wrap in a delegator, and that wrapped thing must have blocking semantics with single-assignment.

These things, where you can actually set the value instead of it being implicit as part of the creation the way it is with Thread and Waiter, are sometimes called promises (as opposed to futures), but from what I can see there’s a lot of overlap in the terminology.

SingleAssign implementation

So essentially we need to implement a single-assignment value store with blocking semantics, to take the place of the Thread inside the delegator. That means:

  • it can be read by as many threads as necessary, as many times as we like. Can Waiter/Thread do this? Yes.

  • reading threads will block/wait/suspend until it has a value. Can Waiter/Thread do this? Yes.

  • the value can be set exactly once. Subsequent attempts should raise an exception. Waiter/Thread cannot do this.

  • you can set an exception to be raised when the delegator is accessed, like with Thread#value

A minimal implementation of this needs value and value= methods. But it must be completely threadsafe.

Do you remember from the different ways of waiting, what we can use here? We have to use a mutex and a condition variable. A condition variable requires a mutex, so let’s do that first.

Mutex

A mutex (mutual exclusion) is a bit like a cubicle in a public toilet or an airplane. Only one person gets to use the cubicle at one time. Only one thread gets to enter (or occupy) the mutex at any one time. While there is a thread in the mutex, all other threads must wait until the occupying thread exits the mutex. A mutex is also called a lock, although sometimes lock is the name of an operation on a mutex. The GIL/GVL is a mutex around the entire C implementation of MRI, with some unlocks for waiting on IO and other things so that those don’t block threads which can perform useful work.

Mutexes can be used to prevent race conditions. This is the one from earlier:

a = nil; 4.times{|i| Thread.new{ a ||= (puts "setting a to #{i}"; i) }}

EXERCISE: use a mutex to prevent the race condition.

ANSWER
require 'thread'
mutex = Mutex.new
a = nil; 4.times{|i| Thread.new{mutex.synchronize{ a ||= (puts "setting a to #{i}"; i) }}}
a # will only be set once, and "setting a to" will only be output once

So a mutex groups a sequence of statements together, and makes them atomic; in other words, nobody else can use the cubicle while you’re busy. It’s still nondeterministic because we still don’t know what value a will have, but at least now it’s atomic. So actually, instead of a ‘race condition’, calling it an ‘atomic failure’ would be more accurate. That way when someone asks why it’s taking so long, you can say you’re cleaning up the fallout from your atomic failure.

In the context of a SingleAssign store, mutex lets us ensure that the value is only set once. Now, for threads wanting the value when it’s not available yet, we need a way to put threads to sleep and wake them up again. For this we need a ConditionVariable.

Condition Variable

Condition variable is also badly named. It’s easier to think of it as a FIFO list of threads (I’m deliberately not saying queue, because most implementations don’t use an actual queue, they just behave as if they do). And the condition part is actually outside the actual ConditionVariable instance. So now it’s badly named twice. It should have been called a WaitList. “Unfortunately sah, we’re full, but we’ll notify you if we have a cancellation.”

EXERCISE: I can’t come up with a good exercise here, so take a look at the documentation for ConditionVariable, and think about which methods should be used where.
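For a feel of the API, here’s a minimal sketch (not part of the workshop code): one thread waits on the list, another signals it. Note that the ‘condition’ (ready) lives outside the ConditionVariable, exactly as described above.

```ruby
require 'thread'

lock = Mutex.new
waitlist = ConditionVariable.new
ready = false          # the 'condition' lives out here, not in the ConditionVariable

waiter = Thread.new do
  lock.synchronize do
    # wait releases the lock while this thread sleeps in the fifo,
    # and re-acquires it on wake-up. The loop guards against spurious wake-ups.
    waitlist.wait(lock) until ready
    :woken
  end
end

lock.synchronize do
  ready = true
  waitlist.signal      # wake one sleeping thread (broadcast wakes them all)
end

waiter.value           # => :woken
```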

This is a simple implementation that satisfies the requirements above. It’s less efficient than it could be, but it’s deterministic and it’s atomic, and it only allows one assignment to value.

require 'thread' # for Mutex and ConditionVariable

class SingleAssign
  def initialize
    @waitlist = ConditionVariable.new
    @cubicle = Mutex.new
  end

  def value
    @cubicle.synchronize do
      # loop, because wait can wake spuriously: re-check before proceeding
      until instance_variable_defined? :@value
        # call to wait will unlock the @cubicle once this thread is in the sleep fifo.
        # Another thread can then occupy @cubicle to set the value.
        @waitlist.wait @cubicle
        # when broadcast is called, sleeping threads will wake up here and re-check
      end

      ::Kernel.raise @value if @value.is_a?(Exception)
      @value
    end
  end

  def value=( rhs )
    @cubicle.synchronize do
      ::Kernel.raise "already assigned with #{@value.inspect}" if instance_variable_defined? :@value
      @value = rhs
      # wake up waiting threads
      @waitlist.broadcast
    end
  end

  def raise( *args )
    self.value = (::Kernel.raise(*args) rescue $!)
  end
end

It’s somewhat inefficient because once the value has been set, the call to synchronize and the check for is_a?(Exception) are no longer necessary. Fortunately because of the single-assignment assumption, it’s fairly easy to optimise if necessary. Interestingly because of the single-assignment assumption, we can mostly ignore the ‘condition’ part of ConditionVariable (and some of its other complexities).

QUESTIONS:

1) Can @waitlist and @cubicle be lazy-assigned? Why?

2) How many threads could concurrently access instance variables inside initialize?

3) What options are there for making #value more efficient?

ANSWERS:

1) @cubicle: no, because you would get a race condition assigning it. @waitlist: well, you could, but it probably wouldn’t be worthwhile.

2) initialize is always single-threaded – for your code. The GC thread(s) would have access, but presumably that’s threadsafe.

3) Options I can think of: different modules that implement value and value= (ie the State pattern)

EXERCISE: A Delegator for this is almost the same as Waiter, but obviously you’ll need to implement __setobj__ as well.

Waitron

class Waitron
# code coming soon
end
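In the meantime, here’s one possible sketch (not the author’s version): a Delegator around SingleAssign, guessing that `_=` is the intended assignment method based on the `a._ = 23` usage below. SingleAssign is condensed from the section above.

```ruby
require 'delegate'
require 'thread'

# Condensed version of SingleAssign from above.
class SingleAssign
  def initialize
    @cubicle = Mutex.new
    @waitlist = ConditionVariable.new
  end

  def value
    @cubicle.synchronize do
      @waitlist.wait(@cubicle) until instance_variable_defined?(:@value)
      @value
    end
  end

  def value=( rhs )
    @cubicle.synchronize do
      raise "already assigned with #{@value.inspect}" if instance_variable_defined?(:@value)
      @value = rhs
      @waitlist.broadcast
    end
  end
end

# Waitron wraps a SingleAssign the way Waiter wraps a Thread.
class Waitron < Delegator
  def initialize
    @store = SingleAssign.new
  end

  def __getobj__
    @store.value          # any delegated method call waits here for the value
  end

  def __setobj__( obj )
    @store.value = obj    # single assignment, enforced by the store
  end

  def _=( rhs )           # guessed name, so that `w._ = 42` reads as assignment
    @store.value = rhs
  end
end
```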

And now (at least in ruby) the ultimate in messing with order of execution. Obviously this won’t work

b = a + 1
a = 23

But this will

b = Waitron.new
a = Waitron.new

Thread.new{ b._ = a + 1 }
Thread.new{ puts b + 1 }
a._ = 23
b + 1

Conclusion

Concurrency is when several things are happening at the same time. Threads decouple the order of execution from the sequence of statements. Synchronisation is waiting for various events.

Concurrency always costs something: thread creation and synchronisation have overhead, so code that could just as well be single-threaded will execute more slowly with threads. Your performance improvement from concurrency needs to more than offset that.

Dataflow variables are things which can be initially unbound, and then they can be assigned only once. Threads that want their value will have to wait, until the value is assigned. Obviously this only makes sense in a threaded environment. Strictly speaking, dataflow variables are reassignable provided that the new assignment is compatible with the existing assignment. This only really makes sense in languages that use unification (Prolog etc) as opposed to languages that use assignment (as ruby does).

Dataflow is when you have a tree of dependencies (which is anyway just an expression), and the values themselves are the synchronisation points. There is no deadlock because a tree has no cycles. Similarly, there is very little contention for mutexes.

We can emulate dataflow variables in ruby using a delegator, which can contain a thread or a single-assignment store with blocking semantics.

Further reading

Ruby core and standard libraries:

Gems:

Posts and articles:

Books:
