Redis学习笔记之实用篇

本篇是Redis学习笔记基础篇的补充,主要补充了底层原理和一些常见的使用场景。

基础使用的补充

key的分级存储

Redis在使用的过程中,为了更好的体验Key的意义,一般会使用Key作为分级存储,例如mitaka下有user组和age组,存储数据ID为1、2的时候,将key以冒号的形式隔开

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> SET mitaka:user:1 xu
127.0.0.1:6379> SET mitaka:user:2 xiaoye
127.0.0.1:6379> SET mitaka:age:1 18
127.0.0.1:6379> SET mitaka:age:2 28
127.0.0.1:6379> KEYS *
1) "mitaka:age:2"
2) "mitaka:user:1"
3) "mitaka:age:1"
4) "mitaka:user:2"
  • 基本格式:[业务名称]:[数据名]:[id],这样的话,可读性强,而且避免key冲突,方便管理;例如保存登录用户的信息login:user:10
  • 长度不超过44字节,节省内存,key是tring类型,底层编码包含intembstrraw三种,embstr在小于44字节使用,采用连续内存空间,占用内存更小
  • 不包含特殊字符

下载Redis图形化客户端RDM,官网:

https://github.com/RedisInsight/RedisDesktopManager

https://resp.app/

https://redis.com/redis-enterprise/redis-insight/#insight-form

mac os m2版本下载地址:https://download.redisinsight.redis.com/latest/RedisInsight-v2-mac-arm64.dmg

RDM中可以看到类似目录层级结构

image-20221206233251024

BigKey

BigKey通常以Key的大小和Key中成员的数量来综合判定:

  • key本身的数据量过大:例如,一个string类型的key,值为5MB
  • key中的成员过多:例如,一个zset类型的key,成员数量为10000个
  • key中成员的数量过大:例如,一个hash类型的key,成员虽然只有1000个,但是Value总大小为100MB

推荐值:

  • 单个keyvalue小于10KB
  • 对于集合类型的key,建议元素数量小于1000

获取key占用的内存大小

1
2
3
4
127.0.0.1:6379> MEMORY USAGE a
(integer) 48 // a占用48字节
127.0.0.1:6379> get a
"1"

BigKey的危害

  • 网络阻塞:对BigKey请求时,少量的QPS就可能导致带宽使用率被占满,导致Redis或者物理机变慢
  • 数据倾斜:BigKey所在实例的内存使用率会超过其他实例,无法使数据分片的内存资源达到均衡
  • Redis阻塞:对元素较多的hash、list、zset等做运算时,会有更多的耗时,使主线程被阻塞
  • CPU压力:对BigKey的数据序列化和反序列化会导致CPU的使用率飙升,影响Redis实例和本机其他应用

找到BigKey的方法:

  • --big-keys参数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    redis-cli -h 127.0.0.1 -p 6379 --big-keys

    Unrecognized option or bad number of args for: '--big-keys'
    ➜ ~ redis-cli -h 127.0.0.1 -p 6379 --bigkeys

    # Scanning the entire keyspace to find biggest keys as well as
    # average sizes per key type. You can use -i 0.1 to sleep 0.1 sec
    # per 100 SCAN commands (not usually needed).

    [00.00%] Biggest string found so far 'mitaka:age:2' with 2 bytes
    [00.00%] Biggest stream found so far 'stream1' with 44 entries
    [00.00%] Biggest string found so far 'mitaka:user:2' with 6 bytes

    -------- summary -------

    Sampled 9 keys in the keyspace!
    Total key length in bytes is 62 (avg len 6.89)

    Biggest string found 'mitaka:user:2' has 6 bytes // 这个类型占用最大的内存
    Biggest stream found 'stream1' has 44 entries

    0 lists with 0 items (00.00% of keys, avg size 0.00)
    0 hashs with 0 fields (00.00% of keys, avg size 0.00)
    8 strings with 16 bytes (88.89% of keys, avg size 2.00)
    1 streams with 44 entries (11.11% of keys, avg size 44.00)
    0 sets with 0 members (00.00% of keys, avg size 0.00)
    0 zsets with 0 members (00.00% of keys, avg size 0.00)
  • 使用SCAN扫描,然后逐个判断

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    127.0.0.1:6379> SCAN 0     // 0 是角标
    1) "0" // 这里是下次扫描的角标,0代表扫描完了
    2) 1) "mitaka:age:2"
    2) "mitaka:user:1"
    3) "c"
    4) "b1"
    5) "stream1"
    6) "mitaka:user:2"
    7) "a"
    8) "mitaka:age:1"
    9) "b"
  • 第三方工具,Redis-Rdb-Tools分析RDB快照文件,全面分析内存使用情况,或者封装SCAN命令和一系列的判断逻辑,或者使用LUA脚本

  • 网络监控,监控进出Redis的网络数据,超出预警值时主动告警

删除BigKey

BigKey占用内存较多,删除时也会消耗很长时间,导致Redis主线程阻塞,引发一系列问题。

  • Redis 3.0及以下版本:如果是集合类型,则便利BigKey的元素,先逐个删除子元素,最后删除BigKey

  • Redis 4.0以后:提供了异步删除的命令,unlink

    1
    2
    3
    4
    127.0.0.1:6379> UNLINK stream1
    (integer) 1
    127.0.0.1:6379> KEYS stream*
    (empty array)

恰当的数据类型

使用恰当的数据类型将BigKey转换成小key是一种更加推荐解决BigKey的方法

一个有100万个keyhash,是一个BigKey

方案1:通过json序列化后存储到string中;这种方式,会占用更大的内存,数据耦合,而且还是一个BigKey

方案2:打散存储到string中;这种也会占用更大内存,而且没法做到统一控制

方案3:将大划小,通过id/100作为key,分为1万个hash,每个hash中只有100个键值对。

集群模式下的批处理

批量执行命令可以使用pipeline,节省clientserver之间的网络时间。但是加集群模式下,由于key会进行hash然后取模,可能会放到不同的slot中。而批处理是一个网络连接,如果使用不同的slot,则会使用多个网络连接。

在集群模式下,批处理命令的多个key比如落在一个slot中,否则会执行失败。

解决方案

  • 串行命令:

    实现思路:for循环遍历,依次执行每个命令

    这种方案其实不是批处理,还是单次处理

  • 串行slot

    实现思路:客户端计算每个keyslot,然后将一致的slot分为一组,使用pipeline串行处理

    这种方案逻辑处理会有些复杂,而且slot越多,耗时越久。

  • 并行slot

    实现思路:在串行slot的基础上,改为并行处理,

    这种方案实现更加复杂

  • hash_tag

    实现思路:将所有key设置相同的hash_tag,则所有keyslot一定相同

    这种方案容易出现数据倾斜,但是耗时短,实现简单。

    1
    2
    127.0.0.1:6379> MSET {a}name xu {a}age 18
    OK

相比而言,推荐使用并行slot

配置优化

持久化配置优化:

  • 用来做缓存的Redis实例,尽量不要开启持久化功能
  • 建议关闭RDB,只使用AOF持久化
  • 利用脚本定期在Slave节点做RDB,实现数据备份
  • 设置合理的rewrite阈值,避免频繁bgrewrite
  • 配置no-appendfsync-on-rewrite=yes,禁止在rewrite期间做AOF,避免因AOF引起的阻塞

部署相关建议:

  • 物理机需要预留内存,用于forkrewrite的线程
  • 单个实例内存不建议太大,例如4GB或者8GB,可以加快fork的速度,减少主从同步、数据迁移压力
  • 不要与CPU密集型应用部署在一起
  • 不要与高硬盘负载应用一起部署,例如数据库、消息队列

Redis作为缓存

场景例如用于存储session,例如用户登录的token,需要认证的接口可通过客户传过来的token即可完成验证,不需要再次使用用户名密码通过查询数据库验证。

相对于存储在本地进程,使用Redis可以实现分布式服务认证。

每次使用token成功认证后,刷新token过期时间,以延长token过期时间。

这里介绍另外一种缓存,记录一些常用的数据。例如某些文件目录下的信息,缓存高频目录的信息,可以减少数据库的压力。

服务端配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Required
##########

# Set a memory usage limit to the specified amount of bytes.
# When the memory limit is reached Redis will try to remove keys
# according to the eviction policy selected (see maxmemory-policy).
maxmemory 100mb

# Optional
##########

# Evict any key using approximated LFU when maxmemory is reached.
maxmemory-policy allkeys-lfu

# Enable active memory defragmentation.
activedefrag yes

# Don't save data on the disk because we can afford to lose cached data.
save ""

使用缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // redis 客户端配置
})

mycache := cache.New(&cache.Options{
Redis: rdb, // 缓存配置
LocalCache: cache.NewTinyLFU(100, time.Minute), // 使用本地缓存,缓存使用lfu算法,本地最大存储100个,保存1分钟
})

obj := new(Object)
for i := 0; i < 100; i++ { //存放100个
err := mycache.Once(&cache.Item{ // set之前会get一下,如果key存在,则不存储新的value
Key: "mykey" + strconv.Itoa(i),
Value: obj, // destination
Do: func(*cache.Item) (interface{}, error) { // 将Object作为Value存储
return &Object{
Str: "mystring123123",
Num: 421,
}, nil
},
})
if err != nil {
panic(err)
}
}
}

慢查询

在Redis中,执行时耗时超过某个阈值的命令,称为慢查询。也就是无论读还是写,超时了就是慢查询。

慢查询的阈值可以通过配置指定:

  • slowlog-log-slower-than:慢查询阈值,单位ms,默认10000,也就是10s,建议1000

超过阈值的的操作会被记录到慢查询日志中,日志长度有上限

  • slowlog-max-len:慢查询日志(本质是一个队列)长度,默认128,建议1000

查看慢查询

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> SLOWLOG get 1                // 获取1个慢查询
1) 1) (integer) 3 // 日志编号
2) (integer) 1670426153 // 加入日志是的时间戳
3) (integer) 13415 // 慢查询耗时
4) 1) "info" // 慢查询命令
5) "172.17.0.1:62882" // 客户端地址
6) "redisinsight-common-c22f7437" // 客户端名称
127.0.0.1:6379> SLOWLOG len // 慢查询当前长度
(integer) 4
127.0.0.1:6379> SLOWLOG reset // 清理慢查询队列
OK

内存配置

当Redis内存不足时,可能导致key频繁被删除、响应时间变长,QPS不稳定等问题。当内存使用率达到90%以上时就需要警惕,并快速定位到内存占用的原因。

内存占用有这几种

  • 数据内存:是Redis主要部分,存储Redis的键值信息,主要问题是BigKey的问题,内存碎片问题
  • 进程内存:Redis主进程本身运行肯定需要内存,例如代码、常量池等等;这部分内存大约几兆,在大多数生产环境中与Redis数据占用的内存相比可以忽略
  • 缓冲区内存:一般包括客户端缓冲区、AOF缓冲区(AOF刷盘前的缓存区域)、复制缓冲区(用于主从复制,如果过小,会导致全量复制)等。客户端缓冲区又包括输入输出缓冲区两种。这部分内存占用波动较大,不当使用BigKey,可能导致内存溢出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
127.0.0.1:6379> INFO memory                // 查看内存占用信息
# Memory
used_memory:2060856 // 内存占用总量
used_memory_human:1.97M
used_memory_rss:9383936
used_memory_rss_human:8.95M
used_memory_peak:2179296
used_memory_peak_human:2.08M
used_memory_peak_perc:94.57%
used_memory_overhead:925920
used_memory_startup:895680
used_memory_dataset:1134936
used_memory_dataset_perc:97.40%
allocator_allocated:2252760
allocator_active:8912896
allocator_resident:10420224
total_system_memory:8232894464
total_system_memory_human:7.67G
used_memory_lua:31744
used_memory_vm_eval:31744
used_memory_lua_human:31.00K
used_memory_scripts_eval:0
number_of_cached_scripts:0
number_of_functions:0
number_of_libraries:0
used_memory_vm_functions:32768
used_memory_vm_total:64512
used_memory_vm_total_human:63.00K
used_memory_functions:184
used_memory_scripts:184
used_memory_scripts_human:184B
maxmemory:0
maxmemory_human:0B
maxmemory_policy:noeviction
allocator_frag_ratio:3.96
allocator_frag_bytes:6660136
allocator_rss_ratio:1.17
allocator_rss_bytes:1507328
rss_overhead_ratio:0.90
rss_overhead_bytes:-1036288
mem_fragmentation_ratio:4.60
mem_fragmentation_bytes:7342728
mem_not_counted_for_evict:0
mem_replication_backlog:0
mem_total_replication_buffers:0
mem_clients_slaves:0
mem_clients_normal:29496
mem_cluster_links:0
mem_aof_buffer:0
mem_allocator:jemalloc-5.2.1
active_defrag_running:0
lazyfree_pending_objects:0
lazyfreed_objects:0

