Kubernetes控制平面组件:Controller Manager 之 NamespaceController 全方位讲解

发布于:2025-05-08 ⋅ 阅读:(20) ⋅ 点赞:(0)

云原生学习路线导航页(持续更新中)

本文是kubernetes的控制面组件ControllerManager系列文章,本篇 以Namespace Controller 为例,详细讲述了kubernetes内置Controller的编写过程,包含kubernetes 内置controller struct 声明,Informer机制原理及代码分析、client-go Clientset 整合资源client的原理、kube-controller-manager如何启动内置controller等。另外对Namespace Controller 代码进行完整分析

  • 希望大家多多 点赞 关注 评论 收藏,作者会更有动力继续编写技术文章

1.Namespace Struct

kubernetes/staging/src/k8s.io/api/core/v1/types.go

  • 首先定义 namespace struct,包括spec、status等
// FinalizerName is the name identifying a finalizer during namespace lifecycle.
type FinalizerName string

// These are internal finalizer values to Kubernetes, must be qualified name unless defined here or
// in metav1.
const (
	FinalizerKubernetes FinalizerName = "kubernetes"
)

// NamespaceSpec describes the attributes on a Namespace.
type NamespaceSpec struct {
	// Finalizers is an opaque list of values that must be empty to permanently remove object from storage.
	// More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/
	// +optional
	// +listType=atomic
	Finalizers []FinalizerName `json:"finalizers,omitempty" protobuf:"bytes,1,rep,name=finalizers,casttype=FinalizerName"`
}

// NamespaceStatus is information about the current status of a Namespace.
type NamespaceStatus struct {
	// Phase is the current lifecycle phase of the namespace.
	// More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/
	// +optional
	Phase NamespacePhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=NamespacePhase"`

	// Represents the latest available observations of a namespace's current state.
	// +optional
	// +patchMergeKey=type
	// +patchStrategy=merge
	// +listType=map
	// +listMapKey=type
	Conditions []NamespaceCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,2,rep,name=conditions"`
}

// +enum
type NamespacePhase string

// These are the valid phases of a namespace.
const (
	// NamespaceActive means the namespace is available for use in the system
	NamespaceActive NamespacePhase = "Active"
	// NamespaceTerminating means the namespace is undergoing graceful termination
	NamespaceTerminating NamespacePhase = "Terminating"
)

const (
	// NamespaceTerminatingCause is returned as a defaults.cause item when a change is
	// forbidden due to the namespace being terminated.
	NamespaceTerminatingCause metav1.CauseType = "NamespaceTerminating"
)

type NamespaceConditionType string

// These are built-in conditions of a namespace.
const (
	// NamespaceDeletionDiscoveryFailure contains information about namespace deleter errors during resource discovery.
	NamespaceDeletionDiscoveryFailure NamespaceConditionType = "NamespaceDeletionDiscoveryFailure"
	// NamespaceDeletionContentFailure contains information about namespace deleter errors during deletion of resources.
	NamespaceDeletionContentFailure NamespaceConditionType = "NamespaceDeletionContentFailure"
	// NamespaceDeletionGVParsingFailure contains information about namespace deleter errors parsing GV for legacy types.
	NamespaceDeletionGVParsingFailure NamespaceConditionType = "NamespaceDeletionGroupVersionParsingFailure"
	// NamespaceContentRemaining contains information about resources remaining in a namespace.
	NamespaceContentRemaining NamespaceConditionType = "NamespaceContentRemaining"
	// NamespaceFinalizersRemaining contains information about which finalizers are on resources remaining in a namespace.
	NamespaceFinalizersRemaining NamespaceConditionType = "NamespaceFinalizersRemaining"
)

// NamespaceCondition contains details about state of namespace.
type NamespaceCondition struct {
	// Type of namespace controller condition.
	Type NamespaceConditionType `json:"type" protobuf:"bytes,1,opt,name=type,casttype=NamespaceConditionType"`
	// Status of the condition, one of True, False, Unknown.
	Status ConditionStatus `json:"status" protobuf:"bytes,2,opt,name=status,casttype=ConditionStatus"`
	// Last time the condition transitioned from one status to another.
	// +optional
	LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty" protobuf:"bytes,4,opt,name=lastTransitionTime"`
	// Unique, one-word, CamelCase reason for the condition's last transition.
	// +optional
	Reason string `json:"reason,omitempty" protobuf:"bytes,5,opt,name=reason"`
	// Human-readable message indicating details about last transition.
	// +optional
	Message string `json:"message,omitempty" protobuf:"bytes,6,opt,name=message"`
}

// +genclient
// +genclient:nonNamespaced
// +genclient:skipVerbs=deleteCollection
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:prerelease-lifecycle-gen:introduced=1.0

// Namespace provides a scope for Names.
// Use of multiple namespaces is optional.
type Namespace struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Spec defines the behavior of the Namespace.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
	// +optional
	Spec NamespaceSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// Status describes the current status of a Namespace.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
	// +optional
	Status NamespaceStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:prerelease-lifecycle-gen:introduced=1.0

// NamespaceList is a list of Namespaces.
type NamespaceList struct {
	metav1.TypeMeta `json:",inline"`
	// Standard list metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
	// +optional
	metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Items is the list of Namespace objects in the list.
	// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
	Items []Namespace `json:"items" protobuf:"bytes,2,rep,name=items"`
}
  • 代码中涉及的注释tag:
注释标签 功能描述 生成内容/行为
// +enum 标记类型为枚举类型 生成枚举类型代码,如 NamespacePhaseActiveTerminating 枚举值
// +genclient 声明需要生成客户端代码 生成资源的 CRUD 客户端方法(如 CreateUpdateDelete
// +genclient:nonNamespaced 标记资源为集群范围(非命名空间内资源) 生成的客户端方法不包含命名空间参数(如 Namespace 资源)
// +genclient:skipVerbs=deleteCollection 跳过生成指定动词的客户端方法 不生成 DeleteCollection 方法(避免批量删除操作)
// +k8s:deepcopy-gen:interfaces=... 启用深拷贝并实现指定接口 生成实现 runtime.Object 接口的 DeepCopy() 方法,支持序列化和反序列化
// +k8s:prerelease-lifecycle-gen:introduced=1.0 标记 API 资源的版本生命周期阶段 记录 API 的版本演进(如该资源在 1.0 版本引入)
// +optional 标记字段为可选字段 生成的 OpenAPI Schema 中该字段非必填,JSON 编码时忽略空值
// +listType=atomic 指定列表字段为原子性合并策略 列表字段在更新时会被整体替换(而非合并)
// +listType=map 指定列表字段为映射合并策略 列表字段按 listMapKey 指定的键进行合并(如 type
// +listMapKey=type 定义列表合并的主键字段 生成基于 type 字段的合并逻辑(用于 Conditions 等字段)
// +patchMergeKey=type 定义补丁操作时的合并键 指定 type 字段为合并条件(与 patchStrategy=merge 配合使用)
// +patchStrategy=merge 指定补丁操作为合并策略 字段更新时根据 patchMergeKey 合并而非覆盖
  • 分类说明

  • 代码生成类

  • // +genclient// +enum// +k8s:deepcopy-gen 等用于控制代码生成逻辑。

  • 例如:生成客户端方法、深拷贝方法、枚举类型代码。

  • 字段行为类

  • // +optional// +listType=atomic// +patchStrategy=merge 等定义字段的序列化、更新和合并行为。

  • 例如:Conditions 字段通过 listType=maplistMapKey=type 实现按类型合并。

  • API 生命周期类

  • // +k8s:prerelease-lifecycle-gen 标记 API 版本状态(如 Alpha、Beta、Stable)。

  • 例如:NamespaceNamespaceList 均标记为 1.0 版本引入。

  • 核心场景示例

  • Finalizer 机制:

  • NamespaceSpec 中的 Finalizers 字段通过 // +listType=atomic 确保在删除命名空间时,Finalizer 列表的更新是原子性的。

  • 状态条件管理:

  • NamespaceStatus.Conditions 通过 // +listType=map// +listMapKey=type 实现按条件类型(如 NamespaceDeletionFailure)合并状态信息。

  • 资源删除保护:

  • FinalizerKubernetes 常量标记内置 Finalizer,防止命名空间被直接删除,需等待关联资源清理完成。

2.Namespace register

kubernetes/staging/src/k8s.io/api/core/v1/register.go

