前言

队列(queue)是非常常用的一个数据结构,它只允许在表的前端(head)进行出队(dequeue)操作,而在表的后端(tail)进行入队(enqueue)操作。和栈数据结构一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾(tail),进行删除操作的端称为队头(header)。

在并发环境中使用队列,就必须考虑到多线程(多纤程)并发读写的问题,可能存在多个写(入队)操作线程,同时也可能存在多个线程读操作线程,在这种情况下,我们要保证数据的不丢失,不重复,而且也要保证队列的功能不变,也就是先入先出的逻辑,只要存在数据,就可以出列。

诚然,通过一个排外锁可以实现队列的并发访问。一般实现队列的时候通过指针,而且只在队头队尾操作,所以这种排外锁保护的临界区并没有很复杂的执行逻辑,临界区的处理很快,所以一般情况下通过排外锁实现队列的效率已经很高了。但是在一些情况下,通过实现 lock-free 算法,我们可以进一步提升并发队列的性能。

本文介绍 lock-free queue 算法的一些背景知识,并实现了三种并发队列,并提供了性能测试的结果。

代码库可以在github上找到: smallnest/queue。

lock-free queue 算法

说起 lock-free queue 算法,不得不提到 Maged M. Michael 和 Michael L. Scott 1996年发表的论文 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms,这篇文章回顾了并发队列的一些实现以及局限性,提出了一种非常简洁的lock-free queue的实现,并且还提供了一个在特定机器比如不存在CAS指令的机器上的two-lock queue算法。这篇文章的被引用次数将近1000次。

只得一提的是, Java中的ConcurrentLinkedQueue就是基于这个算法实现的:

This implementation employs an efficient non-blocking algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.

大部分lock-free的算法都是通过CAS操作实现的。

这篇文章提供了一个lock-free queue算法的伪代码,代码量也非常少,所以很容易通过各种编程语言实现。在这里我把伪代码列在这里:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
structure pointer_t {ptr: pointer to node_t, count: unsigned integer}
 structure node_t {value: data type, next: pointer_t}
 structure queue_t {Head: pointer_t, Tail: pointer_t}

 initialize(Q: pointer to queue_t)
    node = new_node()		// Allocate a free node
    node->next.ptr = NULL	// Make it the only node in the linked list
    Q->Head.ptr = Q->Tail.ptr = node	// Both Head and Tail point to it

 enqueue(Q: pointer to queue_t, value: data type)
  E1:   node = new_node()	// Allocate a new node from the free list
  E2:   node->value = value	// Copy enqueued value into node
  E3:   node->next.ptr = NULL	// Set next pointer of node to NULL
  E4:   loop			// Keep trying until Enqueue is done
  E5:      tail = Q->Tail	// Read Tail.ptr and Tail.count together
  E6:      next = tail.ptr->next	// Read next ptr and count fields together
  E7:      if tail == Q->Tail	// Are tail and next consistent?
              // Was Tail pointing to the last node?
  E8:         if next.ptr == NULL
                 // Try to link node at the end of the linked list
  E9:            if CAS(&tail.ptr->next, next, <node, next.count+1>)
 E10:               break	// Enqueue is done.  Exit loop
 E11:            endif
 E12:         else		// Tail was not pointing to the last node
                 // Try to swing Tail to the next node
 E13:            CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
 E14:         endif
 E15:      endif
 E16:   endloop
        // Enqueue is done.  Try to swing Tail to the inserted node
 E17:   CAS(&Q->Tail, tail, <node, tail.count+1>)

 dequeue(Q: pointer to queue_t, pvalue: pointer to data type): boolean
  D1:   loop			     // Keep trying until Dequeue is done
  D2:      head = Q->Head	     // Read Head
  D3:      tail = Q->Tail	     // Read Tail
  D4:      next = head.ptr->next    // Read Head.ptr->next
  D5:      if head == Q->Head	     // Are head, tail, and next consistent?
  D6:         if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
  D7:            if next.ptr == NULL  // Is queue empty?
  D8:               return FALSE      // Queue is empty, couldn't dequeue
  D9:            endif
                 // Tail is falling behind.  Try to advance it
 D10:            CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
 D11:         else		     // No need to deal with Tail
                 // Read value before CAS
                 // Otherwise, another dequeue might free the next node
 D12:            *pvalue = next.ptr->value
                 // Try to swing Head to the next node
 D13:            if CAS(&Q->Head, head, <next.ptr, head.count+1>)
 D14:               break             // Dequeue is done.  Exit loop
 D15:            endif
 D16:         endif
 D17:      endif
 D18:   endloop
 D19:   free(head.ptr)		     // It is safe now to free the old node
 D20:   return TRUE                   // Queue was not empty, dequeue succeeded

