前言

想象一下遇到下面的场景你会如何处理:

  1. 手机号是否重复注册
  2. 用户是否参与过某秒杀活动
  3. 伪造请求大量 id 查询不存在的记录,此时缓存未命中,如何避免缓存穿透

针对以上问题常规做法是:查询数据库,数据库硬扛,如果压力并不大可以使用此方法,保持简单即可。

改进做法:用 list/set/tree 维护一个元素集合,判断元素是否在集合内,时间复杂度或空间复杂度会比较高。如果是微服务的话可以用 redis 中的 list/set 数据结构, 数据规模非常大此方案的内存容量要求可能会非常高。

这些场景有个共同点,可以将问题抽象为:如何高效判断一个元素不在集合中?那么有没有一种更好方案能达到时间复杂度和空间复杂双优呢?

有!布隆过滤器。

布隆过滤器

布隆过滤器(英语:Bloom Filter)是 1970 年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中,它的优点是空间效率和查询时间都远远超过一般的算法。

工作原理

布隆过滤器的原理是,当一个元素被加入集合时,通过 K 个散列函数将这个元素映射成一个位数组中的 K 个点(offset),把它们置为 1。检索时,我们只要看看这些点是不是都是 1 就(大约)知道集合中有没有它了:如果这些点有任何一个 0,则被检元素一定不在;如果都是 1,则被检元素很可能在。这就是布隆过滤器的基本思想。

简单来说就是准备一个长度为 m 的位数组并初始化所有元素为 0,用 k 个散列函数对元素进行 k 次散列运算跟 len(m)取余得到 k 个位置并将 m 中对应位置设置为 1。

优缺点

优点:

  • 空间占用极小,因为本身不存储数据而是用比特位表示数据是否存在,某种程度有保密的效果。
  • 插入与查询时间复杂度均为 O(k),常数级别,k 表示散列函数执行次数。
  • 散列函数之间可以相互独立,可以在硬件指令层加速计算。

缺点:

  • 误差(假阳性率)。
  • 无法删除。

误差(假阳性率)

布隆过滤器可以 100% 判断元素不在集合中,但是当元素在集合中时可能存在误判,因为当元素非常多时散列函数产生的 k 位点可能会重复。这里我们直接给结论,假设:

  • 位数组长度 m
  • 散列函数个数 k
  • 预期元素数量 n
  • 期望误差p

在创建布隆过滤器时我们为了找到合适的 m 和 k ,可以根据预期元素数量 n 与 p 来推导出最合适的 m 与 k 。

$ m=-\frac{n*lnp}{(ln2)^2}$

$ k=ln2*\frac{m}{n} $

无法删除

位数组中的某些 k 点是多个元素重复使用的,假如我们将其中一个元素的 k 点全部置为 0 则直接就会影响其他元素。这导致我们在使用布隆过滤器时无法处理元素被删除的场景。

可以通过定时重建的方式清除脏数据。假如是通过 redis 来实现的话重建时不要直接删除原有的 key,而是先生成新的再通过 rename 命令删除旧数据即可。

判断是否在黑名单

不安全网页的黑名单包含100亿个黑名单网页,每个网页的URL最多占用64B。现在想要实现一种网页过滤系统,可以根据网页的URL判断该网页是否在黑名单上,请设计该系统。

  1. 该系统允许有万分之一以下的判断失误率。
  2. 使用的额外空间不要超过30GB。

黑名单中样本的个数为 100 亿个,记为1:失误率不能超过 0.01%,记为p:每个样本的大小为 64B,这个信息不会影响布隆过滤器的大小,只和选择哈希两数有关,一般的哈希函数都可以接收 64B 的输入对象,所以使用布隆过滤器还有一个好处是不用顾忌单个样本的大小,它丝亳不能影响布隆过滤器的大小。

所以n=100亿,p=0.01%,布隆过滤器的大小m 由以下公式确定:

根据公式计算出 m=19.19n, 向上取整为 20n,即需要 2000 亿个bit,也就是 25GB。

哈希函数的个数由以下公式决定:

计算出哈希两数的个数为k=14个。

然后用 25GB 的 bitMap 再单独实现 14 个哈希两数,根据如上描述生成布隆过滤器即可。

因为我们在确定布隆过滤器大小的过程中选择了向上取整,所以还要用如下公式确定布隆过滤器真实的失误率为:

