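1 The scheduler management module: coordinator
HeartbeatStreams: the generated scheduling operations must ultimately be sent to the TiKV nodes, which carry them out and thereby reach…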
// newCoordinator creates a new coordinator.
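Put plainly, the scheduler management module exists to generate scheduling operations and to control how fast they are generated. Every kind of scheduling operation manages regions. Since one region corresponds to one Raft group, managing a region means managing the members of that Raft group: adding or removing a member, relocating a Raft member for a particular goal (load, data volume), and so on. The ownership chain is coordinator --> RaftCluster --> Server.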
type Server struct {
Definition of the scheduler module. The definition shows that the scheduler module contains a number of other modules:
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
cluster *RaftCluster
checkers *schedule.CheckerController
regionScatterer *schedule.RegionScatterer
regionSplitter *schedule.RegionSplitter
schedulers map[string]*scheduleController
opController *schedule.OperatorController
hbStreams *hbstream.HeartbeatStreams
pluginInterface *schedule.PluginInterface
}
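As the coordinator definition above shows, coordinator contains a series of submodules, which fall into three categories:
Analysis of the submodules that generate scheduling operations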
// NewCheckerController create a new CheckerController.
Definition of the checker controller, which contains a series of checkers, each serving a different purpose:
learnerChecker: checker.NewLearnerChecker(cluster)
// LearnerChecker ensures region has a learner will be promoted.
// NewLearnerChecker creates a learner checker.
2.1.2 How LearnerChecker generates an operator
The key function to analyze is CreatePromoteLearnerOperator:
// CreatePromoteLearnerOperator creates an operator that promotes a learner.
// Builder is used to create operators. Usage:
// operation record
originPeers peersMap
unhealthyPeers peersMap
originLeaderStoreID uint64
targetPeers peersMap
targetLeaderStoreID uint64
err error
// skip origin check flags
skipOriginJointStateCheck bool
// build flags
allowDemote bool
useJointConsensus bool
lightWeight bool
forceTargetLeader bool
// intermediate states
currentPeers peersMap
currentLeaderStoreID uint64
toAdd, toRemove, toPromote, toDemote peersMap // pending tasks.
steps []OpStep // generated steps.
peerAddStep map[uint64]int // record at which step a peer is created.
// comparison function
stepPlanPreferFuncs []func(stepPlan) int // for buildStepsWithoutJointConsensus
}
The Builder is initialized as follows:
// options
for _, option := range opts {
option(b)
}
// origin peers
err := b.err
originPeers := newPeersMap()
unhealthyPeers := newPeersMap()
for _, p := range region.GetPeers() {
originPeers.Set(p)
}
for _, p := range region.GetPendingPeers() {
unhealthyPeers.Set(p)
}
for _, p := range region.GetDownPeers() {
unhealthyPeers.Set(p.Peer)
}
// origin leader
originLeaderStoreID := region.GetLeader().GetStoreId()
b.rules = rules
b.originPeers = originPeers
b.unhealthyPeers = unhealthyPeers
b.originLeaderStoreID = originLeaderStoreID
b.targetPeers = originPeers.Copy()
b.allowDemote = supportJointConsensus
return b
}
Next, look at the Builder's PromoteLearner method. Given the learner peer to be promoted, it updates the Builder's target state: in targetPeers, the peer's role is changed from learner to metapb.PeerRole_Voter.
// PromoteLearner records a promote learner operation in Builder.
b.targetPeers.Set(&metapb.Peer{
Id: peer.GetId(),
StoreId: peer.GetStoreId(),
Role: metapb.PeerRole_Voter,
})
return b
}
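A minimal sketch of this "edit the target state, not the region" idea. The types below (Peer, sketchBuilder) are simplified stand-ins invented for illustration, not PD's actual Builder or metapb.Peer; only the shape of PromoteLearner follows the description above, and the error messages are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// PeerRole and Peer are simplified stand-ins for the metapb types.
type PeerRole int

const (
	Voter PeerRole = iota
	Learner
)

type Peer struct {
	ID      uint64
	StoreID uint64
	Role    PeerRole
}

// sketchBuilder is a hypothetical, cut-down builder: it keeps the original
// peers untouched and records the desired state in targetPeers.
type sketchBuilder struct {
	originPeers map[uint64]Peer // keyed by store ID
	targetPeers map[uint64]Peer
	err         error
}

func newSketchBuilder(peers []Peer) *sketchBuilder {
	b := &sketchBuilder{
		originPeers: map[uint64]Peer{},
		targetPeers: map[uint64]Peer{},
	}
	for _, p := range peers {
		b.originPeers[p.StoreID] = p
		b.targetPeers[p.StoreID] = p // the target starts as a copy of the origin
	}
	return b
}

// PromoteLearner only edits the target state; no step is generated here.
func (b *sketchBuilder) PromoteLearner(storeID uint64) *sketchBuilder {
	if b.err != nil {
		return b // keep the first error so that calls can be chained safely
	}
	peer, ok := b.targetPeers[storeID]
	switch {
	case !ok:
		b.err = errors.New("cannot promote: peer not found")
	case peer.Role != Learner:
		b.err = errors.New("cannot promote: peer is not a learner")
	default:
		peer.Role = Voter
		b.targetPeers[storeID] = peer
	}
	return b
}

func main() {
	b := newSketchBuilder([]Peer{
		{ID: 1, StoreID: 1, Role: Voter},
		{ID: 2, StoreID: 2, Role: Learner},
	})
	b.PromoteLearner(2)
	fmt.Println("origin still learner:", b.originPeers[2].Role == Learner)
	fmt.Println("target is voter:", b.targetPeers[2].Role == Voter)
}
```

Storing the first error in the builder instead of returning it from every method is what lets the calls be chained and the error be checked once at the end.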
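The next step calls the Builder's Build(0) method, which generates the scheduling operation from the Builder's state. The principle is simple: the Builder records two pieces of information, the region's original state and the region's desired target state. It compares the two and generates the scheduling operation that moves the region to the target state.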
// Build creates the Operator.
kind, b.err = b.buildStepsWithoutJointConsensus(kind)
}
return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.steps...), nil
}
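The comparison of the two sets works as follows; we focus on the role change from learner to follower (voter).
In our example, if b.originPeers shows that the original peer is a learner and the desired state of the same peer in b.targetPeers is voter, a record is saved indicating that the peer on that store is to be promoted from learner to voter.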
// Diff originPeers and targetPeers to initialize toAdd, toRemove, toPromote, toDemote.
b.peerAddStep = make(map[uint64]int)
return b.brief(), nil
}
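The comparison described above can be sketched roughly as follows. This is a simplified illustration, not PD's implementation: a peer is identified only by its store ID, roles are reduced to voter and learner, and diffPeers and diffResult are hypothetical names. The four result sets mirror the Builder's toAdd, toRemove, toPromote and toDemote fields.

```go
package main

import "fmt"

type Role int

const (
	Voter Role = iota
	Learner
)

// diffResult mirrors the four pending-task sets named in the Builder;
// the values are store IDs.
type diffResult struct {
	toAdd, toRemove, toPromote, toDemote []uint64
}

// diffPeers compares the origin and target role of every store.
func diffPeers(origin, target map[uint64]Role) diffResult {
	var d diffResult
	for store, o := range origin {
		t, ok := target[store]
		switch {
		case !ok:
			d.toRemove = append(d.toRemove, store) // gone from the target
		case o == Learner && t == Voter:
			d.toPromote = append(d.toPromote, store)
		case o == Voter && t == Learner:
			d.toDemote = append(d.toDemote, store)
		}
	}
	for store := range target {
		if _, ok := origin[store]; !ok {
			d.toAdd = append(d.toAdd, store) // only present in the target
		}
	}
	return d
}

func main() {
	origin := map[uint64]Role{1: Voter, 2: Voter, 3: Learner}
	target := map[uint64]Role{1: Voter, 2: Voter, 3: Voter}
	d := diffPeers(origin, target)
	fmt.Println("promote:", d.toPromote, "add:", d.toAdd,
		"remove:", d.toRemove, "demote:", d.toDemote)
}
```

For the learner promotion discussed here, only toPromote ends up non-empty, which is why the step generation later has a single PromoteLearner step to emit.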
Each scheduling operation consists of multiple steps. The following function generates them:
// Some special cases, and stores that do not support using joint consensus.
if plan.promote != nil {
b.execPromoteLearner(plan.promote)
}
}
return kind, nil
}
b.peerPlan is called to produce the corresponding step plan, and the operation steps are then generated from that plan:
func (b *Builder) peerPlan() stepPlan {
func (b *Builder) planPromotePeer() stepPlan {
type stepPlan struct {
func (b *Builder) execPromoteLearner(peer *metapb.Peer) {
// PromoteLearner is an OpStep that promotes a region learner peer to normal voter.
With the steps above in place, Build is called to create the operator:
// Build creates the Operator.
// NewOperator creates a new operator.
The module that limits scheduling speed: opController
// NewOperatorController creates a OperatorController.
// OperatorController is used to limit the speed of scheduling.
The core job of this module is to control the rate at which operators are sent to TiKV, so let's find the function in opController that sends an operator to TiKV:
switch op.Status() {
case operator.STARTED:
oc.SendScheduleCommand(region, step, source)
}
}
}
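oc.SendScheduleCommand(region, step, source) sends the current step of the region's operator to the TiKV node. In our example the step is PromoteLearner, that is, promoting the learner to voter. The function builds a heartbeat response object, which is returned to TiKV as the reply to the region's heartbeat and thereby drives TiKV to promote the region peer to voter.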
oc.hbStreams.SendMsg(region, cmd)
}
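To make the heartbeat-driven flow concrete, here is a small sketch of dispatching an operator one step at a time. It rests on stated assumptions: the region is reduced to a map of learner flags, and step, operatorSketch and dispatch are hypothetical names rather than PD's operator package. The idea it shows is that each heartbeat re-checks the region and resends the first step that has not taken effect yet.

```go
package main

import "fmt"

// region is a toy region: store ID -> true if the peer on that store is
// still a learner. A store missing from the map counts as "not a learner".
type region struct{ learners map[uint64]bool }

// step is a hypothetical, minimal version of an operator step.
type step interface {
	IsFinish(r region) bool
	String() string
}

type promoteLearner struct{ toStore uint64 }

func (p promoteLearner) IsFinish(r region) bool { return !r.learners[p.toStore] }
func (p promoteLearner) String() string {
	return fmt.Sprintf("promote learner on store %d", p.toStore)
}

type operatorSketch struct{ steps []step }

// check returns the first step the region has not completed, or nil when
// every step is done.
func (o operatorSketch) check(r region) step {
	for _, s := range o.steps {
		if !s.IsFinish(r) {
			return s
		}
	}
	return nil
}

// dispatch is meant to run once per region heartbeat. It sends the pending
// step, if any, and reports whether something was sent.
func dispatch(o operatorSketch, r region, send func(cmd string)) bool {
	if s := o.check(r); s != nil {
		send(s.String())
		return true
	}
	return false
}

func main() {
	op := operatorSketch{steps: []step{promoteLearner{toStore: 2}}}
	send := func(cmd string) { fmt.Println("send:", cmd) }

	// First heartbeat: the peer on store 2 is still a learner, so the step is sent.
	dispatch(op, region{learners: map[uint64]bool{2: true}}, send)
	// Later heartbeat: TiKV has applied the change, so nothing is sent.
	fmt.Println("sent again:", dispatch(op, region{learners: map[uint64]bool{2: false}}, send))
}
```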
Part three: how the heartbeat streams between TiKV and PD are handled
The module that handles heartbeat messages: hbStreams
// SendMsg sends a message to related store.
select {
case s.msgCh <- msg:
case <-s.hbStreamCtx.Done():
}
}
The hbstream module starts a background goroutine that keeps taking messages off the channel and sends each one to the corresponding TiKV:
case msg := <-s.msgCh:
storeID := msg.GetTargetPeer().GetStoreId()
storeLabel := strconv.FormatUint(storeID, 10)
store := s.storeInformer.GetStore(storeID)
storeAddress := store.GetAddress()
if stream, ok := s.streams[storeID]; ok {
if err := stream.Send(msg);
}
}
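The pattern behind this loop can be sketched as a single goroutine that owns the store-to-stream map and receives both outgoing messages and stream registrations over channels. The names below (streams, update, recorder, start) are illustrative assumptions and the sender interface stands in for the gRPC stream; the real module does more than this.

```go
package main

import (
	"context"
	"fmt"
)

type message struct {
	storeID uint64
	body    string
}

// sender stands in for a per-store gRPC heartbeat stream.
type sender interface{ Send(message) error }

// update registers (or replaces) the stream of one store.
type update struct {
	storeID uint64
	stream  sender
}

type streams struct {
	msgCh    chan message
	streamCh chan update
}

// run owns the map, so no mutex is needed: registrations and outgoing
// messages are both serialized through channels.
func (s *streams) run(ctx context.Context) {
	byStore := make(map[uint64]sender)
	for {
		select {
		case u := <-s.streamCh:
			byStore[u.storeID] = u.stream
		case m := <-s.msgCh:
			if st, ok := byStore[m.storeID]; ok {
				if err := st.Send(m); err != nil {
					// Drop the broken stream; a later heartbeat can register a new one.
					delete(byStore, m.storeID)
				}
			}
		case <-ctx.Done():
			return
		}
	}
}

// start launches the loop and returns a function that stops it.
func start() (*streams, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	s := &streams{msgCh: make(chan message), streamCh: make(chan update)}
	go s.run(ctx)
	return s, cancel
}

// recorder is a fake stream that records what was sent to it.
type recorder struct{ got chan string }

func (r recorder) Send(m message) error { r.got <- m.body; return nil }

func main() {
	s, stop := start()
	defer stop()

	rec := recorder{got: make(chan string, 1)}
	s.streamCh <- update{storeID: 2, stream: rec} // what a heartbeat from store 2 would do
	s.msgCh <- message{storeID: 2, body: "promote learner"}
	fmt.Println("store 2 received:", <-rec.got)
}
```

Keeping the map private to one goroutine is a design choice that trades a little latency for not having to lock around every send.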
The HeartbeatStreams module uses a map to manage its streams to the TiKV nodes. The map is updated every time the heartbeat gRPC handler runs:
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
type heartbeatServer struct {
heartbeatServer is a wrapper around pdpb.PD_RegionHeartbeatServer:
done := make(chan error, 1)
go func() { done <- s.stream.Send(m) }()
select {
case err := <-done:
return errors.WithStack(err)
case <-time.After(regionHeartbeatSendTimeout):
atomic.StoreInt32(&s.closed, 1)
return errors.WithStack(errSendRegionHeartbeatTimeout)
}
}
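The send-with-timeout pattern in this wrapper can be tried out in isolation. The sketch below replaces the gRPC stream with a plain function and uses made-up names (timedSender, newTimedSender, errClosed); what happens when Send is called on an already closed wrapper is an assumption, since that part is not shown above.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

var (
	errSendTimeout = errors.New("send heartbeat message timeout")
	errClosed      = errors.New("stream already closed") // assumed behavior
)

type timedSender struct {
	send    func(string) error // stands in for the gRPC stream's Send
	timeout time.Duration
	closed  int32
}

func newTimedSender(send func(string) error, timeoutMillis int) *timedSender {
	return &timedSender{send: send, timeout: time.Duration(timeoutMillis) * time.Millisecond}
}

func (s *timedSender) Send(m string) error {
	if atomic.LoadInt32(&s.closed) == 1 {
		return errClosed
	}
	// Buffered with capacity 1, so the goroutine can still deliver its result
	// and exit after the timeout branch has already returned.
	done := make(chan error, 1)
	go func() { done <- s.send(m) }()
	select {
	case err := <-done:
		return err
	case <-time.After(s.timeout):
		atomic.StoreInt32(&s.closed, 1) // refuse further sends on a stuck stream
		return errSendTimeout
	}
}

func main() {
	fast := newTimedSender(func(string) error { return nil }, 1000)
	fmt.Println("fast send error:", fast.Send("heartbeat response"))

	block := make(chan struct{})
	slow := newTimedSender(func(string) error { <-block; return nil }, 10)
	fmt.Println("slow send error:", slow.Send("heartbeat response"))
	fmt.Println("after timeout:", slow.Send("heartbeat response"))
	close(block) // let the stuck goroutine finish
}
```

The buffered channel is the important detail: with an unbuffered one, a send that completes after the timeout would block forever and leak the goroutine.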