无限缓存的channel拥有下面的特性:

  1. 不会阻塞write。 它总是能处理write的数据,或者放入到待读取的channel中,或者放入到缓存中
  2. 无数据时read会被阻塞。当没有可读的数据时,从channel中读取的goroutine会被阻塞
  3. 读写都是通过channel操作。 内部的缓存不会暴露出来
  4. 能够查询当前待读取的数据数量。因为缓存中可能也有待处理的数据,所以需要返回len(buffer)+len(chan)
  5. 关闭channel后,还未读取的channel还是能够被读取,读取完之后才能发现channel已经完毕。这和正常的channel的逻辑是一样的,这种情况叫"drain"未读的数据

因为我们不能修改内部的channel结构,也不能重载 chan <- 和 <- chan 操作符,所以我们只能通过两个channel的方式封装一个数据结构,来提供读写。

这个数据结构为:

1
2
3
4
5
type UnboundedChan struct {
	In     chan<- T // channel for write
	Out    <-chan T // channel for read
	buffer []T      // buffer
}

其中In这个channel用来写入数据,而Out这个channel用来读取数据。你可以close In这个channel,等所有的数据都读取完后,Out channel也会被自动关闭。 用户是不能自己关闭Out这个channel的,你也关闭不了,因为它是<-chan类型的。

你可以通过Len方法得到所有待读取的数据的长度,也可以通过BufLen只获取缓存中的数据的长度,不包含外发Out channel中数据的长度。

1
2
3
4
5
6
7
8
// Len returns len of Out plus len of buffer.
func (c UnboundedChan) Len() int {
	return len(c.buffer) + len(c.Out)
}
// BufLen returns len of the buffer.
func (c UnboundedChan) BufLen() int {
	return len(c.buffer)
}

依照Go三巨头之一的设计,底层buffer最好采用ringbuffer的实现方式,如果buffer满了应该能自动扩容:

Such a library should do well in cases of very fast, “bursty” messages. A large enough buffered channel should be able to absorb bursts while a fast dedicated goroutine drains the channel into a ring buffer from which the messages are delivered at a slower pace to the final consumer of the messages. That ring buffer will need to be efficiently implemented, and will need to be able to grow efficiently (irrespective of size) and that will require some careful engineering. Better to leave that code to a library that can be tuned as needed than baking it into the runtime (and then possibly being at the mercy of release cycles).

所以我又实现了一个ringbuffer,这个ringbuffer比较简单,原因在这里我们不需要考虑并发的问题,这个ringbuffer只会在一个goroutine使用,所以它的实现就非常的简单了,需要注意"读追上写",以及"写满"这两个边界问题就好了。通过使用ringbuffer,上面的实现就可以更改为下面的代码,可以进一步减少写爆发(burst)的时候分配过多的问题:

unbounded_chan.go:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package chanx

// T defines interface{}, and will be used for generic type after go 1.18 is released.
type T interface{}

// UnboundedChan is an unbounded chan.
// In is used to write without blocking, which supports multiple writers.
// and Out is used to read, which supports multiple readers.
// You can close the in channel if you want.
type UnboundedChan struct {
	In     chan<- T    // channel for write
	Out    <-chan T    // channel for read
	buffer *RingBuffer // buffer
}

// Len returns len of In plus len of Out plus len of buffer.
func (c UnboundedChan) Len() int {
	return len(c.In) + c.buffer.Len() + len(c.Out)
}

// BufLen returns len of the buffer.
func (c UnboundedChan) BufLen() int {
	return c.buffer.Len()
}

// NewUnboundedChan creates the unbounded chan.
// in is used to write without blocking, which supports multiple writers.
// and out is used to read, which supports multiple readers.
// You can close the in channel if you want.
func NewUnboundedChan(initCapacity int) UnboundedChan {
	return NewUnboundedChanSize(initCapacity, initCapacity, initCapacity)
}

// NewUnboundedChanSize is like NewUnboundedChan but you can set initial capacity for In, Out, Buffer.
func NewUnboundedChanSize(initInCapacity, initOutCapacity, initBufCapacity int) UnboundedChan {
	// 创建三个字段和无限缓存的chan类型
	in := make(chan T, initInCapacity)
	out := make(chan T, initOutCapacity)
	ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initBufCapacity)}
	// 通过一个goroutine,不断地从in中读取出来数据,放入到out或者buffer中
	go process(in, out, ch)

	return ch
}

