Chapter 4 - Part 2. Synchronizing concurrent operations

Ayano Kagurazaka Lv3

If not specified, all the code is written in the C++23 standard and compiled with g++ 13.2
command: g++ -std=c++2b -O0 .cpp -o out && ./out

Other ways to associate tasks with futures

Make a schedule(ed task)

Another way to associate a task with a future is to use std::packaged_task<>. Unlike std::async, this object bundles the task together with its associated future, giving us a higher level of abstraction: a scheduler or a thread pool can deal in std::packaged_task<> objects rather than a bunch of bare functions.

std::packaged_task<> is a template class, and the template argument is the function signature of the task (for example, the function std::string foo(int bar) has signature std::string(int)). In fact, std::packaged_task<> itself is a callable object, meaning it overloads operator(), and can be passed to std::thread, std::async, or even invoked directly. When invoked, it executes the task and stores the result in the associated future, which can be retrieved by calling .get_future().

One example of std::packaged_task<> is a poster-worker scenario. The task poster pushes tasks for the worker to execute, and the worker executes them asynchronously, sharing the result with the poster through the future associated with each std::packaged_task<>.

poster and worker
#include <deque>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;
std::deque<std::packaged_task<void()>> tasks;

void worker() {
    while (true) {
        std::packaged_task<void()> task;
        {
            std::lock_guard<std::mutex> lock(mtx);
            if (tasks.empty()) continue;
            task = std::move(tasks.front());
            tasks.pop_front();
        }
        task();
    }
}

template<typename Func>
std::future<void> postTask(Func f) {
    std::packaged_task<void()> task(f);
    auto res = task.get_future();
    std::lock_guard<std::mutex> lock(mtx);
    tasks.push_back(std::move(task));
    return res;
}

void printThings() {
    std::cout << "task start\n";
    for (int i = 0; i < 3; i++) {
        std::cout << i << std::endl;
    }
}

int main() {
    std::thread worker_thread(worker);
    std::cout << "process start\n";
    postTask(printThings);
    std::cout << "1st task fin\n";
    postTask(printThings);
    std::cout << "2nd task fin\n";
    postTask(printThings);
    std::cout << "3rd task fin\n";
    worker_thread.join();
}

Output:

process start
1st task fin
2nd task fin
3rd task fin
task start
0
1
2
task start
0
1
2
task start
0
1
2
^C


This code is an infinite loop, so I have to terminate it manually.

In this case, we made a worker thread that accepts tasks from a poster and executes them in order. The worker thread operates in a loop, picking one task at a time, waiting for it to finish, then picking the next. The poster thread posts tasks to the worker and returns the associated future for the return value. The poster does not wait for the task to finish, but the future can be used to retrieve the result.

This pattern can be extended with more functionality. For example, we can change the template argument to match the type of function we are passing in, extending what the worker thread can do.


Note that although the poster and the worker run asynchronously (because we started a separate thread for the worker), invoking a std::packaged_task<> is itself synchronous: the wrapped function runs right there in the invoking thread. This is obvious if we take a look at libc++'s implementation of std::packaged_task<>::operator():

std::packaged_task<>::operator()
// ...
private:
    __packaged_task_function<result_type(_ArgTypes...)> __f_;
    promise<result_type> __p_;
public:
    template <class _Fp,
              class = __enable_if_t<!is_same<__remove_cvref_t<_Fp>, packaged_task>::value> >
    _LIBCPP_INLINE_VISIBILITY
    explicit packaged_task(_Fp&& __f) : __f_(_VSTD::forward<_Fp>(__f)) {}
// ...

template<class ..._ArgTypes>
void
packaged_task<void(_ArgTypes...)>::operator()(_ArgTypes... __args)
{
    if (__p_.__state_ == nullptr)
        __throw_future_error(future_errc::no_state);
    if (__p_.__state_->__has_value())
        __throw_future_error(future_errc::promise_already_satisfied);
#ifndef _LIBCPP_NO_EXCEPTIONS
    try
    {
#endif // _LIBCPP_NO_EXCEPTIONS
        __f_(_VSTD::forward<_ArgTypes>(__args)...);
        __p_.set_value();
#ifndef _LIBCPP_NO_EXCEPTIONS
    }
    catch (...)
    {
        __p_.set_exception(current_exception());
    }
#endif // _LIBCPP_NO_EXCEPTIONS
}

