消息中间件kafka

Kafka基础概念

Kafka的使用场景:

  1. 数据通道和消息系统
  2. 数据分析场景,在报表中数据分析

例如:

  1. 对页面访问量PV、页面曝光、页面点击等行为事件分析
  2. 实时计算中的Kafka Source,Dataflow Pipeline
  3. 业务的消息系统,通过发布订阅消息解耦多组微服务,消除峰值

Kafka是一种分布式的,基于发布、订阅的消息系统。设计目标如下:

  1. 以时间复杂度O(1)的方式提供消息持久化能力,顺序写,顺序消费,即使对TB级以上数据也能保持常数时间复杂度的访问性能;
  2. 高吞度,单机10w/s;
  3. 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的顺序传输;
  4. 同时支持离线数据处理和实时数据处理;
  5. Scale out:支持在线水平扩展;

使用消息系统的理由:

  1. 解耦

    服务之间,不依赖API接口,而是插入一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵循同样的接口约束。

    基于这种消息发布订阅的机制,可以联动多个业务下游子系统,能够不侵入的情况下分布编排和开发,保证数据一致性。

  2. 冗余

    当处理数据的过程失败,除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。例如通过“插入-获取-删除”的范式,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

  3. 扩展性

    消息队列解耦了处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可,不需要改变代码,不需要调节参数。

  4. 灵活性&峰值处理能力

    在访问量剧增的请款修改,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。使用消息队列能够使关键组件叮嘱突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

  5. 可恢复性

    一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

  6. 顺序保证

    大多是用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。

  7. 缓冲

    在任何的系统中,都会有需要不同的处理时间的元素。消息队列可以通过缓冲来帮助服务最高效率的执行–写入队列的处理会尽可能的快速,该缓冲有助于控制和优化数据流处理的速度。

  8. 异步通讯

    当用户不想也不需要立即处理消息,消息队列提供了异步处理机制,允许用户把一个消息放入队列,并不立即处理,持久化保存下来,在需要的时候再去处理。

Topic & Partition

topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。把一类消息按照主题来分类,有点类似于数据库的表。

为了使kafka的吞吐率可以线性提高,物理上把topic分成一个或多个Partition。对应到系统上就是一个或多个目录。

Broker

Broker:kafka集群包含一个或多个服务器,每个服务器节点称为一个Broker。

Broker存储Topic的数据。如果某Topic有N个Partition,集群有N个Broker,那么每个Broker存储该Topic的一个Partition。

从scale out的性能角度思考,通过Broker kafka server的更多节点,带更多的存储,建立更多的Partition把IO负载到更多的物理节点,提升总吞吐IOPS。

从scale up的角度考虑,一个Node拥有越多的physical disk,也可以负载更多的Partition,提升总吞吐IOPS。

如果某Topic有N个Partition,集群有N+M个Broker,那么其中有N个Broker存储该Topic的一个Partition,M个Broker不存储该Topic。

如果Topic的Partition个数大于Broker的个数,那么一个Broker会存储一个或多个Partition。

每一条消息发送到Broker中,会根据Partition规则选择被存储到哪一个Partition,如果Partition规则设置的合理,所有消息可以均匀分布到不同的Partition。

例如:

  • 当Partition数量小于Broker数量,此时增加Partition数量,消息写入性能呈线性上升。
  • 当Partition数量等于或者大于Broker数量,此时增加Partition数量,消息写入性能变化不大,而且有可能降低。(这是由于可能一个Broker上Partition数量过多,导致吞吐量下降)

存储原理

kafka的消息是存在于文件系统之上的。kafka高度依赖文件系统来存储和缓存消息。但是kafka通过内存缓存和顺序io,解决磁盘读写效率的问题。

操作系统会将内存剩余的所有空闲内存都用做磁盘缓存(free 中看到的cache),也就是使用了内存缓存。

任何发布到Partition的消息都会被追加到Partition数据文件的尾部,这样的顺序写磁盘操作让kafka的效率非常高。

kafka集群保留所有发布的message,无论是否被消费过。kafka提供可配置的保留策略去删除旧数据,通过保留时间或者通过分区大小。

offset偏移量:每条消息都有一个当前Partition下唯一的64字节的offset,相当于当前分区的第一条消息的偏移量,也就是第几条消息,消费者可以指定消费的位置信息,当消费者挂掉再重新恢复的时候,可以从消费位置继续消费。

