Chapter 4 - Part 1. Syncronizing concurrent operations

Chapter 4 - Part 1. Syncronizing concurrent operations

Ayano Kagurazaka Lv3

If not specified, all the code is written in c++23 standard, compiled with g++ 13.2
command: g\++ -std=c\++2b -O0 .cpp -o out && ./out

Chapter 4. Synchronizing concurrent operations - Part 1

Waiting for events or other threads

It’s common for one thread to wait for another thread to finish. One way to achieve this is use a task_completed flag, but it’s far from ideal.

Suppose there are two threads, one thread is fetching data from a remote API, and the other thread is used to process the data. In this case, the latter thread have to wait the former thread to finish. In this case, if we use a flag that is guaranteed to be thread-safe, we can achieve this, but there are significant drawbacks.

When the worker thread is waiting for the API thread, it has to constantly check the flag, occupying computing power that can be used by other threads. Also, if the thread-safety is achieved by using mutex, when the worker thread is waiting for the flag, the lock must be held by it, means API thread cannot update the flag since it requires unlocking mutex first.

The first problem is come with the nature of this method, so let’s solve the second problem. One solution will be unlocking the mutex for a short period of time and lock it back to provide a window for API thread to change flag, like the following code:

1
2
3
4
5
6
7
8
9
10
11
bool flag;
std::mutex m;

void waitint(){
std::unique_lock<std::mutex> lk(m);
while(!flag) {
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
lk.lock();
}
}

This code provides a window for API thread to change the flag, and it provides a remedy for first problem because when a thread is sleeping, it doesn’t occupy computing power. However, this is still not ideal, since the sleep time is hard to choose. If it’s too small, the API thread may not have enough time to change the flag, and if it’s too large, it will create significant lagging.

Condition variables

The third option is to use tools provided by STL, namely std::condition_variable and std::condition_variable_any. A condition variable is associated with a condition and when that condition is satisfied, it will notify other threads. Condition variables are used with a mutex. std::condition_variable works exclusively with std::mutex, but std::condition_variable_any works with any mutex-like type. However, the latter requires more resources, so the former is preferred when you don’t need the flexibility.

With condition variable, we can rewrite a process waiting for a flag like this:

condition variable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

std::mutex mut;
std::queue<int> dataQueue;
std::condition_variable dataCondition;

