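- Kubernetes learning series quick links
- Kubernetes Architecture Principles and Object Design (Part 1)
- Kubernetes Architecture Principles and Object Design (Part 2)
- Kubernetes Architecture Principles and Object Design (Part 3)
- Kubernetes Control Plane Components: etcd (Part 1)
- Kubernetes Control Plane Components: etcd (Part 2)
- Kubernetes Control Plane Components: API Server in Depth (Part 1)
- Kubernetes Control Plane Components: API Server in Depth (Part 2)
- Kubernetes Control Plane Components: Scheduler (Part 1)
- Kubernetes Control Plane Components: Scheduler (Part 2)
- Kubernetes Control Plane Components: Controller Manager in Depth
- Kubernetes Control Plane Components: Controller Manager, Built-in Controllers in Depth
- Kubernetes Control Plane Components: Controller Manager, a Complete Guide to the NamespaceController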
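This article is part of the Controller Manager series on Kubernetes control plane components. Using the Namespace Controller as the example, it walks through how a built-in Kubernetes controller is written: the struct declarations behind a built-in controller, the principles and code of the Informer mechanism, how the client-go Clientset brings the per-resource clients together, and how kube-controller-manager starts built-in controllers. It also analyzes the Namespace Controller code in full.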
1.Namespace Struct
kubernetes/staging/src/k8s.io/api/core/v1/types.go
- First, the Namespace struct is defined, including its spec, status, and related types
// FinalizerName is the name identifying a finalizer during namespace lifecycle.
type FinalizerName string
// These are internal finalizer values to Kubernetes, must be qualified name unless defined here or
// in metav1.
const (
FinalizerKubernetes FinalizerName = "kubernetes"
)
// NamespaceSpec describes the attributes on a Namespace.
type NamespaceSpec struct {
// Finalizers is an opaque list of values that must be empty to permanently remove object from storage.
// More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/
// +optional
// +listType=atomic
Finalizers []FinalizerName `json:"finalizers,omitempty" protobuf:"bytes,1,rep,name=finalizers,casttype=FinalizerName"`
}
// NamespaceStatus is information about the current status of a Namespace.
type NamespaceStatus struct {
// Phase is the current lifecycle phase of the namespace.
// More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/
// +optional
Phase NamespacePhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=NamespacePhase"`
// Represents the latest available observations of a namespace's current state.
// +optional
// +patchMergeKey=type
// +patchStrategy=merge
// +listType=map
// +listMapKey=type
Conditions []NamespaceCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,2,rep,name=conditions"`
}
// +enum
type NamespacePhase string
// These are the valid phases of a namespace.
const (
// NamespaceActive means the namespace is available for use in the system
NamespaceActive NamespacePhase = "Active"
// NamespaceTerminating means the namespace is undergoing graceful termination
NamespaceTerminating NamespacePhase = "Terminating"
)
const (
// NamespaceTerminatingCause is returned as a defaults.cause item when a change is
// forbidden due to the namespace being terminated.
NamespaceTerminatingCause metav1.CauseType = "NamespaceTerminating"
)
type NamespaceConditionType string
// These are built-in conditions of a namespace.
const (
// NamespaceDeletionDiscoveryFailure contains information about namespace deleter errors during resource discovery.
NamespaceDeletionDiscoveryFailure NamespaceConditionType = "NamespaceDeletionDiscoveryFailure"
// NamespaceDeletionContentFailure contains information about namespace deleter errors during deletion of resources.
NamespaceDeletionContentFailure NamespaceConditionType = "NamespaceDeletionContentFailure"
// NamespaceDeletionGVParsingFailure contains information about namespace deleter errors parsing GV for legacy types.
NamespaceDeletionGVParsingFailure NamespaceConditionType = "NamespaceDeletionGroupVersionParsingFailure"
// NamespaceContentRemaining contains information about resources remaining in a namespace.
NamespaceContentRemaining NamespaceConditionType = "NamespaceContentRemaining"
// NamespaceFinalizersRemaining contains information about which finalizers are on resources remaining in a namespace.
NamespaceFinalizersRemaining NamespaceConditionType = "NamespaceFinalizersRemaining"
)
// NamespaceCondition contains details about state of namespace.
type NamespaceCondition struct {
// Type of namespace controller condition.
Type NamespaceConditionType `json:"type" protobuf:"bytes,1,opt,name=type,casttype=NamespaceConditionType"`
// Status of the condition, one of True, False, Unknown.
Status ConditionStatus `json:"status" protobuf:"bytes,2,opt,name=status,casttype=ConditionStatus"`
// Last time the condition transitioned from one status to another.
// +optional
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty" protobuf:"bytes,4,opt,name=lastTransitionTime"`
// Unique, one-word, CamelCase reason for the condition's last transition.
// +optional
Reason string `json:"reason,omitempty" protobuf:"bytes,5,opt,name=reason"`
// Human-readable message indicating details about last transition.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,6,opt,name=message"`
}
// +genclient
// +genclient:nonNamespaced
// +genclient:skipVerbs=deleteCollection
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:prerelease-lifecycle-gen:introduced=1.0
// Namespace provides a scope for Names.
// Use of multiple namespaces is optional.
type Namespace struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Spec defines the behavior of the Namespace.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
// +optional
Spec NamespaceSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
// Status describes the current status of a Namespace.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
// +optional
Status NamespaceStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:prerelease-lifecycle-gen:introduced=1.0
// NamespaceList is a list of Namespaces.
type NamespaceList struct {
metav1.TypeMeta `json:",inline"`
// Standard list metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
// +optional
metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Items is the list of Namespace objects in the list.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
Items []Namespace `json:"items" protobuf:"bytes,2,rep,name=items"`
}
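The Finalizers and Phase fields above drive namespace deletion. The following is a minimal sketch of that lifecycle, assuming simplified stand-in types (the `Deleting` flag stands in for `metadata.deletionTimestamp`, and the helper functions are hypothetical, not part of Kubernetes). It only illustrates the rule stated in the field comment: the object can be permanently removed from storage once the finalizer list is empty.

```go
package main

import "fmt"

// Simplified stand-ins for the API types shown above (not the real k8s.io/api types).
type FinalizerName string

const FinalizerKubernetes FinalizerName = "kubernetes"

type NamespacePhase string

const (
	NamespaceActive      NamespacePhase = "Active"
	NamespaceTerminating NamespacePhase = "Terminating"
)

type Namespace struct {
	Name       string
	Deleting   bool // assumption: stands in for metadata.deletionTimestamp being set
	Finalizers []FinalizerName
	Phase      NamespacePhase
}

// requestDelete models a delete request: the object is only marked for deletion.
func requestDelete(ns *Namespace) {
	ns.Deleting = true
	ns.Phase = NamespaceTerminating
}

// removeFinalizer drops one finalizer, as a controller would after its cleanup is done.
func removeFinalizer(ns *Namespace, name FinalizerName) {
	kept := ns.Finalizers[:0]
	for _, f := range ns.Finalizers {
		if f != name {
			kept = append(kept, f)
		}
	}
	ns.Finalizers = kept
}

// canRemoveFromStorage reports whether the object may finally be deleted.
func canRemoveFromStorage(ns *Namespace) bool {
	return ns.Deleting && len(ns.Finalizers) == 0
}

func main() {
	ns := &Namespace{
		Name:       "demo",
		Finalizers: []FinalizerName{FinalizerKubernetes},
		Phase:      NamespaceActive,
	}

	requestDelete(ns)
	fmt.Println(ns.Phase, canRemoveFromStorage(ns)) // Terminating false

	removeFinalizer(ns, FinalizerKubernetes)
	fmt.Println(ns.Phase, canRemoveFromStorage(ns)) // Terminating true
}
```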
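- Comment tags used in the code:
Comment tag | Purpose | Generated content / behavior |
---|---|---|
`// +enum` | Marks the type as an enum | The generated OpenAPI schema lists the allowed values, e.g. `Active` and `Terminating` for `NamespacePhase` |
`// +genclient` | Declares that client code must be generated | Generates the client methods for the resource (such as `Create`, `Update`, `Delete`) |
`// +genclient:nonNamespaced` | Marks the resource as cluster-scoped (not namespaced) | The generated client methods take no namespace parameter (as for the `Namespace` resource) |
`// +genclient:skipVerbs=deleteCollection` | Skips client generation for the listed verbs | No `DeleteCollection` method is generated (bulk deletion is not exposed) |
`// +k8s:deepcopy-gen:interfaces=...` | Enables deep copy and implements the named interface | Generates `DeepCopyObject()` so the type satisfies `runtime.Object`, next to `DeepCopy()` and `DeepCopyInto()` |
`// +k8s:prerelease-lifecycle-gen:introduced=1.0` | Records API lifecycle information | Generates `APILifecycleIntroduced()`, which here returns `1, 0` (introduced in release 1.0) |
`// +optional` | Marks the field as optional | The field is not required in the generated OpenAPI schema; omitting empty values in JSON comes from `omitempty` in the struct tag, not from this tag |
`// +listType=atomic` | Treats the list as one atomic value | The list is replaced as a whole on update (not merged) |
`// +listType=map` | Treats the list as a map keyed by a field | List entries are merged by the key named in `listMapKey` (such as `type`) |
`// +listMapKey=type` | Names the key field used for list merging | Entries are matched on the `type` field (used for fields such as `Conditions`) |
`// +patchMergeKey=type` | Names the merge key for patch operations | Uses the `type` field as the merge key (together with `patchStrategy=merge`) |
`// +patchStrategy=merge` | Selects the merge strategy for patch operations | On update, entries are merged by `patchMergeKey` instead of being overwritten |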
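A small sketch of the JSON side of optional fields, assuming simplified copies of the structs that keep only the `json` tags (the `encode` helper is hypothetical). It uses the standard `encoding/json` package to show that dropping empty values is the job of `omitempty`, and that `omitempty` has no effect on a struct-valued field such as `Spec`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified copies of the fields shown earlier; only the json tags matter here.
type NamespaceSpec struct {
	Finalizers []string `json:"finalizers,omitempty"`
}

type Namespace struct {
	Name string        `json:"name,omitempty"`
	Spec NamespaceSpec `json:"spec,omitempty"`
}

// encode marshals a Namespace and returns the JSON text.
func encode(ns Namespace) string {
	b, err := json.Marshal(ns)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	// The nil Finalizers slice is omitted, but the empty Spec struct is still written.
	fmt.Println(encode(Namespace{Name: "demo"}))
	// prints {"name":"demo","spec":{}}

	fmt.Println(encode(Namespace{
		Name: "demo",
		Spec: NamespaceSpec{Finalizers: []string{"kubernetes"}},
	}))
	// prints {"name":"demo","spec":{"finalizers":["kubernetes"]}}
}
```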
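Classification
- Code generation: `// +genclient`, `// +enum`, `// +k8s:deepcopy-gen` and similar tags control the code generators, for example client methods, deep-copy methods, and enum information in the OpenAPI schema.
- Field behavior: `// +optional`, `// +listType=atomic`, `// +patchStrategy=merge` and similar tags define how fields are validated, updated, and merged. For example, the `Conditions` field is merged by type through `listType=map` and `listMapKey=type`.
- API lifecycle: `// +k8s:prerelease-lifecycle-gen` records lifecycle information for an API type, such as the release in which it was introduced. For example, `Namespace` and `NamespaceList` are both marked as introduced in 1.0.
Core scenarios
- Finalizer mechanism: the `Finalizers` field in `NamespaceSpec` is tagged `// +listType=atomic`, so an update replaces the finalizer list as a whole instead of merging it entry by entry.
- Status condition management: `NamespaceStatus.Conditions` uses `// +listType=map` and `// +listMapKey=type` to merge status information by condition type (such as `NamespaceDeletionContentFailure`).
- Deletion protection: the `FinalizerKubernetes` constant names the built-in finalizer. It prevents a namespace from being removed directly; removal has to wait until the resources in the namespace have been cleaned up.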
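To make "merge by condition type" concrete, here is a minimal sketch assuming a simplified `Condition` type and a hypothetical `mergeByType` helper. For brevity it replaces a matching entry as a whole, while a real merge works field by field; the point is only that entries are matched on `Type` instead of the list being overwritten.

```go
package main

import "fmt"

// Condition is a simplified stand-in for NamespaceCondition.
type Condition struct {
	Type    string
	Status  string
	Message string
}

// mergeByType applies patch entries onto existing ones using Type as the key:
// an entry with a known Type replaces the old one in place, a new Type is appended.
// The input slices are not modified.
func mergeByType(existing, patch []Condition) []Condition {
	merged := make([]Condition, len(existing))
	copy(merged, existing)

	index := make(map[string]int, len(merged))
	for i, c := range merged {
		index[c.Type] = i
	}
	for _, c := range patch {
		if i, ok := index[c.Type]; ok {
			merged[i] = c
			continue
		}
		index[c.Type] = len(merged)
		merged = append(merged, c)
	}
	return merged
}

func main() {
	existing := []Condition{
		{Type: "NamespaceDeletionContentFailure", Status: "True", Message: "delete failed"},
	}
	patch := []Condition{
		{Type: "NamespaceDeletionContentFailure", Status: "False"},
		{Type: "NamespaceContentRemaining", Status: "True", Message: "pods remain"},
	}
	for _, c := range mergeByType(existing, patch) {
		fmt.Printf("%s=%s\n", c.Type, c.Status)
	}
}
```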
2.Namespace register
kubernetes/staging/src/k8s.io/api/core/v1/register.go
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// GroupName is the group name use in this package
const GroupName = ""
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
// We only register manually written functions here. The registration of the
// generated functions takes place in the generated files. The separation
// makes the code compile even when the generated files are missing.
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to the given scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
...
&Namespace{},
&NamespaceList{},
...
)
// Add common types
scheme.AddKnownTypes(SchemeGroupVersion, &metav1.Status{})
// Add the watch version that applies
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
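What `AddKnownTypes` achieves can be sketched with a toy registry. This is an illustration under assumptions, not the real `runtime.Scheme`: the `Scheme`, `GroupVersionKind`, and `New` definitions below are simplified stand-ins written for this example. It shows the core idea of registration: a Go type is stored under its group, version, and kind (the Go type name), so that a new object can later be created from that key.

```go
package main

import (
	"fmt"
	"reflect"
)

// GroupVersionKind identifies a type; the core group uses an empty Group.
type GroupVersionKind struct {
	Group, Version, Kind string
}

// Scheme is a toy registry mapping GroupVersionKind to a Go type.
type Scheme struct {
	types map[GroupVersionKind]reflect.Type
}

func NewScheme() *Scheme {
	return &Scheme{types: map[GroupVersionKind]reflect.Type{}}
}

// AddKnownTypes registers each object, using its Go type name as the Kind.
func (s *Scheme) AddKnownTypes(group, version string, objs ...interface{}) {
	for _, obj := range objs {
		typ := reflect.TypeOf(obj)
		if typ.Kind() == reflect.Ptr {
			typ = typ.Elem()
		}
		s.types[GroupVersionKind{Group: group, Version: version, Kind: typ.Name()}] = typ
	}
}

// New returns a pointer to a fresh zero value of the registered type.
func (s *Scheme) New(gvk GroupVersionKind) (interface{}, error) {
	typ, ok := s.types[gvk]
	if !ok {
		return nil, fmt.Errorf("no kind %q registered for %q/%s", gvk.Kind, gvk.Group, gvk.Version)
	}
	return reflect.New(typ).Interface(), nil
}

// Placeholder types standing in for the real API structs.
type Namespace struct{ Name string }
type NamespaceList struct{ Items []Namespace }

func main() {
	s := NewScheme()
	s.AddKnownTypes("", "v1", &Namespace{}, &NamespaceList{})

	obj, err := s.New(GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"})
	fmt.Printf("%T %v\n", obj, err) // *main.Namespace <nil>
}
```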
3.Code generated for Namespace by the code generators
3.1.zz_generated.deepcopy.go
kubernetes/staging/src/k8s.io/api/core/v1/zz_generated.deepcopy.go
- The zz_generated.deepcopy code is maintained in the api project
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Namespace) DeepCopyInto(out *Namespace) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
in.Status.DeepCopyInto(&out.Status)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Namespace.
func (in *Namespace) DeepCopy() *Namespace {
if in == nil {
return nil
}
out := new(Namespace)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Namespace) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NamespaceCondition) DeepCopyInto(out *NamespaceCondition) {
*out = *in
in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NamespaceCondition.
func (in *NamespaceCondition) DeepCopy() *NamespaceCondition {
if in == nil {
return nil
}
out := new(NamespaceCondition)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NamespaceList) DeepCopyInto(out *NamespaceList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]Namespace, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NamespaceList.
func (in *NamespaceList) DeepCopy() *NamespaceList {
if in == nil {
return nil
}
out := new(NamespaceList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *NamespaceList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NamespaceSpec) DeepCopyInto(out *NamespaceSpec) {
*out = *in
if in.Finalizers != nil {
in, out := &in.Finalizers, &out.Finalizers
*out = make([]FinalizerName, len(*in))
copy(*out, *in)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NamespaceSpec.
func (in *NamespaceSpec) DeepCopy() *NamespaceSpec {
if in == nil {
return nil
}
out := new(NamespaceSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NamespaceStatus) DeepCopyInto(out *NamespaceStatus) {
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]NamespaceCondition, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NamespaceStatus.
func (in *NamespaceStatus) DeepCopy() *NamespaceStatus {
if in == nil {
return nil
}
out := new(NamespaceStatus)
in.DeepCopyInto(out)
return out
}
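Why the generated code allocates new slices instead of relying on `*out = *in` can be seen in a small sketch. It assumes a reduced `NamespaceSpec` with a plain `[]string` field; the methods mirror the generated ones above. A plain struct assignment copies only the slice header, so both values share one backing array.

```go
package main

import "fmt"

// NamespaceSpec is a reduced version of the API type, enough to show the effect.
type NamespaceSpec struct {
	Finalizers []string
}

// DeepCopyInto mirrors the generated method: copy the struct, then give
// the slice its own backing array.
func (in *NamespaceSpec) DeepCopyInto(out *NamespaceSpec) {
	*out = *in
	if in.Finalizers != nil {
		out.Finalizers = make([]string, len(in.Finalizers))
		copy(out.Finalizers, in.Finalizers)
	}
}

// DeepCopy returns an independent copy, or nil for a nil receiver.
func (in *NamespaceSpec) DeepCopy() *NamespaceSpec {
	if in == nil {
		return nil
	}
	out := new(NamespaceSpec)
	in.DeepCopyInto(out)
	return out
}

func main() {
	cached := &NamespaceSpec{Finalizers: []string{"kubernetes"}}

	shallow := *cached             // shares the slice's backing array
	shallow.Finalizers[0] = "oops" // also changes cached.Finalizers[0]
	fmt.Println(cached.Finalizers[0]) // oops

	cached.Finalizers[0] = "kubernetes"
	deep := cached.DeepCopy()
	deep.Finalizers[0] = "changed"    // cached is unaffected
	fmt.Println(cached.Finalizers[0]) // kubernetes
}
```

This is the reason objects read from a shared informer cache are deep-copied before being modified.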
3.2.zz_generated.prerelease-lifecycle.go
kubernetes/staging/src/k8s.io/api/core/v1/zz_generated.prerelease-lifecycle.go
- The zz_generated.prerelease-lifecycle code is maintained in the api project
// APILifecycleIntroduced is an autogenerated function, returning the release in which the API struct was introduced as int versions of major and minor for comparison.
// It is controlled by "k8s:prerelease-lifecycle-gen:introduced" tags in types.go.
func (in *Namespace) APILifecycleIntroduced() (major, minor int) {
return 1, 0
}
// APILifecycleIntroduced is an autogenerated function, returning the release in which the API struct was introduced as int versions of major and minor for comparison.
// It is controlled by "k8s:prerelease-lifecycle-gen:introduced" tags in types.go.
func (in *NamespaceList) APILifecycleIntroduced() (major, minor int) {
return 1, 0
}
3.3.types_swagger_doc_generated.go
kubernetes/staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go
- The types_swagger_doc_generated code is maintained in the api project
var map_Namespace = map[string]string{
"": "Namespace provides a scope for Names. Use of multiple namespaces is optional.",
"metadata": "Standard object's metadata. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata",
"spec": "Spec defines the behavior of the Namespace. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status",
"status": "Status describes the current status of a Namespace. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status",
}
func (Namespace) SwaggerDoc() map[string]string {
return map_Namespace
}
var map_NamespaceCondition = map[string]string{
"": "NamespaceCondition contains details about state of namespace.",
"type": "Type of namespace controller condition.",
"status": "Status of the condition, one of True, False, Unknown.",
"lastTransitionTime": "Last time the condition transitioned from one status to another.",
"reason": "Unique, one-word, CamelCase reason for the condition's last transition.",
"message": "Human-readable message indicating details about last transition.",
}
func (NamespaceCondition) SwaggerDoc() map[string]string {
return map_NamespaceCondition
}
var map_NamespaceList = map[string]string{
"": "NamespaceList is a list of Namespaces.",
"metadata": "Standard list metadata. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds",
"items": "Items is the list of Namespace objects in the list. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/",
}
func (NamespaceList) SwaggerDoc() map[string]string {
return map_NamespaceList
}
var map_NamespaceSpec = map[string]string{
"": "NamespaceSpec describes the attributes on a Namespace.",
"finalizers": "Finalizers is an opaque list of values that must be empty to permanently remove object from storage. More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/",
}
func (NamespaceSpec) SwaggerDoc() map[string]string {
return map_NamespaceSpec
}
var map_NamespaceStatus = map[string]string{
"": "NamespaceStatus is information about the current status of a Namespace.",
"phase": "Phase is the current lifecycle phase of the namespace. More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/",
"conditions": "Represents the latest available observations of a namespace's current state.",
}
func (NamespaceStatus) SwaggerDoc() map[string]string {
return map_NamespaceStatus
}
3.4.generated.proto
kubernetes/staging/src/k8s.io/api/core/v1/generated.proto
- The generated.proto code is maintained in the api project
// Namespace provides a scope for Names.
// Use of multiple namespaces is optional.
message Namespace {
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// +optional
optional .k8s.io.apimachinery.pkg.apis.meta.v1.ObjectMeta metadata = 1;
// Spec defines the behavior of the Namespace.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
// +optional
optional NamespaceSpec spec = 2;
// Status describes the current status of a Namespace.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
// +optional
optional NamespaceStatus status = 3;
}
// NamespaceCondition contains details about state of namespace.
message NamespaceCondition {
// Type of namespace controller condition.
optional string type = 1;
// Status of the condition, one of True, False, Unknown.
optional string status = 2;
// Last time the condition transitioned from one status to another.
// +optional
optional .k8s.io.apimachinery.pkg.apis.meta.v1.Time lastTransitionTime = 4;
// Unique, one-word, CamelCase reason for the condition's last transition.
// +optional
optional string reason = 5;
// Human-readable message indicating details about last transition.
// +optional
optional string message = 6;
}
// NamespaceList is a list of Namespaces.
message NamespaceList {
// Standard list metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
// +optional
optional .k8s.io.apimachinery.pkg.apis.meta.v1.ListMeta metadata = 1;
// Items is the list of Namespace objects in the list.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
repeated Namespace items = 2;
}
// NamespaceSpec describes the attributes on a Namespace.
message NamespaceSpec {
// Finalizers is an opaque list of values that must be empty to permanently remove object from storage.
// More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/
// +optional
// +listType=atomic
repeated string finalizers = 1;
}
// NamespaceStatus is information about the current status of a Namespace.
message NamespaceStatus {
// Phase is the current lifecycle phase of the namespace.
// More info: https://kubernetes.io/docs/tasks/administer-cluster/namespaces/
// +optional
optional string phase = 1;
// Represents the latest available observations of a namespace's current state.
// +optional
// +patchMergeKey=type
// +patchStrategy=merge
// +listType=map
// +listMapKey=type
repeated NamespaceCondition conditions = 2;
}
3.5.generated.pb.go
kubernetes/staging/src/k8s.io/api/core/v1/generated.pb.go
- The generated.pb code is maintained in the api project
func (m *Namespace) Reset() { *m = Namespace{} }
func (*Namespace) ProtoMessage() {}
func (*Namespace) Descriptor() ([]byte, []int) {
return fileDescriptor_6c07b07c062484ab, []int{91}
}
func (m *Namespace) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Namespace) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
func (m *Namespace) XXX_Merge(src proto.Message) {
xxx_messageInfo_Namespace.Merge(m, src)
}
func (m *Namespace) XXX_Size() int {
return m.Size()
}
func (m *Namespace) XXX_DiscardUnknown() {
xxx_messageInfo_Namespace.DiscardUnknown(m)
}
var xxx_messageInfo_Namespace proto.InternalMessageInfo
func (m *NamespaceCondition) Reset() { *m = NamespaceCondition{} }
func (*NamespaceCondition) ProtoMessage() {}
func (*NamespaceCondition) Descriptor() ([]byte, []int) {
return fileDescriptor_6c07b07c062484ab, []int{92}
}
func (m *NamespaceCondition) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *NamespaceCondition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
func (m *NamespaceCondition) XXX_Merge(src proto.Message) {
xxx_messageInfo_NamespaceCondition.Merge(m, src)
}
func (m *NamespaceCondition) XXX_Size() int {
return m.Size()
}
func (m *NamespaceCondition) XXX_DiscardUnknown() {
xxx_messageInfo_NamespaceCondition.DiscardUnknown(m)
}
var xxx_messageInfo_NamespaceCondition proto.InternalMessageInfo
func (m *NamespaceList) Reset() { *m = NamespaceList{} }
func (*NamespaceList) ProtoMessage() {}
func (*NamespaceList) Descriptor() ([]byte, []int) {
return fileDescriptor_6c07b07c062484ab, []int{93}
}
func (m *NamespaceList) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *NamespaceList) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
func (m *NamespaceList) XXX_Merge(src proto.Message) {
xxx_messageInfo_NamespaceList.Merge(m, src)
}
func (m *NamespaceList) XXX_Size() int {
return m.Size()
}
func (m *NamespaceList) XXX_DiscardUnknown() {
xxx_messageInfo_NamespaceList.DiscardUnknown(m)
}
var xxx_messageInfo_NamespaceList proto.InternalMessageInfo
func (m *NamespaceSpec) Reset() { *m = NamespaceSpec{} }
func (*NamespaceSpec) ProtoMessage() {}
func (*NamespaceSpec) Descriptor() ([]byte, []int) {
return fileDescriptor_6c07b07c062484ab, []int{94}
}
func (m *NamespaceSpec) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *NamespaceSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
func (m *NamespaceSpec) XXX_Merge(src proto.Message) {
xxx_messageInfo_NamespaceSpec.Merge(m, src)
}
func (m *NamespaceSpec) XXX_Size() int {
return m.Size()
}
func (m *NamespaceSpec) XXX_DiscardUnknown() {
xxx_messageInfo_NamespaceSpec.DiscardUnknown(m)
}
var xxx_messageInfo_NamespaceSpec proto.InternalMessageInfo
func (m *NamespaceStatus) Reset() { *m = NamespaceStatus{} }
func (*NamespaceStatus) ProtoMessage() {}
func (*NamespaceStatus) Descriptor() ([]byte, []int) {
return fileDescriptor_6c07b07c062484ab, []int{95}
}
func (m *NamespaceStatus) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *NamespaceStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
func (m *NamespaceStatus) XXX_Merge(src proto.Message) {
xxx_messageInfo_NamespaceStatus.Merge(m, src)
}
func (m *NamespaceStatus) XXX_Size() int {
return m.Size()
}
func (m *NamespaceStatus) XXX_DiscardUnknown() {
xxx_messageInfo_NamespaceStatus.DiscardUnknown(m)
}
var xxx_messageInfo_NamespaceStatus proto.InternalMessageInfo
4.Namespace Informer mechanism code
- The Informer code is maintained in the client-go project
4.1.Namespace Informer code
kubernetes/staging/src/k8s.io/client-go/informers/core/v1/namespace.go
package v1
import (
context "context"
time "time"
apicorev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
watch "k8s.io/apimachinery/pkg/watch"
internalinterfaces "k8s.io/client-go/informers/internalinterfaces"
kubernetes "k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/listers/core/v1"
cache "k8s.io/client-go/tools/cache"
)
// NamespaceInformer provides access to a shared informer and lister for
// Namespaces.
type NamespaceInformer interface {
Informer() cache.SharedIndexInformer
Lister() corev1.NamespaceLister
}
type namespaceInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// NewNamespaceInformer constructs a new informer for Namespace type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewNamespaceInformer(client kubernetes.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredNamespaceInformer(client, resyncPeriod, indexers, nil)
}
// NewFilteredNamespaceInformer constructs a new informer for Namespace type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredNamespaceInformer(client kubernetes.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Namespaces().List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Namespaces().Watch(context.TODO(), options)
},
},
&apicorev1.Namespace{},
resyncPeriod,
indexers,
)
}
func (f *namespaceInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredNamespaceInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *namespaceInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&apicorev1.Namespace{}, f.defaultInformer)
}
func (f *namespaceInformer) Lister() corev1.NamespaceLister {
return corev1.NewNamespaceLister(f.Informer().GetIndexer())
}
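The role of `tweakListOptions` in the code above can be sketched as follows. All types here are simplified stand-ins (the real ones are `metav1.ListOptions` and `cache.ListWatch`), and `newFilteredListWatch` is a hypothetical helper that returns request descriptions as strings instead of calling an API server. The sketch shows that the same tweak is applied to the options of both the list and the watch call, so the two always use the same filter.

```go
package main

import "fmt"

// ListOptions is a simplified stand-in for metav1.ListOptions.
type ListOptions struct {
	LabelSelector string
}

// TweakListOptionsFunc mutates the options before a request is sent.
type TweakListOptionsFunc func(*ListOptions)

// ListWatch holds the two functions an informer needs.
type ListWatch struct {
	ListFunc  func(ListOptions) string
	WatchFunc func(ListOptions) string
}

// newFilteredListWatch wraps list and watch so that both apply the same tweak.
func newFilteredListWatch(tweak TweakListOptionsFunc) *ListWatch {
	apply := func(o ListOptions) ListOptions {
		if tweak != nil {
			tweak(&o)
		}
		return o
	}
	return &ListWatch{
		ListFunc: func(o ListOptions) string {
			return "LIST namespaces labelSelector=" + apply(o).LabelSelector
		},
		WatchFunc: func(o ListOptions) string {
			return "WATCH namespaces labelSelector=" + apply(o).LabelSelector
		},
	}
}

func main() {
	lw := newFilteredListWatch(func(o *ListOptions) {
		o.LabelSelector = "team=a" // hypothetical label used for illustration
	})
	fmt.Println(lw.ListFunc(ListOptions{}))
	fmt.Println(lw.WatchFunc(ListOptions{}))
}
```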
4.2.Namespace Lister code
kubernetes/staging/src/k8s.io/client-go/listers/core/v1/namespace.go
- client-go keeps all lister code under the listers directory
package v1
import (
corev1 "k8s.io/api/core/v1"
labels "k8s.io/apimachinery/pkg/labels"
listers "k8s.io/client-go/listers"
cache "k8s.io/client-go/tools/cache"
)
// NamespaceLister helps list Namespaces.
// All objects returned here must be treated as read-only.
type NamespaceLister interface {
// List lists all Namespaces in the indexer.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*corev1.Namespace, err error)
// Get retrieves the Namespace from the index for a given name.
// Objects returned here must be treated as read-only.
Get(name string) (*corev1.Namespace, error)
NamespaceListerExpansion
}
// namespaceLister implements the NamespaceLister interface.
type namespaceLister struct {
listers.ResourceIndexer[*corev1.Namespace]
}
// NewNamespaceLister returns a new NamespaceLister.
func NewNamespaceLister(indexer cache.Indexer) NamespaceLister {
return &namespaceLister{listers.New[*corev1.Namespace](indexer, corev1.Resource("namespace"))}
}
4.3.Code walkthrough
Informer code
- The NamespaceInformer interface has two methods, Informer() and Lister()
  - Informer(): returns the Namespace informer. The informers of Kubernetes built-in resources are all kept in a cache inside the factory, so this method looks up the matching informer there; if none exists yet, it creates one, stores it, and returns it
  - Lister(): returns a NamespaceLister, which reads Namespace objects from the shared local cache instead of reading from the apiserver
- The namespaceInformer struct implements the NamespaceInformer interface and has two fields, factory and tweakListOptions
  - factory: a factory interface with two methods. Its InformerFor method is the generic way to obtain an informer for an API object type, and namespaceInformer uses it to get a cache.SharedIndexInformer with little code
  - tweakListOptions: a function that adjusts the ListOptions before each List and Watch request, for example to set a label or field selector. It changes the request options, not the Namespace objects themselves
- NewNamespaceInformer / NewFilteredNamespaceInformer are package-level functions (comparable to static methods). They create a standalone Namespace informer of type cache.SharedIndexInformer, which runs List and Watch for namespaces
- Methods with a namespaceInformer receiver (member methods):
  - Informer(): implements the NamespaceInformer interface method. It calls InformerFor on the factory to get the cached namespace informer (a cache.SharedIndexInformer); if the cache has none, one is created, stored, and returned
  - Lister(): implements the NamespaceInformer interface method and creates a new namespace lister
Lister code
- The NamespaceLister interface has two methods, List() and Get()
  - List(): lists Namespace objects
  - Get(): gets a Namespace object by name
- The namespaceLister struct implements the NamespaceLister interface. It does not implement List and Get itself; it embeds the generic resource indexer listers.ResourceIndexer, instantiated for namespaces as listers.ResourceIndexer[*corev1.Namespace], and that indexer provides List() and Get()
  - This is why a cache.Indexer has to be passed in when a namespaceLister is created
- NewNamespaceLister is a package-level function that creates a NamespaceLister. It takes a cache.Indexer, builds a listers.ResourceIndexer[*corev1.Namespace] from it, wraps it in a NamespaceLister, and returns it
Notes
- As the code shows, the informer is cached: it is created once and reused afterwards. The lister is not cached, so every call to namespaceInformer.Lister() creates a new lister (a thin wrapper over the same shared indexer). For this reason, when writing a CRD controller, it is common to create the lister once, store it in the controller struct, and reuse it from then on
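The caching behaviour described in this walkthrough can be sketched with toy types. Everything below is a simplified model written for illustration (the real types are `cache.SharedIndexInformer`, the informer factory, and `NamespaceLister`); it assumes a map as the shared store. It shows that the factory hands out one informer per object type, while each `Lister()` call builds a new wrapper over the same store.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// Informer is a toy stand-in for cache.SharedIndexInformer.
type Informer struct {
	store map[string]string // shared local cache: namespace name -> phase
}

// Factory caches one informer per object type, as InformerFor does.
type Factory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*Informer
}

func NewFactory() *Factory {
	return &Factory{informers: map[reflect.Type]*Informer{}}
}

// InformerFor returns the cached informer for obj's type, creating it only once.
func (f *Factory) InformerFor(obj interface{}, newFunc func() *Informer) *Informer {
	f.mu.Lock()
	defer f.mu.Unlock()
	typ := reflect.TypeOf(obj)
	if inf, ok := f.informers[typ]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[typ] = inf
	return inf
}

// Lister is a thin read-only view over an informer's store.
type Lister struct{ store map[string]string }

func (l *Lister) Get(name string) (string, bool) {
	phase, ok := l.store[name]
	return phase, ok
}

// Namespace is a placeholder used only as the type key.
type Namespace struct{}

type namespaceInformer struct{ factory *Factory }

func (n *namespaceInformer) Informer() *Informer {
	return n.factory.InformerFor(&Namespace{}, func() *Informer {
		return &Informer{store: map[string]string{}}
	})
}

// Lister builds a new wrapper on every call, always over the same store.
func (n *namespaceInformer) Lister() *Lister {
	return &Lister{store: n.Informer().store}
}

func main() {
	ni := &namespaceInformer{factory: NewFactory()}
	fmt.Println(ni.Informer() == ni.Informer()) // true: informer is cached
	fmt.Println(ni.Lister() == ni.Lister())     // false: a new lister each time

	lister := ni.Lister() // create once, keep in the controller struct
	ni.Informer().store["default"] = "Active"
	fmt.Println(lister.Get("default")) // Active true
}
```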
5.How does client-go let the namespace resource interact with a Kubernetes cluster?
5.1.Clientset holds the clients of every GV
kubernetes/staging/src/k8s.io/client-go/kubernetes/clientset.go
- In client-go, the clients of all built-in resources belong to the clientset. client-go/kubernetes/clientset.go defines the unified Clientset interface, named Interface, with one method per GV (group/version). Namespace belongs to core/v1, so CoreV1 is the only method we need to look at
- The Clientset struct contains the client of every GV (the core/v1 client has type `*corev1.CoreV1Client`)
- Clientset implements Interface, and its CoreV1 method returns the coreV1 client
package kubernetes
import (
fmt "fmt"
http "net/http"
...
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
...
)
type Interface interface {
Discovery() discovery.DiscoveryInterface
...
CoreV1() corev1.CoreV1Interface
...
}
// Clientset contains the clients for groups.
type Clientset struct {
*discovery.DiscoveryClient
...
coreV1 *corev1.CoreV1Client
...
}
...
// CoreV1 retrieves the CoreV1Client
func (c *Clientset) CoreV1() corev1.CoreV1Interface {
return c.coreV1
}
...
// NewForConfig creates a new Clientset for the given config.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfig will generate a rate-limiter in configShallowCopy.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.UserAgent == "" {
configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
}
// share the transport between all clients
httpClient, err := rest.HTTPClientFor(&configShallowCopy)
if err != nil {
return nil, err
}
return NewForConfigAndClient(&configShallowCopy, httpClient)
}
// NewForConfigAndClient creates a new Clientset for the given config and http client.
// Note the http client provided takes precedence over the configured transport values.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfigAndClient will generate a rate-limiter in configShallowCopy.
func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
if configShallowCopy.Burst <= 0 {
return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")
}
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var cs Clientset
var err error
...
cs.coreV1, err = corev1.NewForConfigAndClient(&configShallowCopy, httpClient)
if err != nil {
return nil, err
}
...
}
...
// New creates a new Clientset for the given RESTClient.
func New(c rest.Interface) *Clientset {
var cs Clientset
...
cs.coreV1 = corev1.New(c)
...
cs.DiscoveryClient = discovery.NewDiscoveryClient(c)
return &cs
}
5.2.corev1 Client
kubernetes/staging/src/k8s.io/client-go/kubernetes/typed/core/v1/core_client.go
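- The corev1 client defines the CoreV1Interface interface, which embeds the getter interface of every resource in the group, including NamespacesGetter for Namespace.
- The CoreV1Client struct implements CoreV1Interface, and therefore every embedded resource interface, including NamespacesGetter.
- The NamespacesGetter interface has only one method, Namespaces().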
package v1
import (
http "net/http"
corev1 "k8s.io/api/core/v1"
scheme "k8s.io/client-go/kubernetes/scheme"
rest "k8s.io/client-go/rest"
)
type CoreV1Interface interface {
RESTClient() rest.Interface
...
NamespacesGetter
...
}
// CoreV1Client is used to interact with features provided by the group.
type CoreV1Client struct {
restClient rest.Interface
}
...
func (c *CoreV1Client) Namespaces() NamespaceInterface {
return newNamespaces(c)
}
...
// New creates a new CoreV1Client for the given RESTClient.
func New(c rest.Interface) *CoreV1Client {
return &CoreV1Client{c}
}
func setConfigDefaults(config *rest.Config) error {
gv := corev1.SchemeGroupVersion
config.GroupVersion = &gv
config.APIPath = "/api"
config.NegotiatedSerializer = rest.CodecFactoryForGeneratedClient(scheme.Scheme, scheme.Codecs).WithoutConversion()
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
return nil
}
// RESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *CoreV1Client) RESTClient() rest.Interface {
if c == nil {
return nil
}
return c.restClient
}
5.3.NamespacesGetter interface
kubernetes/staging/src/k8s.io/client-go/kubernetes/typed/core/v1/namespace.go
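- The NamespacesGetter interface has only one method, Namespaces(), whose return type is the NamespaceInterface interface.
- NamespaceInterface contains all the methods for operating on namespaces: Create, Update, UpdateStatus, Delete, Get, List, Watch, Patch, Apply and ApplyStatus.
- The namespaces struct implements NamespaceInterface. CoreV1Client implements NamespacesGetter simply by returning a namespaces object from Namespaces(), and that object carries every method for operating on namespaces.
  - The namespaces struct embeds gentype.ClientWithListAndApply (a generic client) and picks up all of its methods through Go's method promotion. Because ClientWithListAndApply provides the methods listed in NamespaceInterface, namespaces satisfies them implicitly.
  - ClientWithListAndApply wraps the low-level calls to the Kubernetes API (List, Apply and so on). The generic type parameters *corev1.Namespace, *corev1.NamespaceList and *applyconfigurationscorev1.NamespaceApplyConfiguration select the resource type.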
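The embedding described above can be illustrated with a small standalone sketch. The names `Widget`, `WidgetInterface`, `genericClient` and the in-memory map are hypothetical stand-ins and are not client-go types. The only point is that the methods of an embedded generic struct are promoted to the outer struct, which then satisfies the interface without any hand-written methods.

```go
package main

import (
	"errors"
	"fmt"
)

// Widget is a hypothetical resource type standing in for corev1.Namespace.
type Widget struct{ Name string }

// WidgetInterface plays the role of NamespaceInterface, reduced to two methods.
type WidgetInterface interface {
	Create(w *Widget) (*Widget, error)
	Get(name string) (*Widget, error)
}

// genericClient is a toy counterpart of gentype.ClientWithListAndApply:
// one generic implementation that is reused for every resource type.
type genericClient[T any] struct {
	resource string
	store    map[string]T
	nameOf   func(T) string
}

func (c *genericClient[T]) Create(obj T) (T, error) {
	c.store[c.nameOf(obj)] = obj
	return obj, nil
}

func (c *genericClient[T]) Get(name string) (T, error) {
	obj, ok := c.store[name]
	if !ok {
		var zero T
		return zero, errors.New(c.resource + " not found: " + name)
	}
	return obj, nil
}

// widgets embeds the generic client, so Create and Get are promoted to it.
type widgets struct {
	*genericClient[*Widget]
}

// Compile-time check that widgets satisfies WidgetInterface through embedding.
var _ WidgetInterface = &widgets{}

func newWidgets() *widgets {
	return &widgets{&genericClient[*Widget]{
		resource: "widgets",
		store:    map[string]*Widget{},
		nameOf:   func(w *Widget) string { return w.Name },
	}}
}

func main() {
	c := newWidgets()
	if _, err := c.Create(&Widget{Name: "demo"}); err != nil {
		fmt.Println("create failed:", err)
		return
	}
	w, err := c.Get("demo")
	fmt.Println(w.Name, err)
}
```

The `var _ WidgetInterface = &widgets{}` line is the same compile-time assertion idiom that the Kubernetes code base uses, for example for namespacedResourcesDeleter later in this article.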
package v1
import (
context "context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
applyconfigurationscorev1 "k8s.io/client-go/applyconfigurations/core/v1"
gentype "k8s.io/client-go/gentype"
scheme "k8s.io/client-go/kubernetes/scheme"
)
// NamespacesGetter has a method to return a NamespaceInterface.
// A group's client should implement this interface.
type NamespacesGetter interface {
Namespaces() NamespaceInterface
}
// NamespaceInterface has methods to work with Namespace resources.
type NamespaceInterface interface {
Create(ctx context.Context, namespace *corev1.Namespace, opts metav1.CreateOptions) (*corev1.Namespace, error)
Update(ctx context.Context, namespace *corev1.Namespace, opts metav1.UpdateOptions) (*corev1.Namespace, error)
// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus().
UpdateStatus(ctx context.Context, namespace *corev1.Namespace, opts metav1.UpdateOptions) (*corev1.Namespace, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*corev1.Namespace, error)
List(ctx context.Context, opts metav1.ListOptions) (*corev1.NamespaceList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *corev1.Namespace, err error)
Apply(ctx context.Context, namespace *applyconfigurationscorev1.NamespaceApplyConfiguration, opts metav1.ApplyOptions) (result *corev1.Namespace, err error)
// Add a +genclient:noStatus comment above the type to avoid generating ApplyStatus().
ApplyStatus(ctx context.Context, namespace *applyconfigurationscorev1.NamespaceApplyConfiguration, opts metav1.ApplyOptions) (result *corev1.Namespace, err error)
NamespaceExpansion
}
// namespaces implements NamespaceInterface
type namespaces struct {
*gentype.ClientWithListAndApply[*corev1.Namespace, *corev1.NamespaceList, *applyconfigurationscorev1.NamespaceApplyConfiguration]
}
// newNamespaces returns a Namespaces
func newNamespaces(c *CoreV1Client) *namespaces {
return &namespaces{
gentype.NewClientWithListAndApply[*corev1.Namespace, *corev1.NamespaceList, *applyconfigurationscorev1.NamespaceApplyConfiguration](
"namespaces",
c.RESTClient(),
scheme.ParameterCodec,
"",
func() *corev1.Namespace { return &corev1.Namespace{} },
func() *corev1.NamespaceList { return &corev1.NamespaceList{} },
gentype.PrefersProtobuf[*corev1.Namespace](),
),
}
}
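5.4.Summary
- Looking at the diagram again, it is now clear why so many controllers hold a kubernetes.Interface object: they use it to operate on the built-in resources of the cluster.
- Creating a kubernetes.Interface really means creating a Clientset object.
- Calling CoreV1() on the Clientset returns a CoreV1Interface, which is really a CoreV1Client object.
- Calling Namespaces() on the CoreV1Client returns the object that operates on Namespaces, namely a namespaces object.
- The namespaces object has the methods for operating on namespaces in the cluster, such as Create and Update.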
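The call chain summarized above can be modelled in a few lines. This is a simplified sketch, not client-go itself: the interface names mirror the real ones, but every method set is cut down to one method, and `fakeBackend` is a hypothetical stand-in for the REST client and the API Server behind it.

```go
package main

import "fmt"

// NamespaceInterface is reduced to a single method for this sketch.
type NamespaceInterface interface {
	Create(name string) error
}

// NamespacesGetter follows the getter pattern: one method returning the resource client.
type NamespacesGetter interface {
	Namespaces() NamespaceInterface
}

// CoreV1Interface embeds the getter of every resource in the group (only one here).
type CoreV1Interface interface {
	NamespacesGetter
}

// Interface is the toy counterpart of kubernetes.Interface.
type Interface interface {
	CoreV1() CoreV1Interface
}

// fakeBackend is a hypothetical stand-in for the REST client and API Server.
type fakeBackend struct{ created []string }

type namespaces struct{ backend *fakeBackend }

func (n *namespaces) Create(name string) error {
	n.backend.created = append(n.backend.created, name)
	return nil
}

type coreV1Client struct{ backend *fakeBackend }

func (c *coreV1Client) Namespaces() NamespaceInterface { return &namespaces{c.backend} }

type clientset struct{ coreV1 *coreV1Client }

func (c *clientset) CoreV1() CoreV1Interface { return c.coreV1 }

func newClientset(b *fakeBackend) Interface {
	return &clientset{coreV1: &coreV1Client{backend: b}}
}

func main() {
	backend := &fakeBackend{}
	client := newClientset(backend)
	// The same three-step chain a controller uses:
	// clientset -> group/version client -> resource client.
	if err := client.CoreV1().Namespaces().Create("demo"); err != nil {
		fmt.Println("create failed:", err)
		return
	}
	fmt.Println(backend.created)
}
```

Because a controller only depends on the top-level interface, a test can hand it any implementation, which is presumably one reason controllers hold the interface rather than the concrete Clientset.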
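6.Namespace Controller code
- From the prerequisite reading we know the internal architecture of a controller and its informer, so when creating a new controller we have to supply: event handlers, a workqueue, processNextItem, workers and an indexer.
6.1.Declaring the NamespaceController struct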
kubernetes/pkg/controller/namespace/namespace_controller.go
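- lister corelisters.NamespaceLister
  - Lists Namespace objects from the shared cache. A Lister is a read-only view over the informer's indexer: it offers Get and List against the cache and avoids calling the API Server directly.
- listerSynced cache.InformerSynced
  - InformerSynced is a function type that returns a bool indicating whether the cache has synced.
  - It ensures that the cache reflects the API Server before the controller starts handling events.
  - If the cache has not synced, the controller may act on stale or incomplete data.
- queue workqueue.TypedRateLimitingInterface[string]
  - queue is a rate-limited workqueue that stores the keys of the namespaces to be processed.
  - It supports de-duplication, delayed processing and rate limiting of items.
  - For example, when a namespace is deleted, its key is added to the queue, and a worker of the controller takes the key off the queue and processes it.
- namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
  - The namespace resource deleter: an interface for deleting all resources inside a namespace.
  - When a namespace is marked for deletion, the implementation behind this field cleans up everything in it (Pods, Services and so on). Only after those resources are gone can the namespace itself be removed.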
package namespace
// NamespaceController is responsible for performing actions dependent upon a namespace phase
type NamespaceController struct {
// lister that can list namespaces from a shared cache
lister corelisters.NamespaceLister
// returns true when the namespace cache is ready
listerSynced cache.InformerSynced
// namespaces that have been queued up for processing by workers
queue workqueue.TypedRateLimitingInterface[string]
// helper to delete all resources in the namespace when the namespace is deleted.
namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
}
6.2.Creating the Namespace Controller
kubernetes/pkg/controller/namespace/namespace_controller.go
import (
"context"
"fmt"
"time"
"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/metadata"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/namespace/deletion"
"k8s.io/klog/v2"
)
// NewNamespaceController creates a new NamespaceController
func NewNamespaceController(
ctx context.Context,
kubeClient clientset.Interface,
metadataClient metadata.Interface,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
namespaceInformer coreinformers.NamespaceInformer,
resyncPeriod time.Duration,
finalizerToken v1.FinalizerName) *NamespaceController {
// create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
nsControllerRateLimiter(),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "namespace",
},
),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(ctx, kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken),
}
// configure the namespace informer event handlers
namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
namespace := obj.(*v1.Namespace)
namespaceController.enqueueNamespace(namespace)
},
UpdateFunc: func(oldObj, newObj interface{}) {
namespace := newObj.(*v1.Namespace)
namespaceController.enqueueNamespace(namespace)
},
},
resyncPeriod,
)
namespaceController.lister = namespaceInformer.Lister()
namespaceController.listerSynced = namespaceInformer.Informer().HasSynced
return namespaceController
}
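6.2.1.Parameter analysis
- Parameters:
  - kubeClient clientset.Interface
    - As the import clientset "k8s.io/client-go/kubernetes" shows, this is the clientset covered in chapter 5; it is the client used to operate on namespaces.
  - metadataClient metadata.Interface
    - As the import "k8s.io/client-go/metadata" shows, this is the client-go component for lightweight metadata operations. Because it does not depend on the concrete structure of a resource type, it can work efficiently on the metadata (labels, annotations, owner references and so on) of any Kubernetes resource, including CRDs.
  - discoverResourcesFn func() ([]*metav1.APIResourceList, error)
    - A function that discovers the API resources the cluster supports. During namespace cleanup it determines which resource types have to be deleted.
    - In effect it is similar to running kubectl api-resources to find all resource types and then deciding which ones need to be reclaimed.
  - namespaceInformer coreinformers.NamespaceInformer
    - The namespace informer, which watches and caches namespace changes.
    - It delivers namespace event notifications (add, update, delete) and exposes the cache, which avoids frequent calls to the API Server.
  - resyncPeriod time.Duration
    - The resync period of the informer. A resync is triggered periodically, so resources are re-processed even if nothing has changed.
  - finalizerToken v1.FinalizerName
    - The finalizer identifier used when a namespace is deleted, normally kubernetes.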
6.2.2.Creating the NamespaceController instance
// create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
nsControllerRateLimiter(),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "namespace",
},
),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(
ctx,
kubeClient.CoreV1().Namespaces(),
metadataClient,
kubeClient.CoreV1(),
discoverResourcesFn,
finalizerToken,
),
}
// nsControllerRateLimiter is tuned for a faster than normal recycle time with default backoff speed and default overall
// requeing speed. We do this so that namespace cleanup is reliably faster and we know that the number of namespaces being
// deleted is smaller than total number of other namespace scoped resources in a cluster.
func nsControllerRateLimiter() workqueue.TypedRateLimiter[string] {
return workqueue.NewTypedMaxOfRateLimiter(
// this ensures that we retry namespace deletion at least every minute, never longer.
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 60*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
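- queue:
  - Creates a rate-limited queue that serves as the workqueue holding the Namespace keys waiting to be processed.
  - Uses nsControllerRateLimiter to configure the rate limiter, so the retry rate stays under control.
- nsControllerRateLimiter:
  - nsControllerRateLimiter builds a rate limiter tuned specifically for namespace cleanup.
  - It combines two limiters and takes the longer of their delays: a per-item exponential backoff that starts at 5 ms and is capped at 60 s, and an overall token bucket of 10 qps with a bucket size of 100.
  - As the code comment explains, far fewer namespaces are being deleted at any moment than there are namespaced resources in the cluster, so the limiter can afford a faster-than-normal retry cycle, which makes namespace cleanup reliably quicker.
- namespacedResourcesDeleter:
  - Creates a NamespacedResourcesDeleter instance that cleans up the resources in a namespace.
  - It receives the context, the namespace client, the metadata client, kubeClient.CoreV1() as the pods getter, the resource discovery function and the finalizer token.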
6.2.3.Configuring the namespace event handlers
namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
namespace := obj.(*v1.Namespace)
namespaceController.enqueueNamespace(namespace)
},
UpdateFunc: func(oldObj, newObj interface{}) {
namespace := newObj.(*v1.Namespace)
namespaceController.enqueueNamespace(namespace)
},
},
resyncPeriod,
)
// enqueueNamespace adds an object to the controller work queue
// obj could be an *v1.Namespace, or a DeletionFinalStateUnknown item.
func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
namespace := obj.(*v1.Namespace)
// don't queue if we aren't deleted
if namespace.DeletionTimestamp == nil || namespace.DeletionTimestamp.IsZero() {
return
}
// delay processing namespace events to allow HA api servers to observe namespace deletion,
// and HA etcd servers to observe last minute object creations inside the namespace
nm.queue.AddAfter(key, namespaceDeletionGracePeriod)
}
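- AddEventHandlerWithResyncPeriod:
  - Registers the event handlers on the namespace informer.
  - Listens for namespace add and update events and passes them to enqueueNamespace.
- Event handling logic:
  - AddFunc: when a namespace is created, enqueueNamespace is called.
  - UpdateFunc: when a namespace is updated, enqueueNamespace is called with the new object.
  - enqueueNamespace, however, only enqueues namespaces that are being deleted (DeletionTimestamp is set) and ignores the rest.
- Why does a delete reach the queue through an update event?
  - Every namespace carries a finalizer by default, so a delete request does not remove the object right away. Instead deletionTimestamp is set in its metadata, marking it as logically deleted, and that change arrives as an update event.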
6.2.4.Setting the namespace lister and sync function
namespaceController.lister = namespaceInformer.Lister()
namespaceController.listerSynced = namespaceInformer.Informer().HasSynced
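- lister:
  - Taken from namespaceInformer and kept in the controller for efficient access to the namespace objects in the cache.
- listerSynced:
  - The cache-sync function, used to check whether the cache has synced with the API Server.
6.3.Namespace Controller deletion logic
6.3.1.Enqueuing the event key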
kubernetes/pkg/controller/namespace/namespace_controller.go
- enqueueNamespace only enqueues namespaces that are being deleted and ignores all others.
// enqueueNamespace adds an object to the controller work queue
// obj could be an *v1.Namespace, or a DeletionFinalStateUnknown item.
func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
namespace := obj.(*v1.Namespace)
// don't queue if we aren't deleted
if namespace.DeletionTimestamp == nil || namespace.DeletionTimestamp.IsZero() {
return
}
// delay processing namespace events to allow HA api servers to observe namespace deletion,
// and HA etcd servers to observe last minute object creations inside the namespace
nm.queue.AddAfter(key, namespaceDeletionGracePeriod)
}
6.3.2.Worker logic
kubernetes/pkg/controller/namespace/namespace_controller.go
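- worker:
  - The per-namespace handling is wrapped in a function, workFunc, which is then called in an endless loop.
  - workFunc, step by step:
    - Pop a key from the queue; stop if the queue signals shutdown.
    - Process the key with nm.syncNamespaceFromKey(ctx, key). On success the key is forgotten with nm.queue.Forget.
    - If processing returns an error, handle it according to its type so that the deletion is retried effectively.
      - *deletion.ResourcesRemainingError: content is still left in the namespace. The Estimate field is the estimated number of seconds until the remaining content is gone. The worker computes a wait time t = Estimate/2 + 1 and calls nm.queue.AddAfter to re-queue the key after t seconds, which spaces out retries and keeps the load on the system down.
      - Any other failure: rather than wait for a full resync, the worker calls nm.queue.AddRateLimited to re-queue the key under the rate limiter, which bounds the retry frequency, and reports the error through utilruntime.HandleError for later troubleshooting.
- syncNamespaceFromKey:
  - Records how long the sync took and logs it.
  - Uses the controller's lister to look the namespace up in the cache.
  - Calls namespacedResourcesDeleter to delete the namespace.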
// worker processes the queue of namespace objects.
// Each namespace can be in the queue at most once.
// The system ensures that no two workers can process
// the same namespace at the same time.
func (nm *NamespaceController) worker(ctx context.Context) {
workFunc := func(ctx context.Context) bool {
key, quit := nm.queue.Get()
if quit {
return true
}
defer nm.queue.Done(key)
err := nm.syncNamespaceFromKey(ctx, key)
if err == nil {
// no error, forget this entry and return
nm.queue.Forget(key)
return false
}
if estimate, ok := err.(*deletion.ResourcesRemainingError); ok {
t := estimate.Estimate/2 + 1
klog.FromContext(ctx).V(4).Info("Content remaining in namespace", "namespace", key, "waitSeconds", t)
nm.queue.AddAfter(key, time.Duration(t)*time.Second)
} else {
// rather than wait for a full resync, re-add the namespace to the queue to be processed
nm.queue.AddRateLimited(key)
utilruntime.HandleError(fmt.Errorf("deletion of namespace %v failed: %v", key, err))
}
return false
}
for {
quit := workFunc(ctx)
if quit {
return
}
}
}
// syncNamespaceFromKey looks for a namespace with the specified key in its store and synchronizes it
func (nm *NamespaceController) syncNamespaceFromKey(ctx context.Context, key string) (err error) {
startTime := time.Now()
logger := klog.FromContext(ctx)
defer func() {
logger.V(4).Info("Finished syncing namespace", "namespace", key, "duration", time.Since(startTime))
}()
namespace, err := nm.lister.Get(key)
if errors.IsNotFound(err) {
logger.Info("Namespace has been deleted", "namespace", key)
return nil
}
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err))
return err
}
return nm.namespacedResourcesDeleter.Delete(ctx, namespace.Name)
}
6.4.The namespacedResourcesDeleter
kubernetes/pkg/controller/namespace/deletion/namespaced_resources_deleter.go
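6.4.1.NamespacedResourcesDeleter data structures
- NamespacedResourcesDeleterInterface: the namespace deleter interface. NamespaceController holds an instance of it and calls Delete() to carry out the namespace deletion.
- namespacedResourcesDeleter struct: implements NamespacedResourcesDeleterInterface and holds several clients, the resource discovery function and the finalizer token.
- NewNamespacedResourcesDeleter: creates a NamespacedResourcesDeleter instance, that is, a namespacedResourcesDeleter object.
  - d.initOpCache(ctx) initializes the operation cache (opCache) of namespacedResourcesDeleter, which records the operations that are not supported on specific resources.
  - It calls discoverResourcesFn to obtain all resources the cluster supports and fills the cache according to the operations each resource allows. The cache lets later resource operations skip calls that would only end in an HTTP 405 error.
  - The main purpose is to work out up front which resources do not support the delete-related operations and to cache that result.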
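A cache of unsupported operations can be sketched as follows. All names here (`opKey`, `notSupportedCache`, `prefill`) and the discovery data are hypothetical; the sketch only illustrates the idea of pre-filling a lock-protected map from a verb list so that later code can skip calls that are known to fail.

```go
package main

import (
	"fmt"
	"sync"
)

// opKey identifies one operation on one resource (hypothetical, simplified key).
type opKey struct {
	resource  string
	operation string
}

// notSupportedCache remembers operations that a resource is known not to support.
type notSupportedCache struct {
	mu sync.RWMutex
	m  map[opKey]bool
}

func newNotSupportedCache() *notSupportedCache {
	return &notSupportedCache{m: map[opKey]bool{}}
}

func (c *notSupportedCache) setNotSupported(k opKey) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[k] = true
}

// isSupported reports true unless the operation was recorded as unsupported.
func (c *notSupportedCache) isSupported(k opKey) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return !c.m[k]
}

// prefill marks every wanted operation that is missing from a resource's verb list.
func (c *notSupportedCache) prefill(verbsByResource map[string][]string, wanted []string) {
	for resource, verbs := range verbsByResource {
		have := map[string]bool{}
		for _, v := range verbs {
			have[v] = true
		}
		for _, op := range wanted {
			if !have[op] {
				c.setNotSupported(opKey{resource: resource, operation: op})
			}
		}
	}
}

func main() {
	cache := newNotSupportedCache()
	// Made-up discovery data for illustration only.
	cache.prefill(map[string][]string{
		"pods":     {"get", "list", "delete", "deletecollection"},
		"bindings": {"create"},
	}, []string{"list", "deletecollection"})

	fmt.Println(cache.isSupported(opKey{"pods", "deletecollection"}))     // true
	fmt.Println(cache.isSupported(opKey{"bindings", "deletecollection"})) // false
}
```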
// NamespacedResourcesDeleterInterface is the interface to delete a namespace with all resources in it.
type NamespacedResourcesDeleterInterface interface {
Delete(ctx context.Context, nsName string) error
}
// NewNamespacedResourcesDeleter returns a new NamespacedResourcesDeleter.
func NewNamespacedResourcesDeleter(ctx context.Context, nsClient v1clientset.NamespaceInterface,
metadataClient metadata.Interface, podsGetter v1clientset.PodsGetter,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
finalizerToken v1.FinalizerName) NamespacedResourcesDeleterInterface {
d := &namespacedResourcesDeleter{
nsClient: nsClient,
metadataClient: metadataClient,
podsGetter: podsGetter,
opCache: &operationNotSupportedCache{
m: make(map[operationKey]bool),
},
discoverResourcesFn: discoverResourcesFn,
finalizerToken: finalizerToken,
}
d.initOpCache(ctx)
return d
}
var _ NamespacedResourcesDeleterInterface = &namespacedResourcesDeleter{}
// namespacedResourcesDeleter is used to delete all resources in a given namespace.
type namespacedResourcesDeleter struct {
// Client to manipulate the namespace.
nsClient v1clientset.NamespaceInterface
// Dynamic client to list and delete all namespaced resources.
metadataClient metadata.Interface
// Interface to get PodInterface.
podsGetter v1clientset.PodsGetter
// Cache of what operations are not supported on each group version resource.
opCache *operationNotSupportedCache
discoverResourcesFn func() ([]*metav1.APIResourceList, error)
// The finalizer token that should be removed from the namespace
// when all resources in that namespace have been deleted.
finalizerToken v1.FinalizerName
}
6.4.2.namespacedResourcesDeleter implements the Delete method
- Before the namespace itself is removed, all resources in it are deleted first; the core method is deleteAllContent.
// Delete deletes all resources in the given namespace.
// Before deleting resources:
// - It ensures that deletion timestamp is set on the
// namespace (does nothing if deletion timestamp is missing).
// - Verifies that the namespace is in the "terminating" phase
// (updates the namespace phase if it is not yet marked terminating)
//
// After deleting the resources:
// * It removes finalizer token from the given namespace.
//
// Returns an error if any of those steps fail.
// Returns ResourcesRemainingError if it deleted some resources but needs
// to wait for them to go away.
// Caller is expected to keep calling this until it succeeds.
func (d *namespacedResourcesDeleter) Delete(ctx context.Context, nsName string) error {
// Multiple controllers may edit a namespace during termination
// first get the latest state of the namespace before proceeding
// if the namespace was deleted already, don't do anything
namespace, err := d.nsClient.Get(ctx, nsName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
if namespace.DeletionTimestamp == nil {
return nil
}
klog.FromContext(ctx).V(5).Info("Namespace controller - syncNamespace", "namespace", namespace.Name, "finalizerToken", d.finalizerToken)
// ensure that the status is up to date on the namespace
// if we get a not found error, we assume the namespace is truly gone
namespace, err = d.retryOnConflictError(ctx, namespace, d.updateNamespaceStatusFunc)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
// the latest view of the namespace asserts that namespace is no longer deleting..
if namespace.DeletionTimestamp.IsZero() {
return nil
}
// return if it is already finalized.
if finalized(namespace) {
return nil
}
// there may still be content for us to remove
estimate, err := d.deleteAllContent(ctx, namespace)
if err != nil {
return err
}
if estimate > 0 {
return &ResourcesRemainingError{estimate}
}
// we have removed content, so mark it finalized by us
_, err = d.retryOnConflictError(ctx, namespace, d.finalizeNamespace)
if err != nil {
// in normal practice, this should not be possible, but if a deployment is running
// two controllers to do namespace deletion that share a common finalizer token it's
// possible that a not found could occur since the other controller would have finished the delete.
if errors.IsNotFound(err) {
return nil
}
return err
}
return nil
}
// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
// It returns an estimate of the time remaining before the remaining resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func (d *namespacedResourcesDeleter) deleteAllContent(ctx context.Context, ns *v1.Namespace) (int64, error) {
namespace := ns.Name
namespaceDeletedAt := *ns.DeletionTimestamp
var errs []error
conditionUpdater := namespaceConditionUpdater{}
estimate := int64(0)
logger := klog.FromContext(ctx)
logger.V(4).Info("namespace controller - deleteAllContent", "namespace", namespace)
resources, err := d.discoverResourcesFn()
if err != nil {
// discovery errors are not fatal. We often have some set of resources we can operate against even if we don't have a complete list
errs = append(errs, err)
conditionUpdater.ProcessDiscoverResourcesErr(err)
}
// TODO(sttts): get rid of opCache and pass the verbs (especially "deletecollection") down into the deleter
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)
groupVersionResources, err := discovery.GroupVersionResources(deletableResources)
if err != nil {
// discovery errors are not fatal. We often have some set of resources we can operate against even if we don't have a complete list
errs = append(errs, err)
conditionUpdater.ProcessGroupVersionErr(err)
}
numRemainingTotals := allGVRDeletionMetadata{
gvrToNumRemaining: map[schema.GroupVersionResource]int{},
finalizersToNumRemaining: map[string]int{},
}
for gvr := range groupVersionResources {
gvrDeletionMetadata, err := d.deleteAllContentForGroupVersionResource(ctx, gvr, namespace, namespaceDeletedAt)
if err != nil {
// If there is an error, hold on to it but proceed with all the remaining
// groupVersionResources.
errs = append(errs, err)
conditionUpdater.ProcessDeleteContentErr(err)
}
if gvrDeletionMetadata.finalizerEstimateSeconds > estimate {
estimate = gvrDeletionMetadata.finalizerEstimateSeconds
}
if gvrDeletionMetadata.numRemaining > 0 {
numRemainingTotals.gvrToNumRemaining[gvr] = gvrDeletionMetadata.numRemaining
for finalizer, numRemaining := range gvrDeletionMetadata.finalizersToNumRemaining {
if numRemaining == 0 {
continue
}
numRemainingTotals.finalizersToNumRemaining[finalizer] = numRemainingTotals.finalizersToNumRemaining[finalizer] + numRemaining
}
}
}
conditionUpdater.ProcessContentTotals(numRemainingTotals)
// we always want to update the conditions because if we have set a condition to "it worked" after it was previously, "it didn't work",
// we need to reflect that information. Recall that additional finalizers can be set on namespaces, so this finalizer may clear itself and
// NOT remove the resource instance.
if hasChanged := conditionUpdater.Update(ns); hasChanged {
if _, err = d.nsClient.UpdateStatus(ctx, ns, metav1.UpdateOptions{}); err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't update status condition for namespace %q: %v", namespace, err))
}
}
// if len(errs)==0, NewAggregate returns nil.
err = utilerrors.NewAggregate(errs)
logger.V(4).Info("namespace controller - deleteAllContent", "namespace", namespace, "estimate", estimate, "err", err)
return estimate, err
}
7.Starting the Namespace Controller
7.1.Core startup method of the Namespace Controller
kubernetes/pkg/controller/namespace/namespace_controller.go
// Run starts observing the system with the specified number of workers.
func (nm *NamespaceController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer nm.queue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting namespace controller")
defer logger.Info("Shutting down namespace controller")
if !cache.WaitForNamedCacheSync("namespace", ctx.Done(), nm.listerSynced) {
return
}
logger.V(5).Info("Starting workers of namespace controller")
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, nm.worker, time.Second)
}
<-ctx.Done()
}
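- namespace_controller.go contains a Run method, the core startup logic of NamespaceController. It starts the namespace controller and manages its lifecycle.
  - It takes a context (ctx) and the number of workers (workers) as parameters.
  - It prepares the controller, starts the workers and waits for the termination signal.
- Core logic:
  - First, two cleanup steps are registered with defer: utilruntime.HandleCrash() and nm.queue.ShutDown(). HandleCrash recovers a runtime panic and records it. ShutDown closes the workqueue when the controller stops, so that its resources are released properly.
  - Next, the logger is obtained through klog.FromContext(ctx) and a startup message is logged. The last defer statement logs a message when the controller shuts down.
  - Then cache.WaitForNamedCacheSync waits for the cache to sync, using nm.listerSynced as the check. If the cache fails to sync, the function returns immediately. This step is important: the controller relies on the cached data to handle namespace events, and an unsynced cache can lead to wrong behaviour.
  - Once the cache has synced, a log line announces that the workers are about to start. The function then starts the requested number of workers (the workers parameter). Each one runs in its own goroutine through wait.UntilWithContext, which calls nm.worker at a fixed interval (1 second here) until the context is cancelled.
  - Finally, the function blocks on <-ctx.Done() and waits for the context to be cancelled. When that happens, the workers stop and the deferred cleanup runs.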
7.2.kube-controller-manager declares a descriptor for every controller
kubernetes/cmd/kube-controller-manager/app/core.go
func newNamespaceControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.NamespaceController,
aliases: []string{"namespace"},
initFunc: startNamespaceController,
}
}
func startNamespaceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
// the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls
// the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource
// including events), takes ~10 seconds by default.
nsKubeconfig := controllerContext.ClientBuilder.ConfigOrDie("namespace-controller")
nsKubeconfig.QPS *= 20
nsKubeconfig.Burst *= 100
namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
return startModifiedNamespaceController(ctx, controllerContext, namespaceKubeClient, nsKubeconfig)
}
func startModifiedNamespaceController(ctx context.Context, controllerContext ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) {
metadataClient, err := metadata.NewForConfig(nsKubeconfig)
if err != nil {
return nil, true, err
}
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
namespaceController := namespacecontroller.NewNamespaceController(
ctx,
namespaceKubeClient,
metadataClient,
discoverResourcesFn,
controllerContext.InformerFactory.Core().V1().Namespaces(),
controllerContext.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,
v1.FinalizerKubernetes,
)
go namespaceController.Run(ctx, int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs))
return nil, true, nil
}
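- kube-controller-manager declares a ControllerDescriptor for every controller, recording the controller's name, aliases, start function and so on.
- newNamespaceControllerDescriptor returns a ControllerDescriptor object.
  - It describes the basic information of the namespace controller.
  - The name field sets the controller name to names.NamespaceController.
  - The aliases field lists the controller's aliases (here "namespace").
  - The initFunc field points to the controller's init function, startNamespaceController.
  - This descriptor pattern makes it easy to register and manage many controllers in the controller framework.
- startNamespaceController prepares the client configuration of the namespace controller and then calls startModifiedNamespaceController.
  - Because the namespace cleanup controller makes many discovery and delete calls, the function adjusts the default client configuration: QPS (queries per second) is multiplied by 20 and Burst (burst size) by 100.
  - This noticeably speeds the controller up, especially when a large number of resources has to be deleted. The function then creates a Kubernetes client with clientset.NewForConfigOrDie and passes it to the next function.
- startModifiedNamespaceController performs the core initialization of the namespace controller.
  - It creates a metadata client with metadata.NewForConfig for handling resource metadata. If that fails, the function returns the error.
  - It defines discoverResourcesFn, which dynamically discovers the namespaced resources the cluster supports.
  - It calls namespacecontroller.NewNamespaceController to create the controller instance and injects the dependencies: the Kubernetes client, the metadata client, the resource discovery function, the namespace informer from the shared informer factory, and so on.
  - Finally the controller is started through Run. The number of workers comes from controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs.
    - controllerContext.ComponentConfig is of type kubectrlmgrconfig.KubeControllerManagerConfiguration, that is, the configuration of kube-controller-manager. It includes the NamespaceController configuration, where the number of workers can be set.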
7.3.Registering the Namespace Controller descriptor
kubernetes/cmd/kube-controller-manager/app/controllermanager.go
- At startup, the controller manager starts every registered controller.
// NewControllerDescriptors is a public map of named controller groups (you can start more than one in an init func)
// paired to their ControllerDescriptor wrapper object that includes InitFunc.
// This allows for structured downstream composition and subdivision.
func NewControllerDescriptors() map[string]*ControllerDescriptor {
controllers := map[string]*ControllerDescriptor{}
aliases := sets.NewString()
// All the controllers must fulfil common constraints, or else we will explode.
register := func(controllerDesc *ControllerDescriptor) {
if controllerDesc == nil {
panic("received nil controller for a registration")
}
name := controllerDesc.Name()
if len(name) == 0 {
panic("received controller without a name for a registration")
}
if _, found := controllers[name]; found {
panic(fmt.Sprintf("controller name %q was registered twice", name))
}
if controllerDesc.GetInitFunc() == nil {
panic(fmt.Sprintf("controller %q does not have an init function", name))
}
for _, alias := range controllerDesc.GetAliases() {
if aliases.Has(alias) {
panic(fmt.Sprintf("controller %q has a duplicate alias %q", name, alias))
}
aliases.Insert(alias)
}
controllers[name] = controllerDesc
}
// First add "special" controllers that aren't initialized normally. These controllers cannot be initialized
// in the main controller loop initialization, so we add them here only for the metadata and duplication detection.
// app.ControllerDescriptor#RequiresSpecialHandling should return true for such controllers
// The only known special case is the ServiceAccountTokenController which *must* be started
// first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding new
// special controllers.
register(newServiceAccountTokenControllerDescriptor(nil))
...
register(newNamespaceControllerDescriptor())
register(newServiceAccountControllerDescriptor())
...
for _, alias := range aliases.UnsortedList() {
if _, ok := controllers[alias]; ok {
panic(fmt.Sprintf("alias %q conflicts with a controller name", alias))
}
}
return controllers
}
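The registration checks above can be reduced to a small registry. The sketch below is a simplified model under stated assumptions: `descriptor`, `registry` and the controller names are hypothetical, and the registry returns errors where the real code panics, which makes the behaviour easier to exercise in isolation.

```go
package main

import (
	"fmt"
	"sort"
)

// descriptor is a pared-down, hypothetical counterpart of ControllerDescriptor.
type descriptor struct {
	name    string
	aliases []string
	start   func() error
}

// registry collects descriptors and rejects inconsistent registrations.
type registry struct {
	byName  map[string]*descriptor
	aliases map[string]bool
}

func newRegistry() *registry {
	return &registry{byName: map[string]*descriptor{}, aliases: map[string]bool{}}
}

// register returns an error in the cases where the real code panics.
func (r *registry) register(d *descriptor) error {
	switch {
	case d == nil:
		return fmt.Errorf("received nil controller for a registration")
	case d.name == "":
		return fmt.Errorf("received controller without a name")
	case r.byName[d.name] != nil:
		return fmt.Errorf("controller name %q was registered twice", d.name)
	case d.start == nil:
		return fmt.Errorf("controller %q does not have an init function", d.name)
	}
	for _, a := range d.aliases {
		if r.aliases[a] {
			return fmt.Errorf("controller %q has a duplicate alias %q", d.name, a)
		}
		r.aliases[a] = true
	}
	r.byName[d.name] = d
	return nil
}

// validate checks that no alias shadows a controller name.
func (r *registry) validate() error {
	for a := range r.aliases {
		if _, ok := r.byName[a]; ok {
			return fmt.Errorf("alias %q conflicts with a controller name", a)
		}
	}
	return nil
}

// startAll runs every start function, in name order for reproducible output.
func (r *registry) startAll() error {
	names := make([]string, 0, len(r.byName))
	for n := range r.byName {
		names = append(names, n)
	}
	sort.Strings(names)
	for _, n := range names {
		if err := r.byName[n].start(); err != nil {
			return fmt.Errorf("starting %s: %w", n, err)
		}
	}
	return nil
}

func main() {
	r := newRegistry()
	noop := func() error { return nil }
	fmt.Println(r.register(&descriptor{name: "demo-controller", aliases: []string{"demo"}, start: noop}))
	fmt.Println(r.register(&descriptor{name: "demo-controller", start: noop}))
	fmt.Println(r.validate(), r.startAll())
}
```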
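8.Summary
After this walk through the NamespaceController, the process of writing a Kubernetes built-in controller should be clear.
- We first learned how to declare the struct of a resource and add the code generation tags.
  - The code generators then produce the deep copy, versioning, swagger, proto/pb serialization, informer, clientset and other code.
- We then studied the code behind the informer mechanism in depth, which, together with the theory of the Informer framework, gives a deeper understanding.
  - In the informer mechanism every resource has its own informer interface with two methods, Informer() and Lister(), which return the Informer object and the Lister object.
    - The Informer object watches and interacts with the resources in the cluster.
    - The Lister object acts like the Indexer and talks to the local cache through List() and Get().
- We analysed how client-go brings the clients of all resources together.
  - Clientset contains the client of every GV, and each GV client contains the clients of all resources under that GV.
  - Each GVR client offers Create, List, Update, Apply and other methods for operating on the resources in the cluster.
- We went through the namespace controller code in detail.
  - Starting from NewNamespaceController, we covered the event handler setup, the workqueue configuration, how keys are enqueued, the worker logic, and the cleanup of other resources before the namespace is removed.
  - Finally, we covered how kube-controller-manager starts the built-in controllers.
- The expected outcome of this study:
  - A clearer picture of the inner workings of informers and client-go, which makes later use of them easier.
  - The ability to study the other built-in Kubernetes controllers. Apart from the core controller code, everything else should be largely the same, which is exactly why that common code was later wrapped into generated code and controller frameworks such as kubebuilder and controller-runtime.
  - A template for writing CRD controllers.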