package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// GroupName is the group name use in this package
const GroupName = ""

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	// We only register manually written functions here. The registration of the
	// generated functions takes place in the generated files. The separation
	// makes the code compile even when the generated files are missing.
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

// Adds the list of known types to the given scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		...
		&Namespace{},
		&NamespaceList{},
		...
	)

	// Add common types
	scheme.AddKnownTypes(SchemeGroupVersion, &metav1.Status{})

	// Add the watch version that applies
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

3.Namespace 代码生成器生成代码

3.1.zz_generated.deepcopy.go

kubernetes/staging/src/k8s.io/api/core/v1/zz_generated.deepcopy.go

  • zz_generated.deepcopy 代码在 api 项目中维护
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Namespace) DeepCopyInto(out *Namespace) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	in.Spec.DeepCopyInto(&out.Spec)
	in.Status.DeepCopyInto(&out.Status)
	return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Namespace.
func (in *Namespace) DeepCopy() *Namespace {
	if in == nil {
		return nil
	}
	out := new(Namespace)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Namespace) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NamespaceCondition) DeepCopyInto(out *NamespaceCondition) {
	*out = *in
	in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime)
	return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NamespaceCondition.
func (in *NamespaceCondition) DeepCopy() *NamespaceCondition {
	if in == nil {
		return nil
	}
	out := new(NamespaceCondition)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NamespaceList) DeepCopyInto(out *NamespaceList) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ListMeta.DeepCopyInto(&out.ListMeta)
	if in.Items != nil {
		in, out := &in.Items, &out.Items
		*out = make([]Namespace, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
	return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NamespaceList.
func (in *NamespaceList) DeepCopy() *NamespaceList {
	if in == nil {
		return nil
	}
	out := new(NamespaceList)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *NamespaceList) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NamespaceSpec) DeepCopyInto(out *NamespaceSpec) {
	*out = *in
	if in.Finalizers != nil {
		in, out := &in.Finalizers, &out.Finalizers
		*out = make([]FinalizerName, len(*in))
		copy(*out, *in)
	}
	return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NamespaceSpec.
func (in *NamespaceSpec) DeepCopy() *NamespaceSpec {
	if in == nil {
		return nil
	}
	out := new(NamespaceSpec)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NamespaceStatus) DeepCopyInto(out *NamespaceStatus) {
	*out = *in
	if in.Conditions != nil {
		in, out := &in.Conditions, &out.Conditions
		*out = make([]NamespaceCondition, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
	return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NamespaceStatus.
func (in *NamespaceStatus) DeepCopy() *NamespaceStatus {
	if in == nil {
		return nil
	}
	out := new(NamespaceStatus)
	in.DeepCopyInto(out)
	return out
}

3.2.zz_generated.prerelease-lifecycle.go

kubernetes/staging/src/k8s.io/api/core/v1/zz_generated.prerelease-lifecycle.go

  • zz_generated.prerelease-lifecycle 代码在 api 项目中维护
// APILifecycleIntroduced is an autogenerated function, returning the release in which the API struct was introduced as int versions of major and minor for comparison.
// It is controlled by "k8s:prerelease-lifecycle-gen:introduced" tags in types.go.
func (in *Namespace) APILifecycleIntroduced() (major, minor int) {
	return 1, 0
}

// APILifecycleIntroduced is an autogenerated function, returning the release in which the API struct was introduced as int versions of major and minor for comparison.
// It is controlled by "k8s:prerelease-lifecycle-gen:introduced" tags in types.go.
func (in *NamespaceList) APILifecycleIntroduced() (major, minor int) {
	return 1, 0
}

3.3.types_swagger_doc_generated.go

kubernetes/staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go

  • types_swagger_doc_generated 代码在 api 项目中维护
var map_Namespace = map[string]string{
	"":         "Namespace provides a scope for Names. Use of multiple namespaces is optional.",
	"metadata": "Standard object's metadata. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata",
	"spec":     "Spec defines the behavior of the Namespace. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status",
	"status":   "Status describes the current status of a Namespace. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status",
}

func (Namespace) SwaggerDoc() map[string]string {
	return map_Namespace
}

var map_NamespaceCondition = map[string]string{
	"":                   "NamespaceCondition contains details about state of namespace.",
	"type":               "Type of namespace controller condition.",
	"status":             "Status of the condition, one of True, False, Unknown.",
	"lastTransitionTime": "Last time the condition transitioned from one status to another.",
	"reason":             "Unique, one-word, CamelCase reason for the condition's last transition.",
	"message":            "Human-readable message indicating details about last transition.",
}

func (NamespaceCondition) SwaggerDoc() map[string]string {
	return map_NamespaceCondition
}

var map_NamespaceList = map[string]string{
	"":         "NamespaceList is a list of Namespaces.",
	"metadata": "Standard list metadata. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds",
	"items":    "Items is the list of Namespace objects in the list. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/",
}

func (NamespaceList) SwaggerDoc() map[string]string {
	return map_NamespaceList
}

var map_NamespaceSpec = map[string]string{
	"":           "NamespaceSpec describes the attributes on a Namespace.",
	"finalizers": "Finalizers is an opaque list of values that must be empty to permanently remove object from storage. More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/",
}

func (NamespaceSpec) SwaggerDoc() map[string]string {
	return map_NamespaceSpec
}

var map_NamespaceStatus = map[string]string{
	"":           "NamespaceStatus is information about the current status of a Namespace.",
	"phase":      "Phase is the current lifecycle phase of the namespace. More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/",
	"conditions": "Represents the latest available observations of a namespace's current state.",
}

func (NamespaceStatus) SwaggerDoc() map[string]string {
	return map_NamespaceStatus
}

3.4.generated.proto.go

kubernetes/staging/src/k8s.io/api/core/v1/generated.proto

  • generated.proto 代码在 api 项目中维护
// Namespace provides a scope for Names.
// Use of multiple namespaces is optional.
message Namespace {
  // Standard object's metadata.
  // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
  // +optional
  optional .k8s.io.apimachinery.pkg.apis.meta.v1.ObjectMeta metadata = 1;

  // Spec defines the behavior of the Namespace.
  // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
  // +optional
  optional NamespaceSpec spec = 2;

  // Status describes the current status of a Namespace.
  // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
  // +optional
  optional NamespaceStatus status = 3;
}

// NamespaceCondition contains details about state of namespace.
message NamespaceCondition {
  // Type of namespace controller condition.
  optional string type = 1;

  // Status of the condition, one of True, False, Unknown.
  optional string status = 2;

  // Last time the condition transitioned from one status to another.
  // +optional
  optional .k8s.io.apimachinery.pkg.apis.meta.v1.Time lastTransitionTime = 4;

  // Unique, one-word, CamelCase reason for the condition's last transition.
  // +optional
  optional string reason = 5;

  // Human-readable message indicating details about last transition.
  // +optional
  optional string message = 6;
}

// NamespaceList is a list of Namespaces.
message NamespaceList {
  // Standard list metadata.
  // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
  // +optional
  optional .k8s.io.apimachinery.pkg.apis.meta.v1.ListMeta metadata = 1;

  // Items is the list of Namespace objects in the list.
  // More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
  repeated Namespace items = 2;
}

// NamespaceSpec describes the attributes on a Namespace.
message NamespaceSpec {
  // Finalizers is an opaque list of values that must be empty to permanently remove object from storage.
  // More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/
  // +optional
  // +listType=atomic
  repeated string finalizers = 1;
}

// NamespaceStatus is information about the current status of a Namespace.
message NamespaceStatus {
  // Phase is the current lifecycle phase of the namespace.
  // More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/
  // +optional
  optional string phase = 1;

  // Represents the latest available observations of a namespace's current state.
  // +optional
  // +patchMergeKey=type
  // +patchStrategy=merge
  // +listType=map
  // +listMapKey=type
  repeated NamespaceCondition conditions = 2;
}

3.5.generated.pb.go

kubernetes/staging/src/k8s.io/api/core/v1/generated.pb.go

  • generated.pb 代码在 api 项目中维护
func (m *Namespace) Reset()      { *m = Namespace{} }
func (*Namespace) ProtoMessage() {}
func (*Namespace) Descriptor() ([]byte, []int) {
	return fileDescriptor_6c07b07c062484ab, []int{91}
}
func (m *Namespace) XXX_Unmarshal(b []byte) error {
	return m.Unmarshal(b)
}
func (m *Namespace) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	b = b[:cap(b)]
	n, err := m.MarshalToSizedBuffer(b)
	if err != nil {
		return nil, err
	}
	return b[:n], nil
}
func (m *Namespace) XXX_Merge(src proto.Message) {
	xxx_messageInfo_Namespace.Merge(m, src)
}
func (m *Namespace) XXX_Size() int {
	return m.Size()
}
func (m *Namespace) XXX_DiscardUnknown() {
	xxx_messageInfo_Namespace.DiscardUnknown(m)
}

var xxx_messageInfo_Namespace proto.InternalMessageInfo

func (m *NamespaceCondition) Reset()      { *m = NamespaceCondition{} }
func (*NamespaceCondition) ProtoMessage() {}
func (*NamespaceCondition) Descriptor() ([]byte, []int) {
	return fileDescriptor_6c07b07c062484ab, []int{92}
}
func (m *NamespaceCondition) XXX_Unmarshal(b []byte) error {
	return m.Unmarshal(b)
}
func (m *NamespaceCondition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	b = b[:cap(b)]
	n, err := m.MarshalToSizedBuffer(b)
	if err != nil {
		return nil, err
	}
	return b[:n], nil
}
func (m *NamespaceCondition) XXX_Merge(src proto.Message) {
	xxx_messageInfo_NamespaceCondition.Merge(m, src)
}
func (m *NamespaceCondition) XXX_Size() int {
	return m.Size()
}
func (m *NamespaceCondition) XXX_DiscardUnknown() {
	xxx_messageInfo_NamespaceCondition.DiscardUnknown(m)
}

var xxx_messageInfo_NamespaceCondition proto.InternalMessageInfo

func (m *NamespaceList) Reset()      { *m = NamespaceList{} }
func (*NamespaceList) ProtoMessage() {}
func (*NamespaceList) Descriptor() ([]byte, []int) {
	return fileDescriptor_6c07b07c062484ab, []int{93}
}
func (m *NamespaceList) XXX_Unmarshal(b []byte) error {
	return m.Unmarshal(b)
}
func (m *NamespaceList) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	b = b[:cap(b)]
	n, err := m.MarshalToSizedBuffer(b)
	if err != nil {
		return nil, err
	}
	return b[:n], nil
}
func (m *NamespaceList) XXX_Merge(src proto.Message) {
	xxx_messageInfo_NamespaceList.Merge(m, src)
}
func (m *NamespaceList) XXX_Size() int {
	return m.Size()
}
func (m *NamespaceList) XXX_DiscardUnknown() {
	xxx_messageInfo_NamespaceList.DiscardUnknown(m)
}

var xxx_messageInfo_NamespaceList proto.InternalMessageInfo

func (m *NamespaceSpec) Reset()      { *m = NamespaceSpec{} }
func (*NamespaceSpec) ProtoMessage() {}
func (*NamespaceSpec) Descriptor() ([]byte, []int) {
	return fileDescriptor_6c07b07c062484ab, []int{94}
}
func (m *NamespaceSpec) XXX_Unmarshal(b []byte) error {
	return m.Unmarshal(b)
}
func (m *NamespaceSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	b = b[:cap(b)]
	n, err := m.MarshalToSizedBuffer(b)
	if err != nil {
		return nil, err
	}
	return b[:n], nil
}
func (m *NamespaceSpec) XXX_Merge(src proto.Message) {
	xxx_messageInfo_NamespaceSpec.Merge(m, src)
}
func (m *NamespaceSpec) XXX_Size() int {
	return m.Size()
}
func (m *NamespaceSpec) XXX_DiscardUnknown() {
	xxx_messageInfo_NamespaceSpec.DiscardUnknown(m)
}

var xxx_messageInfo_NamespaceSpec proto.InternalMessageInfo

func (m *NamespaceStatus) Reset()      { *m = NamespaceStatus{} }
func (*NamespaceStatus) ProtoMessage() {}
func (*NamespaceStatus) Descriptor() ([]byte, []int) {
	return fileDescriptor_6c07b07c062484ab, []int{95}
}
func (m *NamespaceStatus) XXX_Unmarshal(b []byte) error {
	return m.Unmarshal(b)
}
func (m *NamespaceStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	b = b[:cap(b)]
	n, err := m.MarshalToSizedBuffer(b)
	if err != nil {
		return nil, err
	}
	return b[:n], nil
}
func (m *NamespaceStatus) XXX_Merge(src proto.Message) {
	xxx_messageInfo_NamespaceStatus.Merge(m, src)
}
func (m *NamespaceStatus) XXX_Size() int {
	return m.Size()
}
func (m *NamespaceStatus) XXX_DiscardUnknown() {
	xxx_messageInfo_NamespaceStatus.DiscardUnknown(m)
}

var xxx_messageInfo_NamespaceStatus proto.InternalMessageInfo

4.Namespace Informer机制代码

  • Informer 代码在 client-go 项目中维护

4.1.Namespace Informer代码

kubernetes/staging/src/k8s.io/client-go/informers/core/v1/namespace.go

package v1

import (
	context "context"
	time "time"

	apicorev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	runtime "k8s.io/apimachinery/pkg/runtime"
	watch "k8s.io/apimachinery/pkg/watch"
	internalinterfaces "k8s.io/client-go/informers/internalinterfaces"
	kubernetes "k8s.io/client-go/kubernetes"
	corev1 "k8s.io/client-go/listers/core/v1"
	cache "k8s.io/client-go/tools/cache"
)

// NamespaceInformer provides access to a shared informer and lister for
// Namespaces.
type NamespaceInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() corev1.NamespaceLister
}

type namespaceInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// NewNamespaceInformer constructs a new informer for Namespace type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewNamespaceInformer(client kubernetes.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
	return NewFilteredNamespaceInformer(client, resyncPeriod, indexers, nil)
}

// NewFilteredNamespaceInformer constructs a new informer for Namespace type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredNamespaceInformer(client kubernetes.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Namespaces().List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Namespaces().Watch(context.TODO(), options)
			},
		},
		&apicorev1.Namespace{},
		resyncPeriod,
		indexers,
	)
}