As we can see, the stored function is executed by calling it directly rather than by handing it to std::async or std::thread, so it runs synchronously in whatever thread invoked the task.

Make a (std::)promise

When a task can't be expressed as a single function call, or when tasks come from multiple places, the pattern we used above no longer works (the task posters and the worker might not share the same mutex). Also, if we run one thread per task and tasks come in at a high rate, we can end up with a lot of threads, consuming a lot of resources and slowing down the system with context switches.

One good example is network connections, where we can have a lot of connections (10k+) but only limited hardware concurrency (96 cores / 192 threads for a Threadripper PRO 7995WX; maybe one server unit installs four of them, but that's still far too few). In this case, the ideal solution is to let a small number of threads handle the connections, with each thread handling many connections.

Under this scheme, data packets from various connections come in in random order and are handled by one thread; likewise, packets are sent out in random order. We can use std::promise<> to handle this situation.

std::promise<T> provides a way to store a value of type T that can be retrieved by the associated future. In this case, the waiting thread gets the future from the promise, and the networking thread uses the promise to set the value and make the future ready.

Similarly, you can get the future from a std::promise with .get_future(), but in this case, you set the value of the promise with .set_value(). This lets us communicate with another thread without using a return value or a global variable. The following code simulates a network connection process:

This code is very long.
TLDR: network connection with promise

Folding Network Connection
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <unordered_map>
#include <vector>

struct PacketIn {
    PacketIn(int data, int id) : data(data), id(id) {}
    PacketIn(const PacketIn& other) {
        data = other.data;
        id = other.id;
    }

    PacketIn(PacketIn&& other) noexcept {
        data = other.data;
        id = other.id;
    }

    PacketIn& operator=(const PacketIn& other) {
        data = other.data;
        id = other.id;
        return *this;
    }
    PacketIn& operator=(PacketIn&& other) noexcept {
        data = other.data;
        id = other.id;
        return *this;
    }
    int data;
    int id;
};

struct PacketOut {
    int data;
    int id;
    std::promise<bool> result;

    PacketOut(int data, int id) : data(data), id(id) {}

    PacketOut& operator=(PacketOut&& other) noexcept {
        data = other.data;
        id = other.id;
        result = std::move(other.result);
        return *this;
    }

    PacketOut(PacketOut&& other) noexcept {
        data = other.data;
        id = other.id;
        result = std::move(other.result);
    }
};

class Connection {
public:
    Connection(const std::vector<PacketIn>& packet_in, std::vector<PacketOut>&& packet_out)
        : packet_in(packet_in),
          packet_out(std::make_move_iterator(packet_out.begin()),
                     std::make_move_iterator(packet_out.end())) {}

    Connection(Connection&& other) noexcept {
        packet_in = std::move(other.packet_in);
        packet_out = std::move(other.packet_out);
    }

    Connection& operator=(Connection&& other) noexcept {
        packet_in = std::move(other.packet_in);
        packet_out = std::move(other.packet_out);
        return *this;
    }

    [[nodiscard]] bool hasPacketIn() const noexcept {
        return !packet_in.empty();
    }

    [[nodiscard]] bool hasPacketOut() const noexcept {
        return !packet_out.empty();
    }

    std::shared_ptr<PacketIn> getPacketIn() {
        std::lock_guard<std::mutex> lock(mtx);
        auto ret = std::make_shared<PacketIn>(std::move(packet_in.back()));
        packet_in.pop_back();
        return ret;
    }

    std::shared_ptr<PacketOut> getPacketOut() {
        std::lock_guard<std::mutex> lock(mtx);
        auto ret = std::make_shared<PacketOut>(std::move(packet_out.back()));
        packet_out.pop_back();
        return ret;
    }

    void send(int data) const noexcept {
        std::cout << "data " << data << " send~\n";
    }

private:
    mutable std::mutex mtx;
    std::vector<PacketIn> packet_in;
    std::vector<PacketOut> packet_out;
};

std::unordered_map<int, std::promise<int>> promise_map;

class ConnectionSet {
public:
    std::promise<int>& getPromise(int id) {
        std::cout << "promise get from: " << id << "~\n";
        return promise_map[id];
    }

    bool isFinished() {
        // our mock connections never close, so processing never finishes
        return false;
    }

    auto begin() noexcept {
        return connections.begin();
    }

    auto end() {
        return connections.end();
    }

    void addConnection(Connection&& conn) noexcept {
        connections.emplace_back(std::move(conn));
    }

    void addConnection(std::vector<PacketIn>&& in, std::vector<PacketOut>&& out) {
        connections.emplace_back(in, std::move(out));
    }

private:
    std::vector<Connection> connections;
};

void processConnections(ConnectionSet& connections, std::mutex& mutex) {
    while (!connections.isFinished()) {
        for (auto& i : connections) {
            std::lock_guard<std::mutex> lock(mutex);
            if (i.hasPacketOut()) {
                auto data = i.getPacketOut();
                i.send(data->data);
                data->result.set_value(true);
            }
            if (i.hasPacketIn()) {
                auto data = i.getPacketIn();
                auto& prom = connections.getPromise(data->id);
                prom.set_value(data->data);
            }
        }
    }
}

void addConnections(ConnectionSet& connections, int conn_count, int data_per_conn, std::mutex& mutex) {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::binomial_distribution rand(10000000, 0.2);
    for (int i = 0; i < conn_count; i++) {
        std::vector<PacketIn> in;
        std::vector<PacketOut> out;
        for (int j = 0; j < data_per_conn / 2; j++) {
            in.emplace_back(rand(gen), rand(gen));
            std::cout << "in packet with id " << in.back().id << " added~\n";
        }
        for (int j = data_per_conn / 2; j < data_per_conn; j++) {
            out.emplace_back(rand(gen), rand(gen));
            std::cout << "out packet with id " << out.back().id << " added~\n";
        }
        std::lock_guard lock(mutex);
        connections.addConnection(std::move(in), std::move(out));
        std::cout << "connection added~\n";
    }
    std::cout << "data add fin~\n";
}

int main() {
    int worker_cnt = 2, conn_per_worker = 6, data_per_conn = 2;
    auto sets = std::vector<ConnectionSet>(worker_cnt);
    std::mutex lock;
    for (int i = 0; i < worker_cnt; ++i) {
        addConnections(sets[i], conn_per_worker, data_per_conn, lock);
    }
    auto threads = std::vector<std::thread>(worker_cnt * 2);
    for (int i = 0; i < worker_cnt; ++i) {
        threads[i] = std::thread(processConnections, std::ref(sets[i]), std::ref(lock));
        threads[i + worker_cnt] = std::thread(addConnections, std::ref(sets[i]), conn_per_worker, data_per_conn, std::ref(lock));
    }
    for (auto& i : threads) {
        i.join();
    }
}

This output is very long.

Output
in packet with id 1998741 added~
out packet with id 1999669 added~
connection added~
in packet with id 2002356 added~
out packet with id 2002406 added~
connection added~
data add fin~
in packet with id 1998543 added~
out packet with id 1999869 added~
connection added~
in packet with id 1997895 added~
out packet with id 1999961 added~
connection added~
data add fin~
in packet with id 2000847 added~
out packet with id 2002785 added~
connection added~
in packet with id 1999243 added~
out packet with id 2001462 added~
connection added~
data add fin~
in packet with id 1998695 added~
out packet with id 1999153 added~
connection added~
in packet with id 2001660 added~
out packet with id 2002130 added~
connection added~
data add fin~
in packet with id 2001593 added~
out packet with id 1999378 added~
connection added~
in packet with id 1999223 added~
out packet with id 2000339 added~
connection added~
data add fin~
in packet with id 2002044 added~
out packet with id 2001631 added~
connection added~
in packet with id 1999705 added~
out packet with id 1999380 added~
connection added~
data add fin~
in packet with id 1998838 added~
out packet with id 2001870 added~
connection added~
in packet with id 1997887 added~
out packet with id 1998732 added~
connection added~
data add fin~
in packet with id 1998148 added~
out packet with id 1997622 added~
connection added~
in packet with id 2002020 added~
out packet with id 2002349 added~
connection added~
data add fin~
data 1998863 send~
promise get from: 1998695~
data 2000863 send~
promise get from: 2001660~
data 1999639 send~
promise get from: 1998838~
data 2000657 send~
promise get from: 1997887~
data 1998236 send~
promise get from: 2000847~
data 1998465 send~
promise get from: 1999243~
data 2000497 send~
promise get from: 1998148~
data 1999509 send~
promise get from: 2002020~
data 2000750 send~
promise get from: 1998543~
data 1998581 send~
promise get from: 1997895~
data 2001581 send~
promise get from: 2002044~
data 2000491 send~
promise get from: 1999705~
data 1999462 send~
promise get from: 1998741~
data 2003132 send~
promise get from: 2002356~
data 2002465 send~
promise get from: 2001593~
data 1998991 send~
promise get from: 1999223~


Here I terminated manually because in our case it’s an infinite loop

This is a very long code segment, but we will view it part by part and try to explain everything.

  • The packet types (PacketIn and PacketOut)

    • These are the definitions of the packet datatypes. For data coming in, we only need an id and the data, but for data going out, we also need a promise to store the result (which makes PacketOut move-only).
  • The Connection class

    • A connection stores incoming and outgoing data and can check whether any is left. A connection should also be able to send data back; here we made a mock method.
    • To make the code thread safe, we used a mutex to lock the data when writing to it.

Because there is no time-consuming, read-heavy operation here that could be parallelized, we don't need std::shared_mutex to lock the data.

  • The ConnectionSet class and the global promise_map

    • A connection set holds a bunch of connections. It also hands out promises from the promise map (defined just above the class), which are used to store the data from incoming packets.
    • Normally a connection would be removed from the list when it closes, but our mock connections never close, so the finish check simply returns false.
  • processConnections

    • This is the core part of our code: a connection processor. While the connection set is not finished, it continuously processes the connections in the set, one packet at a time.
    • For each incoming packet, we take a reference to its promise and set the promise's value (assuming the id is always unique).
    • Because we need to add connections and process connections at the same time, we used a mutex to lock the processor.
  • addConnections

    • This function simulates connections coming in. It generates connections and adds them to the connection set.
    • Because we need to add connections and process connections at the same time, we used a mutex to lock the producer.
  • main

    • The main function; nothing much to say.

Exceptional situations with promises

When we write code, sometimes we don't handle an exception inside the method itself but let it propagate to the caller. It's the same story for futures. Suppose you have a function that throws, and you call it with std::async:

int foo() {
    throw std::runtime_error("foo error");
    return 0;
}

int main() {
    auto a = std::async(foo);
}

Normally std::async returns a future that contains the value. If the function throws, the exception is stored instead, and it is rethrown when we call get(). It's the same for std::packaged_task and std::promise. However, std::promise has an explicit method to store an exception: set_exception(). So we can modify the code like this:

int foo() {
    throw std::runtime_error("foo error");
    return 0;
}

int main() {
    std::promise<int> promise;
    try {
        promise.set_value(foo());
    }
    catch (const std::runtime_error&) {
        promise.set_exception(std::current_exception());
    }
}

Or, since set_exception() also accepts a std::exception_ptr argument, we can do this if we know the exception type:

int main() {
    std::promise<int> promise;
    try {
        promise.set_value(foo());
    }
    catch (const std::runtime_error&) {
        promise.set_exception(std::make_exception_ptr(std::runtime_error("foo error")));
    }
}

The book says the latter is preferable: the code is cleaner, and because the exception is never actually thrown and caught, the compiler has a greater opportunity to optimize it.


std::future::get() can also throw an exception when the promise associated with the future is destroyed without setting a value. In this case, the exception will be std::future_error with error code std::future_errc::broken_promise.

Although std::future is a powerful tool, only one thread can wait on a given future. If we want multiple threads waiting for the result, we can use std::shared_future: a future that can be copied, so each thread can hold its own object referring to the same shared state.

Shared future

When multiple threads are waiting for some return value, std::future no longer works because it's move-only. Even if you pass it to multiple threads by reference, get() can only be called once, and concurrent calls from several threads are a race condition and undefined behavior. In this case, we can use std::shared_future to share the result with multiple threads.

For example, when multiple objects, say several cells in a complex spreadsheet with formulas depending on one variable, are waiting for the value of that variable, std::shared_future is a good fit.

However, in general, accessing one object simultaneously from multiple threads causes a race condition. The following chart presents one possible execution sequence:

%%{
    init: {
        'theme': 'base',
        'themeVariables': { 
            'primaryColor': '#add8e6',
            'noteTextColor': '#0f0f0f',
            'noteBkgColor': '#f5f6fa',
            'lineColor': '#eaa1af',
            'labelBoxBkgColor': '#add8e6'

        }
    }
}%%
flowchart TD

Thread1 --->wait1["sf.wait()"]
Thread2 --->wait2["sf.wait()"]

wait2 ---> |dangling reference| obj
int --->|out of scope| obj["freed sf"]
subgraph th1Race [race condition]
    wait1 --->|reference| std::shared_future["sf"]
    wait2 --->|reference| std::shared_future["sf"]
    std::shared_future --->|produce async result| int
end
wait1 ---> |dangling reference| obj

In this case, if we still pass future objects by reference, then when the future goes out of scope, the references become dangling references, which is undefined behavior.

However, since std::shared_future is a copyable object, we can simply pass a copy to each thread, and when the future is ready, all the threads will get the result, like this:

%%{
    init: {
        'theme': 'base',
        'themeVariables': { 
            'primaryColor': '#add8e6',
            'noteTextColor': '#0f0f0f',
            'noteBkgColor': '#f5f6fa',
            'lineColor': '#eaa1af',
            'labelBoxBkgColor': '#add8e6'

        }
    }
}%%
flowchart TD
subgraph sp1 [thread 1 scope]
    wait1[thread 1] --> ftr1["sf1.wait()"]
    ftr1 ---> ok1["ok"]
end
ftr1 --->|get from|int
ftr[sf] --->|copy|ftr1
subgraph sp [global scope]
ftr --->|produce async result| int
end
ftr --->|copy|ftr2
int --->|out of scope| null[freed sf]  
subgraph sp2 [thread 2 scope]
    wait2[thread 2] --> ftr2["sf2.wait()"]
    ftr2 ---> ok2["ok"]
end
ftr2 --->|get from|int

In this case, since we are copying the result, there will be no dangling references.

To construct a std::shared_future, we can use an existing std::future. Consider the following code:

std::promise<int> p;
auto f(p.get_future());
std::cout << f.valid() << std::endl;
std::shared_future<int> sf(std::move(f));
std::cout << f.valid() << std::endl;
std::cout << sf.valid() << std::endl;

Output:

1
0
1

As we can see, initially f is valid, but after moving it to construct sf, it’s no longer valid. This is because std::future is move-only, and after moving it, it’s in a moved-from state. After construction, sf is now valid.

We can also construct std::shared_future from std::promise:

std::promise<int> p;
std::shared_future<int> sf(p.get_future());


This is an implicit transfer of ownership. Even though std::future is move-only, get_future() returns the future by value, which is a temporary (an rvalue), and std::shared_future has a constructor taking std::future&&, so the temporary is moved into the shared future. So it's safe to use get_future() to construct a std::shared_future directly.

You will get a better understanding if you take a look at the constructor of std::shared_future and std::promise::get_future

Constructor of std::shared_future:

shared_future(future<_Res>&& __uf) noexcept
: _Base_type(std::move(__uf))
{ }

std::promise::get_future:

future<_Res>
get_future()
{ return future<_Res>(_M_future); }

Another (and much easier) way to construct a std::shared_future object is to use std::future::share():

std::promise<int> p;
auto f = p.get_future().share();

This is verrrrrrrry useful when the type of the promise is really long (like in Objective-C): you don't have to type it again.

Timeout

Sometimes, as with an HTTP request, we want to set a timeout for the request. Or, like crontab, we want to execute a task at a specific time. C++ has many ways to represent time and do time-related operations.


Most of the tools for time representation are in <chrono>

clock

A clock is a source of time that provides the following info:

  • Current time (now)
  • Type of value used to represent time
  • tick period (the time between two ticks)
  • whether the clock ticks at a uniform rate and cannot be adjusted (i.e., whether it is a steady clock)

In std::chrono, there are several clocks:

  • std::chrono::system_clock: wall clock time from the system-wide realtime clock. It’s not steady because it can be adjusted by the system.
  • std::chrono::steady_clock: monotonic clock that will never be adjusted
  • std::chrono::high_resolution_clock: the clock with the shortest tick period available

duration

A duration is a time interval and the simplest part of the time support. Durations are handled by the std::chrono::duration<> template class. The first template argument is the type of the value used to store the tick count, and the second is the tick period as a ratio of a second. For example, std::chrono::duration<int, std::ratio<1, 30>> counts ticks of 1/30 of a second in an int, and std::chrono::duration<double, std::ratio<60, 1>> counts ticks of 60 seconds (minutes) in a double.

The standard library provides a lot of predefined duration types, basically covering most use cases, and they are designed to hold sufficiently large time intervals. Their names are also self-explanatory:

  • std::chrono::nanoseconds
  • std::chrono::microseconds
  • std::chrono::milliseconds
  • std::chrono::seconds
  • std::chrono::minutes
  • std::chrono::hours

std::chrono::literals provides a way to construct durations with literal suffixes. For example, 1s is a std::chrono::seconds object with value 1, and 1ms is a std::chrono::milliseconds object with value 1. The literal operators simply construct the corresponding duration type, so 1s is equivalent to std::chrono::seconds(1).

We can also implicitly convert one duration type to another, but the conversion has to be explicit (std::chrono::duration_cast<>) when it may truncate, i.e. when going from a finer unit to a coarser one (ms -> s, or min -> h). For example, we can construct a std::chrono::milliseconds object from a std::chrono::seconds object implicitly, but constructing a std::chrono::hours object from a std::chrono::seconds object requires an explicit duration_cast:

std::chrono::seconds s(1);
std::chrono::hours h = std::chrono::duration_cast<std::chrono::hours>(s);
std::chrono::milliseconds ms(s);

With a duration, we can wait on a future for a bounded amount of time, like this:

// waitFor is assumed to sleep for the given number of milliseconds,
// standing in for a time-consuming task.
int waitFor(int ms) {
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));
    return ms;
}

