Blog 22/03/2019

Processing a dynamic number of flows in parallel in Mule 3

Parallel processing

A few weeks ago I wrote a blog post about processing a dynamic number of flows in parallel in Mule 4. In that post I described how useful it can be to process flows in parallel to improve the performance of a service, and how we achieved this in Mule 4. After writing it, we found a similar use case. This time, however, we needed to update an existing project instead of building a new one. Unfortunately, it was a Mule 3 project, where the Group Based Aggregator is not available. One option would be to migrate the whole project from Mule 3 to Mule 4, but that would take a lot of time. So back to the drawing board!

Mike Heeren / Integration expert (interim) at Integration & Application Talents

For Each loops and Scatter-Gather components

In the preceding blog post about Mule 4 I already described the downsides of the For Each and Scatter-Gather components when you want to achieve dynamic parallelism within a Mule flow. Those downsides also apply to Mule 3, and they are explained in that post.

In this blog post we will use the same example case as in the earlier post. We will create an HTTP POST method that receives a list of cities. The Mule flow will enrich this data with the latitude and longitude of each city using the MetaWeather API. Of course, we again want to retrieve the latitude and longitude data for each city concurrently.

What about the Request-Reply scope and VM queues?

You can process data in parallel threads by using VM queues. We could nest a VM queue inside a For Each loop to trigger multiple sub threads. However, the main thread would still need to wait until all sub threads have completed before it can aggregate the results.

For Each loop with VM queue

Another component that is available in Mule 3 is the Request-Reply scope. In combination with VM queues, it can be used to turn part of a synchronous flow into an asynchronous one, so that the main flow waits until the sub flow running on another thread has completed.

This sounded like a good solution. Unfortunately, it does not seem possible to start multiple VM queue sub threads from the Request phase and then receive just one trigger in the Reply phase once all sub threads have completed.

Request-Reply scope with For Each and VM queues

Another thing I tried was wrapping a For Each loop (or a Collection Splitter and Collection Aggregator construction) around the Request-Reply scope. However, this made the main flow wait for each sub flow to complete before starting the next one, so all sub flows were effectively processed sequentially.

Note: The Message Properties element within the For Each is required to make the Request-Reply scope work within a For Each loop.

For Each loop with Request-Reply scope and VM queues
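The sequential behavior described above is not specific to Mule: any loop that blocks on a reply inside each iteration can never have more than one sub task in flight. The plain Java sketch below is only an analogy, not Mule code. The class DispatchOrderDemo, its methods, and the 100 ms sleep that stands in for the remote call are all hypothetical. It measures the peak number of simultaneously running sub tasks for both dispatch styles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class DispatchOrderDemo {

  /** Tracks how many simulated sub flows are running at the same moment. */
  static final class ConcurrencyProbe {
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    void simulateSubFlow() {
      final int now = running.incrementAndGet();
      peak.accumulateAndGet(now, Math::max);
      try {
        Thread.sleep(100); // assumption: stands in for the remote call
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      } finally {
        running.decrementAndGet();
      }
    }

    int peak() {
      return peak.get();
    }
  }

  /** Waits for the reply inside the loop, like a For Each around a Request-Reply scope. */
  static int waitInsideLoop(final int subFlows) {
    final ExecutorService pool = Executors.newFixedThreadPool(subFlows);
    final ConcurrencyProbe probe = new ConcurrencyProbe();
    try {
      for (int i = 0; i < subFlows; i++) {
        // join() blocks until this sub flow is done, so the next one cannot start earlier.
        CompletableFuture.runAsync(probe::simulateSubFlow, pool).join();
      }
    } finally {
      pool.shutdown();
    }
    return probe.peak();
  }

  /** Dispatches all sub flows first and only waits once, after the loop. */
  static int waitAfterLoop(final int subFlows) {
    final ExecutorService pool = Executors.newFixedThreadPool(subFlows);
    final ConcurrencyProbe probe = new ConcurrencyProbe();
    try {
      final List<CompletableFuture<Void>> pending = new ArrayList<>();
      for (int i = 0; i < subFlows; i++) {
        pending.add(CompletableFuture.runAsync(probe::simulateSubFlow, pool));
      }
      CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    } finally {
      pool.shutdown();
    }
    return probe.peak();
  }

  public static void main(final String[] args) {
    System.out.println("Peak when waiting inside the loop: " + waitInsideLoop(3));
    System.out.println("Peak when waiting after the loop: " + waitAfterLoop(3));
  }
}
```

Waiting inside the loop always yields a peak of 1. Waiting after the loop lets the sub tasks overlap, which is the behavior we are after in the Mule flow.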

Back to the drawing board… Again!

We decided to go back to the Mule 4 solution as a starting point. We started by nesting an Async block inside a For Each loop. For the aggregation we use a custom Java class named ParallelProcessor and a custom exception implementation named ParallelProcessorException:

ParallelProcessor.java

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ParallelProcessor implements Serializable {
  private static final long serialVersionUID = 1L;
  private final int totalRecords;
  // Final, so all threads always synchronize on the same list instance.
  private final List<Serializable> completedRecords = new ArrayList<>();

  public ParallelProcessor(final int totalRecords) {
    this.totalRecords = totalRecords;
  }

  // Returns a copy, so callers can iterate safely while sub flows may still be registering records.
  public List<Serializable> getCompletedRecords() {
    synchronized (completedRecords) {
      return new ArrayList<>(completedRecords);
    }
  }

  // Called at the end of each sub flow with the payload of the completed record.
  public void registerCompletedRecord(final Serializable record) {
    synchronized (completedRecords) {
      completedRecords.add(record);
    }
  }

  // Polls every checkInterval milliseconds until all records are registered or maxWait milliseconds have passed.
  public void waitUntilComplete(final long maxWait, final long checkInterval) throws ParallelProcessorException {
    final long maxEndTime = System.currentTimeMillis() + maxWait;
    int amountOfIncompleteRecords = -1;
    while (System.currentTimeMillis() <= maxEndTime) {
      amountOfIncompleteRecords = getAmountOfIncompleteRecords();
      if (amountOfIncompleteRecords == 0) {
        return;
      }
      try {
        Thread.sleep(checkInterval);
      } catch (final InterruptedException ie) {
        Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        throw new ParallelProcessorException("Wait interrupted", ie);
      }
    }
    throw new ParallelProcessorException("[" + amountOfIncompleteRecords + "] out of [" + totalRecords + "] were not completed in time");
  }

  private int getAmountOfIncompleteRecords() {
    synchronized (completedRecords) {
      return totalRecords - completedRecords.size();
    }
  }
}

ParallelProcessorException.java

public class ParallelProcessorException extends Exception {
  public ParallelProcessorException(final String message) {
    super(message);
  }

  public ParallelProcessorException(final String message, final Throwable cause) {
    super(message, cause);
  }
}

Before the For Each loop, we create a Flow Variable of the type ParallelProcessor. When creating this variable, we immediately set the total number of sub flows that need to be processed (in our case, the number of cities in the request payload).

At the end of the Async block (so where we would place the Group Based Aggregator in Mule 4), we will call the registerCompletedRecord method on the Flow Variable. Here we will also pass the payload of the completed record.

Finally, after the For Each loop, we call the waitUntilComplete method on the Flow Variable, passing the maximum time to wait for all records to be completed and the interval at which to check. The wait ends as soon as the number of registered completed records equals the total that was set when the Flow Variable was constructed. When not all sub flows complete in time, a ParallelProcessorException is raised.

After the wait is complete we can still read the completedRecords from the Flow Variable. This is a List of all payloads that were passed to the registerCompletedRecord method. In our example we use it to construct the response payload.
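The four steps above (set the expected total, register each completed record, wait with a timeout, read the results) can also be tried outside Mule. The sketch below is an assumption-laden illustration and not the code of the sample application: it replaces the polling ParallelProcessor with the standard java.util.concurrent.CountDownLatch, uses a thread pool instead of the Async block, and the enrich method is a hypothetical placeholder for the MetaWeather call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchAggregationSketch {

  /** Hypothetical stand-in for the enrichment done in each sub flow. */
  static String enrich(final String city) {
    return city + " (enriched)";
  }

  /**
   * Enriches all cities concurrently and returns the results in completion order.
   * Throws IllegalStateException when not all sub tasks finish within maxWaitMillis.
   */
  static List<String> enrichAll(final List<String> cities, final long maxWaitMillis) {
    // Step 1: fix the expected number of sub tasks up front.
    final CountDownLatch remaining = new CountDownLatch(cities.size());
    final List<String> completed = Collections.synchronizedList(new ArrayList<>());
    final ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, cities.size()));
    try {
      for (final String city : cities) {
        // Step 2: each asynchronous task registers its result when it is done.
        pool.execute(() -> {
          completed.add(enrich(city));
          remaining.countDown();
        });
      }
      // Step 3: block until every task has registered, or until the timeout expires.
      if (!remaining.await(maxWaitMillis, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException("[" + remaining.getCount() + "] out of ["
            + cities.size() + "] were not completed in time");
      }
      // Step 4: read the aggregated results.
      return new ArrayList<>(completed);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Wait interrupted", ie);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(final String[] args) {
    System.out.println(enrichAll(Arrays.asList("Amsterdam", "London", "Paris"), 5000));
  }
}
```

Compared with a polling loop, the latch wakes the waiting thread as soon as the last task counts down, so no check interval is needed. Whether this fits in a Mule 3 flow depends on how the variable is shared with the Async block, which this sketch does not cover.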

Parallel processing in Mule 3

When executing the above flow, we see the same response as in our Mule 4 flow.

Important: Please note that the order of the response array is determined by which sub flow finished processing first, not by the order in which the sub flows were triggered! When you execute the flow multiple times, you'll see that the order of the response entries varies every time.
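If the response has to follow the order of the request, one possible approach is to register each result together with its position in the request and to sort on that position after the wait. The sketch below shows the idea in plain Java. It is not part of the sample application: the IndexedRecord class, the processInRequestOrder method, and the artificial sleep that makes later entries finish first are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderedResultsSketch {

  /** Hypothetical record: the position in the request plus the sub task result. */
  static final class IndexedRecord {
    final int index;
    final String payload;

    IndexedRecord(final int index, final String payload) {
      this.index = index;
      this.payload = payload;
    }
  }

  static List<String> processInRequestOrder(final List<String> cities) {
    final List<IndexedRecord> completed = Collections.synchronizedList(new ArrayList<>());
    final CountDownLatch remaining = new CountDownLatch(cities.size());
    final ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, cities.size()));
    try {
      for (int i = 0; i < cities.size(); i++) {
        final int index = i;
        final String city = cities.get(i);
        pool.execute(() -> {
          try {
            // Assumption for the demo: later entries sleep less, so they tend to finish first.
            Thread.sleep(20L * (cities.size() - index));
            completed.add(new IndexedRecord(index, city.toUpperCase()));
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
          } finally {
            remaining.countDown();
          }
        });
      }
      if (!remaining.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Sub tasks were not completed in time");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Wait interrupted", ie);
    } finally {
      pool.shutdown();
    }
    // Restore the trigger order by sorting on the position that was registered with each result.
    final List<IndexedRecord> sorted = new ArrayList<>(completed);
    sorted.sort(Comparator.comparingInt(r -> r.index));
    final List<String> result = new ArrayList<>();
    for (final IndexedRecord record : sorted) {
      result.add(record.payload);
    }
    return result;
  }

  public static void main(final String[] args) {
    System.out.println(processInRequestOrder(Arrays.asList("Amsterdam", "London", "Paris")));
  }
}
```

In the Mule flow the same idea would mean passing something like the For Each counter along with the payload when registering a completed record, which is an extension of the approach described here rather than something the sample application does.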

Parallel processing in Mule 3 (Request and response)

The above sample application can be downloaded here.
