Size Based Aggregator in Mule 4

Author: Manish Prabhu

Introduction:

The Aggregator module in Mule 4 combines two or more input messages into one. The aggregated messages are then handled by a set of processors in the Mule flow. The module provides three aggregators: the group-based aggregator, the size-based aggregator, and the time-based aggregator. The Aggregator module can be imported into Anypoint Studio from Exchange as follows:

Size based aggregator:

This aggregator operates on two basic parameters:

  1. Max size of the aggregator
  2. Timeout

Drag and drop the size-based aggregator into your Mule flow. You will see the following configuration:

The max size parameter defines how many messages to aggregate. Timeout is the second configurable parameter; it limits how long the aggregator waits to collect messages. The timeout unit is seconds by default. In the configuration example above, the max size is 3 and the timeout is 20 seconds. This means that if 3 messages arrive before the 20-second timeout, they are aggregated and processed together. If fewer than 3 messages have arrived when the timeout occurs, the aggregator processes whichever messages did arrive before the timeout.
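Since the Studio screenshots are not reproduced here, the same configuration can be sketched in flow XML. The element and attribute names below follow the Mule 4 Aggregators module; the aggregator name `sizeBasedAggregator` is an assumed placeholder:

```xml
<!-- Sketch of the size-based aggregator configuration.
     maxSize and timeout correspond to the two parameters described above;
     timeoutUnit defaults to SECONDS. -->
<aggregators:size-based-aggregator name="sizeBasedAggregator"
                                   maxSize="3"
                                   timeout="20"
                                   timeoutUnit="SECONDS">
    <!-- aggregation routes go here -->
</aggregators:size-based-aggregator>
```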

The aggregation information is stored inside the object store. The object store can be configured as follows:

Go to the advanced tab of the size-based aggregator, where you will find an object store configuration option.

Click on create new configuration & configure an object store as follows.

If we forget to add this configuration, then the aggregator will not work.
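As a sketch, the object store can be defined with the ObjectStore connector and referenced from the aggregator. The store name `Object_store` below is an assumed placeholder:

```xml
<!-- Object store definition (ObjectStore connector); the aggregator keeps
     its in-progress aggregation state here -->
<os:object-store name="Object_store" persistent="true"/>

<!-- The aggregator references the store via its objectStore parameter -->
<aggregators:size-based-aggregator name="sizeBasedAggregator"
                                   maxSize="3" timeout="20" timeoutUnit="SECONDS"
                                   objectStore="Object_store"/>
```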

There are two options available in the size-based aggregator: incremental aggregation and aggregation complete. We add the processing components in the aggregation-complete section; in the configuration above, this is a flow reference.
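In the flow XML, these two options appear as child routes of the aggregator. A minimal sketch (subflow name taken from the section below, aggregator name assumed) might look like:

```xml
<aggregators:size-based-aggregator name="sizeBasedAggregator"
                                   maxSize="3" timeout="20" timeoutUnit="SECONDS">
    <aggregators:incremental-aggregation>
        <!-- runs for each arriving message while the group is incomplete -->
    </aggregators:incremental-aggregation>
    <aggregators:aggregation-complete>
        <!-- runs once when maxSize messages have been aggregated -->
        <flow-ref name="size-based-aggregator-processing-subflow"/>
    </aggregators:aggregation-complete>
</aggregators:size-based-aggregator>
```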

The complete flow is written as:

Size-based-aggregator-processing-subflow is written as:

Inside the above logger component, we have configured a logger message as follows:
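Judging from the CloudHub output shown later, the logger message could be expressed as a DataWeave expression along these lines. The exact expression is an assumption, reconstructed from the `event:<id> final payload:` log format:

```xml
<!-- Assumed logger expression, reconstructed from the log output below -->
<logger level="INFO"
        message="#['event:' ++ correlationId ++ ' final payload: ' ++ write(payload, 'application/json')]"/>
```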

To achieve the timeout functionality, we need to configure an aggregator listener. It operates based on the timeout value configured in the size-based aggregator: it listens to the aggregator and fires when the timeout elapses. In this case, the attribute isAggregationComplete will be false.

The aggregator listener must listen to the same aggregator, so the name in the aggregator listener configuration must match the aggregator's name. The aggregator listener is configured as:

We need to check the value of the isAggregationComplete attribute, so we have written not attributes.isAggregationComplete in the when part of the choice router. It uses the same flow reference to the logger component as the main size-based aggregation flow.
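Putting the listener and the choice router together, a sketch of the listener flow could look as follows. The flow name is assumed, and aggregatorName must match the size-based aggregator's name:

```xml
<flow name="size-based-aggregator-listener-flow">
    <!-- Fires for the aggregator's groups; includeTimedOutGroups makes
         timed-out (incomplete) groups trigger the listener too -->
    <aggregators:aggregator-listener aggregatorName="sizeBasedAggregator"
                                     includeTimedOutGroups="true"/>
    <choice>
        <!-- On timeout the group is incomplete, so isAggregationComplete is false -->
        <when expression="#[not attributes.isAggregationComplete]">
            <flow-ref name="size-based-aggregator-processing-subflow"/>
        </when>
    </choice>
</flow>
```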

Testing the aggregator application:

The application is deployed on CloudHub. The first scenario, with three messages, is tested as follows:

Post three messages in sequence:

1)

{
  "name": "jj1",
  "type": "test"
}

2)

{
  "name": "jj2",
  "type": "test"
}

3)

{
  "name": "jj3",
  "type": "test"
}

The CloudHub logs are:

22:37:33.796 06/13/2021 Worker-0 [MuleRuntime].uber.08: [size-based-aggregator].size-based-aggregator-main-flow.CPU_INTENSIVE @2aaea808 INFO

event:d8349850-cc69-11eb-a51f-06b5239023a4 final payload: [
  {
    "name": "jj1",
    "type": "test"
  },
  {
    "name": "jj2",
    "type": "test"
  },
  {
    "name": "jj3",
    "type": "test"
  }
]

You can see an array of aggregated messages.

In the second scenario, we will post only one message:

{
  "name": "jj4",
  "type": "test"
}

23:17:47.127 06/13/2021 Worker-0 [MuleRuntime].uber.19: [size-based-aggregator].size-based-aggregatorFlow.CPU_INTENSIVE @760cf557 INFO

event:cb940fd1-cc6a-11eb-a51f-06b5239023a4 final payload: [
  {
    "name": "jj4",
    "type": "test"
  }
]

 

This time, the application waits for the 20-second timeout and then outputs an array containing the single message as the aggregation output.