func (f *namespaceInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredNamespaceInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *namespaceInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&apicorev1.Namespace{}, f.defaultInformer)
}

func (f *namespaceInformer) Lister() corev1.NamespaceLister {
	return corev1.NewNamespaceLister(f.Informer().GetIndexer())
}

4.2.Namespace Lister代码

kubernetes/staging/src/k8s.io/client-go/listers/core/v1/namespace.go

  • client-go对 lister的代码统一放在 listers 目录下
package v1

import (
	corev1 "k8s.io/api/core/v1"
	labels "k8s.io/apimachinery/pkg/labels"
	listers "k8s.io/client-go/listers"
	cache "k8s.io/client-go/tools/cache"
)

// NamespaceLister helps list Namespaces.
// All objects returned here must be treated as read-only.
type NamespaceLister interface {
	// List lists all Namespaces in the indexer.
	// Objects returned here must be treated as read-only.
	List(selector labels.Selector) (ret []*corev1.Namespace, err error)
	// Get retrieves the Namespace from the index for a given name.
	// Objects returned here must be treated as read-only.
	Get(name string) (*corev1.Namespace, error)
	NamespaceListerExpansion
}

// namespaceLister implements the NamespaceLister interface.
type namespaceLister struct {
	listers.ResourceIndexer[*corev1.Namespace]
}

// NewNamespaceLister returns a new NamespaceLister.
func NewNamespaceLister(indexer cache.Indexer) NamespaceLister {
	return &namespaceLister{listers.New[*corev1.Namespace](indexer, corev1.Resource("namespace"))}
}

