Golang中的本地锁和分布式锁

在多线程环境下,由于共享进程间的内存,因此同时访问该内存时,避免出现并发修改出现的数据竞态,因此需要对资源进行加锁,实现只允许一个线程(协程)同一时间访问该资源。

加锁和不加锁的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
n := 0
var wg sync.WaitGroup
// var lock sync.Mutex
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
// lock.Lock()
// defer lock.Unlock()
n++
wg.Done()
}()
}
wg.Wait()
fmt.Println(n)
}

死锁

死锁是指两个或多个进程或线程在执行过程中,因抢夺资源而造成的一种互相等待的现象,若无外力作用,它们将无法推进下去,此时系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

死锁产生的条件

  1. 互斥条件

    现成对资源的访问是排他性的,一个线程占用资源,其他线程必须处于等待状态,直到该资源被释放

  2. 请求和保持条件

    线程已经保持了一个资源的占用,但又提出使用另一个资源请求,此时该资源被其他线程占用,于是该资源既要等待该资源,也要保持原资源不释放。

  3. 不剥夺条件

    线程已获得的资源,在未使用完之前,不能被其他线程剥夺,只能在使用完以后由自己释放

  4. 环路等待条件

    死锁发生时,必然存在一个进程-资源环形链,简单的说,就是p0等待p1占用的资源,p1等待p0占用的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var value int
var mu1 sync.Mutex
var mu2 sync.Mutex
mu2.Lock() // main获取mu2的锁
go func() {
mu1.Lock() // goroutine获取mu1的锁
time.Sleep(time.Second) // 逻辑处理
mu2.Lock() // goroutine等待mu2的锁,阻塞状态
value += 100
mu2.Unlock()
mu1.Unlock()
}()
time.Sleep(2 * time.Second) // 逻辑处理
mu1.Lock() // main等待mu1的锁,阻塞
mu1.Unlock()
mu2.Unlock()
// 发生死锁
fmt.Println(value)

解决死锁的办法:

  • 如果并发查询多个表,约定访问顺序
  • 同一个事务中,尽可能做到一次锁定获取所需要的资源
  • 对于容易产生死锁的业务场景,尝试升级锁颗粒度,使用表级锁
  • 采用分布式事务锁或者使用乐观锁

分布式锁

常见的分布式锁实现方案有一下几种:

  1. 基于MySQL的悲观锁、乐观锁
  2. 基于Redis的分布式锁
  3. 基于Zookeeper的分布式锁(或者ETCD)

基于mysqlMySQL

悲观锁和乐观锁都是一种思想。

悲观锁

对于数据的处理,持悲观态度,认为只要操作对象(例如一个订单),无论操作对象有多少(订单中的商品),都会发生冲突,获取和修改数据时,别人回修改数据,因此在整个数据处理过程中,操作时都上锁。

悲观锁的实现,通常依靠数据库提供的锁机制实现,比如mysql的排他锁,select ... for update来实现悲观锁。

1
SELECT * FROM user1 WHERE id=1 FOR UPDATE;

使用方式,需要与commit结合,

1
2
3
4
SELECT @@autocommit; 
set autocommit = 0; // 当autocommit为0时,其他的FOR UPDATE语句会阻塞住
SELECT * FROM user1 WHERE id=1 FOR UPDATE;
COMMIT; // 执行完,将autocommit设置为1

需要注意:

  • for update 语句中的where条件,如果是主键,则是行锁,也就是只会锁住一行;如果不是主键,则会升级为表锁
  • 如果检索语句查询不到,查询条件是主键,则不会锁行;如果查询条件不是主键,则一样会锁表
  • 锁只会锁住for update语句,如果不是for update语句,则不会阻塞

在GORM中实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Goods struct {
ID uint
Num uint64
}

func PessimismLock(db *gorm.DB, wg *sync.WaitGroup) error {
defer wg.Done()
// 悲观锁
g := Goods{ID: 1}
// 在事务中开启begin就相当于将autocommit关闭
tx := db.Begin()
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).First(&g).Error; err != nil {
tx.Rollback()
return err
}
g.Num -= 1
if err := tx.Save(&g).Error; err != nil {
tx.Rollback()
return err
}
// commit就相当于将autocommit开启,其他进程就不会阻塞
tx.Commit()
return nil
}