int main() {
    // std::launch::async guarantees the task runs on another thread;
    // a deferred task would make wait_for return std::future_status::deferred.
    auto f = std::async(std::launch::async, waitFor, 500);
    if (f.wait_for(std::chrono::milliseconds(1000)) == std::future_status::ready) {
        std::cout << "ok!\n";
    }
    else {
        std::cout << "timeout!";
    }
    f = std::async(std::launch::async, waitFor, 1500);
    if (f.wait_for(std::chrono::milliseconds(1000)) == std::future_status::ready) {
        std::cout << "ok!\n";
    }
    else {
        std::cout << "timeout!";
    }
}

Output:

ok!
timeout!

std::future::wait_for returns a status indicating whether the future is ready. Here we check whether it is; if not, the wait timed out.


The time for duration-based wait is calculated using a steady clock, so it’s not affected by system time adjustment.

time points

A time point is represented by the std::chrono::time_point template class, which takes the clock type as the first template argument and the duration type used to measure time since the clock's epoch as the second. For example, std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds> is a time point on the system clock measured in seconds.

The value of a time point is the length of time since the clock's epoch (a common epoch is the Unix epoch, 1970-01-01 00:00:00 UTC; another common one is the time the program or system started). Different clocks can share one epoch, but they don't have to. For example, printing the time since epoch according to two different clocks' now (std::chrono::system_clock and std::chrono::steady_clock), the following code:

