
Chapter 4 - Part 2. Synchronizing concurrent operations

If not specified otherwise, all the code is written in the C++23 standard and compiled with g++ 13.2
command: g++ -std=c++2b -O0 .cpp -o out && ./out
Other ways to associate tasks with future
Make a schedule(ed task)
Another way to associate a task with a future is to use std::packaged_task<>. Unlike std::async, this object wraps the associated future inside itself, which gives us a higher level of abstraction: a scheduler or a thread pool can deal with std::packaged_task<> objects rather than with a bunch of bare functions.
std::packaged_task<> is a template class whose template argument is the function signature of the task (for example, the function std::string foo(int bar) has signature std::string(int)). std::packaged_task<> is itself a callable object, meaning it overloads operator(), so it can be passed to std::thread, std::async, or even invoked directly. When invoked, it executes the task and stores the result in the associated future, which can be retrieved by calling .get_future().
One example use of std::packaged_task<> is a poster-worker scenario. The task poster pushes tasks for the worker to execute, the worker executes them asynchronously, and the worker thread shares each result with the poster through the future associated with the std::packaged_task<>.
1 | std::mutex mtx; |
Output:
1 | process start |
This code is an infinite loop, so I have to terminate it manually.
In this case, we made a worker thread that accepts tasks from a poster and executes them in order. The worker thread operates in a loop: it picks one task at a time, waits for it to finish, and then picks the next one. The poster thread posts tasks to the worker and returns the associated future for the return value. The poster does not wait for the task to finish, but the future can be used to retrieve the result later.
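Since the full listing above is folded, here is a minimal sketch of the same poster-worker idea, assuming a std::deque<std::packaged_task<int()>> guarded by a plain mutex and a worker that busy-waits just to keep the code short (names are illustrative, not the original code):

```cpp
#include <atomic>
#include <deque>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>

std::mutex mtx;
std::deque<std::packaged_task<int()>> tasks;
std::atomic<bool> done{false};

// Worker: repeatedly pop one task and run it; operator() stores the
// result in the task's associated future.
void worker() {
    while (!done) {
        std::packaged_task<int()> task;
        {
            std::lock_guard<std::mutex> lk(mtx);
            if (tasks.empty()) continue;        // busy-wait, only to keep the sketch short
            task = std::move(tasks.front());
            tasks.pop_front();
        }
        task();                                 // runs the task, future becomes ready
    }
}

// Poster: wrap a callable, keep its future, hand the task to the worker.
std::future<int> post_task(int x) {
    std::packaged_task<int()> task([x] { return x * 2; });
    std::future<int> res = task.get_future();
    std::lock_guard<std::mutex> lk(mtx);
    tasks.push_back(std::move(task));
    return res;
}

int main() {
    std::thread t(worker);
    auto f = post_task(21);
    std::cout << f.get() << '\n';               // blocks until the worker ran it: 42
    done = true;
    t.join();
}
```

A real version would block on a condition variable instead of spinning, but the core point is the same: the poster only ever touches the future, and the worker only ever touches the task.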
This pattern can be extended with more functionality. For example, by changing the template argument we can specify what kind of callable the queue accepts, and therefore extend what the worker thread can do.
Note that even though the poster and the worker run asynchronously (because we started a separate thread for the worker), invoking a std::packaged_task does not itself introduce any parallelism: the task runs right on the thread that calls it. This is obvious if we take a look at the implementation of std::packaged_task<>::operator():
1 | // ... |
As we can see, the stored function is executed by calling it directly rather than by handing it to std::async or std::thread, so it runs synchronously on the invoking thread.
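A tiny demonstration of that point: invoking the task directly runs it on the calling thread, and the result still lands in the associated future (the printed thread ids are just for illustration):

```cpp
#include <future>
#include <iostream>
#include <thread>

int main() {
    std::packaged_task<int()> task([] {
        std::cout << "task runs on:   " << std::this_thread::get_id() << '\n';
        return 7;
    });
    std::future<int> f = task.get_future();

    std::cout << "caller runs on: " << std::this_thread::get_id() << '\n';
    task();                        // executes right here, on the calling thread
    std::cout << f.get() << '\n';  // the result was stored by operator()
}
```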
Make a (std::)promise
When the work cannot be expressed as a single function call, or when tasks come from multiple places, the pattern above no longer works (the task posters and the worker might not even share the same mutex). Also, if we run one thread per task and tasks arrive at a high rate, we end up with a lot of threads, which consumes a lot of resources and slows the system down because of context switching.
A good example is network connections, where we can have a lot of connections (10k+) but only limited hardware concurrency (96 cores / 192 threads for a Threadripper PRO 7995WX; even if one server unit installs 4 of them, it's still far too few). In this case, the ideal solution is to let a small number of threads handle the connections, with each thread handling many connections.
Under this scheme, data packets from various connections arrive in random order and are handled by one thread, and likewise packets are sent out in random order. We can use std::promise<> to handle this situation.
std::promise<T> provides a way to store a value of type T that can later be retrieved through the associated future. Here, the waiting thread holds the future obtained from the promise, and the networking thread uses the promise to set the value and make the future ready.
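Before the long example, here is a minimal sketch of that promise/future handshake, with one value handed from a producer thread to the waiting thread (names are illustrative):

```cpp
#include <future>
#include <iostream>
#include <thread>
#include <utility>

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();   // the waiting side holds the future

    std::thread producer([p = std::move(prom)]() mutable {
        p.set_value(42);                        // makes the future ready
    });

    std::cout << fut.get() << '\n';             // blocks until set_value: prints 42
    producer.join();
}
```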
As with std::packaged_task, you get the future from a std::promise with .get_future(), but here you set the value explicitly with .set_value(). This lets us communicate with another thread without using a return value or a global variable. The following code simulates a network connection process:
This code is very long.
TLDR: network connection with promise
Folding Network Connection
1 | struct PacketIn { |
This output is very long.
Output
1 | in packet with id 1998741 added~ |
Here I terminated manually because in our case it’s an infinite loop
This is a very long code segment, so let's walk through it part by part.
- line 1-47
  - These are the definitions of the packet data types. For data coming in we only need an id and the data, but for data going out we also need a promise to store the result (which makes that class move-only).
- line 50-99
  - This is a connection. A connection should be able to store incoming and outgoing data and to check whether any data is left. It should also be able to send data back; here we made a mock method for that.
  - To make the code thread safe, we used a mutex to lock the data when writing to it.
  - Note: because there is no time-consuming work here that could run in parallel, we don't need std::shared_mutex to lock the data.
- line 101-133
  - This is a connection set that holds a bunch of connections. It also helps us add promises to the promise map (defined at line 103), which is used to store the data from incoming packets.
  - Normally a connection would be removed from the list when it is closed, but in our case connections are never closed, so the finish check simply returns false.
- line 135-151
  - This is the core part of the code: a connection processor. While the connections have not finished, it keeps processing the connections in the connection set, one packet at a time.
  - In line 155, we used a reference to a promise to look up and fulfil the promise for that packet (assuming the id is always unique).
  - Because we need to add connections and process connections at the same time, we used a mutex to protect the processor.
- line 153-172
  - This function simulates connections coming in. It generates many packets and adds them to the connection set.
  - Because we need to add connections and process connections at the same time, we used a mutex to protect the producer.
- line 174-189
  - The main function, nothing much to say.
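A much smaller sketch of the core idea, a map from packet id to promise that one thread registers and another thread fulfils (the names below are illustrative, not the ones from the listing):

```cpp
#include <future>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <thread>

std::mutex mtx;
std::map<int, std::promise<std::string>> promises;   // one promise per packet id

// Poster side: register interest in a packet id and get a future back.
std::future<std::string> expect_packet(int id) {
    std::lock_guard<std::mutex> lk(mtx);
    return promises[id].get_future();
}

// Processing side: when the packet arrives, fulfil the matching promise.
void packet_arrived(int id, const std::string& data) {
    std::lock_guard<std::mutex> lk(mtx);
    promises.at(id).set_value(data);
}

int main() {
    std::future<std::string> f = expect_packet(1998741);
    std::thread net([] { packet_arrived(1998741, "payload"); });
    std::cout << "packet 1998741: " << f.get() << '\n';
    net.join();
}
```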
Exceptional situations with promises
When we write code, we sometimes don't handle an exception inside the method itself but instead let it propagate to the caller. For futures it's the same story. Suppose you have a function that throws an exception and you call it with std::async:
1 | int foo() { |
Normally this returns a future that contains the value. If the function throws, the exception is stored instead and rethrown when we call get(). It's the same for std::packaged_task and std::promise. However, std::promise has an explicit method to store an exception: set_exception(). So we can modify the code like this:
1 | int foo() { |
Or, since it also accepts a std::exception_ptr as argument, we can do this if we already know the exception:
1 | int main() { |
The book says the latter is more efficient and gives cleaner code; I suspect this is because std::make_exception_ptr lets the implementation build the exception_ptr without an actual throw/catch round trip.
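A small sketch showing both ways of storing an exception in a promise (the error message and variable names are just for illustration):

```cpp
#include <future>
#include <iostream>
#include <stdexcept>

int main() {
    // Variant 1: catch the thrown exception and forward it.
    std::promise<int> p1;
    try {
        throw std::runtime_error("something failed");
    } catch (...) {
        p1.set_exception(std::current_exception());
    }

    // Variant 2: build the exception_ptr directly, no throw/catch round trip.
    std::promise<int> p2;
    p2.set_exception(std::make_exception_ptr(std::runtime_error("something failed")));

    for (std::promise<int>* p : {&p1, &p2}) {
        try {
            p->get_future().get();               // rethrows the stored exception
        } catch (const std::exception& e) {
            std::cout << e.what() << '\n';
        }
    }
}
```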
std::future::get() can also throw an exception when the promise associated with the future is destroyed without setting a value. In that case the exception is a std::future_error with error code std::future_errc::broken_promise.
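A quick sketch of that broken-promise case (illustrative: the promise goes out of scope before setting anything):

```cpp
#include <future>
#include <iostream>

int main() {
    std::future<int> f;
    {
        std::promise<int> p;
        f = p.get_future();
    }   // p is destroyed here without set_value / set_exception

    try {
        f.get();
    } catch (const std::future_error& e) {
        std::cout << e.code().message() << '\n';   // reports a broken promise
    }
}
```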
Although std::future is a powerful tool, only one thread can wait on its result. If we want multiple threads to wait for the same result, we can use std::shared_future. It provides the same facilities as std::future, but it can be copied, so each thread can hold its own handle to the shared state.
Shared future
When multiple threads are waiting for the same return value, std::future no longer works because it's move-only. Even if you pass it to several threads by reference, get() can only be called once, and concurrent accesses from several threads are a data race and undefined behavior. In this case, we can use std::shared_future to share the result with multiple threads.
For example, when multiple objects (say, several cells in a complex spreadsheet whose formulas all depend on one variable) are waiting for the value of that variable, std::shared_future is a good fit.
However, in general, accessing one object simultaneously from multiple threads causes a race condition. The following chart presents one possible execution sequence:
%%{ init: { 'theme': 'base', 'themeVariables': { 'primaryColor': '#add8e6', 'noteTextColor': '#0f0f0f', 'noteBkgColor': '#f5f6fa', 'lineColor': '#eaa1af', 'labelBoxBkgColor': '#add8e6' } } }%%
flowchart TD
    Thread1 --->wait1["sf.wait()"]
    Thread2 --->wait2["sf.wait()"]
    wait2 ---> |dangling reference| obj
    int --->|out of scope| obj["freed sf"]
    subgraph th1Race [race condition]
        wait1 --->|reference| std::shared_future["sf"]
        wait2 --->|reference| std::shared_future["sf"]
        std::shared_future --->|produce async result| int
    end
    wait1 ---> |dangling reference| obj
In this case, if we still pass the future object around by reference, then once it goes out of scope the references become dangling, which is undefined behavior.
However, since std::shared_future is copyable, we can simply pass a copy to each thread, and when the future becomes ready, all the threads can get the result, like this:
%%{ init: { 'theme': 'base', 'themeVariables': { 'primaryColor': '#add8e6', 'noteTextColor': '#0f0f0f', 'noteBkgColor': '#f5f6fa', 'lineColor': '#eaa1af', 'labelBoxBkgColor': '#add8e6' } } }%%
flowchart TD
    subgraph sp1 [thread 1 scope]
        wait1[thread 1] --> ftr1["sf1.wait()"]
        ftr1 ---> ok1["ok"]
    end
    ftr1 --->|get from|int
    ftr[sf] --->|copy|ftr1
    subgraph sp [global scope]
        ftr --->|produce async result| int
    end
    ftr --->|copy|ftr2
    int --->|out of scope| null[freed sf]
    subgraph sp2 [thread 2 scope]
        wait2[thread 2] --> ftr2["sf2.wait()"]
        ftr2 ---> ok2["ok"]
    end
    ftr2 --->|get from|int
In this case, since each thread holds its own copy, there are no dangling references.
To construct a std::shared_future, we can use an existing std::future. Consider the following code:
1 | std::promise<int> p; |
Output:
1 | 1 |
As we can see, f is initially valid, but after being moved to construct sf it's no longer valid: std::future is move-only, and after the move it is in a moved-from state. After construction, sf is the valid handle.
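A sketch of what that check might look like, assuming the output above comes from printing the valid() flags before and after the move:

```cpp
#include <future>
#include <iostream>
#include <utility>

int main() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    std::cout << f.valid() << '\n';    // 1: f currently owns the shared state

    std::shared_future<int> sf(std::move(f));
    std::cout << f.valid() << '\n';    // 0: f is now in a moved-from state
    std::cout << sf.valid() << '\n';   // 1: sf owns the shared state instead
}
```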
We can also construct a std::shared_future from a std::promise:
1 | std::promise<int> p; |
This is an implicit transfer of ownership. Even though std::future is move-only, get_future returns the future by value, so the result is an rvalue, and std::shared_future has a constructor taking a future&& that moves from it. So it's safe to construct a std::shared_future directly from get_future().
You will get a better understanding if you take a look at the constructor of std::shared_future and at std::promise::get_future.
Constructor of std::shared_future:
1 | shared_future(future<_Res>&& __uf) noexcept |
std::promise::get_future:
1 | future<_Res> |
Another (much easier) way to construct a std::shared_future object is to use std::future::share():
1 | std::promise<int> p; |
This is very useful when the type of the promise is really long (Objective-C-level long), since you don't have to type it out again.
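A minimal sketch of share() in action (illustrative):

```cpp
#include <future>
#include <iostream>

int main() {
    std::promise<int> p;
    auto sf = p.get_future().share();  // auto deduces std::shared_future<int>
    p.set_value(123);
    std::cout << sf.get() << '\n';     // every copy of sf may call get()
}
```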
Timeout
Sometimes we want to set a timeout for an operation, like an HTTP request, or execute a task at a specific time, like crontab does. For this, C++ provides a number of ways to represent time and perform time-related operations.
Most of the time-related tools live in <chrono>
clock
A clock is a source of time that provides the following information:
- the current time (now())
- the type of the value used to represent time points
- the tick period (the time between two ticks)
- whether the clock ticks at a uniform rate and cannot be adjusted, i.e. whether it is a steady clock
In std::chrono, there are several clocks:
- std::chrono::system_clock: wall-clock time from the system-wide realtime clock. It's not steady because it can be adjusted by the system.
- std::chrono::steady_clock: a monotonic clock that will never be adjusted.
- std::chrono::high_resolution_clock: the clock with the shortest tick period available.
duration
A duration is a time interval and the simplest part of the time support. Durations are handled by the std::chrono::duration<> template class. The first template argument is the type used to store the tick count, and the second is the tick period. For example, std::chrono::duration<int, std::ratio<1, 30>> counts ticks of 1/30 of a second in an int, and std::chrono::duration<double, std::ratio<60, 1>> counts ticks of 60 seconds (i.e. minutes) in a double.
The standard library provides a set of predefined duration types that cover most use cases, and they are designed to hold sufficiently large time intervals. Their names are self-explanatory:
- std::chrono::nanoseconds
- std::chrono::microseconds
- std::chrono::milliseconds
- std::chrono::seconds
- std::chrono::minutes
- std::chrono::hours
std::chrono::literals provides a way to construct durations with literal suffixes. For example, 1s is a std::chrono::seconds object with value 1, and 1ms is a std::chrono::milliseconds object with value 1. Since the literal operators return the predefined duration types, 1s is effectively std::chrono::seconds(1).
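A quick sketch of the literals in action (assuming using namespace std::chrono_literals is in scope; the mixed-unit addition previews the conversion rules below):

```cpp
#include <chrono>
#include <iostream>

int main() {
    using namespace std::chrono_literals;
    auto a = 1s;                               // std::chrono::seconds{1}
    auto b = 250ms;                            // std::chrono::milliseconds{250}
    auto sum = a + b;                          // common type: milliseconds
    std::cout << sum.count() << " ms\n";       // prints 1250 ms
}
```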
We can also implicitly convert one duration type to another, but the conversion has to be explicit when it would lose precision, i.e. when going from a finer unit to a coarser one (ms -> s or min -> h). For example, we can construct a std::chrono::seconds object from a std::chrono::hours object implicitly, but to construct a std::chrono::hours object from a std::chrono::seconds object we have to cast explicitly with std::chrono::duration_cast<>:
1 | std::chrono::seconds s(1); |
With duration, we can let a future wait for some time, like this:
1 | auto f = std::async(waitFor, 500); |
Output:
1 | ok! |
std::future::wait_for returns a status indicating whether the future is ready. Here we check the status: if it's not ready, the wait timed out.
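A sketch of that check, assuming waitFor simply sleeps for the given number of milliseconds (the function name follows the folded listing above, but the body is illustrative):

```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

// Stand-in for a slow operation: sleep for the given number of milliseconds.
int waitFor(int ms) {
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));
    return ms;
}

int main() {
    using namespace std::chrono_literals;
    auto f = std::async(std::launch::async, waitFor, 500);
    if (f.wait_for(1s) == std::future_status::ready)
        std::cout << "ok! took " << f.get() << " ms\n";
    else
        std::cout << "timeout\n";
}
```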
The time for duration-based wait is calculated using a steady clock, so it’s not affected by system time adjustment.
time points
A time point is represented by the std::chrono::time_point template class: the first template argument is the clock, and the second is the duration type used to measure time since the clock's epoch. For example, std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds> is a time point on the system clock measured in seconds.
The value of a time point is the length of time since the clock's epoch (a common one is the Unix epoch, 1970-01-01 00:00:00 UTC; another common one is the moment the program starts). Different clocks can share one epoch, but they don't have to. For example, if you query now() on two different clocks (say std::chrono::system_clock and std::chrono::steady_clock), the following code:
1 | using namespace std::chrono; |
has this output (time dependent):
1 | 1705857527 |
which shows that the two clocks measure from different epochs.
Time points support basic addition and subtraction; for example, to do simple benchmarking, we can do this:
1 | auto start = std::chrono::steady_clock::now(); |
Output:
1 | time elapsed: 502058917 |
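A sketch of what that benchmark might look like, assuming the timed work is roughly a 500 ms sleep (which would match the ~502 ms shown in the output above):

```cpp
#include <chrono>
#include <iostream>
#include <thread>

int main() {
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(std::chrono::milliseconds(500));   // the work being timed
    auto end = std::chrono::steady_clock::now();

    auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
    std::cout << "time elapsed: " << elapsed.count() << '\n';      // roughly 500000000 ns
}
```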
The clock parameter of std::chrono::time_point not only specifies the epoch but also determines how time is tracked. For example, if we wait until a std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds> and the system time is adjusted, the wait is adjusted along with it; if we use std::chrono::steady_clock instead, the wait is not affected.
Here is an example of using a std::condition_variable with a timeout:
1 | bool wait_loop(std::condition_variable& cv, std::mutex& mtx, bool& done, int time) { |
Output:
1 | safe~ |
When we use std::condition_variable::wait_until without passing a predicate, we have to check the condition in a loop to handle spurious wakeups (a thread waking up even though it was not notified and the predicate is not satisfied). Inside the loop, as long as the time is not up, we check whether the condition is met (via done); if not, the thread goes back to sleep.
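A sketch of how wait_loop might look and be driven, assuming another thread sets the flag after about 100 ms (the driver code is illustrative, not the folded listing):

```cpp
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

bool wait_loop(std::condition_variable& cv, std::mutex& mtx, bool& done, int time) {
    auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(time);
    std::unique_lock<std::mutex> lk(mtx);
    while (!done) {                                    // loop guards against spurious wakeups
        if (cv.wait_until(lk, deadline) == std::cv_status::timeout)
            break;                                     // deadline reached, stop waiting
    }
    return done;
}

int main() {
    std::condition_variable cv;
    std::mutex mtx;
    bool done = false;

    std::thread setter([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        { std::lock_guard<std::mutex> lk(mtx); done = true; }
        cv.notify_one();
    });

    std::cout << (wait_loop(cv, mtx, done, 500) ? "safe~" : "timed out") << '\n';
    setter.join();
}
```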
Function with timeout
There are two basic functions that accept a timeout as a parameter: std::this_thread::sleep_for and std::this_thread::sleep_until (you have probably seen me use the former to simulate time-consuming work). The former blocks the thread for a given duration, and the latter blocks the thread until a specific time point, which makes it useful for triggering events crontab-style.
Normally a mutex is locked through an RAII wrapper or .lock() and unlocked with .unlock(). But some mutexes, like std::timed_mutex and std::recursive_timed_mutex, also provide .try_lock_for() and .try_lock_until(), which try to acquire the lock for at most a given duration or until a given time point. If the lock could not be acquired before the time runs out, the function returns false.
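A small sketch of try_lock_for, assuming another thread holds the lock for about a second (names and timings are illustrative):

```cpp
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

std::timed_mutex tm;

int main() {
    std::thread holder([] {
        std::lock_guard<std::timed_mutex> lk(tm);
        std::this_thread::sleep_for(std::chrono::seconds(1));    // hold the lock for 1 s
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(50));  // let the holder lock first
    if (tm.try_lock_for(std::chrono::milliseconds(200))) {       // give up after 200 ms
        std::cout << "locked\n";
        tm.unlock();
    } else {
        std::cout << "could not acquire the lock within 200 ms\n";
    }
    holder.join();
}
```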
Other functions that accept timeout are listed in this table:
| class/namespace | functions | return value |
| --- | --- | --- |
| std::this_thread namespace | sleep_for(duration), sleep_until(time_point) | N/A |
| std::condition_variable, std::condition_variable_any | wait_for(lock, duration), wait_until(lock, time_point) (both accept an optional predicate) | std::cv_status::timeout or std::cv_status::no_timeout without a predicate; bool (the return value of the predicate when woken) with a predicate |
| std::timed_mutex, std::recursive_timed_mutex, std::shared_timed_mutex | try_lock_for(duration), try_lock_until(time_point) | bool (true if the lock was acquired, false otherwise) |
| std::shared_timed_mutex | try_lock_shared_for(duration), try_lock_shared_until(time_point) | bool (true if the lock was acquired, false otherwise) |
| std::unique_lock<TimedLockable>(lockable, duration), std::unique_lock(lockable, time_point) | owns_lock() (checks whether the lock was acquired), try_lock_for(duration), try_lock_until(time_point) | bool (true if the lock was acquired, false otherwise) |
| std::shared_lock<SharedTimedLockable>(lockable, duration), std::shared_lock(lockable, time_point) | owns_lock() (checks whether the lock was acquired), try_lock_for(duration), try_lock_until(time_point) | bool (true if the lock was acquired, false otherwise) |
| std::future, std::shared_future | wait_for(duration), wait_until(time_point) | std::future_status::timeout if the wait timed out, std::future_status::ready if the future is ready, std::future_status::deferred if the future holds a deferred function |
Summary
- We can schedule a task to be executed in another thread with std::async and std::packaged_task
  - std::packaged_task is callable; invoking it runs the task on the calling thread and stores the result in the associated future
- We can use std::promise to communicate between threads
  - std::promise is the writing end of a future: it stores a value that can later be retrieved through the associated future
  - std::promise can also be used to store an exception
- std::shared_future is a wrapper around std::future that can be copied and shared
  - std::shared_future can be constructed from std::future or std::promise
  - Since std::shared_future is copyable, we can use it to prevent dangling references and race conditions
  - One easy way to construct a std::shared_future is to use std::future::share()
- We can use the std::chrono namespace to represent time
  - system_clock fetches time from the system-wide realtime clock, and it can be adjusted
  - steady_clock is a monotonic clock that will never be adjusted
  - high_resolution_clock is the clock with the shortest tick period available
  - std::chrono::duration is a time interval
    - The first template argument is the type used to store the tick count, and the second is the tick period
    - The standard library provides many predefined duration types that cover most use cases
    - std::chrono::literals provides a way to construct durations with literal suffixes
    - We can implicitly convert one duration type to another, but the conversion has to be explicit when going from a finer unit to a coarser one (ms -> s or min -> h)
  - Different clocks can share one epoch, but they don't have to
- Many objects accept a timeout as a parameter
  - Time can be represented by std::chrono::duration and std::chrono::time_point
  - Methods ending with _for accept a duration, and methods ending with _until accept a time point
See more in next part~