乐观锁

操作对象有很多时,不上锁,而是通过版本号(或者时间戳)实现。相比而言,弱化了锁的概念,从而相比悲观锁,提升了性能。

1
2
3
select * from user where id = 1; // 获取到信息的同事,还获取到版本号

updaet user set age = age + 1 , version = version +1 where id = 1 and version =1 ; // 更新时,通过版本号查询以及更新

也就是当并发更新时,只有version正确的线程可以更新成功,其他的线程就会失败,失败的线程,则进行重试,重新执行。

在GORM中实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Goods struct {
ID uint
Num uint64
Version uint64
}

func OptimisticLock(db *gorm.DB, wg *sync.WaitGroup) error {
defer wg.Done()
g := Goods{ID: 1}
tx := db.Begin()
for {
if err := tx.First(&g).Error; err != nil {
tx.Rollback()
return err
}
if g.Num <= 0 {
tx.Rollback()
return errors.New("no goods left")
}
// 这里需要注意零值更新的问题,因此需要使用map更新
res := tx.Model(&Goods{}).Where("id = ? and version = ? ", g.ID, g.Version).Updates(map[string]interface{}{
"num": g.Num - 1,
"version": g.Version + 1,
})
// 若是更新失败,则一直重试,更新成功,则跳出循环
if res.RowsAffected != 0 {
break
}
}
tx.Commit()
return nil
}

基于Redis实现分布式锁

可以尝试手撸一个Redis的分布式锁,通过set nx ex,通过lua脚本实现获取锁的时候,获取是否是自己的锁,加上锁之后,间隔一段时间给锁续约。

也可以使用开源Redis分布式锁:redsync

1
go get github.com/go-redsync/redsync/v4

实例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
// Create a pool with go-redis (or redigo) which is the pool redisync will
// use while communicating with Redis. This can also be any pool that
// implements the `redis.Pool` interface.
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

// Create an instance of redisync to be used to obtain a mutual exclusion
// lock.
rs := redsync.New(pool)

// Obtain a new mutex by using the same name for all instances wanting the
// same lock.
mutexname := "my-global-mutex"
mutex := rs.NewMutex(mutexname)

// Obtain a lock for our given mutex. After this is successful, no one else
// can obtain the same lock (the same mutex name) until we unlock it.
if err := mutex.Lock(); err != nil {
panic(err)
}

// Do your work that requires the lock.

// Release the lock so other processes or threads can obtain a lock.
if ok, err := mutex.Unlock(); !ok || err != nil {
panic("unlock failed")
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
func main() {
newLogger := logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer(日志输出的目标,前缀和日志包含的内容——译者注)
logger.Config{
SlowThreshold: time.Second, // 慢 SQL 阈值
LogLevel: logger.Info, // 日志级别
IgnoreRecordNotFoundError: true, // 忽略ErrRecordNotFound(记录未找到)错误
Colorful: true, // 禁用彩色打印
},
)

// 参考 https://github.com/go-sql-driver/mysql#dsn-data-source-name 获取详情
db, err := gorm.Open(mysql.New(mysql.Config{
DSN: "gorm:gorm@tcp(127.0.0.1:3306)/gorm?charset=utf8mb4&parseTime=True&loc=Local", // DSN data source name
DefaultStringSize: 256, // string 类型字段的默认长度
DisableDatetimePrecision: true, // 禁用 datetime 精度,MySQL 5.6 之前的数据库不支持
DontSupportRenameIndex: true, // 重命名索引时采用删除并新建的方式,MySQL 5.7 之前的数据库和 MariaDB 不支持重命名索引
DontSupportRenameColumn: true, // 用 `change` 重命名列,MySQL 8 之前的数据库和 MariaDB 不支持重命名列
SkipInitializeWithVersion: false, // 根据当前 MySQL 版本自动配置
}), &gorm.Config{
Logger: newLogger,
NamingStrategy: schema.NamingStrategy{
TablePrefix: "my_",
},
})
if err != nil {
log.Fatal(err)
}

// Create a pool with go-redis (or redigo) which is the pool redisync will
// use while communicating with Redis. This can also be any pool that
// implements the `redis.Pool` interface.
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

// Create an instance of redisync to be used to obtain a mutual exclusion
// lock.
rs := redsync.New(pool)

wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
err := RedisSync(db, &wg, rs, 1)
if err != nil {
log.Println(err)
}
}
wg.Wait()
}

