客户端对象创建以及初始化
// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts…)
}
// newBaseClient returns a new baseClient.
func newBaseClient(ctx context.Context, urls []string, security SecurityOption, opts …ClientOption) (*baseClient, error) {
ctx1, cancel := context.WithCancel(ctx)
c := &baseClient{
urls: urls,
checkLeaderCh: make(chan struct{}, 1),
checkTSODispatcherCh: make(chan struct{}, 1),
}
c.initRetry(c.initClusterID);
c.initRetry(c.updateMember);
c.wg.Add(1)
go c.memberLoop()
return c, nil
}
client包含子模块basic client。这个子模块的目的很明确:根据用户的请求,尽量将请求直接发送到PD leader或对应dc-location的TSO leader。因为部分请求只能由PD leader或者dc-location leader处理,如果请求被非leader节点收到,还需要转发到对应的leader。basic client就是为了维护这两类leader,并与它们建立连接。
baseClient 子模块分析,主要包含以下信息:
对应pd leader对象
对应follower对象
dc-location对应的TSO allocator leader的URL以及gRPC连接
一些channel,checkLeaderCh,checkTSODispatcherCh 。用于两类leader的管理
// baseClient is a basic client for all other complex client.
//
// It caches the PD leader and follower URLs and, for every dc-location, the
// TSO allocator leader's URL and gRPC connection. It also holds the wait group
// and context used by its background goroutines.
type baseClient struct {
	urls      []string
	clusterID uint64
	// PD leader URL
	leader atomic.Value // Store as string
	// PD follower URLs
	followers atomic.Value // Store as []string
	// dc-location -> TSO allocator leader gRPC connection
	clientConns sync.Map // Store as map[string]*grpc.ClientConn
	// dc-location -> TSO allocator leader URL
	allocators sync.Map // Store as map[string]string

	// Signalled to make memberLoop refresh the members right away.
	checkLeaderCh chan struct{}
	// Signalled to make the TSO dispatchers be re-checked.
	checkTSODispatcherCh chan struct{}

	// Tracks the background goroutines started by the client (e.g. memberLoop).
	wg sync.WaitGroup
	// Context of the background goroutines; cancel cancels it.
	ctx    context.Context
	cancel context.CancelFunc
}
下面我们详细分析basic client的创建流程,主要有三个核心步骤:调用c.initClusterID初始化集群ID,调用c.updateMember更新成员信息,以及启动后台线程c.memberLoop。
调用c.updateMember
func (c *baseClient) updateMember() error {
for _, u := range c.urls {
members, err := c.getMembers(ctx, u)
更新c.clientConns以及c.allocators,用于匹配dc-location到grpc连接
c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders());
更新pd所有成员的url
c.updateURLs(members.GetMembers())
更新pd follower的urls
c.updateFollowers(members.GetMembers(), members.GetLeader())
更新c.leader,指定pd的leader.以及指定dc global对应的leader
c.switchLeader(members.GetLeader().GetClientUrls());
给c.checkTSODispatcherCh发送空消息,触发check tso
c.scheduleCheckTSODispatcher()
return nil
}
}
启动后台服务线程go c.memberLoop()
这个后台服务每隔1分钟调用一次updateMember来更新PD成员角色;当checkLeaderCh收到信号时,也会立即触发一次成员角色更新。
// memberLoop calls updateMember once a minute, or immediately when
// checkLeaderCh is signalled. Failures are logged and retried on the next
// round. It returns once the client's context is cancelled.
func (c *baseClient) memberLoop() {
	// Balances the c.wg.Add(1) made before this goroutine is started.
	defer c.wg.Done()

	for {
		select {
		case <-c.checkLeaderCh:
		case <-time.After(time.Minute):
		case <-c.ctx.Done():
			return
		}
		if err := c.updateMember(); err != nil {
			log.Error("[pd] failed updateMember", errs.ZapError(err))
		}
	}
}
Client流程分析:
核心流程:当用户创建PD客户端时,会依次触发以下步骤:
1. 创建client对象,其中包括子模块basic client对象的创建;
2. 调用c.updateTSODispatcher(),为每个dc-location创建TSO dispatcher;
3. 启动一系列后台管理线程(tsLoop、tsCancelLoop、leaderCheckLoop)。
// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts…)
}
// NewClientWithContext creates a PD client with context.
//
// It builds the shared baseClient, sets up a TSO dispatcher for every known
// dc-location, and starts the background loops tsLoop, tsCancelLoop and
// leaderCheckLoop, registered on c.wg. If the base client cannot be created,
// its error is returned and no goroutines are started.
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
	base, err := newBaseClient(ctx, addrsToUrls(pdAddrs), security, opts...)
	if err != nil {
		// Without a base client there is no PD connection to build on.
		return nil, err
	}
	c := &client{
		baseClient:        base,
		checkTSDeadlineCh: make(chan struct{}),
	}

	c.updateTSODispatcher()

	// One count per background goroutine started below.
	c.wg.Add(3)
	go c.tsLoop()
	go c.tsCancelLoop()
	go c.leaderCheckLoop()

	return c, nil
}
为每个dc-location设置调度器(dispatcher),用于批量处理发往该dc-location的TSO请求。
在client,每个dc对应一个TSODispatcher,dispatcher用于批量发送请求到相应的dc-location leader
// updateTSODispatcher walks every dc-location currently listed in
// c.allocators and, for each one that checkTSODispatcher reports as not
// having a dispatcher, starts a handleDispatcher goroutine for it.
//
// NOTE(review): dispatcherCtx and tsoRequestCh are not declared in this
// excerpt; presumably they are created per dc-location before the goroutine
// is started.
func (c *client) updateTSODispatcher() {
	c.allocators.Range(func(key, _ interface{}) bool {
		if dc := key.(string); !c.checkTSODispatcher(dc) {
			go c.handleDispatcher(dispatcherCtx, dc, tsoRequestCh)
		}
		// Keep visiting the remaining dc-locations.
		return true
	})
}
启动后台服务handleDispatcher,这个后台服务调用processTSORequests 用于批量从PD请求一定数目的TSO
go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)
func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) {
for {
select {
case first := <-tsoDispatcher:
pendingPlus1 := len(tsoDispatcher) + 1
requests[0] = first
for i := 1; i < pendingPlus1; i++ {
requests[i] = <-tsoDispatcher
}
done := make(chan struct{})
dl := deadline{
timer: time.After(c.timeout),
done: done,
cancel: cancel,
}
tsDeadlineCh, ok := c.tsDeadline.Load(dc)
select {
case tsDeadlineCh.(chan deadline) <- dl:
case <-dispatcherCtx.Done():
return
}
err = c.processTSORequests(stream, dc, requests[:pendingPlus1], opts)
}
}
}
client后台服务
tsLoop流程分析:定期检查是否有新加入的dc-location,checkTSODispatcherCh收到信号时也会立即检查。每轮都会调用updateTSODispatcher,为尚未拥有dispatcher的dc-location创建dispatcher,用于批量处理发往该dc-location的TSO请求。
// tsLoop keeps the per-dc-location TSO dispatchers in sync with the known
// allocators. It calls updateTSODispatcher once per tsLoopDCCheckInterval,
// and immediately whenever checkTSODispatcherCh is signalled. It returns once
// the client's context is cancelled.
func (c *client) tsLoop() {
	// Balances the c.wg.Add made before this goroutine is started.
	defer c.wg.Done()

	ticker := time.NewTicker(tsLoopDCCheckInterval)
	defer ticker.Stop()

	for {
		c.updateTSODispatcher()
		select {
		case <-ticker.C:
		case <-c.checkTSODispatcherCh:
		case <-c.ctx.Done():
			return
		}
	}
}
tsCancelLoop后台服务线程主要用于TSO请求的超时检查,原理比较简单。每次处理TSO请求时,会生成一个deadline对象,发送给该dc-location对应的超时处理channel;这个对象包含超时计时器、完成通知以及超时后的取消函数。在tsCancelLoop服务里,我们会watch每个dc-location:watch函数启动一个循环,等待channel收到deadline对象。收到后进入超时处理:要么超时并取消请求,要么请求处理完成,然后接收下一个对象。
// tsCancelLoop makes sure every dc-location listed in c.allocators has a
// deadline watcher (started by watchTSDeadline). It re-checks once per
// tsLoopDCCheckInterval and whenever checkTSDeadlineCh is signalled, and
// returns once the client's context is cancelled. The watchers it starts are
// bound to that same context.
func (c *client) tsCancelLoop() {
	// Balances the c.wg.Add made before this goroutine is started.
	defer c.wg.Done()

	ticker := time.NewTicker(tsLoopDCCheckInterval)
	defer ticker.Stop()

	for {
		// Watch every dc-location's tsDeadlineCh.
		c.allocators.Range(func(dcLocation, _ interface{}) bool {
			c.watchTSDeadline(c.ctx, dcLocation.(string))
			return true
		})
		select {
		case <-c.checkTSDeadlineCh:
		case <-ticker.C:
		case <-c.ctx.Done():
			return
		}
	}
}
func (c *client) watchTSDeadline(ctx context.Context, dcLocation string) {
if _, exist := c.tsDeadline.Load(dcLocation); !exist {
tsDeadlineCh := make(chan deadline, 1)
c.tsDeadline.Store(dcLocation, tsDeadlineCh)
go func(dc string, tsDeadlineCh <-chan deadline) {
for {
select {
case d := <-tsDeadlineCh:
select {
case <-d.timer:
log.Error(“tso request is canceled due to timeout”, zap.String(“dc-location”, dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
d.cancel()
case <-d.done:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(dcLocation, tsDeadlineCh)
}
}
后台leader检查线程:定期调用checkLeaderHealth检查leader的健康状态,如果发现leader变化,则更新相关信息。
// leaderCheckLoop calls checkLeaderHealth once per LeaderHealthCheckInterval
// until the client's context is cancelled.
func (c *client) leaderCheckLoop() {
	// Balances the c.wg.Add made before this goroutine is started.
	defer c.wg.Done()

	ticker := time.NewTicker(LeaderHealthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-c.ctx.Done():
			return
		case <-ticker.C:
			c.checkLeaderHealth(c.ctx)
		}
	}
}