1
1
3
0
专栏/.../

tiup cluster display 执行流程代码详解

 Raymond  发表于  2023-04-06

一、前言

在运维tidb 集群的时候,经常需要使用tiup cluster display 去查看组件是否是运行状态,笔者之前一直认为是tiup 可能是通过探测组件端口的方式去判断组件是否存活(类似telnet ip port的方式),但是笔者之前遇到过1个问题,当目标组件服务器的firewalld 开启的时候,用tiup cluster 获取该组件的状态时,发现该组件的状态为Down,但是该组件确实是正常运行的,用telnet 去探测也是可以得到返回信息,这就让笔者感到好奇,tiup cluster到底是通过什么样的方式去判断,一个组件是Up还是Down呢?本着好奇的态度去翻阅了tiup cluster 关于display 部分的代码,本文较为详细的解读执行在执行tiup cluster display的时候, tiup是怎么去判断tidb组件的状态

二、前期检查

display 的执行代码入口在src/tiup-1.11.0/components/cluster/command/display.go 里面,在正式执行display的功能之前,会做一些前期检查和准备

1.exist, err := tidbSpec.Exist(dopt.ClusterName)
去判断集群名称是否存在,判断集群名称是否存在的方法是通过判断对应的集群拓扑文件是否存在,
例如我的集群名是tidb-test,那就去判断/home/tidb/.tiup/storage/cluster/clusters/tidb-test/meta.yaml是否存在,如果不存在,则判断集群名是不存在的
2.获取集群的元数据信息,集群的元数据信息例如版本等信息赋值给变量metadata
metadata, err := spec.ClusterMetadata(dopt.ClusterName)
这段代码的主要逻辑,其实就是把集群拓扑文件的信息通过yaml.Unmarshal函数进行解析返回给metadata变量,例如集群拓扑yaml文件主要是分为了3个大属性(或者是叫做3个大类),user,tidb_version,topology,通过这段代码就得到集群的这些信息

3.如果指定了--version,只会打印集群的版本信息,通过 metadata.Version 变量就得到了集群的版本信息
if showVersionOnly {
	fmt.Println(metadata.Version)
	return nil
}
3.如果指定了--dashboard参数 ,只会打印dashboard地址
if showDashboardOnly {
	tlsCfg, err := metadata.Topology.TLSConfig(tidbSpec.Path(dopt.ClusterName, spec.TLSCertKeyDir))
	if err != nil {
		return err
	}
	return cm.DisplayDashboardInfo(dopt.ClusterName, time.Second*time.Duration(gOpt.APITimeout), tlsCfg)
}
4.如果指定了--labels ,只会打印集群的label信息
if showTiKVLabels {
	return cm.DisplayTiKVLabels(dopt, gOpt)
}

5. 最后面才是执行查询集群状态信息的方法
例如执行tiup cluster display tidb-test
return cm.Display(dopt, gOpt)

三、Display 执行流程

cm.Display的代码在 src/tiup-1.11.0/pkg/cluster/manager/display.go 里面,而在这段代码中获取集群组件状态信息是通过GetClusterTopology 方法去获得的(src/tiup-1.11.0/pkg/cluster/manager/display.go)

clusterInstInfos, err := m.GetClusterTopology(dopt, opt)

GetClusterTopology 方法的执行流程

1.获取ClusterMeta 结构体指针
metadata, err := m.meta(name)
通过m.meta 方法,metadata变量本质是一个*spec.ClusterMeta ,是ClusterMeta 这个结构体的对应的指针
type ClusterMeta struct {
	User    string `yaml:"user"`         // the user to run and manage cluster on remote
	Version string `yaml:"tidb_version"` // the version of TiDB cluster
	// EnableFirewall bool   `yaml:"firewall"`
	OpsVer string `yaml:"last_ops_ver,omitempty"` // the version of ourself that updated the meta last time

	Topology *Specification `yaml:"topology"`
}

