If not specified, all the code is written in c++23 standard, compiled with g++ 13.2
command: g\++ -std=c\++2b -O0 ch2.cpp -o out && ./out
Chapter 2. Managing Threads - Part 2
Move-only objects
Sometimes we want to pass parameters that are move_only (for example, std::unique_ptr and std::thread itself). In this case, we must use std::move to pass the parameter since the default behavior of std::thread is copying the parameter.
voidtask1(){ for (int i = 0; i < 114; i++){ std::cout << "1" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(500)); } }
voidtask2(){ for (int i = 0; i < 114; i++) { std::cout << "2" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(500)); } }
intmain(){ auto t1 = std::thread(task1); auto t2 = std::thread(task2); std::this_thread::sleep_for(std::chrono::seconds(1)); std::thread t3; t3 = std::move(t2); std::cout << "moved t2 to t3\n"; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "moved t3 to t1\n"; t1 = std::move(t3); }
Output:
1 2 3 4 5 6 7 8 9 10 11 12
12
12
moved t2 to t3 12
2 1 moved t3 to t1 terminate called without an active exception [1] 32111 abort ./out
First we start two threads t1 and t2, which runs task1 and task2. Then after some time, we declare t3 with default constructor, implies it doesn’t have a thread associated with it. Then we use move to transfer ownership of the thread from t2 to t3, which doesn’t terminate the thread. However, when we are moving t3 to t1, which has an active thread going on, std::terminate is called and the program terminates.
If we take a glance at the = operator definition std::thread:
The operator performed a check whether this thread is joinable or not. If it’s not, it will terminate the program. Therefore, when we are moving t3 to t1, t1 is still joinable and std::__terminate() is called.
icon-padding
A thread is joinable (aka can be wait to finish) only if there is a thread attached to it. Therefore, when we are moving t3 to t1, because t1 still has a thread running, t1.joinable() returns true and std::__terminate() is called.
Thread in functions
Since threads are objects, we can pass them as function parameters. If we want to pass create a thread and pass the thread object to another function, we must use std::move to pass the thread object to avoid copying:
move only thread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/* definition of BigObject and processBigObject is the same as above */ voidthreadHandler(std::thread&& th){ // do sth with the thread if(th.joinable()) { th.join(); } }
intmain(){ auto p = std::unique_ptr<BigObject>(newBigObject()); p->data = "114514"; std::thread t(processBigObject, std::move(p)); threadHandler(std::move(t)); }
output:
1
114514
We can also construct a thread in a function and return it:
return thread
1 2 3 4 5 6 7 8 9 10 11 12 13
std::thread createThread(){ // using a lambda expression to create the thread return std::thread([](){ std::cout << "Hello from thread\n"; }); }
intmain(){ auto t = createThread(); t.join(); }
output:
1
Hello from thread
With the move support of std::thread, we can then make our ThreadWrapper safer by using move rather than reference:
thread wrapper with rvalue reference
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
classThreadWrapperImproved { public: explicitThreadWrapperImproved(std::thread t) : t(std::move(t)) { if(!t.joinable()){ // if the thread is not joinable, it means the thread is whether finished or detached or there's no thread associated with it throw std::logic_error("No thread"); } }
In this case, we can use thread object directly and don’t have to worry about the thread being detached outside the class, therefore we don’t have to check whether the thread is joinable in the destructor.
Joining thread
With the concept of RAII, we can make a safer std::thread that will always be waited to finish at the end of the scope unless explicitly detached, and when we are transferring other threads’ ownership, we don’t have to worry about whether it will terminate the whole program. Also, many operations (like = operator and copy constructor) will be more like copyable object, which is more intuitive and require less code (in this case it’s still moving ownership when they are implemented):
With multithreading, we can divide a task into several subtasks and run them in parallel. For example, we can divide a task into 4 subtasks and run them in 4 threads.
The recommended number of threads can be obtained from the function std::thread::hardware_concurrency(). However, this is not usually the final number of threads we want to use. If we are dividing an array that have only 5 elements but std::thread::hardware_concurrency() returns 8, we only need to create 5 threads.
Also, std::thread::hardware_concurrency() is only a hint of the concurrency ability of CPU, in most cases it will return the core number of the CPU, but it might return 0 if it cannot obtain any information.
An example of task subdivision is finding sum of all elements in a very large collection (similar to std::accumulate). We will divide the collection into several subcollections and sum them in parallel with a sum function:
template<typename It, typename T> voidaccumulateBlock(It start, It end, T& result){ result = std::accumulate(start, end, result); }
template<typename It, typename T> T accumulateParallel(It start, It last, T init){ std::size_t cnt = std::distance(start, last); if (!cnt) return init; std::size_t thread_cnt = std::thread::hardware_concurrency(); thread_cnt = std::min(thread_cnt ? thread_cnt : THREAD_DEFAULT, cnt); auto result = std::vector<T>(thread_cnt); auto threadPool = std::vector<std::thread>(thread_cnt); auto blockSize = cnt / thread_cnt; auto begin = start; for (int i = 0; i < thread_cnt - 1; i++) { auto end = begin; std::advance(end, blockSize); threadPool.push_back(std::thread(accumulateBlock<It, T>, begin, end, std::ref(result[i]))); begin = end; } threadPool.push_back(std::thread(accumulateBlock<It, T>, begin, last, std::ref(result[thread_cnt - 1]))); for(auto& i : threadPool) { if(i.joinable()) i.join(); } return std::accumulate(result.begin(), result.end(), init); }
The code is straight forward. It divides the range into subranges and give the range information to accumulateBlock. Then start a thread with it to sum in parallel. After all thread finished executing, the result are added together. All the threads are launched inside a loop, with each thread running the same function with different input.
It’s worth noticing that when determining the thread count, we are using std::min(thread_cnt ? thread_cnt : THREAD_DEFAULT, cnt). This is because std::thread::hardware_concurrency() might return 0, and we don’t want to create 0 threads. Therefore, we use THREAD_DEFAULT as the default thread count.
Also, in this case we want to create thread no more than the number of elements in the array or the number of cores of the CPU, because if we give more threads than the number of core of CPU, context switch will occur and the performance might not be as good as letting one core execute one thread per time.
Identifying threads
In the accumulateParallel case, we don’t care about identifying single threads because we simply use them to do work and won’t do any operation on threads themselves(except joining them).
However, in some cases we will need to identify threads, such as when you want to store information about each thread in maps (like std::unordered_map) and use some identifier as keys. In this case, we can use std::thread::id to identify threads. Note that std::thread::id will return a default-constructed std::thread::id object has no thread associated with it.
For example, we might want to store the range information of each thread in a map in the accumulateParallel function:
template<typename It, typename T> T accumulateParallel(It start, It last, T init){ auto ranges = std::unordered_map<std::thread::id, std::tuple<It, It>>(); std::size_t cnt = std::distance(start, last); if (!cnt) return init; std::size_t thread_cnt = std::thread::hardware_concurrency(); thread_cnt = std::min(thread_cnt ? thread_cnt : THREAD_DEFAULT, cnt); auto result = std::vector<T>(thread_cnt); auto threads = std::vector<std::thread>(thread_cnt); auto blockSize = cnt / thread_cnt; auto begin = start; for (int i = 0; i < thread_cnt - 1; i++) { auto end = begin; std::advance(end, blockSize); threads.push_back(std::thread(accumulateBlock<It, T>, begin, end, std::ref(result[i]))); ranges.try_emplace(threads[threads.size() - 1].get_id(), std::tuple<It, It>(begin, end)); begin = end; } threads.push_back(std::thread(accumulateBlock<It, T>, begin, last, std::ref(result[thread_cnt - 1]))); for(auto& i : threads) { if(i.joinable()) i.join(); } return std::accumulate(result.begin(), result.end(), init); }
Here we created a map ranges that maps std::thread::id to a tuple of iterators.
Summary
Ways to start a thread
std::thread t(Callable, args...) will create a thread object that will run the callable object with the arguments specified.
std::thread t = std::move(otherThread) will move the ownership of the thread from otherThread to t.
std::thread is move only, which means we cannot copy it. When using it as function parameter or assigning it to another thread, we must use std::move to move the ownership of the thread.
Ways to wait for a thread
t.join() will block the current thread until the thread t finishes.
t.detach() will detach the thread t from the current thread and let it run in the background.
Always check if the thread is joinable before calling join() or detach().
In case of exception, we must either use try-catch statement or ThreadWrapper with RAII to ensure the thread will be joined in the end.
Potential problems about detached thread
Detached thread sometimes will have reference to deleted objects.
Fix: use copy rather than reference when using variables in function executed by a thread.
Potential problems about passing parameters
Implicit cast might occur when passing parameters to a thread, result in undefined behavior.
Fix: cast the parameter to the type we want before passing it to the thread.
Passing reference to a thread will copy the parameter.
Fix: use std::ref to pass the reference to the thread.
When we are passing a move-only object to a thread, we must use std::move to pass the parameter because the default behavior of std::thread is copying the parameter.
Passing ownership of threads
If a is a thread that is running a process, moving another thread’s ownership will make the whole program terminate.
Fix: always check whether a thread is joinable before moving ownership to it.
Fix: Use a ThreadWrapper class and RAII to ensure moving ownership will always check whether the thread is joinable.
Using thread to subdivide tasks
std::thread::hardware_concurrency() will check the concurrency ability of the CPU (most of the time it returns number of core of CPU but when it cannot get anything it will return 0)
It’s often the best to create number of thread that equals your CPU core to avoid context switching.
Identifing thread
All thread object have an identifier std::thread::id and the thread object with no thread attached to it will return a default constructed std::thread::id