127.0.0.1:6379> MEMORY DOCTOR // 内存检查医生
Hi Sam, this instance is empty or is using very little memory, my issues detector can't be used in these conditions. Please, leave for your mission on Earth and fill it with some data. The new Sam and I will be back to our programming as soon as I finished rebooting.
127.0.0.1:6379> MEMORY MALLOC-STATS // 内存分配状态,Redis使用jemalloc分配内存
___ Begin jemalloc statistics ___
Version: "5.2.1-0-g0"
... 省略部分信息

127.0.0.1:6379> MEMORY STATS
' 1) "peak.allocated" // Redis启动后消耗内存的峰值
2) (integer) 2179296
3) "total.allocated" // 分配器分配的总字节数,也就是当前的总内存使用量
4) (integer) 2110112
5) "startup.allocated" // 启动时消耗的初始内存量
6) (integer) 895680
7) "replication.backlog" // 复制积压缓冲区的大小
8) (integer) 0
9) "clients.slaves" // 主从复制中所有从节点的读写缓冲区大小
10) (integer) 0
11) "clients.normal" // 除从节点外,所有其他客户端的读写缓冲区大小
12) (integer) 29496
13) "cluster.links"
14) (integer) 0
15) "aof.buffer" // AOF持久化使用的缓存和AOF重写时产生的缓存
16) (integer) 0
17) "lua.caches"
18) (integer) 0
19) "functions.caches"
20) (integer) 184
21) "db.0" // 数据库0的信息
22) 1) "overhead.hashtable.main" // 数据库的hash链表开销内存综合,即元数据内存
2) (integer) 528
3) "overhead.hashtable.expires" // 用于存储key过期时间所消耗的内存
4) (integer) 32
5) "overhead.hashtable.slot-to-keys"
6) (integer) 0
23) "overhead.total" // 等于 startup.allocated + replication.backlog + clients.slaves + clients.normal + aof.buffer + db.x
24) (integer) 925920
25) "keys.count" // 当前key总数
26) (integer) 10
27) "keys.bytes-per-key" // 每个key的平均大小,(total.allocated - startup.allocated)/ keys.count
28) (integer) 121443
29) "dataset.bytes"
30) (integer) 1184192
31) "dataset.percentage"
32) "97.50994873046875"
33) "peak.percentage"
34) "96.825393676757812"
35) "allocator.allocated"
36) (integer) 2919320
37) "allocator.active"
38) (integer) 10223616
39) "allocator.resident"
40) (integer) 11730944
41) "allocator-fragmentation.ratio"
42) "3.50205397605896"
43) "allocator-fragmentation.bytes"
44) (integer) 7304296
45) "allocator-rss.ratio"
46) "1.1474359035491943"
47) "allocator-rss.bytes"
48) (integer) 1507328
49) "rss-overhead.ratio"
50) "0.81145250797271729"
51) "rss-overhead.bytes"
52) (integer) -2211840
53) "fragmentation"
54) "4.5555543899536133"
55) "fragmentation.bytes"
56) (integer) 7429544

127.0.0.1:6379> MEMORY help
1) MEMORY <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) DOCTOR
3) Return memory problems reports.
4) MALLOC-STATS
5) Return internal statistics report from the memory allocator.
6) PURGE
7) Attempt to purge dirty pages for reclamation by the allocator. 尝试清除脏页以供分配器回收。
8) STATS
9) Return information about the memory usage of the server. 返回有关服务器内存使用情况的信息。
10) USAGE <key> [SAMPLES <count>]
11) Return memory in bytes used by <key> and its value. Nested values are // 查看某个key占用的内存
12) sampled up to <count> times (default: 5, 0 means sample all).
13) HELP
14) Prints this help.

查看客户端信息

1
2
3
4
5
6
127.0.0.1:6379> CLIENT LIST
id=67 addr=172.17.0.1:62398 laddr=172.17.0.2:6379 fd=8 name= age=69848 idle=0 flags=N db=0 sub=0 psub=0 ssub=0 multi=-1 qbuf=26 qbuf-free=20448 argv-mem=10 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=22298 events=r cmd=client|list user=default redir=-1 resp=2
id=11 addr=172.17.0.1:65126 laddr=172.17.0.2:6379 fd=10 name=redisinsight-auto-discovery age=151134 idle=151134 flags=N db=0 sub=0 psub=0 ssub=0 multi=-1 qbuf=0 qbuf-free=0 argv-mem=0 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=1800 events=r cmd=info user=default redir=-1 resp=2
id=66 addr=172.17.0.1:57476 laddr=172.17.0.2:6379 fd=14 name= age=70274 idle=70254 flags=N db=0 sub=0 psub=0 ssub=0 multi=-1 qbuf=0 qbuf-free=0 argv-mem=0 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=1800 events=r cmd=xreadgroup user=default redir=-1 resp=2
id=14 addr=172.17.0.1:62882 laddr=172.17.0.2:6379 fd=11 name=redisinsight-common-c22f7437 age=151095 idle=1 flags=N db=0 sub=0 psub=0 ssub=0 multi=-1 qbuf=0 qbuf-free=20474 argv-mem=0 multi-mem=0 rbs=2048 rbp=1024 obl=0 oll=0 omem=0 tot-mem=23296 events=r cmd=info user=default redir=-1 resp=2
id=72 addr=172.17.0.1:65270 laddr=172.17.0.2:6379 fd=12 name=redisinsight-common-c22f7437 age=12021 idle=12021 flags=P db=0 sub=0 psub=1 ssub=0 multi=-1 qbuf=0 qbuf-free=0 argv-mem=0 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=1824 events=r cmd=psubscribe user=default redir=-1 resp=2

主从还是集群

主从模式和集群模式,都可以实现Redis高可用方案,但是在两者选择上,还是有些侧重点。

集群虽然具备高可用特性,也可以实现自动故障恢复,但是使用上有一些限制:

  • 集群完整性问题:缺少slot之后,是否能够服务,需要通过配置设置
  • 集群宽带问题:集群节点和节点之间需要较大的网络带宽
  • 数据倾斜问题:分配slot不均或者pipeline时会导致数据倾斜
  • 客户端性能问题
  • 命令的集群兼容性问题
  • lua和事务问题

因此,如果主从能够满足业务需求的情况下,尽量不要搭建Redis集群。

使用场景的补充

缓存更新策略

为了解决缓存一致性的问题,更新数据库之后,还需要更新缓存。

更新对比删除

更新数据库之后,是使用更新缓存还是使用删除缓存?

  • 更新会出现的两种情况

    1. 先更新数据库,再更新缓存

    image-20221207153120913

    此时,这种不一致,只能等到下次更新缓存的时候才能解决。

    1. 先更新缓存,再更新数据库

      image-20221207153255458

      通常情况下,更新缓存再更新数据库是要避免的一种手段

    相比而言,使用更新缓存,数据不一致的情况比较多。

  • 删除缓存会出现的两种情况

    1. 先删除缓存,再更新数据库

      image-20221207153551158

      这种情况也有另外一个解决方案,那就是延迟双删,在更新请求完成之后,将缓存删除

      image-20221207154759309

      这种解决思路的关键在于对 N 的时间的判断,如果 N 时间太短,线程 A 第二次删除缓存的时间依旧早于线程 B 把脏数据写回缓存的时间,那么相当于做了无用功。而 N 如果设置得太长,那么在触发双删之前,新请求看到的都是脏数据。

    2. 更新数据库后删除缓存

      也就是延迟双删的思路中,在更新数据库之后将缓存删除

      image-20221207154933719

      线程C在T2时间段获取到的数据会不一致,但是其实这个时间是非常短的。

      但是还有一种情况可能出现数据不一致的情况

      image-20221207155218114

      总的来说,这个不一致场景出现条件非常严格,因为并发量很大时,缓存不太可能不存在;如果并发很大,而缓存真的不存在,那么很可能是这时的写场景很多,因为写场景会删除缓存。

对比四种更新策略:

image-20221207155303812

从一致性的角度来看,采取更新数据库后删除缓存值,是更为适合的策略。因为出现不一致的场景的条件更为苛刻,概率相比其他方案更低。

做个简单总结,足以适应绝大部分的互联网开发场景的决策:

  • 针对大部分读多写少场景,建议选择更新数据库后删除缓存的策略。
  • 针对读写相当或者写多读少的场景,建议选择更新数据库后更新缓存的策略。

为了避免这种不一致性永久存在,使用缓存的时候,我们必须要给缓存设置一个过期时间,例如 1 分钟,这样即使出现了更新 Redis 失败的极端场景,不一致的时间窗口最多也只是 1 分钟。

这是我们最终一致性的兜底方案,万一出现任何情况的不一致问题,最后都能通过缓存失效后重新查询数据库,然后回写到缓存,来做到缓存与数据库的最终一致。

缓存穿透

缓存穿透是指缓存中没有存储访问的数据,缓存永远不生效,请求穿透缓存,打到数据库上。例如访问一些不存在的用户id。

解决方案:

  1. 将数据库不存在的数据也缓存下来:缺点是会有额外的内存消耗,而且可能造成短期的不一致;
  2. 将用户id的复杂度加大,避免推断
  3. 使用布隆过滤器,服务启动之前,将所有的数据加载到布隆过滤器中,进行缓存之前先判断在布隆过滤器中是否已经存在;缺点是实现复杂,存在误判可能

缓存雪崩

缓存雪崩是指当大量缓存数据在同一时间过期,或者Redis单机,大量缓存数据失效,数据库压力瞬间增大,导致影响到其他的服务,引起雪崩效应。

解决方案:

  1. 给不同的key设置随机的TTL
  2. 利用Redis集群的高可用性
  3. 给缓存的业务添加降级限流策略
  4. 给业务增加多级缓存

缓存击穿

缓存击穿也叫做热key问题,是指热key(被高并发访问并且缓存重建业务较复杂的key)失效,相同的请求透过缓存,直接打到数据库,大流量导致数据库也无法承担。

解决方案:

  1. 互斥锁:当发现缓存过期,需要从数据库获取数据时,先获取锁,获取锁之后才访问数据库;如果没有获取锁,则休眠一段时间再次重试在缓存中获取数据,没有的话,再次尝试获取锁;缺点是线程需要等待,性能会收到影响,而且可能有死锁的风险
  2. 逻辑过期:将热key过期后的数据也缓存起来,作为老数据,当新数据失效时,返回老数据,通过子进程获取互斥锁,新数据放到缓存中,然后释放锁。其他的进程没有访问到老数据也没有获取到锁时,则直接返回老数据;解决了线程等待的问题,性能好,缺点是不保证一致性,有额外的内存,而且实现复杂

全局ID生成策略

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,要满足的特性:

  • 唯一性
  • 高可用
  • 递增性
  • 安全性
  • 高性能

实现方式可以有以下几种

  • UUID:通过时间戳+机器号+随机值生成,可以实现全局唯一,并且自增
  • Redis自增:通过Redis自增,来替代时间戳,通过Redis自增+机器号+时间日期,可以实现全局以为一,并且自增
  • snowflake算法:雪花算法
  • 数据库自增:类似于UUID,自增主键

Redis自增ID策略:

  • 每天一个key,方便统计订单量
  • ID构造是 时间戳 + 计数器

对比分布式锁的实现方式‘

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

特点:

  • 多进程可见
  • 互斥
  • 高可用
  • 高性能
  • 安全性
MySQL Redis Zookeeper
互斥 利用MySQL本身的互斥锁机制 利用setnx 利用节点唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用过期时间,到期释放 临时节点,断开连接自动释放

关注推送

关注推送也叫做Feed流,直译为投喂。为用户持续的提供沉浸式的体验,通过无限下拉刷新获取新的消息。例如刷朋友圈、微博、抖音。

Feed流常见的两种模式:

  • TimeLine:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注,例如朋友圈
    • 优点:信息全面,不会有缺失;实现相对简单
    • 缺点:信息噪音较多,用户不一定感兴趣;内容获取效率低
  • 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容,推送用户感兴趣信息来吸引用户,例如微博、抖音
    • 优点:投喂用户感兴趣信息,用户粘性很高,容易沉迷
    • 缺点:如果算法不精确,可能引起反作用

实现方案有三种:

  • 拉模式:也叫做读扩散,用户从服务端拉取关注的人发送的信息;
    • 优点:节省空间,保存发送者的数据即可;
    • 缺点:耗费时间,用户通过时间戳拉取数据,每次都要拉取所有数据,然后排序
  • 推模式:服务器将信息发送扩散给所有的用户;
    • 优点:快,用户获取自己收件箱中的数据即可
    • 缺点:信息占用空间大,每一个消息以关注人数翻倍
  • 推拉结合:将拉模式和推模式结合,针对活跃关注数少(千万级)的人使用推模式,关注的人比较少的用户使用拉模式。