ClusterMeta 结构体所定义的字段与集群拓扑yaml文件的格式是相对应的,比如集群拓扑yaml文件主要是分为了3个大属性(或者是叫做3个大类),user,tidb_version,topology,其它的配置项都是从这3个大的属性下面去获得的
user: tidb
tidb_version: v6.1.2
topology:
  global:
    user: tidb
    ssh_port: 22
    ssh_type: builtin
    deploy_dir: /tidb-deploy
    data_dir: /tidb-data
    os: linux
   .........


ClusterMeta 这个结构体有很多方法,例如在src/tiup-1.11.0/pkg/cluster/spec/util.go 下面定义了这个结构体的方法
func (m *ClusterMeta) GetTopology() Topology {
	return m.Topology
}

// SetTopology implement Metadata interface.
func (m *ClusterMeta) SetTopology(topo Topology) {
	tidbTopo, ok := topo.(*Specification)
	//tidbTopo, ok := topo.S
	if !ok {
		panic(fmt.Sprintln("wrong type: ", reflect.TypeOf(topo)))
	}

	m.Topology = tidbTopo
}

// GetBaseMeta implements Metadata interface.
func (m *ClusterMeta) GetBaseMeta() *BaseMeta {
	return &BaseMeta{
		Version: m.Version,
		User:    m.User,
		OpsVer:  &m.OpsVer,
	}
}
在m.meta 方法里面继而又调用了 m.specManager.Metadata这个方法,m.specManager.Metadata 里面的主要逻辑是调用yaml.Unmarshal 函数去解析集群的拓扑文件,得到集群的拓扑文件信息,这一点是非常重要的

2.接下来通过metadata 变量,就可以调用结构体ClusterMeta相应的方法,从而具体得到集群拓扑文件相应的信息
topo := metadata.GetTopology()	
通过调用ClusterMeta结构体的GetTopology方法,这个方法的返回值正好是ClusterMeta结构体里面的Topology字段,这个字段对应的就是集群拓扑文件里面的topology属性信息,当然ClusterMeta结构体里面的Topology字段是一个嵌套结构体
里面嵌套了Specification 这个结构体, Specification 这个结构体 对应的就是集群拓扑文件里面的topology属性下面的配置项

同理base := metadata.GetBaseMeta()通过调用ClusterMeta结构体的GetBaseMeta方法,这个方法通过构造函数的方式,返回的是集群拓扑文件里面的user,tidb_version和last_ops_ver,omitempty 这些信息(不过笔者对last_ops_ver,omitempty 这两项配置信息不是很了解,就不去深究了)
其实,代码写到了这里,我们大抵明白了整个display的逻辑,首先通过yaml.Unmarshal去解析整个集群的拓扑文件,从而得到集群拓扑文件里面的信息

statusTimeout := time.Duration(opt.APITimeout) * time.Second
这里定义了获取组件状态的超时时间,默认是10s,通过      --status-timeout 进行传递
在src/tiup-1.11.0/components/cluster/command/display.go里面的
cmd.Flags().Uint64Var(&statusTimeout, "status-timeout", 40, "Timeout in seconds when getting node status")
这行代码通过传递进来的参数对这个变量赋值

3.获取pd地址
masterList := topo.BaseTopo().MasterList
通过topo.BaseTopo() 这个方法通过构造函数将BaseTopo结构体进行赋值,然后通过BaseTopo结构体的MasterList字段获取到了整个集群pd 的地址,pd的地址以切片的形式返回

4.获取需要查看具体组件或者具体节点的信息
filterRoles := set.NewStringSet(opt.Roles...)
filterNodes := set.NewStringSet(opt.Nodes...)
如果这个时候diplay 的时候有-R或者-N指定了需要判断状态的具体组件或者是节点的ip:port,那么上述代码就会将组件或者节点信息存储到filterRoles或者是filterNodes中
tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))
获取tsl的信息,由于我们一般不使用tls进行通讯,这里不做这个讨论

