- Created by 郭强, last modified on Apr 24, 2022
本文主要是以client-go为入口,介绍Kubernetes的Informer核心组件的原理,以及如何创建自定义的Controller以及CRD。
一、基本架构
1、K8S中的Informer
K8S是典型的Server-Client架构。K8S内部通过etcd服务存储集群的数据信息,通过apiserver作为统一的操作入口,任何对数据的操作都必须经过apiserver。客户端通过List&Watch机制查询apiserver,而Informer模块则封装了List-watch。《kubernetes源码剖析》一书中的Informer机制架构图:
Informer In Kubernetes
2、自定义Controller与client-go
我们再来看一张经典的架构图,来自于client-go的官方介绍,展示了client-go与自定义Controller之间的相关组件以及数据交互流程。原文地址:https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md
client-go项目地址:https://github.com/kubernetes/client-go
自定义Controller使用client-go与Kubernetes的事件交互
二、相关组件
1、Reflector
Reflector 用来直接和 kubernetes api server 通信,内部实现了 list&watch 机制,list&watch 就是用来监听资源变化的,一个list&watch 只对应一个特定的资源,这个资源可以是 K8S 中内部的资源也可以是自定义的资源,当收到资源变化时(创建、删除、修改)时会将资源放到 Delta Fifo 队列中。
2、DeltaFIFO
Delta代表变化, FIFO则是先入先出的队列。DeltaFIFO将接受来的资源event,转化为特定的变化类型,存储在队列中,周期性的POP出去,分发到事件处理器,并更新Indexer中的本地缓存。
3、Informer
Informer 是我们要监听的资源的一个代码抽象,在 Controller 的驱动下运行,能够将 Delta Filo 队列中的数据弹出,然后保存到本地缓存也就是图中的步骤5),同时将数据分发到自定义Controller 中进行事件处理也就是图中的步骤6)。
4、Indexer
Indexer 能够基于一些索引函数以及对象的标签计算出索引存储到本地缓存,索引器使用线程安全的数据存储来存储对象及其键。可以看到,在自定义 Controller 中处理事件时,就是通过键名从Indexer中查询出事件中的对象再执行自定义的逻辑处理。
三、源码示例
package main import ( "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "log" ) func main() { config, err := clientcmd.BuildConfigFromFlags("", "/Users/john/.kube/config") if err != nil { panic(err) } clientSet, err := kubernetes.NewForConfig(config) if err != nil { panic(err) } var ( stopCh = make(chan struct{}) sharedInformers = informers.NewSharedInformerFactory(clientSet, 0) podsInformer = sharedInformers.Core().V1().Pods().Informer() ) defer close(stopCh) podsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { mObj := obj.(v1.Object) log.Printf("New Pod Added to Stroe: %s", mObj.GetName()) }, UpdateFunc: func(oldObj, newObj interface{}) { oObj := oldObj.(v1.Object) nObj := newObj.(v1.Object) log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName()) }, DeleteFunc: func(obj interface{}) { mObj := obj.(v1.Object) log.Printf("Pod Deleted from Stroe : %s", mObj.GetName()) }, }) podsInformer.Run(stopCh) }
在这个示例中,我们监听集群中Pod的变化。
代码运行后,终端输出为:
随后我们随便操作一下集群的Pod,例如,删除一下Pod试试,看看监听的事件有什么变化没有:
可以看到,当我们执行 kubectl delete 操作的时候,我们的示例程序也监听到了事件:
四、源码解析
在以上示例中,我们通过 podsInformer.Run(stopCh)
来执行我们的监听,其实这个方法是依靠Reflector对象
来实现的。
1、Reflector
Informer对Kubernetes的Api Server资源进行监控(Watch)操作。其中最核心的功能是Reflector,Reflector用于监控指定的Kubernetes资源,当监控的资源发生变化时,触发相应的变更事件。并将其资源对象存放到本地缓冲DeltaFIFO中。通过NewReflector方法实例化Reflector对象,方法必须传入ListerWatcher数据接口对象。 ListerWatcher拥有List和Watch方法,用于获取和监控资源列表,只要实现了List和Watch方法的对象都可以成为ListerWatcher。
Reflector通过Run函数启动监控进程,并处理监控的事件。其中最主要的是ListAndWatch函数,它负责List和Watch指定的Kubernetes Api Server资源。
2、ListAndWatch函数
ListAndWatch第一次运行时,通过List获取资源下的所有对象和版本信息,后续通过版本进行watch。
五、参考资料
- https://github.com/kubernetes/client-go
- https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md
- https://segmentfault.com/a/1190000022643082
- https://www.kubernetes.org.cn/2693.html
- https://www.cnblogs.com/yangyuliufeng/p/13611126.html
- No labels