using namespace std::chrono;
auto p0 = system_clock::now();
auto p1 = steady_clock::now();
auto sys = duration_cast<seconds>(p0.time_since_epoch());
auto steady = duration_cast<seconds>(p1.time_since_epoch());
std::cout << sys.count() << std::endl;
std::cout << steady.count();

has this output(time dependent):

1705857527
398199

which shows that the two clocks' epochs are different.

Time points support basic addition and subtraction. For example, to do benchmarking, we can do this:

auto start = std::chrono::steady_clock::now();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
auto end = std::chrono::steady_clock::now();
auto diff = end - start; // counted in the clock's tick period (nanoseconds here)
std::cout << "time elapsed: " << diff.count() << std::endl;

Output:

time elapsed: 502058917

The clock parameter of std::chrono::time_point specifies the epoch, but it is also the clock used to track the time. For example, with a std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds> object, if the system time is adjusted, a wait until that time point is adjusted too. With std::chrono::steady_clock, the wait time is not affected.

This is an example to use a std::condition_variable with timeout:

bool wait_loop(std::condition_variable& cv, std::mutex& mtx, bool& done, int time) {
    auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(time);
    std::unique_lock lock(mtx);
    while (!done) {
        if (cv.wait_until(lock, timeout) == std::cv_status::timeout) {
            std::cout << "timeout~\n";
            break;
        }
        else {
            done = true;
            std::cout << "safe~\n";
            break;
        }
    }
    return done;
}