5.查询pd组件状态
下面的这段代码,通过topo.IterInstance 方法通过函数回调获得了pd组件的状态
这段代码的核心逻辑如下
1.通过ComponentsByStartOrder(src/tiup-1.11.0/pkg/cluster/spec/spec.go) 方法获得了将tidb目前所有的组件的信息放到了1个切片返回,这个切片的类型其实是1个Component接口类型(src/tiup-1.11.0/pkg/cluster/spec/instance.go),也就是说这个切片里面的元素可以调用这个接口所定义的函数
2.构建循环,去轮询每个组件,每个组件的Instances()方法返回的也是1个接口类型的切片,最终调用接口里面的ComponentName方法去判断是否是pd组件
3.查询pd组件的状态,查询pd组件的状态最终是通过每个组件的Status方法去完成的,具体的逻辑,会在文章的最后面说明
当然这段代码还涉及到并发线程去查询组件的信息,并发线程数是 --concurrency 参数指定的
var mu sync.Mutex
	topo.IterInstance(func(ins spec.Instance) {
	
		if ins.ComponentName() != spec.ComponentPD && ins.ComponentName() != spec.ComponentDMMaster {
			return
		}
		status := ins.Status(ctx, statusTimeout, tlsCfg, masterList...)
		mu.Lock()
		if strings.HasPrefix(status, "Up") || strings.HasPrefix(status, "Healthy") {
			instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())
			masterActive = append(masterActive, instAddr)
		}
		masterStatus[ins.ID()] = status
		fmt.Println("masterStatus", masterStatus)
		mu.Unlock()
	}, opt.Concurrency)
6.获取除pd组件以外的其它组件的状态
下面的这段代码,是整个display 过程中去真正查询组件状态信息的代码
整体的执行逻辑如下
1.如果指定-R或者-N参数去判断某个组件或者是某个实例的地址去查询状态的时候,如果这个组件或者实例的地址不存在的时候就会直接报错退出执行
2.如果是pd组件,不需要再次查询pd的状态了,直接就可以得到pd的状态了,如果pd的地址是dashboard的地址,status变量会加上"|UI"字符串
3.查询除pd以外组件的其它组件的状态
topo.IterInstance(func(ins spec.Instance) {
		// apply role filter
		if len(filterRoles) > 0 && !filterRoles.Exist(ins.Role()) {
			fmt.Println("role not exists", ins.Role())
			return
		}
		// apply node filter
		if len(filterNodes) > 0 && !filterNodes.Exist(ins.ID()) {
			fmt.Println("node not exists", ins.ID())
			return
		}

		dataDir := "-"
		insDirs := ins.UsedDirs()
		deployDir := insDirs[0]
		if len(insDirs) > 1 {
			dataDir = insDirs[1]
		}

		var status, memory string
    switch ins.ComponentName() {
		case spec.ComponentPD:
			status = masterStatus[ins.ID()]
			instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())
			if dashboardAddr == instAddr {
				status += "|UI"
			}
		case spec.ComponentDMMaster:
			status = masterStatus[ins.ID()]
		default:
			status = ins.Status(ctx, statusTimeout, tlsCfg, masterActive...)
		}
......

查询组件的状态,是通过status = ins.Status(ctx, statusTimeout, tlsCfg, masterActive...) 这行代码完成的

四、查询组件状态的总体执行流程

查询组件的状态的整体逻辑为:之前在上段代码中提过,去循环每个组件定义Instances()方法,得到Instance的接口,从而可以调用Instance的接口里面的所定义的Status方法去获得各个组件的运行状态
Instance的接口的定义(src/tiup-1.11.0/pkg/cluster/spec/instance.go)
type Instance interface {
	InstanceSpec
	ID() string
	Ready(context.Context, ctxt.Executor, uint64, *tls.Config) error
	InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
	ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
	PrepareStart(ctx context.Context, tlsCfg *tls.Config) error
	ComponentName() string
	InstanceName() string
	ServiceName() string
	ResourceControl() meta.ResourceControl
	GetHost() string
	GetPort() int
	GetSSHPort() int
	DeployDir() string
	UsedPorts() []int
	UsedDirs() []string
	Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string
	Uptime(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration
	DataDir() string
	LogDir() string
	OS() string // only linux supported now
	Arch() string
	IsPatched() bool
	SetPatched(bool)
	setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]interface{}, paths meta.DirPaths) (map[string]interface{}, error)
}


