k8s device plugin
1、 为什么要有device plugin
k8s中pod的默认资源通常只有cpu memory,当要想使用gpu npu等资源时候,如果直接在k8s代码中增加,会很麻烦。所有k8s开放了device plugin,方便与外部厂商自定义,实现通过device plugin与k8s通信感知特殊资源
参考的文章:https://www.lixueduan.com/posts/kubernetes/21-device-plugin/
https://github.com/lixd/i-device-plugin
2 流程
以gpu device plugin 0.17.0版本为例子
- kubelet 启动时,会在
/var/lib/kubelet/device-plugins/kubelet.sock
上监听 gRPC 服务,用于 “接受插件的注册”; - NVIDIA Device Plugin 启动时,创建自己的 .sock 文件(如 nvidia-gpu.sock)并开始监听。Plugin 主动连接到 /var/lib/kubelet/device-plugins/kubelet.sock,会跟 kubelet 的 kubelet.sock 进行 gRPC 注册(Register() 告知使用的资源名称是啥,plugin启动的socket是啥,方便后续kubelete通过该socket调用接口);
- 注册成功后,kubelet 就会通过 Device Plugin 内部自己开的 socket(比如
/var/lib/kubelet/device-plugins/nvidia.sock
),去调用ListAndWatch
了解 GPU 列表,并在有容器需要 GPU 时调用Allocate
协商如何设置容器环境变量、挂载、等信息。
对照代码,这个流程最关键的函数有两类:
- Device Plugin 端:
Register(...)
(把自己注册给 kubelet)、ListAndWatch(...)
(向 kubelet 报告设备列表)、Allocate(...)
(提供给 kubelet),等等。 - kubelet 端:内部有一个 Device Manager,对应
ManagerImpl.Register(...)
、ListAndWatch(...)
回调处理,最终将资源更新到 Node Status。
device plugin :接口
要进行设备管理,device plugin 插件需要实现以下接口:
-
GetDevicePluginOptions
:这个接口用于获取设备插件的信息,可以在其返回的响应中指定一些设备插件的配置选项,可以看做是插件的元数据 -
ListAndWatch
:该接口用于列出可用的设备并持续监视这些设备的状态变化。 -
GetPreferredAllocation
:将分配偏好信息提供给 device plugin,以便 device plugin 在分配时可以做出更好的选择
Allocate
:该接口用于向设备插件请求分配指定数量的设备资源。PreStartContainer
: 该接口在容器启动之前调用,用于配置容器使用的设备资源。
只有
ListAndWatch
和Allocate
两个接口是必须的,其他都是可以选的。
Kubelet 部分接口
为了提供该功能,Kubelet 新增了一个 Registration
gRPC service:
|
device plugin 可以调用该接口向 Kubelet 进行注册,注册接口需要提供三个参数:
-
device plugin 对应的 unix socket 名字:后续 kubelet 根据名称找到对应的 unix socket,并向插件发起调用
-
device plugin 调 API version:用于区分不同版本的插件
-
device plugin 提供的 ResourceName:遇到不能处理的资源申请时(CPU和Memory之外的资源),Kubelet 就会根据申请的资源名称来匹配对应的插件
- ResourceName 需要按照
vendor-domain/resourcetype
格式,例如nvidia.com/gpu
。
- ResourceName 需要按照
plugin代码简要分析
0: 启动
0 自身启动 grpc服务
// Serve starts the gRPC server of the device plugin.
func (plugin *nvidiaDevicePlugin) Serve() error {// 在创建新的 Unix 域套接字文件之前,先尝试删除可能存在的、同名的旧文件/*如果插件异常退出,.sock 文件可能不会被自动清理。在同一个路径上再次创建监听会导致 “address already in use” 错误。这是一个健壮性设计,确保服务可以正常启动。*/os.Remove(plugin.socket)// 创建一个名为 plugin.socket(例如 /var/lib/kubelet/device-plugins/nvidia-gpu.sock)的 Unix 域套接字,并开始在这个地址上监听传入的连接。sock, err := net.Listen("unix", plugin.socket)if err != nil {return err}// 将当前的 nvidiaDevicePlugin 对象(它实现了 DevicePluginServer 接口的所有方法,//如 ListAndWatch, Allocate 等)注册到 plugin.server(一个 *grpc.Server 实例)上。//结果:当 kubelet 通过 .sock 文件发起 gRPC 调用时(如调用 ListAndWatch),gRPC 框架知道应该去执行 nvidiaDevicePlugin 结构体中对应的 ListAndWatch 方法。pluginapi.RegisterDevicePluginServer(plugin.server, plugin)go func() {lastCrashTime := time.Now()restartCount := 0for {// quite if it has been restarted too often// i.e. if server has crashed more than 5 times and it didn't last more than one hour each time// // 防止频繁崩溃重启 如果服务器在 一小时内崩溃超过 5 次,插件会认为问题严重,主动退出 (klog.Fatalf),避免无限重启浪费资源。// 循环会立即尝试重新启动服务器,体现了“自愈”能力。if restartCount > 5 {// quitklog.Fatalf("GRPC server for '%s' has repeatedly crashed recently. Quitting", plugin.rm.Resource())}klog.Infof("Starting GRPC server for '%s'", plugin.rm.Resource())// plugin.server.Serve(sock) 是阻塞调用,它会一直运行,接受来自 kubelet 的连接和请求。// 如果 Serve 因为网络错误、客户端断开等原因返回非 nil 错误,说明服务器“意外”崩溃了。err := plugin.server.Serve(sock)if err == nil {break}klog.Infof("GRPC server for '%s' crashed with error: %v", plugin.rm.Resource(), err)timeSinceLastCrash := time.Since(lastCrashTime).Seconds()lastCrashTime = time.Now()if timeSinceLastCrash > 3600 {// // 超过一小时,重置计数// it has been one hour since the last crash.. reset the count// to reflect on the frequencyrestartCount = 0} else {restartCount++}}}()// 这是一个同步的“健康检查”步骤,确保 gRPC 服务器已经成功启动并可以接受连接。// 设置了 5 秒的超时,防止无限等待。//如果连接成功,则立即关闭连接 (conn.Close())。//如果连接失败(意味着 gRPC 服务器还没起来或启动失败),则 Serve() 方法返回错误,导致插件整体启动失败。// Wait for server to start by launching a blocking connectionconn, err := plugin.dial(plugin.socket, 5*time.Second)if err != nil {return err}conn.Close()// 保证了 Serve() 方法返回 nil 时,gRPC 服务器一定已经处于可服务状态,后续的注册等操作才能顺利进行。return nil
}
1、启动时候向kubelete注册 插件信息
func main() {func start(c *cli.Context, o *options) error {func startPlugins(c *cli.Context, o *options) ([]plugin.Interface, bool, error) {// 通常是 /var/lib/kubelet/device-plugins/kubelet.sock
err = plugin.Register(kubeletSocket)// Register registers the device plugin for the given resourceName with Kubelet.
// NVIDIA Device Plugin 启动时,会跟 kubelet 的 kubelet.sock 进行 gRPC 注册(Register());
// Register registers the device plugin for the given resourceName with Kubelet.
func (plugin *nvidiaDevicePlugin) Register(kubeletSocket string) error {if kubeletSocket == "" {klog.Info("Skipping registration with Kubelet")return nil}conn, err := plugin.dial(kubeletSocket, 5*time.Second)if err != nil {return err}defer conn.Close()client := pluginapi.NewRegistrationClient(conn)reqt := &pluginapi.RegisterRequest{// // 用于区分不同版本的插件Version: pluginapi.Version,// // 插件grpc服务地址,kubelet根据该地址进行资源申请 比如 /var/lib/kubelet/device-plugins/nvidia.sock)Endpoint: path.Base(plugin.socket),// // 资源名称,需按照 vendor-domain/resourcetype 格式,如nvidia.com/gpuResourceName: string(plugin.rm.Resource()),Options: &pluginapi.DevicePluginOptions{GetPreferredAllocationAvailable: true,},}_, err = client.Register(context.Background(), reqt)if err != nil {return err}return nil
}
list方法
:通信方向是 plugin → kubelet,即:
- kubelet 发起一次连接并调用
ListAndWatch
。 - 插件返回初始列表。
- 插件在后台持续监控设备状态(例如,通过
nvidia-smi
检查 GPU 健康状况)。 - 一旦设备状态发生变化(如:某块 GPU 故障、新 GPU 被热插拔、设备被分配/释放),插件立即通过这个已建立的流连接,主动发送一个更新消息给 kubelet。
- kubelet 收到更新后,立即更新其本地缓存的设备状态
// ListAndWatch lists devices and update that list according to the health status
func (plugin *nvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {// 目的:当 kubelet 首次调用 ListAndWatch 时,插件必须立即返回当前所有设备的初始状态 这是流式通信的“第一次握手”,确保 kubelet 能快速获取到设备快照if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {return err}for {select {case <-plugin.stop:return nil// plugin.health 是一个 channel,用于接收来自插件其他组件(如健康检查协程)发送的设备健康事件。// 当某个 GPU 设备被检测到不健康(如驱动崩溃、GPU 异常、温度过高)时,相关协程会将设备信息 d 发送到这个 channel。case d := <-plugin.health:// FIXME: there is no way to recover from the Unhealthy state.// 标记为不健康:d.Health = pluginapi.Unhealthd.Health = pluginapi.Unhealthyklog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)// 全量推送更新:再次调用 s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()})//这里不是只发送变化的设备,而是调用 apiDevices() 获取完整的设备列表,然后全量推送。//这是 Device Plugin API 的标准做法——每次状态变化都推送完整列表,简化了 kubelet 的处理逻辑。if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {return nil}}}
}
allocate分配卡
// reqs *pluginapi.AllocateRequest: 请求对象,包含 kubelet 希望为哪些容器分配哪些设备(GPU ID)。
// (*pluginapi.AllocateResponse, error): 返回一个包含分配结果的响应对象和可能的错误。
func (plugin *nvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {responses := pluginapi.AllocateResponse{}// 每个 req 包含了 kubelet 决定分配给该容器的 GPU 设备 ID 列表(DevicesIDs)。for _, req := range reqs.ContainerRequests {// 安全检查。确保 kubelet 请求分配的设备 ID 是有效且可用的。if err := plugin.rm.ValidateRequest(req.DevicesIDs); err != nil {return nil, fmt.Errorf("invalid allocation request for %q: %w", plugin.rm.Resource(), err)}// 负责生成为单个容器分配 GPU 所需的完整配置信息response, err := plugin.getAllocateResponse(req.DevicesIDs)if err != nil {return nil, fmt.Errorf("failed to get allocate response: %v", err)}responses.ContainerResponses = append(responses.ContainerResponses, response)}return &responses, nil
}// 包含了 kubelet 请求分配的 GPU 设备 ID(如 ["GPU-123abc", "GPU-456def"])。
func (plugin *nvidiaDevicePlugin) getAllocateResponse(requestIds []string) (*pluginapi.ContainerAllocateResponse, error) {// requestIds 可能是带有“注解”(Annotations)的 AnnotatedID(例如 GPU-123abc@shared),这些注解用于表示共享策略。//这行代码调用一个辅助方法,剥离注解,只提取出纯净的设备 UUID(如 GPU-123abc)。deviceIDs := plugin.deviceIDsFromAnnotatedDeviceIDs(requestIds)// Create an empty response that will be updated as required below.response := &pluginapi.ContainerAllocateResponse{Envs: make(map[string]string),}// 处理 CDI (Container Device Interface) 配置 检查插件是否配置为使用 CDI 策略(AnyCDIEnabled())。if plugin.deviceListStrategies.AnyCDIEnabled() {responseID := uuid.New().String()// 该方法会向 response 中添加一个 CDIDevices 字段,其值是一个 CDI 设备字符串(如 nvidia.com/gpu=gpu-123abc),容器运行时(如 containerd)会根据此字符串查找并应用对应的设备规范。// 使用一个 UUID 作为 responseID,可能用于日志追踪或确保唯一性if err := plugin.updateResponseForCDI(response, responseID, deviceIDs...); err != nil {return nil, fmt.Errorf("failed to get allocate response for CDI: %v", err)}}// MPS 是什么:NVIDIA 的 Multi-Process Service 允许多个进程(容器)共享同一个 GPU 上下文,减少上下文切换开销,提升多进程 GPU 应用的性能。if plugin.mps.enabled {plugin.updateResponseForMPS(response)}// The following modifications are only made if at least one non-CDI device// list strategy is selected.// 如果所有设备列表策略都是 CDI,那么前面的 updateResponseForCDI 已经完成了主要配置,这里直接返回 response,跳过所有传统的、非-CDI 的配置方式。if plugin.deviceListStrategies.AllCDIEnabled() {return response, nil}// 处理传统设备注入策略// 环境变量注入 (DeviceListStrategyEnvVar):if plugin.deviceListStrategies.Includes(spec.DeviceListStrategyEnvVar) {plugin.updateResponseForDeviceListEnvVar(response, deviceIDs...)// 可能设置与 GPU 内存交换(Memory eXchange)相关的环境变量。plugin.updateResponseForImexChannelsEnvVar(response)}// 设备文件挂载 (DeviceListStrategyVolumeMounts)if plugin.deviceListStrategies.Includes(spec.DeviceListStrategyVolumeMounts) {plugin.updateResponseForDeviceMounts(response, deviceIDs...)}// 传递设备规格 (PassDeviceSpecs):if *plugin.config.Flags.Plugin.PassDeviceSpecs {// DeviceSpec 明确指定了需要传递给容器的设备文件路径(如 /dev/nvidia0)及其在容器内的路径、权限(读、写、执行)。response.Devices = append(response.Devices, plugin.apiDeviceSpecs(*plugin.config.Flags.NvidiaDevRoot, requestIds)...)}// 如果启用了 GDS(允许 GPU 直接访问存储,绕过 CPU),则设置环境变量 NVIDIA_GDS=enabled。if *plugin.config.Flags.GDSEnabled {response.Envs["NVIDIA_GDS"] = "enabled"}// 如果启用了 MOFED(用于 InfiniBand/RoCE 网络),则设置环境变量 NVIDIA_MOFED=enabled。if *plugin.config.Flags.MOFEDEnabled {response.Envs["NVIDIA_MOFED"] = "enabled"}return response, nil
}func (plugin *nvidiaDevicePlugin) updateResponseForCDI(response *pluginapi.ContainerAllocateResponse, responseID string, deviceIDs ...string) error {var devices []stringfor _, id := range deviceIDs {// 将每个请求的 GPU 设备 ID 转换为一个符合 CDI 规范的“完全限定名称”(Fully Qualified Name)。devices = append(devices, plugin.cdiHandler.QualifiedName("gpu", id))}for _, channel := range plugin.imexChannels {devices = append(devices, plugin.cdiHandler.QualifiedName("imex-channel", channel.ID))}if *plugin.config.Flags.GDSEnabled {devices = append(devices, plugin.cdiHandler.QualifiedName("gds", "all"))}if *plugin.config.Flags.MOFEDEnabled {devices = append(devices, plugin.cdiHandler.QualifiedName("mofed", "all"))}if len(devices) == 0 {return nil}if plugin.deviceListStrategies.Includes(spec.DeviceListStrategyCDIAnnotations) {// 该方法会生成一组 Kubernetes Pod 级别的 Annotations(注解),例如: annotations:// cdi.k8s.io/devices: nvidia.com/gpu=gpu-123abc,nvidia.com/gds=all// 作用:kubelet 在创建 Pod 沙箱时会将这些注解传递给容器运行时。支持 CDI 的运行时(如 nvidia-container-runtime)会读取这些注解,// 并根据其中的 CDI 设备名称查找对应的设备规范文件(.json),然后自动完成设备文件挂载、环境变量设置等操作。annotations, err := plugin.getCDIDeviceAnnotations(responseID, devices...)if err != nil {return err}response.Annotations = annotations}if plugin.deviceListStrategies.Includes(spec.DeviceListStrategyCDICRI) {for _, device := range devices {cdiDevice := pluginapi.CDIDevice{Name: device,}response.CDIDevices = append(response.CDIDevices, &cdiDevice)}}return nil
}
举例说明,如果是用环境变量分配的方式,
|
给容器添加了一个环境变量,value 为设备 id,具体 deviceID 提供了两种测量,可能是编号或者 uuid
|
key 是一个变量 plugin.deviceListEnvvar,初始化如下:
|
也就是说 NVIDIA 这个 device plugin 实现 Allocate 主要就是给容器增加了环境变量,例如:
|
或者
|
在文章 GPU 环境搭建指南:使用 GPU Operator 加速 Kubernetes GPU 环境搭建 中提到 GPU Operator 会使用 NVIDIA Container Toolit Installer 安装 NVIDIA Container Toolit。
这个 NVIDIA Container Toolit 的作用就是添加对 GPU 的支持,也包括了识别 NVIDIA_VISIBLE_DEVICES 这个环境变量,然后将对应设备挂载到容器里。
实际上 device plugin 提供了多种方法来完成设备分配,实现时只需要根据具体情况选择其中一种即可:
- Env
- Mounts
- Devices
- Annotations
- CDIDevices
比如 nvidia device plugin 在实现时就同时使用了 Env 和 Devices 方式。
监控 kubelet.sock 状态
使用 fsnotify 库监控 kubelet.sock 文件状态,通过 kubelet.sock 文件的变化来判断 kubelet 是否重启,当 kubelet 重启后 device plugin 也需要重启,然后注册到新的 kubelet.sock。
|
为什么需要重新注册
因为Kubelet 中使用一个 map 来存储注册的插件,因此每次 Kubelet 重启都会丢失,所以我们在实现 device plugin 时就要监控 Kubelet 重启状态并重新注册。
深入K8s内部 设备上报与调度协同原理
4.1 kubelet 对不同设备的统一抽象
首先,kubelet 内部有一个 Device Manager(位于 pkg/kubelet/cm/devicemanager/
)。它的核心思路是:
- 不管什么品牌或类型的硬件(GPU、FPGA ...),只要 Device Plugin 使用了
Register()
报告ResourceName
,kubelet 便将这类硬件视为 统一的“可分配资源”; - kubelet 会把这些资源更新到 Node Status 中——比如
nvidia.com/gpu: 4
,然后在调度时,Scheduler 就会用通用的 资源匹配逻辑 来判断 “哪个节点满足nvidia.com/gpu >= 1
” 等; - 在容器启动阶段,kubelet 会再次调用 Device Manager 的 Allocate(其实是去调用具体的 Plugin 的 Allocate),由插件告诉它如何“挂载”或“配置”真正的硬件。
// Run is for running the device plugin gRPC client.
func (c *client) Run() {// 调用plugin的listwatch方法stream, err := c.client.ListAndWatch(context.Background(), &api.Empty{})if err != nil {klog.ErrorS(err, "ListAndWatch ended unexpectedly for device plugin", "resource", c.resource)return}for {response, err := stream.Recv()if err != nil {klog.ErrorS(err, "ListAndWatch ended unexpectedly for device plugin", "resource", c.resource)return}klog.V(2).InfoS("State pushed for device plugin", "resource", c.resource, "resourceCapacity", len(response.Devices))// kubelet 从某个 Device Plugin 接收到 ListAndWatch 流更新时的处理逻辑c.handler.PluginListAndWatchReceiver(c.resource, response)}
}func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) {var devices []pluginapi.Devicefor _, d := range resp.Devices {devices = append(devices, *d)}m.genericDeviceUpdateCallback(resourceName, devices)
}func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {healthyCount := 0m.mutex.Lock()// 清空当前资源的 healthyDevices 和 unhealthyDevices 集合m.healthyDevices[resourceName] = sets.New[string]()m.unhealthyDevices[resourceName] = sets.New[string]()oldDevices := m.allDevices[resourceName]podsToUpdate := sets.New[string]()m.allDevices[resourceName] = make(map[string]pluginapi.Device)for _, dev := range devices {if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {// compare with old device's health and send update to the channel if neededif oldDev, ok := oldDevices[dev.ID]; ok {if oldDev.Health != dev.Health {// 找出所有使用了健康状态发生变化的设备的 Pod。podUID, _ := m.podDevices.getPodAndContainerForDevice(dev.ID)podsToUpdate.Insert(podUID)}} else {// if this is a new device, it might have existed before and disappeared for a while// but still be assigned to a Pod. In this case, we need to send an update to the channelpodUID, _ := m.podDevices.getPodAndContainerForDevice(dev.ID)podsToUpdate.Insert(podUID)}}m.allDevices[resourceName][dev.ID] = devif dev.Health == pluginapi.Healthy {m.healthyDevices[resourceName].Insert(dev.ID)healthyCount++} else {m.unhealthyDevices[resourceName].Insert(dev.ID)}}m.mutex.Unlock()if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {if len(podsToUpdate) > 0 {// 发送健康状态更新通知// 这个 channel 会被 kubelet 中的其他组件(如 Pod Lifecycle 事件处理器)监听。//收到此消息的组件可以采取相应行动,例如://触发对受影响 Pod 的健康检查。//记录事件日志。//在未来版本中,可能支持自动重启或驱逐使用故障设备的 Pod。m.update <- resourceupdates.Update{PodUIDs: podsToUpdate.UnsortedList(),}}}// 调用 writeCheckpoint() 将当前的设备分配状态(包括哪些设备被哪些 Pod 占用)持久化到磁盘(通常是 /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint 文件)。// 目的:实现故障恢复。如果 kubelet 重启,它可以读取这个 checkpoint 文件,恢复之前的设备分配状态,避免因 kubelet 重启而导致正在运行的 Pod 丢失其设备访问权限。if err := m.writeCheckpoint(); err != nil {klog.ErrorS(err, "Writing checkpoint encountered")}klog.V(2).InfoS("Processed device updates for resource", "resourceName", resourceName, "totalCount", len(devices), "healthyCount", healthyCount)
}
2 调度 filter score bind
3、调度成功后,kubelete调用Allocate调用
当调度器最终确定 某节点 能满足 Pod 需求,Pod 就被 绑定 到该节点上。
然后,该节点上的 kubelet 启动容器前,会在 “Device Manager → Plugin” 调用 Allocate()
,让 plugin 返回如何设置容器。NVIDIA plugin 这里,就会返回 NVIDIA_VISIBLE_DEVICES
环境变量,或 /dev/nvidia*
等特定挂载,让最终的容器只看到被分配到的 GPU。
简化示意(对照前文):
- Scheduler:判断 “nvidia.com/gpu” 是否够用 → 选中节点X;
- kubelet(节点X):接收到调度结果 → 创建容器 → 调用 Device Manager → 进而调用插件的
Allocate()
; - plugin 返回 GPU ID 对应的挂载配置/环境变量 → kubelet 设置到容器 → 容器启动后就能使用那几块 GPU。
调度器最终确定 某节点 能满足 Pod 需求,Pod 就被 绑定 到该节点上。
然后,该节点上的 kubelet 启动容器前,会在 “Device Manager → Plugin” 调用 Allocate()
,让 plugin 返回如何设置容器。NVIDIA plugin 这里,就会返回 NVIDIA_VISIBLE_DEVICES
环境变量,或 /dev/nvidia*
等特定挂载,让最终的容器只看到被分配到的 GPU。
简化示意(对照前文):
- Scheduler:判断 “nvidia.com/gpu” 是否够用 → 选中节点X;
- kubelet(节点X):接收到调度结果 → 创建容器 → 调用 Device Manager → 进而调用插件的
Allocate()
; - plugin 返回 GPU ID 对应的挂载配置/环境变量 → kubelet 设置到容器 → 容器启动后就能使用那几块 GPU。
问题总结:
1)device plugin 是怎么感知节点上的设备的?
- 一般设备都会在 /dev/ 目录下,比如 NVIDIA GPU 就是
/dev/nvidia0
、/dev/nvidia1
这样,不过具体逻辑还是得硬件厂商自己实现 - 然后 device plugin 会以 DaemonSet 方式部署到所有节点,因此能发现每个节点上的设备
2)device plugin Allocate 方法怎么实现分配设备给容器的?
需要注意一点:Allocate 方法并没有真正将设备分配给容器,因为这个时候甚至都还没创建容器,只是在该方法中可以通过 Env、Mounts、Devices、Annotations、CDIDevices 等不同形式来传递
要将那些设备分配给该容器
这个信息给后续组件。
这些信息传给 Kubelet,然后 Kubelet 通过 CRI 调用 Runtime(Docker/Containerd 等等)真正开始创建容器。
比如 NVIDIA 在 Allocate 中就传递了 NVIDIA_VISIBLE_DEVICES 这个 Env,然后自己实现了 nvidia-container-runtime,该 runtime 就可以根据该 Env 知道要把哪个 GPU 分配给容器,然后修改容器的 OCI Spec,最终 runC(或者其他实现)真正创建容器时就会按照这个描述去处理。
又比如 ix-device-plugin 就是用的 Devices 方式,直接指定分配给容器的设备在宿主机的位置,以及要挂载到容器中的位置,这样就不需要实现自己的 container-runtime 了,runC 创建容器时也能把对应设备分配给容器。
这样又引出一个小问题,既然天数(ix-device-plugin)这个实现只用 Devices 就能正常运行,那为什么 NVIDIA 实现了 Devices 又实现了一个 Env?
其实这个 Env 的实现是为了兼容非 k8s 环境,比如 Docker 环境:
nvidia 可以在启动容器时指定 GPU
|
而天数则不行,就像这样:
|
可以看到,需要自己使用 -v 将相关文件挂载进容器才能使用,nvidia-container-runtime 实则是将这部分进行了封装简化,用户只需要传一个参数即可。
3)为什么 device plugin 要 Watch Kubelet 状态,当 Kubelet 重启后 device plugin 也要跟着重启。
这个问题实际上可以翻译为:为什么 Kubelet 重启后,device plugin 需要重新向 Kubelet 注册?
因为 device plugin 的注册信息 Kubelet 是存在内存中的,使用 Go 中的 Map 结构进行存储。重启后就会丢失,因此各个 device plugin 都需要重新注册。
至于为什么 device plugin 一般也会跟着重启,是因为 device plugin 在启动时会调用因此注册接口,因此感知到 Kubelet 重启了,直接让 device plugin 退出即可,然后 DaemonSet 会重新拉起 Pod,这样启动后自动调用注册接口
结语
通过以上分析过程,我们可以看到 GPU Device Plugin 的总体原理相对清晰:
- 插件负责:
- 获取真实硬件信息(如调用 NVML 取 GPU UUID)
- 在本机开一个 gRPC 服务,提供
ListAndWatch
,Allocate
等接口 - 向 kubelet 注册自己
- 持续检测硬件健康,并向 kubelet 实时汇报
- kubelet 负责:
- 接收注册请求、与插件建立长连
- 把设备信息更新到 Node Status
- 在容器启动时调用
Allocate()
让插件告诉它怎么配置容器 - 确保集群调度、资源分配符合“整卡”模型(或厂商自定义模型)
因为 Device Plugin 框架将 “硬件逻辑” 和 “kubelet 核心” 解耦,厂商可以各自实现插件,大大降低了 Kubernetes 自身的复杂度。NVIDIA/k8s-device-plugin 即是其中的一个典型实现,也是目前使用最广泛的 GPU Plugin。
问题: kubelete.socket是啥 与tcp啥区别
kubelet.sock
能够实现通信,其核心原理在于它是一个 Unix Domain Socket (UDS) 文件。这是一种在同一台物理主机上的进程间通信(IPC, Inter-Process Communication)机制,比网络套接字(TCP/IP)更高效。
- Unix Domain Socket 是一种允许同一台机器上的进程进行双向数据交换的通信机制。
- 它使用文件系统中的一个特殊文件(如
/var/lib/kubelet/device-plugins/kubelet.sock
)作为通信的“地址”或“端点”。 -
特性 Unix Domain Socket (UDS) TCP Socket 通信范围 同一台主机上的进程 任意主机间的进程(通过网络) 通信协议 主机内部文件系统/内核 TCP/IP 网络协议栈 寻址方式 文件路径 (e.g., /path/to/socket.sock
)IP 地址 + 端口号 (e.g., 127.0.0.1:8080
)性能 极高:数据不经过网络协议栈,直接在内核内存中交换,无网络开销。 相对较低:需要封装/解封装 IP/TCP 头,可能有网络延迟。 安全性 高:依赖文件系统权限( chmod
,chown
)。只有有权限访问该文件的进程才能连接。需要额外的网络安全机制(如 TLS, 防火墙)。