多级缓存

在超高并发的场景下,传统缓存方案可能在缓存失效的情况,或者中间层服务无法支撑请求的情况,此时可以使用多级缓存,充分利用请求处理的每个环节,分别添加缓存,减轻中间层压力,提升服务性能,并且避免缓存失效后请求打到数据库上。

image-20221208144829613

进程缓存,例如上述Redis作为缓存章节中,使用本地缓存。

数据结构

Redis底层使用以下几种数据结构,在这几种数据结构之上,表现为五种数据使用形式。

  • 动态字符串SDS
  • IntSet
  • Dict
  • ZipList
  • QuickList
  • SkipList
  • RedisObject

动态字符串SDS

Redis中保存的key是字符串,value往往是字符串或者字符串的集合。

Redis没有直接使用C语言中的字符串,因为C语言字符串存在很多问题:

  • 获取字符串长度需要通过运算
  • 非二进制安全
  • 不可修改

在这个基础上,Redis构建一种新的字符串结构,称为简单动态字符串(Simple Dynamic String),简称SDS。

1
2
127.0.0.1:6379> SET hello 世界
OK

创建两个SDS,一个是hello,一个是世界

src/sds.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* Note: sdshdr5 is never used, we just access the flags byte directly.
* However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */ /* 已保存字符串字节数,不包含约束标识\n */
uint8_t alloc; /* excluding the header and null terminator */ /* 申请的总字节数,不包含结束标识\n */
unsigned char flags; /* 3 lsb of type, 5 unused bits */ /* 不同的SDS的头类型,用来控制SDS的头大小 */
char buf[]; /*数据体*/
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};

例如一个包含字符串name的SDS结构如下:

image-20221208211113618

结构体中的字段,是连续的内存空间。

SDS扩容

SDS之所以叫做动态字符串,是因为具备动态扩容的能力。例如在name后面再追加字符串,这里会申请新内存空间:

  • 如果新字符串小于1M,则新空间扩展后字符串长度的两倍+1
  • 如果新字符串大于1M,则新空间扩展后字符串长度+1M.称为内存预分配

整数集合IntSet

IntSet是Redis中set集合的一种实现方式,基于整数数组来实现,并且具备长度可变、有序等特征。

src/intset.h

1
2
3
4
5
typedef struct intset {
uint32_t encoding; /* 编码方式,支持存放16位、32位、64位整数 */
uint32_t length; /* 元素个数 */
int8_t contents[]; /* 整数数组,保存集合数据,长度通过encoding指定,是2字节、4字节、8字节*/
} intset;

encoding包含三种模式,表示存储的整数大小不同:

src/intset.c

1
2
3
4
5
/* Note that these encodings are ordered, so:
* INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64. */
#define INTSET_ENC_INT16 (sizeof(int16_t)) // 2字节整数,2^16
#define INTSET_ENC_INT32 (sizeof(int32_t)) // 4字节整数,2^32
#define INTSET_ENC_INT64 (sizeof(int64_t)) // 8字节整数,2^64

为了方便查找,Redis会将IntSet中所有的整数按照升序一次保存在contents数组中,结构如图:

image-20221208211958051

数组中,数字5、10、20都在2^16以内,因此encoding使用INTSET_ENC_INT16,长度是3,数组中每个元素占用2个字节。

IntSet动态升级

此时,IntSet每个元素占据2个字节,如果新插入数据,而且数据需要4个字节,超过int16_t的范围,IntSet会自动升级编码方式到合适的大小。流程如下:

  1. 升级编码为INTSET_ENC_INT32,每个整数占4个字节,并按照新的编码方式及元素个数扩容数组(是扩容,Header不会更换地址空间)
  2. 倒序以此将数据中的元素拷贝到扩容后的正确为止(正序扩容会覆盖后面的元素)
  3. 将待添加的元素放入到数组末尾
  4. 将IntSet的encoding属性改为INTSET_ENC_INT32,将length更新

源码:

src/intset.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/* Insert an integer in the intset */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
uint8_t valenc = _intsetValueEncoding(value); // 获取当前值编码
uint32_t pos; // 要插入的位置
if (success) *success = 1; // 判断成功或是失败

/* Upgrade encoding if necessary. If we need to upgrade, we know that
* this value should be either appended (if > 0) or prepended (if < 0),
* because it lies outside the range of existing values. */
if (valenc > intrev32ifbe(is->encoding)) { // 判断编码是否超过当前的encoding
/* This always succeeds, so we don't need to curry *success. */
return intsetUpgradeAndAdd(is,value); //
} else {
/* Abort if the value is already present in the set.
* This call will populate "pos" with the right position to insert
* the value when it cannot be found. */
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}

is = intsetResize(is,intrev32ifbe(is->length)+1);
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}

_intsetSet(is,pos,value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding);
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
int prepend = value < 0 ? 1 : 0;

/* First set new encoding and resize */
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1);

/* Upgrade back-to-front so we don't overwrite values.
* Note that the "prepend" variable is used to make sure we have an empty
* space at either the beginning or the end of the intset. */
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

/* Set the value at the beginning or the end. */
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

IntSet可以看作是特殊的整数数组,具备一些特点:

  1. Redis会确保IntSet中的元素唯一、有序
  2. 具备类型升级机制,可以节省内存空间
  3. 底层采用二分查找方式来查询

字典Dict

在Redis这个键值型数据库中,键与值的映射关系正是通过Dict来实现的。

Dict由两部分组成,分别是:哈希表(dict),哈希节点(DictEntry

deps/hiredis/dict.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 节点
typedef struct dictEntry {
void *key; // 键
void *val; // 值
struct dictEntry *next; // 下一个entry的指针
} dictEntry;


// 值类型
typedef struct dictType {
unsigned int (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;

// dict,hash表
typedef struct dict {
dictEntry **table; // entry数组,数组中保存的指向entry的指针
dictType *type; // 哈希表类型,内置不同的hash函数
unsigned long size; // 哈希表大小,总等于 2^n
unsigned long sizemask; // 哈希表大小的掩码,总等于size - 1,用于做hash运算
unsigned long used; // entry 个数
void *privdata; // 私有数据, 做特殊hash运算时用
} dict;

// 字典迭代器,用于迭代dict
typedef struct dictIterator {
dict *ht; // 指向hash表的指针
int index; // 索引
dictEntry *entry, *nextEntry; // 本节点和下一个节点的指针
} dictIterator;

src/dict.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 节点
typedef struct dictEntry {
void *key; // 键
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v; // 值,并且可以存储不同的数据类型
struct dictEntry *next; /* Next entry in the same hash bucket. */ // 下一个entry的指针
void *metadata[]; /* An arbitrary number of bytes (starting at a
* pointer-aligned address) of size as returned
* by dictType's dictEntryMetadataBytes(). */
} dictEntry;

// 哈希函数
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(dict *d, const void *key);
void *(*valDup)(dict *d, const void *obj);
int (*keyCompare)(dict *d, const void *key1, const void *key2);
void (*keyDestructor)(dict *d, void *key);
void (*valDestructor)(dict *d, void *obj);
int (*expandAllowed)(size_t moreMem, double usedRatio);
/* Allow a dictEntry to carry extra caller-defined metadata. The
* extra memory is initialized to 0 when a dictEntry is allocated. */
size_t (*dictEntryMetadataBytes)(dict *d);
} dictType;

// dict,hash表
typedef struct dict {
dictEntry **table; // entry数组,数组中保存的指向entry的指针
dictType *type; // 哈希表类型,内置不同的hash函数
unsigned long size; // 哈希表大小,总等于 2^n
unsigned long sizemask; // 哈希表大小的掩码,总等于size - 1,用于做hash运算
unsigned long used; // entry 个数
void *privdata; // 私有数据, 做特殊hash运算时用
} dict;

struct dict {
dictType *type; // 哈希表类型,内置不同的hash函数

dictEntry **ht_table[2]; // entry数组,有两个元素,一个是使用的,另外一个是空的
unsigned long ht_used[2];

long rehashidx; /* rehashing not in progress if rehashidx == -1 */ // rehash进度

/* Keep small vars at end for optimal (minimal) struct padding */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */ // hash是否暂停, >0 代表暂停
signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
};

// 字典迭代器,用于迭代dict
/* If safe is set to 1 this is a safe iterator, that means, you can call
* dictAdd, dictFind, and other functions against the dictionary even while
* iterating. Otherwise it is a non safe iterator, and only dictNext()
* should be called while iterating. */
typedef struct dictIterator {
dict *d;
long index;
int table, safe;
dictEntry *entry, *nextEntry;
/* unsafe iterator fingerprint for misuse detection. */
unsigned long long fingerprint;
} dictIterator;

向Dict添加键值对时,Redis首先根据key计算出hash值(h),然后利用h&sizemask(其实也就是取模,位运算更快)来计算元素应该存储到数组中的哪个索引位置。

dict中的dictEntry

image-20221208220452732

存储1个数据

image-20221208220525758

如果出现hash冲突,会加入到链表头部,这样可以不需要便利所有链表

image-20221208220539425

Dict扩容

Dict中的HashTable就是数组结合单向链表的实现,当集合中元素过多,导致哈希冲突过多,链表过长,则会降低查询效率。此时会进行Dict扩容。

Dict在每次新增键值对时都会检查负载因子(LoadFactor = used/size),满足以下两种情况时会触发哈希表扩容:

  • 负载因子LoadFactor >= 1,并且服务器没有执行BGSAVE或者BGREWRITEAOF等后台进程
  • 负载因子LoadFactor > 5

src/dict.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
if (dictIsRehashing(d)) return DICT_OK;

/* If the hash table is empty expand it to the initial size. */
if (DICTHT_SIZE(d->ht_size_exp[0]) == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0]) &&
(dict_can_resize ||
d->ht_used[0]/ DICTHT_SIZE(d->ht_size_exp[0]) > dict_force_resize_ratio) &&
dictTypeExpandAllowed(d))
{
return dictExpand(d, d->ht_used[0] + 1);
}
return DICT_OK;
}

Dict收缩

Dict除了扩容以外,每次删除元素时,也会对负载因子做检查,当LoadFactor < 0.1时,会做哈希表收缩

src/server.c

1
2
3
4
5
6
7
8
int htNeedsResize(dict *dict) {
long long size, used;

size = dictSlots(dict);
used = dictSize(dict);
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}

src/dict.c