根据这个公式算出真实的失误率为 0.006%,这是比 0.01%更低的失误率,哈希函数本身不占用什么空间,所以使用的空间就是bitMap 的大小(即 25GB),服务器的内存都可以达到这个级别,所有要求达标。之后的判断阶段如上文的描述

预防非法 id 导致缓存穿透

由于 id 不存在导致请求无法命中缓存流量直接打到数据库,同时数据库也不存在该记录导致无法写入缓存,高并发场景这无疑会极大增加数据库压力。解决方案可以采用布隆过滤器.

数据写入数据库时需同步写入布隆过滤器,同时如果存在脏数据场景(比如:删除)则需要定时重建布隆过滤器,使用 redis 作为存储时不可以直接删除 bloom.key,可以采用 rename key 的方式更新 bloom

代码实现

core/bloom/bloom.go一个布隆过滤器具备两个核心属性:

  • 位数组:
  • 散列函数

go-zero实现的bloom filter中位数组采用的是Redis.bitmap,既然采用的是 redis 自然就支持分布式场景,散列函数采用的是MurmurHash3

Redis.bitmap 为什么可以作为位数组呢?

Redis 中的并没有单独的 bitmap 数据结构,底层使用的是动态字符串(SDS)实现,而 Redis 中的字符串实际都是以二进制存储的。a 的ASCII码是 97,转换为二进制是:01100001,如果我们要将其转换为b只需要进一位即可:01100010。下面通过Redis.setbit实现这个操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
set foo a
OK
get foo
"a"
setbit foo 6 1
0
setbit foo 7 0
1
get foo
"b"

bitmap 底层使用的动态字符串可以实现动态扩容,当 offset 到高位时其他位置 bitmap 将会自动补 0,最大支持 2^32-1 长度的位数组(占用内存 512M),需要注意的是分配大内存会阻塞Redis进程。根据上面的算法原理可以知道实现布隆过滤器主要做三件事情:

  1. k 次散列函数计算出 k 个位点。
  2. 插入时将位数组中 k 个位点的值设置为 1。
  3. 查询时根据 1 的计算结果判断 k 位点是否全部为 1,否则表示该元素一定不存在。

对象定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 表示经过多少散列函数计算
// 固定14次
maps = 14

type (
    // 定义布隆过滤器结构体
    Filter struct {
        bits   uint
        bitSet bitSetProvider
    }
    // 位数组操作接口定义
    bitSetProvider interface {
        check([]uint) (bool, error)
        set([]uint) error
    }
)

bits代表我们要用多少位来存储这个布隆过滤器。

bitset是一个接口,是具体用来存储布隆过滤器的结构.

BitSetProvider接口提供了两个方法,check是用来检查该数据是否存在,set是用来将该数据设置为存在。代码里redisBitSet实现了该接口。

redisBitSet

首先需要理解两段 lua 脚本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// ARGV:偏移量offset数组
// KYES[1]: setbit操作的key
// 全部设置为1
setScript = `
    for _, offset in ipairs(ARGV) do
        redis.call("setbit", KEYS[1], offset, 1)
    end
    `
// ARGV:偏移量offset数组
// KYES[1]: setbit操作的key
// 检查是否全部为1
testScript = `
    for _, offset in ipairs(ARGV) do
        if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
            return false
        end
    end
    return true
    `

为什么一定要用 lua 脚本呢? 因为需要保证整个操作是原子性执行的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// redis位数组
type redisBitSet struct {
    store *redis.Client
    key   string
    bits  uint
}
// 检查偏移量offset数组是否全部为1
// 是:元素可能存在
// 否:元素一定不存在
func (r*redisBitSet) check(offsets []uint) (bool, error) {
    args, err := r.buildOffsetArgs(offsets)
    if err != nil {
        return false, err
    }
    // 执行脚本
    resp, err := r.store.Eval(testScript, []string{r.key}, args).Result()
    // 这里需要注意一下,底层使用的go-redis
    // redis.Nil表示key不存在的情况需特殊判断
    if err == redis.Nil {
        return false, nil
    } else if err != nil {
        return false, err
    }

    exists, ok := resp.(int64)
    if !ok {
        return false, nil
    }

    return exists == 1, nil
}

