Author: Ashish Yadav
In this article, we will talk about how to process large amounts of data using MuleSoft. In many scenarios, we need to process hundreds of thousands or even millions of records.
Let’s consider a use case where we get data from an Oracle database and load it into Salesforce Einstein Analytics. Here, we are moving data from one system to another: the source is the Oracle database, and the destination is Salesforce Einstein Analytics.
In this scenario, we need to process a large number of records, say 500k. We also need to perform intermediate steps before the data is loaded into the target system. In our case, we transform these 500k records before loading them into Salesforce Einstein Analytics.
The first way to process this data is to break it into chunks and use a Batch Job component. We need to break it down because processing too much data at once can cause out-of-memory errors. We generally have a limited number of vCores, so memory usage must be managed carefully.
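To make the idea of chunking concrete, here is a minimal Python sketch (not MuleSoft code) that splits a stream of records into fixed-size chunks lazily, so that only one chunk is materialized at a time. The helper name `chunked` is hypothetical; in a Mule application, the batch job or For Each scope does this splitting for you.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` records.

    The input is consumed lazily, so if `records` is itself a stream
    (for example a database cursor), only one chunk is held in memory at a time.
    """
    if size <= 0:
        raise ValueError("size must be positive")
    it = iter(records)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Usage: 500k illustrative records processed 20k at a time.
chunk_count = sum(1 for _ in chunked(range(500_000), 20_000))
print(chunk_count)  # 25
```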
Processing using Batch Job
There are several ways to configure the batch job itself.
In the above diagram, we first transform the data outside the batch job, then pass it to the batch job, which processes it in chunks, and finally aggregate the records and load them using the Einstein Analytics component.
The drawback of this method is that all 500k records are transformed at once, outside the batch job. This takes up a large amount of memory, and the Mule runtime may run out of memory.
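A rough back-of-the-envelope estimate shows why this matters. The sketch below assumes, purely for illustration, that each transformed record takes about 2 KB in memory and that a block holds 100 records; real sizes depend on the payload and on JVM object overhead, so treat the numbers as orders of magnitude only.

```python
def estimated_mb(record_count: int, bytes_per_record: int) -> float:
    """Approximate memory (decimal MB) needed to hold `record_count` records at once."""
    return record_count * bytes_per_record / 1_000_000


BYTES_PER_RECORD = 2_000  # hypothetical average size of one transformed record

all_at_once = estimated_mb(500_000, BYTES_PER_RECORD)  # transform outside the batch job
one_block = estimated_mb(100, BYTES_PER_RECORD)        # one illustrative block of 100 records

print(f"all at once: {all_at_once:.0f} MB, one block: {one_block:.1f} MB")
# all at once: 1000 MB, one block: 0.2 MB
```

With several threads working in parallel, a few blocks are in memory at the same time, but that is still far below holding every transformed record at once.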
In the above diagram, the Transform component is inside the batch job, so the transformation happens in chunks rather than all at once. This consumes less memory at any given step. Here, we can customize the following properties:
- Batch Block Size: The number of records given to each thread for execution. The larger this number, the fewer I/O operations each thread performs, but the more memory is needed to hold each block.
- Max Concurrency: The maximum number of threads used for batch processing. By default, this is twice the number of available processor cores. Each thread processes its block in parallel with the others.
- Batch Aggregator: A component that groups records before processing them, which lets us send data to the target system in bulk. It has two options:
  - Aggregator Size: We give a numeric value, and that many records are aggregated and processed together.
  - Streaming: All records in the batch job instance are aggregated and passed on as a stream, so they can be processed without holding them all in memory at once.
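The way these three settings interact can be modeled in plain Python. This is a simplified sketch, not how the Mule runtime is implemented: blocks of `block_size` records are transformed by a pool of at most `max_concurrency` threads, and the results are grouped into lists of `aggregator_size` before a hypothetical `load` callback (standing in for the Einstein Analytics connector) is called. The default values are illustrative, and streaming aggregation is not modeled.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, List


def blocks(records: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Split the input into blocks of at most `size` records (Batch Block Size)."""
    it = iter(records)
    while block := list(islice(it, size)):
        yield block


def run_batch_job(
    records: Iterable[Any],
    transform: Callable[[Any], Any],
    load: Callable[[List[Any]], None],
    block_size: int = 100,
    max_concurrency: int = 4,
    aggregator_size: int = 1_000,
) -> int:
    """Transform records block by block in parallel and load them in groups.

    Returns the number of times `load` was called.
    """
    buffer: List[Any] = []
    load_calls = 0
    pending = blocks(records, block_size)
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        while True:
            # Pull at most one block per thread, so only about
            # block_size * max_concurrency records are in flight at once.
            wave = list(islice(pending, max_concurrency))
            if not wave:
                break
            # pool.map keeps block order; each thread transforms a whole block.
            for transformed in pool.map(lambda b: [transform(r) for r in b], wave):
                buffer.extend(transformed)
                # Batch Aggregator: hand off every full group of aggregator_size.
                while len(buffer) >= aggregator_size:
                    load(buffer[:aggregator_size])
                    del buffer[:aggregator_size]
                    load_calls += 1
    if buffer:  # final partial group
        load(buffer)
        load_calls += 1
    return load_calls


loaded: List[List[Any]] = []
calls = run_batch_job(
    ({"id": i} for i in range(2_500)),
    transform=lambda r: {**r, "amount": r["id"] * 2},  # hypothetical transform
    load=loaded.append,
)
print(calls)  # 3 (groups of 1000, 1000 and 500)
```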
By setting these properties to appropriate values, we can control the performance and memory usage of the batch job, and we can experiment with different values to get the desired result. This works for most cases, but if it doesn’t, we can use another way of processing.
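Before experimenting, it can help to tabulate the trade-off for a few candidate block sizes: how many blocks the 500k records produce, and roughly how many records are in flight at once (block size × max concurrency). The concurrency of 8 used below is an assumption (twice a hypothetical 4 cores), and the model ignores records waiting in the batch job’s internal queues, so it is only a first approximation.

```python
from typing import Iterable, List, Tuple


def tuning_table(
    total_records: int, block_sizes: Iterable[int], max_concurrency: int
) -> List[Tuple[int, int, int]]:
    """Return (block_size, block_count, records_in_flight) for each candidate size."""
    rows = []
    for size in block_sizes:
        # Ceiling division: more blocks means more per-block I/O and scheduling overhead.
        block_count = (total_records + size - 1) // size
        # Larger blocks mean more records held in memory by the running threads.
        in_flight = size * max_concurrency
        rows.append((size, block_count, in_flight))
    return rows


for size, count, in_flight in tuning_table(500_000, [100, 1_000, 10_000], max_concurrency=8):
    print(f"block size {size:>6}: {count:>5} blocks, ~{in_flight:>6} records in flight")
```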
Processing using For Each
For Each also supports two configurations.
In the above diagram, we first transform the data outside the For Each component and then pass it in chunks to the Einstein Analytics component, which loads the data.
This has the same drawback as the batch job scenario: because all the data is transformed at once, the application may run out of memory.
In the above diagram, the Transform component is now inside the For Each. This uses memory more efficiently, because the data is broken into smaller pieces and each piece is processed separately. One property is especially useful here:
- Batch Size: The number of records processed together in one iteration of the For Each.
For example, if the batch size is 20k, the first 20k of the 500k records are processed in one iteration: we first transform these records and then load them into Salesforce Einstein Analytics. So at any given time, only 20k records are being transformed and loaded.
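The For Each flow described above behaves roughly like the following loop. This is a plain-Python sketch, not Mule configuration; `transform` and `load_to_analytics` are hypothetical stand-ins for the Transform component and the Einstein Analytics connector. Like Mule’s For Each scope, the loop handles one batch after another rather than in parallel.

```python
from itertools import islice
from typing import Any, Callable, Iterable, List


def for_each_in_batches(
    records: Iterable[Any],
    batch_size: int,
    transform: Callable[[Any], Any],
    load_to_analytics: Callable[[List[Any]], None],
) -> int:
    """Transform and load `batch_size` records per iteration; return the iteration count."""
    it = iter(records)
    iterations = 0
    while True:
        batch = list(islice(it, batch_size))  # one iteration's sub-collection
        if not batch:
            return iterations
        # Only this batch's transformed output exists at this point.
        load_to_analytics([transform(r) for r in batch])
        iterations += 1


loaded_sizes: List[int] = []
n = for_each_in_batches(
    range(500_000),
    batch_size=20_000,
    transform=lambda r: r,  # hypothetical no-op transform
    load_to_analytics=lambda rows: loaded_sizes.append(len(rows)),
)
print(n)  # 25 iterations of 20k records each
```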
We can vary the batch size to find the right balance between performance and memory usage.
For Each can be useful in scenarios where a batch job doesn’t get the work done. Because it has less overhead, it can be a better fit for certain use cases.
One common error that can be caused by memory filling up is:
- The application is not responding to the Mule system health monitor
There are many possible causes of this error, but running out of memory is one of the major ones. When this happens, the application on CloudHub restarts.
The amount of available memory directly limits how much data we can process. On on-premises systems, we can provision a large amount of memory, so it is easier to process millions of records. On CloudHub, however, we are limited by the number of vCores, so optimizing memory becomes especially important. A few tips for memory optimization:
- Process data in smaller chunks instead of processing all data at once.
- Sometimes, unused variables take up a lot of memory. Once a variable is no longer needed, we can use the Remove Variable component in the Core module to free the memory it uses.
- Use DataWeave to process large amounts of data, as it is generally more efficient than other methods.
Another important technique is to reduce the input data per execution. For example, suppose we get 500k records from a SQL query and process them in a single execution. Instead, we can split this query into two, each returning 250k records, and process them in two separate executions.
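One way to split the query is to page through it with Oracle’s row-limiting clause (`OFFSET ... ROWS FETCH NEXT ... ROWS ONLY`, available from Oracle 12c). The sketch below only builds the query text and the bind parameters for each execution; the table `source_records` and its `id` column are hypothetical. A stable `ORDER BY` is required, otherwise pages can overlap or skip rows.

```python
from typing import Dict, List

# Hypothetical table and key column; ORDER BY makes the pages deterministic.
PAGED_QUERY = (
    "SELECT * FROM source_records ORDER BY id "
    "OFFSET :row_offset ROWS FETCH NEXT :page_size ROWS ONLY"
)


def page_params(total_rows: int, page_size: int) -> List[Dict[str, int]]:
    """Return one set of bind parameters per execution (one page each)."""
    return [
        {"row_offset": offset, "page_size": page_size}
        for offset in range(0, total_rows, page_size)
    ]


for params in page_params(500_000, 250_000):
    print(PAGED_QUERY, params)
```

OFFSET still makes the database step over the earlier rows, so on very large tables a range filter on an indexed key (for example, `WHERE id > :last_id`) is often cheaper.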
In the end, the core takeaway from this article is that we should process data in smaller pieces, not all at once.