initialize 初始化一个队列,并使用一个辅助的空的节点做header,方便入队和出队的处理。

在入对的时候, E1~E3先创建一个新的节点,并把入队的数据保存在这个节点上,下一步就要插入到队尾。

E4~E16是一个循环,不断尝试将数据插入到队列中,在并发的情况下CAS可能不成功,所以会不断尝试,并发的线程中总会有一个是成功的,所以它是一个lock-free的算法。

E5~E6是得到尾指针和尾指针指向的下一个节点。如果没有并发,没有并发的情况下,这里尾指针指向的下一个节点为空。但是如果在并发的情况下,在E7行的时候可能有别的线程已经加入了新的节点,或者先前的尾节点已经出对,所以在E7的实现先做一个判断,如果不满足的话重新获取。

在E8条件满足的情况下,说明当前获取的尾指针还是尾指针,那么在E9行通过CAS把这个节点加入到队列中,跳出循环,但是这个时候尾指针还没有改变。 否则可能在这个过程中已经有新的节点加入到队列中,那么在E12行,尝试把尾指针往后移动,指向新的节点。

在循环结束后,肯定已经入队,尝试把尾指针指向新插入的节点。当然这个时候可能又有新的节点加入了,导致CAS不成功,不过没有关系,因为这个节点已经加入了队列,只不过它已经不是尾节点了而已。更新加入的节点的逻辑会移动尾节点到最后的新加入的节点上。

在出队的时候,D2~D4获得头指针和尾指针,D5在头指针未变的情况下记一步处理,说明这个时候还没有其他出队操作。

D6~D10是尾指针和头指针指向的节点相同。有两种情况:1是空队列,则直接返回false,因为无数据可出列,2是新入列一个数据,还没来得及调整尾指针,那么这个时候移动一下尾指针。再重新尝试。

否则的话,D12先获取第一个数据,先把数据保存起来,再尝试把头指针移动到这个节点上。返回这个数据并将当前的头指针的节点数据置空,因为头指针是一个辅助节点,不需要保存数据。

实现

lock-free queue

根据论文中的伪代码,我们可以使用Go语言实现一个lock-free的queue。这里指针我们使用unsafe.Pointer来实现,这样方便进行CAS操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package queue
import (
	"sync/atomic"
	"unsafe"
)
// LKQueue is a lock-free unbounded queue.
// lock-free的queue
type LKQueue struct {
	head unsafe.Pointer
	tail unsafe.Pointer
}
// 通过链表实现,这个数据结构代表链表中的节点
type node struct {
	value interface{}
	next  unsafe.Pointer
}
// NewLKQueue returns an empty queue.
func NewLKQueue() *LKQueue {
	n := unsafe.Pointer(&node{})
	return &LKQueue{head: n, tail: n}
}
// Enqueue puts the given value v at the tail of the queue.
// 入队
func (q*LKQueue) Enqueue(v interface{}) {
	n := &node{value: v}
	for {
		tail := load(&q.tail)
		next := load(&tail.next)
		if tail == load(&q.tail) { // are tail and next consistent?
			if next == nil {    // 还没有新数据入队
				if cas(&tail.next, next, n) {   //增加到队尾
					cas(&q.tail, tail, n) // Enqueue is done.  try to swing tail to the inserted node //入队成功,移动尾巴指针
					return
				}
			} else { // tail was not pointing to the last node
				// try to swing Tail to the next node
                // 已有新数据加到队列后面,需要移动尾指针
				cas(&q.tail, tail, next)
			}
		}
	}
}
// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
// 出队,没有元素则返回nil
func (q *LKQueue) Dequeue() interface{} {
	for {
		head := load(&q.head)
		tail := load(&q.tail)
		next := load(&head.next)
		if head == load(&q.head) { // are head, tail, and next consistent? // head还是那个head
			if head == tail { // is queue empty or tail falling behind? // head和tail一样
				if next == nil { // is queue empty? // 说明是空队列
					return nil
				}
				// tail is falling behind.  try to advance it
                // 只是尾指针还没有调整,尝试调整它指向下一个
				cas(&q.tail, tail, next)
			} else {
				// read value before CAS otherwise another dequeue might free the next node
                // 读取出队的数据
				v := next.value
                // 既然要出队了,头指针移动到下一个
				if cas(&q.head, head, next) {
					return v // Dequeue is done.  return
				}
			}
		}
	}
}