4.3.代码讲解

  • Informer代码

    • NamespaceInformer 接口,包含 Informer()、Lister() 两个方法
      • Informer():用于获取 Namespace Informer,kubernetes内部资源的 Informer 都统一放在缓存中,所以这个方法是从缓存中获取对应的Informer,获取不到就创建放入缓存再返回
      • Lister():用于获取一个 NamespaceLister,NamespaceLister是用于从共享缓存中获取Namespace对象的,避免从apiserver读取
    • namespaceInformer 结构体 实现了 NamespaceInformer 接口,并且包含 factory、tweakListOptions两个参数
      • factory:工厂接口,包含2个方法,其中InformerFor方法是创建api对象的通用方法,namespaceInformer可以通过这个factory的InformerFor方法,快捷简便的创建一个cache.SharedIndexInformer对象
      • tweakListOptions:对option的调整函数,可以定义通用的ns调整方法,在需要的时机对ns对象做调整
    • NewNamespaceInformer/NewFilteredNamespaceInformer包级函数(静态方法):创建一个NamespaceInformer,类型为 cache.SharedIndexInformer,用于操作ns 的ListAndWatch
    • namespaceInformer 的接收器函数(成员方法):
      • Informer():实现NamespaceInformer 接口方法,获取缓存中的 namespace Informer。调用namespaceInformer中factory的InformerFor方法,即可获取缓存 cache.SharedIndexInformer 中的namespace Informer,如果缓存中不存在,就会创建一个加入缓存并返回
      • Lister():实现 NamespaceInformer 接口方法,New一个新的 namespace Lister
  • Lister代码

    • NamespaceLister 接口,包含 List()、Get() 两个方法
      • List():List Namespace 对象
      • Get():Get namespace 对象
    • namespaceLister 结构体,实现NamespaceLister 接口,不过它不是自己实现List、Get方法,而是通过一个 资源索引器 listers.ResourceIndexer,拿到索引器中的namespace索引器 listers.ResourceIndexer[*corev1.Namespace]。这个索引器实现了List()、Get() 两个方法
      • 因此创建namespaceLister对象时,需要传入一个 cache.Indexer
    • NewNamespaceLister 包级函数:创建一个NamespaceLister,入参 cache.Indexer,方法逻辑为 根据cache.Indexer创建一个listers.ResourceIndexer[*corev1.Namespace],封装为 NamespaceLister 对象后返回
  • 注意事项

    • 从上面代码细节可以看出,Informer是存在缓存中的,创建一次即可,后续都会直接复用。而Lister对象没有放入缓存,每次调用 namespaceInformer.Lister(),都会创建一个新的Lister,所以我们在编写CRD Controller时,有时会创建一个 Lister 放在 CRDController中,之后就都可以使用了

5.client-go 如何实现 namespace 资源与kubernetes集群的交互?

在这里插入图片描述

5.1.Clientset 维护了所有 GV 的 Client

kubernetes/staging/src/k8s.io/client-go/kubernetes/clientset.go

  • client-go中,所有的内置资源client都属于clientset,在client-go/kubernetes/clientset.go中,定义了统一的Clientset接口Interface,每个GV都有一个对应的方法。Namespace属于core/v1,所以我们关注CoreV1即可
  • Clientset 结构体,内部包含了所有GV的 client(core/v1的client为*corev1.CoreV1Client
  • Clientset 实现了 Interface 接口,CoreV1方法就是返回 coreV1 的client
  • core/v1 的 client 为 corev1.CoreV1Client 类型
package kubernetes

import (
	fmt "fmt"
	http "net/http"
	...
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	...
)

type Interface interface {
	Discovery() discovery.DiscoveryInterface
	...
	CoreV1() corev1.CoreV1Interface
	...
}

// Clientset contains the clients for groups.
type Clientset struct {
	*discovery.DiscoveryClient
	...
	coreV1                        *corev1.CoreV1Client
	...
}

...

// CoreV1 retrieves the CoreV1Client
func (c *Clientset) CoreV1() corev1.CoreV1Interface {
	return c.coreV1
}

...

// NewForConfig creates a new Clientset for the given config.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfig will generate a rate-limiter in configShallowCopy.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*Clientset, error) {
	configShallowCopy := *c

	if configShallowCopy.UserAgent == "" {
		configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
	}

	// share the transport between all clients
	httpClient, err := rest.HTTPClientFor(&configShallowCopy)
	if err != nil {
		return nil, err
	}

	return NewForConfigAndClient(&configShallowCopy, httpClient)
}

// NewForConfigAndClient creates a new Clientset for the given config and http client.
// Note the http client provided takes precedence over the configured transport values.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfigAndClient will generate a rate-limiter in configShallowCopy.
func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) {
	configShallowCopy := *c
	if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
		if configShallowCopy.Burst <= 0 {
			return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")
		}
		configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
	}

	var cs Clientset
	var err error
	...
	cs.coreV1, err = corev1.NewForConfigAndClient(&configShallowCopy, httpClient)
	if err != nil {
		return nil, err
	}
	...
}

...
// New creates a new Clientset for the given RESTClient.
func New(c rest.Interface) *Clientset {
	var cs Clientset
	...
	cs.coreV1 = corev1.New(c)
	...

	cs.DiscoveryClient = discovery.NewDiscoveryClient(c)
	return &cs
}

5.2.corev1 Client

kubernetes/staging/src/k8s.io/client-go/kubernetes/typed/core/v1/core_client.go

  • corev1 Client 包含一个 CoreV1Interface 接口,该接口继承了 所有的资源接口,包括 Namespace 的接口 NamespacesGetter 接口
  • CoreV1Client 结构体,实现了 CoreV1Interface 接口,继而实现了所有的资源接口,包括 NamespacesGetter 接口。NamespacesGetter 接口只有一个方法 Namespaces()
package v1

import (
	http "net/http"

	corev1 "k8s.io/api/core/v1"
	scheme "k8s.io/client-go/kubernetes/scheme"
	rest "k8s.io/client-go/rest"
)

type CoreV1Interface interface {
	RESTClient() rest.Interface
	...
	NamespacesGetter
	...
}

// CoreV1Client is used to interact with features provided by the  group.
type CoreV1Client struct {
	restClient rest.Interface
}
...

func (c *CoreV1Client) Namespaces() NamespaceInterface {
	return newNamespaces(c)
}

...

// New creates a new CoreV1Client for the given RESTClient.
func New(c rest.Interface) *CoreV1Client {
	return &CoreV1Client{c}
}

func setConfigDefaults(config *rest.Config) error {
	gv := corev1.SchemeGroupVersion
	config.GroupVersion = &gv
	config.APIPath = "/api"
	config.NegotiatedSerializer = rest.CodecFactoryForGeneratedClient(scheme.Scheme, scheme.Codecs).WithoutConversion()

	if config.UserAgent == "" {
		config.UserAgent = rest.DefaultKubernetesUserAgent()
	}

	return nil
}

// RESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *CoreV1Client) RESTClient() rest.Interface {
	if c == nil {
		return nil
	}
	return c.restClient
}

5.3.NamespacesGetter接口

kubernetes/staging/src/k8s.io/client-go/kubernetes/typed/core/v1/namespace.go

  • NamespacesGetter 接口只有一个方法 Namespaces(),返回值为 NamespaceInterface 接口类型
  • NamespaceInterface 接口包含了操作namespace的所有方法:Create、Update、UpdateStatus、Delete、Get、List、Watch、Patch、Apply、ApplyStatus
  • namespaces 结构体 实现了NamespacesGetter 接口,所以 CoreV1Client 在实现NamespacesGetter时,其实就是返回了一个namespaces类型的对象,该对象就具有操作namespace的所有方法
    • ​​namespaces 结构体通过嵌入 gentype.ClientWithListAndApply(泛型客户端),​​自动继承其所有方法​​。ClientWithListAndApply 实现了 NamespaceInterface 接口的方法,则 namespaces 也会隐式实现该接口。
    • ClientWithListAndApply 封装了对 Kubernetes API 的底层操作(如 List、Apply 等方法),通过泛型参数 *corev1.Namespace、*corev1.NamespaceList 等指定资源类型
package v1

import (
	context "context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	types "k8s.io/apimachinery/pkg/types"
	watch "k8s.io/apimachinery/pkg/watch"
	applyconfigurationscorev1 "k8s.io/client-go/applyconfigurations/core/v1"
	gentype "k8s.io/client-go/gentype"
	scheme "k8s.io/client-go/kubernetes/scheme"
)

// NamespacesGetter has a method to return a NamespaceInterface.
// A group's client should implement this interface.
type NamespacesGetter interface {
	Namespaces() NamespaceInterface
}