func RedisSync(db *gorm.DB, wg *sync.WaitGroup, rs *redsync.Redsync, id uint) error {
defer wg.Done()

mutexname := fmt.Sprintf("goods:id:%d", id)
// Obtain a new mutex by using the same name for all instances wanting the
// same lock.
mutex := rs.NewMutex(mutexname)

// Obtain a lock for our given mutex. After this is successful, no one else
// can obtain the same lock (the same mutex name) until we unlock it.
err := mutex.Lock()
if err != nil {
return err
}

g := Goods{ID: id}
// 在事务中开启begin就相当于将autocommit关闭
tx := db.Begin()
err = tx.First(&g).Error
if err != nil {
tx.Rollback()
goto UNLOCK
}
if g.Num <= 0 {
tx.Rollback()
err = errors.New("no goods left")
goto UNLOCK
}

g.Num -= 1
if err := tx.Save(&g).Error; err != nil {
tx.Rollback()
goto UNLOCK
}
// commit就相当于将autocommit开启,其他进程就不会阻塞
tx.Commit()

// Do your work that requires the lock.

UNLOCK:
// Release the lock so other processes or threads can obtain a lock.
if ok, err := mutex.Unlock(); !ok || err != nil {
return fmt.Errorf("something wrong: %w", err)
}

return nil
}

通过查看源码,可以看到redsync的逻辑,思想与自己实现很相似

1
2
3
4
5
6
7
8
9
// 创建客户端和资源池,通过go-redis的包内的client对象
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

// Create an instance of redisync to be used to obtain a mutual exclusion
// lock.
rs := redsync.New(pool)

创建一个分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// NewMutex returns a new distributed mutex with given name.
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
m := &Mutex{
name: name,
expiry: 8 * time.Second, // 过期时间
tries: 32, // 重试次数
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
genValueFunc: genValue,
driftFactor: 0.01, // 解决漂移时钟的问题,设置0.01s
timeoutFactor: 0.05,
quorum: len(r.pools)/2 + 1, // 仲裁,大于一半的个数
pools: r.pools,
}
for _, o := range options {
o.Apply(m)
}
return m
}

type Mutex struct {
name string // 锁名称,也就是key
expiry time.Duration // 过期时间

tries int // 重试次数
delayFunc DelayFunc // 重试间隔时间

driftFactor float64 // 解决时钟漂移的问题
timeoutFactor float64

quorum int

genValueFunc func() (string, error)
value string // 值
until time.Time

pools []redis.Pool
}

加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func (m *Mutex) Lock() error {
return m.LockContext(nil)
}

func (m *Mutex) LockContext(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}

// 随机数生成value
value, err := m.genValueFunc()
if err != nil {
return err
}

// 重试次数
for i := 0; i < m.tries; i++ {
if i != 0 {
select {
case <-ctx.Done():
// Exit early if the context is done.
return ErrFailed
case <-time.After(m.delayFunc(i)):
// Fall-through when the delay timer completes.
}
}

start := time.Now()

n, err := func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
// 加锁的核心逻辑
return m.acquire(ctx, pool, value)
})
}()
if n == 0 && err != nil {
return err
}

now := time.Now()
// driftFactor 解决时钟漂移的问题
// 当前时间,加上(超时时间扣除获取pools中的锁花费的时间,以及漂移时钟的时间)为一个过期时间节点
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
// 大于大多数,并且没有超时,则认为拿到了锁
if n >= m.quorum && now.Before(until) {
m.value = value
m.until = until
return nil
}
// 没有拿到锁,则需要将其他的加上了锁的节点上的锁释放掉
_, err = func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
// 解锁的逻辑
return m.release(ctx, pool, value)
})
}()
if i == m.tries-1 && err != nil {
return err
}
}

return ErrFailed
}

// 加锁的核心逻辑
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
// setnx 当key不存时,进行设置
reply, err := conn.SetNX(m.name, value, m.expiry)
if err != nil {
return false, err
}
return reply, nil
}