func (i *BaseInstance) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string {	
	return i.StatusFn(ctx, timeout, tlsCfg, pdList...)
}

五、查询drainer等组件状态的执行流程

在目前现有tiup代码中,各个组件状态的查询其实是有稍许差别的,tidb、drainer、prometheus等组件的状态查询的代码逻辑其实和pd、tikv 组件的代码逻辑是不一样的,接下来就以查询drainer 组件状态为例,去观察查询drainer等组件状态的大体逻辑是怎么样的,drainer、tidb、prometheus等组件的状态的查询是通过Status方法调用了statusByHost函数(src/tiup-1.11.0/pkg/cluster/spec/drainer.go)去确定组件的状态是up还是down

func statusByHost(host string, port int, path string, timeout time.Duration, tlsCfg *tls.Config) string {
	if timeout < time.Second {
		timeout = statusQueryTimeout
	}

	client := utils.NewHTTPClient(timeout, tlsCfg)
	//是否对集群启用 TLS。启用之后,组件之间、客户端与组件之间都必须使用生成的 TLS 证书进行连接,默认值:false
	scheme := "http"
	if tlsCfg != nil {
		scheme = "https"
	}
	if path == "" {
		path = "/"
	}
	url := fmt.Sprintf("%s://%s:%d%s", scheme, host, port, path)
	instance_address := fmt.Sprintf("%s:%d", host, port)
	// body doesn't have any status section needed
	body, err := client.Get(context.TODO(), url)
	if err != nil || body == nil {
		return "Down"
	}
	
	return "Up"
}