// NamespaceInterface has methods to work with Namespace resources.
type NamespaceInterface interface {
	Create(ctx context.Context, namespace *corev1.Namespace, opts metav1.CreateOptions) (*corev1.Namespace, error)
	Update(ctx context.Context, namespace *corev1.Namespace, opts metav1.UpdateOptions) (*corev1.Namespace, error)
	// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus().
	UpdateStatus(ctx context.Context, namespace *corev1.Namespace, opts metav1.UpdateOptions) (*corev1.Namespace, error)
	Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
	Get(ctx context.Context, name string, opts metav1.GetOptions) (*corev1.Namespace, error)
	List(ctx context.Context, opts metav1.ListOptions) (*corev1.NamespaceList, error)
	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *corev1.Namespace, err error)
	Apply(ctx context.Context, namespace *applyconfigurationscorev1.NamespaceApplyConfiguration, opts metav1.ApplyOptions) (result *corev1.Namespace, err error)
	// Add a +genclient:noStatus comment above the type to avoid generating ApplyStatus().
	ApplyStatus(ctx context.Context, namespace *applyconfigurationscorev1.NamespaceApplyConfiguration, opts metav1.ApplyOptions) (result *corev1.Namespace, err error)
	NamespaceExpansion
}

// namespaces implements NamespaceInterface
type namespaces struct {
	*gentype.ClientWithListAndApply[*corev1.Namespace, *corev1.NamespaceList, *applyconfigurationscorev1.NamespaceApplyConfiguration]
}

// newNamespaces returns a Namespaces
func newNamespaces(c *CoreV1Client) *namespaces {
	return &namespaces{
		gentype.NewClientWithListAndApply[*corev1.Namespace, *corev1.NamespaceList, *applyconfigurationscorev1.NamespaceApplyConfiguration](
			"namespaces",
			c.RESTClient(),
			scheme.ParameterCodec,
			"",
			func() *corev1.Namespace { return &corev1.Namespace{} },
			func() *corev1.NamespaceList { return &corev1.NamespaceList{} },
			gentype.PrefersProtobuf[*corev1.Namespace](),
		),
	}
}

5.4.总结

在这里插入图片描述

  • 现在再看这张图,就知道为什么 很多Controller编写的时候,都会自己持有一个 kubernetes.Interface 接口对象,用于操作环境内置资源。
  • 创建一个 kubernetes.Interface,其实就是Clientset对象
  • 调用 Clientset 对象的CoreV1(),即可获得一个CoreV1Interface,其实就是CoreV1Client对象
  • 调用 CoreV1Client 对象的Namespaces(),即可获取一个操作Namespace的对象,即namespaces对象
  • namespaces对象 具有诸多方法,用于操作环境中的ns。如:Create、Update等

6.Namespace Controller代码

在这里插入图片描述

  • 前置学习中,我们知道controller informer内部架构是这样,所以new controller的时候,我们需要提供:event handlers、workqueue、processNextItem、worker、indexer。

6.1.声明 NamespaceController struct

kubernetes/pkg/controller/namespace/namespace_controller.go

  • lister corelisters.NamespaceLister
    • 用于从共享缓存中列出命名空间(Namespace)对象。Lister 其实就是indexer,包含Get、List方法,用于从缓存中只读访问资源,避免直接调用 API Server
  • listerSynced cache.InformerSynced
    • InformerSynced 是一个函数类型,返回一个布尔值,表示缓存是否已同步
    • 确保在控制器开始处理事件之前,缓存中的数据已经与 API Server 同步
    • 如果缓存未同步,控制器可能会处理过时或不完整的数据
  • queue workqueue.TypedRateLimitingInterface[string]
    • queue 是一个速率限制队列,其实就是workqueue,用于存储需要处理的命名空间key。
    • 它支持对事件进行去重、延迟处理和速率限制。
    • 例如,当命名空间被删除时,事件会被加入队列,控制器的 worker 会从队列中取出key并处理。
  • namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
    • ns资源删除器,用于删除命名空间中的所有资源的接口
    • 当命名空间被标记为删除时,调用此字段的实现来清理命名空间中的所有资源(如 Pod、Service 等)。确保命名空间中的资源被安全地清理后,命名空间本身才能被删除
package namespace

// NamespaceController is responsible for performing actions dependent upon a namespace phase
type NamespaceController struct {
	// lister that can list namespaces from a shared cache
	lister corelisters.NamespaceLister
	// returns true when the namespace cache is ready
	listerSynced cache.InformerSynced
	// namespaces that have been queued up for processing by workers
	queue workqueue.TypedRateLimitingInterface[string]
	// helper to delete all resources in the namespace when the namespace is deleted.
	namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
}

6.2.创建 Namespace Controller

kubernetes/pkg/controller/namespace/namespace_controller.go

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/namespace/deletion"

	"k8s.io/klog/v2"
)


// NewNamespaceController creates a new NamespaceController
func NewNamespaceController(
	ctx context.Context,
	kubeClient clientset.Interface,
	metadataClient metadata.Interface,
	discoverResourcesFn func() ([]*metav1.APIResourceList, error),
	namespaceInformer coreinformers.NamespaceInformer,
	resyncPeriod time.Duration,
	finalizerToken v1.FinalizerName) *NamespaceController {

	// create the controller so we can inject the enqueue function
	namespaceController := &NamespaceController{
		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
			nsControllerRateLimiter(),
			workqueue.TypedRateLimitingQueueConfig[string]{
				Name: "namespace",
			},
		),
		namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(ctx, kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken),
	}

	// configure the namespace informer event handlers
	namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				namespace := obj.(*v1.Namespace)
				namespaceController.enqueueNamespace(namespace)
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				namespace := newObj.(*v1.Namespace)
				namespaceController.enqueueNamespace(namespace)
			},
		},
		resyncPeriod,
	)
	namespaceController.lister = namespaceInformer.Lister()
	namespaceController.listerSynced = namespaceInformer.Informer().HasSynced

	return namespaceController
}

6.2.1.入参分析

  • 入参:
    • clientset.Interface
      • 从上面导包看,clientset "k8s.io/client-go/kubernetes",就是第5章讲的clientset,该client可用于操作ns
    • metadata.Interface
      • "k8s.io/client-go/metadata" 是 Kubernetes client-go 库中用于 ​​轻量级元数据操作​​ 的核心组件,其设计目标是通过 ​​不依赖具体资源类型结构​​ 的方式,实现对任意 Kubernetes 资源(包括 CRD)的元数据(如标签、注解、所有者引用等)进行高效操作
    • discoverResourcesFn func() ([]*metav1.APIResourceList, error)
      • 一个函数,用于发现集群中所有的 API 资源,动态发现集群中支持的资源类型。在清理命名空间时,确定需要删除的资源类型
      • 实际上就类似通过 kubectl api-resources 找到所有的资源类型,确定哪些需要回收,哪些不需要回收
    • namespaceInformer coreinformers.NamespaceInformer
      • 命名空间的 informer,用于监听和缓存命名空间的变化
      • 提供命名空间的事件通知(如新增、更新、删除),提供命名空间的缓存访问接口,避免频繁调用 API Server
    • resyncPeriod time.Duration
      • informer 的重新同步周期。定期触发重新同步,即使没有资源变化,也会重新处理资源
    • finalizerToken v1.FinalizerName
      • 命名空间删除时使用的 finalizer 标识符。一般使用 kubernetes

6.2.2.创建 NamespaceController 实例

// create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{
	queue: workqueue.NewTypedRateLimitingQueueWithConfig(
		nsControllerRateLimiter(),
		workqueue.TypedRateLimitingQueueConfig[string]{
			Name: "namespace",
		},
	),
	namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(
		 ctx,
		 kubeClient.CoreV1().Namespaces(), 
		 metadataClient, 
		 kubeClient.CoreV1(), 
		 discoverResourcesFn, 
		 finalizerToken
	),
}

