1. Project Overview

Project address: https://github.com/kubernetes-sigs/metrics-server

In a Kubernetes cluster, if you want to do autoscaling or use the kubectl top command, metrics-server is a component you cannot avoid. Through the aggregate API, metrics-server exposes CPU and memory metrics for the pods and nodes in the cluster to other components; the pod autoscaler, for example, calls this API to read a pod's current resource usage and decide whether to scale it out or in (a sketch of such a client follows the notes below).

A few things to note:

- metrics-server serves real-time metrics (more precisely, the most recently scraped samples, held in memory); there is no database storing historical data.
- The metrics are not collected by metrics-server itself; they are collected by cadvisor on each node. metrics-server only sends requests to cadvisor and converts the returned metric data into the aggregate-API format.
- Because the metrics are served through the aggregate API, the cluster's kube-apiserver must have that feature (the aggregation layer) enabled; see the official community documentation for how to enable it.
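As a quick illustration of how a consumer such as the HPA reads these metrics through the aggregate API, here is a minimal sketch using the k8s.io/metrics clientset. The kubeconfig path and the pod name are placeholders, and the exact Get signature (with or without a context argument) depends on your client-go/metrics version, so treat this as a sketch rather than a drop-in snippet.

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"
	metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
)

func main() {
	// Build a client config from a kubeconfig file (placeholder path).
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}

	// The metrics clientset talks to the metrics.k8s.io aggregate API
	// that metrics-server registers.
	mc, err := metricsclient.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// Fetch the PodMetrics object for a single pod (placeholder name).
	pm, err := mc.MetricsV1beta1().PodMetricses("kube-system").
		Get(context.TODO(), "metrics-server-7668599459-2jxq5", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	for _, c := range pm.Containers {
		fmt.Printf("container=%s cpu=%s memory=%s\n",
			c.Name, c.Usage.Cpu().String(), c.Usage.Memory().String())
	}
}
```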
2. Deployment

The best way to install metrics-server is as a Deployment:
```bash
kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/v0.3.6/components.yaml
```
The main Deployment in that YAML is as follows:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: metrics-server
  namespace: kube-system
  labels:
    k8s-app: metrics-server
spec:
  selector:
    matchLabels:
      k8s-app: metrics-server
  template:
    metadata:
      name: metrics-server
      labels:
        k8s-app: metrics-server
    spec:
      serviceAccountName: metrics-server
      volumes:
      - name: tmp-dir
        emptyDir: {}
      containers:
      - name: metrics-server
        image: k8s.gcr.io/metrics-server-amd64:v0.3.6
        imagePullPolicy: IfNotPresent
        args:
          - --cert-dir=/tmp
          - --secure-port=4443
        ports:
        - name: main-port
          containerPort: 4443
          protocol: TCP
        securityContext:
          readOnlyRootFilesystem: true
          runAsNonRoot: true
          runAsUser: 1000
        volumeMounts:
        - name: tmp-dir
          mountPath: /tmp
      nodeSelector:
        kubernetes.io/os: linux
        kubernetes.io/arch: "amd64"
```
Another resource in the manifest worth noting is an APIService, which registers metrics-server with the aggregate API:
```yaml
apiVersion: apiregistration.k8s.io/v1beta1
kind: APIService
metadata:
  name: v1beta1.metrics.k8s.io
spec:
  service:
    name: metrics-server
    namespace: kube-system
  group: metrics.k8s.io
  version: v1beta1
  insecureSkipTLSVerify: true
  groupPriorityMinimum: 100
  versionPriority: 100
```
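Once metrics-server is up and this APIService is registered, the metrics.k8s.io/v1beta1 group should appear in API discovery. Below is a minimal sketch of checking that from Go with client-go's discovery client; the kubeconfig path is a placeholder for your environment.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder kubeconfig location; adjust for your environment.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}

	dc, err := discovery.NewDiscoveryClientForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// If the APIService is healthy, this lists the "nodes" and "pods"
	// resources served by metrics-server.
	list, err := dc.ServerResourcesForGroupVersion("metrics.k8s.io/v1beta1")
	if err != nil {
		panic(err)
	}
	for _, r := range list.APIResources {
		fmt.Println(r.Name)
	}
}
```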
3. Startup Parameters

| Parameter | Description |
| --- | --- |
| metric-resolution | Interval at which metrics are periodically scraped from the nodes; default 60s |
| kubelet-insecure-tls | Do not verify the kubelet's certificate against a CA; for testing only |
| kubelet-port | Port used to call the kubelet on a node to fetch metrics; default 10250 |
| kubeconfig | Path to the kubeconfig file used to talk to kube-apiserver and the kubelets |
| kubelet-preferred-address-types | Priority order of node address types used to connect to the kubelet |
| kubelet-certificate-authority | CA certificate used when connecting to the kubelet |
| deprecated-kubelet-completely-insecure | Access the kubelet over an insecure connection (soon to be removed) |
4. Code Analysis

Before walking through the metrics-server code, let's use what it does to guess at its logic. We know that the data returned by the cadvisor endpoint on a node generally looks like this, and it contains far more information than we need:
```
[root@node1 ~]# curl -k https://172.17.8.101:10250/stats/summary?only_cpu_and_memory=true
{
  "node": {
    "nodeName": "node1",
    "systemContainers": [
      {
        "name": "kubelet",
        "startTime": "2020-05-24T12:54:13Z",
        "cpu": {
          "time": "2020-05-24T14:12:31Z",
          "usageNanoCores": 20686133,
          "usageCoreNanoSeconds": 156089526198
        },
        "memory": {
          "time": "2020-05-24T14:12:31Z",
          "usageBytes": 170590208,
          "workingSetBytes": 122531840,
          "rssBytes": 66949120,
          "pageFaults": 763727,
          "majorPageFaults": 85
        },
        "userDefinedMetrics": null
      },
      {
        "name": "runtime",
        "startTime": "2020-05-24T12:54:13Z",
        "cpu": {
          "time": "2020-05-24T14:12:31Z",
          "usageNanoCores": 20686133,
          "usageCoreNanoSeconds": 156089526198
        },
        "memory": {
          "time": "2020-05-24T14:12:31Z",
          "usageBytes": 170590208,
          "workingSetBytes": 122531840,
          "rssBytes": 66949120,
          "pageFaults": 763727,
          "majorPageFaults": 85
        },
        "userDefinedMetrics": null
      },
      {
        "name": "pods",
        "startTime": "2020-05-24T12:54:13Z",
        "cpu": {
          "time": "2020-05-24T14:12:39Z",
          "usageNanoCores": 0,
          "usageCoreNanoSeconds": 42207538504
        },
        "memory": {
          "time": "2020-05-24T14:12:39Z",
          "availableBytes": 1910824960,
          "usageBytes": 33480704,
          "workingSetBytes": 16498688,
          "rssBytes": 36864,
          "pageFaults": 0,
          "majorPageFaults": 0
        },
        "userDefinedMetrics": null
      }
    ],
    "startTime": "2020-05-24T12:52:24Z",
    "cpu": {
      "time": "2020-05-24T14:12:39Z",
      "usageNanoCores": 888521168,
      "usageCoreNanoSeconds": 776524490477
    },
    "memory": {
      "time": "2020-05-24T14:12:39Z",
      "availableBytes": 891166720,
      "usageBytes": 1627074560,
      "workingSetBytes": 1036156928,
      "rssBytes": 359944192,
      "pageFaults": 1850284,
      "majorPageFaults": 1987
    }
  },
  "pods": [
    {
      "podRef": {
        "name": "metrics-server-7668599459-2jxq5",
        "namespace": "kube-system",
        "uid": "f5af876f-03de-43e5-902b-79bece68c508"
      },
      "startTime": "2020-05-24T13:27:42Z",
      "containers": null,
      "cpu": {
        "time": "2020-05-24T14:12:36Z",
        "usageNanoCores": 0,
        "usageCoreNanoSeconds": 6297886
      },
      "memory": {
        "time": "2020-05-24T14:12:36Z",
        "usageBytes": 434176,
        "workingSetBytes": 249856,
        "rssBytes": 36864,
        "pageFaults": 0,
        "majorPageFaults": 0
      }
    }
  ]
}
```
The data we get from metrics-server, on the other hand, looks like this:
```
[root@node1 ~]# curl http...
{
  "kind": "PodMetrics",
  "apiVersion": "metrics.k8s.io/v1beta1",
  "metadata": {
    "name": "metrics-server-7668599459-2jxq5",
    "namespace": "kube-system",
    "selfLink": "/apis/metrics.k8s.io/v1beta1/namespaces/kube-system/pods/metrics-server-7668599459-2jxq5",
    "creationTimestamp": "2020-05-24T13:27:42Z"
  },
  "timestamp": "2020-05-24T13:27:42Z",
  "window": "30s",
  "containers": [
    {
      "name": "metrics-server",
      "usage": {
        "cpu": "0",
        "memory": "424Ki"
      }
    }
  ]
}
```
In other words, metrics-server essentially performs a format conversion: it turns the cadvisor-style data into JSON served through a Kubernetes API. From this it's easy to guess that the metrics-server code must contain a step that first fetches everything from the metrics endpoint and then parses out just the values it needs. We may also have a question: when we send a request to metrics-server, does it immediately query cadvisor, parse the response, and return it, or has it already been fetching the data from cadvisor periodically (perhaps caching it in memory), so that incoming requests are answered straight from the cache? Let's keep this question in mind and go look at the source code.
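For reference, the Go type behind the PodMetrics JSON above lives in k8s.io/metrics/pkg/apis/metrics/v1beta1. Roughly (simplified here, with JSON tags omitted), it looks like this:

```go
package v1beta1sketch

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Simplified view of the metrics.k8s.io/v1beta1 PodMetrics object; see
// k8s.io/metrics for the authoritative definitions.
type PodMetrics struct {
	metav1.TypeMeta
	metav1.ObjectMeta

	// Timestamp and Window describe when and over which interval
	// the usage below was sampled.
	Timestamp metav1.Time
	Window    metav1.Duration

	Containers []ContainerMetrics
}

type ContainerMetrics struct {
	Name string
	// Usage holds the cpu/memory quantities, e.g. "0" and "424Ki" above.
	Usage corev1.ResourceList
}
```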
4.1 Startup

metrics-server's startup is also built on the github.com/spf13/cobra framework (worth a look on GitHub if you're curious about it). What the framework actually executes is the Run function implemented on MetricsServerOptions:
```go
func (o MetricsServerOptions) Run(stopCh <-chan struct{}) error {
	config, err := o.Config()
	if err != nil {
		return err
	}
	config.GenericConfig.EnableMetrics = true

	// client config used to talk to kube-apiserver
	var clientConfig *rest.Config
	if len(o.Kubeconfig) > 0 {
		loadingRules := &clientcmd.ClientConfigLoadingRules{ExplicitPath: o.Kubeconfig}
		loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{})
		clientConfig, err = loader.ClientConfig()
	} else {
		clientConfig, err = rest.InClusterConfig()
	}
	if err != nil {
		return fmt.Errorf("unable to construct lister client config: %v", err)
	}
	clientConfig.ContentType = "application/vnd.kubernetes.protobuf"
	kubeClient, err := kubernetes.NewForConfig(clientConfig)
	if err != nil {
		return fmt.Errorf("unable to construct lister client: %v", err)
	}
	// informers are used to list the nodes in the cluster
	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)

	// client used to scrape the kubelet (cadvisor) summary endpoint
	kubeletRestCfg := rest.CopyConfig(clientConfig)
	if len(o.KubeletCAFile) > 0 {
		kubeletRestCfg.TLSClientConfig.CAFile = o.KubeletCAFile
		kubeletRestCfg.TLSClientConfig.CAData = nil
	}
	kubeletConfig := summary.GetKubeletConfig(kubeletRestCfg, o.KubeletPort, o.InsecureKubeletTLS, o.DeprecatedCompletelyInsecureKubelet)
	kubeletClient, err := summary.KubeletClientFor(kubeletConfig)
	if err != nil {
		return fmt.Errorf("unable to construct a client to connect to the kubelets: %v", err)
	}

	// which node address types to prefer when connecting to a kubelet
	addrPriority := make([]corev1.NodeAddressType, len(o.KubeletPreferredAddressTypes))
	for i, addrType := range o.KubeletPreferredAddressTypes {
		addrPriority[i] = corev1.NodeAddressType(addrType)
	}
	addrResolver := summary.NewPriorityNodeAddressResolver(addrPriority)

	// one metric source per node, scraped with a timeout of 90% of the resolution
	sourceProvider := summary.NewSummaryProvider(informerFactory.Core().V1().Nodes().Lister(), kubeletClient, addrResolver)
	scrapeTimeout := time.Duration(float64(o.MetricResolution) * 0.90)
	sources.RegisterDurationMetrics(scrapeTimeout)
	sourceManager := sources.NewSourceManager(sourceProvider, scrapeTimeout)

	// in-memory sink that caches the scraped metrics; also the provider the API reads from
	metricSink, metricsProvider := sink.NewSinkProvider()

	// manager: periodically scrapes all sources and writes the result into the sink
	manager.RegisterDurationMetrics(o.MetricResolution)
	mgr := manager.NewManager(sourceManager, metricSink, o.MetricResolution)

	// wire the cached provider into the API server config
	config.ProviderConfig.Node = metricsProvider
	config.ProviderConfig.Pod = metricsProvider

	// build the aggregated API server, then run the manager and the server
	server, err := config.Complete(informerFactory).New()
	if err != nil {
		return err
	}
	server.AddHealthzChecks(healthz.NamedCheck("healthz", mgr.CheckHealth))

	mgr.RunUntil(stopCh)
	return server.GenericAPIServer.PrepareRun().Run(stopCh)
}
```
From this code we can see that scraping/caching and the server are two separate flows that cooperate through shared memory: the periodically scraped data is cached in metricSink (which is also metricsProvider), and when the server receives a request it reads from metricSink and returns the data to the client. This also answers our earlier question: metrics-server has already been fetching data from cadvisor periodically, and incoming requests are served directly from the cached data. The sketch below shows the same pattern in miniature.
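To make the shape of that split concrete, here is a small self-contained sketch of the same pattern: a ticker goroutine refreshes a mutex-protected cache, and an HTTP handler serves whatever is currently cached. It is only an illustration of the architecture, not metrics-server's actual code.

```go
package main

import (
	"encoding/json"
	"net/http"
	"sync"
	"time"
)

// sink plays the role of metricSink: the scraping goroutine writes into it,
// the HTTP server reads from it.
type sink struct {
	mu    sync.RWMutex
	usage map[string]int64 // e.g. pod name -> memory working set bytes
}

func (s *sink) Receive(latest map[string]int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.usage = latest // replace the whole snapshot; no history is kept
}

func (s *sink) Get(name string) (int64, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.usage[name]
	return v, ok
}

// scrape stands in for "call the kubelet summary endpoint and parse it".
func scrape() map[string]int64 {
	return map[string]int64{"metrics-server-xxx": 249856}
}

func main() {
	s := &sink{}

	// Producer: periodically refresh the cache, like Manager.RunUntil.
	go func() {
		ticker := time.NewTicker(60 * time.Second)
		defer ticker.Stop()
		s.Receive(scrape())
		for range ticker.C {
			s.Receive(scrape())
		}
	}()

	// Consumer: serve whatever is currently cached, like the metrics API.
	http.HandleFunc("/pods/", func(w http.ResponseWriter, r *http.Request) {
		v, ok := s.Get(r.URL.Path[len("/pods/"):])
		if !ok {
			http.NotFound(w, r)
			return
		}
		json.NewEncoder(w).Encode(map[string]int64{"workingSetBytes": v})
	})
	http.ListenAndServe(":8080", nil)
}
```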
4.2 Data Scraping and Caching

The parts of the Run function in 4.1 that set up the kubelet client, the source manager, the metricSink, and the manager are the startup of metrics-server's data-scraping flow, which we'll simply call the manager. It mainly creates two clients: a kube-apiserver client used to list the node resources in the cluster, and another client that calls the cadvisor endpoint on each node to fetch node and pod CPU and memory metrics. It also creates a metricSink to hold the scraped data. Without further ado, let's see how this manager runs.

Putting the manager-related lines of the Run function together, the manager's startup is simply: create a manager, then run it.
```go
manager.RegisterDurationMetrics(o.MetricResolution)
mgr := manager.NewManager(sourceManager, metricSink, o.MetricResolution)

mgr.RunUntil(stopCh)
```
What this actually does is shown below: once the manager is created, running it just means periodically executing its Collect function.
```go
func NewManager(metricSrc sources.MetricSource, metricSink sink.MetricSink, resolution time.Duration) *Manager {
	manager := Manager{
		source:     metricSrc,
		sink:       metricSink,
		resolution: resolution,
	}

	return &manager
}

func (rm *Manager) RunUntil(stopCh <-chan struct{}) {
	go func() {
		ticker := time.NewTicker(rm.resolution)
		defer ticker.Stop()
		rm.Collect(time.Now())
		for {
			select {
			case startTime := <-ticker.C:
				rm.Collect(startTime)
			case <-stopCh:
				return
			}
		}
	}()
}
```
The logic inside Collect is then quite clear; it does two things:

- call the Collect function implemented by sourceManager to fetch the raw metric data;
- parse the raw metric data into node and pod values and save them into metricSink.
```go
func (rm *Manager) Collect(startTime time.Time) {
	rm.healthMu.Lock()
	rm.lastTickStart = startTime
	rm.healthMu.Unlock()

	healthyTick := true

	ctx, cancelTimeout := context.WithTimeout(context.Background(), rm.resolution)
	defer cancelTimeout()

	klog.V(6).Infof("Beginning cycle, collecting metrics...")
	// scrape the sources
	data, collectErr := rm.source.Collect(ctx)
	// (error handling elided)

	klog.V(6).Infof("...Storing metrics...")
	// hand the batch over to the sink
	recvErr := rm.sink.Receive(data)
	// (error handling elided)

	collectTime := time.Since(startTime)
	tickDuration.Observe(float64(collectTime) / float64(time.Second))
	klog.V(6).Infof("...Cycle complete")

	rm.healthMu.Lock()
	rm.lastOk = healthyTick
	rm.healthMu.Unlock()
}
```
4.2.1 Fetching the metric data (rm.source.Collect(ctx))

Fetching the metric data essentially means calling an endpoint to get the metric-format data shown at the start of section 4, and that endpoint is cadvisor on each node of the cluster (actually exposed by the kubelet), so the logic here is organized around that idea:

- first find out which nodes are in the cluster, and obtain the IP and port for fetching metrics from each of them;
- then call the metrics endpoint on each node and parse the node and pod CPU and memory values out of the response.
```go
func (m *sourceManager) Collect(baseCtx context.Context) (*MetricsBatch, error) {
	sources, err := m.srcProv.GetMetricSources()
	var errs []error
	if err != nil {
		errs = append(errs, err)
	}

	klog.V(1).Infof("Scraping metrics from %v sources", len(sources))

	responseChannel := make(chan *MetricsBatch, len(sources))
	errChannel := make(chan error, len(sources))
	defer close(responseChannel)
	defer close(errChannel)

	startTime := time.Now()

	delayMs := delayPerSourceMs * len(sources)
	if delayMs > maxDelayMs {
		delayMs = maxDelayMs
	}

	for _, source := range sources {
		go func(source MetricSource) {
			sleepDuration := time.Duration(rand.Intn(delayMs)) * time.Millisecond
			time.Sleep(sleepDuration)
			ctx, cancelTimeout := context.WithTimeout(baseCtx, m.scrapeTimeout-sleepDuration)
			defer cancelTimeout()

			klog.V(2).Infof("Querying source: %s", source)
			metrics, err := scrapeWithMetrics(ctx, source)
			if err != nil {
				err = fmt.Errorf("unable to fully scrape metrics from source %s: %v", source.Name(), err)
			}
			responseChannel <- metrics
			errChannel <- err
		}(source)
	}

	res := &MetricsBatch{}

	for range sources {
		err := <-errChannel
		srcBatch := <-responseChannel
		if err != nil {
			errs = append(errs, err)
		}
		if srcBatch == nil {
			continue
		}

		res.Nodes = append(res.Nodes, srcBatch.Nodes...)
		res.Pods = append(res.Pods, srcBatch.Pods...)
	}

	klog.V(1).Infof("ScrapeMetrics: time: %s, nodes: %v, pods: %v", time.Since(startTime), len(res.Nodes), len(res.Pods))
	return res, utilerrors.NewAggregate(errs)
}
```
The details of both key steps above are encapsulated, so let's look at them one by one.

Getting the sources: m.srcProv.GetMetricSources()
```go
func (p *summaryProvider) GetMetricSources() ([]sources.MetricSource, error) {
	sources := []sources.MetricSource{}
	nodes, err := p.nodeLister.List(labels.Everything())
	if err != nil {
		return nil, fmt.Errorf("unable to list nodes: %v", err)
	}

	var errs []error
	for _, node := range nodes {
		info, err := p.getNodeInfo(node)
		if err != nil {
			errs = append(errs, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err))
			continue
		}
		sources = append(sources, NewSummaryMetricsSource(info, p.kubeletClient))
	}
	return sources, utilerrors.NewAggregate(errs)
}

func NewSummaryMetricsSource(node NodeInfo, client KubeletInterface) sources.MetricSource {
	return &summaryMetricsSource{
		node:          node,
		kubeletClient: client,
	}
}
```
Scraping and parsing the data: scrapeWithMetrics(ctx, source)
```go
func scrapeWithMetrics(ctx context.Context, s MetricSource) (*MetricsBatch, error) {
	sourceName := s.Name()
	startTime := time.Now()
	defer lastScrapeTimestamp.
		WithLabelValues(sourceName).
		Set(float64(time.Now().Unix()))
	defer scraperDuration.
		WithLabelValues(sourceName).
		Observe(float64(time.Since(startTime)) / float64(time.Second))

	return s.Collect(ctx)
}
```
The s.Collect(ctx) here is the Collect method of the MetricSource created by NewSummaryMetricsSource in step 1.
```go
func (src *summaryMetricsSource) Collect(ctx context.Context) (*sources.MetricsBatch, error) {
	summary, err := func() (*stats.Summary, error) {
		startTime := time.Now()
		defer summaryRequestLatency.WithLabelValues(src.node.Name).Observe(float64(time.Since(startTime)) / float64(time.Second))
		return src.kubeletClient.GetSummary(ctx, src.node.ConnectAddress)
	}()
	// (error handling elided)

	res := &sources.MetricsBatch{
		Nodes: make([]sources.NodeMetricsPoint, 1),
		Pods:  make([]sources.PodMetricsPoint, len(summary.Pods)),
	}
	// (decoding of the summary's node and pod stats into res is elided)

	return res, utilerrors.NewAggregate(errs)
}
```
At its core, Collect just executes GetSummary, which creates an HTTP client, sends a GET request to https://{node ip}:{port}/stats/summary?only_cpu_and_memory=true, and reads the response body.
```go
func (kc *kubeletClient) GetSummary(ctx context.Context, host string) (*stats.Summary, error) {
	scheme := "https"
	if kc.deprecatedNoTLS {
		scheme = "http"
	}
	url := url.URL{
		Scheme:   scheme,
		Host:     net.JoinHostPort(host, strconv.Itoa(kc.port)),
		Path:     "/stats/summary",
		RawQuery: "only_cpu_and_memory=true",
	}

	req, err := http.NewRequest("GET", url.String(), nil)
	if err != nil {
		return nil, err
	}
	summary := &stats.Summary{}
	client := kc.client
	if client == nil {
		client = http.DefaultClient
	}
	err = kc.makeRequestAndGetValue(client, req.WithContext(ctx), summary)
	return summary, err
}
```
Summary

The code walk in 4.2.1 confirms the process we guessed at earlier: fetching metric data is really just calling the cadvisor endpoint. It is worth looking at a key data structure we skipped over, MetricsBatch: this is the parsed result holding the node and pod CPU and memory data, and it is what gets handed to metricSink.
```go
type MetricsBatch struct {
	Nodes []NodeMetricsPoint
	Pods  []PodMetricsPoint
}

type NodeMetricsPoint struct {
	Name string
	MetricsPoint
}

type PodMetricsPoint struct {
	Name      string
	Namespace string

	Containers []ContainerMetricsPoint
}

type ContainerMetricsPoint struct {
	Name string
	MetricsPoint
}

type MetricsPoint struct {
	Timestamp   time.Time
	CpuUsage    resource.Quantity
	MemoryUsage resource.Quantity
}
```
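The Collect method shown earlier elides how the stats.Summary returned by the kubelet is decoded into these points. The sketch below illustrates that conversion using locally defined stand-ins for the summary fields: usageNanoCores becomes a CPU quantity at nano scale and workingSetBytes a binary-SI memory quantity. The real decode functions in metrics-server also handle containers and missing fields, so treat this purely as an illustration.

```go
package decode

import (
	"time"

	"k8s.io/apimachinery/pkg/api/resource"
)

// Minimal stand-ins for the kubelet summary fields we care about
// (the real types live in the kubelet stats API package).
type CPUStats struct {
	Time           time.Time
	UsageNanoCores uint64
}

type MemoryStats struct {
	Time            time.Time
	WorkingSetBytes uint64
}

type NodeStats struct {
	NodeName string
	CPU      CPUStats
	Memory   MemoryStats
}

// MetricsPoint mirrors the struct shown above.
type MetricsPoint struct {
	Timestamp   time.Time
	CpuUsage    resource.Quantity
	MemoryUsage resource.Quantity
}

// decodeNode turns one node entry of the summary into a metrics point.
func decodeNode(n NodeStats) MetricsPoint {
	return MetricsPoint{
		Timestamp:   n.Memory.Time,
		CpuUsage:    *resource.NewScaledQuantity(int64(n.CPU.UsageNanoCores), resource.Nano),
		MemoryUsage: *resource.NewQuantity(int64(n.Memory.WorkingSetBytes), resource.BinarySI),
	}
}
```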
4.2.2 Storing the metric data (rm.sink.Receive(data))

In 4.2.1 we called the cadvisor endpoint on every node in the cluster and obtained the metric data for all nodes and pods; the next step is to store it in the cache. The scraped data has already been placed in the data variable (which is just the MetricsBatch described in the 4.2.1 summary), so what remains is to process that variable.
```go
type sinkMetricsProvider struct {
	mu    sync.RWMutex
	nodes map[string]sources.NodeMetricsPoint
	pods  map[apitypes.NamespacedName]sources.PodMetricsPoint
}

func (p *sinkMetricsProvider) GetNodeMetrics(nodes ...string) ([]provider.TimeInfo, []corev1.ResourceList, error) {
	// (elided)
}

func (p *sinkMetricsProvider) GetContainerMetrics(pods ...apitypes.NamespacedName) ([]provider.TimeInfo, [][]metrics.ContainerMetrics, error) {
	// (elided; shown in 4.3)
}

func (p *sinkMetricsProvider) Receive(batch *sources.MetricsBatch) error {
	newNodes := make(map[string]sources.NodeMetricsPoint, len(batch.Nodes))
	for _, nodePoint := range batch.Nodes {
		if _, exists := newNodes[nodePoint.Name]; exists {
			klog.Errorf("duplicate node %s received", nodePoint.Name)
			continue
		}
		newNodes[nodePoint.Name] = nodePoint
	}

	newPods := make(map[apitypes.NamespacedName]sources.PodMetricsPoint, len(batch.Pods))
	for _, podPoint := range batch.Pods {
		podIdent := apitypes.NamespacedName{Name: podPoint.Name, Namespace: podPoint.Namespace}
		if _, exists := newPods[podIdent]; exists {
			klog.Errorf("duplicate pod %s received", podIdent)
			continue
		}
		newPods[podIdent] = podPoint
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	p.nodes = newNodes
	p.pods = newPods

	return nil
}
```
As you can see, the sinkMetricsProvider struct is essentially just two maps, and the storing logic is equally simple: build two new maps and assign them, with no need to deal with the previous data at all. Simple and blunt.
4.3 The metrics server

Section 4.1 already described how metrics-server starts. After the code analysis in 4.2.2 we can guess that the server itself essentially just reads data from sinkMetricsProvider's two maps when a request arrives and returns it.

Back in the startup code, there are two steps: create a server (essentially a GenericAPIServer from the k8s.io/apiserver library), and then run that server.
```go
server, err := config.Complete(informerFactory).New()

return server.GenericAPIServer.PrepareRun().Run(stopCh)
```
Since the internals of the k8s.io/apiserver library are fairly involved, we'll set them aside for now and only cover how the GenericAPIServer is created and how the server is used. For metrics-server, this means creating a GenericAPIServer first and then registering metrics-server's own API and the corresponding handlers with it.
```go
type MetricsServer struct {
	*genericapiserver.GenericAPIServer
}

func (c completedConfig) New() (*MetricsServer, error) {
	genericServer, err := c.CompletedConfig.New("metrics-server", genericapiserver.NewEmptyDelegate())
	if err != nil {
		return nil, err
	}

	if err := generic.InstallStorage(c.ProviderConfig, c.SharedInformerFactory.Core().V1(), genericServer); err != nil {
		return nil, err
	}

	return &MetricsServer{
		GenericAPIServer: genericServer,
	}, nil
}
```
When registering metrics-server's own API, it first creates an API group (metrics.k8s.io) and then registers the Storage of each resource under that group (here "nodes" and "pods") into VersionedResourcesStorageMap. The key point is that each resource's Storage must implement the rest interfaces that handle requests.
```go
func InstallStorage(providers *ProviderConfig, informers coreinf.Interface, server *genericapiserver.GenericAPIServer) error {
	info := BuildStorage(providers, informers)
	return server.InstallAPIGroup(&info)
}

func BuildStorage(providers *ProviderConfig, informers coreinf.Interface) genericapiserver.APIGroupInfo {
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(metrics.GroupName, Scheme, metav1.ParameterCodec, Codecs)

	nodemetricsStorage := nodemetricsstorage.NewStorage(metrics.Resource("nodemetrics"), providers.Node, informers.Nodes().Lister())
	podmetricsStorage := podmetricsstorage.NewStorage(metrics.Resource("podmetrics"), providers.Pod, informers.Pods().Lister())
	metricsServerResources := map[string]rest.Storage{
		"nodes": nodemetricsStorage,
		"pods":  podmetricsStorage,
	}
	apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = metricsServerResources

	return apiGroupInfo
}
```
Take the Getter interface of the pod MetricStorage as an example: the Get function implemented here is the handler that processes the request when metrics-server is asked for the metrics of a particular pod.
```go
type MetricStorage struct {
	groupResource schema.GroupResource
	prov          provider.PodMetricsProvider
	podLister     v1listers.PodLister
}

var _ rest.KindProvider = &MetricStorage{}
var _ rest.Storage = &MetricStorage{}
var _ rest.Getter = &MetricStorage{}
var _ rest.Lister = &MetricStorage{}

func NewStorage(groupResource schema.GroupResource, prov provider.PodMetricsProvider, podLister v1listers.PodLister) *MetricStorage {
	return &MetricStorage{
		groupResource: groupResource,
		prov:          prov,
		podLister:     podLister,
	}
}

func (m *MetricStorage) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) {
	namespace := genericapirequest.NamespaceValue(ctx)

	pod, err := m.podLister.Pods(namespace).Get(name)
	// (error handling elided)

	podMetrics, err := m.getPodMetrics(pod)
	// (error handling elided)

	return &podMetrics[0], nil
}

func (m *MetricStorage) getPodMetrics(pods ...*v1.Pod) ([]metrics.PodMetrics, error) {
	namespacedNames := make([]apitypes.NamespacedName, len(pods))
	for i, pod := range pods {
		namespacedNames[i] = apitypes.NamespacedName{
			Name:      pod.Name,
			Namespace: pod.Namespace,
		}
	}
	timestamps, containerMetrics, err := m.prov.GetContainerMetrics(namespacedNames...)
	if err != nil {
		return nil, err
	}

	res := make([]metrics.PodMetrics, 0, len(pods))
	for i, pod := range pods {
		res = append(res, metrics.PodMetrics{
			ObjectMeta: metav1.ObjectMeta{
				Name:              pod.Name,
				Namespace:         pod.Namespace,
				CreationTimestamp: metav1.NewTime(time.Now()),
			},
			Timestamp:  metav1.NewTime(timestamps[i].Timestamp),
			Window:     metav1.Duration{Duration: timestamps[i].Window},
			Containers: containerMetrics[i],
		})
	}
	return res, nil
}
```
Here we finally come back to the sinkMetricsProvider from 4.2.2, only this time we read data from its maps. At this point the data-scraping side and the serving side are joined together; you can also view the two parts as a producer/consumer pair, with the former producing the data and the latter reading it.
```go
func (p *sinkMetricsProvider) GetContainerMetrics(pods ...apitypes.NamespacedName) ([]provider.TimeInfo, [][]metrics.ContainerMetrics, error) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	timestamps := make([]provider.TimeInfo, len(pods))
	resMetrics := make([][]metrics.ContainerMetrics, len(pods))

	for i, pod := range pods {
		metricPoint, present := p.pods[pod]
		if !present {
			continue
		}
		// (building contMetrics from metricPoint's containers and filling
		// timestamps[i] is elided here)
		resMetrics[i] = contMetrics
	}
	return timestamps, resMetrics, nil
}
```