// 将unsafe.Pointer原子加载转换成node
func load(p *unsafe.Pointer) (n *node) {
	return (*node)(atomic.LoadPointer(p))
}

// 封装CAS,避免直接将*node转换成unsafe.Pointer
func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
	return atomic.CompareAndSwapPointer(
		p, unsafe.Pointer(old), unsafe.Pointer(new))
}

这个 lock-free 的实现使用了一个辅助头指针(head),头指针不包含有意义的数据,只是一个辅助的节点,这样的话,出队入队中的节点会更简单。

入队的时候,通过 CAS 操作将一个元素添加到队尾,并且移动尾指针。

出队的时候移除一个节点,并通过 CAS 操作移动 head 指针,同时在必要的时候移动尾指针。

two-lock queue

上面的lock-free queue通过CAS实现了高效的并发队列,同时,这篇论文还实现了一种two-lock算法,可以应用在没有原子操作的多处理器上。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package queue
import (
	"sync"
)
// CQueue is a concurrent unbounded queue which uses two-Lock concurrent queue qlgorithm.
type CQueue struct {
	head  *cnode
tail*cnode
	hlock sync.Mutex
	tlock sync.Mutex
}
type cnode struct {
	value interface{}
	next  *cnode
}
// NewCQueue returns an empty CQueue.
func NewCQueue()*CQueue {
	n := &cnode{}
	return &CQueue{head: n, tail: n}
}
// Enqueue puts the given value v at the tail of the queue.
func (q *CQueue) Enqueue(v interface{}) {
	n := &cnode{value: v}
	q.tlock.Lock()
	q.tail.next = n // Link node at the end of the linked list
	q.tail = n      // Swing Tail to node
	q.tlock.Unlock()
}
// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q*CQueue) Dequeue() interface{} {
	q.hlock.Lock()
	n := q.head
	newHead := n.next
	if newHead == nil {
		q.hlock.Unlock()
		return nil
	}
	v := newHead.value
	newHead.value = nil
	q.head = newHead
	q.hlock.Unlock()
	return v
}

mutex-based queue

传统的,我们可以实现一个mutex + slice组成的queue, 在不过分追求性能(时间+空间)的情况下实现一个简单的queue。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package queue
import "sync"
// SliceQueue is an unbounded queue which uses a slice as underlying.
type SliceQueue struct {
	data []interface{}
	mu   sync.Mutex
}
// NewSliceQueue returns an empty queue.
// You can give a
func NewSliceQueue(n int) (q *SliceQueue) {
	return &SliceQueue{data: make([]interface{}, n)}
}
// Enqueue puts the given value v at the tail of the queue.
func (q*SliceQueue) Enqueue(v interface{}) {
	q.mu.Lock()
	q.data = append(q.data, v)
	q.mu.Unlock()
}
// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *SliceQueue) Dequeue() interface{} {
	q.mu.Lock()
	if len(q.data) == 0 {
		q.mu.Unlock()
		return nil
	}
	v := q.data[0]
	q.data = q.data[1:]
	q.mu.Unlock()
	return v
}

性能

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
goos: darwin
goarch: amd64
pkg: github.com/smallnest/queue
BenchmarkQueue/lock-free_queue#4-4           	 8399941	       177 ns/op
BenchmarkQueue/two-lock_queue#4-4            	 7544263	       155 ns/op
BenchmarkQueue/slice-based_queue#4-4         	 6436875	       194 ns/op
BenchmarkQueue/lock-free_queue#32-4          	 8399769	       140 ns/op
BenchmarkQueue/two-lock_queue#32-4           	 7486357	       155 ns/op
BenchmarkQueue/slice-based_queue#32-4        	 4572828	       235 ns/op
BenchmarkQueue/lock-free_queue#1024-4        	 8418556	       140 ns/op
BenchmarkQueue/two-lock_queue#1024-4         	 7888488	       155 ns/op
BenchmarkQueue/slice-based_queue#1024-4      	 8902573	       218 ns/op

转载

使用 Go 实现 lock-free 的队列