// nsControllerRateLimiter is tuned for a faster than normal recycle time with default backoff speed and default overall
// requeing speed.  We do this so that namespace cleanup is reliably faster and we know that the number of namespaces being
// deleted is smaller than total number of other namespace scoped resources in a cluster.
func nsControllerRateLimiter() workqueue.TypedRateLimiter[string] {
	return workqueue.NewTypedMaxOfRateLimiter(
		// this ensures that we retry namespace deletion at least every minute, never longer.
		workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 60*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}
  • queue:
    • 创建一个速率限制队列,作为workqueue,用于存储需待处理的Namespace key
    • 使用 nsControllerRateLimiter 配置速率限制器,确保事件处理的速率可控
  • nsControllerRateLimiter方法
    • nsControllerRateLimiter 是一个专门为命名空间清理任务设计的速率限制器函数。
    • 它的目的是通过调整重试和请求速率,使命名空间的清理过程更加高效和可靠。
    • 由于命名空间的清理通常涉及较少的资源,但需要快速完成,因此该函数的速率限制器被调优为比默认速率更快。
  • namespacedResourcesDeleter:
    • 创建一个 NamespacedResourcesDeleter 实例,用于清理命名空间中的资源。
    • 传入上下文、命名空间客户端、元数据客户端、资源发现函数和 finalizer 标识符。

6.2.3.配置命名空间事件处理器

namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
	cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			namespace := obj.(*v1.Namespace)
			namespaceController.enqueueNamespace(namespace)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			namespace := newObj.(*v1.Namespace)
			namespaceController.enqueueNamespace(namespace)
		},
	},
	resyncPeriod,
)

// enqueueNamespace adds an object to the controller work queue
// obj could be an *v1.Namespace, or a DeletionFinalStateUnknown item.
func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	namespace := obj.(*v1.Namespace)
	// don't queue if we aren't deleted
	if namespace.DeletionTimestamp == nil || namespace.DeletionTimestamp.IsZero() {
		return
	}

	// delay processing namespace events to allow HA api servers to observe namespace deletion,
	// and HA etcd servers to observe last minute object creations inside the namespace
	nm.queue.AddAfter(key, namespaceDeletionGracePeriod)
}
  • AddEventHandlerWithResyncPeriod:

    • 为命名空间的 informer 配置事件处理器。
    • 监听命名空间的新增和更新事件,并将事件加入队列。
  • 事件处理逻辑:

    • AddFunc: 当有新的命名空间被创建时,调用 enqueueNamespace 将其加入队列。
    • UpdateFunc: 当命名空间被更新时,调用 enqueueNamespace 将其加入队列。
    • 不过这里 enqueueNamespace 只会把删除中的 namespace 入队,其他的忽略
  • 为什么delete操作,需要通过update事件将key入队?

    • ns默认都有finalizer的,当发起delete时,不会直接删除,而是在metadata中添加deletiontimestramp,标记为逻辑删除,该过程是一个update事件

6.2.4.设置命名空间缓存和同步函数

namespaceController.lister = namespaceInformer.Lister()
namespaceController.listerSynced = namespaceInformer.Informer().HasSynced
  • lister:
    • namespaceInformer 获取命名空间的 lister,放入Controller中维护,用于高效访问缓存中的命名空间对象。
  • listerSynced:
    • 设置缓存同步函数,用于检查缓存是否已与 API Server 同步

6.3.Namespace Controller 删除逻辑

6.3.1.事件key入队

kubernetes/pkg/controller/namespace/namespace_controller.go

  • 这里 enqueueNamespace 只会把删除中的 namespace 入队,其他的忽略
// enqueueNamespace adds an object to the controller work queue
// obj could be an *v1.Namespace, or a DeletionFinalStateUnknown item.
func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	namespace := obj.(*v1.Namespace)
	// don't queue if we aren't deleted
	if namespace.DeletionTimestamp == nil || namespace.DeletionTimestamp.IsZero() {
		return
	}

	// delay processing namespace events to allow HA api servers to observe namespace deletion,
	// and HA etcd servers to observe last minute object creations inside the namespace
	nm.queue.AddAfter(key, namespaceDeletionGracePeriod)
}

6.3.2.worker逻辑

kubernetes/pkg/controller/namespace/namespace_controller.go

  • worker
    • 将ns处理逻辑封装为一个方法 workFunc,后面就是死循环,不停调用这个workFunc
    • workFunc 代码分析:
      • 从队列中弹出一个key。如果收到停止信号则终止
      • 对该 key 进行处理,nm.syncNamespaceFromKey(ctx, key)
      • 处理过程发生error,根据错误类型采取不同的处理方式,以确保命名空间的删除任务能够被有效地重试。
        • *deletion.ResourcesRemainingError 类型的错误,通常表示命名空间中仍然存在未清理的资源。提取错误中的 Estimate 字段,表示剩余资源的估计数量。然后计算一个等待时间 t,其值为剩余资源数量的一半加 1。这种计算方式是为了动态调整重试间隔,避免频繁重试导致系统负载过高。然后通过调用 nm.queue.AddAfter 方法,将命名空间重新加入队列,并设置延迟时间为 t 秒。
        • 其他类型的删除失败。为了避免等待完整的重新同步周期,代码调用 nm.queue.AddRateLimited 方法,将命名空间以速率限制的方式重新加入队列。这种方式可以控制重试频率,防止队列过载。同时,代码使用 utilruntime.HandleError 记录错误信息,方便后续的调试和问题排查。
  • syncNamespaceFromKey
    • 记录处理耗时,输出到日志备查
    • 使用namespaceController中的Lister,从缓存中查询ns
    • 使用namespacedResourcesDeleter删除ns
// worker processes the queue of namespace objects.
// Each namespace can be in the queue at most once.
// The system ensures that no two workers can process
// the same namespace at the same time.
func (nm *NamespaceController) worker(ctx context.Context) {
	workFunc := func(ctx context.Context) bool {
		key, quit := nm.queue.Get()
		if quit {
			return true
		}
		defer nm.queue.Done(key)

		err := nm.syncNamespaceFromKey(ctx, key)
		if err == nil {
			// no error, forget this entry and return
			nm.queue.Forget(key)
			return false
		}

		if estimate, ok := err.(*deletion.ResourcesRemainingError); ok {
			t := estimate.Estimate/2 + 1
			klog.FromContext(ctx).V(4).Info("Content remaining in namespace", "namespace", key, "waitSeconds", t)
			nm.queue.AddAfter(key, time.Duration(t)*time.Second)
		} else {
			// rather than wait for a full resync, re-add the namespace to the queue to be processed
			nm.queue.AddRateLimited(key)
			utilruntime.HandleError(fmt.Errorf("deletion of namespace %v failed: %v", key, err))
		}
		return false
	}
	for {
		quit := workFunc(ctx)

		if quit {
			return
		}
	}
}

// syncNamespaceFromKey looks for a namespace with the specified key in its store and synchronizes it
func (nm *NamespaceController) syncNamespaceFromKey(ctx context.Context, key string) (err error) {
	startTime := time.Now()
	logger := klog.FromContext(ctx)
	defer func() {
		logger.V(4).Info("Finished syncing namespace", "namespace", key, "duration", time.Since(startTime))
	}()

	namespace, err := nm.lister.Get(key)
	if errors.IsNotFound(err) {
		logger.Info("Namespace has been deleted", "namespace", key)
		return nil
	}
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err))
		return err
	}
	return nm.namespacedResourcesDeleter.Delete(ctx, namespace.Name)
}

6.4.namespacedResourcesDeleter删除器

kubernetes/pkg/controller/namespace/deletion/namespaced_resources_deleter.go

6.4.1.NamespacedResourcesDeleter数据结构

  • NamespacedResourcesDeleterInterface接口:Namespace删除器接口,NamespaceController持有一个该接口实例,调用Delete()完成ns删除
  • namespacedResourcesDeleter结构体:实现了NamespacedResourcesDeleterInterface接口,并且内部维护了一些客户端、资源发现方法、finalizer的key
  • NewNamespacedResourcesDeleter方法:创建一个NamespacedResourcesDeleter实例,即namespacedResourcesDeleter对象。
    • 其中 d.initOpCache(ctx) 是初始化 namespacedResourcesDeleter 的操作缓存 (opCache),以便记录哪些操作在特定的资源上不被支持。
    • 它通过调用 discoverResourcesFn 获取集群中所有支持的资源信息,并根据资源的操作能力填充缓存。这种缓存机制可以优化后续的资源操作逻辑,避免不必要的 HTTP 405 错误。
    • 这里主要是判断哪些资源不支持delete操作,缓存下来
// NamespacedResourcesDeleterInterface is the interface to delete a namespace with all resources in it.
type NamespacedResourcesDeleterInterface interface {
	Delete(ctx context.Context, nsName string) error
}