1
2
3
4
5
6
7
8
9
10
11
12
/* Resize the table to the minimal size that contains all the elements,
* but with the invariant of a USED/BUCKETS ratio near to <= 1 */
int dictResize(dict *d)
{
unsigned long minimal;

if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht_used[0];
if (minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITIAL_SIZE;
return dictExpand(d, minimal);
}

Dict的rehash

不管是收缩还是扩容,必定会创建新的哈希表,导致哈希表的size和sizemask发生变化,而key的查询语sizemask有关,因此必须对哈希表中的每一个key重新计算索引,插入新的哈希表,这个过程称为rehash。

过程如下

  1. 计算新hash表的realeSize,取值取决于当前要做的是扩容还是缩容:

    • 如果是扩容,则新size为第一个大于等于dict.ht[0].used + 12^n
    • 如果是缩容,则新size为第一个大于等于dict.ht[0].used2^n(不得小于4)
  2. 按照新的realeSize申请内存空间,创建dictht,并赋值给dict.ht[1]

  3. 设置dict.rehashidx=0,标识开始rehash

  4. 将老hash表中的每一个dictentry都rehash到新的hash表中

    这个过程是每次执行新增、查询、修改、删除操作时,都要检查下dict.rehashidx是否大于-1,如果是,则rehash一个entry后,都要将rehashidx++,直到所有的entry都rehash

  5. dict.ht[1]赋值给dict.ht[0],给dict.ht[1]初始化为空哈希表,释放原来的dict.ht[0]的内存

  6. 将rehashidx赋值为-1,代表rehash结束

  7. 在rehash过程中,新增操作,直接写入新hash表中,查询、修改和删除则会在老hash表和新hash表依次查找并执行。这样可以确保数据只减,不增,随着rehash最终为空

rehash前

image-20221208225446907

rehash时

image-20221208225539936

rehash后

image-20221208225559504

压缩列表ZipList

ZipList是一种特殊的双端链表,由一系列特殊编码的连续内存块组成。可以在任意一端进行压入、弹出操作,并且该操作的时间复杂度为O(1)。

image-20221208222604830

image-20240416175844630

其中entry并不像普通链表那样记录前后节点的指针,因为记录两个指针要占用16个字节,浪费内存,而是直接记录前一个entry的长度

image-20221208230036374

Encoding编码

ZipList中的entry中的encoding编码分为字符串和整数两种:

  • 字符串:如果encoding0001或者10开头,则content是字符串

    image-20221208230228077

    例如保存字符串ab

    image-20221208230301494

    再保存一个bc

    image-20221208230358306

  • 整数:如果encoding11开头,则content是整数,而且encoding固定值占用1个字节

    image-20221208230526270

    例如,一个ZipList中包含两个整数值:2和5

    image-20221208230620807

源码

src/ziplist.h

1
2
3
4
5
6
7
8
/* Each entry in the ziplist is either a string or an integer. */                // 记录ZipList中的节点信息
typedef struct {
/* When string is used, it is provided with the length (slen). */
unsigned char *sval;
unsigned int slen;
/* When integer is used, 'sval' is NULL, and lval holds the value. */
long long lval;
} ziplistEntry;

src/ziplist.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* We use this function to receive information about a ziplist entry.
* Note that this is not how the data is actually encoded, is just what we
* get filled by a function in order to operate more easily. */
typedef struct zlentry {
unsigned int prevrawlensize; /* Bytes used to encode the previous entry len*/ // 记录前一个节点长度的字节数
unsigned int prevrawlen; /* Previous entry len. */ // 前一个节点长度
unsigned int lensize; /* Bytes used to encode this entry type/len. // 记录当前字节长度的字节数
For example strings have a 1, 2 or 5 bytes
header. Integers always use a single byte.*/
unsigned int len; /* Bytes used to represent the actual entry. // 当前字节长度
For strings this is just the string length
while for integers it is 1, 2, 3, 4, 8 or
0 (for 4 bit immediate) depending on the
number range. */
unsigned int headersize; /* prevrawlensize + lensize. */ // 头信息占用字节长度
unsigned char encoding; /* Set to ZIP_STR_* or ZIP_INT_* depending on // 编码,确定数据时数字还是字符串
the entry encoding. However for 4 bits
immediate integers this can assume a range
of values and must be range-checked. */
unsigned char *p; /* Pointer to the very start of the entry, that // ZipList中起始节点指针
is, this points to prev-entry-len field. */
} zlentry;

ZipList的连锁更新问题

ZipList的每个entry都包含前一个节点的长度,长度是1或者5个字节

  • 如果前一节点的长度小于254个字节,则采用1个字节保存这个长度
  • 如果前一节点的长度大于254个字节,则采用5个字节来保存这个长度值,第一个字节以0xfe,后4个字节才是真实长度数据

如果出现,插入某个节点,导致后面节点更改长度,又导致后面节点更改长度,就会出现连续多次空间扩展,也就是连锁更新(Cascade Update),新增、删除都可能导致连锁更新的发生。

ZipList特性:

  1. 压缩列表可以看做一种连续内存空间的双向链表
  2. 列表的节点之间不是通过指针连接,而是记录上一个节点和本节点长度来寻址,内存占用较低
  3. 如果列表数据过多,导致链表过长,可能影响查询性能
  4. 增或删较大数据时,有可能发生连续更新问题

因此,在Redis 7.0版本,通过listpack完全取代ZipList,并且为了兼容性,保留了ziplist的相关属性。通过在entry末尾记录本entry长度,解决连锁更新的问题。

快速列表QuickList

ZipList占据连续内存,当节点过多,申请内存的效率就会很低。为了解决这个问题,引入快速列表QuickList,将多个ZipList通过双向链表连接起来。

QuickList是一个双端链表,链表中的每个节点都是一个ZipList(Redis 7.0中,每个节点是ListPack)。

image-20221209095847218

为了避免QuickList中的每个ZipList中entry过多,Redis通过配置限制

1
2
5) "list-max-ziplist-size"
6) "-2"
  • 如果为正,代表ZipList的允许entry个数的最大值
  • 如果为负,代表ZipList的最大内存大小,分5中情况:
    • -1:每个ZipList的内存占用不能超过4KB
    • -2:每个ZipList的内存占用不能超过8KB,默认值
    • -3:每个ZipList的内存占用不能超过16KB
    • -4:每个ZipList的内存占用不能超过32KB
    • -5:每个ZipList的内存占用不能超过64KB

除了控制ZipList的大小,还可以控制ZipList做压缩,通过配置

1
2
23) "list-compress-depth"
24) "0"

因为链表一般是首位访问较多,所以首位是不压缩的。这个参数就是控制首位不压缩的节点个数:

  • 0:特殊值,代表不压缩,默认值,不压缩内存申请效率高,但是超找效率不如压缩
  • 1:代表QuickList首位各有1个节点不压缩,中间节点压缩
  • 2:代表QuickList首位各有2个节点不压缩,中间节点压缩
  • 以此类推

QuickList源码

src/quicklist.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/* quicklistNode is a 32 byte struct describing a listpack for a quicklist.
* We use bit fields keep the quicklistNode at 32 bytes.
* count: 16 bits, max 65536 (max lp bytes is 65k, so max count actually < 32k).
* encoding: 2 bits, RAW=1, LZF=2.
* container: 2 bits, PLAIN=1 (a single item as char array), PACKED=2 (listpack with multiple items).
* recompress: 1 bit, bool, true if node is temporary decompressed for usage.
* attempted_compress: 1 bit, boolean, used for verifying during testing.
* extra: 10 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode { // QuickList中的节点结构体
struct quicklistNode *prev; // 前一个节点指针
struct quicklistNode *next; // 后一个节点指针
unsigned char *entry; // 当前节点指针
size_t sz; /* entry size in bytes */ // 当前节点ZipList的字节大小
unsigned int count : 16; /* count of items in listpack */ // 当前节点ZipList中entry的个数
unsigned int encoding : 2; /* RAW==1 or LZF==2 */ // 编码方式,1是ZipList,2是LZP压缩模式
unsigned int container : 2; /* PLAIN==1 or PACKED==2 */ // 数据容器类型,1是PLAIN,2是PACKED
unsigned int recompress : 1; /* was this node previous compressed? */ // 是否被压缩,1说明被解压了,后续要重新压缩
unsigned int attempted_compress : 1; /* node can't compress; too small */ // 测试用
unsigned int dont_compress : 1; /* prevent compression of entry that will be used later */
unsigned int extra : 9; /* more bits to steal for future usage */ // 预留字段
} quicklistNode;

/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
* 'count' is the number of total entries.
* 'len' is the number of quicklist nodes.
* 'compress' is: 0 if compression disabled, otherwise it's the number
* of quicklistNodes to leave uncompressed at ends of quicklist.
* 'fill' is the user-requested (or default) fill factor.
* 'bookmarks are an optional feature that is used by realloc this struct,
* so that they don't consume memory when not used. */
typedef struct quicklist { // QuickList 结构体
quicklistNode *head; // 头节点指针
quicklistNode *tail; // 尾结点指针
unsigned long count; /* total count of all entries in all listpacks */ // 所有ZipList中entry数量
unsigned long len; /* number of quicklistNodes */ // ZipList的数量
signed int fill : QL_FILL_BITS; /* fill factor for individual nodes */ // ZipList上限,默认值 -2
unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */ // 首位不压缩的节点数量
unsigned int bookmark_count: QL_BM_BITS; // 内存重分配时的书签数量及数组,一般用不到
quicklistBookmark bookmarks[];
} quicklist;

示意图如下:

image-20221209103834488

跳表SkipList

在QuickList中,通过首位访问是比较快的,但是如果需要访问中间元素,则需要进行从首或者尾进行遍历,遍历到某个ZipList之后,在ZipList中可以二分查找,但是如果QuickList非常长,这个遍历效率会非常低。

因此,当QuickList中节点过多时,需要引入SkipList。SkipList,跳表,是一个链表,但是与传统链表有几点差异:

  • 元素按照升序排列存储
  • 节点可以包含多个指针,指针跨度不同

image-20221209104717967

SkipList中,最大可以允许32级指针。

src/server.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode { // SkipList中节点信息
sds ele; // 节点存储的值
double score; // 节点分数,用于排序、查找
struct zskiplistNode *backward; // 前一个节点指针
struct zskiplistLevel { // 层级信息
struct zskiplistNode *forward; // 下一个节点指针
unsigned long span; // 索引跨度
} level[]; // 保存所有层级的指针信息
} zskiplistNode;

typedef struct zskiplist { // SkipList数据结构
struct zskiplistNode *header, *tail; // 头节点、尾结点指针
unsigned long length; // 节点数量
int level; // 最大索引层级,默认值1
} zskiplist;

image-20221209112523056

SkipList特点:

  • 跳跃表是一个双向链表,每个节点包含scoreele值,也就是打分和值
  • 节点按照score值排序,score值一样则按照ele字典排序
  • 每个节点都可以包含多层指针,层数是1到32之间的随机数
  • 不同层指针到下一个节点的跨度不同,层级越高,跨度越大
  • 增删改查效率与红黑树基本一致,实现却更加简单

RedisObject

Redis中任意数据类型的键值都会被封装为一个RedisObject,也叫做Redis对象

src/server.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
typedef struct redisObject {
unsigned type:4; // 对象类,数据类型,从0-4有5种,占4个bit
unsigned encoding:4; // 数据编码,同一个数据类型,也会有不同的数据编码,占4个bit
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or // 表示该对象最后一次被访问的时间,占用24个bit,便于判断空闲时间太久的key
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount; // 对象引用计数器,有对象引用+1,没有对象引用-1,为0代表没有应用引用,则可回收
void *ptr; // 指针,指向实际存放数据的空间
} robj; // 对象头部,占用16个字节

// type 数据类型
/* The actual Redis Object */
#define OBJ_STRING 0 /* String object. */ // int embstr raw
#define OBJ_LIST 1 /* List object. */ // QuickList
#define OBJ_SET 2 /* Set object. */ // IntSet,HashTable
#define OBJ_ZSET 3 /* Sorted set object. */ // ListPack,SkipList
#define OBJ_HASH 4 /* Hash object. */ // ListPack,HashTable

// 数据编码
/* Objects encoding. Some kind of objects like Strings and Hashes can be
* internally represented in multiple ways. The 'encoding' field of the object
* is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0 /* Raw representation */ // 动态字符串的一种方式
#define OBJ_ENCODING_INT 1 /* Encoded as integer */ // long类型的整数的字符串
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */ // hash表,也就是字典dict
#define OBJ_ENCODING_ZIPMAP 3 /* No longer used: old hash encoding. */ // 已废弃
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */ // 双端链表,已废弃
#define OBJ_ENCODING_ZIPLIST 5 /* No longer used: old list/hash/zset encoding. */ // Redis 7中已经不使用ZipList
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */ // 整数集合
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */ // 跳表
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */ // 动态字符串的一种方式
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of listpacks */ // 快速列表
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */ // Stream 流
#define OBJ_ENCODING_LISTPACK 11 /* Encoded as a listpack */ // Redis 7中使用ListPack代替ZipList

五种数据类型

Redis 6.0的数据类型、编码类型和底层数据结构对应

image-20220618182945035

String

string是Redis中最常见的数据存储类型:

  • 基本编码方式是RAW,基于SDS实现,存储上限是512MB

    RAW编码:

    image-20221209115226190

  • 如果SDS长度小于44字节,则会采用EMBSTR编码,此时object haed与SDS是一段连续空间。申请内存时只需要调用一次内存分配函数,效率更高。

    EMBSTR编码:

    此时RedisObject占用16个字节,SDS占用44+头部的3个字节+尾部\0的1个字节,一共64字节,内存分配时不会有内存碎片。因此推荐String格式的字符串长度不要超过44个字节。

    image-20221209115351780

  • 如果存储的字符串是整数值,并且大小在LONG_MAX范围内,则会采用INT编码:直接将数据保存在RedisObject的ptr指针位置(正好8字节),不再需要SDS

    INT编码:

    image-20240416180017673

1
2
3
4
5
6
7
8
9
10
11
12
127.0.0.1:6379> set raw aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
OK
127.0.0.1:6379> set emb abc
OK
127.0.0.1:6379> set int 123
OK
127.0.0.1:6379> OBJECT encoding raw
"raw"
127.0.0.1:6379> OBJECT encoding emb
"embstr"
127.0.0.1:6379> OBJECT encoding int
"int"

List

Redis的List类型可以从首、尾操作列表中的元素,满足的数据结构是QuickList,包含LinkedList和ZipList的特点,节点和节点之间通过双向链表访问,内存占用较低,每个节点内包含多个ZipList,存储上限高。

  • 3.2版本之前,Redis采用LinkedList和ZipList来实现List,当元素数量小于512个,并且元素大小小于64字节时,采用ZipList编码,否则使用LinkedList编码
  • 3.2版本之后,Redis同意使用QuickList来实现List