int main() {
    std::mutex mtx;
    std::condition_variable cv;
    bool done = false;
    std::thread th(wait_loop, std::ref(cv), std::ref(mtx), std::ref(done), 1000);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    cv.notify_all();
    th.join();
    done = false;
    th = std::thread(wait_loop, std::ref(cv), std::ref(mtx), std::ref(done), 50);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    cv.notify_all();
    th.join();
}

Output:

safe~
timeout~

When we use std::condition_variable::wait_until without passing a predicate, we have to use a loop that re-checks the condition to handle spurious wakeups (a thread waking up even though it was not notified and the condition is not satisfied). With a loop, while the time is not out, we can check whether the condition is met (via done); if not, the thread goes back to sleep.

Function with timeout

There are two major functions that accept a timeout as a parameter: std::this_thread::sleep_for and std::this_thread::sleep_until (you probably saw me use the former before to simulate a time-consuming process). The former blocks the thread for a specific duration, and the latter blocks the thread until a specific time point. The latter can be used to trigger events at fixed times, like crontab.

Normally, a mutex is locked with an RAII wrapper or .lock(), and unlocked with .unlock(). But some mutexes, like std::timed_mutex and std::recursive_timed_mutex, also provide .try_lock_for() and .try_lock_until(), which keep trying to lock the mutex for a bounded time. If the mutex could not be locked before the time runs out, the function returns false.