例如当kafka有1个Broker,创建2个Topic,分别为Topic1和Topic2,Partition数量分别为1,2。此时目录下就会创建三个文件夹:

1

kafka的文件存储中,同一个Topic下有多个不同的Partition,每个Partition都为一个目录,而每个目录又被平均分配成多个大小相等的segment file中,segment file又由index file和data file组成,他们总是成对出现,后缀.index和.log分别表示segment索引文件和数据文件。

例如index中3,497为例,代表第三个message(全局Partition表示第368769+3个message)以及该消息的物理偏移地址为497。

查询时,通过偏移量,确定segment文件,再通过二分查找在index中查找到索引,通过索引的物理偏移地址,在log中找到对应物理位置。

kafka通过标准的数据存储结构准确的知道message的偏移。在Partition中的每一条message都包含了以下三个属性:

  • offset:表示message在当前Partition中的偏移量,是一个逻辑上的值,唯一确定了Partition中的一条message,可以简单的认为是一个ID
  • MessageSize:表示message内容Data的大小
  • Data:message的具体内容

从0.10.0.0版本起,为分片日志文件中新增一个.timeindex的索引文件,可以根据时间戳定位消息。

timeindex中存储的是时间戳和index文件的offset,先通过时间戳找到最大的,

生产消费

Producer

producer将消息发送Broker时,会根据Partition机制选择将其存储到哪一个Partition。这个机制可以将所有消息均匀分布到不同的Partition。

  • 指明Partition的情况下,直接将给定的value作为Partition的值
  • 没有指明Partition但有key的情况下,将key的hash值与分区数取余,得到Partition值
  • 既没有Partition也没有key的情况下,第一次调用时随机生成一个整数(后面每次调用都在这个整数上自增),将这个值与可用的分区取余,得到Partition值,也就是Round-Robin轮训算法

为保证producer发送的数据,能可靠的发送到指定的Topic,Topic的每个Partition收到producer发送的数据后,都需要向producer发送ack。如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

  • 选择完Partition后,生产者知道消息所属的Topic和Partition,将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的kafka Broker。
  • 当Broker接受到消息后,如果成功写入,则返回一个包含消息的Topic、Partition以及位移的RecordMetadata对象,否则返回异常
  • 生产者接受到结果后,对于异常可能会进行重试

0.11版本kafka,引入幂等性:producer不论向server发送多少重复数据,server端都只会持久化一条。

要启动幂等性,需要将producer的参数中enable.idompotence设置为true即可。

开启幂等性的producer在初始化时会被分配一个PID,发送同一Partition的消息会附带sequence Number。

而Broker端会对<Pid,Partition,SeqNumber>做缓存,当具有相同逐渐的消息提交时,Broker只会持久化一条。

但是PID重启后就会变化,同时不同的额Partition也具有不同的主键,所以幂等性无法保证夸分区绘画的Exactly Once。

因此,不建议使用kafka的幂等,而是在在消费端做业务的幂等。

Consumer

kafka中有Consumer Group的概念,也就是消费组。当多个消费者形成一个消费组来消费Topic时,每个消费者会受到不同Partition的消息。

假设有一个T1 Topic,有4个Partition,分别是p1 p2 p3 p4,同时有一个消费组G1,消费组只有一个consumer,c1,那么消费者c1将会收到这4个Partition的消息。如果在G1中增加一个C2,那么Partition会平均分配给两个consumer。当消费者数量大于Partition,那么剩余的消费者会空闲,不会收到任何消息。

也就是说,增加消费者并不会提升性能。

此时可以在C1中进行多线程消费,通过二次sharding。但是为了保障offset提交的正确性,需要使用watermark机制,保障最小的offset提交,才往Broker提交。

kafka的Topic可以同时给多个consumer group消费,每个消费组都能读到全量消息。

如果应用需要读取全量消息,那么请为该引用设置一个消费组;

如果应用消费能力不足,那么可以考虑在这个消费组增加消费者。

Consumer Group

当新增消费者,会将原先消费者的Partition分给新的消费者。当消费者离开消费组,例如重启、宕机等,消费的分区又会分配给其他分区。这种重平衡现象,可以保证kafka的高可用和水平扩展。在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂不可用。而且重平衡会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段时间也会降低消费性能。(消费者管理通常通过zookeeper实现)

消费者通过定期发送心跳到一个座位组协调者Group Coordinator 的Broker来保持在消费组内存活。这个Broker不是固定的,每个消费组可能都不同。

