Monday, 28 March 2016

Julia Evans: Thread pools! How do I use them?

http://jvns.ca/blog/2016/03/27/thread-pools-how-do-i-use-them/

In Java, a thread is "blocked" when it's waiting for a "monitor" on an object. When I originally googled this I was like "er what's a monitor?". It's when you use synchronization to make sure two different threads don't execute the same code at the same time.
// Scala pseudocode
class Blah {
    var x = 1
    def plus_one(): Unit = {
        this.synchronized {
            x += 1
        }
    }
}
This synchronized block means that only one thread is allowed to run the x += 1 block at a time, so you don't accidentally end up in a race. If one thread is already doing x += 1, the other threads end up -- you guessed it -- BLOCKED.
The two things that were blocking my threads were:
  • lazy vals in Scala use synchronized internally, and so can cause problems with concurrency
  • Double.parseDouble in Java 7 is a synchronized method. So only one thread can parse doubles from strings at a time. Really? Really. They fixed it in Java 8 though so that's good.
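Both of those boil down to the same monitor mechanism as the synchronized block above. Here's a minimal, self-contained Java sketch (the class and counter names are my own invention) of what a synchronized static method -- which is what the post says Double.parseDouble effectively was in Java 7 -- does to a pool of threads: at most one thread is ever inside it, no matter how many workers you have.

```java
import java.util.concurrent.*;

public class SyncDemo {
    private static int inside = 0;
    private static int maxInside = 0;

    // A synchronized static method takes the lock on SyncDemo.class,
    // so only one thread can be in here at a time.
    static synchronized void parseLikeJava7(String s) {
        inside++;
        maxInside = Math.max(maxInside, inside);
        Double.parseDouble(s); // the actual work
        inside--;
    }

    // Read the result under the same lock the workers used.
    static synchronized int max() { return maxInside; }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 1000; i++) {
            pool.submit(() -> parseLikeJava7("3.14"));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // Despite 8 worker threads, the monitor serialized them all.
        System.out.println("max threads inside at once: " + max());
    }
}
```

The counter updates are safe here precisely because they happen inside the monitor -- and the printed maximum is 1 even with 8 workers, which is the whole problem.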

This FuturePool abstraction is cool. Just give the thread work and it'll do it! Don't worry about what's underneath! But now we need to understand what's underneath.
In Java, you normally handle thread pools with something called an ExecutorService. This keeps a thread pool (say 32 threads). Or it can be unbounded! In this case I wanted to only have as many threads as I have CPU cores, ish.
So let's say I run ExecutorService.submit(work) 32 times, and there are only 32 threads. What happens the 33rd time? Well, ExecutorService keeps an internal queue of work to do. So it holds on to Thing 33 and does something like
loop {
    if(has_available_thread) {
        available_thread.submit(queue.pop())
    }
}
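That loop is roughly what the real API does for you. A minimal sketch (the class name and the 100-task count are mine) that sizes the pool to the CPU count, like the post wants, and submits more work than there are threads:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class PoolDemo {
    public static void main(String[] args) throws Exception {
        // Only as many threads as CPU cores.
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);

        // Submit far more tasks than threads; the extras wait in the
        // executor's internal (unbounded by default) work queue.
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            final int n = i;
            futures.add(pool.submit(() -> n * n));
        }

        int sum = 0;
        for (Future<Integer> f : futures) sum += f.get();
        pool.shutdown();
        System.out.println("sum of squares 1..100 = " + sum);
    }
}
```

The extra tasks just sit in that internal queue until a thread frees up -- which is exactly the queue that causes trouble next.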
In my case, I was reading a bunch of data off disk -- maybe 10GB of data. And I was submitting all of that data into the ExecutorService work queue. Unsurprisingly, the queue exploded and crashed my program.
I tried to fix this by changing the internal queue in ExecutorService to an ArrayBlockingQueue with size 100, so that it would not accept an unlimited amount of work. Awesome!
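A sketch of that fix, with illustrative sizes (4 threads, queue of 100): construct the ThreadPoolExecutor directly and hand it the bounded queue. One wrinkle: with the default rejection policy, submit() throws RejectedExecutionException once the queue is full, rather than blocking, so the submitter has to deal with that.

```java
import java.util.concurrent.*;

public class BoundedQueueDemo {
    public static void main(String[] args) {
        // 4 threads, and a work queue holding at most 100 pending tasks.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4,                       // core and max pool size
                0L, TimeUnit.MILLISECONDS,  // keep-alive for extra threads
                new ArrayBlockingQueue<>(100));

        int rejected = 0;
        for (int i = 0; i < 1000; i++) {
            try {
                pool.submit(() -> {
                    try { Thread.sleep(50); } catch (InterruptedException e) { }
                });
            } catch (RejectedExecutionException e) {
                // Default policy: when the queue is full, submit() throws
                // instead of buffering 10GB of work in memory.
                rejected++;
            }
        }
        pool.shutdownNow();
        System.out.println("rejected " + (rejected > 0));
    }
}
```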

comment ->

Did you solve this? Usually what I do here is 1) inject a limited queue like you did and 2) inject a BlockingRejectedExecutionHandler into the ThreadPoolExecutor constructor. That way, my submitter will simply block until there is more space in the queue.
Last, but not least, depending on your problem you might want to look at using a ForkJoinPool instead. It handles tasks that are split into smaller tasks. If your problem is recursive in nature, you could use it to execute your million tasks.
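BlockingRejectedExecutionHandler isn't a JDK class, so here's a minimal sketch of the idea the commenter describes: a custom RejectedExecutionHandler that blocks the submitting thread with queue.put() until the bounded queue has room. (The JDK also ships ThreadPoolExecutor.CallerRunsPolicy, which gets a similar throttling effect by running the rejected task on the submitting thread instead.)

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitDemo {
    public static void main(String[] args) throws Exception {
        // When the queue is full, block the submitting thread
        // until space frees up, instead of throwing.
        RejectedExecutionHandler block = (task, executor) -> {
            try {
                executor.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(e);
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10), block);

        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 200; i++) {
            pool.execute(done::incrementAndGet); // never throws, just waits
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("completed " + done.get() + " tasks");
    }
}
```

With this handler the reader of 10GB off disk slows down to match the pool, instead of crashing the program or dropping work.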
