无锁队列的实现
无锁队列的实现原理一般是利用 Retry-loop 和 CAS 等原子操作。
现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。
例如 CAS(Compare And Swap)的实现原理:
bool compare_and_swap (int *addr, int oldval, int newval) { if ( *addr != oldval ) { return false; } *addr = newval; return true; }
与CAS相似的还有下面的原子操作:
1、Fetch And Add,一般用来对变量做+1的原子操作; 2、Test And Set,写值到某个内存位置并传回其旧值; 3、Test And Test-and-set; GCC 的CAS实现版本如下:bool __sync_bool_compare_and_swap(type *ptr, type oldval, type newval, ...)
可参考http://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html
C++11 的CAS实现版本如下:template<class T> bool atomic_compare_exchange_weak(atomic<T>* obj, T* expected, T desired);
看一个无锁队列的实现例子:
class QueueWithoutLock { public: QueueWithoutLock() { dummy = new ListNode(-1); head = tail = dummy; } ~QueueWithoutLock() {} bool Push(uint64_t val) { auto* node = new ListNode(val); ListNode* oldTail = nullptr; do { oldTail = tail; // 如果其它线程 push 成功,这里 tail 可能发生变化,所以重新取值一下 } while (!__sync_bool_compare_and_swap(&(oldTail->next), nullptr, node)); // 更新 tail 到新节点 // 这里没有判断成功,是因为走到这里,表示上面成功 push 了,在更新 tail 之前其 next 不再是 null,其它线程的 push 操作都会失败 __sync_bool_compare_and_swap(&tail, oldTail, node); return true; } ListNode* Pop() { ListNode *p; do { p = head; if (p->next == NULL) { // queue is empty return nullptr; } } while (!__sync_bool_compare_and_swap(&head, p, p->next)); // 如果 head != p(说明其它线程修改了 head),循环继续 return p->next; } private: ListNode* dummy; ListNode* head; ListNode* tail; };
这里有一个潜在的问题——如果某个线程在用 CAS 更新 tail 指针之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的Push()
bool Push(uint64_t val) { auto* node = new ListNode(val); ListNode* oldTail = tail; // 取链表尾指针的快照 ListNode* p = tail; do { while (p->next) p = p->next; // 每次循环里面重新 fetch 尾指针,不依赖 tail 成员 } while (!__sync_bool_compare_and_swap(&(p->next), nullptr, node)); // 重置 tail 指针 __sync_bool_compare_and_swap(&tail, oldTail, node); return true; }
CAS 导致的 ABA 问题,
有2个线程 a、b,
- 线程 a 从共享的地址X中读取到了对象A。
- 在线程 a 准备对地址X进行更新之前,线程b将地址X中的值修改为了B。
- 接着线程 b 将地址X中的值又修改回了A。
- 最新线程 a 对地址X执行 CAS,发现X中存储的还是对象A,对象匹配,CAS成功。
在有 GC环境的编程语言比如说java中,2,3的情况是不可能出现的,因为在java中,只要两个对象的地址一致,就表示这两个对象是相等的。
但在 C++ 中,可以自己控制对象的生命周期,如果我们从一个 list 中删除掉了一个对象,然后又重新分配了一个对象,并将其add back到 list 中去,那么根据 MRU memory allocation算法,这个新的对象很有可能和之前删除对象的内存地址是一样的。这样就会导致ABA的问题。
解决办法:
1、使用 风险指针(hazard pointer),线程a 读出对象A的同时,将对象A的指针放入到自己的 hp 链表中,这样可以保证这个对象不会被删除;
2、使用 read-copy update (RCU) ,在每次更新的之前,都做一份拷贝,每次更新的是拷贝出来的新结构。
标签:无锁,ListNode,CAS,Lock,Free,next,tail,线程,bool From: https://www.cnblogs.com/chenny7/p/16791988.html