func process(in, out chan T, ch UnboundedChan) {
	// in关闭,数据读取完后也把out关闭
	defer close(out)
loop:
	for {
		val, ok := <-in
		// 如果in已经被closed, 退出loop
		if !ok { // in is closed
			break loop
		}

		// 否则尝试把从in中读取出来的数据放入到out中
		select {
		// 放入成功,说明out刚才还没有满,buffer中也没有额外的数据待处理,所以回到loop开始
		case out <- val:
			continue
		default:
		}

		// out is full
		// 如果out已经满了,需要把数据放入到缓存中
		ch.buffer.Write(val)
		// 处理缓存,一直尝试把缓存中的数据放入到out,直到缓存中没有数据了,
           	// 为了避免阻塞住in channel,还要尝试从in中读取数据,因为这个时候out是满的,所以就直接把数据放入到缓存中
		for !ch.buffer.IsEmpty() {
			select {
			// 从in读取数据,放入到缓存中,如果in被closed, 退出loop
			case val, ok := <-in:
				if !ok { // in is closed
					break loop
				}
				ch.buffer.Write(val)
			// 把缓存中最老的数据放入到out中,并移出第一个元素
			case out <- ch.buffer.Peek():
				ch.buffer.Pop()
				// 避免内存泄露. 如果缓存处理完了,恢复成原始的状态
				if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize { // after burst
					ch.buffer.Reset()
				}
			}
		}
	}
	// in被关闭,退出loop后,buffer中可能还有未处理的数据,需要把它们塞入到out中
        // 这个逻辑叫做"drain"。
        // 这一段逻辑处理完后,就可以把out关闭掉了
	// drain
	for !ch.buffer.IsEmpty() {
		out <- ch.buffer.Pop()
	}

	ch.buffer.Reset()
}

ringbuffer.go

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package chanx

import (
	"errors"
)

var ErrIsEmpty = errors.New("ringbuffer is empty")

// RingBuffer is a ring buffer for common types.
// It never is full and always grows if it will be full.
// It is not thread-safe(goroutine-safe) so you must use Lock to use it in multiple writers and multiple readers.
type RingBuffer struct {
	buf         []T
	initialSize int
	size        int
	r           int // read pointer
	w           int // write pointer
}

func NewRingBuffer(initialSize int) *RingBuffer {
	if initialSize <= 0 {
		panic("initial size must be great than zero")
	}
	// initial size must >= 2
	if initialSize == 1 {
		initialSize = 2
	}

	return &RingBuffer{
		buf:         make([]T, initialSize),
		initialSize: initialSize,
		size:        initialSize,
	}
}

func (r *RingBuffer) Read() (T, error) {
	if r.r == r.w {
		return nil, ErrIsEmpty
	}

	v := r.buf[r.r]
	r.r++
	if r.r == r.size {
		r.r = 0
	}

	return v, nil
}

func (r *RingBuffer) Pop() T {
	v, err := r.Read()
	if err == ErrIsEmpty { // Empty
		panic(ErrIsEmpty.Error())
	}

	return v
}

func (r *RingBuffer) Peek() T {
	if r.r == r.w { // Empty
		panic(ErrIsEmpty.Error())
	}

	v := r.buf[r.r]
	return v
}

func (r *RingBuffer) Write(v T) {
	r.buf[r.w] = v
	r.w++

	if r.w == r.size {
		r.w = 0
	}

	if r.w == r.r { // full
		r.grow()
	}
}

func (r *RingBuffer) grow() {
	var size int
	if r.size < 1024 {
		size = r.size * 2
	} else {
		size = r.size + r.size/4
	}

	buf := make([]T, size)

	copy(buf[0:], r.buf[r.r:])
	copy(buf[r.size-r.r:], r.buf[0:r.r])

	r.r = 0
	r.w = r.size
	r.size = size
	r.buf = buf
}

func (r *RingBuffer) IsEmpty() bool {
	return r.r == r.w
}

// Capacity returns the size of the underlying buffer.
func (r *RingBuffer) Capacity() int {
	return r.size
}

func (r *RingBuffer) Len() int {
	if r.r == r.w {
		return 0
	}

	if r.w > r.r {
		return r.w - r.r
	}

	return r.size - r.r + r.w
}

func (r *RingBuffer) Reset() {
	r.r = 0
	r.w = 0
	r.size = r.initialSize
	r.buf = make([]T, r.initialSize)
}

转载

实现无限缓存的channel