1. Project Overview
Project repository: https://github.com/kubernetes-sigs/metrics-server
In a Kubernetes cluster, if you want to do autoscaling or use the kubectl top command, metrics-server is a component you cannot avoid. metrics-server exposes CPU and memory metrics for the pods and nodes in the cluster to other components through the aggregation API; the HorizontalPodAutoscaler, for example, calls this API to read a pod's current resource usage and decide whether to scale it out or in (a minimal client example is shown after the notes below).
Things to note:
- metrics-server serves real-time metrics (in practice, the data from the most recent scrape, held in memory); there is no database storing historical values.
- The metrics are not collected by metrics-server itself but by cAdvisor on each node; metrics-server only sends requests to cAdvisor and converts the raw metric data into the aggregated API format.
- Because the interface is exposed through the aggregation API, the kube-apiserver in the cluster must have the aggregation layer enabled (see the upstream community documentation for how to enable it).
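As a concrete illustration of how a consumer such as the HorizontalPodAutoscaler or kubectl top reads these metrics (the client example mentioned above), here is a minimal sketch using the official k8s.io/metrics clientset. The kubeconfig location and the kube-system namespace are placeholders, the context-taking List signature assumes a reasonably recent client library, and error handling is reduced to panics for brevity.

// Minimal sketch: query the metrics.k8s.io API the way kubectl top or the HPA does.
package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/clientcmd"
    metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
)

func main() {
    // Load a kubeconfig from the default location; inside a cluster you would
    // use rest.InClusterConfig() instead.
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    mc, err := metricsclient.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // Equivalent to GET /apis/metrics.k8s.io/v1beta1/namespaces/kube-system/pods
    podMetrics, err := mc.MetricsV1beta1().PodMetricses("kube-system").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, pm := range podMetrics.Items {
        for _, c := range pm.Containers {
            fmt.Printf("%s/%s cpu=%s memory=%s\n",
                pm.Name, c.Name, c.Usage.Cpu().String(), c.Usage.Memory().String())
        }
    }
}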
 
2. Deployment
The recommended way to install metrics-server is as a Deployment, using the official manifest:
kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/v0.3.6/components.yaml
The main Deployment in that YAML looks like this:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: metrics-server
  namespace: kube-system
  labels:
    k8s-app: metrics-server
spec:
  selector:
    matchLabels:
      k8s-app: metrics-server
  template:
    metadata:
      name: metrics-server
      labels:
        k8s-app: metrics-server
    spec:
      serviceAccountName: metrics-server
      volumes:
      - name: tmp-dir
        emptyDir: {}
      containers:
      - name: metrics-server
        image: k8s.gcr.io/metrics-server-amd64:v0.3.6
        imagePullPolicy: IfNotPresent
        args:
          - --cert-dir=/tmp
          - --secure-port=4443
        ports:
        - name: main-port
          containerPort: 4443
          protocol: TCP
        securityContext:
          readOnlyRootFilesystem: true
          runAsNonRoot: true
          runAsUser: 1000
        volumeMounts:
        - name: tmp-dir
          mountPath: /tmp
      nodeSelector:
        kubernetes.io/os: linux
        kubernetes.io/arch: "amd64"
Another resource in the manifest worth noting is an APIService, which registers metrics-server into the aggregation API:
apiVersion: apiregistration.k8s.io/v1beta1
kind: APIService
metadata:
  name: v1beta1.metrics.k8s.io
spec:
  service:
    name: metrics-server
    namespace: kube-system
  group: metrics.k8s.io
  version: v1beta1
  insecureSkipTLSVerify: true
  groupPriorityMinimum: 100
  versionPriority: 100
3. Startup Flags

metric-resolution: interval at which metrics-server periodically calls the metrics endpoints to fetch raw data; default 60s
kubelet-insecure-tls: do not verify the kubelet's serving certificate against a CA; for testing only
kubelet-port: port used to call the kubelet on each node for metrics; default 10250
kubeconfig: path to the kubeconfig file used to talk to the kube-apiserver and the kubelets
kubelet-preferred-address-types: priority order of the node address types used to connect to the kubelet
kubelet-certificate-authority: CA certificate used when connecting to the kubelet
deprecated-kubelet-completely-insecure: access the kubelet insecurely over plain HTTP (soon to be deprecated)
 
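To see how these flags map onto the MetricsServerOptions fields that appear in the Run function below (o.MetricResolution, o.KubeletPort, and so on), here is a hedged sketch of how they could be registered with spf13/pflag. The field-to-flag mapping follows the table above and the code in section 4.1, but the defaults, help strings and package layout are illustrative assumptions, not the actual metrics-server options code.

// Hypothetical sketch: binding MetricsServerOptions-style fields to the flags above.
package main

import (
    "fmt"
    "time"

    "github.com/spf13/pflag"
)

type MetricsServerOptions struct {
    MetricResolution                    time.Duration
    KubeletPort                         int
    InsecureKubeletTLS                  bool
    Kubeconfig                          string
    KubeletPreferredAddressTypes        []string
    KubeletCAFile                       string
    DeprecatedCompletelyInsecureKubelet bool
}

func (o *MetricsServerOptions) AddFlags(fs *pflag.FlagSet) {
    fs.DurationVar(&o.MetricResolution, "metric-resolution", 60*time.Second,
        "interval at which metrics are scraped from the kubelets")
    fs.IntVar(&o.KubeletPort, "kubelet-port", 10250,
        "port used to connect to the kubelet")
    fs.BoolVar(&o.InsecureKubeletTLS, "kubelet-insecure-tls", false,
        "skip verifying the kubelet's serving certificate (testing only)")
    fs.StringVar(&o.Kubeconfig, "kubeconfig", "",
        "kubeconfig used to talk to the kube-apiserver and the kubelets")
    fs.StringSliceVar(&o.KubeletPreferredAddressTypes, "kubelet-preferred-address-types", nil,
        "node address types, in priority order, used to connect to the kubelet")
    fs.StringVar(&o.KubeletCAFile, "kubelet-certificate-authority", "",
        "CA certificate used to verify the kubelet's serving certificate")
    fs.BoolVar(&o.DeprecatedCompletelyInsecureKubelet, "deprecated-kubelet-completely-insecure", false,
        "connect to the kubelet over plain HTTP without authentication (deprecated)")
}

func main() {
    o := &MetricsServerOptions{}
    o.AddFlags(pflag.CommandLine)
    pflag.Parse()
    fmt.Println("metric-resolution:", o.MetricResolution)
}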
4. Code Analysis
Before walking through the metrics-server code, let's guess at its logic from what it does. The data returned by the cAdvisor endpoint on a node usually looks like this, and contains far more information than we need:
[root@node1 ~]# curl -k https://172.17.8.101:10250/stats/summary?only_cpu_and_memory=true
{
  "node": {
    "nodeName": "node1",
    "systemContainers": [
      {
        "name": "kubelet",
        "startTime": "2020-05-24T12:54:13Z",
        "cpu": {
          "time": "2020-05-24T14:12:31Z",
          "usageNanoCores": 20686133,
          "usageCoreNanoSeconds": 156089526198
        },
        "memory": {
          "time": "2020-05-24T14:12:31Z",
          "usageBytes": 170590208,
          "workingSetBytes": 122531840,
          "rssBytes": 66949120,
          "pageFaults": 763727,
          "majorPageFaults": 85
        },
        "userDefinedMetrics": null
      },
      {
        "name": "runtime",
        "startTime": "2020-05-24T12:54:13Z",
        "cpu": {
          "time": "2020-05-24T14:12:31Z",
          "usageNanoCores": 20686133,
          "usageCoreNanoSeconds": 156089526198
        },
        "memory": {
          "time": "2020-05-24T14:12:31Z",
          "usageBytes": 170590208,
          "workingSetBytes": 122531840,
          "rssBytes": 66949120,
          "pageFaults": 763727,
          "majorPageFaults": 85
        },
        "userDefinedMetrics": null
      },
      {
        "name": "pods",
        "startTime": "2020-05-24T12:54:13Z",
        "cpu": {
          "time": "2020-05-24T14:12:39Z",
          "usageNanoCores": 0,
          "usageCoreNanoSeconds": 42207538504
        },
        "memory": {
          "time": "2020-05-24T14:12:39Z",
          "availableBytes": 1910824960,
          "usageBytes": 33480704,
          "workingSetBytes": 16498688,
          "rssBytes": 36864,
          "pageFaults": 0,
          "majorPageFaults": 0
        },
        "userDefinedMetrics": null
      }
    ],
    "startTime": "2020-05-24T12:52:24Z",
    "cpu": {
      "time": "2020-05-24T14:12:39Z",
      "usageNanoCores": 888521168,
      "usageCoreNanoSeconds": 776524490477
    },
    "memory": {
      "time": "2020-05-24T14:12:39Z",
      "availableBytes": 891166720,
      "usageBytes": 1627074560,
      "workingSetBytes": 1036156928,
      "rssBytes": 359944192,
      "pageFaults": 1850284,
      "majorPageFaults": 1987
    }
  },
  "pods": [
    {
      "podRef": {
        "name": "metrics-server-7668599459-2jxq5",
        "namespace": "kube-system",
        "uid": "f5af876f-03de-43e5-902b-79bece68c508"
      },
      "startTime": "2020-05-24T13:27:42Z",
      "containers": null,
      "cpu": {
        "time": "2020-05-24T14:12:36Z",
        "usageNanoCores": 0,
        "usageCoreNanoSeconds": 6297886
      },
      "memory": {
        "time": "2020-05-24T14:12:36Z",
        "usageBytes": 434176,
        "workingSetBytes": 249856,
        "rssBytes": 36864,
        "pageFaults": 0,
        "majorPageFaults": 0
      }
    }
  ]
}
What we get from metrics-server, in contrast, looks like this:
[root@node1 ~]# curl http:
{
  "kind": "PodMetrics",
  "apiVersion": "metrics.k8s.io/v1beta1",
  "metadata": {
    "name": "metrics-server-7668599459-2jxq5",
    "namespace": "kube-system",
    "selfLink": "/apis/metrics.k8s.io/v1beta1/namespaces/kube-system/pods/metrics-server-7668599459-2jxq5",
    "creationTimestamp": "2020-05-24T13:27:42Z"
  },
  "timeStamp": "2020-05-24T13:27:42Z",
  "window": "30s",
  "containers": [
    {
      "name": "metrics-server",
      "usage": {
        "cpu": "0",
        "memory": "424Ki"
      }
    }
  ]
}
In other words, metrics-server essentially performs a data conversion: it turns cAdvisor-style data into the JSON format of a Kubernetes API. From this we can guess that the metrics-server code must contain a step that first fetches everything from the metrics endpoint and then parses out only the values it needs. One question also comes up naturally: when we send a request to metrics-server, does it query cAdvisor on the spot, parse the response and return it, or has it already been fetching data from cAdvisor periodically (caching it in memory) so that incoming requests are served straight from the cache? Let's keep this question in mind and go read the source.
4.1 Startup
metrics-server is bootstrapped with the github.com/spf13/cobra framework; if you are curious about that library, its GitHub page is a good starting point. What matters here is the Run function implemented by MetricsServerOptions:
func (o MetricsServerOptions) Run(stopCh <-chan struct{}) error {
    // build the generic API server config
    config, err := o.Config()
    if err != nil {
        return err
    }
    config.GenericConfig.EnableMetrics = true

    // build the kube-apiserver client (used to list nodes) and an informer factory
    var clientConfig *rest.Config
    if len(o.Kubeconfig) > 0 {
        loadingRules := &clientcmd.ClientConfigLoadingRules{ExplicitPath: o.Kubeconfig}
        loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{})
        clientConfig, err = loader.ClientConfig()
    } else {
        clientConfig, err = rest.InClusterConfig()
    }
    if err != nil {
        return fmt.Errorf("unable to construct lister client config: %v", err)
    }
    clientConfig.ContentType = "application/vnd.kubernetes.protobuf"
    kubeClient, err := kubernetes.NewForConfig(clientConfig)
    if err != nil {
        return fmt.Errorf("unable to construct lister client: %v", err)
    }
    informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)

    // build the client used to scrape the kubelet (cAdvisor) endpoints, plus the source manager
    kubeletRestCfg := rest.CopyConfig(clientConfig)
    if len(o.KubeletCAFile) > 0 {
        kubeletRestCfg.TLSClientConfig.CAFile = o.KubeletCAFile
        kubeletRestCfg.TLSClientConfig.CAData = nil
    }
    kubeletConfig := summary.GetKubeletConfig(kubeletRestCfg, o.KubeletPort, o.InsecureKubeletTLS, o.DeprecatedCompletelyInsecureKubelet)
    kubeletClient, err := summary.KubeletClientFor(kubeletConfig)
    if err != nil {
        return fmt.Errorf("unable to construct a client to connect to the kubelets: %v", err)
    }
    addrPriority := make([]corev1.NodeAddressType, len(o.KubeletPreferredAddressTypes))
    for i, addrType := range o.KubeletPreferredAddressTypes {
        addrPriority[i] = corev1.NodeAddressType(addrType)
    }
    addrResolver := summary.NewPriorityNodeAddressResolver(addrPriority)
    sourceProvider := summary.NewSummaryProvider(informerFactory.Core().V1().Nodes().Lister(), kubeletClient, addrResolver)
    scrapeTimeout := time.Duration(float64(o.MetricResolution) * 0.90)
    sources.RegisterDurationMetrics(scrapeTimeout)
    sourceManager := sources.NewSourceManager(sourceProvider, scrapeTimeout)

    // create the in-memory sink (cache) and the manager that periodically scrapes into it
    metricSink, metricsProvider := sink.NewSinkProvider()
    manager.RegisterDurationMetrics(o.MetricResolution)
    mgr := manager.NewManager(sourceManager, metricSink, o.MetricResolution)

    // the API server reads node and pod metrics from the same sink via metricsProvider
    config.ProviderConfig.Node = metricsProvider
    config.ProviderConfig.Pod = metricsProvider

    // create the API server and register the metrics.k8s.io API group
    server, err := config.Complete(informerFactory).New()
    if err != nil {
        return err
    }
    server.AddHealthzChecks(healthz.NamedCheck("healthz", mgr.CheckHealth))

    // start the periodic scraping loop, then run the API server
    mgr.RunUntil(stopCh)
    return server.GenericAPIServer.PrepareRun().Run(stopCh)
}
From this code we can see that data scraping/caching and the API server are two separate flows that cooperate through shared memory: the scraped data is periodically written into the metricSink (which is also the metricsProvider), and when the server receives a request it reads from the metricSink and returns the result to the client. This already answers the earlier question: metrics-server fetches data from cAdvisor on a schedule, and incoming requests are served directly from the in-memory cache.
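To make the shape of this design concrete before diving into the real code, here is a stripped-down, self-contained sketch of the same pattern: a scrape loop that periodically writes a snapshot into a mutex-protected in-memory sink, and a reader that serves only from that cache. The types, names and numbers here are illustrative; they are not the actual metrics-server types.

package main

import (
    "fmt"
    "sync"
    "time"
)

// sink is a minimal stand-in for metricSink/metricsProvider: the scraper
// writes the latest snapshot, request handlers only ever read it.
type sink struct {
    mu     sync.RWMutex
    latest map[string]int64 // e.g. pod name -> memory working set bytes
}

func (s *sink) Receive(batch map[string]int64) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.latest = batch // replace the whole snapshot; no merging with old data
}

func (s *sink) Get(name string) (int64, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    v, ok := s.latest[name]
    return v, ok
}

// scrape stands in for "call every kubelet and parse the response".
func scrape() map[string]int64 {
    return map[string]int64{"metrics-server-7668599459-2jxq5": 249856}
}

func main() {
    s := &sink{}
    stopCh := make(chan struct{})

    // initial scrape, like Collect(time.Now()) in Manager.RunUntil
    s.Receive(scrape())

    // producer: periodically scrape and store
    go func() {
        ticker := time.NewTicker(10 * time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                s.Receive(scrape())
            case <-stopCh:
                return
            }
        }
    }()

    // consumer: a request handler reads from the cache, never from the kubelet
    if v, ok := s.Get("metrics-server-7668599459-2jxq5"); ok {
        fmt.Println("workingSetBytes:", v)
    }
    close(stopCh)
}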
4.2 Data Scraping and Caching
The portion of the Run function in 4.1 that builds the two clients, the source manager and the metricSink is the startup of the data-scraping pipeline; let's call this part the manager. It mainly creates two clients: a kube-apiserver client used to list the Node objects in the cluster, and a client that calls the cAdvisor endpoint on each node to fetch node and pod CPU and memory data. It also creates a metricSink to hold the scraped metrics. Without further ado, let's see how this manager runs.
Pulling the manager-related lines out of the Run function in 4.1, the startup boils down to creating a manager and then running it:
manager.RegisterDurationMetrics(o.MetricResolution)
mgr := manager.NewManager(sourceManager, metricSink, o.MetricResolution)
mgr.RunUntil(stopCh)
The underlying code is shown below; running the manager really just means executing its Collect function periodically.
func NewManager(metricSrc sources.MetricSource, metricSink sink.MetricSink, resolution time.Duration) *Manager {
    manager := Manager{
        source:     metricSrc,
        sink:       metricSink,
        resolution: resolution,
    }
    return &manager
}

func (rm *Manager) RunUntil(stopCh <-chan struct{}) {
    go func() {
        ticker := time.NewTicker(rm.resolution)
        defer ticker.Stop()
        rm.Collect(time.Now())
        for {
            select {
            case startTime := <-ticker.C:
                rm.Collect(startTime)
            case <-stopCh:
                return
            }
        }
    }()
}
The logic inside Collect is then quite straightforward; it does two things:
- Call the Collect function implemented by the sourceManager to fetch the raw metric data.
- Parse the raw data into node and pod values and store them in the metricSink.
func (rm *Manager) Collect(startTime time.Time) {
    rm.healthMu.Lock()
    rm.lastTickStart = startTime
    rm.healthMu.Unlock()
    healthyTick := true

    ctx, cancelTimeout := context.WithTimeout(context.Background(), rm.resolution)
    defer cancelTimeout()

    klog.V(6).Infof("Beginning cycle, collecting metrics...")
    // scrape the raw metrics from all sources
    data, collectErr := rm.source.Collect(ctx)
    // (error handling elided in this excerpt)

    klog.V(6).Infof("...Storing metrics...")
    // store the parsed node/pod metrics in the sink
    recvErr := rm.sink.Receive(data)
    // (error handling elided in this excerpt)

    collectTime := time.Since(startTime)
    tickDuration.Observe(float64(collectTime) / float64(time.Second))
    klog.V(6).Infof("...Cycle complete")

    rm.healthMu.Lock()
    rm.lastOk = healthyTick
    rm.healthMu.Unlock()
}
4.2.1 Fetching metric data (rm.source.Collect(ctx))
Fetching metric data essentially means calling the endpoint that returns the raw data shown at the start of section 4, and that endpoint is cAdvisor on each node of the cluster (exposed through the kubelet). The logic here is therefore built around two steps:
- First find out which nodes exist in the cluster and the IP and port on each node from which metrics can be fetched.
- Then call the metrics endpoint on each of those nodes and parse the node and pod CPU and memory values out of the response.
func (m *sourceManager) Collect(baseCtx context.Context) (*MetricsBatch, error) {
    // determine which sources (nodes) to scrape
    sources, err := m.srcProv.GetMetricSources()
    var errs []error
    if err != nil {
        errs = append(errs, err)
    }
    klog.V(1).Infof("Scraping metrics from %v sources", len(sources))

    responseChannel := make(chan *MetricsBatch, len(sources))
    errChannel := make(chan error, len(sources))
    defer close(responseChannel)
    defer close(errChannel)

    startTime := time.Now()
    delayMs := delayPerSourceMs * len(sources)
    if delayMs > maxDelayMs {
        delayMs = maxDelayMs
    }

    for _, source := range sources {
        // scrape every source concurrently, with a small random delay to spread the load
        go func(source MetricSource) {
            sleepDuration := time.Duration(rand.Intn(delayMs)) * time.Millisecond
            time.Sleep(sleepDuration)

            ctx, cancelTimeout := context.WithTimeout(baseCtx, m.scrapeTimeout-sleepDuration)
            defer cancelTimeout()

            klog.V(2).Infof("Querying source: %s", source)
            // fetch and parse the data from this source
            metrics, err := scrapeWithMetrics(ctx, source)
            if err != nil {
                err = fmt.Errorf("unable to fully scrape metrics from source %s: %v", source.Name(), err)
            }
            responseChannel <- metrics
            errChannel <- err
        }(source)
    }

    res := &MetricsBatch{}
    for range sources {
        // merge the per-source results into one batch
        err := <-errChannel
        srcBatch := <-responseChannel
        if err != nil {
            errs = append(errs, err)
        }
        if srcBatch == nil {
            continue
        }
        res.Nodes = append(res.Nodes, srcBatch.Nodes...)
        res.Pods = append(res.Pods, srcBatch.Pods...)
    }

    klog.V(1).Infof("ScrapeMetrics: time: %s, nodes: %v, pods: %v", time.Since(startTime), len(res.Nodes), len(res.Pods))
    return res, utilerrors.NewAggregate(errs)
}
Both of the key steps above are encapsulated behind their own functions; let's look at them one by one.
1. Getting the sources: m.srcProv.GetMetricSources()

func (p *summaryProvider) GetMetricSources() ([]sources.MetricSource, error) {
    sources := []sources.MetricSource{}
    // list all nodes in the cluster through the informer's lister
    nodes, err := p.nodeLister.List(labels.Everything())
    if err != nil {
        return nil, fmt.Errorf("unable to list nodes: %v", err)
    }
    var errs []error
    for _, node := range nodes {
        // extract the address and port used to connect to this node's kubelet
        info, err := p.getNodeInfo(node)
        if err != nil {
            errs = append(errs, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err))
            continue
        }
        sources = append(sources, NewSummaryMetricsSource(info, p.kubeletClient))
    }
    return sources, utilerrors.NewAggregate(errs)
}

func NewSummaryMetricsSource(node NodeInfo, client KubeletInterface) sources.MetricSource {
    return &summaryMetricsSource{
        node:          node,
        kubeletClient: client,
    }
}
2. Scraping and parsing the data: scrapeWithMetrics(ctx, source)
func scrapeWithMetrics(ctx context.Context, s MetricSource) (*MetricsBatch, error) {
    sourceName := s.Name()
    startTime := time.Now()
    defer lastScrapeTimestamp.
        WithLabelValues(sourceName).
        Set(float64(time.Now().Unix()))
    defer scraperDuration.
        WithLabelValues(sourceName).
        Observe(float64(time.Since(startTime)) / float64(time.Second))

    return s.Collect(ctx)
}
Here s.Collect(ctx) is the Collect method of the MetricSource created by NewSummaryMetricsSource in step 1:
func (src *summaryMetricsSource) Collect(ctx context.Context) (*sources.MetricsBatch, error) {
    // call the kubelet's summary endpoint
    summary, err := func() (*stats.Summary, error) {
        startTime := time.Now()
        defer summaryRequestLatency.WithLabelValues(src.node.Name).Observe(float64(time.Since(startTime)) / float64(time.Second))
        return src.kubeletClient.GetSummary(ctx, src.node.ConnectAddress)
    }()

    res := &sources.MetricsBatch{
        Nodes: make([]sources.NodeMetricsPoint, 1),
        Pods:  make([]sources.PodMetricsPoint, len(summary.Pods)),
    }
    // (error checking and parsing of the summary into res elided in this excerpt)
    return res, utilerrors.NewAggregate(errs)
}
The core of Collect is GetSummary, which builds an HTTP client, sends a GET request to https://{node ip}:{port}/stats/summary?only_cpu_and_memory=true and reads the response body.
func (kc *kubeletClient) GetSummary(ctx context.Context, host string) (*stats.Summary, error) {
    scheme := "https"
    if kc.deprecatedNoTLS {
        scheme = "http"
    }
    url := url.URL{
        Scheme:   scheme,
        Host:     net.JoinHostPort(host, strconv.Itoa(kc.port)),
        Path:     "/stats/summary",
        RawQuery: "only_cpu_and_memory=true",
    }

    req, err := http.NewRequest("GET", url.String(), nil)
    if err != nil {
        return nil, err
    }
    summary := &stats.Summary{}
    client := kc.client
    if client == nil {
        client = http.DefaultClient
    }
    err = kc.makeRequestAndGetValue(client, req.WithContext(ctx), summary)
    return summary, err
}
Summary

The code walk in 4.2.1 confirms the earlier guess: fetching metric data is really just calling the cAdvisor endpoints. It is now worth looking at the key data structure we glossed over, MetricsBatch. It is the parsed object holding the node and pod CPU and memory values, and it is what gets handed to the metricSink.
type MetricsBatch struct {
    Nodes []NodeMetricsPoint
    Pods  []PodMetricsPoint
}

type NodeMetricsPoint struct {
    Name string
    MetricsPoint
}

type PodMetricsPoint struct {
    Name      string
    Namespace string

    Containers []ContainerMetricsPoint
}

type ContainerMetricsPoint struct {
    Name string
    MetricsPoint
}

type MetricsPoint struct {
    Timestamp time.Time

    CpuUsage resource.Quantity

    MemoryUsage resource.Quantity
}
4.2.2 Storing metric data (rm.sink.Receive(data))
Section 4.2.1 called the cAdvisor endpoint on every node in the cluster and collected all node and pod metrics; the next step is saving them into the cache. The scraped data has already been stored in the data variable (which is just the MetricsBatch described at the end of 4.2.1), so what remains is processing that variable.
type sinkMetricsProvider struct {
    mu    sync.RWMutex
    nodes map[string]sources.NodeMetricsPoint
    pods  map[apitypes.NamespacedName]sources.PodMetricsPoint
}

func (p *sinkMetricsProvider) GetNodeMetrics(nodes ...string) ([]provider.TimeInfo, []corev1.ResourceList, error) {
    // (read path elided in this excerpt; a sketch follows after the next paragraph)
}

func (p *sinkMetricsProvider) GetContainerMetrics(pods ...apitypes.NamespacedName) ([]provider.TimeInfo, [][]metrics.ContainerMetrics, error) {
    // (read path, shown in section 4.3)
}

func (p *sinkMetricsProvider) Receive(batch *sources.MetricsBatch) error {
    // index the node metrics by node name
    newNodes := make(map[string]sources.NodeMetricsPoint, len(batch.Nodes))
    for _, nodePoint := range batch.Nodes {
        if _, exists := newNodes[nodePoint.Name]; exists {
            klog.Errorf("duplicate node %s received", nodePoint.Name)
            continue
        }
        newNodes[nodePoint.Name] = nodePoint
    }

    // index the pod metrics by namespace/name
    newPods := make(map[apitypes.NamespacedName]sources.PodMetricsPoint, len(batch.Pods))
    for _, podPoint := range batch.Pods {
        podIdent := apitypes.NamespacedName{Name: podPoint.Name, Namespace: podPoint.Namespace}
        if _, exists := newPods[podIdent]; exists {
            klog.Errorf("duplicate pod %s received", podIdent)
            continue
        }
        newPods[podIdent] = podPoint
    }

    p.mu.Lock()
    defer p.mu.Unlock()
    // swap in the new snapshot, replacing the old one entirely
    p.nodes = newNodes
    p.pods = newPods
    return nil
}
So sinkMetricsProvider is essentially just two maps, and storing the data is equally simple: build two new maps and swap them in, with no need to merge with or clean up the old data. Crude, but effective.
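The read path GetNodeMetrics is elided in the excerpt above. Judging from the struct fields and from the GetContainerMetrics method shown later in 4.3, it presumably looks roughly like the following sketch; the TimeInfo population (the Window field is omitted here) and the zero-value handling for unknown nodes are assumptions, not the verbatim metrics-server code.

// Hypothetical sketch of the elided GetNodeMetrics: look each requested node up
// in the cached map and convert the cached point into a ResourceList.
func (p *sinkMetricsProvider) GetNodeMetrics(nodes ...string) ([]provider.TimeInfo, []corev1.ResourceList, error) {
    p.mu.RLock()
    defer p.mu.RUnlock()

    timestamps := make([]provider.TimeInfo, len(nodes))
    resMetrics := make([]corev1.ResourceList, len(nodes))
    for i, node := range nodes {
        metricPoint, present := p.nodes[node]
        if !present {
            // unknown nodes are simply left as zero values in the result slices
            continue
        }
        timestamps[i] = provider.TimeInfo{Timestamp: metricPoint.Timestamp}
        resMetrics[i] = corev1.ResourceList{
            corev1.ResourceCPU:    metricPoint.CpuUsage,
            corev1.ResourceMemory: metricPoint.MemoryUsage,
        }
    }
    return timestamps, resMetrics, nil
}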
4.3 The metrics API server
Section 4.1 already showed how the server side of metrics-server is started, and after the analysis in 4.2.2 we can guess that the server essentially just reads from the two maps in sinkMetricsProvider and returns the data when a request comes in.
Going back to the startup code, there are two steps: create a server (essentially a GenericAPIServer from the k8s.io/apiserver library), and then run it.
server, err := config.Complete(informerFactory).New()
return server.GenericAPIServer.PrepareRun().Run(stopCh)
The internals of k8s.io/apiserver are fairly involved, so we will not go into them here; we only look at how the GenericAPIServer is created and how it is used. For metrics-server, the job is to create a GenericAPIServer and then register metrics-server's own API together with the handlers that serve it.
type MetricsServer struct {
    *genericapiserver.GenericAPIServer
}

func (c completedConfig) New() (*MetricsServer, error) {
    // create the underlying GenericAPIServer
    genericServer, err := c.CompletedConfig.New("metrics-server", genericapiserver.NewEmptyDelegate())
    if err != nil {
        return nil, err
    }
    // register the metrics.k8s.io API group and its storage backends
    if err := generic.InstallStorage(c.ProviderConfig, c.SharedInformerFactory.Core().V1(), genericServer); err != nil {
        return nil, err
    }
    return &MetricsServer{
        GenericAPIServer: genericServer,
    }, nil
}
When registering metrics-server's own API, an API group (metrics.k8s.io) is created first, and then the storage for each resource in that group ("nodes" and "pods" here) is registered into VersionedResourcesStorageMap. The key point is that each resource's storage must implement the rest interfaces that handle requests.
func InstallStorage(providers *ProviderConfig, informers coreinf.Interface, server *genericapiserver.GenericAPIServer) error {
    info := BuildStorage(providers, informers)
    return server.InstallAPIGroup(&info)
}

func BuildStorage(providers *ProviderConfig, informers coreinf.Interface) genericapiserver.APIGroupInfo {
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(metrics.GroupName, Scheme, metav1.ParameterCodec, Codecs)

    nodemetricsStorage := nodemetricsstorage.NewStorage(metrics.Resource("nodemetrics"), providers.Node, informers.Nodes().Lister())
    podmetricsStorage := podmetricsstorage.NewStorage(metrics.Resource("podmetrics"), providers.Pod, informers.Pods().Lister())

    metricsServerResources := map[string]rest.Storage{
        "nodes": nodemetricsStorage,
        "pods":  podmetricsStorage,
    }
    apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = metricsServerResources
    return apiGroupInfo
}
Taking the Getter interface of the pod MetricStorage as an example, the Get function implemented here is the handler that runs when metrics-server receives a request for a specific pod's metrics.
type MetricStorage struct {
    groupResource schema.GroupResource
    prov          provider.PodMetricsProvider
    podLister     v1listers.PodLister
}

var _ rest.KindProvider = &MetricStorage{}
var _ rest.Storage = &MetricStorage{}
var _ rest.Getter = &MetricStorage{}
var _ rest.Lister = &MetricStorage{}

func NewStorage(groupResource schema.GroupResource, prov provider.PodMetricsProvider, podLister v1listers.PodLister) *MetricStorage {
    return &MetricStorage{
        groupResource: groupResource,
        prov:          prov,
        podLister:     podLister,
    }
}

func (m *MetricStorage) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) {
    namespace := genericapirequest.NamespaceValue(ctx)
    // look the pod itself up through the informer lister
    pod, err := m.podLister.Pods(namespace).Get(name)
    // (error handling elided in this excerpt)

    // read the pod's metrics from the provider, i.e. the sink
    podMetrics, err := m.getPodMetrics(pod)
    // (error handling elided in this excerpt)
    return &podMetrics[0], nil
}

func (m *MetricStorage) getPodMetrics(pods ...*v1.Pod) ([]metrics.PodMetrics, error) {
    namespacedNames := make([]apitypes.NamespacedName, len(pods))
    for i, pod := range pods {
        namespacedNames[i] = apitypes.NamespacedName{
            Name:      pod.Name,
            Namespace: pod.Namespace,
        }
    }
    timestamps, containerMetrics, err := m.prov.GetContainerMetrics(namespacedNames...)
    if err != nil {
        return nil, err
    }

    res := make([]metrics.PodMetrics, 0, len(pods))
    for i, pod := range pods {
        res = append(res, metrics.PodMetrics{
            ObjectMeta: metav1.ObjectMeta{
                Name:              pod.Name,
                Namespace:         pod.Namespace,
                CreationTimestamp: metav1.NewTime(time.Now()),
            },
            Timestamp:  metav1.NewTime(timestamps[i].Timestamp),
            Window:     metav1.Duration{Duration: timestamps[i].Window},
            Containers: containerMetrics[i],
        })
    }
    return res, nil
}
Here we finally come back to the sinkMetricsProvider from 4.2.2, only this time we read from its maps instead of writing to them. At this point the scraping side and the API server side meet; you can think of the two as a producer and a consumer, with the former producing the data and the latter reading it.
func (p *sinkMetricsProvider) GetContainerMetrics(pods ...apitypes.NamespacedName) ([]provider.TimeInfo, [][]metrics.ContainerMetrics, error) {
    p.mu.RLock()
    defer p.mu.RUnlock()

    timestamps := make([]provider.TimeInfo, len(pods))
    resMetrics := make([][]metrics.ContainerMetrics, len(pods))
    for i, pod := range pods {
        // look the pod up in the cached map
        metricPoint, present := p.pods[pod]
        if !present {
            continue
        }
        // (conversion of metricPoint's containers into contMetrics and
        //  filling of timestamps[i] elided in this excerpt)
        resMetrics[i] = contMetrics
    }
    return timestamps, resMetrics, nil
}
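The conversion elided in the loop above presumably walks metricPoint.Containers and turns each ContainerMetricsPoint into a metrics.ContainerMetrics. A rough sketch of that step under this assumption follows; the helper name and the choice of timestamp are mine, not taken from the source.

// Hypothetical sketch of the elided step: convert a cached PodMetricsPoint into
// the []metrics.ContainerMetrics returned by the API, plus a TimeInfo.
// Types come from k8s.io/metrics and the sources package shown in 4.2.1.
func podPointToAPI(point sources.PodMetricsPoint) (provider.TimeInfo, []metrics.ContainerMetrics) {
    contMetrics := make([]metrics.ContainerMetrics, 0, len(point.Containers))
    var earliest time.Time
    for _, c := range point.Containers {
        contMetrics = append(contMetrics, metrics.ContainerMetrics{
            Name: c.Name,
            Usage: corev1.ResourceList{
                corev1.ResourceCPU:    c.CpuUsage,
                corev1.ResourceMemory: c.MemoryUsage,
            },
        })
        // use the oldest container timestamp as the pod's timestamp (assumption)
        if earliest.IsZero() || c.Timestamp.Before(earliest) {
            earliest = c.Timestamp
        }
    }
    return provider.TimeInfo{Timestamp: earliest}, contMetrics
}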