occasionally useful ruby, ubuntu, etc

29 Oct 2010

Ruby Monitor Basics, or, How the heck do I synchronize producers/consumers?

Have you ever written something like this?

threads = []
threads << Thread.new { something_slow }
threads << Thread.new { something_really_slow }
threads << Thread.new { go_take_a_coffee_break }
threads.each {|t| t.join}
puts "all done!"

This works fine, but...starting up and stopping threads is expensive (even green ones). And what if you instead used (or needed) Ruby's Queue, with a fixed set of threads pulling work off it:

require "thread"
q = Queue.new
q << 3 << 10 << 10000 << 2352352
threads = []
10.times do
  threads << Thread.new do
    loop do
      num = q.pop
      # marker 1
      # calculate_fibonacci_for is assumed to be defined elsewhere
      puts "#{num}: #{calculate_fibonacci_for(num)}"
    end
  end
end

For those of you who don't know, Queue is basically a (threadsafe) array: if #pop is called on it while it's empty, it puts the calling thread to sleep, and wakes it up again once another thread pushes something onto it.
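A tiny sketch of that blocking behaviour, assuming nothing beyond the standard Queue API: one thread blocks in #pop on an empty queue until the main thread pushes an item. The `sleep 0.1` is only there for the demonstration, and the last part shows that passing `true` to #pop makes it non-blocking, so it raises instead of sleeping.

```ruby
require "thread" # needed on Ruby 1.8; harmless on newer versions where Queue is built in

q = Queue.new

consumer = Thread.new do
  # q is empty, so this pop puts the consumer thread to sleep...
  q.pop
end

sleep 0.1                 # give the consumer time to block (demonstration only)
q << 42                   # ...and this push wakes it up again with the new item
result = consumer.value   # joins the thread and returns its block's value
puts result               # prints 42

# pop(true) is the non-blocking form: on an empty queue it raises ThreadError.
begin
  q.pop(true)
rescue ThreadError => e
  nonblock_error = e
end
```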

But how do we know when we're done? You can't call #join on the threads; they never terminate, because they're blocked in q.pop waiting for more work. Here's where monitors come in.

Another potential alternative would be adding something like

sleep 0.1 until q.empty?

to the end of the script, but besides being blatantly inefficient, it's not even correct. Suppose a worker pops the last number and then gets context-switched out at the "marker 1" comment above. The queue is now empty, so the main thread stops sleeping and the program terminates before that last Fibonacci number is ever printed. In fact, if you ever find yourself waiting for something to happen (and are tempted to write "sleep 0.1 until X"), chances are you want a monitor/condition variable instead.
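To see that race without relying on unlucky scheduling, here is a sketch that widens the "marker 1" window on purpose. The `sleep 0.5` stands in for slow work, and the `finished` array with its Mutex is an addition for the demonstration, not part of the original script. The polling loop returns as soon as the worker pops the item, long before the work is done.

```ruby
require "thread"

q = Queue.new
q << 1
finished = []     # items the worker has completely processed
lock = Mutex.new  # protects `finished`

worker = Thread.new do
  loop do
    n = q.pop
    # "marker 1": n is off the queue but not processed yet
    sleep 0.5                          # simulate slow work on n
    lock.synchronize { finished << n }
  end
end

sleep 0.01 until q.empty?   # the polling approach from the text
# The queue empties as soon as the worker pops, not when it finishes.
finished_count = lock.synchronize { finished.size }
puts finished_count         # almost always 0: exiting here would lose the result
```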

So, we can use monitors. A monitor lets one or more threads wait until a condition is fulfilled, while other threads poke them to re-check the condition whenever it may have changed. Here's the previous script using monitors:

require "thread"
require "monitor"
 
# Create our queue
q = Queue.new
# Using our queue itself as our monitor
q.extend(MonitorMixin)
# Adding the numbers we want to calculate for
q << 3 << 10 << 10000 << 2352352
# Keep track of how many results we have yet to process
results_unprocessed = q.length
 
cond = q.new_cond
 
threads = []
10.times do
  threads << Thread.new do
    loop do
      num = q.pop
      # marker 1
      puts "#{num}: #{calculate_fibonacci_for(num)}"
      # we're changing a condition of the monitor, so we need to synchronize
      q.synchronize do
        # updating condition
        results_unprocessed -= 1
        cond.signal
      end
    end
  end
end
 
q.synchronize do
  cond.wait_while do
    # check condition here
    results_unprocessed > 0
  end
  # could have also done cond.wait_until { results_unprocessed == 0 }
end
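Stripped of the queue and the Fibonacci work, the handshake that script relies on looks roughly like this minimal sketch. It uses a standalone `Monitor` object instead of extending the queue, and the `ready` flag is a made-up condition. Because #wait_until checks the condition before sleeping, it works whether the signal arrives before or after the waiter starts waiting.

```ruby
require "monitor"

lock = Monitor.new
ready_cond = lock.new_cond
ready = false   # the condition being waited on (hypothetical flag)

waiter = Thread.new do
  lock.synchronize do
    # Releases the monitor while asleep; re-checks `ready` every time it is woken.
    ready_cond.wait_until { ready }
    :woken
  end
end

lock.synchronize do
  ready = true       # change the condition while holding the monitor...
  ready_cond.signal  # ...then poke one waiting thread to re-check it
end

puts waiter.value    # prints woken
```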

The object you mix MonitorMixin into matters only in one respect: whenever you use the condition variable, you need to be able to synchronize on the monitor it came from, so the two should be in the same scope. I could have mixed MonitorMixin into a different, arbitrary object (say, a new Object), but this is more "semantic": when creating a monitor that watches the status of a queue, why not make the queue itself the monitor?
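Another way to keep the counter, the monitor, and its condition variable in the same scope is to bundle them into one small class that includes MonitorMixin (calling `super()` from `initialize` so the mixin can set itself up). `PendingCounter` and its methods are hypothetical names for this sketch, not part of Ruby; with a single waiting thread, #signal is enough.

```ruby
require "monitor"

# Hypothetical helper: a job counter that is its own monitor.
class PendingCounter
  include MonitorMixin

  def initialize(count)
    super()                # let MonitorMixin initialize the monitor
    @count = count
    @all_done = new_cond   # condition variable bound to this object's monitor
  end

  # Called by a worker after it finishes one job.
  def finish_one
    synchronize do
      @count -= 1
      @all_done.signal     # let the waiting thread re-check the count
    end
  end

  # Blocks the caller until every job has finished.
  def wait_for_all
    synchronize { @all_done.wait_until { @count.zero? } }
  end

  def remaining
    synchronize { @count }
  end
end

pending = PendingCounter.new(3)
workers = 3.times.map do
  Thread.new do
    sleep(rand * 0.05)     # simulate some work
    pending.finish_one
  end
end

pending.wait_for_all
workers.each(&:join)
puts "all done!"
```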

Finally, be sure to wrap your condition variable calls in #synchronize calls; otherwise you'll end up getting errors like "current thread not owner". #synchronize gives the calling thread temporary "ownership" of the monitor, yields to the block, and releases ownership when the block exits. In Ruby 1.8's monitor.rb, Thread.critical is set to true only briefly while ownership is acquired and released (not for the whole block), and after releasing, #synchronize calls Thread.pass so that other threads, such as ones woken up in the block with cond.signal, get a chance to run. (Ruby 1.9 removed Thread.critical, and Monitor is built on a Mutex from then on.) Of course, you could have learned that by reading the Ruby source code, too, but...it's not terribly well-documented.
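A quick sketch of that ownership check: signalling a condition variable without holding its monitor raises ThreadError, while the same call inside #synchronize is fine (signalling with nobody waiting simply does nothing). This uses a standalone `Monitor` rather than the queue from the script.

```ruby
require "monitor"

mon = Monitor.new
cond = mon.new_cond

begin
  cond.signal   # this thread does not own mon here
rescue ThreadError => e
  error_message = e.message
end
puts error_message

mon.synchronize { cond.signal }   # OK: we own the monitor inside synchronize
```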

Check out the other interesting methods in the MonitorMixin documentation. You can also use #wait_until instead of #wait_while on the condition variable, and if you want to wake all threads waiting on a condition instead of just one, use #broadcast instead of #signal. Also check out the threadsafe Queue.
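Here is a small sketch of #broadcast with several waiters, assuming a made-up `go` flag as the condition. Every waiter that is asleep gets woken, and any waiter that arrives late sees `go` already true and never sleeps. With #signal, only one sleeping waiter would be woken, and the final `join` could hang.

```ruby
require "monitor"

mon = Monitor.new
go_cond = mon.new_cond
go = false            # hypothetical condition all waiters care about
woken = Queue.new     # threadsafe record of which waiters got through

waiters = 3.times.map do |i|
  Thread.new do
    mon.synchronize do
      go_cond.wait_until { go }
      woken << i
    end
  end
end

mon.synchronize do
  go = true
  go_cond.broadcast   # wake every waiting thread, not just one
end

waiters.each(&:join)
puts woken.size       # prints 3
```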

Filed under: ruby
Comments (5) Trackbacks (1)
  1. If you use queues in an example, you could just use two of them for your goal: one to add jobs, the second to wait for their results (an elegant solution when you have as many IN events as OUT events).

    The monitor construct is useful in a single case: when you want to make sure that at most one thread executes inside the monitor. Usually all threads wait for a condition before entering the monitor, do stuff, exit (or wait again), and signal.

    To demonstrate it better, I think that you should have implemented the queue within the monitor as well, because here you’re popping with one synchronization construct and monitoring the queue size with another (without the possibility of adding new jobs to the queue easily).

    I would have done everything with the monitor construct with:
    - a Worker subclass of Thread with a busy? method
    - a regular array joblist that you can test on empty?

    Basically, you exit when joblist.empty? and not workers.find(&:busy?), which is easier to understand than a test on a variable tracking the queue length.

    • So, I don’t actually care about what the outputs are, only that the job has finished. The reason there are two monitors here is because the action of popping off jobs and completing them are completely independent — there’s no reason one thread couldn’t be decrementing the results_unprocessed counter while another is simultaneously popping off a new job.

      I’m not sure I really like a solution that involves iterating over a list every time we want to see if we’re done — if you have a lot of worker threads (50? 100?) that’s a lot of unnecessary time spent inside the monitor (O(n) time on number of threads instead of O(1)).

      Lastly, I disagree that adding new jobs during execution wouldn’t be easy — it would be as simple as adding the job to the queue and incrementing the results_unprocessed variable. (Not sure if the queue manipulation would need to be inside the q synchronize block, but whatever).

      In any case, I’ll write up your idea and include it here, see if it makes more sense to me.

      • Okay, so here’s what I think you’re suggesting: http://gist.github.com/656925

        So you still need two condition variables, even with only one monitor. The example is a bit more complicated now, and…I’m not sure exactly what the benefit of subclassing Thread is. But there you have it.

  2. If expensiveness of threads is a worry to you, then they’re out. Use EventMachine, Revactor or Fibers instead.

    • That’s true, but Ruby’s (green) threads are cheap compared to native threads, which is why JRuby has an option to use a thread pool built into the interpreter itself (so Thread.new won’t necessarily create a new thread).

      Anyway, for ReReplay I was initially using EventMachine, but I needed events to happen at very particular points in the future, and EventMachine’s granularity (which is at best 10ms) simply wasn’t good enough. Using threads I can have a granularity of under a millisecond. I was just sharing some commentary regarding what I learned while using monitors.
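The first commenter's two-queue suggestion might look something like this sketch: one queue for jobs, one for results, and the main thread knows it is done once it has popped one result per job. `fib` is a hypothetical iterative stand-in for the post's calculate_fibonacci_for, and the inputs are kept small so it runs quickly.

```ruby
require "thread"

jobs = Queue.new
results = Queue.new
inputs = [3, 10, 20, 30]

# Hypothetical stand-in for calculate_fibonacci_for (iterative, fib(0) = 0).
def fib(n)
  a, b = 0, 1
  n.times { a, b = b, a + b }
  a
end

workers = 4.times.map do
  Thread.new do
    loop do
      n = jobs.pop
      results << [n, fib(n)]
    end
  end
end

inputs.each { |n| jobs << n }

# Exactly one result per job, so popping inputs.size results means all jobs are done.
done = inputs.size.times.map { results.pop }
done.sort.each { |n, f| puts "#{n}: #{f}" }
```

The trade-off is that the main thread has to know up front how many results to expect, which is the "as many IN events as OUT events" condition from the comment.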