// NewNamespacedResourcesDeleter returns a new NamespacedResourcesDeleter.
func NewNamespacedResourcesDeleter(ctx context.Context, nsClient v1clientset.NamespaceInterface,
	metadataClient metadata.Interface, podsGetter v1clientset.PodsGetter,
	discoverResourcesFn func() ([]*metav1.APIResourceList, error),
	finalizerToken v1.FinalizerName) NamespacedResourcesDeleterInterface {
	d := &namespacedResourcesDeleter{
		nsClient:       nsClient,
		metadataClient: metadataClient,
		podsGetter:     podsGetter,
		opCache: &operationNotSupportedCache{
			m: make(map[operationKey]bool),
		},
		discoverResourcesFn: discoverResourcesFn,
		finalizerToken:      finalizerToken,
	}
	d.initOpCache(ctx)
	return d
}

var _ NamespacedResourcesDeleterInterface = &namespacedResourcesDeleter{}

// namespacedResourcesDeleter is used to delete all resources in a given namespace.
type namespacedResourcesDeleter struct {
	// Client to manipulate the namespace.
	nsClient v1clientset.NamespaceInterface
	// Dynamic client to list and delete all namespaced resources.
	metadataClient metadata.Interface
	// Interface to get PodInterface.
	podsGetter v1clientset.PodsGetter
	// Cache of what operations are not supported on each group version resource.
	opCache             *operationNotSupportedCache
	discoverResourcesFn func() ([]*metav1.APIResourceList, error)
	// The finalizer token that should be removed from the namespace
	// when all resources in that namespace have been deleted.
	finalizerToken v1.FinalizerName
}

6.4.2.namespacedResourcesDeleter实现了Delete方法

  • 删除ns之前,会先删除ns中所有的资源,核心方法即为 deleteAllContent
// Delete deletes all resources in the given namespace.
// Before deleting resources:
//   - It ensures that deletion timestamp is set on the
//     namespace (does nothing if deletion timestamp is missing).
//   - Verifies that the namespace is in the "terminating" phase
//     (updates the namespace phase if it is not yet marked terminating)
//
// After deleting the resources:
// * It removes finalizer token from the given namespace.
//
// Returns an error if any of those steps fail.
// Returns ResourcesRemainingError if it deleted some resources but needs
// to wait for them to go away.
// Caller is expected to keep calling this until it succeeds.
func (d *namespacedResourcesDeleter) Delete(ctx context.Context, nsName string) error {
	// Multiple controllers may edit a namespace during termination
	// first get the latest state of the namespace before proceeding
	// if the namespace was deleted already, don't do anything
	namespace, err := d.nsClient.Get(ctx, nsName, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}
	if namespace.DeletionTimestamp == nil {
		return nil
	}

	klog.FromContext(ctx).V(5).Info("Namespace controller - syncNamespace", "namespace", namespace.Name, "finalizerToken", d.finalizerToken)

	// ensure that the status is up to date on the namespace
	// if we get a not found error, we assume the namespace is truly gone
	namespace, err = d.retryOnConflictError(ctx, namespace, d.updateNamespaceStatusFunc)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// the latest view of the namespace asserts that namespace is no longer deleting..
	if namespace.DeletionTimestamp.IsZero() {
		return nil
	}

	// return if it is already finalized.
	if finalized(namespace) {
		return nil
	}

	// there may still be content for us to remove
	estimate, err := d.deleteAllContent(ctx, namespace)
	if err != nil {
		return err
	}
	if estimate > 0 {
		return &ResourcesRemainingError{estimate}
	}

	// we have removed content, so mark it finalized by us
	_, err = d.retryOnConflictError(ctx, namespace, d.finalizeNamespace)
	if err != nil {
		// in normal practice, this should not be possible, but if a deployment is running
		// two controllers to do namespace deletion that share a common finalizer token it's
		// possible that a not found could occur since the other controller would have finished the delete.
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}
	return nil
}

// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
// It returns an estimate of the time remaining before the remaining resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func (d *namespacedResourcesDeleter) deleteAllContent(ctx context.Context, ns *v1.Namespace) (int64, error) {
	namespace := ns.Name
	namespaceDeletedAt := *ns.DeletionTimestamp
	var errs []error
	conditionUpdater := namespaceConditionUpdater{}
	estimate := int64(0)
	logger := klog.FromContext(ctx)
	logger.V(4).Info("namespace controller - deleteAllContent", "namespace", namespace)

	resources, err := d.discoverResourcesFn()
	if err != nil {
		// discovery errors are not fatal.  We often have some set of resources we can operate against even if we don't have a complete list
		errs = append(errs, err)
		conditionUpdater.ProcessDiscoverResourcesErr(err)
	}
	// TODO(sttts): get rid of opCache and pass the verbs (especially "deletecollection") down into the deleter
	deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)
	groupVersionResources, err := discovery.GroupVersionResources(deletableResources)
	if err != nil {
		// discovery errors are not fatal.  We often have some set of resources we can operate against even if we don't have a complete list
		errs = append(errs, err)
		conditionUpdater.ProcessGroupVersionErr(err)
	}

	numRemainingTotals := allGVRDeletionMetadata{
		gvrToNumRemaining:        map[schema.GroupVersionResource]int{},
		finalizersToNumRemaining: map[string]int{},
	}
	for gvr := range groupVersionResources {
		gvrDeletionMetadata, err := d.deleteAllContentForGroupVersionResource(ctx, gvr, namespace, namespaceDeletedAt)
		if err != nil {
			// If there is an error, hold on to it but proceed with all the remaining
			// groupVersionResources.
			errs = append(errs, err)
			conditionUpdater.ProcessDeleteContentErr(err)
		}
		if gvrDeletionMetadata.finalizerEstimateSeconds > estimate {
			estimate = gvrDeletionMetadata.finalizerEstimateSeconds
		}
		if gvrDeletionMetadata.numRemaining > 0 {
			numRemainingTotals.gvrToNumRemaining[gvr] = gvrDeletionMetadata.numRemaining
			for finalizer, numRemaining := range gvrDeletionMetadata.finalizersToNumRemaining {
				if numRemaining == 0 {
					continue
				}
				numRemainingTotals.finalizersToNumRemaining[finalizer] = numRemainingTotals.finalizersToNumRemaining[finalizer] + numRemaining
			}
		}
	}
	conditionUpdater.ProcessContentTotals(numRemainingTotals)

	// we always want to update the conditions because if we have set a condition to "it worked" after it was previously, "it didn't work",
	// we need to reflect that information.  Recall that additional finalizers can be set on namespaces, so this finalizer may clear itself and
	// NOT remove the resource instance.
	if hasChanged := conditionUpdater.Update(ns); hasChanged {
		if _, err = d.nsClient.UpdateStatus(ctx, ns, metav1.UpdateOptions{}); err != nil {
			utilruntime.HandleError(fmt.Errorf("couldn't update status condition for namespace %q: %v", namespace, err))
		}
	}

	// if len(errs)==0, NewAggregate returns nil.
	err = utilerrors.NewAggregate(errs)
	logger.V(4).Info("namespace controller - deleteAllContent", "namespace", namespace, "estimate", estimate, "err", err)
	return estimate, err
}

7.Namespace Controller 启动

7.1.Namespace Controller 启动核心方法

kubernetes/pkg/controller/namespace/namespace_controller.go

// Run starts observing the system with the specified number of workers.
func (nm *NamespaceController) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()
	defer nm.queue.ShutDown()
	logger := klog.FromContext(ctx)
	logger.Info("Starting namespace controller")
	defer logger.Info("Shutting down namespace controller")

	if !cache.WaitForNamedCacheSync("namespace", ctx.Done(), nm.listerSynced) {
		return
	}

	logger.V(5).Info("Starting workers of namespace controller")
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, nm.worker, time.Second)
	}
	<-ctx.Done()
}

  • 在namespace_controller.go中有一个Run方法,是 NamespaceController 的核心启动逻辑,用于启动命名空间控制器并管理其生命周期
    • 接受一个上下文 (ctx) 和工作线程数 (workers) 作为参数
    • 负责初始化控制器、启动工作线程并监听系统的终止信号
  • 核心逻辑
    • 首先,函数通过 defer 语句设置了两个清理操作:utilruntime.HandleCrash() 和 nm.queue.ShutDown()。HandleCrash 用于捕获运行时的恐慌(panic),防止程序崩溃,同时记录相关信息。ShutDown 则用于在控制器停止时关闭工作队列,确保资源被正确释放。
    • 接着,函数通过 klog.FromContext(ctx) 获取日志记录器,并记录控制器启动的日志信息。函数的最后一个 defer 语句确保在控制器关闭时记录一条日志,表明控制器已停止。
    • 随后,函数调用 cache.WaitForNamedCacheSync 方法等待缓存同步完成。nm.listerSynced 是一个检查缓存是否同步的函数。如果缓存未能成功同步,函数会立即返回。这一步非常重要,因为控制器依赖缓存中的数据来处理命名空间相关的事件,未同步的缓存可能导致错误的行为。
    • 在缓存同步完成后,函数记录一条日志,表明工作线程即将启动。然后,它启动指定数量的工作线程(由 workers 参数决定)。每个线程通过 goroutine 并发运行,并调用 wait.UntilWithContext 方法。wait.UntilWithContext 会以固定的时间间隔(这里是 1 秒)调用 nm.worker 方法,直到上下文被取消。
    • 最后,函数通过 <-ctx.Done() 阻塞,等待上下文的取消信号。当上下文被取消时,控制器会停止所有工作线程并执行清理操作。