我把tiup 关于statusByHost 函数 的执行代码单独拿了出来,放到了下面的代码中,利用这部分代码可以直接获取某个实例的运行状态为up还是down,其实statusByHost 函数 总体执行的逻辑其实是在得到相关实例的url(api)(比如http://172.16.1.1:8249/status)后去调用http的相关模块去请求这个url,然后然后通过请求的url的返回内容去判断实例是up还是down 状态

package main

import (
	"context"
	"crypto/tls"
	"fmt"

	"io"
	"net"
	"net/http"
	"net/url"
	"os"
	"time"
)

type HTTPClient struct {
	client *http.Client
	header http.Header
}

// NewHTTPClient returns a new HTTP client with timeout and HTTPS support
func NewHTTPClient(timeout time.Duration, tlsConfig *tls.Config) *HTTPClient {
	if timeout < time.Second {
		timeout = 10 * time.Second // default timeout is 10s
	}
	tr := &http.Transport{
		TLSClientConfig: tlsConfig,
		Dial:            (&net.Dialer{Timeout: 3 * time.Second}).Dial,
	}
	// prefer to use the inner http proxy
	httpProxy := os.Getenv("TIUP_INNER_HTTP_PROXY")
	if len(httpProxy) == 0 {
		httpProxy = os.Getenv("HTTP_PROXY")
	}
	if len(httpProxy) > 0 {
		if proxyURL, err := url.Parse(httpProxy); err == nil {
			tr.Proxy = http.ProxyURL(proxyURL)
		}
	}
	return &HTTPClient{
		client: &http.Client{
			Timeout:   timeout,
			Transport: tr,
		},
	}
}

// GetWithStatusCode fetch a URL with GET method and returns the response, also the status code.
func (c *HTTPClient) GetWithStatusCode(ctx context.Context, url string) ([]byte, int, error) {
	var statusCode int
	req, err := http.NewRequest("GET", url, nil)
	////发送GET请求
	////url:请求地址
	//req 返回请求url的返回内容,是1个结构体类型
	if err != nil {
		return nil, statusCode, err
	}

	req.Header = c.header

	if ctx != nil {
		req = req.WithContext(ctx)
	}
	res, err := c.client.Do(req)

	if err != nil {
		fmt.Printf("url:%v, statusCode is:%v,err is:%v \n", url, statusCode, err)

		return nil, statusCode, err
		//如果实例状态获取异常,比如实例关闭,那么就会将错误和statusCode 返回
	}
	defer res.Body.Close()
	fmt.Println("the res.StatusCode is", res.StatusCode)
	//the res.StatusCode is 200
	data, err := checkHTTPResponse(res)

	return data, res.StatusCode, err
	//如果实例状态正常,res.StatusCode 为200
}

// checkHTTPResponse checks if an HTTP response is with normal status codes
func checkHTTPResponse(res *http.Response) ([]byte, error) {
	body, err := io.ReadAll(res.Body)

	if err != nil {
		return nil, err
	}

	if res.StatusCode < 200 || res.StatusCode >= 400 {
		return body, fmt.Errorf("error requesting %s, response: %s, code %d",
			res.Request.URL, string(body), res.StatusCode)
	}

	return body, nil
}

func (c *HTTPClient) Get(ctx context.Context, url string) ([]byte, error) {
	data, _, err := c.GetWithStatusCode(ctx, url)
	return data, err
}

func GetInstanceStatus(url string) string {
	client := NewHTTPClient(10, nil)
	//通过构造函数NewHTTPClient 将结构体HTTPClient 赋值给变量的client
	body, err := client.Get(context.TODO(), url)
	if err != nil || body == nil {
		return "Down"
	}
	return "Up"
}

//0 通过构造函数NewHTTPClient 将结构体HTTPClient 赋值给变量的client
//1.通过client.get 函数(传递instace的http url 地址 )查看instance 的状态
//2.get 函数调用GetWithStatusCode函数 查看url 地址的状态
//3.GetWithStatusCode 函数又调用checkHTTPResponse 函数,
//总结 如果GetWithStatusCode 返回错误信息或者http 请求返回的内容为空,则判断实例状态为Down

func main() {
	url := "http://172.16.1.1:8249/status"
	status := GetInstanceStatus(url)
	//GetInstanceStatus(url)
	fmt.Println(status)

}

statusByHost 函数的大概执行过程,所以去判断1个组件的状态是Up还是Down其实不是通过探测端口的方式,而是通过组件自身暴露出来的url(或者叫api),然后通过http去访问这个url的响应内容获得的,这一点之前是没有想到的。

//0 通过构造函数NewHTTPClient 将结构体HTTPClient 赋值给变量的client
//1.通过client.get 函数(传递instace的http url 地址 )查看instance 的状态
//2.get 函数调用GetWithStatusCode函数 查看url 地址的状态
//3.GetWithStatusCode 函数又调用checkHTTPResponse 函数,
//总结 如果GetWithStatusCode 返回错误或者http 请求返回的内容为空,则判断实例状态为Down

六、查询pd组件状态的执行流程

查询pd组件状态的代码主要集中在src/tiup-1.11.0/pkg/cluster/spec/pd.go里面的Status方法

func (s *PDSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string {
	if timeout < time.Second {
		timeout = statusQueryTimeout
	}
	addr := fmt.Sprintf("%s:%d", s.Host, s.ClientPort)
	pc := api.NewPDClient(ctx, []string{addr}, timeout, tlsCfg)
	
	// check health
	err := pc.CheckHealth()
	if err != nil {
		return "Down"
	}
	// find leader node
	leader, err := pc.GetLeader()
	if err != nil {
		return "ERR"
	}
	res := "Up"
	if s.Name == leader.Name {
		res += "|L"
	}
	return res
}

这段代码的主要逻辑如下

1.生成1个pd状态的url(或者叫做api),例如http://172.168.1.3:2379/pd/ping
2.执行CheckHealth()方法调用http相关模块的get请求去访问这个pd状态的url,如果请求出错,判断这个pd实例的状态为down
3.生成pd的leader状态的url(或者叫做api),例如http://172.168.1.3:2379/pd/api/v1/leader,执行 GetLeader()方法调用http相关模块的get请求去访问这个url,通过访问这个url会返回pd 组件的leader的地址(比如pd leader是172.168.1.4:2379,但是通过访问172.168.1.3:2379/pd/api/v1/leader 也可以将leader的地址也返回,比如可以执行linux命令curl http://172.168.1.3:2379/pd/api/v1/leader 看看返回的信息是怎样的)
4.如果本次的pd地址刚好和通过访问leader url返回信息中的leader地址是同1个,则说明本次的pd地址就是leader,则返回"Up|L",否则就返回"Up"

七、查询tikv组件状态的执行流程

https://docs.pingcap.com/zh/tidb/stable/tidb-scheduling#信息收集

在tidb所有组件中,tikv的组件状态是最复杂的,有up、Disconnect、Offline、Tombstone 等各种状态,那么这些状态的逻辑是怎么获得的?tikv获取状态的逻辑主要集中在src/tiup-1.11.0/pkg/cluster/spec/tikv.go里面的checkStoreStatus和Status方法

func checkStoreStatus(ctx context.Context, storeAddr string, tlsCfg *tls.Config, pdList ...string) string {
	if len(pdList) < 1 {
		return "N/A"
	}
	pdapi := api.NewPDClient(ctx, pdList, statusQueryTimeout, tlsCfg)
	store, err := pdapi.GetCurrentStore(storeAddr)
	if err != nil {
		if errors.Is(err, api.ErrNoStore) {
			return "N/A"
		}
		return "Down"
	}
	return store.Store.StateName
}

总体执行逻辑为:

1.通过http的模块去访问pd的url,例如http://172.168.1.3:2379/pd/api/v1/stores?state=0&state=1&state=2,这个url会返回所有tikv的状态信息,然后把这些信息放到1个叫storesInfo的结构体中,用linux命令curl(或者浏览器访问) 可以访问http://172.168.1.3:2379/pd/api/v1/stores?state=0&state=1&state=2
2.去循环这个结构体里面的信息,去判断如果某个tikv实例的状态不是Tombstone状态,则终止循环,并且判断这个如果这个实例的状态值是不是Pending Offline状态,如果不是,则直接返回这个tikv实例状态值(如果tikv的状态值是Up、Disconnected、Down 走的就是这个逻辑)
3.如果通过循环storesInfo结构体得到某个tikv的实例是Tombstone状态,那么并不会马上结束循环,而是继续去循环,直到找到最大的storeid,然后才返回这个这个tikv store的状态,至于为什么这么涉及,通过查看代码注释,如果pd发生切换, store ID 可能存在重复现象,在这里,笔者就只解读代码的执行逻辑,如果对这个逻辑感兴趣可以查看这个issue(https://github.com/tikv/pd/issues/3303 )和具体的代码逻辑(src/tiup-1.11.0/pkg/cluster/api/pdapi.go GetCurrentStore方法)
4.如果某个tikv在集群拓扑文件有offline为true的标识,并且返回的状态是offline,那么就判断这个tikv的状态是Pending Offline
    ssh_port: 22
    port: 20161
    status_port: 20181
    deploy_dir: /tidb-deploy/tikv-20161
    data_dir: /tidb-data/tikv-20161
    log_dir: /tidb-deploy/tikv-20161/log
    offline: true

八、结论

通过对tiup cluster display 代码的学习,对如何判断组件的状态有了一个较为清晰的了解,总之来说,tiup 是去通过http请求的方式去访问各个组件暴露出来的接口url才获得组件的运行状态(tikv的状态信息是通过pd的api获得的),当然如果操作系统的防火墙如果打开的话,也会导致http请求失败,就会判断组件的状态是Down。

1
1
3
0

版权声明:本文为 TiDB 社区用户原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接和本声明。

评论
暂无评论