
PD Scheduler Module

Xue Gang (薛港), China Mobile Cloud, published 2021-06-17

1 The scheduler management module: coordinator


Initialization

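Because the scheduler management module belongs to the RaftCluster module, it is initialized when RaftCluster starts. The initialization code shows that the coordinator depends on its parent module, RaftCluster, and on HeartbeatStreams, the module that sends scheduling operations.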

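RaftCluster: stores the latest Region and Store information of the system. From this information we can analyze and compute the current data distribution of the cluster, as well as which data is hot or cold. The scheduler module needs this information; based on certain rules, or on goals such as cluster health and a better user experience, it generates a series of scheduling operations.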

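HeartbeatStreams: the generated scheduling operations must ultimately be sent to TiKV nodes, which carry them out so that the cluster reaches the healthy state expected by the user or defined by the system. This module mainly manages the connections and sends the scheduling operations.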

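// Start starts a cluster.
func (c *RaftCluster) Start(s Server) error {
	// ... (omitted: earlier initialization, which also obtains cluster)
	c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
	// ... (rest of Start omitted)
	return nil
}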

// newCoordinator creates a new coordinator.
func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstream.HeartbeatStreams) *coordinator {
	ctx, cancel := context.WithCancel(ctx)
	opController := schedule.NewOperatorController(ctx, cluster, hbStreams)
	return &coordinator{
		ctx:             ctx,
		cancel:          cancel,
		cluster:         cluster,
		checkers:        schedule.NewCheckerController(ctx, cluster, cluster.ruleManager, opController),
		regionScatterer: schedule.NewRegionScatterer(ctx, cluster),
		regionSplitter:  schedule.NewRegionSplitter(cluster, schedule.NewSplitRegionsHandler(cluster, opController)),
		schedulers:      make(map[string]*scheduleController),
		opController:    opController,
		hbStreams:       hbStreams,
		pluginInterface: schedule.NewPluginInterface(),
	}
}

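In short, the scheduler management module generates scheduling operations and controls how fast they are generated. Every kind of scheduling operation manages Regions, and each Region corresponds to one Raft group, so managing a Region means managing the members of its Raft group: for example, adding or removing members, or moving Raft members to other stores for different purposes (balancing load or data size). The ownership chain is coordinator -> RaftCluster -> Server: the coordinator belongs to RaftCluster, which belongs to Server.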


How the coordinator relates to other modules

type RaftCluster struct {
	// ...
	coordinator *coordinator
	// ...
}

type Server struct {
	// ...
	cluster *cluster.RaftCluster
	// ...
}

The coordinator's definition shows that it contains a series of other modules:


// coordinator is used to manage all schedulers and checkers to decide
// if the region needs to be scheduled.
type coordinator struct {
	sync.RWMutex

	wg              sync.WaitGroup
	ctx             context.Context
	cancel          context.CancelFunc
	cluster         *RaftCluster
	checkers        *schedule.CheckerController
	regionScatterer *schedule.RegionScatterer
	regionSplitter  *schedule.RegionSplitter
	schedulers      map[string]*scheduleController
	opController    *schedule.OperatorController
	hbStreams       *hbstream.HeartbeatStreams
	pluginInterface *schedule.PluginInterface
}

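As the definition above shows, the coordinator's sub-modules fall into three categories:


Modules that generate scheduling operations for different purposes: checkers, regionScatterer, regionSplitter

The module that limits scheduling speed: opController

The module that sends scheduling operations to TiKV: hbStreams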
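The three categories form a pipeline: generators propose operators, opController decides which of them may run, and hbStreams delivers them. The sketch below models that flow under simplifying assumptions; `op`, `checker`, `controller`, `runPipeline` and the fixed per-store cap are hypothetical and do not reflect PD's real limiting rules.

```go
package main

import "fmt"

// op is a hypothetical, simplified operator: the region it changes and the store it touches.
type op struct {
	regionID uint64
	storeID  uint64
}

// checker is the "generate" stage: it may propose an operator for a region (nil = nothing to do).
type checker func(regionID uint64) *op

// controller is the "limit" stage: it admits at most maxPerStore operators per store.
type controller struct {
	maxPerStore int
	running     map[uint64]int
}

func (c *controller) admit(o *op) bool {
	if c.running[o.storeID] >= c.maxPerStore {
		return false
	}
	c.running[o.storeID]++
	return true
}

// runPipeline passes each region through the checker and the controller; the
// returned slice stands in for the "send" stage (hbStreams).
func runPipeline(regions []uint64, check checker, ctl *controller) []op {
	var sent []op
	for _, id := range regions {
		o := check(id)
		if o == nil || !ctl.admit(o) {
			continue
		}
		sent = append(sent, *o)
	}
	return sent
}

func main() {
	// Hypothetical checker: region id needs an operator on store id%2+1.
	check := func(id uint64) *op { return &op{regionID: id, storeID: id%2 + 1} }
	ctl := &controller{maxPerStore: 1, running: map[uint64]int{}}
	fmt.Println(runPipeline([]uint64{1, 2, 3, 4}, check, ctl)) // [{1 2} {2 1}]
}
```

Keeping generation and admission separate means a checker never needs to know how busy a store is; it only proposes, and the controller decides.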

Sub-modules that generate scheduling operations


2.1 Sub-module: CheckerController

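This module periodically checks all data. If it finds that the replica count or the replica placement of some data does not match what the user expects, it generates scheduling operations, which are applied to the Region one after another until the Region meets the user's expectation. CheckerController is initialized and defined as follows:

Initialization of the checker controller: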

checkers: schedule.NewCheckerController(ctx, cluster, cluster.ruleManager, opController)

// NewCheckerController create a new CheckerController.
// TODO: isSupportMerge should be removed.
func NewCheckerController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, opController *OperatorController) *CheckerController {
	regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
	return &CheckerController{
		cluster:           cluster,
		opts:              cluster.GetOpts(),
		opController:      opController,
		learnerChecker:    checker.NewLearnerChecker(cluster),
		replicaChecker:    checker.NewReplicaChecker(cluster, regionWaitingList),
		ruleChecker:       checker.NewRuleChecker(cluster, ruleManager, regionWaitingList),
		mergeChecker:      checker.NewMergeChecker(ctx, cluster),
		jointStateChecker: checker.NewJointStateChecker(cluster),
		regionWaitingList: regionWaitingList,
	}
}

Definition of the checker controller, which contains a set of checkers with different purposes:


// CheckerController is used to manage all checkers.
type CheckerController struct {
	cluster           opt.Cluster
	opts              *config.PersistOptions
	opController      *OperatorController
	learnerChecker    *checker.LearnerChecker
	replicaChecker    *checker.ReplicaChecker
	ruleChecker       *checker.RuleChecker
	mergeChecker      *checker.MergeChecker
	jointStateChecker *checker.JointStateChecker
	regionWaitingList cache.Cache
}

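The definition of CheckerController shows that it contains a set of checkers, each serving a different purpose; when a checker detects a problem, it generates the corresponding scheduling operation. For example, replicaChecker checks the replica count: if it finds a problem, it generates a scheduling operation that is sent through the heartbeat stream to the corresponding TiKV, so that the Region eventually reaches the expected number of replicas. Since all checkers share the same basic framework, analyzing one of them is enough to reason about the others; for reasons of space, this article does not cover them all.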
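Because every checker has the same shape (inspect a Region, maybe return an operator), one way to picture CheckerController is as a list of checkers tried in order, keeping the first operator produced. A minimal sketch under that assumption; the types are hypothetical and the order shown is illustrative, not PD's exact checking logic.

```go
package main

import "fmt"

// region is a hypothetical, trimmed-down Region: replica and learner counts.
type region struct {
	id       uint64
	replicas int
	learners int
}

// checker inspects a region and returns an operator description, or "" if nothing is needed.
type checker interface {
	Check(r region) string
}

type learnerChecker struct{}

func (learnerChecker) Check(r region) string {
	if r.learners > 0 {
		return "promote-learner"
	}
	return ""
}

type replicaChecker struct{ maxReplicas int }

func (c replicaChecker) Check(r region) string {
	switch {
	case r.replicas < c.maxReplicas:
		return "add-replica"
	case r.replicas > c.maxReplicas:
		return "remove-replica"
	}
	return ""
}

// checkRegion runs the checkers in order and returns the first operator found.
func checkRegion(r region, checkers []checker) string {
	for _, c := range checkers {
		if op := c.Check(r); op != "" {
			return op
		}
	}
	return ""
}

func main() {
	checkers := []checker{learnerChecker{}, replicaChecker{maxReplicas: 3}}
	fmt.Println(checkRegion(region{id: 1, replicas: 3, learners: 1}, checkers)) // promote-learner
	fmt.Println(checkRegion(region{id: 2, replicas: 2}, checkers))              // add-replica
	fmt.Println(checkRegion(region{id: 3, replicas: 3}, checkers) == "")        // true
}
```

A shared interface is what makes "analyze one checker, infer the rest" work: adding a new checker only means appending another implementation to the list.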

2.1.1 Module: LearnerChecker

Initialization and definition:

learnerChecker: checker.NewLearnerChecker(cluster)

// LearnerChecker ensures region has a learner will be promoted.
type LearnerChecker struct {
	cluster opt.Cluster
}

// NewLearnerChecker creates a learner checker.
func NewLearnerChecker(cluster opt.Cluster) *LearnerChecker {
	return &LearnerChecker{
		cluster: cluster,
	}
}

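2.1.2 How LearnerChecker generates an operator


LearnerChecker's Check function analyzes a given Region. If the Region has a peer in the learner role and that learner meets the requirements for promotion to voter, Check creates an operator that promotes the learner peer to the voter role, so that the peer can take part in subsequent elections.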

// Check verifies a region's role, creating an Operator if need.
func (l *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator {
	for _, p := range region.GetLearners() {
		op, err := operator.CreatePromoteLearnerOperator("promote-learner", l.cluster, region, p)
		if err != nil {
			// This learner cannot be promoted; try the next one.
			continue
		}
		return op
	}
	return nil
}
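Check tries the learners one by one: when building the operator fails (err != nil) it moves on to the next learner, and it returns the first operator that can be built. A self-contained sketch of that control flow; `peer`, `operator`, `createPromote` and the rule that unhealthy learners are rejected are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// peer is a hypothetical, simplified Region peer.
type peer struct {
	id, storeID uint64
	learner     bool
	healthy     bool
}

// operator is a hypothetical, simplified operator.
type operator struct {
	desc    string
	storeID uint64
}

// createPromote plays the role of CreatePromoteLearnerOperator. Assumption for
// this sketch: non-learners and unhealthy learners cannot be promoted.
func createPromote(desc string, p peer) (*operator, error) {
	if !p.learner {
		return nil, fmt.Errorf("peer %d is not a learner", p.id)
	}
	if !p.healthy {
		return nil, errors.New("learner is unhealthy")
	}
	return &operator{desc: desc, storeID: p.storeID}, nil
}

// checkLearners returns an operator for the first promotable learner, or nil.
func checkLearners(learners []peer) *operator {
	for _, p := range learners {
		op, err := createPromote("promote-learner", p)
		if err != nil {
			continue // cannot promote this learner; try the next one
		}
		return op
	}
	return nil
}

func main() {
	learners := []peer{
		{id: 11, storeID: 1, learner: true, healthy: false},
		{id: 12, storeID: 2, learner: true, healthy: true},
	}
	if op := checkLearners(learners); op != nil {
		fmt.Println(op.desc, op.storeID) // promote-learner 2
	}
}
```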

Let us look closely at CreatePromoteLearnerOperator:


op, err := operator.CreatePromoteLearnerOperator("promote-learner", l.cluster, region, p)

// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
	return NewBuilder(desc, cluster, region).
		PromoteLearner(peer.GetStoreId()).
		Build(0)
}

Next we focus on the Builder, which generates the operator for a Region.

The Builder is defined as follows:

// Builder is used to create operators. Usage:
//     op, err := NewBuilder(desc, cluster, region).
//                 RemovePeer(store1).
//                 AddPeer(peer1).
//                 SetLeader(store2).
//                 Build(kind)
// The generated Operator will choose the most appropriate execution order
// according to various constraints.
type Builder struct {
	// basic info
	desc          string
	cluster       opt.Cluster
	regionID      uint64
	regionEpoch   *metapb.RegionEpoch
	rules         []*placement.Rule
	expectedRoles map[uint64]placement.PeerRoleType

	// operation record
	originPeers         peersMap
	unhealthyPeers      peersMap
	originLeaderStoreID uint64
	targetPeers         peersMap
	targetLeaderStoreID uint64
	err                 error

	// skip origin check flags
	skipOriginJointStateCheck bool

	// build flags
	allowDemote       bool
	useJointConsensus bool
	lightWeight       bool
	forceTargetLeader bool

	// intermediate states
	currentPeers                         peersMap
	currentLeaderStoreID                 uint64
	toAdd, toRemove, toPromote, toDemote peersMap       // pending tasks.
	steps                                []OpStep       // generated steps.
	peerAddStep                          map[uint64]int // record at which step a peer is created.

	// comparison function
	stepPlanPreferFuncs []func(stepPlan) int // for buildStepsWithoutJointConsensus
}
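The Builder follows a fluent-builder pattern: each call records an intent, the first error is kept in b.err, later calls become no-ops, and Build reports that error. A minimal sketch of the pattern; `miniBuilder`, `role` and the error messages are hypothetical, and only PromoteLearner is modeled.

```go
package main

import "fmt"

type role int

const (
	voter role = iota
	learner
)

// miniBuilder keeps the target role of each peer, keyed by store ID, plus the first error.
type miniBuilder struct {
	target map[uint64]role
	err    error
}

func newMiniBuilder(peers map[uint64]role) *miniBuilder {
	target := make(map[uint64]role, len(peers))
	for store, r := range peers {
		target[store] = r // copy, so the caller's (origin) map is never modified
	}
	return &miniBuilder{target: target}
}

// PromoteLearner records the promotion, or the first error if it is invalid.
func (b *miniBuilder) PromoteLearner(storeID uint64) *miniBuilder {
	if b.err != nil {
		return b // an earlier call already failed; keep the first error
	}
	r, ok := b.target[storeID]
	switch {
	case !ok:
		b.err = fmt.Errorf("cannot promote peer %d: not found", storeID)
	case r != learner:
		b.err = fmt.Errorf("cannot promote peer %d: is not learner", storeID)
	default:
		b.target[storeID] = voter
	}
	return b
}

// Build returns the final target state or the recorded error.
func (b *miniBuilder) Build() (map[uint64]role, error) {
	if b.err != nil {
		return nil, b.err
	}
	return b.target, nil
}

func main() {
	peers := map[uint64]role{1: voter, 2: voter, 3: learner}
	target, err := newMiniBuilder(peers).PromoteLearner(3).Build()
	fmt.Println(target[3] == voter, err) // true <nil>
	_, err = newMiniBuilder(peers).PromoteLearner(9).PromoteLearner(3).Build()
	fmt.Println(err) // cannot promote peer 9: not found
}
```

Deferring the error to Build keeps call chains like NewBuilder(...).PromoteLearner(...).Build(0) free of intermediate error checks.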

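A Builder is initialized as follows.


1. From the region argument, NewBuilder extracts all peers of the Region (indexed by the store they are on), the unhealthy peers (pending or down), and the store that holds the current leader.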

// NewBuilder creates a Builder.
func NewBuilder(desc string, cluster opt.Cluster, region *core.RegionInfo, opts ...BuilderOption) *Builder {
	b := &Builder{
		desc:        desc,
		cluster:     cluster,
		regionID:    region.GetID(),
		regionEpoch: region.GetRegionEpoch(),
	}

	// options
	for _, option := range opts {
		option(b)
	}

	// origin peers
	err := b.err
	originPeers := newPeersMap()
	unhealthyPeers := newPeersMap()

	for _, p := range region.GetPeers() {
		originPeers.Set(p)
	}

	for _, p := range region.GetPendingPeers() {
		unhealthyPeers.Set(p)
	}

	for _, p := range region.GetDownPeers() {
		unhealthyPeers.Set(p.Peer)
	}

	// origin leader
	originLeaderStoreID := region.GetLeader().GetStoreId()

	// ... (omitted: validation checks that may set err, the placement-rule
	// lookup that fills rules, and the feature check that computes
	// supportJointConsensus)

	b.rules = rules
	b.originPeers = originPeers
	b.unhealthyPeers = unhealthyPeers
	b.originLeaderStoreID = originLeaderStoreID
	b.targetPeers = originPeers.Copy()
	b.allowDemote = supportJointConsensus
	b.err = err // any error recorded here is returned later by Build
	return b
}

Next comes the Builder's PromoteLearner method. Its job is to record the learner peer you want to promote: it changes that peer's role in the Builder's target peers from learner to metapb.PeerRole_Voter.

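// PromoteLearner records a promote learner operation in Builder.
func (b *Builder) PromoteLearner(storeID uint64) *Builder {
	if b.err != nil {
		return b
	}
	if peer, ok := b.targetPeers[storeID]; !ok {
		b.err = errors.Errorf("cannot promote peer %d: not found", storeID)
	} else if !core.IsLearner(peer) {
		b.err = errors.Errorf("cannot promote peer %d: is not learner", storeID)
	} else {
		// ... (other validity checks omitted)
		b.targetPeers.Set(&metapb.Peer{
			Id:      peer.GetId(),
			StoreId: peer.GetStoreId(),
			Role:    metapb.PeerRole_Voter,
		})
	}
	return b
}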

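Next, Build(0) is called. It generates the scheduling steps from the Builder's state. The principle is simple: the Builder records two sets of information, one for the Region's original state and one for the Region's desired target state. Comparing the two yields the scheduling operation that moves the Region to its target state.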

// Build creates the Operator.
func (b *Builder) Build(kind OpKind) (*Operator, error) {
	var brief string

	if brief, b.err = b.prepareBuild(); b.err != nil {
		return nil, b.err
	}

	// The joint-consensus branch (b.useJointConsensus) is omitted here.
	if kind, b.err = b.buildStepsWithoutJointConsensus(kind); b.err != nil {
		return nil, b.err
	}

	return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.steps...), nil
}

The two sets are compared as follows; we focus on converting a learner into a voter (follower).


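b.originPeers holds the Region's original state.

b.targetPeers holds the Region's desired target state.

In our example, if b.originPeers shows that a peer is a learner while the corresponding peer in b.targetPeers is expected to be a voter, the Builder records (in b.toPromote) that the peer on that store must be promoted from learner to voter.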


// Initialize intermediate states.
// TODO: simplify the code
func (b *Builder) prepareBuild() (string, error) {
	// ... (initialization of the other intermediate states omitted)
	b.toPromote = newPeersMap()

	// Diff originPeers and targetPeers to initialize toAdd, toRemove, toPromote, toDemote.
	for _, o := range b.originPeers {
		n := b.targetPeers[o.GetStoreId()]
		if n == nil {
			// The peer is removed in the target state (toRemove handling omitted).
			continue
		}
		if core.IsLearner(o) {
			if !core.IsLearner(n) {
				// learner -> voter
				b.toPromote.Set(n)
			}
		}
		// ... (voter -> learner and the other cases omitted)
	}

	b.peerAddStep = make(map[uint64]int)

	return b.brief(), nil
}
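The comment in prepareBuild describes a diff of originPeers against targetPeers that fills four task sets, while the excerpt keeps only the promote branch. A sketch of such a four-way diff under a simplified two-role model; `role` and `diff` are hypothetical, and details such as leader handling are not modeled.

```go
package main

import "fmt"

type role int

const (
	voter role = iota
	learner
)

// diff compares origin and target roles (keyed by store ID) and classifies
// each store into one of the four pending-task sets named in prepareBuild.
func diff(origin, target map[uint64]role) (toAdd, toRemove, toPromote, toDemote []uint64) {
	for store, o := range origin {
		n, ok := target[store]
		switch {
		case !ok:
			toRemove = append(toRemove, store) // present now, absent in the target
		case o == learner && n == voter:
			toPromote = append(toPromote, store) // learner -> voter
		case o == voter && n == learner:
			toDemote = append(toDemote, store) // voter -> learner
		}
	}
	for store := range target {
		if _, ok := origin[store]; !ok {
			toAdd = append(toAdd, store) // absent now, present in the target
		}
	}
	return
}

func main() {
	origin := map[uint64]role{1: voter, 2: voter, 3: learner}
	target := map[uint64]role{1: voter, 2: voter, 3: voter}
	add, rm, promote, demote := diff(origin, target)
	fmt.Println(len(add), len(rm), promote, len(demote)) // 0 0 [3] 0
}
```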

Each scheduling operation consists of several steps. The following function generates those steps:

// Some special cases, and stores that do not support using joint consensus.
func (b *Builder) buildStepsWithoutJointConsensus(kind OpKind) (OpKind, error) {
	for len(b.toPromote) > 0 {
		plan := b.peerPlan()
		if plan.IsEmpty() {
			return kind, errors.New("fail to build operator: plan is empty")
		}
		if plan.promote != nil {
			b.execPromoteLearner(plan.promote)
		}
	}
	return kind, nil
}
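buildStepsWithoutJointConsensus is a work-list loop: it asks for a plan, executes it, and repeats while promotions are pending, so it terminates only if executing a plan removes the peer from toPromote. A sketch of that loop; `builder`, `promoteStep` and the ascending store-ID order are assumptions made to keep the output deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// promoteStep is a hypothetical stand-in for the PromoteLearner OpStep.
type promoteStep struct{ ToStore, PeerID uint64 }

type builder struct {
	toPromote map[uint64]uint64 // store ID -> peer ID still waiting to be promoted
	steps     []promoteStep
}

// planPromote picks the pending promotion on the lowest store ID (ok=false if none).
func (b *builder) planPromote() (store uint64, ok bool) {
	ids := make([]uint64, 0, len(b.toPromote))
	for id := range b.toPromote {
		ids = append(ids, id)
	}
	if len(ids) == 0 {
		return 0, false
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids[0], true
}

// execPromote appends the step and removes the finished task, which guarantees progress.
func (b *builder) execPromote(store uint64) {
	b.steps = append(b.steps, promoteStep{ToStore: store, PeerID: b.toPromote[store]})
	delete(b.toPromote, store)
}

func (b *builder) buildSteps() error {
	for len(b.toPromote) > 0 {
		store, ok := b.planPromote()
		if !ok {
			return errors.New("empty plan") // defensive, mirrors the IsEmpty check
		}
		b.execPromote(store)
	}
	return nil
}

func main() {
	b := &builder{toPromote: map[uint64]uint64{3: 103, 1: 101}}
	if err := b.buildSteps(); err == nil {
		fmt.Println(b.steps) // [{1 101} {3 103}]
	}
}
```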

b.peerPlan produces a stepPlan, from which the concrete steps are generated:

func (b *Builder) peerPlan() stepPlan {
	if p := b.planPromotePeer(); !p.IsEmpty() {
		return p
	}
	// ... (other kinds of plans omitted)
	return stepPlan{}
}

func (b *Builder) planPromotePeer() stepPlan {
	for _, i := range b.toPromote.IDs() {
		peer := b.toPromote[i]
		return stepPlan{promote: peer}
	}
	return stepPlan{}
}

type stepPlan struct {
	// ... (other fields omitted)
	promote *metapb.Peer
}

The concrete steps are generated from the stepPlan.

PromoteLearner is the promote step: ToStore is the store that holds the learner, and PeerID is the ID of that peer.

if plan.promote != nil {
	b.execPromoteLearner(plan.promote)
}

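func (b *Builder) execPromoteLearner(peer *metapb.Peer) {
	b.steps = append(b.steps, PromoteLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId()})
	// Record the new role and drop the finished task, so that the loop in
	// buildStepsWithoutJointConsensus terminates.
	b.currentPeers.Set(peer)
	delete(b.toPromote, peer.GetStoreId())
}

// PromoteLearner is an OpStep that promotes a region learner peer to normal voter.
type PromoteLearner struct {
	ToStore, PeerID uint64
}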

With these steps in place, Build creates the operator:



// Build creates the Operator.
func (b *Builder) Build(kind OpKind) (*Operator, error) {
	// ... (prepareBuild and step generation, as shown earlier)
	return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.steps...), nil
}

// NewOperator creates a new operator.
func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.RegionEpoch, kind OpKind, steps ...OpStep) *Operator {
	// ... (computation of level, the operator's priority, omitted)
	return &Operator{
		desc:            desc,
		brief:           brief,
		regionID:        regionID,
		regionEpoch:     regionEpoch,
		kind:            kind,
		steps:           steps,
		stepsTime:       make([]int64, len(steps)),
		status:          NewOpStatusTracker(),
		level:           level,
		AdditionalInfos: make(map[string]string),
	}
}

The rate-limiting module: opController


Definition

opController := schedule.NewOperatorController(ctx, cluster, hbStreams)

// NewOperatorController creates a OperatorController.
func NewOperatorController(ctx context.Context, cluster opt.Cluster, hbStreams *hbstream.HeartbeatStreams) *OperatorController {
	return &OperatorController{
		ctx:             ctx,
		cluster:         cluster,
		operators:       make(map[uint64]*operator.Operator),
		hbStreams:       hbStreams,
		histories:       list.New(),
		counts:          make(map[operator.OpKind]uint64),
		opRecords:       NewOperatorRecords(ctx),
		storesLimit:     make(map[uint64]map[storelimit.Type]*storelimit.StoreLimit),
		wop:             NewRandBuckets(),
		wopStatus:       NewWaitingOperatorStatus(),
		opNotifierQueue: make(operatorQueue, 0),
	}
}

// OperatorController is used to limit the speed of scheduling.
type OperatorController struct {
	sync.RWMutex
	ctx             context.Context
	cluster         opt.Cluster
	operators       map[uint64]*operator.Operator
	hbStreams       *hbstream.HeartbeatStreams
	histories       *list.List
	counts          map[operator.OpKind]uint64
	opRecords       *OperatorRecords
	storesLimit     map[uint64]map[storelimit.Type]*storelimit.StoreLimit
	wop             WaitingOperator
	wopStatus       *WaitingOperatorStatus
	opNotifierQueue operatorQueue
}

The core job of this module is to control how fast operators are sent to TiKV, so let us find the function the OperatorController uses to send operators to TiKV:


// Dispatch is used to dispatch the operator of a region.
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
	// Check existed operator.
	if op := oc.GetOperator(region.GetID()); op != nil {
		// Update operator status:
		// The operator status should be STARTED.
		// Check will call CheckSuccess and CheckTimeout.
		step := op.Check(region)
		switch op.Status() {
		case operator.STARTED:
			oc.SendScheduleCommand(region, step, source)
		}
	}
}
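Dispatch relies on op.Check(region) to find the step that still has to run: steps that the latest heartbeat shows as finished are skipped, and the first unfinished one is sent. A sketch of that idea; `regionState`, `step.isFinished` and `operator.current` are hypothetical, and PD's real check also handles timeouts and status changes.

```go
package main

import "fmt"

// regionState is a hypothetical snapshot reported by a heartbeat: role per store.
type regionState map[uint64]string

// step is a hypothetical operator step that can tell whether it is done.
type step interface {
	isFinished(r regionState) bool
	String() string
}

type promoteLearner struct{ toStore uint64 }

func (s promoteLearner) isFinished(r regionState) bool { return r[s.toStore] == "voter" }
func (s promoteLearner) String() string                { return fmt.Sprintf("promote learner on store %d", s.toStore) }

type operator struct {
	steps   []step
	current int // index of the first unfinished step
}

// check advances past finished steps and returns the next one to send (nil when done).
func (o *operator) check(r regionState) step {
	for o.current < len(o.steps) && o.steps[o.current].isFinished(r) {
		o.current++
	}
	if o.current == len(o.steps) {
		return nil
	}
	return o.steps[o.current]
}

func main() {
	op := &operator{steps: []step{promoteLearner{toStore: 2}, promoteLearner{toStore: 3}}}
	r := regionState{1: "voter", 2: "learner", 3: "learner"}
	fmt.Println(op.check(r)) // promote learner on store 2
	r[2] = "voter"           // the next heartbeat shows store 2 promoted
	fmt.Println(op.check(r)) // promote learner on store 3
	r[3] = "voter"
	fmt.Println(op.check(r) == nil) // true
}
```

Because progress is inferred from the Region's reported state, resending the current step after a lost response is harmless: the step is simply sent again until a heartbeat shows it done.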

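oc.SendScheduleCommand(region, step, source) sends the current step of the Region's operator to the TiKV node. As analyzed above, our step is PromoteLearner, which promotes a learner to voter. The function builds a RegionHeartbeatResponse, which is returned to TiKV as the reply to the Region's heartbeat and drives TiKV to promote the Region peer to voter.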


// SendScheduleCommand sends a command to the region.
func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step operator.OpStep, source string) {
	var cmd *pdpb.RegionHeartbeatResponse
	switch st := step.(type) {
	case operator.PromoteLearner:
		cmd = &pdpb.RegionHeartbeatResponse{
			ChangePeer: &pdpb.ChangePeer{
				// reuse AddNode type
				ChangeType: eraftpb.ConfChangeType_AddNode,
				Peer: &metapb.Peer{
					Id:      st.PeerID,
					StoreId: st.ToStore,
					Role:    metapb.PeerRole_Voter,
				},
			},
		}
	// ... (cases for the other step types omitted)
	default:
		// Unknown step type: nothing to send.
		return
	}
	oc.hbStreams.SendMsg(region, cmd)
}
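SendScheduleCommand is essentially a type switch from operator steps to heartbeat-response commands, and an unknown step should send nothing. A sketch of that mapping; `command` is a hypothetical stand-in for pdpb.RegionHeartbeatResponse (which has, among others, ChangePeer and TransferLeader fields), and the string values only mirror the promotion case above.

```go
package main

import "fmt"

// Hypothetical step types, mirroring operator.PromoteLearner and a leader transfer.
type promoteLearner struct{ toStore, peerID uint64 }
type transferLeader struct{ toStore uint64 }

// command is a hypothetical stand-in for pdpb.RegionHeartbeatResponse.
type command struct {
	kind       string // "ChangePeer" or "TransferLeader"
	changeType string // for ChangePeer: promotion reuses "AddNode"
	storeID    uint64
	peerID     uint64
	role       string
}

// toCommand maps a step to a command; ok is false for unknown steps,
// in which case the caller should send nothing.
func toCommand(step interface{}) (command, bool) {
	switch st := step.(type) {
	case promoteLearner:
		return command{kind: "ChangePeer", changeType: "AddNode", storeID: st.toStore, peerID: st.peerID, role: "Voter"}, true
	case transferLeader:
		return command{kind: "TransferLeader", storeID: st.toStore}, true
	default:
		return command{}, false
	}
}

func main() {
	cmd, ok := toCommand(promoteLearner{toStore: 2, peerID: 102})
	fmt.Println(ok, cmd.kind, cmd.changeType, cmd.role) // true ChangePeer AddNode Voter
	_, ok = toCommand("unknown step")
	fmt.Println(ok) // false
}
```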

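Part 3: heartbeat stream handling between TiKV and PD

hbStreams, the module that handles heartbeat messages


Sending a heartbeat response: SendMsg puts the message into the hbStreams channel, so that it is sent asynchronously.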

oc.hbStreams.SendMsg(region, cmd)

// SendMsg sends a message to related store.
func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) {
	// ... (filling msg from region, e.g. its region ID and target peer, omitted)
	select {
	case s.msgCh <- msg:
	case <-s.hbStreamCtx.Done():
	}
}

The hbStreams module starts a background goroutine that keeps taking messages from the channel and sending each one to the corresponding TiKV:


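func (s *HeartbeatStreams) run() {
	// ... (setup omitted)
	for {
		select {
		// ... (other cases omitted)
		case msg := <-s.msgCh:
			storeID := msg.GetTargetPeer().GetStoreId()
			storeLabel := strconv.FormatUint(storeID, 10)
			store := s.storeInformer.GetStore(storeID)
			if store == nil {
				// ... (unknown store: logging and cleanup omitted)
				continue
			}
			storeAddress := store.GetAddress()
			if stream, ok := s.streams[storeID]; ok {
				if err := stream.Send(msg); err != nil {
					// ... (logging and metrics using storeAddress/storeLabel omitted)
					delete(s.streams, storeID)
				}
			}
		case <-s.hbStreamCtx.Done():
			return
		}
	}
}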
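run routes every message to the stream registered for its target store, and a failed send drops that stream until it is bound again. A sketch of such a single-goroutine dispatcher; `router`, `bindReq`, `chanStream` and the `dropped` counter are hypothetical, and routing bind requests through a channel (so that only the run goroutine touches the map) is a design choice of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// stream is a hypothetical stand-in for a heartbeat stream to one TiKV store.
type stream interface{ Send(msg string) error }

type message struct {
	targetStore uint64
	body        string
}

type bindReq struct {
	storeID uint64
	s       stream
}

// router owns the store -> stream map; only the run goroutine reads or writes it.
type router struct {
	msgCh   chan message
	bindCh  chan bindReq
	quit    chan struct{}
	done    chan struct{}
	streams map[uint64]stream
	dropped int // messages with no bound stream, or whose send failed
}

func newRouter() *router {
	r := &router{
		msgCh:   make(chan message),
		bindCh:  make(chan bindReq),
		quit:    make(chan struct{}),
		done:    make(chan struct{}),
		streams: map[uint64]stream{},
	}
	go r.run()
	return r
}

func (r *router) run() {
	defer close(r.done)
	for {
		select {
		case b := <-r.bindCh:
			r.streams[b.storeID] = b.s // a newer binding replaces the old stream
		case m := <-r.msgCh:
			s, ok := r.streams[m.targetStore]
			if !ok {
				r.dropped++ // no stream for this store yet
				continue
			}
			if err := s.Send(m.body); err != nil {
				delete(r.streams, m.targetStore) // drop the broken stream until it is re-bound
				r.dropped++
			}
		case <-r.quit:
			return
		}
	}
}

// bind and send talk to run only through channels; stop must be called once.
func (r *router) bind(storeID uint64, s stream) { r.bindCh <- bindReq{storeID: storeID, s: s} }
func (r *router) send(m message)                { r.msgCh <- m }
func (r *router) stop()                         { close(r.quit); <-r.done }

// chanStream records delivered messages; a failing stream rejects every send.
type chanStream struct {
	out     chan string
	failing bool
}

func (c chanStream) Send(msg string) error {
	if c.failing {
		return errors.New("stream closed")
	}
	c.out <- msg
	return nil
}

func main() {
	r := newRouter()
	s1 := chanStream{out: make(chan string, 1)}
	r.bind(1, s1)
	r.send(message{targetStore: 1, body: "promote peer 101"})
	fmt.Println(<-s1.out)                         // promote peer 101
	r.send(message{targetStore: 9, body: "lost"}) // store 9 has no stream
	r.stop()
	fmt.Println(r.dropped) // 1
}
```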

HeartbeatStreams manages its streams to the TiKV nodes in a map keyed by store ID; the map is updated each time a heartbeat gRPC call is handled:


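// RegionHeartbeat implements gRPC PDServer.
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
	server := &heartbeatServer{stream: stream}
	for {
		request, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return errors.WithStack(err)
		}
		storeID := request.GetLeader().GetStoreId()
		// ... (validation and region processing omitted)
		s.hbStreams.BindStream(storeID, server)
	}
}

type heartbeatServer struct {
	stream pdpb.PD_RegionHeartbeatServer
	closed int32
}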

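heartbeatServer is a wrapper around pdpb.PD_RegionHeartbeatServer. Its Send method bounds each send with a timeout (regionHeartbeatSendTimeout) and marks the stream as closed when the timeout fires: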


func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {

	done := make(chan error, 1)
	go func() { done <- s.stream.Send(m) }()
	select {
	case err := <-done:
		return errors.WithStack(err)
	case <-time.After(regionHeartbeatSendTimeout):
		atomic.StoreInt32(&s.closed, 1)
		return errors.WithStack(errSendRegionHeartbeatTimeout)
	}
}

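Copyright notice: this is an original article by a TiDB community user, licensed under CC BY-NC-SA 4.0. When reposting, please include a link to the original source and this notice.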