7.2.kube-controller-manager为每个Controller都声明了描述器

kubernetes/cmd/kube-controller-manager/app/core.go

func newNamespaceControllerDescriptor() *ControllerDescriptor {
	return &ControllerDescriptor{
		name:     names.NamespaceController,
		aliases:  []string{"namespace"},
		initFunc: startNamespaceController,
	}
}

func startNamespaceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
	// the namespace cleanup controller is very chatty.  It makes lots of discovery calls and then it makes lots of delete calls
	// the ratelimiter negatively affects its speed.  Deleting 100 total items in a namespace (that's only a few of each resource
	// including events), takes ~10 seconds by default.
	nsKubeconfig := controllerContext.ClientBuilder.ConfigOrDie("namespace-controller")
	nsKubeconfig.QPS *= 20
	nsKubeconfig.Burst *= 100
	namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
	return startModifiedNamespaceController(ctx, controllerContext, namespaceKubeClient, nsKubeconfig)
}

func startModifiedNamespaceController(ctx context.Context, controllerContext ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) {

	metadataClient, err := metadata.NewForConfig(nsKubeconfig)
	if err != nil {
		return nil, true, err
	}

	discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources

	namespaceController := namespacecontroller.NewNamespaceController(
		ctx,
		namespaceKubeClient,
		metadataClient,
		discoverResourcesFn,
		controllerContext.InformerFactory.Core().V1().Namespaces(),
		controllerContext.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,
		v1.FinalizerKubernetes,
	)
	go namespaceController.Run(ctx, int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs))

	return nil, true, nil
}
  • kube-controller-manager为每个controller都声明了 描述器ControllerDescriptor,记录controller的名称、别名、启动方法等
  • newNamespaceControllerDescriptor 函数返回一个 ControllerDescriptor 对象
    • 用于描述命名空间控制器的基本信息。
    • name 字段指定了控制器的名称为 NamespaceController
    • aliases 字段提供了控制器的别名(如 "namespace"
    • initFunc 字段指向了控制器的初始化函数 startNamespaceController
    • 这种描述符模式便于在控制器框架中注册和管理多个控制器
  • startNamespaceController 函数负责初始化命名空间控制器的客户端配置并调用startModifiedNamespaceController
    • 由于命名空间清理控制器需要频繁地进行资源发现和删除操作,因此函数对默认的客户端配置进行了调整,将 QPS(每秒请求数)放大了 20 倍,将 Burst(突发请求数)放大了 100 倍。
    • 这种调整可以显著提高控制器的操作速度,特别是在需要删除大量资源时。随后,函数通过 clientset.NewForConfigOrDie 创建了一个 Kubernetes 客户端,并将其传递给下一个函数
  • startModifiedNamespaceController 函数完成了命名空间控制器的核心初始化逻辑。
    • 使用 metadata.NewForConfig 创建了一个元数据客户端,用于处理资源的元信息。如果创建失败,函数会返回错误。
    • 定义了一个 discoverResourcesFn 函数,用于动态发现集群中支持的命名空间资源。
    • 函数调用 namespacecontroller.NewNamespaceController 创建了一个命名空间控制器实例,并将必要的依赖注入到控制器中,包括 Kubernetes 客户端、元数据客户端、资源发现函数、命名空间的共享信息工厂等。
    • 最后控制器通过 Run 方法启动,指定的工作线程数由 controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs 决定。
      • controllerContext.ComponentConfig 类型为 kubectrlmgrconfig.KubeControllerManagerConfiguration,其实就是kube-controller-manager的配置,其中包含NamespaceController的配置,可以指定worker数量

7.3.注册Namespace Controller描述器

kubernetes/cmd/kube-controller-manager/app/controllermanager.go

  • controllermanager会在启动时,将注册的所有Controller都启动起来
// NewControllerDescriptors is a public map of named controller groups (you can start more than one in an init func)
// paired to their ControllerDescriptor wrapper object that includes InitFunc.
// This allows for structured downstream composition and subdivision.
func NewControllerDescriptors() map[string]*ControllerDescriptor {
	controllers := map[string]*ControllerDescriptor{}
	aliases := sets.NewString()

	// All the controllers must fulfil common constraints, or else we will explode.
	register := func(controllerDesc *ControllerDescriptor) {
		if controllerDesc == nil {
			panic("received nil controller for a registration")
		}
		name := controllerDesc.Name()
		if len(name) == 0 {
			panic("received controller without a name for a registration")
		}
		if _, found := controllers[name]; found {
			panic(fmt.Sprintf("controller name %q was registered twice", name))
		}
		if controllerDesc.GetInitFunc() == nil {
			panic(fmt.Sprintf("controller %q does not have an init function", name))
		}

		for _, alias := range controllerDesc.GetAliases() {
			if aliases.Has(alias) {
				panic(fmt.Sprintf("controller %q has a duplicate alias %q", name, alias))
			}
			aliases.Insert(alias)
		}

		controllers[name] = controllerDesc
	}

	// First add "special" controllers that aren't initialized normally. These controllers cannot be initialized
	// in the main controller loop initialization, so we add them here only for the metadata and duplication detection.
	// app.ControllerDescriptor#RequiresSpecialHandling should return true for such controllers
	// The only known special case is the ServiceAccountTokenController which *must* be started
	// first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding new
	// special controllers.
	register(newServiceAccountTokenControllerDescriptor(nil))

	...
	register(newNamespaceControllerDescriptor())
	register(newServiceAccountControllerDescriptor())
	...

	for _, alias := range aliases.UnsortedList() {
		if _, ok := controllers[alias]; ok {
			panic(fmt.Sprintf("alias %q conflicts with a controller name", alias))
		}
	}

	return controllers
}

8.总结

通过上述对NamespaceController的分析,我们应该对kubernetes内置controller的编写过程很清晰了

  • 首先学习了如何声明 资源的struct,并添加一些code generation tag
    • 通过code-generation生成器,生成深拷贝、版本管理、swagger、proto/pb序列化、informer、clientset等代码
  • 还深入学习了informer机制 的代码实现,结合Informer框架的理论知识,有了更深的理解
    • informer机制中,每种资源都会有自己的 InfromerInterface,包含Infromer()、Lister()两个方法,分别用于获取Informer对象、Lister对象
      • Informer对象 负责watch+交互 环境资源
      • Lister对象 相当于Indexer,负责与本地缓存交互,包括 List()、Get()
  • 分析了client-go把所有资源client整合在一起的过程
    • Clientset包含了所有GV的Client,每个GV的Client 都包含自己gv下所有资源的Client
    • 每个gvr的client包含了 Create、List、Update、Apply 等方法,用于操作环境资源
  • 详细讲解了namespace controller的代码
    • 从NewNamespaceController开始,讲解了eventhandler设置、workqueue的配置、key的入队、worker的处理逻辑、删除ns前其他资源的回收
    • 最后还讲解了kube-controller-manager对内置controller的启动过程
  • 通过上述学习,预期达到的效果
    • 对informer、client-go内在逻辑更清晰,后续使用时更清楚
    • 具备对 kubernetes 其他的内置controller学习的能力。除了controller 核心代码的不同,其他的内容应该都大同小异,也正因如此,其他的代码才会在后面被封装成自动生成的代码,有了kubebuilder、controller-runtime等控制器框架
    • 为 CRD Controller 编写提供范本

网站公告

今日签到

点亮在社区的每一天
去签到