当消费者拉取消息或者提交时,便会发送心跳,入股哦消费者超过一定时间没有发送心跳,那么回话就会过期,组协调者会认为改消费者已经宕机,然后触发重平衡。

通常情况,应用可以进行优雅关闭,这样消费者户已发送离开的消息到组协调者,这样可以立即进行重平衡。

0.10.1版本kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。

高版本kafka支持配置消费者多长时间不拉取消息但是保持存活,这个配置可以避免活锁,活锁:是指应用没有故障,但是由于某些原因不能进一步消费。

Partition会为每个Consumer Group保存一个偏移量,记录Group消费到的位置。(也就是说,如果多个消费者的消费速度不一样,就会导致记录的这个偏移量或者说缓存出现问题从而影响消费性能。)

kafka从0.9版本将消费端的位移信息保存在集群内部Topic中,key为Topic、Partition、Consumer Group信息,而且支持压缩,将相同的key记录成一条。

Consumer commit offset

消费端可以通过设置参数 enable.auto.commit 控制是否自动提交还是手动,auto.commit.interval.ms是间隔时间,默认5s。

自动提交:

  • 存在重复的数据,每隔5秒发送的可能是一样的
  • 重复消费,如果5秒期间发生重平衡,则新的消费者会从上一次提交的位移处开始消费,那么期间消费的数据则会再次被消费

手动提交(集中delivery guarantee):

  • 读完消息先commit再处理消息。如果Consumer再commit后还没来记得处理就crash,这个时候消息会丢失。
  • 读完消息,先处理,再commit,如果处理完之后,commit之前,Consumer挂了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,也就是会重复处理。

Consumer exactly Once

二阶段提交。消费者拿到数据,放到临时的地方,进行存档,数据处理之后,将offset保存到临时的地方,处理完成,提交offset,将临时目录中的数据移动正式目录。

如果crash,从存档中恢复offset,将临时目录的文件移动到正式目录中。

Push & Pull

producer向Broker push消息,并且由Consumer从Broker pull消息。

消费模式:

push模式,很难适应消费速率不同的消费者,容易造成由于Consumer消费不及时,导致拒绝服务以及网络拥塞。(rabbitmq:监控queue长度,自动做反压。)

pull模式,可以根据Consumer的消费能力以适当的速录消费消息。kafka更合适,可以简化Broker的设计,同时Consumer可以自己控制消费方式,批量消费也可以逐条消费。pull模式需要检测Broker是否有数据,如果没有数据,消费者可能会陷入循环中。因此需要消费者维护一个长轮训,通过时长参数timeout,如果当前没有数据可供消费,Consumer会等待一段时间之后再返回,这段时长就是timeout。

高可用

没有高可用情况下,一个Broker宕机,上面的所有Partition都无法继续提供服务。如果Broker无法恢复,上面的数据就会丢失。

  • 同步模式:如果producer发送失败,则会重试,重试3次(可配置)之后抛出异常,用户也可以选择继续重试。如果重试,会造成数据阻塞,如果不重试,会造成数据丢失
  • 异步模式:如果producer发送失败,则会尝试重试,3次后记录异常到日志,并且继续发送后续数据。这会造成数据丢失并且用户只能通过日志发现该问题

Replication

一个Partition有多个replica,需要在这些replication之间选一个leader,Producer和Consumer只与这个Leader交互,其他Replica作为Follower从Leader中复制数据。

需要保证数据一致性,因此引入Leader,负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。

Leader

kafka依赖zookeeper集群,所以最简单直观的方案是所有Follower都在Zookeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者就是新的Leader,其他replica为Follower。

这个方案的缺点:

  • 脑裂:zookeeper能保证所有watch按顺序触发,但并不能保证同一时刻所有Replica看到的状态一致,这样就可能造成不同replica响应不一致。
  • 羊群效应:如果Broker上的Partition非常多,会造成多个watch被处罚,造成集群内大量的调整。
  • zookeeper负载:每个replica都要在zookeeper上注册一个watch,当集群规模增加到几千个Partition时,zookeeper负载会很重。

Controller

kafka的Leader Election方案解决了上述问题,它在所有Broker中选出一个Controller,所有Partition的Leader选举由Controller,Controller会将Leader的改变通过RPC的方式通知需要响应的Broker。这种方式比zookeeper queue的方式更高效。

