Contents
  1. Background
  2. How It Works
    2.1. Startup
    2.2. The Register Flow
    2.3. The ListAndWatch Flow
    2.4. Handling Kubelet Restarts
    2.5. The Allocate Flow
    2.6. Summary

Background

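By default, a container in Kubernetes can request only built-in resources such as CPU and memory. To support more kinds of hardware, especially vendor-specific or user-defined devices, while keeping the kubelet itself loosely coupled, Kubernetes provides the device plugin mechanism.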

```yaml
resources:
  requests:
    memory: "1024Mi"
    cpu: "100m"
  limits:
    memory: "2048Mi"
    cpu: "200m"
```

By developing a device plugin that talks to the kubelet, users can register and manage a piece of hardware so that containers in the cluster request it as easily as CPU or memory:


```yaml
apiVersion: v1
kind: Pod
metadata:
  name: demo-pod
spec:
  containers:
    - name: demo-container-1
      image: registry.k8s.io/pause:2.0
      resources:
        limits:
          hardware-vendor.example/foo: 2
```

How It Works

[Figure: device plugin overview (Register, ListAndWatch, Allocate)]

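As the figure above shows, a device plugin revolves around three calls: Register, ListAndWatch and Allocate, which handle plugin registration, resource monitoring and resource allocation respectively. Register is served by the kubelet and called by the plugin, while ListAndWatch and Allocate are served by the plugin and called by the kubelet.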
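To make the direction of each call concrete, here is a minimal Go sketch of the two sides. It is not the real API: the actual definitions are generated gRPC services in `k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1`, whose DevicePlugin service also has `GetDevicePluginOptions`, `GetPreferredAllocation` and `PreStartContainer`. The trimmed interfaces, the `VISIBLE_DEVICES` variable and the `fakePlugin` type below are hypothetical and keep only the three calls discussed here.

```go
package main

import (
	"errors"
	"strings"
)

// Device is a trimmed-down stand-in for the v1beta1 Device message.
type Device struct {
	ID     string
	Health string // "Healthy" or "Unhealthy"
}

// Registration models the service the kubelet serves on its own socket.
// The plugin is the client and calls Register once at startup.
type Registration interface {
	Register(apiVersion, endpoint, resourceName string) error
}

// DevicePlugin models the service the plugin serves on its own socket.
// The kubelet is the client.
type DevicePlugin interface {
	// ListAndWatch pushes the full device list, and again whenever it changes.
	ListAndWatch(send func([]Device) error) error
	// Allocate receives the device IDs the kubelet picked for one container
	// and returns what the container needs (here only environment variables).
	Allocate(deviceIDs []string) (map[string]string, error)
}

// fakePlugin is a hypothetical plugin that advertises a fixed device list.
type fakePlugin struct{ devs []Device }

// Compile-time check that *fakePlugin satisfies DevicePlugin.
var _ DevicePlugin = (*fakePlugin)(nil)

func (p *fakePlugin) ListAndWatch(send func([]Device) error) error {
	// A real plugin keeps the stream open; this one sends a single snapshot.
	return send(p.devs)
}

func (p *fakePlugin) Allocate(ids []string) (map[string]string, error) {
	known := make(map[string]bool, len(p.devs))
	for _, d := range p.devs {
		known[d.ID] = true
	}
	// Reject IDs that were never advertised.
	for _, id := range ids {
		if !known[id] {
			return nil, errors.New("unknown device: " + id)
		}
	}
	return map[string]string{"VISIBLE_DEVICES": strings.Join(ids, ",")}, nil
}
```

The kubelet implements `Registration` and calls the plugin's `DevicePlugin` methods; the plugin implements `DevicePlugin` and calls `Registration` once per startup.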

1. Startup

Below, a concrete device plugin is used to walk through its full interaction with the kubelet. Its startup code is:


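```go
package main

import (
	"flag"
	"strings"

	"github.com/fsnotify/fsnotify"
	"k8s.io/klog/v2"
	devicepluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
	// The project-internal packages aosp, deviceplugin and utils are also
	// imported here; their import paths are not given in the original.
)

func main() {
	flag.Parse()

	// Make sure at least one binder device exists on this node
	if len(aosp.GetDevices()) == 0 {
		panic("No Binder Device Found.")
	}

	klog.Infof("Start watching device plugin socket directory of kubelet ...")
	// Watch the kubelet's device plugin socket directory with fsnotify
	watcher, err := utils.NewFSWatcher(devicepluginapi.DevicePluginPath)
	if err != nil {
		panic(err)
	}
	defer watcher.Close()

	restart := true
	var devicePlugin *deviceplugin.AOSPDevicePlugin

	for {
		if restart {
			// Stop the current plugin instance, if there is one
			if devicePlugin != nil {
				if err := devicePlugin.Stop(); err != nil {
					panic(err)
				}
			}
			// Create a fresh plugin instance
			devicePlugin, err = deviceplugin.NewAOSPDevicePlugin()
			if err != nil {
				panic(err)
			}
			// Serve() starts the gRPC server and registers with the kubelet
			if err := devicePlugin.Serve(); err != nil {
				klog.Infof("Could not contact Kubelet, retrying. Did you enable the device plugin feature gate?")
			} else {
				restart = false
			}
		}

		// Block until something happens in the socket directory
		select {
		case event := <-watcher.Events:
			if strings.HasSuffix(event.Name, ".sock") || strings.HasSuffix(event.Name, ".socket") {
				klog.Infof("Event: name - %s, op - %s", event.Name, event.String())
			}
			// kubelet.sock recreated: the kubelet restarted, so register again
			if event.Name == devicepluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
				klog.Infof("Inotify: %s created, restarting ...", devicepluginapi.KubeletSocket)
				restart = true
			}
			// Our own socket removed: recreate the server and register again
			if event.Name == deviceplugin.ServerSock && event.Op&fsnotify.Remove == fsnotify.Remove {
				klog.Infof("Inotify: %s removed, restarting ...", deviceplugin.ServerSock)
				restart = true
			}
		// Drain watcher errors so they cannot stall event delivery
		case err := <-watcher.Errors:
			klog.Infof("Inotify: %s", err)
		}
	}
}
```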

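main() first calls GetDevices() from the aosp package to check that binder devices exist, then starts watching the kubelet's device plugin socket directory. It then initializes a restart variable to true and enters the loop.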

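Inside the loop, if restart is true, the plugin first stops the current devicePlugin instance (if there is one) and then calls NewAOSPDevicePlugin() from the deviceplugin package to create a new instance: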


```go
// In package deviceplugin:

// NewAOSPDevicePlugin creates an AOSPDevicePlugin for the binders on this node
func NewAOSPDevicePlugin() (*AOSPDevicePlugin, error) {
	// Get the list of binder devices
	devs := aosp.GetDevices()
	if len(devs) == 0 {
		return nil, errors.New("empty devices list")
	}

	return &AOSPDevicePlugin{
		devs:   devs,
		socket: ServerSock,
		stop:   make(chan interface{}),
		health: make(chan *devicepluginapi.Device),
	}, nil
}

// In package aosp:

// GetDevices wraps every binder found under /dev as a Healthy device
func GetDevices() []*devicepluginapi.Device {
	binders := GetBinders()
	devs := make([]*devicepluginapi.Device, 0, len(binders))
	for _, binder := range binders {
		devs = append(devs, &devicepluginapi.Device{
			ID:     binder,
			Health: devicepluginapi.Healthy,
		})
	}
	return devs
}

// GetBinders returns the device files under /dev whose names start with the Binder prefix
func GetBinders() []string {
	klog.Infof("Detecting aosp binders ...")
	cmd := exec.Command("ls", "/dev")
	stdout, err := cmd.CombinedOutput()
	if err != nil {
		log.Fatal(err)
	}
	files := strings.Split(string(stdout), "\n")
	var binder []string
	for _, f := range files {
		if strings.HasPrefix(f, Binder) {
			binder = append(binder, f)
		}
	}
	klog.Infof("\tFound aosp binders: %d", len(binder))
	return binder
}
```

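NewAOSPDevicePlugin() mainly calls GetDevices() to obtain the device information and stores it, together with the plugin's socket path and other fields, in the members of the AOSPDevicePlugin object. GetDevices() in turn calls GetBinders() to find the actual device files: GetBinders() lists /dev and keeps only the file names that start with the Binder prefix, and GetDevices() wraps each of them as a device marked Healthy. In this example 1,000 aosp_binder device files were created, as shown in the figure below: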

[Figure: the 1,000 aosp_binder device files under /dev]

2. The Register Flow

2.1 The device plugin sends a registration request

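Back in main(), after creating a new plugin instance the loop calls Serve() from the deviceplugin package to register the plugin with the kubelet, and sets restart to false only if that call succeeds. deviceplugin.Serve() looks like this: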

```go
// Serve starts the gRPC server and registers the plugin with the kubelet
func (m *AOSPDevicePlugin) Serve() error {
	err := m.Start()
	if err != nil {
		klog.Errorf("Could not start device plugin: %s", err)
		return err
	}
	klog.Infof("Starting to serve on %s", m.socket)

	err = m.Register(devicepluginapi.KubeletSocket, ResourceNameAOSP)
	if err != nil {
		klog.Errorf("Could not register device plugin: %s", err)
		_ = m.Stop()
		return err
	}
	klog.Infof("Registered device plugin with Kubelet")

	return nil
}

// Register sends a RegisterRequest to the kubelet listening on kubeletEndpoint
func (m *AOSPDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
	conn, err := dial(kubeletEndpoint, 5*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()

	client := devicepluginapi.NewRegistrationClient(conn)
	reqt := &devicepluginapi.RegisterRequest{
		Version:      devicepluginapi.Version,
		Endpoint:     path.Base(m.socket),
		ResourceName: resourceName,
	}

	_, err = client.Register(context.Background(), reqt)
	if err != nil {
		return err
	}
	return nil
}
```

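Serve() first calls Start() to launch the plugin's gRPC server and then calls the AOSPDevicePlugin object's Register() method. The kubeletEndpoint argument is the path of the kubelet's socket file, used to call the kubelet directly; resourceName is the name of the hardware resource being registered. If registration fails, Serve() stops the gRPC server again and returns the error.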

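Register() first opens a connection to the kubelet and creates a RegistrationClient on it. It then builds the request from three fields. Version is the device plugin API version the plugin speaks (devicepluginapi.Version, i.e. v1beta1), not the plugin's own version. Endpoint is the file name of the plugin's socket (path.Base(m.socket)), relative to the kubelet's device plugin directory. ResourceName is the resource name, which follows the extended-resource naming convention and is usually written as hardware-vendor.example/resource: the part before the slash is the hardware vendor's domain, and the part after it is the specific resource.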
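A minimal sketch of how the three request fields are filled in, using a local stand-in struct rather than the generated `devicepluginapi.RegisterRequest`. The helper `newRegisterRequest` is hypothetical, and the socket path `/var/lib/kubelet/device-plugins/aosp.sock` in the usage is assumed, since the value of `ServerSock` is not shown in the original.

```go
package main

import "path"

// RegisterRequest mirrors the three fields of the v1beta1 RegisterRequest.
type RegisterRequest struct {
	Version      string // device plugin API version, e.g. "v1beta1"
	Endpoint     string // socket file name, relative to the device-plugins dir
	ResourceName string // extended resource name, "<vendor-domain>/<resource>"
}

// newRegisterRequest builds the request the plugin sends to the kubelet.
func newRegisterRequest(apiVersion, socketPath, resourceName string) RegisterRequest {
	return RegisterRequest{
		Version:      apiVersion,
		Endpoint:     path.Base(socketPath), // only the file name is sent
		ResourceName: resourceName,
	}
}
```

Sending only the file name works because the kubelet joins it with its own socket directory when it dials back.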

The socket directory has come up several times. By default, both the device plugin's socket and the kubelet's socket live in /var/lib/kubelet/device-plugins/, as shown in the figure below:

[Figure: socket files in the kubelet's device-plugins directory]

2.2 The kubelet handles the registration request

Next, let's see what the kubelet does after it receives a registration request. This example uses the kubelet from release-1.25; the main code is:

```go
func (s *server) Register(ctx context.Context, r *api.RegisterRequest) (*api.Empty, error) {
	klog.InfoS("Got registration request from device plugin with resource", "resourceName", r.ResourceName)
	// Increment the registration counter metric used by monitoring
	metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()

	// Reject API versions the kubelet does not support
	if !s.isVersionCompatibleWithPlugin(r.Version) {
		err := fmt.Errorf(errUnsupportedVersion, r.Version, api.SupportedVersions)
		klog.InfoS("Bad registration request from device plugin with resource", "resourceName", r.ResourceName, "err", err)
		return &api.Empty{}, err
	}

	// The resource name must be a valid extended resource name
	if !v1helper.IsExtendedResourceName(core.ResourceName(r.ResourceName)) {
		err := fmt.Errorf(errInvalidResourceName, r.ResourceName)
		klog.InfoS("Bad registration request from device plugin", "err", err)
		return &api.Empty{}, err
	}

	// Connect back to the plugin's socket in the device plugin directory
	if err := s.connectClient(r.ResourceName, filepath.Join(s.socketDir, r.Endpoint)); err != nil {
		klog.InfoS("Error connecting to device plugin client", "err", err)
		return &api.Empty{}, err
	}
	return &api.Empty{}, nil
}
```

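The kubelet first increments a registration-count metric and then validates the request: the API version must be one it supports, and the resource name must be a valid extended resource name. It then calls connectClient() to connect to the device plugin: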
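The two checks can be approximated as follows. This is a simplified sketch, not the kubelet's code: `validateRegistration` and `looksLikeExtendedResource` are hypothetical, the supported-version list is assumed to contain only `v1beta1`, and the name check only roughly follows `v1helper.IsExtendedResourceName` (a `domain/name` form outside the `kubernetes.io` namespace and without the `requests.` prefix) while skipping the full qualified-name validation.

```go
package main

import (
	"fmt"
	"strings"
)

// supportedVersions is an assumed list of device plugin API versions.
var supportedVersions = []string{"v1beta1"}

func isVersionSupported(v string) bool {
	for _, s := range supportedVersions {
		if s == v {
			return true
		}
	}
	return false
}

// looksLikeExtendedResource is a rough approximation of the extended
// resource rule: "<domain>/<name>", not in the kubernetes.io namespace.
func looksLikeExtendedResource(name string) bool {
	domain, res, ok := strings.Cut(name, "/")
	if !ok || domain == "" || res == "" || strings.Contains(res, "/") {
		return false
	}
	if strings.HasPrefix(name, "requests.") || strings.Contains(domain, "kubernetes.io") {
		return false
	}
	return true
}

// validateRegistration applies both checks in the same order as the kubelet.
func validateRegistration(version, resourceName string) error {
	if !isVersionSupported(version) {
		return fmt.Errorf("unsupported API version %q, supported: %v", version, supportedVersions)
	}
	if !looksLikeExtendedResource(resourceName) {
		return fmt.Errorf("invalid resource name %q", resourceName)
	}
	return nil
}
```

Built-in names such as `cpu` have no domain and are rejected, which is why a plugin cannot register itself as a provider of a native resource.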


```go
func (s *server) connectClient(name string, socketPath string) error {
	c := NewPluginClient(name, socketPath, s.chandler)
	s.registerClient(name, c)
	if err := c.Connect(); err != nil {
		s.deregisterClient(name)
		klog.ErrorS(err, "Failed to connect to new client", "resource", name)
		return err
	}
	go func() {
		s.runClient(name, c)
	}()
	return nil
}

func (s *server) registerClient(name string, c Client) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.clients[name] = c
	klog.V(2).InfoS("Registered client", "name", name)
}

func (s *server) runClient(name string, c Client) {
	c.Run()
	c = s.getClient(name)
	if c == nil {
		return
	}
	if err := s.disconnectClient(name, c); err != nil {
		klog.V(2).InfoS("Unable to disconnect client", "resource", name, "client", c, "err", err)
	}
}

func (s *server) getClient(name string) Client {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.clients[name]
}
```

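connectClient() first creates a plugin client and calls registerClient(), which stores it in the server's clients map, keyed by resource name. It then calls Connect() to dial the plugin's socket (deregistering the client if that fails) and starts runClient() in a separate goroutine. runClient() calls the client's Run() method and, once Run() returns, disconnects the client. Run() looks like this: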
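The bookkeeping around connectClient() and runClient() reduces to a mutex-guarded map from resource name to client, plus one goroutine per client that cleans up when the stream ends. The sketch below models that pattern with hypothetical types (`Client`, `funcClient`, `registry`) and leaves out the gRPC connection entirely.

```go
package main

import "sync"

// Client stands in for the kubelet's per-plugin client.
type Client interface {
	Run() // blocks until the plugin's ListAndWatch stream ends
}

// funcClient adapts a plain function to Client (handy for tests).
type funcClient func()

func (f funcClient) Run() { f() }

// registry is a simplified model of the server's clients map.
type registry struct {
	mu      sync.Mutex
	clients map[string]Client // keyed by resource name
}

func newRegistry() *registry {
	return &registry{clients: make(map[string]Client)}
}

func (r *registry) register(name string, c Client) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.clients[name] = c // a re-registering plugin overwrites its old entry
}

func (r *registry) get(name string) Client {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.clients[name]
}

func (r *registry) deregister(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.clients, name)
}

// runClient follows the shape of server.runClient: block in Run(), then
// drop the entry once the stream has ended. done is closed on exit.
func (r *registry) runClient(name string, c Client, done chan<- struct{}) {
	defer close(done)
	c.Run()
	if r.get(name) != nil {
		r.deregister(name)
	}
}
```

The mutex is needed because registrations arrive on gRPC handler goroutines while each runClient goroutine may remove entries concurrently.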


```go
func (c *client) Run() {
	stream, err := c.client.ListAndWatch(context.Background(), &api.Empty{})
	if err != nil {
		klog.ErrorS(err, "ListAndWatch ended unexpectedly for device plugin", "resource", c.resource)
		return
	}

	for {
		response, err := stream.Recv()
		if err != nil {
			klog.ErrorS(err, "ListAndWatch ended unexpectedly for device plugin", "resource", c.resource)
			return
		}
		klog.V(2).InfoS("State pushed for device plugin", "resource", c.resource, "resourceCapacity", len(response.Devices))
		c.handler.PluginListAndWatchReceiver(c.resource, response)
	}
}
```

3. The ListAndWatch Flow

3.1 The device plugin serves ListAndWatch

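Run() calls the device plugin's ListAndWatch interface, a server-streaming gRPC call, and loops on stream.Recv() to receive the state of the registered hardware every time the plugin pushes it. The plugin's ListAndWatch implementation is: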


```go
func (m *AOSPDevicePlugin) ListAndWatch(_ *devicepluginapi.Empty, s devicepluginapi.DevicePlugin_ListAndWatchServer) error {
	// Report the full device list once when the kubelet connects
	if err := s.Send(&devicepluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
		klog.Errorf("Send ListAndWatchResponse error: %v", err)
		return err // the stream is broken; let the kubelet reconnect
	}
	// Keep the stream open and resend the list when a device turns unhealthy
	for {
		select {
		case <-m.stop:
			return nil
		case d := <-m.health:
			d.Health = devicepluginapi.Unhealthy
			if err := s.Send(&devicepluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
				klog.Errorf("Send ListAndWatchResponse error: %v", err)
				return err
			}
		}
	}
}
```

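In this example the plugin sends the devs slice of the AOSPDevicePlugin instance as-is; it contains the 1,000 aosp_binder devices, all marked Healthy. After that the stream stays open: when a device arrives on the m.health channel, it is marked Unhealthy and the full list is sent again. Beyond that, the example does no further handling of unhealthy devices.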
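The stream behavior can be exercised without gRPC by replacing `s.Send` with a callback. In this hypothetical sketch, `listAndWatch` and `sendFunc` are stand-ins: it sends a snapshot once, resends after each health event, stops when `stop` is closed, and treats a failed send as the end of the stream.

```go
package main

// Device is a simplified stand-in for the v1beta1 Device message.
type Device struct {
	ID     string
	Health string
}

// sendFunc stands in for DevicePlugin_ListAndWatchServer.Send.
type sendFunc func([]Device) error

// listAndWatch sends the full list once, then again every time a device is
// reported on health, until stop is closed or a send fails.
func listAndWatch(devs []*Device, health <-chan *Device, stop <-chan struct{}, send sendFunc) error {
	// Copy the current state so later changes do not alter sent snapshots.
	snapshot := func() []Device {
		out := make([]Device, len(devs))
		for i, d := range devs {
			out[i] = *d
		}
		return out
	}
	if err := send(snapshot()); err != nil {
		return err
	}
	for {
		select {
		case <-stop:
			return nil
		case d := <-health:
			d.Health = "Unhealthy"
			if err := send(snapshot()); err != nil {
				return err
			}
		}
	}
}
```

Always sending the complete list (rather than a diff) matches what the kubelet expects: each response replaces the previous state for the resource.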

3.2 The kubelet handles the ListAndWatch response

Now let's look at what the kubelet does with each ListAndWatch response. The main code is:

```go
func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) {
	var devices []pluginapi.Device
	for _, d := range resp.Devices {
		devices = append(devices, *d)
	}
	m.genericDeviceUpdateCallback(resourceName, devices)
}

func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
	healthyCount := 0
	m.mutex.Lock()
	m.healthyDevices[resourceName] = sets.NewString()
	m.unhealthyDevices[resourceName] = sets.NewString()
	m.allDevices[resourceName] = make(map[string]pluginapi.Device)
	for _, dev := range devices {
		m.allDevices[resourceName][dev.ID] = dev
		if dev.Health == pluginapi.Healthy {
			m.healthyDevices[resourceName].Insert(dev.ID)
			healthyCount++
		} else {
			m.unhealthyDevices[resourceName].Insert(dev.ID)
		}
	}
	m.mutex.Unlock()
	if err := m.writeCheckpoint(); err != nil {
		klog.ErrorS(err, "Writing checkpoint encountered")
	}
	klog.V(2).InfoS("Processed device updates for resource", "resourceName", resourceName, "totalCount", len(devices), "healthyCount", healthyCount)
}

func (m *ManagerImpl) writeCheckpoint() error {
	m.mutex.Lock()
	registeredDevs := make(map[string][]string)
	for resource, devices := range m.healthyDevices {
		registeredDevs[resource] = devices.UnsortedList()
	}
	data := checkpoint.New(m.podDevices.toCheckpointData(),
		registeredDevs)
	m.mutex.Unlock()
	err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
	if err != nil {
		err2 := fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
		klog.InfoS("Failed to write checkpoint file", "err", err)
		return err2
	}
	return nil
}
```

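The kubelet copies the devices out of the response and passes them to genericDeviceUpdateCallback(). That method rebuilds three maps on the ManagerImpl object (which manages all interaction with device plugins): m.healthyDevices[resourceName], m.unhealthyDevices[resourceName] and m.allDevices[resourceName]. All three are keyed first by resource name; the healthy and unhealthy entries are sets of device IDs (here, the 1,000 aosp_binder devices), and allDevices maps each ID to its full Device. Because the entries are recreated on every call, each response replaces the previous state for that resource rather than adding to it. Finally, writeCheckpoint() persists the healthy device IDs, together with the current pod-to-device allocations, to a checkpoint file (kubelet_internal_checkpoint), which also lives in the /var/lib/kubelet/device-plugins/ directory.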

[Figure: the checkpoint file in the device-plugins directory]
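The update and persistence steps can be modeled with plain maps. This hypothetical sketch (`deviceState`, `update`, `registeredJSON`) shows that each ListAndWatch response replaces the previous state for a resource, and serializes the healthy IDs as JSON; the JSON layout is illustrative only and is not the format of the kubelet's real checkpoint file.

```go
package main

import (
	"encoding/json"
	"sort"
)

// Device is a simplified stand-in for pluginapi.Device.
type Device struct {
	ID     string
	Health string
}

// deviceState mimics the three per-resource maps kept by ManagerImpl.
type deviceState struct {
	healthy   map[string]map[string]bool   // resource -> set of healthy IDs
	unhealthy map[string]map[string]bool   // resource -> set of unhealthy IDs
	all       map[string]map[string]Device // resource -> ID -> device
}

func newDeviceState() *deviceState {
	return &deviceState{
		healthy:   map[string]map[string]bool{},
		unhealthy: map[string]map[string]bool{},
		all:       map[string]map[string]Device{},
	}
}

// update rebuilds the entries for one resource from a full device list and
// returns the number of healthy devices.
func (s *deviceState) update(resource string, devs []Device) int {
	healthyCount := 0
	s.healthy[resource] = map[string]bool{}
	s.unhealthy[resource] = map[string]bool{}
	s.all[resource] = map[string]Device{}
	for _, d := range devs {
		s.all[resource][d.ID] = d
		if d.Health == "Healthy" {
			s.healthy[resource][d.ID] = true
			healthyCount++
		} else {
			s.unhealthy[resource][d.ID] = true
		}
	}
	return healthyCount
}

// registeredJSON serializes the healthy IDs per resource.
func (s *deviceState) registeredJSON() ([]byte, error) {
	reg := map[string][]string{}
	for res, set := range s.healthy {
		ids := make([]string, 0, len(set))
		for id := range set {
			ids = append(ids, id)
		}
		sort.Strings(ids) // stable output despite random map order
		reg[res] = ids
	}
	return json.Marshal(map[string]map[string][]string{"RegisteredDevices": reg})
}
```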

4. Handling Kubelet Restarts

4.1 Persisting the checkpoint file

One main purpose of the checkpoint file is to let the kubelet restore its device state quickly after an unexpected restart:

```go
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error {
	klog.V(2).InfoS("Starting Device Plugin manager")

	m.activePods = activePods
	m.sourcesReady = sourcesReady
	m.containerMap = initialContainers
	m.containerRunningSet = initialContainerRunningSet

	// Loads in allocatedDevices information from disk.
	err := m.readCheckpoint()
	if err != nil {
		klog.InfoS("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date", "err", err)
	}

	return m.server.Start()
}

func (m *ManagerImpl) readCheckpoint() error {
	// the vast majority of time we restore a compatible checkpoint, so we try
	// the current version first. Trying to restore older format checkpoints is
	// relevant only in the kubelet upgrade flow, which happens once in a
	// (long) while.
	cp, err := m.getCheckpointV2()
	if err != nil {
		if err == errors.ErrCheckpointNotFound {
			// no point in trying anything else
			klog.InfoS("Failed to read data from checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err)
			return nil
		}

		var errv1 error
		// one last try: maybe it's a old format checkpoint?
		cp, errv1 = m.getCheckpointV1()
		if errv1 != nil {
			klog.InfoS("Failed to read checkpoint V1 file", "err", errv1)
			// intentionally return the parent error. We expect to restore V1 checkpoints
			// a tiny fraction of time, so what matters most is the current checkpoint read error.
			return err
		}
		klog.InfoS("Read data from a V1 checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint)
	}
	m.mutex.Lock()
	defer m.mutex.Unlock()
	podDevices, registeredDevs := cp.GetDataInLatestFormat()
	m.podDevices.fromCheckpointData(podDevices)
	m.allocatedDevices = m.podDevices.devices()
	for resource := range registeredDevs {
		// During start up, creates empty healthyDevices list so that the resource capacity
		// will stay zero till the corresponding device plugin re-registers.
		m.healthyDevices[resource] = sets.NewString()
		m.unhealthyDevices[resource] = sets.NewString()
		m.endpoints[resource] = endpointInfo{e: newStoppedEndpointImpl(resource), opts: nil}
	}
	return nil
}
```

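When the kubelet restarts, it calls the ManagerImpl object's Start() method, which reads the checkpoint with readCheckpoint() before starting the registration server. readCheckpoint() tries the current (V2) checkpoint format first and falls back to the older V1 format. It restores the pod-to-device allocations (podDevices and allocatedDevices) and, for every resource recorded in the file, creates empty healthy and unhealthy device sets plus a stopped endpoint. The resource's capacity therefore stays at zero until its device plugin registers again and resends its device list.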
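The key effect of readCheckpoint() is that allocations survive the restart while capacity does not. A hypothetical, simplified sketch of that rule follows; the `checkpoint` and `manager` types are stand-ins, and the kubelet's real pod allocations are keyed by pod, container and resource rather than by pod alone.

```go
package main

// checkpoint holds what this sketch restores.
type checkpoint struct {
	PodDevices        map[string][]string // pod UID -> allocated device IDs (simplified)
	RegisteredDevices map[string][]string // resource -> device IDs seen before the restart
}

// manager keeps only the state this sketch needs.
type manager struct {
	allocated map[string]bool            // device ID -> in use by some pod
	healthy   map[string]map[string]bool // resource -> set of healthy IDs
}

// restore brings allocations back immediately, but gives every known
// resource an EMPTY healthy set, so its capacity stays 0 until the plugin
// re-registers and resends its list.
func restore(cp checkpoint) *manager {
	m := &manager{
		allocated: map[string]bool{},
		healthy:   map[string]map[string]bool{},
	}
	for _, ids := range cp.PodDevices {
		for _, id := range ids {
			m.allocated[id] = true
		}
	}
	for res := range cp.RegisteredDevices {
		m.healthy[res] = map[string]bool{}
	}
	return m
}

// capacity is the number of healthy devices currently known for resource.
func (m *manager) capacity(resource string) int {
	return len(m.healthy[resource])
}
```

Restoring allocations prevents handing an in-use device to a new Pod, while the zero capacity avoids advertising devices whose plugin may no longer be running.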

4.2 Device plugin re-registration

The device plugin in this example also reacts to kubelet restarts on its side, in the select block inside the for loop of main():


```go
select {
case event := <-watcher.Events:
	if strings.HasSuffix(event.Name, ".sock") || strings.HasSuffix(event.Name, ".socket") {
		klog.Infof("Event: name - %s, op - %s", event.Name, event.String())
	}
	if event.Name == devicepluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
		klog.Infof("Inotify: %s created, restarting ...", devicepluginapi.KubeletSocket)
		restart = true
	}
	if event.Name == deviceplugin.ServerSock && event.Op&fsnotify.Remove == fsnotify.Remove {
		klog.Infof("Inotify: %s removed, restarting ...", deviceplugin.ServerSock)
		restart = true
	}
}
```

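While watching the socket directory, if the plugin sees the kubelet's socket file being recreated or its own socket file being removed, it sets restart to true, which creates a new AOSPDevicePlugin instance and registers it with the kubelet again. Together, the kubelet's checkpoint file and the plugin's re-registration keep the allocatable resource data consistent, even in a complex cluster environment.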
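The restart rule in the select block is a pure decision on (file name, operation), so it can be pulled out and tested on its own. In this sketch `Op`, `Create`, `Write` and `Remove` are local stand-ins for the fsnotify flags, `needsRestart` is a hypothetical helper, and the plugin socket name `aosp.sock` in the usage is assumed.

```go
package main

// Op is a tiny stand-in for fsnotify.Op bit flags.
type Op uint32

const (
	Create Op = 1 << iota
	Write
	Remove
)

// needsRestart reports whether a directory event should trigger a fresh
// plugin instance and re-registration, using the same two rules as main().
func needsRestart(name string, op Op, kubeletSock, pluginSock string) bool {
	if name == kubeletSock && op&Create == Create {
		return true // the kubelet restarted and recreated its socket
	}
	if name == pluginSock && op&Remove == Remove {
		return true // our own socket was deleted
	}
	return false
}
```

Ops are tested as bit flags because a single event can carry more than one operation.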

5. The Allocate Flow

Finally, let's briefly look at what the device plugin's Allocate method does when the kubelet calls it for a container that requests the resource. The main code is:

```go
func (m *AOSPDevicePlugin) Allocate(_ context.Context, reqs *devicepluginapi.AllocateRequest) (*devicepluginapi.AllocateResponse, error) {
	var mounts []*devicepluginapi.Mount
	var commonDevices []*devicepluginapi.DeviceSpec

	// Mount points every binder container needs
	for mnt, exist := range aosp.AllMountPoints() {
		if !exist {
			klog.Warningf("WARNING: Mount point %s not found", mnt)
			continue
		}
		mounts = append(mounts, &devicepluginapi.Mount{
			ContainerPath: mnt,
			HostPath:      mnt,
			ReadOnly:      false,
		})
	}

	// Driver device files every binder container needs
	for dev, exist := range aosp.AllDeviceDrivers() {
		if !exist {
			klog.Warningf("WARNING: Device %s not found", dev)
			continue
		}
		commonDevices = append(commonDevices, &devicepluginapi.DeviceSpec{
			ContainerPath: dev,
			HostPath:      dev,
			Permissions:   "rwm",
		})
	}

	responses := devicepluginapi.AllocateResponse{}
	// Build one ContainerAllocateResponse per container request
	for _, req := range reqs.ContainerRequests {
		klog.Infof("[Allocate] %s", req.String())

		// Reject IDs that were never advertised via ListAndWatch
		for _, id := range req.DevicesIDs {
			if !aosp.DeviceExists(m.devs, id) {
				return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
			}
		}

		// Shared drivers first, then this container's own binder devices
		devices := append([]*devicepluginapi.DeviceSpec{}, commonDevices...)
		for _, deviceID := range req.DevicesIDs {
			devices = append(devices, &devicepluginapi.DeviceSpec{
				ContainerPath: "/dev/" + deviceID,
				HostPath:      "/dev/" + deviceID,
				Permissions:   "rwm",
			})
		}

		responses.ContainerResponses = append(responses.ContainerResponses, &devicepluginapi.ContainerAllocateResponse{
			Envs: map[string]string{
				"BINDER_VISIBLE_DEVICES": strings.Join(req.DevicesIDs, ","),
				"PATH":                   "/system/bin:/system/xbin",
			},
			Devices: devices,
			Mounts:  mounts,
		})
	}

	return &responses, nil
}
```

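The kubelet allocates hardware according to the quantity the container requests. It first takes, from its in-memory device state, the devices that are healthy and not yet allocated to any container, then selects from them as many specific device IDs as were requested and sends those IDs to the device plugin.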
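A simplified model of the selection step: take the healthy IDs, drop those already allocated, and pick the requested number. `pickDevices` is hypothetical and picks in sorted order only to be deterministic; the kubelet's own logic is more involved (for example, it can take topology hints into account and ask a plugin for preferred devices through `GetPreferredAllocation`).

```go
package main

import (
	"fmt"
	"sort"
)

// pickDevices returns n device IDs that are healthy and not yet allocated.
func pickDevices(healthy, allocated map[string]bool, n int) ([]string, error) {
	var free []string
	for id, ok := range healthy {
		if ok && !allocated[id] {
			free = append(free, id)
		}
	}
	if len(free) < n {
		return nil, fmt.Errorf("requested %d devices, only %d available", n, len(free))
	}
	sort.Strings(free) // deterministic choice for this sketch
	return free[:n], nil
}
```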

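When the plugin receives the request, it first calls functions in the aosp package to obtain the mount points and driver device files that every container using this hardware needs. Then, for each container request, it adds the device files for the requested IDs to that container's devices list. Finally, it combines the mounts, the device files and the environment variables (Envs) the container needs into a response and returns it to the kubelet. The kubelet then passes these devices, mounts and variables to the container runtime when it creates and starts the container.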
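Per container, the response is the shared driver files plus one `/dev/<ID>` entry per allocated device, with the IDs joined into `BINDER_VISIBLE_DEVICES`. The sketch below uses local stand-in types instead of the generated `devicepluginapi` messages and omits mounts and the PATH variable; `buildContainerResponse` is hypothetical, and `/dev/ashmem` in the usage is only an assumed example of a shared driver.

```go
package main

import "strings"

// DeviceSpec is a simplified stand-in for devicepluginapi.DeviceSpec.
type DeviceSpec struct {
	ContainerPath, HostPath, Permissions string
}

// ContainerResponse keeps only the fields this sketch fills in.
type ContainerResponse struct {
	Envs    map[string]string
	Devices []DeviceSpec
}

// buildContainerResponse lists the shared drivers first, then one /dev
// entry per requested device ID, all with read/write/mknod permissions.
func buildContainerResponse(commonDevs, ids []string) ContainerResponse {
	var devs []DeviceSpec
	for _, d := range commonDevs {
		devs = append(devs, DeviceSpec{ContainerPath: d, HostPath: d, Permissions: "rwm"})
	}
	for _, id := range ids { // range over the values, not the indices
		p := "/dev/" + id
		devs = append(devs, DeviceSpec{ContainerPath: p, HostPath: p, Permissions: "rwm"})
	}
	return ContainerResponse{
		Envs:    map[string]string{"BINDER_VISIBLE_DEVICES": strings.Join(ids, ",")},
		Devices: devs,
	}
}
```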

Summary

[Figure: overall device plugin workflow]

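  • The device plugin is deployed in the cluster as a DaemonSet and, on startup, first registers with the kubelet;

  • After receiving the registration request, the kubelet calls the device plugin's ListAndWatch interface to obtain the details of the hardware;

  • Once the plugin reports its devices, the kubelet updates the node status through the API server to record the resources discovered in the previous step;

  • When a user creates a Pod that requests the resource, the scheduler places the Pod on a node that has enough of it;

  • The kubelet on that node, which watches the API server for Pods bound to its node, then allocates resources for the containers and starts them;

  • After choosing the device IDs to allocate, the kubelet calls the device plugin's Allocate interface to complete the allocation.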