跳到主要内容
左猫娘右猫娘

【C++】协程与Lambda小细节

· 阅读需 11 分钟
Heng_Xin
ここから先は一方通行だ!

今天在完善 iocp 与 io_uring 协程api的统一, 主要是超时取消这块.

Tip

win 你是真的np, 设计的什么破api这么难用... 真难对接...

这我需要从盘古开天辟地时候说起...

Tip

2025年7月6日22:11:07注: 本文内容有误: iocp不能通过本文的方法取消; (原因见: iocp不应该使用PostQueuedCompletionStatus来取消)

但是协程小细节是真的!

1. io_uring API

咱们直接看 io_uring 的接口设计: (完整代码)

auto res = co_await AioTask::linkTimeout(
ioUring.makeAioTask().prepRead(STDIN_FILENO, buf, 0),
ioUring.makeAioTask().prepLinkTimeout(&kt, 0)
);

if (res.index() == 1) {
print::println("时间到了哦~");
break;
}
cpp

肥肠简单, 就是尝试读取 STDIN_FILENO (标准控制台输入流), 然后定时 kt 时间.

接口设计一目了然:

struct AioTask {
// ...

/**
* @brief 异步读取文件
* @param fd 文件描述符
* @param buf [out] 读取到的数据
* @param offset 文件偏移量
* @return AioTask&&
*/
[[nodiscard]] AioTask&& prepRead(
int fd,
std::span<char> buf,
std::uint64_t offset
) && {
::io_uring_prep_read(_sqe, fd, buf.data(), static_cast<unsigned int>(buf.size()), offset);
return std::move(*this);
}

/**
* @brief 创建未链接的超时操作
* @param ts 超时时间
* @param flags
* @return AioTask&&
*/
[[nodiscard]] AioTask&& prepLinkTimeout(
struct __kernel_timespec *ts,
unsigned int flags
) && {
::io_uring_prep_link_timeout(_sqe, ts, flags);
return std::move(*this);
}

[[nodiscard]] inline static auto linkTimeout(AioTask&& lhs, AioTask&& rhs) {
lhs._sqe->flags |= IOSQE_IO_LINK;
return whenAny(std::move(lhs), std::move(rhs));
}
};
cpp

硬要说不ok的, 实际上就是 linkTimeout 的命名吧? lhsrhs 是不同的东西...

但是 windousu...

2. iocp API

它的 API, 首先是同步的! 或者说, 并没有所谓的暂停/等待x时间后取消的api, 而是仅提供了一个 PostQueuedCompletionStatus 破烂!

它的作用实际上是向目标完成端口发送一个数据, 导致它可以返回被 GetQueuedCompletionStatusEx, 并不是严格意义上的取消!

/*
BOOL PostQueuedCompletionStatus(
HANDLE CompletionPort, // 目标完成端口的句柄
DWORD dwNumberOfBytesTransferred, // 自定义的字节数, 可用于传递信息
ULONG_PTR dwCompletionKey, // 自定义的完成键, 可用于区分不同的操作或I/O源
LPOVERLAPPED lpOverlapped // OVERLAPPED结构的指针
// 注: GetQueuedCompletionStatusEx 拿到的是 此处的 OVERLAPPED
// 但是之前的 OVERLAPPED 也会被 iocp 取出! 因此我们需要自己标记一下 ... 写个状态机 ...
);
*/
bool ok = ::PostQueuedCompletionStatus(
_self->_iocpHandle,
0,
static_cast<ULONG_PTR>(State::Cancel),
static_cast<::OVERLAPPED*>(_self.get())
);
if (!ok) [[unlikely]] {
throw std::runtime_error{"PostQueuedCompletionStatus ERROR: "
+ std::to_string(::GetLastError())};
}
cpp

完全不像 io_uring 一样, 通过链接进行定时关联, 然后内核帮你搞定内部状态 (如果定时到了, 那么定时任务和关联任务都会被接收到, 但是关联任务会被设置为 已取消); 而且 win 需要我们用户自己区分 (甚至这都不一定是PostQueuedCompletionStatus的原意期望用法, win架构师知道这些怎么用吗?? 知道怎么还不改?? 这么难用...)

