This article introduces the Asio
library for convenient C++-native
asynchronous programming and suggests an approach for solving common
use-after-free bugs.
Quick intro to Asynchronous Input/Output
Distributed applications that involve message passing over a network necessarily adopt non-blocking input/output. This is a natural consequence of the fact that such applications are IO-bound, rather than CPU-bound: rather than being continuously engaged in intensive computation, the CPU is mostly waiting for messages to be sent and received, limited by network delays, flow-control, congestion-control, and possibly other policy limits.
This is illustrated in the following figure:
Process A Process B
| |
T |---- Send Request -----> | ← Message sent from A to B
| | |
| |==== WAITING (BLOCKED) ===| ← A is idle, waiting for B
| | |
T’ |<---- Send Response -----| ← B finishes and replies
| |
v v
/* pseudocode */
response = send(B, msg) /* this takes (T’-T) time */
process_response(response)
If A made a blocking call as shown above (as in the pure, original form of RPC1), suspending execution of the current thread until a response was received or a timeout occurred, then the CPU does virtually no work with respect to the thread of this application between T and T’. The application will be put to sleep, and woken up when an interrupt fires and the kernel stack has read bytes from the network interface destined for the socket that A is listening for a response on.
What if A wanted to make another such call to C? Or make two or more calls at the same time to B? This simple model does not allow for such concurrency. Applications of any complexity will, however, almost always require concurrency, as without it the performance would be unacceptable: n requests from A to B would take n x (T’ - T) time.
If concurrency is required, one option would be to use threads. However, this solution does not scale well: n parallel blocking requests would require n threads. Additionally, there are system limits to the number of threads that can be spawned. More to the point, while threads would allow for concurrency, they would not speed up the application beyond that. For one thing, past a point, thread setup, teardown, and scheduling add up to an overhead that must be considered. The crux of the matter, however, is that most of the time is still spent waiting for data from the network. The requests are generated and sent quickly by the sender, A. But then A must wait for a time that no amount of CPU power (and no number of threads) at A will change. This time is the response time of B and includes a number of delays: transmission, propagation, queueing, and processing delays at A, B, and all the hops on the network path in between. If a reliable protocol is used, the kernels may further have to act according to flow control, error control, and congestion control signals, all while the application task is sleeping and gets little to no CPU time.
Threads therefore are a good concurrency solution when the threads themselves are relatively few in number, perform long-running, independent (to minimize need for synchronization) blocking tasks, and/or tasks that put the CPU to work continuously rather than keeping it idle.
In situations such as the above, however, where we have bursty CPU usage followed by relatively long periods of idling, another solution is appropriate. This is non-blocking, asynchronous input/output. Concurrency here does not involve separate threads; instead, a single thread designs its control flow to be non-blocking and to react to events. A sends a request to B and does not block/suspend its thread waiting for a response. Instead, it is free to send other requests and is notified when response data has arrived2. Input and output (reads from and writes to the network socket) are interleaved. This achieves a concurrency model that is more suitable to the IO-bound nature of network applications. A – assuming it has work to do – can use the CPU between T and T’ instead of leaving it idle while its thread is blocked.
Process A Process B Process C Process D
| | | |
T |---- Send Request 1 -----> | | |
| |---- Send Request 2 --------|----------------|---------------> |
| |---- Send Request 3 --------|--------------> | |
| … … … …
| |---- Send Request 4 ------> | | |
| … … … …
| | <---- Response 2 ----------|----------------|---------------- |
| | <---- Response 3 ----------|----------------| |
| … … … …
| |---- Send Request 5 --------|----------------|---------------> |
| | <---- Response 4 ----------| | |
T’ | <---- Response 1 ----------| | |
… … … …
| | | |
| | | |
V V V V
In the figure, the responses come out of order. This could be due to a number of reasons: reordering in the network (due to different round-trip times, retransmissions, etc.), differing response times, prioritization policies at B, etc. Between T (time of sending request 1) and T’ (time of getting the response to request 1), other work is done. The thread at A is not blocked.
How might this be achieved?
Operating systems typically make available an API for monitoring input-output events. Examples are epoll (or the older poll and select APIs) on Linux and kqueue on the BSDs. In all such systems this boils down to providing an API for:
- registering interest in certain events (e.g. socket is readable, socket is writable, timer expired etc)3.
- making a single blocking call that unblocks when any matching event (or maximum timeout) occurs.
An example of this is given below4.
/* Assumes the usual setup has been done earlier: kq = kqueue(), chlist and
 * evlist are arrays of N struct kevent (changes to register, events out),
 * and tmout is a struct timespec. */
for (;;) {
    // BLOCKS here.
    nev = kevent(kq, chlist, N,
                 evlist, N,
                 &tmout); /* set upper time limit to block */
    if (nev == -1) {
        perror("kevent()");
        exit(EXIT_FAILURE);
    }
    else if (nev == 0) {
        /* handle timeout */
    }
    else if (nev > 0) {
        for (i = 0; i < nev; i++) {
            /* handle events */
        }
    }
}
The Linux epoll API5 is structurally similar.
for (;;) {
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
        perror("epoll_wait");
        exit(EXIT_FAILURE);
    }
    // ...
    // Now look at each event that occurred and handle it.
}
The blocking call is typically made in an infinite loop. The events are tracked inside the kernel, which unblocks the thread and provides notifications when any events occur. This does require the entire application to be modeled as an event loop. Is socket x readable? Call the read_bytes(x) handler. Is socket y writable? Send some bytes to y from the buffer.
Because this article is not about epoll as such, I will forgo any extensive
examples here.
Asio
It would seem the story ends here. Epoll gives you what you need. Why use anything else? While using epoll or kqueue directly is simple enough, it is:
- not portable. epoll is Linux-specific, kqueue is BSD-specific.
- boilerplate-heavy. Managing a single timer requires more calls than you might expect, using primitive POSIX calls, etc.
- a bare-bones C API. C is great for some things. Not so much for others.
Various libraries are available to address the first two points: portability and convenience. Some of them are written in C, with bindings for other languages. Other languages offer modules for asynchronous programming as part of their standard library6.
The C++ standard library currently has no APIs for asynchronous input-output (or for networking in general, for that matter). Instead you’ll either have to use the likes of epoll directly or hunt for some library. You could use one of these C libraries. But you would soon find yourself writing wrappers everywhere to C++-ify the API. You will want RAII, constructors, std::chrono-compatible interfaces, generic programming to maximize code reuse, smart pointers, and so on. Macros, const-correctness, C-style casts, void-pointer manipulation, etc. may cause some compilation pain or will need to be "quarantined" behind an isolated adaptor layer lest they ‘infect’ the entire C++ codebase. And so you will find yourself building another library on top of your dependency.
But luckily there is no need for that. There is already a well-established, battle-tested C++ library for asynchronous network programming: asio7. It wraps OS APIs such as epoll (depending on the platform) and gives you a higher-level, C++-native API.
Brief Introduction to asio
A minimal example of asio is given below. For simplicity, assume we have a server talking to clients over TCP. Whenever a TCP connection is accepted, the server creates a connection object that encapsulates various state variables, timers for application-layer timeouts, etc. Assume the server expects to receive heartbeats at the application layer for liveness. If not received within a certain interval, the server tears down the connection object and throws it away.
Below is a simple demonstrative code example8. We create N connection objects which contain a timer to check for incoming heartbeats. We send a certain number of heartbeats (simulating their arrival from the network). After a short time we stop sending heartbeats, which the connection objects detect and signal. The objects are discarded.
We also use sigc::signals9, often found in this sort of event-driven code, to propagate events (signals) to outer handlers, using the observer pattern10.
#include <boost/asio.hpp>
#include <boost/asio/posix/basic_stream_descriptor.hpp>
#include <sigc++/signal.h>
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
// NOTE: asio can be easily used standalone without boost as well.
namespace asio = boost::asio;
using namespace std::chrono_literals;
class connection {
public:
connection(asio::io_context &ioctx) : m_timer(ioctx) {}
void start() { restart_heartbeat_monitor(); }
void close() { m_timer.cancel(); }
virtual ~connection() { std::cerr << "connection destroyed\n"; }
void restart_heartbeat_monitor() {
// Any pending async waits will be canceled.
m_timer.expires_after(m_HEARTBEAT_INTERVAL);
// call handler on timeout or cancellation
m_timer.async_wait([this](std::error_code ec) {
if (ec) {
m_sig_error.emit(ec.message());
return;
}
on_heartbeat_timeout();
});
};
void on_heartbeat_received() {
std::cerr << "Heartbeat received, restarting monitor." << std::endl;
restart_heartbeat_monitor();
}
void on_heartbeat_timeout() {
std::cerr << "heartbeat timeout!!\n";
close();
}
static auto heartbeat_interval() { return m_HEARTBEAT_INTERVAL; }
private:
static inline constexpr std::chrono::milliseconds m_HEARTBEAT_INTERVAL {
100};
boost::asio::high_resolution_timer m_timer;
sigc::signal<void(const std::string &)> m_sig_error;
};
int main(int, const char **) {
using connection_id_t = std::uint32_t;
std::map<connection_id_t, std::shared_ptr<connection>> live_conns;
std::uint32_t next_connection_id = 0;
constexpr std::size_t NUM_CONNS = 5;
unsigned NUM_HEARTBEATS = 5;
asio::io_context ioctx;
asio::high_resolution_timer heartbeat_pump_timer {ioctx};
const std::function<void(void)> wait_send_next_heartbeat =
[&live_conns,
&timer = heartbeat_pump_timer,
&wait_send_next_heartbeat,
&NUM_HEARTBEATS] {
// we only send NUM_HEARTBEATS; after that we make
// a close on the connection and throw it away.
std::cerr << "num heartbeats is " << NUM_HEARTBEATS << std::endl;
if (NUM_HEARTBEATS-- <= 0) {
std::cerr << "clearing connections\n";
// Here, clear only HALF the connections; leaving some
// alone.
const auto sz = live_conns.size();
for (unsigned i = 0; i < sz / 2; ++i) {
live_conns.erase(live_conns.begin());
}
return;
}
timer.expires_after(connection::heartbeat_interval() / 2);
timer.async_wait(
[&live_conns, &wait_send_next_heartbeat](std::error_code ec) {
if (ec.value() == asio::error::operation_aborted) {
std::cerr << "wait_send_next_heartbeat operation aborted\n";
return;
}
for (const auto &[connid, conn] : live_conns) {
std::cerr << "sending Heartbeat\n";
conn->on_heartbeat_received();
}
wait_send_next_heartbeat();
});
};
for (std::size_t i = 0; i < NUM_CONNS; ++i) {
const auto id = ++next_connection_id;
auto conn = std::make_shared<connection>(ioctx);
live_conns[id] = std::move(conn);
}
std::for_each(live_conns.begin(), live_conns.end(), [](auto &elem) {
elem.second->start();
});
wait_send_next_heartbeat();
ioctx.run_for(1s);
std::cerr << "NUM nondestructed live_conns: " << live_conns.size()
<< std::endl;
return 0;
}
While this example is contrived, a few things should nevertheless be apparent:
- in addition to portability, the code is much more convenient than dealing with e.g. epoll directly. One can use std::chrono, C++ lambdas, etc.
- the code is typically callback-driven. It must be structured in such a way that handlers are registered to run in response to events. In particular you schedule handlers to react to event notifications (reactor-style: invoke a callback when an fd is readable) or to completions of actions (proactor pattern: e.g. invoke a handler when bytes have been read into a buffer from a readable socket).
- asio objects are typically embedded in user objects and they get constructed with a reference to an asio::io_context (which wraps a scheduler, i.e. all the glue to interface with the likes of epoll).
I think it would generally be safe to say that once you get the hang of it, it's a relatively nice, productive interface to work with from C++, generally preferable to the alternative of using C APIs.
There are also some things that will very likely be less obvious: namely, the example contains at least 2 use-after-free memory violations, and the potential for more.
Asio Lifetime Dependencies and Use-After-Free Cases
Asio unfortunately assumes certain implicit contracts related to the lifetime of objects, contracts that are not necessarily obvious and are often undocumented.
The first problem occurs here:
m_timer.async_wait([this](std::error_code ec) {
if (ec) {
m_sig_error.emit(ec.message());
return;
}
on_heartbeat_timeout();
});
This callback will be called with a non-zero error code ec on error or cancellation (error::operation_aborted). However, it may be invoked after the connection object (and its m_timer member) has been destructed. If the handler was scheduled for execution and cancel() is called, asio will still invoke the handler (unless it can be removed from the scheduler queue) with the operation aborted error code. In the current example, because we erase some connection objects from the live_conns map, removing the only std::shared_ptr keeping them alive, those objects are destructed. When the lambda runs, it emits a sigc signal (this->m_sig_error.emit()), thus using a member (accessing memory) of an object that has been destructed.
Use-after-free.
We could do this instead:
m_timer.async_wait([this](std::error_code ec) {
if (ec) {
if (ec == asio::error::operation_aborted){
return;
}
m_sig_error.emit(ec.message());
return;
}
on_heartbeat_timeout();
});
This does not commit use-after-free — at least in my current setup with the current asio version I’m using for the example8. The lambda gets invoked and returns if the operation aborted error code is set. No members of the destructed object are touched, no invalid memory reads occur. This is even though the asio::high_resolution_timer m_timer member object, which the handler was scheduled through, has been destructed. Valgrind and sanitizers do not flag this as a violation, and a GDB trace appears to suggest the handler is decoupled from the asio object that it was scheduled through (asio::high_resolution_timer) once enqueued into asio’s scheduler task queue inside the asio::io_context. So no use-after-free happens inside the asio machinery itself. However, if this is not already undefined behavior within the context of asio’s API, it may be hovering dangerously close to it.
We then have another problem. The asio objects (e.g. high_resolution_timer), which, as mentioned, get constructed with a reference to the asio::io_context, call back into the context in their destructor. In other words, it is assumed all asio objects are outlived by the context. If this is not the case … well, there’s probably going to be a crash. Above, we arrange for the io_context to go out of scope before some of the connection objects get destructed, by keeping the connection objects in std::shared_ptrs. When these finally go out of scope, they’ll be committing use-after-free in their destructor by calling into the asio::io_context. This situation may be rare, but if objects get stored in a buffer or queue or get passed between threads, out-of-order destruction like this can well occur in practice.
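To make the destruction-order hazard concrete, here is a deliberately broken minimal sketch (illustrative only, not taken from the example above):
#include <boost/asio.hpp>
#include <memory>
namespace asio = boost::asio;
int main() {
  std::unique_ptr<asio::high_resolution_timer> timer;
  {
    asio::io_context ioctx;
    timer = std::make_unique<asio::high_resolution_timer>(ioctx);
  } // ioctx destroyed here, while *timer is still alive
  // When 'timer' goes out of scope below, its destructor calls back into
  // the already-destroyed io_context: use-after-free, very likely a crash.
  return 0;
}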
Lifetime Dependencies in General
The ‘asio idiom’ for solving the use-after-free situations above is to have any class that embeds an asio object such as asio::high_resolution_timer inherit from std::enable_shared_from_this and be constructed as a std::shared_ptr. The object, when registering a lambda-based handler, will then capture shared_from_this() by value in the lambda. This ensures the object remains alive at least until the lambda returns. Therefore use-after-free is precluded inside the lambda since, when it gets invoked, the object is guaranteed to be alive:
m_timer.async_wait([this, self=shared_from_this()](std::error_code ec) {
if (ec) {
m_sig_error.emit(ec.message());
return;
}
on_heartbeat_timeout();
});
To address the other use-after-free instance, the io_context can be stored as a std::shared_ptr member (m_ioctx) inside the connection class, ensuring a connection object does not outlive its associated io_context.
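A minimal sketch of this arrangement is shown below (the shared_ptr constructor parameter is an assumption for illustration; the earlier example takes a plain reference):
class connection {
public:
  explicit connection(std::shared_ptr<asio::io_context> ioctx)
      : m_ioctx(std::move(ioctx)), m_timer(*m_ioctx) {}
private:
  // Declared before m_timer: members are destroyed in reverse declaration
  // order, so the timer goes first and the io_context it refers to is
  // guaranteed to still be alive at that point.
  std::shared_ptr<asio::io_context> m_ioctx;
  asio::high_resolution_timer m_timer;
};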
While these appear to solve the problem, we now have another one. When
m_sig_error.emit()
runs, invoking all the signal handlers (callbacks)
connected to it by ‘observers’, we have fresh opportunities for use-after-free.
Consider the following case (assume the connection object has been updated for
the example).
struct connection_wrapper {
  connection_wrapper(std::shared_ptr<asio::io_context> ioctx) {
    m_conn = std::make_shared<connection>(std::move(ioctx));
    m_conn->signal_error().connect([this](const auto &e) {
      m_sig_error.emit(e);
    });
  }
private:
  std::shared_ptr<connection> m_conn;
  sigc::signal<void(const std::string &)> m_sig_error;
};
A wrapper object here stores a connection object, connects to its error sigc::signal, and propagates it outward. The code looks benign. Yet, because m_conn is a std::shared_ptr, its lifetime is now decoupled from the lifetime of the outer, parent object (connection_wrapper). Because m_conn could’ve scheduled some callbacks via its asio::high_resolution_timer, it could be that a connection_wrapper object gets destructed before m_conn, due to m_conn being kept alive by the shared_from_this() captured in the lambda passed to asio for the handler!
So now we need to prevent the signal handler connected to the error signal of m_conn from calling into the connection_wrapper object if the connection_wrapper object has been destructed. But how? We have to make the connection_wrapper itself a std::shared_ptr, and capture a std::weak_ptr inside the signal handler, lock it, and if expired, return early.
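A minimal sketch of that guard is given below, assuming (as in the surrounding examples) that connection exposes a signal_error() accessor and takes its io_context as a std::shared_ptr; the init() step is an illustrative assumption, needed because weak_from_this() is not usable inside the constructor:
struct connection_wrapper
    : public std::enable_shared_from_this<connection_wrapper> {
  explicit connection_wrapper(std::shared_ptr<asio::io_context> ioctx)
      : m_conn(std::make_shared<connection>(std::move(ioctx))) {}
  // Must be called once a std::shared_ptr owns *this: weak_from_this()
  // would be empty inside the constructor (see the delayed-construction
  // discussion further below).
  void init() {
    m_conn->signal_error().connect(
        [weak_self = weak_from_this()](const std::string &e) {
          const auto self = weak_self.lock();
          if (!self) {
            return; // connection_wrapper already destructed: drop the event
          }
          self->m_sig_error.emit(e);
        });
  }
private:
  std::shared_ptr<connection> m_conn;
  sigc::signal<void(const std::string &)> m_sig_error;
};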
We can see here that the lifetime issue is in fact not asio-specific, but general to asynchronous, event-driven, callback-heavy code.
It becomes difficult to destruct objects while ensuring use-after-free violations do not occur. Worse, the pendulum can swing the other way: if, in one’s eagerness to prevent use-after-free, one were to use a std::shared_ptr where a std::weak_ptr should be used, a memory leak will occur instead.
Consider the following:
struct connection_wrapper : public std::enable_shared_from_this<connection_wrapper> {
  connection_wrapper(std::shared_ptr<asio::io_context> ioctx) {
    m_conn = std::make_shared<connection>(std::move(ioctx));
    // NOTE: shared_from_this() cannot actually be called from inside a
    // constructor; shown here only to illustrate the leak.
    m_conn->signal_error().connect(
        [this, self = shared_from_this()](const auto &e) {
          m_sig_error.emit(e);
        });
  }
private:
  std::shared_ptr<connection> m_conn;
  sigc::signal<void(const std::string &)> m_sig_error;
};
Here, the connection_wrapper will be kept alive by the signal handler connected to the connection object’s sigc::signal (m_sig_error), due to being captured as a std::shared_ptr in the lambda. Because the connection object is also stored as a std::shared_ptr inside the connection_wrapper object, neither will ever be destructed, producing a memory leak. std::weak_ptr is meant for this type of scenario, breaking the circular dependency.
Correct handling therefore requires care and due consideration of the lifetime of the objects involved, and of how the use of std::shared_ptr may further complicate matters by decoupling the lifetime of member objects from the lifetime of parent objects.
One can do this manually everywhere, but we might be able to do a tiny bit better.
Lifetime Utils
The approach described above is potentially error-prone when applied by hand. The risk can be minimized by abstracting these measures into a small suite of utilities and applying the pattern obviously and consistently.11
The first observation is that decoupling the lifetime of a member object from the lifetime of the parent may not be a great idea, especially where the parent registers callbacks with such internal objects. If there are many such members, it soon becomes confusing. In fact, given a hierarchy of such objects where signals are emitted and propagated outward by outer objects, if the member objects are decoupled from the lifetime of their parents then every single layer must be guarded against use-after-free. This adds overhead in execution time, memory, and readability, and is more error prone.
struct foo{
sigc::signal<void(void)> signal;
};
struct bar {
bar(){
m_foo = std::make_shared<foo>();
// m_foo may be destructed after *this* here;
// use-after-free danger;
// must guard with std::weak_ptr.
m_foo->signal.connect([this]{
signal.emit();
});
}
std::shared_ptr<foo> m_foo;
sigc::signal<void(void)> signal;
};
struct foobar{
foobar(){
m_bar = std::make_shared<bar>();
// m_bar may be destructed after *this* here;
// use-after-free danger;
// must guard with std::weak_ptr.
m_bar->signal.connect([this]{
on_signal();
});
}
void on_signal(){}
std::shared_ptr<bar> m_bar;
};
...
auto &foo = get_foo_from_somewhere();
// calls the bar (which may have been destructed) handler,
// which calls the foobar (which may have been destructed) handler.
foo.signal.emit();
Second, because each such object must store the asio::io_context as a std::shared_ptr to avoid one of the use-after-free problems originally covered (and possibly other ‘context’ it depends on), there is also added overhead per each such object.
Instead, we may proceed as follows:
- identify the outermost parent object that contains all these asynchronous asio objects (or other types of similar ‘async’ objects).
- make this object inherit from std::enable_shared_from_this.
- keep all the async members (e.g. connection) that may call back into the parent as value members (for lack of a better term), as opposed to members that are references, pointers, or smart pointers. Similarly and recursively, connection (and all such classes having asio object members) will store asio objects or other such async objects as value members.
- with this, no nested object may outlive the outermost parent object. In other words, their lifetime is bound to the lifetime of this outermost object. Let’s call this outermost object the lifetime anchor (a minimal sketch of such an anchor follows this list).
- the lifetime anchor must inherit from std::enable_shared_from_this and pass weak_from_this() to all the nested objects that are asynchronous. Let’s call this the weak lifetime.
- any sigc::signal (or similar) callbacks (e.g. perhaps called from independent objects rather than member objects, i.e. objects whose lifetime is not bound to the lifetime anchor) must guard their calls by locking the weak_lifetime. If expired, return early.
- any handlers scheduled via asio objects must lock the weak pointer BEFORE the handler gets registered, turning the weak lifetime into a strong lifetime. The lambda captures the strong lifetime by value. This ensures the outermost object (the lifetime anchor) as well as all its nested objects are alive when the handler runs.
- store required shared_ptr dependencies such as std::shared_ptr<asio::io_context> in the lifetime anchor only, thus avoiding repeated overhead (beyond storing the weak_lifetime std::weak_ptr).
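For illustration, here is a minimal sketch of what such a lifetime anchor could look like. This is an approximation of the idea, not the actual contents of the lifetime_utils.hpp header discussed next; the names weak_lifetime() and set_end_of_life() mirror how they are used in the applied example later, while the version-counter details are assumptions.
#include <cstdint>
#include <memory>
class lifetime_anchor
    : public std::enable_shared_from_this<lifetime_anchor> {
public:
  virtual ~lifetime_anchor() = default;
  // Handed out to nested async objects: they observe, but never own, the anchor.
  std::weak_ptr<lifetime_anchor> weak_lifetime() { return weak_from_this(); }
  // Logical invalidation: callbacks registered earlier carry the version that
  // was current at registration time and refuse to run once it changes.
  void invalidate_pending_callbacks() { ++m_version; }
  // Permanent invalidation ("end of life") uses a reserved version value.
  void set_end_of_life() { m_version = k_end_of_life; }
  std::uint64_t version() const { return m_version; }
  bool ended() const { return m_version == k_end_of_life; }
private:
  static constexpr std::uint64_t k_end_of_life = ~std::uint64_t{0};
  std::uint64_t m_version{0};
};
using weak_lifetime_t = std::weak_ptr<lifetime_anchor>;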
The above has been abstracted into a single-header set of utilities that reduces boilerplate and provides meaningful names so that the pattern is obvious at the point of application. The ‘outermost class’ is to inherit from a lifetime_anchor class (which inherits from std::enable_shared_from_this). This provides various members such as guard_lifetime() and bind_lifetime(). These also have corresponding free-function versions that take the weak_lifetime as a parameter.
guard_lifetime()
guard_lifetime(<callback>) wraps its callback argument into another lambda that takes care of capturing the weak_lifetime (the weak pointer to the lifetime_anchor, i.e. the outermost object whose lifetime all the nested objects’ lifetimes are bound to) and only invoking the callback if the weak pointer can be locked. In fact, there is an additional layer here: the lifetime also has a method to check for logical validity. So only if the weak_ptr to the lifetime can be locked (physical lifetime validity), AND the lifetime is ‘valid’, does the lambda get invoked.
Logical validity is based on keeping an internal ‘version’ counter. Callbacks will be associated with a version value at the time of creation. At the time of invocation, the callback’s version value is checked against the current version of the lifetime. Additionally, one specific value of the version counter is used as a flag, indicating permanent lifetime invalidation. Once permanently invalidated, the version never changes and the lifetime is logically ended, permanently. Otherwise the lifetime can increment its version every time it wants to prevent previously scheduled callbacks from executing. The version idea is somewhat similar to version vectors in distributed systems to detect read-write conflicts.
Therefore, a given callback is only invoked if:
- the lifetime is still physically alive (weak pointer can be locked), and
- the lifetime version is not set to the permanently-invalid flag value, and
- the version associated with the callback is the same as the current version of the lifetime.
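Continuing the illustrative lifetime_anchor sketch from above (again an approximation of the idea, not the actual utilities), a free-function guard_lifetime() might look roughly like this:
#include <utility>
template <typename Callback>
auto guard_lifetime(weak_lifetime_t weak_lifetime, Callback callback) {
  // Tag the wrapper with the anchor's version at registration time.
  const auto anchor_at_registration = weak_lifetime.lock();
  const std::uint64_t registered_version =
      anchor_at_registration ? anchor_at_registration->version() : 0;
  return [weak_lifetime, registered_version,
          callback = std::move(callback)](auto &&...args) {
    const auto anchor = weak_lifetime.lock();
    if (!anchor) {
      return; // physically dead: the lifetime anchor has been destructed
    }
    if (anchor->ended() || anchor->version() != registered_version) {
      return; // logically invalidated since this callback was registered
    }
    callback(std::forward<decltype(args)>(args)...);
  };
}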
bind_lifetime()
bind_lifetime(<callback>) does the same as guard_lifetime(), but it captures the lifetime anchor as a shared_ptr (a strong lifetime). Therefore this is the function to be used with asio objects, where we not only want to guard against a callback being invoked if the lifetime is gone, but actually want to guarantee the lifetime is not gone until the callback has been invoked.
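In the same illustrative spirit, a bind_lifetime() sketch differs from the guard_lifetime() sketch only in holding a strong reference to the anchor:
template <typename Callback>
auto bind_lifetime(weak_lifetime_t weak_lifetime, Callback callback) {
  // Promote to a strong reference: the anchor (and everything whose lifetime
  // is bound to it) cannot be destructed before this wrapper is.
  auto strong_lifetime = weak_lifetime.lock();
  const std::uint64_t registered_version =
      strong_lifetime ? strong_lifetime->version() : 0;
  return [strong_lifetime, registered_version,
          callback = std::move(callback)](auto &&...args) {
    if (!strong_lifetime || strong_lifetime->ended() ||
        strong_lifetime->version() != registered_version) {
      return; // logically invalidated; the anchor itself stays alive regardless
    }
    callback(std::forward<decltype(args)>(args)...);
  };
}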
The Lifetime Pattern Applied
The initial example is now rewritten using the lifetime utilities for demonstration.
#include "lifetime_utils.hpp"
#include <boost/asio.hpp>
#include <boost/asio/posix/basic_stream_descriptor.hpp>
#include <sigc++/signal.h>
using namespace std::chrono_literals;
namespace asio = boost::asio;
class connection {
public:
connection(asio::io_context &ioctx, weak_lifetime_t lifetime)
: m_lifetime(std::move(lifetime)), m_timer(ioctx) {}
void start() { restart_heartbeat_monitor(); }
void close() { m_timer.cancel(); }
virtual ~connection() { std::cerr << "connection destroyed\n"; }
void restart_heartbeat_monitor() {
// Any pending async waits will be canceled.
m_timer.expires_after(m_HEARTBEAT_INTERVAL);
const auto safe = make_lifetime_binder(m_lifetime);
// call handler on timeout or cancellation
m_timer.async_wait(safe([this](std::error_code ec) {
if (ec) {
if (ec.value() != asio::error::operation_aborted) {
m_sig_error.emit(ec.message());
}
return;
}
on_heartbeat_timeout();
}));
// NOTE: the above can also be written as:
// m_timer.async_wait(bind_lifetime(m_lifetime, <LAMBDA>));
};
void on_heartbeat_received() {
std::cerr << "Heartbeat received, restarting monitor." << std::endl;
restart_heartbeat_monitor();
}
void on_heartbeat_timeout() {
std::cerr << "heartbeat timeout!!\n";
close();
}
static auto heartbeat_interval() { return m_HEARTBEAT_INTERVAL; }
auto &signal_error() { return m_sig_error; }
private:
weak_lifetime_t m_lifetime;
static inline constexpr std::chrono::milliseconds m_HEARTBEAT_INTERVAL {
100};
boost::asio::high_resolution_timer m_timer;
sigc::signal<void(const std::string &)> m_sig_error;
};
class connection_wrapper : public lifetime_anchor_tu {
public:
connection_wrapper(std::shared_ptr<asio::io_context> ioctx)
: m_ioctx(std::move(ioctx)) {}
void construct() override {
m_conn.construct(*m_ioctx, weak_lifetime());
m_conn->signal_error().connect(guard_lifetime([this](const auto &e) {
std::cerr << "ERROR: " << e << std::endl;
m_sig_error.emit(e);
}));
}
auto &conn() { return *m_conn; }
private:
std::shared_ptr<asio::io_context> m_ioctx;
delayed_construction<connection> m_conn;
sigc::signal<void(const std::string &e)> m_sig_error;
};
int main(int, const char **) {
using connection_id_t = std::uint32_t;
std::map<connection_id_t, std::shared_ptr<connection_wrapper>> live_conns;
std::uint32_t next_connection_id = 0;
constexpr std::size_t NUM_CONNS = 2;
unsigned NUM_HEARTBEATS = 5;
auto ioctx = std::make_shared<asio::io_context>();
asio::high_resolution_timer heartbeat_pump_timer {*ioctx};
const std::function<void(void)> wait_send_next_heartbeat =
[&live_conns,
&timer = heartbeat_pump_timer,
&wait_send_next_heartbeat,
&NUM_HEARTBEATS] {
// we only send NUM_HEARTBEATS; after that we make
// a close on the connection and throw it away.
std::cerr << "num heartbeats is " << NUM_HEARTBEATS << std::endl;
if (NUM_HEARTBEATS-- <= 0) {
std::cerr << "clearing connections\n";
// TODO: here, clear only HALF the connections; leaving some
// alone.
const auto sz = live_conns.size();
for (unsigned i = 0; i < sz / 2; ++i) {
std::cerr << "invalidating current token!\n";
live_conns.begin()->second->set_end_of_life();
live_conns.erase(live_conns.begin());
}
return;
}
timer.expires_after(connection::heartbeat_interval() / 2);
timer.async_wait(
[&live_conns, &wait_send_next_heartbeat](std::error_code ec) {
if (ec.value() == asio::error::operation_aborted) {
std::cerr << "wait_send_next_heartbeat operation aborted\n";
return;
}
for (const auto &[connid, wrapper] : live_conns) {
std::cerr << "sending Heartbeat\n";
wrapper->conn().on_heartbeat_received();
}
wait_send_next_heartbeat();
});
};
for (std::size_t i = 0; i < NUM_CONNS; ++i) {
const auto id = ++next_connection_id;
auto conn = std::make_shared<connection_wrapper>(ioctx);
conn->construct();
live_conns[id] = std::move(conn);
}
std::for_each(live_conns.begin(), live_conns.end(), [](auto &elem) {
elem.second->conn().start();
});
wait_send_next_heartbeat();
ioctx->run_for(1s);
std::cerr << "NUM nondestructed live_conns: " << live_conns.size()
<< std::endl;
return 0;
}
There are a number of things to notice:
- the lifetime anchor inherits from the lifetime_anchor base class and by extension from std::enable_shared_from_this, and must be constructed as a shared pointer.
- the lifetime propagates itself as a weak_lifetime to all nested objects that require it.
- because the weak_lifetime must be a valid, non-expired std::weak_ptr, and because shared_from_this() will not produce such a weak_ptr from within the constructor of the lifetime class – because it has not finished constructing yet! – those nested objects that must be handed the weak_lifetime in their constructor must have their construction delayed. This is done in the example code via the delayed_construction wrapper, which constructs its template type parameter as a unique_ptr. In general, we may want to put all such members in a struct and construct the struct as a unique_ptr to achieve the two-step construction necessary.
- the code, in practice, consists of wrapping our callbacks in another lambda that captures what is needed and makes the necessary checks to make invoking our lambda safe. There are a few ways of expressing this, as shown in the example. Generally, prefer the one which looks the least intrusive, i.e. introduces the least amount of noise.
Key Takeaways
- Asio assumes specific lifetime guarantees — violating them leads to subtle bugs or UB.
- Use shared_from_this() only when truly necessary; avoid shared_ptr proliferation.
- Misuse of shared_ptr can produce memory leaks.
- std::shared_ptr inherently decouples lifetimes, which complicates matters.
- Lifetime coupling can be made explicit using a lifetime anchor pattern, and use-after-free bugs can be prevented.
- The lifetime pattern can, for convenience, include a version counter similar in spirit to a simple cancellation token, to allow logical invalidation of callbacks.