Scenario

Recently, I have been doing some data processing work in which performance is quite a challenge. Think of it this way: you have a big data set, split into many parts, each containing many data entries. (You can assume the data is in JSON format.) You want to analyze the entries and finally put them into your database. What is the basic procedure?

  • Open the file and read all JSON objects into a list.
  • Process the list of JSON objects and convert it into a list of the models you want to put into the database.
  • Finally, use a bulk insert to put them into the database.
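To make step 1 concrete, here is a minimal sketch using System.Text.Json. It assumes the file holds one JSON array of objects. `Entry` and `NaiveLoader` are hypothetical names used only for illustration and are not from the article's source code.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;

// Hypothetical shape of one data entry; the article does not give a real schema.
public record Entry(int Id, string Name);

public static class NaiveLoader
{
    // Naive step 1: the whole file is read into one string, and then every
    // object is materialized into a single List<Entry>.
    // Peak memory therefore grows with the size of the file.
    public static List<Entry> LoadAll(string path)
    {
        string json = File.ReadAllText(path);
        return JsonSerializer.Deserialize<List<Entry>>(json) ?? new List<Entry>();
    }
}
```

Everything the file contains ends up in one string plus one list at the same time.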

Easy, huh? 😏 But what if the data file is measured in GB and contains millions of entries? 😨 In this case, simply loading all the data into memory is not feasible. Such a large chunk of data may also overwhelm a relational database. MySQL, for example, may crash under a single huge bulk insert, making the whole database unavailable. So, what's the cure? 🤔

Fortunately, we are using C#, which provides many approaches to deal with this.

This article contains a lot of code, so be prepared. 🫡

You can find the source code at Bulk Task Optimization.


Preparation

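To make things clear, I'll first show you a basic outline of the program, starting with the ITask interface. For testing, the data size is much smaller than in reality. The time constants are in milliseconds and are passed to Thread.Sleep to simulate work.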

interface ITask
{
    const int DataSize = 20;            // number of data entries
    const int DataProcessingTime = 50;  // ms to process one entry
    const int DataSavingTime = 1000;    // ms for one bulk save

    void Run();
}

Next is our original task, without any optimization: exactly the three steps mentioned above. Thread.Sleep is used to simulate time-consuming work.

class OriginalTask : ITask
{
    public void Run()
    {
        List<int> data = LoadData();
        ProcessData(data);
        SaveData(data);
    }

    // Step 1: load every entry into memory at once.
    private List<int> LoadData()
    {
        List<int> data = [];
        for (int i = 0; i < ITask.DataSize; i++)
        {
            data.Add(i);
        }
        return data;
    }

    // Step 2: process each entry (simulated by Thread.Sleep).
    private void ProcessData(List<int> data)
    {
        foreach (int item in data)
        {
            Console.Write(item);
            Console.Write(" ");
            Thread.Sleep(ITask.DataProcessingTime);
        }

        Console.WriteLine();
    }

    // Step 3: save everything in one bulk (simulated by Thread.Sleep).
    private void SaveData(List<int> data)
    {
        Console.WriteLine("Saving data...");
        Thread.Sleep(ITask.DataSavingTime);
    }
}

And our benchmark class.

using System.Diagnostics;
using System.Globalization;

static class Benchmark
{
    public static void Measure(string name, ITask task)
    {
        Console.WriteLine($"----- Benchmarking {name}...");

        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();
        task.Run();
        stopwatch.Stop();

        Console.WriteLine("----- Elapsed time: {0}", stopwatch.ElapsedMilliseconds.ToString(CultureInfo.InvariantCulture));
        Console.WriteLine();
    }
}

So in our Main method, we can simply run and benchmark our task.

class Program
{
    private static void Main(string[] args)
    {
        Benchmark.Measure("Original Task", new OriginalTask());
    }
}

Memory Optimization

So first, let’s deal with memory issues. The problems we’ve encountered can be summarized as follows.

  • A data file is too large to be loaded into memory.
  • Bulk update size is too large for the database.

Let’s solve them one by one.

Load part of the file?

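In traditional C or C++, there is hardly an equally low-effort way to achieve this. It can certainly be done, for example by reading line by line with fgets, but the result may not be as graceful. In C#, the yield return statement makes it easy: an iterator method produces items one at a time, on demand, instead of building the whole list first.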

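In LoadPartTask below, AllData() is such an iterator. All you need is to replace this int generator with a file reader. This reduces memory use while loading, because the raw entries are never all held at once. (The processed results are still collected into one list; the next step deals with that.)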

class LoadPartTask : ITask
{
    public void Run()
    {
        List<int> data = [];
        // Entries are pulled from AllData() one at a time and processed immediately.
        foreach (var item in AllData())
        {
            data.Add(ProcessData(item));
        }
        Console.WriteLine();
        SaveData(data);
    }

    // An iterator: each entry is produced only when the foreach asks for it.
    private IEnumerable<int> AllData()
    {
        for (int i = 0; i < ITask.DataSize; i++)
        {
            yield return i;
        }
    }

    private int ProcessData(int item)
    {
        Console.Write(item);
        Console.Write(" ");
        Thread.Sleep(ITask.DataProcessingTime);
        return item;
    }

    private void SaveData(List<int> data)
    {
        Console.WriteLine("Saving data...");
        Thread.Sleep(ITask.DataSavingTime);
    }
}
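Here is a sketch of replacing the int generator with a file reader. The iterator below streams a JSON Lines file (one JSON object per line). That layout is an assumption, because the article does not say how its files are organized. `Entry` and `StreamingLoader` are hypothetical names. If the file is instead one large JSON array, `JsonSerializer.DeserializeAsyncEnumerable<T>` (.NET 6 and later) can stream its elements from a `Stream`.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;

// Hypothetical shape of one data entry (the article does not specify one).
public record Entry(int Id, string Name);

public static class StreamingLoader
{
    // Possible replacement for the int generator AllData().
    // Reads one line at a time and yields one parsed entry per line, so besides the
    // reader's buffer only the current line and entry are held in memory.
    // Assumes JSON Lines input: one complete JSON object per line.
    public static IEnumerable<Entry> ReadEntries(string path)
    {
        // Disposed when enumeration finishes or the caller's foreach exits early.
        using StreamReader reader = new StreamReader(path);
        string? line;
        while ((line = reader.ReadLine()) != null)
        {
            if (string.IsNullOrWhiteSpace(line))
            {
                continue; // tolerate blank lines, e.g. a trailing newline
            }

            Entry? entry = JsonSerializer.Deserialize<Entry>(line);
            if (entry != null)
            {
                yield return entry;
            }
        }
    }
}
```

In LoadPartTask, `foreach (var item in AllData())` would then become `foreach (Entry item in StreamingLoader.ReadEntries(path))`, with ProcessData adjusted to accept an `Entry`.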

Split up bulk update?

Well, this is simple: just split the data into chunks. SplitBulkTask differs from LoadPartTask only in Run().

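class SplitBulkTask : ITask
{
    public void Run()
    {
        List<int> data = [];
        int bulkSize = 5;
        int currentSize = 0;
        foreach (var item in AllData())
        {
            data.Add(ProcessData(item));
            currentSize++;
            // Save as soon as a full bulk has been collected, then start a new one.
            if (currentSize == bulkSize)
            {
                Console.WriteLine();
                SaveData(data);
                data = [];
                currentSize = 0;
            }
        }
        // Save the last, possibly partial, bulk.
        if (currentSize > 0)
        {
            Console.WriteLine();
            SaveData(data);
        }
    }

    // AllData(), ProcessData() and SaveData() are the same as in LoadPartTask.
}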

There is no universal bulk size. It depends on your computer's performance, network limits, and so on, so you may need to tune it based on measured performance.
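The counting logic in SplitBulkTask.Run() can also be moved into a reusable iterator. This is a minimal sketch. `Batch` is a hypothetical helper name. On .NET 6 and later, the built-in `Enumerable.Chunk(size)` does the same job and returns arrays.

```csharp
using System;
using System.Collections.Generic;

public static class BatchExtensions
{
    // Groups a (possibly huge, lazily produced) sequence into lists of at most `size` items.
    // Only the batch being filled is held here; the last batch may be smaller.
    public static IEnumerable<List<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        if (size <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(size));
        }

        List<T> batch = new List<T>(size);
        foreach (T item in source)
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size); // fresh list, since the caller may keep the old one
            }
        }

        if (batch.Count > 0)
        {
            yield return batch; // flush the remainder
        }
    }
}
```

With this helper, the body of Run() could be reduced to `foreach (List<int> bulk in AllData().Select(ProcessData).Batch(5)) { Console.WriteLine(); SaveData(bulk); }`, where `Select` requires `System.Linq`. The bulk size then becomes a single parameter that is easy to tune.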

Benchmark

If you benchmark these three tasks, you can see how they behave. However, the elapsed time is not meaningful here, since our goal in this part is memory, not speed.

[Screenshot: benchmark output of OriginalTask, LoadPartTask and SplitBulkTask]


Speed Optimization

Using async

Asynchrony is vital for speed optimization, so here we define an async counterpart of ITask, called IAsyncTask. Since the memory problems are already solved, we skip the process of creating bulk chunks and treat each bulk as a single unit.

interface IAsyncTask
{
    const int BulkCount = 20;            // number of bulks
    const int BulkProcessingTime = 300;  // ms to produce one bulk
    const int BulkSavingTime = 1000;     // ms to save one bulk

    Task Run();
}

Then we need to add an overload to our Benchmark class.

public static void Measure(string name, IAsyncTask task)
{
    Console.WriteLine($"----- Benchmarking {name}...");

    Stopwatch stopwatch = new Stopwatch();
    stopwatch.Start();
    task.Run().Wait();
    stopwatch.Stop();

    Console.WriteLine("----- Elapsed time: {0}", stopwatch.ElapsedMilliseconds.ToString(CultureInfo.InvariantCulture));
    Console.WriteLine();
}

Basic async task

Below is a preliminary version of the async task. Producing each bulk takes some time, and saving it takes more. From here on, all improvements happen in Run() only, because the other parts are memory related and were handled earlier.

class BasicTask : IAsyncTask
{
    public async Task Run()
    {
        foreach (int item in AllBulk())
        {
            Console.Write($"{item} ");
            // Each save is awaited before the next bulk is produced.
            await SaveBulk(item);
        }
        Console.WriteLine();
    }

    // Producing each bulk blocks for BulkProcessingTime.
    private IEnumerable<int> AllBulk()
    {
        for (int i = 0; i < IAsyncTask.BulkCount; i++)
        {
            Thread.Sleep(IAsyncTask.BulkProcessingTime);
            yield return i;
        }
    }

    // Saving a bulk is asynchronous and takes BulkSavingTime.
    private async Task SaveBulk(int bulk)
    {
        await Task.Delay(IAsyncTask.BulkSavingTime);
        Console.Write($"{bulk} ");
    }
}

If you benchmark it, you'll see that async does not help: the bulks are still produced and saved one after another. Why?

[Screenshot: benchmark output of BasicTask]

This puzzled me for quite a while, but the answer is straightforward: we always await each result before moving on. All operations therefore happen in a fixed order, and nothing can overlap.

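So you may wonder: what does async actually do here? While a save is pending, await releases the current thread instead of blocking it, so other work elsewhere in the program can make progress. Run() itself, however, still proceeds strictly one step at a time.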
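The effect can be isolated in a small experiment where `Task.Delay` stands in for SaveBulk. `AwaitOrderDemo` and `FakeSave` are hypothetical names. The only difference between the two methods is whether each task is awaited immediately, or all tasks are started first and awaited afterwards.

```csharp
using System.Diagnostics;
using System.Threading.Tasks;

public static class AwaitOrderDemo
{
    // Stand-in for SaveBulk: an asynchronous operation that takes about `ms` milliseconds.
    private static Task FakeSave(int ms) => Task.Delay(ms);

    // Awaiting each call before starting the next: the delays add up.
    public static async Task<long> SequentialAsync(int ms)
    {
        Stopwatch sw = Stopwatch.StartNew();
        await FakeSave(ms);
        await FakeSave(ms);
        return sw.ElapsedMilliseconds;
    }

    // Starting both calls first and awaiting afterwards: the delays overlap.
    public static async Task<long> ConcurrentAsync(int ms)
    {
        Stopwatch sw = Stopwatch.StartNew();
        Task first = FakeSave(ms);
        Task second = FakeSave(ms);
        await Task.WhenAll(first, second);
        return sw.ElapsedMilliseconds;
    }
}
```

With 200 ms per call, the sequential version takes roughly twice as long as the concurrent one. That is the same difference as between awaiting inside the loop and keeping several saves in flight.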

Pipeline tasks

So how can Run() itself keep several operations in flight? The answer is to start more tasks and pipeline them. 😲 The core problem is to identify the steps that can run in parallel. In our case, those steps are producing bulk chunks and saving them.

Here we create a pipeline with a maximum capacity of 5 slots. At the beginning, every slot is initialized with Task.CompletedTask, so the first round does not block. Just remember to wait for all tasks to finish at the end. AllBulk() and SaveBulk() remain the same as in BasicTask.

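class PipelineTask : IAsyncTask
{
    private const int MaxPipeline = 5;
    private readonly Task[] _tasks = new Task[MaxPipeline];

    public async Task Run()
    {
        // Fill every slot with an already-completed task so the first round never waits.
        for (int i = 0; i < MaxPipeline; i++)
        {
            _tasks[i] = Task.CompletedTask;
        }

        int currentTask = 0;
        foreach (int item in AllBulk())
        {
            Console.Write($"{item} ");
            // Wait for the save that previously occupied this slot, then reuse the slot.
            await _tasks[currentTask];
            _tasks[currentTask] = SaveBulk(item);
            currentTask = (currentTask + 1) % MaxPipeline;
        }

        // Await the saves still in flight (rather than blocking with Task.WaitAll).
        await Task.WhenAll(_tasks);
        Console.WriteLine();
    }

    // AllBulk() and SaveBulk() are the same as in BasicTask.
}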

This way, calling SaveBulk does not block the program. Instead, we only record the returned task so that we can await it later, when its slot comes around again. If that task has already completed by then, the pipeline is not busy. In this case we reach the maximum speed, because nearly all of the time is spent in AllBulk(). Otherwise the pipeline is busy, but we still gain from overlapping saves.
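The fixed `Task[]` ring buffer is one way to cap the number of in-flight saves. A common alternative is `SemaphoreSlim`. It is shown here as a swapped-in technique and is not the article's code. `ThrottledSaver` and `SaveAllAsync` are hypothetical names. This is a minimal sketch under those assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledSaver
{
    // Caps how many saves may run at once with a semaphore instead of a Task[] ring buffer.
    // The producer only waits when all `maxInFlight` slots are taken.
    public static async Task SaveAllAsync(IEnumerable<int> bulks, Func<int, Task> saveBulk, int maxInFlight)
    {
        using SemaphoreSlim slots = new SemaphoreSlim(maxInFlight, maxInFlight);
        List<Task> pending = new List<Task>();

        foreach (int bulk in bulks)
        {
            await slots.WaitAsync(); // waits (asynchronously) only when every slot is busy
            pending.Add(SaveAndReleaseAsync(bulk));
        }

        await Task.WhenAll(pending); // same role as the final wait in PipelineTask

        async Task SaveAndReleaseAsync(int bulk)
        {
            try
            {
                await saveBulk(bulk);
            }
            finally
            {
                slots.Release(); // free the slot even if the save throws
            }
        }
    }
}
```

In PipelineTask, the loop would become `await ThrottledSaver.SaveAllAsync(AllBulk(), SaveBulk, MaxPipeline);`. The semaphore frees a slot as soon as any save finishes, whereas the ring buffer always waits for the save in the next slot. The trade-off is that this sketch keeps one `Task` per bulk in `pending` until the end.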

Benchmark

Here is the benchmark comparing the basic async task with the pipelined one. The improvement is obvious.

[Screenshot: benchmark output of BasicTask vs. PipelineTask]

Tuning pipeline size

As mentioned above, the performance largely depends on whether the pipeline is busy. So what is a proper pipeline size?

You can adjust MaxPipeline, BulkProcessingTime and BulkSavingTime to find out. If you would rather not experiment yourself, here is the answer: MaxPipeline ≈ BulkSavingTime / BulkProcessingTime. The reasoning: a new save starts every BulkProcessingTime, and each save stays in flight for BulkSavingTime, so about that many saves overlap at any moment. If BulkProcessingTime is greater than BulkSavingTime, a size of 2 can do the job. Extra slots beyond that will not improve performance and only waste a few resources.
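The rule of thumb can be written as a small helper. The sketch also includes idealized time estimates for BasicTask and PipelineTask that ignore console and scheduling overhead. `PipelineSizing` and its method names are hypothetical, and the estimates are a simple model, not measurements.

```csharp
using System;

public static class PipelineSizing
{
    // Rule of thumb from the text: MaxPipeline ≈ BulkSavingTime / BulkProcessingTime,
    // rounded up to a whole slot and never below 2.
    public static int SuggestedSlots(int bulkSavingTime, int bulkProcessingTime)
    {
        int slots = (int)Math.Ceiling((double)bulkSavingTime / bulkProcessingTime);
        return Math.Max(2, slots);
    }

    // Idealized BasicTask time: each bulk is produced and then fully saved before the next one.
    public static int EstimatedBasicMs(int bulkCount, int processing, int saving) =>
        bulkCount * (processing + saving);

    // Idealized PipelineTask time when no slot ever makes the producer wait:
    // producing all bulks, plus the last save that is still running at the end.
    public static int EstimatedPipelinedMs(int bulkCount, int processing, int saving) =>
        bulkCount * processing + saving;
}
```

With the IAsyncTask constants (20 bulks, 300 ms, 1000 ms), `SuggestedSlots(1000, 300)` returns 4, so MaxPipeline = 5 leaves one spare slot. The model gives 26000 ms for BasicTask and 7000 ms for PipelineTask. Measured times will differ somewhat because of overhead.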

Advantages over traditional threads

Async methods still run on threads (their continuations run on thread-pool threads), but they have some advantages over traditional threading.

First, it is more flexible. You may have already realized that this is the classic "Producer - Consumer" model. In the traditional approach, we have to create a fixed number of threads and use locks to synchronize them. With async/await, no thread is dedicated to a pending save. Continuations are scheduled on thread-pool threads only when there is work to do, which reduces the number of idle threads. And since only Run() touches the _tasks array, no lock is needed.
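For contrast, here is a sketch of the traditional approach described above: a fixed number of dedicated consumer threads draining a bounded queue. `BlockingCollection<T>` is a swapped-in helper that performs the locking and signalling internally. With raw threads you would write that part yourself with `lock`/`Monitor`. `ThreadedPipeline` is a hypothetical name, and this is not the article's code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class ThreadedPipeline
{
    // Traditional producer-consumer: `consumerCount` threads are created up front
    // and each one lives for the whole run, idle whenever the queue is empty.
    public static void Run(IEnumerable<int> bulks, Action<int> saveBulk, int consumerCount, int capacity)
    {
        using BlockingCollection<int> queue = new BlockingCollection<int>(boundedCapacity: capacity);

        Thread[] consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++)
        {
            consumers[i] = new Thread(() =>
            {
                // Blocks while the queue is empty; ends after CompleteAdding() once drained.
                foreach (int bulk in queue.GetConsumingEnumerable())
                {
                    saveBulk(bulk);
                }
            });
            consumers[i].Start();
        }

        foreach (int bulk in bulks)
        {
            queue.Add(bulk); // blocks the producer while the queue is full
        }
        queue.CompleteAdding(); // no more items: consumers exit after draining the queue

        foreach (Thread consumer in consumers)
        {
            consumer.Join();
        }
    }
}
```

Here the thread count is fixed in advance, and each consumer thread sits blocked whenever there is nothing to save. The async pipeline avoids that cost.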

Second, … (idk)


Epilogue

Well, I think we now have a better understanding of async tasks. 🥳

I have to say, C# is a good language. But we will always love C++. 🥰