Andrew Certain | 4 Jan 23:14 2007

Unexpected behaviour in ThreadPoolExecutor


I've been using a ThreadPoolExecutor with corePoolSize == maxPoolSize,
expecting that I'd always have a fixed number of threads to operate on
tasks in the queue.  However, if a thread throws an exception and
terminates unexpectedly, no thread is spawned to replace it unless it's
the last thread in the pool, even if the task queue is non-empty.  Looking at
the code, it seems that new threads are only spawned when execute is
called.  However, this behavior is at best unexpected (and I would
consider it a bug), especially in the following scenario:

You create a ThreadPoolExecutor (TPE) with corePoolSize = maxPoolSize = n.  
You then submit N >> n long-running tasks to the TPE at the start and 
never submit another task.  If one of those long-running tasks throws an 
exception and terminates, a new thread is not respawned, leaving you with 
n-1 threads to service the large number of tasks left in the queue.  If 
this happens frequently enough, you're left with only one thread in the 
pool (since workerDone will spawn a new thread if there is something in 
the queue and there are no more threads).
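
The scenario can be sketched like this (pool size and task count are illustrative; the tasks here complete normally, since what happens to a worker that dies from a thrown exception is exactly the version-dependent behavior in question):

```java
import java.util.concurrent.*;

// Sketch of the setup described above: core == max == n, with N >> n
// tasks queued up front. Names and sizes are illustrative only.
public class FixedPoolScenario {

    static int runScenario() throws InterruptedException {
        int n = 4;
        // core == max: a nominally fixed pool of n threads
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        final CountDownLatch done = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {          // N = 100 tasks, submitted at the start
            pool.execute(new Runnable() {
                public void run() { done.countDown(); }
            });
        }
        done.await();
        pool.shutdown();
        // If a task threw instead of completing, whether its dead worker
        // is replaced depends on the JDK version -- the issue under discussion.
        return 100 - (int) done.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runScenario() + " tasks completed");
    }
}
```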

Am I doing something wrong with my usage of the TPE?  Surely the behavior 
I'm expecting (submit a bunch of tasks to a thread pool with a fixed 
number of threads at startup time and wait until they terminate) isn't 
unique.  Are there any good solutions?  Why wouldn't workerDone spawn
new threads if there are fewer than corePoolSize threads running and there
is work in the queue?

Thanks.

Andrew


studdugie | 5 Jan 00:17 2007

Re: Closing a socket from another thread

Closing a socket from another thread works flawlessly in Sun JVM 1.5.x
and 1.6.x. I've been exercising this functionality in the 1.5.x revisions
for at least 3 months. I just switched to 1.6 and so far no problems.

On 12/19/06, Shaffer, Darron <Darron_Shaffer <at> stercomm.com> wrote:
>
>
> This is a pain before JDK 1.4, because every JVM has its own little quirks.
> In fact, for at least one 1.3 JVM closing a ServerSocket from a different
> thread would trigger a bug causing some future accepted sockets to be handed
> to the wrong ServerSocket!
>
> However, in 1.4 with the arrival of NIO the SocketChannel versions of
> sockets have much better specifications of thread behavior.  It's now
> supposed to just work, causing any threads blocked on the closed sockets to
> receive an exception.  However, I haven't tested this extensively because
> with NIO I avoid all blocking operations and don't have to do this sort of
> thing.
>
>  ________________________________
>  From: concurrency-interest-bounces <at> cs.oswego.edu
> [mailto:concurrency-interest-bounces <at> cs.oswego.edu] On
> Behalf Of Peter Kovacs
> Sent: Tuesday, December 19, 2006 7:30 AM
> To: Concurrency-interest <at> cs.oswego.edu
> Subject: [concurrency-interest] Closing a socket from another thread
>
>
> Hi,
>

David Holmes | 5 Jan 00:20 2007

Re: Unexpected behaviour in ThreadPoolExecutor

Andrew,

You are right. This is a known "quality-of-implementation" issue with TPE
and is being addressed for Java 7. Meanwhile, if you wrap your tasks so that
no exceptions can escape, the worker threads won't terminate.
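
The wrapping can be as simple as a decorator (a sketch; the error handling here is a placeholder):

```java
// A sketch of the wrapping suggested above: a decorator that catches
// Throwable so no exception can escape and kill the worker thread.
// What to do with the failure -- here just printing -- is up to you.
class CatchingRunnable implements Runnable {
    private final Runnable delegate;

    CatchingRunnable(Runnable delegate) { this.delegate = delegate; }

    public void run() {
        try {
            delegate.run();
        } catch (Throwable t) {
            System.err.println("task failed: " + t);   // placeholder for real logging
        }
    }
}
```

Then pool.execute(new CatchingRunnable(task)) keeps the workers alive. Note that submit() has a similar effect, since its FutureTask wrapper captures the exception and reports it via Future.get().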

David Holmes


Peter Kovacs | 7 Jan 23:16 2007

ExecutorCompletionService

Hi,

I am looking at using the class ExecutorCompletionService (
http://dcl.mathcs.emory.edu/util/backport-util-concurrent/doc/api/edu/emory/mathcs/backport/java/util/concurrent/ExecutorCompletionService.html
). I would like to use some kind of ThreadPool executor for execution
which blocks when the maximum number of threads in the pool is reached
and all threads are utilized.

My motivation for looking for this kind of behavior is that I have a
large number of calculations to perform concurrently. The number of
calculations is driven by an external input source and is potentially so
large that the waiting tasks would far exceed the available system memory.
So waiting calculation tasks cannot be "buffered" in the system
indefinitely -- wherever they are waiting: either in client queue(s) or in
the threadpool (waiting for CPU).

1.) Is there such a ThreadPool executor implementation already
existing out there?

2.) I assume that -- when used with such a ThreadPool -- the
ExecutorCompletionService.submit method
(http://dcl.mathcs.emory.edu/util/backport-util-concurrent/doc/api/edu/emory/mathcs/backport/java/util/concurrent/ExecutorCompletionService.html#submit(edu.emory.mathcs.backport.java.util.concurrent.Callable))
will block when there are no more threads available in the pool. Is
this correct?

3.) Is having calls to ExecutorCompletionService.submit wait for free
worker threads a good practice at all? (It is definitely easy to use
on the client side, but who knows... there may be considerations
against it.)
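
One way to get the blocking behavior asked about in (1) and (2), without a special pool implementation, is to guard submission with a Semaphore. This sketch follows the BoundedExecutor pattern from Java Concurrency in Practice (the class name and bound are illustrative):

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

// Sketch: a Semaphore sized to 'bound' limits how many tasks are in
// flight; submit() blocks the producer once the pool is saturated.
class BoundedExecutor {
    private final Executor exec;
    private final Semaphore permits;

    BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.permits = new Semaphore(bound);
    }

    void submit(final Runnable task) throws InterruptedException {
        permits.acquire();                      // blocks while 'bound' tasks are in flight
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try { task.run(); }
                    finally { permits.release(); }
                }
            });
        } catch (RejectedExecutionException e) {
            permits.release();                  // don't leak the permit
            throw e;
        }
    }
}
```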


Joe Bowbeer | 8 Jan 04:23 2007

Re: ExecutorCompletionService

On 1/7/07, Peter Kovacs <peter.kovacs.1.0rc <at> gmail.com> wrote:


I would like to use some kind of ThreadPool executor for execution
which blocks when the maximum number of threads in the pool is reached
and all threads are utilized.

1.) Is there such a ThreadPool executor implementation already
existing out there?


CallerRunsPolicy is a close approximation.  Would that suffice?

http://java.sun.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

Doug Lea provided a WaitWhenBlocked handler in his original thread pool:

http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.java

You can try porting that if all else fails.
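
For reference, wiring CallerRunsPolicy in takes one constructor argument; the sizes below are arbitrary, chosen only to force rejection quickly:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a saturated pool with CallerRunsPolicy pushes overflow work
// back onto the submitting thread instead of rejecting it.
public class CallerRunsDemo {

    static int runDemo() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),         // tiny queue forces saturation
                new ThreadPoolExecutor.CallerRunsPolicy());  // overflow runs in the caller

        final AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < 20; i++) {
            pool.execute(new Runnable() {
                public void run() { ran.incrementAndGet(); }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return ran.get();   // all 20 tasks ran, some on the submitting thread
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

The trade-off is that the submitting thread is tied up running overflow tasks itself, rather than simply waiting.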


_______________________________________________
Concurrency-interest mailing list
Concurrency-interest <at> altair.cs.oswego.edu
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
Holger Hoffstätte | 8 Jan 09:32 2007

Re: ExecutorCompletionService

Peter Kovacs wrote:
> I would like to use some kind of ThreadPool executor for execution
> which blocks when the maximum number of threads in the pool is reached
> and all threads are utilized.

As Joe pointed out, this existed in Doug Lea's original concurrent
library, and since I needed it for backwards compatibility I "ported" it
(incidentally, for use with the backport lib too):

source:
http://fisheye.codehaus.org/browse/mule/trunk/mule/core/src/main/java/org/mule/util/concurrent/WaitPolicy.java?r=trunk

test case:
http://fisheye.codehaus.org/browse/mule/trunk/mule/tests/core/src/test/java/org/mule/test/util/concurrent/WaitPolicyTestCase.java?r=trunk

Feel free to use it any way you want; if you have questions drop me a line.

That being said, waiting for the pool/queue only works for certain amounts
of work and producer/consumer ratios: if you produce work more quickly
than you consume it over an extended period of time, you will eventually
run out of something, somewhere, unless you throttle.
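
For java.util.concurrent users, a handler in the same spirit can be sketched as a RejectedExecutionHandler that blocks the submitter by putting the task back on the bounded queue. This is only an illustrative analogue, not the Mule WaitPolicy itself; note it works best with corePoolSize == maxPoolSize, since it bypasses the pool's normal thread-growth path:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Sketch: when the bounded queue is full, block the submitting thread
// until space frees up, instead of rejecting the task.
class BlockUntilSpacePolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("executor has been shut down");
        }
        try {
            executor.getQueue().put(task);   // blocks while the queue is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(e);
        }
    }
}
```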

cheers
Holger
Peter Kovacs | 8 Jan 13:18 2007

Re: ExecutorCompletionService

Hi Joe,

Thank you for your reply.

I have considered using CallerRunsPolicy. If it is the "caller" who is
submitting tasks to the ExecutorCompletionService, "keeping it busy"
with actual work (when there are no more free threads left in the
pool) indeed has an effect similar to what I am after. I fear, however,
that the "caller" thread would then be busy with the actual work for
too long, leaving the worker threads idly waiting for input.

Thank you for your tip about WaitWhenBlocked. I wonder why it is not
included in the "official" package; this behaviour appears so
widely useful. That makes me suspect that the reason for not including
this class may be that, after all, there are some conceptual problems
with it.

Thanks
Peter

Peter Kovacs | 8 Jan 14:04 2007

Re: ExecutorCompletionService

Hi Holger,

Thank you for your reply and for pointing me to your "port". Wow... I am
surprised to see how elegantly this behavior can be achieved. (Being a
newbie to util.concurrent, I am often amazed at how well this package is
designed and how elegantly it can be used.) I would definitely consider
replacing my tentative counter-based hack with this one.

I am intrigued by your final remark, however. You say: "if you produce
work more quickly than you consume it over an extended period of time,
you will eventually run out of something, somewhere, unless you
throttle." My purpose in having the producer block when a limited
number of threads are fully utilized is precisely to suspend producing. Oh,
I guess I see: do you mean by "throttling" that there is a (not
necessarily justified) overhead in handling rejected tasks (and in task
rejection itself)?

Thanks a lot
Peter

Holger Hoffstätte | 8 Jan 14:48 2007

Re: ExecutorCompletionService

Peter Kovacs wrote:
> I am intrigued by your final remark, however. You say: "if you produce
> work more quickly than you consume it over an extended period of time,
> you will eventually run out of something, somewhere, unless you
> throttle." My purpose with having the producer block when a limited
> number of threads are fully utilized is just to suspend producing. Oh

Ah, OK - sorry for the confusion. In that case you do throttle, so it
should work. However, you probably still don't know up front how long you
will wait, unless the calculations have an at least somewhat predictable
running time.

> I guess I see: do you mean by "throttling" that there is a (not
> necessarily justified) overhead in handling rejected tasks (and in task
> rejection itself)?

No, that is probably insignificant.

best
Holger
Holger Hoffstätte | 9 Jan 04:02 2007

Attempt at yet another mostly-concurrent result cache


I'd like to ask for some ideas about how to make a previously completely
locked method as concurrent as possible. I've made great headway but a
final bit has thrown me off. I've read JCIP and the chapter on the
Memoizer but it didn't seem to apply here; if some variation of using
FutureTask can help here too I'd love to hear about it.

The single method basically receives an incoming message, looks up a
corresponding result batch and, when the batch is ready, returns a
result. There are many batches, and evaluation/result creation might take
a while, during which I don't want to lock out other threads happily doing
their thing.

Here is what I did so far:

// used to be a synchronizedMap()
private final ConcurrentMap<String, Batch> cache = new ConcurrentHashMap<String, Batch>();

// now unsynchronized
BatchResult process(Message msg)
{
    String batchKey = msg.getBatchId();

    // get-or-create with the (very rare) unnecessary new object
    Batch batch = cache.get(batchKey);
    if (batch == null)
    {
        batch = new Batch();
        Batch prev = cache.putIfAbsent(batchKey, batch);
        if (prev != null) batch = prev; // flip refs
    }

    // regardless of which thread got here first, we now have a valid batch;
    // add a message and evaluate it

    // PROBLEM: we only want one thread to update the batch at a time;
    // Batches are threadsafe but we want atomic update/evaluation,
    // sync on the batch, so threads for other batches are not blocked
    synchronized (batch)
    {
        batch.add(msg);
        if (batch.isReady())
        {
            // BUG1
            cache.remove(batchKey);
            return batch.toResult();
        }
    }

    // no result yet
    return null;
}

So far so good, though the astute reader will immediately spot the obvious
bug: BUG1 marks a lost update. When two threads synchronize on the same
batch, one wins and removes the batch; the second thread then adds to a
removed batch, which disappears into GC land.

So we try harder:

    // SYNC1
    synchronized (batch)
    {
        // check whether the batch reference is still valid,
        // update if necessary (same as above)
        batch = cache.get(batchKey);
        if (batch == null)
        {
            batch = new Batch();
            Batch prev = cache.putIfAbsent(batchKey, batch);
            if (prev != null) batch = prev; // flip refs
        }

        // BUG2
        batch.add(msg);
        if (batch.isReady())
        {
            cache.remove(batchKey);
            return batch.toResult();
        }
    }

Now a thread that was blocked at SYNC1 does the get-or-create thing and
even plays nice with another thread coming in from above. But wait... if the
thread entering the method at the top synchronizes on the batch created
inside SYNC1 (which it got hold of via the cache), it will not block at
SYNC1, possibly overtaking us, and we no longer have per-batch atomic
update/evaluation at BUG2. Right?

So, we try even harder..

    // SYNC1
    synchronized (batch)
    {
        // check whether the batch reference is still valid,
        // update if necessary (same as above)
        batch = cache.get(batchKey);
        if (batch == null)
        {
            batch = new Batch();
            Batch prev = cache.putIfAbsent(batchKey, batch);
            if (prev != null) batch = prev; // flip refs
        }

        // acquire either the already held batch lock again or
        // a new one created either by the toplevel get-or-create
        // or the one we just did
        synchronized (batch)
        {
            batch.add(msg);
            if (batch.isReady())
            {
                cache.remove(batchKey);
                return batch.toResult();
            }
        }
    }

In case you made it this far, here's my question: is this sort of
"double-checked get-or-create" really enough, or did I miss something?
My gut feeling tells me that the second synchronized block is somehow just
a nested repetition of the first (requiring a third one, ad infinitum) and that
I need to unravel this in some entirely different way. It seems to me that
the batch itself is the only object available to synchronize on.

I did several multi-threaded state transition diagrams and all conditions
I've played through so far seem to resolve, so I would really like someone
else to think this through. Of course all other funky tricks using
AtomicReference/CAS loops or RWLocks are welcome too. :)
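
For what it's worth, a common way out of this remove-vs-add race is not a second lock but a revalidate-and-retry loop: after acquiring the batch's monitor, check that the map still holds that same batch instance, and loop if it doesn't. A sketch, with hypothetical minimal Message/Batch stand-ins (the ready-at-3 rule is an assumption for illustration):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical minimal stand-ins for the poster's types.
class Message {
    private final String batchId;
    Message(String batchId) { this.batchId = batchId; }
    String getBatchId() { return batchId; }
}

class Batch {
    private int count;
    synchronized void add(Message m) { count++; }
    synchronized boolean isReady() { return count >= 3; }  // assumption: ready at 3
    synchronized int toResult() { return count; }
}

class BatchCache {
    private final ConcurrentMap<String, Batch> cache =
            new ConcurrentHashMap<String, Batch>();

    // Returns the batch result when the batch completes, else null.
    Integer process(Message msg) {
        String key = msg.getBatchId();
        for (;;) {
            // get-or-create, as in the original code
            Batch batch = cache.get(key);
            if (batch == null) {
                Batch fresh = new Batch();
                batch = cache.putIfAbsent(key, fresh);
                if (batch == null) batch = fresh;
            }
            synchronized (batch) {
                // Revalidate under the lock: if another thread removed this
                // batch while we waited, it is dead -- retry with a live one.
                if (cache.get(key) != batch) continue;
                batch.add(msg);
                if (batch.isReady()) {
                    cache.remove(key);
                    return Integer.valueOf(batch.toResult());
                }
                return null;   // no result yet
            }
        }
    }
}
```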

thanks!
Holger
