Chapter 2 - Part 2. Managing Threads


Ayano Kagurazaka

Unless specified otherwise, all code is written against the C++23 standard and compiled with g++ 13.2.
command: g++ -std=c++2b -O0 ch2.cpp -o out && ./out


Move-only objects

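Sometimes we want to pass arguments that are move-only (for example, std::unique_ptr, or std::thread itself). std::thread copies (more precisely, decay-copies) every argument into storage owned by the new thread, so passing such an object as an lvalue fails to compile, because it cannot be copied. We must use std::move to turn the argument into an rvalue, which lets std::thread move it instead of copying it.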

move only

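#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>

class BigObject{
public:
    std::string data;
    BigObject() = default;   // needed: declaring the copy constructor suppresses the implicit default constructor
    BigObject(const BigObject&) = delete;              // non-copyable
    BigObject& operator=(const BigObject&) = delete;
};

void processBigObject(std::unique_ptr<BigObject> obj){
    std::cout << obj->data << '\n';
    // do something...
}

int main(){
    auto p = std::make_unique<BigObject>();
    p->data = "114514";
    // p is an lvalue; std::move lets std::thread move the unique_ptr into its own storage
    std::thread t(processBigObject, std::move(p));
    t.join();
}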

Output:

114514

We can also use std::move to transfer ownership of a thread from one std::thread object to another.

passing threads
#include <chrono>
#include <iostream>
#include <thread>
#include <utility>

void task1(){
    for (int i = 0; i < 114; i++){
        std::cout << "1" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

void task2(){
    for (int i = 0; i < 114; i++){
        std::cout << "2" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

int main(){
    auto t1 = std::thread(task1);
    auto t2 = std::thread(task2);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::thread t3;           // default-constructed: no associated thread
    t3 = std::move(t2);       // fine: t3 was not joinable
    std::cout << "moved t2 to t3\n";
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "moved t3 to t1\n";
    t1 = std::move(t3);       // t1 is still joinable, so std::terminate() is called here
}

Output:

12

12

moved t2 to t3
12

2
1
moved t3 to t1
terminate called without an active exception
[1] 32111 abort ./out

First, we start two threads, t1 and t2, which run task1 and task2. Both write to std::cout at the same time, so their output interleaves; this is why lines such as "12" followed by empty lines appear. After a while, we default-construct t3, which means it has no thread associated with it. We then move t2 into t3. This only transfers ownership and does not stop the running thread. However, when we move t3 into t1, t1 still owns an active thread, so std::terminate is called and the program aborts.

If we look at how libstdc++ (GCC's standard library) defines the move assignment operator of std::thread:

std::thread move assignment operator (libstdc++)
thread& operator=(thread&& __t) noexcept
{
    if (joinable()) std::__terminate();
    swap(__t);
    return *this;
}

The operator first checks whether *this is joinable. If it is, it calls std::__terminate(), an internal libstdc++ helper that ends up calling std::terminate(), before anything is swapped. So only a non-joinable std::thread can safely be the target of a move assignment.


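A std::thread object is joinable (i.e., it can be waited on with join()) when it is associated with a thread of execution that has not yet been joined or detached. This holds even if that thread has already finished running. When we move t3 into t1, t1 still owns the thread running task1, so t1.joinable() returns true and std::__terminate() is called.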

Threads in functions

Since std::thread objects are ordinary (move-only) objects, we can pass them to functions. If we create a thread and want to hand the thread object to another function, we must use std::move, because a std::thread cannot be copied:

move only thread

#include <memory>
#include <thread>
#include <utility>

/*
definition of BigObject and processBigObject is the same as above
*/
void threadHandler(std::thread&& th){
    // th refers to the caller's std::thread; join it here if it still owns a thread
    if(th.joinable()) {
        th.join();
    }
}

int main(){
    auto p = std::make_unique<BigObject>();
    p->data = "114514";
    std::thread t(processBigObject, std::move(p));
    threadHandler(std::move(t));   // std::move is required: an lvalue cannot bind to std::thread&&
}

output:

114514

We can also construct a thread in a function and return it:

return thread

#include <iostream>
#include <thread>

std::thread createThread(){
    // using a lambda expression to create the thread
    return std::thread([](){
        std::cout << "Hello from thread\n";
    });
}

int main(){
    auto t = createThread();   // the returned std::thread is moved (or elided) into t
    t.join();
}

output:

Hello from thread

Because std::thread supports move, we can make our ThreadWrapper safer by having it take ownership of the thread (moving it in) instead of holding a reference to it:

thread wrapper that owns its thread

#include <stdexcept>
#include <thread>
#include <utility>

class ThreadWrapperImproved {
public:
    // the parameter is named th so that the joinable() check below examines the member t
    explicit ThreadWrapperImproved(std::thread th) : t(std::move(th)) {
        if(!t.joinable()){
            // not joinable: no thread is associated with it
            // (default-constructed, moved from, already joined, or detached)
            throw std::logic_error("No thread");
        }
    }

    ~ThreadWrapperImproved() {
        t.join();
    }

    ThreadWrapperImproved(ThreadWrapperImproved const &) = delete;
    ThreadWrapperImproved &operator=(ThreadWrapperImproved const &) = delete;

private:
    std::thread t;   // the wrapper owns the thread
};

Because the wrapper now owns the std::thread, no code outside the class can join or detach it behind the wrapper's back. Together with the joinable() check in the constructor, this means the destructor can call join() without checking joinable() again.

Joining thread

Building on RAII, we can write a safer std::thread that is always joined at the end of its scope unless it is explicitly detached. Its move assignment also joins the thread it currently owns before taking over another one, so transferring ownership never terminates the whole program. This makes assignment behave more intuitively, although under the hood these operations still transfer ownership of the thread; nothing is copied:

JoinableThread

#include <thread>
#include <utility>

class JoiningThread{
public:
    JoiningThread() noexcept = default;

    // start a new thread directly, just like std::thread
    template<typename F, typename ... Args>
    explicit JoiningThread(F&& func, Args&& ... args) :
        data(std::forward<F>(func), std::forward<Args>(args)...){
    }

    // take ownership of an existing std::thread
    explicit JoiningThread(std::thread th) noexcept :
        data(std::move(th)){
    }

    JoiningThread(JoiningThread&& other) noexcept :
        data(std::move(other.data)){
    }

    // non-copyable, like std::thread
    JoiningThread(const JoiningThread&) = delete;
    JoiningThread& operator=(const JoiningThread&) = delete;

    ~JoiningThread(){
        if (data.joinable())
            data.join();
    }

    // wait for the current thread (if any) before taking over another one,
    // so std::thread's move assignment never calls std::terminate
    JoiningThread& operator=(JoiningThread&& other) noexcept{
        if(data.joinable()) data.join();
        data = std::move(other.data);
        return *this;
    }

    JoiningThread& operator=(std::thread other) noexcept{
        if(data.joinable()) data.join();
        data = std::move(other);
        return *this;
    }

    bool joinable() const noexcept { return data.joinable(); }
    void join(){ data.join(); }
    void detach(){ data.detach(); }

    std::thread& getThread() noexcept { return data; }
    const std::thread& getThread() const noexcept { return data; }

private:
    std::thread data;
};

Task subdivision with multithreading

With multithreading, we can divide a task into several subtasks and run them in parallel. For example, we can divide a task into 4 subtasks and run them in 4 threads.

A hint for the number of threads can be obtained from std::thread::hardware_concurrency(). However, this is usually not the final number of threads we want to use. If we are dividing an array that has only 5 elements and std::thread::hardware_concurrency() returns 8, we only need 5 threads.

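Also, std::thread::hardware_concurrency() is only a hint about the hardware's concurrency. In most cases it returns the number of hardware threads (logical cores, which can exceed the number of physical cores on CPUs with simultaneous multithreading). It may return 0 if the value is not well defined or cannot be computed.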

An example of task subdivision is finding the sum of all elements in a very large collection (similar to std::accumulate). We divide the collection into several subranges and sum them in parallel with a per-block sum function:

task subdivision

#include <algorithm>
#include <cstddef>
#include <functional>
#include <iterator>
#include <numeric>
#include <thread>
#include <vector>

#define THREAD_DEFAULT 4

// sum the block [start, end) into result
template<typename It, typename T>
void accumulateBlock(It start, It end, T& result){
    result = std::accumulate(start, end, result);
}

template<typename It, typename T>
T accumulateParallel(It start, It last, T init){
    std::size_t cnt = std::distance(start, last);
    if (!cnt) return init;
    std::size_t thread_cnt = std::thread::hardware_concurrency();
    // hardware_concurrency() may return 0; never use more threads than elements
    thread_cnt = std::min<std::size_t>(thread_cnt ? thread_cnt : THREAD_DEFAULT, cnt);
    auto result = std::vector<T>(thread_cnt);   // one partial sum per thread
    auto threadPool = std::vector<std::thread>();
    threadPool.reserve(thread_cnt);   // reserve only: constructing with thread_cnt would add empty threads
    auto blockSize = cnt / thread_cnt;
    auto begin = start;
    for (std::size_t i = 0; i < thread_cnt - 1; i++)
    {
        auto end = begin;
        std::advance(end, blockSize);
        threadPool.emplace_back(accumulateBlock<It, T>, begin, end, std::ref(result[i]));
        begin = end;
    }
    // the last block also takes the remainder
    threadPool.emplace_back(accumulateBlock<It, T>, begin, last, std::ref(result[thread_cnt - 1]));
    for(auto& i : threadPool) {
        if(i.joinable()) i.join();
    }
    return std::accumulate(result.begin(), result.end(), init);
}

The code is straightforward. It divides the range into blocks of blockSize elements (the last block also takes any remainder) and starts one thread per block. Each thread runs accumulateBlock on its own subrange and writes into its own slot of result. All the threads are launched inside a loop and run the same function on different input. After every thread has been joined, the partial results are added together.

It’s worth noting how the thread count is computed: it is the minimum of cnt and either hardware_concurrency()'s result or, if that is 0, THREAD_DEFAULT. std::thread::hardware_concurrency() might return 0, and we never want to create 0 threads, so THREAD_DEFAULT is the fallback. Taking the minimum with cnt ensures that no thread is given an empty range.

We also want to create no more threads than there are elements or cores. If there are more runnable threads than the CPU can execute at once, the OS has to switch between them, and the extra context switches can make the program slower than running one thread per core.

Identifying threads

In accumulateParallel we don't need to tell individual threads apart. We simply use them to do work and don't operate on the threads themselves (except to join them).

However, sometimes we do need to identify threads, for example to store per-thread information in a map (such as std::unordered_map) keyed by some identifier. For this, each thread has an identifier of type std::thread::id. t.get_id() returns the id of the thread owned by t, and std::this_thread::get_id() returns the id of the calling thread. Calling get_id() on a std::thread object that has no associated thread returns a default-constructed std::thread::id, which does not identify any thread.

For example, we might want to store the range information of each thread in a map in the accumulateParallel function:

thread id
#include <algorithm>
#include <cstddef>
#include <functional>
#include <iterator>
#include <numeric>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <vector>

// THREAD_DEFAULT and accumulateBlock are the same as above
template<typename It, typename T>
T accumulateParallel(It start, It last, T init){
    // which range each worker thread handles, keyed by the thread's id
    auto ranges = std::unordered_map<std::thread::id, std::tuple<It, It>>();
    std::size_t cnt = std::distance(start, last);
    if (!cnt) return init;
    std::size_t thread_cnt = std::thread::hardware_concurrency();
    thread_cnt = std::min<std::size_t>(thread_cnt ? thread_cnt : THREAD_DEFAULT, cnt);
    auto result = std::vector<T>(thread_cnt);
    auto threads = std::vector<std::thread>();
    threads.reserve(thread_cnt);
    auto blockSize = cnt / thread_cnt;
    auto begin = start;
    for (std::size_t i = 0; i < thread_cnt - 1; i++)
    {
        auto end = begin;
        std::advance(end, blockSize);
        threads.emplace_back(accumulateBlock<It, T>, begin, end, std::ref(result[i]));
        ranges.try_emplace(threads.back().get_id(), begin, end);
        begin = end;
    }
    threads.emplace_back(accumulateBlock<It, T>, begin, last, std::ref(result[thread_cnt - 1]));
    ranges.try_emplace(threads.back().get_id(), begin, last);   // the last worker's range too
    for(auto& i : threads) {
        if(i.joinable()) i.join();
    }
    return std::accumulate(result.begin(), result.end(), init);
}

Here, ranges maps the std::thread::id of the worker threads to tuples holding the begin and end iterators of the ranges they sum.

Summary

Ways to start a thread

  • std::thread t(Callable, args...) will create a thread object that will run the callable object with the arguments specified.

  • std::thread t = std::move(otherThread) will move the ownership of the thread from otherThread to t.

  • std::thread is move-only, which means it cannot be copied. When passing a named std::thread to a function or assigning it to another std::thread, we must use std::move to transfer ownership of the thread.

Ways to wait for a thread

  • t.join() will block the current thread until the thread t finishes.

  • t.detach() separates the thread of execution from the std::thread object t and lets it keep running in the background.

  • Always check if the thread is joinable before calling join() or detach().

  • In case of an exception, we must either use a try/catch block or an RAII wrapper such as ThreadWrapper to ensure the thread is joined in the end.

Potential problems with detached threads

  • A detached thread may hold references to objects that have already been destroyed.
    • Fix: copy the data into the thread instead of referring to local variables in the function executed by a detached thread.

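Potential problems with passing parameters

  • Implicit conversions of arguments (for example, from char* to std::string) happen in the new thread, possibly after the original object has been destroyed, which results in undefined behavior.
    • Fix: explicitly convert the argument to the parameter type (e.g., std::string(s)) before passing it to the std::thread constructor.

  • Passing a variable to a thread function that takes a reference does not pass a reference. std::thread stores its own copy of the argument, and with a non-const reference parameter the code does not even compile.
    • Fix: wrap the argument in std::ref to pass a reference to the thread.

  • When passing a named move-only object (such as a std::unique_ptr) to a thread, we must use std::move, because std::thread copies its arguments by default.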

Passing ownership of threads

  • If a std::thread object still owns a thread (i.e., it is joinable), move-assigning another thread to it terminates the whole program.
    • Fix: always check whether the target std::thread is joinable, and join or detach it, before moving ownership to it.
    • Fix: use an RAII wrapper (such as JoiningThread) whose move assignment joins the current thread first.

Using threads to subdivide tasks

  • std::thread::hardware_concurrency() returns a hint about how many threads the hardware can run concurrently (usually the number of hardware threads, i.e. logical cores); it returns 0 when the value cannot be determined.

  • For CPU-bound work, it is often best to create about as many threads as the CPU has hardware threads, to avoid unnecessary context switching.

Identifying threads

  • Every std::thread object has an identifier of type std::thread::id, obtained with get_id(). A std::thread object with no thread attached to it returns a default-constructed std::thread::id.

Associated code

ch2.cpp

#include <algorithm>
#include <chrono>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <numeric>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

class ThreadWrapper{
public:
    explicit ThreadWrapper(std::thread& t): t(t){
    }
    ~ThreadWrapper(){
        if(t.joinable()){
            t.join();
        }
    }
    ThreadWrapper(ThreadWrapper const&) = delete;
    ThreadWrapper& operator=(ThreadWrapper const&) = delete;
private:
    std::thread& t;
};

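// Intentionally unsafe: stores a reference to an int that may not outlive the thread
class problem{
public:
    problem(int& a): data(a){
    }
    void operator()(){
        for(int i = 0; i < 114; i++){
            std::cout << data + i << std::endl;
        }
    }
private:
    int& data;
};

// Intentionally unsafe: the detached thread may still be using the local `a` after wtf() returns
void wtf(){
    int a = 42;
    auto p = problem(a);
    std::thread t(p);
    t.detach();
    std::this_thread::sleep_for(std::chrono::seconds(3));
}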

void printRange(int start, int end){
    for (int i = start; i < end; i++) std::cout << i << std::endl;
}

void work(const std::string& s){
    // do sth with the string
}

// Intentionally unsafe: std::thread stores s as a char*, and the conversion to std::string
// happens in the new thread, possibly after s has gone out of scope
void problem2(){
    char s[1024];
    // getting the string from somewhere
    std::thread t(work, s);
    t.detach();
}

void dontCopy(const int& i){
    std::cout << "in thread:" << &i << std::endl;
}

void whyNoCopy(int& i){
    std::cout << "in thread:" << &i << std::endl;
}

class task{
public:
    void operator()() const{
        std::cout << "Hello Concurrent World from object!\n";
    }

    void work() const{
        std::cout << "Hello Concurrent World from member function!\n";
    }
};

class BigObject{
public:
    std::string data;
    BigObject() = default;   // declaring the copy constructor suppresses the implicit default constructor
    BigObject(const BigObject&) = delete;
    BigObject& operator=(const BigObject&) = delete;
};

void processBigObject(std::unique_ptr<BigObject> obj){
    std::cout << obj->data << '\n';
    // do something...
}

void threadHandler(std::thread&& th){
    // do sth with the thread
    if(th.joinable()) {
        th.join();
    }
}

void task1(){
    for (int i = 0; i < 114; i++){
        std::cout << "1" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

void task2(){
    for (int i = 0; i < 114; i++){
        std::cout << "2" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

std::thread createThread(){
    return std::thread([](){
        std::cout << "Hello from thread\n";
    });
}

class JoiningThread{
public:
    JoiningThread() noexcept = default;

    template<typename F, typename ... Args>
    explicit JoiningThread(F&& func, Args&& ... args) :
        data(std::forward<F>(func), std::forward<Args>(args)...){
    }

    explicit JoiningThread(std::thread th) noexcept :
        data(std::move(th)){
    }

    JoiningThread(JoiningThread&& other) noexcept :
        data(std::move(other.data)){
    }

    JoiningThread(const JoiningThread&) = delete;
    JoiningThread& operator=(const JoiningThread&) = delete;

    ~JoiningThread(){
        if (data.joinable())
            data.join();
    }

    // join the current thread first so std::thread's move assignment never calls std::terminate
    JoiningThread& operator=(JoiningThread&& other) noexcept{
        if(data.joinable()) data.join();
        data = std::move(other.data);
        return *this;
    }

    JoiningThread& operator=(std::thread other) noexcept{
        if(data.joinable()) data.join();
        data = std::move(other);
        return *this;
    }

    bool joinable() const noexcept { return data.joinable(); }
    void join(){ data.join(); }
    void detach(){ data.detach(); }

    std::thread& getThread() noexcept { return data; }
    const std::thread& getThread() const noexcept { return data; }

private:
    std::thread data;
};

#define THREAD_DEFAULT 4

template<typename It, typename T>
void accumulateBlock(It start, It end, T& result){
    result = std::accumulate(start, end, result);
}

template<typename It, typename T>
T accumulateParallel(It start, It last, T init){
    auto ranges = std::unordered_map<std::thread::id, std::tuple<It, It>>();
    std::size_t cnt = std::distance(start, last);
    if (!cnt) return init;
    std::size_t thread_cnt = std::thread::hardware_concurrency();
    thread_cnt = std::min<std::size_t>(thread_cnt ? thread_cnt : THREAD_DEFAULT, cnt);
    auto result = std::vector<T>(thread_cnt);
    auto threads = std::vector<std::thread>();
    threads.reserve(thread_cnt);   // reserve only: constructing with thread_cnt would add empty threads
    auto blockSize = cnt / thread_cnt;
    auto begin = start;
    for (std::size_t i = 0; i < thread_cnt - 1; i++)
    {
        auto end = begin;
        std::advance(end, blockSize);
        threads.emplace_back(accumulateBlock<It, T>, begin, end, std::ref(result[i]));
        ranges.try_emplace(threads.back().get_id(), begin, end);
        begin = end;
    }
    threads.emplace_back(accumulateBlock<It, T>, begin, last, std::ref(result[thread_cnt - 1]));
    ranges.try_emplace(threads.back().get_id(), begin, last);
    for(auto& i : threads) {
        if(i.joinable()) i.join();
    }
    return std::accumulate(result.begin(), result.end(), init);
}
