1.概述
平时在使用Kafka的时候,可能关注的更多的是Kafka系统层面的。今天来给大家剖析一下Kafka的控制器,了解一下Kafka控制器的选举流程。
2.内容
Kafka控制器,其实就是一个Kafka系统的Broker。它除了具有一般Broker的功能之外,还具有选举主题分区Leader节点的功能。在启动Kafka系统时,其中一个Broker会被选举为控制器,负责管理主题分区和副本状态,还会执行分区重新分配的管理任务。
如果在Kafka系统运行过程中,当前的控制器出现故障导致不可用,那么Kafka系统会从其他正常运行的Broker中重新选举出新的控制器。
2.1 控制器启动顺序
在Kafka集群中,每个Broker在启动时会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的Leader节点,步骤如下:
- 第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/controller,并写入该节点的注册信息,使该节点成为控制器;
- 其他的代理节点陆续启动时,也会尝试在Zookeeper系统中创建/controller节点,但是由于/controller节点已经存在,所以会抛出“创建/controller节点失败异常”的信息。创建失败的代理节点会根据返回的结果,判断出在Kafka集群中已经有一个控制器被成功创建了,所以放弃创建/controller节点,这样就确保了Kafka集群控制器的唯一性;
- 其他的代理节点,会在控制器上注册相应的监听器,各个监听器负责监听各自代理节点的状态变化。当监听到节点状态发生变化时,会触发相应的监听函数进行处理。
2.2 如何查看控制器优先级 ?
控制器创建的优先级是按照Kafka系统代理节点成功启动的顺序来创建的。用户可以通过改变Kafka系统代理节点的启动顺序,来查看控制器的创建优先级。之后,可以在Zookeeper系统中查看/controller临时节点的内容,例如:
# 进入Zookeeper集群[hadoop@dn1 bin]$ zkCli.sh -server dn1:2181# 执行查看命令[zk: dn1:2181(CONNECTED) 1] get /controller
成功执行命令后,可以看到代理节点0(即dn1节点)上成功创建了控制器,如下图所示:
当前启动顺序为:dn1、dn2、dn3,修改启动顺序为:dn3、dn1、dn2。再次查看Zookeeper系统中执行“get /controller”命令,输出结果如下图所示:
2.3 切换控制器所属的代理节点
当控制器被关闭或者与Zookeeper系统断开连接时,Zookeeper系统上的临时节点就会被清除。Kafka集群中的监听器会接收到变更通知,各个代理节点会尝试到Zookeeper系统中创建一个控制器的临时节点。第一个成功在Zookeeper系统中创建的代理节点,将会成为新的控制器。每个新选举出来的控制器,会在Zookeeper系统中获取一个递增的controller_epoch值。
3.主题分区Leader节点的选举过程
选举控制器的核心思路是:各个代理节点公平竞争抢占Zookeeper系统中创建/controller临时节点,最先创建成功的代理节点会成为控制器,并拥有选举主题分区Leader节点的功能。选举流程如下图所示:
当Kafka系统实例化KafkaController类时,主题分区Leader节点的选举流程便会开始。其中涉及的核心类包含KafkaController、ZookeeperLeaderElector、LeaderChangeListener、SessionExpirationListener。
- KafkaController:在实例化ZookeeperLeaderElector类时,分别设置了两个关键的回调函数,即onControllerFailover和onControllerResignation;
- ZookeeperLeaderElector:实现主题分区的Leader节点选举功能,但是它并不会处理“代理节点与Zookeeper系统之间出现的会话超时”这种情况,它主要负责创建元数据存储路径、实例化变更监听器等,并通过订阅数据变更监听器来实时监听数据的变化,进而开始执行选举Leader的逻辑;
- LeaderChangeListener:如果节点数据发送变化,则Kafka系统中的其他代理节点可能已经成为Leader,接着Kafka控制器会调用onResigningAsLeader函数。当Kafka代理节点宕机或者被人为误删除时,则处于该节点上的Leader会被重新选举,通过调用onResigningAsLeader函数重新选择其他正常运行的代理节点成为新的Leader;
- SessionExpirationListener:当Kafka系统的代理节点和Zookeeper系统建立连接后,SessionExpirationListener中的handleNewSession函数会被调用,对于Zookeeper系统中会话过期的连接,会先进行一次判断。
4.注册分区和副本状态机
Kafka系统的控制器主要负责管理主题、分区和副本。 Kafka系统在操作主题、分区和副本时,控制器会在Zookeeper系统的/brokers/topics节点,以及其子节点路径上注册一系列的监听器。 使用Kafka应用接口或者是Kafka系统脚本创建一个主题时,服务端会将创建后的结果返回给客户端。当客户端收到创建成功的提示时,其实服务端并没有实际创建主题,而只是在Zookeeper系统的/brokers/topics节点中创建了该主题对应的子节点名称。
代理节点调用onBecomingLeader()函数实际上调用的是onControllerFailover()函数,所以在控制器调用onControllerFailover()函数时,会在初始化阶段分别创建分区状态机和副本状态机。代码如下所示:
def onControllerFailover() { if(isRunning) {info("Broker %d starting become controller state transition".format(config.brokerId)) readControllerEpochFromZookeeper() incrementControllerEpoch(zkUtils.zkClient) // 在/brokers/topics节点注册监听器 registerReassignedPartitionsListener() registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() // 注册分区状态机 replicaStateMachine.registerListeners() // 注册副本状态机 initializeControllerContext() // 在控制器初始化之后,在状态机启动之前,需要发送更新元数据请求 sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) replicaStateMachine.startup() // 启动副本状态机 partitionStateMachine.startup() // 启动分区状态机 // 在自动故障转移中为所有主题注册分区更改监听器 controllerContext.allTopics.foreach(topic => partitionStateMachine. registerPartitionChangeListener(topic)) info("Broker %d is ready to serve as the new controller with epoch %d". format(config.brokerId, epoch)) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() if (config.autoLeaderRebalanceEnable) { info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS) } deleteTopicManager.start() } else info("Controller has been shut down, aborting startup/failover")}
主题的分区状态机通过registerListeners()函数,在Zookeeper系统中的/brokers/topics节点上注册了TopicChangeListener和DeleteTopicListener两个监听器。创建一个主题时,主题信息、主题分区和副本会被写到Zookeeper系统的/brokers/topics节点中,这就会触发分区和副本状态机注册监听器。
5.总结
Kafka系统整体来说,调试还算方便。下载Kafka源代码,导入到IDE中,就可以启动整个Kafka系统了,可以通过DEBUG的方式来亲自了解控制器的执行流程。
6.结束语
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。