src/t_list.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// LPUSH/RPUSH/LPUSHX/RPUSHX 命令,都是调用pushGenericCommand
/* LPUSH <key> <element> [<element> ...] */
void lpushCommand(client *c) {
pushGenericCommand(c,LIST_HEAD,0);
}

/* Implements LPUSH/RPUSH/LPUSHX/RPUSHX.
* 'xx': push if key exists. */
void pushGenericCommand(client *c, int where, int xx) {
int j;
// 尝试找到KEY对应的List
robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
// 检查类型是否正确
if (checkType(c,lobj,OBJ_LIST)) return;
// 检查是否为空
if (!lobj) {
if (xx) {
addReply(c, shared.czero);
return;
}
// 为空,创建QuickList
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_listpack_size,
server.list_compress_depth);
dbAdd(c->db,c->argv[1],lobj);
}

for (j = 2; j < c->argc; j++) {
listTypePush(lobj,c->argv[j],where);
server.dirty++;
}

addReplyLongLong(c, listTypeLength(lobj));

char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}

src/object.c

1
2
3
4
5
6
7
8
9
robj *createQuicklistObject(void) {
// 申请内存并初始化QuickList
quicklist *l = quicklistCreate();
// 创建RedisObject,type为 OBJ_LIST,ptr指向QuickList
robj *o = createObject(OBJ_LIST,l);
// 设置编码为QuickList
o->encoding = OBJ_ENCODING_QUICKLIST;
return o;
}

image-20221209143255162

Set

Set是Redis中的单列集合,满足以下特点:

  • 不保证有序性
  • 保证元素唯一性(可以判断元素是否存在)
  • 求交集、并集、差集

满足这种查询元素高效率的要求的数据结构,是HashTable,也就是Dic,不过Dic是双列集合(可以存键值对)

  • 为了查询效率和唯一性,Set采用HT编码(Dict)。Dict中的key可以用来存储元素,value统一为null。

  • 当存储的所有数据都是整数,并且元素数量不超过set-max-intset-entries时,Set会采用IntSet编码,以节省内存

    1
    2
    3
    127.0.0.1:6379> CONFIG GET *max-intset*
    1) "set-max-intset-entries"
    2) "512"

src/t_set.c

1
2
3
4
5
6
7
8
9
10
11
/* Factory method to return a set that *can* hold "value". When the object has
* an integer-encodable value, an intset will be returned. Otherwise a regular
* hash table. */
robj *setTypeCreate(sds value) {
// 判断value是否是数值类型 LongLong
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
// 如果是数值类型,则采用IntSet编码
return createIntsetObject();
// 如果不是,则采用默认编码,也就是HT
return createSetObject();
}

src/object.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 创建IntSet编码的RedisObject
robj *createIntsetObject(void) {
// 初始化IntSet,并且申请内存
intset *is = intsetNew();
// 创建RedisObject
robj *o = createObject(OBJ_SET,is);
// 指定编码为OBJ_ENCODING_INTSET
o->encoding = OBJ_ENCODING_INTSET;
return o;
}

// 创建HT编码的RedisObject
robj *createSetObject(void) {
// 初始化Dict类型,并申请内存
dict *d = dictCreate(&setDictType);
// 创建createObject
robj *o = createObject(OBJ_SET,d);
// 设置编码为OBJ_ENCODING_HT
o->encoding = OBJ_ENCODING_HT;
return o;
}

src/t_set.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/* Add the specified value into a set.
*
* If the value was already member of the set, nothing is done and 0 is
* returned, otherwise the new element is added and 1 is returned. */
// set中增加元素,会判断长度,过长会转变为OBJ_ENCODING_HT
int setTypeAdd(robj *subject, sds value) {
long long llval;
// 已经是HT编码,直接添加元素
if (subject->encoding == OBJ_ENCODING_HT) {
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
// 是 IntSet 编码
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
// 判断数据类型,如果是LongLong
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
/* Convert to regular set when the intset contains
* too many entries. */
// 当IntSet元素数量超出set_max_intset_entries,则转换成IntSet
size_t max_entries = server.set_max_intset_entries;
/* limit to 1G entries due to intset internals. */
if (max_entries >= 1<<30) max_entries = 1<<30;
if (intsetLen(subject->ptr) > max_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
/* Failed to get integer from object, convert to regular set. */
// 不是整数,直接转换成 HT
setTypeConvert(subject,OBJ_ENCODING_HT);

/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}

插入整数

image-20221209144539245

改为HT

image-20221209144619653

Zset

ZSet也就是SortedSet,其中每一个元素都要指定一个score值和member指:

  • 可以根据score值排序
  • member必须唯一
  • 可以根据member查询分数

因此,zset底层数据结构必须满足键值存储、键必须唯一、可排序这几个需求。zset通过Dict和SkipList结合实现。

src/server.h

1
2
3
4
5
6
typedef struct zset {
// Dict 指针
dict *dict;
// SkipList 指针
zskiplist *zsl;
} zset;

src/object.c

1
2
3
4
5
6
7
8
9
10
11
robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;
// 创建Dict
zs->dict = dictCreate(&zsetDictType);
// 创建SkipList
zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}

image-20221209145524270

当元素数量不多时,HT和SkipList的优势不明显,而且更耗内存。因此zset还会采用ZipList结构来节省内存,不过需要同时满足两个条件:

  1. 元素数量小于zset-max-ziplist-entries(Redis 7的配置是zset-max-listpack-entries),默认是128
  2. 每个元素的长度都小于zset-max-ziplist-value(Redis 7的配置是zset-max-listpack-value),默认是64

src/t_zset.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*-----------------------------------------------------------------------------
* Sorted set commands
*----------------------------------------------------------------------------*/

/* This generic command implements both ZADD and ZINCRBY. */
void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *zobj;
sds ele;
double score = 0, *scores = NULL;
int j, elements, ch = 0;
int scoreidx = 0;
/* The following vars are used in order to track what the command actually
* did during the execution, to reply to the client and to trigger the
* notification of keyspace change. */
int added = 0; /* Number of new elements added. */
int updated = 0; /* Number of elements with updated score. */
int processed = 0; /* Number of elements processed, may remain zero with
options like XX. */

/* Parse options. At the end 'scoreidx' is set to the argument position
* of the score of the first score-element pair. */
scoreidx = 2;
while(scoreidx < c->argc) {
char *opt = c->argv[scoreidx]->ptr;
if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX;
else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX;
else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */
else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR;
else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT;
else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT;
else break;
scoreidx++;
}

/* Turn options into simple to check vars. */
int incr = (flags & ZADD_IN_INCR) != 0;
int nx = (flags & ZADD_IN_NX) != 0;
int xx = (flags & ZADD_IN_XX) != 0;
int gt = (flags & ZADD_IN_GT) != 0;
int lt = (flags & ZADD_IN_LT) != 0;

/* After the options, we expect to have an even number of args, since
* we expect any number of score-element pairs. */
elements = c->argc-scoreidx;
if (elements % 2 || !elements) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
elements /= 2; /* Now this holds the number of score-element pairs. */

/* Check for incompatible options. */
if (nx && xx) {
addReplyError(c,
"XX and NX options at the same time are not compatible");
return;
}

if ((gt && nx) || (lt && nx) || (gt && lt)) {
addReplyError(c,
"GT, LT, and/or NX options at the same time are not compatible");
return;
}
/* Note that XX is compatible with either GT or LT */

if (incr && elements > 1) {
addReplyError(c,
"INCR option supports a single increment-element pair");
return;
}

/* Start parsing all the scores, we need to emit any syntax error
* before executing additions to the sorted set, as the command should
* either execute fully or nothing at all. */
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
!= C_OK) goto cleanup;
}

/* Lookup the key and create the sorted set if does not exist. */
// zadd 添加元素时,先根据key找到zset,不存在则创建新的key
zobj = lookupKeyWrite(c->db,key);
if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
// zset不存在时
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
if (server.zset_max_listpack_entries == 0 ||
server.zset_max_listpack_value < sdslen(c->argv[scoreidx+1]->ptr))
// zset_max_listpack_entries 为0,则是仅用了ListPack
// 或者超过 zset_max_listpack_value ,使用createZsetObject,也就是 HT + SkipList
{
zobj = createZsetObject();
} else {
// 反之,则使用ListPack
zobj = createZsetListpackObject();
}
dbAdd(c->db,key,zobj);
}

for (j = 0; j < elements; j++) {
double newscore;
score = scores[j];
int retflags = 0;

ele = c->argv[scoreidx+1+j*2]->ptr;
int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
if (retval == 0) {
addReplyError(c,nanerr);
goto cleanup;
}
if (retflags & ZADD_OUT_ADDED) added++;
if (retflags & ZADD_OUT_UPDATED) updated++;
if (!(retflags & ZADD_OUT_NOP)) processed++;
score = newscore;
}
server.dirty += (added+updated);

reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
if (processed)
addReplyDouble(c,score);
else
addReplyNull(c);
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}

cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}

src/object.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建zset对象
robj *createZsetObject(void) {
// 申请内存
zset *zs = zmalloc(sizeof(*zs));
robj *o;
// 创建dict
zs->dict = dictCreate(&zsetDictType);
// 创建SkipList
zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}

// 创建ListPack对象
robj *createZsetListpackObject(void) {
// 申请内存
unsigned char *lp = lpNew(0);
// 创建ListPack对象
robj *o = createObject(OBJ_ZSET,lp);
o->encoding = OBJ_ENCODING_LISTPACK;
return o;
}

在给zset中添加元素时,也会进行判断

src/t_zset.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
/* Add a new element or update the score of an existing element in a sorted
* set, regardless of its encoding.
*
* The set of flags change the command behavior.
*
* The input flags are the following:
*
* ZADD_INCR: Increment the current element score by 'score' instead of updating
* the current element score. If the element does not exist, we
* assume 0 as previous score.
* ZADD_NX: Perform the operation only if the element does not exist.
* ZADD_XX: Perform the operation only if the element already exist.
* ZADD_GT: Perform the operation on existing elements only if the new score is
* greater than the current score.
* ZADD_LT: Perform the operation on existing elements only if the new score is
* less than the current score.
*
* When ZADD_INCR is used, the new score of the element is stored in
* '*newscore' if 'newscore' is not NULL.
*
* The returned flags are the following:
*
* ZADD_NAN: The resulting score is not a number.
* ZADD_ADDED: The element was added (not present before the call).
* ZADD_UPDATED: The element score was updated.
* ZADD_NOP: No operation was performed because of NX or XX.
*
* Return value:
*
* The function returns 1 on success, and sets the appropriate flags
* ADDED or UPDATED to signal what happened during the operation (note that
* none could be set if we re-added an element using the same score it used
* to have, or in the case a zero increment is used).
*
* The function returns 0 on error, currently only when the increment
* produces a NAN condition, or when the 'score' value is NAN since the
* start.
*
* The command as a side effect of adding a new element may convert the sorted
* set internal encoding from listpack to hashtable+skiplist.
*
* Memory management of 'ele':
*
* The function does not take ownership of the 'ele' SDS string, but copies
* it if needed. */
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
/* Turn options into simple to check vars. */
int incr = (in_flags & ZADD_IN_INCR) != 0;
int nx = (in_flags & ZADD_IN_NX) != 0;
int xx = (in_flags & ZADD_IN_XX) != 0;
int gt = (in_flags & ZADD_IN_GT) != 0;
int lt = (in_flags & ZADD_IN_LT) != 0;
*out_flags = 0; /* We'll return our response flags. */
double curscore;

/* NaN as input is an error regardless of all the other parameters. */
if (isnan(score)) {
*out_flags = ZADD_OUT_NAN;
return 0;
}

/* Update the sorted set according to its encoding. */ // 判断编码方式,如果是ListPack
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char *eptr;

// 判断当前元素是否已经存在,已经存在
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}

/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*out_flags |= ZADD_OUT_NAN;
return 0;
}
}

/* GT/LT? Only update if score is greater/less than current. */
if ((lt && score >= curscore) || (gt && score <= curscore)) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}

if (newscore) *newscore = score;

/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*out_flags |= ZADD_OUT_UPDATED;
}
return 1;
} else if (!xx) {
// 如果元素不存在
/* check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
// 判断新增1个元素之后,ziplist长度,如果大于zset_max_listpack_entries,或者值大于zset_max_listpack_value
if (zzlLength(zobj->ptr)+1 > server.zset_max_listpack_entries ||
sdslen(ele) > server.zset_max_listpack_value ||
!lpSafeToAdd(zobj->ptr, sdslen(ele)))
{
// 转换成SkipList
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
} else {
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (newscore) *newscore = score;
*out_flags |= ZADD_OUT_ADDED;
return 1;
}
} else {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
}

/* Note that the above block handling listpack would have either returned or
* converted the key to skiplist. */ // 编码方式,如果是SkipList,无需转换
if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;

de = dictFind(zs->dict,ele);
if (de != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}

curscore = *(double*)dictGetVal(de);

/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*out_flags |= ZADD_OUT_NAN;
return 0;
}
}

