无锁队列通过原子操作实现线程安全的并发访问,使用 std::atomic 管理 head 和 tail 指针,结合内存序控制与虚拟头节点简化边界处理,在高并发下需解决 ABA 问题与内存回收难题。

实现一个无锁队列(lock-free queue)需要利用 C++ 的原子操作(atomic operations)来避免使用互斥锁,从而提升多线程环境下的性能。核心思想是通过原子指针操作和内存顺序控制,确保多个线程可以安全地并发执行入队(enqueue)和出队(dequeue)操作。
基本原理:基于链表的无锁队列
最常见的无锁队列实现是基于单向链表的结构,使用两个原子指针:head 和 tail,分别指向队列的头和尾。每个节点包含数据和指向下一个节点的指针。
关键点在于所有对指针的操作都必须是原子的,并且要正确处理 ABA 问题和内存重排序。
以下是一个简化但可运行的无锁队列实现:
#include #include templateclass LockFreeQueue {private: struct Node { std::shared_ptr data; std::atomic next; Node() : data(nullptr), next(nullptr) {} Node(const T& d) : data(std::make_shared(d)), next(nullptr) {} }; std::atomic head; std::atomic tail; // 辅助函数:尝试释放已出队的节点 void free_if_need(Node* old_head) { if (old_head) { delete old_head; } }public: LockFreeQueue() { Node* dummy = new Node(); head.store(dummy, std::memory_order_relaxed); tail.store(dummy, std::memory_order_relaxed); } ~LockFreeQueue() { while (Node* h = head.load()) { head.store(h->next.load()); delete h; } } void enqueue(const T& data) { Node* new_node = new Node(data); Node* old_tail = nullptr; Node* next = nullptr; while (true) { old_tail = tail.load(std::memory_order_acquire); next = old_tail->next.load(std::memory_order_acquire); // 检查 tail 是否滞后 if (old_tail != tail.load(std::memory_order_acquire)) { continue; // 重新尝试 } // 如果 tail 没有指向最后一个节点,尝试推进 tail if (next != nullptr) { tail.compare_exchange_weak(old_tail, next, std::memory_order_release, std::memory_order_acquire); continue; } // 尝试将新节点链接到 tail 后 if (old_tail->next.compare_exchange_weak(next, new_node, std::memory_order_release, std::memory_order_acquire)) { break; // 成功链接 } } // 尝试更新 tail 指向新节点 tail.compare_exchange_weak(old_tail, new_node, std::memory_order_release, std::memory_order_acquire); } std::shared_ptr dequeue() { Node* old_head = nullptr; while (true) { old_head = head.load(std::memory_order_acquire); Node* old_tail = tail.load(std::memory_order_acquire); Node* next = old_head->next.load(std::memory_order_acquire); // 判断队列是否为空 if (old_head == old_tail) { if (next == nullptr) { return std::shared_ptr(); // 队列空 } // tail 滞后,尝试推进 tail.compare_exchange_weak(old_tail, next, std::memory_order_release, std::memory_order_acquire); continue; } // 读取数据并尝试移动 head if (head.compare_exchange_weak(old_head, next, std::memory_order_release, std::memory_order_acquire)) { std::shared_ptr res = next->data; // 延迟释放 old_head(注意:生产环境应使用 hazard pointer 或 RCU) free_if_need(old_head); return res; } } }};
关键点解析
1. 原子指针操作: 使用 `std::atomic` 来保证 head 和 tail 的读写是原子的,防止数据竞争。
立即学习“C++免费学习笔记(深入)”;
2. 内存顺序(memory order):
memory_order_acquire 用于读操作,确保后续读写不会被重排到该操作之前。memory_order_release 用于写操作,确保前面的读写不会被重排到该操作之后。compare_exchange_weak 在循环中使用,允许失败后重试。
3. 虚拟头节点(dummy node): 构造时创建一个空节点作为初始 head 和 tail,简化边界判断。
4. tail 滞后处理: 其他线程可能在修改 next,但还没更新 tail,此时需要帮助完成 tail 推进。
存在的问题与改进方向
上述实现虽然能工作,但在高并发下仍可能遇到问题:
ABA 问题: 虽然指针值没变,但中间可能已被释放并重用。可通过双字 CAS(如 __int128 包装指针+计数器)解决。内存回收困难: 直接 delete 可能导致其他线程访问已释放内存。应使用 Hazard Pointer、RCU 或 epoch-based reclamation。性能: 多线程竞争 tail 时可能频繁失败。可考虑 Michael-Scott 队列的优化版本。
使用示例
“`cpp#include #include #include
int main() {LockFreeQueue queue;
std::vector producers;std::vector consumers;for (int i = 0; i < 2; ++i) { producers.emplace_back([&queue, i] { for (int j = 0; j < 1000; ++j) { queue.enqueue(i * 1000 + j); } });}for (int i = 0; i < 2; ++i) { consumers.emplace_back([&queue] { for (int j = 0; j < 1000; ++j) { auto val = queue.dequeue(); if (val) { std::cout << "Dequeued: " << *val << std::endl; } } });}for (auto& t : producers) t.join();for (auto& t : consumers) t.join();return 0;
}
基本上就这些。无锁队列实现复杂,调试困难,建议在真正需要极致性能时才使用,否则优先考虑互斥锁 + 条件变量的方案。
以上就是c++++怎么实现一个无锁队列(lock-free queue)_c++原子操作实现无锁并发结构的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1481400.html
微信扫一扫
支付宝扫一扫