总而言之, 我需要重构, 以便跨平台, 以尽可能的保证 API 一致性.

最终, 仅需要这样调用:

因为 iocp 没有内核提供定时, 所以我需要手撕一个 红黑树协程定时器 ...

然而因为 iocp api 还需要对应的 iocp 句柄才可以知道是取消誰, 因此我们还需要获取到对应的协程句柄, 但是不能是传参, 只能是方法内部移形换影, 加上我自以为是写的一些 && & 的东西, 以及生命周期的考虑, 就相对写得比较复杂:

struct AioTask : public ::OVERLAPPED {
struct _AioTimeoutTask {
_AioTimeoutTask(AioTask&& self, TimerLoop::TimerAwaiter&& timerTask)
: _self{std::make_unique<AioTask>(std::move(self))}
, _timerTask{std::move(timerTask)}
{}

_AioTimeoutTask(_AioTimeoutTask&&) = default;
_AioTimeoutTask& operator=(_AioTimeoutTask&&) noexcept = default;

Task<> co() {
co_await _timerTask;
/*
BOOL PostQueuedCompletionStatus(
HANDLE CompletionPort, // 目标完成端口的句柄
DWORD dwNumberOfBytesTransferred, // 自定义的字节数, 可用于传递信息
ULONG_PTR dwCompletionKey, // 自定义的完成键, 可用于区分不同的操作或I/O源
LPOVERLAPPED lpOverlapped // OVERLAPPED结构的指针
// 注: GetQueuedCompletionStatusEx 拿到的是 此处的 OVERLAPPED
// 但是之前的 OVERLAPPED 也会被 iocp 取出! 因此我们需要自己标记一下 ... 写个状态机 ...
);
*/
bool ok = ::PostQueuedCompletionStatus(
_self->_iocpHandle,
0,
static_cast<ULONG_PTR>(State::Cancel),
static_cast<::OVERLAPPED*>(_self.get())
);
if (!ok) [[unlikely]] {
throw std::runtime_error{"PostQueuedCompletionStatus ERROR: "
+ std::to_string(::GetLastError())};
}
co_return;
}
private:
friend AioTask;
std::unique_ptr<AioTask> _self; // 不能使用 AioTask, 因为此时 AioTask 还不完整,
// 但是可以使用 std::unique_ptr<AioTask>; 奇怪吧?
// 原因是 unique_ptr 内部是使用指针的, 所以没问题!
TimerLoop::TimerAwaiter _timerTask;
};

/**
* @brief 异步读取文件
* @param fd 文件句柄
* @param buf [out] 读取到的数据
* @param offset 文件偏移量
* @return AioTask&&
*/
[[nodiscard]] AioTask&& prepRead(
HANDLE fd,
std::span<char> buf,
std::uint64_t offset
) && {
// ::io_uring_prep_read(_sqe, fd, buf.data(), static_cast<unsigned int>(buf.size()), offset);
/*
BOOL ReadFile(
HANDLE hFile, // 文件句柄(可为文件、管道、串口、Socket 等)
LPVOID lpBuffer, // 数据读入的缓冲区指针(你准备好的内存)
DWORD nNumberOfBytesToRead,// 想要读取的字节数
LPDWORD lpNumberOfBytesRead, // 实际读取的字节数(同步时非 NULL, 异步时设为 NULL)
LPOVERLAPPED lpOverlapped // OVERLAPPED 结构指针(异步时必填, 同步时为 NULL)
);
*/
/*
typedef struct _OVERLAPPED {
ULONG_PTR Internal;
ULONG_PTR InternalHigh;
union {
struct {
DWORD Offset; // 低 32 位文件偏移量
DWORD OffsetHigh; // 高 32 位文件偏移量
// 两个组成 64 为的偏移量, 破win为了远古兼容, 就这样搞了...
};
PVOID Pointer;
};
HANDLE hEvent;
} OVERLAPPED, *LPOVERLAPPED;
*/
associateHandle(fd);
// 设置偏移量
Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
OffsetHigh = static_cast<DWORD>((offset >> 32) & 0xFFFFFFFF);
bool ok = ::ReadFile(
fd,
buf.data(),
static_cast<DWORD>(buf.size()),
nullptr,
static_cast<::OVERLAPPED*>(this)
);
if (!ok && ::GetLastError() != ERROR_IO_PENDING) [[unlikely]] {
throw std::runtime_error{"ReadFile ERROR: " + std::to_string(::GetLastError())};
}
return std::move(*this);
}

/**
* @brief 创建未链接的超时操作
* @param ts 超时时间
* @param flags
* @return AioTask&&
*/
[[nodiscard]] _AioTimeoutTask prepLinkTimeout(
TimerLoop::TimerAwaiter&& timerTask
) && {
// ::io_uring_prep_link_timeout(_sqe, ts, flags);
return {std::move(*this), std::move(timerTask)};
}

[[nodiscard]] inline static auto linkTimeout(
AioTask&& task,
_AioTimeoutTask&& timeoutTask
) {
// 为什么不能是捕获? 难道是因为对于协程函数, 你的类捕获不会算入协程生命周期?!
// 对的对的, 就是这样! 这个 []()() 得到的是协程对象了, 仅有传参的会算入生命周期
// 但是捕获的话, 作为类的成员, 在 []() 时候是有效的
// 但是 []()() 是创建一个协程对象, 然后 return 了, 因此 [...]() 捕获的就析构了
// 所以会悬挂引用 (ub), 导致野指针 qwq...
#if 0
return [_task = std::move(task),
_timeoutTask = std::move(timeoutTask)]() mutable
-> Task<HX::AwaiterReturnValue<decltype(whenAny(std::move(task), timeoutTask.co()))>> {
_timeoutTask._self->_iocpHandle = _task._iocpHandle; // 出错
co_return co_await whenAny(std::move(_task), _timeoutTask.co());
}();
#else
return [](AioTask&& _task, _AioTimeoutTask&& _timeoutTask)
-> Task<HX::AwaiterReturnValue<decltype(whenAny(std::move(task), timeoutTask.co()))>> {
_timeoutTask._self->_iocpHandle = _task._iocpHandle;
co_return co_await whenAny(std::move(_task), _timeoutTask.co());
}(std::move(task), std::move(timeoutTask));
#endif
}
};
cpp