// 将k位点全部设置为1
func (r *redisBitSet) set(offsets []uint) error {
    args, err := r.buildOffsetArgs(offsets)
    if err != nil {
        return err
    }
    err = r.store.Eval(setScript, []string{r.key}, args).Err()
    // 底层使用的是go-redis,redis.Nil表示操作的key不存在
    // 需要针对key不存在的情况特殊判断
    if err == redis.Nil {
        return nil
    }
    return err
}

// 构建偏移量offset字符串数组,因为go-redis执行lua脚本时参数定义为[]string
// 因此需要转换一下
// buildOffsetArgs 做了一点异常检查,然后转换为字符串
func (r *redisBitSet) buildOffsetArgs(offsets []uint) ([]string, error) {
    var args []string
    for _, offset := range offsets {
	// 因为前面对bits求模了,所以正常情况这里是不会走到的
        if offset >= r.bits {
            return nil, ErrTooLargeOffset
        }
	// uint数字转换为字符
        args = append(args, strconv.FormatUint(uint64(offset), 10))
    }
    return args, nil
}

// 删除
func (r *redisBitSet) del() error {
    return r.store.Del(r.key).Err()
}

// 自动过期
func (r *redisBitSet) expire(seconds int) error {
	return r.store.Expire(r.key, time.Second*time.Duration(seconds)).Err()
}

func newRedisBitSet(store *redis.Client, key string, bits uint)*redisBitSet {
    return &redisBitSet{
        store: store,
        key:   key,
        bits:  bits,
    }
}

到这里位数组操作就全部实现了,接下来看下如何通过 k 个散列函数计算出 k 个位点

k 次散列计算出 k 个位点

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// k次散列计算出k个offset
func (f *Filter) getLocations(data []byte) []uint {
    // 创建指定容量的切片
    locations := make([]uint, maps)
    // maps表示k值,作者定义为了常量:14
    for i := uint(0); i < maps; i++ {
	// 每次用当前循环的次数当做盐byte(i),加盐算得hash值。hash算法是murmur3。
        // 哈希计算,使用的是"MurmurHash3"算法,并每次追加一个固定的i字节进行计算
	hashValue := murmur3.Sum64(append(data, byte(i)))
        // 取下标offset
	// 用hash值对bits求模,获得这次的位置locations[i]
        locations[i] = uint(hashValue % uint64(f.bits))
    }
    // 14次的位置合在一起,代表了这个数据
    return locations
}

插入与查询

添加与查询实现就非常简单了,组合一下上面的函数就行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// 添加元素
func (f *Filter) Add(data []byte) error {
    //算得这个数据的位置,然后标记上这个数据
    locations := f.getLocations(data)
    return f.bitSet.set(locations)
}

// 检查是否存在
func (f *Filter) Exists(data []byte) (bool, error) {
    locations := f.getLocations(data)
    isSet, err := f.bitSet.check(locations)
    if err != nil {
        return false, err
    }
    if !isSet {
        return false, nil
    }

    return true, nil
}

改进建议

整体实现非常简洁高效,那么有没有改进的空间呢?

个人认为还是有的,上面提到过自动计算最优 m 与 k 的数学公式,如果创建参数改为:

  • 预期总数量expectedInsertions
  • 期望误差falseProbability

就更好了,虽然作者注释里特别提到了误差说明,但是实际上作为很多开发者对位数组长度并不敏感,无法直观知道 bits 传多少预期误差会是多少。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// New create a Filter, store is the backed redis, key is the key for the bloom filter,
// bits is how many bits will be used, maps is how many hashes for each addition.
// best practices:
// elements - means how many actual elements
// when maps = 14, formula: 0.7*(bits/maps), bits = 20*elements, the error rate is 0.000067 < 1e-4
// for detailed error rate table, see <http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html>
func New(store *redis.Redis, key string, bits uint)*Filter {
    return &Filter{
        bits:   bits,
        bitSet: newRedisBitSet(store, key, bits),
    }
}

// expectedInsertions - 预期总数量
// falseProbability - 预期误差
// 这里也可以改为option模式不会破坏原有的兼容性
func NewFilter(store *redis.Redis, key string, expectedInsertions uint, falseProbability float64)*Filter {
    bits := optimalNumOfBits(expectedInsertions, falseProbability)
    k := optimalNumOfHashFunctions(bits, expectedInsertions)
    return &Filter{
        bits:   bits,
        bitSet: newRedisBitSet(store, key, bits),
        k:      k,
    }
}