/* GT/LT? Only update if score is greater/less than current. */
if ((lt && score >= curscore) || (gt && score <= curscore)) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}

if (newscore) *newscore = score;

/* Remove and re-insert when score changes. */
if (score != curscore) {
znode = zslUpdateScore(zs->zsl,curscore,ele,score);
/* Note that we did not removed the original element from
* the hash table representing the sorted set, so we just
* update the score. */
dictGetVal(de) = &znode->score; /* Update score ptr. */
*out_flags |= ZADD_OUT_UPDATED;
}
return 1;
} else if (!xx) {
ele = sdsdup(ele);
znode = zslInsert(zs->zsl,score,ele);
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
*out_flags |= ZADD_OUT_ADDED;
if (newscore) *newscore = score;
return 1;
} else {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}

ZipList本身没有排序功能,而且没有键值对的概念,因此需要实现zset,则:

  • 将score和element作为紧挨在一起的两个entry,element在前,score在后
  • score越小,越接近队首,score越大越接近队尾,按照score升序排列

image-20221210201059649

Hash

Hash结构与Redis中的Zset非常像:

  • 都是键值对
  • 都需要根据键获取值
  • 键必须唯一

区别如下:

  • zset的键是member,值是score,hash键和值都是任意值
  • zset要根据score排序,hash则无需排序

因此,Hash底层采用的编码与Zset基本一致,只需要把排序有关的SkipList去掉即可。

  • Hash结构默认采用ZipList编码(Redis 7改成ListPack),用以节省内存。ZipList中相邻的两个entry分别保存fieldvalue
  • 当数据量较大时,Hash结构会转换为HT编码,也就是Dict,条件有两个:
    1. ZipList中元素个数超过hash-max-ziplist-entries(默认512),Redis 7为 hash-max-listpack-entries
    2. ZipList中任意entry大小长度超过hash-max-ziplist-value(默认64Byte),Redis 7 为 hash-max-listpack-value

ZipList的方式:

image-20221210201749136

Hash的方式:

image-20221210201815252

src/t_hash.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
void hsetCommand(client *c) {
int i, created = 0;
robj *o;

if ((c->argc % 2) == 1) {
addReplyErrorArity(c);
return;
}

// 判断hash的key是否存在,不存在则创建一个新的,默认采用ListPack编码
if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
// 判断是否需要将ListPack转为Dict
hashTypeTryConversion(o,c->argv,2,c->argc-1);

for (i = 2; i < c->argc; i += 2)
// 这里循环遍历命令中的键值,一个一个插入
created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);

/* HMSET (deprecated) and HSET return value is different. */
char *cmdname = c->argv[0]->ptr;
if (cmdname[1] == 's' || cmdname[1] == 'S') {
/* HSET */
addReplyLongLong(c, created);
} else {
/* HMSET */
addReply(c, shared.ok);
}
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
server.dirty += (c->argc - 2)/2;
}

robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
// 查询key
robj *o = lookupKeyWrite(c->db,key);
// 判断类型,如果是Hash,则返回
if (checkType(c,o,OBJ_HASH)) return NULL;
// 如果没找到,则创建Hash
if (o == NULL) {
o = createHashObject();
dbAdd(c->db,key,o);
}
return o;
}

/* Check the length of a number of objects to see if we need to convert a
* listpack to a real hash. Note that we only check string encoded objects
* as their string length can be queried in constant time. */
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
int i;
size_t sum = 0;
// 先判断是不是ListPack
if (o->encoding != OBJ_ENCODING_LISTPACK) return;

// 依次遍历添加进来的元素中的field和value
for (i = start; i <= end; i++) {
if (!sdsEncodedObject(argv[i]))
continue;
size_t len = sdslen(argv[i]->ptr);
// 判断长度 如果大于 hash_max_listpack_value ,则将类型转换成HT
if (len > server.hash_max_listpack_value) {
hashTypeConvert(o, OBJ_ENCODING_HT);
return;
}
sum += len;
}
// 如果 总大小,超过1G,也转为HT
if (!lpSafeToAdd(o->ptr, sum))
hashTypeConvert(o, OBJ_ENCODING_HT);
}

/* Add a new field, overwrite the old with the new value if it already exists.
* Return 0 on insert and 1 on update.
*
* By default, the key and value SDS strings are copied if needed, so the
* caller retains ownership of the strings passed. However this behavior
* can be effected by passing appropriate flags (possibly bitwise OR-ed):
*
* HASH_SET_TAKE_FIELD -- The SDS field ownership passes to the function.
* HASH_SET_TAKE_VALUE -- The SDS value ownership passes to the function.
*
* When the flags are used the caller does not need to release the passed
* SDS string(s). It's up to the function to use the string to create a new
* entry or to free the SDS string before returning to the caller.
*
* HASH_SET_COPY corresponds to no flags passed, and means the default
* semantics of copying the values if needed.
*
*/
#define HASH_SET_TAKE_FIELD (1<<0)
#define HASH_SET_TAKE_VALUE (1<<1)
#define HASH_SET_COPY 0
int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = 0;

/* Check if the field is too long for listpack, and convert before adding the item.
* This is needed for HINCRBY* case since in other commands this is handled early by
* hashTypeTryConversion, so this check will be a NOP. */
// 判断编码是ListPack
if (o->encoding == OBJ_ENCODING_LISTPACK) {
// 判断长度是否大于 hash_max_listpack_value ,或者值大于 hash_max_listpack_value
if (sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value)
hashTypeConvert(o, OBJ_ENCODING_HT);
}

if (o->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char *zl, *fptr, *vptr;

zl = o->ptr;
fptr = lpFirst(zl);
if (fptr != NULL) {
fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
if (fptr != NULL) {
/* Grab pointer to the value (fptr points to the field) */
vptr = lpNext(zl, fptr);
serverAssert(vptr != NULL);
update = 1;

/* Replace value */
zl = lpReplace(zl, &vptr, (unsigned char*)value, sdslen(value));
}
}

if (!update) {
/* Push new field/value pair onto the tail of the listpack */
zl = lpAppend(zl, (unsigned char*)field, sdslen(field));
zl = lpAppend(zl, (unsigned char*)value, sdslen(value));
}
o->ptr = zl;

/* Check if the listpack needs to be converted to a hash table */
// 插入之后,再次校验长度是否大于hash_max_listpack_entries
if (hashTypeLength(o) > server.hash_max_listpack_entries)
hashTypeConvert(o, OBJ_ENCODING_HT);
} else if (o->encoding == OBJ_ENCODING_HT) {
dictEntry *de = dictFind(o->ptr,field);
if (de) {
sdsfree(dictGetVal(de));
if (flags & HASH_SET_TAKE_VALUE) {
dictGetVal(de) = value;
value = NULL;
} else {
dictGetVal(de) = sdsdup(value);
}
update = 1;
} else {
sds f,v;
if (flags & HASH_SET_TAKE_FIELD) {
f = field;
field = NULL;
} else {
f = sdsdup(field);
}
if (flags & HASH_SET_TAKE_VALUE) {
v = value;
value = NULL;
} else {
v = sdsdup(value);
}
dictAdd(o->ptr,f,v);
}
} else {
serverPanic("Unknown hash encoding");
}

/* Free SDS strings we did not referenced elsewhere if the flags
* want this function to be responsible. */
if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
return update;
}

src/object.c

1
2
3
4
5
6
7
8
9
robj *createHashObject(void) {
// 创建Hash,申请内存,默认使用ListPack编码的对象
unsigned char *zl = lpNew(0);
// 编码方式是Hash
robj *o = createObject(OBJ_HASH, zl);
// 设置编码是ListPack
o->encoding = OBJ_ENCODING_LISTPACK;
return o;
}

Redis网络模型

用户空间和内核空间

服务器大多采用Linux系统,内核也是Linux,应用都是需要通过Linux内核与硬件交互。

image-20221210203200225

因此,服务器在运行过程中,内核也需要使用到一定的CPU资源和内存资源。

为了避免用户应用导致冲突甚至内核崩溃,用户应用于内核是分离的:

  • 进程的寻址空间会划分为两部分:内核空间、用户空间

    应用或者说内核是无法直接访问物理内存的,而是通过操作虚拟内存空间,映射到物理内存。虚拟内存的地址可以通过一个无符号整形指定,不同位数操作系统的虚拟内存范围不一样,例如32位操作系统最大虚拟内存地址是2^32,也就是4GB。

    内核空间和用户空间的划分,是使用高位的1GB划分为内核空间,地位的3GB划分为用户空间。

    当进程运行在用户空间时,则称为是用户态;

    当进程运行在内核空间时,则称为是内核态。

    image-20221210203828934

  • 用户空间只能执行受限的命令(Ring3),而且不能直接调用系统资源,必须通过内核提供的接口来访问

  • 内核空间可以执行特权命令(Ring0),调用一切系统资源

例如使用IO资源,就会经过内核态到用户态(读取)、用户态到内核态(写入)的切换。inux系统为了提高IO效率,会在用户空间和内核空间都加入缓冲区:

  • 写数据时,要把用户缓冲区的数据,拷贝到内核缓冲区,然后写入设别
  • 读数据时,要把设备的数据,读取到内核缓冲区,然后拷贝到用户缓冲区

image-20221210204124936

也就是说,应用处理数据的性能,可以从这两个方面进行优化:

  1. 减少数据从用户态到内核态的切换
  2. 减少数据在网络栈中的等待时间

为了在这两个方面进行优化,操作系统实现五种IO模型。

五种IO模型

image-20221210204444011

例如读取数据的过程:

  1. 应用无法直接操作硬件设备,只能通过内核的接口进行调用,因此需要先等待数据就绪
  2. 内核缓冲区数据就绪之后,将内核缓冲区的数据读取到用户缓冲区

针对这种情况,在《UNIX网络编程》一书中,总结归纳了五种IO模型:

  • 阻塞IO(Blocking IO)
  • 非阻塞IO(Nonblocking IO)
  • IO多路复用(IO Multiplexing)
  • 信号驱动IO(Signal Driven IO)
  • 异步IO(Asynchronous IO)

阻塞IO

阻塞IO在两个阶段(读取数据时等待内核缓冲区数据就绪;拷贝数据时等待数据拷贝就绪),都必须阻塞等待

image-20221210204855343

在等待内核缓冲区数据就绪的过程中,即使内核返回没有数据,阻塞IO也不会报错,而是一直等待数据就位。

非阻塞IO

非阻塞IO的recvfrom操作,会立即返回结果,而不是阻塞用户进程。

image-20221210210608387

在内核返回没有数据时,系统调用过程直接返回一个失败的信息,并且再次重试。在等待内核缓冲区数据就绪的过程中,非阻塞IO不会阻塞。

一方面,在等待数据的过程中,循环获取内核缓冲区状态,还是会消耗CPU资源,而且数据就绪时,从内核拷贝数据到用户空间的过程,还是处于阻塞状态。因此,非阻塞IO不一定会比阻塞IO要好。

IO多路复用

无论是阻塞IO还是非阻塞IO,用户应用在一阶段都需要调用revcfrom来获取数据,差别在于无数据时的处理方案:

  • 如果调用revcfrom时,恰好没有数据,阻塞IO会使进程阻塞,非阻塞IO使CPU空转,都不能充分发挥CPU的作用
  • 如果调用revcfrom时,恰好有数据,则用户进程可以直接进入第二阶段,读取并处理数据

比如服务端处理客户端Socket请求时,在单线程情况下,只能一次处理每一个Socket,如果正在处理Socket恰好未就绪(数据不可读或不可写),线程就会被阻塞,所有其他客户端Socket都必须等待,性能自然会很差。

就像服务员给顾客点餐,分两步:

image-20221210211253081

  1. 顾客思考要吃什么(等待数据就绪)
  2. 顾客想好了,开始点餐(读取数据)

提升效率的方法:

  1. 增加更多服务员;也就是使用多线程,使用多颗CPU
  2. 不排队,谁想好了吃什么,服务员就给谁点餐;也就是数据就绪之后,用户应用就去读取数据

使用方案2会更好一点,但是有一个前提,就是用户进程需要知道内核中缓冲数据是否已经就绪。

文件描述符(File Descriptor):简称FD,是一个从0开始递增的无符号整数,用来关联Linux中的一个文件。

