Wednesday, April 30, 2014

Worker Pattern

In a previous post, I explained and extended the Command Pattern.

By design, the Command Pattern is reusable but constrained to its state at initialization.

A Light Toggle Command (which turns a light on and off) is very much reusable, as long as there is one Command per Light. The same goes for a hypothetical Http Command: it could send the HTTP request passed to its constructor, but sending a different request would require instantiating a new Http Command. There are workarounds (e.g. using a thread-local variable), but by design the Command Pattern is not easily reusable at runtime.


Concept


The Worker Pattern is based on the analogy of a real worker. A worker gets some work to do, does it, produces results and is ready for more work.
Code-wise, it's a Command Pattern with a parameter and a result.

Design


As mentioned above, a Worker is nothing more than a Command with an argument returning a result:

public interface IWorker<W, R> {
  public R perform(W pWork) throws Exception;
}
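As a minimal sketch of what the parameter buys, a single stateless worker instance can be reused for any number of inputs, which is exactly what the Http Command above could not do. The WordCountWorker class and the WorkerDemo driver are hypothetical, not taken from the post's source code:

```java
// Minimal, self-contained sketch: one worker instance, many units of work.
interface IWorker<W, R> {
  public R perform(W pWork) throws Exception;
}

// Hypothetical example worker: counts the whitespace-separated words in a string.
// It keeps no state between calls, so the same instance can be reused freely.
class WordCountWorker implements IWorker<String, Integer> {
  @Override
  public Integer perform(String pWork) {
    String oTrimmed = pWork.trim();
    return oTrimmed.isEmpty() ? 0 : oTrimmed.split("\\s+").length;
  }
}

class WorkerDemo {
  public static void main(String[] args) throws Exception {
    IWorker<String, Integer> oWorker = new WordCountWorker();
    // Unlike a Command bound to its constructor arguments, the work is passed on each call.
    System.out.println(oWorker.perform("hello world"));         // 2
    System.out.println(oWorker.perform("the quick brown fox")); // 4
  }
}
```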
Small as the change is, this design brings a few extra features.

Fail fast, retry and protect


The Retry, Circuit Breaker and Supervisor Patterns can be applied to workers, just as they were applied to commands in my previous post, Command Pattern [1].

A Supervisor Worker can monitor the execution time of a proxied worker and fail fast when it takes too long.
A Retry Worker can re-execute a proxied worker when certain exceptions are thrown.
A Circuit Breaker Worker can trip after failures and protect a proxied worker from being executed again for a while.
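A Retry Worker might look like the sketch below. This is an assumption, not the implementation from the post's source code: it retries only on one exception type, gives up after a fixed number of attempts and waits nothing between attempts. FlakyWorker is a hypothetical test double that fails twice before succeeding.

```java
import java.io.IOException;

interface IWorker<W, R> {
  public R perform(W pWork) throws Exception;
}

// Sketch of a Retry Worker (an assumption, not the post's implementation):
// re-executes the proxied worker when it throws an exception of type mRetryOn,
// up to mMaxAttempts attempts in total, with no delay between attempts.
class RetryWorker<W, R> implements IWorker<W, R> {
  private final IWorker<W, R> mImpl;
  private final Class<? extends Exception> mRetryOn;
  private final int mMaxAttempts;

  RetryWorker(IWorker<W, R> pImpl, Class<? extends Exception> pRetryOn, int pMaxAttempts) {
    if (pMaxAttempts < 1) throw new IllegalArgumentException("pMaxAttempts must be >= 1");
    mImpl = pImpl;
    mRetryOn = pRetryOn;
    mMaxAttempts = pMaxAttempts;
  }

  @Override
  public R perform(W pWork) throws Exception {
    for (int oAttempt = 1; ; oAttempt++) {
      try {
        return mImpl.perform(pWork);
      } catch (Exception e) {
        // Rethrow non-retryable exceptions, and give up once all attempts are used.
        if (!mRetryOn.isInstance(e) || oAttempt >= mMaxAttempts) throw e;
      }
    }
  }
}

// Hypothetical test double: fails with an IOException on its first two calls.
class FlakyWorker implements IWorker<String, String> {
  private int mCalls = 0;

  @Override
  public String perform(String pWork) throws IOException {
    mCalls++;
    if (mCalls <= 2) throw new IOException("simulated failure #" + mCalls);
    return pWork.toUpperCase();
  }

  int calls() { return mCalls; }
}

class RetryDemo {
  public static void main(String[] args) throws Exception {
    FlakyWorker oFlaky = new FlakyWorker();
    IWorker<String, String> oWorker = new RetryWorker<String, String>(oFlaky, IOException.class, 3);
    System.out.println(oWorker.perform("ok") + " after " + oFlaky.calls() + " calls"); // OK after 3 calls
  }
}
```

A Supervisor or Circuit Breaker Worker would follow the same proxy structure, wrapping the call to mImpl.perform with a timeout or a failure counter instead of a retry loop.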


Split, Perform and Merge


Thanks to its design, a worker's work can be performed in parallel without changing the worker implementation itself.
This can improve performance when the work can be divided into independent chunks.

Concept


Given a divisible amount of work, the idea is to divide it into chunks, perform each chunk separately, merge the partial results and return a single result.

Implementation


An SPM Worker contains all the logic needed to wrap an existing worker and perform its work in parallel.
Any SPM Worker requires two specific components:
  1. an implementation of IWorkSplitter, whose job is to divide the work into chunks.
  2. an implementation of IResultMerger. Each chunk of work is performed in parallel and returns a result; the IResultMerger is in charge of merging those results into the single result to be returned.
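The two interfaces are not shown here, so the signatures below are inferred from how SimpleSPMWorker calls them (split returns a Collection<W>, merge takes a Collection<R>); treat them as assumptions. ArraySplitter and SumMerger are hypothetical implementations that sum an Integer[] in chunks, with the per-chunk work done sequentially to keep the sketch focused on the split and merge contracts:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Assumed shapes, inferred from how SimpleSPMWorker uses them.
interface IWorkSplitter<W> {
  Collection<W> split(W pWork) throws Exception;
}

interface IResultMerger<R> {
  R merge(Collection<R> pResults) throws Exception;
}

// Hypothetical splitter: cuts an Integer[] into consecutive chunks of at most mChunkSize
// elements. It returns a List so the chunks keep a well-defined order.
class ArraySplitter implements IWorkSplitter<Integer[]> {
  private final int mChunkSize;

  ArraySplitter(int pChunkSize) {
    if (pChunkSize < 1) throw new IllegalArgumentException("pChunkSize must be >= 1");
    mChunkSize = pChunkSize;
  }

  @Override
  public Collection<Integer[]> split(Integer[] pWork) {
    List<Integer[]> oChunks = new ArrayList<Integer[]>();
    for (int i = 0; i < pWork.length; i += mChunkSize) {
      oChunks.add(Arrays.copyOfRange(pWork, i, Math.min(i + mChunkSize, pWork.length)));
    }
    return oChunks;
  }
}

// Hypothetical merger: adds up the partial sums computed for each chunk.
class SumMerger implements IResultMerger<Integer> {
  @Override
  public Integer merge(Collection<Integer> pResults) {
    int oTotal = 0;
    for (Integer r : pResults) oTotal += r;
    return oTotal;
  }
}

class SplitMergeDemo {
  public static void main(String[] args) {
    Integer[] oWork = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    Collection<Integer[]> oChunks = new ArraySplitter(3).split(oWork);
    // Each chunk is performed sequentially here; an SPM Worker would do this step in parallel.
    List<Integer> oPartials = new ArrayList<Integer>();
    for (Integer[] c : oChunks) {
      int s = 0;
      for (Integer i : c) s += i;
      oPartials.add(s);
    }
    System.out.println(oPartials);                        // [6, 15, 24, 10]
    System.out.println(new SumMerger().merge(oPartials)); // 55
  }
}
```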
A simple implementation, SimpleSPMWorker, is provided. It proxies an IWorker and will use it to perform all chunks of work in parallel. Therefore, it assumes the given IWorker is thread-safe and stateless.

Its implementation is as follows:
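import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class SimpleSPMWorker<W, R>
 implements IWorker<W, R> {

 private IWorker<W, R> mImpl; // proxied worker, assumed thread-safe and stateless
 private IWorkSplitter<W> mSplitter;
 private IResultMerger<R> mMerger;
 private ExecutorService mExecutorService;

 public SimpleSPMWorker(ExecutorService pExecutorService, IWorker<W, R> pImpl,
   IWorkSplitter<W> pSplitter, IResultMerger<R> pMerger) {
  mImpl = pImpl;
  mSplitter = pSplitter;
  mMerger = pMerger;
  mExecutorService = pExecutorService;
 }

 @Override
 public R perform(W pWork) throws Exception {
  // Split, perform every chunk in parallel, then merge the ordered results.
  Collection<W> oWork = mSplitter.split(pWork);
  Collection<R> oResults = perform(oWork);
  return mMerger.merge(oResults);
 }

 protected Collection<R> perform(Collection<W> pWork) throws Exception {
  List<R> oResults = new ArrayList<R>();
  List<Callable<R>> oTasks = new ArrayList<Callable<R>>();
  for (final W w : pWork) {
   oTasks.add(createTask(w));
  }
  // invokeAll blocks until all tasks complete; the futures come back in task order.
  List<Future<R>> oFutures = mExecutorService.invokeAll(oTasks);
  for (Future<R> f : oFutures) oResults.add(f.get());
  return oResults;
 }

 // Wraps one chunk of work into a task that delegates to the proxied worker.
 protected Callable<R> createTask(final W pWork) {
  return new Callable<R>() {
   @Override
   public R call() throws Exception {
    return mImpl.perform(pWork);
   }
  };
 }

}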

The code is fairly self-explanatory, but one feature is not obvious: because of the way ExecutorService.invokeAll is used, the results are passed to the merger in the same order as the chunks of work.
  • First, perform(W) splits the work into a collection, oWork.
  • perform(Collection<W>) then creates one Callable per chunk of work into oTasks, via createTask; the Callable at index i of oTasks is in charge of the chunk at index i of oWork.
  • Next, the ExecutorService invokes all oTasks and returns oFutures, a list of Futures in the same order as the tasks (see the Javadoc of ExecutorService.invokeAll).
  • invokeAll blocks, with no timeout, until every task has completed; each result is then retrieved with Future.get() and appended, in order, to oResults, which is returned.
  • When it comes to merging, in perform(W), the results in oResults are therefore in the same order as the chunks of work: the result of the i-th chunk of oWork is at oResults.get(i). This relies on the splitter returning a collection with a well-defined iteration order, such as a List.
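The ordering guarantee can be checked in isolation. In this sketch (InvokeAllOrderDemo is a hypothetical name), earlier tasks sleep longer and so typically finish last, yet the results still come back in submission order because invokeAll returns its futures in the order of the task list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllOrderDemo {
  // Runs tasks that tend to finish in reverse order and returns their results in Future order.
  static List<Integer> run() throws Exception {
    ExecutorService oExecutor = Executors.newFixedThreadPool(4);
    try {
      List<Callable<Integer>> oTasks = new ArrayList<Callable<Integer>>();
      for (int i = 0; i < 4; i++) {
        final int oIndex = i;
        oTasks.add(new Callable<Integer>() {
          @Override
          public Integer call() throws Exception {
            // Earlier tasks sleep longer, so they usually complete last.
            Thread.sleep((4 - oIndex) * 50L);
            return oIndex;
          }
        });
      }
      // invokeAll blocks until every task is done and returns the futures in task order.
      List<Future<Integer>> oFutures = oExecutor.invokeAll(oTasks);
      List<Integer> oResults = new ArrayList<Integer>();
      for (Future<Integer> f : oFutures) oResults.add(f.get());
      return oResults;
    } finally {
      oExecutor.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run()); // [0, 1, 2, 3]
  }
}
```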

Chaining


Thanks to its design again, it's possible to chain workers.

A ChainWorker is as follows:
public class ChainWorker<W,R,S> implements IWorker<W, R> {
 
 private IWorker<S, R> mLeft;
 private IWorker<W, S> mRight;
 
 public ChainWorker(IWorker<S, R> pLeft, IWorker<W, S> pRight) {
  mLeft = pLeft;
  mRight = pRight;
 }
 @Override
 public R perform(W pWork) throws Exception {
  S oResult = mRight.perform(pWork);
  return mLeft.perform(oResult);
 }
}

The idea is to pass two workers to the ChainWorker. When performing work, it runs the right worker first and feeds its result to the left worker, basically doing:
 R oResult = mLeft.perform(mRight.perform(pWork));

Given the following workers,
IWorker<Integer[], Integer> oSumWorker = new IWorker<Integer[], Integer>() {
 @Override
 public Integer perform(Integer[] pWork) throws Exception {
  int oResult = 0;
  for (Integer i : pWork) oResult += i.intValue();
  return oResult;
 }
};
IWorker<Integer, Double> oSqrtWorker = new IWorker<Integer, Double>() {
 @Override
 public Double perform(Integer pWork) throws Exception {
  return Math.sqrt(pWork.intValue());
 }
};
IWorker<Integer[], Integer[]> oSqrWorker = new IWorker<Integer[], Integer[]>() {
 @Override
 public Integer[] perform(Integer[] pWork) throws Exception {
  Integer[] oResult = new Integer[pWork.length];
  for (int i = 0; i < pWork.length; i++) oResult[i] = pWork[i] * pWork[i];
  return oResult;
 }
};
...they can be chained manually to compute sqrt(a^2 + b^2 + c^2) with a = 1, b = 2 and c = 3:
Integer[] oInput = new Integer[] { 1, 2, 3 };
Integer[] oSqr = oSqrWorker.perform(oInput);
Integer oSum = oSumWorker.perform(oSqr);
Double oSqrt = oSqrtWorker.perform(oSum);

When using the ChainWorker, we end up with:
ChainWorker<Integer[], Integer, Integer[]> oChainOne = new ChainWorker<Integer[], Integer, Integer[]>(oSumWorker, oSqrWorker);
ChainWorker<Integer[], Double, Integer> oChainTwo = new ChainWorker<Integer[], Double, Integer>(oSqrtWorker, oChainOne);
Double oSqrt = oChainTwo.perform(oInput);

To make chaining workers easier, one could use the ChainBuilder:
public class ChainBuilder<W,R> {
 private IWorker<W, R> mChain;
 private ChainBuilder(IWorker<W, R> pWorker) {
  mChain = pWorker;
 }
 public static <W,R> ChainBuilder<W, R> newBuilder(IWorker<W, R> pWorker) {
  return new ChainBuilder<W, R>(pWorker);
 }
 public <R2> ChainBuilder<W,R2> chain(IWorker<R, R2> pNextWorker) {
  return new ChainBuilder<W, R2>(new ChainWorker<W, R2, R>(pNextWorker, mChain));
 }
 public IWorker<W, R> getChain() {
  return mChain;
 }
}

Building the same chain is much easier with ChainBuilder:
IWorker<Integer[], Double> oChainWorker = ChainBuilder
 .newBuilder(oSqrWorker)
 .chain(oSumWorker)
 .chain(oSqrtWorker)
 .getChain();
Double oSqrt = oChainWorker.perform(oInput);
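To run the whole chain end to end, the pieces above fit into a single file as sketched below. IWorker, ChainWorker and ChainBuilder follow the code in this post, lightly adapted (package-private, final fields) so everything compiles in one file; the ChainDemo class and its norm helper are hypothetical:

```java
interface IWorker<W, R> {
  public R perform(W pWork) throws Exception;
}

class ChainWorker<W, R, S> implements IWorker<W, R> {
  private final IWorker<S, R> mLeft;
  private final IWorker<W, S> mRight;

  ChainWorker(IWorker<S, R> pLeft, IWorker<W, S> pRight) {
    mLeft = pLeft;
    mRight = pRight;
  }

  @Override
  public R perform(W pWork) throws Exception {
    // The right worker runs first; its result becomes the left worker's work.
    return mLeft.perform(mRight.perform(pWork));
  }
}

class ChainBuilder<W, R> {
  private final IWorker<W, R> mChain;

  private ChainBuilder(IWorker<W, R> pWorker) { mChain = pWorker; }

  static <W, R> ChainBuilder<W, R> newBuilder(IWorker<W, R> pWorker) {
    return new ChainBuilder<W, R>(pWorker);
  }

  // Appends pNextWorker after the current chain.
  <R2> ChainBuilder<W, R2> chain(IWorker<R, R2> pNextWorker) {
    return new ChainBuilder<W, R2>(new ChainWorker<W, R2, R>(pNextWorker, mChain));
  }

  IWorker<W, R> getChain() { return mChain; }
}

class ChainDemo {
  // Computes sqrt(a^2 + b^2 + ...) by chaining square, sum and sqrt workers.
  static double norm(Integer[] pInput) throws Exception {
    IWorker<Integer[], Integer[]> oSqrWorker = new IWorker<Integer[], Integer[]>() {
      @Override
      public Integer[] perform(Integer[] pWork) {
        Integer[] oResult = new Integer[pWork.length];
        for (int i = 0; i < pWork.length; i++) oResult[i] = pWork[i] * pWork[i];
        return oResult;
      }
    };
    IWorker<Integer[], Integer> oSumWorker = new IWorker<Integer[], Integer>() {
      @Override
      public Integer perform(Integer[] pWork) {
        int oResult = 0;
        for (Integer i : pWork) oResult += i;
        return oResult;
      }
    };
    IWorker<Integer, Double> oSqrtWorker = new IWorker<Integer, Double>() {
      @Override
      public Double perform(Integer pWork) {
        return Math.sqrt(pWork);
      }
    };
    IWorker<Integer[], Double> oChain = ChainBuilder
        .newBuilder(oSqrWorker)
        .chain(oSumWorker)
        .chain(oSqrtWorker)
        .getChain();
    return oChain.perform(pInput);
  }

  public static void main(String[] args) throws Exception {
    // Prints sqrt(14), approximately 3.7417.
    System.out.println(norm(new Integer[] { 1, 2, 3 }));
  }
}
```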

Examples


Search


The source code on GitHub [2] provides an example application that applies the Worker Pattern to an existing domain.
The sample is about a Searcher, its SearchParameters and SearchResults, all 3 being part of a fictional domain. It defines a SearcherWorker, wrapping the Searcher, using SearchParameters as work and SearchResults as result.
It implements IWorkSplitter and IResultMerger to split SearchParameters and merge SearchResults respectively and uses SimpleSPMWorker to divide work and execute searches in parallel.

Http Worker


Similar to the Search Sample, this one wraps Apache HttpClient into an HttpWorker.
It demonstrates the Retry and Circuit Breaker patterns applied to HttpWorker, with simulated connection failures.


References


[1] Command Pattern, Luc Pezet, Mar. 22 2014
[2] Source code, Luc Pezet, GitHub.com