// 计算最优哈希次数
func optimalNumOfHashFunctions(m, n uint) uint {
    return uint(math.Round(float64(m) / float64(n) * math.Log(2)))
}

// 计算最优数组长度
func optimalNumOfBits(n uint, p float64) uint {
    return uint(float64(-n) *math.Log(p) / (math.Log(2)* math.Log(2)))
}

完整代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package bloom

import (
	"errors"
	"strconv"
	"time"

	"github.com/go-redis/redis"
	"github.com/spaolacci/murmur3"
)

const (
	// for detailed error rate table, see http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html
	// maps as k in the error rate table
	maps      = 14
	setScript = `
for _, offset in ipairs(ARGV) do
	redis.call("setbit", KEYS[1], offset, 1)
end
`
	testScript = `
for _, offset in ipairs(ARGV) do
	if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
		return false
	end
end
return true
`
)

// ErrTooLargeOffset indicates the offset is too large in bitset.
var ErrTooLargeOffset = errors.New("too large offset")

type (
	// A Filter is a bloom filter.
	Filter struct {
		bits   uint
		bitSet bitSetProvider
	}

	bitSetProvider interface {
		check([]uint) (bool, error)
		set([]uint) error
	}
)

// New create a Filter, store is the backed redis, key is the key for the bloom filter,
// bits is how many bits will be used, maps is how many hashes for each addition.
// best practices:
// elements - means how many actual elements
// when maps = 14, formula: 0.7*(bits/maps), bits = 20*elements, the error rate is 0.000067 < 1e-4
// for detailed error rate table, see http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html
func New(store *redis.Client, key string, bits uint) *Filter {
	return &Filter{
		bits:   bits,
		bitSet: newRedisBitSet(store, key, bits),
	}
}

// Add adds data into f.
func (f *Filter) Add(data []byte) error {
	locations := f.getLocations(data)
	return f.bitSet.set(locations)
}

// Exists checks if data is in f.
func (f *Filter) Exists(data []byte) (bool, error) {
	locations := f.getLocations(data)
	isSet, err := f.bitSet.check(locations)
	if err != nil {
		return false, err
	}
	if !isSet {
		return false, nil
	}

	return true, nil
}

func (f *Filter) getLocations(data []byte) []uint {
	locations := make([]uint, maps)
	for i := uint(0); i < maps; i++ {
		hashValue := murmur3.Sum64(append(data, byte(i)))
		locations[i] = uint(hashValue % uint64(f.bits))
	}

	return locations
}

type redisBitSet struct {
	store *redis.Client
	key   string
	bits  uint
}

func newRedisBitSet(store *redis.Client, key string, bits uint) *redisBitSet {
	return &redisBitSet{
		store: store,
		key:   key,
		bits:  bits,
	}
}

func (r *redisBitSet) buildOffsetArgs(offsets []uint) ([]string, error) {
	var args []string

	for _, offset := range offsets {
		if offset >= r.bits {
			return nil, ErrTooLargeOffset
		}

		args = append(args, strconv.FormatUint(uint64(offset), 10))
	}

	return args, nil
}

func (r *redisBitSet) check(offsets []uint) (bool, error) {
	args, err := r.buildOffsetArgs(offsets)
	if err != nil {
		return false, err
	}

	resp, err := r.store.Eval(testScript, []string{r.key}, args).Result()
	if err == redis.Nil {
		return false, nil
	} else if err != nil {
		return false, err
	}

	exists, ok := resp.(int64)
	if !ok {
		return false, nil
	}

	return exists == 1, nil
}

func (r *redisBitSet) del() error {
	return r.store.Del(r.key).Err()
}

func (r *redisBitSet) expire(seconds int) error {
	return r.store.Expire(r.key, time.Second*time.Duration(seconds)).Err()
}

func (r *redisBitSet) set(offsets []uint) error {
	args, err := r.buildOffsetArgs(offsets)
	if err != nil {
		return err
	}

	err = r.store.Eval(setScript, []string{r.key}, args).Err()
	if err == redis.Nil {
		return nil
	}

	return err
}

参考

详解布隆过滤器原理与实现

Go + Redis 布隆过滤器原理与实现