void dataFetcher() {
for (int i = 0; i < 10; i++)
{
{
std::lock_guard<std::mutex> lock(mut);
dataQueue.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// std::cout << "pushed " << i << std::endl;
}
dataCondition.notify_one();
std::cout << "pushed and notified:" << i << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "fin\n";
}

void dataProcessor() {

while (true)
{
std::unique_lock<std::mutex> lock(mut);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
dataCondition.wait(lock, []() {
std::cout << "checked\n";
return !dataQueue.empty();
});
int data = dataQueue.front();
dataQueue.pop();
lock.unlock();
std::cout << data << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (dataQueue.empty()) break;
}
}

int main(){
std::thread th1(dataFetcher);
std::thread th2(dataProcessor);
th1.join();
th2.join();
}

In dataFetcher we simulated the process of getting data from a producer. When the producer obtains a data, it pushes it into a queue, and notify the consumer thread with notify_one().

In dataProcessor, we lock the mutex with std::unique_lock, then wait for the condition variable by calling wait(). It checks the condition(the return value of provided callable object(in this case, a lambda expression)). If the condition is not satisfied, it will unlock the mutex and put the thread in a waiting state. Then, when the thread received a notify_one(), it will check the condition again. If the condition is satisfied, it will lock the mutex and continue execution.

We can understand this process better by running it. The output is:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

pushed and notified:0
checked
0
pushed and notified:1
checked
1
pushed and notified:2
checked
2
pushed and notified:3
checked
3
pushed and notified:4
checked
4
pushed and notified:5
checked
5
pushed and notified:6
checked
6
pushed and notified:7
checked
7
pushed and notified:8
checked
8
pushed and notified:9
fin
checked
9

We can see that the consumer thread is notified after the producer thread pushed a data into the queue. Then, the consumer thread checked the condition, and since the queue is not empty, it popped the data and printed it. Then, it checked the condition again, and since the queue is empty, it unlocked the mutex and wait for another notification. When the producer thread finished pushing all the data, it notified the consumer thread, and the consumer thread checked the condition again. Since the queue is empty, it exited the loop and finished execution.

If we take a closer look at the implementation of std::condition_variable::wait, we will find that it’s actually waiting for notification, wrapped with a while loop that checks the return value of given predicate. This means if the predicate is true, it will return immediately, but will lock the thread and wait for notification if the predicate is false. Also, to lock and unlock the mutex, the function requires a std::unique_lock object.

implementation of `std
1
2
3
4
5
6
7
8
#ifndef _LIBCPP_HAS_NO_THREADS
template <class _Predicate>
void
condition_variable::wait(unique_lock<mutex>& __lk, _Predicate __pred)
{
while (!__pred())
wait(__lk);
}

This looks like our own implementation of waiting for a flag, but it’s more efficient and well-designed. Instead of constantly checking the flag, it will only check the condition when it’s notified without locking mutex(since holding a mutex longer than necessary is a bad design decision) and computation power.

Application

With condition variables, we can implement a thread-safe wrapper for std::queue. We should be able to read queue elements (front(), back()), write to queue (push(), pop(), emplace()), and check if the queue is empty (empty()). However, in our case, when we are transferring data between threads, one thread typically have to wait for the other threads to finish.

Therefore, it’s reasonable to add two methods: try_pop() that will pop and retrieve the front element but always return immediately and indicate whether it’s successful, and wait_and_pop() that waits until there is a value to pop and retrieve. These methods require a condition variable to notify them, so in our case, we will notify the consumer thread once we have data goes in.

Also, given that we can separate read and write, it’s the best to use std::shared_mutex to yield better performance, but as a trade-off we have to use std::condition_variable_any because std::condition_variable only works with std::mutex.

shared queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

template<typename T>
class SharedQueue{
using mtxguard = std::lock_guard<std::shared_mutex>;
using mtxshared = std::shared_lock<std::shared_mutex>;
using mtxunique = std::unique_lock<std::shared_mutex>;
public:
SharedQueue() = default;

explicit SharedQueue(const std::initializer_list<T>& li)
{
data = std::queue<T>(li);
}
explicit SharedQueue(const SharedQueue& other) {
data = other.data;
}
explicit SharedQueue(SharedQueue&& other) noexcept {
data = std::move(other.data);
}
SharedQueue& operator=(const SharedQueue& other) = delete;

T front() {
mtxshared lock(mutex);
return data.front();
}
const T& front() const {
mtxshared lock(mutex);
return data.front();
}

T back() {
mtxshared lock(mutex);
return data.back();
}
const T& back() const {
mtxshared lock(mutex);
return data.back();
}

void push(const T& obj) {
mtxguard lock(mutex);
data.push(obj);
cond.notify_one();
}
void push(T&& obj) noexcept {
mtxguard lock(mutex);
data.push(obj);
cond.notify_one();
}
template<class... Args>
void emplace(Args&&... args) {
mtxguard lock(mutex);
data.emplace(args...);
cond.notify_one();
}
void pop() {
mtxguard lock(mutex);
data.pop();
}
bool empty() const noexcept {
mtxshared lock(mutex);
return data.empty();
}

std::size_t size() const noexcept {
mtxshared lock(mutex);
return data.size();
}

bool try_pop(T& out) {
mtxguard lock(mutex);
if (data.empty()) return false;
out = data.front();
data.pop();
return true;
}
std::shared_ptr<T> try_pop() {
mtxguard lock(mutex);
if (data.empty()) return std::shared_ptr<T>();
auto ret = std::make_shared<T>(std::move(data.front()));
return ret;
}

std::shared_ptr<T> wait_and_pop() {
mtxunique lock(mutex);
cond.wait(lock, [this]{return !data.empty();});
auto ret = std::make_shared<T>(std::move(data.front()));
data.pop();
return ret;
}
void wait_and_pop(T& out) {
mtxunique lock(mutex);
cond.wait(lock, [this]{return !data.empty();});
out = data.front();
data.pop();
}
private:
std::queue<T> data;
mutable std::shared_mutex mutex;
std::condition_variable_any cond;
};

icon-padding

Note we marked the mutex as mutable, since it’s the best to mark methods as const, and as an overhead, we have to mark the mutex so that we can lock it in const methods.

Let’s test our implementation with a simulated consumer-producer scenario:

icon-padding

Note I defined a value as end of data, so that the consumer know when to stop.
The reason why I don’t check if the queue is empty is because the consumer thread might process data faster than producer thread, and exit before producer thread pushed all the data.

consumer-producer scenario
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

#define EOD INT32_MAX

template<typename T>
void producer(SharedQueue<T>& queue) {
for (int i = 0; i < 10; i++)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
queue.push(i);
std::cout << "pushed: " << i << std::endl;
}
queue.push(EOD);
std::cout << "pushed end of data, terminating..."<< std::endl;
}

template<typename T>
void consumer(SharedQueue<T>& queue) {
while (true)
{
auto a = queue.wait_and_pop();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (*a == EOD) {
std::cout << "data end\n";
break;
}
std::cout << "processed: " << *a << std::endl;
}
std::cout << "processed all, terminating...\n";

}

int main(){
auto a = SharedQueue<int>();
std::thread th1(producer<int>, std::ref(a));
std::thread th2(consumer<int>, std::ref(a));
th1.join();
th2.join();
}

Output:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
pushed: 0
processed: 0
pushed: 1
processed: 1
pushed: 2
pushed: 3
processed: 2
pushed: 4
processed: 3
pushed: 5
processed: 4
pushed: 6
processed: 5
pushed: 7
processed: 6
pushed: 8
processed: 7
pushed: 9
pushed end of data, terminating...
processed: 8
processed: 9
data end
processed all, terminating...

icon-padding

The output is machine-dependent and might scramble, because std::cout is not thread-safe.

This is exactly what we want.

One-shot events

Sometimes the event that we are waiting for only occur once. For this scenario, C++ STL provided us std::future and std::shared_future that enables thread to periodically check for the event when it is doing other things and wait for the event to occur. std::future has format like std::unique_ptr. It accepts a template argument and can be only moved, and std::shared_future is like std::shared_ptr, it can be copied and shared between threads.

warning

std::future and std::shared_future doesn’t guarantee thread-safety, so they have to be protected by synchronization mechanisms like lock.

One good (and most primitive) use of std::future would be waiting for the return value of a function. For example, we have a thread that is doing very heavy computation and another thread is waiting for it. In this case it’s a one-shot event, and we want to get the return value, so using std::condition_variable is not a good idea. Instead, we can use std::future to achieve this.

To obtain a std::future object from a thread, we will use std::async, which has similar syntax to std::thread, but gives you a std::future object containing the future return value of the function. Once we need the value, we call .get() of the std::future object, which will block the thread until the value is ready.

future
1
2
3
4
5
6
7
8
9
10
11
12
13
int dice() {
int wait = time(NULL) % 6 + 1;
std::cout << "dice rolled!\n";
std::this_thread::sleep_for(std::chrono::seconds(wait));
return wait;
}

int main(){
auto ftr = std::async(dice);
std::cout << "dice obtained:\n";
std::cout << ftr.get();
}

Output:

1
2
3
dice rolled!
dice obtained:
5

Syntax of passing in parameters to std::async is similar to std::thread. In this case, if we are using member function of objects, the second parameter must be the pointer to the object. (cpp implicitly made this pointer as the first parameter of member functions when calling from objects)

future member function
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
int dice(int n) {
srand(time(NULL));
int wait = rand() % n + 1;
std::cout << "dice rolled!\n";
std::this_thread::sleep_for(std::chrono::milliseconds(wait * 100));
return wait;
}

class Dicer {
public:
Dicer() = default;
std::vector<int> ndm(int n, int m) {
auto ret = std::vector<int>(n);
auto ftr = std::vector<std::future<int>>(n);
for (std::size_t i = 0; i < n; i++)
{
ftr[i] = std::async(dice, m);
}
for (int i = 0; i < ftr.size(); i++) {
ret[i] = ftr[i].get();
result.push_back(ret[i]);
}
return ret;
}
Dicer& operator=(const Dicer& other) = delete;
Dicer(const Dicer& other) = delete;
Dicer& operator=(Dicer&& other) = default;
Dicer(Dicer&& other) = default;
std::vector<int> operator()(){
return result;
}
private:
static std::vector<int> result;
};

std::vector<int> Dicer::result = std::vector<int>();

int main(){
auto dicer = Dicer();
auto ftr1 = std::async(dice, 20);
std::cout << ftr1.get() << std::endl;
dicer.ndm(2, 6);
auto ftr3 = std::async(&Dicer::ndm, &dicer, 1, 20);
ftr3.wait();
auto ftr2 = std::async(Dicer());
for (auto& i: ftr2.get()) {
std::cout << i << " ";
}
}

In this case we defined a die roller and let it rolled a 1d20 !!for sanity check!!. In line 28, we extracted the member function pointer from the class and passed in the pointer to the object as the second parameter. After we obtained the value(after a long wait), we print it out.

sanity check
1
2
dice rolled!
1

A natural 1, but it works.

With the natural 1 on sanity check, you take mental damage and have to do a medical check as psychiatric treatment. So you rolled a 2d6 to see how much damage you take and a d20 to see if you are cured. (For the sake of demonstration, we roll the dice and then get the result from the object)

cure check
1
2
3
4
dice rolled!
dice rolled!
dice rolled!
6 6 5 %

!!Sadly you take 12 damage, and you are not cured.!! We wait until your 1d20 finished rolling, then we print out the result. From this example we can see the syntax of std::async is similar to std::thread.

std::async also supports rvalue reference. Assume we have a class that have move constructor implemented, std::async will move the object rather than copying it. Consider following code:

async move schematic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class MoveOnly {
public:
MoveOnly() = default;
MoveOnly(const MoveOnly&) = delete;
MoveOnly& operator=(const MoveOnly& other) {
std::cout << "Copied~\n";
return *this;
}
MoveOnly(MoveOnly&& other) noexcept {
std::cout << "Moved~\n";
}
int operator()() {
return 42;
}
};

int main(){
auto moveonly = MoveOnly();
auto ftr = std::async(MoveOnly());
std::cout << ftr.get();
return 0;
}

Output:

output
1
2
3
Moved~
42

Behavior of std::async

std::async can have different behaviors by providing different policies. Policies can be passed in as the first argument of std::async. There are two types of policies:

  • std::launch::async: The function will be executed asynchronously in a new thread.
  • std::launch::deferred: The function will be executed synchronously in the same thread when .get() is called.

With bitwise or, we can combine these two policies. For example, std::launch::async | std::launch::deferred means the function will be executed asynchronously in a new thread if .get() is called, otherwise it will be executed synchronously in the same thread.

Consider the following code:

async behavior
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
auto ftr1 = std::async(std::launch::async, []{
std::cout << "1 start\n";
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "1 ok\n";
return 1;
});
auto ftr2 = std::async(std::launch::deferred, []{
std::cout << "2 start\n";
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "2 ok\n";
return 2;
});
std::cout << "start\n";
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "reach ftr1\n";
std::cout << ftr1.get() << std::endl;
std::cout << "reach ftr2\n";
std::cout << ftr2.get() << std::endl;
auto ftr3 = std::async(std::launch::deferred | std::launch::async, []{

std::cout << "3 start\n";
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::cout << "3 ok\n";
return 3;
});
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "reach ftr3\n";
std::cout << ftr3.get() << std::endl;

Output:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
start
1 start
reach ftr1
1 ok
1
reach ftr2
2 start
2 ok
2
3 start
3 ok
reach ftr3
3

We can see that ftr1 immediately started executing when std::async is called, even before ftr1.get(). However, ftr2 start executing when ftr2.get() is called. And ftr3 choose to execute asynchronously, since it’s a combination of std::launch::deferred and std::launch::async and the behavior is implementation-defined.

Summary

  • To wait for an event, we can use a flag, but it’s not ideal because it’s not efficient, and it’s hard to choose the sleep time. So we use condition variables as an optimized solution.
    • notify_one only notify one waiting task, and notify_all notify all waiting tasks.
    • std::condition_variable works with only std::mutex, but std::condition_variable_any works with any mutex-like type.
    • std::condition_variable::wait only start to wait when the predicate is not satisfied.
    • One application of a condition variable is to implement a thread-safe queue.
  • std::future is a very useful tool when passing data around.
    • std::future is like std::unique_ptr, it can only be moved.
    • std::future can be produced by std::async, and std::async accepts param in similar fashion as std::thread.

See more in next part~

Comments