微服务之高可用性

服务雪崩

服务雪崩效应是一种因“服务提供者的不可用”(原因)导致“服务调用者不可用”(结果),并将不可用逐渐放大的现象。

img

如上图所示,一个服务失败,导致整条链路的服务都失败的情形,我们称之为服务雪崩。

解决服务雪崩的方案

除了限流和熔断之外,还有比较简单的解决方案,例如扩容、升级服务器,而这种场景使用状况比较少,成本高;或者直接限制访问时间,也就是使用超时机制

超时机制

超时是为了保护服务,避免Consumer服务因为provider响应慢而变得响应很慢,这样Consumer可以尽量保持原有的性能。当上游请求已经超时,则将该请求返回,而不需要再向下游链路发起请求,节约下游链路性能。

例如在golang中,rpc client使用超时机制

1
2
3
4
5
6
7
// 1s 超时
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// grpc.DialContext 超时
dial, err := grpc.DialContext(ctx, "consul://172.16.211.17:8500/mitaka-grpc?wait=14s",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`))

重试机制

如果provide只是偶尔抖动,那么超时后直接放弃,不做后续处理,就会导致当前请求错误。这种情况下的超时可以重试一下,重试如果正常返回,那么这次请求就可以成功。

例如在golang中,rpc client使用midawre的重试机制

注册retrymidware

1
2
3
4
5
dial, err := grpc.DialContext(ctx, "consul://172.16.211.17:8500/mitaka-grpc?wait=14s",
// 重试
grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`))

局部接口使用

1
2
3
4
5
6
7
reply, err := client.SayHello(ctx, &minegrpc.HelloRequest{Name: "mitaka"},
// 最大重试次数
grpc_retry.WithMax(3),
// 在什么返回码时重试
grpc_retry.WithCodes(codes.Unavailable, codes.DataLoss),
// 每次重试的超时时间
grpc_retry.WithPerRetryTimeout(time.Second))

服务端测试

1
2
3
4
5
6
7
func (s *Service) SayHello(ctx context.Context, req *minegrpc.HelloRequest) (*minegrpc.HelloReply, error) {
log.Println("get request from ", req.Name, time.Now().String())
return &minegrpc.HelloReply{
Message: fmt.Sprintf("i am service %s", serviceID),
// 返回错误码,让client重试
}, status.Errorf(codes.Unavailable, "test")
}