Other functions that accept timeout are listed in this table:

class/namespace functions return value
std::this_thread namespace sleep_for(duration), sleep_until(time_point) N/A
std::condition_variable or std::condition_variable_any wait_for(lock, duration), wait_until(lock, time_point) (both accept an optional predicate) std::cv_status without a predicate; bool (the predicate's value when woken) with one
std::timed_mutex, std::recursive_timed_mutex, std::shared_timed_mutex try_lock_for(duration), try_lock_until(time_point) bool (true if the lock was acquired, false otherwise)
std::shared_timed_mutex try_lock_shared_for(duration), try_lock_shared_until(time_point) bool (true if the lock was acquired, false otherwise)
std::unique_lock<TimedLockable>(lockable, duration), std::unique_lock(lockable, time_point) owns_lock() (checks whether the lock was acquired), try_lock_for(duration), try_lock_until(time_point) bool (true if the lock was acquired, false otherwise)
std::shared_lock<SharedTimedLockable>(lockable, duration), std::shared_lock(lockable, time_point) owns_lock() (checks whether the lock was acquired), try_lock_for(duration), try_lock_until(time_point) bool (true if the lock was acquired, false otherwise)
std::future, std::shared_future wait_for(duration), wait_until(time_point) std::future_status::timeout if the wait timed out, std::future_status::ready if the future is ready, std::future_status::deferred if the future holds a deferred-start function

Summary

  • We can schedule a task to be executed in another thread with std::async and std::packaged_task
    • std::packaged_task is callable. Invoking it runs the task synchronously in the calling thread and stores the result in the associated future.
  • We can use std::promise to communicate between threads
    • std::promise stores a value that can later be retrieved through the associated std::future.
    • std::promise can also be used to store an exception
  • std::shared_future is a wrapper of std::future that can be copied and shared
    • std::shared_future can be constructed from std::future or std::promise
    • Since std::shared_future is copyable, we can use it to prevent dangling reference and race conditions
    • One easy way to construct std::shared_future is to use std::future::share()
  • We can use std::chrono namespace to represent times.
    • system_clock fetches time from system-wide realtime clock, and it can be adjusted.
    • steady_clock is a monotonic clock that will never be adjusted.
    • high_resolution_clock is the clock with the shortest tick period available.
    • std::chrono::duration is a time interval.
      • The first template argument is the type of the value used to represent the time interval, and the second template argument is the tick period.
      • Standard library provides a lot of predefined duration types, basically covered most of the use cases.
      • std::chrono::literals provides a way to construct duration with literal suffixes.
      • We can implicitly convert one duration type to another, but the conversion has to be explicit (duration_cast) when it may lose precision, i.e. from a finer unit to a coarser one (ms -> s or min -> h).
    • Different clocks can share one epoch, but they don’t have to.
  • Many objects accept a timeout as a parameter
    • Time can be represented by std::chrono::duration and std::chrono::time_point
    • methods ending in _for accept a duration, and methods ending in _until accept a time point

See more in next part~
