Java Schedulable CompletionService

Wolfie — August 5, 2008, 4:10 pm

This week I wanted to knock up a tool to test the performance of our Monte Carlo grid with a set of more complex algorithms, so it seemed like a good idea to start by replicating flows from production. Each incoming trade is timestamped on receipt and thus could be replayed into the test grid at exactly the same rate. The Java language has an excellent set of tools for multi-threading, and quite complex problems can be solved with relative ease given a solid understanding of classic threading models. This time, however, I noticed an omission of some importance from the java.util.concurrent package: while there is an ExecutorCompletionService class, there is no complementary scheduled variant to go with the ScheduledThreadPoolExecutor. As in most languages it is easy enough to “roll your own”, but it was not quite as simple as I first thought. After a little cogitating I came up with a straightforward solution and thought I’d share.
fag packet specification
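First of all you need to wrap the data you want to carry within the schedule in a Callable (Job), and in turn wrap that in a second Callable (ScheduledJob), which is the object that actually gets scheduled. When its delay expires, the ScheduledJob submits the Job to the ExecutorCompletionService and returns the resulting Future. The subtle bit is passing in a reference to the external CompletionService during construction of the ScheduledJob object and holding it in a WeakReference for use in the call() override. The intent is that a ScheduledJob still waiting on its timer does not by itself keep the service, and the completed results queued inside it, alive once the rest of the application has let go of them, so our data items can be garbage collected later on when they are no longer required. The flip side is that WeakReference.get() returns null once the service has been collected, so call() should check for that.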

Below is an example of how to do this with classes Job and ScheduledJob, which have been specialised to carry a data payload of type Map<String, Object>. That type is general purpose and handy for most database uses, but you can use anything else you like.



import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;

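/**
 * Callable handed to the scheduler. When its delay expires it submits the
 * wrapped Job to the CompletionService and returns the resulting Future.
 *
 * @author Wolfie
 */
public class ScheduledJob implements Callable<Future<Map<String, Object>>> {

  private final Job _job;
  // Held weakly so that a pending ScheduledJob does not keep the service alive
  private final WeakReference<CompletionService<Map<String, Object>>> _completer;

  public ScheduledJob(Job theJob, CompletionService<Map<String, Object>> completionService) {
    _completer = new WeakReference<CompletionService<Map<String, Object>>>(completionService);
    _job = theJob;
  }

  /* (non-Javadoc)
   * @see java.util.concurrent.Callable#call()
   */
  @Override
  public Future<Map<String, Object>> call() throws Exception {
    CompletionService<Map<String, Object>> completer = _completer.get();
    if (completer == null) {
      // The service has already been garbage collected, so there is nowhere to submit to
      throw new IllegalStateException("CompletionService is no longer reachable");
    }
    return completer.submit(_job);
  }
}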

/** Carries the data payload and simply hands it back when run. */
class Job implements Callable<Map<String, Object>> {

  private final Map<String, Object> data;

  public Job(Map<String, Object> payload) {
    data = payload;
  }

  @Override
  public Map<String, Object> call() throws Exception {
    return data;
  }
}
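
Since the payload type is a matter of taste, the same idea can be written once with a type parameter. What follows is only a sketch and not part of the original project: the names GenericScheduledJobSketch, DelayedSubmit and runDemo are made up for illustration, and it leans on Java 8 lambdas for brevity. It keeps the WeakReference arrangement described above.

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class GenericScheduledJobSketch {

  /** Hypothetical generic variant of ScheduledJob: submits the task when the timer fires. */
  static final class DelayedSubmit<T> implements Callable<Future<T>> {
    private final Callable<T> task;
    private final WeakReference<CompletionService<T>> completer;

    DelayedSubmit(Callable<T> task, CompletionService<T> completionService) {
      this.task = task;
      this.completer = new WeakReference<CompletionService<T>>(completionService);
    }

    @Override
    public Future<T> call() {
      CompletionService<T> service = completer.get();
      if (service == null) {
        // Nobody holds the service any more, so the result has nowhere to go
        throw new IllegalStateException("CompletionService is no longer reachable");
      }
      return service.submit(task);
    }
  }

  /** Schedules two String payloads, longest delay first, and reports the order they complete in. */
  static String runDemo() throws Exception {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    CompletionService<String> completer = new ExecutorCompletionService<String>(scheduler);
    try {
      Callable<String> late = () -> "second";
      Callable<String> early = () -> "first";
      scheduler.schedule(new DelayedSubmit<String>(late, completer), 400, TimeUnit.MILLISECONDS);
      scheduler.schedule(new DelayedSubmit<String>(early, completer), 50, TimeUnit.MILLISECONDS);
      // take() blocks until the next job has completed
      String a = completer.take().get();
      String b = completer.take().get();
      return a + "," + b;
    } finally {
      scheduler.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(runDemo());
  }
}
```

The local variable completer keeps the service strongly reachable for the whole of runDemo, so the null branch is never taken here; it only matters when the owner of the service goes away while timers are still pending.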
 

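Definitions should be provided for the ScheduledExecutorService and CompletionService like this, remembering that the ScheduledExecutorService should be initialised with a thread-pool size suitable for your solution. Because the CompletionService wraps the same executor, that one pool both fires the timers and runs the submitted jobs.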



  public ScheduledExecutorService _scheduledService = Executors.newScheduledThreadPool(3);
  public CompletionService<Map<String, Object>> _completionService = new ExecutorCompletionService<Map<String, Object>>(_scheduledService);

 

All that then remains is to instantiate your data payload, wrap it in a Job, wrap that in a ScheduledJob and schedule it within the ScheduledExecutorService with your desired delay. Retrieval (as shown below) is a matter of calling the blocking method take() on the CompletionService, which returns the Future of whichever job completes next. Obtaining our original data is achieved through a call to get() on that Future and we’re done.



    HashMap<String, Object> map3 = new HashMap<String, Object>();
    map3.put("Payload", "Nearly done now");
    theApp._scheduledService.schedule(new ScheduledJob(new Job(map3), theApp._completionService), 2000, TimeUnit.MILLISECONDS);
    // Retrieve timers
    Logger.getLogger(TimerPoolSample.class.getName()).info("Retrieving timers");
    
    try {
      // Read the results off the completion service
      for(int i = 0; i < 3; i++)
      {
        Future<Map<String, Object>> result = theApp._completionService.take();
        
        Map<String, Object> data = result.get();
        if(data != null)
        {
          Logger.getLogger(TimerPoolSample.class.getName()).info("Retrieved : " + data.get("Payload"));
          data = null;
          result = null;
        }
      }
    }
    catch(InterruptedException ix)
    {
      Logger.getLogger(TimerPoolSample.class.getName()).severe("Thread Interrupted : " + ix.getMessage());
    }
    catch(ExecutionException ex)
    {
      Logger.getLogger(TimerPoolSample.class.getName()).severe("Execution failure : " + ex.getMessage());
    }
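
To tie this back to the original motivation of replaying timestamped trades at their recorded rate, here is a rough sketch of how the delays could be derived from the receipt times. It is an illustration under assumptions rather than the tool I actually built: RecordedTrade, replayDelayMillis and replay are hypothetical names, the recording is assumed to be sorted by receipt time, and the "work" done per trade is simply returning its id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ReplaySketch {

  /** Hypothetical stand-in for a recorded trade: an id plus its receipt time. */
  static final class RecordedTrade {
    final String id;
    final long receivedAtMillis;

    RecordedTrade(String id, long receivedAtMillis) {
      this.id = id;
      this.receivedAtMillis = receivedAtMillis;
    }
  }

  /** Delay for a trade, measured from the first trade in the recording. */
  static long replayDelayMillis(long firstReceivedAtMillis, long receivedAtMillis) {
    return receivedAtMillis - firstReceivedAtMillis;
  }

  /** Replays the trades (assumed sorted by receipt time) and returns ids in completion order. */
  static List<String> replay(List<RecordedTrade> trades)
      throws InterruptedException, ExecutionException {
    List<String> completed = new ArrayList<String>();
    if (trades.isEmpty()) {
      return completed;
    }
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    CompletionService<String> completer = new ExecutorCompletionService<String>(scheduler);
    try {
      long first = trades.get(0).receivedAtMillis;
      for (RecordedTrade trade : trades) {
        Callable<String> job = () -> trade.id;
        // When the timer fires, hand the job over to the completion service
        Callable<Future<String>> handOver = () -> completer.submit(job);
        long delay = replayDelayMillis(first, trade.receivedAtMillis);
        scheduler.schedule(handOver, delay, TimeUnit.MILLISECONDS);
      }
      for (int i = 0; i < trades.size(); i++) {
        completed.add(completer.take().get());
      }
      return completed;
    } finally {
      scheduler.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    List<RecordedTrade> recording = Arrays.asList(
        new RecordedTrade("T1", 1000L),
        new RecordedTrade("T2", 1050L),
        new RecordedTrade("T3", 1300L));
    System.out.println(replay(recording));
  }
}
```

Each delay is computed relative to the first trade, so the gaps between trades are preserved while the absolute wall-clock time of the recording is ignored. Scaling the result of replayDelayMillis would be one way to replay faster or slower than production.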
    

    
 

You can download the example Eclipse project here and experiment yourself.

For further reading I recommend Java Concurrency in Practice by Brian Goetz.

2 Comments »


  1. Comment by xoggoth @ August 6, 2008, 9:51 pm

    Aaaah. Once upon a time, before I retired, I would have understood all that and would doubtless have PUT YOU RIGHT! young Wolfie. As it is. What?

  2. Comment by Colin Campbell @ August 11, 2008, 4:49 am

    Onyaaa Wolfie! I am glad I don’t have to deal with that stuff. I was hopeless at programming in University.