全局重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
retryOpts := []grpc_retry.CallOption{
// 两次重试间隔
grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)),
grpc_retry.WithCodes(codes.NotFound, codes.Aborted),
}
// grpc.DialContext 超时
dial, err := grpc.DialContext(ctx, "consul://172.16.211.17:8500/mitaka-grpc?wait=14s",
// 重试,使用全局重试配置
grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(retryOpts...)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`))
if err != nil {
log.Fatal(err)
}

幂等性

幂等性:幂等,这个词来源自数学领域。幂等性衍生到软件工程中,它的语义是指:函数/接口可以使用相同的参数重复执行, 不应该影响系统状态,也不会对系统造成改变。

举一个简单的例子:正常设计的查询接口,不管调用多少次,都不会破坏当前的系统或数据,这就是一个幂等操作。

如果Consumer会重试,那么中间有网络干扰的因素下,可能会出现接口被正常处理但是返回失败,此时重试则会重新处理请求,为了不出现请求重复,则服务需要做到幂等性。

幂等性的解决方案

需要考虑幂等性的情况

也就是同样的请求发送多次,需要保证结果一致。

例如http请求的类型:

  • get:获取请求,多次结果一致,get请求一般自身支持幂等性

    GET /api/v1/user/1

  • post:创建请求时,则需要考虑幂等性;局部更新时,如果更新时确定值更新,而不是计算更新,则支持幂等性,如果是计算更新,则需要解决幂等性

    POST /api/v1/user

    当插入一条数据时,为了方式用户重复,需要支持幂等性

    insert into users(name, phone) values (mitaka,158xxxxxx)

    当更新数据时,直接更新则本身支持幂等

    update users set name=xiaoyeshiyu where id=1

    如果是计算数量,则需要解决幂等

    update goods set num=num+1 where id=1

  • put:更新请求,如果更新时确定值更新,而不是计算更新,则支持幂等性,如果是计算更新,则需要解决幂等性

    当更新数据时,直接更新则本身支持幂等

    update users set name=xiaoyeshiyu where id=1

    如果是计算数量,则需要解决幂等

    update goods set num=num+1 where id=1

  • delete:删除请求,一般自身支持幂等

唯一索引

通过唯一索引,防止新增数据多增加

比如创建用户时,某个字段设置为唯一索引,当处理同一个创建请求时,会通过唯一索引解决重复提交的问题。

当出现唯一索引的报错,则再查询一次数据,将查询到的数据返回即可。(幂等友好)

1
2
3
4
5
6
7
type Product struct {
ID string
// 通过主键实现幂等
LanguageCode string `gorm:"primaryKey"`
Code string
Name string
}

token机制,防止页面重复提交

例如当部分资源操作无法通过唯一索引,例如添加商品,可能该商品的信息可以不唯一。此时可以通过token机制。

请求传入后端,先检查redis中是否有对应token,如果没有,则创建,如果有则代表该请求已经被处理。

需要注意,token是否重新生成,取决于需求上如何区分是不是同一个请求。存储在redis中的token需要有过期时间。

例如获取token中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func TokenRequired() gin.HandlerFunc {
return func(c *gin.Context) {
var token string
for k, v := range c.Request.Header {
if k == "Create-Token" {
token = v[0]
}
}
// 通过redis获取token
redis.SetNx(ctx,"Create-Token",token)
c.Set("Create-Token-Set", true)
c.Set("Create-Token", token)
c.Next()
}

悲观锁

for update,需要注意,要联合主键使用,不然会出现锁表。

1
2
db.Clauses(clause.Locking{Strength: "UPDATE"}).Find(&users)
// SELECT * FROM `users` FOR UPDATE

乐观锁

通过version机制实现更新功能

1
2
3
4
5
6
7
8
9
10
type Goods struct {
ID uint
Num uint64
Version uint64
}

res := tx.Model(&Goods{}).Where("id = ? and version = ? ", g.ID, g.Version).Updates(map[string]interface{}{
"num": g.Num - 1,
"version": g.Version + 1,
})

分布式锁

使用分布式锁,只有一个请求可以获取到锁,同一时刻只有一个请求执行成功,执行成功之后,返回成功,释放锁。没有获取到锁的请求则返回失败。

1
import github.com/go-redsync/redsync/v4

select + insert

在并发不高的场景下,为了支持幂等性,支持重复执行,简单处理方法是再创建一张表,更新时select下关键数据是否已经存在,如果已经存在,则不再处理,如果不存在,则insert。

在高并发场景下,则不建议这种方案。

1
2
3
4
type GoodsHistory struct {
OrderSN string
GoodsDetails []Goods
}

对外提供的api如何提供幂等

例如付款接口,解决重复支付的问题,则是在付款请求时携带source和seq,来源和序列号,在数据库做唯一索引,防止多次支付。

为了幂等友好,一般会先查询一下,然后再进行处理。

限流和熔断、降级

超时机制是将请求发送出去,接收方处理时检查请求是否超时,但是检查该请求也会造成一些没法处理的请求在服务链路中,因此,更好的解决方案是限流,例如当检测到下游服务返回超时请求达到一定数量,则将往下游的请求选择性拒绝(限流)或者直接拒绝来自上游的请求(熔断降级)。

当扇出链路的某个微服务不可用或者响应时间太长时,会进行服务的降级,进而熔断该节点微服务的调用,快速返回错误的响应信息。当检测到该节点微服务调用响应正常后,恢复调用链路。

熔断中有几个量化的地方

  1. 目标服务调用慢或者超时:开启熔断的阀值量化

    可以通过两个维度:时间与请求数

    时间 多长时间内的超时请求达到多少,触发熔断

    请求数 从服务启动,超时请求数达到多少,触发

    这两个维度都需要记录超时请求数和统计总请求数

  2. 情况好转,恢复调用

    如何量化情况好转:多长时间之后超时请求数低于多少关闭熔断

技术选型

image-20221016175346809

综合选择,当前Sentinel还在维护,并且国内开源,这里选择Sentinel

Sentinel

官方文档

go 文档:快速开始

包和示例代码:sentinel-golang

限流

流量控制

流量控制可以选择限流的策略,即在一定的时间范围内,允许通过的最大流量数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"log"
"time"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
)

const resName = "example-flow-qps-resource"

func main() {
// We should initialize Sentinel first.
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}

// 加载限流规则
_, err = flow.LoadRules([]*flow.Rule{
{
// 名称,必填
Resource: resName,
// 与Threshold结合,代表每秒10个请求
TokenCalculateStrategy: flow.Direct,
// 流量控制的行为,Reject代表直接拒绝
ControlBehavior: flow.Reject,
// 与StatIntervalInMs配合使用,代表周期,这里意思是1000ms能处理10个请求
Threshold: 10,
StatIntervalInMs: 1000,
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}

ch := make(chan struct{})
for i := 0; i < 12; i++ {
go func() {
// 使用方法,resName名称一致,Inbound代表入口流量控制
e, b := sentinel.Entry(resName, sentinel.WithTrafficType(base.Inbound))
if b != nil {
// 此时请求block
// Blocked. We could get the block reason from the BlockError.
//time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond)
log.Println("限流。。。")
} else {
// 请求通过
// Passed, wrap the logic here.
//time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond)
log.Println("通过。。。")
// Be sure the entry is exited finally.
e.Exit()
}
}()
}
}

结果

1
2
3
4
5
6
7
8
9
10
11
12
2022/10/16 18:12:04 通过。。。
2022/10/16 18:12:04 通过。。。
2022/10/16 18:12:04 通过。。。
2022/10/16 18:12:04 通过。。。
2022/10/16 18:12:04 限流。。。
2022/10/16 18:12:04 通过。。。
2022/10/16 18:12:04 通过。。。
2022/10/16 18:12:04 通过。。。
2022/10/16 18:12:04 通过。。。
2022/10/16 18:12:04 通过。。。
2022/10/16 18:12:04 通过。。。
2022/10/16 18:12:04 限流。。。

12个请求有2个被限流。

预热(冷启动)

冷启动(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式。该方式主要用于系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过”冷启动”,让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮的情况。

流量控制可以还可以选择预热(冷启动)的方式,即服务启动之后,处理的流量不会暴增,而是在一段时间内缓慢增加到流量阈值状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"log"
"math/rand"
"sync/atomic"
"time"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
)

const resName = "example-flow-qps-resource"

func main() {
// We should initialize Sentinel first.
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}

// 加载限流规则
_, err = flow.LoadRules([]*flow.Rule{
{
// 名称,必填
Resource: resName,
// 预热,冷启动
TokenCalculateStrategy: flow.WarmUp,
// 流量控制的行为,Reject代表直接拒绝
ControlBehavior: flow.Reject,
// 与WarmUpPeriodSec配合使用,代表周期,这里意思是到30s就可以达到1000请求的峰值
Threshold: 1000,
WarmUpPeriodSec: 30,
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}

var passNum int64
var blockNum int64
var totalNum int64

ch := make(chan struct{})
for i := 0; i < 10; i++ {
go func() {
for {
// 使用方法,resName名称一致,Inbound代表入口流量控制
atomic.AddInt64(&totalNum, 1)
e, b := sentinel.Entry(resName, sentinel.WithTrafficType(base.Inbound))
if b != nil {
// 此时请求block
// Blocked. We could get the block reason from the BlockError.
atomic.AddInt64(&blockNum, 1)
} else {
// 请求通过
// Passed, wrap the logic here.
atomic.AddInt64(&passNum, 1)
// Be sure the entry is exited finally.
e.Exit()
}
time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond)
}
}()
}

var oldTotal int64
var oldPass int64
var oldBlock int64
for {
time.Sleep(time.Second)
tmpTotal := atomic.LoadInt64(&totalNum)
tmpPass := atomic.LoadInt64(&passNum)
tmpBlock := atomic.LoadInt64(&blockNum)
log.Printf("total: %5d,pass %5d:block %5d", tmpTotal-oldTotal, tmpPass-oldPass, tmpBlock-oldBlock)
oldTotal = tmpTotal
oldPass = tmpPass
oldBlock = tmpBlock
}
<-ch
}

30s之后

1
2
3
4
2022/10/16 18:40:50 total:  1982,pass   963:block  1019
2022/10/16 18:40:51 total: 1967,pass 1018:block 949
2022/10/16 18:40:52 total: 2003,pass 1005:block 998
2022/10/16 18:40:53 total: 1965,pass 1014:block 951

匀速通过

以固定的时间间隔通过请求,类似一个漏桶,可以起到修正流量,让流量匀速的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"log"
"time"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
)

const resName = "example-flow-qps-resource"

func main() {
// We should initialize Sentinel first.
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}

// 加载限流规则
_, err = flow.LoadRules([]*flow.Rule{
{
// 名称,必填
Resource: resName,
// 与Threshold结合,代表每秒10个请求
TokenCalculateStrategy: flow.Direct,
// 流量控制的行为,Throttling代表直接匀速通过
ControlBehavior: flow.Throttling,
// 请求的间隔控制在 1000/10=100 ms,也就是前一个请求通过后,下一个请求需要在100ms后通过。
Threshold: 10,
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}

ch := make(chan struct{})
go func() {
for {
// 使用方法,resName名称一致,Inbound代表入口流量控制
e, b := sentinel.Entry(resName, sentinel.WithTrafficType(base.Inbound))
if b != nil {
// 此时请求block
// Blocked. We could get the block reason from the BlockError.
//time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond)
log.Println("限流。。。")
} else {
// 请求通过
// Passed, wrap the logic here.
//time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond)
log.Println("通过。。。")
// Be sure the entry is exited finally.
e.Exit()
}
time.Sleep(50 * time.Millisecond)
}
}()
<-ch
}

熔断降级

熔断降级

Sentinel 熔断降级基于熔断器模式 (circuit breaker pattern) 实现。熔断器内部维护了一个熔断器的状态机,状态机的转换关系如下图所示:

circuit-breaker

熔断器有三种状态:

  1. Closed 状态:也是初始状态,该状态下,熔断器会保持闭合,对资源的访问直接通过熔断器的检查。
  2. Open 状态:断开状态,熔断器处于开启状态,对资源的访问会被切断。
  3. Half-Open 状态:半开状态,该状态下除了探测流量,其余对资源的访问也会被切断。探测流量指熔断器处于半开状态时,会周期性的允许一定数目的探测请求通过,如果探测请求能够正常的返回,代表探测成功,此时熔断器会重置状态到 Closed 状态,结束熔断;如果探测失败,则回滚到 Open 状态。

这三种状态之间的转换关系这里做一个更加清晰的解释:

  1. 初始状态下,熔断器处于 Closed 状态。如果基于熔断器的统计数据表明当前资源触发了设定的阈值,那么熔断器会切换状态到 Open 状态;
  2. Open 状态即代表熔断状态,所有请求都会直接被拒绝。熔断器规则中会配置一个熔断超时重试的时间,经过熔断超时重试时长后熔断器会将状态置为 Half-Open 状态,从而进行探测机制;
  3. 处于 Half-Open 状态的熔断器会周期性去做探测。

即在单位时间内,通过请求错误数或者比例等,判断下游服务已无法处理请求,此时将请求丢弃,或者通过注册中心注销,降级服务,不再流量。

当在单位时间内,发往下游的请求回复正常,则服务从熔断状态恢复正常,

基于错误数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"errors"
"fmt"
"log"
"math/rand"
"sync/atomic"
"time"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)

type stateChangeTestListener struct {
}

func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}

func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

func main() {
var (
total int64
errNum int64
block int64
)

conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}
ch := make(chan struct{})
// Register a state change listener so that we could observer the state change of the internal circuit breaker.
// 内部状态发生转换时,可以执行某些方法,例如状态转换时,注销/注册到服务发现中心
circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})

_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
{
Resource: "abc",
Strategy: circuitbreaker.ErrorCount,
// 3s后尝试恢复
RetryTimeoutMs: 3000,
// 10请求个不通过
MinRequestAmount: 10,
// 5s为一个周期,也就是说5s内,错误个数超过10个
StatIntervalMs: 5000,
StatSlidingWindowBucketCount: 10,
Threshold: 50,
},
})
if err != nil {
log.Fatal(err)
}

logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
go func() {
for {
atomic.AddInt64(&total, 1)
e, b := sentinel.Entry("abc")
if b != nil {
// g1 blocked
atomic.AddInt64(&block, 1)
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
if rand.Uint64()%20 > 9 {
// Record current invocation as error.
// 随机记录一个错误
atomic.AddInt64(&errNum, 1)
sentinel.TraceError(e, errors.New("biz error"))
}
// g1 passed
time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
fmt.Printf("total:%5d block:%5d err:%5d\n", atomic.LoadInt64(&total), atomic.LoadInt64(&block), atomic.LoadInt64(&errNum))
time.Sleep(time.Second)
}
}()
<-ch
}

错误率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %.2f, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}

_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
{
Resource: "abc",
Strategy: circuitbreaker.ErrorRatio,
// 3s后尝试恢复
RetryTimeoutMs: 3000,
// 5s为一个周期
StatIntervalMs: 5000,
StatSlidingWindowBucketCount: 10,
// 错误比例为40%
Threshold: 0.4,
},
})

慢请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
{
Resource: "abc",
Strategy: circuitbreaker.SlowRequestRatio,
// 3s后尝试恢复
RetryTimeoutMs: 3000,
// 5s为一个周期
StatIntervalMs: 5000,
StatSlidingWindowBucketCount: 10,
// 慢请求比例为50%
Threshold: 0.5,
},
})

集成到微服务中

注册rule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func initSentinel() {
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}
ch := make(chan struct{})
// Register a state change listener so that we could observer the state change of the internal circuit breaker.
circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})

_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=5s, recoveryTimeout=3s, slowRtUpperBound=50ms, maxSlowRequestRatio=50%
{
Resource: "mitaka_hello_service",
Strategy: circuitbreaker.SlowRequestRatio,
RetryTimeoutMs: 3000,
MinRequestAmount: 10,
StatIntervalMs: 5000,
StatSlidingWindowBucketCount: 10,
MaxAllowedRtMs: 50,
Threshold: 0.5,
},
})
if err != nil {
log.Fatal(err)
}
}

使用,集成在接口中

1
2
3
4
5
6
7
8
9
10
11
func (s *Service) SayHello(ctx context.Context, req *minegrpc.HelloRequest) (*minegrpc.HelloReply, error) {
// 通过Resource名称匹配规则
_, err := sentinel.Entry("mitaka_hello_service")
if err != nil {
return nil, status.Error(codes.ResourceExhausted, "请求次数过多,请稍后重试")
}

return &minegrpc.HelloReply{
Message: fmt.Sprintf("i am service %s", serviceID),
}, nil
}

推荐阅读:

谈谈服务雪崩、降级、熔断

一文理解如何实现接口的幂等性

微服务-熔断机制

限流熔断技术选型:从 Hystrix 到 Sentinel