选举Controller过程如下:

  1. 每个Broker在zookeeper的Controller Path(/controller)上注册一个watch(选举Broker依赖zookeeper,最新版kafka完全不依赖zookeeper)
  2. 当Controller失败,对应的Controller path会自动消息,此时该watch被触发,活着的Broker会去竞争成为新的Controller,只有一个竞选成功
  3. 竞选成功即为新的Leader,竞选失败则重新在新的Controller path上注册watch,因为zookeeper的watch是一次性的,被触发一次之后即失效,需要重新注册

Partition选举Leader的过程如下(由Controller执行):

  1. 从zookeeper中读取当前分区的所有ISR(in-sync replicas)集合。(同步副本,副本之间同步速度很快,当有节点同步速度慢,则会从集合中踢出这个Broker)
  2. 调用配置的分区选择算法选择分区的Leader

Partition分布

kafka集群Partition replication默认自动分配。

image-20220912132159624

图中,箭头指向为副本,以Partition-0为例,Broker-1的Partition-0为Leader,副本是Broker-2的Partition-0。

每个Broker一次分配主Partition,下一个Broker(按照BrokerID为序)为副本,如此循环迭代分配。

副本分配算法:

image-20220912132712312

  • 将所有N Broker和待分配的i个Partition排序
  • 将第i个Partition分配到第i mod n 个Broker上(确保负载均衡,每个Broker上都有leader)
  • 将第i个Partition的第j个副本分配到第(i+j)mod n 个Broker上。(负载均衡,并且保持id分布)

Leader

kafka处理失败需要明确定义一个Broker是否活着,对kafka而言,kafka存活包含两个条件:

  1. 副本所在节点需要与zookeeper维持session(zk的心跳实现)
  2. 从副本的最后一条消息的offset需要与主副本的最后一条消息offset差值不超过设定阈值(差值过大,认为改节点离群,在0.10.0版本被移除)或者副本的LEO落后于主副本的LEO时长不大于设定阈值。

Leader会跟踪与其保持同步的replica列表,该列表称为ISR,如果一个Follower宕机,或者落后太多,Leader会把它从LSR中移除,当再次满足以上条件之后,又会被重新加入集合中。

ISR的引入主要是解决同步复制与异步复制两种方案的缺陷:

  • 同步复制,如果有副本宕机或者超时,会拖慢该副本组的整体性能
  • 异步复制,如果所有副本都远落后于主副本,一旦主副本重新选举,则存在消息丢失的情况。

分布式日志系统(replicated log),主要保证:

  • commit log不会丢失
  • commit log在不同机器上一致。

几个常见的基于主从复制的replicated log实现:

  • raft:基于多数节点的ack,超过半数节点确认,则认为数据有效,几点称为leader Follower
  • pacific A:基于所有节点的ack,节点一般称为primary、secondary,kafka正在使用(维护ISR)
  • bookkeeper:基于法定个数节点的ack,节点一般称为write、bookie,pulsar正在使用。

例如raft和pacificA在kafka中的使用。kafka需要在zookeeper中动态维护一个ISR,这个ISR里面所有的replica都跟上leader,才有被选为leader的可能。这种模式下,对于一个f+1个replica,一个Partition能保证不丢失已经commit的消息前提下,容忍f个replica失败,只有1个是正常,在大多数场景是有利的。

如果使用raft,则需要保证replica的个数是上面的一倍,也就是3个。

High watermark & Log End Offset

kafka所有的副本对象都有两个重要的属性,LEO和HW

LEO:日志末端位移,记录该副本底层日志中下一条消息的位移值

HW:水位值,对于同一个副本而言,HW值不会大于LEO值。小于等于HW值的所有消息都被认为是已备份的。

可靠性

producer

producer往kafka生产消息,可以选择是否被ISR中的Follower全部接收成功才返回。

  • 0:producer不等待Broker的ack,如果Broker收到数据,但是没有刷盘就返回,当Broker故障时有可能丢失数据
  • 1:producer等待Broker的ack,Partition的Leader落盘成功后返回ack,如果Follower同步成功之前leader故障,则会导致数据丢失
  • -1(all):producer等到Broker的ack,Partition的Leader和Follower全部落盘成功后返回ack。但是在Broker发送ack时,Leader发生故障,会造成数据重复

设置request.required.acks=-1的同时,min,.insync.replicas大于1。此时可保证数据可靠性。

高性能

架构层面:

  • Partition级别并行:Broker、disk、Consumer端
  • ISR:少量Follower同步

IO层面:

  • Batch读写:一次性可以push、pull多条
  • 磁盘顺序IO
  • page cache
  • zero copy
  • 压缩