Go + Kubernetes 客户端实战——client-go 操作 K8s 资源的完整方案
Go + Kubernetes 客户端实战——client-go 操作 K8s 资源的完整方案
适读人群:需要用代码操控 K8s 集群的 Go 工程师 | 阅读时长:约16分钟 | 核心价值:掌握 client-go 的四种客户端,能写出生产级 K8s 自动化工具
从一个运维自动化需求说起
去年双十一前,我们平台需要做一个"弹性扩缩容"的功能:根据业务流量监控数据,自动调整各服务的 Deployment replicas。
最开始我以为直接调 K8s API 就行了,查了下文档发现 K8s 提供的是 REST API,用 Go 的 net/http 包理论上可以直接调,但那样要自己处理认证、序列化、Watch 事件流……工作量太大了。
后来同事推荐我用 client-go——K8s 官方的 Go 客户端库。用上之后确实顺畅了很多,但也踩了不少坑:Informer 和直接 Get 的差别没搞清楚导致频繁打 API Server;Watch 事件处理写错了导致内存泄漏;Dynamic Client 和 Typed Client 傻傻分不清楚……
今天把这些经验都整理出来,希望你少走弯路。
client-go 的四种客户端,选哪个?
client-go 提供了四种操作 K8s 资源的方式,这是很多人第一次看文档时懵的地方。
| 客户端类型 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| Clientset | 操作 K8s 内置资源(Pod、Deployment 等) | 类型安全,IDE 友好 | 不支持 CRD |
| Dynamic Client | 操作任意资源(包括 CRD) | 通用,不需要提前知道资源类型 | 操作 unstructured,容易出错 |
| RESTClient | 底层 HTTP 操作 | 最灵活 | 太底层,不建议直接用 |
| Discovery Client | 查询集群支持的 API 资源列表 | 用于元信息查询 | 功能单一 |
我的建议:
- 操作标准 K8s 资源 → 用 Clientset
- 操作 CRD → 用 Dynamic Client 或给 CRD 生成 typed client(用 code-generator)
- 查询集群有哪些 API → 用 Discovery Client
初始化客户端
package main
import (
"context"
"fmt"
"os"
"path/filepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func buildConfig() (*rest.Config, error) {
// 方式一:集群内运行(Pod 里)
config, err := rest.InClusterConfig()
if err == nil {
return config, nil
}
// 方式二:集群外运行(本地开发)
kubeconfigPath := os.Getenv("KUBECONFIG")
if kubeconfigPath == "" {
if home := homedir.HomeDir(); home != "" {
kubeconfigPath = filepath.Join(home, ".kube", "config")
}
}
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}
func main() {
config, err := buildConfig()
if err != nil {
panic(err.Error())
}
// Typed 客户端(操作内置资源)
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// Dynamic 客户端(操作任意资源)
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 列出所有 Pod
pods, err := clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("找到 %d 个 Pod\n", len(pods.Items))
_ = dynamicClient // 后面用
}Clientset:操作内置 K8s 资源
package k8sops
import (
"context"
"encoding/json"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)
type K8sClient struct {
cs kubernetes.Interface
}
func NewK8sClient(cs kubernetes.Interface) *K8sClient {
return &K8sClient{cs: cs}
}
// ScaleDeployment 调整 Deployment 副本数
func (c *K8sClient) ScaleDeployment(ctx context.Context, namespace, name string, replicas int32) error {
// 使用 Patch 而不是 Update,避免版本冲突问题
patchData := map[string]interface{}{
"spec": map[string]interface{}{
"replicas": replicas,
},
}
patchBytes, err := json.Marshal(patchData)
if err != nil {
return err
}
_, err = c.cs.AppsV1().Deployments(namespace).Patch(
ctx, name,
types.MergePatchType,
patchBytes,
metav1.PatchOptions{},
)
return err
}
// GetDeploymentStatus 查看 Deployment 状态
func (c *K8sClient) GetDeploymentStatus(ctx context.Context, namespace, name string) (*DeploymentStatus, error) {
deploy, err := c.cs.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil, fmt.Errorf("deployment %s/%s not found", namespace, name)
}
return nil, err
}
return &DeploymentStatus{
Name: deploy.Name,
Replicas: deploy.Spec.Replicas,
ReadyReplicas: deploy.Status.ReadyReplicas,
AvailableReplicas: deploy.Status.AvailableReplicas,
UpdatedReplicas: deploy.Status.UpdatedReplicas,
}, nil
}
type DeploymentStatus struct {
Name string
Replicas *int32
ReadyReplicas int32
AvailableReplicas int32
UpdatedReplicas int32
}
// CreateNamespace 创建命名空间(如果不存在)
func (c *K8sClient) EnsureNamespace(ctx context.Context, name string, labels map[string]string) error {
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
},
}
_, err := c.cs.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
if errors.IsAlreadyExists(err) {
return nil // 已存在,不是错误
}
return err
}
// ListPodsWithLabel 列出指定标签的 Pod
func (c *K8sClient) ListPodsWithLabel(ctx context.Context, namespace string, labelSelector map[string]string) ([]corev1.Pod, error) {
selector := metav1.FormatLabelSelector(&metav1.LabelSelector{
MatchLabels: labelSelector,
})
pods, err := c.cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: selector,
})
if err != nil {
return nil, err
}
return pods.Items, nil
}
// WaitForDeploymentReady 等待 Deployment 就绪
func (c *K8sClient) WaitForDeploymentReady(ctx context.Context, namespace, name string) error {
// 使用 Watch 而不是轮询
watcher, err := c.cs.AppsV1().Deployments(namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
})
if err != nil {
return err
}
defer watcher.Stop()
for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
return fmt.Errorf("watch channel closed")
}
deploy, ok := event.Object.(*appsv1.Deployment)
if !ok {
continue
}
desired := int32(1)
if deploy.Spec.Replicas != nil {
desired = *deploy.Spec.Replicas
}
if deploy.Status.ReadyReplicas >= desired {
return nil // 就绪了
}
case <-ctx.Done():
return ctx.Err()
}
}
}Informer:生产级监听资源变化
很多初学者喜欢用 Watch 或者 List 轮询来监控资源变化,这在生产环境有两个大问题:
- 大量请求打到 API Server:每次 Get/List 都是一次 HTTP 请求,高并发下把 API Server 压垮是真实发生过的事
- Watch 连接断开后事件丢失:网络抖动导致 Watch 断开,期间发生的事件完全丢失
Informer 解决了这两个问题:它在内存里维护一个资源缓存(Store),通过 List + Watch 保持同步,所有查询走缓存而不打 API Server。
package informer
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
type DeploymentWatcher struct {
clientset kubernetes.Interface
factory informers.SharedInformerFactory
}
func NewDeploymentWatcher(cs kubernetes.Interface, namespace string) *DeploymentWatcher {
// resyncPeriod: 多久强制全量同步一次(即使没有变化)
// 生产建议设为 10m~1h
factory := informers.NewSharedInformerFactoryWithOptions(
cs,
10*time.Minute,
informers.WithNamespace(namespace),
)
return &DeploymentWatcher{clientset: cs, factory: factory}
}
func (w *DeploymentWatcher) Run(ctx context.Context) {
// 获取 Deployment Informer
deployInformer := w.factory.Apps().V1().Deployments().Informer()
// 注册事件处理器
deployInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
deploy := obj.(*appsv1.Deployment)
fmt.Printf("[ADD] Deployment: %s/%s\n", deploy.Namespace, deploy.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldDeploy := oldObj.(*appsv1.Deployment)
newDeploy := newObj.(*appsv1.Deployment)
// 只关注副本数变化
if oldDeploy.Status.ReadyReplicas != newDeploy.Status.ReadyReplicas {
fmt.Printf("[UPDATE] %s/%s: %d -> %d ready replicas\n",
newDeploy.Namespace, newDeploy.Name,
oldDeploy.Status.ReadyReplicas,
newDeploy.Status.ReadyReplicas,
)
}
},
DeleteFunc: func(obj interface{}) {
// 注意:删除事件的 obj 可能是 DeletedFinalStateUnknown
deploy, ok := obj.(*appsv1.Deployment)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
deploy, ok = tombstone.Obj.(*appsv1.Deployment)
if !ok {
return
}
}
fmt.Printf("[DELETE] Deployment: %s/%s\n", deploy.Namespace, deploy.Name)
},
})
// 启动所有 Informer
w.factory.Start(ctx.Done())
// 等待缓存同步完成(第一次 List 完毕)
if !cache.WaitForCacheSync(ctx.Done(), deployInformer.HasSynced) {
fmt.Println("等待缓存同步超时")
return
}
fmt.Println("Informer 缓存同步完成,开始监听...")
// 从缓存里查询(不打 API Server)
lister := w.factory.Apps().V1().Deployments().Lister()
deploys, err := lister.List(nil) // nil 表示全部
if err == nil {
fmt.Printf("当前共有 %d 个 Deployment(来自缓存)\n", len(deploys))
}
<-ctx.Done()
}
// Pod Informer 示例 - 监听 Pod 调度事件
func WatchPodScheduling(ctx context.Context, cs kubernetes.Interface) {
factory := informers.NewSharedInformerFactory(cs, 5*time.Minute)
podInformer := factory.Core().V1().Pods().Informer()
podInformer.AddEventHandler(cache.FilteringResourceEventHandler{
// 只处理 Pending 状态的 Pod
FilterFunc: func(obj interface{}) bool {
pod, ok := obj.(*corev1.Pod)
return ok && pod.Status.Phase == corev1.PodPending
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("新的 Pending Pod: %s/%s\n", pod.Namespace, pod.Name)
},
},
})
factory.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced)
<-ctx.Done()
}Dynamic Client:操作 CRD 资源
当你需要操作 CRD(自定义资源)时,如果没有生成 typed client,就需要用 Dynamic Client。
package dynamic
import (
"context"
"encoding/json"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)
// GVR: Group/Version/Resource,唯一标识一种 K8s 资源
var mysqlClusterGVR = schema.GroupVersionResource{
Group: "db.example.com",
Version: "v1alpha1",
Resource: "mysqlclusters",
}
func ListMySQLClusters(ctx context.Context, dc dynamic.Interface, namespace string) error {
list, err := dc.Resource(mysqlClusterGVR).Namespace(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for _, item := range list.Items {
name := item.GetName()
// 从 unstructured 里取嵌套字段
phase, found, err := unstructured.NestedString(item.Object, "status", "phase")
if err != nil || !found {
phase = "Unknown"
}
replicas, _, _ := unstructured.NestedInt64(item.Object, "spec", "replicas")
fmt.Printf("MySQLCluster: %s, Phase: %s, Replicas: %d\n", name, phase, replicas)
}
return nil
}
func CreateMySQLCluster(ctx context.Context, dc dynamic.Interface, namespace, name string, replicas int64) error {
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "db.example.com/v1alpha1",
"kind": "MySQLCluster",
"metadata": map[string]interface{}{
"name": name,
"namespace": namespace,
},
"spec": map[string]interface{}{
"replicas": replicas,
"version": "8.0",
"storageSize": "10Gi",
},
},
}
_, err := dc.Resource(mysqlClusterGVR).Namespace(namespace).Create(
ctx, obj, metav1.CreateOptions{},
)
return err
}
// PatchMySQLClusterReplicas 用 Patch 更新副本数
func PatchMySQLClusterReplicas(ctx context.Context, dc dynamic.Interface, namespace, name string, replicas int64) error {
patch := map[string]interface{}{
"spec": map[string]interface{}{
"replicas": replicas,
},
}
patchBytes, _ := json.Marshal(patch)
_, err := dc.Resource(mysqlClusterGVR).Namespace(namespace).Patch(
ctx, name,
types.MergePatchType,
patchBytes,
metav1.PatchOptions{},
)
return err
}三个真实踩坑记录
坑一:List 轮询把 API Server 打挂
现象:某天 K8s 集群 API Server 响应变慢,排查发现有个监控服务每秒调用 clientset.CoreV1().Pods("").List() 扫描全集群 Pod。
原因:直接调 List/Get 每次都是真实 HTTP 请求打到 API Server,全集群 Pod 扫描的数据量大,频率高,直接把 API Server 压垮了。
解法:换成 SharedInformer,所有查询走内存缓存,API Server 只承受 Watch 长连接的压力,负载降低了 90%+。
坑二:Watch 断开后事件丢失
现象:使用原生 Watch API 监控 Deployment 变化,偶发性地有些更新事件没被处理到。
原因:Watch 连接是长连接,网络抖动会导致连接断开,断开到重连期间发生的事件就丢了。
解法:换成 Informer,它内部用 ListWatch 实现,断开重连时会从上次的 resourceVersion 开始续传,不会丢事件。如果 resourceVersion 过期(etcd 里已经被 compact 掉了),会自动 relist 重新全量同步。
坑三:DeleteFunc 里拿到的是 DeletedFinalStateUnknown
现象:Informer 的 DeleteFunc 里 obj.(*appsv1.Deployment) 类型断言 panic。
原因:如果 Informer 错过了删除事件(比如程序崩溃后重启,资源已经被删了),再次同步时 Informer 会生成一个 cache.DeletedFinalStateUnknown 对象来通知你"这个资源已经不在了,但我不确定删除的具体时间"。
解法:DeleteFunc 里必须处理这种情况:
DeleteFunc: func(obj interface{}) {
var deploy *appsv1.Deployment
switch o := obj.(type) {
case *appsv1.Deployment:
deploy = o
case cache.DeletedFinalStateUnknown:
var ok bool
deploy, ok = o.Obj.(*appsv1.Deployment)
if !ok {
return
}
default:
return
}
// 处理 deploy 删除事件...
},Go vs Java 在 K8s 客户端上的体验差异
Java 生态有 fabric8io/kubernetes-client,功能完整,API 设计上做了很多 DSL 封装,写起来比较像流式 API。
Go 的 client-go 更贴近 K8s 的原始概念,几乎没有做额外封装,学习曲线稍陡,但一旦理解了 GVR、Unstructured、Informer 这些核心概念,代码写起来非常直接。
有一点 Go 天然优势:Goroutine + Channel 和 Informer 的事件驱动模型非常搭配。在 Java 里实现类似逻辑需要 ThreadPool + BlockingQueue,代码量要多一倍,还要小心 ThreadLocal 之类的并发陷阱。
总结
client-go 操作 K8s 的核心要点:
- 用 Informer 代替 List/Watch:缓存查询,降低 API Server 压力
- 更新用 Patch 而不是 Update:避免因 resourceVersion 冲突而失败
- DeleteFunc 要处理 DeletedFinalStateUnknown:否则重启后会 panic
- CRD 资源用 Dynamic Client 或生成 typed client:Clientset 只能操作内置资源
- InClusterConfig 自动处理 ServiceAccount 认证:Pod 内不需要挂载 kubeconfig