在Linux中,一切皆文件,例如常规文件、视频、硬件设备等,也包括网络套接字(Socket)

IO多路复用:是利用单个线程来同时监听多个FD,并在某个FD可读、可写时得到通知,从而避免无效的等待,充分利用CPU资源。

image-20221210211744633

相比阻塞IO和非阻塞IO而言,使用图中的SELECT模式,可以一次性监听多个FD,有一个FD就绪,则立马返回,然后进入阶段二。

监听FD的方式、通知的方式又有多种实现,常见的有:

  • select
  • poll
  • epoll

差异:

  • select和poll,只会通知用户进程有FD就绪,但不确定具体是哪个FD,需要用户进程遍历FD来确认
  • epoll则会通知用户进程FD就绪的同事,把已就绪的FD写入用户空间

IO多路复用-select

select是Linux中最早的IO多路复用实现方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 定义类型别名 __fd_mask,本质是 long int ,占4个字节,32bit
typedef long int __fd_mask;

// fd_set 记录要监听的fd集合,及其对应状态
typedef struct {
// fds_bits 是一个long类型数组,长度是 1024/32 = 32
// 长度32*每一个元素的长度是32bit,等于1024bit,一个bit记录一个FD,也就是一个select最多可以监听1024个FD
// 一共 1024个bit位,每个bit代表一个fd,0代表未就绪,1代表就绪
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
// ...
} fd_set;

// select 函数,用于监听多个fd的集合
int select(
int nfds, // 要监听的fd_set的最大fd+1
// Linux将IO的事件归为三类,可读、可写、异常,因此,fd状态也分为三类
fd_set *readfds, // 要监听读事件的fd集合
fd_set *writefds, // 要监听写事件的fd集合
fd_set *exceptfds, // 要监听异常事件的fd集合
struct timeval *timeout // 超时时间:null为永不超时;0为不阻塞等待;大于0为等待的时间
)

整个过程如下:

image-20221210214947102

其中:

  • 需要将fd_set从用户空间拷贝到内核空间
  • 当有任意fd就绪,则内核空间将结果写入fd_set到,而且是便利写入,将没有就绪的fd覆盖掉
  • 再将内核空间的fd_set拷贝到用户空间,并且覆盖原先的fd_set
  • 用户空间便利fd_set获取到准备就绪的fd,进行处理

select模式存在的问题:

  • 需要将整个fd_set从用户空间拷贝到内核空间,select结束还要再次拷贝回用户空间;这是影响select性能的重要原因
  • select无法得知具体是哪个fd就绪,需要便利整个fd_set(结构体中记录的bit位)
  • fd_set监听的fd数量不能超过1024(代码已经固化)

IO多路复用-poll

poll模式对select模式做了简单改进,但性能提升不明显,部分关键代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// pollfd 中的事件类型
#define POLLIN 0x0001 // 可读事件
#define POLLPRI 0x0002
#define POLLOUT 0x0004 // 可写事件
#define POLLERR 0x0008 // 错误事件
#define POLLHUP 0x0010
#define POLLNVAL 0x0020 // fd未打开

// pollfd结构
struct pollfd {
int fd; // 要监听的fd
short events; // 要监听的事件类型:读、写、异常
short revents; // 实际发生的事件类型
};

// poll函数
int poll(
struct pollfd *fds, // pollfd数组,可以自定义大小
nfds_t nfds, // 数组元素个数
int timeout // 超时时间
)

IO流程:

  1. 创建pollfd数组,向其中添加关注的fd信息,数组大小自定义
  2. 调用poll函数,将pollfd数组拷贝到内核空间,转链表存储,无上限
  3. 内核便利fd,判断是否就绪
  4. 数据就绪或超时后,拷贝pollfd数据到用户空间,返回就绪fd数量n
  5. 用户进程判断n是否大于0
  6. 大于0则便利pollfd数组,找到就绪的fd

与select对比:

  • select模式中fd_set大小固定1024,而pollfd在内核中采用链表,理论上无上限
  • 监听fd越多,每次便利消耗时间也越久,性能反而会下降

主要解决了select的fd个数限制问题,但是依然需要便利fd,也需要将fd进行拷贝。

IO多路复用-epoll

源码:https://github.com/liweiwang1993/eventpoll/blob/master/eventpoll.cpp

epoll模式是对select和poll的改进,提供三个函数:

1
2
3
4
5
6
7
8
struct eventpoll {
// 截取部分
/* List of ready file descriptors */
struct list_head rdllist; // 一个链表,记录就绪的FD

/* RB tree root used to store monitored fd structs */
struct rb_root rbr; // 一颗红黑树,记录要监听的FD
};
1
2
// 1. 会在内核创建eventpoll结构体,返回对应的句柄epfd
int epoll_create(int size);
1
2
3
4
5
6
7
8
// 2. 将一个FD添加到epoll的红黑树中,并设置ep_poll_callback
// callback触发时,就把对应的FD加入到rdlist这个就绪列表中
epoll_ctl(
int epfd, // epoll实例的句柄
int op, // 要执行的操作,包括:增ADD、改MOD、删DEL
int fd, // 要监听的FD
struct epoll_event *event // 要监听的事件类型:读、写、异常等
);
1
2
3
4
5
6
7
// 3. 检查rdlist列表是否为空,不为空则返回就绪FD的数量
int epoll_wait(
int epfd, // eventpoll实例的句柄
struct epoll_event *events, // 空event数组,用于接受就绪的FD
int maxevents, // events数组的最大长度
int timeout // 超时时间;-1永不超时;0不阻塞;大于0位阻塞时间
);

image-20221210221442752

epoll通过在内核空间的红黑树和链表,不需要拷贝fd,而且红黑树也没有fd个数限制,数量很多的时候也不会对红黑树上增、删、查找速度造成很大影响。通过将就绪fd拷贝到events中减少fd拷贝,epoll_wait只处理就绪的fd链表,不需要遍历fd,解决select和poll的问题。

IO多路复用-事件通知机制

当FD有数据可读时,调用epoll_wait就可以得到通知,但是时间通知的模式有两种:

  • LevelTriggered:简称LT。当FD有数据可读时,会重复通知多次,直到数据处理完成。是epoll的默认模式。
  • EdgeTriggered:简称ET。当FD有数据可读时,只会被通知一次,不管数据是否处理完成。

例如:

  1. 一个客户端Socket对应的FD已经注册到了epoll实例中
  2. 客户端Socket发送2KB的数据
  3. 服务端调用epoll_wait,得到通知说FD就绪
  4. 服务端FD读取到1KB数据
  5. 如果是LT,回到步骤3,再次调用epoll_wait,形成循环
  6. 如果是ET,另外1KB的数据就无法被读取到

其实是在epoll过程中,当有fd就绪,链表中的fd拷贝到用户态之后,这些FD会从链表中删除,然后检查通知模式,如果是LT,则会将未处理完的FD添加回链表中,如果是ET,则不会。

如果选择ET,出现这种情况时,

方案1:可以通过手动调用epoll_ctl检查数据是否读完,没有读完,再手动将数据添加到链表中;

方案2:再步骤4时,使用循环读取,数据没有读取完,不返回;(这种方式需要注意,需要使用非阻塞的方式)

LT的方式可能出现的问题:

1、重复发送会有性能影响

2、多个线程处理情况下,可能导致惊群现象

IO多路复用在web服务上的例子

基于epoll模式的web服务的基本流程图:

image-20221210223240610

信号驱动IO

信号驱动IO使与内核建立SIGIO的信号关联并设置回调,当内核有FD就绪时,会发出SIGIO信号通知用户,期间用户应用可以执行其他业务,无需阻塞等待。

image-20221210223454987

这种模式,才是真正的非阻塞模式,因为根本不需要等待内核数据就绪,而是通过回调的形式,内核通知用户进程。

这种模式存在的问题:

  1. 当有大量IO操作时,信号较多,SIGIO处理函数不能及时处理,可能导致信号队列溢出
  2. 内核空间与用户空间的频繁信号交互,性能也比较低

异步IO

异步IO的整个过程都是非阻塞的。通过一个注册一个函数到内核空间,在内核空间中的数据就绪,并且拷贝到用户空间之后,执行这个函数通知用户进程去处理数据。

image-20221210223732777

这种模式,可能出现的问题:

  1. 当数据量很大,占用内存过多时,性能不高,而且可能导致函数调用发生异常,

区分同步和异步

IO操作时同步还是异步,关键看数据在内核空间与用户空间的拷贝过程(数据读写的IO操作),也就是阶段二是同步还是异步:

image-20221210224114125

Redis网络模型

Redis的核心业务部分,也就是命令处理,此时Redis是单线程。但是整个Redis是多线程。

在Redis 4.0时,引入多线程异步处理一些耗时较长的任务,例如异步删除命令unlinkbgsaverewrite

在Redis 6.0时,核心网络模型中引入多线程,进一步提高对于多个CPU的利用率

Redis核心业务使用单线程的原因:

  • Redis是纯内存操作,执行速度非常快,性能瓶颈时网络延迟,而不是执行速度,因此多线程并不会带来巨大的性能提升
  • 多线程会导致过多的上下文切换,带来不必要的开销
  • 引入多线程会面临线程安全问题,必然要引入线程锁这样的安全手段,实现复杂度增高,而且性能也会大打折扣

Redis通过IO多路复用来提高网络性能,并且支持各种不同的多路复用实现,并且将这些实现进行封装,提供统一的高性能事件库API库AE:

以6.2版本为例

image-20221210224832286

image-20221210224908284

实际调用的IO多路复用方式

src/ae.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT // 判断是否支持HAVE_EVPORT,如果支持,调用ae_evport.c,下面的代码同理
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL // 判断是否支持HAVE_EPOLL,如果支持,调用ae_epoll.c,
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE // 判断是否支持HAVE_KQUEUE,如果支持,调用ae_kqueue.c,
#include "ae_kqueue.c"
#else // 如果以上都不支持,就调用ae_select.c
#include "ae_select.c"
#endif
#endif
#endif

Redis网络模型的流程:

image-20221210225515989

image-20221210225539093

单线程流程:

image-20221210225853658

Redis 6.0版本引入多线程,目的是为了提高IO读写效率。因此在解析客户端命令、写响应结果时采用了多线程。核心的命令执行、IO多路复用模块,依然是由主线程执行。

多线程流程:

image-20221210225730118

image-20221210225616051

image-20221210225655449

Redis通信协议

RESP协议

Redis是一个CS架构的软件,通信一般分两步(不包括pipeline和PubSub)

  1. 客户端向服务端发送一条命令
  2. 服务端解析并执行命令,返回相应结果给客户端

因此客户端发送命令的格式、服务端响应结果的格式,必须有一个规范,这个规范就是通信协议。

Redis中采用RESP(Redis Serialization Protocl)协议:

  • Redis 1.2版本引入RESP协议
  • Redis 2.0版本中成为与Redis服务端通信的标准,称为RESP2
  • Redis 6.0版本中,从RESP2升级到RESP3协议,增加了更多数据类型并且支持6.0的新特性–客户端缓存。

这里以RESP2为例。

数据类型

RESP中,通过首字节的字符来区分不同的数据类型,常用的数据类型包括5中:

  • 单行字符串:首字节是+,后面跟上单行字符串,以CRLF(\r\n),例如返回 OK : +OK\r\n

  • 错误(Errors):首字节是-,与单行字符串格式一样,只是字符串是异常信息,例如:-Error message\r\n

  • 数值:首字节是:,后面跟上数字格式的字符串,以CRLF结尾,例如::10\r\n

  • 多行字符串:首字节是$,表示二进制安全的字符串,最大支持512MB:

    • 如果大小为0,则代表空字符串:$0\r\n\r\n
    • 如果大小为-1,则代表不存在:$-1\r\n
  • 数组:首字节是*,后面跟上数组元素个数,再跟上元素,元素数据类型不限

    image-20221210230629790

Redis内存策略

单节点Redis内存大小不易过大,会影响持久化或主从同步性能。

可以通过修改配置来设置Redis最大内存:

1
2
3
127.0.0.1:6379> CONFIG GET *maxmemory*
1) "maxmemory"
2) "0"

当内存使用达到上限时,就无法存储更多数据。

内存过期

通过ttl获取数据过期时间,当数据过期时,则删除该数据

1
2
3
4
5
6
7
8
127.0.0.1:6379> set name mitaka        // 设置key value
OK
127.0.0.1:6379> EXPIRE name 5 // 设置过期时间 5s
(integer) 1
127.0.0.1:6379> get name // 立即获取
"mitaka"
127.0.0.1:6379> get name // 过5s之后再获取
(nil)

可以看到,当key的TTL到期之后,再次访问这个key,返回nil,说明这个key已经不存在了,对应的内存也得到释放,从而起到内存回收的目的。

