2

How to release resources from an interrumpted task or job?

Posted by Guillermo García on 5:33 AM in , ,
Since Java 5, task scheduling is quite easy thanks to the java.util.concurrent package and the  ScheduledExecutorService implementations. One of the must used implementations is the ScheduledThreadPoolExecutor instanced by the Executors.newScheduledThreadPool(...) factory methods.

When one Runnable job (or task) is scheduled with the ScheduledExecutorService implementations, a reference to the new scheduled task is returned to the invoker. This reference points to a ScheduledFuture object, and it allows manipulate the thread that will execute, in the "future", the Runnable task.

With this architecture, the invoker of the scheduler have a reference to manipulate the thread, but not the Runnable object directly. So, for example, if the invoker doesn't keep a reference to Runnable object, it can't force the release of resources of the Runnable object after interrupt the thread that executes the task (using the ScheduledFuture.cancel(true) method).

NOTE: ScheduledFuture.cancel(false) doesn't interrupt the thread and only mark it as canceled for the scheduler. This is useful to stop periodicals tasks, but not for interrupting a running thread.

Obviously, the must elegant way to release resources if to implement a finally block in the Runnable.run() method, but sometimes that is not enough, because the finally block may not be executed after a thread interruption. If you don't believe me check this post.

So, what to do? Keep a reference to each scheduled task in the invoker? That might work, if you can encapsulate all the task scheduling operations and assure that every time that a thread is canceled, the corresponding Runnable object is forced to release the resources. In conclusion, a lot of work and discipline.

So, in my humble opinion, there is simpler way to do this:

  1. Extend the ScheduledThreadPoolExecutor to overwrite the decorateTask()
  2. Decorate our the Runnable with our implementation of a ScheduledFuture
  3. Implement our custom cancel() method that actually cancel the thread but also manipulate the Runnable object to force the resource releasing.

Sounds complicated I know, but check this code out and judge it by yourself.

The new Scheduler...
public class MyScheduler extends ScheduledThreadPoolExecutor {

    public MyScheduler(int poolSize) {
        super(poolSize);
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> futureTask) {
        RunnableScheduledFuture<V> myFutureTask;
        try {
            // Trying to decorate the MyRunnable with the decorator
            myFutureTask = new MyScheduledFuture<V>((MyRunnable) runnable, futureTask);
        } catch (ClassCastException e) {
            // If the runnable is not an instance of MyRunnable use the default decorator
            myFutureTask = super.decorateTask(runnable, futureTask);
        }
        return myFutureTask;
    }
}

The new future task...
public class MyScheduledFuture<V> implements RunnableScheduledFuture<V> {

    private MyRunnable myRunnable;
    private RunnableScheduledFuture<V> futureTask;

    public MyScheduledFuture(MyRunnable myRunnable, RunnableScheduledFuture<V> futureTask) {
        this.myRunnable= myRunnable;
        this.futureTask= futureTask;
    }

    @Override
    public void run() {
        //Delegate the run method to the original future task
        this.futureTask.run();
    }

    @Override
    public boolean cancel(final boolean pMayInterruptIfRunning) {
        // Cancel the thread as usual
        boolean isCanceled = this.futureTask.cancel(pMayInterruptIfRunning);

        //Release resources
        if(pMayInterruptIfRunning){
            myRunnable.forceReleaseOfResources();
        }
        
        return isCanceled;
    }

    //Delegate the other methods of the interface 
    //to the given task (as in the run() method)
    //...
}

The interface to be used for those task that handles resources ...
public interface MyRunnable extends Runnable {
    /**
     * Release resources
     */
    public void forceReleaseOfResources();
}

So, using directly MyScheduler class to schedule MyRunnable tasks will force the release of resources when the thread is interrupted by ScheduledFuture.cancel(true).

But, what about the ScheduledExecutorService.shutdown() and ScheduledExecutorService.shutdownNow() methods?

Here we have a little issue: actually the ScheduledExecutorService.shutdown() calls ScheduledFuture.cancel(false), so your resources will not been released. You must uses this method only when you want to stop the scheduler, but you want to "wait" for those task that are in execution.

In other hand, the ScheduledExecutorService.shutdownNow() doesn't call the ScheduledFuture.cancel() methods, it only interrupts the running and idles threads. So if you want to interrupt all the tasks and forcing the release of resources, you will need to overwrite the shutdownNow() method to call the ScheduledFuture.cancel(true) on your tasks. If you need to stop the scheduler too, call super.shutdownNow().

The brand new Scheduler...
public class MyScheduler extends ScheduledThreadPoolExecutor {

    private List<RunnableScheduledFuture<?>> myTasks = new ArrayList<RunnableScheduledFuture<?>();

    public MyScheduler(int poolSize) {
        super(poolSize);
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> futureTask) {
        RunnableScheduledFuture<V> myFutureTask;
        try {
            // Trying to decorate the MyRunnable with the decorator
            myFutureTask = new MyScheduledFuture<V>((MyRunnable) runnable, futureTask);
            //Adding the scheduled task
            myTasks.add(myFutureTask);
        } catch (ClassCastException e) {
            // If the runnable is not an instance of MyRunnable use the default decorator
            myFutureTask = super.decorateTask(runnable, futureTask);
        }
        return myFutureTask;
    }

    @Override
    public List<Runnable> shutdownNow() {
        if (myTasks!= null) {
            // Cancels and interrupts all the jobs from the watch list
            for (RunnableScheduledFuture<?> job : myTasks) {
                job.cancel(true);
            }
            myTasks= null;
        }
        // Shutdown the scheduler
        return super.shutdownNow();
    }

}

NOTE: You must use a custom list of tasks and not the ScheduledExecutorService.getQueue() because this list contains the tasks that are waiting for execution (periodicals tasks, one-time tasks not executed yet, etc), so the tasks in execution aren't in that list. Manage this list could be very hard, so if you find another way to get the running tasks ... post a comment!!!

2 Comments


Hey Guillermo,

I am Achal Agrawal, currently an intern at Capgemini (which is a partner of Inova's Innovation management software)

I'm writing to you here because i could not find contact address of any person working for inova in france.

I had an idea to improve the innovation management software, about which i had sent a mail through the inova site a week back, but i dont trust them to transfer the mails to right people.

Could you give me your email address or the address of someone who is working on the innovation management software, so i can propose the idea and see if it interests them. The idea was deemed interesting and pertinent by a leading professor on Innovation, (Prof. Jean Pierre Benghozi)

My address is achal.agrawal @ capgemini.com

Looking forward to hearing from you.

Achal AGRAWAL
intern@Capgemini France


invested my money superior roulette casino games and made the casino choice

Copyright © 2009 ggarciao.com
- Cup of Java -
All rights reserved.