释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (m *Mutex) Unlock() (bool, error) {
return m.UnlockContext(nil)
}

// UnlockContext unlocks m and returns the status of unlock.
func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, m.value)
})
if n < m.quorum {
return false, err
}
return true, nil
}

// 用lua脚本,通过value判断是否是自己的锁
var deleteScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)

func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
status, err := conn.Eval(deleteScript, m.name, value)
if err != nil {
return false, err
}
return status != int64(0), nil
}

重置过期时间,代码中,默认没有使用重置过期时间,需要手动启用协程实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Extend resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) Extend() (bool, error) {
return m.ExtendContext(nil)
}

// ExtendContext resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
start := time.Now()
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
})
if n < m.quorum {
return false, err
}
now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
if now.Before(until) {
m.until = until
return true, nil
}
return false, ErrExtendFailed
}

// 同样使用lua脚本,实现只有自己加上的锁,才能续约
var touchScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`)

func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
status, err := conn.Eval(touchScript, m.name, value, expiry)
if err != nil {
return false, err
}
return status != int64(0), nil
}

红锁 redlock

redis是集群的形式,锁写入到某一个节点,在集群内部同步的过程中,出现某个节点异常宕机,导致锁没有同步到其他的节点,此时在其他的客户端上,可能也会获取到锁。

解决方案,是将这个分布式锁,在所有的节点上都设置上,此时所有的节点不是以集群的形式运行,而是以单机的情况下运行。那么服务器的个数需要是单个,保证超过一半的节点获取锁作为最终获取锁的条件。这个方案就是红锁。

红锁在redsync中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
type result struct {
Status bool
Err error
}

ch := make(chan result)
// pools可以是多个redis client资源池
for _, pool := range m.pools {
// 便利redis池,也就是节点,通过协程往每个节点发送请求,获取锁
go func(pool redis.Pool) {
r := result{}
r.Status, r.Err = actFn(pool)
// 将获取到的锁放入到channel中
ch <- r
}(pool)
}
n := 0
var err error
// 获取对应个数的结果
for range m.pools {
r := <-ch
// 判断结果状态,如果状态正常,则n+1
if r.Status {
n++
} else if r.Err != nil {
err = multierror.Append(err, r.Err)
}
}
// 返回获取锁的节点的个数,用于后续判断是否超过一半的节点
return n, err
}

补充分布式锁的示例,结合分布式定时任务,补充更新锁时间的功能以及重试功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func work() {
ctx := context.Background()

redisClient := NewRedis()
rs := NewRedSync(redisClient)

s := gocron.NewScheduler(time.Local)
// 分布式定时任务,这里设置每隔5s打印时间
s.Every(5).Seconds().Do(func() {
log.Println("time: ", time.Now().String())
})

mutexName := fmt.Sprintf(cronName, 1)
mutex := rs.NewMutex(mutexName,
// 过期时间10s
redsync.WithExpiry(10*time.Second),
// 每隔 1s 重试,重试指的是没有获取锁之后重试,默认重试次数32次,结合每隔1s,重试时间是32s,大于10s,防止当服务挂掉重启之后无法再次获得锁
redsync.WithRetryDelay(1*time.Second),
)

err := mutex.LockContext(ctx)
if err != nil {
if errors.Is(err, redsync.ErrFailed) {
log.Println("other thread get cron task")
return
}
log.Fatal(err)
}
log.Println("get cron task success")

// 开启协程更新锁过期时间
go func() {
for {
select {
// 通过context控制,如果上层执行cancel,则将锁释放,一般在优雅退出时可以主动释放
case <-ctx.Done():
if _, err = mutex.UnlockContext(ctx); err != nil {
log.Fatal(err)
}
log.Println("release cron task lock success")
return
// 默认情况下,给锁更新过期时间
default:
// 每隔5s更新过期时间
time.Sleep(5 * time.Second)
if _, err = mutex.ExtendContext(ctx); err != nil {
log.Fatal(err)
}
log.Println("extend cron task lock success")
}
}
}()

log.Println("start cron task success")
// 阻塞执行定时任务
//s.StartBlocking()
// 异步执行定时任务
s.StartAsync()
}