Redis中,所有的key、value都保存在Dict结构中,不过在其database结构体中,有两个Dict:一个用来记录key-value;另一个用来记录key-ttl

src/server.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
// 存放所有key及value的地方,也称为keyspace
dict *dict; /* The keyspace for this DB */
// 存放每一个key及其对应的TTL存活时间,只包含设置了TTL的key
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
// 记录平均TTL时长
long long avg_ttl; /* Average TTL, just for stats */
// expire检查时,在dict中抽样的索引位置
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
// 等待碎片整理的key列表
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

image-20221210231512045

Redis通过两个Dict分别记录key-value对,以及key-ttl对,直到一个key是否过期

但是TTL到期的不会立即删除,而是惰性删除或者周期删除

惰性删除

在访问一个key的时候,检查key的存储时间,如果已经过期,才执行删除

src/db.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/* Lookup a key for write operations, and as a side effect, if needed, expires
* the key if its TTL is reached.
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
// 查找一个key执行写操作
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
// 检查是否过期
expireIfNeeded(db,key);
return lookupKey(db,key,flags);
}

/* Lookup a key for read operations, or return NULL if the key is not found
* in the specified DB.
*
* As a side effect of calling this function:
* 1. A key gets expired if it reached it's TTL.
* 2. The key last access time is updated.
* 3. The global keys hits/misses stats are updated (reported in INFO).
* 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
*
* This API should not be used when we write to the key after obtaining
* the object linked to the key, but only for read only operations.
*
* Flags change the behavior of this command:
*
* LOOKUP_NONE (or zero): no special flags are passed.
* LOOKUP_NOTOUCH: don't alter the last access time of the key.
*
* Note: this function also returns NULL if the key is logically expired
* but still existing, in case this is a slave, since this API is called only
* for read operations. Even if the key expiry is master-driven, we can
* correctly report a key is expired on slaves even if the master is lagging
* expiring our key via DELs in the replication link. */
// 查找一个key执行读取操作
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;
// 检查key是否过期
if (expireIfNeeded(db,key) == 1) {
/* If we are in the context of a master, expireIfNeeded() returns 1
* when the key is no longer valid, so we can return NULL ASAP. */
if (server.masterhost == NULL)
goto keymiss;

/* However if we are in the context of a slave, expireIfNeeded() will
* not really try to expire the key, it only returns information
* about the "logical" status of the key: key expiring is up to the
* master in order to have a consistent view of master's data set.
*
* However, if the command caller is not the master, and as additional
* safety measure, the command invoked is a read-only command, we can
* safely return NULL here, and provide a more consistent behavior
* to clients accessing expired values in a read-only fashion, that
* will say the key as non existing.
*
* Notably this covers GETs when slaves are used to scale reads. */
if (server.current_client &&
server.current_client != server.master &&
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY)
{
goto keymiss;
}
}
val = lookupKey(db,key,flags);
if (val == NULL)
goto keymiss;
server.stat_keyspace_hits++;
return val;

keymiss:
if (!(flags & LOOKUP_NONOTIFY)) {
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
}
server.stat_keyspace_misses++;
return NULL;
}

/* This function is called when we are going to perform some operation
* in a given key, but such key may be already logically expired even if
* it still exists in the database. The main way this function is called
* is via lookupKey*() family of functions.
*
* The behavior of the function depends on the replication role of the
* instance, because slave instances do not expire keys, they wait
* for DELs from the master for consistency matters. However even
* slaves will try to have a coherent return value for the function,
* so that read commands executed in the slave side will be able to
* behave like if the key is expired even if still present (because the
* master has yet to propagate the DEL).
*
* In masters as a side effect of finding a key which is expired, such
* key will be evicted from the database. Also this may trigger the
* propagation of a DEL/UNLINK command in AOF / replication stream.
*
* The return value of the function is 0 if the key is still valid,
* otherwise the function returns 1 if the key is expired. */
// 判断是否过期
int expireIfNeeded(redisDb *db, robj *key) {
// key过期,则直接返回0
if (!keyIsExpired(db,key)) return 0;

/* If we are running in the context of a slave, instead of
* evicting the expired key from the database, we return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return 1;

/* If clients are paused, we keep the current dataset constant,
* but return to the client what we believe is the right state. Typically,
* at the end of the pause we will properly expire the key OR we will
* have failed over and the new primary will send us the expire. */
if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;

/* Delete the key */
// 删除过期key
deleteExpiredKeyAndPropagate(db,key);
return 1;
}

/* Check if the key is expired. */
int keyIsExpired(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);
mstime_t now;

if (when < 0) return 0; /* No expire for this key */

/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;

/* If we are in the context of a Lua script, we pretend that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
if (server.lua_caller) {
now = server.lua_time_snapshot;
}
/* If we are in the middle of a command execution, we still want to use
* a reference time that does not change: in that case we just use the
* cached time, that we update before each call in the call() function.
* This way we avoid that commands such as RPOPLPUSH or similar, that
* may re-open the same key multiple times, can invalidate an already
* open object in a next call, if the next call will see the key expired,
* while the first did not. */
else if (server.fixed_time_expire > 0) {
now = server.mstime;
}
/* For the other cases, we want to use the most fresh time we have. */
else {
now = mstime();
}

/* The key expired if the current (virtual or real) time is greater
* than the expire time of the key. */
return now > when;
}

但是,当一个key已经过期,但是很长时间都没有访问,此时这个key无法被删除。

周期删除

通过一个定时任务,周期性的抽样部分过期的key,然后执行删除。执行周期有两种:

  • Redis设置一个定时任务serverCron(),按照server.hz的频率来执行过期key清理,模式为SLOW
  • Redis的每个事件循环前会调用beforeSleep()函数,执行过程key清理,模式为FAST

src/server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
void initServer(void) {
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
// 创建定时器,关联回调函数serverCron,处理周期取决于server.hz,默认10
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
// ...
}

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* We have just LRU_BITS bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock.
*
* Note that even if the counter wraps it's not a big problem,
* everything will still work but some object will appear younger
* to Redis. However for this to happen a given object should never be
* touched for all the time needed to the counter to wrap, which is
* not likely.
*
* Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */
// 更新lrulock到当前时间,为后期的LRU和LFU做准备
unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);

/* Handle background operations on Redis databases. */
// 执行database的数据清理,例如过期key处理
databasesCron();

return 1000/server.hz;
// ...
}

/* This function handles 'background' operations we are required to do
* incrementally in Redis databases, such as active key expiring, resizing,
* rehashing. */
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
// 清理过期key
if (server.active_expire_enabled) {
if (iAmMaster()) {
// 使用SLOW模式
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
// ...
}

/* This function gets called every time Redis is entering the
* main loop of the event driven library, that is, before to sleep
* for ready file descriptors.
*
* Note: This function is (currently) called from two functions:
* 1. aeMain - The main server loop
* 2. processEventsWhileBlocked - Process clients during RDB/AOF load
*
* If it was called from processEventsWhileBlocked we don't want
* to perform all actions (For example, we don't want to expire
* keys), but we do need to perform some actions.
*
* The most important is freeClientsInAsyncFreeQueue but we also
* call some other low-risk functions. */
void beforeSleep(struct aeEventLoop *eventLoop) {
/* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */
// 尝试清理部分过期key,清理模式默认为FAST
if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
// ...
}

src/ae.c

1
2
3
4
5
6
7
8
9
10
11
12
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// beforeSleep() --> FAST 模式清理
// n = aeApiPoll()
// 如果 n > 0 ,FD就绪,处理IO事件
// 如果到了执行时间,则调用serverCron() --> SLOW 模式清理
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}

SLOW模式规则,相比FAST而言,特点是频率少,耗时长:

  1. 执行频率受server.hz影响,默认为10,即每秒执行10次,每个执行周期100ms
  2. 执行清理耗时不超过一次执行周期的25%
  3. 逐个遍历db,逐个遍历db中的bucket,抽取20个key判断是否过期
  4. 如果没达到时间上限(25ms)并未过期key比例大于10%,再进行一次抽样,否则结束

FAST模式规则(过期key比例小于10%不执行),相比SLOW而言,特点是频率多,耗时短:

  1. 执行频率受beforeSleep()调用频率影响,但两次FAST模式间隔不低于2ms
  2. 执行清理耗时不超过1ms
  3. 逐个遍历db,逐个遍历db中的bucket,抽取20个key判断是否过期
  4. 如果没达到时间上限(1ms)并且过期key比例大于10%,再进行一次抽样,否则结束

淘汰策略

当Redis内存达到设置的阈值时,Redis主动挑选部分key删除以释放更多内存的流程,就是内存淘汰

src/server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* If C_OK is returned the client is still alive and valid and
* other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
/* Handle the maxmemory directive.
*
* Note that we do not want to reclaim memory if we are here re-entering
* the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the
* propagation of DELs due to eviction. */
// 尝试进行内存淘汰 performEvictions,如果失败,则会有 out_of_memory
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = (performEvictions() == EVICT_FAIL);
// 当 out_of_memory 时,会拒绝命令
if (out_of_memory && reject_cmd_on_oom) {
rejectCommand(c, shared.oomerr);
return C_OK;
}

// ...
}

也就是说:

  • Redis在执行任何命令之前,都会检查内存是否超过阈值
  • 如果超过,则进行内存淘汰
  • 如果内存淘宝失败,则返回报错

Redis支持8中不同策略来选择要删除的key,通过修改配置 maxmemory-policy

  • noeviction:不淘汰任何key,但是内存满时不允许写入新数据,默认
  • volatile-ttl:对设置了ttl的key,比较key的剩余ttl值,越小越先被淘汰
  • allkeys-random:对全体key,随机进行淘汰。也就是从db.dict中随机淘汰
  • volatile-random:对设置了ttl的key,随机进行淘汰。也就是从db.expires中随机挑选
  • allkeys-lru:对全体key,基于LRU算法进行淘汰
  • volatile-lru:对设置了TTL的key,基于LRU算法进行淘汰
  • allkeys-lfu:对全体key,基于LFU算法进行淘汰
  • volatile-lfu:对设置了TTL的key,基于LFU算法进行淘汰

LRU(Least Recently Used):最少最近使用。当前时间减去最后一次访问时间,这个值越大,则淘汰优先级越高

LFU(Least Frequently Used):最少频率使用。会统计每个key的访问频率,值越小淘汰优先级越高

Redis的数据都会被封装为RedisObject结构中

src/server.h

1
2
3
4
5
6
7
8
9
10
11
typedef struct redisObject {
unsigned type:4; // 对象类型
unsigned encoding:4; // 编码方式
// LRU:为秒为单位,记录最近一次访问时间,长度24bit
// LFU:高16位以分钟为单位记录最近一次访问时间,低8位记录逻辑访问次数
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount; // 引用计数,计数为0则可以回收
void *ptr; // 数据指针,指向真实数据
} robj;

LFU的访问次数叫做逻辑访问次数,是因为并不是每次key被访问都计数,而是通过运算:

  1. 生成0~1之间的随机数R
  2. 计算1/(旧次数*lfu_log_factor + 1),记录为P,lfu_log_factor默认为10
  3. 如果R<P,则计数器+1,且最大不超过255(因此,访问频率不会超过255,使用一个较小的数来记录)
  4. 访问次数会随时间衰减,距离上一次访问时间间隔lfu_decay_time分钟(默认1),计数器-1(为了防止访问频率很大之后,很长时间没有访问不会被回收的情况)

src/evict.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* Check that memory usage is within the current "maxmemory" limit.  If over
* "maxmemory", attempt to free memory by evicting data (if it's safe to do so).
*
* It's possible for Redis to suddenly be significantly over the "maxmemory"
* setting. This can happen if there is a large allocation (like a hash table
* resize) or even if the "maxmemory" setting is manually adjusted. Because of
* this, it's important to evict for a managed period of time - otherwise Redis
* would become unresponsive while evicting.
*
* The goal of this function is to improve the memory situation - not to
* immediately resolve it. In the case that some items have been evicted but
* the "maxmemory" limit has not been achieved, an aeTimeProc will be started
* which will continue to evict items until memory limits are achieved or
* nothing more is evictable.
*
* This should be called before execution of commands. If EVICT_FAIL is
* returned, commands which will result in increased memory usage should be
* rejected.
*
* Returns:
* EVICT_OK - memory is OK or it's not possible to perform evictions now
* EVICT_RUNNING - memory is over the limit, but eviction is still processing
* EVICT_FAIL - memory is over the limit, and there's nothing to evict
* */
int performEvictions(void) {
// ...
}

整个逻辑流程如下

image-20221210235351250

推荐阅读

万字图文讲透数据库缓存一致性问题

Linux内核源码

Redis源码