3. 协程的小细节

上面已经是完整的源码的, 也是本次问题出现的地方:

为什么不能是捕获?

  • 因为 []()() 得到的是协程对象了, 仅有传参的会算入生命周期
  • 但是捕获的话, 作为类的成员, 在 []() 时候是有效的
  • 但是 []()() 是创建一个协程对象, 然后 return 了, 因此 [...]() 捕获的就析构了
  • 所以会悬挂引用 (ub), 导致野指针 qwq...

Note

Coroutines (C++20) | cppreference 有描述:

copies all function parameters to the coroutine state: by-value parameters are moved or copied, by-reference parameters remain references (thus, may become dangling, if the coroutine is resumed after the lifetime of referred object ends — see below for examples).

将所有函数参数复制到协程状态: 按值参数被移动或复制, 按引用参数仍然是引用 (因此, 如果在引用对象的生命周期结束后恢复协程, 则可能会变得悬空 - 请参阅下面的示例).

void bad3() {
auto h = [i = 0]() -> coroutine { // 一个也是协程的 lambda
std::cout << i;
co_return;
}(); // 立即调用
// lambda被摧毁
h.resume(); // 使用 (匿名lambda类型)::i 在释放之后
h.destroy();
}
cpp

Tip

虽然文字没有明确说明这种情况, 只是说明了核心原因; 但是例子已经明确给出来了, 还是很明确的!

请作者喝奶茶:
Alipay IconQR Code
Alipay IconQR Code
本文遵循 CC CC 4.0 BY-SA 版权协议, 转载请标明出处
Loading Comments...