Author: Mohammad Mazhar Ansari

In this blog post, we will learn how Mule 4 Batch Processing works, using an example to understand its different configuration parameters.

What is Mule 4 Batch Processing?

  • Batch Processing is one of MuleSoft's main selling points, arguably its best feature
  • It helps process large volumes of data in chunks (blocks) and in parallel

Overview of Batch Job and its configuration parameters:

Batch Processing is only available in Mule EE (MuleSoft Enterprise Edition).

General -> Name: Name of the Batch Job activity in the Mule flow.

General -> Max Failed Records: Mule offers three options for handling a record-level error: finish processing, continue processing, and continue processing until the batch job accumulates a maximum number of failed records. Max Failed Records controls which of these behaviors applies.

  • The default value is 0, which corresponds to finish processing.
  • The value -1 corresponds to continue processing.
  • A positive integer corresponds to continue processing until the batch job accumulates that maximum number of failed records.
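As a sketch of how this setting looks in XML, the fragment below lets a job keep going after some records fail and then logs the failure count when the job finishes. The job name, step name and the value 5 are illustrative assumptions. The fragment belongs inside a flow, in the same position as the batch:job in the sample flow later in this post. It also assumes that the payload in the On Complete phase is the batch job result summary and that it exposes a failedRecords field.

```xml
<!-- Illustrative: 0 stops on the first failed record (default), -1 never stops,
     a positive value such as 5 keeps processing until that many records have failed -->
<batch:job jobName="tolerantBatch_Job" maxFailedRecords="5">
    <batch:process-records>
        <batch:step name="Process_Step">
            <logger level="INFO" message="#[payload]"/>
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <!-- Assumption: payload here is the job result summary exposing failedRecords -->
        <logger level="INFO" message='#["Failed records: " ++ (payload.failedRecords as String)]'/>
    </batch:on-complete>
</batch:job>
```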

General -> Scheduling Strategy: Scheduling Strategy decides how batch job instances run when a new instance starts before the current batch job instance finishes.

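  • ORDERED_SEQUENTIAL (Default): instances that are ready to run are executed one at a time, in the order in which they were created
  • ROUND_ROBIN: all available instances are executed, with resources assigned to them in round-robin fashion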

General -> Job Instance ID: By default, MuleSoft automatically creates the Batch Job Instance ID as a UUID; this field can be used to override that default behavior.

General -> Batch Block Size: Number of records grouped into one block (chunk) and processed by one thread. The default value is 100.

General -> Max Concurrency: Maximum number of threads used to process different blocks in parallel. The default and maximum value is 16.
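The remaining General parameters map to attributes of the batch:job element. The fragment below is a minimal sketch with illustrative values (job name, step name, block size 50, concurrency 4). Building jobInstanceId from the event's correlationId is also an assumption, meant only to make an instance traceable to the request that started it; any expression that yields a unique value per run would do. As before, the fragment belongs inside a flow.

```xml
<!-- Illustrative values only; tune them for your own workload -->
<batch:job jobName="ordersBatch_Job"
           schedulingStrategy="ORDERED_SEQUENTIAL"
           jobInstanceId='#["orders-" ++ correlationId]'
           blockSize="50"
           maxConcurrency="4">
    <batch:process-records>
        <batch:step name="Log_Step">
            <!-- Records are handed to threads in blocks of up to 50;
                 at most 4 blocks are processed at the same time -->
            <logger level="INFO" message="#[payload]"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```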

History -> Max Age and Time Unit: The batch process retains the history of batch job instances in the Mule Runtime's temporary directory. By default, the retention policy is set to 7 days, and a monitoring process removes temporary data that has met the expiration criteria. Max Age and Time Unit change this default behavior.
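A minimal sketch of overriding the retention period, assuming the batch:history and batch:expiration child elements that Anypoint Studio generates for these two fields (check them against your runtime version's batch schema). The job and step names and the 2-day value are illustrative.

```xml
<batch:job jobName="shortHistoryBatch_Job">
    <!-- Keep instance history for 2 days instead of the default 7 -->
    <batch:history>
        <batch:expiration maxAge="2" ageUnit="DAYS"/>
    </batch:history>
    <batch:process-records>
        <batch:step name="Log_Step">
            <logger level="INFO" message="#[payload]"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```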

Overview of Batch Step and its configuration parameters:

General -> Name: Name of the Batch Step activity in the Mule flow.

A batch step uses two attributes to filter records:

General -> Accept Expression: The batch step processes only the records for which this expression evaluates to true; if a record evaluates to false, the batch step skips it and passes it on to the next step. In other words, the records for which the accept expression resolves to false are the ones that Mule filters out.

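General -> Accept Policy: The batch step processes only the records that, relative to the value of the accept policy attribute, evaluate to true. The available values for the accept policy are:

  • NO_FAILURES (Default): process only the records that have not failed in any previous step
  • ONLY_FAILURES: process only the records that have failed in a previous step
  • ALL: process every record, whether or not it failed in a previous step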
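The fragment below sketches both filtering attributes in one process-records block. The step names and the amount field on each record are hypothetical, chosen only to illustrate an accept expression. Note that failed records can only reach a later ONLY_FAILURES or ALL step if Max Failed Records lets the job keep running after a failure.

```xml
<batch:process-records>
    <!-- Hypothetical filter: only records whose amount field exceeds 1000 -->
    <batch:step name="Large_Orders_Step" acceptExpression="#[payload.amount &gt; 1000]">
        <logger level="INFO" message='#["Large order: " ++ (payload.amount as String)]'/>
    </batch:step>
    <!-- Only records that failed in an earlier step -->
    <batch:step name="Failure_Report_Step" acceptPolicy="ONLY_FAILURES">
        <logger level="WARN" message="#[payload]"/>
    </batch:step>
    <!-- Every record, failed or not -->
    <batch:step name="Audit_Step" acceptPolicy="ALL">
        <logger level="INFO" message="#[payload]"/>
    </batch:step>
</batch:process-records>
```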

Now let's run an example that covers the following points:

  • Batch Block Size
  • Max Failed Records
  • Accept Expression
  • Accept Policy

Sample Flow:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
	xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd 
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd 
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">
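	<!-- Listener configuration referenced by the flow below; host and port are example values -->
	<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config">
		<http:listener-connection host="0.0.0.0" port="8081"/>
	</http:listener-config>
	<flow name="batchPOC" doc:id="011a4555-c6f8-439f-8fb9-df30137d8287" >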
		<http:listener doc:name="Listener" doc:id="beaca31d-2d53-47db-b16e-1b0db8043425" config-ref="HTTP_Listener_config" path="/batch"/>
		<ee:transform doc:name="Transform Message" doc:id="20b5aec3-9bed-44fe-88d0-9f18a91900e6" >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
1 to 15]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<batch:job jobName="loggingBatch_Job" doc:id="5d7833ae-480a-409a-baa9-7ccb3f4ba6c5" blockSize="3" maxFailedRecords="-1" schedulingStrategy="ROUND_ROBIN">
			<batch:process-records >
				<batch:step name="Batch_Step_01" doc:id="113e7380-2559-49d3-b3c7-2e62b0cd0cbf" acceptExpression="payload &lt; 10">
					<logger level="INFO" doc:name="Logger" doc:id="9392037b-16cb-4bf0-9853-af533a09c641" message='#["Batch_Step_01: " ++ payload as String]'/>
					<ee:transform doc:name="Transform Message" doc:id="94a20d33-415b-4b7c-ae4c-10b803ac1df8" >
						<ee:message >
							<ee:set-payload ><![CDATA[%dw 2.0
output application/java
---
100 + (payload as Number)]]></ee:set-payload>
						</ee:message>
					</ee:transform>
				
</batch:step>
				<batch:step name="Batch_Step_02" doc:id="b1db6a6c-2603-41bf-a232-455057b8bdf7">
					<logger level="INFO" doc:name="Logger" doc:id="a8b6cfc9-323b-4065-998f-4b73d77cf9c4" message='#["Batch_Step_02: " ++ payload as String]'/>
					<ee:transform doc:name="Transform Message" doc:id="d9ff8200-bc9a-4760-85f5-94d818d452f9" >
						<ee:message >
							<ee:set-payload ><![CDATA[%dw 2.0
output application/java
---
if ((payload mod 4) == 0)
	1/0
else
	payload]]></ee:set-payload>
						</ee:message>
					</ee:transform>
				</batch:step>
				<batch:step name="Batch_Step_03" doc:id="29d68595-8f9a-4236-82b8-ca8f31577c3e">
					<logger level="INFO" doc:name="Logger" doc:id="b45e7ac7-5112-4eca-98b6-06f5dc8b4d96" message='#["Batch_Step_03: " ++ payload as String]'/>
				</batch:step>
				<batch:step name="Batch_Step_04" doc:id="d5a5f4ef-dc4a-4cb6-860d-557e1839aa27" acceptPolicy="ALL">
					<logger level="INFO" doc:name="Logger" doc:id="54efa46a-327d-4b82-9429-f4500c638da2" message='#["Batch_Step_04: " ++ payload as String]'/>
				</batch:step>
			
</batch:process-records>
			<batch:on-complete >
				<ee:transform doc:name="Transform Message" doc:id="762f6eb3-d3a7-48c9-ba8a-7086a49cc82a" >
					<ee:message >
						<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
					</ee:message>
				</ee:transform>
				<logger level="INFO" doc:name="Logger" doc:id="d6d1f7e8-b4bb-4bb8-beb6-e0f19606064c" message="#[payload]"/>
			</batch:on-complete>
		</batch:job>
		<ee:transform doc:name="Transform Message" doc:id="f5b5bf3c-b9b9-41b3-b6bf-3b8350fdfbcc" >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output text/plain
---
"Success"]]></ee:set-payload>
			</ee:message>
		</ee:transform>
	</flow>

</mule>

Overview:

  • The payload is set to the array [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] before the batch job starts
  • The batch job's Max Failed Records is set to -1, i.e. it will continue processing regardless of failed records
  • The batch job's Batch Block Size is set to 3, i.e. each block contains 3 records and is processed by one thread
  • Batch_Step_01's Accept Expression is set to payload < 10 and its Accept Policy is left at the default NO_FAILURES, i.e. Batch_Step_01 will process only the first 9 records (1-9) and skip the last 6 records (10-15). It also adds 100 to each record it processes, so records 1-9 leave this step as 101-109
  • Batch_Step_02 has no Accept Expression and uses the default NO_FAILURES Accept Policy, i.e. Batch_Step_02 will process all the records, including those skipped by Batch_Step_01
  • Batch_Step_02 applies the DataWeave transformation shown below, which divides by zero whenever the payload is a multiple of 4. Because Batch_Step_01 has already added 100 to records 1-9, the failing payloads are 104, 108 and 12 (original records 4, 8 and 12), so in total 12 records will succeed and 3 records will fail
%dw 2.0
output application/java
---
if ((payload mod 4) == 0)
	1/0
else
	payload
  • Batch_Step_03 has no Accept Expression and uses the default NO_FAILURES Accept Policy, i.e. Batch_Step_03 will process the 12 successful records and skip the 3 records that failed in Batch_Step_02
  • Batch_Step_04 has no Accept Expression and its Accept Policy is set to ALL, i.e. Batch_Step_04 will process all 15 records

Run the flow and collect the logs. I put the logs into a sheet after removing unnecessary content and sorted them by step name. It will look like below.

Observations:

  • Each thread processed a block of exactly 3 records, as defined by Batch Block Size
  • The 3 record failures did not stop the batch job from processing the remaining records, because Max Failed Records is set to -1
  • Batch_Step_01 processed only 9 records (1-9) and skipped 6 records (10-15), i.e. the Accept Expression filtered out records 10-15
  • Batch_Step_02 processed all 15 records (1-15) because it has no Accept Expression
  • Batch_Step_03 processed only 12 records, i.e. only the records that succeeded in the previous step Batch_Step_02, because its Accept Policy is NO_FAILURES
  • Batch_Step_04 processed all 15 records because its Accept Policy is set to ALL
  • Batch_Step_02 processed records 10-15 at almost the same time as Batch_Step_01 processed records 1-9
  • Each block of records can be processed by a different thread at each step, because thread switching is possible after each step completes
  • It is also possible that some records are still being processed in one batch step while other records have already started processing in the next batch step

Note: Run the example flow with different combinations of these settings and observe the behavior.
